From 1416be042835c12526f8ef444d5108183401bae9 Mon Sep 17 00:00:00 2001 From: Juraj Smiesko Date: Thu, 26 Oct 2023 15:33:21 +0200 Subject: [PATCH] Adding test files per process for histmaker --- e4hsource/test/histmaker_source.py | 8 +- man/man1/fccanalysis-run.1 | 4 +- man/man1/fccanalysis.1 | 2 +- ...ccanalysis-file.7 => fccanalysis-script.7} | 31 +- python/FCCAnalysisRun.py | 272 +++++++++--------- python/process.py | 27 +- 6 files changed, 181 insertions(+), 163 deletions(-) rename man/man7/{fccanalysis-file.7 => fccanalysis-script.7} (82%) diff --git a/e4hsource/test/histmaker_source.py b/e4hsource/test/histmaker_source.py index 476e1017217..07535a6480f 100644 --- a/e4hsource/test/histmaker_source.py +++ b/e4hsource/test/histmaker_source.py @@ -1,6 +1,9 @@ # list of processes (mandatory) processList = { - 'p8_ee_WW_ecm240': {'output': 'p8_ee_WW_ecm240_out'} + 'p8_ee_WW_ecm240': {'output': 'p8_ee_WW_ecm240_out', + 'testfile': '/eos/experiment/fcc/ee/generation/' + 'DelphesEvents/winter2023/IDEA/' + 'p8_ee_WW_ecm240/events_192112516.root'} } # Production tag when running over EDM4Hep centrally produced events, this @@ -24,9 +27,6 @@ # How to read input files useDataSource = True -testFile = '/eos/experiment/fcc/ee/generation/DelphesEvents/winter2023/' \ - 'IDEA/p8_ee_WW_ecm240/events_192112516.root' - # build_graph function that contains the analysis logic, cuts and histograms # (mandatory) def build_graph(df, dataset): diff --git a/man/man1/fccanalysis-run.1 b/man/man1/fccanalysis-run.1 index a50ee6c785d..39e1a1be490 100644 --- a/man/man1/fccanalysis-run.1 +++ b/man/man1/fccanalysis-run.1 @@ -22,7 +22,7 @@ stages \fIfccanalysis-run\fR is used\&. When using \fIfccanalysis-run\fR the analysis is running in the managed mode, where the RDataFrame provided is steered by the framework and users can control some aspects of the running with additional global attributes, see -\fBfccanalysis-file\fR(8). +\fBfccanalysis-script\fR(7). .SH OPTIONS .PP @@ -118,7 +118,7 @@ Controls search path for the process dictionaries. The default value is \fI/cvmfs/fcc.cern.ch/FCCDicts/\fR\&. .RE .SH SEE ALSO -fccanalysis(1), fccanalysis-file(7) +fccanalysis(1), fccanalysis-script(7) .SH BUGS Many .SH AUTHORS diff --git a/man/man1/fccanalysis.1 b/man/man1/fccanalysis.1 index 6d380fabe4f..c657f3d3557 100644 --- a/man/man1/fccanalysis.1 +++ b/man/man1/fccanalysis.1 @@ -68,7 +68,7 @@ Runs the finalization analysis file provided\&. Generate plots based on the plots analysis file provided\&. .RE .SH SEE ALSO -fccanalysis-run(1), fccanalysis-file(7) +fccanalysis-run(1), fccanalysis-script(7) .SH BUGS Many .SH AUTHORS diff --git a/man/man7/fccanalysis-file.7 b/man/man7/fccanalysis-script.7 similarity index 82% rename from man/man7/fccanalysis-file.7 rename to man/man7/fccanalysis-script.7 index d7c89ef10a1..dfbe018a19f 100644 --- a/man/man7/fccanalysis-file.7 +++ b/man/man7/fccanalysis-script.7 @@ -1,34 +1,34 @@ -.\" Manpage for fccanalysis-file +.\" Manpage for fccanalysis-script .\" Contact fcc-experiments-sw-dev@cern.ch to correct errors or typos. -.TH FCCANALYSIS\-FILE 7 "15 Aug 2023" "0.7.0" "fccanalysis-file man page" +.TH FCCANALYSIS\-SCRIPT 7 "26 Oct 2023" "0.7.0" "fccanalysis-script man page" .SH NAME -fccanalysis\-file \- analysis file specification +fccanalysis\-script \- analysis script specification .SH SYNOPSIS .sp * .sp .SH DESCRIPTION -The analysis file is expected to be a valid Python script containing either -part of or the full analysis. 
There are two basic modes how to run a FCC
-analysis, one is to run in the managed mode like so:
+The analysis script is expected to be a valid Python script containing either
+part of or the full analysis. There are two basic modes of running the FCC
+analysis: one is to run it in the managed mode like so:
 .PP
 .RS 4
-fcc run <analysis_file>
+fcc run <analysis_script>
 .RE
 .PP
 or
 .PP
 .RS 4
-fcc final <analysis_file>
+fcc final <analysis_script>
 .RE
 .PP
 where user needs to provide minimal number of variables and settings. In this
 mode the RDataFrame is managed for the user and it can be controlled by defining
-several global attributes in the analysis file. The other mode is to run the
-analysis file as regular python script:
+several global attributes in the analysis script. The other mode is to run the
+analysis script as a regular Python script:
 .PP
 .RS 4
-python <analysis_file>
+python <analysis_script>
 .RE
 .PP
 here user has full control over the RDataFrame, but has to create all necessary
@@ -47,7 +47,7 @@ indicated in the $FCCDICTSDIR environment variable.
 .PP
 \fBprodTag\fR
 .RS 4
-Provides information where to find input files. There are several way how to
+Provides information where to find input files. There are several ways to
 find the information, one of them uses YAML file which is being searched for
 in the subfolders of $FCCDICTSDIR.
 .RE
@@ -78,6 +78,13 @@ The analysis RDataFrame can be split into several chunks\&.
 .br
 Default value: 1
 .RE
+\fItestfile\fR
+.RS 4
+Specifies the test file which can be used in test mode, see
+fccanalysis-run(1)\&.
+.br
+Default value: None
+.RE
 .RE
 .PP
 \fBoutputDir\fR
 .RS 4
@@ -105,7 +112,7 @@ Default value: False
 .PP
 This section is under construction. You are invited to help :)
 .SH SEE ALSO
-fccanalysis(1), fccanalysis-run(1)
+fccanalysis(1), \fBfccanalysis-run\fR(1)
 .SH BUGS
 Many
 .SH AUTHORS
diff --git a/python/FCCAnalysisRun.py b/python/FCCAnalysisRun.py
index ad7fe5ff23c..b305e161f8e 100644
--- a/python/FCCAnalysisRun.py
+++ b/python/FCCAnalysisRun.py
@@ -12,14 +12,14 @@
 import numpy as np
 
 from anafile import getElement, getElementDict
-from process import getProcessInfo, get_process_dict
+from process import getProcessInfo, get_process_dict, get_entries
 
 LOGGER = logging.getLogger('FCCAnalyses.run')
 
 
 # __________________________________________________________
-def get_entries(infilepath):
+def get_processed_entries(infilepath):
     '''
     Get number of original entries and number of actual entries in the file
     '''
@@ -30,8 +30,8 @@ def get_entries(infilepath):
     try:
         processEvents = infile.Get('eventsProcessed').GetVal()
     except AttributeError:
-        LOGGER.warning('Input file is missing information about '
-                       'original number of events!')
+        LOGGER.debug('Input file is missing information about original number '
+                     'of events!')
 
     eventsTTree = 0
     try:
@@ -134,12 +134,12 @@ def SubmitToCondor(cmd,nbtrials):
     return 0
 
 # __________________________________________________________
-def initialize(args, rdfModule, analysisFile):
+def initialize(args, rdf_module, anascript_path):
 
     # for convenience and compatibility with user code
     ROOT.gInterpreter.Declare("using namespace FCCAnalyses;")
-    geometryFile = getElement(rdfModule, "geometryFile")
-    readoutName = getElement(rdfModule, "readoutName")
+    geometryFile = getElement(rdf_module, "geometryFile")
+    readoutName = getElement(rdf_module, "readoutName")
     if geometryFile!="" and readoutName!="":
        ROOT.CaloNtupleizer.loadGeometry(geometryFile, readoutName)
 
@@ -149,7 +149,7 @@ def initialize(args, rdfModule, analysisFile):
     if isinstance(args.ncpus, int) and args.ncpus >= 1:
         ncpus = args.ncpus
     else:
-        ncpus = getElement(rdfModule, "nCPUS")
+        ncpus = getElement(rdf_module, "nCPUS")
     if ncpus 
< 0:  # use all available threads
         ROOT.EnableImplicitMT()
         ncpus = ROOT.GetThreadPoolSize()
@@ -163,17 +163,17 @@ def initialize(args, rdfModule, analysisFile):
         LOGGER.info('No multithreading enabled. Running in single thread...')
 
     # custom header files
-    includePaths = getElement(rdfModule, "includePaths")
+    includePaths = getElement(rdf_module, "includePaths")
     if includePaths:
         ROOT.gInterpreter.ProcessLine(".O3")
-        basepath = os.path.dirname(os.path.abspath(analysisFile))+"/"
+        basepath = os.path.dirname(os.path.abspath(anascript_path))+"/"
         for path in includePaths:
             LOGGER.info('Loading %s...', path)
             ROOT.gInterpreter.Declare(f'#include "{basepath}/{path}"')
 
     # check if analyses plugins need to be loaded before anything
     # still in use?
-    analysesList = getElement(rdfModule, "analysesList")
+    analysesList = getElement(rdf_module, "analysesList")
     if analysesList and len(analysesList) > 0:
         _ana = []
         for analysis in analysesList:
@@ -189,7 +189,7 @@ def initialize(args, rdfModule, analysisFile):
             _ana.append(getattr(ROOT, analysis).dictionary)
 
 # __________________________________________________________
-def runRDF(rdfModule, inputlist, outFile, nevt, args):
+def runRDF(rdf_module, inputlist, outFile, nevt, args):
     if args.use_data_source:
         print('----> Info: Loading events through EDM4hep RDataSource...')
         ROOT.gSystem.Load("libe4hsource")
@@ -223,10 +223,10 @@ def runRDF(rdfModule, inputlist, outFile, nevt, args):
         dframe = dframe.Range(0, args.nevents)
 
     try:
-        dframe2 = getElement(rdfModule.RDFanalysis, "analysers")(dframe)
+        dframe2 = getElement(rdf_module.RDFanalysis, "analysers")(dframe)
 
         branch_list = ROOT.vector('string')()
-        blist = getElement(rdfModule.RDFanalysis, "output")()
+        blist = getElement(rdf_module.RDFanalysis, "output")()
         for bname in blist:
             branch_list.push_back(bname)
 
         # Registering Count before Snapshot to avoid additional event loops
@@ -241,7 +241,7 @@ def runRDF(rdfModule, inputlist, outFile, nevt, args):
 
 # __________________________________________________________
-def sendToBatch(rdfModule, chunkList, process, analysisFile):
+def sendToBatch(rdf_module, chunkList, process, anascript_path):
     localDir = os.environ["LOCAL_DIR"]
     current_date = datetime.datetime.fromtimestamp(
         datetime.datetime.now().timestamp()).strftime('%Y-%m-%d_%H-%M-%S')
@@ -260,10 +260,10 @@ def sendToBatch(rdfModule, chunkList, process, analysisFile):
                      'installed!\nAborting job submission...')
         sys.exit(3)
 
-    outputDir = getElement(rdfModule, "outputDir")
-    outputDirEos = getElement(rdfModule, "outputDirEos")
-    eosType = getElement(rdfModule, "eosType")
-    userBatchConfig = getElement(rdfModule, "userBatchConfig")
+    outputDir = getElement(rdf_module, "outputDir")
+    outputDirEos = getElement(rdf_module, "outputDirEos")
+    eosType = getElement(rdf_module, "eosType")
+    userBatchConfig = getElement(rdf_module, "userBatchConfig")
 
     if outputDir!="" and outputDir[-1]!="/":
         outputDir+="/"
@@ -299,9 +299,9 @@ def sendToBatch(rdfModule, chunkList, process, analysisFile):
             frun.write('cd job{}_chunk{}\n'.format(process,ch))
 
             if not os.path.isabs(outputDir):
-                frun.write(localDir + '/bin/fccanalysis run {} --batch --output {}chunk{}.root --files-list '.format(analysisFile, outputDir, ch))
+                frun.write(localDir + '/bin/fccanalysis run {} --batch --output {}chunk{}.root --files-list '.format(anascript_path, outputDir, ch))
             else:
-                frun.write(localDir + '/bin/fccanalysis run {} --batch --output {}{}/chunk{}.root --files-list '.format(analysisFile, outputDir, process,ch))
+                frun.write(localDir + '/bin/fccanalysis run {} --batch 
--output {}{}/chunk{}.root --files-list '.format(anascript_path, outputDir, process,ch)) for ff in range(len(chunkList[ch])): frun.write(' %s'%(chunkList[ch][ff])) @@ -338,9 +338,9 @@ def sendToBatch(rdfModule, chunkList, process, analysisFile): frun_condor.write('requirements = ( (OpSysAndVer =?= "CentOS7") && (Machine =!= LastRemoteHost) && (TARGET.has_avx2 =?= True) )\n') frun_condor.write('on_exit_remove = (ExitBySignal == False) && (ExitCode == 0)\n') frun_condor.write('max_retries = 3\n') - frun_condor.write('+JobFlavour = "{}"\n'.format(getElement(rdfModule, "batchQueue"))) - frun_condor.write('+AccountingGroup = "{}"\n'.format(getElement(rdfModule, "compGroup"))) - frun_condor.write('RequestCpus = {}\n'.format(getElement(rdfModule, "nCPUS"))) + frun_condor.write('+JobFlavour = "{}"\n'.format(getElement(rdf_module, "batchQueue"))) + frun_condor.write('+AccountingGroup = "{}"\n'.format(getElement(rdf_module, "compGroup"))) + frun_condor.write('RequestCpus = {}\n'.format(getElement(rdf_module, "nCPUS"))) frun_condor.write('queue filename matching files {}\n'.format(condor_file_str)) frun_condor.close() @@ -378,18 +378,18 @@ def apply_filepath_rewrites(filepath): # __________________________________________________________ -def runLocal(rdfModule, infile_list, args): +def runLocal(rdf_module, infile_list, args): # Check whether to use RDataSource to load the events - use_data_source = getElement(rdfModule, "useDataSource") + use_data_source = getElement(rdf_module, "useDataSource") if use_data_source: args.use_data_source = True - use_legacy_source = getElement(rdfModule, "useLegacyDataSource") + use_legacy_source = getElement(rdf_module, "useLegacyDataSource") if use_legacy_source: args.use_legacy_source = True # Create list of files to be processed - file_list = ROOT.vector('string')() info_msg = 'Creating dataframe object from files:\n\t' + file_list = ROOT.vector('string')() nevents_orig = 0 # Amount of events processed in previous stage (= 0 if it is the first stage) nevents_local = 0 # The amount of events in the input file(s) for filepath in infile_list: @@ -425,7 +425,7 @@ def runLocal(rdfModule, infile_list, args): else: LOGGER.info('Number of local events: %s', f'{nevents_local:,}') - output_dir = getElement(rdfModule, "outputDir") + output_dir = getElement(rdf_module, "outputDir") if not args.batch: if os.path.isabs(args.output): LOGGER.warning('Provided output path is absolute, "outputDir" ' @@ -437,7 +437,7 @@ def runLocal(rdfModule, infile_list, args): # Run RDF start_time = time.time() - outn = runRDF(rdfModule, file_list, outfile_path, nevents_local, args) + outn = runRDF(rdf_module, file_list, outfile_path, nevents_local, args) outn = outn.GetValue() outfile = ROOT.TFile(outfile_path, 'update') @@ -466,7 +466,7 @@ def runLocal(rdfModule, infile_list, args): LOGGER.info(info_msg) if args.bench: - analysis_name = getElement(rdfModule, 'analysisName') + analysis_name = getElement(rdf_module, 'analysisName') if not analysis_name: analysis_name = args.anafile_path @@ -489,22 +489,22 @@ def runLocal(rdfModule, infile_list, args): saveBenchmark('benchmarks_bigger_better.json', bench_evt_per_sec) -#__________________________________________________________ -def runStages(args, rdfModule, preprocess, analysisFile): +# __________________________________________________________ +def runStages(args, rdf_module, preprocess, anascript_path): ''' Run regular stage. ''' # Set ncpus, load header files, custom dicts, ... 
-    initialize(args, rdfModule, analysisFile)
+    initialize(args, rdf_module, anascript_path)
 
     # Check if outputDir exists and if not create it
-    outputDir = getElement(rdfModule, "outputDir")
+    outputDir = getElement(rdf_module, "outputDir")
     if not os.path.exists(outputDir) and outputDir:
         os.system("mkdir -p {}".format(outputDir))
 
     # Check if outputDirEos exists and if not create it
-    outputDirEos = getElement(rdfModule,"outputDirEos")
+    outputDirEos = getElement(rdf_module,"outputDirEos")
     if not os.path.exists(outputDirEos) and outputDirEos:
         os.system("mkdir -p {}".format(outputDirEos))
 
@@ -512,11 +512,11 @@ def runStages(args, rdf_module, preprocess, anascript_path):
     # will exit after)
     if args.test:
         LOGGER.info('Running over test file...')
-        testfile_path = getElement(rdfModule, "testFile")
+        testfile_path = getElement(rdf_module, "testFile")
         directory, _ = os.path.split(args.output)
         if directory:
             os.system("mkdir -p {}".format(directory))
-        runLocal(rdfModule, [testfile_path], args)
+        runLocal(rdf_module, [testfile_path], args)
         sys.exit(0)
 
     # Check if files are specified, and if so run the analysis on it/them (this
@@ -526,20 +526,20 @@ def runStages(args, rdf_module, preprocess, anascript_path):
         directory, _ = os.path.split(args.output)
         if directory:
             os.system(f'mkdir -p {directory}')
-        runLocal(rdfModule, args.files_list, args)
+        runLocal(rdf_module, args.files_list, args)
         sys.exit(0)
 
     # Check if batch mode and set start and end file from original list
-    run_batch = getElement(rdfModule, "runBatch")
+    run_batch = getElement(rdf_module, "runBatch")
 
     # Check if the process list is specified
-    process_list = getElement(rdfModule, "processList")
+    process_list = getElement(rdf_module, "processList")
 
     for process_name in process_list:
         file_list, event_list = getProcessInfo(
             process_name,
-            getElement(rdfModule, "prodTag"),
-            getElement(rdfModule, "inputDir"))
+            getElement(rdf_module, "prodTag"),
+            getElement(rdf_module, "inputDir"))
 
         if len(file_list) <= 0:
             LOGGER.error('No files to process!\nAborting...')
@@ -588,18 +588,18 @@ def runStages(args, rdf_module, preprocess, anascript_path):
             LOGGER.warning('\033[4m\033[1m\033[91mRunning on batch with '
                            'only one chunk might not be optimal\033[0m')
 
-            sendToBatch(rdfModule, chunk_list, process_name, analysisFile)
+            sendToBatch(rdf_module, chunk_list, process_name, anascript_path)
 
         else:
             # Running locally
             LOGGER.info('Running locally...')
             if len(chunk_list) == 1:
                 args.output = '{}.root'.format(output_stem)
-                runLocal(rdfModule, chunk_list[0], args)
+                runLocal(rdf_module, chunk_list[0], args)
             else:
                 for index, chunk in enumerate(chunk_list):
                     args.output = '{}/chunk{}.root'.format(output_stem, index)
-                    runLocal(rdfModule, chunk, args)
+                    runLocal(rdf_module, chunk, args)
 
 
 # __________________________________________________________
@@ -626,20 +626,20 @@ def testfile(f):
 
 
 # __________________________________________________________
-def runFinal(rdfModule):
-    proc_dict_location = getElement(rdfModule, "procDict", True)
+def runFinal(rdf_module):
+    proc_dict_location = getElement(rdf_module, "procDict", True)
     if not proc_dict_location:
         LOGGER.error('Location of the procDict not provided!\nAborting...')
         sys.exit(3)
 
     procDict = get_process_dict(proc_dict_location)
 
-    procDictAdd = getElement(rdfModule, "procDictAdd", True)
+    procDictAdd = getElement(rdf_module, "procDictAdd", True)
     for procAdd in procDictAdd:
         if getElementDict(procDict, procAdd) == None:
             procDict[procAdd] = procDictAdd[procAdd]
 
-    ROOT.ROOT.EnableImplicitMT(getElement(rdfModule, "nCPUS", True))
+    
ROOT.ROOT.EnableImplicitMT(getElement(rdf_module, "nCPUS", True)) nevents_real = 0 start_time = time.time() @@ -650,7 +650,7 @@ def runFinal(rdfModule): saveTab=[] efficiencyList=[] - inputDir = getElement(rdfModule,"inputDir", True) + inputDir = getElement(rdf_module,"inputDir", True) if not inputDir: LOGGER.error('The inputDir variable is mandatory for the final stage ' 'of the analysis!\nAborting...') @@ -658,19 +658,19 @@ def runFinal(rdfModule): if inputDir[-1]!="/":inputDir+="/" - outputDir = getElement(rdfModule,"outputDir", True) + outputDir = getElement(rdf_module,"outputDir", True) if outputDir!="": if outputDir[-1]!="/":outputDir+="/" if not os.path.exists(outputDir) and outputDir!='': os.system("mkdir -p {}".format(outputDir)) - cutList = getElement(rdfModule,"cutList", True) + cutList = getElement(rdf_module,"cutList", True) length_cuts_names = max([len(cut) for cut in cutList]) - cutLabels = getElement(rdfModule,"cutLabels", True) + cutLabels = getElement(rdf_module,"cutLabels", True) # save a table in a separate tex file - saveTabular = getElement(rdfModule,"saveTabular", True) + saveTabular = getElement(rdf_module,"saveTabular", True) if saveTabular: # option to rewrite the cuts in a better way for the table. otherwise, take them from the cutList if cutLabels: @@ -682,7 +682,7 @@ def runFinal(rdfModule): saveTab.append(cutNames) efficiencyList.append(cutNames) - for process_id in getElement(rdfModule, "processList", True): + for process_id in getElement(rdf_module, "processList", True): processEvents[process_id] = 0 eventsTTree[process_id] = 0 @@ -694,7 +694,7 @@ def runFinal(rdfModule): infilepath) else: LOGGER.info('Open file:\n\t%s', infilepath) - processEvents[process_id], eventsTTree[process_id] = get_entries(infilepath) + processEvents[process_id], eventsTTree[process_id] = get_processed_entries(infilepath) fileListRoot.push_back(infilepath) indirpath = inputDir + process_id @@ -703,7 +703,7 @@ def runFinal(rdfModule): flist = glob.glob(indirpath + '/chunk*.root') for filepath in flist: info_msg += '\n\t' + filepath - chunkProcessEvents, chunkEventsTTree = get_entries(filepath) + chunkProcessEvents, chunkEventsTTree = get_processed_entries(filepath) processEvents[process_id] += chunkProcessEvents eventsTTree[process_id] += chunkEventsTTree fileListRoot.push_back(filepath) @@ -719,12 +719,12 @@ def runFinal(rdfModule): info_msg += f'\n\t- {process_id}: {n_events:,}' LOGGER.info(info_msg) - histoList = getElement(rdfModule, "histoList", True) - doScale = getElement(rdfModule, "doScale", True) - intLumi = getElement(rdfModule, "intLumi", True) + histoList = getElement(rdf_module, "histoList", True) + doScale = getElement(rdf_module, "doScale", True) + intLumi = getElement(rdf_module, "intLumi", True) - doTree = getElement(rdfModule, "doTree", True) - for pr in getElement(rdfModule, "processList", True): + doTree = getElement(rdf_module, "doTree", True) + for pr in getElement(rdf_module, "processList", True): LOGGER.info('Running over process: %s', pr) if processEvents[pr] == 0: @@ -734,7 +734,7 @@ def runFinal(rdfModule): RDF = ROOT.ROOT.RDataFrame df = RDF("events", processList[pr]) - defineList = getElement(rdfModule,"defineList", True) + defineList = getElement(rdf_module,"defineList", True) if len(defineList) > 0: LOGGER.info('Registering extra DataFrame defines...') for define in defineList: @@ -918,13 +918,13 @@ def runFinal(rdfModule): LOGGER.info(info_msg) -def runHistmaker(args, rdfModule, analysisFile): +def runHistmaker(args, rdf_module, anascript_path): # set 
ncpus, load header files, custom dicts, ...
-    initialize(args, rdfModule, analysisFile)
+    initialize(args, rdf_module, anascript_path)
 
     # load process dictionary
-    proc_dict_location = getElement(rdfModule, "procDict", True)
+    proc_dict_location = getElement(rdf_module, "procDict", True)
     if not proc_dict_location:
         LOGGER.error('Location of the procDict not provided.\nAborting...')
         sys.exit(3)
 
     procDict = get_process_dict(proc_dict_location)
 
@@ -932,83 +932,75 @@ def runHistmaker(args, rdfModule, analysisFile):
     # check if outputDir exists and if not create it
-    outputDir = getElement(rdfModule,"outputDir")
+    outputDir = getElement(rdf_module,"outputDir")
     if not os.path.exists(outputDir) and outputDir!='':
         os.system("mkdir -p {}".format(outputDir))
 
-    doScale = getElement(rdfModule,"doScale", True)
-    intLumi = getElement(rdfModule,"intLumi", True)
+    doScale = getElement(rdf_module,"doScale", True)
+    intLumi = getElement(rdf_module,"intLumi", True)
 
     # check if the process list is specified, and create graphs for them
-    processList = getElement(rdfModule,"processList")
-    graph_function = getattr(rdfModule, "build_graph")
+    processList = getElement(rdf_module,"processList")
+    graph_function = getattr(rdf_module, "build_graph")
     results = []  # all the histograms
     hweights = []  # all the weights
     evtcounts = []  # event count of the input file
     eventsProcessedDict = {}  # number of events processed per process, in a potential previous step
 
     # Check whether to load events through datasource
-    use_legacy_source = getElement(rdfModule, 'useLegacyDataSource')
+    use_legacy_source = getElement(rdf_module, 'useLegacyDataSource')
     if use_legacy_source:
         args.use_legacy_source = True
-    use_data_source = getElement(rdfModule, 'useDataSource')
+    use_data_source = getElement(rdf_module, 'useDataSource')
     if use_data_source:
         args.use_data_source = True
 
-    for process in processList:
-        prod_tag = getElement(rdfModule, 'prodTag')
-        input_dir = getElement(rdfModule, 'inputDir')
-        fileList, eventList = getProcessInfo(process, prod_tag, input_dir)
+    for process_name, process_dict in processList.items():
+        fileList = []
+        eventList = []
+        if args.test and getElementDict(process_dict, 'testfile'):
+            testfile_path = getElementDict(process_dict, 'testfile')
+            fileList = [testfile_path]
+            eventList = [get_entries(testfile_path)]
+            LOGGER.info('Will run over the following test '
+                        'file:\n\t%s', testfile_path)
+
+        else:
+            fileList, eventList = getProcessInfo(
+                process_name,
+                getElement(rdf_module, "prodTag"),
+                getElement(rdf_module, "inputDir"))
 
         if len(fileList) == 0:
             LOGGER.error('No files to process!\nAborting...')
             sys.exit(3)
 
-        if args.test:
-            testFile = getElement(rdfModule, "testFile")
-            if testFile:
-                print('----> Info: Will use 100 events from the following '
-                      'test file:')
-                print('            ' + testFile)
-                fileList = [testFile]
-                eventList = [100]
-
-        # get the number of events processed, in a potential previous step
-        fileListRoot = ROOT.vector('string')()
-        nevents_meta = 0  # amount of events processed in previous stage (= 0 if it is the first stage)
-        for fileName in fileList:
-            if not args.use_data_source and not args.use_legacy_source:
-                fileName = apply_filepath_rewrites(fileName)
-            fileListRoot.push_back(fileName)
-            tf=ROOT.TFile.Open(str(fileName),"READ")
-            tf.cd()
-            for key in tf.GetListOfKeys():
-                if 'eventsProcessed' == key.GetName():
-                    nevents_meta += tf.eventsProcessed.GetVal()
-                    break
-        eventsProcessedDict[process] = nevents_meta
-
-        processDict={}
         fraction = 1
-        output = process
+        output = 
process_name
         chunks = 1
+        testfile_path = None
         try:
-            processDict = processList[process]
-            if getElementDict(processList[process], 'fraction') != None: fraction = getElementDict(processList[process], 'fraction')
-            if getElementDict(processList[process], 'output') != None: output = getElementDict(processList[process], 'output')
-            if getElementDict(processList[process], 'chunks') != None: chunks = getElementDict(processList[process], 'chunks')
+            if getElementDict(process_dict, 'fraction') is not None:
+                fraction = getElementDict(process_dict, 'fraction')
+            if getElementDict(process_dict, 'output') is not None:
+                output = getElementDict(process_dict, 'output')
+            if getElementDict(process_dict, 'chunks') is not None:
+                chunks = getElementDict(process_dict, 'chunks')
         except TypeError:
             LOGGER.warning('No values set for process %s will use default '
-                           'values!', process)
-        if fraction < 1:fileList = getsubfileList(fileList, eventList, fraction)
+                           'values!', process_name)
+        if fraction < 1:
+            fileList = getsubfileList(fileList, eventList, fraction)
 
         # get the number of events processed, in a potential previous step
         fileListRoot = ROOT.vector('string')()
         nevents_meta = 0  # amount of events processed in previous stage (= 0 if it is the first stage)
         for fileName in fileList:
-            if not (args.use_source or args.use_legacy_source):
+            if not (args.use_data_source or args.use_legacy_source):
                 fileName = apply_filepath_rewrites(fileName)
             fileListRoot.push_back(fileName)
-            if getElement(rdfModule,"prodTag") == None:  # skip check for processed events in case of first stage
+            if getElement(rdf_module,"prodTag") is None:  # skip check for processed events in case of first stage
                 tf=ROOT.TFile.Open(str(fileName),"READ")
                 tf.cd()
                 for key in tf.GetListOfKeys():
@@ -1017,8 +1009,8 @@ def runHistmaker(args, rdfModule, analysisFile):
                         break
             if args.test:
                 break
-        eventsProcessedDict[process] = nevents_meta
-        info_msg = f'Add process "{process}" with:'
+        eventsProcessedDict[process_name] = nevents_meta
+        info_msg = f'Add process "{process_name}" with:'
         info_msg += f'\n\tfraction = {fraction}'
         info_msg += f'\n\tnFiles = {len(fileListRoot):,}'
         info_msg += f'\n\toutput = {output}\n\tchunks = {chunks}'
@@ -1052,7 +1044,7 @@ def runHistmaker(args, rdfModule, analysisFile):
         else:
             dframe = ROOT.ROOT.RDataFrame("events", fileListRoot)
         evtcount = dframe.Count()
-        res, hweight = graph_function(dframe, process)
+        res, hweight = graph_function(dframe, process_name)
         results.append(res)
         hweights.append(hweight)
         evtcounts.append(evtcount)
@@ -1140,9 +1132,9 @@ def runHistmaker(args, rdfModule, analysisFile):
 
 #__________________________________________________________
-def runPlots(analysisFile):
+def runPlots(anascript_path):
     import doPlots as dp
-    dp.run(analysisFile)
+    dp.run(anascript_path)
 
 #__________________________________________________________
 def runValidate(jobdir):
@@ -1193,11 +1185,14 @@ def run(mainparser, subparser=None):
     if subparser:
         setup_run_parser(subparser)
     args, _ = mainparser.parse_known_args()
-    #check that the analysis file exists
-    analysisFile = args.anafile_path
-    if not os.path.isfile(analysisFile):
+
+    # Work with absolute path of the analysis script
+    anascript_path = os.path.abspath(args.anafile_path)
+
+    # Check that the analysis script exists
+    if not os.path.isfile(anascript_path):
         LOGGER.error('Script %s does not exist!\nSpecify a valid analysis '
-                     'script in the command line arguments', analysisFile)
+                     'script in the command line arguments', anascript_path) 
sys.exit(3) LOGGER.info('Loading analyzers from libFCCAnalyses...') @@ -1236,31 +1231,36 @@ def run(mainparser, subparser=None): ROOT.Experimental.ELogLevel.kDebug+10) # Load the analysis - analysisFile = os.path.abspath(analysisFile) - LOGGER.info('Loading analysis file:\n%s', analysisFile) - rdfSpec = importlib.util.spec_from_file_location("rdfanalysis", analysisFile) - rdfModule = importlib.util.module_from_spec(rdfSpec) - rdfSpec.loader.exec_module(rdfModule) + LOGGER.info('Loading analysis script:\n%s', anascript_path) + try: + rdf_spec = importlib.util.spec_from_file_location('rdfanalysis', + anascript_path) + rdf_module = importlib.util.module_from_spec(rdf_spec) + rdf_spec.loader.exec_module(rdf_module) + except SyntaxError as err: + LOGGER.error('Syntax error encountered in the analysis script:\n%s', + err) + sys.exit(3) if hasattr(args, 'command'): if args.command == "run": - if hasattr(rdfModule, "build_graph") and hasattr(rdfModule, "RDFanalysis"): + if hasattr(rdf_module, "build_graph") and hasattr(rdf_module, "RDFanalysis"): LOGGER.error('Analysis file ambiguous!\nBoth "RDFanalysis" ' 'class and "build_graph" function are defined.') sys.exit(3) - elif hasattr(rdfModule, "build_graph") and not hasattr(rdfModule, "RDFanalysis"): - runHistmaker(args, rdfModule, analysisFile) - elif not hasattr(rdfModule, "build_graph") and hasattr(rdfModule, "RDFanalysis"): - runStages(args, rdfModule, args.preprocess, analysisFile) + elif hasattr(rdf_module, "build_graph") and not hasattr(rdf_module, "RDFanalysis"): + runHistmaker(args, rdf_module, anascript_path) + elif not hasattr(rdf_module, "build_graph") and hasattr(rdf_module, "RDFanalysis"): + runStages(args, rdf_module, args.preprocess, anascript_path) else: LOGGER.error('Analysis file does not contain required ' 'objects!\nProvide either "RDFanalysis" class or ' '"build_graph" function.') sys.exit(3) elif args.command == "final": - runFinal(rdfModule) + runFinal(rdf_module) elif args.command == "plots": - runPlots(analysisFile) + runPlots(anascript_path) return LOGGER.warning('Running the old way...\nThis way of running the analysis ' @@ -1278,7 +1278,7 @@ def run(mainparser, subparser=None): if args.preprocess: LOGGER.error('Can not have --preprocess with --final, exit') sys.exit(3) - runFinal(rdfModule) + runFinal(rdf_module) elif args.plots: if args.final: @@ -1287,7 +1287,7 @@ def run(mainparser, subparser=None): if args.preprocess: LOGGER.error('Can not have --preprocess with --plots, exit') sys.exit(3) - runPlots(analysisFile) + runPlots(anascript_path) elif args.validate: runValidate(args.jobdir) @@ -1300,7 +1300,7 @@ def run(mainparser, subparser=None): if args.final: LOGGER.error('Can not have --final with --preprocess, exit') sys.exit(3) - runStages(args, rdfModule, args.preprocess, analysisFile) + runStages(args, rdf_module, args.preprocess, anascript_path) # __________________________________________________________ diff --git a/python/process.py b/python/process.py index 6b5d144e8dc..b33f11b69a7 100644 --- a/python/process.py +++ b/python/process.py @@ -12,14 +12,25 @@ ROOT.gROOT.SetBatch(True) + LOGGER = logging.getLogger('FCCAnalyses.process_info') -def getEntries(f): - tf=ROOT.TFile.Open(f,"READ") - tf.cd() - tt=tf.Get("events") - nevents=tt.GetEntries() - tf.Close() + +def get_entries(infile_path): + ''' + Retrieves number of entries in the "events" TTree from the provided ROOT + file. 
+ ''' + infile = ROOT.TFile.Open(infile_path, 'READ') + nevents = 0 + try: + nevents = infile.Get("events").GetEntries() + except AttributeError: + LOGGER.error('Input file is missing "events" TTree!\nAborting...') + sys.exit(3) + finally: + infile.Close() + return nevents @@ -69,13 +80,13 @@ def getProcessInfoFiles(process, inputDir): if os.path.isfile(filetest): filelist.append(filetest) - eventlist.append(getEntries(filetest)) + eventlist.append(get_entries(filetest)) if os.path.isdir(dirtest): flist=glob.glob(dirtest+"/*.root") for f in flist: filelist.append(f) - eventlist.append(getEntries(f)) + eventlist.append(get_entries(f)) return filelist, eventlist
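
For reference, a minimal histmaker analysis script exercising the new per-process
'testfile' attribute might look as follows. This is only a sketch: the procDict
value, the output directory and the histogram/variable definitions are
illustrative assumptions, while the processList layout, the useDataSource flag
and the build_graph(df, dataset) signature mirror
e4hsource/test/histmaker_source.py from this patch.

    # list of processes (mandatory)
    processList = {
        'p8_ee_WW_ecm240': {'output': 'p8_ee_WW_ecm240_out',
                            # picked up only when running in test mode
                            'testfile': '/eos/experiment/fcc/ee/generation/'
                                        'DelphesEvents/winter2023/IDEA/'
                                        'p8_ee_WW_ecm240/events_192112516.root'}
    }

    # location of the process dictionary (illustrative value)
    procDict = 'FCCee_procDict_winter2023_IDEA.json'

    # directory for the produced histograms (illustrative value)
    outputDir = 'outputs/histmaker/'

    # read the input files through the EDM4hep RDataSource
    useDataSource = True


    # build_graph function that contains the analysis logic, cuts and
    # histograms (mandatory)
    def build_graph(df, dataset):
        results = []

        # dummy per-event weight; its sum is returned alongside the histograms
        df = df.Define('weight', '1.0')
        weightsum = df.Sum('weight')

        # illustrative histogram: multiplicity of the reconstructed particles
        df = df.Define('n_reco', 'ReconstructedParticles.size()')
        results.append(df.Histo1D(('n_reco', '', 60, 0, 60), 'n_reco'))

        return results, weightsum

With such a script, 'fccanalysis run histmaker_source.py --test' reads only the
listed test file, while a regular run keeps resolving the inputs through
prodTag or inputDir as before.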