diff --git a/README.md b/README.md index c491291..98762ab 100644 --- a/README.md +++ b/README.md @@ -29,6 +29,8 @@ In addition to being memory efficient (with some [limitations](https://stream-zi - By default stores modification time as an extended timestamp. An extended timestamp is a more accurate timestamp than the original ZIP format allows +- Provides an async interface (that uses threads under the hood to share code with sync interface without blocking the event loop) + --- diff --git a/docs/async-interface.md b/docs/async-interface.md index 7d1da00..5030337 100644 --- a/docs/async-interface.md +++ b/docs/async-interface.md @@ -5,69 +5,49 @@ title: Async interface --- -stream-zip does not include an async interface. However, it is possible to construct an async function that wraps stream-zip to allow the construction of zip files in a streaming way from async code without blocking the event loop. +An async interface is provided via the function `async_stream_zip`. Its usage is exactly the same as `stream_zip` except that: -```python -import asyncio -from stream_zip import stream_zip - -async def async_stream_zip(member_files, *args, **kwargs): - - async def to_async_iterable(sync_iterable): - # to_thread errors if StopIteration raised in it. 
So we use a sentinel to detect the end - done = object() - it = iter(sync_iterable) - while (value := await asyncio.to_thread(next, it, done)) is not done: - yield value - - def to_sync_iterable(async_iterable): - done = object() - async_it = aiter(async_iterable) - while (value := asyncio.run_coroutine_threadsafe(anext(async_it, done), loop).result()) is not done: - yield value - - loop = asyncio.get_running_loop() - sync_member_files = ( - member_file[0:4] + (to_sync_iterable(member_file[4],),) - for member_file in to_sync_iterable(member_files) - ) - - async for chunk in to_async_iterable(stream_zip(sync_member_files, *args, **kwargs)): - yield chunk -``` - -The above allows the member files to be supplied by an async iterable, and the data of each member file to be supplied by an async iterable. +1. The member files must be provided as an async iterable of tuples. +2. The data of each member file must be provided as an async iterable of bytes. +3. Its return value is an async iterable of bytes. ```python from datetime import datetime from stat import S_IFREG -from stream_zip import ZIP_32 +from stream_zip import async_stream_zip, ZIP_32 # Hard coded for example purposes -async def get_async_data(): +async def async_data(): yield b'Some bytes 1' yield b'Some bytes 2' # Hard coded for example purposes -async def get_async_member_files(): +async def async_member_files(): yield ( 'my-file-1.txt', datetime.now(), S_IFREG | 0o600, ZIP_32, - get_async_data(), + async_data(), ) yield ( 'my-file-2.txt', datetime.now(), S_IFREG | 0o600, ZIP_32, - get_async_data(), + async_data(), ) async def main(): - async for chunk in async_stream_zip(get_async_member_files()): + async for chunk in async_stream_zip(async_member_files()): print(chunk) asyncio.run(main()) -``` \ No newline at end of file +``` + +> ### Warnings +> +> Under the hood `async_stream_zip` uses threads as a layer over the synchronous `stream_zip` function. This has two consequences: +> +> 1. 
A possible performance penalty over a theoretical implementation that doesn't use threads +> 2. A separate [contextvars](https://docs.python.org/3/library/contextvars.html) context in the async iterables of files or data from the one in which `async_stream_zip` is called. A copy of the wider context is passed in, but any changes made to the context from inside the iterables do not get propagated out diff --git a/docs/features.md b/docs/features.md index cdf9c72..56479c1 100644 --- a/docs/features.md +++ b/docs/features.md @@ -19,3 +19,5 @@ In addition to being memory efficient (with some [limitations](/get-started/#lim - Allows the specification of permissions on the member files and directories (although not all clients respect them) - By default stores modification time as an extended timestamp. An extended timestamp is a more accurate timestamp than the original ZIP format allows + +- Provides an async interface (that uses threads under the hood to share code with sync interface without blocking the event loop) diff --git a/stream_zip.py b/stream_zip.py index 8ca9265..5b59f95 100644 --- a/stream_zip.py +++ b/stream_zip.py @@ -1,5 +1,7 @@ from collections import deque from struct import Struct +import asyncio +import contextvars import secrets import zlib @@ -715,6 +717,39 @@ def _no_compression_streamed_data(chunks, uncompressed_size, crc_32, maximum_siz yield from evenly_sized(zipped_chunks) +async def async_stream_zip(member_files, *args, **kwargs): + + async def to_async_iterable(sync_iterable): + # asyncio.to_thread is not available until Python 3.9, and StopIteration doesn't get + # propagated by run_in_executor, so we use a sentinel to detect the end of the iterable + done = object() + it = iter(sync_iterable) + while True: + value = await loop.run_in_executor(None, contextvars.copy_context().run, next, it, done) + if value is done: + break + yield value + + def to_sync_iterable(async_iterable): + # The built-in aiter and anext functions are not available until 
Python 3.10 + async_it = async_iterable.__aiter__() + while True: + try: + value = asyncio.run_coroutine_threadsafe(async_it.__anext__(), loop).result() + except StopAsyncIteration: + break + yield value + + loop = asyncio.get_running_loop() + sync_member_files = ( + member_file[0:4] + (to_sync_iterable(member_file[4],),) + for member_file in to_sync_iterable(member_files) + ) + + async for chunk in to_async_iterable(stream_zip(sync_member_files, *args, **kwargs)): + yield chunk + + class ZipError(Exception): pass diff --git a/test_stream_zip.py b/test_stream_zip.py index 89bff61..04280de 100644 --- a/test_stream_zip.py +++ b/test_stream_zip.py @@ -1,6 +1,8 @@ from datetime import datetime, timezone, timedelta from io import BytesIO +import asyncio import contextlib +import contextvars import os import secrets import stat @@ -15,6 +17,7 @@ from stream_unzip import IncorrectAESPasswordError, UnsupportedZip64Error, stream_unzip from stream_zip import ( + async_stream_zip, stream_zip, NO_COMPRESSION_64, NO_COMPRESSION_32, @@ -32,6 +35,9 @@ ) +################################################################################################### +# Utility functions for tests + @contextlib.contextmanager def cwd(new_dir): old_dir = os.getcwd() @@ -50,6 +56,9 @@ def gen_bytes(num): yield chunk[:to_yield] +################################################################################################### +# Tests of sync interface: stream_zip + def test_with_stream_unzip_zip_64(): now = datetime.strptime('2021-01-01 21:01:12', '%Y-%m-%d %H:%M:%S') mode = stat.S_IFREG | 0o600 @@ -1274,3 +1283,141 @@ def test_crc_32_not_in_file(method): assert crc_32[2:4] not in encrypted_bytes assert crc_32[0:3] not in encrypted_bytes assert crc_32[1:4] not in encrypted_bytes + + +################################################################################################### +# Tests of async interface: async_stream_zip +# +# Under the hood we know that async_stream_zip delegates to 
stream_zip, so there isn't as much +# of a need to test everything. We have a brief test that it seems to work in one case, but +# otherwise focus on the riskiest parts: that exceptions don't propagate, that the async version +# doesn't actually stream, or that context vars are not propagated properly + +def test_async_stream_zip_equivalent_to_stream_unzip_zip_32_and_zip_64(): + now = datetime.strptime('2021-01-01 21:01:12', '%Y-%m-%d %H:%M:%S') + mode = stat.S_IFREG | 0o600 + + def sync_files(): + yield 'file-1', now, mode, ZIP_64, (b'a' * 10000, b'b' * 10000) + yield 'file-2', now, mode, ZIP_32, (b'c', b'd') + + async def async_files(): + async def data_1(): + yield b'a' * 10000 + yield b'b' * 10000 + + async def data_2(): + yield b'c' + yield b'd' + + yield 'file-1', now, mode, ZIP_64, data_1() + yield 'file-2', now, mode, ZIP_32, data_2() + + # Might not be performant, but good enough for the test + async def async_concat(chunks): + result = b'' + async for chunk in chunks: + result += chunk + return result + + async def test(): + assert b''.join(stream_zip(sync_files())) == await async_concat(async_stream_zip(async_files())) + + asyncio.run(test()) + + +def test_async_exception_propagates(): + now = datetime.strptime('2021-01-01 21:01:12', '%Y-%m-%d %H:%M:%S') + mode = stat.S_IFREG | 0o600 + + async def async_data(): + yield b'-' + + async def async_files(): + yield 'file-1', now, mode, ZIP_64, async_data() + raise Exception('From generator') + + async def test(): + async for chunk in async_stream_zip(async_files()): + pass + + with pytest.raises(Exception, match='From generator'): + asyncio.run(test()) + + +def test_async_exception_from_bytes_propagates(): + now = datetime.strptime('2021-01-01 21:01:12', '%Y-%m-%d %H:%M:%S') + mode = stat.S_IFREG | 0o600 + + async def async_data(): + yield b'-' + raise Exception('From generator') + + async def async_files(): + yield 'file-1', now, mode, ZIP_64, async_data() + + async def test(): + async for chunk in 
async_stream_zip(async_files()): + pass + + with pytest.raises(Exception, match='From generator'): + asyncio.run(test()) + + +def test_async_stream_zip_does_stream(): + now = datetime.strptime('2021-01-01 21:01:12', '%Y-%m-%d %H:%M:%S') + mode = stat.S_IFREG | 0o600 + + state = [] + + async def async_data(): + for i in range(0, 4): + state.append('in') + for j in range(0, 1000): + yield b'-' * 64000 + + async def async_files(): + yield 'file-1', now, mode, ZIP_64, async_data() + + async def test(): + async for chunk in async_stream_zip(async_files()): + state.append('out') + + asyncio.run(test()) + assert state == ['in', 'in', 'out', 'in', 'out', 'in', 'out', 'out'] + + +def test_copy_of_context_variable_available_in_iterable(): + # Ideally the context would be identical in the iterables, because that's what a purely asyncio + # implementation of stream-zip would likely do + + now = datetime.strptime('2021-01-01 21:01:12', '%Y-%m-%d %H:%M:%S') + mode = stat.S_IFREG | 0o600 + + var = contextvars.ContextVar('test') + var.set('set-from-outer') + + inner_files = None + inner_bytes = None + + async def async_files(): + nonlocal inner_files, inner_bytes + + async def data_1(): + nonlocal inner_bytes + inner_bytes = var.get() + var.set('set-from-inner-bytes') + yield b'-' + + inner_files = var.get() + var.set('set-from-inner-files') + yield 'file-1', now, mode, ZIP_64, data_1() + + async def test(): + async for chunk in async_stream_zip(async_files()): + pass + + asyncio.run(test()) + assert var.get() == 'set-from-outer' + assert inner_files == 'set-from-outer' + assert inner_bytes == 'set-from-outer'