File size: 2,519 Bytes
b585c7f |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 |
import asyncio
import queue
import threading
class IteratorPipe:
    """
    Iterator Pipe creates an iterator that can be fed in data from another
    block of code or thread of execution.

    Producers call put() to enqueue items and close() to signal the end of
    the stream. Consumers iterate over the pipe; __next__ blocks until an
    item (or the end-of-stream marker) is available.
    """

    def __init__(self, sentinel=object()):
        """
        Create an empty, open pipe.

        sentinel: marker object enqueued by close() to signal end of stream.
            It is recognised by identity (``is``), so it must never be passed
            to put() as regular data. The default object is created once,
            when the function is defined, and is shared by every pipe built
            without an explicit sentinel; this is harmless because each pipe
            only compares it against items taken from its own queue.
        """
        self._q = queue.Queue()
        self._sentinel = sentinel
        # True once close() has enqueued the sentinel (producer-side state).
        self._sentinel_pushed = False
        # True once a consumer has seen the sentinel (consumer-side state).
        self._closed = False
        # Makes the "check _sentinel_pushed, then enqueue" step in put() and
        # close() atomic, so an item can never land behind the sentinel.
        self._lock = threading.Lock()

    def __iter__(self):
        return self

    def __next__(self):
        """
        Return the next item, blocking until one is available.

        Raises StopIteration once the sentinel has been reached, and on every
        call after that.
        """
        if self._closed:
            raise StopIteration
        data = self._q.get(block=True)
        if data is self._sentinel:
            self._closed = True
            # Hand the sentinel back so that any other consumer already
            # blocked in get() wakes up and stops too, instead of waiting
            # forever on a queue nobody will write to again.
            self._q.put(self._sentinel)
            raise StopIteration
        return data

    def put(self, data) -> bool:
        """
        Pushes next item to Iterator and returns True
        If iterator has been closed via close(), doesn't push anything and returns False
        """
        with self._lock:
            if self._sentinel_pushed:
                return False
            self._q.put(data)
            return True

    def close(self) -> None:
        """
        Close is idempotent. Calling close multiple times is safe
        Iterator will raise StopIteration only after all elements pushed before close have been iterated
        """
        with self._lock:
            # make close idempotent
            if self._sentinel_pushed:
                return
            self._sentinel_pushed = True
            self._q.put(self._sentinel)
class AsyncIteratorPipe:
    """
    Async Iterator Pipe creates an async iterator that can be fed in data
    from another coroutine.

    Producers await put() to enqueue items and close() to signal the end of
    the stream. Consumers use ``async for`` over the pipe; __anext__ waits
    until an item (or the end-of-stream marker) is available.
    """

    def __init__(self, sentinel=object()):
        """
        Create an empty, open pipe.

        sentinel: marker object enqueued by close() to signal end of stream.
            It is recognised by identity (``is``), so it must never be passed
            to put() as regular data. The default object is created once,
            when the function is defined, and is shared by every pipe built
            without an explicit sentinel; this is harmless because each pipe
            only compares it against items taken from its own queue.
        """
        # No maxsize is given, so the queue is unbounded.
        self._q = asyncio.Queue()
        self._sentinel = sentinel
        # True once close() has enqueued the sentinel (producer-side state).
        self._sentinel_pushed = False
        # True once a consumer has seen the sentinel (consumer-side state).
        self._closed = False

    def __aiter__(self):
        return self

    async def __anext__(self):
        """
        Return the next item, waiting until one is available.

        Raises StopAsyncIteration once the sentinel has been reached, and on
        every call after that.
        """
        if self._closed:
            raise StopAsyncIteration
        data = await self._q.get()
        if data is self._sentinel:
            self._closed = True
            # Hand the sentinel back so that any other task already waiting
            # in get() wakes up and stops too, instead of waiting forever.
            # The queue is unbounded, so put_nowait() cannot raise QueueFull.
            self._q.put_nowait(self._sentinel)
            raise StopAsyncIteration
        return data

    async def put(self, data) -> bool:
        """
        Pushes next item to Iterator and returns True
        If iterator has been closed via close(), doesn't push anything and returns False
        """
        if self._sentinel_pushed:
            return False
        await self._q.put(data)
        return True

    async def close(self) -> None:
        """
        Close is idempotent. Calling close multiple times is safe
        Iterator will raise StopAsyncIteration only after all elements pushed before close have been iterated
        """
        # make close idempotent
        if self._sentinel_pushed:
            return
        self._sentinel_pushed = True
        await self._q.put(self._sentinel)
|