Skip to content

Commit

Permalink
feat: async interface (via threads)
Browse files Browse the repository at this point in the history
This adds a function, async_stream_unzip, that's the async version of
stream_unzip.

It's inspired by the code and discussion in the PR at
#80. However, to avoid duplication,
we have a threads-based layer to call into the existing stream_unzip function,
rather than have an almost identical copy but with async/await where needed in
it. This is similar to how https://github.com/uktrade/stream-zip provides an
async interface.
  • Loading branch information
michalc committed Mar 23, 2024
1 parent 3f74ad7 commit dc8902b
Show file tree
Hide file tree
Showing 6 changed files with 201 additions and 2 deletions.
46 changes: 46 additions & 0 deletions docs/async-interface.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
---
layout: sub-navigation
order: 3
title: Async interface
---


An async interface is provided via the function `async_stream_unzip`. Its usage is exactly the same as `stream_zip` except that:

1. The input must be an async iterable of bytes.
2. The member files are output as an async iterable of tuples.
3. The data of each member file is returned as an async iterable of bytes.

```python
from stream_unzip import stream_unzip
import httpx

async def zipped_chunks(client):
# Iterable that yields the bytes of a zip file
async with client.stream('GET', 'https://www.example.com/my.zip') as r:
async for chunk in r.aiter_bytes(chunk_size=65536):
yield chunk

async def main():
async with httpx.AsyncClient() as client:
async for file_name, file_size, unzipped_chunks in async_stream_unzip(
zipped_chunks(client),
password=b'my-password',
):
async for chunk in unzipped_chunks:
print(chunk)

asyncio.run(main())
```

