Skip to content

Commit

Permalink
Implement MSPileup clean-up task
Browse files Browse the repository at this point in the history
  • Loading branch information
vkuznet committed Mar 21, 2023
1 parent eaaf7a3 commit 6f6d4b2
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ def __init__(self, msConfig, **kwargs):
self.rucioAccount = msConfig.get('rucioAccount', 'wmcore_transferor')
self.rucioUrl = msConfig['rucioUrl'] # aligned with MSCore init
self.rucioAuthUrl = msConfig['rucioAuthUrl'] # aligned with MSCore init
self.cleanupDaysThreshold = msConfig.get('cleanupDaysThreshold', 15)
dryRun = msConfig.get('dryRun', False)
self.rucioClient = self.rucio # set in MSCore init
self.dataManager = MSPileupData(msConfig)
Expand All @@ -59,3 +60,4 @@ def executeCycle(self):
self.mgr.monitoringTask()
self.mgr.activeTask(marginSpace=self.marginSpace)
self.mgr.inactiveTask()
self.mgr.cleanupTask(self.cleanupDaysThreshold)
26 changes: 26 additions & 0 deletions src/python/WMCore/MicroService/MSPileup/MSPileupTasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,32 @@ def pileupSizeTask(self):
msg = f"MSPileup pileup size task failed with error {exp}"
self.logger.exception(msg)

def cleanupTask(self, cleanupDaysThreshold):
    """
    Execute cleanup task according to the following logic:
    1. Fetch documents from backend database for the following conditions
       - active=False, and
       - empty ruleIds list; and
       - empty currentRSEs; and
       - document has been deactivated for longer than the given threshold,
         i.e. deactivatedOn + threshold is already in the past
    2. For those documents which are fetched make delete call to backend database

    :param cleanupDaysThreshold: time threshold in days which will determine
        document clean-up readiness (a document is deleted only once it has
        been deactivated for more than this many days)
    :return: None, the outcome is reported via the logger
    """
    querySpec = {'active': False}
    docs = self.mgr.getPileup(querySpec)
    deleteDocs = 0
    seconds = cleanupDaysThreshold * 24 * 60 * 60  # convert days to seconds
    now = time.time()  # single reference point for the whole cycle
    for doc in docs:
        # documents which still have rules or replicas are not ready for removal
        if doc['ruleIds'] or doc['currentRSEs']:
            continue
        # only delete once the retention window has fully elapsed; a recently
        # deactivated document must be kept
        if doc['deactivatedOn'] + seconds < now:
            deleteSpec = {'_id': doc['_id']}
            self.logger.info("Cleanup task delete pileup %s", doc['pileupName'])
            self.mgr.deletePileup(deleteSpec)
            deleteDocs += 1
    self.logger.info("Cleanup task deleted %d pileup objects", deleteDocs)

def monitoringTask(self):
"""
Execute Monitoring task according to the following logic:
Expand Down

0 comments on commit 6f6d4b2

Please sign in to comment.