From 1abe67d850a6ed419f2eca6391dd8cd0454e33a2 Mon Sep 17 00:00:00 2001 From: Michal Charemza Date: Sat, 24 Feb 2024 13:03:11 +0000 Subject: [PATCH] feat: async interface This brings the async example from the documentation into the codebase proper. This is inspired by the PR and conversation at https://github.com/uktrade/stream-unzip/pull/80. To avoid duplication, threads are used to use the sync code without blocking the event loop. --- README.md | 2 + docs/async-interface.md | 61 ++++++---------- docs/features.md | 2 + stream_zip.py | 43 +++++++++++ test_stream_zip.py | 158 ++++++++++++++++++++++++++++++++++++++++ 5 files changed, 228 insertions(+), 38 deletions(-) diff --git a/README.md b/README.md index c491291..98762ab 100644 --- a/README.md +++ b/README.md @@ -29,6 +29,8 @@ In addition to being memory efficient (with some [limitations](https://stream-zi - By default stores modification time as an extended timestamp. An extended timestamp is a more accurate timestamp than the original ZIP format allows +- Provides an async interface (that uses threads under the hood to share code with sync interface without blocking the event loop) + --- diff --git a/docs/async-interface.md b/docs/async-interface.md index 7d1da00..f6ffa43 100644 --- a/docs/async-interface.md +++ b/docs/async-interface.md @@ -5,69 +5,54 @@ title: Async interface --- -stream-zip does not include an async interface. However, it is possible to construct an async function that wraps stream-zip to allow the construction of zip files in a streaming way from async code without blocking the event loop. +An async interface is provided via the function `async_stream_zip`. Its usage is exactly the same as `stream_zip` except that: -```python -import asyncio -from stream_zip import stream_zip - -async def async_stream_zip(member_files, *args, **kwargs): - - async def to_async_iterable(sync_iterable): - # to_thread errors if StopIteration raised in it. So we use a sentinel to detect the end - done = object() - it = iter(sync_iterable) - while (value := await asyncio.to_thread(next, it, done)) is not done: - yield value - - def to_sync_iterable(async_iterable): - done = object() - async_it = aiter(async_iterable) - while (value := asyncio.run_coroutine_threadsafe(anext(async_it, done), loop).result()) is not done: - yield value - - loop = asyncio.get_running_loop() - sync_member_files = ( - member_file[0:4] + (to_sync_iterable(member_file[4],),) - for member_file in to_sync_iterable(member_files) - ) - - async for chunk in to_async_iterable(stream_zip(sync_member_files, *args, **kwargs)): - yield chunk -``` - -The above allows the member files to be supplied by an async iterable, and the data of each member file to be supplied by an async iterable. +1. The member files must be provided as an async iterable of tuples. +2. The data of each member file must be provided as an async iterable of bytes. +3. Its return value is an async iterable of bytes. ```python from datetime import datetime from stat import S_IFREG -from stream_zip import ZIP_32 +from stream_zip import async_stream_zip, ZIP_32 # Hard coded for example purposes -async def get_async_data(): +async def async_data(): yield b'Some bytes 1' yield b'Some bytes 2' # Hard coded for example purposes -async def get_async_member_files(): +async def async_member_files(): yield ( 'my-file-1.txt', datetime.now(), S_IFREG | 0o600, ZIP_32, - get_async_data(), + async_data(), ) yield ( 'my-file-2.txt', datetime.now(), S_IFREG | 0o600, ZIP_32, - get_async_data(), + async_data(), ) async def main(): - async for chunk in async_stream_zip(get_async_member_files()): + async for chunk in async_stream_zip(async_member_files()): print(chunk) asyncio.run(main()) -``` \ No newline at end of file +``` + +> ### Warnings +> +> Under the hood `async_stream_zip` uses threads as a layer over the synchronous `stream_zip` function. This has two consequences: +> +> 1. A possible performance penalty over a theoretical implementation that is pure async without threads. +> +> 2. The [contextvars](https://docs.python.org/3/library/contextvars.html) context available in the async iterables of files or data is a shallow copy of the context where async_stream_zip is called from. +> +> This means that existing context variables are available inside the iterables, but any changes made to the context itself from inside the iterables will not propagate out to the original context. Changes made to mutable data structures that are part of the context, for example dictionaries, will propagate out. +> +> This does not affect Python 3.6, because contextvars is not available. diff --git a/docs/features.md b/docs/features.md index cdf9c72..56479c1 100644 --- a/docs/features.md +++ b/docs/features.md @@ -19,3 +19,5 @@ In addition to being memory efficient (with some [limitations](/get-started/#lim - Allows the specification of permissions on the member files and directories (although not all clients respect them) - By default stores modification time as an extended timestamp. An extended timestamp is a more accurate timestamp than the original ZIP format allows + +- Provides an async interface (that uses threads under the hood to share code with sync interface without blocking the event loop) diff --git a/stream_zip.py b/stream_zip.py index 8ca9265..3e577a2 100644 --- a/stream_zip.py +++ b/stream_zip.py @@ -1,5 +1,6 @@ from collections import deque from struct import Struct +import asyncio import secrets import zlib @@ -715,6 +716,48 @@ def _no_compression_streamed_data(chunks, uncompressed_size, crc_32, maximum_siz yield from evenly_sized(zipped_chunks) +async def async_stream_zip(member_files, *args, **kwargs): + + async def to_async_iterable(sync_iterable): + # asyncio.to_thread is not available until Python 3.9, and StopIteration doesn't get + # propagated by run_in_executor, so we use a sentinel to detect the end of the iterable + done = object() + it = iter(sync_iterable) + + # contextvars are not available until Python 3.7 + try: + import contextvars + except ImportError: + get_args = lambda: (next, it, done) + else: + get_args = lambda: (contextvars.copy_context().run, next, it, done) + + while True: + value = await loop.run_in_executor(None, *get_args()) + if value is done: + break + yield value + + def to_sync_iterable(async_iterable): + # The built-in aiter and anext functions are not available until Python 3.10 + async_it = async_iterable.__aiter__() + while True: + try: + value = asyncio.run_coroutine_threadsafe(async_it.__anext__(), loop).result() + except StopAsyncIteration: + break + yield value + + loop = asyncio.get_running_loop() + sync_member_files = ( + member_file[0:4] + (to_sync_iterable(member_file[4],),) + for member_file in to_sync_iterable(member_files) + ) + + async for chunk in to_async_iterable(stream_zip(sync_member_files, *args, **kwargs)): + yield chunk + + class ZipError(Exception): pass diff --git a/test_stream_zip.py b/test_stream_zip.py index 89bff61..e87907b 100644 --- a/test_stream_zip.py +++ b/test_stream_zip.py @@ -1,7 +1,10 @@ from datetime import datetime, timezone, timedelta from io import BytesIO +import asyncio import contextlib +import contextvars import os +import platform import secrets import stat import subprocess @@ -15,6 +18,7 @@ from stream_unzip import IncorrectAESPasswordError, UnsupportedZip64Error, stream_unzip from stream_zip import ( + async_stream_zip, stream_zip, NO_COMPRESSION_64, NO_COMPRESSION_32, @@ -32,6 +36,9 @@ ) +################################################################################################### +# Utility functions for tests + @contextlib.contextmanager def cwd(new_dir): old_dir = os.getcwd() @@ -50,6 +57,9 @@ def gen_bytes(num): yield chunk[:to_yield] +################################################################################################### +# Tests of sync interface: stream_zip + def test_with_stream_unzip_zip_64(): now = datetime.strptime('2021-01-01 21:01:12', '%Y-%m-%d %H:%M:%S') mode = stat.S_IFREG | 0o600 @@ -1274,3 +1284,151 @@ def test_crc_32_not_in_file(method): assert crc_32[2:4] not in encrypted_bytes assert crc_32[0:3] not in encrypted_bytes assert crc_32[1:4] not in encrypted_bytes + + +################################################################################################### +# Tests of sync interface: async_stream_zip +# +# Under the hood we know that async_stream_zip delegates to stream_zip, so there isn't as much +# of a need to test everything. We have a brief test that it seems to work in one case, but +# otherwise focus on the riskiest parts: that exceptions don't propagate, that the async version +# doesn't actually stream, or that context vars are not propagated properly + +def test_async_stream_zip_equivalent_to_stream_unzip_zip_32_and_zip_64(): + now = datetime.strptime('2021-01-01 21:01:12', '%Y-%m-%d %H:%M:%S') + mode = stat.S_IFREG | 0o600 + + def sync_files(): + yield 'file-1', now, mode, ZIP_64, (b'a' * 10000, b'b' * 10000) + yield 'file-2', now, mode, ZIP_32, (b'c', b'd') + + async def async_files(): + async def data_1(): + yield b'a' * 10000 + yield b'b' * 10000 + + async def data_2(): + yield b'c' + yield b'd' + + yield 'file-1', now, mode, ZIP_64, data_1() + yield 'file-2', now, mode, ZIP_32, data_2() + + # Might not be performant, but good enough for the test + async def async_concat(chunks): + result = b'' + async for chunk in chunks: + result += chunk + return result + + async def test(): + assert b''.join(stream_zip(sync_files())) == await async_concat(async_stream_zip(async_files())) + + asyncio.run(test()) + + +def test_async_exception_propagates(): + now = datetime.strptime('2021-01-01 21:01:12', '%Y-%m-%d %H:%M:%S') + mode = stat.S_IFREG | 0o600 + + async def async_data(): + yield b'-' + + async def async_files(): + yield 'file-1', now, mode, ZIP_64, async_data() + raise Exception('From generator') + + async def test(): + async for chunk in async_stream_zip(async_files()): + pass + + with pytest.raises(Exception, match='From generator'): + asyncio.run(test()) + + +def test_async_exception_from_bytes_propagates(): + now = datetime.strptime('2021-01-01 21:01:12', '%Y-%m-%d %H:%M:%S') + mode = stat.S_IFREG | 0o600 + + async def async_data(): + yield b'-' + raise Exception('From generator') + + async def async_files(): + yield 'file-1', now, mode, ZIP_64, async_data() + + async def test(): + async for chunk in async_stream_zip(async_files()): + pass + + with pytest.raises(Exception, match='From generator'): + asyncio.run(test()) + + +def test_async_stream_zip_does_stream(): + now = datetime.strptime('2021-01-01 21:01:12', '%Y-%m-%d %H:%M:%S') + mode = stat.S_IFREG | 0o600 + + state = [] + + async def async_data(): + for i in range(0, 4): + state.append('in') + for j in range(0, 1000): + yield b'-' * 64000 + + async def async_files(): + yield 'file-1', now, mode, ZIP_64, async_data() + + async def test(): + async for chunk in async_stream_zip(async_files()): + state.append('out') + + asyncio.run(test()) + assert state == ['in', 'in', 'out', 'in', 'out', 'in', 'out', 'out'] + + +@pytest.mark.skipif( + tuple(int(v) for v in platform.python_version().split('.')) < (3,7,0), + reason="contextvars are not supported before Python 3.7.0", +) +def test_copy_of_context_variable_available_in_iterable(): + # Ideally the context would be identical in the iterables, because that's what a purely asyncio + # implementation of stream-zip would likely do + + now = datetime.strptime('2021-01-01 21:01:12', '%Y-%m-%d %H:%M:%S') + mode = stat.S_IFREG | 0o600 + + var = contextvars.ContextVar('test') + var.set('set-from-outer') + + d = contextvars.ContextVar('d') + d.set({'key': 'original-value'}) + + inner_files = None + inner_bytes = None + + async def async_files(): + nonlocal inner_files, inner_bytes + + async def data_1(): + nonlocal inner_bytes + inner_bytes = var.get() + var.set('set-from-inner-bytes') + d.get()['key'] = 'set-from-inner-bytes' + yield b'-' + + inner_files = var.get() + var.set('set-from-inner-files') + d.get()['key'] = 'set-from-inner-files' + yield 'file-1', now, mode, ZIP_64, data_1() + + async def test(): + async for chunk in async_stream_zip(async_files()): + pass + + asyncio.run(test()) + assert var.get() == 'set-from-outer' + assert inner_files == 'set-from-outer' + assert inner_bytes == 'set-from-outer' + assert d.get()['key'] == 'set-from-inner-bytes'