Skip to content

Commit

Permalink
executeworkflow: move copy inputs into a separate function (no changes)
Browse files Browse the repository at this point in the history
  • Loading branch information
eudoxos committed May 21, 2024
1 parent c07d60a commit e433402
Showing 1 changed file with 52 additions and 46 deletions.
98 changes: 52 additions & 46 deletions mupifDB/workflowscheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,54 +368,10 @@ def executeWorkflow_inner2(lock, schedulerState, we_id: str, we_rec, workflow_re
# tempDir = tempfile.mkdtemp(dir=tempRoot, prefix='mupifDB_')
log.info("temp dir %s created" % (tempDir,))
workflowLogName = tempDir+'/workflow.log'
execScript = Path(tempDir+'/workflow_execution_script.py')
# copy workflow source to tempDir
try:
python_script_filename = workflow_record['modulename'] + ".py"

fc, fn = restApiControl.getBinaryFileByID(workflow_record['GridFSID'])
with open(tempDir + '/' + fn, "wb") as f:
f.write(fc)
f.close()

if fn.split('.')[-1] == 'py':
log.info("downloaded .py file..")
if fn == python_script_filename:
log.info("Filename check OK")
else:
log.info("Filename check FAILED")

elif fn.split('.')[-1] == 'zip':
log.info("downloaded .zip file, extracting..")
log.info(fn)
zf = zipfile.ZipFile(tempDir + '/' + fn, mode='r')
filenames = zipfile.ZipFile.namelist(zf)
log.info("Zipped files:")
log.info(filenames)
zf.extractall(path=tempDir)
if python_script_filename in filenames:
log.info("Filename check OK")
else:
log.error("Filename check FAILED")

else:
log.error("Unsupported file extension")

log.info("Copying executor script.")

execScript = Path(tempDir+'/workflow_execution_script.py')
shutil.copy(mupifDBModDir+'/workflow_execution_script.py', execScript)
except Exception as e:
log.error(repr(e))
# set execution code to failed ...yes or no?
restApiControl.setExecutionStatusFailed(we_id)
my_email.sendEmailAboutExecutionStatus(we_id)
try:
copyLogToDB(we_id, workflowLogName)
except:
log.info("Copying log files was not successful")

if not executeWorkflow_copyInputs(we_id,workflow_record,tempDir,execScript):
return we_id, ExecutionResult.Failed

# execute
log.info("Executing we_id %s, tempdir %s" % (we_id, tempDir))
# update status
Expand Down Expand Up @@ -482,6 +438,56 @@ def executeWorkflow_inner2(lock, schedulerState, we_id: str, we_rec, workflow_re
pass


def executeWorkflow_copyInputs(we_id,workflow_record,tempDir,execScript) -> bool:
try:
python_script_filename = workflow_record['modulename'] + ".py"

fc, fn = restApiControl.getBinaryFileByID(workflow_record['GridFSID'])
with open(tempDir + '/' + fn, "wb") as f:
f.write(fc)
f.close()

if fn.split('.')[-1] == 'py':
log.info("downloaded .py file..")
if fn == python_script_filename:
log.info("Filename check OK")
else:
log.info("Filename check FAILED")

elif fn.split('.')[-1] == 'zip':
log.info("downloaded .zip file, extracting..")
log.info(fn)
zf = zipfile.ZipFile(tempDir + '/' + fn, mode='r')
filenames = zipfile.ZipFile.namelist(zf)
log.info("Zipped files:")
log.info(filenames)
zf.extractall(path=tempDir)
if python_script_filename in filenames:
log.info("Filename check OK")
else:
log.error("Filename check FAILED")

else:
log.error("Unsupported file extension")

log.info("Copying executor script.")

shutil.copy(mupifDBModDir+'/workflow_execution_script.py', execScript)
except Exception as e:
log.error(repr(e))
# set execution code to failed ...yes or no?
restApiControl.setExecutionStatusFailed(we_id)
my_email.sendEmailAboutExecutionStatus(we_id)
try:
copyLogToDB(we_id, workflowLogName)
except:
log.info("Copying log files was not successful")
return False
return True




def stop(var_pool):
try:
log.info("Stopping the scheduler, waiting for workers to terminate")
Expand Down

0 comments on commit e433402

Please sign in to comment.