diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 558e613..ca43f18 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -59,6 +59,7 @@ jobs: - name: Set LEGEND_METADATA variable run: | echo "LEGEND_METADATA=$GITHUB_WORKSPACE/inputs" >> $GITHUB_ENV + - name: Clone legend-metadata uses: actions/checkout@v4 with: @@ -68,6 +69,10 @@ jobs: token: ${{ secrets.CLONE_LEGEND_METADATA }} path: ${{ env.LEGEND_METADATA }} + - name: Recursively update legend-metadata submodules + run: | + cd "$LEGEND_METADATA" && git submodule update --recursive --remote + - name: Run data production tests run: ./tests/runprod/run-all.sh diff --git a/dataflow-config.yaml b/dataflow-config.yaml index 97c9aa8..4a494ea 100644 --- a/dataflow-config.yaml +++ b/dataflow-config.yaml @@ -1,4 +1,5 @@ -legend_metadata_version: main +# legend_metadata_version: main +allow_none_par: false paths: sandbox_path: $_/sandbox @@ -74,8 +75,8 @@ execenv: arg: /data2/public/prodenv/containers/legendexp_legend-base_latest_20241110203225.sif env: PRODENV: $PRODENV + NUMBA_CACHE_DIR: $_/.snakemake/numba-cache LGDO_BOUNDSCHECK: "false" - # LGDO_CACHE: "false" DSPEED_BOUNDSCHECK: "false" PYGAMA_PARALLEL: "false" PYGAMA_FASTMATH: "false" @@ -86,6 +87,7 @@ execenv: arg: --image legendexp/legend-base:latest env: PRODENV: $PRODENV + NUMBA_CACHE_DIR: $_/.snakemake/numba-cache HDF5_USE_FILE_LOCKING: "false" LGDO_BOUNDSCHECK: "false" DSPEED_BOUNDSCHECK: "false" diff --git a/pyproject.toml b/pyproject.toml index 1a282ca..ce5aa76 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -52,7 +52,7 @@ dynamic = ["version"] dependencies = [ "colorlog", "dbetto>=1.2", - "pygama>=2", + "pygama>=2.0.5", "dspeed>=1.6", "pylegendmeta>=1.2", "legend-pydataobj>=1.11.6", @@ -116,6 +116,7 @@ par-geds-psp-average = "legenddataflow.scripts.par.geds.psp.average:par_geds_ par-geds-raw-blindcal = "legenddataflow.scripts.par.geds.raw.blindcal:par_geds_raw_blindcal" par-geds-raw-blindcheck = "legenddataflow.scripts.par.geds.raw.blindcheck:par_geds_raw_blindcheck" par-geds-tcm-pulser = "legenddataflow.scripts.par.geds.tcm.pulser:par_geds_tcm_pulser" +par-spms-dsp-trg-thr = "legenddataflow.scripts.par.spms.dsp.trigger_threshold:par_spms_dsp_trg_thr" [tool.uv.workspace] exclude = ["generated", "inputs", "software", "workflow"] diff --git a/tests/runprod/test-argon-char-dataprod.sh b/tests/runprod/test-argon-char-dataprod.sh new file mode 100755 index 0000000..7e91d43 --- /dev/null +++ b/tests/runprod/test-argon-char-dataprod.sh @@ -0,0 +1,35 @@ +#!/usr/bin/env bash + +# IMPORTANT: this script must be executed from the legend-dataflow directory + +# shellcheck disable=SC1091 +source "$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" &> /dev/null && pwd)/conftest.sh" + +rawdir="$(get_dataflow_config_value paths.tier_raw)" +mkdir -p "${rawdir}" || exit 1 + +function mkdir_n_touch() { + mkdir -p "$(dirname "${1}")" || return 1 + touch "${1}" || return 1 +} + +rawfiles=( + anp/p13/r002/l200-p13-r002-anp-20241217T094846Z-tier_raw.lh5 + anc/p13/r006/l200-p13-r006-anc-20241221T150249Z-tier_raw.lh5 + acs/p13/r006/l200-p13-r006-acs-20241221T150307Z-tier_raw.lh5 +) + +( + cd "${rawdir}" || exit 1 + for file in "${rawfiles[@]}"; do + mkdir_n_touch "$file" + done +) + +_smk_opts=( + --touch + --config allow_none_par=true + --workflow-profile workflow/profiles/default +) + +run_test_command snakemake "${_smk_opts[@]}" "all-p13-*-evt.gen" || exit 1 diff --git a/tests/runprod/test-evt.sh b/tests/runprod/test-evt.sh index e67b6ec..9521a9d 100755 --- 
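# A minimal Python sketch of what the new argon-characterization test does before
# invoking snakemake: it creates empty placeholder raw-tier files so that
# `snakemake --touch --config allow_none_par=true` can resolve the DAG without real
# data. `rawdir` is a stand-in here; the test reads it from dataflow-config.yaml
# (paths.tier_raw).
from pathlib import Path

rawdir = Path("/tmp/prodenv/tier/raw")  # stand-in for paths.tier_raw
rawfiles = [
    "anp/p13/r002/l200-p13-r002-anp-20241217T094846Z-tier_raw.lh5",
    "anc/p13/r006/l200-p13-r006-anc-20241221T150249Z-tier_raw.lh5",
    "acs/p13/r006/l200-p13-r006-acs-20241221T150307Z-tier_raw.lh5",
]

for f in rawfiles:
    path = rawdir / f
    path.parent.mkdir(parents=True, exist_ok=True)  # same effect as mkdir_n_touch()
    path.touch()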
a/tests/runprod/test-evt.sh +++ b/tests/runprod/test-evt.sh @@ -24,9 +24,6 @@ rawfiles=( cal/p03/r001/l200-p03-r001-cal-20230317T211819Z-tier_raw.lh5 cal/p03/r000/l200-p03-r000-cal-20230311T235840Z-tier_raw.lh5 cal/p03/r002/l200-p03-r002-cal-20230324T161401Z-tier_raw.lh5 - anp/p13/r002/l200-p13-r002-anp-20241217T094846Z-tier_raw.lh5 - anc/p13/r006/l200-p13-r006-anc-20241221T150249Z-tier_raw.lh5 - acs/p13/r006/l200-p13-r006-acs-20241221T150307Z-tier_raw.lh5 ) ( @@ -47,4 +44,5 @@ _smk_opts=( --workflow-profile workflow/profiles/default ) -run_test_command snakemake "${_smk_opts[@]}" "all-*-evt.gen" || exit 1 +run_test_command snakemake "${_smk_opts[@]}" "all-p03-*-evt.gen" || exit 1 +run_test_command snakemake "${_smk_opts[@]}" "all-p04-*-evt.gen" || exit 1 diff --git a/workflow/Snakefile b/workflow/Snakefile index 2e432cc..a8fedad 100644 --- a/workflow/Snakefile +++ b/workflow/Snakefile @@ -35,10 +35,10 @@ basedir = workflow.basedir time = datetime.now().strftime("%Y%m%dT%H%M%SZ") -if not Path(meta).exists(): - LegendMetadata(meta).checkout(config.legend_metadata_version) - -metadata = LegendMetadata(meta, lazy=True) +# NOTE: this will attempt a clone of legend-metadata, if the directory does not exist +metadata = LegendMetadata(meta) +if "legend_metadata_version" in config: + metadata.checkout(config.legend_metadata_version) part = CalGrouping(config, Path(det_status) / "cal_groupings.yaml") @@ -51,12 +51,14 @@ wildcard_constraints: timestamp=r"\d{8}T\d{6}Z", +include: "rules/channel_merge.smk" include: "rules/filelist_gen.smk" include: "rules/chanlist_gen.smk" include: "rules/common.smk" include: "rules/main.smk" include: "rules/tcm.smk" include: "rules/dsp_pars_geds.smk" +include: "rules/dsp_pars_spms.smk" include: "rules/dsp.smk" include: "rules/psp_pars_geds.smk" include: "rules/psp.smk" @@ -79,12 +81,16 @@ localrules: onstart: print("INFO: starting workflow") + # Make sure some packages are initialized before we begin to avoid race conditions + # https://numba.readthedocs.io/en/stable/developer/caching.html#cache-sharing if not workflow.touch: - for pkg in ["dspeed", "lgdo", "matplotlib"]: - shell(execenv.execenv_pyexe(config, "python") + "-c 'import " + pkg + "'") + shell( + execenv.execenv_pyexe(config, "python") + + "-c 'import dspeed, lgdo, matplotlib, pygama'" + ) - # Log parameter catalogs in validity files + # Log parameter catalogs in validity files hit_par_cat_file = Path(utils.pars_path(config)) / "hit" / "validity.yaml" if hit_par_cat_file.is_file(): hit_par_cat_file.unlink() diff --git a/workflow/Snakefile-build-raw b/workflow/Snakefile-build-raw index eef2752..f94b65e 100644 --- a/workflow/Snakefile-build-raw +++ b/workflow/Snakefile-build-raw @@ -12,6 +12,7 @@ from legenddataflow import patterns as patt from legenddataflow import utils, execenv, ParsKeyResolve from datetime import datetime from dbetto import AttrsDict +from legendmeta import LegendMetadata utils.subst_vars_in_snakemake_config(workflow, config) config = AttrsDict(config) @@ -26,8 +27,10 @@ meta = utils.metadata_path(config) time = datetime.now().strftime("%Y%m%dT%H%M%SZ") -if not Path(meta_path).exists(): - LegendMetadata(meta_path).checkout(config.legend_metadata_version) +# NOTE: this will attempt a clone of legend-metadata, if the directory does not exist +metadata = LegendMetadata(meta_path, lazy=True) +if "legend_metadata_version" in config: + metadata.checkout(config.legend_metadata_version) wildcard_constraints: diff --git a/workflow/rules/chanlist_gen.smk b/workflow/rules/chanlist_gen.smk index 
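# A sketch of the new metadata handling in the Snakefiles: instantiating
# LegendMetadata() clones legend-metadata on first use if the directory is missing,
# and a checkout is only performed when `legend_metadata_version` is present in the
# dataflow config (it is now commented out by default in dataflow-config.yaml).
# `meta` and `config` are stand-ins for the values set up earlier in the Snakefile.
from legendmeta import LegendMetadata

meta = "/tmp/legend-metadata"   # metadata path from the dataflow config
config = {}                     # dataflow config (an AttrsDict in the workflow)

metadata = LegendMetadata(meta, lazy=True)
if "legend_metadata_version" in config:
    metadata.checkout(config["legend_metadata_version"])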
68b4268..0292dd4 100644 --- a/workflow/rules/chanlist_gen.smk +++ b/workflow/rules/chanlist_gen.smk @@ -13,20 +13,23 @@ from legenddataflow import execenv_pyexe from legenddataflow.utils import filelist_path -def get_chanlist(setup, keypart, workflow, config, det_status, chan_maps): +# FIXME: the system argument should always be explicitly supplied +def get_chanlist( + setup, keypart, workflow, config, det_status, chan_maps, system="geds" +): key = ChannelProcKey.parse_keypart(keypart) flist_path = filelist_path(setup) os.makedirs(flist_path, exist_ok=True) output_file = os.path.join( flist_path, - f"all-{key.experiment}-{key.period}-{key.run}-cal-{key.timestamp}-channels.chankeylist.{random.randint(0,99999):05d}", + f"all-{key.experiment}-{key.period}-{key.run}-{key.datatype}-{key.timestamp}-channels.chankeylist.{random.randint(0,99999):05d}", ) os.system( execenv_pyexe(config, "create-chankeylist") + f"--det-status {det_status} --channelmap {chan_maps} --timestamp {key.timestamp} " - f"--datatype cal --output-file {output_file}" + f"--datatype {key.datatype} --output-file {output_file} --system {system}" ) with open(output_file) as r: @@ -36,12 +39,25 @@ def get_chanlist(setup, keypart, workflow, config, det_status, chan_maps): def get_par_chanlist( - setup, keypart, tier, basedir, det_status, chan_maps, name=None, extension="yaml" + setup, + keypart, + tier, + basedir, + det_status, + chan_maps, + datatype="cal", + system="geds", + name=None, + extension="yaml", ): - chan_list = get_chanlist(setup, keypart, workflow, config, det_status, chan_maps) + chan_list = get_chanlist( + setup, keypart, workflow, config, det_status, chan_maps, system + ) - par_pattern = get_pattern_pars_tmp_channel(setup, tier, name, extension) + par_pattern = get_pattern_pars_tmp_channel( + setup, tier, name, datatype=datatype, extension=extension + ) filenames = ChannelProcKey.get_channel_files(keypart, par_pattern, chan_list) diff --git a/workflow/rules/channel_merge.smk b/workflow/rules/channel_merge.smk index 92e07de..b0506c5 100644 --- a/workflow/rules/channel_merge.smk +++ b/workflow/rules/channel_merge.smk @@ -1,15 +1,10 @@ -from legenddataflow.patterns import ( - get_pattern_pars_tmp_channel, - get_pattern_plts_tmp_channel, - get_pattern_plts, - get_pattern_tier, - get_pattern_pars_tmp, - get_pattern_pars, -) -from legenddataflow.utils import set_last_rule_name import inspect + +from legenddataflow import patterns +from legenddataflow.utils import set_last_rule_name from legenddataflow.execenv import execenv_pyexe + def build_merge_rules(tier, lh5_merge=False, lh5_tier=None): if lh5_tier is None: lh5_tier = tier @@ -24,7 +19,7 @@ def build_merge_rules(tier, lh5_merge=False, lh5_tier=None): chan_maps, ), output: - get_pattern_plts(config, tier), + patterns.get_pattern_plts(config, tier), group: f"merge-{tier}" shell: @@ -47,7 +42,7 @@ def build_merge_rules(tier, lh5_merge=False, lh5_tier=None): extension="pkl", ), output: - get_pattern_pars( + patterns.get_pattern_pars( config, tier, name="objects", @@ -76,7 +71,7 @@ def build_merge_rules(tier, lh5_merge=False, lh5_tier=None): ), output: temp( - get_pattern_pars_tmp( + patterns.get_pattern_pars_tmp( config, tier, datatype="cal", @@ -91,6 +86,35 @@ def build_merge_rules(tier, lh5_merge=False, lh5_tier=None): set_last_rule_name(workflow, f"build_pars_{tier}_db") + rule: + """Merge pars for SiPM channels in a single pars file.""" + input: + lambda wildcards: get_par_chanlist( + config, + 
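# The channel-list filename is no longer hard-coded to the `cal` datatype: the
# datatype parsed from the keypart is used instead. A standalone sketch of the
# resulting name (ChannelProcKey is replaced by a plain namespace here):
import random
from types import SimpleNamespace

key = SimpleNamespace(
    experiment="l200", period="p13", run="r002", datatype="anp",
    timestamp="20241217T094846Z",
)

output_file = (
    f"all-{key.experiment}-{key.period}-{key.run}-{key.datatype}"
    f"-{key.timestamp}-channels.chankeylist.{random.randint(0, 99999):05d}"
)
print(output_file)
# e.g. all-l200-p13-r002-anp-20241217T094846Z-channels.chankeylist.04821 (random suffix)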
f"all-{wildcards.experiment}-{wildcards.period}-{wildcards.run}-{wildcards.datatype}-{wildcards.timestamp}-channels", + tier, + basedir, + det_status, + chan_maps, + datatype=wildcards.datatype, + system="spms" + ), + output: + patterns.get_pattern_pars( + config, + tier, + name="spms", + datatype="{datatype}", + ), + group: + f"merge-{tier}" + shell: + execenv_pyexe(config, "merge-channels") + \ + "--input {input} " + "--output {output} " + + set_last_rule_name(workflow, f"build_pars_spms_{tier}_db") + rule: input: in_files=lambda wildcards: get_par_chanlist( @@ -102,13 +126,13 @@ def build_merge_rules(tier, lh5_merge=False, lh5_tier=None): chan_maps, extension="lh5" if lh5_merge is True else inspect.signature(get_par_chanlist).parameters['extension'].default, ), - in_db=get_pattern_pars_tmp( + in_db=patterns.get_pattern_pars_tmp( config, tier, datatype="cal", ) if lh5_merge is True else [], - plts=get_pattern_plts(config, tier), - objects=get_pattern_pars( + plts=patterns.get_pattern_plts(config, tier), + objects=patterns.get_pattern_pars( config, tier, name="objects", @@ -116,13 +140,13 @@ def build_merge_rules(tier, lh5_merge=False, lh5_tier=None): check_in_cycle=check_in_cycle, ), output: - out_file=get_pattern_pars( + out_file=patterns.get_pattern_pars( config, tier, - extension="lh5" if lh5_merge is True else inspect.signature(get_pattern_pars).parameters['extension'].default, + extension="lh5" if lh5_merge is True else inspect.signature(patterns.get_pattern_pars).parameters['extension'].default, check_in_cycle=check_in_cycle, ), - out_db=get_pattern_pars(config, tier, check_in_cycle=check_in_cycle) if lh5_merge is True else [], + out_db=patterns.get_pattern_pars(config, tier, check_in_cycle=check_in_cycle) if lh5_merge is True else [], group: f"merge-{tier}" run: diff --git a/workflow/rules/dsp.smk b/workflow/rules/dsp.smk index 8841e99..787aec5 100644 --- a/workflow/rules/dsp.smk +++ b/workflow/rules/dsp.smk @@ -7,15 +7,11 @@ Snakemake rules for processing dsp tier. 
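# The new SiPM merge rule writes its output with patterns.get_pattern_pars(...,
# name="spms", datatype="{datatype}"), so the merged pars file ends up under the
# run's datatype directory rather than the hard-coded `cal` one. A rough sketch of
# the pattern that gets built; `pars_path` is a stand-in for get_pars_path():
from pathlib import Path

pars_path = "generated/par/dsp"   # stand-in for get_pars_path(setup, "dsp")
tier, name, datatype, extension = "dsp", "spms", "{datatype}", "yaml"

pattern = (
    Path(pars_path)
    / datatype
    / "{period}"
    / "{run}"
    / (
        "{experiment}-{period}-{run}-"
        + datatype
        + "-{timestamp}-par_"
        + f"{tier}_{name}.{extension}"
    )
)
print(pattern)
# generated/par/dsp/{datatype}/{period}/{run}/{experiment}-{period}-{run}-{datatype}-{timestamp}-par_dsp_spms.yaml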
from legenddataflow.pars_loading import ParsCatalog from legenddataflow.create_pars_keylist import ParsKeyResolve from pathlib import Path -from legenddataflow.patterns import ( - get_pattern_plts, - get_pattern_tier, - get_pattern_pars_tmp, - get_pattern_log, - get_pattern_pars, -) +from legenddataflow import patterns as patt from legenddataflow.execenv import execenv_pyexe +build_merge_rules("dsp", lh5_merge=True) + dsp_par_catalog = ParsKeyResolve.get_par_catalog( ["-*-*-*-cal"], get_pattern_tier(config, "raw", check_in_cycle=False), @@ -23,29 +19,33 @@ dsp_par_catalog = ParsKeyResolve.get_par_catalog( ) -include: "channel_merge.smk" +def _make_input_pars_file(wildcards): + """Prepare the input pars files for the `build_dsp` rule.""" + # first get the files from the catalog + filelist = dsp_par_catalog.get_par_file(config, wildcards.timestamp, "dsp") + # then add the spms par files + if wildcards.datatype not in ("cal", "xtc"): + filelist += [ + patt.get_pattern_pars(config, "dsp", name="spms", datatype="{datatype}") + ] -build_merge_rules("dsp", lh5_merge=True) + return filelist rule build_dsp: input: - raw_file=get_pattern_tier(config, "raw", check_in_cycle=False), - pars_file=ancient( - lambda wildcards: dsp_par_catalog.get_par_file( - config, wildcards.timestamp, "dsp" - ) - ), + raw_file=patt.get_pattern_tier(config, "raw", check_in_cycle=False), + pars_files=ancient(lambda wildcards: _make_input_pars_file(wildcards)), params: timestamp="{timestamp}", datatype="{datatype}", ro_input=lambda _, input: {k: ro(v) for k, v in input.items()}, output: - tier_file=get_pattern_tier(config, "dsp", check_in_cycle=check_in_cycle), - db_file=get_pattern_pars_tmp(config, "dsp_db"), + tier_file=patt.get_pattern_tier(config, "dsp", check_in_cycle=check_in_cycle), + db_file=patt.get_pattern_pars_tmp(config, "dsp_db"), log: - get_pattern_log(config, "tier_dsp", time), + patt.get_pattern_log(config, "tier_dsp", time), group: "tier-dsp" resources: @@ -61,4 +61,4 @@ rule build_dsp: "--input {params.ro_input[raw_file]} " "--output {output.tier_file} " "--db-file {output.db_file} " - "--pars-file {params.ro_input[pars_file]} " + "--pars-file {params.ro_input[pars_files]}" diff --git a/workflow/rules/dsp_pars_spms.smk b/workflow/rules/dsp_pars_spms.smk new file mode 100644 index 0000000..44145de --- /dev/null +++ b/workflow/rules/dsp_pars_spms.smk @@ -0,0 +1,42 @@ +"""DSP parameter generation for SiPM data""" + +from pathlib import Path + +from legenddataflow import patterns as patt +from legenddataflow import utils, execenv_pyexe + + +rule build_pars_dsp_tau_spms: + input: + raw_file=get_pattern_tier(config, "raw", check_in_cycle=False), + pardb=lambda wildcards: get_input_par_file(config, wildcards, "dsp", "par_dsp"), + params: + timestamp="{timestamp}", + datatype="{datatype}", + channel="{channel}", + raw_table_name=lambda wildcards: get_table_name( + metadata, + config, + wildcards.datatype, + wildcards.timestamp, + wildcards.channel, + "raw", + ), + wildcard_constraints: + datatype=r"\b(?!cal\b|xtc\b)\w+\b", + output: + temp(patt.get_pattern_pars_tmp_channel(config, "dsp", datatype="{datatype}")), + log: + patt.get_pattern_log_channel(config, "pars_spms", time, datatype="{datatype}"), + group: + "par-dsp" + shell: + execenv_pyexe(config, "par-spms-dsp-trg-thr") + "--config-path {configs} " + "--raw-file {input.raw_file} " + "--dsp-db {input.pardb} " + "--datatype {params.datatype} " + "--timestamp {params.timestamp} " + "--sipm-name {params.channel} " + "--raw-table-name {params.raw_table_name} " + 
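# The wildcard constraint in build_pars_dsp_tau_spms restricts the rule to
# non-calibration, non-xtc datatypes via a negative lookahead. A quick check of
# what that regex accepts:
import re

constraint = re.compile(r"\b(?!cal\b|xtc\b)\w+\b")

for dt in ("cal", "xtc", "phy", "anp", "anc", "acs"):
    print(dt, bool(constraint.fullmatch(dt)))
# cal False, xtc False, phy True, anp True, anc True, acs True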
"--output-file {output} " + "--logfile {log} " diff --git a/workflow/rules/hit.smk b/workflow/rules/hit.smk index 83bd231..892d927 100644 --- a/workflow/rules/hit.smk +++ b/workflow/rules/hit.smk @@ -22,10 +22,6 @@ hit_par_catalog = ParsKeyResolve.get_par_catalog( {"cal": ["par_hit"], "lar": ["par_hit"]}, ) - -include: "channel_merge.smk" - - build_merge_rules("hit", lh5_merge=False) @@ -52,11 +48,11 @@ rule build_hit: shell: execenv_pyexe(config, "build-tier-hit") + f"--configs {ro(configs)} " "--metadata {meta} " - "--log {log} " "--tier {params.tier} " "--datatype {params.datatype} " "--timestamp {params.timestamp} " "--pars-file {params.ro_input[pars_file]} " "--output {output.tier_file} " "--input {params.ro_input[dsp_file]} " - "--db-file {output.db_file}" + "--db-file {output.db_file} " + "--log {log}" diff --git a/workflow/rules/pht.smk b/workflow/rules/pht.smk index bc99682..87aedd4 100644 --- a/workflow/rules/pht.smk +++ b/workflow/rules/pht.smk @@ -25,10 +25,6 @@ pht_par_catalog = ParsKeyResolve.get_par_catalog( intier = "psp" - -include: "channel_merge.smk" - - build_merge_rules("pht", lh5_merge=False) diff --git a/workflow/rules/psp.smk b/workflow/rules/psp.smk index 3a3a131..400a6e9 100644 --- a/workflow/rules/psp.smk +++ b/workflow/rules/psp.smk @@ -22,10 +22,6 @@ psp_par_catalog = ParsKeyResolve.get_par_catalog( {"cal": ["par_psp"], "lar": ["par_psp"]}, ) - -include: "channel_merge.smk" - - build_merge_rules("psp", lh5_merge=True, lh5_tier="dsp") diff --git a/workflow/src/legenddataflow/cfgtools.py b/workflow/src/legenddataflow/cfgtools.py new file mode 100644 index 0000000..2cc3d13 --- /dev/null +++ b/workflow/src/legenddataflow/cfgtools.py @@ -0,0 +1,12 @@ +from typing import Mapping + + +def get_channel_config( + mapping: Mapping, channel: str, default_key: str = "__default__" +): + """Get channel key from mapping with default. + + Returns the value at key `channel`, if existing, otherwise return value at + `default_key`. + """ + return mapping.get(channel, mapping[default_key]) diff --git a/workflow/src/legenddataflow/execenv.py b/workflow/src/legenddataflow/execenv.py index 3cfc11c..b99d917 100644 --- a/workflow/src/legenddataflow/execenv.py +++ b/workflow/src/legenddataflow/execenv.py @@ -234,63 +234,45 @@ def _runcmd(cmd_expr, cmd_env, **kwargs): python = sys.executable if cmd_prefix == [] else "python" python_venv, _ = execenv_pyexe(config_dict, "python", as_string=False) - has_uv = False - try: - # is uv already available? 
- _runcmd( - [*cmd_prefix, "uv", "--version"], - cmd_env, - capture_output=True, - ) - has_uv = True - # we'll use the existing uv - uv_expr = [*cmd_prefix, "uv"] - except (subprocess.CalledProcessError, FileNotFoundError): - # we'll use uv from the virtualenv (installed below) - uv_expr = [*python_venv, "-m", "uv", "--quiet"] - - # configure venv - if has_uv: - # if uv is available, just use it to create the venv - cmd_expr = [*cmd_prefix, "uv", "--quiet", "venv", path_install] - else: - # otherwise use python-venv - cmd_expr = [*cmd_prefix, python, "-m", "venv", path_install] + # we'll use uv from the virtualenv (installed below) + uv_expr = [*python_venv, "-m", "uv", "--quiet"] + + # otherwise use python-venv + cmd_expr = [*cmd_prefix, python, "-m", "venv", path_install] log.info(f"configuring virtual environment in {path_install}") _runcmd(cmd_expr, cmd_env) - if not has_uv: - cmd_expr = [ - *python_venv, - "-m", - "pip", - "--quiet", - "--no-cache-dir", - "install", - "--upgrade", - "--", - "pip", - ] - - log.info("upgrading pip") - _runcmd(cmd_expr, cmd_env) - - # install uv - cmd_expr = [ - *python_venv, - "-m", - "pip", - "--quiet", - "--no-cache-dir", - "install", - "--no-warn-script-location", - "--", - "uv", - ] - - log.info("installing uv") - _runcmd(cmd_expr, cmd_env) + cmd_expr = [ + *python_venv, + "-m", + "pip", + "--quiet", + "--no-cache-dir", + "install", + "--upgrade", + "--", + "pip", + ] + + log.info("upgrading pip") + _runcmd(cmd_expr, cmd_env) + + # install uv + cmd_expr = [ + *python_venv, + "-m", + "pip", + "--quiet", + "--no-cache-dir", + "install", + "--no-warn-script-location", + "--", + "uv", + ] + + log.info("installing uv") + _runcmd(cmd_expr, cmd_env) # and finally install legenddataflow with all dependencies # this must be done within the execenv, since jobs will be run within it @@ -298,11 +280,10 @@ def _runcmd(cmd_expr, cmd_env, **kwargs): cmd_expr = [ *uv_expr, "pip", - "--no-cache", + # "--no-cache", "install", "--prefix", path_install, - "--", str(config_loc), ] if args.editable: diff --git a/workflow/src/legenddataflow/patterns.py b/workflow/src/legenddataflow/patterns.py index a74fc28..3a8a9b1 100644 --- a/workflow/src/legenddataflow/patterns.py +++ b/workflow/src/legenddataflow/patterns.py @@ -121,27 +121,35 @@ def get_pattern_tier(setup, tier, check_in_cycle=True): return file_pattern -def get_pattern_pars(setup, tier, name=None, extension="yaml", check_in_cycle=True): +def get_pattern_pars( + setup, tier, name=None, datatype="cal", extension="yaml", check_in_cycle=True +): + if datatype is None: + datatype = "{datatype}" if tier in ["raw", "tcm", "dsp", "hit", "ann", "evt", "psp", "pht", "pan", "pet"]: if name is not None: return ( Path(get_pars_path(setup, tier)) - / "cal" + / datatype / "{period}" / "{run}" / ( - "{experiment}-{period}-{run}-cal-{timestamp}-par_" + "{experiment}-{period}-{run}-" + + datatype + + "-{timestamp}-par_" + f"{tier}_{name}.{extension}" ) ) else: file_pattern = ( Path(get_pars_path(setup, tier)) - / "cal" + / datatype / "{period}" / "{run}" / ( - "{experiment}-{period}-{run}-cal-{timestamp}-par_" + "{experiment}-{period}-{run}-" + + datatype + + "-{timestamp}-par_" + f"{tier}.{extension}" ) ) @@ -154,12 +162,14 @@ def get_pattern_pars(setup, tier, name=None, extension="yaml", check_in_cycle=Tr ): if name is None: return ( - "/tmp/{experiment}-{period}-{run}-cal-{timestamp}-" + "/tmp/{experiment}-{period}-{run}-" + + datatype + + "-{timestamp}-" + f"par_{tier}.{extension}" ) else: return ( - 
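# Usage sketch for the new cfgtools.get_channel_config() helper: per-channel
# configuration mappings can carry a catch-all "__default__" entry that is returned
# whenever a channel has no dedicated override. The channel names and file names
# below are made up for illustration.
def get_channel_config(mapping, channel, default_key="__default__"):
    # same logic as workflow/src/legenddataflow/cfgtools.py
    return mapping.get(channel, mapping[default_key])

processing_chain = {
    "__default__": "dsp_config_default.yaml",
    "S060": "dsp_config_S060.yaml",   # hypothetical SiPM with a dedicated config
}

print(get_channel_config(processing_chain, "S060"))  # dsp_config_S060.yaml
print(get_channel_config(processing_chain, "S001"))  # dsp_config_default.yaml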
"/tmp/{experiment}-{period}-{run}-cal-{timestamp}-" + "/tmp/{experiment}-{period}-{run}-" + datatype + "-{timestamp}-" f"par_{tier}_{name}.{extension}" ) else: @@ -237,15 +247,23 @@ def get_pattern_pars_tmp(setup, tier, name=None, datatype=None, extension="yaml" ) -def get_pattern_pars_tmp_channel(setup, tier, name=None, extension="yaml"): +def get_pattern_pars_tmp_channel( + setup, tier, name=None, datatype="cal", extension="yaml" +): + if datatype is None: + datatype = "{datatype}" if name is None: return Path(f"{tmp_par_path(setup)}") / ( - "{experiment}-{period}-{run}-cal-{timestamp}-{channel}-par_" + "{experiment}-{period}-{run}-" + + datatype + + "-{timestamp}-{channel}-par_" + f"{tier}.{extension}" ) else: return Path(f"{tmp_par_path(setup)}") / ( - "{experiment}-{period}-{run}-cal-{timestamp}-{channel}-par_" + "{experiment}-{period}-{run}-" + + datatype + + "-{timestamp}-{channel}-par_" + f"{tier}_{name}.{extension}" ) @@ -303,13 +321,15 @@ def get_pattern_log(setup, processing_step, time): ) -def get_pattern_log_channel(setup, processing_step, time): +def get_pattern_log_channel(setup, processing_step, time, datatype="cal"): return ( Path(f"{tmp_log_path(setup)}") / time / processing_step / ( - "{experiment}-{period}-{run}-cal-{timestamp}-{channel}-" + "{experiment}-{period}-{run}-" + + datatype + + "-{timestamp}-{channel}-" + processing_step + ".log" ) diff --git a/workflow/src/legenddataflow/scripts/create_chankeylist.py b/workflow/src/legenddataflow/scripts/create_chankeylist.py index 62f5c1e..113f125 100644 --- a/workflow/src/legenddataflow/scripts/create_chankeylist.py +++ b/workflow/src/legenddataflow/scripts/create_chankeylist.py @@ -2,30 +2,38 @@ from pathlib import Path from dbetto import TextDB -from legendmeta import LegendMetadata def create_chankeylist() -> None: argparser = argparse.ArgumentParser() - argparser.add_argument("--det-status", help="det_status", type=str, required=True) - argparser.add_argument("--datatype", help="Datatype", type=str, required=True) - argparser.add_argument("--timestamp", help="Timestamp", type=str, required=True) - argparser.add_argument("--channelmap", help="Channel Map", type=str, required=True) - argparser.add_argument("--output-file", help="output_file", type=str, required=True) + argparser.add_argument("--det-status", help="det_status", required=True) + argparser.add_argument("--datatype", help="Datatype", required=True) + argparser.add_argument("--timestamp", help="Timestamp", required=True) + argparser.add_argument("--channelmap", help="Channel Map", required=True) + argparser.add_argument("--system", help="geds, spms, pmts, ...", required=True) + argparser.add_argument("--output-file", required=True) + args = argparser.parse_args() - det_status = TextDB(args.det_status, lazy=True) - status_map = det_status.statuses.on(args.timestamp, system=args.datatype) + status_map = TextDB(args.det_status, lazy=True).statuses.on( + args.timestamp, system=args.datatype + ) + chmap = TextDB(args.channelmap, lazy=True).channelmaps.on(args.timestamp) + + # only restrict to a certain system (geds, spms, ...) 
+ channels = [] + for channel, status in status_map.items(): + # start with channels marked as processable in the status map + if status.processable is False: + continue - channel_map = LegendMetadata(args.channelmap, lazy=True) - chmap = channel_map.channelmaps.on(args.timestamp) + if channel not in chmap: + msg = f"{channel} is marked as processable but is not found in the channel map (on {args.timestamp})" + raise RuntimeError(msg) - channels = [ - chan - for chan in status_map - if status_map[chan]["processable"] is True and chmap[chan].system == "geds" - ] + if chmap[channel].system == args.system: + channels.append(channel) if len(channels) == 0: print("WARNING: No channels found") # noqa: T201 diff --git a/workflow/src/legenddataflow/scripts/par/spms/dsp/trigger_threshold.py b/workflow/src/legenddataflow/scripts/par/spms/dsp/trigger_threshold.py new file mode 100644 index 0000000..cd6a33e --- /dev/null +++ b/workflow/src/legenddataflow/scripts/par/spms/dsp/trigger_threshold.py @@ -0,0 +1,90 @@ +import argparse +from pathlib import Path + +import hist +import numpy as np +from dbetto import AttrsDict, Props, TextDB, utils +from dspeed import build_processing_chain +from lgdo import lh5 + +from ..... import cfgtools +from .....log import build_log + + +def par_spms_dsp_trg_thr() -> None: + # CLI interface + argparser = argparse.ArgumentParser() + argparser.add_argument("--raw-file", required=True) + argparser.add_argument("--raw-table-name", required=True) + argparser.add_argument("--output-file", required=True) + argparser.add_argument("--config-path", required=True) + argparser.add_argument("--datatype", required=True) + argparser.add_argument("--timestamp", required=True) + argparser.add_argument("--sipm-name", required=True) + argparser.add_argument("--dsp-db", nargs="*", default=[]) + argparser.add_argument("--logfile") + args = argparser.parse_args() + + # dataflow configs + df_configs = ( + TextDB(args.config_path, lazy=True) + .on(args.timestamp, system=args.datatype) + .snakemake_rules.pars_spms_dsp_trg_thr + ) + + # setup logging + log = build_log(df_configs, args.logfile) + + log.debug("reading in the configuration files") + config = df_configs.inputs + dsp_config = utils.load_dict( + cfgtools.get_channel_config(config.processing_chain, args.sipm_name) + ) + settings = AttrsDict( + utils.load_dict(cfgtools.get_channel_config(config.settings, args.sipm_name)) + ) + + # read raw file list + log.debug("reading in the raw waveforms") + data = lh5.read( + args.raw_table_name, + args.raw_file, + field_mask=["waveform_bit_drop"], + n_rows=settings.n_events, + ) + + # get DSP database from overrides + _db_dict = Props.read_from(args.dsp_db).get(args.sipm_name, {}) + + # run the DSP with the provided configuration + log.debug("running the DSP chain") + chain, _, dsp_output = build_processing_chain(data, dsp_config, db_dict=_db_dict) + chain.execute() + + log.debug("analyzing DSP outputs") + # get output of the current processor + wf_current = dsp_output.wf_current.values.view_as("np").flatten() + # determine a cutoff for the histogram used to extract the FWHM + low_cutoff, high_cutoff = np.quantile(wf_current, [0.005, 0.995]) + + # make histogram of the curr values + h = ( + hist.new.Regular(settings.n_baseline_bins, low_cutoff, high_cutoff) + .Double() + .fill(wf_current) + ) + + # determine FWHM + counts = h.view() + idx_over_half = np.where(counts >= np.max(counts) / 2)[0] + + edges = h.axes[0].edges + fwhm = edges[idx_over_half[-1]] - edges[idx_over_half[0]] + + if fwhm <= 0: + 
msg = "determined FWHM of baseline derivative distribution is zero or negative" + raise RuntimeError(msg) + + log.debug(f"writing out baseline_curr_fwhm = {fwhm}") + Path(args.output_file).parent.mkdir(parents=True, exist_ok=True) + Props.write_to(args.output_file, {"baseline_curr_fwhm": float(fwhm)}) diff --git a/workflow/src/legenddataflow/scripts/tier/dsp.py b/workflow/src/legenddataflow/scripts/tier/dsp.py index 60f35c7..c20262b 100644 --- a/workflow/src/legenddataflow/scripts/tier/dsp.py +++ b/workflow/src/legenddataflow/scripts/tier/dsp.py @@ -1,7 +1,6 @@ import argparse import re import time -import warnings from pathlib import Path import numpy as np @@ -13,13 +12,11 @@ from ...log import build_log -warnings.filterwarnings(action="ignore", category=RuntimeWarning) - -def replace_list_with_array(dic): +def _replace_list_with_array(dic): for key, value in dic.items(): if isinstance(value, dict): - dic[key] = replace_list_with_array(value) + dic[key] = _replace_list_with_array(value) elif isinstance(value, list): dic[key] = np.array(value, dtype="float32") else: @@ -28,62 +25,77 @@ def replace_list_with_array(dic): def build_tier_dsp() -> None: + # CLI config argparser = argparse.ArgumentParser() - argparser.add_argument("--configs", help="configs path", type=str, required=True) - argparser.add_argument("--metadata", help="metadata", type=str, required=True) - argparser.add_argument("--log", help="log file", type=str) + argparser.add_argument( + "--configs", help="path to dataflow config files", required=True + ) + argparser.add_argument("--metadata", help="metadata repository path", required=True) + argparser.add_argument("--log", help="log file name") - argparser.add_argument("--datatype", help="Datatype", type=str, required=True) - argparser.add_argument("--timestamp", help="Timestamp", type=str, required=True) - argparser.add_argument("--tier", help="Tier", type=str, required=True) + argparser.add_argument("--datatype", help="datatype", required=True) + argparser.add_argument("--timestamp", help="timestamp", required=True) + argparser.add_argument("--tier", help="tier", required=True) argparser.add_argument( - "--pars-file", help="database file for detector", nargs="*", default=[] + "--pars-file", help="database file for HPGes", nargs="*", default=[] ) - argparser.add_argument("--input", help="input file", type=str) + argparser.add_argument("--input", help="input file") - argparser.add_argument("--output", help="output file", type=str) - argparser.add_argument("--db-file", help="db file", type=str) + argparser.add_argument("--output", help="output file") + argparser.add_argument("--db-file", help="database file") args = argparser.parse_args() - configs = TextDB(args.configs, lazy=True) - config_dict = configs.on(args.timestamp, system=args.datatype)["snakemake_rules"] + df_configs = TextDB(args.configs, lazy=True) + config_dict = df_configs.on(args.timestamp, system=args.datatype).snakemake_rules + if args.tier in ["dsp", "psp"]: - config_dict = config_dict["tier_dsp"] + config_dict = config_dict.tier_dsp elif args.tier in ["ann", "pan"]: - config_dict = config_dict["tier_ann"] + config_dict = config_dict.tier_ann else: - msg = f"Tier {args.tier} not supported" + msg = f"tier {args.tier} not supported" raise ValueError(msg) log = build_log(config_dict, args.log) - channel_dict = config_dict["inputs"]["processing_chain"] - settings_dict = config_dict["options"].get("settings", {}) + settings_dict = config_dict.options.get("settings", {}) if isinstance(settings_dict, str): 
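# The new par-spms-dsp-trg-thr script derives baseline_curr_fwhm from the spread of
# the wf_current DSP output: it histograms the samples between the 0.5% and 99.5%
# quantiles and takes the full width at half maximum of that distribution. A
# self-contained numpy-only sketch of the same procedure on fake data (the real
# script runs dspeed's build_processing_chain and uses the `hist` package):
import numpy as np

rng = np.random.default_rng(42)
wf_current = rng.normal(0, 1, 100_000)   # stand-in for dsp_output.wf_current

low_cutoff, high_cutoff = np.quantile(wf_current, [0.005, 0.995])
counts, edges = np.histogram(wf_current, bins=100, range=(low_cutoff, high_cutoff))

# indices of all bins at or above half maximum; the FWHM is the span between the
# first and last of them
idx_over_half = np.where(counts >= counts.max() / 2)[0]
fwhm = edges[idx_over_half[-1]] - edges[idx_over_half[0]]

print(f"baseline_curr_fwhm = {fwhm:.3f}")   # ~2.35 * sigma for a Gaussian baseline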
settings_dict = Props.read_from(settings_dict) - meta = LegendMetadata(path=args.metadata) - chan_map = meta.channelmap(args.timestamp, system=args.datatype) - - if args.tier in ["ann", "pan"]: - channel_dict = { - f"ch{chan_map[chan].daq.rawid:07}/dsp": Props.read_from(file) - for chan, file in channel_dict.items() - } - else: - channel_dict = { - f"ch{chan_map[chan].daq.rawid:07}/raw": Props.read_from(file) - for chan, file in channel_dict.items() + chan_map = LegendMetadata(args.metadata).channelmap( + args.timestamp, system=args.datatype + ) + chan_cfg_map = config_dict.inputs.processing_chain + + # if the dictionary only contains one __default__ key, build the channel + # list from the (processable) channel map and assign the default config + if list(chan_cfg_map.keys()) == ["__default__"]: + chan_cfg_map = { + chan: chan_cfg_map.__default__ + for chan in chan_map.group("analysis.processable")[True].map("name") } + + # now construct the dictionary of DSP configs for build_dsp() + channel_dict = {} + for chan, file in chan_cfg_map.items(): + if chan_map[chan].analysis.processable is False: + msg = f"channel {chan} is set to non-processable in the channel map" + raise RuntimeError(msg) + + tbl = "dsp" if args.tier in ["ann", "pan"] else "raw" + channel_dict[f"ch{chan_map[chan].daq.rawid:07}/{tbl}"] = Props.read_from(file) + + # par files db_files = [ par_file for par_file in args.pars_file if Path(par_file).suffix in (".json", ".yaml", ".yml") ] - database_dic = Props.read_from(db_files, subst_pathvar=True) - database_dic = replace_list_with_array(database_dic) + database_dic = _replace_list_with_array( + Props.read_from(db_files, subst_pathvar=True) + ) database_dic = { (f"ch{chan_map[chan].daq.rawid:07}" if chan in chan_map else chan): dic for chan, dic in database_dic.items() diff --git a/workflow/src/legenddataflow/scripts/tier/evt.py b/workflow/src/legenddataflow/scripts/tier/evt.py index 195fbd6..c18a3d2 100644 --- a/workflow/src/legenddataflow/scripts/tier/evt.py +++ b/workflow/src/legenddataflow/scripts/tier/evt.py @@ -1,11 +1,10 @@ import argparse import json -import time from pathlib import Path import lgdo.lh5 as lh5 import numpy as np -from dbetto import Props, TextDB +from dbetto import AttrsDict, Props, TextDB from legendmeta import LegendMetadata from lgdo.types import Array from pygama.evt import build_evt @@ -15,67 +14,49 @@ sto = lh5.LH5Store() -def find_matching_values_with_delay(arr1, arr2, jit_delay): - matching_values = [] - - # Create an array with all possible delay values - delays = np.arange(0, int(1e9 * jit_delay)) * jit_delay - - for delay in delays: - arr2_delayed = arr2 + delay - - # Find matching values and indices - mask = np.isin(arr1, arr2_delayed, assume_unique=True) - matching_values.extend(arr1[mask]) - - return np.unique(matching_values) - - def build_tier_evt() -> None: argparser = argparse.ArgumentParser() - argparser.add_argument("--hit-file", help="hit file", type=str) - argparser.add_argument("--dsp-file", help="dsp file", type=str) - argparser.add_argument("--tcm-file", help="tcm file", type=str) - argparser.add_argument("--ann-file", help="ann file") - argparser.add_argument("--xtc-file", help="xtc file", type=str) - argparser.add_argument("--par-files", help="par files", nargs="*") - - argparser.add_argument("--datatype", help="Datatype", type=str, required=True) - argparser.add_argument("--timestamp", help="Timestamp", type=str, required=True) - argparser.add_argument("--tier", help="Tier", type=str, required=True) - - 
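# Both build_tier_dsp() and build_tier_hit() now accept a processing-chain / hit-config
# mapping that contains only a "__default__" entry; in that case the channel list is
# built from the processable channels of the channel map and each one receives the
# default config. A stand-in sketch with plain dicts (in the real scripts chan_map
# comes from LegendMetadata.channelmap() and the processable flag from
# chan_map.group("analysis.processable")); the channel names below are made up.
chan_cfg_map = {"__default__": "hit_config_default.yaml"}

processable = {"V01406A": True, "V01387A": True, "P00574B": False}

if list(chan_cfg_map.keys()) == ["__default__"]:
    chan_cfg_map = {
        chan: chan_cfg_map["__default__"]
        for chan, ok in processable.items()
        if ok
    }

# per-channel calibration pars override the config; note that the lookup is keyed
# on the channel name variable
pars_dict = {"V01406A": {"trapEmax_cal": {"a": 1.0}}}
for chan, cfg_file in chan_cfg_map.items():
    overrides = pars_dict.get(chan, {})
    print(chan, cfg_file, overrides)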
argparser.add_argument("--configs", help="configs", type=str, required=True) - argparser.add_argument("--metadata", help="metadata path", type=str, required=True) - argparser.add_argument("--log", help="log_file", type=str) - - argparser.add_argument("--output", help="output file", type=str) + argparser.add_argument("--hit-file") + argparser.add_argument("--dsp-file") + argparser.add_argument("--tcm-file") + argparser.add_argument("--ann-file", nargs="*") + argparser.add_argument("--xtc-file", nargs="*") + argparser.add_argument("--par-files", nargs="*") + + argparser.add_argument("--datatype", required=True) + argparser.add_argument("--timestamp", required=True) + argparser.add_argument("--tier", required=True) + + argparser.add_argument("--configs", required=True) + argparser.add_argument("--metadata", required=True) + argparser.add_argument("--log") + + argparser.add_argument("--output") args = argparser.parse_args() - # load in config - configs = TextDB(args.configs, lazy=True) - if args.tier in ("evt", "pet"): - rule_dict = configs.on(args.timestamp, system=args.datatype)["snakemake_rules"][ - "tier_evt" - ] - - else: - msg = "unknown tier" + if args.tier not in ("evt", "pet"): + msg = f"unsupported tier {args.tier}" raise ValueError(msg) - config_dict = rule_dict["inputs"] - evt_config_file = config_dict["evt_config"] - - log = build_log(rule_dict, args.log) - - meta = LegendMetadata(args.metadata, lazy=True) - chmap = meta.channelmap(args.timestamp) + # load in config + df_config = ( + TextDB(args.configs, lazy=True) + .on(args.timestamp, system=args.datatype) + .snakemake_rules.tier_evt + ) + log = build_log(df_config, args.log) - evt_config = Props.read_from(evt_config_file) + chmap = LegendMetadata(args.metadata, lazy=True).channelmap(on=args.timestamp) + evt_config = AttrsDict(Props.read_from(df_config.inputs.evt_config)) if args.datatype in ("phy", "xtc"): - exp_string = evt_config["operations"]["geds___energy"]["expression"] + if len(args.xtc_file) == 0: + msg = f"datatype is {args.datatype} but no xtc file was supplied" + raise ValueError(msg) + + exp_string = evt_config.operations.geds___energy.expression exp_string = exp_string.replace( - 'xtalk_matrix_filename=""', f'xtalk_matrix_filename="{args.xtc_file}"' + 'xtalk_matrix_filename=""', f'xtalk_matrix_filename="{args.xtc_file[0]}"' ) exp_string = exp_string.replace( 'cal_par_files=""', f"cal_par_files={args.par_files}" @@ -96,7 +77,7 @@ def build_tier_evt() -> None: Props.add_to(evt_config, file_path_config) # block for snakemake to fill in channel lists - for field, dic in evt_config["channels"].items(): + for field, dic in evt_config.channels.items(): if isinstance(dic, dict): chans = chmap.map("system", unique=False)[dic["system"]] if "selectors" in dic: @@ -109,11 +90,10 @@ def build_tier_evt() -> None: chans = [f"ch{chan}" for chan in list(chans.map("daq.rawid"))] else: chans = [] - evt_config["channels"][field] = chans + evt_config.channels[field] = chans - log.debug(json.dumps(evt_config["channels"], indent=2)) + log.debug(json.dumps(evt_config.channels, indent=2)) - t_start = time.time() Path(args.output).parent.mkdir(parents=True, exist_ok=True) file_table = { @@ -123,7 +103,7 @@ def build_tier_evt() -> None: "evt": (None, "evt"), } - if args.ann_file is not None: + if len(args.ann_file) > 0: file_table["ann"] = (args.ann_file, "dsp", "ch{}") table = build_evt( @@ -131,9 +111,12 @@ def build_tier_evt() -> None: evt_config, ) - if "muon_config" in config_dict and config_dict["muon_config"] is not None: - 
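# --ann-file and --xtc-file are now declared with nargs="*", so the script can test
# `len(args.xtc_file) == 0` instead of checking for None. A small reminder of the
# standard argparse behaviour this relies on: the attribute is an empty list only
# when the flag is passed without values, and None when the flag is omitted entirely.
import argparse

p = argparse.ArgumentParser()
p.add_argument("--xtc-file", nargs="*")

print(p.parse_args(["--xtc-file"]).xtc_file)                # []
print(p.parse_args(["--xtc-file", "matrix.lh5"]).xtc_file)  # ['matrix.lh5']
print(p.parse_args([]).xtc_file)                            # None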
muon_config = Props.read_from(config_dict["muon_config"]["evt_config"]) - field_config = Props.read_from(config_dict["muon_config"]["field_config"]) + if ( + "muon_config" in df_config.inputs + and df_config.inputs["muon_config"] is not None + ): + muon_config = Props.read_from(df_config.inputs["muon_config"]["evt_config"]) + field_config = Props.read_from(df_config.inputs["muon_config"]["field_config"]) # block for snakemake to fill in channel lists for field, dic in muon_config["channels"].items(): if isinstance(dic, dict): @@ -167,7 +150,7 @@ def build_tier_evt() -> None: muon_timestamp = muon_table[field_config["muon_timestamp"]["field"]].nda muon_tbl_flag = muon_table[field_config["muon_flag"]["field"]].nda if len(muon_timestamp[muon_tbl_flag]) > 0: - is_muon_veto_triggered = find_matching_values_with_delay( + is_muon_veto_triggered = _find_matching_values_with_delay( trigger_timestamp, muon_timestamp[muon_tbl_flag], field_config["jitter"], @@ -183,5 +166,18 @@ def build_tier_evt() -> None: sto.write(obj=table, name="evt", lh5_file=args.output, wo_mode="a") - t_elap = time.time() - t_start - log.info(f"Done! Time elapsed: {t_elap:.2f} sec.") + +def _find_matching_values_with_delay(arr1, arr2, jit_delay): + matching_values = [] + + # Create an array with all possible delay values + delays = np.arange(0, int(1e9 * jit_delay)) * jit_delay + + for delay in delays: + arr2_delayed = arr2 + delay + + # Find matching values and indices + mask = np.isin(arr1, arr2_delayed, assume_unique=True) + matching_values.extend(arr1[mask]) + + return np.unique(matching_values) diff --git a/workflow/src/legenddataflow/scripts/tier/hit.py b/workflow/src/legenddataflow/scripts/tier/hit.py index 7ba9d54..b3600dd 100644 --- a/workflow/src/legenddataflow/scripts/tier/hit.py +++ b/workflow/src/legenddataflow/scripts/tier/hit.py @@ -1,10 +1,8 @@ import argparse -import time from pathlib import Path from dbetto.catalog import Props from legendmeta import LegendMetadata, TextDB -from lgdo import lh5 from pygama.hit.build_hit import build_hit from ...log import build_log @@ -12,65 +10,76 @@ def build_tier_hit() -> None: argparser = argparse.ArgumentParser() - argparser.add_argument("--input", help="input file", type=str) - argparser.add_argument("--pars-file", help="hit pars file", nargs="*") + argparser.add_argument("--input") + argparser.add_argument("--pars-file", nargs="*") - argparser.add_argument("--configs", help="configs", type=str, required=True) - argparser.add_argument("--metadata", help="metadata", type=str, required=True) - argparser.add_argument("--log", help="log_file", type=str) + argparser.add_argument("--configs", required=True) + argparser.add_argument("--metadata", required=True) + argparser.add_argument("--log") - argparser.add_argument("--datatype", help="Datatype", type=str, required=True) - argparser.add_argument("--timestamp", help="Timestamp", type=str, required=True) - argparser.add_argument("--tier", help="Tier", type=str, required=True) + argparser.add_argument("--datatype", required=True) + argparser.add_argument("--timestamp", required=True) + argparser.add_argument("--tier", required=True) - argparser.add_argument("--output", help="output file", type=str) - argparser.add_argument("--db-file", help="db file", type=str) + argparser.add_argument("--output") + argparser.add_argument("--db-file") args = argparser.parse_args() - configs = TextDB(args.configs, lazy=True) - if args.tier == "hit" or args.tier == "pht": - config_dict = configs.on(args.timestamp, system=args.datatype)[ - 
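# _find_matching_values_with_delay() flags event timestamps that coincide with a
# muon-veto timestamp after applying a grid of fixed delays. A small self-contained
# example of the same np.isin-based matching; the timestamps and the jitter step
# are made up.
import numpy as np


def find_matching_values_with_delay(arr1, arr2, jit_delay):
    matching_values = []
    # create an array with all possible delay values
    delays = np.arange(0, int(1e9 * jit_delay)) * jit_delay
    for delay in delays:
        mask = np.isin(arr1, arr2 + delay, assume_unique=True)
        matching_values.extend(arr1[mask])
    return np.unique(matching_values)


trigger_ts = np.array([10, 14, 20, 33], dtype=float)
muon_ts = np.array([13, 33], dtype=float)

# jit_delay = 2e-9 -> delays [0.0, 2e-9]; here only the exact (delay = 0) coincidence
# at t = 33 survives
print(find_matching_values_with_delay(trigger_ts, muon_ts, 2e-9))  # [33.]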
"snakemake_rules" - ]["tier_hit"] - else: - msg = "unknown tier" + if args.tier not in ("hit", "pht"): + msg = f"unsupported tier {args.tier}" raise ValueError(msg) - log = build_log(config_dict, args.log) + df_config = ( + TextDB(args.configs, lazy=True) + .on(args.timestamp, system=args.datatype) + .snakemake_rules.tier_hit + ) + log = build_log(df_config, args.log) + log.info("initializing") + + settings_dict = df_config.options.get("settings", {}) - channel_dict = config_dict["inputs"]["hit_config"] - settings_dict = config_dict["options"].get("settings", {}) if isinstance(settings_dict, str): settings_dict = Props.read_from(settings_dict) - meta = LegendMetadata(path=args.metadata) - chan_map = meta.channelmap(args.timestamp, system=args.datatype) - - pars_dict = Props.read_from(args.pars_file) - pars_dict = {chan: chan_dict["pars"] for chan, chan_dict in pars_dict.items()} - - hit_dict = {} - channels_present = lh5.ls(args.input) - for channel in pars_dict: - chan_pars = pars_dict[channel].copy() - - if channel in channel_dict: - cfg_dict = Props.read_from(channel_dict[channel]) - Props.add_to(cfg_dict, chan_pars) - chan_pars = cfg_dict - - if f"ch{chan_map[channel].daq.rawid}" in channels_present: - hit_dict[f"ch{chan_map[channel].daq.rawid}/dsp"] = chan_pars - - t_start = time.time() + # mapping channel -> hit config file + chan_cfg_map = df_config.inputs.hit_config + # channel map + chan_map = LegendMetadata(args.metadata).channelmap( + args.timestamp, system=args.datatype + ) + + log.info("building the build_hit config") + # if the mapping only contains one __default__ key, build the channel + # list from the (processable) channel map and assign the default config + if list(chan_cfg_map.keys()) == ["__default__"]: + chan_cfg_map = { + chan: chan_cfg_map.__default__ + for chan in chan_map.group("analysis.processable")[True].map("name") + } + + # now construct the dictionary of hit configs for build_hit() + channel_dict = {} + pars_dict = {ch: chd["pars"] for ch, chd in Props.read_from(args.pars_file).items()} + for chan, file in chan_cfg_map.items(): + if chan_map[chan].analysis.processable is False: + msg = f"channel {chan} is set to non-processable in the channel map" + raise RuntimeError(msg) + + hit_cfg = Props.read_from(file) + + # get pars (to override hit config) + Props.add_to(hit_cfg, pars_dict.get("chan", {}).copy()) + + channel_dict[f"ch{chan_map[chan].daq.rawid}/dsp"] = hit_cfg + + log.info("running build_hit()...") Path(args.output).parent.mkdir(parents=True, exist_ok=True) - build_hit(args.input, lh5_tables_config=hit_dict, outfile=args.output) - t_elap = time.time() - t_start - log.info(f"Done! Time elapsed: {t_elap:.2f} sec.") + build_hit(args.input, lh5_tables_config=channel_dict, outfile=args.output) hit_outputs = {} hit_channels = [] - for channel, file in channel_dict.items(): + for channel, file in chan_cfg_map.items(): output = Props.read_from(file)["outputs"] in_dict = False for entry in hit_outputs: diff --git a/workflow/src/legenddataflow/utils.py b/workflow/src/legenddataflow/utils.py index dc90ed0..9868e57 100644 --- a/workflow/src/legenddataflow/utils.py +++ b/workflow/src/legenddataflow/utils.py @@ -11,13 +11,6 @@ import string from pathlib import Path -# from dateutil import parser - -# For testing/debugging, use -# from scripts.utils import * -# import snakemake as smk -# setup = smk.load_configfile("config.json")["setups"]["l200"] - def sandbox_path(setup): if "sandbox_path" in setup["paths"]: