Commit
Replace log-cluttering prints by log.info in scheduler (run with MUPIF_LOG_LEVEL=DEBUG to see everything during dev)
eudoxos committed May 13, 2024
1 parent e72d7de commit 8c37dea
Showing 1 changed file with 33 additions and 40 deletions.
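
The commit message points at an environment-driven log level (MUPIF_LOG_LEVEL=DEBUG). That variable is handled inside mupif itself, so the following is only a hypothetical sketch of the pattern, not the actual mupif code:

import logging
import os

# hypothetical: map MUPIF_LOG_LEVEL (e.g. "DEBUG", "INFO") to a logging
# level, falling back to INFO for unset or unrecognized values
levelName = os.environ.get('MUPIF_LOG_LEVEL', 'INFO').upper()
logging.basicConfig(level=getattr(logging, levelName, logging.INFO))
log = logging.getLogger('workflow-scheduler')
log.debug('visible only when MUPIF_LOG_LEVEL=DEBUG')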
73 changes: 33 additions & 40 deletions mupifDB/workflowscheduler.py
@@ -27,8 +27,11 @@
import mupif as mp

import logging
logging.basicConfig(filename='scheduler.log',level=logging.DEBUG) # comment this out for production runs

log=logging.getLogger('workflow-scheduler')
log.addHandler(restLogger.RestLogHandler())

# decrease verbosity here
logging.getLogger("urllib3").setLevel(logging.WARNING)
logging.getLogger("urllib3").propagate = False

@@ -38,7 +41,7 @@
import schedulerConfig
authKey = schedulerConfig.authToken
except ImportError:
print("schedulerConfig import failed")
log.info("schedulerConfig import failed")
# WorkflowScheduler is a daemon which tries to execute pending workflow
# executions in the DB (those with status "Scheduled");
# internally it uses a multiprocessing pool to handle workflow execution requests
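
The pool-with-callbacks pattern described above is plain multiprocessing; a self-contained sketch of apply_async with callback and error_callback (toy names, not the scheduler's own functions):

import multiprocessing

def runJob(weid):
    # stands in for executeWorkflow
    return weid, 0

def onFinish(result):
    print('finished:', result)  # plays the role of procFinish

def onError(exc):
    print('failed:', exc)  # plays the role of procError

if __name__ == '__main__':
    with multiprocessing.Pool(processes=2) as pool:
        res = pool.apply_async(runJob, args=('demo-id',), callback=onFinish, error_callback=onError)
        res.wait()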
@@ -134,17 +137,14 @@ def historyUpdateFailed(data, epoch):
fd = None
buf = None

log=logging.getLogger('workflow-scheduler')
log.addHandler(restLogger.RestLogHandler())

@Pyro5.api.expose
class SchedulerMonitor (object):
def __init__(self, ns, schedulerStat,lock):
self.ns = ns
self.stat = schedulerStat
self.lock = lock
def runServer(self):
print("SchedulerMonitor: runingServer")
log.info("SchedulerMonitor: runingServer")
return mp.pyroutil.runServer(ns=self.ns, appName="mupif.scheduler", app=self, metadata={"type:scheduler"})
def getStatistics(self):
with self.lock:
@@ -202,20 +202,20 @@ def procInit():
# flags: MAP_SHARED means other processes can share this mmap
# prot: PROT_WRITE means this process can write to this mmap
# buf = mmap.mmap(fd, mmap.PAGESIZE, mmap.MAP_SHARED, mmap.PROT_WRITE)
print("procInit called")
log.info("procInit called")


def procFinish(r):
print("procFinish called")
log.info("procFinish called")


def procError(r):
print("procError called:"+str(r))
log.info("procError called:"+str(r))


def updateStatRunning(lock, schedulerStat, we_id, wid):
with lock:
print("updateStatRunning called")
log.info("updateStatRunning called")
# print (schedulerStat)
# print ('------------------')

@@ -245,7 +245,7 @@ def updateStatRunning(lock, schedulerStat, we_id, wid):

def updateStatScheduled(lock, schedulerStat, numberOfPendingExecutions):
with lock:
print("updateStatScheduled called")
log.info("updateStatScheduled called")
#
schedulerStat['scheduledTasks'] = numberOfPendingExecutions
restApiControl.setStatScheduler(scheduledTasks = numberOfPendingExecutions)
@@ -256,7 +256,7 @@ def updateStatScheduled(lock, schedulerStat, numberOfPendingExecutions):
def updateStatFinished(lock, schedulerStat, retCode, we_id):
try:
with lock:
print("updateStatFinished called")
log.info("updateStatFinished called")

#stats_temp = restApiControl.getStatScheduler()
#restApiControl.setStatScheduler(load=int(100*int(stats_temp['runningTasks'])/poolsize))
@@ -293,7 +293,7 @@ def updateStatFinished(lock, schedulerStat, retCode, we_id):


def updateStatPersistent (schedulerStat):
# print("updateStatPersistent called")
# log.info("updateStatPersistent called")
if (False):
return
else:
@@ -305,8 +305,8 @@
jsonFile= open(schedulerStatFile, 'w')
json.dump(localStat, jsonFile)
jsonFile.close()
# print("Update:", stat)
# print("updateStatPersistent finished")
# log.info("Update:", stat)
# log.info("updateStatPersistent finished")


def copyLogToDB (we_id, workflowLogName):
@@ -336,17 +336,14 @@ def executeWorkflow(lock, schedulerStat, we_id):
wid = we_rec['WorkflowID']
workflow_record = restApiControl.getWorkflowRecordGeneral(wid=wid, version=workflowVersion)
if workflow_record is None:
print("Workflow document with wid %s, verison %s not found" % (wid, workflowVersion))
log.error("Workflow document with wid %s, verison %s not found" % (wid, workflowVersion))
raise KeyError("Workflow document with ID %s, version %s not found" % (wid, workflowVersion))
else:
print("Workflow document with wid %s, id %s, version %s found" % (wid, we_rec['_id'], workflowVersion))
log.info("Workflow document with wid %s, id %s, version %s found" % (wid, we_rec['_id'], workflowVersion))

# check if status is "Scheduled"
if we_rec['Status'] == 'Scheduled' or api_type == 'granta': # todo remove granta
completed = 1 # todo check
print("we_rec status is Scheduled, processing")
log.info("we_rec status is Scheduled, processing")
# execute the selected workflow
# take workflow source and run python interpreter on it in a temporary directory
@@ -355,7 +352,6 @@
with tempfile.TemporaryDirectory(dir=tempRoot, prefix='mupifDB') as tempDir:
# if (1): # uncomment this to keep tempDir
# tempDir = tempfile.mkdtemp(dir=tempRoot, prefix='mupifDB_')
print("temp dir %s created" % (tempDir,))
log.info("temp dir %s created" % (tempDir,))
workflowLogName = tempDir+'/workflow.log'
# copy workflow source to tempDir
@@ -368,31 +364,29 @@
f.close()

if fn.split('.')[-1] == 'py':
print("downloaded .py file..")
log.info("downloaded .py file..")
if fn == python_script_filename:
print("Filename check OK")
log.info("Filename check OK")
else:
print("Filename check FAILED")
log.info("Filename check FAILED")

elif fn.split('.')[-1] == 'zip':
print("downloaded .zip file, extracting..")
print(fn)
log.info("downloaded .zip file, extracting..")
log.info(fn)
zf = zipfile.ZipFile(tempDir + '/' + fn, mode='r')
filenames = zipfile.ZipFile.namelist(zf)
print("Zipped files:")
print(filenames)
log.info("Zipped files:")
log.info(filenames)
zf.extractall(path=tempDir)
if python_script_filename in filenames:
print("Filename check OK")
log.info("Filename check OK")
else:
print ("Filename check FAILED")
log.error("Filename check FAILED")

else:
log.error("Unsupported file extension")

print("Copying executor script.")
log.info("Copying executor script.")

execScript = Path(tempDir+'/workflow_execution_script.py')
shutil.copy(mupifDBModDir+'/workflow_execution_script.py', execScript)
@@ -409,7 +403,6 @@
return we_id, ExecutionResult.Failed

# execute
print("Executing we_id %s, tempdir %s" % (we_id, tempDir))
log.info("Executing we_id %s, tempdir %s" % (we_id, tempDir))
# update status
updateStatRunning(lock, schedulerStat, we_id, wid)
@@ -418,7 +411,7 @@
restApiControl.setExecutionAttemptsCount(we_id, int(we_rec['Attempts'])+1)
# uses the same python interpreter as the current process
cmd = [sys.executable, execScript, '-eid', str(we_id)]
# print(cmd)
# log.info(cmd)
with open(workflowLogName, 'w') as workflowLog:
ll = 10*'='
workflowLog.write(f'''
@@ -438,15 +431,15 @@
{ll} duration: {str((dt:=(t1-t0))-datetime.timedelta(microseconds=dt.microseconds))} {ll}
{ll} exit status of {cmd}: {completed} ({'ERROR' if completed!=0 else 'SUCCESS'}) {ll}''')

# print(tempDir)
# log.info(tempDir)
log.info('command:' + str(cmd) + ' Return Code:'+str(completed))
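
The actual subprocess invocation is collapsed in this view; a plausible shape of that step, reusing cmd, workflowLogName and tempDir from above (the hidden lines may differ):

import datetime
import subprocess

# hypothetical reconstruction: run the workflow script with the
# scheduler's interpreter, fold stderr into the workflow log, and
# record wall time plus the exit status
t0 = datetime.datetime.now()
with open(workflowLogName, 'a') as workflowLog:
    completed = subprocess.call(cmd, stdout=workflowLog, stderr=subprocess.STDOUT, cwd=tempDir)
t1 = datetime.datetime.now()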

# store execution log
logID = None

p = Path(tempDir)
for it in p.iterdir():
print(it)
log.info(it)

try:
copyLogToDB(we_id, workflowLogName)
@@ -510,7 +503,7 @@ def checkWorkflowResources(wid, version):

def checkExecutionResources(eid):
try:
print("Checking execution resources")
log.info("Checking execution resources")
if api_type == 'granta':
return True # todo granta temporary
execution = restApiControl.getExecutionRecord(eid)
@@ -602,7 +595,7 @@ def checkExecutionResources(eid):
scheduled_executions = []

for wed in scheduled_executions:
print(str(wed['_id']) + " found as Scheduled")
log.info(str(wed['_id']) + " found as Scheduled")
# add the corresponding weid to the pool, change status to scheduled
weid = wed['_id']
# result1 = pool.apply_async(test)
@@ -637,7 +630,7 @@ def checkExecutionResources(eid):

updateStatScheduled(statusLock, schedulerStat, len(pending_executions)) # update status
for wed in pending_executions:
print(str(wed['_id']) + " found as pending")
log.info(str(wed['_id']) + " found as pending")
weid = wed['_id']

# check number of attempts for execution
@@ -660,9 +653,9 @@
log.error(repr(e))

if not res:
print("Could not update execution status")
log.info("Could not update execution status")
else:
print("Updated status of execution")
log.info("Updated status of execution")

result = pool.apply_async(executeWorkflow, args=(statusLock, schedulerStat, weid), callback=procFinish, error_callback=procError)
# log.info(result.get())
Expand All @@ -688,15 +681,15 @@ def checkExecutionResources(eid):
try:
#stats = restApiControl.getStatScheduler()
print(str(lt.tm_mday)+"."+str(lt.tm_mon)+"."+str(lt.tm_year)+" "+str(lt.tm_hour)+":"+str(lt.tm_min)+":"+str(lt.tm_sec)+" Scheduled/Running/Load:" +
log.info(str(lt.tm_mday)+"."+str(lt.tm_mon)+"."+str(lt.tm_year)+" "+str(lt.tm_hour)+":"+str(lt.tm_min)+":"+str(lt.tm_sec)+" Scheduled/Running/Load:" +
str(schedulerStat['scheduledTasks'])+"/"+str(schedulerStat['runningTasks'])+"/"+str(schedulerStat['load']))
except Exception as e:
log.error(repr(e))
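
The hand-concatenated timestamp above can come from time.strftime instead; an equivalent form (up to zero-padding) with the same lt and schedulerStat:

import time

stamp = time.strftime('%d.%m.%Y %H:%M:%S', lt)
log.info('%s Scheduled/Running/Load: %s/%s/%s', stamp,
         schedulerStat['scheduledTasks'], schedulerStat['runningTasks'], schedulerStat['load'])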

# lazy update of persistent statistics, done in main thread thus thread safe
with statusLock:
updateStatPersistent(schedulerStat)
print("waiting..")
log.info("waiting..")
time.sleep(20)
except Exception as err:
log.info("Error: " + repr(err))
