An async stream processing microframework for Python
Slurry builds on the concepts of structured concurrency and memory channels, originating in Trio, and uses them to create a microframework for processing streaming data.
The basic building blocks of Slurry include:
- Pipelines - Asynchronous context managers that encapsulate a stream process.
- Sections - The individual processing steps.
- Taps - Output channels for the processed stream.
- Extensions - A way to add more processing steps to an existing pipeline.
Slurry avoids using asynchronous generator functions, in favor of the pull-push programming style of memory channels. It can be thought of as an asynchronous version of itertools - on steroids!
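To illustrate the pull-push idea, here is a minimal stdlib-only analogy (using `asyncio.Queue` as a stand-in; this is not Slurry's actual API, which builds on Trio memory channels): a processing step pulls items from an inbound channel and pushes transformed items to an outbound one.

```python
import asyncio

# Stdlib analogy of the pull-push style: a "section" pulls items from an
# inbound queue and pushes transformed items to an outbound queue.
# Illustrative only; Slurry's real sections use Trio memory channels.

DONE = object()  # sentinel marking end-of-stream

async def map_section(func, inbound, outbound):
    # Pull each item, transform it, and push the result downstream.
    while (item := await inbound.get()) is not DONE:
        await outbound.put(func(item))
    await outbound.put(DONE)

async def main():
    inbound, outbound = asyncio.Queue(), asyncio.Queue()
    asyncio.create_task(map_section(lambda x: x * 2, inbound, outbound))
    for n in (1, 2, 3):
        await inbound.put(n)
    await inbound.put(DONE)
    results = []
    while (item := await outbound.get()) is not DONE:
        results.append(item)
    return results

results = asyncio.run(main())
print(results)  # [2, 4, 6]
```

Because the step owns its input and output channels, backpressure and cancellation can be handled by the channels themselves rather than by generator machinery.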
The base library includes a number of ready-made stream processing building blocks, such as `Map`, `Chain`, `Merge` and `Zip`, and it is easy to build your own!
Enough talk! Time to see what's up!
```python
async with Pipeline.create(
    Zip(produce_increasing_integers(1, max=3), produce_alphabet(0.9, max=3))
) as pipeline, pipeline.tap() as aiter:
    results = [item async for item in aiter]
    assert results == [(0, 'a'), (1, 'b'), (2, 'c')]
```
The example producers (which are not part of the framework) could look like this:
```python
import string

import trio

async def produce_increasing_integers(interval, *, max=3):
    for i in range(max):
        yield i
        if i == max - 1:
            break
        await trio.sleep(interval)

async def produce_alphabet(interval, *, max=3):
    for i, c in enumerate(string.ascii_lowercase):
        yield c
        if i == max - 1:
            break
        await trio.sleep(interval)
```
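For a quick feel of what the `Zip` example computes, here is a hedged, stdlib-only sketch that reproduces its output with asyncio and a hypothetical `azip` helper (neither is part of Slurry; the real pipeline runs the producers concurrently under Trio):

```python
import asyncio
import string

# Stdlib re-creation of the Zip example's producers, using asyncio.sleep
# in place of trio.sleep. Short intervals keep the demo fast.

async def produce_increasing_integers(interval, *, max=3):
    for i in range(max):
        yield i
        if i == max - 1:
            break
        await asyncio.sleep(interval)

async def produce_alphabet(interval, *, max=3):
    for i, c in enumerate(string.ascii_lowercase):
        yield c
        if i == max - 1:
            break
        await asyncio.sleep(interval)

async def azip(*aiters):
    # Hypothetical helper: pull one item from each source in turn and
    # yield them as a tuple; stop when any source is exhausted.
    iterators = [ait.__aiter__() for ait in aiters]
    while True:
        try:
            yield tuple([await it.__anext__() for it in iterators])
        except StopAsyncIteration:
            return

async def main():
    return [pair async for pair in azip(
        produce_increasing_integers(0.01), produce_alphabet(0.01))]

results = asyncio.run(main())
print(results)  # [(0, 'a'), (1, 'b'), (2, 'c')]
```

This sequential `azip` only mirrors the values; Slurry's `Zip` section additionally drives its sources concurrently and delivers results through pipeline taps.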
Further documentation is available on Read the Docs. Check out the source code on GitHub.
Still here? Wanna try it out yourself? Install from PyPI:
```shell
pip install slurry
```
Slurry is tested on Python 3.8 and later, and requires the Trio concurrency and I/O library.
Slurry is licensed under the MIT license.