From 1bf253f7138edce2933d8e157d59fb569a39c6e6 Mon Sep 17 00:00:00 2001 From: Chandrasekharan M <117059509+chandrasekharan-zipstack@users.noreply.github.com> Date: Thu, 12 Dec 2024 17:28:04 +0530 Subject: [PATCH] fix: WF execution created before WF async call (#883) * WF execution created before WF async call * PR comment addressed - fixed import and made WF exec mode as QUEUED instead of INSTANT --------- Co-authored-by: ali <117142933+muhammad-ali-e@users.noreply.github.com> --- backend/api_v2/deployment_helper.py | 11 ++++++++--- .../workflow_manager/workflow_v2/workflow_helper.py | 8 ++++++-- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/backend/api_v2/deployment_helper.py b/backend/api_v2/deployment_helper.py index ceaef536d..3a17d8350 100644 --- a/backend/api_v2/deployment_helper.py +++ b/backend/api_v2/deployment_helper.py @@ -1,5 +1,4 @@ import logging -import uuid from typing import Any, Optional from urllib.parse import urlencode @@ -24,7 +23,8 @@ from workflow_manager.endpoint_v2.source import SourceConnector from workflow_manager.workflow_v2.dto import ExecutionResponse from workflow_manager.workflow_v2.enums import ExecutionStatus -from workflow_manager.workflow_v2.models.workflow import Workflow +from workflow_manager.workflow_v2.execution import WorkflowExecutionServiceHelper +from workflow_manager.workflow_v2.models import Workflow, WorkflowExecution from workflow_manager.workflow_v2.workflow_helper import WorkflowHelper logger = logging.getLogger(__name__) @@ -152,7 +152,12 @@ def execute_workflow( """ workflow_id = api.workflow.id pipeline_id = api.id - execution_id = str(uuid.uuid4()) + workflow_execution = WorkflowExecutionServiceHelper.create_workflow_execution( + workflow_id=workflow_id, + pipeline_id=pipeline_id, + mode=WorkflowExecution.Mode.QUEUE, + ) + execution_id = workflow_execution.id hash_values_of_files = SourceConnector.add_input_file_to_api_storage( workflow_id=workflow_id, diff --git a/backend/workflow_manager/workflow_v2/workflow_helper.py b/backend/workflow_manager/workflow_v2/workflow_helper.py index 8df5fc575..3a119e875 100644 --- a/backend/workflow_manager/workflow_v2/workflow_helper.py +++ b/backend/workflow_manager/workflow_v2/workflow_helper.py @@ -465,8 +465,12 @@ def execute_workflow_async( WorkflowExecutionServiceHelper.update_execution_err( execution_id, str(error) ) - logger.error(f"Errors while job enqueueing {str(error)}") - logger.error(f"Error {traceback.format_exc()}") + logger.error( + f"Error while enqueuing async job for WF '{workflow_id}', " + f"execution '{execution_id}': {str(error)}", + exc_info=True, + stack_info=True, + ) return ExecutionResponse( workflow_id, execution_id,