Create and populate day_obs table
dhirving committed Mar 13, 2024
1 parent fffc04c commit a25b244
Showing 1 changed file with 58 additions and 8 deletions.
66 changes: 58 additions & 8 deletions migrations/dimensions-config/1fae088c80b6.py
@@ -6,12 +6,15 @@
"""

from __future__ import annotations

import logging
from dataclasses import dataclass
from collections.abc import Iterator

import alembic
import sqlalchemy as sa
from alembic import op
from lsst.daf.butler import Timespan

# revision identifiers, used by Alembic.
revision = "1fae088c80b6"
@@ -25,17 +28,18 @@

def upgrade() -> None:
"""Perform schema upgrade."""
_migrate_groups()
ctx = _Context()
_migrate_day_obs(ctx)
_migrate_groups(ctx)


def downgrade() -> None:
"""Perform schema downgrade."""
raise NotImplementedError()


def _migrate_groups() -> None:
    ctx = _Context()

def _migrate_groups(ctx: _Context) -> None:
    # Create group table
    table = op.create_table(
        "group",
        sa.Column("instrument", sa.String(32), primary_key=True),
@@ -48,7 +52,7 @@ def _migrate_groups() -> None:
        schema=ctx.schema,
    )

    # Populate the new table based on the data in the exposure table.
    # Populate group table based on the data in the exposure table.
    exposure_table = ctx.get_table("exposure")
    select = sa.select(
        exposure_table.columns["instrument"],
@@ -85,12 +89,58 @@ def _migrate_groups() -> None:
    )


def _migrate_day_obs(ctx: _Context) -> None:
    # Before doing anything else, generate the rows for the new day_obs table
    # from the data in the exposure table. This is prone to failure due to the
    # need to import instrument classes.
    day_obs_rows = list(_generate_day_obs_rows(ctx))

    # Create day_obs table
    table = op.create_table(
        "day_obs",
        sa.Column("instrument", sa.String(32), primary_key=True),
sa.Column("id", sa.BigInteger, primary_key=True, autoincrement=False),
sa.Column("timespan_begin", sa.BigInteger),
sa.Column("timespan_end", sa.BigInteger),
sa.schema.ForeignKeyConstraint(
columns=["instrument"],
refcolumns=["instrument.name"],
name="fkey_day_obs_instrument_name_instrument",
),
schema=ctx.schema,
)

# Populate the day_obs table based on the data in the exposure table.
op.bulk_insert(table, day_obs_rows)


def _generate_day_obs_rows(ctx: _Context) -> Iterator[dict]:
    exposure_table = ctx.get_table("exposure")
    select = sa.select(
        exposure_table.columns["instrument"],
        exposure_table.columns["day_obs"],
    ).distinct()
    rows = ctx.bind.execute(select).all()

    for row in rows:
        day_obs = row.day_obs
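        # A zero offset is used here; instrument-specific day_obs offsets
        # (if any) are not applied by this version of the migration.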
        offset = 0
        timespan = Timespan.from_day_obs(day_obs, offset).to_simple()
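        # to_simple() serializes the timespan; elements 0 and 1 are the
        # begin/end bounds stored in the BigInteger columns created above.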
        yield {
            "instrument": row.instrument,
            "id": day_obs,
            "timespan_begin": timespan[0],
            "timespan_end": timespan[1],
        }
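
    # Illustrative example (hypothetical values): an exposure row with
    # instrument="LATISS" and day_obs=20240313 would yield
    # {"instrument": "LATISS", "id": 20240313,
    #  "timespan_begin": <begin>, "timespan_end": <end>}.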


class _Context:
    def __init__(self) -> None:
        self.mig_context = alembic.context.get_context()
        self.schema = self.mig_context.version_table_schema
        self.bind = self.mig_context.bind
        assert self.bind is not None
        bind = self.mig_context.bind
        assert bind is not None, "Can't run offline -- need access to database to migrate data."
        self.bind = bind
        self.metadata = sa.schema.MetaData(schema=self.schema)

    def get_table(self, table_name: str) -> sa.Table:

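A minimal sketch (not part of this commit) of how the populated day_obs table could be spot-checked after the migration runs; the connection URL and unqualified table name are assumptions:

    import sqlalchemy as sa

    # Reflect the new table and print a few rows; the URL is a placeholder
    # for whatever database the migration was applied to.
    engine = sa.create_engine("postgresql:///butler")
    day_obs = sa.Table("day_obs", sa.MetaData(), autoload_with=engine)
    with engine.connect() as conn:
        for row in conn.execute(sa.select(day_obs).limit(5)):
            print(row.instrument, row.id, row.timespan_begin, row.timespan_end)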