From a42390197fa32d182a8baa8b2d00579887c768bf Mon Sep 17 00:00:00 2001
From: Jim Bosch
Date: Sat, 8 Jul 2023 11:50:57 -0400
Subject: [PATCH] Use query-results' natural grouping by dataset type when
 possible.

When processing all dataset types in a collection together, this can
represent a huge decrease in memory usage by querying for and then
processing only one dataset type at a time.
---
 python/lsst/daf/butler/core/datasets/ref.py   | 29 +++++++++++++++++++
 python/lsst/daf/butler/registries/sql.py      |  8 ++---
 .../daf/butler/registry/queries/_results.py   | 22 ++++++++++++++
 3 files changed, 55 insertions(+), 4 deletions(-)

diff --git a/python/lsst/daf/butler/core/datasets/ref.py b/python/lsst/daf/butler/core/datasets/ref.py
index 9c7faefd63..1789b2a260 100644
--- a/python/lsst/daf/butler/core/datasets/ref.py
+++ b/python/lsst/daf/butler/core/datasets/ref.py
@@ -580,12 +580,41 @@ def groupByType(refs: Iterable[DatasetRef]) -> NamedKeyDict[DatasetType, list[Da
         -------
         grouped : `NamedKeyDict` [ `DatasetType`, `list` [ `DatasetRef` ] ]
             Grouped `DatasetRef` instances.
+
+        Notes
+        -----
+        When lazy item-iterables are acceptable instead of a full mapping,
+        `iter_by_type` can in some cases be far more efficient.
         """
         result: NamedKeyDict[DatasetType, list[DatasetRef]] = NamedKeyDict()
         for ref in refs:
             result.setdefault(ref.datasetType, []).append(ref)
         return result
 
+    @staticmethod
+    def iter_by_type(
+        refs: Iterable[DatasetRef],
+    ) -> Iterable[tuple[DatasetType, Iterable[DatasetRef]]]:
+        """Group an iterable of `DatasetRef` by `DatasetType` with special
+        hooks for custom iterables that can do this efficiently.
+
+        Parameters
+        ----------
+        refs : `~collections.abc.Iterable` [ `DatasetRef` ]
+            `DatasetRef` instances to group.  If this has a
+            ``_iter_by_dataset_type`` method, it will be called with no
+            arguments and the result returned.
+
+        Returns
+        -------
+        grouped : `~collections.abc.Iterable` [ `tuple` [ `DatasetType`, \
+                `Iterable` [ `DatasetRef` ] ]]
+            Grouped `DatasetRef` instances.
+        """
+        if hasattr(refs, "_iter_by_dataset_type"):
+            return refs._iter_by_dataset_type()
+        return DatasetRef.groupByType(refs).items()
+
     def makeCompositeRef(self) -> DatasetRef:
         """Create a `DatasetRef` of the composite from a component ref.
 
diff --git a/python/lsst/daf/butler/registries/sql.py b/python/lsst/daf/butler/registries/sql.py
index a216a7ff2a..f6ff9429c1 100644
--- a/python/lsst/daf/butler/registries/sql.py
+++ b/python/lsst/daf/butler/registries/sql.py
@@ -683,7 +683,7 @@ def removeDatasets(self, refs: Iterable[DatasetRef]) -> None:
         # Docstring inherited from lsst.daf.butler.registry.Registry
         progress = Progress("lsst.daf.butler.Registry.removeDatasets", level=logging.DEBUG)
         for datasetType, refsForType in progress.iter_item_chunks(
-            DatasetRef.groupByType(refs).items(), desc="Removing datasets by type"
+            DatasetRef.iter_by_type(refs), desc="Removing datasets by type"
         ):
             storage = self._managers.datasets[datasetType.name]
             try:
@@ -703,7 +703,7 @@ def associate(self, collection: str, refs: Iterable[DatasetRef]) -> None:
                 f"Collection '{collection}' has type {collectionRecord.type.name}, not TAGGED."
             )
         for datasetType, refsForType in progress.iter_item_chunks(
-            DatasetRef.groupByType(refs).items(), desc="Associating datasets by type"
+            DatasetRef.iter_by_type(refs), desc="Associating datasets by type"
         ):
             storage = self._managers.datasets[datasetType.name]
             try:
@@ -731,7 +731,7 @@ def disassociate(self, collection: str, refs: Iterable[DatasetRef]) -> None:
                 f"Collection '{collection}' has type {collectionRecord.type.name}; expected TAGGED."
             )
         for datasetType, refsForType in progress.iter_item_chunks(
-            DatasetRef.groupByType(refs).items(), desc="Disassociating datasets by type"
+            DatasetRef.iter_by_type(refs), desc="Disassociating datasets by type"
         ):
             storage = self._managers.datasets[datasetType.name]
             storage.disassociate(collectionRecord, refsForType)
@@ -744,7 +744,7 @@ def certify(self, collection: str, refs: Iterable[DatasetRef], timespan: Timespa
         progress = Progress("lsst.daf.butler.Registry.certify", level=logging.DEBUG)
         collectionRecord = self._managers.collections.find(collection)
         for datasetType, refsForType in progress.iter_item_chunks(
-            DatasetRef.groupByType(refs).items(), desc="Certifying datasets by type"
+            DatasetRef.iter_by_type(refs), desc="Certifying datasets by type"
         ):
             storage = self._managers.datasets[datasetType.name]
             storage.certify(
diff --git a/python/lsst/daf/butler/registry/queries/_results.py b/python/lsst/daf/butler/registry/queries/_results.py
index ac2073ea92..c024037455 100644
--- a/python/lsst/daf/butler/registry/queries/_results.py
+++ b/python/lsst/daf/butler/registry/queries/_results.py
@@ -529,6 +529,21 @@ def explain_no_results(self, execute: bool = True) -> Iterable[str]:
         """
         raise NotImplementedError()
 
+    def _iter_by_dataset_type(self) -> Iterator[tuple[DatasetType, Iterable[DatasetRef]]]:
+        """Group results by dataset type.
+
+        This is a private hook for the interface defined by
+        `DatasetRef.iter_by_type`, enabling much more efficient processing
+        of heterogeneous `DatasetRef` iterables when they come directly
+        from queries.
+        """
+        for parent_results in self.byParentDatasetType():
+            for component in parent_results.components:
+                dataset_type = parent_results.parentDatasetType
+                if component is not None:
+                    dataset_type = dataset_type.makeComponentDatasetType(component)
+                yield (dataset_type, parent_results.withComponents((component,)))
+
 
 class ParentDatasetQueryResults(DatasetQueryResults):
     """An object that represents results from a query for datasets with a
@@ -570,6 +585,13 @@ def __iter__(self) -> Iterator[DatasetRef]:
     def __repr__(self) -> str:
         return f"<DatasetRef iterator for [components of] {self._dataset_type.name}>"
 
+    @property
+    def components(self) -> Sequence[str | None]:
+        """The components of the parent dataset type included in these
+        results (`~collections.abc.Sequence` [ `str` or `None` ]).
+        """
+        return self._components
+
     def byParentDatasetType(self) -> Iterator[ParentDatasetQueryResults]:
         # Docstring inherited from DatasetQueryResults.
         yield self
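
Usage note (illustration only, not part of the patch): the new
`DatasetRef.iter_by_type` duck-types on a private `_iter_by_dataset_type`
hook, so any iterable can opt in to efficient per-type grouping, while
plain iterables fall back to `groupByType`. A minimal sketch of the
protocol follows; the `LazyRefs` class and `count_by_type` function are
hypothetical names invented here for illustration.

    from collections.abc import Iterable, Iterator

    from lsst.daf.butler import DatasetRef, DatasetType

    class LazyRefs:
        """Hypothetical container that already holds refs keyed by dataset
        type, so it can regroup them without building another mapping.
        """

        def __init__(self, by_type: dict[DatasetType, list[DatasetRef]]):
            self._by_type = by_type

        def __iter__(self) -> Iterator[DatasetRef]:
            # Flat iteration over all refs, for ordinary consumers.
            for refs in self._by_type.values():
                yield from refs

        def _iter_by_dataset_type(self) -> Iterable[tuple[DatasetType, Iterable[DatasetRef]]]:
            # Picked up by DatasetRef.iter_by_type via hasattr(); yields
            # one (dataset type, refs) pair at a time.
            return self._by_type.items()

    def count_by_type(refs: Iterable[DatasetRef]) -> dict[str, int]:
        # Works unchanged for a plain list (falls back to groupByType)
        # and for LazyRefs or query results (uses the efficient hook).
        return {
            dataset_type.name: sum(1 for _ in refs_for_type)
            for dataset_type, refs_for_type in DatasetRef.iter_by_type(refs)
        }

The same pattern is what lets the `sql.py` call sites above process query
results one dataset type at a time instead of materializing a full
`NamedKeyDict` of every ref up front.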