diff --git a/bin/00_pypi_deploy_prod.sh b/bin/00_pypi_deploy_prod.sh index adb5d468f..a4d67441e 100755 --- a/bin/00_pypi_deploy_prod.sh +++ b/bin/00_pypi_deploy_prod.sh @@ -30,8 +30,8 @@ then exit fi -WMAGENT_TAG=2.3.4 -TIER0_VERSION=3.2.3 +WMAGENT_TAG=2.3.5 +TIER0_VERSION=3.2.5 COUCH_TAG=3.2.2 BASE_DIR=/data/tier0 @@ -159,12 +159,11 @@ sed -i "s+config.RucioInjector.metaDIDProject.*+config.RucioInjector.metaDIDProj echo "config.RucioInjector.blockDeletionDelayHours = 168" >> $config/config.py ##### ##### NOT IN REPLAY ##### -echo "config.RucioInjector.blockDeletionDelayHours = 168" >> $config/config.py echo 'config.BossAir.pluginNames = ["SimpleCondorPlugin"]' >> $config/config.py echo 'config.JobAccountant.maxAllowedRepackOutputSize = 24 * 1024 * 1024 * 1024' >> $config/config.py echo "config.AgentStatusWatcher.runningExpressPercent = 25" >> $config/config.py echo "config.AgentStatusWatcher.runningRepackPercent = 10" >> $config/config.py -echo 'config.TaskArchiver.archiveDelayHours = 2190' >> $config.py +echo 'config.TaskArchiver.archiveDelayHours = 720' >> $config.py ##### NOT IN REPLAY ##### ##### echo "config.DBS3Upload.datasetType = 'VALID'" >> $config/config.py diff --git a/bin/00_pypi_deploy_replay.sh b/bin/00_pypi_deploy_replay.sh index e5e83d7aa..d0d00254e 100644 --- a/bin/00_pypi_deploy_replay.sh +++ b/bin/00_pypi_deploy_replay.sh @@ -15,8 +15,8 @@ then exit fi -WMAGENT_TAG=2.3.4rc11 -TIER0_VERSION=3.2.4rc1 +WMAGENT_TAG=2.3.5 +TIER0_VERSION=3.2.5 COUCH_TAG=3.2.2 BASE_DIR=/data/tier0 @@ -150,7 +150,7 @@ echo "config.RucioInjector.blockDeletionDelayHours = 2" >> $config/config.py #echo 'config.JobAccountant.maxAllowedRepackOutputSize = 24 * 1024 * 1024 * 1024' >> ./config/tier0/config.py #echo "config.AgentStatusWatcher.runningExpressPercent = 25" >> ./config/tier0/config.py #echo "config.AgentStatusWatcher.runningRepackPercent = 10" >> ./config/tier0/config.py -#echo 'config.TaskArchiver.archiveDelayHours = 2190' >> $config.py +#echo 'config.TaskArchiver.archiveDelayHours = 720' >> $config.py ##### ONLY IN PROD ##### ##### echo "config.DBS3Upload.datasetType = 'VALID'" >> $config/config.py diff --git a/bin/00_pypi_patches.sh b/bin/00_pypi_patches.sh index 8a8dc8294..5f9ae5503 100644 --- a/bin/00_pypi_patches.sh +++ b/bin/00_pypi_patches.sh @@ -11,7 +11,8 @@ DEPLOY_DIR=$BASE_DIR/WMAgent.venv3 ################## #Dummy demonstrative Patch for new alma 9 agent -#curl https://patch-diff.githubusercontent.com/raw/dmwm/T0/pull/4961.patch | patch -d $WMA_DEPLOY_DIR/lib/python3.9/site-packages/ -p 3 +#curl https://patch-diff.githubusercontent.com/raw/dmwm/T0/pull/4961.patch | patch -f -d $WMA_DEPLOY_DIR/lib/python3.9/site-packages/ -p 3 +curl https://patch-diff.githubusercontent.com/raw/germanfgv/WMCore/pull/16.patch | patch -f -d $WMA_DEPLOY_DIR/lib/python3.9/site-packages/ -p 3 ###################### diff --git a/bin/00_pypi_tweak_prod_config.sh b/bin/00_pypi_tweak_prod_config.sh index 045b7c830..403655f1a 100644 --- a/bin/00_pypi_tweak_prod_config.sh +++ b/bin/00_pypi_tweak_prod_config.sh @@ -8,7 +8,6 @@ sed -i "s+'team1,team2,cmsdataops'+'tier0production'+g" "$config/config.py" sed -i "s+config.RucioInjector.containerDiskRuleParams.*+config.RucioInjector.containerDiskRuleParams = {}+" "$config/config.py" echo "config.RucioInjector.blockRuleParams = {}" >> $config/config.py sed -i "s+config.RucioInjector.metaDIDProject.*+config.RucioInjector.metaDIDProject = 'Tier0'+" "$config/config.py" -echo "config.RucioInjector.blockDeletionDelayHours = 168" >> $config/config.py ##### NOT IN REPLAY ##### echo "config.RucioInjector.blockDeletionDelayHours = 168" >> $config/config.py echo 'config.BossAir.pluginNames = ["SimpleCondorPlugin"]' >> $config/config.py @@ -37,4 +36,4 @@ if [ "x$DROPBOX_USER" == "x" ] || [ "x$DROPBOX_PASS" == "x" ]; then fi echo 'config.Tier0Feeder.dropboxuser = "'$DROPBOX_USER'"' >> $config/config.py -echo 'config.Tier0Feeder.dropboxpass = "'$DROPBOX_PASS'"' >> $config/config.py \ No newline at end of file +echo 'config.Tier0Feeder.dropboxpass = "'$DROPBOX_PASS'"' >> $config/config.py diff --git a/etc/HIProdOfflineConfiguration.py b/etc/HIProdOfflineConfiguration.py index 13093bce3..a280bcaac 100644 --- a/etc/HIProdOfflineConfiguration.py +++ b/etc/HIProdOfflineConfiguration.py @@ -25,6 +25,7 @@ from T0.RunConfig.Tier0Config import setStreamerPNN from T0.RunConfig.Tier0Config import addSiteConfig from T0.RunConfig.Tier0Config import setStorageSite +from T0.RunConfig.Tier0Config import setHelperAgentStreams # Create the Tier0 configuration object tier0Config = createTier0Config() @@ -38,6 +39,11 @@ # Set the max run number: setInjectMaxRun(tier0Config, 9999999) +# Define streams to ignore. These wont be injected by the MainAgent +setHelperAgentStreams(tier0Config, {'SecondAgent': [], + 'ThirdAgent' : [] + }) + # Settings up sites processingSite = "T2_CH_CERN" storageSite = "T0_CH_CERN_Disk" diff --git a/etc/HIReplayOfflineConfiguration.py b/etc/HIReplayOfflineConfiguration.py index db2649226..e23db8bc3 100644 --- a/etc/HIReplayOfflineConfiguration.py +++ b/etc/HIReplayOfflineConfiguration.py @@ -28,6 +28,7 @@ from T0.RunConfig.Tier0Config import addSiteConfig from T0.RunConfig.Tier0Config import setStorageSite from T0.RunConfig.Tier0Config import setExtraStreamDatasetMap +from T0.RunConfig.Tier0Config import setHelperAgentStreams # Create the Tier0 configuration object tier0Config = createTier0Config() @@ -44,6 +45,10 @@ # 361694:361699,361779 - 2022 HI dry-run test runs setInjectRuns(tier0Config, [374951]) +# Define streams to ignore. These wont be injected by the MainAgent +setHelperAgentStreams(tier0Config, {'SecondAgent': [], + 'ThirdAgent' : [] + }) # Settings up sites processingSite = "T2_CH_CERN" storageSite = "T0_CH_CERN_Disk" diff --git a/etc/ProdOfflineConfiguration.py b/etc/ProdOfflineConfiguration.py index 9d59d763f..279b2ea5e 100644 --- a/etc/ProdOfflineConfiguration.py +++ b/etc/ProdOfflineConfiguration.py @@ -28,7 +28,7 @@ from T0.RunConfig.Tier0Config import addSiteConfig from T0.RunConfig.Tier0Config import setStorageSite from T0.RunConfig.Tier0Config import setExtraStreamDatasetMap - +from T0.RunConfig.Tier0Config import setHelperAgentStreams # Create the Tier0 configuration object tier0Config = createTier0Config() @@ -42,6 +42,11 @@ # Set the max run number: setInjectMaxRun(tier0Config, 9999999) +# Set streams to ignore by agent. These will not be injected by the MainAgent +setHelperAgentStreams(tier0Config, {"SecondAgent" : [], + "ThirdAgent" : []}) + + # Settings up sites processingSite = "T2_CH_CERN" storageSite = "T0_CH_CERN_Disk" @@ -60,7 +65,7 @@ # Data type # Processing site (where jobs run) # PhEDEx locations -setAcquisitionEra(tier0Config, "Run2024G") +setAcquisitionEra(tier0Config, "Run2024H") setEmulationAcquisitionEra(tier0Config, "Emulation2024") setBaseRequestPriority(tier0Config, 251000) setBackfill(tier0Config, None) @@ -101,7 +106,7 @@ # maxRunPreviousConfig = 999999 # Last run before era change 08/09/23 # Defaults for CMSSW version defaultCMSSWVersion = { - 'default': "CMSSW_14_0_14", + 'default': "CMSSW_14_0_16", #'acqEra': {'Run2024F': "CMSSW_14_0_11"}, #'maxRun': {maxRunPreviousConfig: "CMSSW_13_2_2"} } @@ -1109,13 +1114,13 @@ dqm_sequences=["@common", "@ecal", "@jetmet", "@L1TMon", "@hcal", "@L1TEgamma"], alca_producers=["TkAlMinBias","LumiPixelsMinBias"], physics_skims=["LogError", "LogErrorMonitor"], - disk_node="T2_CH_CERN", + disk_node="T2_CH_CERN", scenario=ppScenario) DATASETS = ["EphemeralHLTPhysics0","EphemeralHLTPhysics1", "EphemeralHLTPhysics2", "EphemeralHLTPhysics3", - "EphemeralHLTPhysics4", "EphemeralHLTPhysics5", "EphemeralHLTPhysics6","EphemeralHLTPhysics7" - "EphemeralHLTPhysics8", "EphemeralHLTPhysics9","EphemeralHLTPhysics10","EphemeralHLTPhysics11" - "EphemeralHLTPhysics12","EphemeralHLTPhysics13","EphemeralHLTPhysics14","EphemeralHLTPhysics15" + "EphemeralHLTPhysics4", "EphemeralHLTPhysics5", "EphemeralHLTPhysics6","EphemeralHLTPhysics7", + "EphemeralHLTPhysics8", "EphemeralHLTPhysics9","EphemeralHLTPhysics10","EphemeralHLTPhysics11", + "EphemeralHLTPhysics12","EphemeralHLTPhysics13","EphemeralHLTPhysics14","EphemeralHLTPhysics15", "EphemeralHLTPhysics16","EphemeralHLTPhysics17","EphemeralHLTPhysics18","EphemeralHLTPhysics19"] for dataset in DATASETS: @@ -1130,6 +1135,29 @@ disk_node="T2_CH_CERN", scenario=ppScenario) +## DAQ TRANSFER TEST PDs (fall 2024) +DATASETS_DAQ_TFTEST = ["TestHLTPhysics0","TestHLTPhysics1", "TestHLTPhysics2", "TestHLTPhysics3", + "TestHLTPhysics4", "TestHLTPhysics5", "TestHLTPhysics6","TestHLTPhysics7", + "TestHLTPhysics8", "TestHLTPhysics9","TestHLTPhysics10","TestHLTPhysics11", + "TestHLTPhysics12", "TestHLTPhysics13","TestHLTPhysics14","TestHLTPhysics15", + "TestHLTPhysics16", "TestHLTPhysics17","TestHLTPhysics18","TestHLTPhysics19", + "TestHLTPhysics20", "TestHLTPhysics21","TestHLTPhysics22","TestHLTPhysics23", + "TestHLTPhysics24", "TestHLTPhysics25","TestHLTPhysics26","TestHLTPhysics27", + "TestHLTPhysics28", "TestHLTPhysics29","TestHLTPhysics30","TestHLTPhysics31", + "TestHLTPhysics32", "TestHLTPhysics33","TestHLTPhysics34","TestHLTPhysics35", + "TestHLTPhysics36", "TestHLTPhysics37","TestHLTPhysics38","TestHLTPhysics39"] + +for dataset in DATASETS_DAQ_TFTEST: + addDataset(tier0Config, dataset, + do_reco=False, + raw_to_disk=False, + dqm_sequences=["@none"], + archival_node=None, + tape_node="T0_CH_CERN_MSS", + disk_node="T2_CH_CERN", + scenario=ppScenario) + + ######################################################## ### SpecialRandom PDs ### ######################################################## @@ -1434,6 +1462,8 @@ ignoreStream(tier0Config, "streamDQMRates") ignoreStream(tier0Config, "DQMPPSRandom") +# Set streams to ignore by agent. These will not be injected +setHelperAgentStreams(tier0Config, {"SecondAgent" : DATASETS_DAQ_TFTEST}) ################################### ### currently inactive settings ### diff --git a/etc/ReplayOfflineConfiguration.py b/etc/ReplayOfflineConfiguration.py index edbe435ec..3acee377f 100644 --- a/etc/ReplayOfflineConfiguration.py +++ b/etc/ReplayOfflineConfiguration.py @@ -28,6 +28,7 @@ from T0.RunConfig.Tier0Config import addSiteConfig from T0.RunConfig.Tier0Config import setStorageSite from T0.RunConfig.Tier0Config import setExtraStreamDatasetMap +from T0.RunConfig.Tier0Config import setHelperAgentStreams # Create the Tier0 configuration object tier0Config = createTier0Config() @@ -44,6 +45,11 @@ # Use this in order to limit the number of lumisections to process #setInjectLimit(tier0Config, 10) +# Define streams to ignore. These wont be injected by the MainAgent +setHelperAgentStreams(tier0Config, {'SecondAgent': [], + 'ThirdAgent' : [] + }) + # Settings up sites processingSite = "T2_CH_CERN" storageSite = "T0_CH_CERN_Disk" @@ -110,7 +116,7 @@ # Defaults for CMSSW version defaultCMSSWVersion = { - 'default': "CMSSW_14_0_15" + 'default': "CMSSW_14_0_16" } # Configure ScramArch @@ -1017,9 +1023,9 @@ scenario=ppScenario) DATASETS = ["EphemeralHLTPhysics0","EphemeralHLTPhysics1", "EphemeralHLTPhysics2", "EphemeralHLTPhysics3", - "EphemeralHLTPhysics4", "EphemeralHLTPhysics5", "EphemeralHLTPhysics6","EphemeralHLTPhysics7" - "EphemeralHLTPhysics8", "EphemeralHLTPhysics9","EphemeralHLTPhysics10","EphemeralHLTPhysics11" - "EphemeralHLTPhysics12","EphemeralHLTPhysics13","EphemeralHLTPhysics14","EphemeralHLTPhysics15" + "EphemeralHLTPhysics4", "EphemeralHLTPhysics5", "EphemeralHLTPhysics6","EphemeralHLTPhysics7", + "EphemeralHLTPhysics8", "EphemeralHLTPhysics9","EphemeralHLTPhysics10","EphemeralHLTPhysics11", + "EphemeralHLTPhysics12","EphemeralHLTPhysics13","EphemeralHLTPhysics14","EphemeralHLTPhysics15", "EphemeralHLTPhysics16","EphemeralHLTPhysics17","EphemeralHLTPhysics18","EphemeralHLTPhysics19"] for dataset in DATASETS: @@ -1030,6 +1036,26 @@ write_dqm=False, scenario=ppScenario) +## DAQ TRANSFER TEST PDs (fall 2024) +DATASETS_DAQ_TFTEST = ["TestHLTPhysics0","TestHLTPhysics1", "TestHLTPhysics2", "TestHLTPhysics3", + "TestHLTPhysics4", "TestHLTPhysics5", "TestHLTPhysics6","TestHLTPhysics7", + "TestHLTPhysics8", "TestHLTPhysics9","TestHLTPhysics10","TestHLTPhysics11", + "TestHLTPhysics12", "TestHLTPhysics13","TestHLTPhysics14","TestHLTPhysics15", + "TestHLTPhysics16", "TestHLTPhysics17","TestHLTPhysics18","TestHLTPhysics19", + "TestHLTPhysics20", "TestHLTPhysics21","TestHLTPhysics22","TestHLTPhysics23", + "TestHLTPhysics24", "TestHLTPhysics25","TestHLTPhysics26","TestHLTPhysics27", + "TestHLTPhysics28", "TestHLTPhysics29","TestHLTPhysics30","TestHLTPhysics31", + "TestHLTPhysics32", "TestHLTPhysics33","TestHLTPhysics34","TestHLTPhysics35", + "TestHLTPhysics36", "TestHLTPhysics37","TestHLTPhysics38","TestHLTPhysics39"] + +for dataset in DATASETS_DAQ_TFTEST: + addDataset(tier0Config, dataset, + do_reco=False, + raw_to_disk=False, + dqm_sequences=["@none"], + scenario=ppScenario) + + ######################################################## ### SpecialRandom PDs ### ######################################################## diff --git a/etc/Tier0Config.py b/etc/Tier0Config.py index af191017a..1427aa477 100644 --- a/etc/Tier0Config.py +++ b/etc/Tier0Config.py @@ -39,7 +39,7 @@ config.TaskArchiver.useReqMgrForCompletionCheck = False config.TaskArchiver.dashBoardUrl = "http://dashb-luminosity.cern.ch/dashboard/request.py/putluminositydata" config.TaskArchiver.logLevel = "DEBUG" -config.TaskArchiver.archiveDelayHours = 2190 +config.TaskArchiver.archiveDelayHours = 720 config.TaskArchiver.useWorkQueue = False config.DBS3Upload.primaryDatasetType = "data" @@ -58,7 +58,6 @@ config.AgentStatusWatcher.onlySSB = False config.RucioInjector.blockRuleParams = {} -config.RucioInjector.blockDeletionDelayHours = 168 config.RucioInjector.useDsetReplicaDeep = True config.BossAir.pluginNames = ["SimpleCondorPlugin"] diff --git a/src/python/T0/RunConfig/Tier0Config.py b/src/python/T0/RunConfig/Tier0Config.py index 0e9bafe45..2f47a1d39 100644 --- a/src/python/T0/RunConfig/Tier0Config.py +++ b/src/python/T0/RunConfig/Tier0Config.py @@ -286,6 +286,7 @@ def createTier0Config(): tier0Config.Global.InjectMinRun = None tier0Config.Global.InjectMaxRun = None + tier0Config.Global.HelperAgentStreams = {} tier0Config.Global.SpecifiedStreamNames = None tier0Config.Global.ScramArches = {} @@ -857,6 +858,14 @@ def setInjectLimit(config, injectLimit): config.Global.InjectLimit = injectLimit return +def setHelperAgentStreams(config, helperAgentStreams): + """ + _setSecondaryAgentStreams_ + + Define which streams should not be injected by the agent (for dual agents) + """ + config.Global.HelperAgentStreams = helperAgentStreams + return def setEnableUniqueWorkflowName(config): """ diff --git a/src/python/T0/StorageManager/StorageManagerAPI.py b/src/python/T0/StorageManager/StorageManagerAPI.py index d82c22b54..c7ea7c3cb 100644 --- a/src/python/T0/StorageManager/StorageManagerAPI.py +++ b/src/python/T0/StorageManager/StorageManagerAPI.py @@ -19,7 +19,8 @@ def injectNewData(dbInterfaceStorageManager, minRun = None, maxRun = None, injectRun = None, - injectLimit = None): + injectLimit = None, + agentType = None): """ _injectNewData_ @@ -72,10 +73,17 @@ def injectNewData(dbInterfaceStorageManager, transaction = False) # remove already processed files - newData[:] = [newFile for newFile in newData if newFile['p5_id'] not in knownStreamers] + # Filtering streams for Main and Secondary agents before injecting + # Note that every agent is Main by default + # Note that the secondaryAgentStreams works as a specifyStreams if the agent is secondary + newData[:] = [newFile for newFile in newData if newFile['p5_id'] not in knownStreamers] logging.debug("StoragemanagerAPI: found %d new files", len(newData)) + if agentType: + logging.info("Filtering streamer files for %s", agentType.name) + newData = agentType.filterStreamerFiles(streamerFiles = newData) + newRuns = set() newRunStreams = {} for newFile in newData: @@ -87,6 +95,7 @@ def injectNewData(dbInterfaceStorageManager, if run not in newRunStreams: newRunStreams[run] = set() + if stream not in newRunStreams[run]: newRunStreams[run].add(stream) diff --git a/src/python/T0/__init__.py b/src/python/T0/__init__.py index dca33f28f..444a1d30b 100644 --- a/src/python/T0/__init__.py +++ b/src/python/T0/__init__.py @@ -4,5 +4,5 @@ Core libraries for Workload Management Packages """ -__version__ = '3.2.4.pre2' +__version__ = '3.2.5' __all__ = [] diff --git a/src/python/T0Component/Tier0Feeder/MultipleAgents/BaseAgent.py b/src/python/T0Component/Tier0Feeder/MultipleAgents/BaseAgent.py new file mode 100644 index 000000000..460fe3bdf --- /dev/null +++ b/src/python/T0Component/Tier0Feeder/MultipleAgents/BaseAgent.py @@ -0,0 +1,27 @@ +class BaseAgent(object): + """ + Base agent. + + """ + + def __init__(self, tier0Config): + + object.__init__(self) + self.helperAgentStreams = tier0Config.Global.HelperAgentStreams + + def filterStreamerFiles (self, streamerFiles = []): + """ + _filterStreamerFiles_ + + When running multiple agents, filter out data from Storage Manager to avoid duplicates + """ + return + + def filterHltConfigStreams (self, hltStreamMapping = {}): + """ + _filterHltConfigStreams_ + + When running multiple agents, populate databases with the streams information relevant to the given agent + """ + return + \ No newline at end of file diff --git a/src/python/T0Component/Tier0Feeder/MultipleAgents/HelperAgent.py b/src/python/T0Component/Tier0Feeder/MultipleAgents/HelperAgent.py new file mode 100644 index 000000000..3466fdb58 --- /dev/null +++ b/src/python/T0Component/Tier0Feeder/MultipleAgents/HelperAgent.py @@ -0,0 +1,46 @@ +import logging +from T0Component.Tier0Feeder.MultipleAgents.BaseAgent import BaseAgent + +class HelperAgent(BaseAgent): + """ + Helper agent. The streams processed by this one are explicitely defined in the tier0Config. + helperRole has to match + + Use it with + config.Tier0Feeder.AgentRole = "SecondAgent" (or "ThirdAgent" ..., follow the configuration file) + Note: if the helperName is none or unknown, returned values will be empty and no data is injected + """ + def __init__(self, tier0Config, helperName = None): + + BaseAgent.__init__(self, tier0Config) + self.name = helperName + if self.name and self.name in self.helperAgentStreams: + self.streams = self.helperAgentStreams[self.name] + else: + self.streams = [] + + logging.info("This is a HelperAgent named {}. Processing streams {}".format(self.name, self.streams)) + + def filterStreamerFiles (self, streamerFiles = []): + """ + _filterStreamerFiles_ + + When running multiple agents, filter out data from Storage Manager to avoid duplicates + Returns streamers the agent wants to process + """ + + filteredStreamers = [streamer for streamer in streamerFiles if streamer['stream'] in self.streams] + + return filteredStreamers + + def filterHltConfigStreams (self, hltStreamMapping = {}): + """ + _filterHltConfigStreams_ + + When running multiple agents, populate databases with the streams information relevant to the given agent + Returns modified hltStreamMapping with streams that the agent wants + """ + + filteredHltStreamMapping = {stream: hltStreamMapping[stream] for stream in hltStreamMapping if stream in self.streams} + + return filteredHltStreamMapping \ No newline at end of file diff --git a/src/python/T0Component/Tier0Feeder/MultipleAgents/MainAgent.py b/src/python/T0Component/Tier0Feeder/MultipleAgents/MainAgent.py new file mode 100644 index 000000000..27b3e55af --- /dev/null +++ b/src/python/T0Component/Tier0Feeder/MultipleAgents/MainAgent.py @@ -0,0 +1,50 @@ +import logging +from T0Component.Tier0Feeder.MultipleAgents.BaseAgent import BaseAgent + +class MainAgent(BaseAgent): + """ + Main agent. The streams processed by this agent are implicitely defined in the tier0Config + * This agent will process streams that are not defined in any of the other agents + * tier0Config.Global.HelperAgentStreams = {'SecondAgent' : ["stream_1", "stream_2"], + 'ThirdAgent' : ["stream_3", "stream_4"]} + Note: This agent will ignore all streams in the helper agents, regardless of the helper agents status + """ + def __init__(self, tier0Config): + + BaseAgent.__init__(self, tier0Config) + self.name = "MainAgent" + self.findUnwantedStreams() + logging.info("This is a MainAgent. Ignoring streams {}".format(self.rejectStreams)) + + def filterStreamerFiles (self, streamerFiles): + """ + _filterStreamerFiles_ + + When running multiple agents, filter out data from Storage Manager to avoid duplicates + Returns the streamers the agent want to process + """ + filteredStreamers = [streamer for streamer in streamerFiles if streamer['stream'] not in self.rejectStreams] + + return filteredStreamers + + def filterHltConfigStreams (self, hltStreamMapping = {}): + """ + _filterHltConfigStreams_ + + When running multiple agents, populate databases with the streams information relevant to the given agent + """ + filteredHltStreamMapping = {stream: hltStreamMapping[stream] for stream in hltStreamMapping if stream not in self.rejectStreams} + + return filteredHltStreamMapping + + def findUnwantedStreams (self): + """ + _findUnwantedStreams_ + + Returns a list of streams that will not be processed by any other agent + """ + self.rejectStreams = [] + for agent in self.helperAgentStreams.keys(): + self.rejectStreams += self.helperAgentStreams[agent] + + return \ No newline at end of file diff --git a/src/python/T0Component/Tier0Feeder/MultipleAgents/__init__.py b/src/python/T0Component/Tier0Feeder/MultipleAgents/__init__.py new file mode 100644 index 000000000..1c6b1f439 --- /dev/null +++ b/src/python/T0Component/Tier0Feeder/MultipleAgents/__init__.py @@ -0,0 +1,3 @@ +#!/usr/bin/env python + +__all__ = [] \ No newline at end of file diff --git a/src/python/T0Component/Tier0Feeder/Tier0FeederPoller.py b/src/python/T0Component/Tier0Feeder/Tier0FeederPoller.py index 9d1282b43..870cfdb48 100644 --- a/src/python/T0Component/Tier0Feeder/Tier0FeederPoller.py +++ b/src/python/T0Component/Tier0Feeder/Tier0FeederPoller.py @@ -20,6 +20,7 @@ from Utils.Timers import timeFunction from WMCore.WorkerThreads.BaseWorkerThread import BaseWorkerThread from WMCore.DAOFactory import DAOFactory +from WMCore.WMFactory import WMFactory from WMCore.Database.DBFactory import DBFactory from WMCore.WMException import WMException from WMCore.Configuration import loadConfigurationFile @@ -31,6 +32,8 @@ from T0.StorageManager import StorageManagerAPI from T0.RunConfig.Tier0Config import setDeploymentId +from T0Component.Tier0Feeder.MultipleAgents.MainAgent import MainAgent +from T0Component.Tier0Feeder.MultipleAgents.HelperAgent import HelperAgent class Tier0FeederPoller(BaseWorkerThread): @@ -48,6 +51,9 @@ def __init__(self, config): dbinterface = myThread.dbi) self.tier0ConfigFile = config.Tier0Feeder.tier0ConfigFile + + self.agentName = getattr(config.Tier0Feeder, "agentName", None) + self.specDirectory = config.Tier0Feeder.specDirectory self.dropboxuser = getattr(config.Tier0Feeder, "dropboxuser", None) self.dropboxpass = getattr(config.Tier0Feeder, "dropboxpass", None) @@ -100,6 +106,7 @@ def __init__(self, config): logger = logging, dbinterface = dbInterfaceT0DataSvc) + # # Set deployment ID # @@ -154,6 +161,21 @@ def algorithm(self, parameters = None): # # replays call data discovery only once (and ignore data status) # + + # If no helper agents are given in the configuration, + # this is a main agent that will ignore no streams + if not tier0Config.Global.HelperAgentStreams: + logging.info("No HelperAgent provided. This is a MainAgent. Processing all streams") + self.agentName = "MainAgent" + + # If HelperAgent is specified, but the name is not specified, + # a HelperAgent is started and will not process anything + if self.agentName == "MainAgent": + self.agentType = MainAgent(tier0Config) + else: + self.agentType = HelperAgent(tier0Config, helperName=self.agentName) + + try: if tier0Config.Global.InjectRuns == None: StorageManagerAPI.injectNewData(self.dbInterfaceStorageManager, @@ -161,7 +183,8 @@ def algorithm(self, parameters = None): self.dbInterfaceSMNotify, streamerPNN = tier0Config.Global.StreamerPNN, minRun = tier0Config.Global.InjectMinRun, - maxRun = tier0Config.Global.InjectMaxRun) + maxRun = tier0Config.Global.InjectMaxRun, + agentType = self.agentType) else: injectRuns = set() for injectRun in tier0Config.Global.InjectRuns: @@ -173,7 +196,8 @@ def algorithm(self, parameters = None): self.dbInterfaceSMNotify, streamerPNN = tier0Config.Global.StreamerPNN, injectRun = injectRun, - injectLimit= tier0Config.Global.InjectLimit) + injectLimit= tier0Config.Global.InjectLimit, + agentType = self.agentType) self.injectedRuns.add(injectRun) except: # shouldn't happen, just a catch all insurance @@ -210,6 +234,8 @@ def algorithm(self, parameters = None): continue try: + if self.agentType: + hltConfig['mapping'] = self.agentType.filterHltConfigStreams(hltConfig['mapping']) RunConfigAPI.configureRun(tier0Config, run, hltConfig) except: logging.exception("Can't configure for run %d" % (run))