Skip to content

Commit

Permalink
cleanups and clarifications in workflow execution
Browse files Browse the repository at this point in the history
  • Loading branch information
eudoxos committed May 21, 2024
1 parent 81d6c46 commit 7cf0411
Showing 1 changed file with 14 additions and 21 deletions.
35 changes: 14 additions & 21 deletions mupifDB/workflowscheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -323,15 +323,15 @@ def copyLogToDB (we_id, workflowLogName):
log.error(repr(e))


def executeWorkflow(lock, schedulerStat, we_id: str) -> Tuple[str,ExecutionResult]:
def executeWorkflow(lock, schedulerStat, we_id: str) -> None:
try:
log.info("executeWorkflow invoked")
return executeWorkflow_inner1(lock, schedulerStat, we_id)
except Exception as e:
log.error("Execution of workflow %s failed." % we_id)
log.error(repr(e))

def executeWorkflow_inner1(lock, schedulerStat, we_id: str) -> Tuple[str,ExecutionResult]:
def executeWorkflow_inner1(lock, schedulerStat, we_id: str) -> None:
we_rec = restApiControl.getExecutionRecord(we_id)
if we_rec is None:
log.error("Workflow Execution record %s not found" % we_id)
Expand All @@ -355,7 +355,7 @@ def executeWorkflow_inner1(lock, schedulerStat, we_id: str) -> Tuple[str,Executi
log.error("WEID %s not scheduled for execution" % we_id)
raise KeyError("WEID %s not scheduled for execution" % we_id)

def executeWorkflow_inner2(lock, schedulerState, we_id: str, we_rec, workflow_record) -> Tuple[str,ExecutionResult]:
def executeWorkflow_inner2(lock, schedulerState, we_id: str, we_rec, workflow_record) -> None:
'''Process workflow which is already scheduled'''
wid = we_rec['WorkflowId']
completed = 1 # todo check
Expand All @@ -371,8 +371,14 @@ def executeWorkflow_inner2(lock, schedulerState, we_id: str, we_rec, workflow_re
workflowLogName = tempDir+'/workflow.log'
execScript = Path(tempDir+'/workflow_execution_script.py')
# copy workflow source to tempDir
if not executeWorkflow_copyInputs(we_id,workflow_record,tempDir,execScript,workflowLogName):
return we_id, ExecutionResult.Failed
try:
executeWorkflow_copyInputs(we_id,workflow_record,tempDir,execScript)
except Exception as e:
log.error(repr(e))
# set execution code to failed ...yes or no?
restApiControl.setExecutionStatusFailed(we_id)
my_email.sendEmailAboutExecutionStatus(we_id)
return None
# execute
log.info("Executing we_id %s, tempdir %s" % (we_id, tempDir))
# update status
Expand Down Expand Up @@ -426,21 +432,20 @@ def executeWorkflow_inner2(lock, schedulerState, we_id: str, we_rec, workflow_re
log.warning("Workflow execution %s Finished" % we_id)
restApiControl.setExecutionStatusFinished(we_id)
my_email.sendEmailAboutExecutionStatus(we_id)
return we_id, ExecutionResult.Finished
return we_id, ExecutionResult.Finished # XXX ??
elif completed == 1:
log.warning("Workflow execution %s Failed" % we_id)
restApiControl.setExecutionStatusFailed(we_id)
my_email.sendEmailAboutExecutionStatus(we_id)
return we_id, ExecutionResult.Failed
return we_id, ExecutionResult.Failed # XXX ??
elif completed == 2:
log.warning("Workflow execution %s could not be initialized due to lack of resources" % we_id)
restApiControl.setExecutionStatusPending(we_id, True)
else:
pass


def executeWorkflow_copyInputs(we_id,workflow_record,tempDir,execScript,workflowLogName) -> bool:
try:
def executeWorkflow_copyInputs(we_id,workflow_record,tempDir,execScript) -> None:
python_script_filename = workflow_record['modulename'] + ".py"

fc, fn = restApiControl.getBinaryFileByID(workflow_record['GridFSID'])
Expand Down Expand Up @@ -474,18 +479,6 @@ def executeWorkflow_copyInputs(we_id,workflow_record,tempDir,execScript,workflow
log.info("Copying executor script.")

shutil.copy(mupifDBModDir+'/workflow_execution_script.py', execScript)
except Exception as e:
log.error(repr(e))
# set execution code to failed ...yes or no?
restApiControl.setExecutionStatusFailed(we_id)
my_email.sendEmailAboutExecutionStatus(we_id)
try:
# XXX: there is no log yet, what is this support to copy??
copyLogToDB(we_id, workflowLogName)
except:
log.info("Copying log files was not successful")
return False
return True



Expand Down

0 comments on commit 7cf0411

Please sign in to comment.