Use query-results' natural grouping by dataset type when possible.
When processing all dataset types in a collection together, this can
yield a huge decrease in memory usage, since only one dataset type is
queried for and processed at a time.
TallJimbo committed Jul 10, 2023
1 parent 21f011b commit a423901
Showing 3 changed files with 55 additions and 4 deletions.
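
To make the memory claim concrete, here is a minimal sketch of the two patterns with toy stand-ins (a "ref" is just a `(dataset_type_name, id)` tuple; none of this is the real butler API). Eager grouping holds every ref for every dataset type at once, while per-type iteration holds only one type's refs at a time.

```python
from collections import defaultdict
from collections.abc import Iterable, Iterator

Ref = tuple[str, int]  # toy stand-in for DatasetRef: (dataset type name, id)

def group_by_type_eager(refs: Iterable[Ref]) -> dict[str, list[Ref]]:
    """Old pattern: materialize refs for *all* dataset types before
    processing any of them (peak memory ~ the whole result set)."""
    result: dict[str, list[Ref]] = defaultdict(list)
    for ref in refs:
        result[ref[0]].append(ref)
    return result

def iter_by_type_lazy(
    per_type_query: dict[str, list[Ref]],
) -> Iterator[tuple[str, list[Ref]]]:
    """New pattern: query for and process one dataset type at a time
    (peak memory ~ the largest single type)."""
    for name, refs in per_type_query.items():
        yield name, refs  # caller can drop each batch before the next

# Each batch can be processed and released before the next is fetched.
results = {"raw": [("raw", 1), ("raw", 2)], "calexp": [("calexp", 1)]}
for dataset_type, refs_for_type in iter_by_type_lazy(results):
    print(dataset_type, len(refs_for_type))
```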
29 changes: 29 additions & 0 deletions python/lsst/daf/butler/core/datasets/ref.py
@@ -580,12 +580,41 @@ def groupByType(refs: Iterable[DatasetRef]) -> NamedKeyDict[DatasetType, list[DatasetRef]]:
        -------
        grouped : `NamedKeyDict` [ `DatasetType`, `list` [ `DatasetRef` ] ]
            Grouped `DatasetRef` instances.

        Notes
        -----
        When lazy item-iterables are acceptable instead of a full mapping,
        `iter_by_type` can in some cases be far more efficient.
        """
        result: NamedKeyDict[DatasetType, list[DatasetRef]] = NamedKeyDict()
        for ref in refs:
            result.setdefault(ref.datasetType, []).append(ref)
        return result

    @staticmethod
    def iter_by_type(
        refs: Iterable[DatasetRef],
    ) -> Iterable[tuple[DatasetType, Iterable[DatasetRef]]]:
        """Group an iterable of `DatasetRef` by `DatasetType` with special
        hooks for custom iterables that can do this efficiently.

        Parameters
        ----------
        refs : `~collections.abc.Iterable` [ `DatasetRef` ]
            `DatasetRef` instances to group.  If this has a
            ``_iter_by_dataset_type`` method, it will be called with no
            arguments and the result returned.

        Returns
        -------
        grouped : `~collections.abc.Iterable` [ `tuple` [ `DatasetType`, \
                `Iterable` [ `DatasetRef` ] ] ]
            Grouped `DatasetRef` instances.
        """
        if hasattr(refs, "_iter_by_dataset_type"):
            return refs._iter_by_dataset_type()
        return DatasetRef.groupByType(refs).items()

    def makeCompositeRef(self) -> DatasetRef:
        """Create a `DatasetRef` of the composite from a component ref.
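The dispatch above is plain duck typing: if the incoming iterable knows how to group itself, `iter_by_type` delegates to it and never builds the full mapping; otherwise it falls back to eager `groupByType` grouping. A rough sketch with toy types (only `_iter_by_dataset_type` is the real hook name; everything else here is simplified):

```python
from collections.abc import Iterable, Iterator

Ref = tuple[str, int]  # toy stand-in for DatasetRef

class SelfGroupingResults:
    """Toy iterable already organized by dataset type, like the
    query-result classes, so it can group itself without a big dict."""

    def __init__(self, by_type: dict[str, list[Ref]]) -> None:
        self._by_type = by_type

    def __iter__(self) -> Iterator[Ref]:
        for refs in self._by_type.values():
            yield from refs

    def _iter_by_dataset_type(self) -> Iterator[tuple[str, Iterable[Ref]]]:
        # The efficient path: one (type, refs) pair at a time.
        yield from self._by_type.items()

def iter_by_type(refs: Iterable[Ref]) -> Iterable[tuple[str, Iterable[Ref]]]:
    """Same dispatch shape as DatasetRef.iter_by_type, with toy types."""
    if hasattr(refs, "_iter_by_dataset_type"):
        return refs._iter_by_dataset_type()
    grouped: dict[str, list[Ref]] = {}  # fallback: eager grouping
    for ref in refs:
        grouped.setdefault(ref[0], []).append(ref)
    return grouped.items()

results = SelfGroupingResults({"raw": [("raw", 1)], "calexp": [("calexp", 2)]})
assert list(iter_by_type(results)) == [("raw", [("raw", 1)]), ("calexp", [("calexp", 2)])]
```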
8 changes: 4 additions & 4 deletions python/lsst/daf/butler/registries/sql.py
@@ -683,7 +683,7 @@ def removeDatasets(self, refs: Iterable[DatasetRef]) -> None:
        # Docstring inherited from lsst.daf.butler.registry.Registry
        progress = Progress("lsst.daf.butler.Registry.removeDatasets", level=logging.DEBUG)
        for datasetType, refsForType in progress.iter_item_chunks(
-           DatasetRef.groupByType(refs).items(), desc="Removing datasets by type"
+           DatasetRef.iter_by_type(refs), desc="Removing datasets by type"
        ):
            storage = self._managers.datasets[datasetType.name]
            try:
@@ -703,7 +703,7 @@ def associate(self, collection: str, refs: Iterable[DatasetRef]) -> None:
                f"Collection '{collection}' has type {collectionRecord.type.name}, not TAGGED."
            )
        for datasetType, refsForType in progress.iter_item_chunks(
-           DatasetRef.groupByType(refs).items(), desc="Associating datasets by type"
+           DatasetRef.iter_by_type(refs), desc="Associating datasets by type"
        ):
            storage = self._managers.datasets[datasetType.name]
            try:
try:
Expand Down Expand Up @@ -731,7 +731,7 @@ def disassociate(self, collection: str, refs: Iterable[DatasetRef]) -> None:
f"Collection '{collection}' has type {collectionRecord.type.name}; expected TAGGED."
)
for datasetType, refsForType in progress.iter_item_chunks(
DatasetRef.groupByType(refs).items(), desc="Disassociating datasets by type"
DatasetRef.iter_by_type(refs), desc="Disassociating datasets by type"
):
storage = self._managers.datasets[datasetType.name]
storage.disassociate(collectionRecord, refsForType)
@@ -744,7 +744,7 @@ def certify(self, collection: str, refs: Iterable[DatasetRef], timespan: Timespan) -> None:
        progress = Progress("lsst.daf.butler.Registry.certify", level=logging.DEBUG)
        collectionRecord = self._managers.collections.find(collection)
        for datasetType, refsForType in progress.iter_item_chunks(
-           DatasetRef.groupByType(refs).items(), desc="Certifying datasets by type"
+           DatasetRef.iter_by_type(refs), desc="Certifying datasets by type"
        ):
            storage = self._managers.datasets[datasetType.name]
            storage.certify(
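All four call sites change the same way, and the loop bodies are untouched: both expressions yield `(DatasetType, refs-for-type)` pairs, so this is a drop-in swap of evaluation strategy. Roughly (runnable only with `lsst.daf.butler` installed at a version that includes this commit):

```python
from lsst.daf.butler import DatasetRef

refs: list[DatasetRef] = []  # any iterable of resolved refs

# Before: always materializes a full NamedKeyDict of every ref first.
for datasetType, refsForType in DatasetRef.groupByType(refs).items():
    pass

# After: yields identical (DatasetType, refs) pairs, but can stream one
# dataset type at a time when `refs` is a query-result object that
# provides the _iter_by_dataset_type hook.
for datasetType, refsForType in DatasetRef.iter_by_type(refs):
    pass
```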
22 changes: 22 additions & 0 deletions python/lsst/daf/butler/registry/queries/_results.py
@@ -529,6 +529,21 @@ def explain_no_results(self, execute: bool = True) -> Iterable[str]:
        """
        raise NotImplementedError()

    def _iter_by_dataset_type(self) -> Iterator[tuple[DatasetType, Iterable[DatasetRef]]]:
        """Group results by dataset type.

        This is a private hook for the interface defined by
        `DatasetRef.iter_by_type`, enabling much more efficient
        processing of heterogeneous `DatasetRef` iterables when they come
        directly from queries.
        """
        for parent_results in self.byParentDatasetType():
            for component in parent_results.components:
                dataset_type = parent_results.parentDatasetType
                if component is not None:
                    dataset_type = dataset_type.makeComponentDatasetType(component)
                yield (dataset_type, parent_results.withComponents((component,)))


class ParentDatasetQueryResults(DatasetQueryResults):
    """An object that represents results from a query for datasets with a
@@ -570,6 +585,13 @@ def __iter__(self) -> Iterator[DatasetRef]:
    def __repr__(self) -> str:
        return f"<DatasetRef iterator for [components of] {self._dataset_type.name}>"

    @property
    def components(self) -> Sequence[str | None]:
        """The components of the parent dataset type included in these results
        (`~collections.abc.Sequence` [ `str` or `None` ]).
        """
        return self._components

    def byParentDatasetType(self) -> Iterator[ParentDatasetQueryResults]:
        # Docstring inherited from DatasetQueryResults.
        yield self
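The query-side hook also has to fan parent results out into component dataset types: each parent result is yielded once per requested component, restricted via `withComponents`, with `None` standing for the parent type itself. A toy illustration of that fan-out (the snake_case names here are stand-ins for the real `parentDatasetType`/`withComponents`/`makeComponentDatasetType` API):

```python
from collections.abc import Iterable, Iterator

class ToyParentResults:
    """Toy stand-in for ParentDatasetQueryResults."""

    def __init__(self, parent: str, components: list[str | None]) -> None:
        self.parent_dataset_type = parent
        self.components = components

    def with_components(self, components: tuple[str | None, ...]) -> "ToyParentResults":
        # The real method returns results restricted to just these components.
        return ToyParentResults(self.parent_dataset_type, list(components))

def iter_by_dataset_type(
    all_results: Iterable[ToyParentResults],
) -> Iterator[tuple[str, ToyParentResults]]:
    """Mirrors _iter_by_dataset_type: one entry per (parent, component)."""
    for parent_results in all_results:
        for component in parent_results.components:
            name = parent_results.parent_dataset_type
            if component is not None:
                name = f"{name}.{component}"  # cf. makeComponentDatasetType
            yield name, parent_results.with_components((component,))

for name, _ in iter_by_dataset_type([ToyParentResults("calexp", [None, "psf"])]):
    print(name)  # -> calexp, then calexp.psf
```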
