-
Notifications
You must be signed in to change notification settings - Fork 14
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feature/make it async #80
Conversation
Wow! Thanks for this I am unsure about the duplication I have to admit. Instead, should there be a layer that allows calling all the existing sync code in import asyncio
from stream_unzip import stream_unzip
async def async_stream_unzip(chunks, *args, **kwargs):
async def to_async_iterable(sync_iterable):
# to_thread errors if StopIteration raised in it. So we use a sentinel to detect the end
done = object()
it = iter(sync_iterable)
while (value := await asyncio.to_thread(next, it, done)) is not done:
yield value
def to_sync_iterable(async_iterable):
done = object()
async_it = aiter(async_iterable)
while (value := asyncio.run_coroutine_threadsafe(anext(async_it, done), loop).result()) is not done:
yield value
loop = asyncio.get_running_loop()
unzipped_chunks = stream_unzip(to_sync_iterable(chunks), *args, **kwargs)
async for name, size, chunks in to_async_iterable(unzipped_chunks):
yield name, size, to_async_iterable(chunks) Also, I would probably lean to this being in the same module, And finally, running the tests it looks like they don't pass on Python 3.6 and 3.7: looks like since they use IsolatedAsyncioTestCase which I think was only introduced in Python 3.8. I think I'm a fan of continuing to support older versions of Python unless we really can't for some reason. Edit: simplified the above code a bit |
This simplifies the code a bit in the suggested async interface. This is inspired by the discussion at uktrade/stream-unzip#80, and makes this code more consistent with the code there. Whether this will continue to be an example in the docs or integrated into the codebase in some way, I'm not sure right now.
Ah just realised that |
Turning into a stream of consciousness a bit, but I think I am more and more a fan of not having the duplication, and instead having a thread-based layer in front of the existing sync code to avoid blocking the event loop. Unless the performance implications of such a layer are really tremendous, then I suspect that for the vast majority of cases this will be good enough. The main reason is maintainability I think: already this is a lot of quite specific knowledge required to make changes in this library. If this were to go in, to make almost any change a maintainer would need all of:
Of the above, 1. and 2. I suspect are inescapable really, but with a thread-based layer 3 and 4 are avoided for many classes of changes. And on 3: as much as I do like async in general, I can't escape the fact that, at least anecdotally, many Python developers are not able to reason about it as well as sync code. (Maybe I am being condescending here... not sure. But I wouldn't want my fear of being condescending to make me unrealistic) |
Hello there, yes I think you are right, normally duplicated code is not ideal, I just did it because originally I looked into your code and I identified quite quickly what needs changing for it to be async so I changed it. I have not really tried to optimize, just quickly rewrote the parts that enable async iterations. All in all, your proposed solution is much better, I agree with all the points that you have mentioned, and very much welcome the interest in incorporating async functionality. |
I'm curious - can I ask what this transformation was in more detail? And why it was decided to be done? Essentially this is to have a bit of knowledge of how stream-unzip is used in order to inform future changes. |
Sure, we have a task where we run anomaly detection on a lot of files, and for that we have to fetch them. Some files were in zip archives therefore these needed to be unzipped (your lib was/is being used to do this) . Recently, as there was a lot of IO involved, and since Python is knowingly bad at multithreading but for IO bound tasks async execution is fitting, we have decided to switch to asyncio for both log fetching and local file management. Also we implemented a “semi-multiservice” architecture where services can also be horizontally scaled, but at the heart, this async transformation was. By using this arch. we achieved a non-indispensable speed up compared to full sync processing. |
Ah good to know - thanks! |
This brings the async example from the documentation into the codebase proper. This is inspired by the PR and conversation at uktrade/stream-unzip#80. To avoid duplication, threads are used to use the sync code without blocking the event loop.
This brings the async example from the documentation into the codebase proper. This is inspired by the PR and conversation at uktrade/stream-unzip#80. To avoid duplication, threads are used to use the sync code without blocking the event loop.
This brings the async example from the documentation into the codebase proper. This is inspired by the PR and conversation at uktrade/stream-unzip#80. To avoid duplication, threads are used to use the sync code without blocking the event loop.
This brings the async example from the documentation into the codebase proper. This is inspired by the PR and conversation at uktrade/stream-unzip#80. To avoid duplication, threads are used to use the sync code without blocking the event loop.
This brings the async example from the documentation into the codebase proper. This is inspired by the PR and conversation at uktrade/stream-unzip#80. To avoid duplication, threads are used to use the sync code without blocking the event loop.
This brings the async example from the documentation into the codebase proper. This is inspired by the PR and conversation at uktrade/stream-unzip#80. To avoid duplication, threads are used to use the sync code without blocking the event loop.
This brings the async example from the documentation into the codebase proper. This is inspired by the PR and conversation at uktrade/stream-unzip#80. To avoid duplication, threads are used to use the sync code without blocking the event loop.
This brings the async example from the documentation into the codebase proper. This is inspired by the PR and conversation at uktrade/stream-unzip#80. To avoid duplication, threads are used to use the sync code without blocking the event loop.
This brings the async example from the documentation into the codebase proper. This is inspired by the PR and conversation at uktrade/stream-unzip#80. To avoid duplication, threads are used to use the sync code without blocking the event loop.
This brings the async example from the documentation into the codebase proper. This is inspired by the PR and conversation at uktrade/stream-unzip#80. To avoid duplication, threads are used to use the sync code without blocking the event loop.
This brings the async example from the documentation into the codebase proper. This is inspired by the PR and conversation at uktrade/stream-unzip#80. To avoid duplication, threads are used to use the sync code without blocking the event loop.
This brings the async example from the documentation into the codebase proper. This is inspired by the PR and conversation at uktrade/stream-unzip#80. To avoid duplication, threads are used to use the sync code without blocking the event loop.
This brings the async example from the documentation into the codebase proper. This is inspired by the PR and conversation at uktrade/stream-unzip#80. To avoid duplication, threads are used to use the sync code without blocking the event loop.
This brings the async example from the documentation into the codebase proper. This is inspired by the PR and conversation at uktrade/stream-unzip#80. To avoid duplication, threads are used to use the sync code without blocking the event loop.
This brings the async example from the documentation into the codebase proper. This is inspired by the PR and conversation at uktrade/stream-unzip#80. To avoid duplication, threads are used to use the sync code without blocking the event loop.
This brings the async example from the documentation into the codebase proper. This is inspired by the PR and conversation at uktrade/stream-unzip#80. To avoid duplication, threads are used to use the sync code without blocking the event loop.
This brings the async example from the documentation into the codebase proper. This is inspired by the PR and conversation at uktrade/stream-unzip#80. To avoid duplication, threads are used to use the sync code without blocking the event loop.
Inspired a bit by this PR and discussion, have just added async support (via threads) to stream-zip: uktrade/stream-zip#114 |
@marcell-szabo To check, would you be ok with converting this PR to use a threads-based layer like in this message, and like the threads-based layer that's now in stream-zip? No is of course all good, but just if it is no, I/we might do it ourselves |
This adds a function, async_stream_unzip, that's the async version of stream_unzip. It's inspired by the code and discussion in the PR at #80. However, to avoid duplication, we have a threads-based layer to call into the existing stream_unzip function, rather than have an almost identical copy but with async/await where needed in it. This is similar to how https://github.com/uktrade/stream-zip provides an async interface.
This adds a function, async_stream_unzip, that's the async version of stream_unzip. It's inspired by the code and discussion in the PR at #80. However, to avoid duplication, we have a threads-based layer to call into the existing stream_unzip function, rather than have an almost identical copy but with async/await where needed in it. This is similar to how https://github.com/uktrade/stream-zip provides an async interface.
This adds a function, async_stream_unzip, that's the async version of stream_unzip. It's inspired by the code and discussion in the PR at #80. However, to avoid duplication, we have a threads-based layer to call into the existing stream_unzip function, rather than have an almost identical copy but with async/await where needed in it. This is similar to how https://github.com/uktrade/stream-zip provides an async interface.
This adds a function, async_stream_unzip, that's the async version of stream_unzip. It's inspired by the code and discussion in the PR at #80. However, to avoid duplication, we have a threads-based layer to call into the existing stream_unzip function, rather than have an almost identical copy but with async/await where needed in it. This is similar to how https://github.com/uktrade/stream-zip provides an async interface.
As discussed, a threads-powered async interface has now been released in v0.0.91 |
Hello,
Recently I had to transform a codebase, that had a dependency on this library, to use asyncio. Therefore, I transformed the stream_unzip.py to accept an asynchronous generator as data source, and run the entire unzipping process asynchronously. I would like to contribute the modification back to this original library, so here is my modified code.
Note, that there has not been any modification that concerns the core unzipping/decrypting etc. functionality, only the generators and data passing has become asynchronous. To prove this I have rewritten the tests in test.py to check the new async behavior, the result can be seen here, all of them are passing.