From efd99f6d2b16669453823bda3af2f96e88f1c387 Mon Sep 17 00:00:00 2001
From: Michal Charemza
Date: Sat, 23 Mar 2024 13:43:08 +0000
Subject: [PATCH] feat: async interface (via threads)

This adds a function, async_stream_unzip, that's the async version of stream_unzip. It's inspired by the code and discussion in the PR at https://github.com/uktrade/stream-unzip/pull/80. However, to avoid duplication, we have a threads-based layer to call into the existing stream_unzip function, rather than have an almost identical copy but with async/await where needed in it. This is similar to how https://github.com/uktrade/stream-zip provides an async interface.
---
 docs/async-interface.md   |  41 +++++++++++++++
 docs/exceptions.md        |   2 +-
 docs/features.md          |   2 +
 docs/publish-a-release.md |   2 +-
 stream_unzip.py           |  45 ++++++++++++++++
 test.py                   | 106 ++++++++++++++++++++++++++++++++++++++
 6 files changed, 196 insertions(+), 2 deletions(-)
 create mode 100644 docs/async-interface.md

diff --git a/docs/async-interface.md b/docs/async-interface.md
new file mode 100644
index 0000000..26324a6
--- /dev/null
+++ b/docs/async-interface.md
@@ -0,0 +1,41 @@
+---
+layout: sub-navigation
+order: 3
+title: Async interface
+---


An async interface is provided via the function `async_stream_unzip`. Its usage is exactly the same as `stream_unzip` except that:

1. The input must be an async iterable of bytes.
2. The member files are output as an async iterable of tuples.
3. The data of each member file is returned as an async iterable of bytes.
+

```python
import asyncio

from stream_unzip import async_stream_unzip
import httpx

async def zipped_chunks():
    # Async iterable that yields the bytes of a zip file
    async with httpx.AsyncClient() as client:
        async with client.stream('GET', 'https://www.example.com/my.zip') as r:
            async for chunk in r.aiter_bytes(chunk_size=65536):
                yield chunk

async def main():
    async for file_name, file_size, unzipped_chunks in async_stream_unzip(zipped_chunks(), password=b'my-password'):
        async for chunk in unzipped_chunks:
            print(chunk)

asyncio.run(main())
```

