From 07786ae7bfe1cb507099ac75472fae212b83217e Mon Sep 17 00:00:00 2001 From: "Marcel R." Date: Tue, 23 Jul 2024 15:32:23 +0200 Subject: [PATCH 01/61] Add draft for wlcg targets with local mirror. --- analysis_templates/cms_minimal/law.cfg | 5 +++-- columnflow/tasks/framework/base.py | 26 ++++++++++++++++++++++++++ law.cfg | 5 +++-- modules/law | 2 +- 4 files changed, 33 insertions(+), 5 deletions(-) diff --git a/analysis_templates/cms_minimal/law.cfg b/analysis_templates/cms_minimal/law.cfg index d0991afcf..b695fd19e 100644 --- a/analysis_templates/cms_minimal/law.cfg +++ b/analysis_templates/cms_minimal/law.cfg @@ -91,8 +91,9 @@ lfn_sources: wlcg_fs_infn_redirector, wlcg_fs_global_redirector # the config name, the task family, the dataset name, or the shift name # (see AnalysisTask.get_config_lookup_keys() - and subclasses - for the exact order) # values can have the following format: -# for local targets : "local[, LOCAL_FS_NAME or STORE_PATH][, store_parts_modifier]" -# for remote targets: "wlcg[, WLCG_FS_NAME][, store_parts_modifier]" +# for local targets : "local[, LOCAL_FS_NAME or STORE_PATH][, store_parts_modifier]" +# for remote targets : "wlcg[, WLCG_FS_NAME][, store_parts_modifier]" +# for mirrored targets: "wlcg_mirrored, LOCAL_FS_NAME, WLCG_FS_NAME[, store_parts_modifier]" # (when WLCG_FS_NAME is empty, the tasks' "default_wlcg_fs" attribute is used) # the "store_parts_modifiers" can be the name of a function in the "store_parts_modifiers" aux dict # of the analysis instance, which is called with an output's store parts of an output to modify them diff --git a/columnflow/tasks/framework/base.py b/columnflow/tasks/framework/base.py index e8ad1c3cb..bbda49a88 100644 --- a/columnflow/tasks/framework/base.py +++ b/columnflow/tasks/framework/base.py @@ -65,6 +65,7 @@ class OutputLocation(enum.Enum): config = "config" local = "local" wlcg = "wlcg" + wlcg_mirrored = "wlcg_mirrored" class AnalysisTask(BaseTask, law.SandboxTask): @@ -866,6 +867,31 @@ def target(self, *path, **kwargs): kwargs.setdefault("store_parts_modifier", store_parts_modifier) return self.wlcg_target(*path, **kwargs) + if location[0] == OutputLocation.wlcg_mirrored: + # get other options + loc, wlcg_fs, store_parts_modifier = (location[1:] + [None, None, None])[:3] + kwargs.setdefault("store_parts_modifier", store_parts_modifier) + # create the local target + local_kwargs = kwargs.copy() + loc_key = "fs" if (loc and law.config.has_section(loc)) else "store" + local_kwargs.setdefault(loc_key, loc) + local_target = self.local_target(*path, **local_kwargs) + # create the wlcg target + wlcg_kwargs = kwargs.copy() + wlcg_kwargs.setdefault("fs", wlcg_fs) + wlcg_target = self.wlcg_target(*path, **wlcg_kwargs) + # build the mirrored target from these two + mirrored_target_cls = ( + law.MirroredFileTarget + if isinstance(local_target, law.LocalFileTarget) + else law.MirroredDirectoryTarget + ) + return mirrored_target_cls( + path=local_target.path, + remote_target=wlcg_target, + local_target=local_target, + ) + raise Exception(f"cannot determine output location based on '{location}'") def get_parquet_writer_opts(self, repeating_values: bool = False) -> dict[str, Any]: diff --git a/law.cfg b/law.cfg index d59a59283..634f6f6d4 100644 --- a/law.cfg +++ b/law.cfg @@ -91,8 +91,9 @@ lfn_sources: wlcg_fs_desy_store, wlcg_fs_infn_redirector, wlcg_fs_global_redirec # the config name, the task family, the dataset name, or the shift name # (see AnalysisTask.get_config_lookup_keys() - and subclasses - for the exact order) # values can 
have the following format: -# for local targets : "local[, LOCAL_FS_NAME or STORE_PATH][, store_parts_modifier]" -# for remote targets: "wlcg[, WLCG_FS_NAME][, store_parts_modifier]" +# for local targets : "local[, LOCAL_FS_NAME or STORE_PATH][, store_parts_modifier]" +# for remote targets : "wlcg[, WLCG_FS_NAME][, store_parts_modifier]" +# for mirrored targets: "wlcg_mirrored, LOCAL_FS_NAME, WLCG_FS_NAME[, store_parts_modifier]" # (when WLCG_FS_NAME is empty, the tasks' "default_wlcg_fs" attribute is used) # the "store_parts_modifiers" can be the name of a function in the "store_parts_modifiers" aux dict # of the analysis instance, which is called with an output's store parts of an output to modify them diff --git a/modules/law b/modules/law index 6c714a893..f4a020357 160000 --- a/modules/law +++ b/modules/law @@ -1 +1 @@ -Subproject commit 6c714a893b37cf16f4921313cc663c3faea7a706 +Subproject commit f4a0203577709366ce4c5ba143ea5e35cd34350a From 57a0a29f6b74d3bf19f30a8142234227fde429b7 Mon Sep 17 00:00:00 2001 From: "Marcel R." Date: Tue, 23 Jul 2024 17:51:51 +0200 Subject: [PATCH 02/61] Update law. --- columnflow/tasks/ml.py | 2 +- modules/law | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/columnflow/tasks/ml.py b/columnflow/tasks/ml.py index 46aa149ea..25f9b2a04 100644 --- a/columnflow/tasks/ml.py +++ b/columnflow/tasks/ml.py @@ -1171,6 +1171,6 @@ def run(self: PlotMLResults): for index, f in enumerate(figs): f.savefig( - file_path.abs_dirname + "/" + file_path.basename.replace("0", str(index)), + file_path.absdirname + "/" + file_path.basename.replace("0", str(index)), format=file_path.ext(), ) diff --git a/modules/law b/modules/law index f4a020357..5593f184f 160000 --- a/modules/law +++ b/modules/law @@ -1 +1 @@ -Subproject commit f4a0203577709366ce4c5ba143ea5e35cd34350a +Subproject commit 5593f184f16d3cba83f818b8051bb1c30b26f1b0 From e6a24d8c2784816e6d5c5c8abedb36575f2ff5d2 Mon Sep 17 00:00:00 2001 From: haddadanas Date: Fri, 26 Jul 2024 16:42:26 +0200 Subject: [PATCH 03/61] added feature to skip categories by custom function --- columnflow/production/categories.py | 30 +++++++++++++++++++++++------ 1 file changed, 24 insertions(+), 6 deletions(-) diff --git a/columnflow/production/categories.py b/columnflow/production/categories.py index 415b5dbd7..985691f60 100644 --- a/columnflow/production/categories.py +++ b/columnflow/production/categories.py @@ -12,7 +12,7 @@ from columnflow.categorization import Categorizer from columnflow.production import Producer, producer -from columnflow.util import maybe_import +from columnflow.util import maybe_import, InsertableDict from columnflow.columnar_util import set_ak_column np = maybe_import("numpy") @@ -22,7 +22,11 @@ logger = law.logger.get_logger(__name__) -@producer(produces={"category_ids"}) +@producer( + produces={"category_ids"}, + # custom function to skip categorizers + skip_category_func=None, +) def category_ids( self: Producer, events: ak.Array, @@ -34,12 +38,12 @@ def category_ids( """ category_ids = [] - for cat_inst in self.config_inst.get_leaf_categories(): + for cat_inst, categorizers in self.categorizer_map.items(): # start with a true mask cat_mask = np.ones(len(events), dtype=bool) # loop through selectors - for categorizer in self.categorizer_map[cat_inst]: + for categorizer in categorizers: events, mask = self[categorizer](events, **kwargs) cat_mask = cat_mask & mask @@ -58,13 +62,27 @@ def category_ids( return target_events -@category_ids.init -def category_ids_init(self: Producer) -> None: 
+@category_ids.setup +def category_ids_setup( + self: Producer, + reqs: dict, + inputs: dict, + reader_targets: InsertableDict, +) -> None: + + skip_category = lambda task, category_inst: False + if callable(self.skip_category_func): + skip_category = self.skip_category_func + # store a mapping from leaf category to categorizer classes for faster lookup self.categorizer_map = defaultdict(list) # add all categorizers obtained from leaf category selection expressions to the used columns for cat_inst in self.config_inst.get_leaf_categories(): + # check if skipped + if skip_category(self.task, cat_inst): + continue + # treat all selections as lists of categorizers for sel in law.util.make_list(cat_inst.selection): if Categorizer.derived_by(sel): From 05b1b5108d0960f84996bec447c6e7ee2eb053e1 Mon Sep 17 00:00:00 2001 From: haddadanas Date: Fri, 26 Jul 2024 17:01:18 +0200 Subject: [PATCH 04/61] forgot a "self" --- columnflow/production/categories.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/columnflow/production/categories.py b/columnflow/production/categories.py index 985691f60..d88aa459f 100644 --- a/columnflow/production/categories.py +++ b/columnflow/production/categories.py @@ -70,7 +70,7 @@ def category_ids_setup( reader_targets: InsertableDict, ) -> None: - skip_category = lambda task, category_inst: False + skip_category = lambda self, task, category_inst: False if callable(self.skip_category_func): skip_category = self.skip_category_func From 4c217bc284dacd0b716df290fb81ee725eaf5953 Mon Sep 17 00:00:00 2001 From: haddadanas Date: Fri, 26 Jul 2024 17:25:39 +0200 Subject: [PATCH 05/61] Revert "forgot a "self"" This reverts commit 05b1b5108d0960f84996bec447c6e7ee2eb053e1. --- columnflow/production/categories.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/columnflow/production/categories.py b/columnflow/production/categories.py index d88aa459f..985691f60 100644 --- a/columnflow/production/categories.py +++ b/columnflow/production/categories.py @@ -70,7 +70,7 @@ def category_ids_setup( reader_targets: InsertableDict, ) -> None: - skip_category = lambda self, task, category_inst: False + skip_category = lambda task, category_inst: False if callable(self.skip_category_func): skip_category = self.skip_category_func From f7ee1ae74a0a6902de99d4b89206a06145eb7c90 Mon Sep 17 00:00:00 2001 From: haddadanas Date: Fri, 26 Jul 2024 18:18:56 +0200 Subject: [PATCH 06/61] reverted to init --- columnflow/production/categories.py | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/columnflow/production/categories.py b/columnflow/production/categories.py index 985691f60..394bafd90 100644 --- a/columnflow/production/categories.py +++ b/columnflow/production/categories.py @@ -12,7 +12,7 @@ from columnflow.categorization import Categorizer from columnflow.production import Producer, producer -from columnflow.util import maybe_import, InsertableDict +from columnflow.util import maybe_import from columnflow.columnar_util import set_ak_column np = maybe_import("numpy") @@ -62,14 +62,12 @@ def category_ids( return target_events -@category_ids.setup -def category_ids_setup( - self: Producer, - reqs: dict, - inputs: dict, - reader_targets: InsertableDict, -) -> None: +@category_ids.init +def category_ids_init(self: Producer) -> None: + if not self.inst_dict.get("task", None): + return + # define a dummy function to skip categories or get the given one skip_category = lambda task, category_inst: False if callable(self.skip_category_func): 
skip_category = self.skip_category_func @@ -80,7 +78,7 @@ def category_ids_setup( # add all categorizers obtained from leaf category selection expressions to the used columns for cat_inst in self.config_inst.get_leaf_categories(): # check if skipped - if skip_category(self.task, cat_inst): + if skip_category(self.inst_dict["task"], cat_inst): continue # treat all selections as lists of categorizers From d869c49a9fb0297f8fd749b5227a1a28c87aad77 Mon Sep 17 00:00:00 2001 From: haddadanas Date: Fri, 26 Jul 2024 18:50:35 +0200 Subject: [PATCH 07/61] final fixes --- columnflow/production/categories.py | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/columnflow/production/categories.py b/columnflow/production/categories.py index 394bafd90..c00edcb58 100644 --- a/columnflow/production/categories.py +++ b/columnflow/production/categories.py @@ -25,7 +25,7 @@ @producer( produces={"category_ids"}, # custom function to skip categorizers - skip_category_func=None, + skip_category=(lambda task, category_inst: False), ) def category_ids( self: Producer, @@ -64,21 +64,16 @@ def category_ids( @category_ids.init def category_ids_init(self: Producer) -> None: - if not self.inst_dict.get("task", None): + if not self.inst_dict.get("task"): return - # define a dummy function to skip categories or get the given one - skip_category = lambda task, category_inst: False - if callable(self.skip_category_func): - skip_category = self.skip_category_func - # store a mapping from leaf category to categorizer classes for faster lookup self.categorizer_map = defaultdict(list) # add all categorizers obtained from leaf category selection expressions to the used columns for cat_inst in self.config_inst.get_leaf_categories(): # check if skipped - if skip_category(self.inst_dict["task"], cat_inst): + if self.skip_category(self.inst_dict["task"], cat_inst): continue # treat all selections as lists of categorizers From ed4e7923f289f1c72be0d9e74dcdae16194ee006 Mon Sep 17 00:00:00 2001 From: haddadanas Date: Fri, 26 Jul 2024 18:52:42 +0200 Subject: [PATCH 08/61] added self to the skip_category --- columnflow/production/categories.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/columnflow/production/categories.py b/columnflow/production/categories.py index c00edcb58..2d4da610b 100644 --- a/columnflow/production/categories.py +++ b/columnflow/production/categories.py @@ -25,7 +25,7 @@ @producer( produces={"category_ids"}, # custom function to skip categorizers - skip_category=(lambda task, category_inst: False), + skip_category=(lambda self, task, category_inst: False), ) def category_ids( self: Producer, From 5d7090b8e4d60abf3428825457eb192d7abae6b3 Mon Sep 17 00:00:00 2001 From: Mathis Frahm <49306645+mafrahm@users.noreply.github.com> Date: Sat, 27 Jul 2024 10:32:58 +0200 Subject: [PATCH 09/61] allow stitching using non-leaf processes in sum_mc_weight_per_process (#518) * allow stitching using non-leaf processes in sum_mc_weight_per_process * Apply suggestions from code review Co-authored-by: Anas <103462379+haddadanas@users.noreply.github.com> --------- Co-authored-by: Anas <103462379+haddadanas@users.noreply.github.com> Co-authored-by: Marcel Rieger --- columnflow/production/normalization.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/columnflow/production/normalization.py b/columnflow/production/normalization.py index 72f60b2e1..b666fc562 100644 --- a/columnflow/production/normalization.py +++ b/columnflow/production/normalization.py @@ -99,16 
+99,21 @@ def get_br_from_inclusive_dataset( # (identified as leaf processes that have no occurrences in the stats) # (or as non-leaf processes that are not in the stitching datasets) is_leaf = child_proc.is_leaf_process + child_in_weight_procs = str(child_proc.id) in sum_mc_weight_per_process if ( (is_leaf and str(child_proc.id) not in sum_mc_weight_per_process) or (not is_leaf and child_proc.id not in proc_ds_map) ): continue - proc_ids = [child_proc.id] if is_leaf else [p.id for p in child_proc.get_leaf_processes()] + proc_ids = [child_proc.id] if (is_leaf or child_in_weight_procs) else [ + p.id for p, _, _ in child_proc.walk_processes() if str(p.id) in sum_mc_weight_per_process + ] # compute the uncertainty on the branching ratio using number of events _br = N(sum(num_events_per_process.get(str(proc_id), 0) for proc_id in proc_ids)) / N(num_events) - # uncertainty is independent of the mc weights, so we can use the same relative uncertainty + # NOTE: we assume that the uncertainty is independent of the mc weights, so we can use + # the same relative uncertainty. This is a simplification, but should be fine for most + # cases. We can improve this by switching from jsons to hists when storing sum of weights. br[proc.id][child_proc.id] = sn.Number( sum(sum_mc_weight_per_process.get(str(proc_id), 0) for proc_id in proc_ids) / sum_mc_weight, _br(sn.UP, unc=True, factor=True) * 1j, # same relative uncertainty From 0ae80bd26628aa68835377865183b50fa8f747f4 Mon Sep 17 00:00:00 2001 From: "Marcel R." Date: Tue, 6 Aug 2024 14:47:36 +0200 Subject: [PATCH 10/61] Hotfix missing parameter groups in datacards. --- columnflow/inference/__init__.py | 16 ++++++++++++---- columnflow/inference/cms/datacard.py | 12 +++++++++++- 2 files changed, 23 insertions(+), 5 deletions(-) diff --git a/columnflow/inference/__init__.py b/columnflow/inference/__init__.py index 1c5e2daaa..4303c69f4 100644 --- a/columnflow/inference/__init__.py +++ b/columnflow/inference/__init__.py @@ -255,7 +255,7 @@ def category_spec( config_variable: str | None = None, config_data_datasets: Sequence[str] | None = None, data_from_processes: Sequence[str] | None = None, - mc_stats: float | tuple | None = None, + mc_stats: int | float | tuple | None = None, empty_bin_value: float = 1e-5, ) -> DotDict: """ @@ -268,8 +268,8 @@ def category_spec( - *config_data_datasets*: List of names of datasets in the config to use for real data. - *data_from_processes*: Optional list of names of :py:meth:`process_spec` objects that, when *config_data_datasets* is not defined, make of a fake data contribution. - - *mc_stats*: Either *None* to disable MC stat uncertainties, or a float or tuple of - floats to control the options of MC stat options. + - *mc_stats*: Either *None* to disable MC stat uncertainties, or an integer, a float or + a tuple of thereof to control the options of MC stat options. - *empty_bin_value*: When bins are no content, they are filled with this value. """ return DotDict([ @@ -881,6 +881,7 @@ def add_parameter( *args, process: str | Sequence[str] | None = None, category: str | Sequence[str] | None = None, + group: str | Sequence[str] | None = None, **kwargs, ) -> DotDict: """ @@ -889,6 +890,9 @@ def add_parameter( via :py:meth:`parameter_spec`. Both *process* and *category* can be a string, a pattern, or sequence of them. + When *group* is given, the parameter is added to one or more parameter groups via + :py:meth:`add_parameter_to_group`. 
+ If a parameter with the same name already exists in one of the processes throughout the categories, an exception is raised. """ @@ -915,6 +919,10 @@ def add_parameter( for process in _processes: process.parameters.append(_copy.deepcopy(parameter)) + # add to groups + if group: + self.add_parameter_to_group(parameter.name, group) + return parameter def remove_parameter( @@ -965,7 +973,7 @@ def get_parameter_groups( only_names: bool = False, ) -> list[DotDict | str]: """ - Returns a list of parameter group whose name match *group*. *group* can be a string, a + Returns a list of parameter groups whose name match *group*. *group* can be a string, a pattern, or sequence of them. When *only_names* is *True*, only names of parameter groups are returned rather than diff --git a/columnflow/inference/cms/datacard.py b/columnflow/inference/cms/datacard.py index 88ea5b684..c789503b2 100644 --- a/columnflow/inference/cms/datacard.py +++ b/columnflow/inference/cms/datacard.py @@ -268,6 +268,14 @@ def write( if blocks.line_parameters: empty_lines.add("line_parameters") + # groups + blocks.groups = [] + for group in self.inference_model_inst.get_parameter_groups(): + blocks.groups.append([group.name, "group", "="] + group.parameter_names) + + if blocks.groups: + empty_lines.add("groups") + # mc stats blocks.mc_stats = [] for cat_obj in cat_objects: @@ -291,6 +299,8 @@ def write( blocks.rates = self.align_lines(list(blocks.rates)) if blocks.line_parameters: blocks.line_parameters = self.align_lines(list(blocks.line_parameters)) + if blocks.groups: + blocks.groups = self.align_lines(list(blocks.groups)) if blocks.mc_stats: blocks.mc_stats = self.align_lines(list(blocks.mc_stats)) @@ -498,7 +508,7 @@ def get_shapes(param_name): h_data = sum(h_data[1:], h_data[0].copy()) data_name = data_pattern.format(category=cat_name) out_file[data_name] = h_data - _rates["data"] = h_data.sum().value + _rates["data"] = int(round(h_data.sum().value)) return (rates, effects, nom_pattern_comb, syst_pattern_comb) From a4e15957f8633eb362af7e3f3cc2d5e96b61c8e5 Mon Sep 17 00:00:00 2001 From: "Marcel R." Date: Wed, 7 Aug 2024 15:07:14 +0200 Subject: [PATCH 11/61] Fix over/underflow handling. 
--- analysis_templates/cms_minimal/law.cfg | 2 +- columnflow/plotting/plot_util.py | 15 +++++++++------ law.cfg | 2 +- 3 files changed, 11 insertions(+), 8 deletions(-) diff --git a/analysis_templates/cms_minimal/law.cfg b/analysis_templates/cms_minimal/law.cfg index b695fd19e..3fce2ce3e 100644 --- a/analysis_templates/cms_minimal/law.cfg +++ b/analysis_templates/cms_minimal/law.cfg @@ -132,7 +132,7 @@ remote_lcg_setup_el7: /cvmfs/grid.cern.ch/centos7-ui-200122/etc/profile.d/setup- remote_lcg_setup_el9: /cvmfs/grid.cern.ch/alma9-ui-test/etc/profile.d/setup-alma9-test.sh # whether the loading of the remove lcg setup file is enforced -# otherwise is might be skipped in case gfal-ls, etc., are already available +# otherwise this might be skipped in case gfal-ls, etc., are already available remote_lcg_setup_force: True diff --git a/columnflow/plotting/plot_util.py b/columnflow/plotting/plot_util.py index 3f8cdfedc..57201c738 100644 --- a/columnflow/plotting/plot_util.py +++ b/columnflow/plotting/plot_util.py @@ -238,6 +238,7 @@ def apply_variable_settings( if overflow or underflow: for proc_inst, h in list(hists.items()): h = use_flow_bins(h, var_inst.name, underflow=underflow, overflow=overflow) + hists[proc_inst] = h # slicing slices = getattr(var_inst, "slice", None) or var_inst.x("slice", None) @@ -267,18 +268,20 @@ def use_flow_bins( :param axis_name: Name or index of the axis of interest. :param underflow: Whether to add the content of the underflow bin to the first bin of axis *axis_name. :param overflow: Whether to add the content of the overflow bin to the last bin of axis *axis_name*. - :return: Histogram with underflow and/or overflow content added to the first/last bin of the histogram. + :return: Copy of the histogram with underflow and/or overflow content added to the first/last + bin of the histogram. """ + # work on a copy of the histogram + h_out = h_in.copy() + + # nothing to do if neither flag is set if not overflow and not underflow: print(f"{use_flow_bins.__name__} has nothing to do since overflow and underflow are set to False") - return h_in + return h_out + # determine the index of the axis of interest and check if it has flow bins activated axis_idx = axis_name if isinstance(axis_name, int) else h_in.axes.name.index(axis_name) - - # work on a copy of the histogram - h_out = h_in.copy() h_view = h_out.view(flow=True) - if h_out.view().shape[axis_idx] + 2 != h_view.shape[axis_idx]: raise Exception(f"We expect axis {axis_name} to have assigned an underflow and overflow bin") diff --git a/law.cfg b/law.cfg index 634f6f6d4..1d07fb288 100644 --- a/law.cfg +++ b/law.cfg @@ -132,7 +132,7 @@ remote_lcg_setup_el7: /cvmfs/grid.cern.ch/centos7-ui-200122/etc/profile.d/setup- remote_lcg_setup_el9: /cvmfs/grid.cern.ch/alma9-ui-test/etc/profile.d/setup-alma9-test.sh # whether the loading of the remove lcg setup file is enforced -# otherwise is might be skipped in case gfal-ls, etc., are already available +# otherwise this might be skipped in case gfal-ls, etc., are already available remote_lcg_setup_force: True From 7ffa0677c8e5b9ee9b9e6ad50b49ee1ee9b047e5 Mon Sep 17 00:00:00 2001 From: Marcel Rieger Date: Wed, 7 Aug 2024 18:10:56 +0200 Subject: [PATCH 12/61] Bump scinum version in cf sandbox. 
--- sandboxes/cf.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sandboxes/cf.txt b/sandboxes/cf.txt index 87fe93248..d389fa0a5 100644 --- a/sandboxes/cf.txt +++ b/sandboxes/cf.txt @@ -2,7 +2,7 @@ tenacity!=8.4.0 luigi~=3.5.1 -scinum~=2.1.1 +scinum~=2.2.0 six~=1.16.0 pyyaml~=6.0.1 typing_extensions~=4.12.2 From 0e3960079f665074bc12f0c2b7b607b3e706396f Mon Sep 17 00:00:00 2001 From: Daniel Savoiu Date: Fri, 9 Aug 2024 11:45:25 +0200 Subject: [PATCH 13/61] Minor fixes in `WeightProducer`. --- columnflow/tasks/histograms.py | 2 +- columnflow/weight/__init__.py | 2 ++ columnflow/weight/empty.py | 2 +- 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/columnflow/tasks/histograms.py b/columnflow/tasks/histograms.py index 8255d063a..c392282f9 100644 --- a/columnflow/tasks/histograms.py +++ b/columnflow/tasks/histograms.py @@ -193,7 +193,7 @@ def run(self): ) # build the full event weight - if not self.weight_producer_inst.skip_func(): + if hasattr(self.weight_producer_inst, "skip_func") and not self.weight_producer_inst.skip_func(): events, weight = self.weight_producer_inst(events) else: weight = ak.Array(np.ones(len(events), dtype=np.float32)) diff --git a/columnflow/weight/__init__.py b/columnflow/weight/__init__.py index 56cfa1ff5..4d3a9af83 100644 --- a/columnflow/weight/__init__.py +++ b/columnflow/weight/__init__.py @@ -81,6 +81,7 @@ def update_cls_dict(cls_name, cls_dict, get_attr): raise Exception( f"weight producer {cls_name} received both mc_only and data_only", ) + if mc_only or data_only: if cls_dict.get("skip_func"): raise Exception( @@ -88,6 +89,7 @@ def update_cls_dict(cls_name, cls_dict, get_attr): "mc_only or data_only are set", ) + if "skip_func" not in cls_dict: def skip_func(self): # check mc_only and data_only if getattr(self, "dataset_inst", None): diff --git a/columnflow/weight/empty.py b/columnflow/weight/empty.py index 71e489148..178b063fb 100644 --- a/columnflow/weight/empty.py +++ b/columnflow/weight/empty.py @@ -14,4 +14,4 @@ @weight_producer def empty(self: WeightProducer, events: ak.Array, **kwargs) -> ak.Array: # simply return ones - return ak.Array(np.ones(len(events), dtype=np.float32)) + return events, ak.Array(np.ones(len(events), dtype=np.float32)) From 9d3e7b5d32474a0d96f73ddad0f07fa86e63fd35 Mon Sep 17 00:00:00 2001 From: Daniel Savoiu Date: Fri, 9 Aug 2024 12:04:35 +0200 Subject: [PATCH 14/61] Check all collections for colorbars when setting norm. 
--- columnflow/plotting/plot_functions_2d.py | 25 +++++++++++++++--------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/columnflow/plotting/plot_functions_2d.py b/columnflow/plotting/plot_functions_2d.py index fbe4f034f..de2aa0bb5 100644 --- a/columnflow/plotting/plot_functions_2d.py +++ b/columnflow/plotting/plot_functions_2d.py @@ -248,16 +248,23 @@ def plot_2d( h_sum.plot2d(ax=ax, **style_config["plot2d_cfg"]) # fix color bar minor ticks with SymLogNorm - cbar = ax.collections[-1].colorbar if isinstance(cbar_norm, mpl.colors.SymLogNorm): - _scale = cbar.ax.yaxis._scale - _scale.subs = [2, 3, 4, 5, 6, 7, 8, 9] - cbar.ax.yaxis.set_minor_locator( - mticker.SymmetricalLogLocator(_scale.get_transform(), subs=_scale.subs), - ) - cbar.ax.yaxis.set_minor_formatter( - mticker.LogFormatterSciNotation(_scale.base), - ) + # returned collections can vary -> brute-force set + # norm on all colorbars that are found + cbars = { + coll.colorbar + for coll in ax.collections + if coll.colorbar + } + for cbar in cbars: + _scale = cbar.ax.yaxis._scale + _scale.subs = [2, 3, 4, 5, 6, 7, 8, 9] + cbar.ax.yaxis.set_minor_locator( + mticker.SymmetricalLogLocator(_scale.get_transform(), subs=_scale.subs), + ) + cbar.ax.yaxis.set_minor_formatter( + mticker.LogFormatterSciNotation(_scale.base), + ) plt.tight_layout() From e7518c5fd6fe774601b661378112aee32031ef56 Mon Sep 17 00:00:00 2001 From: Mathis Frahm Date: Fri, 9 Aug 2024 15:15:27 +0200 Subject: [PATCH 15/61] use producer inst to determine producer_repr --- columnflow/tasks/framework/mixins.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/columnflow/tasks/framework/mixins.py b/columnflow/tasks/framework/mixins.py index 74f5d1f7f..d38981db6 100644 --- a/columnflow/tasks/framework/mixins.py +++ b/columnflow/tasks/framework/mixins.py @@ -938,7 +938,7 @@ def producer_repr(self) -> str: """ Return a string representation of the producer. """ - return str(self.producer) if self.producer != law.NO_STR else "none" + return str(self.producer_inst) if self.producer != law.NO_STR else "none" def store_parts(self) -> law.util.InsertableDict[str, str]: """ From 10f077b41fa4b8e5bed9f0f1572141b47e33f9c9 Mon Sep 17 00:00:00 2001 From: "Marcel R." Date: Tue, 13 Aug 2024 11:09:17 +0200 Subject: [PATCH 16/61] Hotfix for buggy naf seutp. --- columnflow/tasks/framework/remote.py | 2 + .../tasks/framework/remote_bootstrap.sh | 54 ++++++++++++++++--- 2 files changed, 50 insertions(+), 6 deletions(-) diff --git a/columnflow/tasks/framework/remote.py b/columnflow/tasks/framework/remote.py index 42b5d03f6..5099719d3 100644 --- a/columnflow/tasks/framework/remote.py +++ b/columnflow/tasks/framework/remote.py @@ -813,6 +813,8 @@ def slurm_job_config(self, config, job_num, branches): config.render_variables["cf_bootstrap_name"] = "slurm" config.render_variables.setdefault("cf_pre_setup_command", "") config.render_variables.setdefault("cf_post_setup_command", "") + if self.slurm_flavor not in ("", law.NO_STR): + config.render_variables["cf_slurm_flavor"] = self.slurm_flavor # forward env variables for ev, rv in self.slurm_forward_env_variables.items(): diff --git a/columnflow/tasks/framework/remote_bootstrap.sh b/columnflow/tasks/framework/remote_bootstrap.sh index 89602eda8..6c9825d70 100755 --- a/columnflow/tasks/framework/remote_bootstrap.sh +++ b/columnflow/tasks/framework/remote_bootstrap.sh @@ -7,8 +7,9 @@ # Bootstrap function for standalone htcondor jobs. 
bootstrap_htcondor_standalone() { # set env variables - export CF_ON_HTCONDOR="1" export CF_REMOTE_ENV="1" + export CF_ON_HTCONDOR="1" + export CF_HTCONDOR_FLAVOR="{{cf_htcondor_flavor}}" export CF_CERN_USER="{{cf_cern_user}}" export CF_REPO_BASE="${LAW_JOB_HOME}/repo" export CF_DATA="${LAW_JOB_HOME}/cf_data" @@ -32,10 +33,11 @@ bootstrap_htcondor_standalone() { local force_lcg_setup="$( [ -z "{{cf_remote_lcg_setup_force}}" ] && echo "false" || echo "true" )" # temporary fix for missing voms/x509 variables in the lcg setup - export X509_CERT_DIR="/cvmfs/grid.cern.ch/etc/grid-security/certificates" - export X509_VOMS_DIR="/cvmfs/grid.cern.ch/etc/grid-security/vomsdir" - export X509_VOMSES="/cvmfs/grid.cern.ch/etc/grid-security/vomses" - export VOMS_USERCONF="/cvmfs/grid.cern.ch/etc/grid-security/vomses" + # (disabled in favor of the general software fix below which also sets these variables) + # export X509_CERT_DIR="/cvmfs/grid.cern.ch/etc/grid-security/certificates" + # export X509_VOMS_DIR="/cvmfs/grid.cern.ch/etc/grid-security/vomsdir" + # export X509_VOMSES="/cvmfs/grid.cern.ch/etc/grid-security/vomses" + # export VOMS_USERCONF="/cvmfs/grid.cern.ch/etc/grid-security/vomses" # fallback to a default path when the externally given software base is empty or inaccessible local fetch_software="true" @@ -48,6 +50,18 @@ bootstrap_htcondor_standalone() { else fetch_software="false" echo "found existing software at ${CF_SOFTWARE_BASE}" + + # temporary fix on the NAF that suffers from a proliferation of python 2.7 packages being + # prepended to the general python path, and simultaneously missing libraries (e.g. json-c) + # in the alma9 lcg setup that stops gfal from working + if [[ "${CF_HTCONDOR_FLAVOR}" = naf* ]]; then + export PATH="$( filter_path_var "${PATH}" "python2\.7" )" + export PYTHONPATH="$( filter_path_var "${PYTHONPATH}" "python2\.7" )" + export MAMBA_ROOT_PREFIX="${CF_SOFTWARE_BASE}/conda" + export MAMBA_EXE="${MAMBA_ROOT_PREFIX}/bin/micromamba" + source "${CF_SOFTWARE_BASE}/conda/etc/profile.d/micromamba.sh" "" || return "$?" + micromamba activate || return "$?" + fi fi # when gfal is not available, check that the lcg_setup file exists @@ -120,8 +134,9 @@ bootstrap_htcondor_standalone() { # Bootstrap function for slurm jobs. bootstrap_slurm() { # set env variables - export CF_ON_SLURM="1" export CF_REMOTE_ENV="1" + export CF_ON_SLURM="1" + export CF_SLURM_FLAVOR="{{cf_slurm_flavor}}" export CF_REPO_BASE="{{cf_repo_base}}" export CF_WLCG_CACHE_ROOT="${LAW_JOB_HOME}/cf_wlcg_cache" export KRB5CCNAME="FILE:{{kerberosproxy_file}}" @@ -219,5 +234,32 @@ bootstrap_crab() { return "0" } +# helper to remove fragments from ":"-separated path variables using expressions +filter_path_var() { + # get arguments + local old_val="$1" + shift + local regexps + regexps=( ${@} ) + + # loop through paths and set the new variable if no expression matched + local new_val="" + printf '%s:\0' "${old_val}" | while IFS=: read -d: -r p; do + local matched="false" + local regexp + for regexp in ${regexps[@]}; do + if echo "${p}" | grep -Po "${regexp}" &> /dev/null; then + matched="true" + break + fi + done + if ! ${matched}; then + [ ! -z "${new_val}" ] && new_val="${new_val}:" + new_val="${new_val}${p}" + echo "${new_val}" + fi + done | tail -n 1 +} + # job entry point bootstrap_{{cf_bootstrap_name}} "$@" From 4580a65e684b67f5b311c1e1042f8ac85e90a08b Mon Sep 17 00:00:00 2001 From: "Marcel R." Date: Tue, 13 Aug 2024 11:09:44 +0200 Subject: [PATCH 17/61] Add 'call' decorator for consistency. 
--- columnflow/columnar_util.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/columnflow/columnar_util.py b/columnflow/columnar_util.py index 582756a52..9391b1720 100644 --- a/columnflow/columnar_util.py +++ b/columnflow/columnar_util.py @@ -1565,6 +1565,17 @@ def PRODUCES(cls) -> IOFlagged: """ return cls.IOFlagged(cls, cls.IOFlag.PRODUCES) + @classmethod + def call(cls, func: Callable[[Any, ...], Any]) -> None: + """ + Decorator to wrap a function *func* that should be registered as :py:meth:`call_func` + which defines the main callable for processing chunks of data. The function should accept + arbitrary arguments and can return arbitrary objects. + + The decorator does not return the wrapped function. + """ + cls.call_func = func + @classmethod def init(cls, func: Callable[[], None]) -> None: """ From c3987e69dad98b4e62b34754436696f2ea19b6fc Mon Sep 17 00:00:00 2001 From: Mathis Frahm Date: Wed, 14 Aug 2024 10:47:39 +0200 Subject: [PATCH 18/61] fix kwargs of ml preparation producer inst --- columnflow/tasks/ml.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/columnflow/tasks/ml.py b/columnflow/tasks/ml.py index 25f9b2a04..cd114c621 100644 --- a/columnflow/tasks/ml.py +++ b/columnflow/tasks/ml.py @@ -14,7 +14,6 @@ from columnflow.tasks.framework.mixins import ( CalibratorsMixin, SelectorMixin, - ProducerMixin, ProducersMixin, MLModelDataMixin, MLModelTrainingMixin, @@ -88,8 +87,7 @@ def preparation_producer_inst(self): # set producer inst to None when no producer is requested self._preparation_producer_inst = None return self._preparation_producer_inst - - self._preparation_producer_inst = ProducerMixin.get_producer_inst(producer, {"task": self}) + self._preparation_producer_inst = self.get_producer_insts([producer], {"task": self})[0] # overwrite the sandbox when set sandbox = self._preparation_producer_inst.get_sandbox() @@ -643,7 +641,7 @@ def preparation_producer_inst(self): self._preparation_producer_inst = None return self._preparation_producer_inst - self._preparation_producer_inst = ProducerMixin.get_producer_inst(producer, {"task": self}) + self._preparation_producer_inst = self.get_producer_insts([producer], {"task": self})[0] # check that preparation_producer does not clash with ml_model_inst sandbox if ( From 49c05de7cb786eae9adc4517ddbd0dc9099c8b7a Mon Sep 17 00:00:00 2001 From: "Marcel R." Date: Thu, 15 Aug 2024 10:29:46 +0200 Subject: [PATCH 19/61] Inference model improvements. --- columnflow/inference/__init__.py | 146 ++++++++++++++++----------- columnflow/inference/cms/datacard.py | 24 +++-- columnflow/tasks/cms/inference.py | 41 ++++++-- modules/law | 2 +- 4 files changed, 133 insertions(+), 80 deletions(-) diff --git a/columnflow/inference/__init__.py b/columnflow/inference/__init__.py index 4303c69f4..db04b1e1a 100644 --- a/columnflow/inference/__init__.py +++ b/columnflow/inference/__init__.py @@ -188,26 +188,35 @@ class InferenceModel(Derivable): # optional initialization method init_func = None - class YamlDumper(yaml.SafeDumper or object): + class YamlDumper(yaml.SafeDumper): """ YAML dumper for statistical inference models with ammended representers to serialize internal, structured objects as safe, standard objects. 
""" - def __init__(self: InferenceModel.YamlDumper, *args, **kwargs) -> None: - super().__init__(*args, **kwargs) + @classmethod + def _map_repr(cls, dumper: yaml.Dumper, data: dict) -> str: + return dumper.represent_dict(dict(data)) - # ammend representers - map_repr = lambda dumper, data: dumper.represent_mapping("tag:yaml.org,2002:map", data.items()) - self.add_representer(DotDict, map_repr) + @classmethod + def _list_repr(cls, dumper: yaml.Dumper, data: list) -> str: + return dumper.represent_list(list(data)) - list_repr = lambda dumper, data: dumper.represent_list(list(data)) - self.add_representer(tuple, list_repr) + @classmethod + def _str_repr(cls, dumper: yaml.Dumper, data: str) -> str: + return dumper.represent_str(str(data)) - str_repr = lambda dumper, data: dumper.represent_str(str(data)) - self.add_representer(ParameterType, str_repr) + def __init__(self, *args, **kwargs) -> None: + super().__init__(*args, **kwargs) + + # ammend representers + self.add_representer(DotDict, self._map_repr) + self.add_representer(tuple, self._list_repr) + self.add_representer(ParameterType, self._str_repr) + self.add_representer(ParameterTransformation, self._str_repr) + self.add_representer(ParameterTransformations, self._list_repr) - def ignore_aliases(self: InferenceModel.YamlDumper, *args, **kwargs) -> bool: + def ignore_aliases(self, *args, **kwargs) -> bool: return True @classmethod @@ -224,8 +233,10 @@ def inference_model( """ def decorator(func: Callable) -> DerivableMeta: # create the class dict - cls_dict = {"init_func": func} - cls_dict.update(kwargs) + cls_dict = { + **kwargs, + "init_func": func, + } # create the subclass subclass = cls.derive(func.__name__, bases=bases, cls_dict=cls_dict) @@ -265,7 +276,8 @@ def category_spec( - *name*: The name of the category in the model. - *config_category*: The name of the source category in the config to use. - *config_variable*: The name of the variable in the config to use. - - *config_data_datasets*: List of names of datasets in the config to use for real data. + - *config_data_datasets*: List of names or patterns of datasets in the config to use for + real data. - *data_from_processes*: Optional list of names of :py:meth:`process_spec` objects that, when *config_data_datasets* is not defined, make of a fake data contribution. - *mc_stats*: Either *None* to disable MC stat uncertainties, or an integer, a float or @@ -298,7 +310,7 @@ def process_spec( - *name*: The name of the process in the model. - *is_signal*: A boolean flag deciding whether this process describes signal. - *config_process*: The name of the source process in the config to use. - - *config_mc_datasets*: List of names of MC datasets in the config to use. + - *config_mc_datasets*: List of names or patterns of MC datasets in the config to use. - *scale*: A float value to scale the process, defaulting to 1.0. """ return DotDict([ @@ -357,7 +369,7 @@ def parameter_group_spec( ]) @classmethod - def require_shapes_for_parameter(self: InferenceModel, param_obj: dict) -> bool: + def require_shapes_for_parameter(self, param_obj: dict) -> bool: """ Returns *True* if for a certain parameter object *param_obj* varied shapes are needed, and *False* otherwise. 
@@ -378,11 +390,11 @@ def require_shapes_for_parameter(self: InferenceModel, param_obj: dict) -> bool: # other cases are not supported raise Exception( - f"shape requirement cannot be evaluated of parameter '{param_obj.name}' with type " + + f"shape requirement cannot be evaluated for parameter '{param_obj.name}' with type " + f"'{param_obj.type}' and transformations {param_obj.transformations}", ) - def __init__(self: InferenceModel, config_inst: od.Config) -> None: + def __init__(self, config_inst: od.Config) -> None: super().__init__() # store attributes @@ -391,19 +403,18 @@ def __init__(self: InferenceModel, config_inst: od.Config) -> None: # model info self.model = self.model_spec() - # custom init function when set, always followed by the cleanup + # custom init function when set if callable(self.init_func): self.init_func() - self.cleanup() - def to_yaml(self: InferenceModel, stream: TextIO | None = None) -> str | None: + def to_yaml(self, stream: TextIO | None = None) -> str | None: """ Writes the content of the :py:attr:`model` into a file-like object *stream* when given, and returns a string representation otherwise. """ return yaml.dump(self.model, stream=stream, Dumper=self.YamlDumper) - def pprint(self: InferenceModel) -> None: + def pprint(self) -> None: """ Pretty-prints the content of the :py:attr:`model` in yaml-style. """ @@ -414,11 +425,11 @@ def pprint(self: InferenceModel) -> None: # @property - def categories(self: InferenceModel) -> DotDict: + def categories(self) -> DotDict: return self.model.categories @property - def parameter_groups(self: InferenceModel) -> DotDict: + def parameter_groups(self) -> DotDict: return self.model.parameter_groups # @@ -426,7 +437,7 @@ def parameter_groups(self: InferenceModel) -> DotDict: # def get_categories( - self: InferenceModel, + self, category: str | Sequence[str] | None = None, only_names: bool = False, ) -> list[DotDict | str]: @@ -446,7 +457,7 @@ def get_categories( ] def get_category( - self: InferenceModel, + self, category: str | Sequence[str], only_name: bool = False, silent: bool = False, @@ -475,7 +486,7 @@ def get_category( return categories[0] def has_category( - self: InferenceModel, + self, category: str | Sequence[str], ) -> bool: """ @@ -488,7 +499,7 @@ def has_category( # simple length check return len(self.get_categories(category_pattern)) > 0 - def add_category(self: InferenceModel, *args, **kwargs) -> None: + def add_category(self, *args, **kwargs) -> None: """ Adds a new category with all *args* and *kwargs* used to create the structured category dictionary via :py:meth:`category_spec`. 
If a category with the same name already exists, an @@ -505,7 +516,7 @@ def add_category(self: InferenceModel, *args, **kwargs) -> None: self.categories.append(category) def remove_category( - self: InferenceModel, + self, category: str | Sequence[str], ) -> bool: """ @@ -536,7 +547,7 @@ def remove_category( # def get_processes( - self: InferenceModel, + self, process: str | Sequence[str] | None = None, category: str | Sequence[str] | None = None, only_names: bool = False, @@ -582,7 +593,7 @@ def get_processes( return processes def get_process( - self: InferenceModel, + self, process: str | Sequence[str], category: str | Sequence[str] | None = None, only_name: bool = False, @@ -635,7 +646,7 @@ def get_process( return processes[0] def has_process( - self: InferenceModel, + self, process: str | Sequence[str], category: str | Sequence[str] | None = None, ) -> bool: @@ -652,7 +663,7 @@ def has_process( return len(self.get_processes(process_pattern, category=category_pattern)) > 0 def add_process( - self: InferenceModel, + self, *args, category: str | Sequence[str] | None = None, silent: bool = False, @@ -691,7 +702,7 @@ def add_process( category.processes.append(_copy.deepcopy(process)) def remove_process( - self: InferenceModel, + self, process: str | Sequence[str], category: str | Sequence[str] | None = None, ) -> bool: @@ -729,7 +740,7 @@ def remove_process( # def get_parameters( - self: InferenceModel, + self, parameter: str | Sequence[str] | None = None, process: str | Sequence[str] | None = None, category: str | Sequence[str] | None = None, @@ -785,7 +796,7 @@ def get_parameters( return parameters def get_parameter( - self: InferenceModel, + self, parameter: str | Sequence[str], process: str | Sequence[str] | None = None, category: str | Sequence[str] | None = None, @@ -853,7 +864,7 @@ def get_parameter( return parameters[0] def has_parameter( - self: InferenceModel, + self, parameter: str | Sequence[str], process: str | Sequence[str] | None = None, category: str | Sequence[str] | None = None, @@ -877,7 +888,7 @@ def has_parameter( )) > 0 def add_parameter( - self: InferenceModel, + self, *args, process: str | Sequence[str] | None = None, category: str | Sequence[str] | None = None, @@ -926,7 +937,7 @@ def add_parameter( return parameter def remove_parameter( - self: InferenceModel, + self, parameter: str | Sequence[str], process: str | Sequence[str] | None = None, category: str | Sequence[str] | None = None, @@ -968,7 +979,7 @@ def remove_parameter( # def get_parameter_groups( - self: InferenceModel, + self, group: str | Sequence[str] | None = None, only_names: bool = False, ) -> list[DotDict | str]: @@ -990,7 +1001,7 @@ def get_parameter_groups( ] def get_parameter_group( - self: InferenceModel, + self, group: str | Sequence[str], only_name: bool = False, ) -> DotDict | str: @@ -1016,7 +1027,7 @@ def get_parameter_group( return groups[0] def has_parameter_group( - self: InferenceModel, + self, group: str | Sequence[str], ) -> bool: """ @@ -1029,7 +1040,7 @@ def has_parameter_group( # simeple length check return len(self.get_parameter_groups(group_pattern)) > 0 - def add_parameter_group(self: InferenceModel, *args, **kwargs) -> None: + def add_parameter_group(self, *args, **kwargs) -> None: """ Adds a new parameter group with all *args* and *kwargs* used to create the structured parameter group dictionary via :py:meth:`parameter_group_spec`. 
If a group with the same @@ -1045,7 +1056,7 @@ def add_parameter_group(self: InferenceModel, *args, **kwargs) -> None: self.parameter_groups.append(group) def remove_parameter_group( - self: InferenceModel, + self, group: str | Sequence[str], ) -> bool: """ @@ -1068,7 +1079,7 @@ def remove_parameter_group( return removed_any def add_parameter_to_group( - self: InferenceModel, + self, parameter: str | Sequence[str], group: str | Sequence[str], ) -> bool: @@ -1107,7 +1118,7 @@ def add_parameter_to_group( return added_any def remove_parameter_from_groups( - self: InferenceModel, + self, parameter: str | Sequence[str], group: str | Sequence[str] | None = None, ) -> bool: @@ -1143,7 +1154,7 @@ def remove_parameter_from_groups( # def get_categories_with_process( - self: InferenceModel, + self, process: str | Sequence[str], ) -> list[str]: """ @@ -1157,7 +1168,7 @@ def get_categories_with_process( return list(self.get_processes(process=process_pattern, only_names=True).keys()) def get_processes_with_parameter( - self: InferenceModel, + self, parameter: str | Sequence[str], category: str | Sequence[str] | None = None, flat: bool = True, @@ -1192,7 +1203,7 @@ def get_processes_with_parameter( return processes def get_categories_with_parameter( - self: InferenceModel, + self, parameter: str | Sequence[str], process: str | Sequence[str] | None = None, flat: bool = True, @@ -1227,7 +1238,7 @@ def get_categories_with_parameter( return categories def get_groups_with_parameter( - self: InferenceModel, + self, parameter: str | Sequence[str], ) -> list[str]: """ @@ -1248,17 +1259,20 @@ def get_groups_with_parameter( # removal of empty and dangling objects # - def cleanup(self: InferenceModel) -> None: + def cleanup( + self, + keep_parameters: str | Sequence[str] | None = None, + ) -> None: """ Cleans the internal model structure by removing empty and dangling objects by calling - :py:meth:`remove_empty_categories`, :py:meth:`remove_dangling_parameters_from_groups` and - :py:meth:`remove_empty_parameter_groups` in that order. + :py:meth:`remove_empty_categories`, :py:meth:`remove_dangling_parameters_from_groups` + (receiving *keep_parameters*), and :py:meth:`remove_empty_parameter_groups` in that order. """ self.remove_empty_categories() - self.remove_dangling_parameters_from_groups() + self.remove_dangling_parameters_from_groups(keep_parameters=keep_parameters) self.remove_empty_parameter_groups() - def remove_empty_categories(self: InferenceModel) -> None: + def remove_empty_categories(self) -> None: """ Removes all categories that contain no processes. """ @@ -1268,7 +1282,10 @@ def remove_empty_categories(self: InferenceModel) -> None: if category.processes ] - def remove_dangling_parameters_from_groups(self: InferenceModel) -> None: + def remove_dangling_parameters_from_groups( + self, + keep_parameters: str | Sequence[str] | None = None, + ) -> None: """ Removes names of parameters from parameter groups that are not assigned to any process in any category. 
@@ -1276,15 +1293,22 @@ def remove_dangling_parameters_from_groups(self: InferenceModel) -> None: # get a list of all parameters parameter_names = self.get_parameters("*", flat=True) + # get list of parameters to keep + if keep_parameters: + keep_parameters = self.get_parameters(keep_parameters, flat=True) + # go through groups and remove dangling parameters for group in self.parameter_groups: group.parameter_names[:] = [ parameter_name for parameter_name in group.parameter_names - if parameter_name in parameter_names + if ( + parameter_name in parameter_names or + (keep_parameters and parameter_name in keep_parameters) + ) ] - def remove_empty_parameter_groups(self: InferenceModel) -> None: + def remove_empty_parameter_groups(self) -> None: """ Removes parameter groups that contain no parameter names. """ @@ -1299,7 +1323,7 @@ def remove_empty_parameter_groups(self: InferenceModel) -> None: # def iter_processes( - self: InferenceModel, + self, process: str | Sequence[str] | None = None, category: str | Sequence[str] | None = None, ) -> Generator[tuple[DotDict, DotDict], None, None]: @@ -1314,7 +1338,7 @@ def iter_processes( yield (category_name, process) def iter_parameters( - self: InferenceModel, + self, parameter: str | Sequence[str] | None = None, process: str | Sequence[str] | None = None, category: str | Sequence[str] | None = None, @@ -1335,7 +1359,7 @@ def iter_parameters( # def scale_process( - self: InferenceModel, + self, scale: int | float, process: str | Sequence[str] | None = None, category: str | Sequence[str] | None = None, diff --git a/columnflow/inference/cms/datacard.py b/columnflow/inference/cms/datacard.py index c789503b2..1ce943428 100644 --- a/columnflow/inference/cms/datacard.py +++ b/columnflow/inference/cms/datacard.py @@ -102,7 +102,7 @@ def write( blocks.observations = [ ("bin", list(rates)), ("observation", [ - round(_rates["data"], self.rate_precision) + int(round(_rates["data"], self.rate_precision)) for _rates in rates.values() ]), ] @@ -300,7 +300,7 @@ def write( if blocks.line_parameters: blocks.line_parameters = self.align_lines(list(blocks.line_parameters)) if blocks.groups: - blocks.groups = self.align_lines(list(blocks.groups)) + blocks.groups = self.align_lines(list(blocks.groups), end=3) if blocks.mc_stats: blocks.mc_stats = self.align_lines(list(blocks.mc_stats)) @@ -516,32 +516,36 @@ def get_shapes(param_name): def align_lines( cls, lines: Sequence[Any], + end: int = -1, ) -> list[str]: lines = [ (line.split() if isinstance(line, str) else list(map(str, law.util.flatten(line)))) for line in lines ] - lengths = {len(line) for line in lines} + lengths = {min(len(line), 1e9 if end < 0 else end) for line in lines} if len(lengths) > 1: raise Exception( f"line alignment cannot be performed with lines of varying lengths: {lengths}", ) - # convert to rows and get the maximum width per row - n_rows = list(lengths)[0] - rows = [ + # convert to columns and get the maximum width per column + n_cols = lengths.pop() + cols = [ [line[j] for line in lines] - for j in range(n_rows) + for j in range(n_cols) ] max_widths = [ - max(len(s) for s in row) - for row in rows + max(len(s) for s in col) + for col in cols ] # stitch back return [ - cls.col_sep.join(f"{s: <{max_widths[j]}}" for j, s in enumerate(line)) + cls.col_sep.join( + f"{s: <{max_widths[j]}}" if end < 0 or j < end else s + for j, s in enumerate(line) + ) for line in lines ] diff --git a/columnflow/tasks/cms/inference.py b/columnflow/tasks/cms/inference.py index 85ad1e7d7..502987363 100644 --- 
a/columnflow/tasks/cms/inference.py +++ b/columnflow/tasks/cms/inference.py @@ -41,14 +41,21 @@ def create_branch_map(self): def get_mc_datasets(self, proc_obj: dict) -> list[str]: """ - Helper to find automatic datasets + Helper to find mc datasets. :param proc_obj: process object from an InferenceModel - :return: List of dataset names corresponding to the process *proc_obj* + :return: List of dataset names corresponding to the process *proc_obj*. """ - # when datasets are defined on the process object itself, return them + # when datasets are defined on the process object itself, interpret them as patterns if proc_obj.config_mc_datasets: - return proc_obj.config_mc_datasets + return [ + dataset.name + for dataset in self.config_inst.datasets + if ( + dataset.is_mc and + law.util.multi_match(dataset.name, proc_obj.config_mc_datasets, mode=any) + ) + ] # if not, check the config return [ @@ -56,6 +63,25 @@ def get_mc_datasets(self, proc_obj: dict) -> list[str]: for dataset_inst in get_datasets_from_process(self.config_inst, proc_obj.config_process) ] + def get_data_datasets(self, cat_obj: dict) -> list[str]: + """ + Helper to find data datasets. + + :param cat_obj: category object from an InferenceModel + :return: List of dataset names corresponding to the category *cat_obj*. + """ + if not cat_obj.config_data_datasets: + return [] + + return [ + dataset.name + for dataset in self.config_inst.datasets + if ( + dataset.is_data and + law.util.multi_match(dataset.name, cat_obj.config_data_datasets, mode=any) + ) + ] + def workflow_requires(self): reqs = super().workflow_requires() @@ -74,9 +100,8 @@ def workflow_requires(self): if self.inference_model_inst.require_shapes_for_parameter(param_obj) ) - if cat_obj.config_data_datasets: - for dataset in cat_obj.config_data_datasets: - data_dataset_params[dataset]["variables"].add(cat_obj.config_variable) + for dataset in self.get_data_datasets(cat_obj): + data_dataset_params[dataset]["variables"].add(cat_obj.config_variable) # set workflow requirements per mc dataset reqs["merged_hists"] = set( @@ -130,7 +155,7 @@ def requires(self): branch=-1, workflow="local", ) - for dataset in cat_obj.config_data_datasets + for dataset in self.get_data_datasets(cat_obj) } return reqs diff --git a/modules/law b/modules/law index 5593f184f..673c2ac16 160000 --- a/modules/law +++ b/modules/law @@ -1 +1 @@ -Subproject commit 5593f184f16d3cba83f818b8051bb1c30b26f1b0 +Subproject commit 673c2ac16eb8da9304a6c749e557f9c42ad4d976 From 81ca127fd7a7946782a5a355d939ee26eaf5bd73 Mon Sep 17 00:00:00 2001 From: "Marcel R." Date: Thu, 15 Aug 2024 10:32:04 +0200 Subject: [PATCH 20/61] Add skip_func to check to other csp's. 
--- columnflow/calibration/__init__.py | 1 + columnflow/production/__init__.py | 1 + columnflow/selection/__init__.py | 1 + 3 files changed, 3 insertions(+) diff --git a/columnflow/calibration/__init__.py b/columnflow/calibration/__init__.py index a92d1859a..22d793bf4 100644 --- a/columnflow/calibration/__init__.py +++ b/columnflow/calibration/__init__.py @@ -106,6 +106,7 @@ def update_cls_dict(cls_name, cls_dict, get_attr): "data_only, nominal_only or shifts_only are set", ) + if "skip_func" not in cls_dict: def skip_func(self): # check mc_only and data_only if getattr(self, "dataset_inst", None): diff --git a/columnflow/production/__init__.py b/columnflow/production/__init__.py index 5c6921d9b..00190e05b 100644 --- a/columnflow/production/__init__.py +++ b/columnflow/production/__init__.py @@ -106,6 +106,7 @@ def update_cls_dict(cls_name, cls_dict, get_attr): "data_only, nominal_only or shifts_only are set", ) + if "skip_func" not in cls_dict: def skip_func(self): # check mc_only and data_only if getattr(self, "dataset_inst", None): diff --git a/columnflow/selection/__init__.py b/columnflow/selection/__init__.py index aa544531f..214b17729 100644 --- a/columnflow/selection/__init__.py +++ b/columnflow/selection/__init__.py @@ -120,6 +120,7 @@ def update_cls_dict(cls_name, cls_dict, get_attr): "data_only, nominal_only or shifts_only are set", ) + if "skip_func" not in cls_dict: def skip_func(self): # check mc_only and data_only if getattr(self, "dataset_inst", None): From eaf3968071c2e4d39d2e6c3dd03cb2700097f5a1 Mon Sep 17 00:00:00 2001 From: "Marcel R." Date: Thu, 15 Aug 2024 12:38:57 +0200 Subject: [PATCH 21/61] Format. --- columnflow/inference/cms/datacard.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/columnflow/inference/cms/datacard.py b/columnflow/inference/cms/datacard.py index 1ce943428..81adc2475 100644 --- a/columnflow/inference/cms/datacard.py +++ b/columnflow/inference/cms/datacard.py @@ -164,7 +164,8 @@ def write( elif _param_obj.type != param_obj.type: raise ValueError( f"misconfigured parameter '{param_name}' with type '{_param_obj.type}' " - f"that was previously seen with incompatible type '{param_obj.type}'") + f"that was previously seen with incompatible type '{param_obj.type}'", + ) # get the effect effect = _param_obj.effect From 2f18bc9672956507ef6cddc7503899bdd95427b7 Mon Sep 17 00:00:00 2001 From: Philip Daniel Keicher Date: Thu, 15 Aug 2024 15:54:45 +0200 Subject: [PATCH 22/61] include CF_CERN_USER_FIRSTCHAR in remote bootstrap --- columnflow/tasks/framework/remote_bootstrap.sh | 2 ++ 1 file changed, 2 insertions(+) diff --git a/columnflow/tasks/framework/remote_bootstrap.sh b/columnflow/tasks/framework/remote_bootstrap.sh index 6c9825d70..b8bea1f8e 100755 --- a/columnflow/tasks/framework/remote_bootstrap.sh +++ b/columnflow/tasks/framework/remote_bootstrap.sh @@ -11,6 +11,7 @@ bootstrap_htcondor_standalone() { export CF_ON_HTCONDOR="1" export CF_HTCONDOR_FLAVOR="{{cf_htcondor_flavor}}" export CF_CERN_USER="{{cf_cern_user}}" + export CF_CERN_USER_FIRSTCHAR="${CF_CERN_USER:0:1}" export CF_REPO_BASE="${LAW_JOB_HOME}/repo" export CF_DATA="${LAW_JOB_HOME}/cf_data" export CF_SOFTWARE_BASE="{{cf_software_base}}" @@ -161,6 +162,7 @@ bootstrap_crab() { export CF_ON_GRID="1" export CF_REMOTE_ENV="1" export CF_CERN_USER="{{cf_cern_user}}" + export CF_CERN_USER_FIRSTCHAR="${CF_CERN_USER:0:1}" export CF_REPO_BASE="${LAW_JOB_HOME}/repo" export CF_DATA="${LAW_JOB_HOME}/cf_data" export CF_SOFTWARE_BASE="${CF_DATA}/software" From 
b36963651b53046a4a739ba229a0325097f50a8a Mon Sep 17 00:00:00 2001 From: Mathis Frahm Date: Fri, 16 Aug 2024 08:48:37 +0200 Subject: [PATCH 23/61] update cms private work label to newest recommentations --- columnflow/plotting/plot_util.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/columnflow/plotting/plot_util.py b/columnflow/plotting/plot_util.py index 57201c738..276903bf6 100644 --- a/columnflow/plotting/plot_util.py +++ b/columnflow/plotting/plot_util.py @@ -31,12 +31,13 @@ label_options = { "wip": "Work in progress", "pre": "Preliminary", - "pw": "Private work", - "pwip": "Private work in progress", + "pw": "Private work (CMS data/simulation)", + "pwip": "Private work in progress (CMS)", "sim": "Simulation", "simwip": "Simulation work in progress", "simpre": "Simulation preliminary", - "simpw": "Simulation private work", + "simpw": "Private work (CMS simulation)", + "datapw": "Private work (CMS data)", "od": "OpenData", "odwip": "OpenData work in progress", "odpw": "OpenData private work", @@ -52,12 +53,15 @@ def get_cms_label(ax: plt.Axes, llabel: str) -> dict: :param llabel: The left label of the CMS label. :return: A dictionary with the CMS label configuration. """ + llabel = label_options.get(llabel, llabel) cms_label_kwargs = { "ax": ax, - "llabel": label_options.get(llabel, llabel), + "llabel": llabel, "fontsize": 22, "data": False, } + if "CMS" in llabel: + cms_label_kwargs["exp"] = "" return cms_label_kwargs From 4d0c728cef0ac68b78ef39e0d2c39e4bbccca71a Mon Sep 17 00:00:00 2001 From: Bogdan Wiederspan Date: Tue, 20 Aug 2024 17:22:16 +0200 Subject: [PATCH 24/61] remove over and underflow bin when writing datacards --- columnflow/inference/cms/datacard.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/columnflow/inference/cms/datacard.py b/columnflow/inference/cms/datacard.py index 81adc2475..ff09ba27a 100644 --- a/columnflow/inference/cms/datacard.py +++ b/columnflow/inference/cms/datacard.py @@ -360,6 +360,17 @@ def write_shapes( # create the output file out_file = uproot.recreate(shapes_path) + # helper to remove underflow and overflow values + def remove_flow(h): + ax = h.axes[0] + view = h.view(flow=True) + if ax.traits.underflow: + view.value[0] = 0.0 + view.variance[0] = 0.0 + if ax.traits.overflow: + view.value[-1] = 0.0 + view.variance[-1] = 0.0 + # iterate through shapes for cat_name, hists in self.histograms.items(): cat_obj = self.inference_model_inst.get_category(cat_name) @@ -391,7 +402,9 @@ def fill_empty(h): h_nom = _hists["nominal"].copy() * scale fill_empty(h_nom) nom_name = nom_pattern.format(category=cat_name, process=proc_name) + remove_flow(h_nom) out_file[nom_name] = h_nom + _rates[proc_name] = h_nom.sum().value # helper to return the two variations @@ -479,6 +492,8 @@ def get_shapes(param_name): parameter=param_obj.name, direction="Up", ) + remove_flow(h_down) + remove_flow(h_up) out_file[down_name] = h_down out_file[up_name] = h_up From 67b383b09f7de4faa105416c397e32e385b392bb Mon Sep 17 00:00:00 2001 From: Bogdan Wiederspan Date: Wed, 21 Aug 2024 15:36:27 +0200 Subject: [PATCH 25/61] add new control Flag for handling of flow bins --- columnflow/inference/__init__.py | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/columnflow/inference/__init__.py b/columnflow/inference/__init__.py index db04b1e1a..73d450ab5 100644 --- a/columnflow/inference/__init__.py +++ b/columnflow/inference/__init__.py @@ -105,6 +105,20 @@ def any_from_rate(self: ParameterTransformations) -> bool: 
return any(t.from_rate for t in self) +class FlowStrategy(enum.Enum): + """ + Strategy to handle over- and underflow bin contents. + """ + + ignore = "ignore" + warn = "warn" + remove = "remove" + move = "move" + + def __str__(self) -> str: + return self.value + + class InferenceModel(Derivable): """ Interface to statistical inference models with connections to config objects (such as @@ -121,6 +135,7 @@ class InferenceModel(Derivable): config_variable: ht config_data_datasets: [data_mu_a] data_from_processes: [] + flow_strategy: warn mc_stats: 10 processes: - name: HH @@ -215,6 +230,7 @@ def __init__(self, *args, **kwargs) -> None: self.add_representer(ParameterType, self._str_repr) self.add_representer(ParameterTransformation, self._str_repr) self.add_representer(ParameterTransformations, self._list_repr) + self.add_representer(FlowStrategy, self._str_repr) def ignore_aliases(self, *args, **kwargs) -> bool: return True @@ -266,6 +282,7 @@ def category_spec( config_variable: str | None = None, config_data_datasets: Sequence[str] | None = None, data_from_processes: Sequence[str] | None = None, + flow_strategy: FlowStrategy | str = FlowStrategy.warn, mc_stats: int | float | tuple | None = None, empty_bin_value: float = 1e-5, ) -> DotDict: @@ -280,6 +297,8 @@ def category_spec( real data. - *data_from_processes*: Optional list of names of :py:meth:`process_spec` objects that, when *config_data_datasets* is not defined, make of a fake data contribution. + - *flow_strategy*: A :py:class:`FlowStrategy` instance describing the strategy to handle + over- and underflow bin contents. - *mc_stats*: Either *None* to disable MC stat uncertainties, or an integer, a float or a tuple of thereof to control the options of MC stat options. - *empty_bin_value*: When bins are no content, they are filled with this value. @@ -290,6 +309,11 @@ def category_spec( ("config_variable", str(config_variable) if config_variable else None), ("config_data_datasets", list(map(str, config_data_datasets or []))), ("data_from_processes", list(map(str, data_from_processes or []))), + ("flow_strategy", ( + flow_strategy if + isinstance(flow_strategy, FlowStrategy) + else FlowStrategy[flow_strategy]) + ), ("mc_stats", mc_stats), ("empty_bin_value", empty_bin_value), ("processes", []), From efdfa76ad525b002d0d159c65df0ae3542d5afaf Mon Sep 17 00:00:00 2001 From: Bogdan Wiederspan Date: Wed, 21 Aug 2024 15:39:35 +0200 Subject: [PATCH 26/61] added flag (with 4 options) to handle under and overflow bins. Options are: warn about flows, ignore flows, move flows to edge bins or remove flow completly. Also applied flow handling on data_obs. 
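To illustrate the flow_strategy option added to category_spec above, here is a hedged sketch of how an inference model might set it; the model, category, variable and dataset names are invented, and plain strings are accepted as well since they are converted through FlowStrategy[...] as shown in the patch:

    from columnflow.inference import inference_model, FlowStrategy

    @inference_model
    def my_test_model(self):
        self.add_category(
            "cat_incl",
            config_category="incl",
            config_variable="ht",
            config_data_datasets=["data_mu_a"],
            flow_strategy=FlowStrategy.move,  # equivalently the string "move"
            mc_stats=10,
        )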
--- columnflow/inference/cms/datacard.py | 79 ++++++++++++++++++++-------- 1 file changed, 58 insertions(+), 21 deletions(-) diff --git a/columnflow/inference/cms/datacard.py b/columnflow/inference/cms/datacard.py index ff09ba27a..111fec960 100644 --- a/columnflow/inference/cms/datacard.py +++ b/columnflow/inference/cms/datacard.py @@ -13,7 +13,9 @@ from columnflow import __version__ as cf_version from columnflow.types import Sequence, Any -from columnflow.inference import InferenceModel, ParameterType, ParameterTransformation +from columnflow.inference import ( + InferenceModel, ParameterType, ParameterTransformation, FlowStrategy, +) from columnflow.util import DotDict, maybe_import, real_path, ensure_dir, safe_div np = maybe_import("np") @@ -361,30 +363,65 @@ def write_shapes( out_file = uproot.recreate(shapes_path) # helper to remove underflow and overflow values - def remove_flow(h): + def handle_flow(cat_obj, h, name): + # stop early if flow is ignored altogether + if cat_obj.flow_strategy == FlowStrategy.ignore: + return + + # get objects and flow contents ax = h.axes[0] view = h.view(flow=True) - if ax.traits.underflow: + underflow = (view.value[0], view.variance[0]) if ax.traits.underflow else (0.0, 0.0) + overflow = (view.value[-1], view.variance[-1]) if ax.traits.overflow else (0.0, 0.0) + + # nothing to do if flow bins are emoty + if not underflow[0] and not overflow[0]: + return + + # warn in case of flow content + if cat_obj.flow_strategy == FlowStrategy.warn: + if underflow[0]: + logger.warning( + f"underflow content detected in category '{cat_obj.name}' for histogram " + f"'{name}' ({underflow[0] / view.value.sum() * 100:.1f}% of integral)", + ) + if overflow[0]: + logger.warning( + f"overflow content detected in category '{cat_obj.name}' for histogram " + f"'{name}' ({overflow[0] / view.value.sum() * 100:.1f}% of integral)", + ) + return + + # here, we can already remove overflow values + if underflow[0]: view.value[0] = 0.0 view.variance[0] = 0.0 - if ax.traits.overflow: + if overflow[0]: view.value[-1] = 0.0 view.variance[-1] = 0.0 + # finally handle move + if cat_obj.flow_strategy == FlowStrategy.move: + if underflow[0]: + view.value[1] += underflow[0] + view.variance[1] += underflow[1] + if overflow[0]: + view.value[-2] += overflow[0] + view.variance[-2] += overflow[1] + + # helper to fill empty bins in-place + def fill_empty(cat_obj, h): + if not cat_obj.empty_bin_value: + return + value = h.view().value + mask = value <= 0 + value[mask] = cat_obj.empty_bin_value + h.view().variance[mask] = cat_obj.empty_bin_value + # iterate through shapes for cat_name, hists in self.histograms.items(): cat_obj = self.inference_model_inst.get_category(cat_name) - # helper to fill empty bins in-place - if cat_obj.empty_bin_value: - def fill_empty(h): - value = h.view().value - mask = value <= 0 - value[mask] = cat_obj.empty_bin_value - h.view().variance[mask] = cat_obj.empty_bin_value - else: - fill_empty = lambda h: None - _rates = rates[cat_name] = OrderedDict() _effects = effects[cat_name] = OrderedDict() for proc_name, _hists in hists.items(): @@ -400,11 +437,10 @@ def fill_empty(h): # nominal shape h_nom = _hists["nominal"].copy() * scale - fill_empty(h_nom) + fill_empty(cat_obj, h_nom) nom_name = nom_pattern.format(category=cat_name, process=proc_name) - remove_flow(h_nom) + handle_flow(cat_obj, h_nom, nom_name) out_file[nom_name] = h_nom - _rates[proc_name] = h_nom.sum().value # helper to return the two variations @@ -475,8 +511,8 @@ def get_shapes(param_name): continue # empty 
bins are always filled - fill_empty(h_down) - fill_empty(h_up) + fill_empty(cat_obj, h_down) + fill_empty(cat_obj, h_up) # save them when they represent real shapes if param_obj.type.is_shape: @@ -492,8 +528,8 @@ def get_shapes(param_name): parameter=param_obj.name, direction="Up", ) - remove_flow(h_down) - remove_flow(h_up) + handle_flow(cat_obj, h_down, down_name) + handle_flow(cat_obj, h_up, up_name) out_file[down_name] = h_down out_file[up_name] = h_up @@ -515,6 +551,7 @@ def get_shapes(param_name): # simply save the data histogram h_data = hists["data"]["nominal"].copy() data_name = data_pattern.format(category=cat_name) + handle_flow(cat_obj, h_data, data_name) out_file[data_name] = h_data _rates["data"] = h_data.sum().value From a4751bfda940c75f703a5114ff0a23ff8bcc3c4e Mon Sep 17 00:00:00 2001 From: Bogdan Wiederspan Date: Wed, 21 Aug 2024 16:42:45 +0200 Subject: [PATCH 27/61] linting --- columnflow/inference/__init__.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/columnflow/inference/__init__.py b/columnflow/inference/__init__.py index 73d450ab5..5d2a1f475 100644 --- a/columnflow/inference/__init__.py +++ b/columnflow/inference/__init__.py @@ -310,10 +310,9 @@ def category_spec( ("config_data_datasets", list(map(str, config_data_datasets or []))), ("data_from_processes", list(map(str, data_from_processes or []))), ("flow_strategy", ( - flow_strategy if - isinstance(flow_strategy, FlowStrategy) - else FlowStrategy[flow_strategy]) - ), + flow_strategy + if isinstance(flow_strategy, FlowStrategy) + else FlowStrategy[flow_strategy])), ("mc_stats", mc_stats), ("empty_bin_value", empty_bin_value), ("processes", []), From 196bf96ed73bd28825de9ce5bddd7e9c996104b0 Mon Sep 17 00:00:00 2001 From: Bogdan Wiederspan Date: Wed, 21 Aug 2024 16:42:45 +0200 Subject: [PATCH 28/61] linting, update comment for helper function handling application of flowstrategies --- columnflow/inference/__init__.py | 7 +++---- columnflow/inference/cms/datacard.py | 2 +- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/columnflow/inference/__init__.py b/columnflow/inference/__init__.py index 73d450ab5..5d2a1f475 100644 --- a/columnflow/inference/__init__.py +++ b/columnflow/inference/__init__.py @@ -310,10 +310,9 @@ def category_spec( ("config_data_datasets", list(map(str, config_data_datasets or []))), ("data_from_processes", list(map(str, data_from_processes or []))), ("flow_strategy", ( - flow_strategy if - isinstance(flow_strategy, FlowStrategy) - else FlowStrategy[flow_strategy]) - ), + flow_strategy + if isinstance(flow_strategy, FlowStrategy) + else FlowStrategy[flow_strategy])), ("mc_stats", mc_stats), ("empty_bin_value", empty_bin_value), ("processes", []), diff --git a/columnflow/inference/cms/datacard.py b/columnflow/inference/cms/datacard.py index 111fec960..241834b69 100644 --- a/columnflow/inference/cms/datacard.py +++ b/columnflow/inference/cms/datacard.py @@ -362,7 +362,7 @@ def write_shapes( # create the output file out_file = uproot.recreate(shapes_path) - # helper to remove underflow and overflow values + # helper to handle and apply flow strategy to histogram def handle_flow(cat_obj, h, name): # stop early if flow is ignored altogether if cat_obj.flow_strategy == FlowStrategy.ignore: From e98ff70bdca75ab86f8a74f25c6ab925a0899961 Mon Sep 17 00:00:00 2001 From: Marcel Rieger Date: Thu, 22 Aug 2024 14:08:18 +0200 Subject: [PATCH 29/61] Style. 
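For reference, the effect of the "move" strategy implemented in handle_flow above can be reproduced on a tiny standalone histogram; this is only an illustration with the hist package, not the datacard writer itself:

    import hist

    h = hist.Hist.new.Reg(3, 0.0, 3.0).Weight()
    h.fill([-1.0, 0.5, 1.5, 2.5, 5.0])  # one underflow and one overflow entry

    view = h.view(flow=True)
    # move underflow content into the first visible bin, then clear it
    view.value[1] += view.value[0]
    view.variance[1] += view.variance[0]
    view.value[0] = 0.0
    view.variance[0] = 0.0
    # move overflow content into the last visible bin, then clear it
    view.value[-2] += view.value[-1]
    view.variance[-2] += view.variance[-1]
    view.value[-1] = 0.0
    view.variance[-1] = 0.0

With FlowStrategy.remove only the clearing steps are applied, and with FlowStrategy.warn the flow content is kept and merely reported.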
--- columnflow/inference/__init__.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/columnflow/inference/__init__.py b/columnflow/inference/__init__.py index 5d2a1f475..7926a9f78 100644 --- a/columnflow/inference/__init__.py +++ b/columnflow/inference/__init__.py @@ -312,7 +312,8 @@ def category_spec( ("flow_strategy", ( flow_strategy if isinstance(flow_strategy, FlowStrategy) - else FlowStrategy[flow_strategy])), + else FlowStrategy[flow_strategy] + )), ("mc_stats", mc_stats), ("empty_bin_value", empty_bin_value), ("processes", []), From adae3f726025ceaa4b45e029ada2910eb39bb894 Mon Sep 17 00:00:00 2001 From: haddadanas Date: Thu, 22 Aug 2024 16:50:05 +0200 Subject: [PATCH 30/61] Round the value for Lumi for 2d Plots --- columnflow/plotting/plot_functions_2d.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/columnflow/plotting/plot_functions_2d.py b/columnflow/plotting/plot_functions_2d.py index de2aa0bb5..65fda58de 100644 --- a/columnflow/plotting/plot_functions_2d.py +++ b/columnflow/plotting/plot_functions_2d.py @@ -163,7 +163,7 @@ def plot_2d( "loc": "upper right", }, "cms_label_cfg": { - "lumi": config_inst.x.luminosity.get("nominal") / 1000, # pb -> fb + "lumi": round(0.001 * config_inst.x.luminosity.get("nominal"), 2), # /pb -> /fb }, "plot2d_cfg": { "norm": cbar_norm, From 0ed154ac15318699cdc92042b94116e1ad26da86 Mon Sep 17 00:00:00 2001 From: Mathis Frahm Date: Tue, 10 Sep 2024 14:11:18 +0200 Subject: [PATCH 31/61] remove unnecessary job submission when setting MergeReductionStats.n_inputs=-1 --- columnflow/tasks/reduction.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/columnflow/tasks/reduction.py b/columnflow/tasks/reduction.py index 2a120cf66..dca0ae49d 100644 --- a/columnflow/tasks/reduction.py +++ b/columnflow/tasks/reduction.py @@ -399,6 +399,8 @@ class MergeReducedEvents( f"removed after successful merging; default: {default_keep_reduced_events}", ) + max_merge_factor = 50 + sandbox = dev_sandbox(law.config.get("analysis", "default_columnar_sandbox")) # upstream requirements @@ -415,7 +417,7 @@ def merge_factor(self) -> int: Required by law.tasks.ForestMerge. """ # return as many inputs as leafs are present to create the output of this tree, capped at 50 - return min(self.file_merging, 50) + return min(self.file_merging, self.max_merge_factor) def is_sandboxed(self): # when the task is a merge forest, consider it sandboxed @@ -443,7 +445,7 @@ def create_branch_map(self): def merge_workflow_requires(self): return { "stats": self.reqs.MergeReductionStats.req(self), - "events": self.reqs.ReduceEvents.req(self, _exclude={"branches"}), + "events": self.reqs.ReduceEvents.req_different_branching(self, branches=((0, -1),)), } def merge_requires(self, start_branch, end_branch): From 86ae927bd4eb50ca758a5fe3cdaee567d52e3f35 Mon Sep 17 00:00:00 2001 From: Anas <103462379+haddadanas@users.noreply.github.com> Date: Mon, 7 Oct 2024 10:16:41 +0200 Subject: [PATCH 32/61] syntax and linting fixes in the docs --- docs/user_guide/ml.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/user_guide/ml.md b/docs/user_guide/ml.md index a55c90ba1..56780c988 100644 --- a/docs/user_guide/ml.md +++ b/docs/user_guide/ml.md @@ -354,8 +354,8 @@ class MyModel(MLModel): # store parameters of interest in the ml_model_inst, e.g. 
via the parameters attribute self.parameters = { "batchsize": int(self.parameters.get("batchsize", 1024)), - "layers": tuple(int(layer) for layer in self.parameters.get("layers, (64, 64, 64)), - "ml_process_weights": ml_process_weights + "layers": tuple(int(layer) for layer in self.parameters.get("layers", (64, 64, 64))), + "ml_process_weights": ml_process_weights, } # create representation of ml_model_inst From dff7a31be45c2f00e85b79b561ae23fa2aecba22 Mon Sep 17 00:00:00 2001 From: Mathis Frahm Date: Mon, 14 Oct 2024 13:07:49 +0200 Subject: [PATCH 33/61] filter dataset lfns based on broken_files aux --- columnflow/tasks/external.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/columnflow/tasks/external.py b/columnflow/tasks/external.py index d631a5a2b..26b834fbb 100644 --- a/columnflow/tasks/external.py +++ b/columnflow/tasks/external.py @@ -142,10 +142,12 @@ def get_dataset_lfns_dasgoclient( if code != 0: raise Exception(f"dasgoclient query failed:\n{out}") + broken_files = dataset_inst[shift_inst.name].get_aux("broken_files", []) + return [ line.strip() for line in out.strip().split("\n") - if line.strip().endswith(".root") + if line.strip().endswith(".root") and line.strip() not in broken_files ] def iter_nano_files( From 9a7f02f282ff3edc69747084531192b794282ff4 Mon Sep 17 00:00:00 2001 From: Bogdan Wiederspan Date: Wed, 23 Oct 2024 15:24:32 +0200 Subject: [PATCH 34/61] After applying the fix on mirrored targets in law, no parquets files could be found by union task. The fix was to swap path with abspath so that paths are resolved correctly. --- columnflow/tasks/union.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/columnflow/tasks/union.py b/columnflow/tasks/union.py index a8c2ef732..ff74546c3 100644 --- a/columnflow/tasks/union.py +++ b/columnflow/tasks/union.py @@ -122,11 +122,11 @@ def run(self): read_columns = {Route(c) for c in read_columns} # iterate over chunks of events and diffs - files = [inputs["events"]["events"].path] + files = [inputs["events"]["events"].abspath] if self.producer_insts: - files.extend([inp["columns"].path for inp in inputs["producers"]]) + files.extend([inp["columns"].abspath for inp in inputs["producers"]]) if self.ml_model_insts: - files.extend([inp["mlcolumns"].path for inp in inputs["ml"]]) + files.extend([inp["mlcolumns"].abspath for inp in inputs["ml"]]) for (events, *columns), pos in self.iter_chunked_io( files, source_type=len(files) * ["awkward_parquet"], @@ -150,9 +150,9 @@ def run(self): chunk = tmp_dir.child(f"file_{pos.index}.{self.file_type}", type="f") output_chunks[pos.index] = chunk if self.file_type == "parquet": - self.chunked_io.queue(sorted_ak_to_parquet, (events, chunk.path)) + self.chunked_io.queue(sorted_ak_to_parquet, (events, chunk.abspath)) else: # root - self.chunked_io.queue(sorted_ak_to_root, (events, chunk.path)) + self.chunked_io.queue(sorted_ak_to_root, (events, chunk.abspath)) # merge output files sorted_chunks = [output_chunks[key] for key in sorted(output_chunks)] From d1749f39dde9df83679b9d4f7d1538adb2c9d02b Mon Sep 17 00:00:00 2001 From: Bogdan Wiederspan Date: Wed, 23 Oct 2024 17:29:27 +0200 Subject: [PATCH 35/61] updated law --- modules/law | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/law b/modules/law index 673c2ac16..7b27f589f 160000 --- a/modules/law +++ b/modules/law @@ -1 +1 @@ -Subproject commit 673c2ac16eb8da9304a6c749e557f9c42ad4d976 +Subproject commit 7b27f589f47ce642b5c4145fb855d88075ef5704 From 
633e309be76fac75700deb5dc003c1081ff3b964 Mon Sep 17 00:00:00 2001 From: "Marcel R." Date: Thu, 24 Oct 2024 12:32:26 +0200 Subject: [PATCH 36/61] Hotfix typo in cmsse setup. --- sandboxes/_setup_cmssw.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sandboxes/_setup_cmssw.sh b/sandboxes/_setup_cmssw.sh index 3f40d7764..624fb89e1 100644 --- a/sandboxes/_setup_cmssw.sh +++ b/sandboxes/_setup_cmssw.sh @@ -333,7 +333,7 @@ setup_cmssw() { # prepend persistent path fragments again to ensure priority for local packages and # remove the conda based python fragments since there are too many overlaps between packages - export PYTHONPATH="${CF_PERSISTENT_PATH}:$( echo ${PYTHONPATH} | sed "s|${CF_CONDA_PYTHONPATH}||g" )" + export PYTHONPATH="${CF_PERSISTENT_PYTHONPATH}:$( echo ${PYTHONPATH} | sed "s|${CF_CONDA_PYTHONPATH}||g" )" export PATH="${CF_PERSISTENT_PATH}:${PATH}" # mark this as a bash sandbox for law From 57b9512ca68413846b3912dc15ab9edd84e53f90 Mon Sep 17 00:00:00 2001 From: "Marcel R." Date: Thu, 24 Oct 2024 12:38:27 +0200 Subject: [PATCH 37/61] Consistent use of .abspath in tasks. --- columnflow/tasks/cms/inference.py | 2 +- columnflow/tasks/cutflow.py | 2 +- columnflow/tasks/external.py | 2 +- columnflow/tasks/framework/base.py | 4 ++-- columnflow/tasks/framework/decorators.py | 4 ++-- columnflow/tasks/histograms.py | 2 +- columnflow/tasks/ml.py | 10 +++++----- columnflow/tasks/production.py | 2 +- 8 files changed, 14 insertions(+), 14 deletions(-) diff --git a/columnflow/tasks/cms/inference.py b/columnflow/tasks/cms/inference.py index 502987363..9386a47f6 100644 --- a/columnflow/tasks/cms/inference.py +++ b/columnflow/tasks/cms/inference.py @@ -265,7 +265,7 @@ def run(self): outputs = self.output() writer = DatacardWriter(self.inference_model_inst, {cat_obj.name: hists}) with outputs["card"].localize("w") as tmp_card, outputs["shapes"].localize("w") as tmp_shapes: - writer.write(tmp_card.path, tmp_shapes.path, shapes_path_ref=outputs["shapes"].basename) + writer.write(tmp_card.abspath, tmp_shapes.abspath, shapes_path_ref=outputs["shapes"].basename) CreateDatacardsWrapper = wrapper_factory( diff --git a/columnflow/tasks/cutflow.py b/columnflow/tasks/cutflow.py index 7b89472d5..7eb098175 100644 --- a/columnflow/tasks/cutflow.py +++ b/columnflow/tasks/cutflow.py @@ -160,7 +160,7 @@ def prepare_hists(steps): histograms[var_key] = h.Weight() for arr, pos in self.iter_chunked_io( - inputs["selection"]["masks"].path, + inputs["selection"]["masks"].abspath, source_type="awkward_parquet", read_columns=load_columns, ): diff --git a/columnflow/tasks/external.py b/columnflow/tasks/external.py index 26b834fbb..32c634a6d 100644 --- a/columnflow/tasks/external.py +++ b/columnflow/tasks/external.py @@ -426,7 +426,7 @@ def run(self): # helper function to fetch generic files def fetch_file(src, counter=[0]): - dst = os.path.join(tmp_dir.path, self.create_unique_basename(src)) + dst = os.path.join(tmp_dir.abspath, self.create_unique_basename(src)) src = src[0] if isinstance(src, tuple) else src if src.startswith(("http://", "https://")): # download via wget diff --git a/columnflow/tasks/framework/base.py b/columnflow/tasks/framework/base.py index bbda49a88..8cbc7e804 100644 --- a/columnflow/tasks/framework/base.py +++ b/columnflow/tasks/framework/base.py @@ -887,7 +887,7 @@ def target(self, *path, **kwargs): else law.MirroredDirectoryTarget ) return mirrored_target_cls( - path=local_target.path, + path=local_target.abspath, remote_target=wlcg_target, local_target=local_target, ) @@ 
-1390,7 +1390,7 @@ def run_command(self, cmd, optional=False, **kwargs): if "cwd" not in kwargs and self.run_command_in_tmp: tmp_dir = law.LocalDirectoryTarget(is_tmp=True) tmp_dir.touch() - kwargs["cwd"] = tmp_dir.path + kwargs["cwd"] = tmp_dir.abspath self.publish_message("cwd: {}".format(kwargs.get("cwd", os.getcwd()))) # call it diff --git a/columnflow/tasks/framework/decorators.py b/columnflow/tasks/framework/decorators.py index 8e6fc4ed0..c2b7e2152 100644 --- a/columnflow/tasks/framework/decorators.py +++ b/columnflow/tasks/framework/decorators.py @@ -72,10 +72,10 @@ def after_call(state: Any) -> None: continue if output.path.endswith((".pdf", ".png")): if not isinstance(output, law.LocalTarget): - task.logger.warning(f"cannot show non-local plot at '{output.path}'") + task.logger.warning(f"cannot show non-local plot at '{output.abspath}'") continue elif output.path not in view_paths: - view_paths.append(output.path) + view_paths.append(output.abspath) # loop through paths and view them for path in view_paths: diff --git a/columnflow/tasks/histograms.py b/columnflow/tasks/histograms.py index c392282f9..fb2907867 100644 --- a/columnflow/tasks/histograms.py +++ b/columnflow/tasks/histograms.py @@ -172,7 +172,7 @@ def run(self): mode="r", ) as inps: for (events, *columns), pos in self.iter_chunked_io( - [inp.path for inp in inps], + [inp.abspath for inp in inps], source_type=len(file_targets) * ["awkward_parquet"] + [None] * len(reader_targets), read_columns=(len(file_targets) + len(reader_targets)) * [read_columns], chunk_size=self.weight_producer_inst.get_min_chunk_size(), diff --git a/columnflow/tasks/ml.py b/columnflow/tasks/ml.py index cd114c621..df71a2c67 100644 --- a/columnflow/tasks/ml.py +++ b/columnflow/tasks/ml.py @@ -204,7 +204,7 @@ def run(self): mode="r", ) as inps: for (events, *columns), pos in self.iter_chunked_io( - [inp.path for inp in inps], + [inp.abspath for inp in inps], source_type=len(files) * ["awkward_parquet"] + [None] * len(reader_targets), read_columns=(len(files) + len(reader_targets)) * [read_columns], ): @@ -253,7 +253,7 @@ def run(self): # save as parquet via a thread in the same pool chunk = tmp_dir.child(f"file_{f}_{pos.index}.parquet", type="f") output_chunks[f][pos.index] = chunk - self.chunked_io.queue(sorted_ak_to_parquet, (fold_events, chunk.path)) + self.chunked_io.queue(sorted_ak_to_parquet, (fold_events, chunk.abspath)) # merge output files of all folds for _output_chunks, output in zip(output_chunks, outputs["mlevents"].targets): @@ -778,7 +778,7 @@ def run(self): mode="r", ) as inps: for (events, *columns), pos in self.iter_chunked_io( - [inp.path for inp in inps], + [inp.abspath for inp in inps], source_type=len(file_targets) * ["awkward_parquet"] + [None] * len(reader_targets), read_columns=(len(file_targets) + len(reader_targets)) * [read_columns], ): @@ -828,7 +828,7 @@ def run(self): # save as parquet via a thread in the same pool chunk = tmp_dir.child(f"file_{pos.index}.parquet", type="f") output_chunks[pos.index] = chunk - self.chunked_io.queue(sorted_ak_to_parquet, (events, chunk.path)) + self.chunked_io.queue(sorted_ak_to_parquet, (events, chunk.abspath)) # merge output files sorted_chunks = [output_chunks[key] for key in sorted(output_chunks)] @@ -1033,7 +1033,7 @@ def prepare_inputs(self: PlotMLResultsBase) -> dict[str, ak.Array]: "which is not implemented yet.", ) - events = ak.from_parquet(inp["mlcolumns"].path) + events = ak.from_parquet(inp["mlcolumns"].abspath) # masking with leaf categories category_mask = False diff --git 
a/columnflow/tasks/production.py b/columnflow/tasks/production.py index 22657f71b..4e785fd78 100644 --- a/columnflow/tasks/production.py +++ b/columnflow/tasks/production.py @@ -144,7 +144,7 @@ def run(self): # save as parquet via a thread in the same pool chunk = tmp_dir.child(f"file_{pos.index}.parquet", type="f") output_chunks[pos.index] = chunk - self.chunked_io.queue(sorted_ak_to_parquet, (events, chunk.path)) + self.chunked_io.queue(sorted_ak_to_parquet, (events, chunk.abspath)) # merge output files sorted_chunks = [output_chunks[key] for key in sorted(output_chunks)] From b67645853b20f55d6199bf19dea528ac0893ced5 Mon Sep 17 00:00:00 2001 From: Marcel Rieger Date: Thu, 24 Oct 2024 13:13:57 +0200 Subject: [PATCH 38/61] Hotfix user package encapsulation. --- setup.sh | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/setup.sh b/setup.sh index 6562bf2a5..68ca978f5 100644 --- a/setup.sh +++ b/setup.sh @@ -133,7 +133,10 @@ setup_columnflow() { # PYTHONPATH # Ammended PYTHONPATH variable. # PYTHONWARNINGS - # Set to "ignore". + # Set to "ignore" when not defined already. + # PYTHONNOUSERSITE + # Set to "1" when not defined alreedy, to prevent python from loading packages from e.g. + # "$HOME/.local", which can lead to encapsulation and debugging issues. # GLOBUS_THREAD_MODEL # Set to "none". # VIRTUAL_ENV_DISABLE_PROMPT @@ -591,6 +594,7 @@ cf_setup_software_stack() { export MAMBA_ROOT_PREFIX="${CF_CONDA_BASE}" export MAMBA_EXE="${MAMBA_ROOT_PREFIX}/bin/micromamba" export PYTHONWARNINGS="${PYTHONWARNINGS:-ignore}" + export PYTHONNOUSERSITE="${PYTHONNOUSERSITE:-1}" export GLOBUS_THREAD_MODEL="${GLOBUS_THREAD_MODEL:-none}" export VIRTUAL_ENV_DISABLE_PROMPT="${VIRTUAL_ENV_DISABLE_PROMPT:-1}" export X509_CERT_DIR="${X509_CERT_DIR:-/cvmfs/grid.cern.ch/etc/grid-security/certificates}" From 0652ad1eb169deb80f32a26466df30ecbfda5230 Mon Sep 17 00:00:00 2001 From: "Marcel R." Date: Fri, 25 Oct 2024 08:48:57 +0200 Subject: [PATCH 39/61] Hotfix stray path access. --- columnflow/tasks/production.py | 2 +- modules/law | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/columnflow/tasks/production.py b/columnflow/tasks/production.py index 4e785fd78..6790ec213 100644 --- a/columnflow/tasks/production.py +++ b/columnflow/tasks/production.py @@ -110,7 +110,7 @@ def run(self): ) as inps: # iterate over chunks of events and diffs for (events, *cols), pos in self.iter_chunked_io( - [inp.path for inp in inps], + [inp.abspath for inp in inps], source_type=["awkward_parquet"] + [None] * n_ext, read_columns=[read_columns] * (1 + n_ext), chunk_size=self.producer_inst.get_min_chunk_size(), diff --git a/modules/law b/modules/law index 7b27f589f..fb914c414 160000 --- a/modules/law +++ b/modules/law @@ -1 +1 @@ -Subproject commit 7b27f589f47ce642b5c4145fb855d88075ef5704 +Subproject commit fb914c414867edc808dcbe98ae31276ecf4b55f9 From b3db9ec0648c9ce0a8cdf39bb89853433af84e0d Mon Sep 17 00:00:00 2001 From: "Marcel R." Date: Mon, 28 Oct 2024 08:26:02 +0100 Subject: [PATCH 40/61] Update upstream law. 
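Looking back at the broken_files filter added to the dasgoclient lookup (PATCH 33 above): the list is read with get_aux from the per-shift dataset info, so known-broken LFNs can be recorded directly where the dataset is defined. A small sketch, with a made-up file name, using the set_aux counterpart:

    # mark LFNs that GetDatasetLFNs should skip for the nominal dataset info
    dataset_inst["nominal"].set_aux("broken_files", [
        "/store/mc/.../NANOAODSIM/.../0000/broken_chunk.root",  # hypothetical LFN
    ])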
--- columnflow/tasks/framework/base.py | 9 +++++---- modules/law | 2 +- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/columnflow/tasks/framework/base.py b/columnflow/tasks/framework/base.py index 8cbc7e804..43002d099 100644 --- a/columnflow/tasks/framework/base.py +++ b/columnflow/tasks/framework/base.py @@ -871,15 +871,16 @@ def target(self, *path, **kwargs): # get other options loc, wlcg_fs, store_parts_modifier = (location[1:] + [None, None, None])[:3] kwargs.setdefault("store_parts_modifier", store_parts_modifier) + # create the wlcg target + wlcg_kwargs = kwargs.copy() + wlcg_kwargs.setdefault("fs", wlcg_fs) + wlcg_target = self.wlcg_target(*path, **wlcg_kwargs) + # TODO: add rule for falling back to wlcg target # create the local target local_kwargs = kwargs.copy() loc_key = "fs" if (loc and law.config.has_section(loc)) else "store" local_kwargs.setdefault(loc_key, loc) local_target = self.local_target(*path, **local_kwargs) - # create the wlcg target - wlcg_kwargs = kwargs.copy() - wlcg_kwargs.setdefault("fs", wlcg_fs) - wlcg_target = self.wlcg_target(*path, **wlcg_kwargs) # build the mirrored target from these two mirrored_target_cls = ( law.MirroredFileTarget diff --git a/modules/law b/modules/law index fb914c414..7cc152c7c 160000 --- a/modules/law +++ b/modules/law @@ -1 +1 @@ -Subproject commit fb914c414867edc808dcbe98ae31276ecf4b55f9 +Subproject commit 7cc152c7ccf496f10c3374bb48d59c90a352bb14 From d67acd24352d8eb5d13d32d79e3cc776ea5eb686 Mon Sep 17 00:00:00 2001 From: "Marcel R." Date: Mon, 28 Oct 2024 13:44:37 +0100 Subject: [PATCH 41/61] Update upstream law. --- modules/law | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/law b/modules/law index 7cc152c7c..0aae7e6ff 160000 --- a/modules/law +++ b/modules/law @@ -1 +1 @@ -Subproject commit 7cc152c7ccf496f10c3374bb48d59c90a352bb14 +Subproject commit 0aae7e6ffb4aea537a2f7e76ebdd66936a08c959 From fcbf85d0af3299b0af69a0db37076fcef965e7fc Mon Sep 17 00:00:00 2001 From: Anas Haddad Date: Wed, 30 Oct 2024 18:16:09 +0100 Subject: [PATCH 42/61] ensure proper type conversion to float of the weights before summing --- columnflow/selection/stats.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/columnflow/selection/stats.py b/columnflow/selection/stats.py index 7f296fac6..705a06b65 100644 --- a/columnflow/selection/stats.py +++ b/columnflow/selection/stats.py @@ -148,15 +148,15 @@ def increment_stats( f"but found a sequence: {obj}", ) if len(obj) == 1: - weights = obj[0] + weights = ak.values_astype(obj[0], float) elif len(obj) == 2: - weights, weight_mask = obj + weights, weight_mask = ak.values_astype(obj[0], float), obj[1] else: raise Exception(f"cannot interpret as weights and optional mask: '{obj}'") elif op == self.NUM: weight_mask = obj else: # SUM - weights = obj + weights = ak.values_astype(obj, float) # when mask is an Ellipsis, it cannot be AND joined to other masks, so convert to true mask if weight_mask is Ellipsis: From 5369de3b2f3ac12563e3573a222164349c8497c9 Mon Sep 17 00:00:00 2001 From: Anas Haddad Date: Wed, 30 Oct 2024 18:16:43 +0100 Subject: [PATCH 43/61] added inclusive normalization weights to simulate unstitched datasets --- columnflow/production/normalization.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/columnflow/production/normalization.py b/columnflow/production/normalization.py index b666fc562..3af29ecb8 100644 --- a/columnflow/production/normalization.py +++ b/columnflow/production/normalization.py @@ -197,6 +197,10 @@ def 
normalization_weights(self: Producer, events: ak.Array, **kwargs) -> ak.Arra norm_weight = events.mc_weight * process_weight events = set_ak_column(events, self.weight_name, norm_weight, value_type=np.float32) + # If we are stitching, we also compute the inclusive weight for debugging purposes + if self.allow_stitching: + incl_norm_weight = events.mc_weight * self.inclusive_weight + events = set_ak_column(events, f"{self.weight_name}_incl", incl_norm_weight, value_type=np.float32) return events @@ -310,6 +314,11 @@ def normalization_weights_setup( f"{inclusive_dataset}", ) inclusive_xsec = inclusive_proc.get_xsec(self.config_inst.campaign.ecm).nominal + self.inclusive_weight = ( + lumi * inclusive_xsec / normalization_selection_stats[inclusive_dataset.name]["sum_mc_weight"] + if self.dataset_inst == inclusive_dataset + else 0 + ) for process_id, br in branching_ratios.items(): sum_weights = merged_selection_stats["sum_mc_weight_per_process"][str(process_id)] process_weight_table[0, process_id] = lumi * inclusive_xsec * br / sum_weights @@ -331,6 +340,8 @@ def normalization_weights_init(self: Producer) -> None: Initializes the normalization weights producer by setting up the normalization weight column. """ self.produces.add(self.weight_name) + if self.allow_stitching: + self.produces.add(f"{self.weight_name}_incl") stitched_normalization_weights = normalization_weights.derive( From bf150ebaeade869a2694f2f322837bbe796cbc95 Mon Sep 17 00:00:00 2001 From: Anas <103462379+haddadanas@users.noreply.github.com> Date: Fri, 1 Nov 2024 14:43:37 +0100 Subject: [PATCH 44/61] convert to np.float64 --- columnflow/selection/stats.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/columnflow/selection/stats.py b/columnflow/selection/stats.py index 705a06b65..b4856bc11 100644 --- a/columnflow/selection/stats.py +++ b/columnflow/selection/stats.py @@ -148,15 +148,15 @@ def increment_stats( f"but found a sequence: {obj}", ) if len(obj) == 1: - weights = ak.values_astype(obj[0], float) + weights = ak.values_astype(obj[0], np.float64) elif len(obj) == 2: - weights, weight_mask = ak.values_astype(obj[0], float), obj[1] + weights, weight_mask = ak.values_astype(obj[0], np.float64), obj[1] else: raise Exception(f"cannot interpret as weights and optional mask: '{obj}'") elif op == self.NUM: weight_mask = obj else: # SUM - weights = ak.values_astype(obj, float) + weights = ak.values_astype(obj, np.float64) # when mask is an Ellipsis, it cannot be AND joined to other masks, so convert to true mask if weight_mask is Ellipsis: From 05e9227addd3abf676bc2b643867eb2b893ed565 Mon Sep 17 00:00:00 2001 From: Anas Haddad Date: Wed, 6 Nov 2024 14:56:35 +0100 Subject: [PATCH 45/61] implemented Marcel's suggestions --- columnflow/production/normalization.py | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/columnflow/production/normalization.py b/columnflow/production/normalization.py index 3af29ecb8..343bc359d 100644 --- a/columnflow/production/normalization.py +++ b/columnflow/production/normalization.py @@ -176,7 +176,12 @@ def normalization_weights(self: Producer, events: ak.Array, **kwargs) -> ak.Arra attribute. When py:attr`allow_stitching` is set to True, the sum of event weights is computed for all datasets with a leaf process contained in the leaf processes of the py:attr:`dataset_inst`. For stitching, the process_id needs to be reconstructed for each leaf - process on a per event basis. + process on a per event basis. 
Moreover, when stitching is enabled, the an additional normalization + weight is computed for the inclusive dataset only and stored in a column named + py:attr:`weight_name`_inclusive_only. This weight resembles the normalization weight for the + inclusive dataset, as if it were unstitched and should therefore only be applied, when using the + inclusive dataset as a standalone dataset. + """ # read the process id column process_id = np.asarray(events.process_id) @@ -198,9 +203,9 @@ def normalization_weights(self: Producer, events: ak.Array, **kwargs) -> ak.Arra events = set_ak_column(events, self.weight_name, norm_weight, value_type=np.float32) # If we are stitching, we also compute the inclusive weight for debugging purposes - if self.allow_stitching: + if self.allow_stitching and self.dataset_inst == self.get_inclusive_dataset(): incl_norm_weight = events.mc_weight * self.inclusive_weight - events = set_ak_column(events, f"{self.weight_name}_incl", incl_norm_weight, value_type=np.float32) + events = set_ak_column(events, f"{self.weight_name}_inclusive_only", incl_norm_weight, value_type=np.float32) return events @@ -210,11 +215,6 @@ def normalization_weights_requires(self: Producer, reqs: dict) -> None: Adds the requirements needed by the underlying py:attr:`task` to access selection stats into *reqs*. """ - if self.allow_stitching: - self.stitching_datasets = self.get_stitching_datasets() - else: - self.stitching_datasets = [self.dataset_inst] - # check that all datasets are known for dataset in self.stitching_datasets: if not self.config_inst.has_dataset(dataset): @@ -341,7 +341,12 @@ def normalization_weights_init(self: Producer) -> None: """ self.produces.add(self.weight_name) if self.allow_stitching: - self.produces.add(f"{self.weight_name}_incl") + self.stitching_datasets = self.get_stitching_datasets() + else: + self.stitching_datasets = [self.dataset_inst] + + if self.allow_stitching and self.dataset_inst == self.get_inclusive_dataset(): + self.produces.add(f"{self.weight_name}_inclusive_only") stitched_normalization_weights = normalization_weights.derive( From 88c4f2fce2ddc51b598ceee88a4b6a7141194c7b Mon Sep 17 00:00:00 2001 From: Anas Haddad Date: Wed, 6 Nov 2024 17:44:38 +0100 Subject: [PATCH 46/61] more fixes :) --- columnflow/production/normalization.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/columnflow/production/normalization.py b/columnflow/production/normalization.py index 343bc359d..d5271b279 100644 --- a/columnflow/production/normalization.py +++ b/columnflow/production/normalization.py @@ -176,9 +176,9 @@ def normalization_weights(self: Producer, events: ak.Array, **kwargs) -> ak.Arra attribute. When py:attr`allow_stitching` is set to True, the sum of event weights is computed for all datasets with a leaf process contained in the leaf processes of the py:attr:`dataset_inst`. For stitching, the process_id needs to be reconstructed for each leaf - process on a per event basis. Moreover, when stitching is enabled, the an additional normalization + process on a per event basis. Moreover, when stitching is enabled, an additional normalization weight is computed for the inclusive dataset only and stored in a column named - py:attr:`weight_name`_inclusive_only. This weight resembles the normalization weight for the + `_inclusive_only`. This weight resembles the normalization weight for the inclusive dataset, as if it were unstitched and should therefore only be applied, when using the inclusive dataset as a standalone dataset. 
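To make the stitching behaviour described in the docstring above concrete, a derived producer can opt into it roughly as sketched below; the actual cls_dict of the stitched_normalization_weights producer shipped with columnflow may differ in detail:

    from columnflow.production.normalization import normalization_weights

    my_stitched_normalization_weights = normalization_weights.derive(
        "my_stitched_normalization_weights",
        cls_dict={
            "weight_name": "stitched_normalization_weight",
            "allow_stitching": True,
            "get_xsecs_from_inclusive_dataset": True,
        },
    )
    # when running on the inclusive dataset itself, an additional column
    # "stitched_normalization_weight_inclusive_only" is produced as described above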
@@ -203,9 +203,9 @@ def normalization_weights(self: Producer, events: ak.Array, **kwargs) -> ak.Arra events = set_ak_column(events, self.weight_name, norm_weight, value_type=np.float32) # If we are stitching, we also compute the inclusive weight for debugging purposes - if self.allow_stitching and self.dataset_inst == self.get_inclusive_dataset(): + if self.allow_stitching and self.dataset_inst == self.inclusive_dataset: incl_norm_weight = events.mc_weight * self.inclusive_weight - events = set_ak_column(events, f"{self.weight_name}_inclusive_only", incl_norm_weight, value_type=np.float32) + events = set_ak_column(events, self.weight_name_incl, incl_norm_weight, value_type=np.float32) return events @@ -296,7 +296,7 @@ def normalization_weights_setup( # create a event weight lookup table process_weight_table = sp.sparse.lil_matrix((1, max_id + 1), dtype=np.float32) if self.allow_stitching and self.get_xsecs_from_inclusive_dataset: - inclusive_dataset = self.get_inclusive_dataset() + inclusive_dataset = self.inclusive_dataset logger.info(f"using inclusive dataset {inclusive_dataset.name} for cross section lookup") # get the branching ratios from the inclusive sample @@ -342,11 +342,13 @@ def normalization_weights_init(self: Producer) -> None: self.produces.add(self.weight_name) if self.allow_stitching: self.stitching_datasets = self.get_stitching_datasets() + self.inclusive_dataset = self.get_inclusive_dataset() else: self.stitching_datasets = [self.dataset_inst] - if self.allow_stitching and self.dataset_inst == self.get_inclusive_dataset(): - self.produces.add(f"{self.weight_name}_inclusive_only") + if self.allow_stitching and self.dataset_inst == self.inclusive_dataset: + self.weight_name_incl = f"{self.weight_name}_inclusive_only" + self.produces.add(self.weight_name_incl) stitched_normalization_weights = normalization_weights.derive( From 15459d10b2cfbfecb01554675515bb4b67f9e82c Mon Sep 17 00:00:00 2001 From: Anas Haddad Date: Wed, 6 Nov 2024 17:54:04 +0100 Subject: [PATCH 47/61] skip init for tasks without dataset_inst attr --- columnflow/production/normalization.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/columnflow/production/normalization.py b/columnflow/production/normalization.py index d5271b279..203b57b63 100644 --- a/columnflow/production/normalization.py +++ b/columnflow/production/normalization.py @@ -339,6 +339,9 @@ def normalization_weights_init(self: Producer) -> None: """ Initializes the normalization weights producer by setting up the normalization weight column. """ + if getattr(self, "dataset_inst", None) is None: + return + self.produces.add(self.weight_name) if self.allow_stitching: self.stitching_datasets = self.get_stitching_datasets() From 55f77c1f1bf41568ea224269203fb16c5ef31a42 Mon Sep 17 00:00:00 2001 From: "Marcel R." Date: Thu, 7 Nov 2024 18:48:53 +0100 Subject: [PATCH 48/61] Fix default slurm flavor. --- setup.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.sh b/setup.sh index 68ca978f5..b1ae1c81a 100644 --- a/setup.sh +++ b/setup.sh @@ -318,7 +318,7 @@ cf_setup_common_variables() { # used by law.cfg and, in turn, tasks/framework/remote.py local cf_htcondor_flavor_default="naf" local cf_slurm_flavor_default="maxwell" - local cf_slurm_partition_default="cms-uhh" + local cf_slurm_partition_default="maxgpu" local hname="$( hostname 2> /dev/null )" if [ "$?" = "0" ]; then # lxplus From ef4f9594dc6b4531e4c9817f96efe46e001e7090 Mon Sep 17 00:00:00 2001 From: "Marcel R." 
Date: Thu, 7 Nov 2024 18:49:49 +0100 Subject: [PATCH 49/61] Add claw = 'cf_sandbox SANDBOX law ...' shorthand. --- analysis_templates/cms_minimal/setup.sh | 3 +++ bin/claw | 33 +++++++++++++++++++++++++ setup.sh | 3 +++ 3 files changed, 39 insertions(+) create mode 100755 bin/claw diff --git a/analysis_templates/cms_minimal/setup.sh b/analysis_templates/cms_minimal/setup.sh index 379e8169b..a6f164b30 100644 --- a/analysis_templates/cms_minimal/setup.sh +++ b/analysis_templates/cms_minimal/setup.sh @@ -134,6 +134,9 @@ setup___cf_short_name_lc__() { # source law's bash completion scipt source "$( law completion )" "" + # add completion to the claw command + complete -o bashdefault -o default -F _law_complete claw + # silently index law index -q fi diff --git a/bin/claw b/bin/claw new file mode 100755 index 000000000..3e986b2c2 --- /dev/null +++ b/bin/claw @@ -0,0 +1,33 @@ +#!/usr/bin/env bash + +# Executable that conveniently triggers "cf_sandbox SANDBOX law ..." commands. +# The SANDBOX is determined with the following precedence: +# 1. CLAW_SANDBOX (env) +# 2. analysis.default_columnar_sandbox (law.cfg) +# 3. venv_columnar_dev (default) + +action() { + # get the target sandbox + local sandbox + if [ ! -z "${CLAW_SANDBOX}" ]; then + sandbox="${CLAW_SANDBOX}" + fi + if [ -z "${sandbox}" ]; then + local sandbox_tmp="$( law config analysis.default_columnar_sandbox 2>/dev/null )" + if [ "$?" = "0" ]; then + # extract the name of the sandbox, remove file extension, potentially add '_dev' suffix + sandbox_tmp="$( basename "${sandbox_tmp}" )" + sandbox_tmp="${sandbox_tmp%.*}" + [[ "${sandbox_tmp}" = *_dev ]] || sandbox_tmp="${sandbox_tmp}_dev" + sandbox="${sandbox_tmp}" + fi + fi + if [ -z "${sandbox}" ]; then + sandbox="venv_columnar_dev" + fi + # echo "sandbox '${sandbox}'" + + # run the command + cf_sandbox "${sandbox}" law "$@" +} +action "$@" diff --git a/setup.sh b/setup.sh index b1ae1c81a..56da91ade 100644 --- a/setup.sh +++ b/setup.sh @@ -241,6 +241,9 @@ setup_columnflow() { # source law's bash completion scipt source "$( law completion )" "" + # add completion to the claw command + complete -o bashdefault -o default -F _law_complete claw + # silently index law index -q fi From 273b0ba7049e4999b742910ab53a67ad94526233 Mon Sep 17 00:00:00 2001 From: "Marcel R." Date: Thu, 7 Nov 2024 18:50:04 +0100 Subject: [PATCH 50/61] Update upstream law. --- modules/law | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/law b/modules/law index 0aae7e6ff..540f48f4e 160000 --- a/modules/law +++ b/modules/law @@ -1 +1 @@ -Subproject commit 0aae7e6ffb4aea537a2f7e76ebdd66936a08c959 +Subproject commit 540f48f4e4e759541099cde49967a04bbe6772a9 From 94d2820f718596513c08a08b1f0bf77f1ffde9a7 Mon Sep 17 00:00:00 2001 From: "Marcel R." Date: Thu, 7 Nov 2024 19:33:21 +0100 Subject: [PATCH 51/61] Draft for xrdcp fallback in nano file access. 
--- columnflow/tasks/external.py | 49 +++++++++++++++++++++++++++++++++++- 1 file changed, 48 insertions(+), 1 deletion(-) diff --git a/columnflow/tasks/external.py b/columnflow/tasks/external.py index 32c634a6d..d3bb838f6 100644 --- a/columnflow/tasks/external.py +++ b/columnflow/tasks/external.py @@ -156,6 +156,7 @@ def iter_nano_files( fs: str | Sequence[str] | None = None, lfn_indices: list[int] | None = None, eager_lookup: bool | int = 1, + skip_fallback: bool = False, ) -> None: """ Generator function that reduces the boilerplate code for looping over files referred to by @@ -222,9 +223,19 @@ def iter_nano_files( is_local = law.target.file.get_scheme(fs_base) in (None, "file") logger.debug(f"fs {selected_fs} is {'local' if is_local else 'remote'}") target_cls = law.LocalFileTarget if is_local else law.wlcg.WLCGFileTarget + logger.debug(f"checking fs {selected_fs} for lfn {lfn}") + + # try an optional fallback to pre-emptively fetch the lfn if necessary + if not is_local and not skip_fallback: + # TODO: improve tmp file location, maybe still use the wlcg fs' cache if set + input_file = law.LocalFileTarget(is_tmp="root") + input_file = self._fetch_lfn_fallback(lfn, selected_fs, input_file) + if input_file: + input_stat = input_file.stat() + task.publish_message(f"using fs {selected_fs} via pre-emptive fetch") + break # measure the time required to perform the stat query - logger.debug(f"checking fs {selected_fs} for lfn {lfn}") input_file = target_cls(lfn.lstrip(os.sep) if is_local else lfn, fs=selected_fs) t1 = time.perf_counter() input_stat = input_file.exists(stat=True) @@ -269,6 +280,42 @@ def iter_nano_files( yield (lfn_index, input_file) + def _fetch_lfn_fallback( + self, + lfn: str, + selected_fs: str, + destination: law.LocalFileTarget, + force: bool = False, + ) -> law.LocalFileTarget | None: + # check if the file needs to be fetched in the first place + if not force: + # when gfal2 is available, no need to fetch + try: + import gfal2 # noqa: F401 + return None + except ImportError: + pass + + # get the base uri and check if the protocol is supported + base = ( + law.config.get_expanded(selected_fs, "base_filecopy", None) or + law.config.get_expanded(selected_fs, "base") + ) + scheme = law.target.file.get_scheme(base) + if scheme != "root": + raise NotImplementedError(f"fetching lfn via {scheme}:// is not supported") + uri = base + lfn + + # fetch via xrdcp + destination.parent.touch() + cmd = f"xrdcp -f {uri} {destination.path}" + code = law.util.interruptable_popen(cmd, shell=True, executable="/bin/bash")[0] + if code != 0: + logger.warning(f"xrdcp failed for {uri}") + return None + + return destination + GetDatasetLFNsWrapper = wrapper_factory( base_cls=AnalysisTask, From 986d9f9c52c4c1536458b5d0ff7c76fc0bcb5a9e Mon Sep 17 00:00:00 2001 From: "Marcel R." Date: Fri, 8 Nov 2024 11:11:40 +0100 Subject: [PATCH 52/61] Improve xrdcp fallback location in cache. 
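Since the iter_nano_files docstring above describes the generator but not its call site, here is a schematic sketch of the consuming side; the task wiring is abbreviated and the uproot formatter is only an example, so this is a sketch rather than the exact columnflow implementation:

    # inside the run() method of a dataset branch task that requires GetDatasetLFNs
    lfn_task = self.reqs.GetDatasetLFNs.req(self)
    for lfn_index, input_file in lfn_task.iter_nano_files(self, skip_fallback=False):
        # input_file is a local, remote, or (with the new fallback) pre-fetched file target
        nano_file = input_file.load(formatter="uproot")
        # ... process nano_file ...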
--- columnflow/tasks/external.py | 56 +++++++++++++++++++++++++++--------- 1 file changed, 43 insertions(+), 13 deletions(-) diff --git a/columnflow/tasks/external.py b/columnflow/tasks/external.py index d3bb838f6..b1cb29777 100644 --- a/columnflow/tasks/external.py +++ b/columnflow/tasks/external.py @@ -227,11 +227,10 @@ def iter_nano_files( # try an optional fallback to pre-emptively fetch the lfn if necessary if not is_local and not skip_fallback: - # TODO: improve tmp file location, maybe still use the wlcg fs' cache if set - input_file = law.LocalFileTarget(is_tmp="root") - input_file = self._fetch_lfn_fallback(lfn, selected_fs, input_file) + input_file, input_stat, is_tmp = self._fetch_lfn_fallback(lfn, selected_fs) if input_file: - input_stat = input_file.stat() + if is_tmp: + input_file.is_tmp = True task.publish_message(f"using fs {selected_fs} via pre-emptive fetch") break @@ -284,15 +283,16 @@ def _fetch_lfn_fallback( self, lfn: str, selected_fs: str, - destination: law.LocalFileTarget, force: bool = False, - ) -> law.LocalFileTarget | None: + ) -> tuple[law.LocalFileTarget | None, os.stat_result | None, bool]: + is_tmp = False + # check if the file needs to be fetched in the first place if not force: # when gfal2 is available, no need to fetch try: import gfal2 # noqa: F401 - return None + return None, None, is_tmp except ImportError: pass @@ -306,15 +306,45 @@ def _fetch_lfn_fallback( raise NotImplementedError(f"fetching lfn via {scheme}:// is not supported") uri = base + lfn - # fetch via xrdcp - destination.parent.touch() - cmd = f"xrdcp -f {uri} {destination.path}" + # helper to fetch via xrdcp + def fetch(path): + cmd = f"xrdcp -f {uri} {path}" + code = law.util.interruptable_popen(cmd, shell=True, executable="/bin/bash")[0] + if code != 0: + logger.warning(f"xrdcp failed for {uri}") + return code == 0 + + # if the corresponding fs has a cache and the lfn is already in there, return it + # (no need to perform in/validation checks via mtime for lfns) + wlcg_fs = law.wlcg.WLCGFileSystem(selected_fs) + if wlcg_fs.cache and lfn in wlcg_fs.cache: + destination = law.LocalFileTarget(wlcg_fs.cache.cache_path(lfn)) + return destination, destination.stat(), is_tmp + + # fetch the file + destination = law.LocalFileTarget(is_tmp="root") + cmd = f"xrdcp -f {uri} {destination.abspath}" code = law.util.interruptable_popen(cmd, shell=True, executable="/bin/bash")[0] if code != 0: logger.warning(f"xrdcp failed for {uri}") - return None - - return destination + return None, None, is_tmp + + # when there is a cache, move the file there + stat = destination.stat() + if wlcg_fs.cache: + with wlcg_fs.cache.lock(lfn): + wlcg_fs.cache.allocate(stat.st_size) + clfn = law.LocalFileTarget(wlcg_fs.cache.cache_path(lfn)) + destination.move_to_local(clfn) + destination = clfn + + else: + # here, the destination will be temporary, but set its tmp flag to False to prevent + # deletion at the end of this method and set the decision for later use instead + destination.is_tmp = False + is_tmp = True + + return destination, stat, is_tmp GetDatasetLFNsWrapper = wrapper_factory( From b137be4e26551fb540f599451a4d77038d22b99c Mon Sep 17 00:00:00 2001 From: "Marcel R." Date: Fri, 8 Nov 2024 11:25:10 +0100 Subject: [PATCH 53/61] Finalize xrdcp fallback logic. 
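The cache handling added here means that files fetched via the xrdcp fallback land in the same cache as gfal-based copies. Whether a cache exists and where an LFN would live in it can be checked with plain law calls, as in this sketch (the fs section name and LFN are hypothetical):

    import law

    lfn = "/store/mc/.../NANOAODSIM/.../0000/nano_1.root"  # hypothetical LFN
    wlcg_fs = law.wlcg.WLCGFileSystem("wlcg_fs_my_redirector")

    if wlcg_fs.cache and lfn in wlcg_fs.cache:
        # already cached, resolve to the local cache path
        local_path = wlcg_fs.cache.cache_path(lfn)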
--- columnflow/tasks/external.py | 43 ++++++++++++++++++------------------ 1 file changed, 22 insertions(+), 21 deletions(-) diff --git a/columnflow/tasks/external.py b/columnflow/tasks/external.py index b1cb29777..4915ffebe 100644 --- a/columnflow/tasks/external.py +++ b/columnflow/tasks/external.py @@ -285,14 +285,25 @@ def _fetch_lfn_fallback( selected_fs: str, force: bool = False, ) -> tuple[law.LocalFileTarget | None, os.stat_result | None, bool]: - is_tmp = False - + """ + Fetches an *lfn* via fallback mechanisms. Currently, only ``xrdcp`` for remote file systems + *selected_fs* with `root://` bases is supported. Unless *force* is *True*, no fallbacks are + performed in case they are not necessary in the first place (determine by the availability + of the ``gfal2`` package). + + :param lfn: Logical file name to fetch. + :param selected_fs: Name of the file system to fetch the LFN from. + :param force: When *True*, forces the fallback to be performed, defaults to *False*. + :return: Tuple of the fetched file, its stat, and a flag indicating whether the file is + temporary. *None*'s are returned when the file was not fetched. + """ # check if the file needs to be fetched in the first place + no_result = None, None, False if not force: # when gfal2 is available, no need to fetch try: import gfal2 # noqa: F401 - return None, None, is_tmp + return no_result except ImportError: pass @@ -306,28 +317,20 @@ def _fetch_lfn_fallback( raise NotImplementedError(f"fetching lfn via {scheme}:// is not supported") uri = base + lfn - # helper to fetch via xrdcp - def fetch(path): - cmd = f"xrdcp -f {uri} {path}" - code = law.util.interruptable_popen(cmd, shell=True, executable="/bin/bash")[0] - if code != 0: - logger.warning(f"xrdcp failed for {uri}") - return code == 0 - # if the corresponding fs has a cache and the lfn is already in there, return it # (no need to perform in/validation checks via mtime for lfns) wlcg_fs = law.wlcg.WLCGFileSystem(selected_fs) if wlcg_fs.cache and lfn in wlcg_fs.cache: destination = law.LocalFileTarget(wlcg_fs.cache.cache_path(lfn)) - return destination, destination.stat(), is_tmp + return destination, destination.stat(), False - # fetch the file + # fetch the file into a temporary location first destination = law.LocalFileTarget(is_tmp="root") cmd = f"xrdcp -f {uri} {destination.abspath}" code = law.util.interruptable_popen(cmd, shell=True, executable="/bin/bash")[0] if code != 0: logger.warning(f"xrdcp failed for {uri}") - return None, None, is_tmp + return no_result # when there is a cache, move the file there stat = destination.stat() @@ -337,14 +340,12 @@ def fetch(path): clfn = law.LocalFileTarget(wlcg_fs.cache.cache_path(lfn)) destination.move_to_local(clfn) destination = clfn + return destination, stat, False - else: - # here, the destination will be temporary, but set its tmp flag to False to prevent - # deletion at the end of this method and set the decision for later use instead - destination.is_tmp = False - is_tmp = True - - return destination, stat, is_tmp + # here, the destination will be temporary, but set its tmp flag to False to prevent its + # deletion when this method goes out of scope, and set the decision for later use instead + destination.is_tmp = False + return destination, stat, True GetDatasetLFNsWrapper = wrapper_factory( From 53003c6ee01b00e771e05e6564df99b17201dfa1 Mon Sep 17 00:00:00 2001 From: "Marcel R." Date: Fri, 8 Nov 2024 14:19:37 +0100 Subject: [PATCH 54/61] Typos. 
--- columnflow/tasks/external.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/columnflow/tasks/external.py b/columnflow/tasks/external.py index 4915ffebe..af533a266 100644 --- a/columnflow/tasks/external.py +++ b/columnflow/tasks/external.py @@ -288,7 +288,7 @@ def _fetch_lfn_fallback( """ Fetches an *lfn* via fallback mechanisms. Currently, only ``xrdcp`` for remote file systems *selected_fs* with `root://` bases is supported. Unless *force* is *True*, no fallbacks are - performed in case they are not necessary in the first place (determine by the availability + performed in case they are not necessary in the first place (determined by the availability of the ``gfal2`` package). :param lfn: Logical file name to fetch. @@ -339,8 +339,7 @@ def _fetch_lfn_fallback( wlcg_fs.cache.allocate(stat.st_size) clfn = law.LocalFileTarget(wlcg_fs.cache.cache_path(lfn)) destination.move_to_local(clfn) - destination = clfn - return destination, stat, False + return clfn, stat, False # here, the destination will be temporary, but set its tmp flag to False to prevent its # deletion when this method goes out of scope, and set the decision for later use instead From 62c25b5b745a8f0a6dd1f7ac2114cdc3d4cb0220 Mon Sep 17 00:00:00 2001 From: Philip Keicher Date: Thu, 14 Nov 2024 14:29:04 +0100 Subject: [PATCH 55/61] hotfix: add repository-unspecific columnflow files to exclude of BundleRepo to avoid unnecessary copies --- columnflow/tasks/framework/remote.py | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/columnflow/tasks/framework/remote.py b/columnflow/tasks/framework/remote.py index 5099719d3..be7e8f747 100644 --- a/columnflow/tasks/framework/remote.py +++ b/columnflow/tasks/framework/remote.py @@ -26,7 +26,22 @@ class BundleRepo(AnalysisTask, law.git.BundleGitRepository, law.tasks.TransferLo user = user_parameter_inst version = None - exclude_files = ["docs", "tests", "data", "assets", ".law", ".setups", ".data", ".github"] + exclude_files = [ + "docs", + "tests", + "data", + "assets", + ".law", + ".setups", + ".data", + ".github", + # also make sure that CF specific files that are not part of + # the repository are excluded + os.environ["CF_STORE_LOCAL"], + os.environ["CF_SOFTWARE_BASE"], + os.environ["CF_VENV_BASE"], + os.environ["CF_CONDA_BASE"], + ] def get_repo_path(self): # required by BundleGitRepository From 5c88c5e6884c418963bdc950cd8bd31db1927a7d Mon Sep 17 00:00:00 2001 From: "Marcel R." Date: Thu, 14 Nov 2024 15:30:05 +0100 Subject: [PATCH 56/61] Address review comments. 
--- columnflow/tasks/external.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/columnflow/tasks/external.py b/columnflow/tasks/external.py index af533a266..ada63c782 100644 --- a/columnflow/tasks/external.py +++ b/columnflow/tasks/external.py @@ -170,6 +170,7 @@ def iter_nano_files( :param fs: Name of the local or remote file system where the LFNs are located, defaults to None :param lfn_indices: List of indices of LFNs that are processed by this *task* instance, defaults to None :param eager_lookup: Look at the next fs if stat takes too long, defaults to 1 + :param skip_fallback: Skip the fallback mechanism to fetch the LFN, defaults to False :raises TypeError: If *task* is not of type :external+law:py:class:`~law.workflow.base.BaseWorkflow` or not a task analyzing a single branch in the task tree :raises Exception: If current task is not complete as indicated with ``self.complete()`` @@ -292,7 +293,8 @@ def _fetch_lfn_fallback( of the ``gfal2`` package). :param lfn: Logical file name to fetch. - :param selected_fs: Name of the file system to fetch the LFN from. + :param selected_fs: Name of the file system to fetch the LFN from. When remote, its *base* + or *base_filecopy* should use the `root://` protocol. :param force: When *True*, forces the fallback to be performed, defaults to *False*. :return: Tuple of the fetched file, its stat, and a flag indicating whether the file is temporary. *None*'s are returned when the file was not fetched. From 4e72ca6383093d9cf69558a9116e82b531c1e955 Mon Sep 17 00:00:00 2001 From: "Marcel R." Date: Fri, 15 Nov 2024 18:48:43 +0100 Subject: [PATCH 57/61] Make merging tasks remote workflows. --- columnflow/tasks/reduction.py | 49 +++++++++++++++++++++++++++++------ columnflow/tasks/selection.py | 9 +++---- 2 files changed, 45 insertions(+), 13 deletions(-) diff --git a/columnflow/tasks/reduction.py b/columnflow/tasks/reduction.py index dca0ae49d..6ee9c852a 100644 --- a/columnflow/tasks/reduction.py +++ b/columnflow/tasks/reduction.py @@ -260,6 +260,8 @@ class MergeReductionStats( SelectorStepsMixin, CalibratorsMixin, DatasetTask, + law.LocalWorkflow, + RemoteWorkflow, ): n_inputs = luigi.IntParameter( @@ -279,6 +281,7 @@ class MergeReductionStats( # upstream requirements reqs = Requirements( + RemoteWorkflow.reqs, ReduceEvents=ReduceEvents, ) @@ -298,10 +301,31 @@ def resolve_param_values(cls, params): return params + def create_branch_map(self): + # single branch without payload + return {0: None} + + def workflow_requires(self): + reqs = super().workflow_requires() + if self.merged_size == 0: + return reqs + + reqs["events"] = self.reqs.ReduceEvents.req_different_branching( + self, + branches=((0, self.n_inputs),), + ) + return reqs + def requires(self): if self.merged_size == 0: return [] - return self.reqs.ReduceEvents.req(self, branches=((0, self.n_inputs),)) + + return self.reqs.ReduceEvents.req_different_branching( + self, + workflow="local", + branches=((0, self.n_inputs),), + _exclude={"branch"}, + ) def output(self): return {"stats": self.target(f"stats_n{self.n_inputs}.json")} @@ -429,7 +453,7 @@ def is_sandboxed(self): @law.workflow_property(setter=True, cache=True, empty_value=0) def file_merging(self): # check if the merging stats are present - stats = self.reqs.MergeReductionStats.req(self).output()["stats"] + stats = self.reqs.MergeReductionStats.req_different_branching(self, branch=0).output()["stats"] return stats.load(formatter="json")["merge_factor"] if stats.exists() else 0 @law.dynamic_workflow_condition @@ 
-444,14 +468,14 @@ def create_branch_map(self): def merge_workflow_requires(self): return { - "stats": self.reqs.MergeReductionStats.req(self), + "stats": self.reqs.MergeReductionStats.req_different_branching(self), "events": self.reqs.ReduceEvents.req_different_branching(self, branches=((0, -1),)), } def merge_requires(self, start_branch, end_branch): return { - "stats": self.reqs.MergeReductionStats.req(self), - "events": self.reqs.ReduceEvents.req( + "stats": self.reqs.MergeReductionStats.req_different_branching(self, branch=0), + "events": self.reqs.ReduceEvents.req_different_branching( self, branches=((start_branch, end_branch),), workflow="local", @@ -508,6 +532,7 @@ class ProvideReducedEvents( CalibratorsMixin, DatasetTask, law.LocalWorkflow, + RemoteWorkflow, ): skip_merging = luigi.BoolParameter( @@ -524,18 +549,26 @@ class ProvideReducedEvents( # upstream requirements reqs = Requirements( + RemoteWorkflow.reqs, ReduceEvents=ReduceEvents, MergeReductionStats=MergeReductionStats, MergeReducedEvents=MergeReducedEvents, ) + @classmethod + def _resolve_workflow_parameters(cls, params): + # always fallback to local workflows + params["effective_workflow"] = "local" + + return super()._resolve_workflow_parameters(params) + @law.workflow_property(setter=True, cache=True, empty_value=0) def file_merging(self): if self.skip_merging or self.dataset_info_inst.n_files == 1: return 1 # check if the merging stats are present - stats = self.reqs.MergeReductionStats.req(self).output()["stats"] + stats = self.reqs.MergeReductionStats.req_different_branching(self, branch=0).output()["stats"] return stats.load(formatter="json")["merge_factor"] if stats.exists() else 0 @law.dynamic_workflow_condition @@ -576,7 +609,7 @@ def workflow_requires(self): reqs["events"] = self._req_reduced_events() else: # here, the merging is unclear so require the stats - reqs["reduction_stats"] = self.reqs.MergeReductionStats.req(self) + reqs["reduction_stats"] = self.reqs.MergeReductionStats.req_different_branching(self) if self.force_merging: # require merged events when forced @@ -598,7 +631,7 @@ def requires(self): if self.skip_merging or (not self.force_merging and self.dataset_info_inst.n_files == 1): reqs["events"] = self._req_reduced_events() else: - reqs["reduction_stats"] = self.reqs.MergeReductionStats.req(self) + reqs["reduction_stats"] = self.reqs.MergeReductionStats.req_different_branching(self, branch=0) if self.force_merging: reqs["events"] = self._req_merged_reduced_events() diff --git a/columnflow/tasks/selection.py b/columnflow/tasks/selection.py index 24746b007..a59080bd9 100644 --- a/columnflow/tasks/selection.py +++ b/columnflow/tasks/selection.py @@ -286,6 +286,7 @@ class MergeSelectionStats( CalibratorsMixin, DatasetTask, law.tasks.ForestMerge, + RemoteWorkflow, ): # flag that sets the *hists* output to optional if True selection_hists_optional = default_selection_hists_optional @@ -296,11 +297,9 @@ class MergeSelectionStats( # merge 25 stats files into 1 at every step of the merging cascade merge_factor = 25 - # skip receiving some parameters via req - exclude_params_req_get = {"workflow"} - # upstream requirements reqs = Requirements( + RemoteWorkflow.reqs, SelectEvents=SelectEvents, ) @@ -309,10 +308,10 @@ def create_branch_map(self): return law.tasks.ForestMerge.create_branch_map(self) def merge_workflow_requires(self): - return self.reqs.SelectEvents.req(self, _exclude={"branches"}) + return self.reqs.SelectEvents.req_different_branching(self, _exclude={"branches"}) def merge_requires(self, 
start_branch, end_branch): - return self.reqs.SelectEvents.req( + return self.reqs.SelectEvents.req_different_branching( self, branches=((start_branch, end_branch),), workflow="local", From 7f0a1dd3219ec7748dcf02f4a9ede21aebed7248 Mon Sep 17 00:00:00 2001 From: "Marcel R." Date: Fri, 15 Nov 2024 18:49:12 +0100 Subject: [PATCH 58/61] Add --htcondor-disk parameter. --- columnflow/tasks/framework/remote.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/columnflow/tasks/framework/remote.py b/columnflow/tasks/framework/remote.py index be7e8f747..ce6e5fd35 100644 --- a/columnflow/tasks/framework/remote.py +++ b/columnflow/tasks/framework/remote.py @@ -568,6 +568,13 @@ class HTCondorWorkflow(AnalysisTask, law.htcondor.HTCondorWorkflow, RemoteWorkfl description="requested memeory in MB; empty value leads to the cluster default setting; " "empty default", ) + htcondor_disk = law.BytesParameter( + default=law.NO_FLOAT, + unit="GB", + significant=False, + description="requested disk space in GB; empty value leads to the cluster default setting; " + "empty default", + ) htcondor_flavor = luigi.ChoiceParameter( default=_default_htcondor_flavor, choices=( @@ -697,6 +704,12 @@ def htcondor_job_config(self, config, job_num, branches): if self.htcondor_memory is not None and self.htcondor_memory > 0: config.custom_content.append(("Request_Memory", self.htcondor_memory)) + # request disk space + if self.htcondor_disk is not None and self.htcondor_disk > 0: + # TODO: the exact conversion might be flavor dependent in the future, use kB for npw + # e.g. https://confluence.desy.de/pages/viewpage.action?pageId=128354529 + config.custom_content.append(("RequestDisk", self.htcondor_disk * 1024**2)) + # render variables config.render_variables["cf_bootstrap_name"] = "htcondor_standalone" if self.htcondor_flavor not in ("", law.NO_STR): From 859111e5a86aaa68114ead4922ac450cb133f25e Mon Sep 17 00:00:00 2001 From: Mathis Frahm Date: Mon, 18 Nov 2024 09:43:45 +0100 Subject: [PATCH 59/61] produce inclusive_only norm weight only when get_br_from_inclusive_dataset=True --- columnflow/production/normalization.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/columnflow/production/normalization.py b/columnflow/production/normalization.py index 203b57b63..dc305e628 100644 --- a/columnflow/production/normalization.py +++ b/columnflow/production/normalization.py @@ -203,7 +203,7 @@ def normalization_weights(self: Producer, events: ak.Array, **kwargs) -> ak.Arra events = set_ak_column(events, self.weight_name, norm_weight, value_type=np.float32) # If we are stitching, we also compute the inclusive weight for debugging purposes - if self.allow_stitching and self.dataset_inst == self.inclusive_dataset: + if self.allow_stitching and self.get_xsecs_from_inclusive_dataset and self.dataset_inst == self.inclusive_dataset: incl_norm_weight = events.mc_weight * self.inclusive_weight events = set_ak_column(events, self.weight_name_incl, incl_norm_weight, value_type=np.float32) return events @@ -349,7 +349,7 @@ def normalization_weights_init(self: Producer) -> None: else: self.stitching_datasets = [self.dataset_inst] - if self.allow_stitching and self.dataset_inst == self.inclusive_dataset: + if self.allow_stitching and self.get_xsecs_from_inclusive_dataset and self.dataset_inst == self.inclusive_dataset: self.weight_name_incl = f"{self.weight_name}_inclusive_only" self.produces.add(self.weight_name_incl) From c79c6824cb5a4af8488dcc6acab1afda36cbf577 Mon Sep 17 00:00:00 2001 From: "Marcel R." 
Date: Mon, 25 Nov 2024 20:37:43 +0100 Subject: [PATCH 60/61] Default disk value from config. --- analysis_templates/cms_minimal/law.cfg | 1 + columnflow/tasks/framework/base.py | 2 +- columnflow/tasks/framework/remote.py | 9 +++++++-- law.cfg | 1 + 4 files changed, 10 insertions(+), 3 deletions(-) diff --git a/analysis_templates/cms_minimal/law.cfg b/analysis_templates/cms_minimal/law.cfg index 3fce2ce3e..0dcbc6d15 100644 --- a/analysis_templates/cms_minimal/law.cfg +++ b/analysis_templates/cms_minimal/law.cfg @@ -56,6 +56,7 @@ skip_ensure_proxy: False # some remote workflow parameter defaults htcondor_flavor: $CF_HTCONDOR_FLAVOR htcondor_share_software: False +htcondor_disk: -1 slurm_flavor: $CF_SLURM_FLAVOR slurm_partition: $CF_SLURM_PARTITION diff --git a/columnflow/tasks/framework/base.py b/columnflow/tasks/framework/base.py index 43002d099..85bd05e86 100644 --- a/columnflow/tasks/framework/base.py +++ b/columnflow/tasks/framework/base.py @@ -142,7 +142,7 @@ def req_params(cls, inst: AnalysisTask, **kwargs) -> dict: _prefer_cli = law.util.make_set(kwargs.get("_prefer_cli", [])) | { "version", "workflow", "job_workers", "poll_interval", "walltime", "max_runtime", "retries", "acceptance", "tolerance", "parallel_jobs", "shuffle_jobs", "htcondor_cpus", - "htcondor_gpus", "htcondor_memory", "htcondor_pool", "pilot", + "htcondor_gpus", "htcondor_memory", "htcondor_disk", "htcondor_pool", "pilot", } kwargs["_prefer_cli"] = _prefer_cli diff --git a/columnflow/tasks/framework/remote.py b/columnflow/tasks/framework/remote.py index ce6e5fd35..a1f565307 100644 --- a/columnflow/tasks/framework/remote.py +++ b/columnflow/tasks/framework/remote.py @@ -528,6 +528,11 @@ def common_destination_info(self, info: dict[str, str]) -> dict[str, str]: _default_htcondor_flavor = law.config.get_expanded("analysis", "htcondor_flavor", law.NO_STR) _default_htcondor_share_software = law.config.get_expanded_boolean("analysis", "htcondor_share_software", False) +_default_htcondor_disk = law.util.parse_bytes( + law.config.get_expanded_float("analysis", "htcondor_disk", law.NO_FLOAT), + input_unit="GB", + unit="GB", +) class HTCondorWorkflow(AnalysisTask, law.htcondor.HTCondorWorkflow, RemoteWorkflowMixin): @@ -569,11 +574,11 @@ class HTCondorWorkflow(AnalysisTask, law.htcondor.HTCondorWorkflow, RemoteWorkfl "empty default", ) htcondor_disk = law.BytesParameter( - default=law.NO_FLOAT, + default=_default_htcondor_disk, unit="GB", significant=False, description="requested disk space in GB; empty value leads to the cluster default setting; " - "empty default", + f"{'empty default' if _default_htcondor_disk <= 0 else 'default: ' + str(_default_htcondor_disk)}", ) htcondor_flavor = luigi.ChoiceParameter( default=_default_htcondor_flavor, diff --git a/law.cfg b/law.cfg index 1d07fb288..6650da0ff 100644 --- a/law.cfg +++ b/law.cfg @@ -52,6 +52,7 @@ skip_ensure_proxy: False # some remote workflow parameter defaults htcondor_flavor: $CF_HTCONDOR_FLAVOR htcondor_share_software: False +htcondor_disk: -1 slurm_flavor: $CF_SLURM_FLAVOR slurm_partition: $CF_SLURM_PARTITION From c7a4cce3b79afb76a1a3a140df99bc6160d37e7e Mon Sep 17 00:00:00 2001 From: "Marcel R." Date: Mon, 25 Nov 2024 20:39:56 +0100 Subject: [PATCH 61/61] Typo. 
--- columnflow/tasks/framework/remote.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/columnflow/tasks/framework/remote.py b/columnflow/tasks/framework/remote.py index a1f565307..8a747118e 100644 --- a/columnflow/tasks/framework/remote.py +++ b/columnflow/tasks/framework/remote.py @@ -570,7 +570,7 @@ class HTCondorWorkflow(AnalysisTask, law.htcondor.HTCondorWorkflow, RemoteWorkfl default=law.NO_FLOAT, unit="MB", significant=False, - description="requested memeory in MB; empty value leads to the cluster default setting; " + description="requested memory in MB; empty value leads to the cluster default setting; " "empty default", ) htcondor_disk = law.BytesParameter(
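
Editor's sketch (not part of the patch series above): PATCH 58/61 and PATCH 60/61 wire the new --htcondor-disk parameter through the config (an "htcondor_disk" default in the [analysis] section of law.cfg, forwarded via "_prefer_cli") and translate the GB value into the kB figure written to HTCondor's RequestDisk via "self.htcondor_disk * 1024**2". The standalone snippet below only illustrates that conversion; the helper name "request_disk_kb" and the example values are illustrative and do not exist in columnflow or law.

from __future__ import annotations

def request_disk_kb(htcondor_disk_gb: float | None) -> int | None:
    """
    Translate a disk request given in GB into the kB value HTCondor's RequestDisk
    expects, mirroring the "htcondor_disk * 1024**2" conversion in the patches.
    Returns None for empty / non-positive values, i.e. "use the cluster default".
    """
    if htcondor_disk_gb is None or htcondor_disk_gb <= 0:
        return None
    # binary prefixes: 1 GB -> 1024 MB -> 1024**2 kB
    return int(htcondor_disk_gb * 1024**2)

if __name__ == "__main__":
    print(request_disk_kb(5.0))   # 5242880 (kB)
    print(request_disk_kb(-1.0))  # None, matching the "htcondor_disk: -1" law.cfg default

With the shipped default of -1, no RequestDisk entry is added to the job config and the scheduler's default applies; setting e.g. "htcondor_disk: 5" in law.cfg, or passing --htcondor-disk 5 on the command line, would request roughly 5 GB of scratch space per job.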