From 69c09e7b3e0a0173dc80b25cb2e7b40bb1ec83ba Mon Sep 17 00:00:00 2001 From: Jim Bosch Date: Fri, 30 Aug 2024 10:13:56 -0400 Subject: [PATCH 1/2] Adapt to change in QuantumGraphBuilder interface. --- python/lsst/analysis/tools/tasks/gatherResourceUsage.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/python/lsst/analysis/tools/tasks/gatherResourceUsage.py b/python/lsst/analysis/tools/tasks/gatherResourceUsage.py index faccc1f7d..95de70279 100644 --- a/python/lsst/analysis/tools/tasks/gatherResourceUsage.py +++ b/python/lsst/analysis/tools/tasks/gatherResourceUsage.py @@ -637,11 +637,12 @@ def __init__( # to do that again in process_subgraph, even though that's where most # QG builders do their queries. self.gather_inputs: dict[str, list[DatasetKey]] = {} + self.existing_inputs: list[DatasetRef] = [] for gather_task_label, gather_input_refs in metadata_refs.items(): gather_inputs_for_task: list[DatasetKey] = [] for ref in gather_input_refs: dataset_key = DatasetKey(ref.datasetType.name, ref.dataId.required_values) - self.existing_datasets.inputs[dataset_key] = ref + self.existing_inputs.append(ref) gather_inputs_for_task.append(dataset_key) self.gather_inputs[gather_task_label] = gather_inputs_for_task @@ -689,6 +690,9 @@ def _add_gather_task( def process_subgraph(self, subgraph: PipelineGraph) -> QuantumGraphSkeleton: skeleton = QuantumGraphSkeleton(subgraph.tasks.keys()) + for ref in self.existing_inputs: + skeleton.add_dataset_node(ref.datasetType.name, ref.dataId) + skeleton.set_dataset_ref(ref) consolidate_inputs = [] for task_node in subgraph.tasks.values(): if task_node.task_class is GatherResourceUsageTask: From a41db7483587e63d4b21daffc9b0a41633795469 Mon Sep 17 00:00:00 2001 From: Jim Bosch Date: Fri, 30 Aug 2024 10:19:51 -0400 Subject: [PATCH 2/2] Fix broken non-metadata input checks in gatherResourceUsage. This is another reminder that MyPy isn't actually being run in this package despite all of the type annotations (DM-40716), and I suspect it was never seen at runtime because the CLI automatically avoids these cases. But that's no reason to leave it broken at a lower level. --- .../tools/tasks/gatherResourceUsage.py | 20 ++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/python/lsst/analysis/tools/tasks/gatherResourceUsage.py b/python/lsst/analysis/tools/tasks/gatherResourceUsage.py index 95de70279..29087c830 100644 --- a/python/lsst/analysis/tools/tasks/gatherResourceUsage.py +++ b/python/lsst/analysis/tools/tasks/gatherResourceUsage.py @@ -617,6 +617,8 @@ def __init__( gather_task_label, gather_dataset_type_name = self._add_gather_task( pipeline_graph, input_metadata_dataset_type ) + if gather_task_label is None or gather_dataset_type_name is None: + continue metadata_refs[gather_task_label] = refs_for_type consolidate_config.input_names.append(gather_dataset_type_name) pipeline_graph.add_task( @@ -649,7 +651,7 @@ def __init__( @classmethod def _add_gather_task( cls, pipeline_graph: PipelineGraph, input_metadata_dataset_type: DatasetType - ) -> tuple[str, str]: + ) -> tuple[str | None, str | None]: """Add a single configuration of `GatherResourceUsageTask` to a pipeline graph. @@ -664,15 +666,19 @@ def _add_gather_task( Returns ------- - gather_task_label : `str` - Label of the new task in the pipeline. - gather_dataset_type_name : `str - Name of the task's output table dataset type. + gather_task_label : `str` or `None` + Label of the new task in the pipeline, or `None` if the given + dataset type is not a metadata dataset type or is itself a + gatherResourceUsage metadata dataset type. + gather_dataset_type_name : `str or `None` + Name of the task's output table dataset type, or `None` if the + given dataset type is not a metadata dataset type or is itself a + gatherResourceUsage metadata dataset type. """ if (m := re.fullmatch(r"^(\w+)_metadata$", input_metadata_dataset_type.name)) is None: - return + return None, None elif "gatherResourceUsage" in input_metadata_dataset_type.name: - return + return None, None else: input_task_label = m.group(1) gather_task_label = f"{input_task_label}_gatherResourceUsage"