From 7ce6e873a24bd2d165de2fb6fd815823b033fcf6 Mon Sep 17 00:00:00 2001 From: Michal Charemza Date: Sat, 24 Feb 2024 13:03:11 +0000 Subject: [PATCH] feat: async interface This brings the async example from the documentation into the codebase proper. This is inspired by the PR and conversation at https://github.com/uktrade/stream-unzip/pull/80. To avoid duplication, threads are used to use the sync code without blocking the event loop. --- README.md | 2 + docs/async-interface.md | 49 +++++------------- docs/features.md | 2 + stream_zip.py | 26 ++++++++++ test_stream_zip.py | 110 ++++++++++++++++++++++++++++++++++++++++ 5 files changed, 152 insertions(+), 37 deletions(-) diff --git a/README.md b/README.md index c491291..31491ac 100644 --- a/README.md +++ b/README.md @@ -29,6 +29,8 @@ In addition to being memory efficient (with some [limitations](https://stream-zi - By default stores modification time as an extended timestamp. An extended timestamp is a more accurate timestamp than the original ZIP format allows +- By default stores modification time as an extended timestamp. An extended timestamp is a more accurate timestamp than the original ZIP format allows + --- diff --git a/docs/async-interface.md b/docs/async-interface.md index 7d1da00..709b24e 100644 --- a/docs/async-interface.md +++ b/docs/async-interface.md @@ -5,69 +5,44 @@ title: Async interface --- -stream-zip does not include an async interface. However, it is possible to construct an async function that wraps stream-zip to allow the construction of zip files in a streaming way from async code without blocking the event loop. +An async interface is provided via the function `async_stream_zip`. Its usage is exactly the same as `stream_zip` except that -```python -import asyncio -from stream_zip import stream_zip - -async def async_stream_zip(member_files, *args, **kwargs): - - async def to_async_iterable(sync_iterable): - # to_thread errors if StopIteration raised in it. So we use a sentinel to detect the end - done = object() - it = iter(sync_iterable) - while (value := await asyncio.to_thread(next, it, done)) is not done: - yield value +1. The member files must be provided as an async iterable of tuples. +2. The data of each member file must be provided as an async iterable of bytes. +3. The return value is an async iterable of bytes. - def to_sync_iterable(async_iterable): - done = object() - async_it = aiter(async_iterable) - while (value := asyncio.run_coroutine_threadsafe(anext(async_it, done), loop).result()) is not done: - yield value - - loop = asyncio.get_running_loop() - sync_member_files = ( - member_file[0:4] + (to_sync_iterable(member_file[4],),) - for member_file in to_sync_iterable(member_files) - ) - - async for chunk in to_async_iterable(stream_zip(sync_member_files, *args, **kwargs)): - yield chunk -``` - -The above allows the member files to be supplied by an async iterable, and the data of each member file to be supplied by an async iterable. +An example of its usage: ```python from datetime import datetime from stat import S_IFREG -from stream_zip import ZIP_32 +from stream_zip import async_stream_zip, ZIP_32 # Hard coded for example purposes -async def get_async_data(): +async def async_data(): yield b'Some bytes 1' yield b'Some bytes 2' # Hard coded for example purposes -async def get_async_member_files(): +async def async_member_files(): yield ( 'my-file-1.txt', datetime.now(), S_IFREG | 0o600, ZIP_32, - get_async_data(), + async_data(), ) yield ( 'my-file-2.txt', datetime.now(), S_IFREG | 0o600, ZIP_32, - get_async_data(), + async_data(), ) async def main(): - async for chunk in async_stream_zip(get_async_member_files()): + async for chunk in async_stream_zip(async_member_files()): print(chunk) asyncio.run(main()) -``` \ No newline at end of file +``` diff --git a/docs/features.md b/docs/features.md index cdf9c72..56479c1 100644 --- a/docs/features.md +++ b/docs/features.md @@ -19,3 +19,5 @@ In addition to being memory efficient (with some [limitations](/get-started/#lim - Allows the specification of permissions on the member files and directories (although not all clients respect them) - By default stores modification time as an extended timestamp. An extended timestamp is a more accurate timestamp than the original ZIP format allows + +- Provides an async interface (that uses threads under the hood to share code with sync interface without blocking the event loop) diff --git a/stream_zip.py b/stream_zip.py index 8ca9265..212bc45 100644 --- a/stream_zip.py +++ b/stream_zip.py @@ -1,5 +1,6 @@ from collections import deque from struct import Struct +import asyncio import secrets import zlib @@ -715,6 +716,31 @@ def _no_compression_streamed_data(chunks, uncompressed_size, crc_32, maximum_siz yield from evenly_sized(zipped_chunks) +async def async_stream_zip(member_files, *args, **kwargs): + + async def to_async_iterable(sync_iterable): + # to_thread errors if StopIteration raised in it. So we use a sentinel to detect the end + done = object() + it = iter(sync_iterable) + while (value := await asyncio.to_thread(next, it, done)) is not done: + yield value + + def to_sync_iterable(async_iterable): + done = object() + async_it = aiter(async_iterable) + while (value := asyncio.run_coroutine_threadsafe(anext(async_it, done), loop).result()) is not done: + yield value + + loop = asyncio.get_running_loop() + sync_member_files = ( + member_file[0:4] + (to_sync_iterable(member_file[4],),) + for member_file in to_sync_iterable(member_files) + ) + + async for chunk in to_async_iterable(stream_zip(sync_member_files, *args, **kwargs)): + yield chunk + + class ZipError(Exception): pass diff --git a/test_stream_zip.py b/test_stream_zip.py index 89bff61..bf124f7 100644 --- a/test_stream_zip.py +++ b/test_stream_zip.py @@ -1,5 +1,6 @@ from datetime import datetime, timezone, timedelta from io import BytesIO +import asyncio import contextlib import os import secrets @@ -15,6 +16,7 @@ from stream_unzip import IncorrectAESPasswordError, UnsupportedZip64Error, stream_unzip from stream_zip import ( + async_stream_zip, stream_zip, NO_COMPRESSION_64, NO_COMPRESSION_32, @@ -32,6 +34,9 @@ ) +################################################################################################### +# Utility functions for tests + @contextlib.contextmanager def cwd(new_dir): old_dir = os.getcwd() @@ -50,6 +55,9 @@ def gen_bytes(num): yield chunk[:to_yield] +################################################################################################### +# Tests of sync interface: stream_zip + def test_with_stream_unzip_zip_64(): now = datetime.strptime('2021-01-01 21:01:12', '%Y-%m-%d %H:%M:%S') mode = stat.S_IFREG | 0o600 @@ -1274,3 +1282,105 @@ def test_crc_32_not_in_file(method): assert crc_32[2:4] not in encrypted_bytes assert crc_32[0:3] not in encrypted_bytes assert crc_32[1:4] not in encrypted_bytes + + +################################################################################################### +# Tests of sync interface: async_stream_zip +# +# Under the hood we know that async_stream_zip delegates to stream_zip, so there isn't as much +# of a need to test everything. We have a brief test that it seems to work in one case, but +# otherwise focus on the riskiest parts: that exceptions don't propagate, or that the async +# version doesn't actually stream + +def test_async_stream_zip_equivalent_to_stream_unzip_zip_32_and_zip_64(): + now = datetime.strptime('2021-01-01 21:01:12', '%Y-%m-%d %H:%M:%S') + mode = stat.S_IFREG | 0o600 + + def sync_files(): + yield 'file-1', now, mode, ZIP_64, (b'a' * 10000, b'b' * 10000) + yield 'file-2', now, mode, ZIP_32, (b'c', b'd') + + async def async_files(): + async def data_1(): + yield b'a' * 10000 + yield b'b' * 10000 + + async def data_2(): + yield b'c' + yield b'd' + + yield 'file-1', now, mode, ZIP_64, data_1() + yield 'file-2', now, mode, ZIP_32, data_2() + + # Might not be performant, but good enough for the test + async def async_concat(chunks): + result = b'' + async for chunk in chunks: + result += chunk + return result + + async def test(): + assert b''.join(stream_zip(sync_files())) == await async_concat(async_stream_zip(async_files())) + + asyncio.run(test()) + + +def test_async_exception_propagates(): + now = datetime.strptime('2021-01-01 21:01:12', '%Y-%m-%d %H:%M:%S') + mode = stat.S_IFREG | 0o600 + + async def async_data(): + yield b'-' + + async def async_files(): + yield 'file-1', now, mode, ZIP_64, async_data() + raise Exception('From generator') + + async def test(): + async for chunk in async_stream_zip(async_files()): + pass + + with pytest.raises(Exception, match='From generator'): + asyncio.run(test()) + + +def test_async_exception_from_bytes_propagates(): + now = datetime.strptime('2021-01-01 21:01:12', '%Y-%m-%d %H:%M:%S') + mode = stat.S_IFREG | 0o600 + + async def async_data(): + yield b'-' + raise Exception('From generator') + + async def async_files(): + yield 'file-1', now, mode, ZIP_64, async_data() + + async def test(): + async for chunk in async_stream_zip(async_files()): + pass + + with pytest.raises(Exception, match='From generator'): + asyncio.run(test()) + + +def test_async_stream_zip_does_stream(): + now = datetime.strptime('2021-01-01 21:01:12', '%Y-%m-%d %H:%M:%S') + mode = stat.S_IFREG | 0o600 + + state = [] + + async def async_data(): + for i in range(0, 4): + state.append('in') + for j in range(0, 1000): + yield b'-' * 64000 + + async def async_files(): + yield 'file-1', now, mode, ZIP_64, async_data() + + async def test(): + async for chunk in async_stream_zip(async_files()): + state.append('out') + + asyncio.run(test()) + assert state == ['in', 'in', 'out', 'in', 'out', 'in', 'out', 'out']