From e652061b640f50fe1ba44638f76c120017901222 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?V=C3=A1clav=20=C5=A0milauer?= Date: Tue, 28 May 2024 11:48:20 +0200 Subject: [PATCH] Borek's change to scheduler, isolated --- mupifDB/workflowscheduler.py | 72 ++++++++++++++++++++++-------------- 1 file changed, 45 insertions(+), 27 deletions(-) diff --git a/mupifDB/workflowscheduler.py b/mupifDB/workflowscheduler.py index 16114a4..c48449a 100644 --- a/mupifDB/workflowscheduler.py +++ b/mupifDB/workflowscheduler.py @@ -51,6 +51,12 @@ # internally uses a multiprocessing pool to handle workflow execution requests # +# when workflow resources are unavailable, we decrease execution priority (by increasing number of attempts) +# this constant defines the lowest priority +MAX_ATTEMPT_LEVEL = 60 +# increment of execution threshold count used to set priority for pooled execution requests +EXEC_THRESHOLD_INCREMENT = 6 + # WEID Status # Created -> Pending -> Scheduled -> Running -> Finished | Failed @@ -65,6 +71,7 @@ class ExecutionResult(enum.Enum): Finished = 1 # successful Failed = 2 + ResourcesUnavailable = 3 class index(enum.IntEnum): @@ -296,6 +303,13 @@ def updateStatFinished(lock, schedulerStat, retCode, we_id): log.error(repr(e)) +def updateStatNoResources(lock, schedulerStat, completed, we_id): + # called when execution attempt fails due to unavailable resources + with lock: + schedulerStat['scheduledTasks'] = schedulerStat['scheduledTasks']+1 + schedulerStat['runningTasks']=schedulerStat['runningTasks']-1 + + def updateStatPersistent (schedulerStat): # log.info("updateStatPersistent called") if (False): @@ -329,8 +343,16 @@ def copyLogToDB (we_id, workflowLogName): def executeWorkflow(lock, schedulerStat, we_id: str) -> None: try: - log.info("executeWorkflow invoked") - return executeWorkflow_inner1(lock, schedulerStat, we_id) + # first make a soft resource check + if checkExecutionResources(we_id): + log.info("executeWorkflow invoked") + executeWorkflow_inner1(lock, schedulerStat, 
we_id) + else: + log.info("WEID %s cannot be scheduled due to unavailable resources" % we_id) + if api_type!='granta': + we_rec = restApiControl.getExecutionRecord(we_id) + if int(we_rec['Attempts']) < MAX_ATTEMPT_LEVEL: + restApiControl.setExecutionAttemptsCount(we_id, int(we_rec['Attempts']) + 1) except Exception as e: log.exception("Execution of workflow %s failed." % we_id) @@ -388,7 +410,8 @@ def executeWorkflow_inner2(lock, schedulerStat, we_id: str, we_rec, workflow_rec updateStatRunning(lock, schedulerStat, we_id, wid) #runningJobs[we_id]=wid # for runtime monitoring restApiControl.setExecutionStatusRunning(we_id) - restApiControl.setExecutionAttemptsCount(we_id, int(we_rec['Attempts'])+1) + if int(we_rec['Attempts']) < MAX_ATTEMPT_LEVEL: + restApiControl.setExecutionAttemptsCount(we_id, int(we_rec['Attempts'])+1) # uses the same python interpreter as the current process cmd = [sys.executable, execScript, '-eid', str(we_id)] # log.info(cmd) @@ -443,6 +466,7 @@ def executeWorkflow_inner2(lock, schedulerStat, we_id: str, we_rec, workflow_rec return we_id, ExecutionResult.Failed # XXX ?? 
elif completed == 2: log.warning("Workflow execution %s could not be initialized due to lack of resources" % we_id) + updateStatNoResources(lock, schedulerStat, completed, we_id) restApiControl.setExecutionStatusPending(we_id, True) else: pass @@ -608,30 +632,20 @@ def main(): weid = wed['_id'] # result1 = pool.apply_async(test) # log.info(result1.get()) - if checkExecutionResources(weid): - result = pool.apply_async(executeWorkflow, args=(statusLock, schedulerStat,weid), callback=procFinish, error_callback=procError) - log.info(result) - log.info("WEID %s added to the execution pool" % weid) - else: - log.info("WEID %s cannot be scheduled due to unavailable resources" % weid) - try: - we_rec = restApiControl.getExecutionRecord(weid) - restApiControl.setExecutionAttemptsCount(weid, int(we_rec['Attempts']) + 1) - except Exception as e: - log.error(repr(e)) + result = pool.apply_async(executeWorkflow, args=(statusLock, schedulerStat,weid), callback=procFinish, error_callback=procError) + log.info(result) + log.info("WEID %s added to the execution pool" % weid) log.info("Done\n") log.info("Entering main loop to check for Pending executions") + execThreshold = 0 # start with highest priority executions (those with no previous execution attempts) # add new execution (Pending) while stopFlag is not True: # retrieve weids with status "Scheduled" from DB try: - pending_executions = restApiControl.getPendingExecutions(num_limit=poolsize*10) - # if schedulerStat['scheduledTasks'] < 200: - # pending_executions = restApiControl.getPendingExecutions(num_limit=poolsize*4) - # else: - # pending_executions = [] + + pending_executions = restApiControl.getPendingExecutions(num_limit=poolsize*10) # XXX: executionCountThreshold=execThreshold except Exception as e: log.error(repr(e)) pending_executions = [] @@ -668,14 +682,14 @@ def main(): result = pool.apply_async(executeWorkflow, args=(statusLock, schedulerStat, weid), callback=procFinish, error_callback=procError) # 
log.info(result.get()) log.info("WEID %s added to the execution pool" % weid) - else: - log.info("WEID %s cannot be scheduled due to unavailable resources" % weid) - if api_type != 'granta': - try: - we_rec = restApiControl.getExecutionRecord(weid) - restApiControl.setExecutionAttemptsCount(weid, int(we_rec['Attempts']) + 1) - except Exception as e: - log.error(repr(e)) + #else: + # log.info("WEID %s cannot be scheduled due to unavailable resources" % weid) + # if api_type != 'granta': + # try: + # we_rec = restApiControl.getExecutionRecord(weid) + # restApiControl.setExecutionAttemptsCount(weid, int(we_rec['Attempts']) + 1) + # except Exception as e: + # log.error(repr(e)) # ok, no more jobs to schedule for now, wait l = int(100*int(schedulerStat['runningTasks'])/poolsize) @@ -697,6 +711,10 @@ def main(): # lazy update of persistent statistics, done in main thread thus thread safe with statusLock: updateStatPersistent(schedulerStat) + execThreshold = execThreshold + EXEC_THRESHOLD_INCREMENT + if execThreshold > MAX_ATTEMPT_LEVEL: + # reset threshold to restart with highest priority executions + execThreshold = 0 log.info("waiting..") time.sleep(LOOP_SLEEP_SEC) except Exception as err: