diff --git a/src/python/WMCore/WMSpec/StdSpecs/StdBase.py b/src/python/WMCore/WMSpec/StdSpecs/StdBase.py index 4cd29ec5a0..d4d88687fe 100644 --- a/src/python/WMCore/WMSpec/StdSpecs/StdBase.py +++ b/src/python/WMCore/WMSpec/StdSpecs/StdBase.py @@ -841,17 +841,24 @@ def addDQMHarvestTask(self, parentTask, parentOutputModuleName, uploadProxy=None return - def setupPileup(self, task, pileupConfig): + def setupPileup(self, task, pileupConfig, stepName=None): """ _setupPileup_ - Setup pileup for every CMSSW step in the task. + Setup pileup for every CMSSW step in the task, unless a stepName + is given - StepChain case - then only setup pileup for that specific + step (cmsRun1, cmsRun2, etc). pileupConfig has the following data structure: {'mc': ['/mc_pd/procds/tier'], 'data': ['/data_pd/procds/tier']} """ for puType, puList in pileupConfig.items(): task.setInputPileupDatasets(puList) + if stepName: + stepHelper = task.getStepHelper(stepName) + stepHelper.setupPileup(pileupConfig, self.dbsUrl) + return + for stepName in task.listAllStepNames(): step = task.getStep(stepName) if step.stepType() != "CMSSW": diff --git a/src/python/WMCore/WMSpec/StdSpecs/StepChain.py b/src/python/WMCore/WMSpec/StdSpecs/StepChain.py index 02709c5581..9438c9114a 100644 --- a/src/python/WMCore/WMSpec/StdSpecs/StepChain.py +++ b/src/python/WMCore/WMSpec/StdSpecs/StepChain.py @@ -210,7 +210,7 @@ def setupGeneratorTask(self, task, taskConf): """ _setupGeneratorTask_ - Set up an initial generator task. + Set up an initial generator task for the top level step (cmsRun1) """ configCacheID = taskConf['ConfigCacheID'] splitAlgorithm = taskConf["SplittingAlgo"] @@ -226,7 +226,7 @@ def setupGeneratorTask(self, task, taskConf): taskConf=taskConf) if taskConf["PileupConfig"]: - self.setupPileup(task, taskConf['PileupConfig']) + self.setupPileup(task, taskConf['PileupConfig'], stepName="cmsRun1") # outputModules were added already, we just want to create merge tasks here if strToBool(taskConf.get('KeepOutput', True)): @@ -239,7 +239,8 @@ def setupTask(self, task, taskConf): _setupTask_ Build the task using the setupProcessingTask from StdBase - and set the parents appropriately to handle a processing task + and set the parents appropriately to handle a processing task, + It's only called for the top level task and top level step (cmsRun1) """ configCacheID = taskConf["ConfigCacheID"] splitAlgorithm = taskConf["SplittingAlgo"] @@ -263,7 +264,7 @@ def setupTask(self, task, taskConf): task.setLumiMask(lumiMask) if taskConf["PileupConfig"]: - self.setupPileup(task, taskConf['PileupConfig']) + self.setupPileup(task, taskConf['PileupConfig'], stepName="cmsRun1") # outputModules were added already, we just want to create merge tasks here if strToBool(taskConf.get('KeepOutput', True)): @@ -322,7 +323,7 @@ def setupNextSteps(self, task, origArgs): # Pileup check taskConf["PileupConfig"] = parsePileupConfig(taskConf["MCPileup"], taskConf["DataPileup"]) if taskConf["PileupConfig"]: - self.setupPileup(task, taskConf['PileupConfig']) + self.setupPileup(task, taskConf['PileupConfig'], stepName=currentCmsRun) # Handling the output modules in order to decide whether we should # stage them out and report them in the Report.pkl file diff --git a/src/python/WMCore/WMSpec/WMTask.py b/src/python/WMCore/WMSpec/WMTask.py index 3c084dfb8a..4b3065667b 100644 --- a/src/python/WMCore/WMSpec/WMTask.py +++ b/src/python/WMCore/WMSpec/WMTask.py @@ -853,6 +853,8 @@ def setInputPileupDatasets(self, dsetName): self.data.input.pileup.datasets.append(dsetName) else: raise ValueError("Pileup dataset must be either a list or basestring") + # make the list unique + self.data.input.pileup.datasets = list(set(self.data.input.pileup.datasets)) def getInputPileupDatasets(self): """ diff --git a/test/python/WMCore_t/WMSpec_t/StdSpecs_t/StepChain_t.py b/test/python/WMCore_t/WMSpec_t/StdSpecs_t/StepChain_t.py index 1cbbafb2bf..2990884738 100644 --- a/test/python/WMCore_t/WMSpec_t/StdSpecs_t/StepChain_t.py +++ b/test/python/WMCore_t/WMSpec_t/StdSpecs_t/StepChain_t.py @@ -516,6 +516,55 @@ def testPileupWithoutInputData(self): pileups = [item for puSet in pileups for item in puSet] self.assertItemsEqual(pileups, [testArguments['Step1']['MCPileup']]) + # task level check + task = testWorkload.getTask(taskName=testArguments['Step1']['StepName']) + self.assertItemsEqual([testArguments['Step1']['MCPileup']], task.getInputPileupDatasets()) + + # step level check + stepHelper = task.getStepHelper('cmsRun1') + puConfig = stepHelper.getPileup() + self.assertItemsEqual([testArguments['Step1']["MCPileup"]], puConfig.mc.dataset) + + def testMultiplePileupDsets(self): + """ + Test a StepChain which uses different Pileup datasets for different + steps in the chain. + """ + testArguments = StepChainWorkloadFactory.getTestArguments() + testArguments.update(deepcopy(REQUEST)) + + configDocs = injectStepChainConfigMC(self.configDatabase) + for s in ['Step1', 'Step2', 'Step3']: + testArguments[s]['ConfigCacheID'] = configDocs[s] + testArguments['Step2']['KeepOutput'] = False + testArguments['Step3']['MCPileup'] = "/Cosmics/ComissioningHI-PromptReco-v1/RECO" + + factory = StepChainWorkloadFactory() + testWorkload = factory.factoryWorkloadConstruction("TestWorkload", testArguments) + + # workload level check + pileups = testWorkload.listPileupDatasets().values() + pileups = [item for puSet in pileups for item in puSet] + self.assertIn(testArguments['Step2']['MCPileup'], pileups) + self.assertIn(testArguments['Step3']['MCPileup'], pileups) + + # task level check + task = testWorkload.getTask(taskName=testArguments['Step1']['StepName']) + pileups = [testArguments['Step2']['MCPileup'], testArguments['Step3']['MCPileup']] + self.assertItemsEqual(pileups, task.getInputPileupDatasets()) + + # step level check + for idx in range(1, testArguments['StepChain'] + 1): + stepName = "cmsRun%s" % idx + stepNum = "Step%s" % idx + stepHelper = task.getStepHelper(stepName) + puConfig = stepHelper.getPileup() + if puConfig: + puConfig = puConfig.mc.dataset + else: + puConfig = [puConfig] # let's call it a [None] then, for the sake of testing + self.assertItemsEqual([testArguments[stepNum].get("MCPileup")], puConfig) + def testStepChainIncludeParentsValidation(self): """ Check that the test arguments pass basic validation,