> ### Warnings
>
> Under the hood `async_stream_unzip` uses threads as a layer over the synchronous `stream_unzip` function. This has two consequences:
>
> 1. A possible performance penalty over a theoretical implementation that is pure async without threads.
>
> 2. The [contextvars](https://docs.python.org/3/library/contextvars.html) context available in the async iterables of files or data is a shallow copy of the context where async_stream_unzip is called from.
>
> This means that existing context variables are available inside the input iterable, but any changes made to the context itself from inside the iterable will not propagate out to the original context. Changes made to mutable data structures that are part of the context, for example dictionaries, will propagate out.
>
> This does not affect Python 3.6, because contextvars is not available.
diff --git a/docs/exceptions.md b/docs/exceptions.md
index a7493df..0d45c29 100644
--- a/docs/exceptions.md
+++ b/docs/exceptions.md
@@ -1,6 +1,6 @@
 ---
 layout: sub-navigation
-order: 3
+order: 4
 title: Exceptions
 ---
diff --git a/docs/features.md b/docs/features.md
index f8f0fe9..71d5943 100644
--- a/docs/features.md
+++ b/docs/features.md
@@ -20,3 +20,5 @@ In addition to being memory efficient, stream-unzip supports:
 - ZIP files created by Java's ZipOutputStream that are larger than 4GiB. At the time of writing libarchive-based stream readers cannot read these without error.

 - BZip2-compressed ZIPs.
+ +- An async interface (that uses threads under the hood). diff --git a/docs/publish-a-release.md b/docs/publish-a-release.md index 82a38d4..d74e35f 100644 --- a/docs/publish-a-release.md +++ b/docs/publish-a-release.md @@ -1,6 +1,6 @@ --- layout: sub-navigation -order: 5 +order: 6 title: How to publish a release --- diff --git a/stream_unzip.py b/stream_unzip.py index 60d0922..da2fec5 100644 --- a/stream_unzip.py +++ b/stream_unzip.py @@ -1,5 +1,6 @@ from functools import partial from struct import Struct +import asyncio import bz2 import zlib @@ -462,6 +463,50 @@ def all(): for _ in unzipped_chunks: raise UnfinishedIterationError() + +async def async_stream_unzip(chunks, *args, **kwargs): + + async def to_async_iterable(sync_iterable): + # asyncio.to_thread is not available until Python 3.9, and StopIteration doesn't get + # propagated by run_in_executor, so we use a sentinel to detect the end of the iterable + done = object() + it = iter(sync_iterable) + + # contextvars are not available until Python 3.7 + try: + import contextvars + except ImportError: + get_args = lambda: (next, it, done) + else: + get_args = lambda: (contextvars.copy_context().run, next, it, done) + + while True: + value = await loop.run_in_executor(None, *get_args()) + if value is done: + break + yield value + + def to_sync_iterable(async_iterable): + # The built-in aiter and anext functions are not available until Python 3.10 + async_it = async_iterable.__aiter__() + while True: + try: + value = asyncio.run_coroutine_threadsafe(async_it.__anext__(), loop).result() + except StopAsyncIteration: + break + yield value + + # get_running_loop is preferred, but isn't available until Python 3.7 + try: + loop = asyncio.get_running_loop() + except: + loop = asyncio.get_event_loop() + unzipped_chunks = stream_unzip(to_sync_iterable(chunks), *args, **kwargs) + + async for name, size, chunks in to_async_iterable(unzipped_chunks): + yield name, size, to_async_iterable(chunks) + + class 
UnzipError(Exception): pass diff --git a/test.py b/test.py index 42310f5..a12cd1c 100644 --- a/test.py +++ b/test.py @@ -1,11 +1,14 @@ +import asyncio import itertools import io +import platform import unittest import uuid import random import zipfile from stream_unzip import ( + async_stream_unzip, stream_unzip, UnfinishedIterationError, TruncatedDataError, @@ -746,3 +749,106 @@ def yield_input(): for name, size, chunks in stream_unzip(yield_input()): for chunk in chunks: pass + + def test_async_stream_unzip(self): + async def async_bytes(): + file = io.BytesIO() + with zipfile.ZipFile(file, 'w', zipfile.ZIP_DEFLATED) as zf: + zf.writestr('first.txt', b'-' * 100000) + zf.writestr('second.txt', b'*' * 100000) + zip_bytes = file.getvalue() + + yield zip_bytes + + results = [] + + async def test(): + async for name, size, chunks in async_stream_unzip(async_bytes()): + b = b'' + async for chunk in chunks: + b += chunk + results.append((name, size, b)) + + asyncio.get_event_loop().run_until_complete(test()) + self.assertEqual(results, [ + (b'first.txt', 100000, b'-' * 100000), + (b'second.txt', 100000, b'*' * 100000), + ]) + + def test_async_exception_from_bytes_propagates(self): + async def async_bytes(): + yield b'P' + raise Exception('From async bytes') + + async def test(): + await async_stream_unzip(async_bytes()).__aiter__().__anext__() + + with self.assertRaisesRegex(Exception, 'From async bytes'): + asyncio.get_event_loop().run_until_complete(test()) + + def test_async_does_stream(self): + state = [] + + async def async_bytes(): + file = io.BytesIO() + with zipfile.ZipFile(file, 'w', zipfile.ZIP_DEFLATED) as zf: + zf.writestr('first.txt', b'-' * 100000) + zf.writestr('second.txt', b'*' * 100000) + zip_bytes = file.getvalue() + + chunk_size = 100 + for i in range(0, len(zip_bytes), chunk_size): + state.append('in') + yield zip_bytes[i:i + chunk_size] + await asyncio.sleep(0) + + async def test(): + async for name, size, chunks in 
async_stream_unzip(async_bytes()): + async for chunk in chunks: + state.append('out') + + asyncio.get_event_loop().run_until_complete(test()) + self.assertEqual(state, ['in', 'out', 'in', 'out', 'in', 'out', 'out', 'in', 'out', 'in']) + + @unittest.skipIf( + tuple(int(v) for v in platform.python_version().split('.')) < (3,7,0), + "contextvars are not supported before Python 3.7.0", + ) + def test_copy_of_context_variable_available_in_iterable(self): + # Ideally the context would be identical in the iterables, because that's what a purely asyncio + # implementation of stream-zip would likely do + + import contextvars + + var = contextvars.ContextVar('test') + var.set('set-from-outer') + + d = contextvars.ContextVar('d') + d.set({'key': 'original-value'}) + + inner = None + + async def async_bytes(): + nonlocal inner + inner = var.get() + + var.set('set-from-inner') + d.get()['key'] = 'set-from-inner' + + file = io.BytesIO() + with zipfile.ZipFile(file, 'w', zipfile.ZIP_DEFLATED) as zf: + zf.writestr('first.txt', b'-' * 100000) + zf.writestr('second.txt', b'*' * 100000) + zip_bytes = file.getvalue() + + yield zip_bytes + + async def test(): + async for name, size, chunks in async_stream_unzip(async_bytes()): + async for chunk in chunks: + pass + + asyncio.get_event_loop().run_until_complete(test()) + self.assertEqual(var.get(), 'set-from-outer') + self.assertEqual(inner, 'set-from-outer') + self.assertEqual(d.get()['key'], 'set-from-inner')