- Dataflow based functional syntax.
- Implicitly parallelism for both async and non-async functions.
- Composable for both flows and tasks.
- Extensible with middlewares.
pip install aiosaber
- check tests for more examples.
from aiosaber import *
@task
def add(self, num):
for i in range(100000):
num += 1
return num
@task
async def multiply(num1, num2):
return num1 * num2
@flow
def sub_flow(num):
return add(num) | map_(lambda x: x ** 2) | add
@flow
def my_flow(num):
[sub_flow(num), sub_flow(num)] | multiply | view
num_ch = Channel.values(*list(range(100)))
f = my_flow(num_ch)
asyncio.run(f.start())
from aiosaber import *
class NameBuilder(BaseBuilder):
def __call__(self, com, *args, **kwargs):
super().__call__(com, *args, **kwargs)
com.context['name'] = type(com).__name__ + str(id(com))
class ClientProvider(BaseExecutor):
async def __call__(self, com, **kwargs):
if not context.context.get('client'):
context.context['client'] = 'client'
return await super().__call__(com, **kwargs)
class Filter(BaseHandler):
async def __call__(self, com, get, put, **kwargs):
async def filter_put(data):
if data is END or data > 3:
await put(data)
return await super().__call__(com, get, filter_put, **kwargs)
@task
async def add(self, num):
print(self.context['name'])
print(context.context['client'])
return num + 1
@flow
def myflow(num_ch):
return num_ch | add | view
context.context.update({
'builders': [NameBuilder],
'executors': [ClientProvider],
'handlers': [Filter]
})
f = myflow(Channel.values(1, 2, 3, 4, 5))
context.context.clear()
asyncio.run(f.start())