Skip to content

Commit

Permalink
Merge pull request #11995 from vkuznet/fix-issue-19994
Browse files Browse the repository at this point in the history
Check status of attachDIDs API and act upon it
  • Loading branch information
amaltaro authored Jun 6, 2024
2 parents c9d0c07 + 21f80fa commit 1169796
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 19 deletions.
12 changes: 12 additions & 0 deletions src/python/Utils/IteratorTools.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,18 @@ def grouper(iterable, n):
return iter(lambda: list(islice(iterable, n)), [])


def getChunk(arr, step):
    """
    Split a sequence into consecutive slices holding at most `step` entries each.

    It is similar in behavior to the grouper function, but instead of building new
    lists it returns a generator whose items are slices of the input. The type of
    each chunk therefore follows the input type (a list yields lists, a string
    yields strings). When the input length is not a multiple of `step`, the last
    chunk is shorter than `step`.

    :param arr: input sequence of data; it must support len() and slicing
    :param step: maximum number of entries per chunk, must be a positive integer
    :return: generator, set of slices with at most `step` entries each
    :raises ValueError: if `step` is zero or negative (checked when called,
        not when the generator is first iterated)
    """
    if step <= 0:
        # range() rejects a zero step with an unrelated message and is simply empty
        # for a negative one, which would silently drop every entry of the input
        raise ValueError("step must be a positive integer, got %r" % (step,))
    return (arr[start:start + step] for start in range(0, len(arr), step))


def flattenList(doubleList):
"""
Make flat a list of lists.
Expand Down
7 changes: 5 additions & 2 deletions src/python/WMCore/MicroService/MSPileup/MSPileupTasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,10 @@ def partialPileupTask(self):
newRules = []
self.logger.info("Attaching %d blocks to custom pileup name: %s", len(customBlocks), cname)
if len(customBlocks):
self.rucioClient.attachDIDs(None, cname, customBlocks, scope=self.customRucioScope)
status = self.rucioClient.attachDIDs(None, cname, customBlocks, scope=self.customRucioScope)
if not status:
self.logger.error("Failed to attach DIDs to custom container %s with scope %s", cname, self.customRucioScope)
continue
for rse in doc['expectedRSEs']:
# create new rule for custom DID using pileup document rse
ruleIds = self.rucioClient.createReplicationRule(cname, rse, scope=self.customRucioScope)
Expand Down Expand Up @@ -264,7 +267,7 @@ def partialPileupTask(self):

# update MSPileup document in MongoDB
# here we update our new document in database with document validation
self.mgr.updatePileupDocumentInDatabase(doc, rseList=doc['currentRSEs'])
self.mgr.updatePileupDocumentInDatabase(doc, rseList=doc['expectedRSEs'])
self.logger.info("Pileup name %s had its fraction updated in the partialPileupTask function", doc['pileupName'])
except Exception as exp:
msg = f"Failed to update MSPileup document, {exp}"
Expand Down
43 changes: 27 additions & 16 deletions src/python/WMCore/Services/Rucio/Rucio.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
DataIdentifierAlreadyExists, DuplicateContent, InvalidRSEExpression,
UnsupportedOperation, FileAlreadyExists, RuleNotFound, RSENotFound)
from Utils.MemoryCache import MemoryCache
from Utils.IteratorTools import grouper
from WMCore.Services.Rucio.RucioUtils import (validateMetaData, weightedChoice,
isTapeRSE, dropTapeRSEs)
from WMCore.WMException import WMException
Expand Down Expand Up @@ -346,7 +347,7 @@ def createBlock(self, name, scope='cms', attach=True, **kwargs):
response = self.attachDIDs(kwargs.get('rse'), container, name, scope)
return response

def attachDIDs(self, rse, superDID, dids, scope='cms'):
def attachDIDs(self, rse, superDID, dids, scope='cms', chunkSize=1000):
"""
_attachDIDs_
Expand All @@ -356,28 +357,38 @@ def attachDIDs(self, rse, superDID, dids, scope='cms'):
then it's a container name; if attaching files, then it's a block name)
:param dids: either a string or a list of data identifiers (can be block or files)
:param scope: string with the scope name
:param chunkSize: maximum number of dids to be attached in any single Rucio server call
:return: a boolean to represent whether it succeeded or not
"""
if not isinstance(dids, list):
dids = [dids]
# NOTE: the attaching dids do not create new container within a scope
# and it is safe to use cms scope for it
dids = [{'scope': 'cms', 'name': did} for did in dids]
alldids = [{'scope': 'cms', 'name': did} for did in sorted(dids)]

# report if we use chunk size in rucio attach_dids API call
if len(alldids) > chunkSize:
self.logger.info("Attaching a total of %d DIDs in chunk size of: %d", len(alldids), chunkSize)

for dids in grouper(alldids, chunkSize):
response = False
try:
response = self.cli.attach_dids(scope, superDID, dids=dids, rse=rse)
except DuplicateContent:
self.logger.warning("Dids: %s already attached to: %s", dids, superDID)
response = True
except FileAlreadyExists:
self.logger.warning("Failed to attach files already existent on block: %s", superDID)
response = True
except DataIdentifierNotFound:
self.logger.error("Failed to attach dids: %s. Parent DID %s does not exist.", dids, superDID)
except Exception as ex:
self.logger.error("Exception attaching %s dids to: %s. Error: %s. First 10 dids: %s",
len(dids), superDID, str(ex), dids[:10])
if not response:
# if we had failure with specific chunk of dids we'll return immediately
return response

response = False
try:
response = self.cli.attach_dids(scope, superDID, dids=dids, rse=rse)
except DuplicateContent:
self.logger.warning("Dids: %s already attached to: %s", dids, superDID)
response = True
except FileAlreadyExists:
self.logger.warning("Failed to attach files already existent on block: %s", superDID)
response = True
except DataIdentifierNotFound:
self.logger.error("Failed to attach dids: %s. Parent DID %s does not exist.", dids, superDID)
except Exception as ex:
self.logger.error("Exception attaching dids: %s to: %s. Error: %s",
dids, superDID, str(ex))
return response

def createReplicas(self, rse, files, block, scope='cms', ignoreAvailability=True):
Expand Down
9 changes: 8 additions & 1 deletion test/python/Utils_t/IteratorTools_t.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import itertools
import unittest

from Utils.IteratorTools import grouper, flattenList, makeListElementsUnique
from Utils.IteratorTools import grouper, flattenList, makeListElementsUnique, getChunk


class IteratorToolsTest(unittest.TestCase):
Expand Down Expand Up @@ -73,6 +73,13 @@ def testNoDupListOfLists(self):
data = [(1, 2), (1, 3), (2, 4), (2, 5), (1, 3), (2, 5)]
self.assertListEqual([(1, 2), (1, 3), (2, 4), (2, 5)], makeListElementsUnique(data))

def testGetChunk(self):
    """
    Test the `getChunk` function.
    """
    # evenly divisible input: the number of chunks is checked explicitly so the
    # loop below cannot pass vacuously when no chunk is produced at all
    arr = range(10)
    chunks = list(getChunk(arr, 2))
    self.assertEqual(len(chunks), 5)
    for chunk in chunks:
        self.assertEqual(len(chunk), 2)

    # content and order are preserved, and the last chunk carries the remainder
    self.assertListEqual([[1, 2], [3, 4], [5]], list(getChunk([1, 2, 3, 4, 5], 2)))

    # empty input produces no chunks
    self.assertListEqual([], list(getChunk([], 3)))

if __name__ == '__main__':
unittest.main()

0 comments on commit 1169796

Please sign in to comment.