Skip to content

Commit

Permalink
Merge pull request #11917 from amaltaro/fix-11703-bonus
Browse files Browse the repository at this point in the history
Aesthetic changes for storage.json fix provided in 11869
  • Loading branch information
amaltaro authored Feb 29, 2024
2 parents 5827a91 + 691c45e commit 35e53b6
Show file tree
Hide file tree
Showing 10 changed files with 324 additions and 379 deletions.
63 changes: 29 additions & 34 deletions src/python/WMCore/Storage/DeleteMgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,18 @@
Based on StageOutMgr class
"""
from __future__ import print_function
import logging

from builtins import object
from future.utils import viewitems

import logging

from WMCore.Storage.Registry import retrieveStageOutImpl
# do we want separate exceptions - for the moment no
from WMCore.Storage.StageOutError import StageOutFailure
from WMCore.Storage.StageOutError import StageOutInitError
from WMCore.Storage.RucioFileCatalog import storageJsonPath, readRFC
from WMCore.Storage.SiteLocalConfig import stageOutStr, loadSiteLocalConfig
from WMCore.Storage.StageOutError import StageOutFailure, StageOutInitError
from WMCore.WMException import WMException


from WMCore.Storage.SiteLocalConfig import stageOutStr,loadSiteLocalConfig
from WMCore.Storage.RucioFileCatalog import storageJsonPath,readRFC

class DeleteMgrError(WMException):
"""
_DeleteMgrError_
Expand All @@ -47,13 +42,13 @@ class DeleteMgr(object):
"""

def __init__(self, **overrideParams):

self.logger = overrideParams.pop("logger", logging.getLogger())
self.overrideConf = overrideParams

#pairs of stageOut and Rucio file catalog: [(stageOut1,rfc1),(stageOut2,rfc2), ...]
#a "stageOut" corresponds to a entry in the <stage-out> block in the site-local-config.xml, for example <method volume="KIT_dCache" protocol="WebDAV"/>
#a "rfc" is the correponding RucioFileCatalog instance (RucioFileCatalog.py) of this "stageOut"
# pairs of stageOut and Rucio file catalog: [(stageOut1,rfc1),(stageOut2,rfc2), ...]
# a "stageOut" corresponds to an entry in the <stage-out> block in the site-local-config.xml, for example <method volume="KIT_dCache" protocol="WebDAV"/>
# a "rfc" is the corresponding RucioFileCatalog instance (RucioFileCatalog.py) of this "stageOut"
self.stageOuts_rfcs = []
self.numberOfRetries = 3
self.retryPauseTime = 600
Expand All @@ -75,17 +70,17 @@ def initialiseSiteConf(self):
Extract required information from site conf and TFC
"""

self.stageOuts = self.siteCfg.stageOuts

self.logger.info("There are %s stage out definitions.\n" % len(self.stageOuts))

for stageOut in self.stageOuts:
foundNoneAttr = False
for k in ['phedex-node','command','storageSite','volume','protocol']:
for k in ['phedex-node', 'command', 'storageSite', 'volume', 'protocol']:
v = stageOut.get(k)
if v is None:
msg = "Unable to retrieve "+k+" of this stageOut: \n"
msg = "Unable to retrieve " + k + " of this stageOut: \n"
msg += stageOutStr(stageOut) + "\n"
msg += "from site config file.\n"
msg += "Continue to the next stageOut.\n"
Expand All @@ -98,13 +93,13 @@ def initialiseSiteConf(self):
protocol = stageOut.get("protocol")
command = stageOut.get("command")
pnn = stageOut.get("phedex-node")

self.logger.info("\tStage out to : %s using: %s \n" % (pnn, command))

try:
aPath = storageJsonPath(self.siteCfg.siteName,self.siteCfg.subSiteName,storageSite)
rfc = readRFC(aPath,storageSite,volume,protocol)
self.stageOuts_rfcs.append((stageOut,rfc))
aPath = storageJsonPath(self.siteCfg.siteName, self.siteCfg.subSiteName, storageSite)
rfc = readRFC(aPath, storageSite, volume, protocol)
self.stageOuts_rfcs.append((stageOut, rfc))
msg = "Rucio File Catalog has been loaded:\n"
msg += str(self.stageOuts_rfcs[-1][1])
self.logger.info(msg)
Expand All @@ -116,7 +111,7 @@ def initialiseSiteConf(self):
self.logger.exception(msg)
continue

#no Rucio file catalog is initialized
# no Rucio file catalog is initialized
if not self.stageOuts_rfcs:
raise StageOutInitError("===>Can not initialize Rucio file catalog")

