File size: 8,612 Bytes
d202ada
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
import asyncio
import json
import time
import uuid

import pytest
from langflow.events.event_manager import EventManager
from langflow.schema.log import LoggableType


class TestEventManager:
    """Unit tests for ``EventManager``: event registration, lookup and dispatch.

    Every test builds its own queue and manager, so the tests are independent
    of each other and can run in any order.
    """

    # Registering an event with a valid name and an explicit callback
    def test_register_event_with_valid_name_and_callback_with_mock_callback(self):
        def mock_callback(event_type: str, data: LoggableType):
            pass

        queue = asyncio.Queue()
        manager = EventManager(queue)
        manager.register_event("on_test_event", "test_type", mock_callback)
        assert "on_test_event" in manager.events
        # The registered entry exposes the original callable through ``.func``.
        assert manager.events["on_test_event"].func == mock_callback

    # Registering an event with an empty name must be rejected
    def test_register_event_with_empty_name(self):
        queue = asyncio.Queue()
        manager = EventManager(queue)
        with pytest.raises(ValueError, match="Event name cannot be empty"):
            manager.register_event("", "test_type")

    # Registering an event with a valid name and no callback falls back to send_event
    def test_register_event_with_valid_name_and_no_callback(self):
        queue = asyncio.Queue()
        manager = EventManager(queue)
        manager.register_event("on_test_event", "test_type")
        assert "on_test_event" in manager.events
        assert manager.events["on_test_event"].func == manager.send_event

    # Calling __getattr__ directly for an unknown event returns the noop callback
    def test_accessing_non_registered_event_callback_with_recommended_fix(self):
        queue = asyncio.Queue()
        manager = EventManager(queue)
        result = manager.__getattr__("non_registered_event")
        assert result == manager.noop

    # Accessing a registered event callback through attribute lookup
    def test_accessing_registered_event_callback(self):
        def mock_callback(event_type: str, data: LoggableType):
            pass

        queue = asyncio.Queue()
        manager = EventManager(queue)
        manager.register_event("on_test_event", "test_type", mock_callback)
        assert manager.on_test_event.func == mock_callback

    # Handling a large number of events in the queue
    def test_handling_large_number_of_events(self):
        queue = asyncio.Queue()
        manager = EventManager(queue)

        event_count = 1000
        for i in range(event_count):
            name = f"on_test_event_{i}"
            # No callback is given, so firing the event goes through send_event
            # and lands on the real queue.
            manager.register_event(name, "test_type")
            getattr(manager, name)(data={"index": i})

        assert len(manager.events) == event_count
        assert queue.qsize() == event_count

    # Registering events with invalid names (empty, or missing the 'on_' prefix)
    def test_register_event_with_invalid_name_fixed(self):
        def mock_callback(event_type, data):
            pass

        queue = asyncio.Queue()
        manager = EventManager(queue)
        with pytest.raises(ValueError, match="Event name cannot be empty"):
            manager.register_event("", "test_type", mock_callback)
        with pytest.raises(ValueError, match="Event name must start with 'on_'"):
            manager.register_event("invalid_name", "test_type", mock_callback)

    # Sending an event with nested data and verifying it reaches the queue
    async def test_sending_event_with_complex_data(self):
        queue = asyncio.Queue()

        manager = EventManager(queue)
        manager.register_event("on_test_event", "test_type", manager.noop)
        data = {"key": "value", "nested": [1, 2, 3]}
        manager.send_event(event_type="test_type", data=data)
        event_id, str_data, event_time = await queue.get()
        assert event_id is not None
        assert str_data is not None
        assert event_time <= time.time()

    # Sending an event with None data
    def test_sending_event_with_none_data(self):
        queue = asyncio.Queue()
        manager = EventManager(queue)
        manager.register_event("on_test_event", "test_type")
        assert "on_test_event" in manager.events
        assert manager.events["on_test_event"].func.__name__ == "send_event"

        # Actually send the None payload and check that it was queued.
        manager.on_test_event(data=None)
        assert queue.qsize() == 1
        _, str_data, _ = queue.get_nowait()
        assert json.loads(str_data.decode("utf-8"))["data"] is None

    # Registering and reading events from concurrently scheduled coroutines
    async def test_thread_safety_accessing_events_dictionary(self):
        def mock_callback(event_type: str, data: LoggableType):
            pass

        async def register_events(manager):
            manager.register_event("on_test_event_1", "test_type_1", mock_callback)
            manager.register_event("on_test_event_2", "test_type_2", mock_callback)

        async def access_events(manager):
            assert "on_test_event_1" in manager.events
            assert "on_test_event_2" in manager.events

        queue = asyncio.Queue()
        manager = EventManager(queue)

        # NOTE: both coroutines run on the same event loop and neither awaits,
        # so registration finishes before the lookups run. This exercises
        # coroutine interleaving, not real multi-threading.
        await asyncio.gather(register_events(manager), access_events(manager))

    # Registering many events with an explicit callback keeps all of them
    def test_performance_impact_frequent_registrations(self):
        def mock_callback(event_type: str, data: LoggableType):
            pass

        queue = asyncio.Queue()
        manager = EventManager(queue)
        for i in range(1000):
            manager.register_event(f"on_test_event_{i}", "test_type", mock_callback)
        assert len(manager.events) == 1000

    # Verifying that each triggered event gets its own event ID
    async def test_event_id_uniqueness_with_await(self):
        queue = asyncio.Queue()
        manager = EventManager(queue)
        manager.register_event("on_test_event", "test_type")
        manager.on_test_event(data={"data_1": "value_1"})
        manager.on_test_event(data={"data_2": "value_2"})
        try:
            # wait_for bounds the wait, so a missing item fails the test
            # instead of blocking it forever.
            event_id_1, _, _ = await asyncio.wait_for(queue.get(), timeout=1)
            event_id_2, _, _ = await asyncio.wait_for(queue.get(), timeout=1)
        except asyncio.TimeoutError:
            pytest.fail("Test timed out while waiting for queue items")

        assert event_id_1 != event_id_2

    # Ensuring the queue receives the correct event data format
    async def test_queue_receives_correct_event_data_format(self):
        # Use the real queue so the assertions check what EventManager
        # actually enqueues rather than a canned value.
        queue = asyncio.Queue()

        manager = EventManager(queue)
        manager.register_event("on_test_event", "test_type", manager.noop)
        event_data = "test_data"
        manager.send_event(event_type="test_type", data=event_data)

        event_id, str_data, timestamp = await asyncio.wait_for(queue.get(), timeout=1)
        # event_id follows this pattern: f"{event_type}-{uuid.uuid4()}"
        assert isinstance(event_id, str)
        assert event_id.startswith("test_type-")
        assert isinstance(str_data, bytes)
        assert isinstance(timestamp, float)
        assert json.loads(str_data.decode("utf-8")) == {"event": "test_type", "data": event_data}

    # Sending an event directly and checking the shape of the queued tuple
    def test_register_event_without_event_type_argument_fixed(self):
        class MockQueue:
            def __init__(self):
                self.data = []

            def put_nowait(self, item):
                self.data.append(item)

        queue = MockQueue()
        event_manager = EventManager(queue)
        event_manager.register_event("on_test_event", "test_event_type", callback=event_manager.noop)
        event_manager.send_event(event_type="test_type", data={"key": "value"})

        assert len(queue.data) == 1
        event_id, str_data, timestamp = queue.data[0]
        # event_id follows this pattern: f"{event_type}-{uuid.uuid4()}"
        # Split on the first hyphen only: the UUID part contains hyphens too.
        event_type_from_id, separator, uuid_from_id = event_id.partition("-")
        assert event_type_from_id == "test_type"
        assert separator == "-"
        assert isinstance(uuid_from_id, str)
        # assert that the uuid_from_id is a valid uuid
        try:
            uuid.UUID(uuid_from_id)
        except ValueError:
            pytest.fail(f"Invalid UUID: {uuid_from_id}")
        assert isinstance(str_data, bytes)
        assert isinstance(timestamp, float)

    # Accessing a non-registered event callback via attribute lookup
    def test_accessing_non_registered_callback(self):
        class MockQueue:
            def put_nowait(self, item):
                pass

        queue = MockQueue()
        event_manager = EventManager(queue)

        # Accessing a non-registered event callback should return the 'noop' function
        callback = event_manager.on_non_existing_event
        assert callback.__name__ == "noop"