Skip to content

Commit

Permalink
WIP on DirectQueryDriver.
Browse files Browse the repository at this point in the history
DO NOT MERGE.
  • Loading branch information
TallJimbo committed Mar 19, 2024
1 parent 37d676b commit 1e537b4
Show file tree
Hide file tree
Showing 10 changed files with 616 additions and 91 deletions.
15 changes: 6 additions & 9 deletions python/lsst/daf/butler/direct_query_driver/_convert_results.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,17 @@
import sqlalchemy

from ..dimensions import DimensionRecordSet
from ..queries.driver import DimensionRecordResultPage, PageKey
from ..queries.tree import ColumnSet

if TYPE_CHECKING:
from ..name_shrinker import NameShrinker
from ..queries.driver import DimensionRecordResultPage, PageKey
from ..queries.result_specs import DimensionRecordResultSpec


def convert_dimension_record_results(
raw_rows: Iterable[sqlalchemy.Row],
spec: DimensionRecordResultSpec,
next_key: PageKey | None,
name_shrinker: NameShrinker,
) -> DimensionRecordResultPage:
"""Convert a raw SQL result iterable into a page of `DimensionRecord`
query results.
Expand All @@ -60,19 +59,17 @@ def convert_dimension_record_results(
Specification for result objects.
next_key : `PageKey` or `None`
Key for the next page to add into the returned page object.
name_shrinker : `NameShrinker`
Object used to ensure dataset type field names fit inside the database
engine's identifier size limit. Unnecessary for dimension fields,
which are restricted to fit in the identifier size limit when a
universe is initialized.
Returns
-------
result_page : `DimensionRecordResultPage`
Page object that holds a `DimensionRecord` container.
"""
record_set = DimensionRecordSet(spec.element)
column_mapping = [(field, field) for field in spec.element.schema.names]
# Mapping from DimensionRecord attribute name to qualified column name.
column_mapping = list(zip(spec.element.schema.dimensions.names, spec.element.dimensions.names))
for field in spec.element.schema.remainder.names:
column_mapping.append((field, ColumnSet.get_qualified_name(spec.element.name, field)))
record_cls = spec.element.RecordClass
if not spec.element.temporal:
for raw_row in raw_rows:
Expand Down
243 changes: 176 additions & 67 deletions python/lsst/daf/butler/direct_query_driver/_driver.py

Large diffs are not rendered by default.

26 changes: 19 additions & 7 deletions python/lsst/daf/butler/direct_query_driver/_query_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -494,18 +494,30 @@ def join(self, other: QueryJoiner) -> QueryJoiner:
representing a logical AND (with no attempt at deduplication).
"""
join_on: list[sqlalchemy.ColumnElement] = []
for dimension_name in self.dimension_keys.keys() & other.dimension_keys.keys():
for column1, column2 in itertools.product(
self.dimension_keys[dimension_name], other.dimension_keys[dimension_name]
):
join_on.append(column1 == column2)
for dimension_name in other.dimension_keys.keys():
if dimension_name in self.dimension_keys:
for column1, column2 in itertools.product(
self.dimension_keys[dimension_name], other.dimension_keys[dimension_name]
):
join_on.append(column1 == column2)
self.dimension_keys[dimension_name].extend(other.dimension_keys[dimension_name])
if self.from_clause is None:
self.from_clause = other.from_clause
elif other.from_clause is not None:
self.from_clause = self.from_clause.join(other.from_clause, onclause=sqlalchemy.and_(*join_on))
self.where_terms += other.where_terms
join_on_sql: sqlalchemy.ColumnElement[bool]
match len(join_on):
case 0:
join_on_sql = sqlalchemy.true()

Check warning on line 510 in python/lsst/daf/butler/direct_query_driver/_query_builder.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/daf/butler/direct_query_driver/_query_builder.py#L510

Added line #L510 was not covered by tests
case 1:
(join_on_sql,) = join_on

Check warning on line 512 in python/lsst/daf/butler/direct_query_driver/_query_builder.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/daf/butler/direct_query_driver/_query_builder.py#L512

Added line #L512 was not covered by tests
case _:
join_on_sql = sqlalchemy.and_(*join_on)
self.from_clause = self.from_clause.join(other.from_clause, onclause=join_on_sql)
for logical_table, fields in other.fields.items():
self.fields[logical_table].update(fields)
self.timespans.update(other.timespans)
self.special.update(other.special)
self.where_terms += other.where_terms
if other.name_shrinker:
if self.name_shrinker is not None:
self.name_shrinker.update(other.name_shrinker)

Check warning on line 523 in python/lsst/daf/butler/direct_query_driver/_query_builder.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/daf/butler/direct_query_driver/_query_builder.py#L523

Added line #L523 was not covered by tests
Expand Down
15 changes: 15 additions & 0 deletions python/lsst/daf/butler/direct_query_driver/_query_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,11 @@ def iter_mandatory(self) -> Iterator[DimensionElement]:
class QueryProjectionPlan:
"""A struct describing the "projection" stage of a butler query.
This struct evaluates to `True` in boolean contexts if either
`needs_dimension_distinct` or `needs_dataset_distinct` is `True`. In other
cases the projection is effectively a no-op, because the "joins"-stage rows
are already unique.
See `QueryPlan` and `QueryPlan.projection` for additional information.
"""

Expand All @@ -185,6 +190,16 @@ class QueryProjectionPlan:
used to make post-projection rows unique.
"""

