diff --git a/src/python/WMComponent/RucioInjector/RucioInjectorPoller.py b/src/python/WMComponent/RucioInjector/RucioInjectorPoller.py index 41274d6ea9..07dc5665a7 100644 --- a/src/python/WMComponent/RucioInjector/RucioInjectorPoller.py +++ b/src/python/WMComponent/RucioInjector/RucioInjectorPoller.py @@ -21,6 +21,7 @@ from WMCore.Services.Rucio.Rucio import Rucio, WMRucioException, RUCIO_VALID_PROJECT from WMCore.WMException import WMException from WMCore.WorkerThreads.BaseWorkerThread import BaseWorkerThread +from Utils.IteratorTools import grouper class RucioInjectorException(WMException): @@ -78,6 +79,9 @@ def __init__(self, config): authUrl=config.RucioInjector.rucioAuthUrl, configDict={'logger': self.logger}) + self.useDsetReplicaDeep = getattr(config.RucioInjector, "useDsetReplicaDeep", False) + self.delBlockSlicesize = getattr(config.RucioInjector, "delBlockSlicesize", 100) + # metadata dictionary information to be added to block/container rules # cannot be a python dictionary, but a JSON string instead self.metaData = json.dumps(dict(agentHost=config.Agent.hostName, @@ -341,69 +345,74 @@ def deleteBlocks(self): logging.info("Found %d candidate blocks for rule deletion", len(blockDict)) - blocksToDelete = [] - containerDict = {} - # Populate containerDict, assigning each block to its correspondant container - for blockName in blockDict: - container = blockDict[blockName]['dataset'] - # If the container is not in the dictionary, create a new entry for it - if container not in containerDict: - # Set of sites to which the container needs to be transferred - sites = set(x.replace("_MSS", "_Tape") for x in blockDict[blockName]['sites']) - containerDict[container] = {'blocks': [], 'rse': sites} - containerDict[container]['blocks'].append(blockName) - - for contName in containerDict: - cont = containerDict[contName] - - # Checks if the container is not requested in any sites. - # This should never be triggered, but better safe than sorry - if not cont['rse']: - logging.warning("No rules for container: %s. Its blocks won't be deleted.", contName) - continue + for blocksSlice in grouper(blockDict, self.delBlockSlicesize): + logging.info("Handeling %d candidate blocks", len(blocksSlice)) + containerDict = {} + # Populate containerDict, assigning each block to its correspondant container + for blockName in blocksSlice: + container = blockDict[blockName]['dataset'] + # If the container is not in the dictionary, create a new entry for it + if container not in containerDict: + # Set of sites to which the container needs to be transferred + sites = set(x.replace("_MSS", "_Tape") for x in blockDict[blockName]['sites']) + containerDict[container] = {'blocks': [], 'rse': sites} + containerDict[container]['blocks'].append(blockName) + + blocksToDelete = [] + for contName in containerDict: + cont = containerDict[contName] + + # Checks if the container is not requested in any sites. + # This should never be triggered, but better safe than sorry + if not cont['rse']: + logging.warning("No rules for container: %s. Its blocks won't be deleted.", contName) + continue - try: - # Get RSE in which each block is available - availableRSEs = self.rucio.getReplicaInfoForBlocks(block=cont['blocks']) - except Exception as exc: - msg = "Failed to get replica info for blocks in container: %s.\n" % contName - msg += "Will retry again in the next cycle. Error: %s" % str(exc) - logging.error(msg) - continue + try: + # Get RSE in which each block is available + availableRSEs = self.rucio.getReplicaInfoForBlocks(block=cont['blocks'], deep=self.useDsetReplicaDeep) + except Exception as exc: + msg = "Failed to get replica info for blocks in container: %s.\n" % contName + msg += "Will retry again in the next cycle. Error: %s" % str(exc) + logging.error(msg) + continue - for blockRSEs in availableRSEs: - # If block is available at every RSE its container needs to be transferred, the block can be deleted - blockSites = set(blockRSEs['replica']) - if cont['rse'].issubset(blockSites): - blocksToDelete.append(blockRSEs['name']) + for blockRSEs in availableRSEs: + # If block is available at every RSE its container needs to be transferred, the block can be deleted + blockSites = set(blockRSEs['replica']) + logging.debug("BlockName: %s", blockRSEs['name']) + logging.debug("Needed: %s / Available: %s", str(cont['rse']), str(blockSites)) + if cont['rse'].issubset(blockSites): + blocksToDelete.append(blockRSEs['name']) + + # Delete agent created rules locking the block + binds = [] + logging.info("Going to delete %d block rules", len(blocksToDelete)) + for block in blocksToDelete: + try: + rules = self.rucio.listDataRules(block, scope=self.scope, account=self.rucioAcct) + except WMRucioException as exc: + logging.warning("Unable to retrieve replication rules for block: %s. Will retry in the next cycle.", block) + else: + if not rules: + logging.info("Block rule for: %s has been deleted by previous cycles", block) + binds.append({'DELETED': 1, 'BLOCKNAME': block}) + continue + for rule in rules: + deletedRules = 0 + if self.rucio.deleteRule(rule['id'], purgeReplicas=True): + logging.info("Successfully deleted rule: %s, for block %s.", rule['id'], block) + deletedRules += 1 + else: + logging.warning("Failed to delete rule: %s, for block %s. Will retry in the next cycle.", rule['id'], block) + if deletedRules == len(rules): + binds.append({'DELETED': 1, 'BLOCKNAME': block}) + logging.info("Successfully deleted all rules for block %s.", block) - # Delete agent created rules locking the block - binds = [] - logging.info("Going to delete %d block rules", len(blocksToDelete)) - for block in blocksToDelete: - try: - rules = self.rucio.listDataRules(block, scope=self.scope, account=self.rucioAcct) - except WMRucioException as exc: - logging.warning("Unable to retrieve replication rules for block: %s. Will retry in the next cycle.", block) - else: - if not rules: - logging.info("Block rule for: %s has been deleted by previous cycles", block) - binds.append({'DELETED': 1, 'BLOCKNAME': block}) - continue - for rule in rules: - deletedRules = 0 - if self.rucio.deleteRule(rule['id'], purgeReplicas=True): - logging.info("Successfully deleted rule: %s, for block %s.", rule['id'], block) - deletedRules += 1 - else: - logging.warning("Failed to delete rule: %s, for block %s. Will retry in the next cycle.", rule['id'], block) - if deletedRules == len(rules): - binds.append({'DELETED': 1, 'BLOCKNAME': block}) - logging.info("Successfully deleted all rules for block %s.", block) + self.markBlocksDeleted.execute(binds) + logging.info("Marked %d blocks as deleted in the database", len(binds)) - self.markBlocksDeleted.execute(binds) - logging.info("Marked %d blocks as deleted in the database", len(binds)) return def insertContainerRules(self): diff --git a/src/python/WMCore/Services/Rucio/Rucio.py b/src/python/WMCore/Services/Rucio/Rucio.py index f3896045eb..7e84cbe2cb 100644 --- a/src/python/WMCore/Services/Rucio/Rucio.py +++ b/src/python/WMCore/Services/Rucio/Rucio.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 # coding=utf-8 """ Rucio Service class developed on top of the native Rucio Client @@ -244,10 +244,13 @@ def getReplicaInfoForBlocks(self, **kwargs): It used to mimic the same PhEDEx wrapper API, listing all the current locations where the provided input data is. - :kwargs: either a dataset or a block name has to be provided. Not both! + :param kwargs: Supported keyword arguments: + * block: List of either dataset or block names. Not both!, + * deep: Whether or not to lookup for replica info at file level :return: a list of dictionaries with replica information """ kwargs.setdefault("scope", "cms") + kwargs.setdefault("deep", False) blockNames = [] result = [] @@ -268,10 +271,17 @@ def getReplicaInfoForBlocks(self, **kwargs): inputDids.append({"scope": kwargs["scope"], "type": "DATASET", "name": block}) resultDict = {} - for item in self.cli.list_dataset_replicas_bulk(inputDids): - resultDict.setdefault(item['name'], []) - if item['state'].upper() == 'AVAILABLE': - resultDict[item['name']].append(item['rse']) + if kwargs['deep']: + for did in inputDids: + for item in self.cli.list_dataset_replicas(kwargs["scope"], did["name"], deep=kwargs['deep']): + resultDict.setdefault(item['name'], []) + if item['state'].upper() == 'AVAILABLE': + resultDict[item['name']].append(item['rse']) + else: + for item in self.cli.list_dataset_replicas_bulk(inputDids): + resultDict.setdefault(item['name'], []) + if item['state'].upper() == 'AVAILABLE': + resultDict[item['name']].append(item['rse']) # Finally, convert it to a list of dictionaries, like: # [{"name": "block_A", "replica": ["nodeA", "nodeB"]}, diff --git a/test/python/WMCore_t/Services_t/Rucio_t/Rucio_t.py b/test/python/WMCore_t/Services_t/Rucio_t/Rucio_t.py index 317434e317..4bcfe4e40c 100644 --- a/test/python/WMCore_t/Services_t/Rucio_t/Rucio_t.py +++ b/test/python/WMCore_t/Services_t/Rucio_t/Rucio_t.py @@ -224,6 +224,15 @@ def testGetReplicaInfoForBlocks(self): for item in res: self.assertTrue(len(item['replica']) > 0) + #Setting deep=True should yield the same results + res = self.myRucio.getReplicaInfoForBlocks(dataset=DSET, deep=True) + self.assertTrue(isinstance(res, list)) + self.assertTrue(len(res) >= 1) # Again, there are 11 replicas + blocks = [item['name'] for item in res] + self.assertTrue(BLOCK in blocks) + for item in res: + self.assertTrue(len(item['replica']) > 0) + def testGetPFN(self): """ Test `getPFN` method