Skip to content

Commit

Permalink
Borek's change to scheduler, isolated
Browse files Browse the repository at this point in the history
  • Loading branch information
eudoxos committed May 28, 2024
1 parent 40f0b6b commit e652061
Showing 1 changed file with 45 additions and 27 deletions.
72 changes: 45 additions & 27 deletions mupifDB/workflowscheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@
# internally uses a multiprocessing pool to handle workflow execution requests
#

# when workflow resources are unavailable, we decrease execution priority (by increasing the number of attempts)
# this constant defines the lowest priority
MAX_ATTEMPT_LEVEL = 60
# increment of execution threshold count used to set priority for pooled execution requests
EXEC_THRESHOLD_INCREMENT = 6


# WEID Status
# Created -> Pending -> Scheduled -> Running -> Finished | Failed
Expand All @@ -65,6 +71,7 @@
class ExecutionResult(enum.Enum):
    """Result codes for a workflow execution."""

    # successful
    Finished = 1
    Failed = 2
    # NOTE(review): presumably signals that the execution could not start
    # because its resources were not available -- confirm against the callers.
    ResourcesUnavailable = 3


class index(enum.IntEnum):
Expand Down Expand Up @@ -296,6 +303,13 @@ def updateStatFinished(lock, schedulerStat, retCode, we_id):
log.error(repr(e))


def updateStatNoResources(lock, schedulerStat, completed, we_id):
    """Shift one task from the running counter back to the scheduled counter.

    Called when an execution attempt fails due to unavailable resources.

    Args:
        lock: context manager held while the counters are updated.
        schedulerStat: mapping with the ``'scheduledTasks'`` and
            ``'runningTasks'`` counters; modified in place.
        completed: not used by this function.
        we_id: not used by this function.
    """
    with lock:
        schedulerStat['scheduledTasks'] += 1
        schedulerStat['runningTasks'] -= 1


def updateStatPersistent (schedulerStat):
# log.info("updateStatPersistent called")
if (False):
Expand Down Expand Up @@ -329,8 +343,16 @@ def copyLogToDB (we_id, workflowLogName):

def executeWorkflow(lock, schedulerStat, we_id: str) -> None:
    """Run one workflow execution if its resources are currently available.

    A soft resource check is made first. When it passes, the execution is
    handed to ``executeWorkflow_inner1``. When it fails, the execution is not
    started; instead its attempts counter is increased by one (capped at
    ``MAX_ATTEMPT_LEVEL``), except when ``api_type`` is ``'granta'``.

    Any exception is logged and swallowed so that a failure here never
    propagates out of the function.

    Args:
        lock: lock passed through to ``executeWorkflow_inner1``.
        schedulerStat: scheduler statistics passed through to
            ``executeWorkflow_inner1``.
        we_id: id of the workflow execution to run.
    """
    try:
        # first make a soft resource check
        if checkExecutionResources(we_id):
            log.info("executeWorkflow invoked")
            # NOTE(review): the inner result is passed through as the
            # pre-existing code path did; confirm whether the pool callback
            # relies on it.
            return executeWorkflow_inner1(lock, schedulerStat, we_id)

        # Resources unavailable: use the we_id parameter here -- there is no
        # local name 'weid' in this function, so referring to it raised
        # NameError and the attempts counter was never updated.
        log.info("WEID %s cannot be scheduled due to unavailable resources" % we_id)
        if api_type != 'granta':
            we_rec = restApiControl.getExecutionRecord(we_id)
            attempts = int(we_rec['Attempts'])
            if attempts < MAX_ATTEMPT_LEVEL:
                restApiControl.setExecutionAttemptsCount(we_id, attempts + 1)
    except Exception:
        log.exception("Execution of workflow %s failed." % we_id)

