Skip to content

Commit

Permalink
Add a --end-date option to metadata update scripts
Browse files Browse the repository at this point in the history
Adding an --end-date option makes back-filling historic changes easier
by allowing work to be split into smaller, date-range tasks.

Adding this feature required some internal API changes, but as these
are not public, doesn't require a major version bump.
  • Loading branch information
kjsanger committed Nov 2, 2023
1 parent 21d7951 commit 2be8b51
Show file tree
Hide file tree
Showing 8 changed files with 142 additions and 54 deletions.
23 changes: 17 additions & 6 deletions scripts/apply-ont-metadata
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ secondary metadata (like `update-secondary-metadata` does), but that is purely a
optimisation to make the data available without waiting for an
`update-secondary-metadata` to be scheduled.
Only runs whose ML warehouse records have been updated recently are updated. The default
window for detecting changes is the 14 days prior to the time when the script is run.
This can be changed using the --begin-date CLI option.
Only runs whose ML warehouse records have been updated within the specified date range.
The default window for detecting changes is the 14 days prior to the time when the
script is run. This can be changed using the --begin-date and --end-date CLI options.
"""

parser = argparse.ArgumentParser(
Expand All @@ -54,12 +54,21 @@ add_logging_arguments(parser)
parser.add_argument(
"--begin-date",
"--begin_date",
help="Limit runs found to those changed after this date. Defaults to 14 days ago. "
"The argument must be an ISO8601 UTC date or date and time e.g. 2022-01-30, "
"2022-01-30T11:11:03Z",
help="Limit runs found to those changed at, or after this date. Defaults to "
"14 days ago. The argument must be an ISO8601 UTC date or date and time "
"e.g. 2022-01-30, 2022-01-30T11:11:03Z",
type=parse_iso_date,
default=datetime.now() - timedelta(days=14),
)
parser.add_argument(
"--end-date",
"--end_date",
help="Limit runs found to those changed at, or before this date. Defaults to "
"the current time. The argument must be an ISO8601 UTC date or date and time "
"e.g. 2022-01-30, 2022-01-30T11:11:03Z",
type=parse_iso_date,
default=datetime.now(),
)
parser.add_argument(
"--zone",
help="Specify a federated iRODS zone in which to find "
Expand All @@ -70,6 +79,8 @@ parser.add_argument(
parser.add_argument(
"--database-config",
"--database_config",
"--db-config",
"--db_config",
help="Configuration file for database connection.",
type=argparse.FileType("r"),
required=True,
Expand Down
56 changes: 44 additions & 12 deletions scripts/locate-data-objects
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ add_logging_arguments(parser)
parser.add_argument(
"--database-config",
"--database_config",
"--db-config",
"--db_config",
help="Configuration file for database connection",
type=argparse.FileType("r"),
required=True,
Expand Down Expand Up @@ -160,12 +162,21 @@ ilup_parser = subparsers.add_parser(
ilup_parser.add_argument(
"--begin-date",
"--begin_date",
help="Limit data objects found to those changed after this date. Defaults to 14 "
"days ago. The argument must be an ISO8601 UTC date or date and time e.g. "
"2022-01-30, 2022-01-30T11:11:03Z",
help="Limit data objects found to those whose metadata was changed in the ML "
"warehouse at, or after after this date. Defaults to 14 days ago. The argument "
"must be an ISO8601 UTC date or date and time e.g. 2022-01-30, 2022-01-30T11:11:03Z",
type=parse_iso_date,
default=datetime.now(timezone.utc) - timedelta(days=14),
)
ilup_parser.add_argument(
"--end-date",
"--end_date",
help="Limit data objects found to those whose metadata was changed in the ML "
"warehouse at, or before this date. Defaults to the current time. The argument "
"must be an ISO8601 UTC date or date and time e.g. 2022-01-30, 2022-01-30T11:11:03Z",
type=parse_iso_date,
default=datetime.now(),
)
ilup_parser.add_argument(
"--skip-absent-runs",
"--skip_absent_runs",
Expand All @@ -185,17 +196,22 @@ def illumina_updates(cli_args):
with Session(engine) as session:
num_processed = num_errors = 0

iso_date = cli_args.begin_date.strftime("%Y-%m-%dT%H:%M:%SZ")
iso_begin = cli_args.begin_date.strftime("%Y-%m-%dT%H:%M:%SZ")
iso_end = cli_args.end_date.strftime("%Y-%m-%dT%H:%M:%SZ")
skip_absent_runs = cli_args.skip_absent_runs

attempts_per_run = defaultdict(int)
success_per_run = defaultdict(int)

for i, c in enumerate(
illumina.find_components_changed(session, since=cli_args.begin_date)
illumina.find_updated_components(
session, since=cli_args.begin_date, until=cli_args.end_date
)
):
num_processed += 1
log.info("Finding data objects", item=i, component=c, since=iso_date)
log.info(
"Finding data objects", item=i, comp=c, since=iso_begin, until=iso_end
)

try:
avus = [
Expand All @@ -212,8 +228,9 @@ def illumina_updates(cli_args):
log.info(
"Skipping run after unsuccessful attempts to find it",
item=i,
component=c,
since=iso_date,
comp=c,
since=iso_begin,
until=iso_end,
attempts=attempts_per_run[c.id_run],
)
continue
Expand Down Expand Up @@ -254,6 +271,15 @@ ontup_parser.add_argument(
type=parse_iso_date,
default=datetime.now(timezone.utc) - timedelta(days=14),
)
ontup_parser.add_argument(
"--end-date",
"--end_date",
help="Limit collections found to those changed before date. Defaults to the"
"current time. The argument must be an ISO8601 UTC date or date and time e.g."
" 2022-01-30, 2022-01-30T11:11:03Z",
type=parse_iso_date,
default=datetime.now(),
)
ontup_parser.add_argument(
"--report-tags",
"--report_tags",
Expand All @@ -267,16 +293,22 @@ def ont_updates(cli_args):
engine = sqlalchemy.create_engine(dbconfig.url)
with Session(engine) as session:
num_processed = num_errors = 0
iso_date = cli_args.begin_date.strftime("%Y-%m-%dT%H:%M:%SZ")
iso_begin = cli_args.begin_date.strftime("%Y-%m-%dT%H:%M:%SZ")
iso_end = cli_args.end_date.strftime("%Y-%m-%dT%H:%M:%SZ")
report_tags = cli_args.report_tags

for i, c in enumerate(
ont.find_components_changed(
session, include_tags=report_tags, since=cli_args.begin_date
ont.find_updated_components(
session,
include_tags=report_tags,
since=cli_args.begin_date,
until=cli_args.end_date,
)
):
num_processed += 1
log.info("Finding collections", item=i, component=c, since=iso_date)
log.info(
"Finding collections", item=i, comp=c, since=iso_begin, until=iso_end
)

try:
avus = [
Expand Down
3 changes: 2 additions & 1 deletion scripts/update-secondary-metadata
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ add_logging_arguments(parser)
parser.add_argument(
"--database-config",
"--database_config",
"--db-config",
"--db_config",
help="Configuration file for database connection.",
type=argparse.FileType("r"),
required=True,
Expand Down Expand Up @@ -102,7 +104,6 @@ parser.add_argument(
parser.add_argument(
"--version", help="Print the version and exit.", action="store_true"
)

parser.add_argument(
"--zone",
help="Specify a federated iRODS zone in which to find data objects and/or "
Expand Down
2 changes: 1 addition & 1 deletion src/npg_irods/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import logging
import logging.config
from argparse import ArgumentParser, ArgumentTypeError
from datetime import datetime
from datetime import datetime, timedelta, timezone

import dateutil.parser
import structlog
Expand Down
17 changes: 10 additions & 7 deletions src/npg_irods/illumina.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,17 +272,20 @@ def find_flowcells_by_component(
return query.order_by(asc(IseqFlowcell.id_iseq_flowcell_tmp)).all()


def find_components_changed(sess: Session, since: datetime) -> Iterator[Component]:
def find_updated_components(
sess: Session, since: datetime, until: datetime
) -> Iterator[Component]:
"""Find in the ML warehouse any Illumina sequence components whose tracking
metadata has been changed since a given time.
metadata has been changed within a specified time range
A change is defined as the "recorded_at" column (Sample, Study, IseqFlowcell) or
"last_changed" colum (IseqProductMetrics) having a timestamp more recent than the
given time.
Args:
sess: An open SQL session.
since: A datetime query argument.
since: A datetime.
until: A datetime.
Returns:
An iterator over Components whose tracking metadata have changed.
Expand All @@ -296,10 +299,10 @@ def find_components_changed(sess: Session, since: datetime) -> Iterator[Componen
.join(IseqFlowcell.study)
.join(IseqFlowcell.iseq_product_metrics)
.filter(
(Sample.recorded_at >= since)
| (Study.recorded_at >= since)
| (IseqFlowcell.recorded_at >= since)
| (IseqProductMetrics.last_changed >= since)
Sample.recorded_at.between(since, until)
| Study.recorded_at.between(since, until)
| IseqFlowcell.recorded_at.between(since, until)
| IseqProductMetrics.last_changed.between(since, until)
)
.order_by(asc(IseqFlowcell.id_iseq_flowcell_tmp))
):
Expand Down
25 changes: 16 additions & 9 deletions src/npg_irods/ont.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,13 @@ def apply_metadata(
experiment_name=None,
instrument_slot=None,
since: datetime = None,
until: datetime = None,
zone=None,
) -> (int, int, int):
"""Apply iRODS metadata on ONT run collections whose corresponding ML warehouse
records have been updated at, or more recently than, the specified time. This
function detects runs that are multiplexed and adds relevant tag identifier and
tag index primary metadata to the deplexed collections.
records have been updated within a specified time range. This function detects
runs that are multiplexed and adds relevant tag identifier and tag index primary
metadata to the deplexed collections.
Collections to annotate are identified by having ont:experiment_name and
ont:instrument_slot metadata already attached to them. This is done for example,
Expand All @@ -101,6 +102,7 @@ def apply_metadata(
instrument_slot: Limit updates to this instrument slot. Optional, requires
an experiment_name to be supplied.
since: A datetime. Limit updates to experiments changed at this time or later.
until: A datetime. Limit updates to experiments before at this time or earlier.
zone: The iRODS zone to search for metadata to update.
Returns:
Expand All @@ -109,6 +111,8 @@ def apply_metadata(
"""
if since is None:
since = datetime.fromtimestamp(0) # Everything since the Epoch
if until is None:
until = datetime.now()

