Skip to content

Commit

Permalink
added ack and forgot in api deployemnt post call also (#897)
Browse files Browse the repository at this point in the history
  • Loading branch information
muhammad-ali-e authored Dec 13, 2024
1 parent ac09190 commit 465dbaa
Showing 1 changed file with 7 additions and 2 deletions.
9 changes: 7 additions & 2 deletions backend/workflow_manager/workflow_v2/workflow_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -415,8 +415,9 @@ def _set_result_acknowledge(execution: WorkflowExecution) -> None:
f"ExecutionID [{execution.id}] - Task {execution.task_id} acknowledged"
)

@staticmethod
@classmethod
def execute_workflow_async(
cls,
workflow_id: str,
execution_id: str,
hash_values_of_files: dict[str, FileHash],
Expand Down Expand Up @@ -445,7 +446,7 @@ def execute_workflow_async(
}
org_schema = UserContext.get_organization_identifier()
log_events_id = StateStore.get(Common.LOG_EVENTS_ID)
async_execution = WorkflowHelper.execute_bin.apply_async(
async_execution: AsyncResult = cls.execute_bin.apply_async(
args=[
org_schema, # schema_name
workflow_id, # workflow_id
Expand Down Expand Up @@ -480,6 +481,10 @@ def execute_workflow_async(
workflow_execution.status,
result=task_result,
)
# If task is complete, handle acknowledgment and forgetting the
if async_execution.ready():
async_execution.forget() # Remove the result from the result backend.
cls._set_result_acknowledge(workflow_execution)
return execution_response
except celery_exceptions.TimeoutError:
return ExecutionResponse(
Expand Down

0 comments on commit 465dbaa

Please sign in to comment.