diff --git a/src/DIRAC/TransformationSystem/Agent/TransformationCleaningAgent.py b/src/DIRAC/TransformationSystem/Agent/TransformationCleaningAgent.py index 4917cd87d7d..71c3d6bdd8b 100644 --- a/src/DIRAC/TransformationSystem/Agent/TransformationCleaningAgent.py +++ b/src/DIRAC/TransformationSystem/Agent/TransformationCleaningAgent.py @@ -12,9 +12,7 @@ import errno import os import re -import time from datetime import datetime, timedelta -from hashlib import md5 # # from DIRAC from DIRAC import S_ERROR, S_OK @@ -613,7 +611,7 @@ def __removeWMSTasks(self, transJobIDs): jobIDs = [int(j) for j in transJobIDs if int(j)] allRemove = True for jobList in breakListIntoChunks(jobIDs, 500): - res = self.wmsClient.killJob(jobList) + res = self.wmsClient.killJob(jobList, force=True) if res["OK"]: self.log.info(f"Successfully killed {len(jobList)} jobs from WMS") elif ("InvalidJobIDs" in res) and ("NonauthorizedJobIDs" not in res) and ("FailedJobIDs" not in res): @@ -679,6 +677,11 @@ def __submitRemovalRequests(self, lfns, transID=0): :param int transID: transformationID, only used in RequestName :returns: S_ERROR/S_OK """ + + # These imports are used only in this function + import time + from hashlib import md5 + for index, lfnList in enumerate(breakListIntoChunks(lfns, 300)): oRequest = Request() requestName = "TCA_{transID}_{index}_{md5(repr(time.time()).encode()).hexdigest()[:5]}" diff --git a/src/DIRAC/WorkloadManagementSystem/Client/WMSClient.py b/src/DIRAC/WorkloadManagementSystem/Client/WMSClient.py index 37f7d175132..d5d70c653c6 100755 --- a/src/DIRAC/WorkloadManagementSystem/Client/WMSClient.py +++ b/src/DIRAC/WorkloadManagementSystem/Client/WMSClient.py @@ -212,17 +212,17 @@ def submitJob(self, jdl, jobDescriptionObject=None): return result - def killJob(self, jobID): + def killJob(self, jobID, force=False): """Kill running job. jobID can be an integer representing a single DIRAC job ID or a list of IDs """ - return self.jobManager.killJob(jobID) + return self.jobManager.killJob(jobID, force=force) - def deleteJob(self, jobID): + def deleteJob(self, jobID, force=False): """Delete job(s) (set their status to DELETED) from the WMS Job database. jobID can be an integer representing a single DIRAC job ID or a list of IDs """ - return self.jobManager.deleteJob(jobID) + return self.jobManager.deleteJob(jobID, force=force) def removeJob(self, jobID): """Fully remove job(s) from the WMS Job database. diff --git a/src/DIRAC/WorkloadManagementSystem/DB/JobDB.py b/src/DIRAC/WorkloadManagementSystem/DB/JobDB.py index 699c2cd9e37..5fca9478adf 100755 --- a/src/DIRAC/WorkloadManagementSystem/DB/JobDB.py +++ b/src/DIRAC/WorkloadManagementSystem/DB/JobDB.py @@ -601,7 +601,7 @@ def setJobsMajorStatus(self, jIDList, candidateStatus, force=False): return self._update(cmd) - def setJobStatus(self, jobID, status="", minorStatus="", applicationStatus=""): + def setJobStatus(self, jobID, status="", minorStatus="", applicationStatus="", force=False): """Set status of the job specified by its jobID""" # Do not update the LastUpdate time stamp if setting the Stalled status update_flag = True @@ -620,7 +620,7 @@ def setJobStatus(self, jobID, status="", minorStatus="", applicationStatus=""): attrNames.append("ApplicationStatus") attrValues.append(applicationStatus[:255]) - result = self.setJobAttributes(jobID, attrNames, attrValues, update=update_flag) + result = self.setJobAttributes(jobID, attrNames, attrValues, update=update_flag, force=force) if not result["OK"]: return result diff --git a/src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py b/src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py index abf908782b9..d34b2712042 100755 --- a/src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py +++ b/src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py @@ -22,8 +22,8 @@ from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader from DIRAC.FrameworkSystem.Client.ProxyManagerClient import gProxyManager from DIRAC.StorageManagementSystem.Client.StorageManagerClient import StorageManagerClient -from DIRAC.WorkloadManagementSystem.Client.JobStatus import filterJobStateTransition from DIRAC.WorkloadManagementSystem.Client import JobStatus +from DIRAC.WorkloadManagementSystem.Client.JobStatus import filterJobStateTransition from DIRAC.WorkloadManagementSystem.Service.JobPolicy import ( RIGHT_DELETE, RIGHT_KILL, @@ -435,13 +435,14 @@ def export_removeJob(self, jobIDs): return S_OK(validJobList) - def __deleteJob(self, jobID): - """Set the job status to "Deleted" and remove the pilot that ran. + def __deleteJob(self, jobID, force=False): + """Set the job status to "Deleted" + and remove the pilot that ran and its logging info if the pilot is finished. :param int jobID: job ID :return: S_OK()/S_ERROR() """ - if not (result := self.jobDB.setJobStatus(jobID, JobStatus.DELETED, "Checking accounting"))["OK"]: + if not (result := self.jobDB.setJobStatus(jobID, JobStatus.DELETED, "Checking accounting", force=force))["OK"]: return result if not (result := self.taskQueueDB.deleteJob(jobID))["OK"]: @@ -469,7 +470,7 @@ def __deleteJob(self, jobID): return S_OK() - def __killJob(self, jobID, sendKillCommand=True): + def __killJob(self, jobID, sendKillCommand=True, force=False): """Kill one job :param int jobID: job ID @@ -482,14 +483,16 @@ def __killJob(self, jobID, sendKillCommand=True): return result self.log.info("Job marked for termination", jobID) - if not (result := self.jobDB.setJobStatus(jobID, JobStatus.KILLED, "Marked for termination"))["OK"]: + if not (result := self.jobDB.setJobStatus(jobID, JobStatus.KILLED, "Marked for termination", force=force))[ + "OK" + ]: self.log.warn("Failed to set job Killed status", result["Message"]) if not (result := self.taskQueueDB.deleteJob(jobID))["OK"]: self.log.warn("Failed to delete job from the TaskQueue", result["Message"]) return S_OK() - def _kill_delete_jobs(self, jobIDList, right): + def _kill_delete_jobs(self, jobIDList, right, force=False): """Kill (== set the status to "KILLED") or delete (== set the status to "DELETED") jobs as necessary :param list jobIDList: job IDs @@ -529,12 +532,12 @@ def _kill_delete_jobs(self, jobIDList, right): stagingJobList = [jobID for jobID, sDict in result["Value"].items() if sDict["Status"] == JobStatus.STAGING] for jobID in killJobList: - result = self.__killJob(jobID) + result = self.__killJob(jobID, force=force) if not result["OK"]: badIDs.append(jobID) for jobID in deleteJobList: - result = self.__deleteJob(jobID) + result = self.__deleteJob(jobID, force=force) if not result["OK"]: badIDs.append(jobID) @@ -567,7 +570,7 @@ def _kill_delete_jobs(self, jobIDList, right): ########################################################################### types_deleteJob = [] - def export_deleteJob(self, jobIDs): + def export_deleteJob(self, jobIDs, force=False): """Delete jobs specified in the jobIDs list :param list jobIDs: list of job IDs @@ -575,12 +578,12 @@ def export_deleteJob(self, jobIDs): :return: S_OK/S_ERROR """ - return self._kill_delete_jobs(jobIDs, RIGHT_DELETE) + return self._kill_delete_jobs(jobIDs, RIGHT_DELETE, force=force) ########################################################################### types_killJob = [] - def export_killJob(self, jobIDs): + def export_killJob(self, jobIDs, force=False): """Kill jobs specified in the jobIDs list :param list jobIDs: list of job IDs @@ -588,7 +591,7 @@ def export_killJob(self, jobIDs): :return: S_OK/S_ERROR """ - return self._kill_delete_jobs(jobIDs, RIGHT_KILL) + return self._kill_delete_jobs(jobIDs, RIGHT_KILL, force=force) ########################################################################### types_resetJob = []