# Page-header residue from the hosting site (not part of the module): "Spaces: Running Running"
import asyncio
import json
import time
import uuid

import pytest
from langflow.events.event_manager import EventManager
from langflow.schema.log import LoggableType
class TestEventManager:
    """Unit tests for ``EventManager``: registration, attribute lookup and queue output."""

    # Registering an event with a valid name and an explicit callback
    def test_register_event_with_valid_name_and_callback_with_mock_callback(self):
        def mock_callback(event_type: str, data: LoggableType):
            pass

        queue = asyncio.Queue()
        manager = EventManager(queue)
        manager.register_event("on_test_event", "test_type", mock_callback)
        assert "on_test_event" in manager.events
        assert manager.events["on_test_event"].func == mock_callback

    # Registering an event with an empty name must be rejected
    def test_register_event_with_empty_name(self):
        queue = asyncio.Queue()
        manager = EventManager(queue)
        with pytest.raises(ValueError, match="Event name cannot be empty"):
            manager.register_event("", "test_type")

    # Registering an event with a valid name and no callback falls back to send_event
    def test_register_event_with_valid_name_and_no_callback(self):
        queue = asyncio.Queue()
        manager = EventManager(queue)
        manager.register_event("on_test_event", "test_type")
        assert "on_test_event" in manager.events
        assert manager.events["on_test_event"].func == manager.send_event

    # Looking up a non-registered event by calling __getattr__ directly yields noop
    def test_accessing_non_registered_event_callback_with_recommended_fix(self):
        queue = asyncio.Queue()
        manager = EventManager(queue)
        # The dunder is invoked explicitly on purpose: it is the hook under test.
        result = manager.__getattr__("non_registered_event")
        assert result == manager.noop

    # Accessing a registered event callback as an attribute
    def test_accessing_registered_event_callback(self):
        def mock_callback(event_type: str, data: LoggableType):
            pass

        queue = asyncio.Queue()
        manager = EventManager(queue)
        manager.register_event("on_test_event", "test_type", mock_callback)
        assert manager.on_test_event.func == mock_callback

    # Registering a large number of events
    # NOTE(review): despite the name, no event is sent here, so the stubbed
    # put_nowait is presumably never exercised -- confirm the intent.
    def test_handling_large_number_of_events(self):
        def mock_queue_put_nowait(item):
            pass

        queue = asyncio.Queue()
        queue.put_nowait = mock_queue_put_nowait
        manager = EventManager(queue)
        for i in range(1000):
            manager.register_event(f"on_test_event_{i}", "test_type", manager.noop)
        assert len(manager.events) == 1000

    # Registering events with invalid names (empty, or missing the "on_" prefix)
    def test_register_event_with_invalid_name_fixed(self):
        def mock_callback(event_type, data):
            pass

        queue = asyncio.Queue()
        manager = EventManager(queue)
        with pytest.raises(ValueError, match="Event name cannot be empty"):
            manager.register_event("", "test_type", mock_callback)
        with pytest.raises(ValueError, match="Event name must start with 'on_'"):
            manager.register_event("invalid_name", "test_type", mock_callback)

    # Sending an event with nested data and checking that a triple reaches the queue
    async def test_sending_event_with_complex_data(self):
        queue = asyncio.Queue()
        manager = EventManager(queue)
        manager.register_event("on_test_event", "test_type", manager.noop)
        data = {"key": "value", "nested": [1, 2, 3]}
        manager.send_event(event_type="test_type", data=data)
        event_id, str_data, event_time = await queue.get()
        assert event_id is not None
        assert str_data is not None
        assert event_time <= time.time()

    # Sending an event with None data
    # NOTE(review): the body only re-checks registration; nothing is sent with
    # data=None. Confirm whether an actual send was intended.
    def test_sending_event_with_none_data(self):
        queue = asyncio.Queue()
        manager = EventManager(queue)
        manager.register_event("on_test_event", "test_type")
        assert "on_test_event" in manager.events
        assert manager.events["on_test_event"].func.__name__ == "send_event"

    # Registering and reading events from two coroutines run through asyncio.gather
    # NOTE(review): neither coroutine awaits anything and no threads are started,
    # so this looks like a sequential check rather than a thread-safety test;
    # it relies on register_events running before access_events -- confirm.
    async def test_thread_safety_accessing_events_dictionary(self):
        def mock_callback(event_type: str, data: LoggableType):
            pass

        async def register_events(manager):
            manager.register_event("on_test_event_1", "test_type_1", mock_callback)
            manager.register_event("on_test_event_2", "test_type_2", mock_callback)

        async def access_events(manager):
            assert "on_test_event_1" in manager.events
            assert "on_test_event_2" in manager.events

        queue = asyncio.Queue()
        manager = EventManager(queue)
        await asyncio.gather(register_events(manager), access_events(manager))

    # Registering many events in a row keeps every one of them
    def test_performance_impact_frequent_registrations(self):
        def mock_callback(event_type: str, data: LoggableType):
            pass

        queue = asyncio.Queue()
        manager = EventManager(queue)
        for i in range(1000):
            manager.register_event(f"on_test_event_{i}", "test_type", mock_callback)
        assert len(manager.events) == 1000

    # Verifying that two triggered events receive different event IDs
    async def test_event_id_uniqueness_with_await(self):
        queue = asyncio.Queue()
        manager = EventManager(queue)
        manager.register_event("on_test_event", "test_type")
        manager.on_test_event(data={"data_1": "value_1"})
        manager.on_test_event(data={"data_2": "value_2"})
        try:
            # Bound each wait: a bare queue.get() never raises TimeoutError, so
            # a missing item would hang the test instead of failing it.
            event_id_1, _, _ = await asyncio.wait_for(queue.get(), timeout=1)
            event_id_2, _, _ = await asyncio.wait_for(queue.get(), timeout=1)
        except asyncio.TimeoutError:
            pytest.fail("Test timed out while waiting for queue items")
        assert event_id_1 != event_id_2

    # Checking the shape of the (event_id, payload, timestamp) triple read from the queue
    # NOTE(review): queue.get is stubbed, so the assertions below inspect the
    # canned tuple rather than what send_event enqueued -- confirm the intent.
    async def test_queue_receives_correct_event_data_format(self):
        # put_nowait is called synchronously, so the stub must be a plain
        # function; an async stub would leave a never-awaited coroutine behind.
        def mock_queue_put_nowait(data):
            pass

        async def mock_queue_get():
            return (uuid.uuid4(), b'{"event": "test_type", "data": "test_data"}\n\n', time.time())

        queue = asyncio.Queue()
        queue.put_nowait = mock_queue_put_nowait
        queue.get = mock_queue_get
        manager = EventManager(queue)
        manager.register_event("on_test_event", "test_type", manager.noop)
        event_data = "test_data"
        manager.send_event(event_type="test_type", data=event_data)
        event_id, str_data, _ = await queue.get()
        assert isinstance(event_id, uuid.UUID)
        assert isinstance(str_data, bytes)
        assert json.loads(str_data.decode("utf-8")) == {"event": "test_type", "data": event_data}

    # Sending an event through a minimal stand-in queue and validating the enqueued triple
    def test_register_event_without_event_type_argument_fixed(self):
        class MockQueue:
            def __init__(self):
                self.data = []

            def put_nowait(self, item):
                self.data.append(item)

        queue = MockQueue()
        event_manager = EventManager(queue)
        event_manager.register_event("on_test_event", "test_event_type", callback=event_manager.noop)
        event_manager.send_event(event_type="test_type", data={"key": "value"})
        assert len(queue.data) == 1
        event_id, str_data, timestamp = queue.data[0]
        # event_id follows this pattern: f"{event_type}-{uuid.uuid4()}"
        # Split on the first "-" only, because the UUID itself contains hyphens;
        # this yields the bare UUID text without a leading separator.
        event_type_from_id, _, uuid_from_id = event_id.partition("-")
        assert event_type_from_id == "test_type"
        assert isinstance(uuid_from_id, str)
        # assert that the uuid_from_id is a valid uuid
        try:
            uuid.UUID(uuid_from_id)
        except ValueError:
            pytest.fail(f"Invalid UUID: {uuid_from_id}")
        assert isinstance(str_data, bytes)
        assert isinstance(timestamp, float)

    # Accessing a non-registered event callback as an attribute
    def test_accessing_non_registered_callback(self):
        class MockQueue:
            def __init__(self):
                pass

            def put_nowait(self, item):
                pass

        queue = MockQueue()
        event_manager = EventManager(queue)
        # Accessing a non-registered event callback should return the 'noop' function
        callback = event_manager.on_non_existing_event
        assert callback.__name__ == "noop"