Skip to content

Commit

Permalink
Merge pull request #10902 from germanfgv/deepReplicaInfo
Browse files Browse the repository at this point in the history
Adding --deep option to getReplicaInfoForBlocks
  • Loading branch information
todor-ivanov authored Dec 2, 2021
2 parents ba8e8c3 + 7ec92e0 commit d2b9dd7
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 64 deletions.
125 changes: 67 additions & 58 deletions src/python/WMComponent/RucioInjector/RucioInjectorPoller.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from WMCore.Services.Rucio.Rucio import Rucio, WMRucioException, RUCIO_VALID_PROJECT
from WMCore.WMException import WMException
from WMCore.WorkerThreads.BaseWorkerThread import BaseWorkerThread
from Utils.IteratorTools import grouper


class RucioInjectorException(WMException):
Expand Down Expand Up @@ -78,6 +79,9 @@ def __init__(self, config):
authUrl=config.RucioInjector.rucioAuthUrl,
configDict={'logger': self.logger})

self.useDsetReplicaDeep = getattr(config.RucioInjector, "useDsetReplicaDeep", False)
self.delBlockSlicesize = getattr(config.RucioInjector, "delBlockSlicesize", 100)

# metadata dictionary information to be added to block/container rules
# cannot be a python dictionary, but a JSON string instead
self.metaData = json.dumps(dict(agentHost=config.Agent.hostName,
Expand Down Expand Up @@ -341,69 +345,74 @@ def deleteBlocks(self):

logging.info("Found %d candidate blocks for rule deletion", len(blockDict))

blocksToDelete = []
containerDict = {}
# Populate containerDict, assigning each block to its correspondant container
for blockName in blockDict:
container = blockDict[blockName]['dataset']
# If the container is not in the dictionary, create a new entry for it
if container not in containerDict:
# Set of sites to which the container needs to be transferred
sites = set(x.replace("_MSS", "_Tape") for x in blockDict[blockName]['sites'])
containerDict[container] = {'blocks': [], 'rse': sites}
containerDict[container]['blocks'].append(blockName)

for contName in containerDict:
cont = containerDict[contName]

# Checks if the container is not requested in any sites.
# This should never be triggered, but better safe than sorry
if not cont['rse']:
logging.warning("No rules for container: %s. Its blocks won't be deleted.", contName)
continue
for blocksSlice in grouper(blockDict, self.delBlockSlicesize):
logging.info("Handeling %d candidate blocks", len(blocksSlice))
containerDict = {}
# Populate containerDict, assigning each block to its correspondant container
for blockName in blocksSlice:
container = blockDict[blockName]['dataset']
# If the container is not in the dictionary, create a new entry for it
if container not in containerDict:
# Set of sites to which the container needs to be transferred
sites = set(x.replace("_MSS", "_Tape") for x in blockDict[blockName]['sites'])
containerDict[container] = {'blocks': [], 'rse': sites}
containerDict[container]['blocks'].append(blockName)

blocksToDelete = []
for contName in containerDict:
cont = containerDict[contName]

# Checks if the container is not requested in any sites.
# This should never be triggered, but better safe than sorry
if not cont['rse']:
logging.warning("No rules for container: %s. Its blocks won't be deleted.", contName)
continue

try:
# Get RSE in which each block is available
availableRSEs = self.rucio.getReplicaInfoForBlocks(block=cont['blocks'])
except Exception as exc:
msg = "Failed to get replica info for blocks in container: %s.\n" % contName
msg += "Will retry again in the next cycle. Error: %s" % str(exc)
logging.error(msg)
continue
try:
# Get RSE in which each block is available
availableRSEs = self.rucio.getReplicaInfoForBlocks(block=cont['blocks'], deep=self.useDsetReplicaDeep)
except Exception as exc:
msg = "Failed to get replica info for blocks in container: %s.\n" % contName
msg += "Will retry again in the next cycle. Error: %s" % str(exc)
logging.error(msg)
continue

for blockRSEs in availableRSEs:
# If block is available at every RSE its container needs to be transferred, the block can be deleted
blockSites = set(blockRSEs['replica'])
if cont['rse'].issubset(blockSites):
blocksToDelete.append(blockRSEs['name'])
for blockRSEs in availableRSEs:
# If block is available at every RSE its container needs to be transferred, the block can be deleted
blockSites = set(blockRSEs['replica'])
logging.debug("BlockName: %s", blockRSEs['name'])
logging.debug("Needed: %s / Available: %s", str(cont['rse']), str(blockSites))
if cont['rse'].issubset(blockSites):
blocksToDelete.append(blockRSEs['name'])

# Delete agent created rules locking the block
binds = []
logging.info("Going to delete %d block rules", len(blocksToDelete))
for block in blocksToDelete:
try:
rules = self.rucio.listDataRules(block, scope=self.scope, account=self.rucioAcct)
except WMRucioException as exc:
logging.warning("Unable to retrieve replication rules for block: %s. Will retry in the next cycle.", block)
else:
if not rules:
logging.info("Block rule for: %s has been deleted by previous cycles", block)
binds.append({'DELETED': 1, 'BLOCKNAME': block})
continue
for rule in rules:
deletedRules = 0
if self.rucio.deleteRule(rule['id'], purgeReplicas=True):
logging.info("Successfully deleted rule: %s, for block %s.", rule['id'], block)
deletedRules += 1
else:
logging.warning("Failed to delete rule: %s, for block %s. Will retry in the next cycle.", rule['id'], block)
if deletedRules == len(rules):
binds.append({'DELETED': 1, 'BLOCKNAME': block})
logging.info("Successfully deleted all rules for block %s.", block)

# Delete agent created rules locking the block
binds = []
logging.info("Going to delete %d block rules", len(blocksToDelete))
for block in blocksToDelete:
try:
rules = self.rucio.listDataRules(block, scope=self.scope, account=self.rucioAcct)
except WMRucioException as exc:
logging.warning("Unable to retrieve replication rules for block: %s. Will retry in the next cycle.", block)
else:
if not rules:
logging.info("Block rule for: %s has been deleted by previous cycles", block)
binds.append({'DELETED': 1, 'BLOCKNAME': block})
continue
for rule in rules:
deletedRules = 0
if self.rucio.deleteRule(rule['id'], purgeReplicas=True):
logging.info("Successfully deleted rule: %s, for block %s.", rule['id'], block)
deletedRules += 1
else:
logging.warning("Failed to delete rule: %s, for block %s. Will retry in the next cycle.", rule['id'], block)
if deletedRules == len(rules):
binds.append({'DELETED': 1, 'BLOCKNAME': block})
logging.info("Successfully deleted all rules for block %s.", block)

self.markBlocksDeleted.execute(binds)
logging.info("Marked %d blocks as deleted in the database", len(binds))

self.markBlocksDeleted.execute(binds)
logging.info("Marked %d blocks as deleted in the database", len(binds))
return

def insertContainerRules(self):
Expand Down
22 changes: 16 additions & 6 deletions src/python/WMCore/Services/Rucio/Rucio.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/usr/bin/env python3
# coding=utf-8
"""
Rucio Service class developed on top of the native Rucio Client
Expand Down Expand Up @@ -244,10 +244,13 @@ def getReplicaInfoForBlocks(self, **kwargs):
It used to mimic the same PhEDEx wrapper API, listing all the
current locations where the provided input data is.
:kwargs: either a dataset or a block name has to be provided. Not both!
:param kwargs: Supported keyword arguments:
* block: List of either dataset or block names. Not both!,
* deep: Whether or not to lookup for replica info at file level
:return: a list of dictionaries with replica information
"""
kwargs.setdefault("scope", "cms")
kwargs.setdefault("deep", False)

blockNames = []
result = []
Expand All @@ -268,10 +271,17 @@ def getReplicaInfoForBlocks(self, **kwargs):
inputDids.append({"scope": kwargs["scope"], "type": "DATASET", "name": block})

resultDict = {}
for item in self.cli.list_dataset_replicas_bulk(inputDids):
resultDict.setdefault(item['name'], [])
if item['state'].upper() == 'AVAILABLE':
resultDict[item['name']].append(item['rse'])
if kwargs['deep']:
for did in inputDids:
for item in self.cli.list_dataset_replicas(kwargs["scope"], did["name"], deep=kwargs['deep']):
resultDict.setdefault(item['name'], [])
if item['state'].upper() == 'AVAILABLE':
resultDict[item['name']].append(item['rse'])
else:
for item in self.cli.list_dataset_replicas_bulk(inputDids):
resultDict.setdefault(item['name'], [])
if item['state'].upper() == 'AVAILABLE':
resultDict[item['name']].append(item['rse'])

# Finally, convert it to a list of dictionaries, like:
# [{"name": "block_A", "replica": ["nodeA", "nodeB"]},
Expand Down
9 changes: 9 additions & 0 deletions test/python/WMCore_t/Services_t/Rucio_t/Rucio_t.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,15 @@ def testGetReplicaInfoForBlocks(self):
for item in res:
self.assertTrue(len(item['replica']) > 0)

#Setting deep=True should yield the same results
res = self.myRucio.getReplicaInfoForBlocks(dataset=DSET, deep=True)
self.assertTrue(isinstance(res, list))
self.assertTrue(len(res) >= 1) # Again, there are 11 replicas
blocks = [item['name'] for item in res]
self.assertTrue(BLOCK in blocks)
for item in res:
self.assertTrue(len(item['replica']) > 0)

def testGetPFN(self):
"""
Test `getPFN` method
Expand Down

0 comments on commit d2b9dd7

Please sign in to comment.