Skip to content

Commit

Permalink
Remove Query.dataset_associations and replace it with general met…
Browse files Browse the repository at this point in the history
…hod.

General result iterator now returns dictionaries indexed by strings.
  • Loading branch information
andy-slac committed Aug 7, 2024
1 parent 52abca0 commit 519d90b
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 39 deletions.
26 changes: 15 additions & 11 deletions python/lsst/daf/butler/queries/_general_query_results.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
from ._base import QueryResultsBase
from .driver import QueryDriver
from .result_specs import GeneralResultSpec
from .tree import QueryTree, ResultColumn
from .tree import QueryTree


@final
Expand Down Expand Up @@ -68,20 +68,22 @@ def __init__(self, driver: QueryDriver, tree: QueryTree, spec: GeneralResultSpec
super().__init__(driver, tree)
self._spec = spec

def __iter__(self) -> Iterator[dict[ResultColumn, Any]]:
def __iter__(self) -> Iterator[dict[str, Any]]:
"""Iterate over result rows.
Yields
------
row_dict : `dict` [`ResultColumn`, `Any`]
Result row as dictionary, the keys are `ResultColumn` instances.
row_dict : `dict` [`str`, `Any`]
Result row as dictionary; the keys are the names of the dimensions,
dimension record fields (separated from the dimension name by a dot),
or dataset type fields (separated from the dataset type name by a dot).
"""
for page in self._driver.execute(self._spec, self._tree):
columns = tuple(page.spec.get_result_columns())
columns = tuple(str(column) for column in page.spec.get_result_columns())
for row in page.rows:
yield dict(zip(columns, row))

def iter_refs(self, dataset_type: DatasetType) -> Iterator[tuple[DatasetRef, dict[ResultColumn, Any]]]:
def iter_refs(self, dataset_type: DatasetType) -> Iterator[tuple[DatasetRef, dict[str, Any]]]:
"""Iterate over result rows and return DatasetRef constructed from each
row and an original row.
Expand All @@ -94,13 +96,15 @@ def iter_refs(self, dataset_type: DatasetType) -> Iterator[tuple[DatasetRef, dic
------
dataset_ref : `DatasetRef`
Dataset reference.
row_dict : `dict` [`ResultColumn`, `Any`]
Result row as dictionary, the keys are `ResultColumn` instances.
row_dict : `dict` [`str`, `Any`]
Result row as dictionary; the keys are the names of the dimensions,
dimension record fields (separated from the dimension name by a dot),
or dataset type fields (separated from the dataset type name by a dot).
"""
dimensions = dataset_type.dimensions
id_key = ResultColumn(logical_table=dataset_type.name, field="dataset_id")
run_key = ResultColumn(logical_table=dataset_type.name, field="run")
data_id_keys = [ResultColumn(logical_table=element, field=None) for element in dimensions.required]
id_key = f"{dataset_type.name}.dataset_id"
run_key = f"{dataset_type.name}.run"
data_id_keys = dimensions.required
for row in self:
values = tuple(row[key] for key in data_id_keys)
data_id = DataCoordinate.from_required_values(dimensions, values)
Expand Down
47 changes: 27 additions & 20 deletions python/lsst/daf/butler/queries/_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
DimensionRecordResultSpec,
GeneralResultSpec,
)
from .tree import DatasetSearch, Predicate, QueryTree, make_identity_query_tree
from .tree import DatasetFieldName, DatasetSearch, Predicate, QueryTree, make_identity_query_tree


@final
Expand Down Expand Up @@ -293,37 +293,44 @@ def dimension_records(self, element: str) -> DimensionRecordQueryResults:
result_spec = DimensionRecordResultSpec(element=self._driver.universe[element])
return DimensionRecordQueryResults(self._driver, tree, result_spec)

