Skip to content

Commit

Permalink
sweep: DIRACGrid#7093 refactor job submit
Browse files Browse the repository at this point in the history
  • Loading branch information
chaen committed Aug 3, 2023
1 parent 50af210 commit 415196a
Show file tree
Hide file tree
Showing 3 changed files with 219 additions and 191 deletions.
4 changes: 2 additions & 2 deletions src/DIRAC/Core/Utilities/ReturnValues.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ def __init__(self, result: DErrorReturnType | str, errCode: int = 0):
self.result = cast(DErrorReturnType, result)


def returnValueOrRaise(result: DReturnType[T]) -> T:
def returnValueOrRaise(result: DReturnType[T], *, errorCode: int = 0) -> T:
"""Unwrap an S_OK/S_ERROR response into a value or Exception
This method assists with using exceptions in DIRAC code by raising
Expand All @@ -217,7 +217,7 @@ def returnValueOrRaise(result: DReturnType[T]) -> T:
if "ExecInfo" in result:
raise result["ExecInfo"][0]
else:
raise SErrorException(result)
raise SErrorException(result, errorCode)
return result["Value"]


Expand Down
242 changes: 53 additions & 189 deletions src/DIRAC/WorkloadManagementSystem/DB/JobDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,38 +23,22 @@
from DIRAC.Core.Base.DB import DB
from DIRAC.Core.Utilities import DErrno
from DIRAC.Core.Utilities.ClassAd.ClassAdLight import ClassAd
from DIRAC.Core.Utilities.ReturnValues import S_OK, S_ERROR
from DIRAC.Core.Utilities.DErrno import EWMSSUBM, EWMSJMAN
from DIRAC.Core.Utilities.ReturnValues import S_OK, S_ERROR, convertToReturnValue
from DIRAC.Core.Utilities.DErrno import EWMSSUBM, EWMSJMAN, cmpError
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
from DIRAC.ResourceStatusSystem.Client.SiteStatus import SiteStatus
from DIRAC.WorkloadManagementSystem.Client.JobState.JobManifest import JobManifest
from DIRAC.WorkloadManagementSystem.Client import JobStatus
from DIRAC.WorkloadManagementSystem.Client import JobMinorStatus
from DIRAC.WorkloadManagementSystem.Client.JobMonitoringClient import JobMonitoringClient

#############################################################################
# utility functions


def compressJDL(jdl):
"""Return compressed JDL string."""
return base64.b64encode(zlib.compress(jdl.encode(), -1)).decode()


def extractJDL(compressedJDL):
"""Return decompressed JDL string."""
# the starting bracket is guaranteeed by JobManager.submitJob
# we need the check to be backward compatible
if isinstance(compressedJDL, bytes):
if compressedJDL.startswith(b"["):
return compressedJDL.decode()
else:
if compressedJDL.startswith("["):
return compressedJDL
return zlib.decompress(base64.b64decode(compressedJDL)).decode()


#############################################################################
from DIRAC.WorkloadManagementSystem.DB.JobDBUtils import (
checkAndAddOwner,
fixJDL,
checkAndPrepareJob,
createJDLWithInitialStatus,
compressJDL,
extractJDL,
)


class JobDB(DB):
Expand All @@ -70,10 +54,6 @@ def __init__(self, parentLogger=None):
self.maxRescheduling = self.getCSOption("MaxRescheduling", 3)

# loading the function that will be used to determine the platform (it can be VO specific)
res = ObjectLoader().loadObject("ConfigurationSystem.Client.Helpers.Resources", "getDIRACPlatform")
if not res["OK"]:
self.log.fatal(res["Message"])
self.getDIRACPlatform = res["Value"]

self.jobAttributeNames = []

Expand Down Expand Up @@ -923,42 +903,28 @@ def insertNewJobIntoDB(
:param str initialMinorStatus: optional initial minor job status
:return: new job ID
"""
jobManifest = JobManifest()
result = jobManifest.load(jdl)
if not result["OK"]:
return result
jobManifest.setOptionsFromDict({"Owner": owner, "OwnerGroup": ownerGroup})
result = jobManifest.check()
jobAttrs = {
"LastUpdateTime": str(datetime.datetime.utcnow()),
"SubmissionTime": str(datetime.datetime.utcnow()),
"Owner": owner,
"OwnerGroup": ownerGroup,
}

result = checkAndAddOwner(jdl, owner, ownerGroup)
if not result["OK"]:
return result
jobAttrNames = []
jobAttrValues = []
jobManifest = result["Value"]
jdl = fixJDL(jdl)

# 1.- insert original JDL on DB and get new JobID
# Fix the possible lack of the brackets in the JDL
if jdl.strip()[0].find("[") != 0:
jdl = "[" + jdl + "]"
result = self.__insertNewJDL(jdl)
if not result["OK"]:
return S_ERROR(EWMSSUBM, "Failed to insert JDL in to DB")

jobID = result["Value"]

jobManifest.setOption("JobID", jobID)

jobAttrNames.append("JobID")
jobAttrValues.append(jobID)

