diff --git a/data/SConscript b/data/SConscript index 80734ea..9b0b8e2 100644 --- a/data/SConscript +++ b/data/SConscript @@ -70,9 +70,38 @@ rc2 = ( PipelineCommands( "RC2", os.path.join(os.environ["DRP_PIPE_DIR"], "pipelines", "HSC", "DRP-RC2.yaml"), base_repo ) + # Add side runs (later runs do not build on these) to check + # --raise-on-partial-outputs. + .add_side_run( + "test-raise-partial-outputs", + ["isr", "characterizeImage"], + where="skymap='ci_mw'", + fail=[ + # Configure one quantum to fail with AnnotatedPartialOutputsError. + # Due to command-line options this should be interpreted as + # a regular failure, and downstream quanta should not be run. + '''isr:lsst.pipe.base.AnnotatedPartialOutputsError:"instrument='HSC' AND exposure=95104"''' + ], + raise_on_partial_outputs=True, + expect_failure=True, + ) + .add_side_run( + "test-no-raise-partial-outputs", + ["isr", "characterizeImage"], + where="skymap='ci_mw'", + fail=[ + # Configure one quantum to fail with AnnotatedPartialOutputsError. + # By default this is treated as a qualified success, and downstream + # quanta will be run - we expect them to them do NoWorkFound, which + # just reflects the fact that it's a little weird to pick ISR as + # the task to raise this error, but that doesn't matter for testing + # the mechanics. + '''isr:lsst.pipe.base.AnnotatedPartialOutputsError:"instrument='HSC' AND exposure=95104"''' + ], + expect_failure=False, + ) .add("step1", where="instrument='HSC' AND exposure NOT IN (18202, 96980)") - # Add side runs (later runs do not build on these) to check --extend-run - # and --clobber-outputs. + # Add more side runs to check --extend-run and --clobber-outputs. .add_side_run( "test-clobber-without-skip", ["calibrate", "consolidatePreSourceTable"], diff --git a/python/lsst/ci/middleware/scons.py b/python/lsst/ci/middleware/scons.py index 19759dd..fb3d16c 100644 --- a/python/lsst/ci/middleware/scons.py +++ b/python/lsst/ci/middleware/scons.py @@ -264,6 +264,8 @@ def add_side_run( skip_existing_in_last: bool = False, extend_run: bool = False, clobber_outputs: bool = False, + raise_on_partial_outputs: bool = False, + expect_failure: bool | None = None, ) -> PipelineCommands: """Add a new QuantumGraph and its execution to the build, but do not use it as input for later runs. @@ -293,6 +295,12 @@ def add_side_run( If `True`, pass ``--clobber-outputs`` to the QuantumGraph generation command and the direct execution command. QBB execution effectively inherits its clobber behavior from the quantum graph. + raise_on_partial_outputs : `bool`, optional + If `True`, pass ``--raise-on-partial-outputs`` to ``pipetask run`` + and ``pipetask run-qbb``. + expect_failure : `bool`, optional + Override whether to expect the command to exit with a nonzero exit + code; default is ``bool(fail)``. Returns ------- @@ -318,17 +326,19 @@ def add_side_run( qg, suffix=suffix, output_run=output_run, - expect_failure=bool(fail), + expect_failure=bool(fail) if expect_failure is None else expect_failure, extend_run=extend_run, clobber_outputs=clobber_outputs, + raise_on_partial_outputs=raise_on_partial_outputs, ) if not extend_run: self._add_qbb( qg, suffix=suffix, output_run=output_run, - expect_failure=bool(fail), + expect_failure=bool(fail) if expect_failure is None else expect_failure, pre_exec_init=not extend_run, + raise_on_partial_outputs=raise_on_partial_outputs, ) return self @@ -506,6 +516,7 @@ def _add_direct( extend_run: bool = False, clobber_outputs: bool = False, auto_retry_mem: tuple[str, str] | None = None, + raise_on_partial_outputs: bool = False, ) -> File: """Make an SCons target for direct execution of the quantum graph with ``pipetask run`` and a full butler. @@ -527,6 +538,8 @@ def _add_direct( If `True`, pass ``--clobber-outputs`` to ``pipetask run``. auto_retry_mem : `tuple` [ `str`, `str` ], optional See argument of the same name on `add`. + raise_on_partial_outputs : `bool`, optional + If `True`, pass ``--raise-on-partial-outputs`` to ``pipetask run``. Returns ------- @@ -543,6 +556,8 @@ def _add_direct( extra_args.append("--clobber-outputs") if auto_retry_mem: extra_args.append(f"--memory-per-quantum {auto_retry_mem[0]}") + if raise_on_partial_outputs: + extra_args.append("--raise-on-partial-outputs") cmds = [ # Untar the input data repository, which naturally makes a copy # of it, with the name we'll use for the output data @@ -598,6 +613,7 @@ def _add_qbb( expect_failure: bool, pre_exec_init: bool = True, auto_retry_mem: tuple[str, str] | None = None, + raise_on_partial_outputs: bool = False, ) -> File: """Make an SCons target for direct execution of the quantum graph with ``pipetask run-qbb`` and `lsst.daf.butler.QuantumBackedButler`. @@ -617,7 +633,9 @@ def _add_qbb( If `False`, do not run ``pipetask pre-exec-init-qbb`` at all. auto_retry_mem : `tuple` [ `str`, `str` ], optional See argument of the same name on `add`. - + raise_on_partial_outputs : `bool`, optional + If `True`, pass ``--raise-on-partial-outputs`` to + ``pipetask run-qbb``. Returns ------- @@ -645,6 +663,8 @@ def _add_qbb( extra_args: list[str] = [] if auto_retry_mem: extra_args.append(f"--memory-per-quantum {auto_retry_mem[0]}") + if raise_on_partial_outputs: + extra_args.append("--raise-on-partial-outputs") commands.append( # Execute the QG using QuantumBackedButler. self._pipetask_cmd( diff --git a/tests/test_rc2_outputs.py b/tests/test_rc2_outputs.py index d1d688f..b8914eb 100644 --- a/tests/test_rc2_outputs.py +++ b/tests/test_rc2_outputs.py @@ -164,6 +164,40 @@ def test_fgcm_refcats(self) -> None: self.assertNotIn(231819, htm7_indices) self.assertIn(231865, htm7_indices) + def test_partial_outputs(self) -> None: + """Test that downstream tasks are run or not as appropriate when + partial output errors are raised. + """ + no_raise_direct = OutputRepoTests("RC2", "test-no-raise-partial-outputs-direct", {}) + no_raise_qbb = OutputRepoTests("RC2", "test-no-raise-partial-outputs-qbb", {}) + data_id = dict(instrument="HSC", detector=57) + for helper in (no_raise_direct, no_raise_qbb): + # When we don't raise, the ISR quantum that raised should have + # metadata and logs written, and so should downstream tasks that + # end up raising NoWorkFound. + self.assertTrue(helper.butler.exists(get_mock_name("isr_metadata"), data_id, exposure=95104)) + self.assertTrue(helper.butler.exists(get_mock_name("isr_log"), data_id, exposure=95104)) + self.assertTrue( + helper.butler.exists(get_mock_name("characterizeImage_metadata"), data_id, visit=95104) + ) + self.assertTrue( + helper.butler.exists(get_mock_name("characterizeImage_log"), data_id, visit=95104) + ) + raise_direct = OutputRepoTests("RC2", "test-raise-partial-outputs-direct", {}) + raise_qbb = OutputRepoTests("RC2", "test-raise-partial-outputs-qbb", {}) + for helper in (raise_direct, raise_qbb): + # When we do raise, the ISR quantum that raised should not have + # metadata written, but it should have logs, and downstream tasks + # should not have either, because they are never run. + self.assertFalse(helper.butler.exists(get_mock_name("isr_metadata"), data_id, exposure=95104)) + self.assertTrue(helper.butler.exists(get_mock_name("isr_log"), data_id, exposure=95104)) + self.assertFalse( + helper.butler.exists(get_mock_name("characterizeImage_metadata"), data_id, visit=95104) + ) + self.assertFalse( + helper.butler.exists(get_mock_name("characterizeImage_log"), data_id, visit=95104) + ) + if __name__ == "__main__": unittest.main()