Skip to content

Commit

Permalink
Merge pull request #38 from lsst-dm/tickets/DM-43105
Browse files Browse the repository at this point in the history
DM-43105: Add migration script for dimension universe 7
  • Loading branch information
dhirving committed Apr 8, 2024
2 parents 5bbff68 + 9e38d70 commit 9a81b3b
Show file tree
Hide file tree
Showing 4 changed files with 212 additions and 27 deletions.
9 changes: 8 additions & 1 deletion doc/lsst.daf.butler_migrate/migrations/dimensions-config.rst
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,11 @@ Supports group and day_obs as dimensions.
- Rename ``group_name`` in the exposure table to ``group``.
- Update the ``exposure`` table so ``group`` and ``day_obs`` are foreign keys to the new tables.
- Remove ``group_id`` from ``exposure`` table.
- Update ``config:dimensions.json`` to universe 6.
- Update ``config:dimensions.json`` to universe 6.

daf_butler 6 to 7
=================

Migration script: `352c30854bb0.py <https://github.com/lsst-dm/daf_butler_migrate/blob/main/migrations/dimensions-config/352c30854bb0.py>`_

Adds ``can_see_sky`` column to the ``exposure`` table.
35 changes: 9 additions & 26 deletions migrations/dimensions-config/1fae088c80b6.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,10 @@
from collections.abc import Iterator
from typing import Any, TypeAlias

