From ef875e8bac762968ef52f5ce06c5b006f739a183 Mon Sep 17 00:00:00 2001 From: Andrei Fajardo Date: Mon, 30 Sep 2024 13:33:53 -0400 Subject: [PATCH] fix check for when Workflow is done or if step raised error --- .../llama_index/core/workflow/handler.py | 47 +++++++++---------- 1 file changed, 22 insertions(+), 25 deletions(-) diff --git a/llama-index-core/llama_index/core/workflow/handler.py b/llama-index-core/llama_index/core/workflow/handler.py index 6402a611b63df..4444d1fc8ffa5 100644 --- a/llama-index-core/llama_index/core/workflow/handler.py +++ b/llama-index-core/llama_index/core/workflow/handler.py @@ -61,35 +61,33 @@ async def run_step(self) -> Optional[Event]: # the chance to run (we won't actually sleep here). await asyncio.sleep(0) - # check if StopEvent is in holding - if isinstance(self.ctx._step_event_holding, StopEvent): - # See if we're done, or if a step raised any error - retval = None - we_done = False - exception_raised = None + # check if we're done, or if a step raised error + we_done = False + exception_raised = None + retval = None + for t in self.ctx._tasks: + # Check if we're done + if not t.done(): + continue + + we_done = True + e = t.exception() + if type(e) != WorkflowDone: + exception_raised = e + + if we_done: + # Remove any reference to the tasks for t in self.ctx._tasks: - # Check if we're done - if not t.done(): - continue - - we_done = True - e = t.exception() - if type(e) != WorkflowDone: - exception_raised = e - - if we_done: - # Remove any reference to the tasks - for t in self.ctx._tasks: - t.cancel() - await asyncio.sleep(0) - res = self.ctx.get_result() - - self.set_result(res) + t.cancel() + await asyncio.sleep(0) if exception_raised: + self.set_exception(exception_raised) # Mark as done raise exception_raised - else: + res = self.ctx.get_result() + self.set_result(res) + else: # continue with running next step # notify unblocked task that we're ready to accept next event async with self.ctx._step_condition: self.ctx._step_condition.notify() @@ -98,7 +96,6 @@ async def run_step(self) -> Optional[Event]: async with self.ctx._step_event_written: await self.ctx._step_event_written.wait() retval = self.ctx._step_event_holding - else: raise ValueError("Context is not set!")