Skip to content

Commit

Permalink
Merge pull request #11517 from vkuznet/fix-issue-11514
Browse files Browse the repository at this point in the history
Implement MSPileup clean-up task
  • Loading branch information
amaltaro authored Mar 22, 2023
2 parents eaaf7a3 + 7e747e8 commit 570e0a2
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ def __init__(self, msConfig, **kwargs):
self.rucioAccount = msConfig.get('rucioAccount', 'wmcore_transferor')
self.rucioUrl = msConfig['rucioUrl'] # aligned with MSCore init
self.rucioAuthUrl = msConfig['rucioAuthUrl'] # aligned with MSCore init
self.cleanupDaysThreshold = msConfig.get('cleanupDaysThreshold', 15)
dryRun = msConfig.get('dryRun', False)
self.rucioClient = self.rucio # set in MSCore init
self.dataManager = MSPileupData(msConfig)
Expand All @@ -59,3 +60,4 @@ def executeCycle(self):
self.mgr.monitoringTask()
self.mgr.activeTask(marginSpace=self.marginSpace)
self.mgr.inactiveTask()
self.mgr.cleanupTask(self.cleanupDaysThreshold)
26 changes: 26 additions & 0 deletions src/python/WMCore/MicroService/MSPileup/MSPileupTasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,32 @@ def pileupSizeTask(self):
msg = f"MSPileup pileup size task failed with error {exp}"
self.logger.exception(msg)

def cleanupTask(self, cleanupDaysThreshold):
"""
Execute cleanup task according to the following logic:
1. Fetch documents from backend database for the following conditions
- active=False, and
- empty ruleIds list; and
- empty currentRSEs; and
document has been deactivated for a while (deactivatedOn=XXX),
2. For those documents which are fetched make delete call to backend database
:param timeThreshold: time threshold in days which will determine document clean-up readiness
"""
spec = {'active': False}
docs = self.mgr.getPileup(spec)
deleteDocs = 0
seconds = cleanupDaysThreshold * 24 * 60 * 60 # convert to second
for doc in docs:
if not doc['ruleIds'] and not doc['currentRSEs'] and \
doc['deactivatedOn'] + seconds > time.time():
spec = {'_id': doc['_id']}
self.logger.info("Cleanup task delete pileup %s", doc['pileupName'])
self.mgr.deletePileup(spec)
deleteDocs += 1
self.logger.info("Cleanup task deleted %d pileup objects", deleteDocs)

def monitoringTask(self):
"""
Execute Monitoring task according to the following logic:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,14 @@ def testMSPileupTasksWithMockApi(self):
found = True
self.assertEqual(found, True)

# update doc in MSPileup and call cleanup task to delete it
data['active'] = False
data['rulesIds'] = []
data['currentRSEs'] = []
data['deactivatedOn'] = 0
self.mgr.updatePileup(data)
obj.cleanupTask(0)


if __name__ == '__main__':
unittest.main()

0 comments on commit 570e0a2

Please sign in to comment.