Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[sweep:integration] the TransformationCleaningAgent forces jobs to be killed #7906

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,7 @@
import errno
import os
import re
import time
from datetime import datetime, timedelta
from hashlib import md5

# # from DIRAC
from DIRAC import S_ERROR, S_OK
Expand Down Expand Up @@ -613,7 +611,7 @@ def __removeWMSTasks(self, transJobIDs):
jobIDs = [int(j) for j in transJobIDs if int(j)]
allRemove = True
for jobList in breakListIntoChunks(jobIDs, 500):
res = self.wmsClient.killJob(jobList)
res = self.wmsClient.killJob(jobList, force=True)
if res["OK"]:
self.log.info(f"Successfully killed {len(jobList)} jobs from WMS")
elif ("InvalidJobIDs" in res) and ("NonauthorizedJobIDs" not in res) and ("FailedJobIDs" not in res):
Expand Down Expand Up @@ -679,6 +677,11 @@ def __submitRemovalRequests(self, lfns, transID=0):
:param int transID: transformationID, only used in RequestName
:returns: S_ERROR/S_OK
"""

# These imports are used only in this function
import time
from hashlib import md5

for index, lfnList in enumerate(breakListIntoChunks(lfns, 300)):
oRequest = Request()
requestName = f"TCA_{transID}_{index}_{md5(repr(time.time()).encode()).hexdigest()[:5]}"
Expand Down
8 changes: 4 additions & 4 deletions src/DIRAC/WorkloadManagementSystem/Client/WMSClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,17 +212,17 @@ def submitJob(self, jdl, jobDescriptionObject=None):

return result

def killJob(self, jobID):
def killJob(self, jobID, force=False):
"""Kill running job.
jobID can be an integer representing a single DIRAC job ID or a list of IDs
"""
return self.jobManager.killJob(jobID)
return self.jobManager.killJob(jobID, force=force)

def deleteJob(self, jobID):
def deleteJob(self, jobID, force=False):
"""Delete job(s) (set their status to DELETED) from the WMS Job database.
jobID can be an integer representing a single DIRAC job ID or a list of IDs
"""
return self.jobManager.deleteJob(jobID)
return self.jobManager.deleteJob(jobID, force=force)

def removeJob(self, jobID):
"""Fully remove job(s) from the WMS Job database.
Expand Down
4 changes: 2 additions & 2 deletions src/DIRAC/WorkloadManagementSystem/DB/JobDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -601,7 +601,7 @@ def setJobsMajorStatus(self, jIDList, candidateStatus, force=False):

return self._update(cmd)

def setJobStatus(self, jobID, status="", minorStatus="", applicationStatus=""):
def setJobStatus(self, jobID, status="", minorStatus="", applicationStatus="", force=False):
"""Set status of the job specified by its jobID"""
# Do not update the LastUpdate time stamp if setting the Stalled status
update_flag = True
Expand All @@ -620,7 +620,7 @@ def setJobStatus(self, jobID, status="", minorStatus="", applicationStatus=""):
attrNames.append("ApplicationStatus")
attrValues.append(applicationStatus[:255])

result = self.setJobAttributes(jobID, attrNames, attrValues, update=update_flag)
result = self.setJobAttributes(jobID, attrNames, attrValues, update=update_flag, force=force)
if not result["OK"]:
return result

Expand Down
29 changes: 16 additions & 13 deletions src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
from DIRAC.FrameworkSystem.Client.ProxyManagerClient import gProxyManager
from DIRAC.StorageManagementSystem.Client.StorageManagerClient import StorageManagerClient
from DIRAC.WorkloadManagementSystem.Client.JobStatus import filterJobStateTransition
from DIRAC.WorkloadManagementSystem.Client import JobStatus
from DIRAC.WorkloadManagementSystem.Client.JobStatus import filterJobStateTransition
from DIRAC.WorkloadManagementSystem.Service.JobPolicy import (
RIGHT_DELETE,
RIGHT_KILL,
Expand Down Expand Up @@ -435,13 +435,14 @@ def export_removeJob(self, jobIDs):

return S_OK(validJobList)

def __deleteJob(self, jobID):
"""Set the job status to "Deleted" and remove the pilot that ran.
def __deleteJob(self, jobID, force=False):
"""Set the job status to "Deleted"
and remove the pilot that ran and its logging info if the pilot is finished.

:param int jobID: job ID
:return: S_OK()/S_ERROR()
"""
if not (result := self.jobDB.setJobStatus(jobID, JobStatus.DELETED, "Checking accounting"))["OK"]:
if not (result := self.jobDB.setJobStatus(jobID, JobStatus.DELETED, "Checking accounting", force=force))["OK"]:
return result

if not (result := self.taskQueueDB.deleteJob(jobID))["OK"]:
Expand Down Expand Up @@ -469,7 +470,7 @@ def __deleteJob(self, jobID):

return S_OK()

def __killJob(self, jobID, sendKillCommand=True):
def __killJob(self, jobID, sendKillCommand=True, force=False):
"""Kill one job

:param int jobID: job ID
Expand All @@ -482,14 +483,16 @@ def __killJob(self, jobID, sendKillCommand=True):
return result

self.log.info("Job marked for termination", jobID)
if not (result := self.jobDB.setJobStatus(jobID, JobStatus.KILLED, "Marked for termination"))["OK"]:
if not (result := self.jobDB.setJobStatus(jobID, JobStatus.KILLED, "Marked for termination", force=force))[
"OK"
]:
self.log.warn("Failed to set job Killed status", result["Message"])
if not (result := self.taskQueueDB.deleteJob(jobID))["OK"]:
self.log.warn("Failed to delete job from the TaskQueue", result["Message"])

return S_OK()

def _kill_delete_jobs(self, jobIDList, right):
def _kill_delete_jobs(self, jobIDList, right, force=False):
"""Kill (== set the status to "KILLED") or delete (== set the status to "DELETED") jobs as necessary

:param list jobIDList: job IDs
Expand Down Expand Up @@ -529,12 +532,12 @@ def _kill_delete_jobs(self, jobIDList, right):
stagingJobList = [jobID for jobID, sDict in result["Value"].items() if sDict["Status"] == JobStatus.STAGING]

for jobID in killJobList:
result = self.__killJob(jobID)
result = self.__killJob(jobID, force=force)
if not result["OK"]:
badIDs.append(jobID)

for jobID in deleteJobList:
result = self.__deleteJob(jobID)
result = self.__deleteJob(jobID, force=force)
if not result["OK"]:
badIDs.append(jobID)

Expand Down Expand Up @@ -567,28 +570,28 @@ def _kill_delete_jobs(self, jobIDList, right):
###########################################################################
types_deleteJob = []

def export_deleteJob(self, jobIDs):
def export_deleteJob(self, jobIDs, force=False):
"""Delete jobs specified in the jobIDs list

:param list jobIDs: list of job IDs

:return: S_OK/S_ERROR
"""

return self._kill_delete_jobs(jobIDs, RIGHT_DELETE)
return self._kill_delete_jobs(jobIDs, RIGHT_DELETE, force=force)

###########################################################################
types_killJob = []

def export_killJob(self, jobIDs):
def export_killJob(self, jobIDs, force=False):
"""Kill jobs specified in the jobIDs list

:param list jobIDs: list of job IDs

:return: S_OK/S_ERROR
"""

return self._kill_delete_jobs(jobIDs, RIGHT_KILL)
return self._kill_delete_jobs(jobIDs, RIGHT_KILL, force=force)

###########################################################################
types_resetJob = []
Expand Down
Loading