if experiment_name is None and instrument_slot is not None:
raise ValueError(
Expand All @@ -119,7 +123,9 @@ def apply_metadata(
num_found, num_updated, num_errors = 0, 0, 0

for i, c in enumerate(
find_components_changed(mlwh_session, include_tags=False, since=since)
find_updated_components(
mlwh_session, include_tags=False, since=since, until=until
)
):
if experiment_name is not None and c.experiment_name != experiment_name:
continue
Expand Down Expand Up @@ -309,15 +315,16 @@ def find_recent_expt(sess: Session, since: datetime) -> list[str]:
return [val for val, in rows]


def find_components_changed(
sess: Session, since: datetime, include_tags=True
def find_updated_components(
sess: Session, since: datetime, until: datetime, include_tags=True
) -> Iterator[Component]:
"""Return the components of runs whose ML warehouse metadata has been updated
at or since the given date and time.
Args:
sess: An open SQL session.
since: A datetime.
until: A datetime.
include_tags: Resolve the components to the granularity of individual tags,
rather than as whole runs. Optional, defaults to True.
Expand All @@ -335,9 +342,9 @@ def find_components_changed(
.join(OseqFlowcell.sample)
.join(OseqFlowcell.study)
.filter(
(Sample.recorded_at >= since)
| (Study.recorded_at >= since)
| (OseqFlowcell.recorded_at >= since)
Sample.recorded_at.between(since, until)
| Study.recorded_at.between(since, until)
| OseqFlowcell.recorded_at.between(since, until)
)
.group_by(*columns)
)
Expand Down
2 changes: 2 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,12 @@
TEST_SQL_STALE_REPLICATE = "setObjectReplStale"
TEST_SQL_INVALID_CHECKSUM = "setObjectChecksumInvalid"

# Counts of test fixture experiments
NUM_SIMPLE_EXPTS = 5
NUM_MULTIPLEXED_EXPTS = 3
NUM_INSTRUMENT_SLOTS = 5

# Dates when test fixture experiments were done
BEGIN = datetime(year=2020, month=1, day=1, hour=0, minute=0, second=0)
EARLY = datetime(year=2020, month=6, day=1, hour=0, minute=0, second=0)
LATE = datetime(year=2020, month=6, day=14, hour=0, minute=0, second=0)
Expand Down
Loading

0 comments on commit 2be8b51

Please sign in to comment.