Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adopt MSPileup data into PileupFetcher - wmagent branch #12202

Merged
merged 1 commit into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion src/python/Utils/Patterns.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""
Patterns module provides set of CS patterns
"""

import re

class Singleton(type):
"""Implementation of Singleton class"""
Expand All @@ -11,3 +11,14 @@ def __call__(cls, *args, **kwargs):
cls._instances[cls] = \
super(Singleton, cls).__call__(*args, **kwargs)
return cls._instances[cls]


def getDomainName(urlStr):
"""
Given a URL string, return the domain name.
:param urlStr: URL string
:return: a string with the domain name (e.g. "cmsweb-prod")
"""
domainPattern = re.compile(r'https?://([^/]+)\.cern\.ch')
match = domainPattern.search(urlStr)
return match.group(1) if match else ""
72 changes: 55 additions & 17 deletions src/python/WMCore/WMSpec/Steps/Fetchers/PileupFetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,6 @@
of pileup files in the job sandbox for the dataset.

"""
from __future__ import print_function

from future.utils import viewitems

import datetime
import os
import hashlib
Expand All @@ -15,8 +11,10 @@
import logging
from json import JSONEncoder
import WMCore.WMSpec.WMStep as WMStep
from Utils.Patterns import getDomainName
from Utils.Utilities import encodeUnicodeToBytes
from WMCore.Services.DBS.DBSReader import DBSReader
from WMCore.Services.MSPileup.MSPileupUtils import getPileupDocs
from WMCore.Services.Rucio.Rucio import Rucio
from WMCore.WMSpec.Steps.Fetchers.FetcherInterface import FetcherInterface

Expand All @@ -34,7 +32,6 @@ def __init__(self):
Prepare module setup
"""
super(PileupFetcher, self).__init__()
# FIXME: find a way to pass the Rucio account name to this fetcher module
self.rucioAcct = "wmcore_pileup"
self.rucio = None

Expand All @@ -52,41 +49,81 @@ def _queryDbsAndGetPileupConfig(self, stepHelper, dbsReader):
"BlockB": {"FileList": [], "PhEDExNodeName": []}, ....}
"""
resultDict = {}
# first, figure out which instance of MSPileup and Rucio to use
pileupInstance = getDomainName(dbsReader.dbsURL)
msPileupUrl = f"https://{pileupInstance}.cern.ch/ms-pileup/data/pileup"
# FIXME: this juggling with Rucio is tough! We can get away without it,
# but for that we would have to use testbed MSPileup against Prod Rucio
if pileupInstance == "cmsweb-prod" or pileupInstance == "cmsweb":
rucioAuthUrl, rucioUrl = "cms-rucio-auth", "cms-rucio"
else:
rucioAuthUrl, rucioUrl = "cms-rucio-auth-int", "cms-rucio-int"
# initialize Rucio here to avoid this authentication on T0-WMAgent
self.rucio = Rucio(self.rucioAcct,
authUrl=f"https://{rucioAuthUrl}.cern.ch",
hostUrl=f"http://{rucioUrl}.cern.ch")

# iterate over input pileup types (e.g. "cosmics", "minbias")
for pileupType in stepHelper.data.pileup.listSections_():
# the format here is: step.data.pileup.cosmics.dataset = [/some/data/set]
datasets = getattr(getattr(stepHelper.data.pileup, pileupType), "dataset")
# each dataset input can generally be a list, iterate over dataset names
blockDict = {}
for dataset in datasets:

# using the original dataset, resolve blocks, files and number of events with DBS
fCounter = 0
for fileInfo in dbsReader.getFileListByDataset(dataset=dataset, detail=True):
blockDict.setdefault(fileInfo['block_name'], {'FileList': [],
'NumberOfEvents': 0,
'PhEDExNodeNames': []})
blockDict[fileInfo['block_name']]['FileList'].append(fileInfo['logical_file_name'])
blockDict[fileInfo['block_name']]['NumberOfEvents'] += fileInfo['event_count']
fCounter += 1

self._getDatasetLocation(dataset, blockDict)
logging.info(f"Found {len(blockDict)} blocks in DBS for dataset {dataset} with {fCounter} files")
self._getDatasetLocation(dataset, blockDict, msPileupUrl)

resultDict[pileupType] = blockDict
return resultDict

def _getDatasetLocation(self, dset, blockDict):
def _getDatasetLocation(self, dset, blockDict, msPileupUrl):
"""
Given a dataset name, query PhEDEx or Rucio and resolve the block location
:param dset: string with the dataset name
:param blockDict: dictionary with DBS summary info
:param msPileupUrl: string with the MSPileup url
:return: update blockDict in place
"""
# initialize Rucio here to avoid this authentication on T0-WMAgent
self.rucio = Rucio(self.rucioAcct)
blockReplicas = self.rucio.getPileupLockedAndAvailable(dset, account=self.rucioAcct)
for blockName, blockLocation in viewitems(blockReplicas):
try:
blockDict[blockName]['PhEDExNodeNames'] = list(blockLocation)
except KeyError:
logging.warning("Block '%s' present in Rucio but not in DBS", blockName)
# fetch the pileup configuration from MSPileup
try:
queryDict = {'query': {'pileupName': dset},
'filters': ['pileupName', 'customName', 'containerFraction', 'currentRSEs']}
doc = getPileupDocs(msPileupUrl, queryDict, method='POST')[0]
msg = f'Pileup dataset {doc["pileupName"]} with:\n\tcustom name: {doc["customName"]},'
msg += f'\n\tcurrent RSEs: {doc["currentRSEs"]}\n\tand container fraction: {doc["containerFraction"]}'
logging.info(msg)
except Exception as ex:
logging.error(f'Error querying MSPileup for dataset {dset}. Details: {str(ex)}')
raise ex

# custom dataset name means there was a container fraction change, use different scope
puScope = 'cms'
if doc["customName"]:
dset = doc["customName"]
puScope = 'group.wmcore'

blockReplicas = self.rucio.getBlocksInContainer(container=dset, scope=puScope)
logging.info(f"Found {len(blockReplicas)} blocks in container {dset} for scope {puScope}")

# Finally, update blocks present in Rucio with the MSPileup currentRSEs.
# Blocks not present in Rucio - hence only in DBS - are meant to be removed.
for blockName in list(blockDict):
if blockName not in blockReplicas:
logging.warning(f"Block {blockName} present in DBS but not in Rucio. Removing it.")
blockDict.pop(blockName)
else:
blockDict[blockName]['PhEDExNodeNames'] = doc["currentRSEs"]
logging.info(f"Final pileup dataset {dset} has a total of {len(blockDict)} blocks.")

def _getCacheFilePath(self, stepHelper):

Expand Down Expand Up @@ -171,7 +208,8 @@ def createPileupConfigFile(self, helper):
"""
Stores pileup JSON configuration file in the working
directory / sandbox.

:param helper: WMStepHelper instance
:return: None
"""
if self._isCacheValid(helper):
# if file already exist don't make a new dbs call and overwrite the file.
Expand Down