Skip to content

Commit

Permalink
docs: simplify async interface
Browse files Browse the repository at this point in the history
This simplifies the code a bit in the suggested async interface. This is
inspired by the discussion at uktrade/stream-unzip#80,
and makes this code more consistent with the code there.

Whether this will continue to be an example in the docs or integrated into the
codebase in some way, I'm not sure right now.
  • Loading branch information
michalc committed Feb 24, 2024
1 parent aea2194 commit 89257b0
Showing 1 changed file with 18 additions and 25 deletions.
43 changes: 18 additions & 25 deletions docs/async-interface.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,36 +11,29 @@ stream-zip does not include an async interface. However, it is possible to const
import asyncio
from stream_zip import stream_zip

async def async_stream_zip(async_member_files, *args, **kwargs):
async def async_stream_zip(member_files, *args, **kwargs):

def sync_iterable(async_iterable):
async_it = aiter(async_iterable)
while True:
try:
yield asyncio.run_coroutine_threadsafe(anext(async_it), loop).result()
except StopAsyncIteration:
break

def sync_member_files():
for member_file in sync_iterable(async_member_files):
yield member_file[0:4] + (sync_iterable(member_file[4],),)
async def to_async_iterable(sync_iterable):
# to_thread errors if StopIteration raised in it. So we use a sentinel to detect the end
done = object()
it = iter(sync_iterable)
while (value := await asyncio.to_thread(next, it, done)) is not done:
yield value

# to_thread raises an error if StopIteration raised in it
def to_thread_safe_next():
try:
return next(zipped_chunks_it)
except StopIteration:
return done
def to_sync_iterable(async_iterable):
done = object()
async_it = aiter(async_iterable)
while (value := asyncio.run_coroutine_threadsafe(anext(async_it, done), loop).result()) is not done:
yield value

loop = asyncio.get_event_loop()
zipped_chunks_it = iter(stream_zip(sync_member_files(), *args, **kwargs))
done = object()
sync_member_files = (
member_file[0:4] + (to_sync_iterable(member_file[4],),)
for member_file in to_sync_iterable(member_files)
)

while True:
value = await asyncio.to_thread(to_thread_safe_next)
if value is done:
break
yield value
async for chunk in to_async_iterable(stream_zip(sync_member_files, *args, **kwargs)):
yield chunk
```

The above allows the member files to be supplied by an async iterable, and the data of each member file to be supplied by an async iterable.
Expand Down

0 comments on commit 89257b0

Please sign in to comment.