diff --git a/doc/changes/DM-41711.feature.md b/doc/changes/DM-41711.feature.md
new file mode 100644
index 0000000..a2b3539
--- /dev/null
+++ b/doc/changes/DM-41711.feature.md
@@ -0,0 +1 @@
+Add tests for the `QuantumProvenanceGraph`.
\ No newline at end of file
diff --git a/tests/test_prod_outputs.py b/tests/test_prod_outputs.py
index bcc9bc5..1de14dc 100644
--- a/tests/test_prod_outputs.py
+++ b/tests/test_prod_outputs.py
@@ -24,6 +24,7 @@
 from lsst.ci.middleware.output_repo_tests import OutputRepoTests
 from lsst.pipe.base.execution_reports import QuantumGraphExecutionReport
+from lsst.pipe.base.quantum_provenance_graph import QuantumProvenanceGraph
 from lsst.pipe.base.tests.mocks import get_mock_name

 # (tract, patch, band): {input visits} for coadds produced here.
@@ -129,9 +130,9 @@ def test_property_set_metadata_direct(self) -> None:
     def test_property_set_metadata_qbb(self) -> None:
         self.qbb.check_property_set_metadata(self)

-    def check_step1_manifest_checker(self, helper: OutputRepoTests) -> None:
+    def check_step1_execution_reports(self, helper: OutputRepoTests) -> None:
         """Test that the fail-and-recover attempts in step1 worked as expected
-        using the manifest checker.
+        using the `QuantumGraphExecutionReport`.
         """

         # This task should have failed in attempt1 and should have been
@@ -190,8 +191,204 @@ def check_step1_manifest_checker(self, helper: OutputRepoTests) -> None:
         hr_summary_2 = report_2.to_summary_dict(helper.butler, human_readable=True)
         self.assertEqual(hr_summary_2["_mock_calibrate"]["failed_quanta"], [])

-    def test_step1_manifest_checker_qbb(self) -> None:
-        self.check_step1_manifest_checker(self.qbb)
+    def test_step1_execution_reports_qbb(self) -> None:
+        self.check_step1_execution_reports(self.qbb)
+
+    def check_step1_qpg(self, helper: OutputRepoTests) -> None:
+        """Test that the fail-and-recover attempts in step1 worked as expected
+        over each attempt, using the `QuantumProvenanceGraph`.
+ """ + + # Make the quantum provenance graph for the first attempt + qg_1 = helper.get_quantum_graph("step1", "i-attempt1") + qpg1 = QuantumProvenanceGraph() + qpg1.assemble_quantum_provenance_graph( + helper.butler, [qg_1], collections=["HSC/runs/Prod/step1-i-attempt1"], where="instrument='HSC'" + ) + qg_1_sum = qpg1.to_summary(helper.butler) + + # Loop through the tasks in the dict + for label, task_summary in qg_1_sum.tasks.items(): + self.assertEqual(task_summary.n_unknown, 0) + self.assertEqual(task_summary.n_wonky, 0) + self.assertEqual(task_summary.n_expected, 36) + self.assertListEqual(task_summary.wonky_quanta, []) + self.assertListEqual(task_summary.recovered_quanta, []) + self.assertEqual( + task_summary.n_expected, + task_summary.n_successful + + task_summary.n_blocked + + task_summary.n_unknown + + task_summary.n_wonky + + task_summary.n_failed, + ) + match label: + case "_mock_calibrate": + self.assertEqual(task_summary.n_successful, 30) + self.assertEqual(task_summary.n_blocked, 0) + self.assertEqual(task_summary.n_failed, 6) + for quantum_summary in task_summary.failed_quanta: + self.assertEqual(quantum_summary.data_id["instrument"], "HSC") + self.assertIsInstance(quantum_summary.data_id["detector"], int) + self.assertEqual(quantum_summary.data_id["visit"], 18202) + self.assertDictEqual( + quantum_summary.runs, {"HSC/runs/Prod/step1-i-attempt1": "FAILED"} + ) + self.assertIsInstance(quantum_summary.messages, list) + for message in quantum_summary.messages: + self.assertIsInstance(message, str) + self.assertTrue( + message.startswith("Execution of task '_mock_calibrate' on quantum") + ) + self.assertIn( + "Exception ValueError: Simulated failure: task=_mock_calibrate", message + ) + case "_mock_writePreSourceTable" | "_mock_transformPreSourceTable": + self.assertEqual(task_summary.n_successful, 30) + self.assertEqual(task_summary.n_blocked, 6) + self.assertEqual(task_summary.n_failed, 0) + self.assertListEqual(task_summary.failed_quanta, []) + case _: + self.assertEqual(task_summary.n_successful, 36) + self.assertEqual(task_summary.n_blocked, 0) + self.assertEqual(task_summary.n_failed, 0) + self.assertListEqual(task_summary.failed_quanta, []) + + # Test datasets for the first QPG. + for dataset_type_name, dataset_summary in qg_1_sum.datasets.items(): + # For the expected failure + if dataset_summary.producer == "_mock_calibrate": + # A bit hard to read, but this is actually asserting that it's + # not empty + showing an error if it is. 
+                self.assertTrue(
+                    dataset_summary.unsuccessful_datasets,
+                    f"Expected failures were not stored as unsuccessful datasets for {dataset_type_name}.",
+                )
+                # Check that the visible datasets = expected - (unsuccessful
+                # + predicted_only)
+                self.assertEqual(
+                    dataset_summary.n_visible,
+                    dataset_summary.n_expected
+                    - dataset_summary.n_unsuccessful
+                    - dataset_summary.n_predicted_only,
+                )
+                # Check that the unsuccessful datasets are as expected
+                self.assertIsInstance(dataset_summary.unsuccessful_datasets, list)
+                self.assertEqual(dataset_summary.unsuccessful_datasets[0]["instrument"], "HSC")
+                self.assertEqual(dataset_summary.unsuccessful_datasets[0]["visit"], 18202)
+                self.assertEqual(dataset_summary.unsuccessful_datasets[0]["band"], "i")
+                self.assertEqual(dataset_summary.unsuccessful_datasets[0]["day_obs"], 20150117)
+                self.assertEqual(
+                    dataset_summary.unsuccessful_datasets[0]["physical_filter"],
+                    "HSC-I",
+                )
+                # Check that there is the expected number of failures
+                # and that they are not visible
+                self.assertEqual(len(dataset_summary.unsuccessful_datasets), 6)
+                self.assertEqual(dataset_summary.n_expected, 36)
+                self.assertEqual(dataset_summary.n_visible, 30)
+                self.assertEqual(dataset_summary.n_predicted_only, 0)
+
+            # Check that all the counts add up for every task
+            self.assertEqual(
+                dataset_summary.n_expected,
+                sum(
+                    [
+                        dataset_summary.n_visible,
+                        dataset_summary.n_shadowed,
+                        dataset_summary.n_predicted_only,
+                        dataset_summary.n_cursed,
+                        dataset_summary.n_unsuccessful,
+                    ]
+                ),
+            )
+            # Check that there are no cursed datasets
+            self.assertEqual(dataset_summary.n_cursed, 0)
+            self.assertListEqual(dataset_summary.cursed_datasets, [])
+
+        # Make an overall QPG and add the recovery attempt to the QPG
+        qpg = QuantumProvenanceGraph()
+        qg_2 = helper.get_quantum_graph("step1", "i-attempt2")
+        # Quantum graphs are passed in order of execution; collections are
+        # passed in reverse order because the query in
+        # `QuantumProvenanceGraph.__resolve_duplicates` requires collections
+        # be passed with the most recent first.
+        qpg.assemble_quantum_provenance_graph(
+            helper.butler,
+            [qg_1, qg_2],
+            collections=["HSC/runs/Prod/step1-i-attempt2", "HSC/runs/Prod/step1-i-attempt1"],
+            where="instrument='HSC'",
+        )
+        qg_sum = qpg.to_summary(helper.butler)
+
+        for label, task_summary in qg_sum.tasks.items():
+            self.assertEqual(task_summary.n_successful, 36)
+            self.assertEqual(task_summary.n_blocked, 0)
+            self.assertEqual(task_summary.n_failed, 0)
+            self.assertEqual(task_summary.n_unknown, 0)
+            self.assertEqual(task_summary.n_wonky, 0)
+            self.assertEqual(task_summary.n_expected, 36)
+            self.assertListEqual(task_summary.wonky_quanta, [])
+            self.assertListEqual(task_summary.failed_quanta, [])
+            self.assertEqual(
+                task_summary.n_expected,
+                task_summary.n_successful
+                + task_summary.n_blocked
+                + task_summary.n_unknown
+                + task_summary.n_wonky
+                + task_summary.n_failed,
+            )
+            if (
+                label == "_mock_calibrate"
+                or label == "_mock_writePreSourceTable"
+                or label == "_mock_transformPreSourceTable"
+            ):
+                for quantum in task_summary.recovered_quanta:
+                    self.assertEqual(quantum["instrument"], "HSC")
+                    self.assertEqual(quantum["visit"], 18202)
+            else:
+                self.assertListEqual(task_summary.recovered_quanta, [])
+
+        # Test datasets for the overall QPG.
+        # Check that we have the expected datasets
+        for dataset_summary in qg_sum.datasets.values():
+            # Check counts: we should have recovered everything, so
+            # visible should equal expected for each dataset.
+            self.assertEqual(
+                dataset_summary.n_expected,
+                dataset_summary.n_visible,
+            )
+            # Check that this is the expected number
+            self.assertEqual(dataset_summary.n_visible, 36)
+            # Check that they all add up
+            self.assertEqual(
+                dataset_summary.n_expected,
+                sum(
+                    [
+                        dataset_summary.n_visible,
+                        dataset_summary.n_shadowed,
+                        dataset_summary.n_predicted_only,
+                        dataset_summary.n_cursed,
+                        dataset_summary.n_unsuccessful,
+                    ]
+                ),
+            )
+            # Check that there are no cursed or unsuccessful datasets
+            self.assertEqual(dataset_summary.n_cursed, 0)
+            self.assertListEqual(dataset_summary.cursed_datasets, [])
+            self.assertEqual(dataset_summary.n_unsuccessful, 0)
+            self.assertListEqual(dataset_summary.unsuccessful_datasets, [])
+
+            # Since we have recovered everything, we should have the same
+            # numbers for every task:
+            self.assertEqual(dataset_summary.n_expected, 36)
+            self.assertEqual(dataset_summary.n_visible, 36)
+            self.assertEqual(dataset_summary.n_shadowed, 0)
+            self.assertEqual(dataset_summary.n_predicted_only, 0)
+
+    def test_step1_quantum_provenance_graph_qbb(self) -> None:
+        self.check_step1_qpg(self.qbb)


 if __name__ == "__main__":
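The assemble calls above (and in tests/test_rc2_outputs.py below) rely on an ordering convention that is easy to miss, so here it is as a standalone sketch. The helper function name and its collection arguments are illustrative, not part of the change; the `where` constraint mirrors the one used in the tests:

from lsst.daf.butler import Butler
from lsst.pipe.base import QuantumGraph
from lsst.pipe.base.quantum_provenance_graph import QuantumProvenanceGraph


def summarize_attempts(butler: Butler, graphs: list[QuantumGraph], collections: list[str]):
    """Aggregate provenance across several attempts at one pipeline step.

    Pass ``graphs`` in execution order (oldest attempt first) but
    ``collections`` in reverse (most recent first): the query in
    ``QuantumProvenanceGraph.__resolve_duplicates`` expects the most recent
    collection to come first.
    """
    qpg = QuantumProvenanceGraph()
    qpg.assemble_quantum_provenance_graph(
        butler, graphs, collections=collections, where="instrument='HSC'"
    )
    return qpg.to_summary(butler)


# For example, for the two step1 attempts exercised above:
# summary = summarize_attempts(
#     helper.butler,
#     [qg_1, qg_2],
#     ["HSC/runs/Prod/step1-i-attempt2", "HSC/runs/Prod/step1-i-attempt1"],
# )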
+ """ + # Make the quantum provenance graph for the first attempt + qg_1 = helper.get_quantum_graph("step5", "attempt1") + qpg1 = QuantumProvenanceGraph() + qpg1.assemble_quantum_provenance_graph( + helper.butler, [qg_1], collections=["HSC/runs/RC2/step5-attempt1"], where="instrument='HSC'" + ) + qg_1_sum = qpg1.to_summary(helper.butler) + + # Check that expected, wonky and not attempted do not occur throughout + # tasks: + for label, task_summary in qg_1_sum.tasks.items(): + self.assertEqual(task_summary.n_unknown, 0) + self.assertEqual(task_summary.n_wonky, 0) + self.assertListEqual(task_summary.wonky_quanta, []) + self.assertListEqual(task_summary.recovered_quanta, []) + match label: + # Check that the failure was documented in expected ways: + case label if label in [ + "_mock_transformForcedSourceTable", + "_mock_drpAssociation", + "_mock_drpDiaCalculation", + "_mock_transformForcedSourceOnDiaObjectTable", + ]: + self.assertEqual(task_summary.n_expected, 4) + self.assertEqual(task_summary.n_failed, 0) + self.assertEqual(task_summary.n_successful, 4) + self.assertEqual(task_summary.n_blocked, 0) + self.assertEqual(task_summary.failed_quanta, []) + case "_mock_consolidateForcedSourceTable": + self.assertEqual(task_summary.n_expected, 1) + self.assertEqual(task_summary.n_failed, 1) + self.assertEqual(task_summary.n_successful, 0) + self.assertEqual(task_summary.n_blocked, 0) + self.assertEqual( + task_summary.failed_quanta, + [ + UnsuccessfulQuantumSummary( + data_id={"instrument": "HSC", "skymap": "ci_mw", "tract": 0}, + runs={"HSC/runs/RC2/step5-attempt1": "FAILED"}, + messages=[ + "Execution of task '_mock_consolidateForcedSourceTable' on quantum " + "{instrument: 'HSC', skymap: 'ci_mw', tract: 0} failed. Exception " + "ValueError: Simulated failure: task=_mock_consolidateForcedSourceTable " + "dataId={instrument: 'HSC', skymap: 'ci_mw', tract: 0}" + ], + ) + ], + ) + case label if label in [ + "_mock_consolidateAssocDiaSourceTable", + "_mock_consolidateFullDiaObjectTable", + "_mock_consolidateForcedSourceOnDiaObjectTable", + ]: + self.assertEqual(task_summary.n_expected, 1) + self.assertEqual(task_summary.n_failed, 0) + self.assertEqual(task_summary.n_successful, 1) + self.assertEqual(task_summary.n_blocked, 0) + self.assertEqual(task_summary.failed_quanta, []) + + case label if label in [ + "_mock_forcedPhotCcdOnDiaObjects", + "_mock_forcedPhotDiffOnDiaObjects", + "_mock_writeForcedSourceOnDiaObjectTable", + ]: + self.assertEqual(task_summary.n_expected, 46) + self.assertEqual(task_summary.n_failed, 0) + self.assertEqual(task_summary.n_successful, 46) + self.assertEqual(task_summary.n_blocked, 0) + self.assertEqual(task_summary.failed_quanta, []) + case _: + raise RuntimeError( + """Task summary contains unexpected + quanta. It is likely this test must be + updated to reflect the mocks.""" + ) + # Check on datasets + for dataset_type_name, dataset_type_summary in qg_1_sum.datasets.items(): + # We shouldn't run into predicted only, shadowed or cursed. + # Shadowed suggests that the dataset exists but is not included + # in the final collection; cursed suggests that the dataset is + # visible but unsuccessful. 
+            self.assertEqual(dataset_type_summary.n_predicted_only, 0)
+            self.assertEqual(dataset_type_summary.n_shadowed, 0)
+            self.assertEqual(dataset_type_summary.n_cursed, 0)
+            self.assertListEqual(dataset_type_summary.cursed_datasets, [])
+            match dataset_type_summary.producer:
+                # Check that the failure was documented in expected ways:
+                case label if label in [
+                    "_mock_transformForcedSourceTable",
+                    "_mock_drpAssociation",
+                    "_mock_drpDiaCalculation",
+                    "_mock_transformForcedSourceOnDiaObjectTable",
+                ]:
+                    self.assertEqual(dataset_type_summary.n_visible, 4)
+                    self.assertEqual(dataset_type_summary.n_expected, 4)
+                    self.assertEqual(dataset_type_summary.n_unsuccessful, 0)
+                    self.assertListEqual(dataset_type_summary.unsuccessful_datasets, [])
+                case "_mock_consolidateForcedSourceTable":
+                    self.assertEqual(dataset_type_summary.n_visible, 0)
+                    self.assertEqual(dataset_type_summary.n_expected, 1)
+                    self.assertEqual(dataset_type_summary.n_unsuccessful, 1)
+                    if dataset_type_name == "_mock_forcedSourceTable_tract":
+                        self.assertListEqual(
+                            dataset_type_summary.unsuccessful_datasets,
+                            [{"skymap": "ci_mw", "tract": 0}],
+                        )
+                    else:
+                        self.assertListEqual(
+                            dataset_type_summary.unsuccessful_datasets,
+                            [{"instrument": "HSC", "skymap": "ci_mw", "tract": 0}],
+                        )
+                case label if label in [
+                    "_mock_consolidateAssocDiaSourceTable",
+                    "_mock_consolidateFullDiaObjectTable",
+                    "_mock_consolidateForcedSourceOnDiaObjectTable",
+                ]:
+                    self.assertEqual(dataset_type_summary.n_visible, 1)
+                    self.assertEqual(dataset_type_summary.n_expected, 1)
+                    self.assertEqual(dataset_type_summary.n_unsuccessful, 0)
+                    self.assertListEqual(dataset_type_summary.unsuccessful_datasets, [])
+                case label if label in [
+                    "_mock_forcedPhotCcdOnDiaObjects",
+                    "_mock_forcedPhotDiffOnDiaObjects",
+                    "_mock_writeForcedSourceOnDiaObjectTable",
+                ]:
+                    self.assertEqual(dataset_type_summary.n_visible, 46)
+                    self.assertEqual(dataset_type_summary.n_expected, 46)
+                    self.assertEqual(dataset_type_summary.n_unsuccessful, 0)
+                    self.assertListEqual(dataset_type_summary.unsuccessful_datasets, [])
+
+        # Now examine the quantum provenance graph after the recovery attempt
+        # has been made.
+        # Get a graph for the second attempt.
+        qg_2 = helper.get_quantum_graph("step5", "attempt2")
+
+        # Check that if we correctly label a successful task whose data
+        # products do not make it into the output collection, the data products
+        # are marked as shadowed.
+        qpg_shadowed = QuantumProvenanceGraph()
+        qpg_shadowed.assemble_quantum_provenance_graph(
+            helper.butler, [qg_1, qg_2], collections=["HSC/runs/RC2/step5-attempt1"], where="instrument='HSC'"
+        )
+        qg_shadowed_sum = qpg_shadowed.to_summary(helper.butler)
+
+        for dataset_type_name, dataset_type_summary in qg_shadowed_sum.datasets.items():
+            if dataset_type_summary.producer == "_mock_consolidateForcedSourceTable":
+                if dataset_type_name == "_mock_consolidateForcedSourceTable_log":
+                    continue
+                else:
+                    self.assertEqual(dataset_type_summary.n_visible, 0)
+                    self.assertEqual(dataset_type_summary.n_shadowed, 1)
+                    self.assertEqual(dataset_type_summary.n_expected, 1)
+                    self.assertEqual(dataset_type_summary.n_cursed, 0)
+                    self.assertEqual(dataset_type_summary.n_predicted_only, 0)
+                    self.assertEqual(dataset_type_summary.n_unsuccessful, 0)
+
+        # Make the quantum provenance graph across both attempts properly, to
+        # check that the recovery was correctly handled.
+        qpg2 = QuantumProvenanceGraph()
+        # Quantum graphs are passed in order of execution; collections are
+        # passed in reverse order because the query in
+        # `QuantumProvenanceGraph.__resolve_duplicates` requires collections
+        # be passed with the most recent first.
+        qpg2.assemble_quantum_provenance_graph(
+            helper.butler,
+            [qg_1, qg_2],
+            collections=["HSC/runs/RC2/step5-attempt2", "HSC/runs/RC2/step5-attempt1"],
+            where="instrument='HSC'",
+        )
+        qg_2_sum = qpg2.to_summary(helper.butler)
+
+        for label, task_summary in qg_2_sum.tasks.items():
+            self.assertEqual(task_summary.n_unknown, 0)
+            self.assertEqual(task_summary.n_wonky, 0)
+            self.assertListEqual(task_summary.wonky_quanta, [])
+            # There should be no failures, so we can say for all tasks:
+            self.assertEqual(task_summary.n_successful, task_summary.n_expected)
+            self.assertEqual(task_summary.n_failed, 0)
+            self.assertListEqual(task_summary.failed_quanta, [])
+            match label:
+                case label if label in [
+                    "_mock_transformForcedSourceTable",
+                    "_mock_drpAssociation",
+                    "_mock_drpDiaCalculation",
+                    "_mock_transformForcedSourceOnDiaObjectTable",
+                ]:
+                    self.assertEqual(task_summary.n_expected, 4)
+                    self.assertEqual(task_summary.n_failed, 0)
+                    self.assertEqual(task_summary.n_successful, 4)
+                    self.assertEqual(task_summary.n_blocked, 0)
+                    self.assertEqual(task_summary.failed_quanta, [])
+                    self.assertEqual(task_summary.recovered_quanta, [])
+                # Check that the failure was recovered:
+                case "_mock_consolidateForcedSourceTable":
+                    self.assertEqual(task_summary.n_expected, 1)
+                    self.assertEqual(task_summary.n_successful, 1)
+                    self.assertEqual(task_summary.n_blocked, 0)
+                    self.assertEqual(
+                        task_summary.recovered_quanta,
+                        [{"instrument": "HSC", "skymap": "ci_mw", "tract": 0}],
+                    )
+                case label if label in [
+                    "_mock_consolidateAssocDiaSourceTable",
+                    "_mock_consolidateFullDiaObjectTable",
+                    "_mock_consolidateForcedSourceOnDiaObjectTable",
+                ]:
+                    self.assertEqual(task_summary.n_expected, 1)
+                    self.assertEqual(task_summary.n_failed, 0)
+                    self.assertEqual(task_summary.n_successful, 1)
+                    self.assertEqual(task_summary.n_blocked, 0)
+                    self.assertEqual(task_summary.failed_quanta, [])
+
+                case label if label in [
+                    "_mock_forcedPhotCcdOnDiaObjects",
+                    "_mock_forcedPhotDiffOnDiaObjects",
+                    "_mock_writeForcedSourceOnDiaObjectTable",
+                ]:
+                    self.assertEqual(task_summary.n_expected, 46)
+                    self.assertEqual(task_summary.n_failed, 0)
+                    self.assertEqual(task_summary.n_successful, 46)
+                    self.assertEqual(task_summary.n_blocked, 0)
+                    self.assertEqual(task_summary.failed_quanta, [])
+
+        # Check on datasets
+        for dataset_type_summary in qg_2_sum.datasets.values():
+            # Check that all the data products are present and successful for
+            # all tasks.
+            self.assertEqual(dataset_type_summary.n_predicted_only, 0)
+            self.assertEqual(dataset_type_summary.n_cursed, 0)
+            self.assertListEqual(dataset_type_summary.cursed_datasets, [])
+            self.assertEqual(dataset_type_summary.n_unsuccessful, 0)
+            self.assertListEqual(dataset_type_summary.unsuccessful_datasets, [])
+            self.assertEqual(dataset_type_summary.n_shadowed, 0)
+            self.assertEqual(dataset_type_summary.n_visible, dataset_type_summary.n_expected)
+
+    def test_step5_quantum_provenance_graph_qbb(self) -> None:
+        self.check_step5_qpg(self.qbb)
+
     def test_fgcm_refcats(self) -> None:
         """Test that FGCM does not get refcats that don't overlap any of
         its inputs or outputs, despite not having a spatial data ID.
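Both test files assert the same dataset bookkeeping identity several times; stated once for reference. This is a sketch, with the bucket descriptions inferred from the comments in the tests above; `dataset_summary` stands in for one value of `summary.datasets`:

def assert_dataset_counts_consistent(dataset_summary) -> None:
    # Every expected dataset falls into exactly one of five buckets, so the
    # per-bucket counts must sum back to the number expected.
    assert dataset_summary.n_expected == sum(
        [
            dataset_summary.n_visible,  # produced and found in the queried collections
            dataset_summary.n_shadowed,  # produced, but not in the final collection
            dataset_summary.n_predicted_only,  # predicted by the graph, never found
            dataset_summary.n_cursed,  # visible yet marked unsuccessful
            dataset_summary.n_unsuccessful,  # expected output of a failed quantum
        ]
    )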