DM-44488: Add tests for the --raise-on-partial-outputs option to pipetask. #25

Merged (1 commit) on Aug 28, 2024
33 changes: 31 additions & 2 deletions data/SConscript
@@ -70,9 +70,38 @@ rc2 = (
PipelineCommands(
"RC2", os.path.join(os.environ["DRP_PIPE_DIR"], "pipelines", "HSC", "DRP-RC2.yaml"), base_repo
)
# Add side runs (later runs do not build on these) to check
# --raise-on-partial-outputs.
.add_side_run(
"test-raise-partial-outputs",
["isr", "characterizeImage"],
where="skymap='ci_mw'",
fail=[
# Configure one quantum to fail with AnnotatedPartialOutputsError.
# Because --raise-on-partial-outputs is passed, this should be
# interpreted as a regular failure, and downstream quanta should
# not be run.
'''isr:lsst.pipe.base.AnnotatedPartialOutputsError:"instrument='HSC' AND exposure=95104"'''
],
raise_on_partial_outputs=True,
expect_failure=True,
)
.add_side_run(
"test-no-raise-partial-outputs",
["isr", "characterizeImage"],
where="skymap='ci_mw'",
fail=[
# Configure one quantum to fail with AnnotatedPartialOutputsError.
# By default this is treated as a qualified success, and downstream
# quanta will be run - we expect them to then raise NoWorkFound, which
# just reflects the fact that it's a little weird to pick ISR as
# the task to raise this error, but that doesn't matter for testing
# the mechanics.
'''isr:lsst.pipe.base.AnnotatedPartialOutputsError:"instrument='HSC' AND exposure=95104"'''
],
expect_failure=False,
)
.add("step1", where="instrument='HSC' AND exposure NOT IN (18202, 96980)")
# Add more side runs to check --extend-run and --clobber-outputs.
.add_side_run(
"test-clobber-without-skip",
["calibrate", "consolidatePreSourceTable"],
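For context on the `fail` entries above: each entry is a single string that packs together the task label, the fully-qualified exception to raise, and a data-ID query selecting the quantum that should fail. Below is a minimal sketch of how that apparent three-field format could be pulled apart; the parse_fail_spec helper is hypothetical, for illustration only:

def parse_fail_spec(spec: str) -> tuple[str, str, str]:
    # Apparent format: <task label>:<exception class path>:"<where clause>".
    task_label, exception_path, where = spec.split(":", 2)
    return task_label, exception_path, where.strip('"')

label, exc, where = parse_fail_spec(
    '''isr:lsst.pipe.base.AnnotatedPartialOutputsError:"instrument='HSC' AND exposure=95104"'''
)
# label == "isr", exc == "lsst.pipe.base.AnnotatedPartialOutputsError",
# where == "instrument='HSC' AND exposure=95104"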
26 changes: 23 additions & 3 deletions python/lsst/ci/middleware/scons.py
@@ -264,6 +264,8 @@ def add_side_run(
skip_existing_in_last: bool = False,
extend_run: bool = False,
clobber_outputs: bool = False,
raise_on_partial_outputs: bool = False,
expect_failure: bool | None = None,
) -> PipelineCommands:
"""Add a new QuantumGraph and its execution to the build, but do not
use it as input for later runs.
@@ -293,6 +295,12 @@ def add_side_run(
If `True`, pass ``--clobber-outputs`` to the QuantumGraph
generation command and the direct execution command. QBB execution
effectively inherits its clobber behavior from the quantum graph.
raise_on_partial_outputs : `bool`, optional
If `True`, pass ``--raise-on-partial-outputs`` to ``pipetask run``
and ``pipetask run-qbb``.
expect_failure : `bool`, optional
Override whether to expect the command to exit with a nonzero exit
code; default is ``bool(fail)``.

Returns
-------
@@ -318,17 +326,19 @@
qg,
suffix=suffix,
output_run=output_run,
expect_failure=bool(fail) if expect_failure is None else expect_failure,
extend_run=extend_run,
clobber_outputs=clobber_outputs,
raise_on_partial_outputs=raise_on_partial_outputs,
)
if not extend_run:
self._add_qbb(
qg,
suffix=suffix,
output_run=output_run,
expect_failure=bool(fail) if expect_failure is None else expect_failure,
pre_exec_init=not extend_run,
raise_on_partial_outputs=raise_on_partial_outputs,
)
return self

@@ -506,6 +516,7 @@ def _add_direct(
extend_run: bool = False,
clobber_outputs: bool = False,
auto_retry_mem: tuple[str, str] | None = None,
raise_on_partial_outputs: bool = False,
) -> File:
"""Make an SCons target for direct execution of the quantum graph
with ``pipetask run`` and a full butler.
@@ -527,6 +538,8 @@
If `True`, pass ``--clobber-outputs`` to ``pipetask run``.
auto_retry_mem : `tuple` [ `str`, `str` ], optional
See argument of the same name on `add`.
raise_on_partial_outputs : `bool`, optional
If `True`, pass ``--raise-on-partial-outputs`` to ``pipetask run``.

Returns
-------
@@ -543,6 +556,8 @@
extra_args.append("--clobber-outputs")
if auto_retry_mem:
extra_args.append(f"--memory-per-quantum {auto_retry_mem[0]}")
if raise_on_partial_outputs:
extra_args.append("--raise-on-partial-outputs")
cmds = [
# Untar the input data repository, which naturally makes a copy
# of it, with the name we'll use for the output data
@@ -598,6 +613,7 @@ def _add_qbb(
expect_failure: bool,
pre_exec_init: bool = True,
auto_retry_mem: tuple[str, str] | None = None,
raise_on_partial_outputs: bool = False,
) -> File:
"""Make an SCons target for direct execution of the quantum graph
with ``pipetask run-qbb`` and `lsst.daf.butler.QuantumBackedButler`.
@@ -617,7 +633,9 @@
If `False`, do not run ``pipetask pre-exec-init-qbb`` at all.
auto_retry_mem : `tuple` [ `str`, `str` ], optional
See argument of the same name on `add`.
raise_on_partial_outputs : `bool`, optional
If `True`, pass ``--raise-on-partial-outputs`` to
``pipetask run-qbb``.

Returns
-------
@@ -645,6 +663,8 @@
extra_args: list[str] = []
if auto_retry_mem:
extra_args.append(f"--memory-per-quantum {auto_retry_mem[0]}")
if raise_on_partial_outputs:
extra_args.append("--raise-on-partial-outputs")
commands.append(
# Execute the QG using QuantumBackedButler.
self._pipetask_cmd(
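The pairing of `fail` with the new `expect_failure` override is what drives the two SConscript side runs above: both configure a failing quantum, but only the run that passes --raise-on-partial-outputs should exit nonzero. Here is a condensed standalone sketch of the default-resolution logic; resolve_expect_failure is a made-up name mirroring the inline expression in add_side_run:

def resolve_expect_failure(fail: list[str], expect_failure: bool | None) -> bool:
    # By default a run is expected to fail exactly when some quantum is
    # configured to raise. AnnotatedPartialOutputsError breaks that rule:
    # without --raise-on-partial-outputs it is downgraded to a qualified
    # success, so the caller can override the default.
    return bool(fail) if expect_failure is None else expect_failure

assert resolve_expect_failure([], None) is False
assert resolve_expect_failure(["isr:SomeError:..."], None) is True
# The no-raise side run: one quantum fails, but the run still succeeds.
assert resolve_expect_failure(["isr:SomeError:..."], False) is False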
34 changes: 34 additions & 0 deletions tests/test_rc2_outputs.py
@@ -164,6 +164,40 @@ def test_fgcm_refcats(self) -> None:
self.assertNotIn(231819, htm7_indices)
self.assertIn(231865, htm7_indices)

def test_partial_outputs(self) -> None:
"""Test that downstream tasks are run or not as appropriate when
partial output errors are raised.
"""
no_raise_direct = OutputRepoTests("RC2", "test-no-raise-partial-outputs-direct", {})
no_raise_qbb = OutputRepoTests("RC2", "test-no-raise-partial-outputs-qbb", {})
data_id = dict(instrument="HSC", detector=57)
for helper in (no_raise_direct, no_raise_qbb):
# When we don't raise, the ISR quantum that raised should have
# metadata and logs written, and so should downstream tasks that
# end up raising NoWorkFound.
self.assertTrue(helper.butler.exists(get_mock_name("isr_metadata"), data_id, exposure=95104))
self.assertTrue(helper.butler.exists(get_mock_name("isr_log"), data_id, exposure=95104))
self.assertTrue(
helper.butler.exists(get_mock_name("characterizeImage_metadata"), data_id, visit=95104)
)
self.assertTrue(
helper.butler.exists(get_mock_name("characterizeImage_log"), data_id, visit=95104)
)
raise_direct = OutputRepoTests("RC2", "test-raise-partial-outputs-direct", {})
raise_qbb = OutputRepoTests("RC2", "test-raise-partial-outputs-qbb", {})
for helper in (raise_direct, raise_qbb):
# When we do raise, the ISR quantum that raised should not have
# metadata written, but it should have logs, and downstream tasks
# should not have either, because they are never run.
self.assertFalse(helper.butler.exists(get_mock_name("isr_metadata"), data_id, exposure=95104))
Comment on lines +189 to +192

I'm concerned that the metadata isn't written on a failure: we add information about the failure to the task metadata, so I'd have hoped that it would get written.

@TallJimbo (Member, Author), Aug 26, 2024

Unfortunately we'll break a lot of things if we start writing metadata for tasks that fail; I'd like to fix that eventually, but it's a couple of big projects down from the top of my work stack (I consider it part of the "butler provenance" work package). There may be a somewhat nearer-term possibility of getting it into the "quantum execution reports" that I believe BPS uses, but I'm not super familiar with how information flows into and out of those.

To be clear, the metadata absolutely is being written when we take the "qualified success" route.

Ok, the task metadata on qualified success is probably good enough; AP's use case doesn't care about the outputs of failing tasks, so it shouldn't matter.

self.assertTrue(helper.butler.exists(get_mock_name("isr_log"), data_id, exposure=95104))
self.assertFalse(
helper.butler.exists(get_mock_name("characterizeImage_metadata"), data_id, visit=95104)
)
self.assertFalse(
helper.butler.exists(get_mock_name("characterizeImage_log"), data_id, visit=95104)
)


if __name__ == "__main__":
unittest.main()
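As a usage note, the assertions in test_partial_outputs follow the standard Butler existence-check pattern. The following is a standalone sketch of the expected state for the raising run, assuming a placeholder repo path and plain dataset type names (the real test resolves mocked names via get_mock_name and builds its butler through OutputRepoTests):

from lsst.daf.butler import Butler

# Placeholder repo path and collection name, for illustration only.
butler = Butler("path/to/output/repo", collections="test-raise-partial-outputs-direct")
data_id = dict(instrument="HSC", detector=57, exposure=95104)
# Logs are written even for the quantum that raised...
assert butler.exists("isr_log", data_id)
# ...but its metadata is not, because the task never completed, and
# the downstream characterizeImage quanta were never run at all.
assert not butler.exists("isr_metadata", data_id)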