Skip to content

Commit

Permalink
scheduler: separate out one more level (minimal diff)
Browse files Browse the repository at this point in the history
  • Loading branch information
eudoxos committed May 21, 2024
1 parent 5f4409f commit c07d60a
Showing 1 changed file with 12 additions and 6 deletions.
18 changes: 12 additions & 6 deletions mupifDB/workflowscheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import ctypes
import json
import jsonpickle
from typing import Tuple

import restApiControl
import restLogger
Expand Down Expand Up @@ -321,15 +322,15 @@ def copyLogToDB (we_id, workflowLogName):
log.error(repr(e))


def executeWorkflow(lock, schedulerStat, we_id) -> None:
def executeWorkflow(lock, schedulerStat, we_id: str) -> Tuple[str,ExecutionResult]:
try:
log.info("executeWorkflow invoked")
executeWorkflow_inner(lock, schedulerStat, we_id)
return executeWorkflow_inner1(lock, schedulerStat, we_id)
except Exception as e:
log.error("Execution of workflow %s failed." % we_id)
log.error(repr(e))

def executeWorkflow_inner(lock, schedulerStat, we_id) -> None:
def executeWorkflow_inner1(lock, schedulerStat, we_id: str) -> Tuple[str,ExecutionResult]:
we_rec = restApiControl.getExecutionRecord(we_id)
if we_rec is None:
log.error("Workflow Execution record %s not found" % we_id)
Expand All @@ -348,6 +349,14 @@ def executeWorkflow_inner(lock, schedulerStat, we_id) -> None:

# check if status is "Scheduled"
if we_rec['Status'] == 'Scheduled' or api_type == 'granta': # todo remove granta
return executeWorkflow_inner2(lock,schedulerStat,we_id,we_rec,workflow_record)
else:
log.error("WEID %s not scheduled for execution" % we_id)
raise KeyError("WEID %s not scheduled for execution" % we_id)

def executeWorkflow_inner2(lock, schedulerState, we_id: str, we_rec, workflow_record) -> Tuple[str,ExecutionResult]:
'''Process workflow which is already scheduled'''
wid = we_rec['WorkflowId']
completed = 1 # todo check
log.info("we_rec status is Scheduled, processing")
# execute the selected workflow
Expand Down Expand Up @@ -472,9 +481,6 @@ def executeWorkflow_inner(lock, schedulerStat, we_id) -> None:
else:
pass

else:
log.error("WEID %s not scheduled for execution" % we_id)
raise KeyError("WEID %s not scheduled for execution" % we_id)

def stop(var_pool):
try:
Expand Down

0 comments on commit c07d60a

Please sign in to comment.