import alembic
import sqlalchemy as sa
from alembic import op
from lsst.daf.butler import Timespan
from lsst.daf.butler_migrate.butler_attributes import ButlerAttributes
from lsst.daf.butler_migrate.migration_context import MigrationContext
from lsst.daf.butler_migrate.naming import make_string_length_constraint
from lsst.daf.butler_migrate.timespan import create_timespan_column_definitions, format_timespan_value
from lsst.utils import doImportType
Expand Down Expand Up @@ -45,7 +44,7 @@ def upgrade() -> None:
- Remove ``group_id`` from ``exposure`` table.
- Update ``config:dimensions.json`` to universe 6.
"""
ctx = _Context()
ctx = MigrationContext()
_lock_exposure_table(ctx)
_validate_initial_dimension_universe(ctx)
_migrate_day_obs(ctx)
Expand All @@ -58,7 +57,7 @@ def downgrade() -> None:
raise NotImplementedError()


def _lock_exposure_table(ctx: _Context) -> None:
def _lock_exposure_table(ctx: MigrationContext) -> None:
# In this migration we generate new tables based on the content of the
# exposure table, so make sure that it is not modified while we are
# working.
Expand All @@ -74,7 +73,7 @@ def _lock_exposure_table(ctx: _Context) -> None:
ctx.bind.execute(sa.text(f"LOCK TABLE {schema}exposure IN EXCLUSIVE MODE"))


def _validate_initial_dimension_universe(ctx: _Context) -> None:
def _validate_initial_dimension_universe(ctx: MigrationContext) -> None:
config = ctx.mig_context.config
allow_mismatch = config is not None and "1" == config.get_section_option(
"daf_butler_migrate_options", "allow_dimension_universe_mismatch"
Expand All @@ -94,7 +93,7 @@ def _validate_initial_dimension_universe(ctx: _Context) -> None:
raise


def _migrate_groups(ctx: _Context) -> None:
def _migrate_groups(ctx: MigrationContext) -> None:
# Create group table
_LOG.info("Creating group table")
check_constraints = []
Expand Down Expand Up @@ -169,7 +168,7 @@ def _migrate_groups(ctx: _Context) -> None:
)


def _migrate_day_obs(ctx: _Context) -> None:
def _migrate_day_obs(ctx: MigrationContext) -> None:
# Before doing anything else, generate the rows for the new day_obs table
# from the data in the exposure table. This is prone to failure due to the
# need to import instrument classes.
Expand Down Expand Up @@ -230,12 +229,12 @@ def _migrate_day_obs(ctx: _Context) -> None:
)


def _migrate_dimensions_json(ctx: _Context) -> None:
def _migrate_dimensions_json(ctx: MigrationContext) -> None:
_LOG.info("Updating dimensions.json in ButlerAttributes")
ctx.attributes.replace_dimensions_json(6)


def _generate_day_obs_rows(ctx: _Context) -> Iterator[dict]:
def _generate_day_obs_rows(ctx: MigrationContext) -> Iterator[dict]:
exposure_table = ctx.get_table("exposure")
select = sa.select(
exposure_table.columns["instrument"],
Expand Down Expand Up @@ -273,28 +272,12 @@ def _get_day_obs_offset(instrument_name: str, instrument: _Instrument, day_obs:
return round(offset.to_value("s"))


class _Context:
def __init__(self) -> None:
self.mig_context = alembic.context.get_context()
self.schema = self.mig_context.version_table_schema
bind = self.mig_context.bind
assert bind is not None, "Can't run offline -- need access to database to migrate data."
self.bind = bind
self.dialect = self.bind.dialect.name
self.is_sqlite = self.dialect == "sqlite"
self.metadata = sa.schema.MetaData(schema=self.schema)
self.attributes = ButlerAttributes(self.bind, self.schema)

def get_table(self, table_name: str) -> sa.Table:
return sa.schema.Table(table_name, self.metadata, autoload_with=self.bind, schema=self.schema)


_Instrument: TypeAlias = Any
"""A dynamically loaded lsst.obs_base.Instrument."""


class _InstrumentFetcher:
def __init__(self, ctx: _Context) -> None:
def __init__(self, ctx: MigrationContext) -> None:
self._instruments: dict[str, _Instrument] = {}
self._ctx = ctx

Expand Down
127 changes: 127 additions & 0 deletions migrations/dimensions-config/352c30854bb0.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
"""Migration script for dimensions.yaml namespace=daf_butler version=7.
Revision ID: 352c30854bb0
Revises: 1fae088c80b6
Create Date: 2024-03-28 12:14:53.021101
"""

import logging

import sqlalchemy
from alembic import op
from lsst.daf.butler_migrate.migration_context import MigrationContext

# revision identifiers, used by Alembic.
revision = "352c30854bb0"
down_revision = "1fae088c80b6"
branch_labels = None
depends_on = None

# Logger name should start with lsst to work with butler logging option.
_LOG = logging.getLogger(f"lsst.{__name__}")

_NEW_COLUMN = "can_see_sky"
_TABLE = "exposure"
_OBSERVATION_TYPE_COLUMN = "observation_type"


def upgrade() -> None:
"""Upgrade from daf_butler universe version 6 to version 7 following update
of dimensions.yaml in DM-43101.
Adds ``can_see_sky`` column to the exposure table, and sets its initial
values based on the the ``observation_type`` column.
"""

ctx = MigrationContext()

_LOG.info("Checking that this is an unmodified daf_butler universe 6 repo")
ctx.attributes.validate_dimensions_json(6)

_LOG.info("Adding can_see_sky column to exposure table")
op.add_column(
_TABLE, sqlalchemy.Column(_NEW_COLUMN, sqlalchemy.Boolean, nullable=True), schema=ctx.schema
)

# Set values for existing data based on the exposure's observation_type,
# which is closely correlated with whether the sky is visible in the
# exposure.
#
# Any exposures with observation types not present not in the two calls to
# _populate_values below will be left as NULL.
_LOG.info("Populating can_see_sky column")
table = ctx.get_table(_TABLE)
_populate_values(
table,
True,
[
"science",
"object",
"standard",
"sky flat",
"standard_star",
"skyflat",
"focus",
"focusing",
"exp",
"skyexp",
],
)
_populate_values(table, False, ["dark", "bias", "agexp", "domeflat", "dome flat", "zero", "spot"])

unhandled_observation_types = _find_unhandled_observation_types(ctx)
if unhandled_observation_types:
_LOG.info(
"WARNING: No default value for can_see_sky is known for the following observation types:\n"
f"{unhandled_observation_types}\n"
"Exposure records with these observation types will have a NULL can_see_sky."
)
else:
_LOG.info("...can_see_sky values were set for all exposure records.")

_LOG.info("Updating dimensions.json in ButlerAttributes")
ctx.attributes.replace_dimensions_json(7)


def downgrade() -> None:
"""Perform schema downgrade."""
ctx = MigrationContext()

_LOG.info("Checking that this is an unmodified daf_butler universe 7 repo")
ctx.attributes.validate_dimensions_json(7)

_LOG.info("dropping can_see_sky column")
op.drop_column(_TABLE, _NEW_COLUMN, schema=ctx.schema)

_LOG.info("Updating dimensions.json in ButlerAttributes")
ctx.attributes.replace_dimensions_json(6)


def _populate_values(table: sqlalchemy.Table, can_see_sky: bool, observation_types: list[str]) -> None:
"""Set can_see_sky column to the specified value for all rows in the
exposure table that have an observation_type in the specified list.
"""
op.execute(
table.update()
.values({_NEW_COLUMN: can_see_sky})
.where(table.columns[_OBSERVATION_TYPE_COLUMN].in_(observation_types))
.where(table.columns[_NEW_COLUMN].is_(None))
)


def _find_unhandled_observation_types(ctx: MigrationContext) -> list[str]:
"""Return a list of observation types present in the exposure table that
have a NULL value for the ``can_see_sky`` column at least one of their
rows.
"""
table = ctx.get_table(_TABLE)
return list(
ctx.bind.execute(
sqlalchemy.select(table.columns[_OBSERVATION_TYPE_COLUMN])
.distinct()
.where(table.columns[_NEW_COLUMN].is_(None))
)
.scalars()
.all()
)
68 changes: 68 additions & 0 deletions python/lsst/daf/butler_migrate/migration_context.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# This file is part of daf_butler_migrate.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from __future__ import annotations

__all__ = ("MigrationContext",)


import alembic
import sqlalchemy

from .butler_attributes import ButlerAttributes


class MigrationContext:
"""Provides access to commonly-needed objects derived from the alembic
migration context.
"""

def __init__(self) -> None:
self.mig_context = (
alembic.context.get_context()
) #: Alembic migration context for the DB being migrated.
self.schema = (
self.mig_context.version_table_schema
) #: Database schema name for the repository being migrated.
bind = self.mig_context.bind
assert bind is not None, "Can't run offline -- need access to database to migrate data."
self.bind = bind #: A SQLAlchemy connection for the database being migrated.
self.dialect = self.bind.dialect.name #: SQLAlchemy dialect for the database being migrated.
self.is_sqlite = self.dialect == "sqlite" #: True if the database being migrated is SQLite.
self.metadata = sqlalchemy.schema.MetaData(
schema=self.schema
) # SQLAlchemy MetaData object for the DB being migrated.
self.attributes = ButlerAttributes(self.bind, self.schema)

def get_table(self, table_name: str) -> sqlalchemy.Table:
"""Create a SQLAlchemy table object for the current database.
Parameters
----------
table_name : `str`
Name of the table.
Returns
-------
table : ``sqlalchemy.Table``
Table object.
"""
return sqlalchemy.schema.Table(table_name, self.metadata, autoload_with=self.bind, schema=self.schema)

0 comments on commit 9a81b3b

Please sign in to comment.