From 7cf0411b30da92eaa362174608e98353d32497f2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?V=C3=A1clav=20=C5=A0milauer?= Date: Tue, 21 May 2024 11:34:24 +0200 Subject: [PATCH] cleanups and clarifications in workflow execution --- mupifDB/workflowscheduler.py | 35 ++++++++++++++--------------------- 1 file changed, 14 insertions(+), 21 deletions(-) diff --git a/mupifDB/workflowscheduler.py b/mupifDB/workflowscheduler.py index dfe8061..b2c1d6e 100644 --- a/mupifDB/workflowscheduler.py +++ b/mupifDB/workflowscheduler.py @@ -323,7 +323,7 @@ def copyLogToDB (we_id, workflowLogName): log.error(repr(e)) -def executeWorkflow(lock, schedulerStat, we_id: str) -> Tuple[str,ExecutionResult]: +def executeWorkflow(lock, schedulerStat, we_id: str) -> None: try: log.info("executeWorkflow invoked") return executeWorkflow_inner1(lock, schedulerStat, we_id) @@ -331,7 +331,7 @@ def executeWorkflow(lock, schedulerStat, we_id: str) -> Tuple[str,ExecutionResul log.error("Execution of workflow %s failed." % we_id) log.error(repr(e)) -def executeWorkflow_inner1(lock, schedulerStat, we_id: str) -> Tuple[str,ExecutionResult]: +def executeWorkflow_inner1(lock, schedulerStat, we_id: str) -> None: we_rec = restApiControl.getExecutionRecord(we_id) if we_rec is None: log.error("Workflow Execution record %s not found" % we_id) @@ -355,7 +355,7 @@ def executeWorkflow_inner1(lock, schedulerStat, we_id: str) -> Tuple[str,Executi log.error("WEID %s not scheduled for execution" % we_id) raise KeyError("WEID %s not scheduled for execution" % we_id) -def executeWorkflow_inner2(lock, schedulerState, we_id: str, we_rec, workflow_record) -> Tuple[str,ExecutionResult]: +def executeWorkflow_inner2(lock, schedulerState, we_id: str, we_rec, workflow_record) -> None: '''Process workflow which is already scheduled''' wid = we_rec['WorkflowId'] completed = 1 # todo check @@ -371,8 +371,14 @@ def executeWorkflow_inner2(lock, schedulerState, we_id: str, we_rec, workflow_re workflowLogName = tempDir+'/workflow.log' execScript = Path(tempDir+'/workflow_execution_script.py') # copy workflow source to tempDir - if not executeWorkflow_copyInputs(we_id,workflow_record,tempDir,execScript,workflowLogName): - return we_id, ExecutionResult.Failed + try: + executeWorkflow_copyInputs(we_id,workflow_record,tempDir,execScript) + except Exception as e: + log.error(repr(e)) + # set execution code to failed ...yes or no? + restApiControl.setExecutionStatusFailed(we_id) + my_email.sendEmailAboutExecutionStatus(we_id) + return None # execute log.info("Executing we_id %s, tempdir %s" % (we_id, tempDir)) # update status @@ -426,12 +432,12 @@ def executeWorkflow_inner2(lock, schedulerState, we_id: str, we_rec, workflow_re log.warning("Workflow execution %s Finished" % we_id) restApiControl.setExecutionStatusFinished(we_id) my_email.sendEmailAboutExecutionStatus(we_id) - return we_id, ExecutionResult.Finished + return we_id, ExecutionResult.Finished # XXX ?? elif completed == 1: log.warning("Workflow execution %s Failed" % we_id) restApiControl.setExecutionStatusFailed(we_id) my_email.sendEmailAboutExecutionStatus(we_id) - return we_id, ExecutionResult.Failed + return we_id, ExecutionResult.Failed # XXX ?? elif completed == 2: log.warning("Workflow execution %s could not be initialized due to lack of resources" % we_id) restApiControl.setExecutionStatusPending(we_id, True) @@ -439,8 +445,7 @@ def executeWorkflow_inner2(lock, schedulerState, we_id: str, we_rec, workflow_re pass -def executeWorkflow_copyInputs(we_id,workflow_record,tempDir,execScript,workflowLogName) -> bool: - try: +def executeWorkflow_copyInputs(we_id,workflow_record,tempDir,execScript) -> None: python_script_filename = workflow_record['modulename'] + ".py" fc, fn = restApiControl.getBinaryFileByID(workflow_record['GridFSID']) @@ -474,18 +479,6 @@ def executeWorkflow_copyInputs(we_id,workflow_record,tempDir,execScript,workflow log.info("Copying executor script.") shutil.copy(mupifDBModDir+'/workflow_execution_script.py', execScript) - except Exception as e: - log.error(repr(e)) - # set execution code to failed ...yes or no? - restApiControl.setExecutionStatusFailed(we_id) - my_email.sendEmailAboutExecutionStatus(we_id) - try: - # XXX: there is no log yet, what is this support to copy?? - copyLogToDB(we_id, workflowLogName) - except: - log.info("Copying log files was not successful") - return False - return True