
DM-41711: Upgrade QuantumGraphExecutionReport to handle multiple overlapping graphs #23

Merged (10 commits) on Sep 23, 2024
1 change: 1 addition & 0 deletions doc/changes/DM-41711.feature.md
@@ -0,0 +1 @@
Add tests for the `QuantumProvenanceGraph`.
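For context on what the new tests exercise: the `QuantumProvenanceGraph` workflow under test looks roughly like the sketch below. This is a minimal illustration assembled only from the calls that appear in this diff; `butler` and `quantum_graph` are placeholders for a real Butler and a loaded quantum graph, not part of the change itself.

    from lsst.pipe.base.quantum_provenance_graph import QuantumProvenanceGraph

    # Assemble provenance for a single attempt, then summarize the
    # per-task and per-dataset-type counts.
    qpg = QuantumProvenanceGraph()
    qpg.assemble_quantum_provenance_graph(
        butler,           # an lsst.daf.butler.Butler (placeholder)
        [quantum_graph],  # quantum graphs, in order of execution
        collections=["HSC/runs/Prod/step1-i-attempt1"],
        where="instrument='HSC'",
    )
    summary = qpg.to_summary(butler)
    for label, task_summary in summary.tasks.items():
        print(label, task_summary.n_successful, task_summary.n_failed)

The resulting summary partitions quanta into successful, blocked, unknown, wonky, and failed, and datasets into visible, shadowed, predicted-only, cursed, and unsuccessful; the tests below assert that each group of counts sums to `n_expected`.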
205 changes: 201 additions & 4 deletions tests/test_prod_outputs.py
@@ -24,6 +24,7 @@

from lsst.ci.middleware.output_repo_tests import OutputRepoTests
from lsst.pipe.base.execution_reports import QuantumGraphExecutionReport
+from lsst.pipe.base.quantum_provenance_graph import QuantumProvenanceGraph
from lsst.pipe.base.tests.mocks import get_mock_name

# (tract, patch, band): {input visits} for coadds produced here.
@@ -129,9 +130,9 @@ def test_property_set_metadata_direct(self) -> None:
    def test_property_set_metadata_qbb(self) -> None:
        self.qbb.check_property_set_metadata(self)

-    def check_step1_manifest_checker(self, helper: OutputRepoTests) -> None:
+    def check_step1_execution_reports(self, helper: OutputRepoTests) -> None:
        """Test that the fail-and-recover attempts in step1 worked as expected
-        using the manifest checker.
+        using the `QuantumGraphExecutionReport`.
        """

        # This task should have failed in attempt1 and should have been
@@ -190,8 +191,204 @@ def check_step1_manifest_checker(self, helper: OutputRepoTests) -> None:
        hr_summary_2 = report_2.to_summary_dict(helper.butler, human_readable=True)
        self.assertEqual(hr_summary_2["_mock_calibrate"]["failed_quanta"], [])

-    def test_step1_manifest_checker_qbb(self) -> None:
-        self.check_step1_manifest_checker(self.qbb)
+    def test_step1_execution_reports_qbb(self) -> None:
+        self.check_step1_execution_reports(self.qbb)

    def check_step1_qpg(self, helper: OutputRepoTests) -> None:
        """Test that the fail-and-recover attempts in step1 worked as expected
        over each attempt, using the `QuantumProvenanceGraph`.
        """

        # Make the quantum provenance graph for the first attempt.
        qg_1 = helper.get_quantum_graph("step1", "i-attempt1")
        qpg1 = QuantumProvenanceGraph()
        qpg1.assemble_quantum_provenance_graph(
            helper.butler, [qg_1], collections=["HSC/runs/Prod/step1-i-attempt1"], where="instrument='HSC'"
        )
        qg_1_sum = qpg1.to_summary(helper.butler)

        # Loop through the task summaries in the dict.
        for label, task_summary in qg_1_sum.tasks.items():
            self.assertEqual(task_summary.n_unknown, 0)
            self.assertEqual(task_summary.n_wonky, 0)
            self.assertEqual(task_summary.n_expected, 36)
            self.assertListEqual(task_summary.wonky_quanta, [])
            self.assertListEqual(task_summary.recovered_quanta, [])
            self.assertEqual(
                task_summary.n_expected,
                task_summary.n_successful
                + task_summary.n_blocked
                + task_summary.n_unknown
                + task_summary.n_wonky
                + task_summary.n_failed,
            )
            match label:
                case "_mock_calibrate":
                    self.assertEqual(task_summary.n_successful, 30)
                    self.assertEqual(task_summary.n_blocked, 0)
                    self.assertEqual(task_summary.n_failed, 6)
                    for quantum_summary in task_summary.failed_quanta:
                        self.assertEqual(quantum_summary.data_id["instrument"], "HSC")
                        self.assertIsInstance(quantum_summary.data_id["detector"], int)
                        self.assertEqual(quantum_summary.data_id["visit"], 18202)
                        self.assertDictEqual(
                            quantum_summary.runs, {"HSC/runs/Prod/step1-i-attempt1": "FAILED"}
                        )
                        self.assertIsInstance(quantum_summary.messages, list)
                        for message in quantum_summary.messages:
                            self.assertIsInstance(message, str)
                            self.assertTrue(
                                message.startswith("Execution of task '_mock_calibrate' on quantum")
                            )
                            self.assertIn(
                                "Exception ValueError: Simulated failure: task=_mock_calibrate", message
                            )
                case "_mock_writePreSourceTable" | "_mock_transformPreSourceTable":
                    self.assertEqual(task_summary.n_successful, 30)
                    self.assertEqual(task_summary.n_blocked, 6)
                    self.assertEqual(task_summary.n_failed, 0)
                    self.assertListEqual(task_summary.failed_quanta, [])
                case _:
                    self.assertEqual(task_summary.n_successful, 36)
                    self.assertEqual(task_summary.n_blocked, 0)
                    self.assertEqual(task_summary.n_failed, 0)
                    self.assertListEqual(task_summary.failed_quanta, [])

        # Test datasets for the first QPG.
        for dataset_type_name, dataset_summary in qg_1_sum.datasets.items():
            # For the expected failure:
            if dataset_summary.producer == "_mock_calibrate":
                # Assert that `unsuccessful_datasets` is non-empty; the
                # failure message is shown if it is empty.
                self.assertTrue(
                    dataset_summary.unsuccessful_datasets,
                    f"Expected failures were not stored as unsuccessful datasets for {dataset_type_name}.",
                )
                # Check that the visible datasets = expected - (unsuccessful
                # + predicted_only).
                self.assertEqual(
                    dataset_summary.n_visible,
                    dataset_summary.n_expected
                    - dataset_summary.n_unsuccessful
                    - dataset_summary.n_predicted_only,
                )
                # Check that the unsuccessful datasets are as expected.
                self.assertIsInstance(dataset_summary.unsuccessful_datasets, list)
                self.assertEqual(dataset_summary.unsuccessful_datasets[0]["instrument"], "HSC")
                self.assertEqual(dataset_summary.unsuccessful_datasets[0]["visit"], 18202)
                self.assertEqual(dataset_summary.unsuccessful_datasets[0]["band"], "i")
                self.assertEqual(dataset_summary.unsuccessful_datasets[0]["day_obs"], 20150117)
                self.assertEqual(
                    dataset_summary.unsuccessful_datasets[0]["physical_filter"],
                    "HSC-I",
                )
                # Check that there is the expected number of failures
                # and that they are not visible.
                self.assertEqual(len(dataset_summary.unsuccessful_datasets), 6)
                self.assertEqual(dataset_summary.n_expected, 36)
                self.assertEqual(dataset_summary.n_visible, 30)

[Review comment] Could you add an explicit check for `dataset_summary.n_predicted_only` and drop the above `assertEqual` on line 271? Since we are checking explicit values, it seems like overkill to also check that the counts add up correctly (unless we are worried somebody will make a typo in the explicit values).

                self.assertEqual(dataset_summary.n_predicted_only, 0)

            # Check that all the counts add up for every dataset type.
            self.assertEqual(
                dataset_summary.n_expected,
                sum(
                    [
                        dataset_summary.n_visible,
                        dataset_summary.n_shadowed,
                        dataset_summary.n_predicted_only,
                        dataset_summary.n_cursed,
                        dataset_summary.n_unsuccessful,
                    ]
                ),
            )
            # Check that there are no cursed datasets.
            self.assertEqual(dataset_summary.n_cursed, 0)
            self.assertListEqual(dataset_summary.cursed_datasets, [])

        # Make an overall QPG and add the recovery attempt to it.
        qpg = QuantumProvenanceGraph()
        qg_2 = helper.get_quantum_graph("step1", "i-attempt2")
        # Quantum graphs are passed in order of execution; collections are
        # passed in reverse order, because the query in
        # `QuantumProvenanceGraph.__resolve_duplicates` requires that
        # collections be passed with the most recent first.
        qpg.assemble_quantum_provenance_graph(
            helper.butler,
            [qg_1, qg_2],
            collections=["HSC/runs/Prod/step1-i-attempt2", "HSC/runs/Prod/step1-i-attempt1"],
            where="instrument='HSC'",
        )
        qg_sum = qpg.to_summary(helper.butler)

[Review comment on the `collections` argument] Perhaps add a comment explaining the order of quantum graphs vs. the order of collections.

        for label, task_summary in qg_sum.tasks.items():
            self.assertEqual(task_summary.n_successful, 36)
            self.assertEqual(task_summary.n_blocked, 0)
            self.assertEqual(task_summary.n_failed, 0)
            self.assertEqual(task_summary.n_unknown, 0)
            self.assertEqual(task_summary.n_wonky, 0)
            self.assertEqual(task_summary.n_expected, 36)
            self.assertListEqual(task_summary.wonky_quanta, [])
            self.assertListEqual(task_summary.failed_quanta, [])
            self.assertEqual(
                task_summary.n_expected,
                task_summary.n_successful
                + task_summary.n_blocked
                + task_summary.n_unknown
                + task_summary.n_wonky
                + task_summary.n_failed,
            )
            if label in (
                "_mock_calibrate",
                "_mock_writePreSourceTable",
                "_mock_transformPreSourceTable",
            ):
                for quantum in task_summary.recovered_quanta:
                    self.assertEqual(quantum["instrument"], "HSC")
                    self.assertEqual(quantum["visit"], 18202)
            else:
                self.assertListEqual(task_summary.recovered_quanta, [])

        # Test datasets for the overall QPG.
        # Check that we have the expected datasets.
        for dataset_summary in qg_sum.datasets.values():
            # Check counts: we should have recovered everything, so
            # visible should equal expected for each dataset type.
            self.assertEqual(
                dataset_summary.n_expected,
                dataset_summary.n_visible,
            )
            # Check that this is the expected number.
            self.assertEqual(dataset_summary.n_visible, 36)
            # Check that all the counts add up.
            self.assertEqual(
                dataset_summary.n_expected,
                sum(
                    [
                        dataset_summary.n_visible,
                        dataset_summary.n_shadowed,
                        dataset_summary.n_predicted_only,
                        dataset_summary.n_cursed,
                        dataset_summary.n_unsuccessful,
                    ]
                ),
            )
            # Check that there are no cursed or unsuccessful datasets.
            self.assertEqual(dataset_summary.n_cursed, 0)
            self.assertListEqual(dataset_summary.cursed_datasets, [])
            self.assertEqual(dataset_summary.n_unsuccessful, 0)
            self.assertListEqual(dataset_summary.unsuccessful_datasets, [])

            # Since we have recovered everything, we should have the same
            # numbers for every dataset type:
            self.assertEqual(dataset_summary.n_expected, 36)
            self.assertEqual(dataset_summary.n_visible, 36)
            self.assertEqual(dataset_summary.n_shadowed, 0)
            self.assertEqual(dataset_summary.n_predicted_only, 0)

    def test_step1_quantum_provenance_graph_qbb(self) -> None:
        self.check_step1_qpg(self.qbb)
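One point worth calling out from the recovery test above (and flagged in review): quantum graphs and collections are passed in opposite orders. A standalone sketch of that convention, with placeholder `butler` and graph variables that are not part of this diff:

    qpg = QuantumProvenanceGraph()
    qpg.assemble_quantum_provenance_graph(
        butler,
        [qg_attempt1, qg_attempt2],  # quantum graphs: oldest attempt first
        collections=[
            "HSC/runs/Prod/step1-i-attempt2",  # collections: newest attempt first
            "HSC/runs/Prod/step1-i-attempt1",
        ],
        where="instrument='HSC'",
    )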


if __name__ == "__main__":