needs_dataset_distinct: bool = False
"""If `True`, the projection columns do not include collection-specific
dataset fields that were present in the "joins" stage, and hence a SELECT
DISTINCT [ON] or GROUP BY must be added to make post-projection rows
unique.
"""

def __bool__(self) -> bool:
return self.needs_dataset_distinct or self.needs_dataset_distinct

find_first_dataset: str | None = None
"""If not `None`, this is a find-first query for this dataset.
Expand Down
16 changes: 14 additions & 2 deletions python/lsst/daf/butler/direct_query_driver/_sql_column_visitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,13 @@ def apply_logical_and(
self, originals: qt.PredicateOperands, results: tuple[sqlalchemy.ColumnElement[bool], ...]
) -> sqlalchemy.ColumnElement[bool]:
# Docstring inherited.
return sqlalchemy.and_(*results)
match len(results):
case 0:
return sqlalchemy.true()
case 1:
return results[0]
case _:
return sqlalchemy.and_(*results)

def apply_logical_or(
self,
Expand All @@ -236,7 +242,13 @@ def apply_logical_or(
flags: PredicateVisitFlags,
) -> sqlalchemy.ColumnElement[bool]:
# Docstring inherited.
return sqlalchemy.or_(*results)
match len(results):
case 0:
return sqlalchemy.false()

Check warning on line 247 in python/lsst/daf/butler/direct_query_driver/_sql_column_visitor.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/daf/butler/direct_query_driver/_sql_column_visitor.py#L247

Added line #L247 was not covered by tests
case 1:
return results[0]
case _:
return sqlalchemy.or_(*results)

def apply_logical_not(
self, original: qt.PredicateLeaf, result: sqlalchemy.ColumnElement[bool], flags: PredicateVisitFlags
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -740,13 +740,10 @@ def _finish_query_builder(
# foreign key, and right now it isn't.
sql_projection.joiner.where(self._static.dataset.c.dataset_type_id == self._dataset_type_id)

Check warning on line 741 in python/lsst/daf/butler/registry/datasets/byDimensions/_storage.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/daf/butler/registry/datasets/byDimensions/_storage.py#L741

Added line #L741 was not covered by tests
sql_projection.distinct = (
# If there are multiple collections and we're searching any
# non-RUN collection, we could find the same dataset twice,
# which would yield duplicate rows unless the collection is
# included to make those rows unique.
# If there are multiple collections, this subquery might have
# non-unique rows.
len(collections) > 1
and not run_collections_only
and ("collection_key" not in fields)
and not fields
)
return sql_projection

Expand Down
28 changes: 28 additions & 0 deletions python/lsst/daf/butler/registry/interfaces/_datasets.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,34 @@ def make_relation(

@abstractmethod
def make_query_joiner(self, collections: Sequence[CollectionRecord], fields: Set[str]) -> QueryJoiner:
"""Make a `..direct_query_driver.QueryJoiner` that represents a search
for datasets of this type.
Parameters
----------
collections : `~collections.abc.Sequence` [ `CollectionRecord` ]
Collections to search, in order, after filtering out collections
with no datasets of this type via collection summaries.
fields : `~collections.abc.Set` [ `str` ]
Names of fields to make available in the joiner. Options include:
- ``dataset_id`` (UUID)
- ``run`` (collection name, `str`)
- ``collection`` (collection name, `str`)
- ``collection_key`` (collection primary key, manager-dependent)
- ``timespan`` (validity range, or unbounded for non-calibrations)
- ``ingest_date`` (time dataset was ingested into repository)
Dimension keys for the dataset type's required dimensions are
always included.
Returns
-------
joiner : `..direct_query_driver.QueryJoiner`
A query-construction object representing a table or subquery. If
``fields`` is empty or ``len(collections) <= 1``, this is
guaranteed to have rows that are unique over dimension keys.
"""
raise NotImplementedError()

datasetType: DatasetType
Expand Down
61 changes: 61 additions & 0 deletions python/lsst/daf/butler/registry/interfaces/_dimensions.py
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,31 @@ def make_spatial_join_relation(

@abstractmethod
def make_query_joiner(self, element: DimensionElement, fields: Set[str]) -> QueryJoiner:
"""Make a `..direct_query_driver.QueryJoiner` that represents a
dimension element table.
Parameters
----------
element : `DimensionElement`
Dimension element the table corresponds to.
fields : `~collections.abc.Set` [ `str` ]
Names of fields to make available in the joiner. These can be any
metadata or alternate key field in the element's schema, including
the special ``region`` and ``timespan`` fields. Dimension keys in
the element's schema are always included.
Returns
-------
joiner : `..direct_query_driver.QueryJoiner`
A query-construction object representing a table or subquery. This
is guaranteed to have rows that are unique over dimension keys and
all possible key values for this dimension, so joining in a
dimension element table:
- never introduces duplicates into the query's result rows;
- never restricts the query's rows *except* to ensure
required-implied relationships are followed.
"""
raise NotImplementedError()

@abstractmethod
Expand All @@ -370,6 +395,42 @@ def process_query_overlaps(
predicate: Predicate,
join_operands: Iterable[DimensionGroup],
) -> tuple[Predicate, QueryBuilder]:
"""Process a query's WHERE predicate and dimensions to handle spatial
and temporal overlaps.
Parameters
----------
dimensions : `..dimensions.DimensionGroup`
Full dimensions of all tables to be joined into the query (even if
they are not included in the query results).
predicate : `..queries.tree.Predicate`
Boolean column expression that may contain user-provided spatial
and/or temporal overlaps intermixed with other constraints.
join_operands : `~collections.abc.Iterable` [ \
`..dimensions.DimensionGroup` ]
Dimensions of tables or subqueries that are already going to be
joined into the query that may establish their own spatial or
temporal relationships (e.g. a dataset search with both ``visit``
and ``patch`` dimensions).
Returns
-------
predicate : `..queries.tree.Predicate`
A version of the given predicate that preserves the overall
behavior of the filter while possibly rewriting overlap expressions
that have been partially moved into ``builder`` as some combination
of new nested predicates, joins, and postprocessing.
builder : `..direct_query_driver.QueryBuilder`
A query-construction helper object that includes any initial joins
and postprocessing needed to handle overlap expression extracted
from the original predicate.
Notes
-----
Implementations must delegate to `.queries.overlaps.OverlapsVisitor`
(possibly by subclassing it) to ensure "automatic" spatial and temporal
joins are added consistently by all query-construction implementations.
"""
raise NotImplementedError()

universe: DimensionUniverse
Expand Down
Loading

0 comments on commit 1e537b4

Please sign in to comment.