Expand All @@ -129,13 +124,13 @@ def initialiseOverride(self):
Extract and verify that the Override parameters are all present
"""

overrideConf = {
"command": None,
"option": None,
"phedex-node": None,
"lfn-prefix": None,
}
}

try:
overrideConf['command'] = self.overrideConf['command']
Expand All @@ -158,7 +153,7 @@ def initialiseOverride(self):
msg += " %s : %s\n" % (key, val)
msg += "=====================================================\n"
self.logger.info(msg)

return

def __call__(self, fileToDelete):
Expand All @@ -173,9 +168,9 @@ def __call__(self, fileToDelete):
lfn = fileToDelete['LFN']

deleteSuccess = False

if not self.overrideConf:
logging.info("===> Attempting to delete with %s stage outs", len(self.stageOuts))
self.logger.info("===> Attempting to delete with %s stage outs", len(self.stageOuts))
for stageOut_rfc in self.stageOuts_rfcs:
if not deleteSuccess:
try:
Expand All @@ -186,7 +181,7 @@ def __call__(self, fileToDelete):
except Exception as ex:
continue
else:
logging.info("===> Attempting stage outs from override")
self.logger.info("===> Attempting stage outs from override")
try:
fileToDelete['PNN'] = self.overrideConf['phedex-node']
fileToDelete['PFN'] = self.deleteLFN(lfn)
Expand All @@ -204,7 +199,7 @@ def __call__(self, fileToDelete):
msg = "Unable to delete file:\n"
msg += fileToDelete['LFN']
raise StageOutFailure(msg, **fileToDelete)

def deleteLFN(self, lfn, stageOut_rfc=None):
"""
deleteLFN
Expand All @@ -217,14 +212,14 @@ def deleteLFN(self, lfn, stageOut_rfc=None):
lfn-prefix - the LFN prefix to generate the PFN
phedex-node - the Name of the PNN to which the file is being xferred
"""
if not self.overrideConf:
if not self.overrideConf:
if stageOut_rfc is None:
msg = "Can not delete lfn because of missing stage out information (stageOut_rfc is None): \n %s" % lfn
raise StageOutFailure(msg, LFN=lfn)
#FIXME there is circular import that is why this module is imported here
from WMCore.Storage.StageOutMgr import searchRFC
# FIXME: there is a circular import, which is why this module is imported here
from WMCore.Storage.StageOutMgr import searchRFC
command = stageOut_rfc[0]['command']
pfn = searchRFC(stageOut_rfc[1],lfn)
pfn = searchRFC(stageOut_rfc[1], lfn)
if pfn is None:
msg = "Unable to match lfn to pfn: \n %s" % lfn
raise StageOutFailure(msg, LFN=lfn, STAGEOUT=stageOutStr(stageOut_rfc[0]))
Expand Down
91 changes: 46 additions & 45 deletions src/python/WMCore/Storage/RucioFileCatalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,12 @@
a RucioFileCatalog instance that can be used to match LFNs to PFNs.
"""

from builtins import next, str, range

import json
import os
import re

import json
from builtins import str, range

from urllib.parse import urlsplit
from xml.dom.minidom import Document

from WMCore.Algorithms.ParseXMLFile import xmlFileToNode

class RucioFileCatalog(dict):
"""
Expand Down Expand Up @@ -128,6 +123,7 @@ def __str__(self):
result += "\n"
return result


def storageJsonPath(currentSite, currentSubsite, storageSite):
"""
Return a path to storage.json from site names
Expand All @@ -136,30 +132,32 @@ def storageJsonPath(currentSite, currentSubsite, storageSite):
:param storageSite: str, name of storage site for a stage-out
:return: str, a path to storage.json (/pathToStorageJson/storage.json)
"""
#get site config
siteConfigPath = os.getenv('SITECONFIG_PATH',None)
# get site config
siteConfigPath = os.getenv('SITECONFIG_PATH', None)
if not siteConfigPath:
raise RuntimeError('SITECONFIG_PATH is not defined')
subPath = ''
#the storage site is where jobs are executed so use local path given in SITECONFIG_PATH to locate storage.json
# the storage site is where jobs are executed so use local path given in SITECONFIG_PATH to locate storage.json
if currentSite == storageSite:
#it is a site (no defined subSite), storage.json is located at the path given in SITECONFIG_PATH
# it is a site (no defined subSite), storage.json is located at the path given in SITECONFIG_PATH
if currentSubsite is None:
subPath = siteConfigPath
#it is a subsite, move one level up
# it is a subsite, move one level up
else:
subPath = siteConfigPath + '/..'
#cross site
# cross site
else:
#it is a site (no defined subSite), move one level up
# it is a site (no defined subSite), move one level up
if currentSubsite is None:
subPath = siteConfigPath + '/../' + storageSite
#it is a subsite, move two levels up
# it is a subsite, move two levels up
else:
subPath = siteConfigPath + '/../../' + storageSite
pathToStorageDescription = subPath + '/storage.json'
pathToStorageDescription = os.path.normpath(os.path.realpath(pathToStorageDescription))#resolve symbolic link and relative path?
return pathToStorageDescription
pathToStorageDescription = os.path.normpath(
os.path.realpath(pathToStorageDescription)) # resolve symbolic link and relative path?
return pathToStorageDescription


def readRFC(filename, storageSite, volume, protocol):
"""
Expand All @@ -173,64 +171,67 @@ def readRFC(filename, storageSite, volume, protocol):

