diff --git a/doc/changes/DM-39672.bugfix.md b/doc/changes/DM-39672.bugfix.md
new file mode 100644
index 000000000..34e3494be
--- /dev/null
+++ b/doc/changes/DM-39672.bugfix.md
@@ -0,0 +1 @@
+Fix a bug in `QuantumGraph` generation that could result in datasets from `skip_existing_in` collections being used as outputs, and another that prevented `QuantumGraph` generation when a `skip_existing_in` collection has some outputs from a failed quantum.
diff --git a/python/lsst/pipe/base/graphBuilder.py b/python/lsst/pipe/base/graphBuilder.py
index 1e130010b..c9ee29784 100644
--- a/python/lsst/pipe/base/graphBuilder.py
+++ b/python/lsst/pipe/base/graphBuilder.py
@@ -645,10 +645,12 @@ def resolveRef(self, dataset_type: DatasetType, data_id: DataCoordinate) -> Data
             self.resolved[key] = resolved
         return resolved
 
-    def resolveDict(self, dataset_type: DatasetType, refs: dict[DataCoordinate, _RefHolder]) -> None:
+    def resolveDict(
+        self, dataset_type: DatasetType, refs: dict[DataCoordinate, _RefHolder], is_output: bool
+    ) -> None:
         """Resolve all unresolved references in the provided dictionary."""
         for data_id, holder in refs.items():
-            if holder.ref is None:
+            if holder.ref is None or (is_output and holder.ref.run != self.run):
                 holder.ref = self.resolveRef(holder.dataset_type, data_id)
 
@@ -1213,7 +1215,7 @@ def resolveDatasetRefs(
         # Resolve the missing refs, just so they look like all of the others;
         # in the end other code will make sure they never appear in the QG.
         for dataset_type, refDict in self.missing.items():
-            idMaker.resolveDict(dataset_type, refDict)
+            idMaker.resolveDict(dataset_type, refDict, is_output=False)
 
         # Copy the resolved DatasetRefs to the _QuantumScaffolding objects,
         # replacing the unresolved refs there, and then look up prerequisites.
@@ -1258,7 +1260,7 @@ def resolveDatasetRefs(
                             continue
                     else:
                         dataIdsFailed.append(quantum.dataId)
-                        if not clobberOutputs:
+                        if not clobberOutputs and run_exists:
                             raise OutputExistsError(
                                 f"Quantum {quantum.dataId} of task with label "
                                 f"'{quantum.task.taskDef.label}' has some outputs that exist "
@@ -1331,14 +1333,16 @@ def resolveDatasetRefs(
                     task.prerequisites[datasetType][ref.dataId] = _RefHolder(datasetType, ref)
 
                 # Resolve all quantum inputs and outputs.
-                for datasetDict in (quantum.inputs, quantum.outputs):
-                    for dataset_type, refDict in datasetDict.items():
-                        idMaker.resolveDict(dataset_type, refDict)
+                for dataset_type, refDict in quantum.inputs.items():
+                    idMaker.resolveDict(dataset_type, refDict, is_output=False)
+                for dataset_type, refDict in quantum.outputs.items():
+                    idMaker.resolveDict(dataset_type, refDict, is_output=True)
 
             # Resolve task initInputs and initOutputs.
-            for datasetDict in (task.initInputs, task.initOutputs):
-                for dataset_type, refDict in datasetDict.items():
-                    idMaker.resolveDict(dataset_type, refDict)
+            for dataset_type, refDict in task.initInputs.items():
+                idMaker.resolveDict(dataset_type, refDict, is_output=False)
+            for dataset_type, refDict in task.initOutputs.items():
+                idMaker.resolveDict(dataset_type, refDict, is_output=True)
 
             # Actually remove any quanta that we decided to skip above.
             if dataIdsSucceeded:
@@ -1351,25 +1355,21 @@ def resolveDatasetRefs(
                     )
                     for dataId in dataIdsSucceeded:
                         del task.quanta[dataId]
-                elif clobberOutputs:
+                elif clobberOutputs and run_exists:
                     _LOG.info(
                         "Found %d successful quanta for task with label '%s' "
                         "that will need to be clobbered during execution.",
                         len(dataIdsSucceeded),
                         task.taskDef.label,
                     )
-                else:
-                    raise AssertionError("OutputExistsError should have already been raised.")
             if dataIdsFailed:
-                if clobberOutputs:
+                if clobberOutputs and run_exists:
                     _LOG.info(
                         "Found %d failed/incomplete quanta for task with label '%s' "
                         "that will need to be clobbered during execution.",
                         len(dataIdsFailed),
                         task.taskDef.label,
                     )
-                else:
-                    raise AssertionError("OutputExistsError should have already been raised.")
 
         # Collect initOutputs that do not belong to any task.
         global_dataset_types: set[DatasetType] = set(self.initOutputs)
@@ -1378,7 +1378,7 @@ def resolveDatasetRefs(
         if global_dataset_types:
             self.globalInitOutputs = _DatasetDict.fromSubset(global_dataset_types, self.initOutputs)
             for dataset_type, refDict in self.globalInitOutputs.items():
-                idMaker.resolveDict(dataset_type, refDict)
+                idMaker.resolveDict(dataset_type, refDict, is_output=True)
 
     def makeQuantumGraph(
         self,
diff --git a/python/lsst/pipe/base/tests/mocks/_pipeline_task.py b/python/lsst/pipe/base/tests/mocks/_pipeline_task.py
index 392fda4cd..7169a119f 100644
--- a/python/lsst/pipe/base/tests/mocks/_pipeline_task.py
+++ b/python/lsst/pipe/base/tests/mocks/_pipeline_task.py
@@ -49,7 +49,7 @@ def mock_task_defs(
     originals: Iterable[TaskDef],
     unmocked_dataset_types: Iterable[str] = (),
-    force_failures: Mapping[str, tuple[str, type[Exception]]] | None = None,
+    force_failures: Mapping[str, tuple[str, type[Exception] | None]] | None = None,
 ) -> list[TaskDef]:
     """Create mocks for an iterable of TaskDefs.
 
@@ -61,7 +61,7 @@ def mock_task_defs(
         Names of overall-input dataset types that should not be replaced with
         mocks.
     force_failures : `~collections.abc.Mapping` [ `str`, `tuple` [ `str`, \
-        `type` [ `Exception` ] ] ]
+        `type` [ `Exception` ] or `None` ] ]
         Mapping from original task label to a 2-tuple indicating that some
         quanta should raise an exception when executed.  The first entry is a
         data ID match using the butler expression language (i.e. a string of
@@ -87,7 +87,8 @@ def mock_task_defs(
         if original_task_def.label in force_failures:
             condition, exception_type = force_failures[original_task_def.label]
             config.fail_condition = condition
-            config.fail_exception = get_full_type_name(exception_type)
+            if exception_type is not None:
+                config.fail_exception = get_full_type_name(exception_type)
         mock_task_def = TaskDef(
             config=config, taskClass=MockPipelineTask, label=get_mock_name(original_task_def.label)
         )
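
Reviewer note (illustration, not part of the patch): the core of the first fix is the new `is_output` flag on `resolveDict`. Previously, an output holder that already carried a ref found in a `skip_existing_in` collection was left untouched, so that ref's foreign `run` leaked into the `QuantumGraph`; now any output ref whose `run` differs from the graph's output run is re-resolved. Below is a minimal runnable sketch of that rule, using hypothetical stand-ins (`FakeRef`, `Holder`, `IdMaker`) for the real `DatasetRef`/`_RefHolder`/id-maker machinery in `graphBuilder.py`:

```python
import uuid
from dataclasses import dataclass


@dataclass
class FakeRef:
    """Stand-in for DatasetRef: only the fields the fix cares about."""

    run: str  # RUN collection the dataset was resolved in
    id: uuid.UUID


@dataclass
class Holder:
    """Stand-in for _RefHolder: may arrive pre-resolved via skip_existing_in."""

    ref: FakeRef | None = None


class IdMaker:
    """Stand-in for the id-maker helper that knows the output RUN collection."""

    def __init__(self, run: str) -> None:
        self.run = run

    def resolve_ref(self) -> FakeRef:
        # Mint a fresh ref in the output run (the real resolveRef also caches).
        return FakeRef(run=self.run, id=uuid.uuid4())

    def resolve_dict(self, refs: dict[str, Holder], is_output: bool) -> None:
        for _data_id, holder in refs.items():
            # The fix: an output ref found in some other collection must be
            # re-resolved into the output run rather than reused as-is.
            if holder.ref is None or (is_output and holder.ref.run != self.run):
                holder.ref = self.resolve_ref()


maker = IdMaker(run="u/me/output_run")
# An output that was pre-resolved from a skip_existing_in collection:
outputs = {"visit=1": Holder(ref=FakeRef(run="u/me/skip_run", id=uuid.uuid4()))}
maker.resolve_dict(outputs, is_output=True)
assert outputs["visit=1"].ref.run == "u/me/output_run"  # re-minted in output run
# Inputs keep whatever ref they already carry:
inputs = {"visit=1": Holder(ref=FakeRef(run="u/me/skip_run", id=uuid.uuid4()))}
maker.resolve_dict(inputs, is_output=False)
assert inputs["visit=1"].ref.run == "u/me/skip_run"
```

The second fix is the `run_exists` guard: raising `OutputExistsError` (or promising clobbering) only makes sense when the output `RUN` collection itself already exists, so a failed quantum found only via `skip_existing_in` no longer aborts graph generation.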