diff --git a/python/lsst/analysis/tools/tasks/gatherResourceUsage.py b/python/lsst/analysis/tools/tasks/gatherResourceUsage.py index faccc1f7d..29087c830 100644 --- a/python/lsst/analysis/tools/tasks/gatherResourceUsage.py +++ b/python/lsst/analysis/tools/tasks/gatherResourceUsage.py @@ -617,6 +617,8 @@ def __init__( gather_task_label, gather_dataset_type_name = self._add_gather_task( pipeline_graph, input_metadata_dataset_type ) + if gather_task_label is None or gather_dataset_type_name is None: + continue metadata_refs[gather_task_label] = refs_for_type consolidate_config.input_names.append(gather_dataset_type_name) pipeline_graph.add_task( @@ -637,18 +639,19 @@ def __init__( # to do that again in process_subgraph, even though that's where most # QG builders do their queries. self.gather_inputs: dict[str, list[DatasetKey]] = {} + self.existing_inputs: list[DatasetRef] = [] for gather_task_label, gather_input_refs in metadata_refs.items(): gather_inputs_for_task: list[DatasetKey] = [] for ref in gather_input_refs: dataset_key = DatasetKey(ref.datasetType.name, ref.dataId.required_values) - self.existing_datasets.inputs[dataset_key] = ref + self.existing_inputs.append(ref) gather_inputs_for_task.append(dataset_key) self.gather_inputs[gather_task_label] = gather_inputs_for_task @classmethod def _add_gather_task( cls, pipeline_graph: PipelineGraph, input_metadata_dataset_type: DatasetType - ) -> tuple[str, str]: + ) -> tuple[str | None, str | None]: """Add a single configuration of `GatherResourceUsageTask` to a pipeline graph. @@ -663,15 +666,19 @@ def _add_gather_task( Returns ------- - gather_task_label : `str` - Label of the new task in the pipeline. - gather_dataset_type_name : `str - Name of the task's output table dataset type. + gather_task_label : `str` or `None` + Label of the new task in the pipeline, or `None` if the given + dataset type is not a metadata dataset type or is itself a + gatherResourceUsage metadata dataset type. 
+ gather_dataset_type_name : `str` or `None` + Name of the task's output table dataset type, or `None` if the + given dataset type is not a metadata dataset type or is itself a + gatherResourceUsage metadata dataset type. """ if (m := re.fullmatch(r"^(\w+)_metadata$", input_metadata_dataset_type.name)) is None: - return + return None, None elif "gatherResourceUsage" in input_metadata_dataset_type.name: - return + return None, None else: input_task_label = m.group(1) gather_task_label = f"{input_task_label}_gatherResourceUsage" @@ -689,6 +696,9 @@ def _add_gather_task( def process_subgraph(self, subgraph: PipelineGraph) -> QuantumGraphSkeleton: skeleton = QuantumGraphSkeleton(subgraph.tasks.keys()) + for ref in self.existing_inputs: + skeleton.add_dataset_node(ref.datasetType.name, ref.dataId) + skeleton.set_dataset_ref(ref) consolidate_inputs = [] for task_node in subgraph.tasks.values(): if task_node.task_class is GatherResourceUsageTask: