diff --git a/doc/changes/DM-41962.bugfix.md b/doc/changes/DM-41962.bugfix.md new file mode 100644 index 00000000..5aaa7587 --- /dev/null +++ b/doc/changes/DM-41962.bugfix.md @@ -0,0 +1,4 @@ +Fix a storage class bug in registering dataset types in ``pipetask run``. + +Prior to this fix, the presence of multiple storage classes being associated with the same dataset type in a pipeline could cause the registered dataset type's storage class to be random and nondeterministic in regular `pipetask run` execution (but not quantum-backed butler execution). +It now follows the rules set by `PipelineGraph`, in which the definition in the task that produces the dataset wins. diff --git a/python/lsst/ctrl/mpexec/preExecInit.py b/python/lsst/ctrl/mpexec/preExecInit.py index 1e00ca7d..64ebfc88 100644 --- a/python/lsst/ctrl/mpexec/preExecInit.py +++ b/python/lsst/ctrl/mpexec/preExecInit.py @@ -44,6 +44,7 @@ from lsst.daf.butler import DatasetRef, DatasetType from lsst.daf.butler.registry import ConflictingDefinitionError from lsst.pipe.base import PipelineDatasetTypes +from lsst.pipe.base import automatic_connection_constants as acc from lsst.utils.packages import Packages if TYPE_CHECKING: @@ -389,14 +390,36 @@ def initializeDatasetTypes(self, graph: QuantumGraph, registerDatasetTypes: bool pipelineDatasetTypes = PipelineDatasetTypes.fromPipeline( pipeline, registry=self.full_butler.registry, include_configs=True, include_packages=True ) - - for datasetTypes, is_input in ( + # The "registry dataset types" saved with the QG have had their storage + # classes carefully resolved by PipelineGraph, whereas the dataset + # types from PipelineDatasetTypes are a mess because it uses + # NamedValueSet and that ignores storage classes. It will be fully + # removed here (and deprecated everywhere) on DM-40441. + # Note that these "registry dataset types" include dataset types that + # are not actually registered yet; they're the PipelineGraph's + # determination of what _should_ be registered. + registry_storage_classes = { + dataset_type.name: dataset_type.storageClass_name for dataset_type in graph.registryDatasetTypes() + } + registry_storage_classes[acc.PACKAGES_INIT_OUTPUT_NAME] = acc.PACKAGES_INIT_OUTPUT_STORAGE_CLASS + for dataset_types, is_input in ( (pipelineDatasetTypes.initIntermediates, True), (pipelineDatasetTypes.initOutputs, False), (pipelineDatasetTypes.intermediates, True), (pipelineDatasetTypes.outputs, False), ): - self._register_output_dataset_types(registerDatasetTypes, datasetTypes, is_input) + dataset_types = [ + ( + # The registry dataset types do not include components, + # but we don't support storage class overrides for those + # in other contexts anyway. + dataset_type.overrideStorageClass(registry_storage_classes[dataset_type.name]) + if not dataset_type.isComponent() + else dataset_type + ) + for dataset_type in dataset_types + ] + self._register_output_dataset_types(registerDatasetTypes, dataset_types, is_input) def _register_output_dataset_types( self, registerDatasetTypes: bool, datasetTypes: Iterable[DatasetType], is_input: bool