Skip to content

Commit

Permalink
Add tests which don't break for _mock_analyzeAmpOffsetMetadata
Browse files Browse the repository at this point in the history
  • Loading branch information
eigerx committed Aug 19, 2024
1 parent 647861f commit 2d73e10
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 45 deletions.
24 changes: 8 additions & 16 deletions tests/test_prod_outputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@

from lsst.ci.middleware.output_repo_tests import OutputRepoTests
from lsst.pipe.base.execution_reports import QuantumGraphExecutionReport
from lsst.pipe.base.quantum_provenance_graph import QuantumProvenanceGraph, Summary, TaskSummary, DatasetTypeSummary
from lsst.pipe.base.quantum_provenance_graph import (
QuantumProvenanceGraph,
)
from lsst.pipe.base.tests.mocks import get_mock_name

# (tract, patch, band): {input visits} for coadds produced here.
Expand Down Expand Up @@ -207,9 +209,7 @@ def check_step1_qpg(self, helper: OutputRepoTests) -> None:
helper.butler, collections=["HSC/runs/Prod/step1-i-attempt1"], where="instrument='HSC'"
)
qg_1_sum = qpg1.to_summary(helper.butler)
Summary.model_validate(qg_1_sum)

TaskSummary.model_validate(qg_1_sum.tasks)
# Loop through the tasks in the dict
for label, task_summary in qg_1_sum.tasks.items():
self.assertEqual(task_summary.n_not_attempted, 0)
Expand All @@ -234,7 +234,9 @@ def check_step1_qpg(self, helper: OutputRepoTests) -> None:
self.assertEqual(quantum_summary.data_id["instrument"], "HSC")
self.assertIsInstance(quantum_summary.data_id["detector"], int)
self.assertEqual(quantum_summary.data_id["visit"], 18202)
self.assertDictEqual(quantum_summary.runs, {"HSC/runs/Prod/step1-i-attempt1": "failed"})
self.assertDictEqual(
quantum_summary.runs, {"HSC/runs/Prod/step1-i-attempt1": "failed"}
)
self.assertIsInstance(quantum_summary.messages, list)
for message in quantum_summary.messages:
self.assertIsInstance(message, str)
Expand All @@ -257,8 +259,6 @@ def check_step1_qpg(self, helper: OutputRepoTests) -> None:
self.assertListEqual(task_summary.failed_quanta, [])

# Test datasets for the first QPG.
DatasetTypeSummary.model_validate(qg_1_sum.datasets)

for dataset_type_name, dataset_summary in qg_1_sum.datasets.items():
# For the expected failure
if dataset_summary.producer == "_mock_calibrate":
Expand All @@ -279,14 +279,10 @@ def check_step1_qpg(self, helper: OutputRepoTests) -> None:
)
# Check that the unsuccessful datasets are as expected
self.assertIsInstance(dataset_summary.unsuccessful_datasets, list)
self.assertEqual(
dataset_summary.unsuccessful_datasets[0]["instrument"], "HSC"
)
self.assertEqual(dataset_summary.unsuccessful_datasets[0]["instrument"], "HSC")
self.assertEqual(dataset_summary.unsuccessful_datasets[0]["visit"], 18202)
self.assertEqual(dataset_summary.unsuccessful_datasets[0]["band"], "i")
self.assertEqual(
dataset_summary.unsuccessful_datasets[0]["day_obs"], 20150117
)
self.assertEqual(dataset_summary.unsuccessful_datasets[0]["day_obs"], 20150117)
self.assertEqual(
dataset_summary.unsuccessful_datasets[0]["physical_filter"],
"HSC-I",
Expand Down Expand Up @@ -325,9 +321,7 @@ def check_step1_qpg(self, helper: OutputRepoTests) -> None:
where="instrument='HSC'",
)
qg_sum = qpg.to_summary(helper.butler)
Summary.model_validate(qg_sum)

TaskSummary.model_validate(qg_sum.tasks)
for label, task_summary in qg_sum.tasks.items():
self.assertEqual(task_summary.n_successful, 36)
self.assertEqual(task_summary.n_blocked, 0)
Expand Down Expand Up @@ -358,8 +352,6 @@ def check_step1_qpg(self, helper: OutputRepoTests) -> None:

# Test datasets for the overall QPG.
# Check that we have the expected datasets
DatasetTypeSummary.model_validate(qg_sum.datasets)

