diff --git a/docs/async-interface.md b/docs/async-interface.md index 81aafd2..482443a 100644 --- a/docs/async-interface.md +++ b/docs/async-interface.md @@ -11,36 +11,29 @@ stream-zip does not include an async interface. However, it is possible to const import asyncio from stream_zip import stream_zip -async def async_stream_zip(async_member_files, *args, **kwargs): +async def async_stream_zip(member_files, *args, **kwargs): - def sync_iterable(async_iterable): - async_it = aiter(async_iterable) - while True: - try: - yield asyncio.run_coroutine_threadsafe(anext(async_it), loop).result() - except StopAsyncIteration: - break - - def sync_member_files(): - for member_file in sync_iterable(async_member_files): - yield member_file[0:4] + (sync_iterable(member_file[4],),) + async def to_async_iterable(sync_iterable): + # to_thread errors if StopIteration raised in it. So we use a sentinel to detect the end + done = object() + it = iter(sync_iterable) + while (value := await asyncio.to_thread(next, it, done)) is not done: + yield value - # to_thread raises an error if StopIteration raised in it - def to_thread_safe_next(): - try: - return next(zipped_chunks_it) - except StopIteration: - return done + def to_sync_iterable(async_iterable): + done = object() + async_it = aiter(async_iterable) + while (value := asyncio.run_coroutine_threadsafe(anext(async_it, done), loop).result()) is not done: + yield value loop = asyncio.get_event_loop() - zipped_chunks_it = iter(stream_zip(sync_member_files(), *args, **kwargs)) - done = object() + sync_member_files = ( + member_file[0:4] + (to_sync_iterable(member_file[4],),) + for member_file in to_sync_iterable(member_files) + ) - while True: - value = await asyncio.to_thread(to_thread_safe_next) - if value is done: - break - yield value + async for chunk in to_async_iterable(stream_zip(sync_member_files, *args, **kwargs)): + yield chunk ``` The above allows the member files to be supplied by an async iterable, and the data of each member file to be supplied by an async iterable.