Skip to content

Commit

Permalink
fix: WF execution created before WF async call (#883)
Browse files Browse the repository at this point in the history
* WF execution created before WF async call

* PR comment addressed - fixed import and made WF exec mode as QUEUED instead of INSTANT

---------

Co-authored-by: ali <[email protected]>
  • Loading branch information
chandrasekharan-zipstack and muhammad-ali-e authored Dec 12, 2024
1 parent 35a32d5 commit 1bf253f
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 5 deletions.
11 changes: 8 additions & 3 deletions backend/api_v2/deployment_helper.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import logging
import uuid
from typing import Any, Optional
from urllib.parse import urlencode

Expand All @@ -24,7 +23,8 @@
from workflow_manager.endpoint_v2.source import SourceConnector
from workflow_manager.workflow_v2.dto import ExecutionResponse
from workflow_manager.workflow_v2.enums import ExecutionStatus
from workflow_manager.workflow_v2.models.workflow import Workflow
from workflow_manager.workflow_v2.execution import WorkflowExecutionServiceHelper
from workflow_manager.workflow_v2.models import Workflow, WorkflowExecution
from workflow_manager.workflow_v2.workflow_helper import WorkflowHelper

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -152,7 +152,12 @@ def execute_workflow(
"""
workflow_id = api.workflow.id
pipeline_id = api.id
execution_id = str(uuid.uuid4())
workflow_execution = WorkflowExecutionServiceHelper.create_workflow_execution(
workflow_id=workflow_id,
pipeline_id=pipeline_id,
mode=WorkflowExecution.Mode.QUEUE,
)
execution_id = workflow_execution.id

hash_values_of_files = SourceConnector.add_input_file_to_api_storage(
workflow_id=workflow_id,
Expand Down
8 changes: 6 additions & 2 deletions backend/workflow_manager/workflow_v2/workflow_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -465,8 +465,12 @@ def execute_workflow_async(
WorkflowExecutionServiceHelper.update_execution_err(
execution_id, str(error)
)
logger.error(f"Errors while job enqueueing {str(error)}")
logger.error(f"Error {traceback.format_exc()}")
logger.error(
f"Error while enqueuing async job for WF '{workflow_id}', "
f"execution '{execution_id}': {str(error)}",
exc_info=True,
stack_info=True,
)
return ExecutionResponse(
workflow_id,
execution_id,
Expand Down

0 comments on commit 1bf253f

Please sign in to comment.