v0.2.0
v0.2.0 is out now, with the main improvement being the addition of streaming support!
Now, if you have a workflow that writes to the event stream like:
# imports assume the llama_index workflow API
from llama_index.core.workflow import (
    Context,
    Event,
    StartEvent,
    StopEvent,
    Workflow,
    step,
)


class ProgressEvent(Event):
    progress: str


# create a dummy workflow
class MyWorkflow(Workflow):
    @step()
    async def run_step(self, ctx: Context, ev: StartEvent) -> StopEvent:
        # Your workflow logic here
        arg1 = str(ev.get("arg1", ""))
        result = arg1 + "_result"

        # stream events as steps run
        ctx.write_event_to_stream(
            ProgressEvent(progress="I am doing something!")
        )

        return StopEvent(result=result)
You can then stream those events from the client:
# create a session
session = client.create_session()
# kick off run
task_id = session.run_nowait("streaming_workflow", arg1="hello_world")
# stream events -- this will yield a dict representing each event
for event in session.get_task_result_stream(task_id):
    print(event)
# get final result
result = session.get_task_result(task_id)
print(result)
# prints 'hello_world_result'
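To make the producer/consumer pattern behind streaming easier to picture, here is a minimal standard-library sketch. It is not how the library implements streaming: `ToyContext`, its `stream_events` method, and the `None` sentinel are hypothetical stand-ins chosen for illustration, and the `ProgressEvent` here is a plain dataclass rather than a workflow `Event`. The sketch only shows the general idea of a step pushing events onto a queue while a consumer reads them as dicts and then collects the final result.

```python
import asyncio
from dataclasses import asdict, dataclass
from typing import AsyncIterator, Optional


@dataclass
class ProgressEvent:
    # Plain dataclass stand-in for the workflow event (assumption)
    progress: str


class ToyContext:
    """Hypothetical stand-in for a workflow context; not the library's class."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()

    def write_event_to_stream(self, event: Optional[ProgressEvent]) -> None:
        # put_nowait does not block on an unbounded queue
        self._queue.put_nowait(event)

    async def stream_events(self) -> AsyncIterator[dict]:
        # Yield each event as a dict until the None sentinel arrives
        while True:
            event = await self._queue.get()
            if event is None:
                return
            yield asdict(event)


async def run_step(ctx: ToyContext, arg1: str) -> str:
    # Emit a progress event while the step is still running
    ctx.write_event_to_stream(ProgressEvent(progress="I am doing something!"))
    await asyncio.sleep(0)  # yield control so the consumer can read the event
    result = arg1 + "_result"
    ctx.write_event_to_stream(None)  # sentinel: no more events
    return result


async def main() -> tuple:
    ctx = ToyContext()
    # Start the step in the background, then consume its events as they arrive
    task = asyncio.create_task(run_step(ctx, "hello_world"))
    events = [event async for event in ctx.stream_events()]
    result = await task
    return events, result


if __name__ == "__main__":
    events, result = asyncio.run(main())
    print(events)
    print(result)
```

The sentinel is one simple way to tell the consumer that the stream has ended; a real client would more likely detect completion from the task status.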