> ### Warnings
>
> Under the hood `async_stream_unzip` uses threads as a layer over the synchronous `stream_unzip` function. This has two consequences:
>
> 1. A possible performance penalty over a theoretical implementation that is pure async without threads.
>
> 2. The [contextvars](https://docs.python.org/3/library/contextvars.html) context available in the async iterables of files or data is a shallow copy of the context where async_stream_unzip is called from.
>
> This means that existing context variables are available inside the input iterable, but any changes made to the context itself from inside the iterable will not propagate out to the original context. Changes made to mutable data structures that are part of the context, for example dictionaries, will propagate out.
>
> This does not affect Python 3.6, because contextvars is not available.
2 changes: 1 addition & 1 deletion docs/exceptions.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
---
layout: sub-navigation
order: 3
order: 4
title: Exceptions
---

Expand Down
2 changes: 2 additions & 0 deletions docs/features.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,5 @@ In addition to being memory efficient, stream-unzip supports:
- ZIP files created by Java's ZipOutputStream that are larger than 4GiB. At the time of writing libarchive-based stream readers cannot read these without error.

- BZip2-compressed ZIPs.

- An async interface (that uses threads under the hood).
2 changes: 1 addition & 1 deletion docs/publish-a-release.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
---
layout: sub-navigation
order: 5
order: 6
title: How to publish a release
---

Expand Down
45 changes: 45 additions & 0 deletions stream_unzip.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from functools import partial
from struct import Struct
import asyncio
import bz2
import zlib

Expand Down Expand Up @@ -462,6 +463,50 @@ def all():
for _ in unzipped_chunks:
raise UnfinishedIterationError()


async def async_stream_unzip(chunks, *args, **kwargs):

async def to_async_iterable(sync_iterable):
# asyncio.to_thread is not available until Python 3.9, and StopIteration doesn't get
# propagated by run_in_executor, so we use a sentinel to detect the end of the iterable
done = object()
it = iter(sync_iterable)

# contextvars are not available until Python 3.7
try:
import contextvars
except ImportError:
get_args = lambda: (next, it, done)
else:
get_args = lambda: (contextvars.copy_context().run, next, it, done)

while True:
value = await loop.run_in_executor(None, *get_args())
if value is done:
break
yield value

def to_sync_iterable(async_iterable):
# The built-in aiter and anext functions are not available until Python 3.10
async_it = async_iterable.__aiter__()
while True:
try:
value = asyncio.run_coroutine_threadsafe(async_it.__anext__(), loop).result()
except StopAsyncIteration:
break
yield value

# get_running_loop is preferred, but isn't available until Python 3.7
try:
loop = asyncio.get_running_loop()
except:
loop = asyncio.get_event_loop()
unzipped_chunks = stream_unzip(to_sync_iterable(chunks), *args, **kwargs)

async for name, size, chunks in to_async_iterable(unzipped_chunks):
yield name, size, to_async_iterable(chunks)


class UnzipError(Exception):
pass

Expand Down
106 changes: 106 additions & 0 deletions test.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import asyncio
import itertools
import io
import platform
import unittest
import uuid
import random
import zipfile

from stream_unzip import (
async_stream_unzip,
stream_unzip,
UnfinishedIterationError,
TruncatedDataError,
Expand Down Expand Up @@ -746,3 +749,106 @@ def yield_input():
for name, size, chunks in stream_unzip(yield_input()):
for chunk in chunks:
pass

def test_async_stream_unzip(self):
async def async_bytes():
file = io.BytesIO()
with zipfile.ZipFile(file, 'w', zipfile.ZIP_DEFLATED) as zf:
zf.writestr('first.txt', b'-' * 100000)
zf.writestr('second.txt', b'*' * 100000)
zip_bytes = file.getvalue()

yield zip_bytes

results = []

async def test():
async for name, size, chunks in async_stream_unzip(async_bytes()):
b = b''
async for chunk in chunks:
b += chunk
results.append((name, size, b))

asyncio.get_event_loop().run_until_complete(test())
self.assertEqual(results, [
(b'first.txt', 100000, b'-' * 100000),
(b'second.txt', 100000, b'*' * 100000),
])

def test_async_exception_from_bytes_propagates(self):
async def async_bytes():
yield b'P'
raise Exception('From async bytes')

async def test():
await async_stream_unzip(async_bytes()).__aiter__().__anext__()

with self.assertRaisesRegex(Exception, 'From async bytes'):
asyncio.get_event_loop().run_until_complete(test())

def test_async_does_stream(self):
state = []

async def async_bytes():
file = io.BytesIO()
with zipfile.ZipFile(file, 'w', zipfile.ZIP_DEFLATED) as zf:
zf.writestr('first.txt', b'-' * 100000)
zf.writestr('second.txt', b'*' * 100000)
zip_bytes = file.getvalue()

chunk_size = 100
for i in range(0, len(zip_bytes), chunk_size):
state.append('in')
yield zip_bytes[i:i + chunk_size]
await asyncio.sleep(0)

async def test():
async for name, size, chunks in async_stream_unzip(async_bytes()):
async for chunk in chunks:
state.append('out')

asyncio.get_event_loop().run_until_complete(test())
self.assertEqual(state, ['in', 'out', 'in', 'out', 'in', 'out', 'out', 'in', 'out', 'in'])

@unittest.skipIf(
tuple(int(v) for v in platform.python_version().split('.')) < (3,7,0),
"contextvars are not supported before Python 3.7.0",
)
def test_copy_of_context_variable_available_in_iterable(self):
# Ideally the context would be identical in the iterables, because that's what a purely asyncio
# implementation of stream-zip would likely do

import contextvars

var = contextvars.ContextVar('test')
var.set('set-from-outer')

d = contextvars.ContextVar('d')
d.set({'key': 'original-value'})

inner = None

async def async_bytes():
nonlocal inner
inner = var.get()

var.set('set-from-inner')
d.get()['key'] = 'set-from-inner'

file = io.BytesIO()
with zipfile.ZipFile(file, 'w', zipfile.ZIP_DEFLATED) as zf:
zf.writestr('first.txt', b'-' * 100000)
zf.writestr('second.txt', b'*' * 100000)
zip_bytes = file.getvalue()

yield zip_bytes

async def test():
async for name, size, chunks in async_stream_unzip(async_bytes()):
async for chunk in chunks:
pass

asyncio.get_event_loop().run_until_complete(test())
self.assertEqual(var.get(), 'set-from-outer')
self.assertEqual(inner, 'set-from-outer')
self.assertEqual(d.get()['key'], 'set-from-inner')

0 comments on commit dc8902b

Please sign in to comment.