for dataset_summary in qg_sum.datasets.values():
# Check counts: we should have recovered everything, so
# published should equal expected for each dataset.
Expand Down
79 changes: 50 additions & 29 deletions tests/test_rc2_outputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@
from typing import ClassVar

from lsst.ci.middleware.output_repo_tests import OutputRepoTests
from lsst.pipe.base.quantum_provenance_graph import QuantumProvenanceGraph, Summary, TaskSummary, DatasetTypeSummary, UnsuccessfulQuantumSummary
from lsst.pipe.base.quantum_provenance_graph import (
QuantumProvenanceGraph,
UnsuccessfulQuantumSummary,
)
from lsst.pipe.base.tests.mocks import MockDataset, get_mock_name

# (tract, patch, band): {input visits} for coadds produced here.
Expand Down Expand Up @@ -164,7 +167,6 @@ def check_step8_qpg(self, helper: OutputRepoTests) -> None:
helper.butler, collections=["HSC/runs/RC2/step8-attempt1"], where="instrument='HSC'"
)
qg_1_sum = qpg1.to_summary(helper.butler)
Summary.model_validate(qg_1_sum)

# Check that expected, wonky and not attempted do not occur throughout
# tasks:
Expand All @@ -181,15 +183,17 @@ def check_step8_qpg(self, helper: OutputRepoTests) -> None:
self.assertEqual(task_summary.n_successful, 0)
self.assertEqual(
task_summary.failed_quanta,
[UnsuccessfulQuantumSummary(
data_id = {"skymap": "ci_mw", "tract": 0},
runs = {"HSC/runs/RC2/step8-attempt1": "failed"},
messages = [
"Execution of task '_mock_analyzeObjectTableCore' on quantum {skymap: "
"'ci_mw', tract: 0} failed. Exception ValueError: Simulated failure: "
"task=_mock_analyzeObjectTableCore dataId={skymap: 'ci_mw', tract: 0}"
]
)],
[
UnsuccessfulQuantumSummary(
data_id={"skymap": "ci_mw", "tract": 0},
runs={"HSC/runs/RC2/step8-attempt1": "failed"},
messages=[
"Execution of task '_mock_analyzeObjectTableCore' on quantum {skymap: "
"'ci_mw', tract: 0} failed. Exception ValueError: Simulated failure: "
"task=_mock_analyzeObjectTableCore dataId={skymap: 'ci_mw', tract: 0}"
],
)
],
)
self.assertEqual(task_summary.n_blocked, 0)
case _:
Expand All @@ -201,21 +205,37 @@ def check_step8_qpg(self, helper: OutputRepoTests) -> None:
# The next few if's are making sure we have the same
# number of expected and successful quanta. We could also
# just assert that n_expected == n_successful.
if label == "_mock_analyzeMatchedPreVisitCore" or label == "_mock_analyzeMatchedVisitCore":
if label in ["_mock_analyzeMatchedPreVisitCore", "_mock_analyzeMatchedVisitCore"]:
self.assertEqual(task_summary.n_expected, 4)
self.assertEqual(task_summary.n_successful, 4)
self.assertEqual(task_summary.n_blocked, 0)
elif label == "_mock_plotPropertyMapTract":
self.assertEqual(task_summary.n_expected, 2)
self.assertEqual(task_summary.n_successful, 2)
elif label in ["_mock_makeMetricTableObjectTableCore", "_mock_objectTableCoreWholeSkyPlot"]:
self.assertEqual(task_summary.n_blocked, 0)
elif label in [
"_mock_makeMetricTableObjectTableCore",
"_mock_objectTableCoreWholeSkyPlot",
]:
self.assertEqual(task_summary.n_blocked, 1)
self.assertEqual(task_summary.n_successful, 0)
else:
self.assertEqual(task_summary.n_expected, 1)
self.assertEqual(task_summary.n_successful, 1)
elif label == "_mock_analyzeAmpOffsetMetadata":
self.assertEqual(task_summary.n_expected, 60)
self.assertEqual(task_summary.n_successful, 60)
self.assertEqual(task_summary.n_blocked, 0)
else:
self.assertEqual(
task_summary.n_expected, 1, f"{label} had {task_summary.n_expected} tasks."
)
self.assertEqual(
task_summary.n_successful,
1,
f"{label} had {task_summary.n_successful} successful tasks.",
)
self.assertEqual(
task_summary.n_blocked, 0, f"{label} had {task_summary.n_blocked} blocked tasks."
)
# Check on datasets
DatasetTypeSummary.model_validate(qg_1_sum.datasets)
for dataset_type_summary in qg_1_sum.datasets.values():
# We shouldn't run into predicted only, unpublished or cursed.
# Unpublished suggests that the dataset exists but is not included
Expand All @@ -235,7 +255,10 @@ def check_step8_qpg(self, helper: OutputRepoTests) -> None:
dataset_type_summary.unsuccessful_datasets,
[{"skymap": "ci_mw", "tract": 0}],
)
case label if label in ["_mock_makeMetricTableObjectTableCore", "_mock_objectTableCoreWholeSkyPlot"]:
case label if label in [
"_mock_makeMetricTableObjectTableCore",
"_mock_objectTableCoreWholeSkyPlot",
]:
self.assertEqual(dataset_type_summary.n_unsuccessful, 1)
# These are the non-failed tasks:
case _:
Expand All @@ -250,6 +273,9 @@ def check_step8_qpg(self, helper: OutputRepoTests) -> None:
elif dataset_type_summary.producer == "_mock_plotPropertyMapTract":
self.assertEqual(dataset_type_summary.n_published, 2)
self.assertEqual(dataset_type_summary.n_expected, 2)
elif dataset_type_summary.producer == "_mock_analyzeAmpOffsetMetadata":
self.assertEqual(dataset_type_summary.n_published, 60)
self.assertEqual(dataset_type_summary.n_expected, 60)
else:
self.assertEqual(dataset_type_summary.n_published, 1)
self.assertEqual(dataset_type_summary.n_expected, 1)
Expand All @@ -272,10 +298,6 @@ def check_step8_qpg(self, helper: OutputRepoTests) -> None:

