Skip to content

Commit

Permalink
Add naive update-day-obs command
Browse files Browse the repository at this point in the history
This implementation is a proof of concept that uses standard
butler APIs to do the updates. It may be too slow for large
repositories.
  • Loading branch information
timj committed Mar 6, 2024
1 parent 52ab7c7 commit 7a9c064
Show file tree
Hide file tree
Showing 5 changed files with 204 additions and 0 deletions.
1 change: 1 addition & 0 deletions doc/lsst.daf.butler_migrate/command-line.rst
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ There are also few commands that were added for specific cases, not generally us
- ``dump-schema``
- ``set-namespace``
- ``rewrite-sqlite-registry``
- ``update-day-obs``

Sections below describe individual commands and their options.

Expand Down
12 changes: 12 additions & 0 deletions python/lsst/daf/butler_migrate/cli/cmd/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from ..opt import (
class_argument,
dry_run_option,
instrument_argument,
manager_argument,
mig_path_exist_option,
mig_path_option,
Expand Down Expand Up @@ -183,3 +184,14 @@ def set_namespace(**kwargs: Any) -> None:
def dump_schema(**kwargs: Any) -> None:
"""Dump database schema in human-readable format."""
script.migrate_dump_schema(**kwargs)


@migrate.command(
short_help="Recalculate the day_obs values for exposures and visit for the given instrument.",
cls=ButlerCommand,
)
@repo_argument(required=True)
@instrument_argument()
def update_day_obs(**kwargs: Any) -> None:
"""Update the day_obs values if needed."""
script.update_day_obs(**kwargs)

Check warning on line 197 in python/lsst/daf/butler_migrate/cli/cmd/commands.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/daf/butler_migrate/cli/cmd/commands.py#L197

Added line #L197 was not covered by tests
17 changes: 17 additions & 0 deletions python/lsst/daf/butler_migrate/cli/opt/arguments.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,17 @@

from __future__ import annotations

__all__ = [

Check warning on line 24 in python/lsst/daf/butler_migrate/cli/opt/arguments.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/daf/butler_migrate/cli/opt/arguments.py#L24

Added line #L24 was not covered by tests
"tree_name_argument",
"class_argument",
"version_argument",
"revision_argument",
"manager_argument",
"namespace_argument",
"tables_argument",
"instrument_argument",
]

from lsst.daf.butler.cli.utils import MWArgumentDecorator

tree_name_argument = MWArgumentDecorator(
Expand Down Expand Up @@ -77,3 +88,9 @@
required=False,
nargs=-1,
)

instrument_argument = MWArgumentDecorator(

Check warning on line 92 in python/lsst/daf/butler_migrate/cli/opt/arguments.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/daf/butler_migrate/cli/opt/arguments.py#L92

Added line #L92 was not covered by tests
"instrument",
help="INSTRUMENT is the name of the instrument to use.",
required=True,
)
1 change: 1 addition & 0 deletions python/lsst/daf/butler_migrate/script/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,4 @@
from .migrate_trees import migrate_trees
from .migrate_upgrade import migrate_upgrade
from .rewrite_sqlite_registry import rewrite_sqlite_registry
from .update_day_obs import update_day_obs
173 changes: 173 additions & 0 deletions python/lsst/daf/butler_migrate/script/update_day_obs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
# This file is part of daf_butler_migrate.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from __future__ import annotations

__all__ = ["update_day_obs"]

import logging
from typing import Any

from lsst.daf.butler import Butler, DimensionRecord
from lsst.utils import doImportType
from lsst.utils.iteration import chunk_iterable

_LOG = logging.getLogger(__name__)


def updated_record(rec: DimensionRecord, **kwargs: Any) -> DimensionRecord:
"""Create a new record with modified fields.
Parameters
----------
rec : `lsst.daf.butler.DimensionRecord`
Record to be cloned.
**kwargs : `typing.Any`
Values to modify on copy. The keys must be known to the record.
Returns
-------
new : `lsst.daf.butler.DimensionRecord`
New record with updated values.
"""
d = rec.toDict()

Check warning on line 51 in python/lsst/daf/butler_migrate/script/update_day_obs.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/daf/butler_migrate/script/update_day_obs.py#L51

Added line #L51 was not covered by tests
# Should validate but this is not a general function for now.
d.update(kwargs)
return type(rec)(**d)

Check warning on line 54 in python/lsst/daf/butler_migrate/script/update_day_obs.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/daf/butler_migrate/script/update_day_obs.py#L53-L54

Added lines #L53 - L54 were not covered by tests


def update_day_obs(repo: str, instrument: str) -> None:
"""Update the day_obs for the given instrument.
Parameters
----------
repo : `str`
URI of butler repository to update.
instrument : `str`
Name of instrument to use to update records.
Notes
-----
This update code depends on being able to import the python instrument
class registered with the instrument record.
"""
# Connect to the butler.
butler = Butler.from_config(repo, writeable=True)

Check warning on line 73 in python/lsst/daf/butler_migrate/script/update_day_obs.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/daf/butler_migrate/script/update_day_obs.py#L73

Added line #L73 was not covered by tests

# Need the instrument class, since that is used to calculate day_obs.
# Do not depend directly on pipe_base but to load this class pipe_base
# will have to be available.
instr_records = list(butler.registry.queryDimensionRecords("instrument", instrument=instrument))

Check warning on line 78 in python/lsst/daf/butler_migrate/script/update_day_obs.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/daf/butler_migrate/script/update_day_obs.py#L78

Added line #L78 was not covered by tests
if len(instr_records) != 1:
if not instr_records:
raise RuntimeError(f"Unable to find an instrument record for instrument named {instrument}.")

Check warning on line 81 in python/lsst/daf/butler_migrate/script/update_day_obs.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/daf/butler_migrate/script/update_day_obs.py#L81

Added line #L81 was not covered by tests
else:
raise AssertionError(f"Impossibly got more than one instrument record named {instrument}.")
instrument_class_name = instr_records[0].class_name
instrument_cls = doImportType(instrument_class_name)
instr = instrument_cls()
translator = instr.translatorClass

Check warning on line 87 in python/lsst/daf/butler_migrate/script/update_day_obs.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/daf/butler_migrate/script/update_day_obs.py#L83-L87

Added lines #L83 - L87 were not covered by tests
if translator is None:
raise RuntimeError(

Check warning on line 89 in python/lsst/daf/butler_migrate/script/update_day_obs.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/daf/butler_migrate/script/update_day_obs.py#L89

Added line #L89 was not covered by tests
f"Instrument class {instrument_class_name} has no registered translator class."
" Unable to calculate the correct observing day."
)

# The naive approach is to query all the records and recalculate the
# day_obs and then update them all in one big transaction. This will
# be fine for a small repo but catastrophic with millions of exposure
# and visit records.
exposures_to_be_updated = {}
exposures = butler.registry.queryDimensionRecords("exposure", instrument=instrument)
counter = 0

Check warning on line 100 in python/lsst/daf/butler_migrate/script/update_day_obs.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/daf/butler_migrate/script/update_day_obs.py#L98-L100

Added lines #L98 - L100 were not covered by tests
for exp in exposures:
offset = translator.observing_date_to_offset(exp.timespan.begin)
day_obs = translator.observing_date_to_observing_day(exp.timespan.begin, offset)

Check warning on line 103 in python/lsst/daf/butler_migrate/script/update_day_obs.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/daf/butler_migrate/script/update_day_obs.py#L102-L103

Added lines #L102 - L103 were not covered by tests
if day_obs != exp.day_obs:
# Need to update the record. Immutable so need a copy.
exposures_to_be_updated[exp.id] = updated_record(exp, day_obs=day_obs)
counter += 1

Check warning on line 107 in python/lsst/daf/butler_migrate/script/update_day_obs.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/daf/butler_migrate/script/update_day_obs.py#L106-L107

Added lines #L106 - L107 were not covered by tests

_LOG.info(

Check warning on line 109 in python/lsst/daf/butler_migrate/script/update_day_obs.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/daf/butler_migrate/script/update_day_obs.py#L109

Added line #L109 was not covered by tests
"Number of exposure records needing to be updated: %d / %d", len(exposures_to_be_updated), counter
)

visits_to_be_updated = {}

Check warning on line 113 in python/lsst/daf/butler_migrate/script/update_day_obs.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/daf/butler_migrate/script/update_day_obs.py#L113

Added line #L113 was not covered by tests

# Work out the visits that need to be updated given the exposures we have
# updated. Chunk the queries.
for exposure_ids in chunk_iterable(exposures_to_be_updated, chunk_size=1_000):

# If there are modified exposures associated visits will have to be
# located and updated.
visit_defs = butler.registry.queryDimensionRecords(

Check warning on line 121 in python/lsst/daf/butler_migrate/script/update_day_obs.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/daf/butler_migrate/script/update_day_obs.py#L121

Added line #L121 was not covered by tests
"visit_definition",
where="exposure in (exps)",
bind={"exps": list(exposure_ids)},
instrument=instrument,
)

visit_to_exposure = {}

Check warning on line 128 in python/lsst/daf/butler_migrate/script/update_day_obs.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/daf/butler_migrate/script/update_day_obs.py#L128

Added line #L128 was not covered by tests
for defn in visit_defs:
# We do not need to store all the exposures along with the visit
# since by definition a visit has the same day_obs across all
# exposures.
visit_to_exposure[defn.visit] = defn.exposure

Check warning on line 133 in python/lsst/daf/butler_migrate/script/update_day_obs.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/daf/butler_migrate/script/update_day_obs.py#L133

Added line #L133 was not covered by tests

# Now retrieve that visits themselves.
visits = butler.registry.queryDimensionRecords(

Check warning on line 136 in python/lsst/daf/butler_migrate/script/update_day_obs.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/daf/butler_migrate/script/update_day_obs.py#L136

Added line #L136 was not covered by tests
"visit",
where="visit in (visits)",
bind={"visits": list(visit_to_exposure)},
instrument=instrument,
)

for visit in visits:
exposure_id = visit_to_exposure[visit.id]

Check warning on line 144 in python/lsst/daf/butler_migrate/script/update_day_obs.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/daf/butler_migrate/script/update_day_obs.py#L144

Added line #L144 was not covered by tests
# Index by exposure ID for later batching of inserts.
visits_to_be_updated[exposure_id] = updated_record(

Check warning on line 146 in python/lsst/daf/butler_migrate/script/update_day_obs.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/daf/butler_migrate/script/update_day_obs.py#L146

Added line #L146 was not covered by tests
visit, day_obs=exposures_to_be_updated[exposure_id].day_obs
)

_LOG.info("Number of visit records needing to be updated: %d", len(visits_to_be_updated))

Check warning on line 150 in python/lsst/daf/butler_migrate/script/update_day_obs.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/daf/butler_migrate/script/update_day_obs.py#L150

Added line #L150 was not covered by tests

# Batch inserts in smallish transactions so that on restart we will
# be able to ignore records that have already been fixed. It is important
# that visits are updated when the exposures are updated.
counter = 0

Check warning on line 155 in python/lsst/daf/butler_migrate/script/update_day_obs.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/daf/butler_migrate/script/update_day_obs.py#L155

Added line #L155 was not covered by tests
for exposure_ids in chunk_iterable(exposures_to_be_updated, chunk_size=1_000):
with butler.transaction():
counter += 1
_LOG.info("Updating exposure/visit records (chunk %d)", counter)

Check warning on line 159 in python/lsst/daf/butler_migrate/script/update_day_obs.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/daf/butler_migrate/script/update_day_obs.py#L158-L159

Added lines #L158 - L159 were not covered by tests
exposure_records = [exposures_to_be_updated[exposure_id] for exposure_id in exposure_ids]
butler.registry.insertDimensionData("exposure", *exposure_records, replace=True)

Check warning on line 161 in python/lsst/daf/butler_migrate/script/update_day_obs.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/daf/butler_migrate/script/update_day_obs.py#L161

Added line #L161 was not covered by tests

visit_records = [
visits_to_be_updated[exposure_id]
for exposure_id in exposure_ids
if exposure_id in visits_to_be_updated
]
# insertDimensionData results in skypix overlaps being recalculated
# and re-inserted even though we are only changing one unrelated
# item.
butler.registry.insertDimensionData("visit", *visit_records, replace=True)

Check warning on line 171 in python/lsst/daf/butler_migrate/script/update_day_obs.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/daf/butler_migrate/script/update_day_obs.py#L171

Added line #L171 was not covered by tests

return

Check warning on line 173 in python/lsst/daf/butler_migrate/script/update_day_obs.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/daf/butler_migrate/script/update_day_obs.py#L173

Added line #L173 was not covered by tests

0 comments on commit 7a9c064

Please sign in to comment.