Skip to content

Commit

Permalink
Merge branch 'schduler-fix-2'
Browse files Browse the repository at this point in the history
  • Loading branch information
eudoxos committed May 28, 2024
2 parents ced10cd + eba6f17 commit 3abf1aa
Show file tree
Hide file tree
Showing 6 changed files with 190 additions and 30 deletions.
7 changes: 4 additions & 3 deletions mupifDB/api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@

import table_structures


client = MongoClient("mongodb://localhost:27017")
client = MongoClient("mongodb://localhost:"+os.environ.get('MUPIFDB_MONGODB_PORT','27107'))
db = client.MuPIF

tags_metadata = [
Expand Down Expand Up @@ -684,4 +683,6 @@ def edm_find(db: str, type: str, data: M_FindParams):

if __name__ == '__main__':
import uvicorn
uvicorn.run('main:app', host='0.0.0.0', port=8080, reload=True)
host=os.environ.get('MUPIFDB_RESTAPI_HOST','0.0.0.0')
port=int(os.environ.get('MUPIFDB_RESTAPI_PORT','8005'))
uvicorn.run('main:app', host=host, port=port, reload=True)
6 changes: 6 additions & 0 deletions mupifDB/restApiControl.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@ def rDelete(*, url, headers=None, auth=None, timeout=100):

RESTserverMuPIF = RESTserver

def setRESTserver(r):
'Used in tests to set RESTserver after import'
global RESTserver
global RESTserverMuPIF
RESTserver=RESTserverMuPIF=r+'/'

granta_credentials = {'username': '', 'password': ''}

bearer_token = None
Expand Down
145 changes: 145 additions & 0 deletions mupifDB/test_scheduler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
import pytest
import sys
import time
import socket
from xprocess import ProcessStarter
from rich.pretty import pprint
from rich import print_json

def anyPort():
s=socket.socket()
s.bind(('',0))
return s.getsockname()[1]

PORTS={'nameserver':anyPort(),'mongodb':anyPort(),'restApi':anyPort()}
PORTS=dict([(k,str(v)) for k,v in PORTS.items()])
pprint(PORTS)

MUPIFDB_REST_SERVER='http://localhost:'+PORTS['restApi']
#import os
#os.environ['MUPIFDB_REST_SERVER']=MUPIFDB_REST_SERVER
import mupifDB
from mupifDB import restApiControl
restApiControl.setRESTserver(MUPIFDB_REST_SERVER)

#import mupif as mp
#mupifExamplesDir=mp.__path__[0]+'/examples/'

sys.path.append(mupifDB.__path__[0]+'/..')
import workflows.workflowdemo01 as wfdemo01

@pytest.fixture
def nameserver(xprocess):
class Starter(ProcessStarter):
env = { 'PYTHONUNBUFFERED':'1' }
args = [ sys.executable, '-m', 'Pyro5.nameserver', '--port', PORTS['nameserver']]
timeout = 5
pattern = 'NS running on '
terminate_on_interrupt = True
xprocess.ensure("nameserver",Starter)
yield
xprocess.getinfo("nameserver").terminate()

@pytest.fixture
def mongodb(xprocess):
class Starter(ProcessStarter):
args = [ '/usr/bin/mongod', '--port', PORTS['mongodb'], '--noauth', '--dbpath=./', '--logpath=/dev/stdout', '--logappend' ]
timeout = 5
max_read_lines = 100
pattern = '"ctx":"listener","msg":"Waiting for connections"'
terminate_on_interrupt = True
xprocess.ensure("mongodb",Starter)
yield
xprocess.getinfo("mongodb").terminate()

@pytest.fixture
def restApi(xprocess,mongodb,nameserver):
class Starter(ProcessStarter):
env = {
'PYTHONUNBUFFERED':'1',
'MUPIF_NS':'localhost:'+PORTS['nameserver'],
'MUPIF_LOG_LEVEL':'DEBUG',
'MUPIFDB_MONGODB_PORT':PORTS['mongodb'],
'MUPIFDB_RESTAPI_HOST':'localhost',
'MUPIFDB_RESTAPI_PORT':PORTS['restApi'],
'MUPIFDB_REST_SERVER':MUPIFDB_REST_SERVER,
}
popen_kwargs = { 'cwd': mupifDB.__path__[0]+'/api' }
args = [ sys.executable, 'main.py' ]
timeout = 5
max_read_lines = 50
pattern = 'Application startup complete.'
terminate_on_interrupt = True
xprocess.ensure("restApi",Starter)
yield
xprocess.getinfo("restApi").terminate()

@pytest.fixture
def scheduler(xprocess,restApi):
class Starter(ProcessStarter):
env = {
'PYTHONUNBUFFERED':'1',
'PYTHONPATH':mupifDB.__path__[0],
'MUPIF_NS':f'localhost:'+PORTS['nameserver'],
'MUPIF_LOG_LEVEL':'DEBUG',
'MUPIFDB_REST_SERVER':'http://localhost:'+PORTS['restApi'],
}
popen_kwargs = { 'cwd': mupifDB.__path__[0]+'/api' }
args = [ sys.executable, '-c', 'from mupifDB import workflowscheduler as ws; ws.LOOP_SLEEP_SEC=.5; ws.schedulerStatFile="./sched-stat.json"; ws.main()' ]
timeout = 10
max_read_lines = 50
# pattern = 'Entering main loop to check for Pending executions'
pattern = 'procInit called'
terminate_on_interrupt = True
xprocess.ensure("scheduler",Starter)
yield
xprocess.getinfo("scheduler").terminate()

#@pytest.fixture
#def ex2server(scheduler):
# class Starter(ProcessStarter):
# env = {
# 'PYTHONUNBUFFERED':'1',
# 'MUPIF_NS':f'localhost:'+PORTS['nameserver'],
# 'MUPIF_LOG_LEVEL':'DEBUG',
# 'PYTHONPATH': mupifExamplesDir+'/02-distrib',
# }
# args = [ sys.executable, '-c', 'import application2; import mupif as mp; mp.pyroutil.runAppServer(ns=None,appName="mupif/example/app2",app=application2.Application2())' ]
# pattern = 'Running mupif/example/app2 at PYRO:obj_'
# terminate_on_interrupt = True
# timeout = 3
# xprocess.ensure('ex2server',Starter)
# yield
# xprocess.getinfo('ex2server').terminate()

@pytest.fixture(autouse=True, scope='session')
def test_suite_cleanup():
yield
import shutil
shutil.rmtree('.pytest_cache/d/.xprocess/mongodb',ignore_errors=True)



class TestFoo:
def test_workflowdemo01(self,scheduler):
wf=wfdemo01.workflowdemo()
md=lambda k: wf.getMetadata(k)
wid=md('ID')
id=mupifDB.workflowmanager.insertWorkflowDefinition(wid=wid,description=md('Description'),source=wfdemo01.__file__,useCase='useCase1',workflowInputs=md('Inputs'),workflowOutputs=md('Outputs'),modulename=wf.__module__.split('.')[-1],classname=wf.__class__.__name__,models_md=md('Models'))
print(f'Workflow inserted, {id=}')
wrec=restApiControl.getWorkflowRecord(wid)
assert wrec['wid']==wid
# print_json(data=wrec)
weid=restApiControl.createExecution(wid,version='1',ip='localhost')
print(f'Execution created, {weid=}')
restApiControl.scheduleExecution(weid)
print(f'Execution scheduled, {weid=}')
for i in range(10):
data=restApiControl.getExecutionRecord(weid)
print(f'Execution status: {data["Status"]}')
time.sleep(1)
assert data['Status'] in ('Failed',)


# time.sleep(10)
# def test_schedule(self, ex2server):
20 changes: 13 additions & 7 deletions mupifDB/workflowscheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
logging.getLogger("urllib3").setLevel(logging.WARNING)
logging.getLogger("urllib3").propagate = False

LOOP_SLEEP_SEC=20

# try to import schedulerconfig.py
authToken = None
try:
Expand Down Expand Up @@ -313,6 +315,8 @@ def updateStatPersistent (schedulerStat):

def copyLogToDB (we_id, workflowLogName):
try:
with open(workflowLogName,'r') as f:
for l in f: log.info('WORKFLOW LOG: '+l[:-1])
log.info("Copying log files to database")
with open(workflowLogName, 'rb') as f:
logID = restApiControl.uploadBinaryFile(f)
Expand All @@ -328,8 +332,7 @@ def executeWorkflow(lock, schedulerStat, we_id: str) -> None:
log.info("executeWorkflow invoked")
return executeWorkflow_inner1(lock, schedulerStat, we_id)
except Exception as e:
log.error("Execution of workflow %s failed." % we_id)
log.error(repr(e))
log.exception("Execution of workflow %s failed." % we_id)

def executeWorkflow_inner1(lock, schedulerStat, we_id: str) -> None:
we_rec = restApiControl.getExecutionRecord(we_id)
Expand All @@ -355,9 +358,9 @@ def executeWorkflow_inner1(lock, schedulerStat, we_id: str) -> None:
log.error("WEID %s not scheduled for execution" % we_id)
raise KeyError("WEID %s not scheduled for execution" % we_id)

def executeWorkflow_inner2(lock, schedulerState, we_id: str, we_rec, workflow_record) -> None:
def executeWorkflow_inner2(lock, schedulerStat, we_id: str, we_rec, workflow_record) -> None:
'''Process workflow which is already scheduled'''
wid = we_rec['WorkflowId']
wid = we_rec['WorkflowID']
completed = 1 # todo check
log.info("we_rec status is Scheduled, processing")
# execute the selected workflow
Expand Down Expand Up @@ -521,8 +524,7 @@ def checkExecutionResources(eid):
return False


if __name__ == '__main__':

def main():
if (Path(schedulerStatFile).is_file()):
with open(schedulerStatFile,'r') as f:
stat = json.load(f)
Expand Down Expand Up @@ -696,7 +698,7 @@ def checkExecutionResources(eid):
with statusLock:
updateStatPersistent(schedulerStat)
log.info("waiting..")
time.sleep(20)
time.sleep(LOOP_SLEEP_SEC)
except Exception as err:
log.info("Error: " + repr(err))
stop(pool)
Expand All @@ -707,3 +709,7 @@ def checkExecutionResources(eid):
log.error('Already running.')

log.info("Exiting MupifDB Workflow Scheduler\n")

if __name__ == '__main__':
main()

1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ astropy
rich
pytest-rich
pytest
pytest-xprocess
requests
parsy
attrdict3
Expand Down
41 changes: 21 additions & 20 deletions workflows/workflowdemo01.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,11 @@
import mupif
import mupif.physics.physicalquantities as PQ
import mupif as mp
import logging
from pymongo import MongoClient
import argparse
import sys
import mupifDB.workflowmanager
from bson import ObjectId

log = logging.getLogger()

class workflowdemo (mupif.workflow.Workflow):
def __init__(self, metaData={}):
def __init__(self, metadata={}):
"""
Initializes the workflow.
"""
Expand All @@ -20,34 +15,35 @@ def __init__(self, metaData={}):
'Description': 'Demo thermal problem using finite elements on rectangular domain',
'Model_refs_ID': [],
'Inputs': [
{'Name': 'Effective conductivity', 'Type': 'mupif.Property', 'Required': False, 'Type_ID': 'mupif.PropertyID.PID_effective_conductivity', 'Units':'W/m/K'},
{'Name': 'Dimension', 'Type': 'mupif.Property', 'Required': False,'Type_ID': 'mupif.PropertyID.PID_Dimension', 'Units':'m', 'Obj_ID': [0,1]},
{'Name': 'Prescribed temperature', 'Type': 'mupif.Property', 'Required': False,'Type_ID': 'mupif.PropertyID.PID_dirichletBC', 'Units':'K', 'Obj_ID': [0,1,2,3]},
{'Name': 'External temperature', 'Type': 'mupif.Property', 'Required': False,'Type_ID': 'mupif.PropertyID.PID_conventionExternalTemperature', 'Units':'K', 'Obj_ID': [0,1,2,3]},
{'Name': 'Convention coefficient', 'Type': 'mupif.Property', 'Required': False,'Type_ID': 'mupif.PropertyID.PID_conventionCoefficient', 'Units':'none', 'Obj_ID': [0,1,2,3]}
{'Name': 'Effective conductivity', 'Type': 'mupif.Property', 'Required': False, 'Type_ID': 'mupif.DataID.PID_HeatConductivityLiquid', 'Units':'W/m/K', 'Set_at':'timestep'},
{'Name': 'Dimension', 'Type': 'mupif.Property', 'Required': False,'Type_ID': 'mupif.DataID.PID_Dimension', 'Units':'m', 'Obj_ID': [0,1], 'Set_at':'timestep'},
{'Name': 'Prescribed temperature', 'Type': 'mupif.Property', 'Required': False,'Type_ID': 'mupif.DataID.PID_dirichletBC', 'Units':'K', 'Obj_ID': [0,1,2,3], 'Set_at':'timestep'},
{'Name': 'External temperature', 'Type': 'mupif.Property', 'Required': False,'Type_ID': 'mupif.DataID.PID_conventionExternalTemperature', 'Units':'K', 'Obj_ID': [0,1,2,3], 'Set_at':'timestep'},
{'Name': 'Convention coefficient', 'Type': 'mupif.Property', 'Required': False,'Type_ID': 'mupif.DataID.PID_conventionCoefficient', 'Units':'none', 'Obj_ID': [0,1,2,3], 'Set_at':'timestep'}
],
'Outputs': [
{'Name':'Temperature field', 'Type': 'mupif.Field', 'Required':True,'Type_ID':'mupif.FieldID.FID_Temperature', 'Units':'T'}
]
{'Name':'Temperature field', 'Type': 'mupif.Field', 'Required':True,'Type_ID':'mupif.DataID.FID_Temperature', 'Units':'T'}
],
'Models': [],
}
super(workflowdemo, self).__init__(metaData=MD)
self.updateMetadata(metaData)
super().__init__(metadata=MD)
self.updateMetadata(metadata)


