Skip to content

Commit

Permalink
Merge pull request #10 from lsst/tickets/DM-38498
Browse files Browse the repository at this point in the history
DM-38498: Add runs that check --extend-run.
  • Loading branch information
TallJimbo authored Aug 24, 2023
2 parents ca2447c + 6da8afe commit f310901
Show file tree
Hide file tree
Showing 2 changed files with 175 additions and 39 deletions.
16 changes: 16 additions & 0 deletions data/SConscript
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,22 @@ rc2 = (
"RC2", os.path.join(os.environ["DRP_PIPE_DIR"], "pipelines", "HSC", "DRP-RC2.yaml"), base_repo
)
.add("step1", where="instrument='HSC' AND exposure NOT IN (18202, 96980)")
# Add side runs (later runs do not build on these) to check --extend-run
# and --clobber-outputs.
.add_side_run(
"test-clobber-without-skip",
["calibrate", "consolidatePreSourceTable"],
where="skymap='ci_mw'",
extend_run=True,
clobber_outputs=True,
)
.add_side_run(
"test-skip-and-clobber",
["calibrate", "consolidatePreSourceTable"],
skip_existing_in_last=True,
extend_run=True,
clobber_outputs=True,
)
.add("step2a", where="skymap='ci_mw'")
.add("step2b", where="skymap='ci_mw' AND tract=0")
.add("step2cde")
Expand Down
198 changes: 159 additions & 39 deletions python/lsst/ci/middleware/scons.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
)

import os
from collections.abc import Sequence
from collections.abc import Iterable, Sequence