jobAttrNames.append("LastUpdateTime")
jobAttrValues.append(str(datetime.datetime.utcnow()))

jobAttrNames.append("SubmissionTime")
jobAttrValues.append(str(datetime.datetime.utcnow()))

jobAttrNames.append("Owner")
jobAttrValues.append(owner)

jobAttrNames.append("OwnerGroup")
jobAttrValues.append(ownerGroup)
jobAttrs["JobID"] = jobID

# 2.- Check JDL and Prepare DIRAC JDL
jobJDL = jobManifest.dumpAsJDL()
Expand All @@ -972,13 +938,11 @@ def insertNewJobIntoDB(
retVal = S_OK(jobID)
retVal["JobID"] = jobID
if not classAdJob.isOK():
jobAttrNames.append("Status")
jobAttrValues.append(JobStatus.FAILED)
jobAttrs["Status"] = JobStatus.FAILED

jobAttrNames.append("MinorStatus")
jobAttrValues.append("Error in JDL syntax")
jobAttrs["MinorStatus"] = "Error in JDL syntax"

result = self.insertFields("Jobs", jobAttrNames, jobAttrValues)
result = self.insertFields("Jobs", inDict=jobAttrs)
if not result["OK"]:
return result

Expand All @@ -987,53 +951,21 @@ def insertNewJobIntoDB(
return retVal

classAdJob.insertAttributeInt("JobID", jobID)
result = self.__checkAndPrepareJob(
jobID, classAdJob, classAdReq, owner, ownerGroup, jobAttrNames, jobAttrValues
)
vo = getVOForGroup(ownerGroup)
result = self.__checkAndPrepareJob(jobID, classAdJob, classAdReq, owner, ownerGroup, jobAttrs, vo)
if not result["OK"]:
return result

priority = classAdJob.getAttributeInt("Priority")
if priority is None:
priority = 0
jobAttrNames.append("UserPriority")
jobAttrValues.append(priority)

for jdlName in self.jdl2DBParameters:
# Defaults are set by the DB.
jdlValue = classAdJob.getAttributeString(jdlName)
if jdlValue:
jobAttrNames.append(jdlName)
jobAttrValues.append(jdlValue)

jdlValue = classAdJob.getAttributeString("Site")
if jdlValue:
jobAttrNames.append("Site")
if jdlValue.find(",") != -1:
jobAttrValues.append("Multiple")
else:
jobAttrValues.append(jdlValue)

jobAttrNames.append("VerifiedFlag")
jobAttrValues.append("True")

jobAttrNames.append("Status")
jobAttrValues.append(initialStatus)

jobAttrNames.append("MinorStatus")
jobAttrValues.append(initialMinorStatus)

reqJDL = classAdReq.asJDL()
classAdJob.insertAttributeInt("JobRequirements", reqJDL)

jobJDL = classAdJob.asJDL()
jobJDL = createJDLWithInitialStatus(
classAdJob, classAdReq, self.jdl2DBParameters, jobAttrs, initialStatus, initialMinorStatus
)

result = self.setJobJDL(jobID, jobJDL)
if not result["OK"]:
return result

# Adding the job in the Jobs table
result = self.insertFields("Jobs", jobAttrNames, jobAttrValues)
result = self.insertFields("Jobs", inDict=jobAttrs)
if not result["OK"]:
return result

Expand Down Expand Up @@ -1076,79 +1008,22 @@ def insertNewJobIntoDB(

return retVal

def __checkAndPrepareJob(self, jobID, classAdJob, classAdReq, owner, ownerGroup, jobAttrNames, jobAttrValues):
def __checkAndPrepareJob(self, jobID, classAdJob, classAdReq, owner, ownerGroup, jobAttrs, vo):
"""
Check Consistency of Submitted JDL and set some defaults
Prepare subJDL with Job Requirements
"""
error = ""
vo = getVOForGroup(ownerGroup)

jdlOwner = classAdJob.getAttributeString("Owner")
jdlOwnerGroup = classAdJob.getAttributeString("OwnerGroup")
jdlVO = classAdJob.getAttributeString("VirtualOrganization")

if jdlOwner and jdlOwner != owner:
error = "Wrong Owner in JDL"
elif jdlOwnerGroup and jdlOwnerGroup != ownerGroup:
error = "Wrong Owner Group in JDL"
elif jdlVO and jdlVO != vo:
error = "Wrong Virtual Organization in JDL"

classAdJob.insertAttributeString("Owner", owner)
classAdJob.insertAttributeString("OwnerGroup", ownerGroup)

if vo:
classAdJob.insertAttributeString("VirtualOrganization", vo)

classAdReq.insertAttributeString("OwnerGroup", ownerGroup)
if vo:
classAdReq.insertAttributeString("VirtualOrganization", vo)
retVal = checkAndPrepareJob(jobID, classAdJob, classAdReq, owner, ownerGroup, jobAttrs, vo)

inputDataPolicy = Operations(vo=vo).getValue("InputDataPolicy/InputDataModule")
if inputDataPolicy and not classAdJob.lookupAttribute("InputDataModule"):
classAdJob.insertAttributeString("InputDataModule", inputDataPolicy)
if not retVal["OK"]:
if cmpError(retVal, EWMSSUBM):
resultInsert = self.setJobAttributes(jobID, list(jobAttrs), list(jobAttrs.values()))
if not resultInsert["OK"]:
retVal["MinorStatus"] += f"; {resultInsert['Message']}"

# priority
priority = classAdJob.getAttributeInt("Priority")
if priority is None:
priority = 0
classAdReq.insertAttributeInt("UserPriority", priority)

# CPU time
cpuTime = classAdJob.getAttributeInt("CPUTime")
if cpuTime is None:
opsHelper = Operations(group=ownerGroup)
cpuTime = opsHelper.getValue("JobDescription/DefaultCPUTime", 86400)
classAdReq.insertAttributeInt("CPUTime", cpuTime)

# platform(s)
platformList = classAdJob.getListFromExpression("Platform")
if platformList:
result = self.getDIRACPlatform(platformList)
if not result["OK"]:
return result
if result["Value"]:
classAdReq.insertAttributeVectorString("Platforms", result["Value"])
return retVal
else:
error = "OS compatibility info not found"

if error:
retVal = S_ERROR(EWMSSUBM, error)
retVal["JobId"] = jobID
retVal["Status"] = JobStatus.FAILED
retVal["MinorStatus"] = error

jobAttrNames.append("Status")
jobAttrValues.append(JobStatus.FAILED)

jobAttrNames.append("MinorStatus")
jobAttrValues.append(error)
resultInsert = self.setJobAttributes(jobID, jobAttrNames, jobAttrValues)
if not resultInsert["OK"]:
retVal["MinorStatus"] += f"; {resultInsert['Message']}"

return retVal
return retVal

return S_OK()

Expand Down Expand Up @@ -1237,11 +1112,7 @@ def rescheduleJob(self, jobID):
return res
return S_ERROR(f"Maximum number of reschedulings is reached: {self.maxRescheduling}")

jobAttrNames = []
jobAttrValues = []

jobAttrNames.append("RescheduleCounter")
jobAttrValues.append(rescheduleCounter)
jobAttrs = {"RescheduleCounter": rescheduleCounter}

# Save the job parameters for later debugging
result = JobMonitoringClient().getJobParameters(jobID)
Expand Down Expand Up @@ -1281,14 +1152,15 @@ def rescheduleJob(self, jobID):
retVal["JobID"] = jobID

classAdJob.insertAttributeInt("JobID", jobID)

result = self.__checkAndPrepareJob(
jobID,
classAdJob,
classAdReq,
resultDict["Owner"],
resultDict["OwnerGroup"],
jobAttrNames,
jobAttrValues,
jobAttrs,
getVOForGroup(resultDict["OwnerGroup"]),
)

if not result["OK"]:
Expand All @@ -1297,8 +1169,7 @@ def rescheduleJob(self, jobID):
priority = classAdJob.getAttributeInt("Priority")
if priority is None:
priority = 0
jobAttrNames.append("UserPriority")
jobAttrValues.append(priority)
jobAttrs["UserPriority"] = priority

siteList = classAdJob.getListFromExpression("Site")
if not siteList:
Expand All @@ -1308,26 +1179,19 @@ def rescheduleJob(self, jobID):
else:
site = siteList[0]

jobAttrNames.append("Site")
jobAttrValues.append(site)
jobAttrs["Site"] = site

jobAttrNames.append("Status")
jobAttrValues.append(JobStatus.RECEIVED)
jobAttrs["Status"] = JobStatus.RECEIVED

jobAttrNames.append("MinorStatus")
jobAttrValues.append(JobMinorStatus.RESCHEDULED)
jobAttrs["MinorStatus"] = JobMinorStatus.RESCHEDULED

jobAttrNames.append("ApplicationStatus")
jobAttrValues.append("Unknown")
jobAttrs["ApplicationStatus"] = "Unknown"

jobAttrNames.append("ApplicationNumStatus")
jobAttrValues.append(0)
jobAttrs["ApplicationNumStatus"] = 0

jobAttrNames.append("LastUpdateTime")
jobAttrValues.append(str(datetime.datetime.utcnow()))
jobAttrs["LastUpdateTime"] = str(datetime.datetime.utcnow())

jobAttrNames.append("RescheduleTime")
jobAttrValues.append(str(datetime.datetime.utcnow()))
jobAttrs["RescheduleTime"] = str(datetime.datetime.utcnow())

reqJDL = classAdReq.asJDL()
classAdJob.insertAttributeInt("JobRequirements", reqJDL)
Expand All @@ -1346,7 +1210,7 @@ def rescheduleJob(self, jobID):
if not result["OK"]:
return result

result = self.setJobAttributes(jobID, jobAttrNames, jobAttrValues, force=True)
result = self.setJobAttributes(jobID, list(jobAttrs), list(jobAttrs.values()), force=True)
if not result["OK"]:
return result

Expand Down
Loading

0 comments on commit 415196a

Please sign in to comment.