def dataset_associations(
def general(
self,
dataset_type: DatasetType,
collections: Iterable[str],
dimensions: DimensionGroup,
dimension_fields: Mapping[str, set[str]] = {},
dataset_fields: Mapping[str, set[DatasetFieldName]] = {},
find_first: bool = False,
) -> GeneralQueryResults:
"""Iterate over dataset-collection combinations where the dataset is in
the collection.
"""Execute query returning general result.
Parameters
----------
dataset_type : `DatasetType`
A dataset type object.
collections : `~collections.abc.Iterable` [`str`]
Names of the collections to search. Chained collections are
ignored.
dimensions : `DimensionGroup`
The dimensions that span all fields returned by this query.
dimension_fields : `~collections.abc.Mapping` [`str`, `set`[`str`]], \
optional
Dimension record fields included in this query, the key in the
mapping is dimension name.
dataset_fields : `~collections.abc.Mapping` \
[`str`, `set`[`DatasetFieldName`]], optional
Dataset fields included in this query, the key in the mapping is
dataset type name.
find_first : `bool`, optional
Whether this query requires find-first resolution for a dataset.
This can only be `True` if exactly one dataset type's fields are
included in the results.
Returns
-------
result : `GeneralQueryResults`
Query result that can be iterated over. The result includes all
columns needed to construct `DatasetRef`, plus ``collection`` and
``timespan`` columns.
Query result that can be iterated over.
"""
_, _, query = self._join_dataset_search_impl(dataset_type, collections)
result_spec = GeneralResultSpec(
dimensions=dataset_type.dimensions,
dimension_fields={},
dataset_fields={dataset_type.name: {"dataset_id", "run", "collection", "timespan"}},
find_first=False,
dimensions=dimensions,
dimension_fields=dimension_fields,
dataset_fields=dataset_fields,
find_first=find_first,
)
return GeneralQueryResults(self._driver, tree=query._tree, spec=result_spec)
return GeneralQueryResults(self._driver, tree=self._tree, spec=result_spec)

def materialize(
self,
Expand Down
3 changes: 3 additions & 0 deletions python/lsst/daf/butler/queries/tree/_column_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,9 @@ class ResultColumn(NamedTuple):
"""Column associated with the dimension element or dataset type, or `None`
if it is a dimension key column."""

def __str__(self) -> str:
return self.logical_table if self.field is None else f"{self.logical_table}.{self.field}"


class ColumnOrder:
"""Defines the position of columns within a result row and provides helper
Expand Down
11 changes: 7 additions & 4 deletions python/lsst/daf/butler/registry/sql_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@
from ..direct_query_driver import DirectQueryDriver
from ..progress import Progress
from ..queries import Query
from ..queries.tree import ResultColumn
from ..registry import (
ArgumentError,
CollectionExpressionError,
Expand Down Expand Up @@ -2498,9 +2497,13 @@ def queryDatasetAssociations(
collections, datasetType, collectionTypes=collectionTypes, flattenChains=flattenChains
)
with self._query() as query:
result = query.dataset_associations(datasetType, resolved_collections)
timespan_key = ResultColumn(logical_table=datasetType.name, field="timespan")
collection_key = ResultColumn(logical_table=datasetType.name, field="collection")
query = query.join_dataset_search(datasetType, resolved_collections)
result = query.general(
datasetType.dimensions,
dataset_fields={datasetType.name: {"dataset_id", "run", "collection", "timespan"}},
)
timespan_key = f"{datasetType.name}.timespan"
collection_key = f"{datasetType.name}.collection"
for ref, row_dict in result.iter_refs(datasetType):
_LOG.debug("row_dict: %s", row_dict)
yield DatasetAssociation(ref, row_dict[collection_key], row_dict[timespan_key])
Expand Down
11 changes: 7 additions & 4 deletions python/lsst/daf/butler/remote_butler/_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
DimensionRecord,
DimensionUniverse,
)
from ..queries.tree import ResultColumn
from ..registry import (
CollectionArgType,
CollectionSummary,
Expand Down Expand Up @@ -521,9 +520,13 @@ def queryDatasetAssociations(
collections, datasetType=datasetType, collectionTypes=collectionTypes, flattenChains=flattenChains
)
with self._butler._query() as query:
result = query.dataset_associations(datasetType, resolved_collections)
timespan_key = ResultColumn(logical_table=datasetType.name, field="timespan")
collection_key = ResultColumn(logical_table=datasetType.name, field="collection")
query = query.join_dataset_search(datasetType, resolved_collections)
result = query.general(
datasetType.dimensions,
dataset_fields={datasetType.name: {"dataset_id", "run", "collection", "timespan"}},
)
timespan_key = f"{datasetType.name}.timespan"
collection_key = f"{datasetType.name}.collection"
for ref, row_dict in result.iter_refs(datasetType):
yield DatasetAssociation(ref, row_dict[collection_key], row_dict[timespan_key])

Expand Down

0 comments on commit 519d90b

Please sign in to comment.