Skip to content

Commit

Permalink
Use query-results' natural grouping by dataset type when possible.
Browse files Browse the repository at this point in the history
When processing all dataset types in a collection together, this can
represent a huge decrease in memory usage, by querying for and then
processing only one dataset type at a time.
  • Loading branch information
TallJimbo committed Jul 14, 2023
1 parent 9af7b4f commit a3ec38a
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 5 deletions.
1 change: 1 addition & 0 deletions doc/changes/DM-39939.perf.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
When passing lazy query-results objects directly to various registry methods (`associate`, `disassociate`, `removeDatasets`, and `certify`), query and process one dataset type at a time instead of querying for all of them and grouping by type in Python.
53 changes: 52 additions & 1 deletion python/lsst/daf/butler/core/datasets/ref.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import sys
import uuid
from collections.abc import Iterable
from typing import TYPE_CHECKING, Any, ClassVar
from typing import TYPE_CHECKING, Any, ClassVar, Protocol, runtime_checkable

from lsst.utils.classes import immutable

Expand Down Expand Up @@ -62,6 +62,27 @@ class AmbiguousDatasetError(Exception):
"""


@runtime_checkable
class _DatasetRefGroupedIterable(Protocol):
"""A package-private interface for iterables of `DatasetRef` that know how
to efficiently group their contents by `DatasetType`.
"""

def _iter_by_dataset_type(self) -> Iterable[tuple[DatasetType, Iterable[DatasetRef]]]:
"""Iterate over `DatasetRef` instances, one `DatasetType` at a time.
Returns
-------
grouped : `~collections.abc.Iterable` [ `tuple` [ `DatasetType`, \
`~collections.abc.Iterable` [ `DatasetRef` ]
An iterable of tuples, in which the first element is a dataset type
and the second is an iterable of `DatasetRef` objects with exactly
that dataset type.
"""
...

Check warning on line 83 in python/lsst/daf/butler/core/datasets/ref.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/daf/butler/core/datasets/ref.py#L83

Added line #L83 was not covered by tests


class DatasetIdGenEnum(enum.Enum):
"""Enum used to specify dataset ID generation options."""

Expand Down Expand Up @@ -580,12 +601,42 @@ def groupByType(refs: Iterable[DatasetRef]) -> NamedKeyDict[DatasetType, list[Da
-------
grouped : `NamedKeyDict` [ `DatasetType`, `list` [ `DatasetRef` ] ]
Grouped `DatasetRef` instances.
Notes
-----
When lazy item-iterables are acceptable instead of a full mapping,
`iter_by_type` can in some cases be far more efficient.
"""
result: NamedKeyDict[DatasetType, list[DatasetRef]] = NamedKeyDict()
for ref in refs:
result.setdefault(ref.datasetType, []).append(ref)
return result

@staticmethod
def iter_by_type(
refs: Iterable[DatasetRef],
) -> Iterable[tuple[DatasetType, Iterable[DatasetRef]]]:
"""Group an iterable of `DatasetRef` by `DatasetType` with special
hooks for custom iterables that can do this efficiently.
Parameters
----------
refs : `~collections.abc.Iterable` [ `DatasetRef` ]
`DatasetRef` instances to group. If this satisfies the
`_DatasetRefGroupedIterable` protocol, its
`~_DatasetRefGroupedIterable._iter_by_dataset_type` method will
be called.
Returns
-------
grouped : `~collections.abc.Iterable` [ `tuple` [ `DatasetType`, \
`Iterable` [ `DatasetRef` ] ]]
Grouped `DatasetRef` instances.
"""
if isinstance(refs, _DatasetRefGroupedIterable):
return refs._iter_by_dataset_type()
return DatasetRef.groupByType(refs).items()

def makeCompositeRef(self) -> DatasetRef:
"""Create a `DatasetRef` of the composite from a component ref.
Expand Down
8 changes: 4 additions & 4 deletions python/lsst/daf/butler/registries/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -679,7 +679,7 @@ def removeDatasets(self, refs: Iterable[DatasetRef]) -> None:
# Docstring inherited from lsst.daf.butler.registry.Registry
progress = Progress("lsst.daf.butler.Registry.removeDatasets", level=logging.DEBUG)
for datasetType, refsForType in progress.iter_item_chunks(
DatasetRef.groupByType(refs).items(), desc="Removing datasets by type"
DatasetRef.iter_by_type(refs), desc="Removing datasets by type"
):
storage = self._managers.datasets[datasetType.name]
try:
Expand All @@ -699,7 +699,7 @@ def associate(self, collection: str, refs: Iterable[DatasetRef]) -> None:
f"Collection '{collection}' has type {collectionRecord.type.name}, not TAGGED."
)
for datasetType, refsForType in progress.iter_item_chunks(
DatasetRef.groupByType(refs).items(), desc="Associating datasets by type"
DatasetRef.iter_by_type(refs), desc="Associating datasets by type"
):
storage = self._managers.datasets[datasetType.name]
try:
Expand Down Expand Up @@ -727,7 +727,7 @@ def disassociate(self, collection: str, refs: Iterable[DatasetRef]) -> None:
f"Collection '{collection}' has type {collectionRecord.type.name}; expected TAGGED."
)
for datasetType, refsForType in progress.iter_item_chunks(
DatasetRef.groupByType(refs).items(), desc="Disassociating datasets by type"
DatasetRef.iter_by_type(refs), desc="Disassociating datasets by type"
):
storage = self._managers.datasets[datasetType.name]
storage.disassociate(collectionRecord, refsForType)
Expand All @@ -740,7 +740,7 @@ def certify(self, collection: str, refs: Iterable[DatasetRef], timespan: Timespa
progress = Progress("lsst.daf.butler.Registry.certify", level=logging.DEBUG)
collectionRecord = self._managers.collections.find(collection)
for datasetType, refsForType in progress.iter_item_chunks(
DatasetRef.groupByType(refs).items(), desc="Certifying datasets by type"
DatasetRef.iter_by_type(refs), desc="Certifying datasets by type"
):
storage = self._managers.datasets[datasetType.name]
storage.certify(
Expand Down
22 changes: 22 additions & 0 deletions python/lsst/daf/butler/registry/queries/_results.py
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,21 @@ def explain_no_results(self, execute: bool = True) -> Iterable[str]:
"""
raise NotImplementedError()

def _iter_by_dataset_type(self) -> Iterator[tuple[DatasetType, Iterable[DatasetRef]]]:
"""Group results by dataset type.
This is a private hook for the interface defined by
`DatasetRef.iter_by_type`, enabling much more efficient
processing of heterogeneous `DatasetRef` iterables when they come
directly from queries.
"""
for parent_results in self.byParentDatasetType():
for component in parent_results.components:
dataset_type = parent_results.parentDatasetType
if component is not None:
dataset_type = dataset_type.makeComponentDatasetType(component)

Check warning on line 544 in python/lsst/daf/butler/registry/queries/_results.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/daf/butler/registry/queries/_results.py#L544

Added line #L544 was not covered by tests
yield (dataset_type, parent_results.withComponents((component,)))


class ParentDatasetQueryResults(DatasetQueryResults):
"""An object that represents results from a query for datasets with a
Expand Down Expand Up @@ -570,6 +585,13 @@ def __iter__(self) -> Iterator[DatasetRef]:
def __repr__(self) -> str:
return f"<DatasetRef iterator for [components of] {self._dataset_type.name}>"

@property
def components(self) -> Sequence[str | None]:
"""The components of the parent dataset type included in these results
(`~collections.abc.Sequence` [ `str` or `None` ]).
"""
return self._components

def byParentDatasetType(self) -> Iterator[ParentDatasetQueryResults]:
# Docstring inherited from DatasetQueryResults.
yield self
Expand Down

0 comments on commit a3ec38a

Please sign in to comment.