from lsst.sconsUtils import state
from lsst.sconsUtils.utils import libraryLoaderEnvironment
Expand Down Expand Up @@ -186,7 +186,7 @@ def add(
fail: Sequence[str] = (),
skip_existing_in_last: bool = False,
) -> PipelineCommands:
"""Add a new QuantumGraph and its execution to the graph.
"""Add a new QuantumGraph and its execution to the build.
Parameters
----------
Expand Down Expand Up @@ -226,7 +226,7 @@ def add(
qg = self._add_qg(
suffix=suffix,
output_run=output_run,
step=step,
tasks_or_subsets=[step] if step is not None else [],
where=where,
fail=fail,
skip_existing_in_last=skip_existing_in_last,
Expand All @@ -237,6 +237,84 @@ def add(
self.last_qbb_repo = self._add_qbb(
qg, suffix=suffix, output_run=output_run, expect_failure=bool(fail)
)
self.last_output_run = output_run
return self

def add_side_run(
    self,
    suffix: str,
    tasks_or_subsets: Iterable[str],
    where: str = "",
    fail: Sequence[str] = (),
    skip_existing_in_last: bool = False,
    extend_run: bool = False,
    clobber_outputs: bool = False,
) -> PipelineCommands:
    """Add a new QuantumGraph and its execution to the build, but do not
    use it as input for later runs.

    Parameters
    ----------
    suffix : `str`
        Suffix for filenames and run collections that uniquely identifies
        this run.
    tasks_or_subsets : `~collections.abc.Iterable` [ `str` ]
        Named subset or task labels to run.  If empty, the full pipeline is
        run.
    where : `str`, optional
        Data ID constraint expression passed as the ``--data-query``
        argument to ``pipetask`` when building the graph.
    fail : `~collections.abc.Sequence` [ `str` ], optional
        Sequence of colon-separated ``task_label:error_type:where`` tuples
        that identify quanta that should raise an exception.
    skip_existing_in_last : `bool`, optional
        If `True`, pass ``--skip-existing-in`` to the QuantumGraph
        generation command with the input collections as the argument.
    extend_run : `bool`, optional
        If `True`, reuse the previous run's output RUN collection, pass
        ``--extend-run`` to the QuantumGraph generation command and the
        direct execution command, and skip QBB execution entirely.
    clobber_outputs : `bool`, optional
        If `True`, pass ``--clobber-outputs`` to the QuantumGraph
        generation command and the direct execution command.  QBB execution
        effectively inherits its clobber behavior from the quantum graph.

    Returns
    -------
    self : `PipelineCommands`
        The instance this method was called on, to facilitate
        method-chaining.

    Notes
    -----
    Unlike `add`, this method never updates ``last_qbb_repo`` or
    ``last_output_run``, so side runs are invisible to later steps.
    """
    if extend_run:
        # Extending a run means writing into the previous step's output
        # RUN collection instead of minting a fresh one.
        output_run = self.last_output_run
    else:
        output_run = self.run_template.format(name=self.name, suffix=suffix)
    qg = self._add_qg(
        suffix=suffix,
        output_run=output_run,
        tasks_or_subsets=tasks_or_subsets,
        where=where,
        fail=fail,
        skip_existing_in_last=skip_existing_in_last,
        extend_run=extend_run,
        clobber_outputs=clobber_outputs,
    )
    self._add_direct(
        qg,
        suffix=suffix,
        output_run=output_run,
        expect_failure=bool(fail),
        extend_run=extend_run,
        clobber_outputs=clobber_outputs,
    )
    if not extend_run:
        # QBB execution is only added for side runs that create their own
        # output RUN; it is skipped entirely when extending a run.  Since
        # this branch is only reached when ``extend_run`` is `False`, the
        # original ``pre_exec_init=not extend_run`` was always `True`.
        self._add_qbb(
            qg,
            suffix=suffix,
            output_run=output_run,
            expect_failure=bool(fail),
            pre_exec_init=True,
        )
    return self

def finish(self) -> list[File]:
Expand Down Expand Up @@ -316,10 +394,12 @@ def _add_qg(
self,
suffix: str,
output_run: str,
step: str | None,
tasks_or_subsets: Iterable[str],
where: str,
fail: Sequence[str] = (),
skip_existing_in_last: bool = False,
extend_run: bool = False,
clobber_outputs: bool = False,
) -> File:
"""Make a SCons target for the quantum graph file.
Expand All @@ -329,9 +409,9 @@ def _add_qg(
Suffix that combines the step and group, if present.
output_run : `str`
Name of the output RUN collection.
step : `str`, optional
Named subset corresponding to a step in the pipeline. If `None`
(default), a graph is generated for the full pipeline.
tasks_or_subsets : `~collections.abc.Iterable [ `str` ]
Named subset or task labels to run. If empty, the full pipeline is
run.
where : `str`, optional
Data ID constraint expression passed as the ``--data-query``
argument to ``pipetask`` when building the graph.
Expand All @@ -341,6 +421,12 @@ def _add_qg(
skip_existing_in_last : `bool`, optional
If `True`, pass ``--skip-existing-in`` to the QuantumGraph
generation command with the input collections as the argument.
extend_run : `bool`, optional
If `True`, pass ``--extend-run`` to the QuantumGraph generation
command.
clobber_outputs : `bool`, optional
If `True`, pass ``--clobber-outputs`` to the QuantumGraph
generation command.
Returns
-------
Expand All @@ -353,6 +439,10 @@ def _add_qg(
fail_and_retry_args = [f"--mock-failure {f}" for f in fail]
if skip_existing_in_last:
fail_and_retry_args.append(f"--skip-existing-in {self.chain}")
if extend_run:
fail_and_retry_args.append("--extend-run")
if clobber_outputs:
fail_and_retry_args.append("--clobber-outputs")
targets = state.env.Command(
[File(qg_file), File(log)],
# We always build QGs and run direct processing using the previous
Expand All @@ -371,7 +461,10 @@ def _add_qg(
self._pipetask_cmd(
"qgraph",
f"-b {repo_in_cmd}",
"-p ${SOURCES[1]}" + ("" if step is None else f"#{step}"),
(
"-p ${SOURCES[1]}"
+ ("" if tasks_or_subsets is None else f"#{','.join(tasks_or_subsets)}")
),
f'-d "{where}"',
f"--input {DEFAULTS_COLLECTION}",
f"--output {self.chain}",
Expand All @@ -389,7 +482,15 @@ def _add_qg(
self.all_targets.extend(targets)
return targets[0]

def _add_direct(self, qg_file: File, suffix: str, output_run: str, expect_failure: bool = False) -> File:
def _add_direct(
self,
qg_file: File,
suffix: str,
output_run: str,
expect_failure: bool = False,
extend_run: bool = False,
clobber_outputs: bool = False,
) -> File:
"""Make an SCons target for direct execution of the quantum graph
with ``pipetask run`` and a full butler.
Expand All @@ -404,6 +505,10 @@ def _add_direct(self, qg_file: File, suffix: str, output_run: str, expect_failur
expect_failure : `bool`, optional
If `True`, expect the pipetask command to fail with a nonzero exit
code, and guard it accordingly to keep the SCons build running.
extend_run : `bool`, optional
If `True`, pass ``--extend-run`` to ``pipetask run``.
clobber_outputs : `bool`, optional
If `True`, pass ``--clobber-outputs`` to ``pipetask run``.
Returns
-------
Expand All @@ -413,6 +518,11 @@ def _add_direct(self, qg_file: File, suffix: str, output_run: str, expect_failur
repo_file = os.path.join(self.name, suffix + "-direct.tgz")
log = os.path.join(self.name, suffix + "-direct.log")
repo_in_cmd = "${TARGETS[0].base}"
extra_args = []
if extend_run:
extra_args.append("--extend-run")
if clobber_outputs:
extra_args.append("--clobber-outputs")
targets = state.env.Command(
[File(repo_file), File(log)],
# We use the last QBB repo as input, even for direct executions
Expand All @@ -432,6 +542,7 @@ def _add_direct(self, qg_file: File, suffix: str, output_run: str, expect_failur
f"--output {self.chain}",
f"--output-run {output_run}",
"--register-dataset-types",
*extra_args,
log="${TARGETS[1]}",
expect_failure=expect_failure,
),
Expand All @@ -441,7 +552,9 @@ def _add_direct(self, qg_file: File, suffix: str, output_run: str, expect_failur
self.all_targets.extend(targets)
return targets[0]

def _add_qbb(self, qg_file: File, suffix: str, output_run: str, expect_failure: bool) -> File:
def _add_qbb(
self, qg_file: File, suffix: str, output_run: str, expect_failure: bool, pre_exec_init: bool = True
) -> File:
"""Make an SCons target for direct execution of the quantum graph
with ``pipetask run-qbb`` and `lsst.daf.butler.QuantumBackedButler`.
Expand All @@ -456,6 +569,8 @@ def _add_qbb(self, qg_file: File, suffix: str, output_run: str, expect_failure:
expect_failure : `bool`
If `True`, expect the pipetask command to fail with a nonzero exit
code, and guard it accordingly to keep the SCons build running.
pre_exec_init : `bool`, optional
If `False`, do not run ``pipetask pre-exec-init-qbb`` at all.
Returns
-------
Expand All @@ -465,41 +580,46 @@ def _add_qbb(self, qg_file: File, suffix: str, output_run: str, expect_failure:
repo_file = os.path.join(self.name, suffix + "-qbb.tgz")
log = os.path.join(self.name, suffix + "-qbb.log")
repo_in_cmd = "${TARGETS[0].base}"
targets = state.env.Command(
[File(repo_file), File(log)],
[self.last_qbb_repo, qg_file],
[
# Untar the input data repository, which naturally makes a copy
# of it, with the name we'll use for the output data
# repository.
untar_repo_cmd("${SOURCES[0]}", repo_in_cmd),
commands = [
# Untar the input data repository, which naturally makes a copy of
# it, with the name we'll use for the output data repository.
untar_repo_cmd("${SOURCES[0]}", repo_in_cmd)
]
if pre_exec_init:
commands.append(
# Run pre-execution steps via QuantumBackedButler.
self._pipetask_cmd(
"pre-exec-init-qbb",
repo_in_cmd,
"${SOURCES[1]}",
log="${TARGETS[1]}",
),
# Execute the QG using QuantumBackedButler.
self._pipetask_cmd(
"run-qbb",
repo_in_cmd,
"${SOURCES[1]}",
log="${TARGETS[1]}",
expect_failure=expect_failure,
),
# Bring results home using butler transfer-from-graph.
python_cmd(
BUTLER_BIN,
"transfer-from-graph",
"${SOURCES[1]}",
repo_in_cmd,
"--no-transfer-dimensions",
"--update-output-chain",
"--register-dataset-types",
),
tar_repo_cmd(repo_in_cmd, "${TARGETS[0]}"),
],
)
)
commands += [
# Execute the QG using QuantumBackedButler.
self._pipetask_cmd(
"run-qbb",
repo_in_cmd,
"${SOURCES[1]}",
log="${TARGETS[1]}",
expect_failure=expect_failure,
),
# Bring results home using butler transfer-from-graph.
python_cmd(
BUTLER_BIN,
"transfer-from-graph",
"${SOURCES[1]}",
repo_in_cmd,
"--no-transfer-dimensions",
"--update-output-chain",
"--register-dataset-types",
),
tar_repo_cmd(repo_in_cmd, "${TARGETS[0]}"),
]
targets = state.env.Command(
[File(repo_file), File(log)],
[self.last_qbb_repo, qg_file],
commands,
)
self.all_targets.extend(targets)
return targets[0]
Expand Down

0 comments on commit f310901

Please sign in to comment.