From 9f4bd4ca04959ab00f5d4ebff4fc010439b8e66a Mon Sep 17 00:00:00 2001 From: Orion Eiger Date: Wed, 31 Jul 2024 17:10:51 -0700 Subject: [PATCH] Add documentation for DM-41711 --- doc/changes/DM-41711.feature.md | 6 + python/lsst/ctrl/mpexec/cli/cmd/commands.py | 37 ++- python/lsst/ctrl/mpexec/cli/script/report.py | 253 +++++++++++-------- tests/test_cliCmdReport.py | 16 +- 4 files changed, 196 insertions(+), 116 deletions(-) create mode 100644 doc/changes/DM-41711.feature.md diff --git a/doc/changes/DM-41711.feature.md b/doc/changes/DM-41711.feature.md new file mode 100644 index 00000000..8e7dba37 --- /dev/null +++ b/doc/changes/DM-41711.feature.md @@ -0,0 +1,6 @@ +Upgrade QuantumGraphExecutionReport to handle multiple overlapping graphs. + +Update the `pipetask report` command-line interface to accommodate the new +`QuantumProvenanceGraph`. This allows for resolving outcomes of processing over +multiple attempts (and associated graphs) over the same dataquery, providing +information on recoveries, persistent issues and mismatch errors. diff --git a/python/lsst/ctrl/mpexec/cli/cmd/commands.py b/python/lsst/ctrl/mpexec/cli/cmd/commands.py index b9a29231..1bdf8956 100644 --- a/python/lsst/ctrl/mpexec/cli/cmd/commands.py +++ b/python/lsst/ctrl/mpexec/cli/cmd/commands.py @@ -341,21 +341,34 @@ def update_graph_run( @click.command(cls=PipetaskCommand) @repo_argument() @click.argument("qgraphs", nargs=-1) -@click.option("--collections", default=None, help="Collections to resolve duplicate datasets in.") -@click.option("--where", default="", help="where") -@click.option("--full-output-filename", default="", help="Output report as a yaml file with this name.") +@click.option( + "--collections", + default=None, + help="Collection(s) associated with said graphs/processing." 
+ " For use in `lsst.daf.butler.registry.queryDatasets` if paring down the query would be useful.", +) +@click.option("--where", default="", help="A 'where' string to use to constrain the collections, if passed.") +@click.option( + "--full-output-filename", + default="", + help="Output report as a file with this name. " + "For pipetask report on one graph, this should be a yaml file. For multiple graphs " + "or when using the --force-v2 option, this should be a json file. We will be " + "deprecating the single-graph-only (QuantumGraphExecutionReport) option soon.", +) @click.option("--logs/--no-logs", default=True, help="Get butler log datasets for extra information.") @click.option( "--brief", default=False, is_flag=True, - help="Only show counts in report for brief " "summary (no error information", + help="Only show counts in report (a brief summary). Note that counts are" + " also printed to the screen when using the --full-output-filename option.", ) @click.option( "--curse-failed-logs", is_flag=True, default=False, - help="If log datasets are missing in v2, mark them as cursed", + help="If log datasets are missing in v2 (QuantumProvenanceGraph), mark them as cursed", ) @click.option( "--force-v2", @@ -375,12 +388,20 @@ def report( curse_failed_logs: bool = False, force_v2: bool = False, ) -> None: - """Write a yaml file summarizing the produced and missing expected datasets - in a quantum graph. + """Summarize the state of executed quantum graph(s), with counts of failed, + successful and expected quanta, as well as counts of output datasets and + their publish states. Analyze one or more attempts at the same + processing on the same dataquery-identified "group" and resolve recoveries + and persistent failures. Identify mismatch errors between groups. + + Save the report as a file (`--full-output-filename`) or print it to stdout + (default). If the terminal is overwhelmed with data_ids from failures try + the `--brief` option. 
REPO is the location of the butler/registry config file. - QGRAPH is the URL to a serialized Quantum Graph file. + QGRAPHS is a Sequence of links to serialized Quantum Graphs which have been + executed and are to be analyzed. """ if force_v2 or len(qgraphs) > 1 or collections is not None: script.report_v2( diff --git a/python/lsst/ctrl/mpexec/cli/script/report.py b/python/lsst/ctrl/mpexec/cli/script/report.py index 56fe18bc..c9b34396 100644 --- a/python/lsst/ctrl/mpexec/cli/script/report.py +++ b/python/lsst/ctrl/mpexec/cli/script/report.py @@ -42,7 +42,8 @@ def report( logs: bool = True, brief: bool = False, ) -> None: - """Summarize the produced and missing expected dataset in a quantum graph. + """Summarize the produced, missing and expected quanta and + datasets belonging to an executed quantum graph. Parameters ---------- @@ -124,7 +125,48 @@ def report_v2( brief: bool = False, curse_failed_logs: bool = False, ) -> None: - """Docstring.""" + """Summarize the state of executed quantum graph(s), with counts of failed, + successful and expected quanta, as well as counts of output datasets and + their publish states. Analyze one or more attempts at the same + processing on the same dataquery-identified "group" and resolve recoveries + and persistent failures. Identify mismatch errors between groups. + + Parameters + ---------- + butler_config : `str` + The Butler used for this report. This should match the Butler used + for the run associated with the executed quantum graph. + qgraph_uris : `Sequence[str]` + One or more uris to the serialized Quantum Graph(s). + collections : `Sequence[str] | None` + Collection(s) associated with said graphs/processing. For use in + `lsst.daf.butler.registry.queryDatasets` if paring down the query + would be useful. + where : `str` + A "where" string to use to constrain the collections, if passed. + full_output_filename : `str` + Output the full pydantic model `QuantumProvenanceGraph.Summary` + object into a JSON file. 
This is ideal for error-matching and + cataloguing tools such as the ones used by Campaign Management + software and pilots, and for searching and counting specific kinds + or instances of failures. This option will also print a "brief" + (counts-only) summary to stdout. + logs : `bool` + Store error messages from Butler logs associated with failed quanta + if `True`. + brief : `bool` + Only display short (counts-only) summary on stdout. This includes + counts and not error messages or data_ids (similar to BPS report). + This option will still report all `cursed` datasets and `wonky` + quanta. + curse_failed_logs : `bool` + Mark log datasets as `cursed` if they are published in the final + output collection. Note that a campaign-level collection must be + used here for `collections` if `curse_failed_logs` is `True`; if + `resolve_duplicates` is run on a list of group-level collections + then each will show logs from their own failures as published + the datasets will show as cursed regardless of this flag. + """ butler = Butler.from_config(butler_config, writeable=False) qpg = QuantumProvenanceGraph() output_runs = [] @@ -140,116 +182,113 @@ def report_v2( ) summary = qpg.to_summary(butler, do_store_logs=logs) summary_dict = summary.model_dump() - if full_output_filename: - with open(full_output_filename, "w") as stream: - stream.write(summary.model_dump_json(indent=2)) - else: - quanta_table = [] - failed_quanta_table = [] - wonky_quanta_table = [] - for task in summary_dict["tasks"].keys(): - if summary_dict["tasks"][task]["n_wonky"] > 0: - print( - f"{task} has produced wonky quanta. Recommend processing" - "cease until the issue is resolved." 
- ) - j = 0 - for data_id in summary_dict["tasks"][task]["wonky_quanta"]: - wonky_quanta_table.append( - { - "Task": task, - "Data ID": summary_dict["tasks"][task]["wonky_quanta"][j]["data_id"], - "Runs and Status": summary_dict["tasks"][task]["wonky_quanta"][j]["runs"], - "Messages": summary_dict["tasks"][task]["wonky_quanta"][j]["messages"], - } - ) - j += 1 - quanta_table.append( - { - "Task": task, - "Not Attempted": summary_dict["tasks"][task]["n_not_attempted"], - "Successful": summary_dict["tasks"][task]["n_successful"], - "Blocked": summary_dict["tasks"][task]["n_blocked"], - "Failed": summary_dict["tasks"][task]["n_failed"], - "Wonky": summary_dict["tasks"][task]["n_wonky"], - "TOTAL": sum( - [ - summary_dict["tasks"][task]["n_successful"], - summary_dict["tasks"][task]["n_not_attempted"], - summary_dict["tasks"][task]["n_blocked"], - summary_dict["tasks"][task]["n_failed"], - summary_dict["tasks"][task]["n_wonky"], - ] - ), - "EXPECTED": summary_dict["tasks"][task]["n_expected"], - } - ) - if summary_dict["tasks"][task]["failed_quanta"]: - i = 0 - for data_id in summary_dict["tasks"][task]["failed_quanta"]: - failed_quanta_table.append( - { - "Task": task, - "Data ID": summary_dict["tasks"][task]["failed_quanta"][i]["data_id"], - "Runs and Status": summary_dict["tasks"][task]["failed_quanta"][i]["runs"], - "Messages": summary_dict["tasks"][task]["failed_quanta"][i]["messages"], - } - ) - i += 1 - quanta = Table(quanta_table) - quanta.pprint_all() - # Dataset loop - dataset_table = [] - cursed_datasets = [] - unsuccessful_datasets = {} - for dataset in summary_dict["datasets"]: - dataset_table.append( - { - "Dataset": dataset, - "Published": summary_dict["datasets"][dataset]["n_published"], - "Unpublished": summary_dict["datasets"][dataset]["n_unpublished"], - "Predicted Only": summary_dict["datasets"][dataset]["n_predicted_only"], - "Unsuccessful": summary_dict["datasets"][dataset]["n_unsuccessful"], - "Cursed": 
summary_dict["datasets"][dataset]["n_cursed"], - "TOTAL": sum( - [ - summary_dict["datasets"][dataset]["n_published"], - summary_dict["datasets"][dataset]["n_unpublished"], - summary_dict["datasets"][dataset]["n_predicted_only"], - summary_dict["datasets"][dataset]["n_unsuccessful"], - summary_dict["datasets"][dataset]["n_cursed"], - ] - ), - "EXPECTED": summary_dict["datasets"][dataset]["n_expected"], - } + quanta_table = [] + failed_quanta_table = [] + wonky_quanta_table = [] + for task in summary_dict["tasks"].keys(): + if summary_dict["tasks"][task]["n_wonky"] > 0: + print( + f"{task} has produced wonky quanta. Recommend processing" "cease until the issue is resolved." ) - if summary_dict["datasets"][dataset]["n_cursed"] > 0: - print( - f"{dataset} has cursed quanta with message(s) " - "{summary_dict[task]['cursed_datasets']['messages']}. " - "Recommend processing cease until the issue is resolved." + j = 0 + for data_id in summary_dict["tasks"][task]["wonky_quanta"]: + wonky_quanta_table.append( + { + "Task": task, + "Data ID": summary_dict["tasks"][task]["wonky_quanta"][j]["data_id"], + "Runs and Status": summary_dict["tasks"][task]["wonky_quanta"][j]["runs"], + "Messages": summary_dict["tasks"][task]["wonky_quanta"][j]["messages"], + } ) - cursed_datasets.append( + j += 1 + quanta_table.append( + { + "Task": task, + "Not Attempted": summary_dict["tasks"][task]["n_not_attempted"], + "Successful": summary_dict["tasks"][task]["n_successful"], + "Blocked": summary_dict["tasks"][task]["n_blocked"], + "Failed": summary_dict["tasks"][task]["n_failed"], + "Wonky": summary_dict["tasks"][task]["n_wonky"], + "TOTAL": sum( + [ + summary_dict["tasks"][task]["n_successful"], + summary_dict["tasks"][task]["n_not_attempted"], + summary_dict["tasks"][task]["n_blocked"], + summary_dict["tasks"][task]["n_failed"], + summary_dict["tasks"][task]["n_wonky"], + ] + ), + "EXPECTED": summary_dict["tasks"][task]["n_expected"], + } + ) + if 
summary_dict["tasks"][task]["failed_quanta"]: + i = 0 + for data_id in summary_dict["tasks"][task]["failed_quanta"]: + failed_quanta_table.append( { - "Dataset Type": dataset, - "Parent Data Id": summary_dict["datasets"][dataset]["cursed_datasets"][ - "parent_data_id" - ], + "Task": task, + "Data ID": summary_dict["tasks"][task]["failed_quanta"][i]["data_id"], + "Runs and Status": summary_dict["tasks"][task]["failed_quanta"][i]["runs"], + "Messages": summary_dict["tasks"][task]["failed_quanta"][i]["messages"], } ) - if summary_dict["datasets"][dataset]["n_unsuccessful"] > 0: - unsuccessful_datasets[dataset] = summary_dict["datasets"][dataset]["unsuccessful_datasets"] - datasets = Table(dataset_table) - datasets.pprint_all() - curse_table = Table(cursed_datasets) - # Display wonky quanta - if wonky_quanta_table: - print("Wonky Quanta") - pprint.pprint(wonky_quanta_table) - # Display cursed datasets - if cursed_datasets: - print("Cursed Datasets") - curse_table.pprint_all() + i += 1 + quanta = Table(quanta_table) + quanta.pprint_all() + # Dataset loop + dataset_table = [] + cursed_datasets = [] + unsuccessful_datasets = {} + for dataset in summary_dict["datasets"]: + dataset_table.append( + { + "Dataset": dataset, + "Published": summary_dict["datasets"][dataset]["n_published"], + "Unpublished": summary_dict["datasets"][dataset]["n_unpublished"], + "Predicted Only": summary_dict["datasets"][dataset]["n_predicted_only"], + "Unsuccessful": summary_dict["datasets"][dataset]["n_unsuccessful"], + "Cursed": summary_dict["datasets"][dataset]["n_cursed"], + "TOTAL": sum( + [ + summary_dict["datasets"][dataset]["n_published"], + summary_dict["datasets"][dataset]["n_unpublished"], + summary_dict["datasets"][dataset]["n_predicted_only"], + summary_dict["datasets"][dataset]["n_unsuccessful"], + summary_dict["datasets"][dataset]["n_cursed"], + ] + ), + "EXPECTED": summary_dict["datasets"][dataset]["n_expected"], + } + ) + if summary_dict["datasets"][dataset]["n_cursed"] > 0: + 
print(
+                f"{dataset} has cursed quanta with message(s) "
+                f"{summary_dict['datasets'][dataset]['cursed_datasets']['messages']}. "
+                "Recommend processing cease until the issue is resolved."
+            )
+            cursed_datasets.append(
+                {
+                    "Dataset Type": dataset,
+                    "Parent Data Id": summary_dict["datasets"][dataset]["cursed_datasets"]["parent_data_id"],
+                }
+            )
+        if summary_dict["datasets"][dataset]["n_unsuccessful"] > 0:
+            unsuccessful_datasets[dataset] = summary_dict["datasets"][dataset]["unsuccessful_datasets"]
+    datasets = Table(dataset_table)
+    datasets.pprint_all()
+    curse_table = Table(cursed_datasets)
+    # Display wonky quanta
+    if wonky_quanta_table:
+        print("Wonky Quanta")
+        pprint.pprint(wonky_quanta_table)
+    # Display cursed datasets
+    if cursed_datasets:
+        print("Cursed Datasets")
+        curse_table.pprint_all()
+    if full_output_filename:
+        with open(full_output_filename, "w") as stream:
+            stream.write(summary.model_dump_json(indent=2))
+    else:
         if not brief:
             if failed_quanta_table:
                 print("Failed Quanta")
diff --git a/tests/test_cliCmdReport.py b/tests/test_cliCmdReport.py
index e65712be..f16ddc09 100644
--- a/tests/test_cliCmdReport.py
+++ b/tests/test_cliCmdReport.py
@@ -53,7 +53,7 @@ def tearDown(self) -> None:
         removeTestTempDir(self.root)
 
     def test_report(self):
-        """Test for making a report on the produced and missing expected
+        """Test for making a report on the produced, missing and expected
         datasets in a quantum graph.
""" metadata = {"output_run": "run"} @@ -183,7 +183,21 @@ def test_report(self): ) self.assertEqual(result.exit_code, 0, clickResultMsg(result_v2_full)) + # Check the "brief" output that prints to the terminal first: + # Check that we get string output + self.assertIsInstance(result_v2_full.stdout, str) + # Check that task0 and the quanta for task0 exist in the string + self.assertIn("task0", result_v2_full.stdout) + self.assertIn("Not Attempted", result_v2_full.stdout) + self.assertIn("Successful", result_v2_full.stdout) + self.assertIn("Blocked", result_v2_full.stdout) + self.assertIn("Failed", result_v2_full.stdout) + self.assertIn("Wonky", result_v2_full.stdout) + self.assertIn("TOTAL", result_v2_full.stdout) + self.assertIn("EXPECTED", result_v2_full.stdout) + + # Then validate the full output json file: with open(test_filename_v2) as f: output = f.read() model = Summary.model_validate_json(output)