def initialize(self, file='', workdir='', targetTime=PQ.PhysicalQuantity(0., 's'), metaData={}, validateMetaData=True, **kwargs):
super(workflowdemo, self).initialize(file, workdir, targetTime, metaData, validateMetaData, **kwargs)
def initialize(self, workdir='', metadata=None, validateMetaData=True, **kwargs):
super().initialize(workdir=workdir, metadata=metadata, validateMetaData=validateMetaData, **kwargs)

def solveStep(self, istep, stageID=0, runInBackground=False):
log.info ('Workflow02 solveStep finished')

def getField(self, fieldID, time, objectID=0):
if fieldID == mupif.FieldID.FID_Temperature:
if fieldID == mupif.DataID.FID_Temperature:
return mupif.field.Field(mupif.mesh.UnstructuredMesh(), mupif.FieldID.FID_Temperature,mupif.ValueType.Scalar, 'none', time)
else:
pass

def getCriticalTimeStep(self):
return PQ.PhysicalQuantity(1.0, 's')
return 1*mp.U['s']

def terminate(self):
super(workflowdemo, self).terminate()
Expand All @@ -60,6 +56,11 @@ def getAPIVersion(self):


if __name__ == "__main__":
import argparse
import sys
import mupifDB.workflowmanager
from bson import ObjectId

# execute only if run as a script
client = MongoClient()
db = client.MuPIF
Expand Down

0 comments on commit 3abf1aa

Please sign in to comment.