Skip to content

Commit

Permalink
Add documentation for DM-41711
Browse files Browse the repository at this point in the history
  • Loading branch information
eigerx committed Aug 1, 2024
1 parent 475ff72 commit 9f4bd4c
Show file tree
Hide file tree
Showing 4 changed files with 196 additions and 116 deletions.
6 changes: 6 additions & 0 deletions doc/changes/DM-41711.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
Upgrade QuantumGraphExecutionReport to handle multiple overlapping graphs.

Update the `pipetask report` command-line interface to accommodate the new
`QuantumProvenanceGraph`. This allows the outcomes of processing to be resolved
across multiple attempts (and their associated graphs) over the same dataquery,
providing information on recoveries, persistent issues, and mismatch errors.
37 changes: 29 additions & 8 deletions python/lsst/ctrl/mpexec/cli/cmd/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,21 +341,34 @@ def update_graph_run(
@click.command(cls=PipetaskCommand)
@repo_argument()
@click.argument("qgraphs", nargs=-1)
@click.option("--collections", default=None, help="Collections to resolve duplicate datasets in.")
@click.option("--where", default="", help="where")
@click.option("--full-output-filename", default="", help="Output report as a yaml file with this name.")
@click.option(
"--collections",
default=None,
help="Collection(s) associated with said graphs/processing."
" For use in `lsst.daf.butler.registry.queryDatasets` if paring down the query would be useful.",
)
@click.option("--where", default="", help="A 'where' string to use to constrain the collections, if passed.")
@click.option(
"--full-output-filename",
default="",
help="Output report as a file with this name. "
"For pipetask report on one graph, this should be a yaml file. For multiple graphs "
"or when using the --force-v2 option, this should be a json file. We will be "
"deprecating the single-graph-only (QuantumGraphExecutionReport) option soon.",
)
@click.option("--logs/--no-logs", default=True, help="Get butler log datasets for extra information.")
@click.option(
"--brief",
default=False,
is_flag=True,
help="Only show counts in report for brief " "summary (no error information",
help="Only show counts in report (a brief summary). Note that counts are"
" also printed to the screen when using the --full-output-filename option.",
)
@click.option(
"--curse-failed-logs",
is_flag=True,
default=False,
help="If log datasets are missing in v2, mark them as cursed",
help="If log datasets are missing in v2 (QuantumProvenanceGraph), mark them as cursed",
)
@click.option(
"--force-v2",
Expand All @@ -375,12 +388,20 @@ def report(
curse_failed_logs: bool = False,
force_v2: bool = False,
) -> None:
"""Write a yaml file summarizing the produced and missing expected datasets
in a quantum graph.
"""Summarize the state of executed quantum graph(s), with counts of failed,
successful and expected quanta, as well as counts of output datasets and
their publish states. Analyze one or more attempts at the same
processing on the same dataquery-identified "group" and resolve recoveries
and persistent failures. Identify mismatch errors between groups.
Save the report as a file (`--full-output-filename`) or print it to stdout
(default). If the terminal is overwhelmed with data_ids from failures try
the `--brief` option.
REPO is the location of the butler/registry config file.
QGRAPH is the URL to a serialized Quantum Graph file.
QGRAPHS is a Sequence of links to serialized Quantum Graphs which have been
executed and are to be analyzed.
"""
if force_v2 or len(qgraphs) > 1 or collections is not None:
script.report_v2(
Expand Down
253 changes: 146 additions & 107 deletions python/lsst/ctrl/mpexec/cli/script/report.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ def report(
logs: bool = True,
brief: bool = False,
) -> None:
"""Summarize the produced and missing expected dataset in a quantum graph.
"""Summarize the produced, missing and expected quanta and
datasets belonging to an executed quantum graph.
Parameters
----------
Expand Down Expand Up @@ -124,7 +125,48 @@ def report_v2(
brief: bool = False,
curse_failed_logs: bool = False,
) -> None:
"""Docstring."""
"""Summarize the state of executed quantum graph(s), with counts of failed,
successful and expected quanta, as well as counts of output datasets and
their publish states. Analyze one or more attempts at the same
processing on the same dataquery-identified "group" and resolve recoveries
and persistent failures. Identify mismatch errors between groups.
Parameters
----------
butler_config : `str`
The Butler used for this report. This should match the Butler used
for the run associated with the executed quantum graph.
qgraph_uris : `Sequence[str]`
One or more URIs to the serialized Quantum Graph(s).
collections : `Sequence[str] | None`
Collection(s) associated with said graphs/processing. For use in
`lsst.daf.butler.registry.queryDatasets` if paring down the query
would be useful.
where : `str`
A "where" string to use to constrain the collections, if passed.
full_output_filename : `str`
Output the full pydantic model `QuantumProvenanceGraph.Summary`
object into a JSON file. This is ideal for error-matching and
cataloguing tools such as the ones used by Campaign Management
software and pilots, and for searching and counting specific kinds
or instances of failures. This option will also print a "brief"
(counts-only) summary to stdout.
logs : `bool`
Store error messages from Butler logs associated with failed quanta
if `True`.
brief : `bool`
Only display short (counts-only) summary on stdout. This includes
counts and not error messages or data_ids (similar to BPS report).
This option will still report all `cursed` datasets and `wonky`
quanta.
curse_failed_logs : `bool`
Mark log datasets as `cursed` if they are published in the final
output collection. Note that a campaign-level collection must be
used here for `collections` if `curse_failed_logs` is `True`; if
`resolve_duplicates` is run on a list of group-level collections,
then each will show logs from its own failures as published, and
the datasets will show as cursed regardless of this flag.
"""
butler = Butler.from_config(butler_config, writeable=False)
qpg = QuantumProvenanceGraph()
output_runs = []
Expand All @@ -140,116 +182,113 @@ def report_v2(
)
summary = qpg.to_summary(butler, do_store_logs=logs)
summary_dict = summary.model_dump()
if full_output_filename:
with open(full_output_filename, "w") as stream:
stream.write(summary.model_dump_json(indent=2))
else:
quanta_table = []
failed_quanta_table = []
wonky_quanta_table = []
for task in summary_dict["tasks"].keys():
if summary_dict["tasks"][task]["n_wonky"] > 0:
print(
f"{task} has produced wonky quanta. Recommend processing"
"cease until the issue is resolved."
)
j = 0
for data_id in summary_dict["tasks"][task]["wonky_quanta"]:
wonky_quanta_table.append(
{
"Task": task,
"Data ID": summary_dict["tasks"][task]["wonky_quanta"][j]["data_id"],
"Runs and Status": summary_dict["tasks"][task]["wonky_quanta"][j]["runs"],
"Messages": summary_dict["tasks"][task]["wonky_quanta"][j]["messages"],
}
)
j += 1
quanta_table.append(
{
"Task": task,
"Not Attempted": summary_dict["tasks"][task]["n_not_attempted"],
"Successful": summary_dict["tasks"][task]["n_successful"],
"Blocked": summary_dict["tasks"][task]["n_blocked"],
"Failed": summary_dict["tasks"][task]["n_failed"],
"Wonky": summary_dict["tasks"][task]["n_wonky"],
"TOTAL": sum(
[
summary_dict["tasks"][task]["n_successful"],
summary_dict["tasks"][task]["n_not_attempted"],
summary_dict["tasks"][task]["n_blocked"],
summary_dict["tasks"][task]["n_failed"],
summary_dict["tasks"][task]["n_wonky"],
]
),
"EXPECTED": summary_dict["tasks"][task]["n_expected"],
}
)
if summary_dict["tasks"][task]["failed_quanta"]:
i = 0
for data_id in summary_dict["tasks"][task]["failed_quanta"]:
failed_quanta_table.append(
{
"Task": task,
"Data ID": summary_dict["tasks"][task]["failed_quanta"][i]["data_id"],
"Runs and Status": summary_dict["tasks"][task]["failed_quanta"][i]["runs"],
"Messages": summary_dict["tasks"][task]["failed_quanta"][i]["messages"],
}
)
i += 1
quanta = Table(quanta_table)
quanta.pprint_all()
# Dataset loop
dataset_table = []
cursed_datasets = []
unsuccessful_datasets = {}
for dataset in summary_dict["datasets"]:
dataset_table.append(
{
"Dataset": dataset,
"Published": summary_dict["datasets"][dataset]["n_published"],
"Unpublished": summary_dict["datasets"][dataset]["n_unpublished"],
"Predicted Only": summary_dict["datasets"][dataset]["n_predicted_only"],
"Unsuccessful": summary_dict["datasets"][dataset]["n_unsuccessful"],
"Cursed": summary_dict["datasets"][dataset]["n_cursed"],
"TOTAL": sum(
[
summary_dict["datasets"][dataset]["n_published"],
summary_dict["datasets"][dataset]["n_unpublished"],
summary_dict["datasets"][dataset]["n_predicted_only"],
summary_dict["datasets"][dataset]["n_unsuccessful"],
summary_dict["datasets"][dataset]["n_cursed"],
]
),
"EXPECTED": summary_dict["datasets"][dataset]["n_expected"],
}
quanta_table = []
failed_quanta_table = []
wonky_quanta_table = []
for task in summary_dict["tasks"].keys():
if summary_dict["tasks"][task]["n_wonky"] > 0:
print(

Check warning on line 190 in python/lsst/ctrl/mpexec/cli/script/report.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/mpexec/cli/script/report.py#L190

Added line #L190 was not covered by tests
f"{task} has produced wonky quanta. Recommend processing" "cease until the issue is resolved."
)
if summary_dict["datasets"][dataset]["n_cursed"] > 0:
print(
f"{dataset} has cursed quanta with message(s) "
"{summary_dict[task]['cursed_datasets']['messages']}. "
"Recommend processing cease until the issue is resolved."
j = 0

Check warning on line 193 in python/lsst/ctrl/mpexec/cli/script/report.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/mpexec/cli/script/report.py#L193

Added line #L193 was not covered by tests
for data_id in summary_dict["tasks"][task]["wonky_quanta"]:
wonky_quanta_table.append(

Check warning on line 195 in python/lsst/ctrl/mpexec/cli/script/report.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/mpexec/cli/script/report.py#L195

Added line #L195 was not covered by tests
{
"Task": task,
"Data ID": summary_dict["tasks"][task]["wonky_quanta"][j]["data_id"],
"Runs and Status": summary_dict["tasks"][task]["wonky_quanta"][j]["runs"],
"Messages": summary_dict["tasks"][task]["wonky_quanta"][j]["messages"],
}
)
cursed_datasets.append(
j += 1

Check warning on line 203 in python/lsst/ctrl/mpexec/cli/script/report.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/mpexec/cli/script/report.py#L203

Added line #L203 was not covered by tests
quanta_table.append(
{
"Task": task,
"Not Attempted": summary_dict["tasks"][task]["n_not_attempted"],
"Successful": summary_dict["tasks"][task]["n_successful"],
"Blocked": summary_dict["tasks"][task]["n_blocked"],
"Failed": summary_dict["tasks"][task]["n_failed"],
"Wonky": summary_dict["tasks"][task]["n_wonky"],
"TOTAL": sum(
[
summary_dict["tasks"][task]["n_successful"],
summary_dict["tasks"][task]["n_not_attempted"],
summary_dict["tasks"][task]["n_blocked"],
summary_dict["tasks"][task]["n_failed"],
summary_dict["tasks"][task]["n_wonky"],
]
),
"EXPECTED": summary_dict["tasks"][task]["n_expected"],
}
)
if summary_dict["tasks"][task]["failed_quanta"]:
i = 0

Check warning on line 225 in python/lsst/ctrl/mpexec/cli/script/report.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/mpexec/cli/script/report.py#L225

Added line #L225 was not covered by tests
for data_id in summary_dict["tasks"][task]["failed_quanta"]:
failed_quanta_table.append(

Check warning on line 227 in python/lsst/ctrl/mpexec/cli/script/report.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/mpexec/cli/script/report.py#L227

Added line #L227 was not covered by tests
{
"Dataset Type": dataset,
"Parent Data Id": summary_dict["datasets"][dataset]["cursed_datasets"][
"parent_data_id"
],
"Task": task,
"Data ID": summary_dict["tasks"][task]["failed_quanta"][i]["data_id"],
"Runs and Status": summary_dict["tasks"][task]["failed_quanta"][i]["runs"],
"Messages": summary_dict["tasks"][task]["failed_quanta"][i]["messages"],
}
)
if summary_dict["datasets"][dataset]["n_unsuccessful"] > 0:
unsuccessful_datasets[dataset] = summary_dict["datasets"][dataset]["unsuccessful_datasets"]
datasets = Table(dataset_table)
datasets.pprint_all()
curse_table = Table(cursed_datasets)
# Display wonky quanta
if wonky_quanta_table:
print("Wonky Quanta")
pprint.pprint(wonky_quanta_table)
# Display cursed datasets
if cursed_datasets:
print("Cursed Datasets")
curse_table.pprint_all()
i += 1

Check warning on line 235 in python/lsst/ctrl/mpexec/cli/script/report.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/mpexec/cli/script/report.py#L235

Added line #L235 was not covered by tests
quanta = Table(quanta_table)
quanta.pprint_all()
# Dataset loop
dataset_table = []
cursed_datasets = []
unsuccessful_datasets = {}
for dataset in summary_dict["datasets"]:
dataset_table.append(
{
"Dataset": dataset,
"Published": summary_dict["datasets"][dataset]["n_published"],
"Unpublished": summary_dict["datasets"][dataset]["n_unpublished"],
"Predicted Only": summary_dict["datasets"][dataset]["n_predicted_only"],
"Unsuccessful": summary_dict["datasets"][dataset]["n_unsuccessful"],
"Cursed": summary_dict["datasets"][dataset]["n_cursed"],
"TOTAL": sum(
[
summary_dict["datasets"][dataset]["n_published"],
summary_dict["datasets"][dataset]["n_unpublished"],
summary_dict["datasets"][dataset]["n_predicted_only"],
summary_dict["datasets"][dataset]["n_unsuccessful"],
summary_dict["datasets"][dataset]["n_cursed"],
]
),
"EXPECTED": summary_dict["datasets"][dataset]["n_expected"],
}
)
if summary_dict["datasets"][dataset]["n_cursed"] > 0:
print(

Check warning on line 264 in python/lsst/ctrl/mpexec/cli/script/report.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/mpexec/cli/script/report.py#L264

Added line #L264 was not covered by tests
f"{dataset} has cursed quanta with message(s) "
"{summary_dict[task]['cursed_datasets']['messages']}. "
"Recommend processing cease until the issue is resolved."
)
cursed_datasets.append(

Check warning on line 269 in python/lsst/ctrl/mpexec/cli/script/report.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/mpexec/cli/script/report.py#L269

Added line #L269 was not covered by tests
{
"Dataset Type": dataset,
"Parent Data Id": summary_dict["datasets"][dataset]["cursed_datasets"]["parent_data_id"],
}
)
if summary_dict["datasets"][dataset]["n_unsuccessful"] > 0:
unsuccessful_datasets[dataset] = summary_dict["datasets"][dataset]["unsuccessful_datasets"]
datasets = Table(dataset_table)
datasets.pprint_all()
curse_table = Table(cursed_datasets)
# Display wonky quanta
if wonky_quanta_table:
print("Wonky Quanta")
pprint.pprint(wonky_quanta_table)

Check warning on line 283 in python/lsst/ctrl/mpexec/cli/script/report.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/mpexec/cli/script/report.py#L282-L283

Added lines #L282 - L283 were not covered by tests
# Display cursed datasets
if cursed_datasets:
print("Cursed Datasets")
curse_table.pprint_all()

Check warning on line 287 in python/lsst/ctrl/mpexec/cli/script/report.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/mpexec/cli/script/report.py#L286-L287

Added lines #L286 - L287 were not covered by tests
if full_output_filename:
with open(full_output_filename, "w") as stream:
stream.write(summary.model_dump_json(indent=2))
else:
if not brief:
if failed_quanta_table:
print("Failed Quanta")
Expand Down
16 changes: 15 additions & 1 deletion tests/test_cliCmdReport.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def tearDown(self) -> None:
removeTestTempDir(self.root)

def test_report(self):
"""Test for making a report on the produced and missing expected
"""Test for making a report on the produced, missing and expected
datasets in a quantum graph.
"""
metadata = {"output_run": "run"}
Expand Down Expand Up @@ -183,7 +183,21 @@ def test_report(self):
)

self.assertEqual(result.exit_code, 0, clickResultMsg(result_v2_full))
# Check the "brief" output that prints to the terminal first:
# Check that we get string output
self.assertIsInstance(result_v2_full.stdout, str)

# Check that task0 and the quanta for task0 exist in the string
self.assertIn("task0", result_v2_full.stdout)
self.assertIn("Not Attempted", result_v2_full.stdout)
self.assertIn("Successful", result_v2_full.stdout)
self.assertIn("Blocked", result_v2_full.stdout)
self.assertIn("Failed", result_v2_full.stdout)
self.assertIn("Wonky", result_v2_full.stdout)
self.assertIn("TOTAL", result_v2_full.stdout)
self.assertIn("EXPECTED", result_v2_full.stdout)

# Then validate the full output json file:
with open(test_filename_v2) as f:
output = f.read()
model = Summary.model_validate_json(output)
Expand Down

0 comments on commit 9f4bd4c

Please sign in to comment.