agent-flow / src /backend /tests /unit /events /test_event_manager.py
Tai Truong
fix readme
d202ada
raw
history blame
8.61 kB
import asyncio
import json
import time
import uuid
import pytest
from langflow.events.event_manager import EventManager
from langflow.schema.log import LoggableType
class TestEventManager:
    """Unit tests for ``EventManager``: registration, attribute lookup and sending."""

    def test_register_event_with_valid_name_and_callback_with_mock_callback(self):
        """Registering a valid name with an explicit callback stores that callback."""

        def mock_callback(event_type: str, data: LoggableType):
            pass

        queue = asyncio.Queue()
        manager = EventManager(queue)
        manager.register_event("on_test_event", "test_type", mock_callback)

        assert "on_test_event" in manager.events
        assert manager.events["on_test_event"].func == mock_callback

    def test_register_event_with_empty_name(self):
        """Registering an event with an empty name raises ``ValueError``."""
        queue = asyncio.Queue()
        manager = EventManager(queue)

        with pytest.raises(ValueError, match="Event name cannot be empty"):
            manager.register_event("", "test_type")

    def test_register_event_with_valid_name_and_no_callback(self):
        """Registering without a callback falls back to ``manager.send_event``."""
        queue = asyncio.Queue()
        manager = EventManager(queue)
        manager.register_event("on_test_event", "test_type")

        assert "on_test_event" in manager.events
        assert manager.events["on_test_event"].func == manager.send_event

    def test_accessing_non_registered_event_callback_with_recommended_fix(self):
        """Calling ``__getattr__`` directly with an unknown name returns ``manager.noop``."""
        queue = asyncio.Queue()
        manager = EventManager(queue)

        # __getattr__ is invoked explicitly on purpose: it is the hook under test.
        result = manager.__getattr__("non_registered_event")

        assert result == manager.noop

    def test_accessing_registered_event_callback(self):
        """A registered event is reachable as an attribute wrapping its callback."""

        def mock_callback(event_type: str, data: LoggableType):
            pass

        queue = asyncio.Queue()
        manager = EventManager(queue)
        manager.register_event("on_test_event", "test_type", mock_callback)

        assert manager.on_test_event.func == mock_callback

    def test_handling_large_number_of_events(self):
        """Registering 1000 distinct events keeps every one of them."""

        def mock_queue_put_nowait(item):
            pass

        queue = asyncio.Queue()
        # Stub out the queue write so nothing accumulates if a callback fires.
        queue.put_nowait = mock_queue_put_nowait
        manager = EventManager(queue)

        for i in range(1000):
            manager.register_event(f"on_test_event_{i}", "test_type", manager.noop)

        assert len(manager.events) == 1000

    def test_register_event_with_invalid_name_fixed(self):
        """Empty names and names without the ``on_`` prefix are both rejected."""

        def mock_callback(event_type, data):
            pass

        queue = asyncio.Queue()
        manager = EventManager(queue)

        with pytest.raises(ValueError, match="Event name cannot be empty"):
            manager.register_event("", "test_type", mock_callback)
        with pytest.raises(ValueError, match="Event name must start with 'on_'"):
            manager.register_event("invalid_name", "test_type", mock_callback)

    async def test_sending_event_with_complex_data(self):
        """Sending nested data puts an ``(event_id, payload, timestamp)`` tuple on the queue."""
        queue = asyncio.Queue()
        manager = EventManager(queue)
        manager.register_event("on_test_event", "test_type", manager.noop)
        data = {"key": "value", "nested": [1, 2, 3]}

        manager.send_event(event_type="test_type", data=data)
        event_id, str_data, event_time = await queue.get()

        assert event_id is not None
        assert str_data is not None
        assert event_time <= time.time()

    def test_sending_event_with_none_data(self):
        """Check the default callback of an event registered without one.

        NOTE(review): despite its name, this test never sends an event (with
        ``None`` data or otherwise); it only inspects the registered callback.
        """
        queue = asyncio.Queue()
        manager = EventManager(queue)
        manager.register_event("on_test_event", "test_type")

        assert "on_test_event" in manager.events
        assert manager.events["on_test_event"].func.__name__ == "send_event"

    async def test_thread_safety_accessing_events_dictionary(self):
        """Register two events in one coroutine and read them back in another.

        NOTE(review): neither coroutine awaits anything, so ``asyncio.gather``
        runs them one after the other; no real concurrency or threading is
        exercised here.
        """

        def mock_callback(event_type: str, data: LoggableType):
            pass

        async def register_events(manager):
            manager.register_event("on_test_event_1", "test_type_1", mock_callback)
            manager.register_event("on_test_event_2", "test_type_2", mock_callback)

        async def access_events(manager):
            assert "on_test_event_1" in manager.events
            assert "on_test_event_2" in manager.events

        queue = asyncio.Queue()
        manager = EventManager(queue)

        await asyncio.gather(register_events(manager), access_events(manager))

    def test_performance_impact_frequent_registrations(self):
        """Registering 1000 events with the same callback keeps all 1000 entries."""

        def mock_callback(event_type: str, data: LoggableType):
            pass

        queue = asyncio.Queue()
        manager = EventManager(queue)

        for i in range(1000):
            manager.register_event(f"on_test_event_{i}", "test_type", mock_callback)

        assert len(manager.events) == 1000

    async def test_event_id_uniqueness_with_await(self):
        """Two triggered events must be queued with different event ids."""
        queue = asyncio.Queue()
        manager = EventManager(queue)
        manager.register_event("on_test_event", "test_type")

        manager.on_test_event(data={"data_1": "value_1"})
        manager.on_test_event(data={"data_2": "value_2"})

        try:
            # A bare ``queue.get()`` never times out, so bound the wait; otherwise
            # an empty queue would hang the test instead of failing it.
            event_id_1, _, _ = await asyncio.wait_for(queue.get(), timeout=1)
            event_id_2, _, _ = await asyncio.wait_for(queue.get(), timeout=1)
        except asyncio.TimeoutError:
            pytest.fail("Test timed out while waiting for queue items")

        assert event_id_1 != event_id_2

    async def test_queue_receives_correct_event_data_format(self):
        """Check the shape of the tuple read back from a stubbed queue.

        NOTE(review): ``queue.get`` is replaced by a stub, so the assertions
        below inspect the canned tuple rather than what ``send_event`` wrote.
        """

        # Synchronous on purpose: ``put_nowait`` is called without ``await``, so
        # an ``async def`` stub would only create a never-awaited coroutine.
        def mock_queue_put_nowait(data):
            pass

        async def mock_queue_get():
            return (uuid.uuid4(), b'{"event": "test_type", "data": "test_data"}\n\n', time.time())

        queue = asyncio.Queue()
        queue.put_nowait = mock_queue_put_nowait
        queue.get = mock_queue_get
        manager = EventManager(queue)
        manager.register_event("on_test_event", "test_type", manager.noop)
        event_data = "test_data"

        manager.send_event(event_type="test_type", data=event_data)
        event_id, str_data, _ = await queue.get()

        assert isinstance(event_id, uuid.UUID)
        assert isinstance(str_data, bytes)
        assert json.loads(str_data.decode("utf-8")) == {"event": "test_type", "data": event_data}

    def test_register_event_without_event_type_argument_fixed(self):
        """``send_event`` queues one ``(event_id, bytes, float)`` tuple with a typed id."""

        class MockQueue:
            """Minimal queue stand-in that records every item it is given."""

            def __init__(self):
                self.data = []

            def put_nowait(self, item):
                self.data.append(item)

        queue = MockQueue()
        event_manager = EventManager(queue)
        event_manager.register_event("on_test_event", "test_event_type", callback=event_manager.noop)

        event_manager.send_event(event_type="test_type", data={"key": "value"})

        assert len(queue.data) == 1
        event_id, str_data, timestamp = queue.data[0]

        # event_id follows this pattern: f"{event_type}-{uuid.uuid4()}"
        # Split on the first dash only, so the UUID keeps its own dashes and
        # carries no leading separator.
        event_type_from_id, _, uuid_from_id = event_id.partition("-")
        assert event_type_from_id == "test_type"
        assert isinstance(uuid_from_id, str)

        # assert that the uuid_from_id is a valid uuid
        try:
            uuid.UUID(uuid_from_id)
        except ValueError:
            pytest.fail(f"Invalid UUID: {uuid_from_id}")

        assert isinstance(str_data, bytes)
        assert isinstance(timestamp, float)

    def test_accessing_non_registered_callback(self):
        """Attribute access for an unregistered event yields a callable named ``noop``."""

        class MockQueue:
            """Queue stand-in that discards everything."""

            def __init__(self):
                pass

            def put_nowait(self, item):
                pass

        queue = MockQueue()
        event_manager = EventManager(queue)

        # Accessing a non-registered event callback should return the 'noop' function
        callback = event_manager.on_non_existing_event

        assert callback.__name__ == "noop"