From 5afdd42f5c7d23d23b3d0367c139fc5dd1b27784 Mon Sep 17 00:00:00 2001
From: Jim Bosch
Date: Thu, 15 Jun 2023 14:48:32 -0400
Subject: [PATCH 1/4] Ensure QG output refs have the right run.

We look up all output refs in the skip_existing_in collections, but we
don't want to put those refs in the graph unless they're actually from
the output run.
---
 python/lsst/pipe/base/graphBuilder.py | 24 ++++++++++++++----------
 1 file changed, 14 insertions(+), 10 deletions(-)

diff --git a/python/lsst/pipe/base/graphBuilder.py b/python/lsst/pipe/base/graphBuilder.py
index 1e130010..deee57f8 100644
--- a/python/lsst/pipe/base/graphBuilder.py
+++ b/python/lsst/pipe/base/graphBuilder.py
@@ -645,10 +645,12 @@ def resolveRef(self, dataset_type: DatasetType, data_id: DataCoordinate) -> Data
             self.resolved[key] = resolved
         return resolved
 
-    def resolveDict(self, dataset_type: DatasetType, refs: dict[DataCoordinate, _RefHolder]) -> None:
+    def resolveDict(
+        self, dataset_type: DatasetType, refs: dict[DataCoordinate, _RefHolder], is_output: bool
+    ) -> None:
         """Resolve all unresolved references in the provided dictionary."""
         for data_id, holder in refs.items():
-            if holder.ref is None:
+            if holder.ref is None or (is_output and holder.ref.run != self.run):
                 holder.ref = self.resolveRef(holder.dataset_type, data_id)
@@ -1213,7 +1215,7 @@ def resolveDatasetRefs(
         # Resolve the missing refs, just so they look like all of the others;
         # in the end other code will make sure they never appear in the QG.
         for dataset_type, refDict in self.missing.items():
-            idMaker.resolveDict(dataset_type, refDict)
+            idMaker.resolveDict(dataset_type, refDict, is_output=False)
 
         # Copy the resolved DatasetRefs to the _QuantumScaffolding objects,
         # replacing the unresolved refs there, and then look up prerequisites.
@@ -1331,14 +1333,16 @@
                         task.prerequisites[datasetType][ref.dataId] = _RefHolder(datasetType, ref)
 
                 # Resolve all quantum inputs and outputs.
-                for datasetDict in (quantum.inputs, quantum.outputs):
-                    for dataset_type, refDict in datasetDict.items():
-                        idMaker.resolveDict(dataset_type, refDict)
+                for dataset_type, refDict in quantum.inputs.items():
+                    idMaker.resolveDict(dataset_type, refDict, is_output=False)
+                for dataset_type, refDict in quantum.outputs.items():
+                    idMaker.resolveDict(dataset_type, refDict, is_output=True)
 
             # Resolve task initInputs and initOutputs.
-            for datasetDict in (task.initInputs, task.initOutputs):
-                for dataset_type, refDict in datasetDict.items():
-                    idMaker.resolveDict(dataset_type, refDict)
+            for dataset_type, refDict in task.initInputs.items():
+                idMaker.resolveDict(dataset_type, refDict, is_output=False)
+            for dataset_type, refDict in task.initOutputs.items():
+                idMaker.resolveDict(dataset_type, refDict, is_output=True)
 
             # Actually remove any quanta that we decided to skip above.
             if dataIdsSucceeded:
@@ -1378,7 +1382,7 @@
         if global_dataset_types:
            self.globalInitOutputs = _DatasetDict.fromSubset(global_dataset_types, self.initOutputs)
            for dataset_type, refDict in self.globalInitOutputs.items():
-                idMaker.resolveDict(dataset_type, refDict)
+                idMaker.resolveDict(dataset_type, refDict, is_output=True)
 
     def makeQuantumGraph(
         self,

From a9022a5ddf835a221d56deb12273985bec893003 Mon Sep 17 00:00:00 2001
From: Jim Bosch
Date: Tue, 20 Jun 2023 10:50:56 -0400
Subject: [PATCH 2/4] Allow exception type to be defaulted when mocking task errors.
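
If the exception type in a `force_failures` entry is `None`, the mock
task's default `fail_exception` config is left in place instead of being
overridden. A minimal usage sketch (the task label, data-ID expression,
and `original_task_defs` variable below are illustrative, and the import
assumes the helper is re-exported from `lsst.pipe.base.tests.mocks`):

    from lsst.pipe.base.tests.mocks import mock_task_defs

    # Quanta of the mocked "calibrate" task that match the data-ID
    # expression will raise the mock's default exception type, because
    # None is passed instead of an explicit exception class.
    mocked_task_defs = mock_task_defs(
        original_task_defs,
        force_failures={"calibrate": ("detector = 4", None)},
    )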
---
 python/lsst/pipe/base/tests/mocks/_pipeline_task.py | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/python/lsst/pipe/base/tests/mocks/_pipeline_task.py b/python/lsst/pipe/base/tests/mocks/_pipeline_task.py
index 392fda4c..7169a119 100644
--- a/python/lsst/pipe/base/tests/mocks/_pipeline_task.py
+++ b/python/lsst/pipe/base/tests/mocks/_pipeline_task.py
@@ -49,7 +49,7 @@
 def mock_task_defs(
     originals: Iterable[TaskDef],
     unmocked_dataset_types: Iterable[str] = (),
-    force_failures: Mapping[str, tuple[str, type[Exception]]] | None = None,
+    force_failures: Mapping[str, tuple[str, type[Exception] | None]] | None = None,
 ) -> list[TaskDef]:
     """Create mocks for an iterable of TaskDefs.
 
@@ -61,7 +61,7 @@
         Names of overall-input dataset types that should not be replaced with
         mocks.
     force_failures : `~collections.abc.Mapping` [ `str`, `tuple` [ `str`, \
-            `type` [ `Exception` ] ] ]
+            `type` [ `Exception` ] or `None` ] ]
         Mapping from original task label to a 2-tuple indicating that some
         quanta should raise an exception when executed. The first entry is a
         data ID match using the butler expression language (i.e. a string of
@@ -87,7 +87,8 @@
         if original_task_def.label in force_failures:
             condition, exception_type = force_failures[original_task_def.label]
             config.fail_condition = condition
-            config.fail_exception = get_full_type_name(exception_type)
+            if exception_type is not None:
+                config.fail_exception = get_full_type_name(exception_type)
         mock_task_def = TaskDef(
             config=config, taskClass=MockPipelineTask, label=get_mock_name(original_task_def.label)
         )

From e6bc5bf6479caa5c341e7552c8455cad98eded66 Mon Sep 17 00:00:00 2001
From: Jim Bosch
Date: Tue, 20 Jun 2023 10:51:38 -0400
Subject: [PATCH 3/4] Don't consider partial quanta in non-output collections an error.

Partial quanta only need to be clobbered if they are in the output RUN
collection.
---
 python/lsst/pipe/base/graphBuilder.py | 10 +++-------
 1 file changed, 3 insertions(+), 7 deletions(-)

diff --git a/python/lsst/pipe/base/graphBuilder.py b/python/lsst/pipe/base/graphBuilder.py
index deee57f8..c9ee2978 100644
--- a/python/lsst/pipe/base/graphBuilder.py
+++ b/python/lsst/pipe/base/graphBuilder.py
@@ -1260,7 +1260,7 @@ def resolveDatasetRefs(
                         continue
                 else:
                     dataIdsFailed.append(quantum.dataId)
-                    if not clobberOutputs:
+                    if not clobberOutputs and run_exists:
                         raise OutputExistsError(
                             f"Quantum {quantum.dataId} of task with label "
                             f"'{quantum.task.taskDef.label}' has some outputs that exist "
@@ -1355,25 +1355,21 @@
                    )
                    for dataId in dataIdsSucceeded:
                        del task.quanta[dataId]
-                elif clobberOutputs:
+                elif clobberOutputs and run_exists:
                     _LOG.info(
                         "Found %d successful quanta for task with label '%s' "
                         "that will need to be clobbered during execution.",
                         len(dataIdsSucceeded),
                         task.taskDef.label,
                     )
-                else:
-                    raise AssertionError("OutputExistsError should have already been raised.")
             if dataIdsFailed:
-                if clobberOutputs:
+                if clobberOutputs and run_exists:
                     _LOG.info(
                         "Found %d failed/incomplete quanta for task with label '%s' "
                         "that will need to be clobbered during execution.",
                         len(dataIdsFailed),
                         task.taskDef.label,
                     )
-                else:
-                    raise AssertionError("OutputExistsError should have already been raised.")
 
         # Collect initOutputs that do not belong to any task.
         global_dataset_types: set[DatasetType] = set(self.initOutputs)

From e79cf3c8681c83ca409a7d851a3fbe523fa43897 Mon Sep 17 00:00:00 2001
From: Jim Bosch
Date: Tue, 20 Jun 2023 12:02:41 -0400
Subject: [PATCH 4/4] Add changelog entry.
---
 doc/changes/DM-39672.bugfix.md | 1 +
 1 file changed, 1 insertion(+)
 create mode 100644 doc/changes/DM-39672.bugfix.md

diff --git a/doc/changes/DM-39672.bugfix.md b/doc/changes/DM-39672.bugfix.md
new file mode 100644
index 00000000..34e3494b
--- /dev/null
+++ b/doc/changes/DM-39672.bugfix.md
@@ -0,0 +1 @@
+Fix a bug in `QuantumGraph` generation that could result in datasets from `skip_existing_in` collections being used as outputs, and another that prevented `QuantumGraph` generation when a `skip_existing_in` collection has some outputs from a failed quantum.