Expand Down Expand Up @@ -388,7 +410,8 @@ def executeWorkflow_inner2(lock, schedulerStat, we_id: str, we_rec, workflow_rec
updateStatRunning(lock, schedulerStat, we_id, wid)
#runningJobs[we_id]=wid # for runtime monitoring
restApiControl.setExecutionStatusRunning(we_id)
restApiControl.setExecutionAttemptsCount(we_id, int(we_rec['Attempts'])+1)
if int(we_rec['Attempts'] < MAX_ATTEMPT_LEVEL):
restApiControl.setExecutionAttemptsCount(we_id, int(we_rec['Attempts'])+1)
# uses the same python interpreter as the current process
cmd = [sys.executable, execScript, '-eid', str(we_id)]
# log.info(cmd)
Expand Down Expand Up @@ -443,6 +466,7 @@ def executeWorkflow_inner2(lock, schedulerStat, we_id: str, we_rec, workflow_rec
return we_id, ExecutionResult.Failed # XXX ??
elif completed == 2:
log.warning("Workflow execution %s could not be initialized due to lack of resources" % we_id)
updateStatNoResources(lock, schedulerStat, complete, we_id)
restApiControl.setExecutionStatusPending(we_id, True)
else:
pass
Expand Down Expand Up @@ -608,30 +632,20 @@ def main():
weid = wed['_id']
# result1 = pool.apply_async(test)
# log.info(result1.get())
if checkExecutionResources(weid):
result = pool.apply_async(executeWorkflow, args=(statusLock, schedulerStat,weid), callback=procFinish, error_callback=procError)
log.info(result)
log.info("WEID %s added to the execution pool" % weid)
else:
log.info("WEID %s cannot be scheduled due to unavailable resources" % weid)
try:
we_rec = restApiControl.getExecutionRecord(weid)
restApiControl.setExecutionAttemptsCount(weid, int(we_rec['Attempts']) + 1)
except Exception as e:
log.error(repr(e))
result = pool.apply_async(executeWorkflow, args=(statusLock, schedulerStat,weid), callback=procFinish, error_callback=procError)
log.info(result)
log.info("WEID %s added to the execution pool" % weid)

log.info("Done\n")

log.info("Entering main loop to check for Pending executions")
execThreshold = 0 # start with highest priority executions (those with no previous execution attempts)
# add new execution (Pending)
while stopFlag is not True:
# retrieve weids with status "Scheduled" from DB
try:
pending_executions = restApiControl.getPendingExecutions(num_limit=poolsize*10)
# if schedulerStat['scheduledTasks'] < 200:
# pending_executions = restApiControl.getPendingExecutions(num_limit=poolsize*4)
# else:
# pending_executions = []

pending_executions = restApiControl.getPendingExecutions(num_limit=poolsize*10) # XXX: executionCountThreshold=execThreshold
except Exception as e:
log.error(repr(e))
pending_executions = []
Expand Down Expand Up @@ -668,14 +682,14 @@ def main():
result = pool.apply_async(executeWorkflow, args=(statusLock, schedulerStat, weid), callback=procFinish, error_callback=procError)
# log.info(result.get())
log.info("WEID %s added to the execution pool" % weid)
else:
log.info("WEID %s cannot be scheduled due to unavailable resources" % weid)
if api_type != 'granta':
try:
we_rec = restApiControl.getExecutionRecord(weid)
restApiControl.setExecutionAttemptsCount(weid, int(we_rec['Attempts']) + 1)
except Exception as e:
log.error(repr(e))
#else:
# log.info("WEID %s cannot be scheduled due to unavailable resources" % weid)
# if api_type != 'granta':
# try:
# we_rec = restApiControl.getExecutionRecord(weid)
# restApiControl.setExecutionAttemptsCount(weid, int(we_rec['Attempts']) + 1)
# except Exception as e:
# log.error(repr(e))

# ok, no more jobs to schedule for now, wait
l = int(100*int(schedulerStat['runningTasks'])/poolsize)
Expand All @@ -697,6 +711,10 @@ def main():
# lazy update of persistent statistics, done in main thread thus thread safe
with statusLock:
updateStatPersistent(schedulerStat)
execThreshold = execThreshold + EXEC_THRESHOLD_INCREMENT
if execThreshold > MAX_ATTEMPT_LEVEL:
# reset threshold to restart with highest priority executions
execThreshold = 0
log.info("waiting..")
time.sleep(LOOP_SLEEP_SEC)
except Exception as err:
Expand Down

0 comments on commit e652061

Please sign in to comment.