|
import asyncio
import queue
import threading
|
|
|
|
|
class IteratorPipe:
    """
    Iterator Pipe creates an iterator that can be fed data from another
    block of code or thread of execution.

    Producers call put() to queue items and close() to mark the end of the
    stream. Consumers iterate the pipe; each step blocks until an item is
    available and iteration stops once the end-of-stream marker is reached.

    The default ``sentinel`` is a single ``object()`` created when the class
    is defined, so it is shared by every pipe built without an explicit
    sentinel. It is only ever compared by identity.
    """

    def __init__(self, sentinel=object()):
        """
        Create an empty, open pipe.

        ``sentinel`` is the end-of-stream marker: an item that *is* this
        object ends the iteration instead of being yielded.
        """
        self._q = queue.Queue()
        self._sentinel = sentinel
        # True once close() has queued the sentinel (producer-side state)
        self._sentinel_pushed = False
        # True once a consumer has seen the sentinel (consumer-side state)
        self._closed = False
        # Serialises put() and close() so an item can never be queued behind
        # the sentinel while put() still reports success
        self._lock = threading.Lock()

    def __iter__(self):
        """Return the pipe itself; it is its own iterator."""
        return self

    def __next__(self):
        """
        Block until the next item is available and return it.

        Raises StopIteration once the sentinel has been reached, and on
        every later call.
        """
        if self._closed:
            raise StopIteration

        data = self._q.get(block=True)
        if data is self._sentinel:
            self._closed = True
            # Hand the sentinel back so that any other consumer already
            # blocked in get() is woken up too instead of waiting forever
            self._q.put(self._sentinel)
            raise StopIteration

        return data

    def put(self, data) -> bool:
        """
        Pushes next item to Iterator and returns True
        If iterator has been closed via close(), doesn't push anything and returns False
        """
        with self._lock:
            if self._sentinel_pushed:
                return False

            self._q.put(data)
            return True

    def close(self) -> None:
        """
        Close is idempotent. Calling close multiple times is safe
        Iterator will raise StopIteration only after all elements pushed before close have been iterated
        """
        with self._lock:
            if not self._sentinel_pushed:
                self._sentinel_pushed = True
                self._q.put(self._sentinel)
|
|
|
|
|
class AsyncIteratorPipe:
    """
    Async Iterator Pipe creates an async iterator that can be fed data from
    another coroutine.

    Producers await put() to queue items and close() to mark the end of the
    stream. Consumers use ``async for`` (or await __anext__()); each step
    waits until an item is available and iteration stops once the
    end-of-stream marker is reached.

    The default ``sentinel`` is a single ``object()`` created when the class
    is defined, so it is shared by every pipe built without an explicit
    sentinel. It is only ever compared by identity.
    """

    def __init__(self, sentinel=object()):
        """
        Create an empty, open pipe.

        ``sentinel`` is the end-of-stream marker: an item that *is* this
        object ends the iteration instead of being yielded.
        """
        self._q = asyncio.Queue()
        self._sentinel = sentinel
        # True once close() has queued the sentinel (producer-side state)
        self._sentinel_pushed = False
        # True once a consumer has seen the sentinel (consumer-side state)
        self._closed = False

    def __aiter__(self):
        """Return the pipe itself; it is its own async iterator."""
        return self

    async def __anext__(self):
        """
        Wait until the next item is available and return it.

        Raises StopAsyncIteration once the sentinel has been reached, and on
        every later call.
        """
        if self._closed:
            raise StopAsyncIteration

        data = await self._q.get()
        if data is self._sentinel:
            self._closed = True
            # Hand the sentinel back so that any other task already waiting
            # in get() is woken up too instead of waiting forever.
            # The queue is unbounded, so put_nowait cannot raise QueueFull.
            self._q.put_nowait(self._sentinel)
            raise StopAsyncIteration

        return data

    async def put(self, data) -> bool:
        """
        Pushes next item to Iterator and returns True
        If iterator has been closed via close(), doesn't push anything and returns False
        """
        if self._sentinel_pushed:
            return False

        await self._q.put(data)
        return True

    async def close(self) -> None:
        """
        Close is idempotent. Calling close multiple times is safe
        Iterator will raise StopAsyncIteration only after all elements pushed before close have been iterated
        """
        if not self._sentinel_pushed:
            self._sentinel_pushed = True
            await self._q.put(self._sentinel)
|
|