3
3
of pileup files in the job sandbox for the dataset.
4
4
5
5
"""
6
- from __future__ import print_function
7
-
8
- from future .utils import viewitems
9
-
10
6
import datetime
11
7
import os
12
8
import hashlib
15
11
import logging
16
12
from json import JSONEncoder
17
13
import WMCore .WMSpec .WMStep as WMStep
14
+ from Utils .Patterns import getDomainName
18
15
from Utils .Utilities import encodeUnicodeToBytes
19
16
from WMCore .Services .DBS .DBSReader import DBSReader
17
+ from WMCore .Services .MSPileup .MSPileupUtils import getPileupDocs
20
18
from WMCore .Services .Rucio .Rucio import Rucio
21
19
from WMCore .WMSpec .Steps .Fetchers .FetcherInterface import FetcherInterface
22
20
@@ -34,7 +32,6 @@ def __init__(self):
34
32
Prepare module setup
35
33
"""
36
34
super (PileupFetcher , self ).__init__ ()
37
- # FIXME: find a way to pass the Rucio account name to this fetcher module
38
35
self .rucioAcct = "wmcore_pileup"
39
36
self .rucio = None
40
37
@@ -52,41 +49,81 @@ def _queryDbsAndGetPileupConfig(self, stepHelper, dbsReader):
52
49
"BlockB": {"FileList": [], "PhEDExNodeName": []}, ....}
53
50
"""
54
51
resultDict = {}
52
+ # first, figure out which instance of MSPileup and Rucio to use
53
+ pileupInstance = getDomainName (dbsReader .dbsURL )
54
+ msPileupUrl = f"https://{ pileupInstance } .cern.ch/ms-pileup/data/pileup"
55
+ # FIXME: this juggling with Rucio is tough! We can get away without it,
56
+ # but for that we would have to use testbed MSPileup against Prod Rucio
57
+ if pileupInstance == "cmsweb-prod" or pileupInstance == "cmsweb" :
58
+ rucioAuthUrl , rucioUrl = "cms-rucio-auth" , "cms-rucio"
59
+ else :
60
+ rucioAuthUrl , rucioUrl = "cms-rucio-auth-int" , "cms-rucio-int"
61
+ # initialize Rucio here to avoid this authentication on T0-WMAgent
62
+ self .rucio = Rucio (self .rucioAcct ,
63
+ authUrl = f"https://{ rucioAuthUrl } .cern.ch" ,
64
+ hostUrl = f"http://{ rucioUrl } .cern.ch" )
65
+
55
66
# iterate over input pileup types (e.g. "cosmics", "minbias")
56
67
for pileupType in stepHelper .data .pileup .listSections_ ():
57
68
# the format here is: step.data.pileup.cosmics.dataset = [/some/data/set]
58
69
datasets = getattr (getattr (stepHelper .data .pileup , pileupType ), "dataset" )
59
70
# each dataset input can generally be a list, iterate over dataset names
60
71
blockDict = {}
61
72
for dataset in datasets :
62
-
73
+ # using the original dataset, resolve blocks, files and number of events with DBS
74
+ fCounter = 0
63
75
for fileInfo in dbsReader .getFileListByDataset (dataset = dataset , detail = True ):
64
76
blockDict .setdefault (fileInfo ['block_name' ], {'FileList' : [],
65
77
'NumberOfEvents' : 0 ,
66
78
'PhEDExNodeNames' : []})
67
79
blockDict [fileInfo ['block_name' ]]['FileList' ].append (fileInfo ['logical_file_name' ])
68
80
blockDict [fileInfo ['block_name' ]]['NumberOfEvents' ] += fileInfo ['event_count' ]
81
+ fCounter += 1
69
82
70
- self ._getDatasetLocation (dataset , blockDict )
83
+ logging .info (f"Found { len (blockDict )} blocks in DBS for dataset { dataset } with { fCounter } files" )
84
+ self ._getDatasetLocation (dataset , blockDict , msPileupUrl )
71
85
72
86
resultDict [pileupType ] = blockDict
73
87
return resultDict
74
88
75
- def _getDatasetLocation (self , dset , blockDict ):
89
+ def _getDatasetLocation (self , dset , blockDict , msPileupUrl ):
76
90
"""
77
91
Given a dataset name, query PhEDEx or Rucio and resolve the block location
78
92
:param dset: string with the dataset name
79
93
:param blockDict: dictionary with DBS summary info
94
+ :param msPileupUrl: string with the MSPileup url
80
95
:return: update blockDict in place
81
96
"""
82
- # initialize Rucio here to avoid this authentication on T0-WMAgent
83
- self .rucio = Rucio (self .rucioAcct )
84
- blockReplicas = self .rucio .getPileupLockedAndAvailable (dset , account = self .rucioAcct )
85
- for blockName , blockLocation in viewitems (blockReplicas ):
86
- try :
87
- blockDict [blockName ]['PhEDExNodeNames' ] = list (blockLocation )
88
- except KeyError :
89
- logging .warning ("Block '%s' present in Rucio but not in DBS" , blockName )
97
+ # fetch the pileup configuration from MSPileup
98
+ try :
99
+ queryDict = {'query' : {'pileupName' : dset },
100
+ 'filters' : ['pileupName' , 'customName' , 'containerFraction' , 'currentRSEs' ]}
101
+ doc = getPileupDocs (msPileupUrl , queryDict , method = 'POST' )[0 ]
102
+ msg = f'Pileup dataset { doc ["pileupName" ]} with:\n \t custom name: { doc ["customName" ]} ,'
103
+ msg += f'\n \t current RSEs: { doc ["currentRSEs" ]} \n \t and container fraction: { doc ["containerFraction" ]} '
104
+ logging .info (msg )
105
+ except Exception as ex :
106
+ logging .error (f'Error querying MSPileup for dataset { dset } . Details: { str (ex )} ' )
107
+ raise ex
108
+
109
+ # custom dataset name means there was a container fraction change, use different scope
110
+ puScope = 'cms'
111
+ if doc ["customName" ]:
112
+ dset = doc ["customName" ]
113
+ puScope = 'group.wmcore'
114
+
115
+ blockReplicas = self .rucio .getBlocksInContainer (container = dset , scope = puScope )
116
+ logging .info (f"Found { len (blockReplicas )} blocks in container { dset } for scope { puScope } " )
117
+
118
+ # Finally, update blocks present in Rucio with the MSPileup currentRSEs.
119
+ # Blocks not present in Rucio - hence only in DBS - are meant to be removed.
120
+ for blockName in list (blockDict ):
121
+ if blockName not in blockReplicas :
122
+ logging .warning (f"Block { blockName } present in DBS but not in Rucio. Removing it." )
123
+ blockDict .pop (blockName )
124
+ else :
125
+ blockDict [blockName ]['PhEDExNodeNames' ] = doc ["currentRSEs" ]
126
+ logging .info (f"Final pileup dataset { dset } has a total of { len (blockDict )} blocks." )
90
127
91
128
def _getCacheFilePath (self , stepHelper ):
92
129
@@ -171,7 +208,8 @@ def createPileupConfigFile(self, helper):
171
208
"""
172
209
Stores pileup JSON configuration file in the working
173
210
directory / sandbox.
174
-
211
+ :param helper: WMStepHelper instance
212
+ :return: None
175
213
"""
176
214
if self ._isCacheValid (helper ):
177
215
# if file already exist don't make a new dbs call and overwrite the file.
0 commit comments