rfcInstance = RucioFileCatalog()
try:
with open(filename,encoding="utf-8") as jsonFile:
with open(filename, encoding="utf-8") as jsonFile:
jsElements = json.load(jsonFile)
except Exception as ex:
msg = "Error reading storage description file: %s\n" % filename
msg += str(ex)
raise RuntimeError(msg)
#now loop over elements, select the one matched with inputs (storageSite, volume, protocol) and fill lfn-to-pfn
# now loop over elements, select the one matched with inputs (storageSite, volume, protocol) and fill lfn-to-pfn
for jsElement in jsElements:
#check to see if the storageSite and volume matchs with "site" and "volume" in storage.json
if jsElement['site'] == storageSite and jsElement['volume'] == volume:
# check to see if the storageSite and volume match "site" and "volume" in storage.json
if jsElement['site'] == storageSite and jsElement['volume'] == volume:
rfcInstance.preferredProtocol = protocol
#now loop over protocols to add all mappings (needed for chained rule cases)
# now loop over protocols to add all mappings (needed for chained rule cases)
for prot in jsElement['protocols']:
#check if prefix is in protocol block
# check if prefix is in protocol block
if 'prefix' in prot.keys():
#lfn-to-pfn
match = '/(.*)' #match all
result = prot['prefix']+'/$1'
#prefix case should not be chained
chain = None
# lfn-to-pfn
match = '/(.*)' # match all
result = prot['prefix'] + '/$1'
# prefix case should not be chained
chain = None
rfcInstance.addMapping(str(prot['protocol']), str(match), str(result), chain, 'lfn-to-pfn')
#pfn-to-lfn
match = prot['prefix']+'/(.*)'
# pfn-to-lfn
match = prot['prefix'] + '/(.*)'
result = '/$1'
rfcInstance.addMapping(str(prot['protocol']), str(match), str(result), chain, 'pfn-to-lfn')
#here is rules
# here is rules
else:
#loop over rules
# loop over rules
for rule in prot['rules']:
match = rule['lfn']
result = rule['pfn']
chain = rule.get('chain')
rfcInstance.addMapping(str(prot['protocol']), str(match), str(result), chain, 'lfn-to-pfn')
#pfn-to-lfn
match = rule['pfn'].replace('$1','(.*)')
#Update this if pfn-to-lfn is used extensively somewhere. We want lfn starts with '/abc' so remove all characters of regular expressions!!!
result = rule['lfn'].replace('/+','/').replace('^/','/')
#now replace anything inside () with $1, for example (.*) --> $1, (store/.*) --> $1
result = re.sub('\(.*\)','$1',result)
# pfn-to-lfn
match = rule['pfn'].replace('$1', '(.*)')
# Update this if pfn-to-lfn is used extensively somewhere. We want the lfn to start with '/abc', so remove all regular-expression characters!
result = rule['lfn'].replace('/+', '/').replace('^/', '/')
# now replace anything inside () with $1, for example (.*) --> $1, (store/.*) --> $1
result = re.sub('\(.*\)', '$1', result)
rfcInstance.addMapping(str(prot['protocol']), str(match), str(result), chain, 'pfn-to-lfn')

return rfcInstance

def rseName(currentSite,currentSubsite,storageSite,volume):

def rseName(currentSite, currentSubsite, storageSite, volume):
"""
Return Rucio storage element name, for example https://gitlab.cern.ch/SITECONF/T1_DE_KIT/-/blob/master/storage.json?ref_type=heads#L39
Return Rucio storage element name, for example:
https://gitlab.cern.ch/SITECONF/T1_DE_KIT/-/blob/master/storage.json?ref_type=heads#L39
:currentSite is the site where jobs are executing
:currentSubsite is the sub site if jobs are running here
:storageSite is the site for storage
:volume is the volume name, for example https://gitlab.cern.ch/SITECONF/T1_DE_KIT/-/blob/master/storage.json?ref_type=heads#L3
:volume is the volume name, for example:
https://gitlab.cern.ch/SITECONF/T1_DE_KIT/-/blob/master/storage.json?ref_type=heads#L3
"""
rse = None
storageJsonName = storageJsonPath(currentSite,currentSubsite,storageSite)
storageJsonName = storageJsonPath(currentSite, currentSubsite, storageSite)
try:
with open(storageJsonName,encoding="utf-8") as jsonFile:
with open(storageJsonName, encoding="utf-8") as jsonFile:
jsElements = json.load(jsonFile)
except Exception as ex:
msg = "RucioFileCatalog.py:rseName() Error reading storage.json: %s\n" % storageJsonName
msg = "RucioFileCatalog.py:rseName() Error reading storage.json: %s\n" % storageJsonName
msg += str(ex)
raise RuntimeError(msg)
for jsElement in jsElements:
Expand Down
Loading

0 comments on commit 35e53b6

Please sign in to comment.