Spaces:
Running
Running
File size: 8,612 Bytes
d202ada |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 |
import asyncio
import json
import time
import uuid
import pytest
from langflow.events.event_manager import EventManager
from langflow.schema.log import LoggableType
class TestEventManager:
# Registering an event with a valid name and callback using a mock callback function
def test_register_event_with_valid_name_and_callback_with_mock_callback(self):
def mock_callback(event_type: str, data: LoggableType):
pass
queue = asyncio.Queue()
manager = EventManager(queue)
manager.register_event("on_test_event", "test_type", mock_callback)
assert "on_test_event" in manager.events
assert manager.events["on_test_event"].func == mock_callback
# Registering an event with an empty name
def test_register_event_with_empty_name(self):
queue = asyncio.Queue()
manager = EventManager(queue)
with pytest.raises(ValueError, match="Event name cannot be empty"):
manager.register_event("", "test_type")
# Registering an event with a valid name and no callback
def test_register_event_with_valid_name_and_no_callback(self):
queue = asyncio.Queue()
manager = EventManager(queue)
manager.register_event("on_test_event", "test_type")
assert "on_test_event" in manager.events
assert manager.events["on_test_event"].func == manager.send_event
# Accessing a non-registered event callback via __getattr__ with the recommended fix
def test_accessing_non_registered_event_callback_with_recommended_fix(self):
queue = asyncio.Queue()
manager = EventManager(queue)
result = manager.__getattr__("non_registered_event")
assert result == manager.noop
# Accessing a registered event callback via __getattr__
def test_accessing_registered_event_callback(self):
def mock_callback(event_type: str, data: LoggableType):
pass
queue = asyncio.Queue()
manager = EventManager(queue)
manager.register_event("on_test_event", "test_type", mock_callback)
assert manager.on_test_event.func == mock_callback
# Handling a large number of events in the queue
def test_handling_large_number_of_events(self):
def mock_queue_put_nowait(item):
pass
queue = asyncio.Queue()
queue.put_nowait = mock_queue_put_nowait
manager = EventManager(queue)
for i in range(1000):
manager.register_event(f"on_test_event_{i}", "test_type", manager.noop)
assert len(manager.events) == 1000
# Testing registration of an event with an invalid name with the recommended fix
def test_register_event_with_invalid_name_fixed(self):
def mock_callback(event_type, data):
pass
queue = asyncio.Queue()
manager = EventManager(queue)
with pytest.raises(ValueError, match="Event name cannot be empty"):
manager.register_event("", "test_type", mock_callback)
with pytest.raises(ValueError, match="Event name must start with 'on_'"):
manager.register_event("invalid_name", "test_type", mock_callback)
# Sending an event with complex data and verifying successful event transmission
async def test_sending_event_with_complex_data(self):
queue = asyncio.Queue()
manager = EventManager(queue)
manager.register_event("on_test_event", "test_type", manager.noop)
data = {"key": "value", "nested": [1, 2, 3]}
manager.send_event(event_type="test_type", data=data)
event_id, str_data, event_time = await queue.get()
assert event_id is not None
assert str_data is not None
assert event_time <= time.time()
# Sending an event with None data
def test_sending_event_with_none_data(self):
queue = asyncio.Queue()
manager = EventManager(queue)
manager.register_event("on_test_event", "test_type")
assert "on_test_event" in manager.events
assert manager.events["on_test_event"].func.__name__ == "send_event"
# Ensuring thread-safety when accessing the events dictionary
async def test_thread_safety_accessing_events_dictionary(self):
def mock_callback(event_type: str, data: LoggableType):
pass
async def register_events(manager):
manager.register_event("on_test_event_1", "test_type_1", mock_callback)
manager.register_event("on_test_event_2", "test_type_2", mock_callback)
async def access_events(manager):
assert "on_test_event_1" in manager.events
assert "on_test_event_2" in manager.events
queue = asyncio.Queue()
manager = EventManager(queue)
await asyncio.gather(register_events(manager), access_events(manager))
# Checking the performance impact of frequent event registrations
def test_performance_impact_frequent_registrations(self):
def mock_callback(event_type: str, data: LoggableType):
pass
queue = asyncio.Queue()
manager = EventManager(queue)
for i in range(1000):
manager.register_event(f"on_test_event_{i}", "test_type", mock_callback)
assert len(manager.events) == 1000
# Verifying the uniqueness of event IDs for each event triggered using await with asyncio decorator
import pytest
async def test_event_id_uniqueness_with_await(self):
queue = asyncio.Queue()
manager = EventManager(queue)
manager.register_event("on_test_event", "test_type")
manager.on_test_event(data={"data_1": "value_1"})
manager.on_test_event(data={"data_2": "value_2"})
try:
event_id_1, _, _ = await queue.get()
event_id_2, _, _ = await queue.get()
except asyncio.TimeoutError:
pytest.fail("Test timed out while waiting for queue items")
assert event_id_1 != event_id_2
# Ensuring the queue receives the correct event data format
async def test_queue_receives_correct_event_data_format(self):
async def mock_queue_put_nowait(data):
pass
async def mock_queue_get():
return (uuid.uuid4(), b'{"event": "test_type", "data": "test_data"}\n\n', time.time())
queue = asyncio.Queue()
queue.put_nowait = mock_queue_put_nowait
queue.get = mock_queue_get
manager = EventManager(queue)
manager.register_event("on_test_event", "test_type", manager.noop)
event_data = "test_data"
manager.send_event(event_type="test_type", data=event_data)
event_id, str_data, _ = await queue.get()
assert isinstance(event_id, uuid.UUID)
assert isinstance(str_data, bytes)
assert json.loads(str_data.decode("utf-8")) == {"event": "test_type", "data": event_data}
# Registering an event without specifying the event_type argument and providing the event_type argument
def test_register_event_without_event_type_argument_fixed(self):
class MockQueue:
def __init__(self):
self.data = []
def put_nowait(self, item):
self.data.append(item)
queue = MockQueue()
event_manager = EventManager(queue)
event_manager.register_event("on_test_event", "test_event_type", callback=event_manager.noop)
event_manager.send_event(event_type="test_type", data={"key": "value"})
assert len(queue.data) == 1
event_id, str_data, timestamp = queue.data[0]
# event_id follows this pattern: f"{event_type}-{uuid.uuid4()}"
event_type_from_id = event_id.split("-")[0]
assert event_type_from_id == "test_type"
uuid_from_id = event_id.split(event_type_from_id)[1]
assert isinstance(uuid_from_id, str)
# assert that the uuid_from_id is a valid uuid
try:
uuid.UUID(uuid_from_id)
except ValueError:
pytest.fail(f"Invalid UUID: {uuid_from_id}")
assert isinstance(str_data, bytes)
assert isinstance(timestamp, float)
# Accessing a non-registered event callback via __getattr__
def test_accessing_non_registered_callback(self):
class MockQueue:
def __init__(self):
pass
def put_nowait(self, item):
pass
queue = MockQueue()
event_manager = EventManager(queue)
# Accessing a non-registered event callback should return the 'noop' function
callback = event_manager.on_non_existing_event
assert callback.__name__ == "noop"
|