Skip to content

Commit

Permalink
feat: async interface
Browse files Browse the repository at this point in the history
This brings the async example from the documentation into the codebase proper.

This is inspired by the PR and conversation at
uktrade/stream-unzip#80. To avoid duplication, threads
are used to run the sync code without blocking the event loop.
  • Loading branch information
michalc committed Feb 24, 2024
1 parent 9d89a1e commit 76d8928
Show file tree
Hide file tree
Showing 5 changed files with 204 additions and 38 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ In addition to being memory efficient (with some [limitations](https://stream-zi

- By default stores modification time as an extended timestamp. An extended timestamp is a more accurate timestamp than the original ZIP format allows

- Provides an async interface (that uses threads under the hood to share code with sync interface without blocking the event loop)

<!-- --8<-- [end:features] -->

---
Expand Down
56 changes: 18 additions & 38 deletions docs/async-interface.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,69 +5,49 @@ title: Async interface
---


stream-zip does not include an async interface. However, it is possible to construct an async function that wraps stream-zip to allow the construction of zip files in a streaming way from async code without blocking the event loop.
An async interface is provided via the function `async_stream_zip`. Its usage is exactly the same as `stream_zip` except that:

```python
import asyncio
from stream_zip import stream_zip

async def async_stream_zip(member_files, *args, **kwargs):
    # Wraps the synchronous stream_zip so zip files can be constructed from
    # async iterables without blocking the event loop.
    # NOTE(review): this variant needs asyncio.to_thread (Python 3.9+) and the
    # built-in aiter/anext (Python 3.10+)

    async def to_async_iterable(sync_iterable):
        # Pulls each value from the sync iterable in a worker thread so the
        # event loop is never blocked.
        # to_thread errors if StopIteration raised in it. So we use a sentinel to detect the end
        done = object()
        it = iter(sync_iterable)
        while (value := await asyncio.to_thread(next, it, done)) is not done:
            yield value

    def to_sync_iterable(async_iterable):
        # Runs in a worker thread: schedules each anext on the event loop and
        # blocks until a value (or the end-of-iteration sentinel) is available
        done = object()
        async_it = aiter(async_iterable)
        while (value := asyncio.run_coroutine_threadsafe(anext(async_it, done), loop).result()) is not done:
            yield value

    loop = asyncio.get_running_loop()
    # Member files arrive as an async iterable of tuples whose 5th element is
    # an async iterable of bytes: both levels are converted to sync iterables
    # so the synchronous stream_zip can consume them
    sync_member_files = (
        member_file[0:4] + (to_sync_iterable(member_file[4],),)
        for member_file in to_sync_iterable(member_files)
    )

    async for chunk in to_async_iterable(stream_zip(sync_member_files, *args, **kwargs)):
        yield chunk
```

The above allows the member files to be supplied by an async iterable, and the data of each member file to be supplied by an async iterable.
1. The member files must be provided as an async iterable of tuples.
2. The data of each member file must be provided as an async iterable of bytes.
3. Its return value is an async iterable of bytes.

```python
# Example: constructing a zip from async iterables with async_stream_zip.
# Fix: the example calls asyncio.run but did not import asyncio (the import
# was lost when the example was trimmed from its earlier version).
import asyncio
from datetime import datetime
from stat import S_IFREG
from stream_zip import async_stream_zip, ZIP_32

# Hard coded for example purposes
async def async_data():
    yield b'Some bytes 1'
    yield b'Some bytes 2'

# Hard coded for example purposes
async def async_member_files():
    yield (
        'my-file-1.txt',
        datetime.now(),
        S_IFREG | 0o600,
        ZIP_32,
        async_data(),
    )
    yield (
        'my-file-2.txt',
        datetime.now(),
        S_IFREG | 0o600,
        ZIP_32,
        async_data(),
    )

async def main():
    async for chunk in async_stream_zip(async_member_files()):
        print(chunk)

asyncio.run(main())
```

> ### Warnings
>
> Under the hood `async_stream_zip` uses threads as a layer over the synchronous `stream_zip` function. This has two consequences:
>
> 1. A possible performance penalty over a theoretical implementation that doesn't use threads
> 2. A separate [contextvars](https://docs.python.org/3/library/contextvars.html) context inside the async iterables of files or data, compared to where async_stream_zip is called from. A copy of the wider context is passed in, but any changes made to the context from inside the iterables do not get propagated out
2 changes: 2 additions & 0 deletions docs/features.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,5 @@ In addition to being memory efficient (with some [limitations](/get-started/#lim
- Allows the specification of permissions on the member files and directories (although not all clients respect them)

- By default stores modification time as an extended timestamp. An extended timestamp is a more accurate timestamp than the original ZIP format allows

- Provides an async interface (that uses threads under the hood to share code with sync interface without blocking the event loop)
35 changes: 35 additions & 0 deletions stream_zip.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from collections import deque
from struct import Struct
import asyncio
import contextvars
import secrets
import zlib

Expand Down Expand Up @@ -715,6 +717,39 @@ def _no_compression_streamed_data(chunks, uncompressed_size, crc_32, maximum_siz
yield from evenly_sized(zipped_chunks)


async def async_stream_zip(member_files, *args, **kwargs):
    # Async equivalent of stream_zip: member_files is an async iterable of
    # tuples whose 5th element is an async iterable of bytes, and the result is
    # an async iterable of the zipped bytes. Under the hood, the synchronous
    # stream_zip runs in executor threads so the event loop is never blocked.

    async def to_async_iterable(sync_iterable):
        # Pulls each value from a sync iterable in the default executor, so a
        # blocking next() call doesn't block the event loop.
        # asyncio.to_thread is not available until Python 3.9, and StopIteration doesn't get
        # propagated by run_in_executor, so we use a sentinel to detect the end of the iterable
        done = object()
        it = iter(sync_iterable)
        while True:
            # copy_context() gives the executor thread a copy of the caller's
            # contextvars context (changes made inside don't propagate back out)
            value = await loop.run_in_executor(None, contextvars.copy_context().run, next, it, done)
            if value is done:
                break
            yield value

    def to_sync_iterable(async_iterable):
        # Runs in an executor thread: schedules each __anext__ on the event
        # loop and blocks the thread until the value is available.
        # The built-in aiter and anext functions are not available until Python 3.10
        async_it = async_iterable.__aiter__()
        while True:
            try:
                value = asyncio.run_coroutine_threadsafe(async_it.__anext__(), loop).result()
            except StopAsyncIteration:
                break
            yield value

    loop = asyncio.get_running_loop()
    # Convert the async member files — and each member file's async data
    # iterable (index 4 of the tuple) — into sync iterables for stream_zip
    sync_member_files = (
        member_file[0:4] + (to_sync_iterable(member_file[4],),)
        for member_file in to_sync_iterable(member_files)
    )

    async for chunk in to_async_iterable(stream_zip(sync_member_files, *args, **kwargs)):
        yield chunk


class ZipError(Exception):
    """Base exception for errors raised by stream-zip.

    NOTE(review): presumably the more specific errors in this module subclass
    this — confirm against the rest of the file, which is not visible here.
    """
    pass

Expand Down
147 changes: 147 additions & 0 deletions test_stream_zip.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from datetime import datetime, timezone, timedelta
from io import BytesIO
import asyncio
import contextlib
import contextvars
import os
import secrets
import stat
Expand All @@ -15,6 +17,7 @@
from stream_unzip import IncorrectAESPasswordError, UnsupportedZip64Error, stream_unzip

from stream_zip import (
async_stream_zip,
stream_zip,
NO_COMPRESSION_64,
NO_COMPRESSION_32,
Expand All @@ -32,6 +35,9 @@
)


###################################################################################################
# Utility functions for tests

@contextlib.contextmanager
def cwd(new_dir):
old_dir = os.getcwd()
Expand All @@ -50,6 +56,9 @@ def gen_bytes(num):
yield chunk[:to_yield]


###################################################################################################
# Tests of sync interface: stream_zip

def test_with_stream_unzip_zip_64():
now = datetime.strptime('2021-01-01 21:01:12', '%Y-%m-%d %H:%M:%S')
mode = stat.S_IFREG | 0o600
Expand Down Expand Up @@ -1274,3 +1283,141 @@ def test_crc_32_not_in_file(method):
assert crc_32[2:4] not in encrypted_bytes
assert crc_32[0:3] not in encrypted_bytes
assert crc_32[1:4] not in encrypted_bytes


###################################################################################################
# Tests of async interface: async_stream_zip
#
# Under the hood we know that async_stream_zip delegates to stream_zip, so there isn't as much
# of a need to test everything. We have a brief test that it seems to work in one case, but
# otherwise focus on the riskiest parts: that exceptions don't propagate, that the async version
# doesn't actually stream, or that context vars are not propagated properly

def test_async_stream_zip_equivalent_to_stream_unzip_zip_32_and_zip_64():
    # Since async_stream_zip delegates to stream_zip, equivalent inputs should
    # produce byte-identical zip output from both interfaces
    now = datetime.strptime('2021-01-01 21:01:12', '%Y-%m-%d %H:%M:%S')
    mode = stat.S_IFREG | 0o600

    def sync_files():
        yield 'file-1', now, mode, ZIP_64, (b'a' * 10000, b'b' * 10000)
        yield 'file-2', now, mode, ZIP_32, (b'c', b'd')

    async def async_files():
        async def first_file_data():
            yield b'a' * 10000
            yield b'b' * 10000

        async def second_file_data():
            yield b'c'
            yield b'd'

        yield 'file-1', now, mode, ZIP_64, first_file_data()
        yield 'file-2', now, mode, ZIP_32, second_file_data()

    # Might not be performant, but good enough for the test
    async def async_concat(chunks):
        collected = []
        async for chunk in chunks:
            collected.append(chunk)
        return b''.join(collected)

    async def test():
        assert b''.join(stream_zip(sync_files())) == await async_concat(async_stream_zip(async_files()))

    asyncio.run(test())


def test_async_exception_propagates():
    # An exception raised while yielding member files must surface to the
    # consumer of async_stream_zip, not be lost in the thread machinery
    now = datetime.strptime('2021-01-01 21:01:12', '%Y-%m-%d %H:%M:%S')
    mode = stat.S_IFREG | 0o600

    async def file_data():
        yield b'-'

    async def failing_member_files():
        yield 'file-1', now, mode, ZIP_64, file_data()
        raise Exception('From generator')

    async def consume_all():
        async for _ in async_stream_zip(failing_member_files()):
            pass

    with pytest.raises(Exception, match='From generator'):
        asyncio.run(consume_all())


def test_async_exception_from_bytes_propagates():
    # An exception raised while yielding a member file's data must also surface
    # to the consumer of async_stream_zip
    now = datetime.strptime('2021-01-01 21:01:12', '%Y-%m-%d %H:%M:%S')
    mode = stat.S_IFREG | 0o600

    async def failing_file_data():
        yield b'-'
        raise Exception('From generator')

    async def member_files():
        yield 'file-1', now, mode, ZIP_64, failing_file_data()

    async def consume_all():
        async for _ in async_stream_zip(member_files()):
            pass

    with pytest.raises(Exception, match='From generator'):
        asyncio.run(consume_all())


def test_async_stream_zip_does_stream():
    # If input production ('in') interleaves with output consumption ('out'),
    # the zip is being streamed rather than fully buffered before any output.
    # NOTE: the expected sequence is sensitive to internal chunk sizes
    now = datetime.strptime('2021-01-01 21:01:12', '%Y-%m-%d %H:%M:%S')
    mode = stat.S_IFREG | 0o600

    events = []

    async def file_data():
        for _ in range(0, 4):
            events.append('in')
            for _ in range(0, 1000):
                yield b'-' * 64000

    async def member_files():
        yield 'file-1', now, mode, ZIP_64, file_data()

    async def consume_all():
        async for _ in async_stream_zip(member_files()):
            events.append('out')

    asyncio.run(consume_all())
    assert events == ['in', 'in', 'out', 'in', 'out', 'in', 'out', 'out']


def test_copy_of_context_variable_available_in_iterable():
    # Ideally the context would be identical in the iterables, because that's
    # what a purely asyncio implementation of stream-zip would likely do

    now = datetime.strptime('2021-01-01 21:01:12', '%Y-%m-%d %H:%M:%S')
    mode = stat.S_IFREG | 0o600

    var = contextvars.ContextVar('test')
    var.set('set-from-outer')

    seen_in_files = None
    seen_in_bytes = None

    async def member_files():
        nonlocal seen_in_files, seen_in_bytes

        async def file_data():
            nonlocal seen_in_bytes
            seen_in_bytes = var.get()
            var.set('set-from-inner-bytes')
            yield b'-'

        seen_in_files = var.get()
        var.set('set-from-inner-files')
        yield 'file-1', now, mode, ZIP_64, file_data()

    async def consume_all():
        async for _ in async_stream_zip(member_files()):
            pass

    asyncio.run(consume_all())
    # The iterables see a copy of the outer value, and their own writes must
    # not leak back out to the caller's context
    assert var.get() == 'set-from-outer'
    assert seen_in_files == 'set-from-outer'
    assert seen_in_bytes == 'set-from-outer'

0 comments on commit 76d8928

Please sign in to comment.