pseudotensor commited on
Commit
1c0f538
1 Parent(s): 32cc9b0

Update with h2oGPT hash 236c95819e80ab122193bfb843b55618ae285c39

Browse files
iterators/__init__.py ADDED
@@ -0,0 +1,4 @@
 
 
 
 
 
1
+ from .timeout_iterator import TimeoutIterator, AsyncTimeoutIterator
2
+ from .iterator_pipe import IteratorPipe, AsyncIteratorPipe
3
+
4
+ __all__ = ["TimeoutIterator", "AsyncTimeoutIterator", "IteratorPipe", "AsyncIteratorPipe"]
iterators/__pycache__/__init__.cpython-310.pyc ADDED
Binary file (337 Bytes). View file
 
iterators/__pycache__/iterator_pipe.cpython-310.pyc ADDED
Binary file (2.71 kB). View file
 
iterators/__pycache__/timeout_iterator.cpython-310.pyc ADDED
Binary file (5.63 kB). View file
 
iterators/iterator_pipe.py ADDED
@@ -0,0 +1,93 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import queue
2
+ import asyncio
3
+
4
+
5
+ class IteratorPipe:
6
+ """
7
+ Iterator Pipe creates an iterator that can be fed in data from another block of code or thread of execution
8
+ """
9
+
10
+ def __init__(self, sentinel=object()):
11
+ self._q = queue.Queue()
12
+ self._sentinel = sentinel
13
+ self._sentinel_pushed = False
14
+ self._closed = False
15
+
16
+ def __iter__(self):
17
+ return self
18
+
19
+ def __next__(self):
20
+ if self._closed:
21
+ raise StopIteration
22
+
23
+ data = self._q.get(block=True)
24
+ if data is self._sentinel:
25
+ self._closed = True
26
+ raise StopIteration
27
+
28
+ return data
29
+
30
+ def put(self, data) -> bool:
31
+ """
32
+ Pushes next item to Iterator and returns True
33
+ If iterator has been closed via close(), doesn't push anything and returns False
34
+ """
35
+ if self._sentinel_pushed:
36
+ return False
37
+
38
+ self._q.put(data)
39
+ return True
40
+
41
+ def close(self):
42
+ """
43
+ Close is idempotent. Calling close multiple times is safe
44
+ Iterator will raise StopIteration only after all elements pushed before close have been iterated
45
+ """
46
+ # make close idempotent
47
+ if not self._sentinel_pushed:
48
+ self._sentinel_pushed = True
49
+ self._q.put(self._sentinel)
50
+
51
+
52
+ class AsyncIteratorPipe:
53
+
54
+ def __init__(self, sentinel=object()):
55
+ self._q = asyncio.Queue()
56
+ self._sentinel = sentinel
57
+ self._sentinel_pushed = False
58
+ self._closed = False
59
+
60
+ def __aiter__(self):
61
+ return self
62
+
63
+ async def __anext__(self):
64
+ if self._closed:
65
+ raise StopAsyncIteration
66
+
67
+ data = await self._q.get()
68
+ if data is self._sentinel:
69
+ self._closed = True
70
+ raise StopAsyncIteration
71
+
72
+ return data
73
+
74
+ async def put(self, data) -> bool:
75
+ """
76
+ Pushes next item to Iterator and returns True
77
+ If iterator has been closed via close(), doesn't push anything and returns False
78
+ """
79
+ if self._sentinel_pushed:
80
+ return False
81
+
82
+ await self._q.put(data)
83
+ return True
84
+
85
+ async def close(self):
86
+ """
87
+ Close is idempotent. Calling close multiple times is safe
88
+ Iterator will raise StopIteration only after all elements pushed before close have been iterated
89
+ """
90
+ # make close idempotent
91
+ if not self._sentinel_pushed:
92
+ self._sentinel_pushed = True
93
+ await self._q.put(self._sentinel)
iterators/timeout_iterator.py ADDED
@@ -0,0 +1,170 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import queue
2
+ import asyncio
3
+ import threading
4
+ import traceback
5
+
6
+
7
+ class TimeoutIterator:
8
+ """
9
+ Wrapper class to add timeout feature to synchronous iterators
10
+ - timeout: timeout for next(). Default=ZERO_TIMEOUT i.e. no timeout or blocking calls to next. Updated using set_timeout()
11
+ - sentinel: the object returned by iterator when timeout happens
12
+ - reset_on_next: if set to True, timeout is reset to the value of ZERO_TIMEOUT on each iteration
13
+
14
+ TimeoutIterator uses a thread internally.
15
+ The thread stops once the iterator exhausts or raises an exception during iteration.
16
+
17
+ Any exceptions raised within the wrapped iterator are propagated as it is.
18
+ Exception is raised when all elements generated by the actual iterator before exception have been consumed
19
+ Timeout can be set dynamically before going for iteration
20
+ """
21
+ ZERO_TIMEOUT = 0.0
22
+
23
+ def __init__(self, iterator, timeout=0.0, sentinel=object(), reset_on_next=False, raise_on_exception=True):
24
+ self._iterator = iterator
25
+ self._timeout = timeout
26
+ self._sentinel = sentinel
27
+ self._reset_on_next = reset_on_next
28
+ self._raise_on_exception = raise_on_exception
29
+
30
+ self._interrupt = False
31
+ self._done = False
32
+ self._buffer = queue.Queue()
33
+ self._thread = threading.Thread(target=self.__lookahead)
34
+ self._thread.start()
35
+
36
+ def get_sentinel(self):
37
+ return self._sentinel
38
+
39
+ def set_reset_on_next(self, reset_on_next):
40
+ self._reset_on_next = reset_on_next
41
+
42
+ def set_timeout(self, timeout: float):
43
+ """
44
+ Set timeout for next iteration
45
+ """
46
+ self._timeout = timeout
47
+
48
+ def interrupt(self):
49
+ """
50
+ interrupt and stop the underlying thread.
51
+ the thread acutally dies only after interrupt has been set and
52
+ the underlying iterator yields a value after that.
53
+ """
54
+ self._interrupt = True
55
+
56
+ def __iter__(self):
57
+ return self
58
+
59
+ def __next__(self):
60
+ """
61
+ yield the result from iterator
62
+ if timeout > 0:
63
+ yield data if available.
64
+ otherwise yield sentinal
65
+ """
66
+ if self._done:
67
+ raise StopIteration
68
+
69
+ data = self._sentinel
70
+ try:
71
+ if self._timeout > self.ZERO_TIMEOUT:
72
+ data = self._buffer.get(timeout=self._timeout)
73
+ else:
74
+ data = self._buffer.get()
75
+ except queue.Empty:
76
+ pass
77
+ finally:
78
+ # see if timeout needs to be reset
79
+ if self._reset_on_next:
80
+ self._timeout = self.ZERO_TIMEOUT
81
+
82
+ # propagate any exceptions including StopIteration
83
+ if isinstance(data, BaseException):
84
+ self._done = True
85
+ if isinstance(data, StopIteration):
86
+ raise data
87
+ ex = ''.join(traceback.format_tb(data.__traceback__))
88
+ print("Generation Failed: %s %s" % (str(data), str(ex)), flush=True)
89
+ if self._raise_on_exception:
90
+ raise data
91
+ else:
92
+ return data
93
+
94
+ return data
95
+
96
+ def __lookahead(self):
97
+ try:
98
+ while True:
99
+ self._buffer.put(next(self._iterator))
100
+ if self._interrupt:
101
+ raise StopIteration()
102
+ except BaseException as e:
103
+ self._buffer.put(e)
104
+
105
+
106
+ class AsyncTimeoutIterator:
107
+ """
108
+ Async version of TimeoutIterator. See method documentation of TimeoutIterator
109
+ """
110
+ ZERO_TIMEOUT = 0.0
111
+
112
+ def __init__(self, iterator, timeout=0.0, sentinel=object(), reset_on_next=False):
113
+ self._iterator = iterator
114
+ self._timeout = timeout
115
+ self._sentinel = sentinel
116
+ self._reset_on_next = reset_on_next
117
+
118
+ self._interrupt = False
119
+ self._done = False
120
+ self._buffer = asyncio.Queue()
121
+ self._task = asyncio.get_event_loop().create_task(self.__lookahead())
122
+
123
+ def get_sentinel(self):
124
+ return self._sentinel
125
+
126
+ def set_reset_on_next(self, reset_on_next):
127
+ self._reset_on_next = reset_on_next
128
+
129
+ def set_timeout(self, timeout: float):
130
+ self._timeout = timeout
131
+
132
+ def interrupt(self):
133
+ self._interrupt = True
134
+
135
+ def __aiter__(self):
136
+ return self
137
+
138
+ async def __anext__(self):
139
+ if self._done:
140
+ raise StopAsyncIteration
141
+
142
+ data = self._sentinel
143
+ try:
144
+ if self._timeout > self.ZERO_TIMEOUT:
145
+ data = await asyncio.wait_for(self._buffer.get(), self._timeout)
146
+ else:
147
+ data = await self._buffer.get()
148
+ except asyncio.TimeoutError:
149
+ pass
150
+ finally:
151
+ # see if timeout needs to be reset
152
+ if self._reset_on_next:
153
+ self._timeout = self.ZERO_TIMEOUT
154
+
155
+ # propagate any exceptions including StopIteration
156
+ if isinstance(data, BaseException):
157
+ self._done = True
158
+ raise data
159
+
160
+ return data
161
+
162
+ async def __lookahead(self):
163
+ try:
164
+ while True:
165
+ data = await self._iterator.__anext__()
166
+ await self._buffer.put(data)
167
+ if self._interrupt:
168
+ raise StopAsyncIteration()
169
+ except BaseException as e:
170
+ await self._buffer.put(e)