qpg_u_sum = qpg_unpublished.to_summary(helper.butler)

Summary.model_validate(qpg_u_sum)
TaskSummary.model_validate(qpg_u_sum.tasks)
DatasetTypeSummary.model_validate(qpg_u_sum.datasets)

for dataset_type_name, dataset_type_summary in qpg_u_sum.datasets.items():
if dataset_type_summary.producer == "_mock_analyzeObjectTableCore":
if dataset_type_name == "_mock_analyzeObjectTableCore_log":
Expand All @@ -299,9 +321,7 @@ def check_step8_qpg(self, helper: OutputRepoTests) -> None:
where="instrument='HSC'",
)
qg_2_sum = qpg2.to_summary(helper.butler)
Summary.model_validate(qg_2_sum)

TaskSummary.model_validate(qg_2_sum.tasks)
for label, task_summary in qg_2_sum.tasks.items():
self.assertEqual(task_summary.n_not_attempted, 0)
self.assertEqual(task_summary.n_wonky, 0)
Expand All @@ -313,7 +333,11 @@ def check_step8_qpg(self, helper: OutputRepoTests) -> None:
self.assertListEqual(task_summary.failed_quanta, [])
match label:
# Check that the failure was recovered:
case label if label in ["_mock_analyzeObjectTableCore", "_mock_makeMetricTableObjectTableCore", "_mock_objectTableCoreWholeSkyPlot"]:
case label if label in [
"_mock_analyzeObjectTableCore",
"_mock_makeMetricTableObjectTableCore",
"_mock_objectTableCoreWholeSkyPlot",
]:
self.assertEqual(task_summary.n_expected, 1)
self.assertEqual(task_summary.n_successful, 1)
self.assertEqual(task_summary.n_blocked, 0)
Expand All @@ -331,7 +355,6 @@ def check_step8_qpg(self, helper: OutputRepoTests) -> None:
self.assertListEqual(task_summary.recovered_quanta, [])

# Check on datasets
DatasetTypeSummary.model_validate(qg_2_sum.datasets)
for dataset_type_summary in qg_2_sum.datasets.values():
# Check that all the data products are present and successful for
# all tasks.
Expand All @@ -341,9 +364,7 @@ def check_step8_qpg(self, helper: OutputRepoTests) -> None:
self.assertEqual(dataset_type_summary.n_unsuccessful, 0)
self.assertListEqual(dataset_type_summary.unsuccessful_datasets, [])
self.assertEqual(dataset_type_summary.n_unpublished, 0)
self.assertEqual(
dataset_type_summary.n_published, dataset_type_summary.n_expected
)
self.assertEqual(dataset_type_summary.n_published, dataset_type_summary.n_expected)

def test_step8_quantum_provenance_graph_qbb(self) -> None:
self.check_step8_qpg(self.qbb)
Expand Down

0 comments on commit 2d73e10

Please sign in to comment.