diff --git a/src/python/Utils/Patterns.py b/src/python/Utils/Patterns.py index 2947c94928..1b930fb731 100644 --- a/src/python/Utils/Patterns.py +++ b/src/python/Utils/Patterns.py @@ -1,7 +1,7 @@ """ Patterns module provides set of CS patterns """ - +import re class Singleton(type): """Implementation of Singleton class""" @@ -11,3 +11,14 @@ def __call__(cls, *args, **kwargs): cls._instances[cls] = \ super(Singleton, cls).__call__(*args, **kwargs) return cls._instances[cls] + + +def getDomainName(urlStr): + """ + Given a URL string, return the domain name. + :param urlStr: URL string + :return: a string with the domain name (e.g. "cmsweb-prod") + """ + domainPattern = re.compile(r'https?://([^/]+)\.cern\.ch') + match = domainPattern.search(urlStr) + return match.group(1) if match else "" diff --git a/src/python/WMCore/WMSpec/Steps/Fetchers/PileupFetcher.py b/src/python/WMCore/WMSpec/Steps/Fetchers/PileupFetcher.py index eacf4c69fa..a4410ebe54 100644 --- a/src/python/WMCore/WMSpec/Steps/Fetchers/PileupFetcher.py +++ b/src/python/WMCore/WMSpec/Steps/Fetchers/PileupFetcher.py @@ -3,10 +3,6 @@ of pileup files in the job sandbox for the dataset. """ -from __future__ import print_function - -from future.utils import viewitems - import datetime import os import hashlib @@ -15,8 +11,10 @@ import logging from json import JSONEncoder import WMCore.WMSpec.WMStep as WMStep +from Utils.Patterns import getDomainName from Utils.Utilities import encodeUnicodeToBytes from WMCore.Services.DBS.DBSReader import DBSReader +from WMCore.Services.MSPileup.MSPileupUtils import getPileupDocs from WMCore.Services.Rucio.Rucio import Rucio from WMCore.WMSpec.Steps.Fetchers.FetcherInterface import FetcherInterface @@ -34,7 +32,6 @@ def __init__(self): Prepare module setup """ super(PileupFetcher, self).__init__() - # FIXME: find a way to pass the Rucio account name to this fetcher module self.rucioAcct = "wmcore_pileup" self.rucio = None @@ -52,6 +49,20 @@ def _queryDbsAndGetPileupConfig(self, stepHelper, dbsReader): "BlockB": {"FileList": [], "PhEDExNodeName": []}, ....} """ resultDict = {} + # first, figure out which instance of MSPileup and Rucio to use + pileupInstance = getDomainName(dbsReader.dbsURL) + msPileupUrl = f"https://{pileupInstance}.cern.ch/ms-pileup/data/pileup" + # FIXME: this juggling with Rucio is tough! We can get away without it, + # but for that we would have to use testbed MSPileup against Prod Rucio + if pileupInstance == "cmsweb-prod" or pileupInstance == "cmsweb": + rucioAuthUrl, rucioUrl = "cms-rucio-auth", "cms-rucio" + else: + rucioAuthUrl, rucioUrl = "cms-rucio-auth-int", "cms-rucio-int" + # initialize Rucio here to avoid this authentication on T0-WMAgent + self.rucio = Rucio(self.rucioAcct, + authUrl=f"https://{rucioAuthUrl}.cern.ch", + hostUrl=f"http://{rucioUrl}.cern.ch") + # iterate over input pileup types (e.g. "cosmics", "minbias") for pileupType in stepHelper.data.pileup.listSections_(): # the format here is: step.data.pileup.cosmics.dataset = [/some/data/set] @@ -59,34 +70,60 @@ def _queryDbsAndGetPileupConfig(self, stepHelper, dbsReader): # each dataset input can generally be a list, iterate over dataset names blockDict = {} for dataset in datasets: - + # using the original dataset, resolve blocks, files and number of events with DBS + fCounter = 0 for fileInfo in dbsReader.getFileListByDataset(dataset=dataset, detail=True): blockDict.setdefault(fileInfo['block_name'], {'FileList': [], 'NumberOfEvents': 0, 'PhEDExNodeNames': []}) blockDict[fileInfo['block_name']]['FileList'].append(fileInfo['logical_file_name']) blockDict[fileInfo['block_name']]['NumberOfEvents'] += fileInfo['event_count'] + fCounter += 1 - self._getDatasetLocation(dataset, blockDict) + logging.info(f"Found {len(blockDict)} blocks in DBS for dataset {dataset} with {fCounter} files") + self._getDatasetLocation(dataset, blockDict, msPileupUrl) resultDict[pileupType] = blockDict return resultDict - def _getDatasetLocation(self, dset, blockDict): + def _getDatasetLocation(self, dset, blockDict, msPileupUrl): """ Given a dataset name, query PhEDEx or Rucio and resolve the block location :param dset: string with the dataset name :param blockDict: dictionary with DBS summary info + :param msPileupUrl: string with the MSPileup url :return: update blockDict in place """ - # initialize Rucio here to avoid this authentication on T0-WMAgent - self.rucio = Rucio(self.rucioAcct) - blockReplicas = self.rucio.getPileupLockedAndAvailable(dset, account=self.rucioAcct) - for blockName, blockLocation in viewitems(blockReplicas): - try: - blockDict[blockName]['PhEDExNodeNames'] = list(blockLocation) - except KeyError: - logging.warning("Block '%s' present in Rucio but not in DBS", blockName) + # fetch the pileup configuration from MSPileup + try: + queryDict = {'query': {'pileupName': dset}, + 'filters': ['pileupName', 'customName', 'containerFraction', 'currentRSEs']} + doc = getPileupDocs(msPileupUrl, queryDict, method='POST')[0] + msg = f'Pileup dataset {doc["pileupName"]} with:\n\tcustom name: {doc["customName"]},' + msg += f'\n\tcurrent RSEs: {doc["currentRSEs"]}\n\tand container fraction: {doc["containerFraction"]}' + logging.info(msg) + except Exception as ex: + logging.error(f'Error querying MSPileup for dataset {dset}. Details: {str(ex)}') + raise ex + + # custom dataset name means there was a container fraction change, use different scope + puScope = 'cms' + if doc["customName"]: + dset = doc["customName"] + puScope = 'group.wmcore' + + blockReplicas = self.rucio.getBlocksInContainer(container=dset, scope=puScope) + logging.info(f"Found {len(blockReplicas)} blocks in container {dset} for scope {puScope}") + + # Finally, update blocks present in Rucio with the MSPileup currentRSEs. + # Blocks not present in Rucio - hence only in DBS - are meant to be removed. + for blockName in list(blockDict): + if blockName not in blockReplicas: + logging.warning(f"Block {blockName} present in DBS but not in Rucio. Removing it.") + blockDict.pop(blockName) + else: + blockDict[blockName]['PhEDExNodeNames'] = doc["currentRSEs"] + logging.info(f"Final pileup dataset {dset} has a total of {len(blockDict)} blocks.") def _getCacheFilePath(self, stepHelper): @@ -171,7 +208,8 @@ def createPileupConfigFile(self, helper): """ Stores pileup JSON configuration file in the working directory / sandbox. - + :param helper: WMStepHelper instance + :return: None """ if self._isCacheValid(helper): # if file already exist don't make a new dbs call and overwrite the file.