Skip to content

Commit

Permalink
Add tests for the --raise-on-partial-outputs option to pipetask.
Browse files Browse the repository at this point in the history
  • Loading branch information
TallJimbo committed Aug 23, 2024
1 parent 700c84e commit c209ef1
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 5 deletions.
33 changes: 31 additions & 2 deletions data/SConscript
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,38 @@ rc2 = (
PipelineCommands(
"RC2", os.path.join(os.environ["DRP_PIPE_DIR"], "pipelines", "HSC", "DRP-RC2.yaml"), base_repo
)
# Add side runs (later runs do not build on these) to check
# --raise-on-partial-outputs.
.add_side_run(
"test-raise-partial-outputs",
["isr", "characterizeImage"],
where="skymap='ci_mw'",
fail=[
# Configure one quantum to fail with AnnotatedPartialOutputsError.
# Due to command-line options this should be interpreted as
# a regular failure, and downstream quanta should not be run.
'''isr:lsst.pipe.base.AnnotatedPartialOutputsError:"instrument='HSC' AND exposure=95104"'''
],
raise_on_partial_outputs=True,
expect_failure=True,
)
.add_side_run(
"test-no-raise-partial-outputs",
["isr", "characterizeImage"],
where="skymap='ci_mw'",
fail=[
# Configure one quantum to fail with AnnotatedPartialOutputsError.
# By default this is treated as a qualified success, and downstream
# quanta will be run (we expect them to them do NoWorkFound, which
# just reflects the fact that it's a little weird to pick ISR as
# the task to raise this error, but that doesn't matter for testing
# the mechanics.
'''isr:lsst.pipe.base.AnnotatedPartialOutputsError:"instrument='HSC' AND exposure=95104"'''
],
expect_failure=False,
)
.add("step1", where="instrument='HSC' AND exposure NOT IN (18202, 96980)")
# Add side runs (later runs do not build on these) to check --extend-run
# and --clobber-outputs.
# Add more side runs to check --extend-run and --clobber-outputs.
.add_side_run(
"test-clobber-without-skip",
["calibrate", "consolidatePreSourceTable"],
Expand Down
26 changes: 23 additions & 3 deletions python/lsst/ci/middleware/scons.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,8 @@ def add_side_run(
skip_existing_in_last: bool = False,
extend_run: bool = False,
clobber_outputs: bool = False,
raise_on_partial_outputs: bool = False,
expect_failure: bool | None = None,
) -> PipelineCommands:
"""Add a new QuantumGraph and its execution to the build, but do not
use it as input for later runs.
Expand Down Expand Up @@ -293,6 +295,12 @@ def add_side_run(
If `True`, pass ``--clobber-outputs`` to the QuantumGraph
generation command and the direct execution command. QBB execution
effectively inherits its clobber behavior from the quantum graph.
raise_on_partial_outputs : `bool`, optional
If `True`, pass ``--raise-on-partial-outputs`` to ``pipetask run``
and ``pipetask run-qbb``.
expect_failure : `bool`, optional
Override whether to expect the command to exit with a nonzero exit
code; default is ``bool(fail)``.
Returns
-------
Expand All @@ -318,17 +326,19 @@ def add_side_run(
qg,
suffix=suffix,
output_run=output_run,
expect_failure=bool(fail),
expect_failure=bool(fail) if expect_failure is None else expect_failure,
extend_run=extend_run,
clobber_outputs=clobber_outputs,
raise_on_partial_outputs=raise_on_partial_outputs,
)
if not extend_run:
self._add_qbb(
qg,
suffix=suffix,
output_run=output_run,
expect_failure=bool(fail),
expect_failure=bool(fail) if expect_failure is None else expect_failure,
pre_exec_init=not extend_run,
raise_on_partial_outputs=raise_on_partial_outputs,
)
return self

Expand Down Expand Up @@ -506,6 +516,7 @@ def _add_direct(
extend_run: bool = False,
clobber_outputs: bool = False,
auto_retry_mem: tuple[str, str] | None = None,
raise_on_partial_outputs: bool = False,
) -> File:
"""Make an SCons target for direct execution of the quantum graph
with ``pipetask run`` and a full butler.
Expand All @@ -527,6 +538,8 @@ def _add_direct(
If `True`, pass ``--clobber-outputs`` to ``pipetask run``.
auto_retry_mem : `tuple` [ `str`, `str` ], optional
See argument of the same name on `add`.
raise_on_partial_outputs : `bool`, optional
If `True`, pass ``--raise-on-partial-outputs`` to ``pipetask run``.
Returns
-------
Expand All @@ -543,6 +556,8 @@ def _add_direct(
extra_args.append("--clobber-outputs")
if auto_retry_mem:
extra_args.append(f"--memory-per-quantum {auto_retry_mem[0]}")
if raise_on_partial_outputs:
extra_args.append("--raise-on-partial-outputs")
cmds = [
# Untar the input data repository, which naturally makes a copy
# of it, with the name we'll use for the output data
Expand Down Expand Up @@ -598,6 +613,7 @@ def _add_qbb(
expect_failure: bool,
pre_exec_init: bool = True,
auto_retry_mem: tuple[str, str] | None = None,
raise_on_partial_outputs: bool = False,
) -> File:
"""Make an SCons target for direct execution of the quantum graph
with ``pipetask run-qbb`` and `lsst.daf.butler.QuantumBackedButler`.
Expand All @@ -617,7 +633,9 @@ def _add_qbb(
If `False`, do not run ``pipetask pre-exec-init-qbb`` at all.
auto_retry_mem : `tuple` [ `str`, `str` ], optional
See argument of the same name on `add`.
raise_on_partial_outputs : `bool`, optional
If `True`, pass ``--raise-on-partial-outputs`` to
``pipetask run-qbb``.
Returns
-------
Expand Down Expand Up @@ -645,6 +663,8 @@ def _add_qbb(
extra_args: list[str] = []
if auto_retry_mem:
extra_args.append(f"--memory-per-quantum {auto_retry_mem[0]}")
if raise_on_partial_outputs:
extra_args.append("--raise-on-partial-outputs")
commands.append(
# Execute the QG using QuantumBackedButler.
self._pipetask_cmd(
Expand Down
34 changes: 34 additions & 0 deletions tests/test_rc2_outputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,40 @@ def test_fgcm_refcats(self) -> None:
self.assertNotIn(231819, htm7_indices)
self.assertIn(231865, htm7_indices)

def test_partial_outputs(self) -> None:
"""Test that downstream tasks are run or not as appropriate when
partial output errors are raised.
"""
no_raise_direct = OutputRepoTests("RC2", "test-no-raise-partial-outputs-direct", {})
no_raise_qbb = OutputRepoTests("RC2", "test-no-raise-partial-outputs-qbb", {})
data_id = dict(instrument="HSC", detector=57)
for helper in (no_raise_direct, no_raise_qbb):
# When we don't raise, the ISR quantum that raised should have
# metadata and logs written, and so should downstream tasks that
# end up raising NoWorkFound.
self.assertTrue(helper.butler.exists(get_mock_name("isr_metadata"), data_id, exposure=95104))
self.assertTrue(helper.butler.exists(get_mock_name("isr_log"), data_id, exposure=95104))
self.assertTrue(
helper.butler.exists(get_mock_name("characterizeImage_metadata"), data_id, visit=95104)
)
self.assertTrue(
helper.butler.exists(get_mock_name("characterizeImage_log"), data_id, visit=95104)
)
raise_direct = OutputRepoTests("RC2", "test-raise-partial-outputs-direct", {})
raise_qbb = OutputRepoTests("RC2", "test-raise-partial-outputs-qbb", {})
for helper in (raise_direct, raise_qbb):
# When we do raise, the ISR quantum that raised should not have
# metadata written, but it should have logs, and downstream tasks
# should not have either, because they are never run.
self.assertFalse(helper.butler.exists(get_mock_name("isr_metadata"), data_id, exposure=95104))
self.assertTrue(helper.butler.exists(get_mock_name("isr_log"), data_id, exposure=95104))
self.assertFalse(
helper.butler.exists(get_mock_name("characterizeImage_metadata"), data_id, visit=95104)
)
self.assertFalse(
helper.butler.exists(get_mock_name("characterizeImage_log"), data_id, visit=95104)
)


if __name__ == "__main__":
unittest.main()

0 comments on commit c209ef1

Please sign in to comment.