From 28e5fcfa4459e2e109c0a69ffbe7d06a2109ea7c Mon Sep 17 00:00:00 2001 From: Orion Eiger Date: Fri, 2 Aug 2024 16:54:37 -0700 Subject: [PATCH] Use Pydantic model validation in tests --- tests/test_prod_outputs.py | 328 ++++++++++++++----------------------- tests/test_rc2_outputs.py | 174 ++++++++++---------- 2 files changed, 213 insertions(+), 289 deletions(-) diff --git a/tests/test_prod_outputs.py b/tests/test_prod_outputs.py index fc11658..720864a 100644 --- a/tests/test_prod_outputs.py +++ b/tests/test_prod_outputs.py @@ -24,7 +24,7 @@ from lsst.ci.middleware.output_repo_tests import OutputRepoTests from lsst.pipe.base.execution_reports import QuantumGraphExecutionReport -from lsst.pipe.base.quantum_provenance_graph import QuantumProvenanceGraph +from lsst.pipe.base.quantum_provenance_graph import QuantumProvenanceGraph, Summary, TaskSummary, DatasetTypeSummary from lsst.pipe.base.tests.mocks import get_mock_name # (tract, patch, band): {input visits} for coadds produced here. @@ -206,36 +206,37 @@ def check_step1_qpg(self, helper: OutputRepoTests) -> None: qpg1.resolve_duplicates( helper.butler, collections=["HSC/runs/Prod/step1-i-attempt1"], where="instrument='HSC'" ) - qg_1_sum_only = qpg1.to_summary(helper.butler) - qg_1_dict = qg_1_sum_only.model_dump() + qg_1_sum = qpg1.to_summary(helper.butler) + Summary.model_validate(qg_1_sum) + TaskSummary.model_validate(qg_1_sum.tasks) # Loop through the tasks in the dict - for task in qg_1_dict["tasks"]: - self.assertEqual(qg_1_dict["tasks"][task]["n_not_attempted"], 0) - self.assertEqual(qg_1_dict["tasks"][task]["n_wonky"], 0) - self.assertEqual(qg_1_dict["tasks"][task]["n_expected"], 36) - self.assertListEqual(qg_1_dict["tasks"][task]["wonky_quanta"], []) - self.assertListEqual(qg_1_dict["tasks"][task]["recovered_quanta"], []) + for label, task_summary in qg_1_sum.tasks.items(): + self.assertEqual(task_summary.n_not_attempted, 0) + self.assertEqual(task_summary.n_wonky, 0) + self.assertEqual(task_summary.n_expected, 36) + self.assertListEqual(task_summary.wonky_quanta, []) + self.assertListEqual(task_summary.recovered_quanta, []) self.assertEqual( - qg_1_dict["tasks"][task]["n_expected"], - qg_1_dict["tasks"][task]["n_successful"] - + qg_1_dict["tasks"][task]["n_blocked"] - + qg_1_dict["tasks"][task]["n_not_attempted"] - + qg_1_dict["tasks"][task]["n_wonky"] - + qg_1_dict["tasks"][task]["n_failed"], + task_summary.n_expected, + task_summary.n_successful + + task_summary.n_blocked + + task_summary.n_not_attempted + + task_summary.n_wonky + + task_summary.n_failed, ) - match task: + match label: case "_mock_calibrate": - self.assertEqual(qg_1_dict["tasks"][task]["n_successful"], 30) - self.assertEqual(qg_1_dict["tasks"][task]["n_blocked"], 0) - self.assertEqual(qg_1_dict["tasks"][task]["n_failed"], 6) - for quantum in qg_1_dict["tasks"][task]["failed_quanta"]: - self.assertEqual(quantum["data_id"]["instrument"], "HSC") - self.assertIsInstance(quantum["data_id"]["detector"], int) - self.assertEqual(quantum["data_id"]["visit"], 18202) - self.assertDictEqual(quantum["runs"], {"HSC/runs/Prod/step1-i-attempt1": "failed"}) - self.assertIsInstance(quantum["messages"], list) - for message in quantum["messages"]: + self.assertEqual(task_summary.n_successful, 30) + self.assertEqual(task_summary.n_blocked, 0) + self.assertEqual(task_summary.n_failed, 6) + for quantum_summary in task_summary.failed_quanta: + self.assertEqual(quantum_summary.data_id["instrument"], "HSC") + self.assertIsInstance(quantum_summary.data_id["detector"], int) + 
self.assertEqual(quantum_summary.data_id["visit"], 18202) + self.assertDictEqual(quantum_summary.runs, {"HSC/runs/Prod/step1-i-attempt1": "failed"}) + self.assertIsInstance(quantum_summary.messages, list) + for message in quantum_summary.messages: self.assertIsInstance(message, str) self.assertTrue( message.startswith("Execution of task '_mock_calibrate' on quantum") @@ -244,112 +245,74 @@ def check_step1_qpg(self, helper: OutputRepoTests) -> None: "Exception ValueError: Simulated failure: task=_mock_calibrate", message ) case _: - if task == "_mock_writePreSourceTable" or task == "_mock_transformPreSourceTable": - self.assertEqual(qg_1_dict["tasks"][task]["n_successful"], 30) - self.assertEqual(qg_1_dict["tasks"][task]["n_blocked"], 6) - self.assertEqual(qg_1_dict["tasks"][task]["n_failed"], 0) - self.assertListEqual(qg_1_dict["tasks"][task]["failed_quanta"], []) + if label == "_mock_writePreSourceTable" or label == "_mock_transformPreSourceTable": + self.assertEqual(task_summary.n_successful, 30) + self.assertEqual(task_summary.n_blocked, 6) + self.assertEqual(task_summary.n_failed, 0) + self.assertListEqual(task_summary.failed_quanta, []) else: - self.assertEqual(qg_1_dict["tasks"][task]["n_successful"], 36) - self.assertEqual(qg_1_dict["tasks"][task]["n_blocked"], 0) - self.assertEqual(qg_1_dict["tasks"][task]["n_failed"], 0) - self.assertListEqual(qg_1_dict["tasks"][task]["failed_quanta"], []) + self.assertEqual(task_summary.n_successful, 36) + self.assertEqual(task_summary.n_blocked, 0) + self.assertEqual(task_summary.n_failed, 0) + self.assertListEqual(task_summary.failed_quanta, []) # Test datasets for the first QPG. - datasets = [ - "_mock_postISRCCD", - "_mock_isr_metadata", - "_mock_isr_log", - "_mock_icExp", - "_mock_icSrc", - "_mock_icExpBackground", - "_mock_characterizeImage_metadata", - "_mock_characterizeImage_log", - "_mock_calexpBackground", - "_mock_srcMatch", - "_mock_calexp", - "_mock_src", - "_mock_srcMatchFull", - "_mock_calibrate_metadata", - "_mock_calibrate_log", - "_mock_preSource", - "_mock_writePreSourceTable_metadata", - "_mock_writePreSourceTable_log", - "_mock_preSourceTable", - "_mock_transformPreSourceTable_metadata", - "_mock_transformPreSourceTable_log", - ] - for dataset in datasets: - self.assertIn(dataset, qg_1_dict["datasets"].keys()) - for dataset in qg_1_dict["datasets"]: - self.assertEqual( - list(qg_1_dict["datasets"][dataset].keys()), - [ - "producer", - "n_published", - "n_unpublished", - "n_predicted_only", - "n_expected", - "cursed_datasets", - "unsuccessful_datasets", - "n_cursed", - "n_unsuccessful", - ], - ) - self.assertIsInstance(qg_1_dict["datasets"][dataset]["producer"], str) + DatasetTypeSummary.model_validate(qg_1_sum.datasets) + + for dataset_type_name, dataset_summary in qg_1_sum.datasets.items(): # For the expected failure - if qg_1_dict["datasets"][dataset]["producer"] == "_mock_calibrate": + if dataset_summary.producer == "_mock_calibrate": # A bit hard to read, but this is actually asserting that it's - # not empty. + # not empty + showing an error if it is. 
self.assertTrue( - qg_1_dict["datasets"][dataset]["unsuccessful_datasets"], - f"Expected failures were not stored as unsuccessful datasets for {dataset}.", + dataset_summary.unsuccessful_datasets, + f"Expected failures were not stored as unsuccessful datasets for {dataset_type_name}.", ) # Check that the published datasets = expected - (unsuccessful # + predicted_only) self.assertEqual( - qg_1_dict["datasets"][dataset]["n_published"], - qg_1_dict["datasets"][dataset]["n_expected"] - - qg_1_dict["datasets"][dataset]["n_unsuccessful"] - - qg_1_dict["datasets"][dataset]["n_predicted_only"], + dataset_summary.n_published, + dataset_summary.n_expected + - dataset_summary.n_unsuccessful + - dataset_summary.n_predicted_only, ) # Check that the unsuccessful datasets are as expected - self.assertIsInstance(qg_1_dict["datasets"][dataset]["unsuccessful_datasets"], list) + self.assertIsInstance(dataset_summary.unsuccessful_datasets, list) self.assertEqual( - qg_1_dict["datasets"][dataset]["unsuccessful_datasets"][0]["instrument"], "HSC" + dataset_summary.unsuccessful_datasets[0]["instrument"], "HSC" ) - self.assertEqual(qg_1_dict["datasets"][dataset]["unsuccessful_datasets"][0]["visit"], 18202) - self.assertEqual(qg_1_dict["datasets"][dataset]["unsuccessful_datasets"][0]["band"], "i") + self.assertEqual(dataset_summary.unsuccessful_datasets[0]["visit"], 18202) + self.assertEqual(dataset_summary.unsuccessful_datasets[0]["band"], "i") self.assertEqual( - qg_1_dict["datasets"][dataset]["unsuccessful_datasets"][0]["day_obs"], 20150117 + dataset_summary.unsuccessful_datasets[0]["day_obs"], 20150117 ) self.assertEqual( - qg_1_dict["datasets"][dataset]["unsuccessful_datasets"][0]["physical_filter"], + dataset_summary.unsuccessful_datasets[0]["physical_filter"], "HSC-I", ) # Check that there are the expected amount of failures # and that they are not published - self.assertEqual(len(qg_1_dict["datasets"][dataset]["unsuccessful_datasets"]), 6) - self.assertEqual(qg_1_dict["datasets"][dataset]["n_expected"], 36) - self.assertEqual(qg_1_dict["datasets"][dataset]["n_published"], 30) + self.assertEqual(len(dataset_summary.unsuccessful_datasets), 6) + self.assertEqual(dataset_summary.n_expected, 36) + self.assertEqual(dataset_summary.n_published, 30) # Check that all the counts add up for every task self.assertEqual( - qg_1_dict["datasets"][dataset]["n_expected"], + dataset_summary.n_expected, sum( [ - qg_1_dict["datasets"][dataset]["n_published"], - qg_1_dict["datasets"][dataset]["n_unpublished"], - qg_1_dict["datasets"][dataset]["n_predicted_only"], - qg_1_dict["datasets"][dataset]["n_cursed"], - qg_1_dict["datasets"][dataset]["n_unsuccessful"], + dataset_summary.n_published, + dataset_summary.n_unpublished, + dataset_summary.n_predicted_only, + dataset_summary.n_cursed, + dataset_summary.n_unsuccessful, ] ), ) # Check that there are no cursed datasets - self.assertEqual(qg_1_dict["datasets"][dataset]["n_cursed"], 0) - self.assertListEqual(qg_1_dict["datasets"][dataset]["cursed_datasets"], []) + self.assertEqual(dataset_summary.n_cursed, 0) + self.assertListEqual(dataset_summary.cursed_datasets, []) # Make an overall QPG and add the recovery attempt to the QPG qpg = QuantumProvenanceGraph() @@ -362,116 +325,75 @@ def check_step1_qpg(self, helper: OutputRepoTests) -> None: where="instrument='HSC'", ) qg_sum = qpg.to_summary(helper.butler) - - qg_2_dict = qg_sum.model_dump() - - for task in qg_2_dict["tasks"]: - self.assertEqual(qg_2_dict["tasks"][task]["n_successful"], 36) - 
self.assertEqual(qg_2_dict["tasks"][task]["n_blocked"], 0) - self.assertEqual(qg_2_dict["tasks"][task]["n_failed"], 0) - self.assertEqual(qg_2_dict["tasks"][task]["n_not_attempted"], 0) - self.assertEqual(qg_2_dict["tasks"][task]["n_wonky"], 0) - self.assertEqual(qg_2_dict["tasks"][task]["n_expected"], 36) - self.assertListEqual(qg_2_dict["tasks"][task]["wonky_quanta"], []) - self.assertListEqual(qg_2_dict["tasks"][task]["failed_quanta"], []) + Summary.model_validate(qg_sum) + + TaskSummary.model_validate(qg_sum.tasks) + for label, task_summary in qg_sum.tasks.items(): + self.assertEqual(task_summary.n_successful, 36) + self.assertEqual(task_summary.n_blocked, 0) + self.assertEqual(task_summary.n_failed, 0) + self.assertEqual(task_summary.n_not_attempted, 0) + self.assertEqual(task_summary.n_wonky, 0) + self.assertEqual(task_summary.n_expected, 36) + self.assertListEqual(task_summary.wonky_quanta, []) + self.assertListEqual(task_summary.failed_quanta, []) self.assertEqual( - qg_2_dict["tasks"][task]["n_expected"], - qg_2_dict["tasks"][task]["n_successful"] - + qg_2_dict["tasks"][task]["n_blocked"] - + qg_2_dict["tasks"][task]["n_not_attempted"] - + qg_2_dict["tasks"][task]["n_wonky"] - + qg_2_dict["tasks"][task]["n_failed"], + task_summary.n_expected, + task_summary.n_successful + + task_summary.n_blocked + + task_summary.n_not_attempted + + task_summary.n_wonky + + task_summary.n_failed, ) if ( - task == "_mock_calibrate" - or task == "_mock_writePreSourceTable" - or task == "_mock_transformPreSourceTable" + label == "_mock_calibrate" + or label == "_mock_writePreSourceTable" + or label == "_mock_transformPreSourceTable" ): - for quantum in qg_2_dict["tasks"][task]["recovered_quanta"]: + for quantum in task_summary.recovered_quanta: self.assertEqual(quantum["instrument"], "HSC") self.assertEqual(quantum["visit"], 18202) else: - self.assertListEqual(qg_2_dict["tasks"][task]["recovered_quanta"], []) - - # Test datasets for the overall QPG. - # Check that we have the expected datasets - datasets = [ - "_mock_postISRCCD", - "_mock_isr_metadata", - "_mock_isr_log", - "_mock_icExp", - "_mock_icSrc", - "_mock_icExpBackground", - "_mock_characterizeImage_metadata", - "_mock_characterizeImage_log", - "_mock_calexpBackground", - "_mock_srcMatch", - "_mock_calexp", - "_mock_src", - "_mock_srcMatchFull", - "_mock_calibrate_metadata", - "_mock_calibrate_log", - "_mock_preSource", - "_mock_writePreSourceTable_metadata", - "_mock_writePreSourceTable_log", - "_mock_preSourceTable", - "_mock_transformPreSourceTable_metadata", - "_mock_transformPreSourceTable_log", - ] - for dataset in datasets: - self.assertIn(dataset, qg_2_dict["datasets"].keys()) - # Check that they are the same datasets - self.assertEqual(qg_2_dict["datasets"].keys(), qg_1_dict["datasets"].keys()) - for dataset in qg_2_dict["datasets"]: - # Check that each dataset has the same information - self.assertEqual( - list(qg_2_dict["datasets"][dataset].keys()), + self.assertListEqual(task_summary.recovered_quanta, []) + + # Test datasets for the overall QPG. + # Check that we have the expected datasets + DatasetTypeSummary.model_validate(qg_sum.datasets) + + for dataset_summary in qg_sum.datasets.values(): + # Check counts: we should have recovered everything, so + # published should equal expected for each dataset. 
+ self.assertEqual( + dataset_summary.n_expected, + dataset_summary.n_published, + ) + # Check that this is the expected number + self.assertEqual(dataset_summary.n_published, 36) + # Check that they all add up + self.assertEqual( + dataset_summary.n_expected, + sum( [ - "producer", - "n_published", - "n_unpublished", - "n_predicted_only", - "n_expected", - "cursed_datasets", - "unsuccessful_datasets", - "n_cursed", - "n_unsuccessful", - ], - ) - self.assertIsInstance(qg_2_dict["datasets"][dataset]["producer"], str) - # Check counts: we should have recovered everything, so - # published should equal expected for each dataset. - self.assertEqual( - qg_2_dict["datasets"][dataset]["n_expected"], - qg_2_dict["datasets"][dataset]["n_published"], - ) - # Check that this is the expected number - self.assertEqual(qg_2_dict["datasets"][dataset]["n_published"], 36) - # Check that they all add up - self.assertEqual( - qg_2_dict["datasets"][dataset]["n_expected"], - sum( - [ - qg_2_dict["datasets"][dataset]["n_published"], - qg_2_dict["datasets"][dataset]["n_unpublished"], - qg_2_dict["datasets"][dataset]["n_predicted_only"], - qg_2_dict["datasets"][dataset]["n_cursed"], - qg_2_dict["datasets"][dataset]["n_unsuccessful"], - ] - ), - ) - # Check that there are no cursed or unsuccessful datasets - self.assertEqual(qg_2_dict["datasets"][dataset]["n_cursed"], 0) - self.assertListEqual(qg_2_dict["datasets"][dataset]["cursed_datasets"], []) - self.assertEqual(qg_2_dict["datasets"][dataset]["n_unsuccessful"], 0) - self.assertListEqual(qg_2_dict["datasets"][dataset]["unsuccessful_datasets"], []) - - # Since we have recovered everything, we should have the same - # numbers for every task: - self.assertEqual(qg_2_dict["datasets"][dataset]["n_expected"], 36) - self.assertEqual(qg_2_dict["datasets"][dataset]["n_published"], 36) - self.assertEqual(qg_2_dict["datasets"][dataset]["n_unpublished"], 0) - self.assertEqual(qg_2_dict["datasets"][dataset]["n_predicted_only"], 0) + dataset_summary.n_published, + dataset_summary.n_unpublished, + dataset_summary.n_predicted_only, + dataset_summary.n_cursed, + dataset_summary.n_unsuccessful, + ] + ), + ) + # Check that there are no cursed or unsuccessful datasets + self.assertEqual(dataset_summary.n_cursed, 0) + self.assertListEqual(dataset_summary.cursed_datasets, []) + self.assertEqual(dataset_summary.n_unsuccessful, 0) + self.assertListEqual(dataset_summary.unsuccessful_datasets, []) + + # Since we have recovered everything, we should have the same + # numbers for every task: + self.assertEqual(dataset_summary.n_expected, 36) + self.assertEqual(dataset_summary.n_published, 36) + self.assertEqual(dataset_summary.n_unpublished, 0) + self.assertEqual(dataset_summary.n_predicted_only, 0) def test_step1_quantum_provenance_graph_qbb(self) -> None: self.check_step1_qpg(self.qbb) diff --git a/tests/test_rc2_outputs.py b/tests/test_rc2_outputs.py index 9ebe113..116d186 100644 --- a/tests/test_rc2_outputs.py +++ b/tests/test_rc2_outputs.py @@ -23,7 +23,7 @@ from typing import ClassVar from lsst.ci.middleware.output_repo_tests import OutputRepoTests -from lsst.pipe.base.quantum_provenance_graph import QuantumProvenanceGraph +from lsst.pipe.base.quantum_provenance_graph import QuantumProvenanceGraph, Summary, TaskSummary, DatasetTypeSummary from lsst.pipe.base.tests.mocks import MockDataset, get_mock_name # (tract, patch, band): {input visits} for coadds produced here. 
@@ -163,25 +163,25 @@ def check_step8_qpg(self, helper: OutputRepoTests) -> None: qpg1.resolve_duplicates( helper.butler, collections=["HSC/runs/RC2/step8-attempt1"], where="instrument='HSC'" ) - qg_1_sum_only = qpg1.to_summary(helper.butler) - qg_1_dict = qg_1_sum_only.model_dump() + qg_1_sum = qpg1.to_summary(helper.butler) + Summary.model_validate(qg_1_sum) # Check that expected, wonky and not attempted do not occur throughout # tasks: - for task in qg_1_dict["tasks"]: - self.assertEqual(qg_1_dict["tasks"][task]["n_not_attempted"], 0) - self.assertEqual(qg_1_dict["tasks"][task]["n_wonky"], 0) - self.assertEqual(qg_1_dict["tasks"][task]["n_blocked"], 0) - self.assertListEqual(qg_1_dict["tasks"][task]["wonky_quanta"], []) - self.assertListEqual(qg_1_dict["tasks"][task]["recovered_quanta"], []) - match task: + for label, task_summary in qg_1_sum.tasks.items(): + self.assertEqual(task_summary.n_not_attempted, 0) + self.assertEqual(task_summary.n_wonky, 0) + self.assertEqual(task_summary.n_blocked, 0) + self.assertListEqual(task_summary.wonky_quanta, []) + self.assertListEqual(task_summary.recovered_quanta, []) + match label: # Check that the failure was documented in expected ways: case "_mock_analyzeObjectTableCore": - self.assertEqual(qg_1_dict["tasks"][task]["n_expected"], 1) - self.assertEqual(qg_1_dict["tasks"]["_mock_analyzeObjectTableCore"]["n_failed"], 1) - self.assertEqual(qg_1_dict["tasks"]["_mock_analyzeObjectTableCore"]["n_successful"], 0) + self.assertEqual(task_summary.n_expected, 1) + self.assertEqual(task_summary.n_failed, 1) + self.assertEqual(task_summary.n_successful, 0) self.assertEqual( - qg_1_dict["tasks"]["_mock_analyzeObjectTableCore"]["failed_quanta"], + task_summary.failed_quanta, [ { "data_id": {"skymap": "ci_mw", "tract": 0}, @@ -194,63 +194,62 @@ def check_step8_qpg(self, helper: OutputRepoTests) -> None: } ], ) - self.assertEqual(qg_1_dict["tasks"]["_mock_analyzeObjectTableCore"]["n_blocked"], 0) + self.assertEqual(task_summary.n_blocked, 0) case _: # If it's not the failed task, there should be no failures - self.assertEqual(qg_1_dict["tasks"][task]["n_failed"], 0) - self.assertListEqual(qg_1_dict["tasks"][task]["failed_quanta"], []) + self.assertEqual(task_summary.n_failed, 0) + self.assertListEqual(task_summary.failed_quanta, []) # We also shouldn't have had anything to recover - self.assertListEqual(qg_1_dict["tasks"][task]["recovered_quanta"], []) + self.assertListEqual(task_summary.recovered_quanta, []) # The next few if's are making sure we have the same # number of expected and successful quanta. We could also # just assert that n_expected == n_successful. 
- if task == "_mock_analyzeMatchedPreVisitCore" or task == "_mock_analyzeMatchedVisitCore": - self.assertEqual(qg_1_dict["tasks"][task]["n_expected"], 4) - self.assertEqual(qg_1_dict["tasks"][task]["n_successful"], 4) - elif task == "_mock_plotPropertyMapTract": - self.assertEqual(qg_1_dict["tasks"][task]["n_expected"], 2) - self.assertEqual(qg_1_dict["tasks"][task]["n_successful"], 2) + if label == "_mock_analyzeMatchedPreVisitCore" or label == "_mock_analyzeMatchedVisitCore": + self.assertEqual(task_summary.n_expected, 4) + self.assertEqual(task_summary.n_successful, 4) + elif label == "_mock_plotPropertyMapTract": + self.assertEqual(task_summary.n_expected, 2) + self.assertEqual(task_summary.n_successful, 2) else: - self.assertEqual(qg_1_dict["tasks"][task]["n_expected"], 1) - self.assertEqual(qg_1_dict["tasks"][task]["n_successful"], 1) + self.assertEqual(task_summary.n_expected, 1) + self.assertEqual(task_summary.n_successful, 1) # Check on datasets - # This used to be a self.assertIn but the list was annoyingly long. - self.assertEqual(len(qg_1_dict["datasets"].keys()), 218) - for dataset in qg_1_dict["datasets"]: + DatasetTypeSummary.model_validate(qg_1_sum.datasets) + for dataset_type_summary in qg_1_sum.datasets.values(): # We shouldn't run into predicted only, unpublished or cursed. # Unpublished suggests that the dataset exists but is not included # in the final collection; cursed suggests that the dataset is # published but unsuccessful. - self.assertEqual(qg_1_dict["datasets"][dataset]["n_predicted_only"], 0) - self.assertEqual(qg_1_dict["datasets"][dataset]["n_unpublished"], 0) - self.assertEqual(qg_1_dict["datasets"][dataset]["n_cursed"], 0) - self.assertListEqual(qg_1_dict["datasets"][dataset]["cursed_datasets"], []) - match qg_1_dict["datasets"][dataset]["producer"]: + self.assertEqual(dataset_type_summary.n_predicted_only, 0) + self.assertEqual(dataset_type_summary.n_unpublished, 0) + self.assertEqual(dataset_type_summary.n_cursed, 0) + self.assertListEqual(dataset_type_summary.cursed_datasets, []) + match dataset_type_summary.producer: # Check that the failure was documented in expected ways: case "_mock_analyzeObjectTableCore": - self.assertEqual(qg_1_dict["datasets"][dataset]["n_published"], 0) - self.assertEqual(qg_1_dict["datasets"][dataset]["n_expected"], 1) - self.assertEqual(qg_1_dict["datasets"][dataset]["n_unsuccessful"], 1) + self.assertEqual(dataset_type_summary.n_published, 0) + self.assertEqual(dataset_type_summary.n_expected, 1) + self.assertEqual(dataset_type_summary.n_unsuccessful, 1) self.assertListEqual( - qg_1_dict["datasets"][dataset]["unsuccessful_datasets"], + dataset_type_summary.unsuccessful_datasets, [{"skymap": "ci_mw", "tract": 0}], ) # These are the non-failed tasks: case _: - self.assertEqual(qg_1_dict["datasets"][dataset]["n_unsuccessful"], 0) - self.assertListEqual(qg_1_dict["datasets"][dataset]["unsuccessful_datasets"], []) + self.assertEqual(dataset_type_summary.n_unsuccessful, 0) + self.assertListEqual(dataset_type_summary.unsuccessful_datasets, []) if ( - qg_1_dict["datasets"][dataset]["producer"] == "_mock_analyzeMatchedPreVisitCore" - or qg_1_dict["datasets"][dataset]["producer"] == "_mock_analyzeMatchedVisitCore" + dataset_type_summary.producer == "_mock_analyzeMatchedPreVisitCore" + or dataset_type_summary.producer == "_mock_analyzeMatchedVisitCore" ): - self.assertEqual(qg_1_dict["datasets"][dataset]["n_published"], 4) - self.assertEqual(qg_1_dict["datasets"][dataset]["n_expected"], 4) - elif 
qg_1_dict["datasets"][dataset]["producer"] == "_mock_plotPropertyMapTract": - self.assertEqual(qg_1_dict["datasets"][dataset]["n_published"], 2) - self.assertEqual(qg_1_dict["datasets"][dataset]["n_expected"], 2) + self.assertEqual(dataset_type_summary.n_published, 4) + self.assertEqual(dataset_type_summary.n_expected, 4) + elif dataset_type_summary.producer == "_mock_plotPropertyMapTract": + self.assertEqual(dataset_type_summary.n_published, 2) + self.assertEqual(dataset_type_summary.n_expected, 2) else: - self.assertEqual(qg_1_dict["datasets"][dataset]["n_published"], 1) - self.assertEqual(qg_1_dict["datasets"][dataset]["n_expected"], 1) + self.assertEqual(dataset_type_summary.n_published, 1) + self.assertEqual(dataset_type_summary.n_expected, 1) # Now examine the quantum provenance graph after the recovery attempt # has been made. @@ -269,19 +268,22 @@ def check_step8_qpg(self, helper: OutputRepoTests) -> None: ) qpg_u_sum = qpg_unpublished.to_summary(helper.butler) - qpg_u = qpg_u_sum.model_dump() - for dataset in qpg_u["datasets"]: - if qpg_u["datasets"][dataset]["producer"] == "_mock_analyzeObjectTableCore": - if dataset == "_mock_analyzeObjectTableCore_log": + Summary.model_validate(qpg_u_sum) + TaskSummary.model_validate(qpg_u_sum.tasks) + DatasetTypeSummary.model_validate(qpg_u_sum.datasets) + + for dataset_type_name, dataset_type_summary in qpg_u_sum.datasets.items(): + if dataset_type_summary.producer == "_mock_analyzeObjectTableCore": + if dataset_type_name == "_mock_analyzeObjectTableCore_log": continue else: - self.assertEqual(qpg_u["datasets"][dataset]["n_published"], 0) - self.assertEqual(qpg_u["datasets"][dataset]["n_unpublished"], 1) - self.assertEqual(qpg_u["datasets"][dataset]["n_expected"], 1) - self.assertEqual(qpg_u["datasets"][dataset]["n_cursed"], 0) - self.assertEqual(qpg_u["datasets"][dataset]["n_predicted_only"], 0) - self.assertEqual(qpg_u["datasets"][dataset]["n_unsuccessful"], 0) + self.assertEqual(dataset_type_summary.n_published, 0) + self.assertEqual(dataset_type_summary.n_unpublished, 1) + self.assertEqual(dataset_type_summary.n_expected, 1) + self.assertEqual(dataset_type_summary.n_cursed, 0) + self.assertEqual(dataset_type_summary.n_predicted_only, 0) + self.assertEqual(dataset_type_summary.n_unsuccessful, 0) # Now for verifying the recovery properly -- the graph below is made # as intended. 
@@ -293,45 +295,45 @@ def check_step8_qpg(self, helper: OutputRepoTests) -> None: collections=["HSC/runs/RC2/step8-attempt2", "HSC/runs/RC2/step8-attempt1"], where="instrument='HSC'", ) - qg_2_sum_only = qpg2.to_summary(helper.butler) - qg_2_dict = qg_2_sum_only.model_dump() - - for task in qg_2_dict["tasks"]: - self.assertEqual(qg_2_dict["tasks"][task]["n_not_attempted"], 0) - self.assertEqual(qg_2_dict["tasks"][task]["n_wonky"], 0) - self.assertEqual(qg_2_dict["tasks"][task]["n_blocked"], 0) - self.assertListEqual(qg_2_dict["tasks"][task]["wonky_quanta"], []) + qg_2_sum = qpg2.to_summary(helper.butler) + Summary.model_validate(qg_2_sum) + + TaskSummary.model_validate(qg_2_sum.tasks) + for label, task_summary in qg_2_sum.tasks.items(): + self.assertEqual(task_summary.n_not_attempted, 0) + self.assertEqual(task_summary.n_wonky, 0) + self.assertEqual(task_summary.n_blocked, 0) + self.assertListEqual(task_summary.wonky_quanta, []) # There should be no failures, so we can say for all tasks: - self.assertEqual(qg_2_dict["tasks"][task]["n_successful"], qg_2_dict["tasks"][task]["n_expected"]) - self.assertEqual(qg_2_dict["tasks"][task]["n_failed"], 0) - self.assertListEqual(qg_2_dict["tasks"][task]["failed_quanta"], []) - match task: + self.assertEqual(task_summary.n_successful, task_summary.n_expected) + self.assertEqual(task_summary.n_failed, 0) + self.assertListEqual(task_summary.failed_quanta, []) + match label: # Check that the failure was recovered: case "_mock_analyzeObjectTableCore": - self.assertEqual(qg_2_dict["tasks"][task]["n_expected"], 1) - self.assertEqual(qg_2_dict["tasks"]["_mock_analyzeObjectTableCore"]["n_successful"], 1) + self.assertEqual(task_summary.n_expected, 1) + self.assertEqual(task_summary.n_successful, 1) self.assertEqual( - qg_2_dict["tasks"]["_mock_analyzeObjectTableCore"]["recovered_quanta"], + task_summary.recovered_quanta, [{"skymap": "ci_mw", "tract": 0}], ) - self.assertEqual(qg_2_dict["tasks"]["_mock_analyzeObjectTableCore"]["n_blocked"], 0) + self.assertEqual(task_summary.n_blocked, 0) case _: - self.assertListEqual(qg_2_dict["tasks"][task]["recovered_quanta"], []) + self.assertListEqual(task_summary.recovered_quanta, []) # Check on datasets - # This used to be a self.assertIn but the list was annoyingly long. - self.assertEqual(len(qg_2_dict["datasets"].keys()), 218) - for dataset in qg_2_dict["datasets"]: + DatasetTypeSummary.model_validate(qg_2_sum.datasets) + for dataset_type_summary in qg_2_sum.datasets.values(): # Check that all the data products are present and successful for # all tasks. 
- self.assertEqual(qg_2_dict["datasets"][dataset]["n_predicted_only"], 0) - self.assertEqual(qg_2_dict["datasets"][dataset]["n_cursed"], 0) - self.assertListEqual(qg_2_dict["datasets"][dataset]["cursed_datasets"], []) - self.assertEqual(qg_2_dict["datasets"][dataset]["n_unsuccessful"], 0) - self.assertListEqual(qg_2_dict["datasets"][dataset]["unsuccessful_datasets"], []) - self.assertEqual(qg_2_dict["datasets"][dataset]["n_unpublished"], 0) + self.assertEqual(dataset_type_summary.n_predicted_only, 0) + self.assertEqual(dataset_type_summary.n_cursed, 0) + self.assertListEqual(dataset_type_summary.cursed_datasets, []) + self.assertEqual(dataset_type_summary.n_unsuccessful, 0) + self.assertListEqual(dataset_type_summary.unsuccessful_datasets, []) + self.assertEqual(dataset_type_summary.n_unpublished, 0) self.assertEqual( - qg_2_dict["datasets"][dataset]["n_published"], qg_2_dict["datasets"][dataset]["n_expected"] + dataset_type_summary.n_published, dataset_type_summary.n_expected ) def test_step8_quantum_provenance_graph_qbb(self) -> None:
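The pattern adopted throughout this patch is to keep the Pydantic summary objects returned by to_summary() and assert on their attributes directly, using model_validate() as a structural check, instead of round-tripping through model_dump() dictionaries. Below is a minimal sketch of that pattern; the Summary and TaskSummary classes here are simplified, hypothetical stand-ins with an assumed field set, not the real lsst.pipe.base.quantum_provenance_graph models.

# Minimal sketch of the Pydantic v2 validation pattern used by the tests.
# The models below are illustrative stand-ins, not the real LSST classes.
from pydantic import BaseModel, ValidationError


class TaskSummary(BaseModel):
    """Hypothetical, trimmed-down stand-in for the real TaskSummary."""

    n_expected: int = 0
    n_successful: int = 0
    n_failed: int = 0
    failed_quanta: list[dict] = []


class Summary(BaseModel):
    """Hypothetical, trimmed-down stand-in for the real Summary."""

    tasks: dict[str, TaskSummary] = {}


# model_validate() accepts a dict (or an existing model instance) and raises
# pydantic.ValidationError if the data does not match the schema, so a bare
# call inside a test doubles as a structural assertion.
summary = Summary.model_validate(
    {
        "tasks": {
            "_mock_calibrate": {
                "n_expected": 36,
                "n_successful": 30,
                "n_failed": 6,
                "failed_quanta": [{"data_id": {"instrument": "HSC", "visit": 18202}}],
            }
        }
    }
)

# Once validated, attribute access replaces the nested dict indexing that the
# old model_dump()-based assertions relied on.
assert summary.tasks["_mock_calibrate"].n_failed == 6

# Malformed data fails loudly instead of silently producing an odd dict.
try:
    Summary.model_validate({"tasks": {"_mock_calibrate": {"n_expected": "not a number"}}})
except ValidationError as err:
    print(err)

The attribute-based assertions read more like the intent of the test (task_summary.n_failed rather than qg_1_dict["tasks"][task]["n_failed"]), and any schema drift surfaces as a ValidationError at the model_validate() call rather than as a KeyError deep inside an assertion loop.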