From a61c023d13ad495d740616e38c3a05b37781e830 Mon Sep 17 00:00:00 2001 From: Benedikt Volkel Date: Mon, 23 May 2022 14:43:26 +0200 Subject: [PATCH] Modularise sim WF * offers * explicitly select modules and detectors with --modules and --readout-detectors flags, respectively * better overview and managing of dependencies * introduce a SimulationWorkflow class * automatically resolves dependencies * introduce posiibility to identify a task by a stage and detector source such as ("RECO", "TPC") to make dependencies more clear based on what is actually going on NOTE: Still room for improvement * could do this for as many tasks as possible, atm only DIGI, RECO and MATCH tasks use this feature * task names could automatically be built from that * remove explicit TF suffix from task names and add automatically * does not add tasks for inactive detectors * possibility to compose source strings such as "ITS,TPC,TPC-TRD" automatically from proposed sources --> non-existent sources would be trimmed * some tasks adjusted to their minimal dependencies * small adjustments in o2dpg_qc_finalization_workflow.py to comply with _ format of tasknames per timeframe --- MC/bin/o2dpg_qc_finalization_workflow.py | 4 +- MC/bin/o2dpg_sim_workflow.py | 661 ++++++++++++++--------- 2 files changed, 413 insertions(+), 252 deletions(-) diff --git a/MC/bin/o2dpg_qc_finalization_workflow.py b/MC/bin/o2dpg_qc_finalization_workflow.py index 1a0b68f55..a91f7cb78 100755 --- a/MC/bin/o2dpg_qc_finalization_workflow.py +++ b/MC/bin/o2dpg_qc_finalization_workflow.py @@ -45,7 +45,7 @@ def add_QC_finalization(taskName, qcConfigPath, needs=None): if standalone == True: needs = [] elif needs == None: - needs = [taskName + '_local' + str(tf) for tf in range(1, ntimeframes + 1)] + needs = [taskName + '_local_' + str(tf) for tf in range(1, ntimeframes + 1)] task = createTask(name=QC_finalize_name(taskName), needs=needs, cwd=qcdir, lab=["QC"], cpu=1, mem='2000') task['cmd'] = f'o2-qc --config {qcConfigPath} --remote-batch {taskName}.root' + \ @@ -74,7 +74,7 @@ def add_QC_postprocessing(taskName, qcConfigPath, needs, runSpecific, prodSpecif ## The list of remote-batch workflows (reading the merged QC tasks results, applying Checks, uploading them to QCDB) MFTDigitsQCneeds = [] for flp in range(5): - MFTDigitsQCneeds.extend(['mftDigitsQC'+str(flp)+'_local'+str(tf) for tf in range(1, ntimeframes + 1)]) + MFTDigitsQCneeds.extend(['mftDigitsQC'+str(flp)+'_local_'+str(tf) for tf in range(1, ntimeframes + 1)]) add_QC_finalization('mftDigitsQC', 'json://${O2DPG_ROOT}/MC/config/QC/json/qc-mft-digit-0.json', MFTDigitsQCneeds) add_QC_finalization('mftClustersQC', 'json://${O2DPG_ROOT}/MC/config/QC/json/qc-mft-cluster.json') add_QC_finalization('mftAsyncQC', 'json://${O2DPG_ROOT}/MC/config/QC/json/qc-mft-async.json') diff --git a/MC/bin/o2dpg_sim_workflow.py b/MC/bin/o2dpg_sim_workflow.py index 71ff17bd3..43594fe43 100755 --- a/MC/bin/o2dpg_sim_workflow.py +++ b/MC/bin/o2dpg_sim_workflow.py @@ -80,6 +80,8 @@ + 'Offset x Number of TimeFrames x OrbitsPerTimeframe (up for further sophistication)', default=0) parser.add_argument('-j',help='number of workers (if applicable)', default=8, type=int) parser.add_argument('-mod',help='Active modules (deprecated)', default='--skipModules ZDC') +parser.add_argument('--modules', nargs="*", help='Active modules', default=['all']) +parser.add_argument('--readout-detectors', nargs="*", help='Active readout detectors', default=['all']) parser.add_argument('--with-ZDC', action='store_true', help='Enable ZDC in workflow') parser.add_argument('-seed',help='random seed number', default=None) parser.add_argument('-o',help='output workflow file', default='workflow.json') @@ -168,11 +170,31 @@ def load_external_config(configfile): print ("** Using generic config **") anchorConfig = create_sim_config(args) +# detectors that are always required +DETECTORS_ALWAYS_ON = ["ITS", "CTP", "FT0", "FV0"] # with this we can tailor the workflow to the presence of # certain detectors -activeDetectors = anchorConfig.get('o2-ctf-reader-workflow-options',{}).get('onlyDet','all') +activeDetectors = anchorConfig.get('o2-ctf-reader-workflow-options',{}).get('onlyDet', None) +# it is a ","-separeted string of detectors when it comes from the anchored configuration, otherwise a list from cmd +activeDetectors = activeDetectors.split(",") if activeDetectors is not None else args.readout_detectors # convert to set/hashmap -activeDetectors = { det:1 for det in activeDetectors.split(",") } +activeDetectors = {det: 1 for det in activeDetectors} +# a list of all enabled modules +activeModules = args.modules +# deactivated modules, for backward compatibility with ZDC +MODULES, inactiveModules = ("--skipModules ZDC", ["ZDC"]) if not args.with_ZDC else ("", []) +if "all" not in activeModules: + activeModules.extend(DETECTORS_ALWAYS_ON) +if "all" not in activeDetectors: + if IS_ANCHORED_RUN: + # Issue a warning for each detector that should be switched on + for dao in DETECTORS_ALWAYS_ON: + if dao not in activeDetectors: + print(f"WARNING: Detector {dao} should always be there. The workflow might crash at some point") + else: + activeDetectors.extend(DETECTORS_ALWAYS_ON) + +print(activeModules) # see if a detector is in list of activeDetectors def isActive(detID): @@ -180,14 +202,13 @@ def isActive(detID): detID == the usual detID string (3 letters) """ if "all" in activeDetectors: - return True + return True if (detID in activeModules or "all" in activeModules) and detID not in inactiveModules else False return detID in activeDetectors def addWhenActive(detID, needslist, appendstring): if isActive(detID): needslist.append(appendstring) - def retrieve_sor(run_number): """ retrieves start of run (sor) @@ -209,6 +230,250 @@ def retrieve_sor(run_number): return int(SOR) +class SimulationWorkflow: + STAGES = ["DIGI", "RECO", "MATCH", "TPCCLUSTER"] + def __init__(self, n_timeframes): + # number of timeframes + self.n_timeframes = n_timeframes + # the workflow + self.workflow = [] + # all present names, map to stage + self.stages_map = [{s: {} for s in SimulationWorkflow.STAGES} for _ in range(n_timeframes + 1)] + # just keep track of names, for convenience + self.task_names = [] + # tasks not added due to inactive detectors + self.tasks_not_added = [] + + def make_timeframe_index(self, timeframe=-1): + """convenience to find the correct TF index for mapping + """ + return timeframe - 1 if timeframe >= 1 else timeframe + + def make_sources_iter(self, sources_any_form): + """convenience to always have an iterable of sources + """ + if isinstance(sources_any_form, str): + # assuming it is a string like "ITS,TPC,TPC-TOF" + return sources_any_form.split(",") + # just make sure we have a list now (could move to tuples) + return [s for s in sources_any_form] + + def yield_stages_sources_iter(self, sources): + """convenience to always have an iterable for yielded sources + """ + if not isinstance(sources[0], str): + # not a string, assume list-like of 2-tuples + return sources + return [sources] + + def find_task_names_for_sources(self, sources, source_map): + """task names for specified sources + + Args: + sources: iterable of 2-tuples + sources_map: dict mapping source tuples to actual names + Returns: + list of task names + """ + if sources.lower() == "any": + # early return after combining any existent sources + task_names = [v[0] for v in source_map.values() if v[1] is not None] + return list(set(task_names)) + + # split required sources + sources = self.make_sources_iter(sources) + + task_names = [] + for s in sources: + if s not in source_map or source_map[s][1] is None: + return None + task_names.append(source_map[s][0]) + return list(set(task_names)) + + def collect_dependent_task_names(self, required_needs, timeframe=-1): + """from required needs derive task names + """ + timeframe_index = self.make_timeframe_index(timeframe) + needs_names = [] + for n in required_needs: + if isinstance(n, str): + # assuming that is the full name of a task we have + n_found = None + for n_search in (n, f"{n}_{timeframe}"): + if n_search not in self.task_names and n_search in self.tasks_not_added: + return None + if n_search in self.task_names: + n_found = n_search + break + if n_found is None: + print(f"ERROR: Unknown dependency {n}") + sys.exit(1) + needs_names.append(n_found) + continue + + elif len(n) != 2: + print(f"ERROR: Expecting a tuple of length 2 but got {len(n)}") + sys.exit(1) + + tf_stages_map = self.stages_map[timeframe_index] + stage, sources = n + if stage not in tf_stages_map: + return None + # now source is a bit different + task_names = self.find_task_names_for_sources(sources, tf_stages_map[stage]) + if task_names is None: + return None + needs_names.extend(task_names) + return list(set(needs_names)) + + def add_task(self, name, needs=[], stage_detector_sources=None, tf=-1, **kwargs): + """add a task + + Args: + name: str + task name without timeframe suffix + needs: list + list with dependent tasks, either specified explicitly by name + or as a tuple such as ("RECO", "TPC") + stage_detector_sources: 2-tuple or list of 2-tuples + the stage(s) and detector source(s) yield by this task, e.g. + ("RECO", "TPC") or [("RECO", "TRD"), ("MATCH", "ITS-TPC-TRD")] + tf: int + timeframe + kwargs: key-word args + forwarded to createTask + Returns: + dict: The task dictionary + """ + if tf > self.n_timeframes: + print(f"ERROR: Defined for {self.n_timeframes}, however, task should be constructed for timeframe {timeframe}") + sys.exit(1) + + timeframe_index = self.make_timeframe_index(tf) + + # some basic checks + if tf >= 1: + name = f"{name}_{tf}" + if name in self.task_names: + print(f"ERROR: Task name {name} already registered") + sys.exit(1) + + # the user is allowed to specify a condition under which the task is added + # pop that since we forward kwargs dorectly to create_task later which does not understand this argument + to_be_added = True and kwargs.pop("condition", True) + needs = self.collect_dependent_task_names(needs, tf) + if needs is None: + to_be_added = False + stages_sources = [] if not stage_detector_sources else self.yield_stages_sources_iter(stage_detector_sources) + for stage, detector_sources in stages_sources: + if stage not in self.stages_map[timeframe_index]: + print(f"ERROR: unknown stage {stage}") + sys.exit(1) + detector_sources = self.make_sources_iter(detector_sources) + for ds in detector_sources: + add = True + for single_det in ds.split("-"): + if not isActive(single_det): + add = False + break + self.stages_map[timeframe_index][stage][ds] = (name, None if not add or not to_be_added else len(self.workflow) - 1) + if add and to_be_added: + to_be_added = True + # create task either way so that the user can refer to it anytime + task = createTask(name=name, needs=needs, tf=tf, **kwargs) + if not to_be_added: + print(f"Task {name} not added since all specified detectors are deactivated") + self.tasks_not_added.append(name) + return task + + self.task_names.append(name) + self.workflow.append(task) + + return task + + def add_predefined_tasks(self, tasks): + """pass a list of pre-defined tasks + """ + for task in tasks: + timeframe = task["timeframe"] + name = task["name"] + try: + name_split = name.split("_") + timeframe_assumed = int(name_split[-1]) + if timeframe_assumed == timeframe: + name = "_".join(name_split[:-1]) + except ValueError: + # don't do anything, assume that is the task name, nothing we can do + pass + # although it looks a bit cumbersome, this is the clean way to re-define the task via the member function + task_new = self.add_task(name, tf=timeframe, needs=task["needs"], cwd=task["cwd"], lab=task["labels"], cpu=task["resources"]["cpu"], mem=task["resources"]["mem"], relative_cpu=task["resources"]["relative_cpu"]) + task_new["cmd"] = task["cmd"] + + def make_sources_string_reco_match(self, proposed_sources, timeframe, strict_sources=None): + """correct proposed RECO or MATCH sources to what is actually available + + Args: + proposed_sources: str or iter + the proposed string of the for ITS,TPC,TPC-TRD or an + equivalent iterable such as a list of ["ITS", "TPC", "TPC-TRD"] + sources will be skimmed by all sources that are not available + timeframe: int + timeframe + strict_source: iter or None + if not None, these sources are strictly required and the workflow creation will fail if sources not present + Returns: + str: trimmed sources or same as strict_sources if that is not None + """ + strict = False + if strict_sources: + proposed_sources = strict_sources + strict = True + proposed_sources_iter = self.make_sources_iter(proposed_sources) + match_sources = [ps for ps in proposed_sources_iter if "-" in ps] + single_sources = [ps for ps in proposed_sources_iter if "-" not in ps] + new_sources = [] + if timeframe < 1: + print(f"ERROR: Sources can only be extracted per timeframe, therefore require timeframe > 1 but {timeframe} was given") + sys.exit(1) + timeframe_index = self.make_timeframe_index(timeframe) + tf_stages_map = self.stages_map[timeframe_index] + for sources, stage in zip((single_sources, match_sources), ("RECO", "MATCH")): + if stage not in tf_stages_map: + if sources and strict: + print(f"ERROR: Sources {sources} were requested for stage {stage} but stage not present") + sys.exit(1) + continue + stages_map = tf_stages_map[stage] + for s in sources: + if s not in stages_map or stages_map[s][1] is None: + if strict: + print(f"ERROR: Source {ps} was strictly required but it is not present") + sys.exit(1) + continue + new_sources.append(s) + return ",".join(new_sources) + + def make_sources_string_active_detectors(self, proposed_sources, timeframe): + """make a source string only based on single active detectors + Args: + proposed_sources: str or iter + the proposed string of the for ITS,FT0,FV0,CTP or an + equivalent iterable such as a list of ["ITS", "FT0", "FV0", "CTP"] + sources will be skimmed by all sources that are not available + Returns: + str: trimmed sources + """ + proposed_sources_iter = self.make_sources_iter(proposed_sources) + new_sources = [] + if timeframe < 1: + print(f"ERROR: Sources can only be extracted per timeframe, therefore require timeframe > 1 but {timeframe} was given") + sys.exit(1) + for s in proposed_sources_iter: + if not isActive(s): + continue + new_sources.append(s) + return ",".join(new_sources) + # ----------- START WORKFLOW CONSTRUCTION ----------------------------- # set the time to start of run (if no timestamp specified) @@ -218,7 +483,6 @@ def retrieve_sor(run_number): NTIMEFRAMES=int(args.tf) NWORKERS=args.j -MODULES = "--skipModules ZDC" if not args.with_ZDC else "" SIMENGINE=args.e BFIELD=args.field RNDSEED=args.seed # typically the argument should be the jobid, but if we get None the current time is used for the initialisation @@ -249,6 +513,9 @@ def getDPL_global_options(bigshm=False): if (includeLocalQC or includeFullQC) and not isdir(qcdir): mkdir(qcdir) +WORKFLOW = SimulationWorkflow(NTIMEFRAMES) + + if doembedding: if not usebkgcache: # ---- do background transport task ------- @@ -306,7 +573,8 @@ def getDPL_global_options(bigshm=False): CONFKEYBKG=' --configKeyValues "' + args.confKeyBkg + '"' # Background PYTHIA configuration - BKG_CONFIG_task=createTask(name='genbkgconf') + BKG_CONFIG_task = WORKFLOW.add_task('genbkgconf') + BKG_CONFIG_task['cmd'] = 'echo "placeholder / dummy task"' if GENBKG == 'pythia8': print('Background generator seed: ', SIMSEED) @@ -323,30 +591,25 @@ def getDPL_global_options(bigshm=False): # TODO: we need a proper config container/manager so as to combine these local configs with external configs etc. CONFKEYBKG='--configKeyValues "GeneratorPythia8.config=pythia8bkg.cfg;' + args.confKeyBkg + '"' - workflow['stages'].append(BKG_CONFIG_task) - # background task configuration INIBKG='' if args.iniBkg!= '': INIBKG=' --configFile ' + args.iniBkg - BKGtask=createTask(name='bkgsim', lab=["GEANT"], needs=[BKG_CONFIG_task['name']], cpu=NWORKERS ) + BKGtask = WORKFLOW.add_task('bkgsim', needs=[BKG_CONFIG_task['name']], lab=["GEANT"], cpu=NWORKERS ) BKGtask['cmd']='${O2_ROOT}/bin/o2-sim -e ' + SIMENGINE + ' -j ' + str(NWORKERS) + ' -n ' + str(NBKGEVENTS) \ + ' -g ' + str(GENBKG) + ' ' + str(MODULES) + ' -o bkg ' + str(INIBKG) \ + ' --field ' + str(BFIELD) + ' ' + str(CONFKEYBKG) \ - + ('',' --timestamp ' + str(args.timestamp))[args.timestamp!=-1] + ' --run ' + str(args.run) + + ('',' --timestamp ' + str(args.timestamp))[args.timestamp!=-1] + ' --run ' + str(args.run) + ' -m ' + " ".join(activeModules) if not "all" in activeDetectors: BKGtask['cmd'] += ' --readoutDetectors ' + " ".join(activeDetectors) - workflow['stages'].append(BKGtask) - # check if we should upload background event - if args.upload_bkg_to!=None: - BKGuploadtask=createTask(name='bkgupload', needs=[BKGtask['name']], cpu='0') + if args.upload_bkg_to is not None: + BKGuploadtask = WORKFLOW.add_task('bkgupload', needs=[BKGtask['name']], cpu='0') BKGuploadtask['cmd']='alien.py mkdir ' + args.upload_bkg_to + ';' BKGuploadtask['cmd']+='alien.py cp -f bkg* ' + args.upload_bkg_to + ';' - workflow['stages'].append(BKGuploadtask) else: # here we are reusing existing background events from ALIEN @@ -360,39 +623,33 @@ def getDPL_global_options(bigshm=False): # we can introduce a "retry" feature in the copy process) # Step 1: header and link files - BKG_HEADER_task=createTask(name='bkgdownloadheader', cpu='0', lab=['BKGCACHE']) + BKG_HEADER_task = WORKFLOW.add_task('bkgdownloadheader', cpu='0', lab=['BKGCACHE']) BKG_HEADER_task['cmd']='alien.py cp ' + args.use_bkg_from + 'bkg_MCHeader.root .' BKG_HEADER_task['cmd']=BKG_HEADER_task['cmd'] + ';alien.py cp ' + args.use_bkg_from + 'bkg_geometry.root .' BKG_HEADER_task['cmd']=BKG_HEADER_task['cmd'] + ';alien.py cp ' + args.use_bkg_from + 'bkg_grp.root .' - workflow['stages'].append(BKG_HEADER_task) # a list of smaller sensors (used to construct digitization tasks in a parametrized way) -smallsensorlist = [ "ITS", "TOF", "FDD", "MCH", "MID", "MFT", "HMP", "EMC", "PHS", "CPV" ] -if args.with_ZDC: - smallsensorlist += [ "ZDC" ] +smallsensorlist = [ "ITS", "TOF", "FDD", "MCH", "MID", "MFT", "HMP", "EMC", "PHS", "CPV", "ZDC"] # a list of detectors that serve as input for the trigger processor CTP --> these need to be processed together for now ctp_trigger_inputlist = [ "FT0", "FV0" ] BKG_HITDOWNLOADER_TASKS={} for det in [ 'TPC', 'TRD' ] + smallsensorlist + ctp_trigger_inputlist: if usebkgcache: - BKG_HITDOWNLOADER_TASKS[det] = createTask(str(det) + 'hitdownload', cpu='0', lab=['BKGCACHE']) + BKG_HITDOWNLOADER_TASKS[det] = WORKFLOW.add_task(str(det) + 'hitdownload', cpu='0', lab=['BKGCACHE']) BKG_HITDOWNLOADER_TASKS[det]['cmd'] = 'alien.py cp ' + args.use_bkg_from + 'bkg_Hits' + str(det) + '.root .' - workflow['stages'].append(BKG_HITDOWNLOADER_TASKS[det]) else: BKG_HITDOWNLOADER_TASKS[det] = None if usebkgcache: - BKG_KINEDOWNLOADER_TASK = createTask(name='bkgkinedownload', cpu='0', lab=['BKGCACHE']) + BKG_KINEDOWNLOADER_TASK = WORKFLOW.add_task('bkgkinedownload', cpu='0', lab=['BKGCACHE']) BKG_KINEDOWNLOADER_TASK['cmd'] = 'alien.py cp ' + args.use_bkg_from + 'bkg_Kine.root .' - workflow['stages'].append(BKG_KINEDOWNLOADER_TASK) # We download some binary files, necessary for processing # Eventually, these files/objects should be queried directly from within these tasks? -MATBUD_DOWNLOADER_TASK = createTask(name='matbuddownloader', cpu='0') +MATBUD_DOWNLOADER_TASK = WORKFLOW.add_task('matbuddownloader', cpu='0') MATBUD_DOWNLOADER_TASK['cmd'] = '[ -f matbud.root ] || ${O2_ROOT}/bin/o2-ccdb-downloadccdbfile --host http://alice-ccdb.cern.ch/ -p GLO/Param/MatLUT -o matbud.root --no-preserve-path --timestamp ' + str(args.timestamp) -workflow['stages'].append(MATBUD_DOWNLOADER_TASK) # loop over timeframes for tf in range(1, NTIMEFRAMES + 1): @@ -467,7 +724,7 @@ def getDPL_global_options(bigshm=False): exit(1) # produce the signal configuration - SGN_CONFIG_task=createTask(name='gensgnconf_'+str(tf), tf=tf, cwd=timeframeworkdir) + SGN_CONFIG_task= WORKFLOW.add_task('gensgnconf', tf=tf, cwd=timeframeworkdir) SGN_CONFIG_task['cmd'] = 'echo "placeholder / dummy task"' if GENERATOR == 'pythia8' and PROCESS!='': SGN_CONFIG_task['cmd'] = '${O2DPG_ROOT}/MC/config/common/pythia8/utils/mkpy8cfg.py \ @@ -495,8 +752,6 @@ def getDPL_global_options(bigshm=False): # print('o2dpg_sim_workflow: Error! configuration file not provided') # exit(1) - workflow['stages'].append(SGN_CONFIG_task) - # ----------------- # transport signals # ----------------- @@ -511,22 +766,21 @@ def getDPL_global_options(bigshm=False): signalneeds = signalneeds + [ BKGtask['name'] ] else: signalneeds = signalneeds + [ BKG_HEADER_task['name'] ] - SGNtask=createTask(name='sgnsim_'+str(tf), needs=signalneeds, tf=tf, cwd='tf'+str(tf), lab=["GEANT"], relative_cpu=5/8, n_workers=NWORKERS, mem='2000') + SGNtask = WORKFLOW.add_task('sgnsim', needs=signalneeds, tf=tf, cwd=timeframeworkdir, lab=["GEANT"], relative_cpu=5/8, n_workers=NWORKERS, mem='2000') SGNtask['cmd']='${O2_ROOT}/bin/o2-sim -e ' + str(SIMENGINE) + ' ' + str(MODULES) + ' -n ' + str(NSIGEVENTS) + ' --seed ' + str(TFSEED) \ + ' --field ' + str(BFIELD) + ' -j ' + str(NWORKERS) + ' -g ' + str(GENERATOR) \ + ' ' + str(TRIGGER) + ' ' + str(CONFKEY) + ' ' + str(INIFILE) \ - + ' -o ' + signalprefix + ' ' + embeddinto \ + + ' -o ' + signalprefix + ' ' + embeddinto + ' -m ' + " ".join(activeModules) \ + ('', ' --timestamp ' + str(args.timestamp))[args.timestamp!=-1] + ' --run ' + str(args.run) if not "all" in activeDetectors: SGNtask['cmd'] += ' --readoutDetectors ' + " ".join(activeDetectors) - workflow['stages'].append(SGNtask) # some tasks further below still want geometry + grp in fixed names, so we provide it here # Alternatively, since we have timeframe isolation, we could just work with standard o2sim_ files # We need to be careful here and distinguish between embedding and non-embedding cases # (otherwise it can confuse itstpcmatching, see O2-2026). This is because only one of the GRPs is updated during digitization. if doembedding: - LinkGRPFileTask=createTask(name='linkGRP_'+str(tf), needs=[BKG_HEADER_task['name'] if usebkgcache else BKGtask['name'] ], tf=tf, cwd=timeframeworkdir, cpu='0',mem='0') + LinkGRPFileTask = WORKFLOW.add_task('linkGRP', needs=[BKG_HEADER_task['name'] if usebkgcache else BKGtask['name'] ], tf=tf, cwd=timeframeworkdir, cpu='0',mem='0') LinkGRPFileTask['cmd']=''' ln -nsf ../bkg_grp.root o2sim_grp.root; ln -nsf ../bkg_grpecs.root o2sim_grpecs.root; @@ -539,9 +793,8 @@ def getDPL_global_options(bigshm=False): ln -nsf ../bkg_grpecs.root bkg_grpecs.root ''' else: - LinkGRPFileTask=createTask(name='linkGRP_'+str(tf), needs=[SGNtask['name']], tf=tf, cwd=timeframeworkdir, cpu='0', mem='0') + LinkGRPFileTask = WORKFLOW.add_task('linkGRP', needs=[SGNtask['name']], tf=tf, cwd=timeframeworkdir, cpu='0', mem='0') LinkGRPFileTask['cmd']='ln -nsf ' + signalprefix + '_grp.root o2sim_grp.root ; ln -nsf ' + signalprefix + '_geometry.root o2sim_geometry.root; ln -nsf ' + signalprefix + '_geometry-aligned.root o2sim_geometry-aligned.root' - workflow['stages'].append(LinkGRPFileTask) # ------------------ # digitization steps @@ -633,7 +886,7 @@ def putConfigValuesNew(listOfMainKeys=[], localCF = {}): # This task creates the basic setup for all digitizers! all digitization configKeyValues need to be given here - ContextTask = createTask(name='digicontext_'+str(tf), needs=[SGNtask['name'], LinkGRPFileTask['name']], tf=tf, cwd=timeframeworkdir, lab=["DIGI"], cpu='1') + ContextTask = WORKFLOW.add_task('digicontext', needs=[SGNtask['name'], LinkGRPFileTask['name']], tf=tf, cwd=timeframeworkdir, lab=["DIGI"], cpu='1') # this is just to have the digitizer ini file ContextTask['cmd'] = '${O2_ROOT}/bin/o2-sim-digitizer-workflow --only-context --interactionRate ' + str(INTRATE) \ + ' ' + getDPL_global_options() + ' -n ' + str(args.ns) + simsoption \ @@ -650,27 +903,21 @@ def putConfigValuesNew(listOfMainKeys=[], localCF = {}): if BCPATTERN != '': ContextTask['cmd'] += ' --bcPatternFile "' + BCPATTERN + '"' - workflow['stages'].append(ContextTask) - tpcdigineeds=[ContextTask['name'], LinkGRPFileTask['name']] if usebkgcache: tpcdigineeds += [ BKG_HITDOWNLOADER_TASKS['TPC']['name'] ] - TPCDigitask=createTask(name='tpcdigi_'+str(tf), needs=tpcdigineeds, - tf=tf, cwd=timeframeworkdir, lab=["DIGI"], cpu=NWORKERS, mem='9000') + TPCDigitask = WORKFLOW.add_task('tpcdigi', needs=tpcdigineeds, stage_detector_sources=("DIGI", "TPC"), tf=tf, cwd=timeframeworkdir, lab=["DIGI"], cpu=NWORKERS, mem='9000') TPCDigitask['cmd'] = ('','ln -nfs ../bkg_HitsTPC.root . ;')[doembedding] TPCDigitask['cmd'] += '${O2_ROOT}/bin/o2-sim-digitizer-workflow ' + getDPL_global_options() + ' -n ' + str(args.ns) + simsoption + ' --onlyDet TPC --interactionRate ' + str(INTRATE) + ' --tpc-lanes ' + str(NWORKERS) + ' --incontext ' + str(CONTEXTFILE) + ' --tpc-chunked-writer --disable-write-ini ' + putConfigValuesNew(["TPCGasParam"]) - workflow['stages'].append(TPCDigitask) trddigineeds = [ContextTask['name']] if usebkgcache: trddigineeds += [ BKG_HITDOWNLOADER_TASKS['TRD']['name'] ] - TRDDigitask=createTask(name='trddigi_'+str(tf), needs=trddigineeds, - tf=tf, cwd=timeframeworkdir, lab=["DIGI"], cpu=NWORKERS, mem='8000') + TRDDigitask = WORKFLOW.add_task('trddigi', needs=trddigineeds, stage_detector_sources=("DIGI", "TRD"), tf=tf, cwd=timeframeworkdir, lab=["DIGI"], cpu=NWORKERS, mem='8000') TRDDigitask['cmd'] = ('','ln -nfs ../bkg_HitsTRD.root . ;')[doembedding] TRDDigitask['cmd'] += '${O2_ROOT}/bin/o2-sim-digitizer-workflow ' + getDPL_global_options() + ' -n ' + str(args.ns) + simsoption + ' --onlyDet TRD --interactionRate ' + str(INTRATE) + putConfigValuesNew(localCF={"TRDSimParams.digithreads" : NWORKERS}) + ' --incontext ' + str(CONTEXTFILE) + ' --disable-write-ini' - if isActive("TRD"): - workflow['stages'].append(TRDDigitask) + # these are digitizers which are single threaded def createRestDigiTask(name, det='ALLSMALLER'): @@ -681,56 +928,45 @@ def createRestDigiTask(name, det='ALLSMALLER'): if usebkgcache: for d in itertools.chain(smallsensorlist, ctp_trigger_inputlist): tneeds += [ BKG_HITDOWNLOADER_TASKS[d]['name'] ] - t = createTask(name=name, needs=tneeds, - tf=tf, cwd=timeframeworkdir, lab=["DIGI","SMALLDIGI"], cpu='1') + t = WORKFLOW.add_task(name, needs=tneeds, stage_detector_sources=("DIGI", smallsensorlist), tf=tf, cwd=timeframeworkdir, lab=["DIGI","SMALLDIGI"], cpu='1') t['cmd'] = ('','ln -nfs ../bkg_Hits*.root . ;')[doembedding] - # t['cmd'] += commondigicmd + ' --skipDet TPC,TRD,FT0,FV0,CTP ' - t['cmd'] += commondigicmd + ' --onlyDet TOF,CPV,EMC,HMP,PHS,ITS,MFT,MID,MCH,FDD' - t['cmd'] += ' --ccdb-tof-sa ' - t['cmd'] += (' --combine-devices ','')[args.no_combine_dpl_devices] - workflow['stages'].append(t) + onlyDet = WORKFLOW.make_sources_string_active_detectors(smallsensorlist, tf) + if onlyDet: + t['cmd'] += commondigicmd + f' --onlyDet {onlyDet}' + t['cmd'] += ' --ccdb-tof-sa ' + t['cmd'] += (' --combine-devices ', '')[args.no_combine_dpl_devices] return t else: # here we create individual digitizers - if isActive(det): - if usebkgcache: + if usebkgcache: tneeds += [ BKG_HITDOWNLOADER_TASKS[det]['name'] ] - t = createTask(name=name, needs=tneeds, - tf=tf, cwd=timeframeworkdir, lab=["DIGI","SMALLDIGI"], cpu='1') - t['cmd'] = ('','ln -nfs ../bkg_Hits' + str(det) + '.root . ;')[doembedding] - t['cmd'] += commondigicmd + ' --onlyDet ' + str(det) - if det == 'TOF': - t['cmd'] += ' --ccdb-tof-sa' - workflow['stages'].append(t) - return t + t = WORKFLOW.add_task(name, stage_detector_sources=("DIGI", det), needs=tneeds, tf=tf, cwd=timeframeworkdir, lab=["DIGI","SMALLDIGI"], cpu='1') + t['cmd'] = ('','ln -nfs ../bkg_Hits' + str(det) + '.root . ;')[doembedding] + t['cmd'] += commondigicmd + ' --onlyDet ' + str(det) + if det == 'TOF': + t['cmd'] += ' --ccdb-tof-sa' + return t - det_to_digitask={} + det_to_digitask = {} - if not args.no_combine_smaller_digi==True: - det_to_digitask['ALLSMALLER']=createRestDigiTask("restdigi_"+str(tf)) + if not args.no_combine_smaller_digi: + det_to_digitask['ALLSMALLER']=createRestDigiTask("restdigi") for det in smallsensorlist: - name=str(det).lower() + "digi_" + str(tf) - t = det_to_digitask['ALLSMALLER'] if (not args.no_combine_smaller_digi==True) else createRestDigiTask(name, det) + name=str(det).lower() + "digi" + t = det_to_digitask['ALLSMALLER'] if not args.no_combine_smaller_digi else createRestDigiTask(name, det) det_to_digitask[det]=t # detectors serving CTP need to be treated somewhat special since CTP needs # these inputs at the same time --> still need to be made better tneeds = [ContextTask['name']] - t = createTask(name="ft0fv0ctp_digi_" + str(tf), needs=tneeds, - tf=tf, cwd=timeframeworkdir, lab=["DIGI","SMALLDIGI"], cpu='1') + sources = WORKFLOW.make_sources_string_active_detectors("FT0,FV0,CTP", tf) + t = WORKFLOW.add_task("ft0fv0ctp_digi", stage_detector_sources=("DIGI", "FT0,FV0,CTP"), needs=tneeds, tf=tf, cwd=timeframeworkdir, lab=["DIGI","SMALLDIGI"], cpu='1', condition=sources=="FT0,FV0,CTP") t['cmd'] = ('','ln -nfs ../bkg_HitsFT0.root . ; ln -nfs ../bkg_HitsFV0.root . ;')[doembedding] - t['cmd'] += '${O2_ROOT}/bin/o2-sim-digitizer-workflow ' + getDPL_global_options() + ' -n ' + str(args.ns) + simsoption + ' --onlyDet FT0,FV0,CTP --interactionRate ' + str(INTRATE) + ' --incontext ' + str(CONTEXTFILE) + ' --disable-write-ini' + putConfigValuesNew() + (' --combine-devices','')[args.no_combine_dpl_devices] - workflow['stages'].append(t) + t['cmd'] += '${O2_ROOT}/bin/o2-sim-digitizer-workflow ' + getDPL_global_options() + ' -n ' + str(args.ns) + simsoption + ' --onlyDet ' + sources + ' --interactionRate ' + str(INTRATE) + ' --incontext ' + str(CONTEXTFILE) + ' --disable-write-ini' + putConfigValuesNew() + (' --combine-devices', '')[args.no_combine_dpl_devices] det_to_digitask["FT0"]=t det_to_digitask["FV0"]=t - def getDigiTaskName(det): - t = det_to_digitask.get(det) - if t == None: - return "undefined" - return t['name'] - # ----------- # reco # ----------- @@ -741,178 +977,132 @@ def getDigiTaskName(det): tpcclustertasks=[] sectorpertask=6 for s in range(0,35,sectorpertask): - taskname = 'tpcclusterpart' + str((int)(s/sectorpertask)) + '_' + str(tf) + taskname = 'tpcclusterpart' + str((int)(s/sectorpertask)) tpcclustertasks.append(taskname) - tpcclussect = createTask(name=taskname, needs=[TPCDigitask['name']], tf=tf, cwd=timeframeworkdir, lab=["RECO"], cpu='2', mem='8000') + tpcclussect = WORKFLOW.add_task(taskname, needs=[("DIGI", "TPC")], tf=tf, cwd=timeframeworkdir, lab=["RECO"], cpu='2', mem='8000') tpcclussect['cmd'] = '${O2_ROOT}/bin/o2-tpc-chunkeddigit-merger --tpc-sectors ' + str(s)+'-'+str(s+sectorpertask-1) + ' --tpc-lanes ' + str(NWORKERS) tpcclussect['cmd'] += ' | ${O2_ROOT}/bin/o2-tpc-reco-workflow ' + getDPL_global_options(bigshm=True) + ' --input-type digitizer --output-type clusters,send-clusters-per-sector --outfile tpc-native-clusters-part' + str((int)(s/sectorpertask)) + '.root --tpc-sectors ' + str(s)+'-'+str(s+sectorpertask-1) + ' ' + putConfigValuesNew(["GPU_global"], {"GPU_proc.ompThreads" : 4}) tpcclussect['env'] = { "OMP_NUM_THREADS" : "4", "SHMSIZE" : "5000000000" } - workflow['stages'].append(tpcclussect) - - TPCCLUSMERGEtask=createTask(name='tpcclustermerge_'+str(tf), needs=tpcclustertasks, tf=tf, cwd=timeframeworkdir, lab=["RECO"], cpu='1', mem='10000') + TPCCLUSMERGEtask = WORKFLOW.add_task(name='tpcclustermerge', stage_detector_sources=("TPCCLUSTER", "TPC"), needs=tpcclustertasks, tf=tf, cwd=timeframeworkdir, lab=["RECO"], cpu='1', mem='10000') TPCCLUSMERGEtask['cmd']='${O2_ROOT}/bin/o2-commonutils-treemergertool -i tpc-native-clusters-part*.root -o tpc-native-clusters.root -t tpcrec' #--asfriend preferable but does not work - workflow['stages'].append(TPCCLUSMERGEtask) - tpcreconeeds.append(TPCCLUSMERGEtask['name']) else: - tpcclus = createTask(name='tpccluster_' + str(tf), needs=[TPCDigitask['name']], tf=tf, cwd=timeframeworkdir, lab=["RECO"], cpu=NWORKERS, mem='2000') + tpcclus = WORKFLOW.add_task('tpccluster', needs=[("DIGI", "TPC")], tf=tf, cwd=timeframeworkdir, lab=["RECO"], cpu=NWORKERS, mem='2000') tpcclus['cmd'] = '${O2_ROOT}/bin/o2-tpc-chunkeddigit-merger --tpc-lanes ' + str(NWORKERS) tpcclus['cmd'] += ' | ${O2_ROOT}/bin/o2-tpc-reco-workflow ' + getDPL_global_options() + ' --input-type digitizer --output-type clusters,send-clusters-per-sector ' + putConfigValuesNew(["GPU_global","TPCGasParam"],{"GPU_proc.ompThreads" : 1}) - workflow['stages'].append(tpcclus) - tpcreconeeds.append(tpcclus['name']) - TPCRECOtask=createTask(name='tpcreco_'+str(tf), needs=tpcreconeeds, tf=tf, cwd=timeframeworkdir, lab=["RECO"], relative_cpu=3/8, mem='16000') + TPCRECOtask = WORKFLOW.add_task('tpcreco', stage_detector_sources=("RECO", "TPC"), needs=[("TPCCLUSTER", "TPC")], tf=tf, cwd=timeframeworkdir, lab=["RECO"], relative_cpu=3/8, mem='16000') TPCRECOtask['cmd'] = '${O2_ROOT}/bin/o2-tpc-reco-workflow ' + getDPL_global_options(bigshm=True) + ' --input-type clusters --output-type tracks,send-clusters-per-sector ' + putConfigValuesNew(["GPU_global","TPCGasParam"], {"GPU_proc.ompThreads":NWORKERS}) - workflow['stages'].append(TPCRECOtask) - ITSRECOtask=createTask(name='itsreco_'+str(tf), needs=[getDigiTaskName("ITS"), MATBUD_DOWNLOADER_TASK['name']], tf=tf, cwd=timeframeworkdir, lab=["RECO"], cpu='1', mem='2000') + ITSRECOtask = WORKFLOW.add_task('itsreco', stage_detector_sources=("RECO", "ITS"), needs=[("DIGI", "ITS"), MATBUD_DOWNLOADER_TASK['name']], tf=tf, cwd=timeframeworkdir, lab=["RECO"], cpu='1', mem='2000') ITSRECOtask['cmd'] = '${O2_ROOT}/bin/o2-its-reco-workflow --trackerCA --tracking-mode async ' + getDPL_global_options() \ + putConfigValuesNew(["ITSVertexerParam", "ITSAlpideParam", 'ITSClustererParam'], {"NameConf.mDirMatLUT" : ".."}) - workflow['stages'].append(ITSRECOtask) - - FT0RECOtask=createTask(name='ft0reco_'+str(tf), needs=[getDigiTaskName("FT0")], tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='1000') + FT0RECOtask = WORKFLOW.add_task('ft0reco', stage_detector_sources=("RECO", "FT0"), needs=[("DIGI", "FT0")], tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='1000') FT0RECOtask['cmd'] = '${O2_ROOT}/bin/o2-ft0-reco-workflow ' + getDPL_global_options() + putConfigValues() - workflow['stages'].append(FT0RECOtask) - ITSTPCMATCHtask=createTask(name='itstpcMatch_'+str(tf), needs=[TPCRECOtask['name'], ITSRECOtask['name'], FT0RECOtask['name']], tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='8000', relative_cpu=3/8) + ITSTPCMATCHtask = WORKFLOW.add_task('itstpcMatch', stage_detector_sources=("MATCH", "ITS-TPC"), needs=[("RECO", "ITS"), ("RECO", "TPC"), ("RECO", "FT0")], tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='8000', relative_cpu=3/8) ITSTPCMATCHtask['cmd']= '${O2_ROOT}/bin/o2-tpcits-match-workflow ' + getDPL_global_options(bigshm=True) + ' --tpc-track-reader \"tpctracks.root\" --tpc-native-cluster-reader \"--infile tpc-native-clusters.root\" --use-ft0' + putConfigValuesNew(['MFTClustererParam', 'ITSCATrackerParam', 'tpcitsMatch', 'TPCGasParam', 'ITSClustererParam'], {"NameConf.mDirMatLUT" : ".."}) - workflow['stages'].append(ITSTPCMATCHtask) - TRDTRACKINGtask = createTask(name='trdreco_'+str(tf), needs=[TRDDigitask['name'], ITSTPCMATCHtask['name'], TPCRECOtask['name'], ITSRECOtask['name']], tf=tf, cwd=timeframeworkdir, lab=["RECO"], cpu='1', mem='2000') + trd_track_sources = WORKFLOW.make_sources_string_reco_match("TPC,ITS-TPC", tf, anchorConfig.get("o2-trd-global-tracking-options",{}).get("track-sources", None)) + trd_tracking_needs = [("DIGI", "TRD"), ("RECO", "TPC")] + trd_yields_sources = [("RECO", "TRD")] + if "ITS-TPC" in trd_track_sources: + trd_tracking_needs.append(("MATCH", "ITS-TPC")) + trd_yields_sources.append(("MATCH", "TPC-TRD,ITS-TPC-TRD")) + else: + trd_yields_sources.append(("MATCH", "TPC-TRD")) + TRDTRACKINGtask = WORKFLOW.add_task('trdreco', needs=trd_tracking_needs, tf=tf, cwd=timeframeworkdir, lab=["RECO"], cpu='1', mem='2000') TRDTRACKINGtask['cmd'] = '${O2_ROOT}/bin/o2-trd-tracklet-transformer ' + getDPL_global_options() + putConfigValues() - workflow['stages'].append(TRDTRACKINGtask) # FIXME This is so far a workaround to avoud a race condition for trdcalibratedtracklets.root - TRDTRACKINGtask2 = createTask(name='trdreco2_'+str(tf), needs=[TRDTRACKINGtask['name'],MATBUD_DOWNLOADER_TASK['name']], tf=tf, cwd=timeframeworkdir, lab=["RECO"], cpu='1', mem='2000') + trd_track_sources = f"--track-sources {trd_track_sources}" if trd_track_sources else "" + TRDTRACKINGtask2 = WORKFLOW.add_task(name='trdreco2', stage_detector_sources=trd_yields_sources, needs=[TRDTRACKINGtask['name'],MATBUD_DOWNLOADER_TASK['name']], tf=tf, cwd=timeframeworkdir, lab=["RECO"], cpu='1', mem='2000') TRDTRACKINGtask2['cmd'] = '${O2_ROOT}/bin/o2-trd-global-tracking ' + getDPL_global_options(bigshm=True) \ + putConfigValuesNew(['ITSClustererParam', 'ITSCATrackerParam', 'TPCGasParam'], {"NameConf.mDirMatLUT" : ".."}) \ - + " --track-sources " + anchorConfig.get("o2-trd-global-tracking-options",{}).get("track-sources","all") - workflow['stages'].append(TRDTRACKINGtask2) + + " " + trd_track_sources - TOFRECOtask = createTask(name='tofmatch_'+str(tf), needs=[ITSTPCMATCHtask['name'], getDigiTaskName("TOF")], tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='1500') + TOFRECOtask = WORKFLOW.add_task('tofreco', stage_detector_sources=("RECO", "TOF"), needs=[("DIGI", "TOF")], tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='1500') TOFRECOtask['cmd'] = '${O2_ROOT}/bin/o2-tof-reco-workflow --use-ccdb ' + getDPL_global_options() + putConfigValuesNew() - workflow['stages'].append(TOFRECOtask) - - toftpcmatchneeds = [TOFRECOtask['name'], TPCRECOtask['name']] - toftracksrcdefault = "TPC,ITS-TPC" - if isActive('TRD'): - toftpcmatchneeds.append(TRDTRACKINGtask2['name']) - toftracksrcdefault+=",TPC-TRD,ITS-TPC-TRD" - TOFTPCMATCHERtask = createTask(name='toftpcmatch_'+str(tf), needs=toftpcmatchneeds, tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='1000') + + toftracksrcdefault = WORKFLOW.make_sources_string_reco_match("TPC,ITS-TPC,TPC-TRD,ITS-TPC-TRD", tf, anchorConfig.get("o2-tof-matcher-workflow-options",{}).get("track-sources",None)) + tof_match_needs = [("RECO", "TOF")] + if "TRD" in toftracksrcdefault: + tof_match_needs.append(("RECO", "TRD")) + if "TPC" in toftracksrcdefault: + tof_match_needs.append(("MATCH", "ITS-TPC")) + TOFTPCMATCHERtask = WORKFLOW.add_task('toftpcmatch', stage_detector_sources=("MATCH", "TPC-TOF"), needs=tof_match_needs, tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='1000', condition=toftracksrcdefault) TOFTPCMATCHERtask['cmd'] = '${O2_ROOT}/bin/o2-tof-matcher-workflow ' + getDPL_global_options() \ + putConfigValuesNew(["ITSClustererParam", 'TPCGasParam', 'ITSCATrackerParam', 'MFTClustererParam']) \ - + " --track-sources " + anchorConfig.get("o2-tof-matcher-workflow-options",{}).get("track-sources",toftracksrcdefault) + (' --combine-devices','')[args.no_combine_dpl_devices] - workflow['stages'].append(TOFTPCMATCHERtask) + + " --track-sources " + toftracksrcdefault + (' --combine-devices', '')[args.no_combine_dpl_devices] - MFTRECOtask = createTask(name='mftreco_'+str(tf), needs=[getDigiTaskName("MFT")], tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='1500') + MFTRECOtask = WORKFLOW.add_task('mftreco', stage_detector_sources=("RECO", "MFT"), needs=[("DIGI", "MFT")], tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='1500') MFTRECOtask['cmd'] = '${O2_ROOT}/bin/o2-mft-reco-workflow ' + getDPL_global_options() + putConfigValuesNew(['MFTTracking', 'MFTAlpideParam', 'ITSClustererParam','MFTClustererParam']) if args.mft_assessment_full == True: MFTRECOtask['cmd']+= ' --run-assessment ' - workflow['stages'].append(MFTRECOtask) # MCH reco: needing access to kinematics ... so some extra logic needed here - mchreconeeds = [getDigiTaskName("MCH")] + mchreconeeds = [("DIGI", "MCH")] if usebkgcache: mchreconeeds += [ BKG_KINEDOWNLOADER_TASK['name'] ] - MCHRECOtask = createTask(name='mchreco_'+str(tf), needs=[getDigiTaskName("MCH")], tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='1500') + MCHRECOtask = WORKFLOW.add_task(name='mchreco', stage_detector_sources=("RECO", "MCH"), needs=mchreconeeds, tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='1500') MCHRECOtask['cmd'] = ('','ln -nfs ../bkg_Kine.root . ;')[doembedding] MCHRECOtask['cmd'] += '${O2_ROOT}/bin/o2-mch-reco-workflow ' + getDPL_global_options() + putConfigValues() - workflow['stages'].append(MCHRECOtask) - MIDRECOtask = createTask(name='midreco_'+str(tf), needs=[getDigiTaskName("MID")], tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='1500') + MIDRECOtask = WORKFLOW.add_task('midreco', stage_detector_sources=("RECO", "MID"), needs=[("DIGI", "MID")], tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='1500') MIDRECOtask['cmd'] = '${O2_ROOT}/bin/o2-mid-digits-reader-workflow | ${O2_ROOT}/bin/o2-mid-reco-workflow ' + getDPL_global_options() + putConfigValues() - workflow['stages'].append(MIDRECOtask) - FDDRECOtask = createTask(name='fddreco_'+str(tf), needs=[getDigiTaskName("FDD")], tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='1500') + FDDRECOtask = WORKFLOW.add_task('fddreco', stage_detector_sources=("RECO", "FDD"), needs=[("DIGI", "FDD")], tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='1500') FDDRECOtask['cmd'] = '${O2_ROOT}/bin/o2-fdd-reco-workflow ' + getDPL_global_options() + putConfigValues() - workflow['stages'].append(FDDRECOtask) - FV0RECOtask = createTask(name='fv0reco_'+str(tf), needs=[getDigiTaskName("FV0")], tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='1500') + FV0RECOtask = WORKFLOW.add_task('fv0reco', stage_detector_sources=("RECO", "FV0"), needs=[("DIGI", "FV0")], tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='1500') FV0RECOtask['cmd'] = '${O2_ROOT}/bin/o2-fv0-reco-workflow ' + getDPL_global_options() + putConfigValues() - workflow['stages'].append(FV0RECOtask) # calorimeters - EMCRECOtask = createTask(name='emcalreco_'+str(tf), needs=[getDigiTaskName("EMC")], tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='1500') + EMCRECOtask = WORKFLOW.add_task('emcalreco', stage_detector_sources=("RECO", "EMC"), needs=[("DIGI", "EMC")], tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='1500') EMCRECOtask['cmd'] = '${O2_ROOT}/bin/o2-emcal-reco-workflow --input-type digits --output-type cells --infile emcaldigits.root ' + getDPL_global_options() + putConfigValues() - workflow['stages'].append(EMCRECOtask) - PHSRECOtask = createTask(name='phsreco_'+str(tf), needs=[getDigiTaskName("PHS")], tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='1500') + PHSRECOtask = WORKFLOW.add_task('phsreco', stage_detector_sources=("RECO", "PHS"), needs=[("DIGI", "PHS")], tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='1500') PHSRECOtask['cmd'] = '${O2_ROOT}/bin/o2-phos-reco-workflow ' + getDPL_global_options() + putConfigValues() - workflow['stages'].append(PHSRECOtask) - CPVRECOtask = createTask(name='cpvreco_'+str(tf), needs=[getDigiTaskName("CPV")], tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='1500') + CPVRECOtask = WORKFLOW.add_task('cpvreco', stage_detector_sources=("RECO", "CPV"), needs=[("DIGI", "CPV")], tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='1500') CPVRECOtask['cmd'] = '${O2_ROOT}/bin/o2-cpv-reco-workflow ' + getDPL_global_options() + putConfigValues() - workflow['stages'].append(CPVRECOtask) - if args.with_ZDC: - ZDCRECOtask = createTask(name='zdcreco_'+str(tf), needs=[getDigiTaskName("ZDC")], tf=tf, cwd=timeframeworkdir, lab=["ZDC"]) - ZDCRECOtask['cmd'] = '${O2_ROOT}/bin/o2-zdc-digits-reco ' + getDPL_global_options() + putConfigValues() - workflow['stages'].append(ZDCRECOtask) + ZDCRECOtask = WORKFLOW.add_task('zdcreco', stage_detector_sources=("RECO", "ZDC"), needs=[("DIGI", "ZDC")], tf=tf, cwd=timeframeworkdir, lab=["ZDC"]) + ZDCRECOtask['cmd'] = '${O2_ROOT}/bin/o2-zdc-digits-reco ' + getDPL_global_options() + putConfigValues() ## forward matching - MCHMIDMATCHtask = createTask(name='mchmidMatch_'+str(tf), needs=[MCHRECOtask['name'], MIDRECOtask['name']], tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='1500') + MCHMIDMATCHtask = WORKFLOW.add_task('mchmidMatch', stage_detector_sources=("MATCH", "MCH-MID"), needs=[("RECO", "MCH"), ("RECO", "MID")], tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='1500') MCHMIDMATCHtask['cmd'] = '${O2_ROOT}/bin/o2-muon-tracks-matcher-workflow ' + getDPL_global_options() - workflow['stages'].append(MCHMIDMATCHtask) - MFTMCHMATCHtask = createTask(name='mftmchMatch_'+str(tf), needs=[MCHMIDMATCHtask['name'], MFTRECOtask['name']], tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='1500') + MFTMCHMATCHtask = WORKFLOW.add_task('mftmchMatch', stage_detector_sources=("MATCH", "MFT-MCH"), needs=[("RECO", "MCH"), ("RECO", "MFT")], tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='1500') MFTMCHMATCHtask['cmd'] = '${O2_ROOT}/bin/o2-globalfwd-matcher-workflow ' + putConfigValuesNew(['ITSAlpideConfig','MFTAlpideConfig'],{"FwdMatching.useMIDMatch":"true"}) if args.fwdmatching_assessment_full == True: MFTMCHMATCHtask['cmd']+= ' | o2-globalfwd-assessment-workflow ' MFTMCHMATCHtask['cmd']+= getDPL_global_options() - workflow['stages'].append(MFTMCHMATCHtask) - if args.fwdmatching_save_trainingdata == True: - MFTMCHMATCHTraintask = createTask(name='mftmchMatchTrain_'+str(tf), needs=[MCHMIDMATCHtask['name'], MFTRECOtask['name']], tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='1500') + if args.fwdmatching_save_trainingdata == True: # TODO This seems to be exactly the same as the previous task + MFTMCHMATCHTraintask = WORKFLOW.add_task(name='mftmchMatchTrain', needs=[("MATCH", "MCH-MID"), ("RECO", "MFT")], tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='1500') MFTMCHMATCHTraintask['cmd'] = '${O2_ROOT}/bin/o2-globalfwd-matcher-workflow ' + putConfigValuesNew(['ITSAlpideConfig','MFTAlpideConfig'],{"FwdMatching.useMIDMatch":"true"}) MFTMCHMATCHTraintask['cmd']+= getDPL_global_options() - workflow['stages'].append(MFTMCHMATCHTraintask) - - pvfinderneeds = [ITSTPCMATCHtask['name']] - if isActive('FT0'): - pvfinderneeds += [FT0RECOtask['name']] - if isActive('TOF'): - pvfinderneeds += [TOFTPCMATCHERtask['name']] - if isActive('MFT'): - pvfinderneeds += [MFTRECOtask['name']] - if isActive('MCH'): - pvfinderneeds += [MCHRECOtask['name']] - if isActive('TRD'): - pvfinderneeds += [TRDTRACKINGtask2['name']] - if isActive('FDD'): - pvfinderneeds += [FDDRECOtask['name']] - if isActive('MFT') and isActive('MCH'): - pvfinderneeds += [MFTMCHMATCHtask['name']] - - # Take None as default, we only add more if nothing from anchorConfig - pvfinder_sources = anchorConfig.get("o2-primary-vertexing-workflow-options",{}).get("vertexing-sources", None) - pvfinder_matching_sources = anchorConfig.get("o2-primary-vertexing-workflow-options",{}).get("vertex-track-matching-sources", None) - if not pvfinder_sources: - pvfinder_sources = "ITS,ITS-TPC,ITS-TPC-TRD,ITS-TPC-TOF" - if isActive("MID"): - pvfinder_sources += ",MID" - pvfinderneeds += [MIDRECOtask['name']] - if not pvfinder_matching_sources: - pvfinder_matching_sources = "ITS,MFT,TPC,ITS-TPC,MCH,MFT-MCH,TPC-TOF,TPC-TRD,ITS-TPC-TRD,ITS-TPC-TOF" - if isActive("MID"): - pvfinder_matching_sources += ",MID" - pvfinderneeds += [MIDRECOtask['name']] - - PVFINDERtask = createTask(name='pvfinder_'+str(tf), needs=pvfinderneeds, tf=tf, cwd=timeframeworkdir, lab=["RECO"], cpu=NWORKERS, mem='4000') + + pvfinder_sources = WORKFLOW.make_sources_string_reco_match("ITS,ITS-TPC,ITS-TPC-TRD,ITS-TPC-TOF,MID", tf, anchorConfig.get("o2-primary-vertexing-workflow-options",{}).get("vertexing-sources", None)) + pvfinder_matching_sources = WORKFLOW.make_sources_string_reco_match("ITS,MFT,TPC,ITS-TPC,MCH,MFT-MCH,TPC-TOF,TPC-TRD,ITS-TPC-TRD,ITS-TPC-TOF,MID", tf, anchorConfig.get("o2-primary-vertexing-workflow-options",{}).get("vertex-track-matching-sources", None)) + + PVFINDERtask = WORKFLOW.add_task('pvfinder', needs=[("MATCH", "ANY"), ("RECO", "ANY")], tf=tf, cwd=timeframeworkdir, lab=["RECO"], cpu=NWORKERS, mem='4000', condition=pvfinder_sources and pvfinder_matching_sources) PVFINDERtask['cmd'] = '${O2_ROOT}/bin/o2-primary-vertexing-workflow ' \ + getDPL_global_options() + putConfigValuesNew(['ITSAlpideParam','MFTAlpideParam', 'pvertexer', 'TPCGasParam'], {"NameConf.mDirMatLUT" : ".."}) - PVFINDERtask['cmd'] += ' --vertexing-sources ' + pvfinder_sources + ' --vertex-track-matching-sources ' + pvfinder_matching_sources + (' --combine-source-devices','')[args.no_combine_dpl_devices] - workflow['stages'].append(PVFINDERtask) + PVFINDERtask['cmd'] += ' --vertexing-sources ' + pvfinder_sources + ' --vertex-track-matching-sources ' + pvfinder_matching_sources + (' --combine-source-devices', '')[args.no_combine_dpl_devices] if includeFullQC or includeLocalQC: def addQCPerTF(taskName, needs, readerCommand, configFilePath, objectsFile=''): - task = createTask(name=taskName + '_local' + str(tf), needs=needs, tf=tf, cwd=timeframeworkdir, lab=["QC"], cpu=1, mem='2000') + task = WORKFLOW.add_task(taskName + '_local', needs=needs, tf=tf, cwd=timeframeworkdir, lab=["QC"], cpu=1, mem='2000') objectsFile = objectsFile if len(objectsFile) > 0 else taskName + '.root' # the --local-batch argument will make QC Tasks store their results in a file and merge with any existing objects task['cmd'] = f'{readerCommand} | o2-qc --config {configFilePath}' + \ @@ -921,23 +1111,23 @@ def addQCPerTF(taskName, needs, readerCommand, configFilePath, objectsFile=''): ' ' + getDPL_global_options() # Prevents this task from being run for multiple TimeFrames at the same time, thus trying to modify the same file. task['semaphore'] = objectsFile - workflow['stages'].append(task) ### MFT # to be enabled once MFT Digits should run 5 times with different configurations for flp in range(5): addQCPerTF(taskName='mftDigitsQC' + str(flp), - needs=[getDigiTaskName("MFT")], + needs=[("DIGI", "MFT")], readerCommand='o2-qc-mft-digits-root-file-reader --mft-digit-infile=mftdigits.root', configFilePath='json://${O2DPG_ROOT}/MC/config/QC/json/qc-mft-digit-' + str(flp) + '.json', objectsFile='mftDigitsQC.root') + addQCPerTF(taskName='mftClustersQC', - needs=[MFTRECOtask['name']], + needs=[("RECO", "MFT")], readerCommand='o2-global-track-cluster-reader --track-types none --cluster-types MFT', configFilePath='json://${O2DPG_ROOT}/MC/config/QC/json/qc-mft-cluster.json') addQCPerTF(taskName='mftAsyncQC', - needs=[MFTRECOtask['name']], + needs=[("RECO", "MFT")], readerCommand='o2-global-track-cluster-reader --track-types MFT --cluster-types MFT', configFilePath='json://${O2DPG_ROOT}/MC/config/QC/json/qc-mft-async.json') @@ -947,42 +1137,41 @@ def addQCPerTF(taskName, needs, readerCommand, configFilePath, objectsFile=''): # readerCommand=, # configFilePath='json://${O2DPG_ROOT}/MC/config/QC/json/tpc-qc-tracking-direct.json') addQCPerTF(taskName='tpcStandardQC', - needs=[TPCRECOtask['name']], - readerCommand='o2-tpc-file-reader --tpc-track-reader "--infile tpctracks.root" --tpc-native-cluster-reader "--infile tpc-native-clusters.root" --input-type clusters,tracks', - # readerCommand='o2-tpc-file-reader --tpc-track-reader "--infile tpctracks.root" --input-type tracks', - configFilePath='json://${O2DPG_ROOT}/MC/config/QC/json/tpc-qc-standard-direct.json') + needs=[("RECO", "TPC")], + readerCommand='o2-tpc-file-reader --tpc-track-reader "--infile tpctracks.root" --tpc-native-cluster-reader "--infile tpc-native-clusters.root" --input-type clusters,tracks', + # readerCommand='o2-tpc-file-reader --tpc-track-reader "--infile tpctracks.root" --input-type tracks', + configFilePath='json://${O2DPG_ROOT}/MC/config/QC/json/tpc-qc-standard-direct.json') ### TRD addQCPerTF(taskName='trdDigitsQC', - needs=[TRDDigitask['name']], + needs=[("DIGI", "TRD")], readerCommand='o2-trd-trap-sim', configFilePath='json://${O2DPG_ROOT}/MC/config/QC/json/trd-digits-task.json') ### TOF addQCPerTF(taskName='tofDigitsQC', - needs=[getDigiTaskName("TOF")], + needs=[("DIGI", "TOF")], readerCommand='${O2_ROOT}/bin/o2-tof-reco-workflow --input-type digits --output-type none', configFilePath='json://${O2DPG_ROOT}/MC/config/QC/json/tofdigits.json', objectsFile='tofDigitsQC.root') addQCPerTF(taskName='tofft0PIDQC', - needs=[ITSTPCMATCHtask['name'], TOFRECOtask['name'], FT0RECOtask['name']], + needs=[("MATCH", "ITS-TPC"), ("RECO", "TOF"), ("RECO", "FT0")], readerCommand='o2-global-track-cluster-reader --track-types "ITS-TPC,TPC,ITS-TPC-TOF,TPC-TOF" --cluster-types FT0', configFilePath='json://${O2DPG_ROOT}/MC/config/QC/json/pidft0tof.json') addQCPerTF(taskName='tofPIDQC', - needs=[ITSTPCMATCHtask['name'], TOFRECOtask['name']], + needs=[("MATCH", "ITS-TPC"), ("RECO", "TOF")], readerCommand='o2-global-track-cluster-reader --track-types "ITS-TPC,TPC,ITS-TPC-TOF,TPC-TOF" --cluster-types none', configFilePath='json://${O2DPG_ROOT}/MC/config/QC/json/pidtof.json') ### EMCAL - if isActive('EMC'): - addQCPerTF(taskName='emcDigitsQC', - needs=[EMCRECOtask['name']], - readerCommand='o2-emcal-cell-reader-workflow --infile emccells.root', - configFilePath='json://${O2DPG_ROOT}/MC/config/QC/json/emc-digits-task.json') + addQCPerTF(taskName='emcDigitsQC', + needs=[("RECO", "EMC")], + readerCommand='o2-emcal-cell-reader-workflow --infile emccells.root', + configFilePath='json://${O2DPG_ROOT}/MC/config/QC/json/emc-digits-task.json') ### FT0 addQCPerTF(taskName='RecPointsQC', - needs=[FT0RECOtask['name']], + needs=[("RECO", "FT0")], readerCommand='o2-ft0-recpoints-reader-workflow --infile o2reco_ft0.root', configFilePath='json://${O2DPG_ROOT}/MC/config/QC/json/ft0-reconstruction-config.json') @@ -992,22 +1181,20 @@ def addQCPerTF(taskName, needs, readerCommand, configFilePath, objectsFile=''): readerCommand='o2-primary-vertex-reader-workflow', configFilePath='json://${O2DPG_ROOT}/MC/config/QC/json/vertexing-qc-direct-mc.json') addQCPerTF(taskName='ITSTPCmatchQC', - needs=[ITSTPCMATCHtask['name']], + needs=[("MATCH", "ITS-TPC")], readerCommand='o2-global-track-cluster-reader --track-types "TPC,ITS-TPC"', configFilePath='json://${O2DPG_ROOT}/MC/config/QC/json/ITSTPCmatchedTracks_direct_MC.json') - if isActive('TOF'): - addQCPerTF(taskName='TOFMatchQC', - needs=[TOFTPCMATCHERtask['name']], - readerCommand='o2-global-track-cluster-reader --track-types "ITS-TPC-TOF,TPC-TOF,TPC" --cluster-types none', - configFilePath='json://${O2DPG_ROOT}/MC/config/QC/json/tofMatchedTracks_ITSTPCTOF_TPCTOF_direct_MC.json') - if isActive('TOF') and isActive('TRD'): - addQCPerTF(taskName='TOFMatchWithTRDQC', - needs=[TOFTPCMATCHERtask['name']], - readerCommand='o2-global-track-cluster-reader --track-types "ITS-TPC-TOF,TPC-TOF,TPC,ITS-TPC-TRD,ITS-TPC-TRD-TOF,TPC-TRD,TPC-TRD-TOF" --cluster-types none', - configFilePath='json://${O2DPG_ROOT}/MC/config/QC/json/tofMatchedTracks_AllTypes_direct_MC.json') + addQCPerTF(taskName='TOFMatchQC', + needs=[("MATCH", "ITS-TPC"), ("MATCH", "TPC-TOF")], + readerCommand='o2-global-track-cluster-reader --track-types "ITS-TPC-TOF,TPC-TOF,TPC" --cluster-types none', + configFilePath='json://${O2DPG_ROOT}/MC/config/QC/json/tofMatchedTracks_ITSTPCTOF_TPCTOF_direct_MC.json') + addQCPerTF(taskName='TOFMatchWithTRDQC', + needs=[("MATCH", "TPC-TOF"), ("MATCH", "ITS-TPC"), ("RECO", "TRD")], + readerCommand='o2-global-track-cluster-reader --track-types "ITS-TPC-TOF,TPC-TOF,TPC,ITS-TPC-TRD,ITS-TPC-TRD-TOF,TPC-TRD,TPC-TRD-TOF" --cluster-types none', + configFilePath='json://${O2DPG_ROOT}/MC/config/QC/json/tofMatchedTracks_AllTypes_direct_MC.json') ### ITS addQCPerTF(taskName='ITSTrackSimTask', - needs=[ITSRECOtask['name']], + needs=[("RECO", "ITS")], readerCommand='o2-global-track-cluster-reader --track-types "ITS" --cluster-types "ITS"', configFilePath='json://${O2DPG_ROOT}/MC/config/QC/json/its-mc-tracks-qc.json') @@ -1017,49 +1204,24 @@ def addQCPerTF(taskName, needs, readerCommand, configFilePath, objectsFile=''): if COLTYPE == "PbPb" or (doembedding and COLTYPEBKG == "PbPb"): svfinder_threads = ' --threads 3 ' svfinder_cpu = 3 - SVFINDERtask = createTask(name='svfinder_'+str(tf), needs=[PVFINDERtask['name']], tf=tf, cwd=timeframeworkdir, lab=["RECO"], cpu=svfinder_cpu, mem='5000') + svfinder_sources = WORKFLOW.make_sources_string_reco_match("ITS,ITS-TPC,TPC-TRD,TPC-TOF,ITS-TPC-TRD,ITS-TPC-TOF,MID", tf, anchorConfig.get("o2-secondary-vertexing-workflow-options",{}).get("vertexing-sources", None)) + SVFINDERtask = WORKFLOW.add_task('svfinder', needs=[PVFINDERtask['name']], tf=tf, cwd=timeframeworkdir, lab=["RECO"], cpu=svfinder_cpu, mem='5000', condition=svfinder_sources) SVFINDERtask['cmd'] = '${O2_ROOT}/bin/o2-secondary-vertexing-workflow ' SVFINDERtask['cmd'] += getDPL_global_options(bigshm=True) + svfinder_threads + putConfigValuesNew(['svertexer'], {"NameConf.mDirMatLUT" : ".."}) - # Take None as default, we only add more if nothing from anchorConfig - svfinder_sources = anchorConfig.get("o2-secondary-vertexing-workflow-options",{}).get("vertexing-sources", None) - if not svfinder_sources: - svfinder_sources = "ITS,ITS-TPC,TPC-TRD,TPC-TOF,ITS-TPC-TRD,ITS-TPC-TOF" - if isActive("MID"): - svfinder_sources += ",MID" - SVFINDERtask['cmd'] += ' --vertexing-sources ' + svfinder_sources + (' --combine-source-devices','')[args.no_combine_dpl_devices] - workflow['stages'].append(SVFINDERtask) + SVFINDERtask['cmd'] += ' --vertexing-sources ' + svfinder_sources + (' --combine-source-devices', '')[args.no_combine_dpl_devices] # ----------- # produce AOD # ----------- - # TODO This needs further refinement, sources and dependencies should be constructed dynamically - aodinfosources = 'ITS,MFT,MCH,TPC,ITS-TPC,MFT-MCH,ITS-TPC-TOF,TPC-TOF,FT0,FDD,CTP,TPC-TRD,ITS-TPC-TRD,EMC' - aodneeds = [PVFINDERtask['name'], SVFINDERtask['name']] - if isActive('FV0'): - aodneeds += [ FV0RECOtask['name'] ] - aodinfosources += ',FV0' - if isActive('TOF'): - aodneeds += [ TOFRECOtask['name'] ] - if isActive('TRD'): - aodneeds += [ TRDTRACKINGtask2['name'] ] - if isActive('EMC'): - aodneeds += [ EMCRECOtask['name'] ] - if isActive('CPV'): - aodneeds += [ CPVRECOtask['name'] ] - if isActive('PHS'): - aodneeds += [ PHSRECOtask['name'] ] - if isActive('MID'): - aodneeds += [ MIDRECOtask['name'] ] - aodinfosources += ',MID' - if args.with_ZDC and isActive('ZDC'): - aodneeds += [ ZDCRECOtask['name'] ] - aodinfosources += ',ZDC' + aodneeds = [PVFINDERtask['name'], SVFINDERtask['name'], ("DIGI", "CTP"), ("RECO", "ANY"), ("MATCH", "ANY")] if usebkgcache: - aodneeds += [ BKG_KINEDOWNLOADER_TASK['name'] ] + aodneeds.append(BKG_KINEDOWNLOADER_TASK['name']) + aodinfosources = WORKFLOW.make_sources_string_reco_match('ITS,MFT,MCH,TPC,ITS-TPC,MFT-MCH,ITS-TPC-TOF,TPC-TOF,FT0,FDD,TPC-TRD,ITS-TPC-TRD,EMC,FV0,MID,ZDC', tf, anchorConfig.get("o2-aod-producer-workflow-options",{}).get("info-sources", None)) + AODtask = WORKFLOW.add_task('aod', needs=aodneeds, tf=tf, cwd=timeframeworkdir, lab=["AOD"], mem='4000', cpu='1', condition=aodinfosources) + aodinfosources += ",CTP" aod_df_id = '{0:03}'.format(tf) - AODtask = createTask(name='aod_'+str(tf), needs=aodneeds, tf=tf, cwd=timeframeworkdir, lab=["AOD"], mem='4000', cpu='1') AODtask['cmd'] = ('','ln -nfs ../bkg_Kine.root . ;')[doembedding] AODtask['cmd'] += '[ -f AO2D.root ] && rm AO2D.root; ${O2_ROOT}/bin/o2-aod-producer-workflow --reco-mctracks-only 1 --aod-writer-keep dangling --aod-writer-resfile AO2D' # next line needed for meta data writing (otherwise lost) @@ -1069,7 +1231,7 @@ def addQCPerTF(taskName, needs, readerCommand, configFilePath, objectsFile=''): if args.run_anchored == False: AODtask['cmd'] += ' --aod-timeframe-id ${ALIEN_PROC_ID}' + aod_df_id AODtask['cmd'] += ' ' + getDPL_global_options(bigshm=True) - AODtask['cmd'] += ' --info-sources ' + anchorConfig.get("o2-aod-producer-workflow-options",{}).get("info-sources",str(aodinfosources)) + AODtask['cmd'] += ' --info-sources ' + aodinfosources AODtask['cmd'] += ' --lpmp-prod-tag ${ALIEN_JDL_LPMPRODUCTIONTAG:-unknown}' AODtask['cmd'] += ' --anchor-pass ${ALIEN_JDL_LPMANCHORPASSNAME:-unknown}' AODtask['cmd'] += ' --anchor-prod ${ALIEN_JDL_MCANCHOR:-unknown}' @@ -1077,8 +1239,6 @@ def addQCPerTF(taskName, needs, readerCommand, configFilePath, objectsFile=''): if environ.get('O2DPG_AOD_NOTRUNCATE') != None or environ.get('ALIEN_JDL_O2DPG_AOD_NOTRUNCATE') != None: AODtask['cmd'] += ' --enable-truncation 0' # developer option to suppress precision truncation - workflow['stages'].append(AODtask) - # AOD merging / combination step (as individual stages) --> for the moment deactivated in favor or more stable global merging """ aodmergerneeds = [ AODtask['name'] ] @@ -1107,30 +1267,31 @@ def addQCPerTF(taskName, needs, readerCommand, configFilePath, objectsFile=''): # taking away digits, clusters and other stuff as soon as possible. # TODO: cleanup by labels or task names if args.early_tf_cleanup == True: - TFcleanup = createTask(name='tfcleanup_'+str(tf), needs= [ AOD_merge_task['name'] ], tf=tf, cwd=timeframeworkdir, lab=["CLEANUP"], mem='0', cpu='1') + TFcleanup = WORKFLOW.add_task('tfcleanup', needs=[AOD_merge_task['name']], tf=tf, cwd=timeframeworkdir, lab=["CLEANUP"], mem='0', cpu='1') TFcleanup['cmd'] = 'rm *digi*.root;' TFcleanup['cmd'] += 'rm *cluster*.root' - workflow['stages'].append(TFcleanup); + WORKFLOW.workflow.append(TFcleanup); # AOD merging as one global final step aodmergerneeds = ['aod_' + str(tf) for tf in range(1, NTIMEFRAMES + 1)] -AOD_merge_task = createTask(name='aodmerge', needs = aodmergerneeds, lab=["AOD"], mem='2000', cpu='1') +AOD_merge_task = WORKFLOW.add_task('aodmerge', needs = aodmergerneeds, lab=["AOD"], mem='2000', cpu='1') AOD_merge_task['cmd'] = ' [ -f aodmerge_input.txt ] && rm aodmerge_input.txt; ' AOD_merge_task['cmd'] += ' for i in `seq 1 ' + str(NTIMEFRAMES) + '`; do echo "tf${i}/AO2D.root" >> aodmerge_input.txt; done; ' AOD_merge_task['cmd'] += ' o2-aod-merger --input aodmerge_input.txt --output AO2D.root' -workflow['stages'].append(AOD_merge_task) job_merging = False if includeFullQC: - workflow['stages'].extend(include_all_QC_finalization(ntimeframes=NTIMEFRAMES, standalone=False, run=args.run, productionTag=args.productionTag)) + WORKFLOW.add_predefined_tasks(include_all_QC_finalization(ntimeframes=NTIMEFRAMES, standalone=False, run=args.run, productionTag=args.productionTag)) if includeAnalysis: # include analyses and potentially final QC upload tasks - add_analysis_tasks(workflow["stages"], needs=[AOD_merge_task["name"]], is_mc=True) + analysis_tasks = [] + add_analysis_tasks(analysis_tasks, needs=[AOD_merge_task["name"]], is_mc=True) if QUALITYCONTROL_ROOT: - add_analysis_qc_upload_tasks(workflow["stages"], args.productionTag, args.run, "passMC") - -dump_workflow(workflow["stages"], args.o) + add_analysis_qc_upload_tasks(analysis_tasks, args.productionTag, args.run, "passMC") + WORKFLOW.add_predefined_tasks(analysis_tasks) + +dump_workflow(WORKFLOW.workflow, args.o) exit (0)