From b64c4af69403ba7f896f27cfdd8f0369e84c732a Mon Sep 17 00:00:00 2001 From: Patrick Krause Date: Tue, 28 Nov 2023 15:54:45 +0100 Subject: [PATCH] implemented suggestions of luigi: retrieve ids in seperate config block, explicit tier naming, field naming changes --- src/pygama/evt/build_evt.py | 566 ++++++++++-------- tests/evt/configs/basic-evt-config.json | 49 +- tests/evt/configs/module-test-evt-config.json | 45 +- .../module-test-t0-vov-evt-config.json | 42 +- tests/evt/configs/query-test-evt-config.json | 88 +++ tests/evt/configs/vov-test-evt-config.json | 26 +- tests/evt/test_build_evt.py | 73 +-- 7 files changed, 531 insertions(+), 358 deletions(-) create mode 100644 tests/evt/configs/query-test-evt-config.json diff --git a/src/pygama/evt/build_evt.py b/src/pygama/evt/build_evt.py index f606e3774..cc1e237e5 100644 --- a/src/pygama/evt/build_evt.py +++ b/src/pygama/evt/build_evt.py @@ -45,9 +45,8 @@ def evaluate_expression( expr: str, nrows: int, group: str, - dsp_group: str, - hit_group: str, para: dict = None, + qry: str = None, defv=np.nan, ) -> dict: """ @@ -74,6 +73,8 @@ def evaluate_expression( - "all": Logical and between all channels. Non boolean values are True for values != 0 and False for values == 0. - ch_field: A previously generated channel_id field (i.e. from the get_ch flag) can be given here, and the value of this specific channels is used. if ch_field is a VectorOfVectors, the channel list is ignored. If ch_field is an Array, the intersection of the passed channels list and the Array is formed. If a channel is not in the Array, the default is used. - "vov": Channels are not combined, but result saved as VectorOfVectors. Use of getch is recommended. It is possible (and recommended) to add a condition (e.g. "vov>10"). Only channels fulfilling this condition are saved. + qry + A query that can set a condition on mode. Can be any tier (i.e. a channelxevents shaped boolean matrix for tiers below event or an events long boolean array at the evt level) expr The expression. That can be any mathematical equation/comparison. If mode == func, the expression needs to be a special processing function defined in modules (e.g. "modules.spm.get_energy). In the expression parameters from either hit, dsp, evt tier (from operations performed before this one! --> JSON operations order matters), or from the "parameters" field can be used. nrows @@ -94,21 +95,21 @@ def evaluate_expression( mode, sorter = mod, None if isinstance(mod, list): mode = mod[0] - sorter = mod[1] + sorter = mod[1].split(".") # find parameters in evt file or in parameters - exprl = re.findall(r"[a-zA-Z_$][\w$]*", expr) + exprl = re.findall(r"(evt|hit|dsp).([a-zA-Z_$][\w$]*)", expr) var_ph = {} if os.path.exists(f_evt): - var_ph = load_vars_to_nda(f_evt, group, exprl) + var_ph = load_vars_to_nda(f_evt, "", exprl) if para: var_ph = var_ph | para - if mode == "func": + if mode == "function": # evaluate expression func, params = expr.split("(") params = [f_hit, f_dsp, f_tcm, chns] + [ - num_and_pars(e, var_ph) for e in params[:-1].split(",") + num_and_pars(e.replace(".", "_"), var_ph) for e in params[:-1].split(",") ] # load function dynamically @@ -118,14 +119,18 @@ def evaluate_expression( return {"values": out} else: - # evaluate possible operator in mode - ops = re.findall(r"([<>]=?|==)", mode) - op, mode_lim = None, None - if len(ops) == 1: - op = ops[0] - mode_lim = float(mode.split(op)[-1]) - elif len(ops) > 1: - raise ValueError(mode + " contains invalid operator") + # check if query is either on channel basis or evt basis (and not a mix) + qry_mask = qry + if qry is not None: + if "evt." in qry and ("hit." in qry or "dsp." in qry): + raise ValueError("Query can't be a mix of evt tier and lower tiers.") + + # if it is an evt query we can evaluate it directly here + if os.path.exists(f_evt) and "evt." in qry: + var_qry = load_vars_to_nda( + f_evt, "", re.findall(r"(evt).([a-zA-Z_$][\w$]*)", qry) + ) + qry_mask = eval(qry.replace("evt.", "evt_"), var_qry) # load TCM data to define an event nda = store.load_nda(f_tcm, ["array_id", "array_idx"], "hardware_tcm_1/") @@ -133,19 +138,20 @@ def evaluate_expression( idx = nda["array_idx"] # switch through modes - if os.path.exists(f_evt) and mode in [ - e.split("/")[-1] for e in store.ls(f_evt, group) - ]: + if ( + os.path.exists(f_evt) + and "evt." == mode[:4] + and mode.split(".")[-1] + in [e.split("/")[-1] for e in store.ls(f_evt, "/evt/")] + ): lstore = store.LH5Store() - ch_comp, _ = lstore.read_object(group + mode, f_evt) + ch_comp, _ = lstore.read_object(mode.replace(".", "/"), f_evt) if isinstance(ch_comp, Array): return evaluate_at_channel( idx, ids, f_hit, - hit_group, f_dsp, - dsp_group, chns, expr, exprl, @@ -158,9 +164,7 @@ def evaluate_expression( idx, ids, f_hit, - hit_group, f_dsp, - dsp_group, expr, exprl, ch_comp, @@ -171,73 +175,62 @@ def evaluate_expression( type(ch_comp) + " not supported (only Array and VectorOfVectors are supported)" ) - elif "first" in mode: + + elif "first" == mode: return evaluate_to_first( idx, ids, f_hit, - hit_group, f_dsp, - dsp_group, chns, expr, exprl, + qry_mask, nrows, - mode_lim, sorter, - op, var_ph, defv, ) - elif "last" in mode: + elif "last" == mode: return evaluate_to_last( idx, ids, f_hit, - hit_group, f_dsp, - dsp_group, chns, expr, exprl, + qry_mask, nrows, - mode_lim, sorter, - op, var_ph, defv, ) - elif "tot" in mode: + elif "sum" == mode: return evaluate_to_tot( idx, ids, f_hit, - hit_group, f_dsp, - dsp_group, chns, expr, exprl, + qry_mask, nrows, - mode_lim, - op, var_ph, defv, ) - elif "vov" in mode: + elif "vov" == mode: return evaluate_to_vector( idx, ids, f_hit, - hit_group, f_dsp, - dsp_group, chns, expr, exprl, + qry_mask, nrows, - mode_lim, - op, var_ph, ) elif "any" == mode: @@ -245,12 +238,11 @@ def evaluate_expression( idx, ids, f_hit, - hit_group, f_dsp, - dsp_group, chns, expr, exprl, + qry_mask, nrows, var_ph, defv, @@ -260,12 +252,11 @@ def evaluate_expression( idx, ids, f_hit, - hit_group, f_dsp, - dsp_group, chns, expr, exprl, + qry_mask, nrows, var_ph, defv, @@ -280,22 +271,31 @@ def find_parameters( ch: str, idx_ch: np.ndarray, exprl: list, - dsp_group: str, - hit_group: str, ) -> dict: # find fields in either dsp, hit - var = load_vars_to_nda(f_hit, ch + hit_group, exprl) - dsp_dic = load_vars_to_nda(f_dsp, ch + dsp_group, exprl) + var = load_vars_to_nda(f_hit, ch, exprl, idx_ch) + dsp_dic = load_vars_to_nda(f_dsp, ch, exprl, idx_ch) return dsp_dic | var -def load_vars_to_nda(f_evt: str, group: str, exprl: list) -> dict: +def load_vars_to_nda( + f_evt: str, group: str, exprl: list, idx: np.ndarray = None +) -> dict: lstore = store.LH5Store() - flds = [ - e.split("/")[-1] for e in store.ls(f_evt, group) if e.split("/")[-1] in exprl - ] - var = {e: lstore.read_object(group + e, f_evt)[0] for e in flds} + var = { + f"{e[0]}_{e[1]}": lstore.read_object( + f"{group.replace('/','')}/{e[0]}/{e[1]}", + f_evt, + idx=idx, + )[0] + for e in exprl + if e[1] + in [ + x.split("/")[-1] + for x in store.ls(f_evt, f"{group.replace('/','')}/{e[0]}/") + ] + } # to make any operations to VoVs we have to blow it up to a table (future change to more intelligant way) arr_keys = [] @@ -316,6 +316,8 @@ def load_vars_to_nda(f_evt: str, group: str, exprl: list) -> dict: if len(arr_keys) > 0 and not set(arr_keys) == set(var.keys()): for key in arr_keys: var[key] = var[key][:, None] + + log.debug(f"Found parameters {var.keys()}") return var @@ -323,16 +325,13 @@ def evaluate_to_first( idx: np.ndarray, ids: np.ndarray, f_hit: str, - hit_group: str, f_dsp: str, - dsp_group: str, chns: list, expr: str, exprl: list, + qry: str | np.ndarray, nrows: int, - mode_lim: int | float, - sorter: str, - op: str = None, + sorter: list, var_ph: dict = None, defv=np.nan, ) -> dict: @@ -345,39 +344,53 @@ def evaluate_to_first( # get index list for this channel to be loaded idx_ch = idx[ids == int(ch[2:])] - var = ( - find_parameters(f_hit, f_dsp, ch, idx_ch, exprl, dsp_group, hit_group) - | var_ph - ) - - # evaluate expression - res = eval(expr, var) - - # if it is not a nparray it could be a single value - # expand accordingly - if not isinstance(res, np.ndarray): - res = np.full(len(out), res, dtype=type(res)) - - # get unification condition if present in mode - if op is not None: - limarr = eval( - "".join(["res", op, "lim"]), - {"res": res, "lim": mode_lim}, + if "tcm.array_id" == expr: + res = np.full(len(out), int(ch[2:]), dtype=int) + else: + var = find_parameters(f_hit, f_dsp, ch, idx_ch, exprl) | var_ph + + # evaluate expression + # move tier+dots in expression to underscores (e.g. evt.foo -> evt_foo) + res = eval( + expr.replace("dsp.", "dsp_") + .replace("hit.", "hit_") + .replace("evt.", "evt_"), + var, ) + + # if it is not a nparray it could be a single value + # expand accordingly + if not isinstance(res, np.ndarray): + res = np.full(len(out), res, dtype=type(res)) + + # get sub evt based query condition if needed + if isinstance(qry, str): + qry_lst = re.findall(r"(hit|dsp).([a-zA-Z_$][\w$]*)", qry) + qry_var = find_parameters(f_hit, f_dsp, ch, idx_ch, qry_lst) + limarr = eval(qry.replace("dsp.", "dsp_").replace("hit.", "hit_"), qry_var) + + # or forward the array + elif isinstance(qry, np.ndarray): + limarr = qry + + # if no condition, it must be true else: limarr = np.ones(len(res)).astype(bool) + if limarr.dtype != bool: + limarr = limarr.astype(bool) + # append to out according to mode == first if ch == chns[0]: outt[:] = np.inf # find if sorter is in hit or dsp - if sorter in [e.split("/")[-1] for e in store.ls(f_dsp, ch + dsp_group)]: - t0 = store.load_nda(f_dsp, [sorter], ch + dsp_group, idx_ch)[sorter] - elif sorter in [e.split("/")[-1] for e in store.ls(f_hit, ch + hit_group)]: - t0 = store.load_nda(f_hit, [sorter], ch + hit_group, idx_ch)[sorter] - else: - raise ValueError(f"Couldn't find sorter {sorter}") + t0 = store.load_nda( + f_hit if "hit" == sorter[0] else f_dsp, + [sorter[1]], + f"{ch}/{sorter[0]}/", + idx_ch, + )[sorter[1]] out[idx_ch] = np.where((t0 < outt) & (limarr), res, out[idx_ch]) out_chs[idx_ch] = np.where((t0 < outt) & (limarr), int(ch[2:]), out_chs[idx_ch]) @@ -390,16 +403,13 @@ def evaluate_to_last( idx: np.ndarray, ids: np.ndarray, f_hit: str, - hit_group: str, f_dsp: str, - dsp_group: str, chns: list, expr: str, exprl: list, + qry: str | np.ndarray, nrows: int, - mode_lim: int | float, - sorter: str, - op: str = None, + sorter: list, var_ph: dict = None, defv=np.nan, ) -> dict: @@ -411,38 +421,49 @@ def evaluate_to_last( for ch in chns: # get index list for this channel to be loaded idx_ch = idx[ids == int(ch[2:])] + if "tcm.array_id" == expr: + res = np.full(len(out), int(ch[2:]), dtype=int) + else: + # find fields in either dsp, hit + var = find_parameters(f_hit, f_dsp, ch, idx_ch, exprl) | var_ph + + # evaluate expression + # move tier+dots in expression to underscores (e.g. evt.foo -> evt_foo) + res = eval( + expr.replace("dsp.", "dsp_") + .replace("hit.", "hit_") + .replace("evt.", "evt_"), + var, + ) - # find fields in either dsp, hit - var = ( - find_parameters(f_hit, f_dsp, ch, idx_ch, exprl, dsp_group, hit_group) - | var_ph - ) + # if it is not a nparray it could be a single value + # expand accordingly + if not isinstance(res, np.ndarray): + res = np.full(len(out), res, dtype=type(res)) - # evaluate expression - res = eval(expr, var) - - # if it is not a nparray it could be a single value - # expand accordingly - if not isinstance(res, np.ndarray): - res = np.full(len(out), res, dtype=type(res)) - - # get unification condition if present in mode - if op is not None: - limarr = eval( - "".join(["res", op, "lim"]), - {"res": res, "lim": mode_lim}, - ) + # get sub evt based query condition if needed + if isinstance(qry, str): + qry_lst = re.findall(r"(hit|dsp).([a-zA-Z_$][\w$]*)", qry) + qry_var = find_parameters(f_hit, f_dsp, ch, idx_ch, qry_lst) + limarr = eval(qry.replace("dsp.", "dsp_").replace("hit.", "hit_"), qry_var) + + # or forward the array + elif isinstance(qry, np.ndarray): + limarr = qry + + # if no condition, it must be true else: limarr = np.ones(len(res)).astype(bool) - + if limarr.dtype != bool: + limarr = limarr.astype(bool) # append to out according to mode == last # find if sorter is in hit or dsp - if sorter in [e.split("/")[-1] for e in store.ls(f_dsp, ch + dsp_group)]: - t0 = store.load_nda(f_dsp, [sorter], ch + dsp_group, idx_ch)[sorter] - elif sorter in [e.split("/")[-1] for e in store.ls(f_hit, ch + hit_group)]: - t0 = store.load_nda(f_hit, [sorter], ch + hit_group, idx_ch)[sorter] - else: - raise ValueError(f"Couldn't find sorter {sorter}") + t0 = store.load_nda( + f_hit if "hit" == sorter[0] else f_dsp, + [sorter[1]], + f"{ch}/{sorter[0]}/", + idx_ch, + )[sorter[1]] out[idx_ch] = np.where((t0 > outt) & (limarr), res, out[idx_ch]) out_chs[idx_ch] = np.where((t0 > outt) & (limarr), int(ch[2:]), out_chs[idx_ch]) @@ -455,15 +476,12 @@ def evaluate_to_tot( idx: np.ndarray, ids: np.ndarray, f_hit: str, - hit_group: str, f_dsp: str, - dsp_group: str, chns: list, expr: str, exprl: list, + qry: str | np.ndarray, nrows: int, - mode_lim: int | float, - op: str = None, var_ph: dict = None, defv=np.nan, ) -> dict: @@ -474,32 +492,45 @@ def evaluate_to_tot( # get index list for this channel to be loaded idx_ch = idx[ids == int(ch[2:])] - # find fields in either dsp, hit - var = ( - find_parameters(f_hit, f_dsp, ch, idx_ch, exprl, dsp_group, hit_group) - | var_ph - ) - - # evaluate expression - res = eval(expr, var) - - # if it is not a nparray it could be a single value - # expand accordingly - if not isinstance(res, np.ndarray): - res = np.full(len(out), res, dtype=type(res)) - - # get unification condition if present in mode - if op is not None: - limarr = eval( - "".join(["res", op, "lim"]), - {"res": res, "lim": mode_lim}, + if "tcm.array_id" == expr: + res = np.full(len(out), int(ch[2:]), dtype=int) + else: + # find fields in either dsp, hit + var = find_parameters(f_hit, f_dsp, ch, idx_ch, exprl) | var_ph + + # evaluate expression + # move tier+dots in expression to underscores (e.g. evt.foo -> evt_foo) + res = eval( + expr.replace("dsp.", "dsp_") + .replace("hit.", "hit_") + .replace("evt.", "evt_"), + var, ) + + # if it is not a nparray it could be a single value + # expand accordingly + if not isinstance(res, np.ndarray): + res = np.full(len(out), res, dtype=type(res)) + + # get sub evt based query condition if needed + if isinstance(qry, str): + qry_lst = re.findall(r"(hit|dsp).([a-zA-Z_$][\w$]*)", qry) + qry_var = find_parameters(f_hit, f_dsp, ch, idx_ch, qry_lst) + limarr = eval(qry.replace("dsp.", "dsp_").replace("hit.", "hit_"), qry_var) + + # or forward the array + elif isinstance(qry, np.ndarray): + limarr = qry + + # if no condition, it must be true else: limarr = np.ones(len(res)).astype(bool) # append to out according to mode == tot if res.dtype == bool: res = res.astype(int) + if limarr.dtype != bool: + limarr = limarr.astype(bool) out[idx_ch] = np.where(limarr, res + out[idx_ch], out[idx_ch]) return {"values": out} @@ -509,12 +540,11 @@ def evaluate_to_any( idx: np.ndarray, ids: np.ndarray, f_hit: str, - hit_group: str, f_dsp: str, - dsp_group: str, chns: list, expr: str, exprl: list, + qry: str | np.ndarray, nrows: int, var_ph: dict = None, defv=np.nan, @@ -526,24 +556,46 @@ def evaluate_to_any( # get index list for this channel to be loaded idx_ch = idx[ids == int(ch[2:])] - # find fields in either dsp, hit - var = ( - find_parameters(f_hit, f_dsp, ch, idx_ch, exprl, dsp_group, hit_group) - | var_ph - ) + if "tcm.array_id" == expr: + res = np.full(len(out), int(ch[2:]), dtype=int) + else: + # find fields in either dsp, hit + var = find_parameters(f_hit, f_dsp, ch, idx_ch, exprl) | var_ph + + # evaluate expression + # move tier+dots in expression to underscores (e.g. evt.foo -> evt_foo) + res = eval( + expr.replace("dsp.", "dsp_") + .replace("hit.", "hit_") + .replace("evt.", "evt_"), + var, + ) - # evaluate expression - res = eval(expr, var) + # if it is not a nparray it could be a single value + # expand accordingly + if not isinstance(res, np.ndarray): + res = np.full(len(out), res, dtype=type(res)) + + # get sub evt based query condition if needed + if isinstance(qry, str): + qry_lst = re.findall(r"(hit|dsp).([a-zA-Z_$][\w$]*)", qry) + qry_var = find_parameters(f_hit, f_dsp, ch, idx_ch, qry_lst) + limarr = eval(qry.replace("dsp.", "dsp_").replace("hit.", "hit_"), qry_var) - # if it is not a nparray it could be a single value - # expand accordingly - if not isinstance(res, np.ndarray): - res = np.full(len(out), res, dtype=type(res)) + # or forward the array + elif isinstance(qry, np.ndarray): + limarr = qry + + # if no condition, it must be true + else: + limarr = np.ones(len(res)).astype(bool) # append to out according to mode == any if res.dtype != bool: res = res.astype(bool) - out[idx_ch] = out[idx_ch] | res + if limarr.dtype != bool: + limarr = limarr.astype(bool) + out[idx_ch] = out[idx_ch] | (res & limarr) return {"values": out} @@ -552,12 +604,11 @@ def evaluate_to_all( idx: np.ndarray, ids: np.ndarray, f_hit: str, - hit_group: str, f_dsp: str, - dsp_group: str, chns: list, expr: str, exprl: list, + qry: str | np.ndarray, nrows: int, var_ph: dict = None, defv=np.nan, @@ -569,24 +620,45 @@ def evaluate_to_all( # get index list for this channel to be loaded idx_ch = idx[ids == int(ch[2:])] - # find fields in either dsp, hit - var = ( - find_parameters(f_hit, f_dsp, ch, idx_ch, exprl, dsp_group, hit_group) - | var_ph - ) + if "tcm.array_id" == expr: + res = np.full(len(out), int(ch[2:]), dtype=int) + else: + # find fields in either dsp, hit + var = find_parameters(f_hit, f_dsp, ch, idx_ch, exprl) | var_ph + + # evaluate expression + res = eval( + expr.replace("dsp.", "dsp_") + .replace("hit.", "hit_") + .replace("evt.", "evt_"), + var, + ) - # evaluate expression - res = eval(expr, var) + # if it is not a nparray it could be a single value + # expand accordingly + if not isinstance(res, np.ndarray): + res = np.full(len(out), res, dtype=type(res)) + + # get sub evt based query condition if needed + if isinstance(qry, str): + qry_lst = re.findall(r"(hit|dsp).([a-zA-Z_$][\w$]*)", qry) + qry_var = find_parameters(f_hit, f_dsp, ch, idx_ch, qry_lst) + limarr = eval(qry.replace("dsp.", "dsp_").replace("hit.", "hit_"), qry_var) + + # or forward the array + elif isinstance(qry, np.ndarray): + limarr = qry - # if it is not a nparray it could be a single value - # expand accordingly - if not isinstance(res, np.ndarray): - res = np.full(len(out), res, dtype=type(res)) + # if no condition, it must be true + else: + limarr = np.ones(len(res)).astype(bool) # append to out according to mode == all if res.dtype != bool: res = res.astype(bool) - out[idx_ch] = out[idx_ch] & res + if limarr.dtype != bool: + limarr = limarr.astype(bool) + out[idx_ch] = out[idx_ch] & res & limarr return {"values": out} @@ -595,9 +667,7 @@ def evaluate_at_channel( idx: np.ndarray, ids: np.ndarray, f_hit: str, - hit_group: str, f_dsp: str, - dsp_group: str, chns: list, expr: str, exprl: list, @@ -611,19 +681,24 @@ def evaluate_at_channel( # get index list for this channel to be loaded idx_ch = idx[ids == int(ch[2:])] - # find fields in either dsp, hit - var = ( - find_parameters(f_hit, f_dsp, ch, idx_ch, exprl, dsp_group, hit_group) - | var_ph - ) - - # evaluate expression - res = eval(expr, var) + if "tcm.array_id" == expr: + res = np.full(len(out), int(ch[2:]), dtype=int) + else: + # find fields in either dsp, hit + var = find_parameters(f_hit, f_dsp, ch, idx_ch, exprl) | var_ph + + # evaluate expression + res = eval( + expr.replace("dsp.", "dsp_") + .replace("hit.", "hit_") + .replace("evt.", "evt_"), + var, + ) - # if it is not a nparray it could be a single value - # expand accordingly - if not isinstance(res, np.ndarray): - res = np.full(len(out), res, dtype=type(res)) + # if it is not a nparray it could be a single value + # expand accordingly + if not isinstance(res, np.ndarray): + res = np.full(len(out), res, dtype=type(res)) out[idx_ch] = np.where(int(ch[2:]) == ch_comp.nda, res, out[idx_ch]) @@ -634,9 +709,7 @@ def evaluate_at_channel_vov( idx: np.ndarray, ids: np.ndarray, f_hit: str, - hit_group: str, f_dsp: str, - dsp_group: str, expr: str, exprl: list, ch_comp: VectorOfVectors, @@ -651,21 +724,24 @@ def evaluate_at_channel_vov( # get index list for this channel to be loaded idx_ch = idx[ids == ch] - # find fields in either dsp, hit - var = ( - find_parameters( - f_hit, f_dsp, f"ch{ch}", idx_ch, exprl, dsp_group, hit_group + if "tcm.array_id" == expr: + res = np.full(len(out), int(ch[2:]), dtype=int) + else: + # find fields in either dsp, hit + var = find_parameters(f_hit, f_dsp, f"ch{ch}", idx_ch, exprl) | var_ph + + # evaluate expression + res = eval( + expr.replace("dsp.", "dsp_") + .replace("hit.", "hit_") + .replace("evt.", "evt_"), + var, ) - | var_ph - ) - - # evaluate expression - res = eval(expr, var) - # if it is not a nparray it could be a single value - # expand accordingly - if not isinstance(res, np.ndarray): - res = np.full(len(out), res, dtype=type(res)) + # if it is not a nparray it could be a single value + # expand accordingly + if not isinstance(res, np.ndarray): + res = np.full(len(out), res, dtype=type(res)) # see in which events the current channel is present mask = (out == ch).any(axis=1) @@ -683,15 +759,12 @@ def evaluate_to_vector( idx: np.ndarray, ids: np.ndarray, f_hit: str, - hit_group: str, f_dsp: str, - dsp_group: str, chns: list, expr: str, exprl: list, + qry: str | np.ndarray, nrows: int, - mode_lim: int | float, - op: str = None, var_ph: dict = None, ) -> dict: """ @@ -710,29 +783,41 @@ def evaluate_to_vector( # get index list for this channel to be loaded idx_ch = idx[ids == int(ch[2:])] - # find fields in either dsp, hit - var = ( - find_parameters(f_hit, f_dsp, ch, idx_ch, exprl, dsp_group, hit_group) - | var_ph - ) - - # evaluate expression - res = eval(expr, var) - - # if it is not a nparray it could be a single value - # expand accordingly - if not isinstance(res, np.ndarray): - res = np.full(len(out), res, dtype=type(res)) - - # get unification condition if present in mode - if op is not None: - limarr = eval( - "".join(["res", op, "lim"]), - {"res": res, "lim": mode_lim}, + if "tcm.array_id" == expr: + res = np.full(len(out), int(ch[2:]), dtype=int) + else: + # find fields in either dsp, hit + var = find_parameters(f_hit, f_dsp, ch, idx_ch, exprl) | var_ph + + # evaluate expression + res = eval( + expr.replace("dsp.", "dsp_") + .replace("hit.", "hit_") + .replace("evt.", "evt_"), + var, ) + + # if it is not a nparray it could be a single value + # expand accordingly + if not isinstance(res, np.ndarray): + res = np.full(len(out), res, dtype=type(res)) + + # get sub evt based query condition if needed + if isinstance(qry, str): + qry_lst = re.findall(r"(hit|dsp).([a-zA-Z_$][\w$]*)", qry) + qry_var = find_parameters(f_hit, f_dsp, ch, idx_ch, qry_lst) + limarr = eval(qry.replace("dsp.", "dsp_").replace("hit.", "hit_"), qry_var) + + # or forward the array + elif isinstance(qry, np.ndarray): + limarr = qry + + # if no condition, it must be true else: limarr = np.ones(len(res)).astype(bool) + if limarr.dtype != bool: + limarr = limarr.astype(bool) # append to out according to mode == vov out[:, i][limarr] = res[limarr] out_chs[:, i][limarr] = int(ch[2:]) @@ -763,8 +848,6 @@ def build_evt( wo_mode: str = "write_safe", group: str = "/evt/", tcm_group: str = "/hardware_tcm_1/", - dsp_group: str = "/dsp/", - hit_group: str = "/hit/", ) -> None: """ Transform data from the hit and dsp levels which a channel sorted @@ -845,10 +928,6 @@ def build_evt( lh5 root group name tcm_group lh5 root group in tcm file - dsp_group - lh5 root group in dsp file - hit_group - lh5 root group in hit file """ lstore = store.LH5Store() @@ -907,15 +986,15 @@ def build_evt( log.debug("Processing field" + k) # if mode not defined in operation, it can only be an operation on the evt level. - if "mode" not in v.keys(): - exprl = re.findall(r"[a-zA-Z_$][\w$]*", v["expression"]) + if "aggregation_mode" not in v.keys(): + exprl = re.findall(r"(evt).([a-zA-Z_$][\w$]*)", v["expression"]) var = {} if os.path.exists(f_evt): - var = load_vars_to_nda(f_evt, group, exprl) + var = load_vars_to_nda(f_evt, "", exprl) if "parameters" in v.keys(): var = var | v["parameters"] - res = eval(v["expression"], var) + res = eval(v["expression"].replace("evt.", "evt_"), var) # now check what dimension we have after the evaluation if len(res.shape) == 1: @@ -950,9 +1029,11 @@ def build_evt( itertools.chain.from_iterable([chns[e] for e in v["channels"]]) ) - pars, defaultv = None, np.nan + pars, qry, defaultv = None, None, np.nan if "parameters" in v.keys(): pars = v["parameters"] + if "query" in v.keys(): + qry = v["query"] if "initial" in v.keys() and not v["initial"] == "np.nan": defaultv = v["initial"] @@ -962,13 +1043,12 @@ def build_evt( f_hit, f_dsp, chns_e, - v["mode"], + v["aggregation_mode"], v["expression"], nrows, group, - dsp_group, - hit_group, pars, + qry, defaultv, ) @@ -984,16 +1064,16 @@ def build_evt( # if get_ch flag is true and exists and result dic contains channels entry # write also channels information - if "get_ch" in v.keys() and v["get_ch"] and "channels" in result.keys(): - obj = result["channels"] - if isinstance(obj, np.ndarray): - obj = Array(result["channels"]) - lstore.write_object( - obj=obj, - name=group + k + "_id", - lh5_file=f_evt, - wo_mode=wo_mode, - ) + # if "get_ch" in v.keys() and v["get_ch"] and "channels" in result.keys(): + # obj = result["channels"] + # if isinstance(obj, np.ndarray): + # obj = Array(result["channels"]) + # lstore.write_object( + # obj=obj, + # name=group + k + "_id", + # lh5_file=f_evt, + # wo_mode=wo_mode, + # ) log.info("Done") diff --git a/tests/evt/configs/basic-evt-config.json b/tests/evt/configs/basic-evt-config.json index aa0b68456..c573c89fb 100644 --- a/tests/evt/configs/basic-evt-config.json +++ b/tests/evt/configs/basic-evt-config.json @@ -5,62 +5,67 @@ "operations": { "multiplicity": { "channels": "geds_on", - "mode": "tot", - "expression": "cuspEmax_ctc_cal > a", + "aggregation_mode": "sum", + "expression": "hit.cuspEmax_ctc_cal > a", "parameters": { "a": 25 }, "initial": 0 }, "energy": { "channels": "geds_on", - "mode": ["first>25", "tp_0_est"], - "get_ch": true, - "expression": "cuspEmax_ctc_cal", + "aggregation_mode": ["first", "dsp.tp_0_est"], + "query": "hit.cuspEmax_ctc_cal>25", + "expression": "hit.cuspEmax_ctc_cal", "initial": "np.nan" }, + "energy_id": { + "channels": "geds_on", + "aggregation_mode": ["first", "dsp.tp_0_est"], + "query": "hit.cuspEmax_ctc_cal>25", + "expression": "tcm.array_id", + "initial": 0 + }, "energy_any_above1MeV": { "channels": "geds_on", - "mode": "any", - "get_ch": true, - "expression": "cuspEmax_ctc_cal>1000", + "aggregation_mode": "any", + "expression": "hit.cuspEmax_ctc_cal>1000", "initial": false }, "energy_all_above1MeV": { "channels": "geds_on", - "mode": "all", - "get_ch": true, - "expression": "cuspEmax_ctc_cal>1000", + "aggregation_mode": "all", + "expression": "hit.cuspEmax_ctc_cal>1000", "initial": false }, "energy_aux": { "channels": "geds_on", - "mode": ["last>25", "tp_0_est"], - "get_ch": true, - "expression": "cuspEmax_ctc_cal", + "aggregation_mode": ["last", "dsp.tp_0_est"], + "query": "hit.cuspEmax_ctc_cal>25", + "expression": "hit.cuspEmax_ctc_cal", "initial": "np.nan" }, "energy_sum": { "channels": "geds_on", - "mode": "tot>25", - "get_ch": true, - "expression": "cuspEmax_ctc_cal", + "aggregation_mode": "sum", + "query": "hit.cuspEmax_ctc_cal>25", + "expression": "hit.cuspEmax_ctc_cal", "initial": 0.0 }, "is_usable_aoe": { "channels": "geds_on", - "mode": "energy_id", + "aggregation_mode": "evt.energy_id", "expression": "True", "initial": false }, "aoe": { "channels": "geds_on", - "mode": "energy_id", - "expression": "AoE_Classifier", + "aggregation_mode": "evt.energy_id", + "expression": "hit.AoE_Classifier", "initial": "np.nan" }, "is_aoe_rejected": { "channels": "geds_on", - "mode": "energy_id", - "expression": "~(AoE_Double_Sided_Cut)", + "aggregation_mode": "evt.energy_id", + "expression": "~(hit.AoE_Double_Sided_Cut)", "initial": false } } diff --git a/tests/evt/configs/module-test-evt-config.json b/tests/evt/configs/module-test-evt-config.json index 8f084034a..4810b91e0 100644 --- a/tests/evt/configs/module-test-evt-config.json +++ b/tests/evt/configs/module-test-evt-config.json @@ -5,50 +5,57 @@ }, "operations": { "energy_first": { - "channels": ["geds_on"], - "mode": ["first>25", "tp_0_est"], - "get_ch": true, - "expression": "cuspEmax_ctc_cal", + "channels": "geds_on", + "aggregation_mode": ["first", "dsp.tp_0_est"], + "query": "hit.cuspEmax_ctc_cal>25", + "expression": "hit.cuspEmax_ctc_cal", "initial": "np.nan" }, + "energy_first_id": { + "channels": "geds_on", + "aggregation_mode": ["first", "dsp.tp_0_est"], + "query": "hit.cuspEmax_ctc_cal>25", + "expression": "tcm.array_id", + "initial": 0 + }, "t0": { "channels": ["geds_on"], - "mode": "energy_first_id", - "expression": "tp_0_est", + "aggregation_mode": "evt.energy_first_id", + "expression": "dsp.tp_0_est", "initial": 0.0 }, "lar_energy": { "channels": "spms_on", - "mode": "func", - "expression": ".modules.spm.get_energy(0.5,t0,48000,1000,5000)" + "aggregation_mode": "function", + "expression": ".modules.spm.get_energy(0.5,evt.t0,48000,1000,5000)" }, "lar_multiplicity": { "channels": "spms_on", - "mode": "func", - "expression": ".modules.spm.get_majority(0.5,t0,48000,1000,5000)" + "aggregation_mode": "function", + "expression": ".modules.spm.get_majority(0.5,evt.t0,48000,1000,5000)" }, "is_lar_rejected": { - "expression": "(lar_energy >4) | (lar_multiplicity > 4) " + "expression": "(evt.lar_energy >4) | (evt.lar_multiplicity > 4) " }, "lar_classifier": { "channels": "spms_on", - "mode": "func", - "expression": ".modules.spm.get_etc(0.5,t0,48000,100,6000,80,1)" + "aggregation_mode": "function", + "expression": ".modules.spm.get_etc(0.5,evt.t0,48000,100,6000,80,1)" }, "lar_energy_dplms": { "channels": "spms_on", - "mode": "func", - "expression": ".modules.spm.get_energy_dplms(0.5,t0,48000,1000,5000)" + "aggregation_mode": "function", + "expression": ".modules.spm.get_energy_dplms(0.5,evt.t0,48000,1000,5000)" }, "lar_multiplicity_dplms": { "channels": "spms_on", - "mode": "func", - "expression": ".modules.spm.get_majority_dplms(0.5,t0,48000,1000,5000)" + "aggregation_mode": "function", + "expression": ".modules.spm.get_majority_dplms(0.5,evt.t0,48000,1000,5000)" }, "lar_time_shift": { "channels": "spms_on", - "mode": "func", - "expression": ".modules.spm.get_time_shift(0.5,t0,48000,1000,5000)" + "aggregation_mode": "function", + "expression": ".modules.spm.get_time_shift(0.5,evt.t0,48000,1000,5000)" } } } diff --git a/tests/evt/configs/module-test-t0-vov-evt-config.json b/tests/evt/configs/module-test-t0-vov-evt-config.json index 436332409..06918a421 100644 --- a/tests/evt/configs/module-test-t0-vov-evt-config.json +++ b/tests/evt/configs/module-test-t0-vov-evt-config.json @@ -6,48 +6,54 @@ "operations": { "energy": { "channels": "geds_on", - "mode": "vov>25", - "get_ch": true, - "expression": "cuspEmax_ctc_cal" + "aggregation_mode": "vov", + "query": "hit.cuspEmax_ctc_cal>25", + "expression": "hit.cuspEmax_ctc_cal" + }, + "energy_id": { + "channels": "geds_on", + "aggregation_mode": "vov", + "query": "hit.cuspEmax_ctc_cal>25", + "expression": "tcm.array_id" }, "t0": { "channels": ["geds_on"], - "mode": "energy_id", - "expression": "tp_0_est", + "aggregation_mode": "evt.energy_id", + "expression": "dsp.tp_0_est", "initial": 0.0 }, "lar_energy": { "channels": "spms_on", - "mode": "func", - "expression": ".modules.spm.get_energy(0.5,t0,48000,1000,5000)" + "aggregation_mode": "function", + "expression": ".modules.spm.get_energy(0.5,evt.t0,48000,1000,5000)" }, "lar_multiplicity": { "channels": "spms_on", - "mode": "func", - "expression": ".modules.spm.get_majority(0.5,t0,48000,1000,5000)" + "aggregation_mode": "function", + "expression": ".modules.spm.get_majority(0.5,evt.t0,48000,1000,5000)" }, "is_lar_rejected": { - "expression": "(lar_energy >4) | (lar_multiplicity > 4) " + "expression": "(evt.lar_energy >4) | (evt.lar_multiplicity > 4) " }, "lar_classifier": { "channels": "spms_on", - "mode": "func", - "expression": ".modules.spm.get_etc(0.5,t0,48000,100,6000,80,1)" + "aggregation_mode": "function", + "expression": ".modules.spm.get_etc(0.5,evt.t0,48000,100,6000,80,1)" }, "lar_energy_dplms": { "channels": "spms_on", - "mode": "func", - "expression": ".modules.spm.get_energy_dplms(0.5,t0,48000,1000,5000)" + "aggregation_mode": "function", + "expression": ".modules.spm.get_energy_dplms(0.5,evt.t0,48000,1000,5000)" }, "lar_multiplicity_dplms": { "channels": "spms_on", - "mode": "func", - "expression": ".modules.spm.get_majority_dplms(0.5,t0,48000,1000,5000)" + "aggregation_mode": "function", + "expression": ".modules.spm.get_majority_dplms(0.5,evt.t0,48000,1000,5000)" }, "lar_time_shift": { "channels": "spms_on", - "mode": "func", - "expression": ".modules.spm.get_time_shift(0.5,t0,48000,1000,5000)" + "aggregation_mode": "function", + "expression": ".modules.spm.get_time_shift(0.5,evt.t0,48000,1000,5000)" } } } diff --git a/tests/evt/configs/query-test-evt-config.json b/tests/evt/configs/query-test-evt-config.json new file mode 100644 index 000000000..abbaa8da4 --- /dev/null +++ b/tests/evt/configs/query-test-evt-config.json @@ -0,0 +1,88 @@ +{ + "channels": { + "geds_on": ["ch1084803", "ch1084804", "ch1121600"] + }, + "operations":{ + "multiplicity": { + "channels": "geds_on", + "aggregation_mode": "sum", + "expression": "hit.cuspEmax_ctc_cal > a", + "parameters": { "a": 25 }, + "initial": 0 + }, + "test_sum": { + "channels": "geds_on", + "aggregation_mode": "sum", + "query":"evt.multiplicity == 1", + "expression": "True", + "initial": false + }, + "test_first": { + "channels": "geds_on", + "aggregation_mode": ["first", "dsp.tp_0_est"], + "query":"evt.multiplicity == 1", + "expression": "True", + "initial": false + }, + "test_first2": { + "channels": "geds_on", + "aggregation_mode": ["first", "dsp.tp_0_est"], + "expression": "True", + "initial": false + }, + "test_last": { + "channels": "geds_on", + "aggregation_mode": ["last", "dsp.tp_0_est"], + "query":"evt.multiplicity == 1", + "expression": "True", + "initial": false + }, + "test_last2": { + "channels": "geds_on", + "aggregation_mode": ["last", "dsp.tp_0_est"], + "expression": "True", + "initial": false + }, + "test_any": { + "channels": "geds_on", + "aggregation_mode": "any", + "query":"evt.multiplicity == 1", + "expression": "True", + "initial": false + }, + "test_any2": { + "channels": "geds_on", + "aggregation_mode": "any", + "query":"hit.cuspEmax_ctc_cal >25", + "expression": "True", + "initial": false + }, + "test_all": { + "channels": "geds_on", + "aggregation_mode": "all", + "query":"evt.multiplicity == 1", + "expression": "True", + "initial": false + }, + "test_all2": { + "channels": "geds_on", + "aggregation_mode": "all", + "query":"hit.cuspEmax_ctc_cal >25", + "expression": "True", + "initial": false + }, + "test_vov": { + "channels": "geds_on", + "aggregation_mode": "vov", + "query":"evt.multiplicity == 1", + "expression": "True", + "initial": false + }, + "test_vov2": { + "channels": "geds_on", + "aggregation_mode": "vov", + "expression": "True", + "initial": false + } + } +} \ No newline at end of file diff --git a/tests/evt/configs/vov-test-evt-config.json b/tests/evt/configs/vov-test-evt-config.json index d1bfc4120..6f057d18c 100644 --- a/tests/evt/configs/vov-test-evt-config.json +++ b/tests/evt/configs/vov-test-evt-config.json @@ -5,29 +5,35 @@ "operations": { "energy": { "channels": "geds_on", - "mode": "vov>25", - "get_ch": true, - "expression": "cuspEmax_ctc_cal" + "aggregation_mode": "vov", + "query": "hit.cuspEmax_ctc_cal>25", + "expression": "hit.cuspEmax_ctc_cal" + }, + "energy_id": { + "channels": "geds_on", + "aggregation_mode": "vov", + "query": "hit.cuspEmax_ctc_cal>25", + "expression": "tcm.array_id" }, "aoe": { - "mode": "energy_id", - "expression": "AoE_Classifier" + "aggregation_mode": "evt.energy_id", + "expression": "hit.AoE_Classifier" }, "multiplicity": { "channels": "geds_on", - "mode": "tot", - "expression": "cuspEmax_ctc_cal > a", + "aggregation_mode": "sum", + "expression": "hit.cuspEmax_ctc_cal > a", "parameters": { "a": 25 }, "initial": 0 }, "energy_times_aoe": { - "expression": "energy*aoe" + "expression": "evt.energy*evt.aoe" }, "energy_times_multiplicity": { - "expression": "energy*multiplicity" + "expression": "evt.energy*evt.multiplicity" }, "multiplicity_squared": { - "expression": "multiplicity*multiplicity" + "expression": "evt.multiplicity*evt.multiplicity" } } } diff --git a/tests/evt/test_build_evt.py b/tests/evt/test_build_evt.py index 128833e5b..146fe5150 100644 --- a/tests/evt/test_build_evt.py +++ b/tests/evt/test_build_evt.py @@ -26,14 +26,10 @@ def test_basics(lgnd_test_data, tmptestdir): wo_mode="o", group="/evt/", tcm_group="hardware_tcm_1", - dsp_group="/dsp/", - hit_group="/hit/", ) assert os.path.exists(outfile) - assert ( - len(ls(outfile, "/evt/")) == 11 - ) # 7 operations of which 2 are requesting channel field + assert len(ls(outfile, "/evt/")) == 10 nda = load_nda( outfile, ["energy", "energy_aux", "energy_sum", "multiplicity"], "/evt/" ) @@ -154,7 +150,7 @@ def test_graceful_crashing(lgnd_test_data, tmptestdir): with pytest.raises(RuntimeError): build_evt(f_dsp, f_tcm, f_hit, outfile, f_config, meta_path) - with pytest.raises(NameError): + with pytest.raises(RuntimeError): build_evt(f_tcm, f_hit, f_dsp, outfile, f_config, meta_path) with pytest.raises(TypeError): @@ -168,57 +164,39 @@ def test_graceful_crashing(lgnd_test_data, tmptestdir): with pytest.raises(ValueError): build_evt(f_tcm, f_dsp, f_hit, outfile, conf, meta_path) - conf = { - "channels": {"geds_on": ["ch1084803", "ch1084804", "ch1121600"]}, - "operations": {}, - } - build_evt(f_tcm, f_dsp, f_hit, outfile, conf, meta_path) - assert not os.path.exists(outfile) - conf = { "channels": {"geds_on": ["ch1084803", "ch1084804", "ch1121600"]}, "operations": { - "energy": { + "foo": { "channels": "geds_on", - "mode": ["first>pineapple", "tp_0_est"], - "get_ch": True, - "expression": "cuspEmax_ctc_cal", - "initial": "np.nan", + "aggregation_mode": "banana", + "expression": "hit.cuspEmax_ctc_cal > a", + "parameters": {"a": 25}, + "initial": 0, } }, } with pytest.raises(ValueError): build_evt(f_tcm, f_dsp, f_hit, outfile, conf, meta_path) - conf = { - "channels": {"geds_on": ["ch1084803", "ch1084804", "ch1121600"]}, - "operations": { - "energy": { - "channels": "geds_on", - "mode": ["first>25", "tp_0_est"], - "get_ch": True, - "expression": "cuspEmax_ctc_cal$cuspEmax_ctc_cal", - "initial": "np.nan", - } - }, - } - with pytest.raises(SyntaxError): - build_evt(f_tcm, f_dsp, f_hit, outfile, conf, meta_path) - conf = { - "channels": {"geds_on": ["ch1084803", "ch1084804", "ch1121600"]}, - "operations": { - "energy": { - "channels": "geds_on", - "mode": ["first>25", "coconut"], - "get_ch": True, - "expression": "cuspEmax_ctc_cal", - "initial": "np.nan", - } - }, - } - with pytest.raises(ValueError): - build_evt(f_tcm, f_dsp, f_hit, outfile, conf, meta_path) +def test_query(lgnd_test_data, tmptestdir): + outfile = f"{tmptestdir}/l200-p03-r001-phy-20230322T160139Z-tier_evt.lh5" + tcm_path = "lh5/prod-ref-l200/generated/tier/tcm/phy/p03/r001/l200-p03-r001-phy-20230322T160139Z-tier_tcm.lh5" + if os.path.exists(outfile): + os.remove(outfile) + build_evt( + f_tcm=lgnd_test_data.get_path(tcm_path), + f_dsp=lgnd_test_data.get_path(tcm_path.replace("tcm", "dsp")), + f_hit=lgnd_test_data.get_path(tcm_path.replace("tcm", "hit")), + f_evt=outfile, + meta_path=None, + evt_config=f"{config_dir}/query-test-evt-config.json", + wo_mode="o", + group="/evt/", + tcm_group="hardware_tcm_1", + ) + assert len(ls(outfile, "/evt/")) == 12 def test_skimming(lgnd_test_data, tmptestdir): @@ -244,3 +222,6 @@ def test_skimming(lgnd_test_data, tmptestdir): skim_evt(outfile, "multiplicity == 3", None, None, "o") assert ac == len(lstore.read_object("/evt/energy", outfile)[0].to_aoesa().nda) + + with pytest.raises(ValueError): + skim_evt(outfile, "multiplicity == 3", None, None, "bla")