Skip to content

Commit

Permalink
feat: async interface
Browse files Browse the repository at this point in the history
This brings the async example from the documentation into the codebase proper.

This is inspired by the PR and conversation at
uktrade/stream-unzip#80. To avoid duplication, threads
are used to use the sync code without blocking the event loop.
  • Loading branch information
michalc committed Feb 24, 2024
1 parent 9d89a1e commit 3075648
Show file tree
Hide file tree
Showing 5 changed files with 159 additions and 38 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ In addition to being memory efficient (with some [limitations](https://stream-zi

- By default stores modification time as an extended timestamp. An extended timestamp is a more accurate timestamp than the original ZIP format allows

- By default stores modification time as an extended timestamp. An extended timestamp is a more accurate timestamp than the original ZIP format allows

<!-- --8<-- [end:features] -->

---
Expand Down
51 changes: 13 additions & 38 deletions docs/async-interface.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,69 +5,44 @@ title: Async interface
---


stream-zip does not include an async interface. However, it is possible to construct an async function that wraps stream-zip to allow the construction of zip files in a streaming way from async code without blocking the event loop.
An async interface is provided via the function `async_stream_zip`. Its usage is exactly the same as `stream_zip` except that:

```python
import asyncio
from stream_zip import stream_zip

async def async_stream_zip(member_files, *args, **kwargs):

async def to_async_iterable(sync_iterable):
# to_thread errors if StopIteration raised in it. So we use a sentinel to detect the end
done = object()
it = iter(sync_iterable)
while (value := await asyncio.to_thread(next, it, done)) is not done:
yield value

def to_sync_iterable(async_iterable):
done = object()
async_it = aiter(async_iterable)
while (value := asyncio.run_coroutine_threadsafe(anext(async_it, done), loop).result()) is not done:
yield value

loop = asyncio.get_running_loop()
sync_member_files = (
member_file[0:4] + (to_sync_iterable(member_file[4],),)
for member_file in to_sync_iterable(member_files)
)

async for chunk in to_async_iterable(stream_zip(sync_member_files, *args, **kwargs)):
yield chunk
```

The above allows the member files to be supplied by an async iterable, and the data of each member file to be supplied by an async iterable.
1. The member files must be provided as an async iterable of tuples.
2. The data of each member file must be provided as an async iterable of bytes.
3. Its return value is an async iterable of bytes.

```python
from datetime import datetime
from stat import S_IFREG
from stream_zip import ZIP_32
from stream_zip import async_stream_zip, ZIP_32

# Hard coded for example purposes
async def get_async_data():
async def async_data():
yield b'Some bytes 1'
yield b'Some bytes 2'

# Hard coded for example purposes
async def get_async_member_files():
async def async_member_files():
yield (
'my-file-1.txt',
datetime.now(),
S_IFREG | 0o600,
ZIP_32,
get_async_data(),
async_data(),
)
yield (
'my-file-2.txt',
datetime.now(),
S_IFREG | 0o600,
ZIP_32,
get_async_data(),
async_data(),
)

async def main():
async for chunk in async_stream_zip(get_async_member_files()):
async for chunk in async_stream_zip(async_member_files()):
print(chunk)

asyncio.run(main())
```
```

> Under the hood `async_stream_zip` uses threads as a layer over the synchronous `stream_zip` function.
2 changes: 2 additions & 0 deletions docs/features.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,5 @@ In addition to being memory efficient (with some [limitations](/get-started/#lim
- Allows the specification of permissions on the member files and directories (although not all clients respect them)

- By default stores modification time as an extended timestamp. An extended timestamp is a more accurate timestamp than the original ZIP format allows

- Provides an async interface (that uses threads under the hood to share code with sync interface without blocking the event loop)
32 changes: 32 additions & 0 deletions stream_zip.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from collections import deque
from struct import Struct
import asyncio
import secrets
import zlib

Expand Down Expand Up @@ -715,6 +716,37 @@ def _no_compression_streamed_data(chunks, uncompressed_size, crc_32, maximum_siz
yield from evenly_sized(zipped_chunks)


async def async_stream_zip(member_files, *args, **kwargs):

async def to_async_iterable(sync_iterable):
# to_thread errors if StopIteration raised in it. So we use a sentinel to detect the end
done = object()
it = iter(sync_iterable)
while True:
value = await asyncio.to_thread(next, it, done)
if value is done:
break
yield value

def to_sync_iterable(async_iterable):
done = object()
async_it = aiter(async_iterable)
while True:
value = asyncio.run_coroutine_threadsafe(anext(async_it, done), loop).result()
if value is done:
break
yield value

loop = asyncio.get_running_loop()
sync_member_files = (
member_file[0:4] + (to_sync_iterable(member_file[4],),)
for member_file in to_sync_iterable(member_files)
)

async for chunk in to_async_iterable(stream_zip(sync_member_files, *args, **kwargs)):
yield chunk


class ZipError(Exception):
pass

Expand Down
110 changes: 110 additions & 0 deletions test_stream_zip.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from datetime import datetime, timezone, timedelta
from io import BytesIO
import asyncio
import contextlib
import os
import secrets
Expand All @@ -15,6 +16,7 @@
from stream_unzip import IncorrectAESPasswordError, UnsupportedZip64Error, stream_unzip

from stream_zip import (
async_stream_zip,
stream_zip,
NO_COMPRESSION_64,
NO_COMPRESSION_32,
Expand All @@ -32,6 +34,9 @@
)


###################################################################################################
# Utility functions for tests

@contextlib.contextmanager
def cwd(new_dir):
old_dir = os.getcwd()
Expand All @@ -50,6 +55,9 @@ def gen_bytes(num):
yield chunk[:to_yield]


###################################################################################################
# Tests of sync interface: stream_zip

def test_with_stream_unzip_zip_64():
now = datetime.strptime('2021-01-01 21:01:12', '%Y-%m-%d %H:%M:%S')
mode = stat.S_IFREG | 0o600
Expand Down Expand Up @@ -1274,3 +1282,105 @@ def test_crc_32_not_in_file(method):
assert crc_32[2:4] not in encrypted_bytes
assert crc_32[0:3] not in encrypted_bytes
assert crc_32[1:4] not in encrypted_bytes


###################################################################################################
# Tests of sync interface: async_stream_zip
#
# Under the hood we know that async_stream_zip delegates to stream_zip, so there isn't as much
# of a need to test everything. We have a brief test that it seems to work in one case, but
# otherwise focus on the riskiest parts: that exceptions don't propagate, or that the async
# version doesn't actually stream

def test_async_stream_zip_equivalent_to_stream_unzip_zip_32_and_zip_64():
now = datetime.strptime('2021-01-01 21:01:12', '%Y-%m-%d %H:%M:%S')
mode = stat.S_IFREG | 0o600

def sync_files():
yield 'file-1', now, mode, ZIP_64, (b'a' * 10000, b'b' * 10000)
yield 'file-2', now, mode, ZIP_32, (b'c', b'd')

async def async_files():
async def data_1():
yield b'a' * 10000
yield b'b' * 10000

async def data_2():
yield b'c'
yield b'd'

yield 'file-1', now, mode, ZIP_64, data_1()
yield 'file-2', now, mode, ZIP_32, data_2()

# Might not be performant, but good enough for the test
async def async_concat(chunks):
result = b''
async for chunk in chunks:
result += chunk
return result

async def test():
assert b''.join(stream_zip(sync_files())) == await async_concat(async_stream_zip(async_files()))

asyncio.run(test())


def test_async_exception_propagates():
now = datetime.strptime('2021-01-01 21:01:12', '%Y-%m-%d %H:%M:%S')
mode = stat.S_IFREG | 0o600

async def async_data():
yield b'-'

async def async_files():
yield 'file-1', now, mode, ZIP_64, async_data()
raise Exception('From generator')

async def test():
async for chunk in async_stream_zip(async_files()):
pass

with pytest.raises(Exception, match='From generator'):
asyncio.run(test())


def test_async_exception_from_bytes_propagates():
now = datetime.strptime('2021-01-01 21:01:12', '%Y-%m-%d %H:%M:%S')
mode = stat.S_IFREG | 0o600

async def async_data():
yield b'-'
raise Exception('From generator')

async def async_files():
yield 'file-1', now, mode, ZIP_64, async_data()

async def test():
async for chunk in async_stream_zip(async_files()):
pass

with pytest.raises(Exception, match='From generator'):
asyncio.run(test())


def test_async_stream_zip_does_stream():
now = datetime.strptime('2021-01-01 21:01:12', '%Y-%m-%d %H:%M:%S')
mode = stat.S_IFREG | 0o600

state = []

async def async_data():
for i in range(0, 4):
state.append('in')
for j in range(0, 1000):
yield b'-' * 64000

async def async_files():
yield 'file-1', now, mode, ZIP_64, async_data()

async def test():
async for chunk in async_stream_zip(async_files()):
state.append('out')

asyncio.run(test())
assert state == ['in', 'in', 'out', 'in', 'out', 'in', 'out', 'out']

0 comments on commit 3075648

Please sign in to comment.