Merge pull request #1047 from lsst/tickets/DM-45429
DM-45429: Initial implementation of a general query result
andy-slac committed Aug 14, 2024
2 parents f66ddda + 0fbe458 commit b716002
Showing 20 changed files with 814 additions and 93 deletions.
27 changes: 26 additions & 1 deletion python/lsst/daf/butler/_dataset_association.py
@@ -29,12 +29,17 @@

__all__ = ("DatasetAssociation",)

from collections.abc import Iterator
from dataclasses import dataclass
from typing import Any
from typing import TYPE_CHECKING, Any

from ._dataset_ref import DatasetRef
from ._dataset_type import DatasetType
from ._timespan import Timespan

if TYPE_CHECKING:
from .queries._general_query_results import GeneralQueryResults


@dataclass(frozen=True, eq=True)
class DatasetAssociation:
@@ -59,6 +64,26 @@ class DatasetAssociation:
collection (`Timespan` or `None`).
"""

@classmethod
def from_query_result(
cls, result: GeneralQueryResults, dataset_type: DatasetType
) -> Iterator[DatasetAssociation]:
"""Construct dataset associations from the result of general query.
Parameters
----------
result : `GeneralQueryResults`
General query result returned by `Query.general` method. The result
has to include "{dataset_type.name}.timespan" and
"{dataset_type.name}.collection" columns.
dataset_type : `DatasetType`
Dataset type, query has to include this dataset type.
"""
timespan_key = f"{dataset_type.name}.timespan"
collection_key = f"{dataset_type.name}.collection"
for _, refs, row_dict in result.iter_tuples(dataset_type):
yield DatasetAssociation(refs[0], row_dict[collection_key], row_dict[timespan_key])

def __lt__(self, other: Any) -> bool:
# Allow sorting of associations
if not isinstance(other, type(self)):
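A minimal usage sketch of the new classmethod may help here. The repository path, collection, dataset type name ("calexp"), and the exact `Query.general` signature are assumptions, not part of this diff:

from lsst.daf.butler import Butler, DatasetAssociation

butler = Butler.from_config("repo", collections=["HSC/defaults"])  # assumed repo and collection
dataset_type = butler.get_dataset_type("calexp")  # assumed dataset type name

with butler._query() as query:
    # Request the dataset fields that from_query_result() needs, including
    # "collection" and "timespan" for this dataset type.
    result = query.general(
        dataset_type.dimensions,
        dataset_fields={dataset_type.name: {"dataset_id", "run", "collection", "timespan"}},
        find_first=False,
    )
    for association in DatasetAssociation.from_query_result(result, dataset_type):
        print(association.ref, association.collection, association.timespan)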
146 changes: 146 additions & 0 deletions python/lsst/daf/butler/column_spec.py
@@ -41,6 +41,7 @@
"COLLECTION_NAME_MAX_LENGTH",
)

import datetime
import textwrap
import uuid
from abc import ABC, abstractmethod
@@ -53,6 +54,7 @@

from . import arrow_utils, ddl
from ._timespan import Timespan
from .pydantic_utils import SerializableRegion, SerializableTime

if TYPE_CHECKING:
from .name_shrinker import NameShrinker
@@ -87,6 +89,102 @@
# that actually changing the value is a (minor) schema change.


class ColumnValueSerializer(ABC):
"""Class that knows how to serialize and deserialize column values."""

@abstractmethod
def serialize(self, value: Any) -> Any:
"""Convert column value to something that can be serialized.
Parameters
----------
value : `Any`
Column value to be serialized.
Returns
-------
value : `Any`
Column value in serializable format.
"""
raise NotImplementedError

@abstractmethod
def deserialize(self, value: Any) -> Any:
"""Convert serialized value to column value.
Parameters
----------
value : `Any`
Serialized column value.
Returns
-------
value : `Any`
Deserialized column value.
"""
raise NotImplementedError


class _DefaultColumnValueSerializer(ColumnValueSerializer):
"""Default implementation of serializer for basic types."""

def serialize(self, value: Any) -> Any:
# Docstring inherited.
return value

def deserialize(self, value: Any) -> Any:
# Docstring inherited.
return value


class _TypeAdapterColumnValueSerializer(ColumnValueSerializer):
"""Implementation of serializer that uses pydantic type adapter."""

def __init__(self, type_adapter: pydantic.TypeAdapter):
# Docstring inherited.
self._type_adapter = type_adapter

def serialize(self, value: Any) -> Any:
# Docstring inherited.
return value if value is None else self._type_adapter.dump_python(value)

def deserialize(self, value: Any) -> Any:
# Docstring inherited.
return value if value is None else self._type_adapter.validate_python(value)


class _DateTimeColumnValueSerializer(ColumnValueSerializer):
"""Implementation of serializer for ingest_time column. That column can be
either in native database time appearing as `datetime.datetime` on Python
side or integer nanoseconds appearing as astropy.time.Time. We use pydantic
type adapter for astropy time, which serializes it into integer
nanoseconds. datetime is converted to string representation to distinguish
it from integer nanoseconds (timezone handling depends entirely on what
database returns).
"""

def __init__(self) -> None:
self._astropy_adapter = pydantic.TypeAdapter(SerializableTime)

def serialize(self, value: Any) -> Any:
# Docstring inherited.
if value is None:
return None
elif isinstance(value, datetime.datetime):
return value.isoformat()
else:
return self._astropy_adapter.dump_python(value)

def deserialize(self, value: Any) -> Any:
# Docstring inherited.
if value is None:
return None
elif isinstance(value, str):
return datetime.datetime.fromisoformat(value)
else:
return self._astropy_adapter.validate_python(value)
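
As an illustration of the round-trip behavior described in the class docstring above (the timestamps below are made up for demonstration):

import datetime
import astropy.time

serializer = _DateTimeColumnValueSerializer()

# None passes through untouched.
assert serializer.serialize(None) is None

# datetime values survive as ISO-format strings and come back unchanged.
dt = datetime.datetime(2024, 8, 14, 12, 0, 0)
assert serializer.deserialize(serializer.serialize(dt)) == dt

# astropy Time values go through the pydantic adapter (integer nanoseconds,
# per the docstring) and come back as astropy Time.
t = astropy.time.Time("2024-08-14T12:00:00", scale="tai")
assert isinstance(serializer.deserialize(serializer.serialize(t)), astropy.time.Time)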


class _BaseColumnSpec(pydantic.BaseModel, ABC):
"""Base class for descriptions of table columns."""

@@ -134,6 +232,18 @@ def to_arrow(self) -> arrow_utils.ToArrow:
"""
raise NotImplementedError()

@abstractmethod
def serializer(self) -> ColumnValueSerializer:
"""Return object that converts values of this column to or from
serializable format.
Returns
-------
serializer : `ColumnValueSerializer`
A converter instance.
"""
raise NotImplementedError()

def display(self, level: int = 0, tab: str = " ") -> list[str]:
"""Return a human-reader-focused string description of this column as
a list of lines.
@@ -178,6 +288,10 @@ def to_arrow(self) -> arrow_utils.ToArrow:
# Docstring inherited.
return arrow_utils.ToArrow.for_primitive(self.name, pa.uint64(), nullable=self.nullable)

def serializer(self) -> ColumnValueSerializer:
# Docstring inherited.
return _DefaultColumnValueSerializer()


@final
class StringColumnSpec(_BaseColumnSpec):
@@ -198,6 +312,10 @@ def to_arrow(self) -> arrow_utils.ToArrow:
# Docstring inherited.
return arrow_utils.ToArrow.for_primitive(self.name, pa.string(), nullable=self.nullable)

def serializer(self) -> ColumnValueSerializer:
# Docstring inherited.
return _DefaultColumnValueSerializer()


@final
class HashColumnSpec(_BaseColumnSpec):
@@ -224,6 +342,10 @@ def to_arrow(self) -> arrow_utils.ToArrow:
nullable=self.nullable,
)

def serializer(self) -> ColumnValueSerializer:
# Docstring inherited.
return _DefaultColumnValueSerializer()


@final
class FloatColumnSpec(_BaseColumnSpec):
@@ -238,6 +360,10 @@ def to_arrow(self) -> arrow_utils.ToArrow:
assert self.nullable is not None, "nullable=None should be resolved by validators"
return arrow_utils.ToArrow.for_primitive(self.name, pa.float64(), nullable=self.nullable)

def serializer(self) -> ColumnValueSerializer:
# Docstring inherited.
return _DefaultColumnValueSerializer()


@final
class BoolColumnSpec(_BaseColumnSpec):
@@ -251,6 +377,10 @@ def to_arrow(self) -> arrow_utils.ToArrow:
# Docstring inherited.
return arrow_utils.ToArrow.for_primitive(self.name, pa.bool_(), nullable=self.nullable)

def serializer(self) -> ColumnValueSerializer:
# Docstring inherited.
return _DefaultColumnValueSerializer()


@final
class UUIDColumnSpec(_BaseColumnSpec):
@@ -265,6 +395,10 @@ def to_arrow(self) -> arrow_utils.ToArrow:
assert self.nullable is not None, "nullable=None should be resolved by validators"
return arrow_utils.ToArrow.for_uuid(self.name, nullable=self.nullable)

def serializer(self) -> ColumnValueSerializer:
# Docstring inherited.
return _TypeAdapterColumnValueSerializer(pydantic.TypeAdapter(self.pytype))


@final
class RegionColumnSpec(_BaseColumnSpec):
@@ -284,6 +418,10 @@ def to_arrow(self) -> arrow_utils.ToArrow:
assert self.nullable is not None, "nullable=None should be resolved by validators"
return arrow_utils.ToArrow.for_region(self.name, nullable=self.nullable)

def serializer(self) -> ColumnValueSerializer:
# Docstring inherited.
return _TypeAdapterColumnValueSerializer(pydantic.TypeAdapter(SerializableRegion))


@final
class TimespanColumnSpec(_BaseColumnSpec):
@@ -299,6 +437,10 @@ def to_arrow(self) -> arrow_utils.ToArrow:
# Docstring inherited.
return arrow_utils.ToArrow.for_timespan(self.name, nullable=self.nullable)

def serializer(self) -> ColumnValueSerializer:
# Docstring inherited.
return _TypeAdapterColumnValueSerializer(pydantic.TypeAdapter(self.pytype))


@final
class DateTimeColumnSpec(_BaseColumnSpec):
@@ -315,6 +457,10 @@ def to_arrow(self) -> arrow_utils.ToArrow:
assert self.nullable is not None, "nullable=None should be resolved by validators"
return arrow_utils.ToArrow.for_datetime(self.name, nullable=self.nullable)

def serializer(self) -> ColumnValueSerializer:
# Docstring inherited.
return _DateTimeColumnValueSerializer()


ColumnSpec = Annotated[
Union[
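Taken together, the new serializer() hooks let callers turn result-row values into JSON-friendly form column by column. A hedged sketch, assuming the TimespanColumnSpec constructor accepts just a column name:

import astropy.time
from lsst.daf.butler import Timespan
from lsst.daf.butler.column_spec import TimespanColumnSpec

spec = TimespanColumnSpec(name="validity")  # constructor arguments are an assumption
serializer = spec.serializer()

begin = astropy.time.Time("2024-01-01T00:00:00", scale="tai")
end = astropy.time.Time("2024-07-01T00:00:00", scale="tai")

serialized = serializer.serialize(Timespan(begin, end))  # pydantic-dumped form
restored = serializer.deserialize(serialized)            # back to a Timespan
assert isinstance(restored, Timespan)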
21 changes: 4 additions & 17 deletions python/lsst/daf/butler/direct_butler/_direct_butler.py
@@ -2193,32 +2193,19 @@ def dimensions(self) -> DimensionUniverse:
# Docstring inherited.
return self._registry.dimensions

@contextlib.contextmanager
def _query(self) -> Iterator[Query]:
def _query(self) -> contextlib.AbstractContextManager[Query]:
# Docstring inherited.
with self._query_driver(self._registry.defaults.collections, self.registry.defaults.dataId) as driver:
yield Query(driver)
return self._registry._query()

@contextlib.contextmanager
def _query_driver(
self,
default_collections: Iterable[str],
default_data_id: DataCoordinate,
) -> Iterator[DirectQueryDriver]:
) -> contextlib.AbstractContextManager[DirectQueryDriver]:
"""Set up a QueryDriver instance for use with this Butler. Although
this is marked as a private method, it is also used by Butler server.
"""
with self._caching_context():
driver = DirectQueryDriver(
self._registry._db,
self.dimensions,
self._registry._managers,
self._registry.dimension_record_cache,
default_collections=default_collections,
default_data_id=default_data_id,
)
with driver:
yield driver
return self._registry._query_driver(default_collections, default_data_id)

def _preload_cache(self) -> None:
"""Immediately load caches that are used for common operations."""
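Since _query_driver is documented above as shared with Butler server, a hedged sketch of how a caller might use it; the repository path, collection name, and empty data ID are assumptions:

from lsst.daf.butler import Butler, DataCoordinate
from lsst.daf.butler.queries import Query

butler = Butler.from_config("repo")  # assumed repository backed by DirectButler
with butler._query_driver(
    default_collections=["HSC/defaults"],  # assumed collection
    default_data_id=DataCoordinate.make_empty(butler.dimensions),
) as driver:
    query = Query(driver)
    # Build and run queries against the shared driver here.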
3 changes: 3 additions & 0 deletions python/lsst/daf/butler/direct_query_driver/_driver.py
@@ -79,6 +79,7 @@
DataCoordinateResultPageConverter,
DatasetRefResultPageConverter,
DimensionRecordResultPageConverter,
GeneralResultPageConverter,
ResultPageConverter,
ResultPageConverterContext,
)
@@ -271,6 +272,8 @@ def _create_result_page_converter(self, spec: ResultSpec, builder: QueryBuilder)
return DatasetRefResultPageConverter(
spec, self.get_dataset_type(spec.dataset_type_name), context
)
case GeneralResultSpec():
return GeneralResultPageConverter(spec, context)
case _:
raise NotImplementedError(f"Result type '{spec.result_type}' not yet implemented")
