From dcc76a8dd8111f71554e7e3a85dd951e326d97d0 Mon Sep 17 00:00:00 2001
From: Orion Eiger
Date: Tue, 18 Jun 2024 16:31:36 -0700
Subject: [PATCH 1/8] Amend pipetask report command for QPG edits

---
 python/lsst/ctrl/mpexec/cli/cmd/commands.py  | 19 +++++++++++---
 python/lsst/ctrl/mpexec/cli/script/report.py | 27 ++++++++++++++++++++
 2 files changed, 42 insertions(+), 4 deletions(-)

diff --git a/python/lsst/ctrl/mpexec/cli/cmd/commands.py b/python/lsst/ctrl/mpexec/cli/cmd/commands.py
index 79162861..342448c7 100644
--- a/python/lsst/ctrl/mpexec/cli/cmd/commands.py
+++ b/python/lsst/ctrl/mpexec/cli/cmd/commands.py
@@ -26,7 +26,7 @@
 # along with this program. If not, see <https://www.gnu.org/licenses/>.
 
 import sys
-from collections.abc import Iterator
+from collections.abc import Iterator, Sequence
 from contextlib import contextmanager
 from functools import partial
 from tempfile import NamedTemporaryFile
@@ -341,7 +341,7 @@ def update_graph_run(
 
 @click.command(cls=PipetaskCommand)
 @repo_argument()
-@ctrlMpExecOpts.qgraph_argument()
+@click.argument()
 @click.option("--full-output-filename", default="", help="Summarize report in a yaml file")
 @click.option("--logs/--no-logs", default=True, help="Get butler log datasets for extra information.")
 @click.option(
@@ -353,7 +353,15 @@ def update_graph_run(
     " (data_ids and associated messages) to the current working directory instead.",
 )
 def report(
-    repo: str, qgraph: str, full_output_filename: str = "", logs: bool = True, show_errors: bool = False
+    repo: str,
+    qgraphs: Sequence[str],
+    collections: Sequence[str] | None,
+    where: str,
+    full_output_filename: str = "",
+    logs: bool = True,
+    show_errors: bool = False,
+    curse_failed_logs: bool = False,
+    force_v2: bool = False
 ) -> None:
     """Write a yaml file summarizing the produced and missing expected datasets
     in a quantum graph.
@@ -362,4 +370,7 @@ def report(
 
     QGRAPH is the URL to a serialized Quantum Graph file.
     """
-    script.report(repo, qgraph, full_output_filename, logs, show_errors)
+    if force_v2 or len(qgraphs) > 1 or collections is not None:
+        script.report_v2(repo, qgraphs, collections, where, full_output_filename, logs, show_errors, curse_failed_logs)
+    else:
+        script.report(repo, qgraphs, full_output_filename, logs, show_errors)
diff --git a/python/lsst/ctrl/mpexec/cli/script/report.py b/python/lsst/ctrl/mpexec/cli/script/report.py
index 39b4df1d..80303bab 100644
--- a/python/lsst/ctrl/mpexec/cli/script/report.py
+++ b/python/lsst/ctrl/mpexec/cli/script/report.py
@@ -24,6 +24,7 @@
 #
 # You should have received a copy of the GNU General Public License
 # along with this program. If not, see <https://www.gnu.org/licenses/>.
+from collections.abc import Sequence import pprint import yaml @@ -31,6 +32,7 @@ from lsst.daf.butler import Butler from lsst.pipe.base import QuantumGraph from lsst.pipe.base.execution_reports import QuantumGraphExecutionReport +from lsst.pipe.base.quantum_provenance_graph import QuantumProvenanceGraph def report( @@ -119,3 +121,28 @@ def report( datasets.pprint_all() else: report.write_summary_yaml(butler, full_output_filename, do_store_logs=logs) + +def report_v2(butler_config: str, + qgraph_uris: Sequence[str], + collections: Sequence[str] | None, + where: str, + full_output_filename: str | None, + logs: bool = True, + show_errors: bool = False, + curse_failed_logs: bool = False, + ) -> None: + butler = Butler.from_config(butler_config, writeable=False) + qpg = QuantumProvenanceGraph() + output_runs = [] + for qgraph_uri in qgraph_uris: + qgraph = QuantumGraph.loadUri(qgraph_uri) + qpg.add_new_graph(butler, qgraph) + output_runs.append(qgraph.metadata["output_run"]) + if collections is None: + collections = reversed(output_runs) + qpg.resolve_duplicates(butler, collections=collections, where=where, + curse_failed_logs=curse_failed_logs) + summary = qpg.to_summary(butler, do_store_logs=logs) + if full_output_filename is not None: + with open(full_output_filename, "w") as stream: + yaml.safe_dump(summary.model_dump(), stream) From 998a60a5bf78fa9927b7b2194c005a4e57c2bb3c Mon Sep 17 00:00:00 2001 From: Orion Eiger Date: Tue, 25 Jun 2024 11:25:06 -0700 Subject: [PATCH 2/8] Add a pipetask report option for the QuantumProvenanceGraph --- doc/changes/DM-41711.feature.md | 6 + python/lsst/ctrl/mpexec/cli/cmd/commands.py | 63 ++- .../lsst/ctrl/mpexec/cli/script/__init__.py | 2 +- python/lsst/ctrl/mpexec/cli/script/report.py | 202 +++++++-- requirements.txt | 2 +- tests/test_cliCmdReport.py | 389 +++++++++++++++++- 6 files changed, 621 insertions(+), 43 deletions(-) create mode 100644 doc/changes/DM-41711.feature.md diff --git a/doc/changes/DM-41711.feature.md b/doc/changes/DM-41711.feature.md new file mode 100644 index 00000000..8e7dba37 --- /dev/null +++ b/doc/changes/DM-41711.feature.md @@ -0,0 +1,6 @@ +Upgrade QuantumGraphExecutionReport to handle multiple overlapping graphs. + +Update the `pipetask report` command-line interface to accommodate the new +`QuantumProvenanceGraph`. This allows for resolving outcomes of processing over +multiple attempts (and associated graphs) over the same dataquery, providing +information on recoveries, persistent issues and mismatch errors. diff --git a/python/lsst/ctrl/mpexec/cli/cmd/commands.py b/python/lsst/ctrl/mpexec/cli/cmd/commands.py index 342448c7..f7cce6e6 100644 --- a/python/lsst/ctrl/mpexec/cli/cmd/commands.py +++ b/python/lsst/ctrl/mpexec/cli/cmd/commands.py @@ -341,16 +341,42 @@ def update_graph_run( @click.command(cls=PipetaskCommand) @repo_argument() -@click.argument() -@click.option("--full-output-filename", default="", help="Summarize report in a yaml file") +@click.argument("qgraphs", nargs=-1) +@click.option( + "--collections", + default=None, + help="Collection(s) associated with said graphs/processing." + " For use in `lsst.daf.butler.registry.queryDatasets` if paring down the query would be useful.", +) +@click.option("--where", default="", help="A 'where' string to use to constrain the collections, if passed.") +@click.option( + "--full-output-filename", + default="", + help="Output report as a file with this name. " + "For pipetask report on one graph, this should be a yaml file. 
For multiple graphs " + "or when using the --force-v2 option, this should be a json file. We will be " + "deprecating the single-graph-only (QuantumGraphExecutionReport) option soon.", +) @click.option("--logs/--no-logs", default=True, help="Get butler log datasets for extra information.") @click.option( - "--show-errors", + "--brief", + default=False, + is_flag=True, + help="Only show counts in report (a brief summary). Note that counts are" + " also printed to the screen when using the --full-output-filename option.", +) +@click.option( + "--curse-failed-logs", is_flag=True, default=False, - help="Pretty-print a dict of errors from failed" - " quanta to the screen. Note: the default is to output a yaml file with error information" - " (data_ids and associated messages) to the current working directory instead.", + help="If log datasets are missing in v2 (QuantumProvenanceGraph), mark them as cursed", +) +@click.option( + "--force-v2", + is_flag=True, + default=False, + help="Use the QuantumProvenanceGraph instead of the QuantumGraphExecutionReport, " + "even when there is only one qgraph.", ) def report( repo: str, @@ -359,18 +385,29 @@ def report( where: str, full_output_filename: str = "", logs: bool = True, - show_errors: bool = False, + brief: bool = False, curse_failed_logs: bool = False, - force_v2: bool = False + force_v2: bool = False, ) -> None: - """Write a yaml file summarizing the produced and missing expected datasets - in a quantum graph. + """Summarize the state of executed quantum graph(s), with counts of failed, + successful and expected quanta, as well as counts of output datasets and + their publish states. Analyze one or more attempts at the same + processing on the same dataquery-identified "group" and resolve recoveries + and persistent failures. Identify mismatch errors between groups. + + Save the report as a file (`--full-output-filename`) or print it to stdout + (default). If the terminal is overwhelmed with data_ids from failures try + the `--brief` option. REPO is the location of the butler/registry config file. - QGRAPH is the URL to a serialized Quantum Graph file. + QGRAPHS is a Sequence of links to serialized Quantum Graphs which have been + executed and are to be analyzed. """ if force_v2 or len(qgraphs) > 1 or collections is not None: - script.report_v2(repo, qgraphs, collections, where, full_output_filename, logs, show_errors, curse_failed_logs) + script.report_v2( + repo, qgraphs, collections, where, full_output_filename, logs, brief, curse_failed_logs + ) else: - script.report(repo, qgraphs, full_output_filename, logs, show_errors) + assert len(qgraphs) == 1, "Cannot make a report without a quantum graph." 
+ script.report(repo, qgraphs[0], full_output_filename, logs, brief) diff --git a/python/lsst/ctrl/mpexec/cli/script/__init__.py b/python/lsst/ctrl/mpexec/cli/script/__init__.py index 10767d14..dc14543e 100644 --- a/python/lsst/ctrl/mpexec/cli/script/__init__.py +++ b/python/lsst/ctrl/mpexec/cli/script/__init__.py @@ -31,7 +31,7 @@ from .pre_exec_init_qbb import pre_exec_init_qbb from .purge import PurgeResult, purge from .qgraph import qgraph -from .report import report +from .report import report, report_v2 from .run import run from .run_qbb import run_qbb from .update_graph_run import update_graph_run diff --git a/python/lsst/ctrl/mpexec/cli/script/report.py b/python/lsst/ctrl/mpexec/cli/script/report.py index 80303bab..c9b34396 100644 --- a/python/lsst/ctrl/mpexec/cli/script/report.py +++ b/python/lsst/ctrl/mpexec/cli/script/report.py @@ -24,10 +24,10 @@ # # You should have received a copy of the GNU General Public License # along with this program. If not, see . -from collections.abc import Sequence import pprint +from collections.abc import Sequence +from typing import Any -import yaml from astropy.table import Table from lsst.daf.butler import Butler from lsst.pipe.base import QuantumGraph @@ -40,9 +40,10 @@ def report( qgraph_uri: str, full_output_filename: str | None, logs: bool = True, - show_errors: bool = False, + brief: bool = False, ) -> None: - """Summarize the produced and missing expected dataset in a quantum graph. + """Summarize the produced, missing and expected quanta and + datasets belonging to an executed quantum graph. Parameters ---------- @@ -60,11 +61,9 @@ def report( command-line instead. logs : `bool` Get butler log datasets for extra information (error messages). - show_errors : `bool` - If no output yaml is provided, print error messages to the - command-line along with the report. By default, these messages and - their associated data ids are stored in a yaml file with format - `{run timestamp}_err.yaml` in the working directory instead. + brief : `bool` + List only the counts (or data_ids if number of failures < 5). This + option is good for those who just want to see totals. """ butler = Butler.from_config(butler_config, writeable=False) qgraph = QuantumGraph.loadUri(qgraph_uri) @@ -108,29 +107,66 @@ def report( datasets.add_column(data_products, index=0, name="DatasetType") quanta.pprint_all() print("\n") - if show_errors: + if not brief: pprint.pprint(error_summary) print("\n") - else: - assert qgraph.metadata is not None, "Saved QGs always have metadata." - collection = qgraph.metadata["output_run"] - collection = str(collection) - run_name = collection.split("/")[-1] - with open(f"{run_name}_err.yaml", "w") as stream: - yaml.safe_dump(error_summary, stream) datasets.pprint_all() else: report.write_summary_yaml(butler, full_output_filename, do_store_logs=logs) - -def report_v2(butler_config: str, + + +def report_v2( + butler_config: str, qgraph_uris: Sequence[str], collections: Sequence[str] | None, where: str, full_output_filename: str | None, logs: bool = True, - show_errors: bool = False, + brief: bool = False, curse_failed_logs: bool = False, - ) -> None: +) -> None: + """Summarize the state of executed quantum graph(s), with counts of failed, + successful and expected quanta, as well as counts of output datasets and + their publish states. Analyze one or more attempts at the same + processing on the same dataquery-identified "group" and resolve recoveries + and persistent failures. Identify mismatch errors between groups. 
+ + Parameters + ---------- + butler_config : `str` + The Butler used for this report. This should match the Butler used + for the run associated with the executed quantum graph. + qgraph_uris : `Sequence[str]` + One or more uris to the serialized Quantum Graph(s). + collections : `Sequence[str] | None` + Collection(s) associated with said graphs/processing. For use in + `lsst.daf.butler.registry.queryDatasets` if paring down the query + would be useful. + where : `str` + A "where" string to use to constrain the collections, if passed. + full_output_filename : `str` + Output the full pydantic model `QuantumProvenanceGraph.Summary` + object into a JSON file. This is ideal for error-matching and + cataloguing tools such as the ones used by Campaign Management + software and pilots, and for searching and counting specific kinds + or instances of failures. This option will also print a "brief" + (counts-only) summary to stdout. + logs : `bool` + Store error messages from Butler logs associated with failed quanta + if `True`. + brief : `bool` + Only display short (counts-only) summary on stdout. This includes + counts and not error messages or data_ids (similar to BPS report). + This option will still report all `cursed` datasets and `wonky` + quanta. + curse_failed_logs : `bool` + Mark log datasets as `cursed` if they are published in the final + output collection. Note that a campaign-level collection must be + used here for `collections` if `curse_failed_logs` is `True`; if + `resolve_duplicates` is run on a list of group-level collections + then each will show logs from their own failures as published + the datasets will show as cursed regardless of this flag. + """ butler = Butler.from_config(butler_config, writeable=False) qpg = QuantumProvenanceGraph() output_runs = [] @@ -138,11 +174,125 @@ def report_v2(butler_config: str, qgraph = QuantumGraph.loadUri(qgraph_uri) qpg.add_new_graph(butler, qgraph) output_runs.append(qgraph.metadata["output_run"]) + collections_sequence: Sequence[Any] # to appease mypy if collections is None: - collections = reversed(output_runs) - qpg.resolve_duplicates(butler, collections=collections, where=where, - curse_failed_logs=curse_failed_logs) + collections_sequence = list(reversed(output_runs)) + qpg.resolve_duplicates( + butler, collections=collections_sequence, where=where, curse_failed_logs=curse_failed_logs + ) summary = qpg.to_summary(butler, do_store_logs=logs) - if full_output_filename is not None: + summary_dict = summary.model_dump() + quanta_table = [] + failed_quanta_table = [] + wonky_quanta_table = [] + for task in summary_dict["tasks"].keys(): + if summary_dict["tasks"][task]["n_wonky"] > 0: + print( + f"{task} has produced wonky quanta. Recommend processing" "cease until the issue is resolved." 
+ ) + j = 0 + for data_id in summary_dict["tasks"][task]["wonky_quanta"]: + wonky_quanta_table.append( + { + "Task": task, + "Data ID": summary_dict["tasks"][task]["wonky_quanta"][j]["data_id"], + "Runs and Status": summary_dict["tasks"][task]["wonky_quanta"][j]["runs"], + "Messages": summary_dict["tasks"][task]["wonky_quanta"][j]["messages"], + } + ) + j += 1 + quanta_table.append( + { + "Task": task, + "Not Attempted": summary_dict["tasks"][task]["n_not_attempted"], + "Successful": summary_dict["tasks"][task]["n_successful"], + "Blocked": summary_dict["tasks"][task]["n_blocked"], + "Failed": summary_dict["tasks"][task]["n_failed"], + "Wonky": summary_dict["tasks"][task]["n_wonky"], + "TOTAL": sum( + [ + summary_dict["tasks"][task]["n_successful"], + summary_dict["tasks"][task]["n_not_attempted"], + summary_dict["tasks"][task]["n_blocked"], + summary_dict["tasks"][task]["n_failed"], + summary_dict["tasks"][task]["n_wonky"], + ] + ), + "EXPECTED": summary_dict["tasks"][task]["n_expected"], + } + ) + if summary_dict["tasks"][task]["failed_quanta"]: + i = 0 + for data_id in summary_dict["tasks"][task]["failed_quanta"]: + failed_quanta_table.append( + { + "Task": task, + "Data ID": summary_dict["tasks"][task]["failed_quanta"][i]["data_id"], + "Runs and Status": summary_dict["tasks"][task]["failed_quanta"][i]["runs"], + "Messages": summary_dict["tasks"][task]["failed_quanta"][i]["messages"], + } + ) + i += 1 + quanta = Table(quanta_table) + quanta.pprint_all() + # Dataset loop + dataset_table = [] + cursed_datasets = [] + unsuccessful_datasets = {} + for dataset in summary_dict["datasets"]: + dataset_table.append( + { + "Dataset": dataset, + "Published": summary_dict["datasets"][dataset]["n_published"], + "Unpublished": summary_dict["datasets"][dataset]["n_unpublished"], + "Predicted Only": summary_dict["datasets"][dataset]["n_predicted_only"], + "Unsuccessful": summary_dict["datasets"][dataset]["n_unsuccessful"], + "Cursed": summary_dict["datasets"][dataset]["n_cursed"], + "TOTAL": sum( + [ + summary_dict["datasets"][dataset]["n_published"], + summary_dict["datasets"][dataset]["n_unpublished"], + summary_dict["datasets"][dataset]["n_predicted_only"], + summary_dict["datasets"][dataset]["n_unsuccessful"], + summary_dict["datasets"][dataset]["n_cursed"], + ] + ), + "EXPECTED": summary_dict["datasets"][dataset]["n_expected"], + } + ) + if summary_dict["datasets"][dataset]["n_cursed"] > 0: + print( + f"{dataset} has cursed quanta with message(s) " + "{summary_dict[task]['cursed_datasets']['messages']}. " + "Recommend processing cease until the issue is resolved." 
+ ) + cursed_datasets.append( + { + "Dataset Type": dataset, + "Parent Data Id": summary_dict["datasets"][dataset]["cursed_datasets"]["parent_data_id"], + } + ) + if summary_dict["datasets"][dataset]["n_unsuccessful"] > 0: + unsuccessful_datasets[dataset] = summary_dict["datasets"][dataset]["unsuccessful_datasets"] + datasets = Table(dataset_table) + datasets.pprint_all() + curse_table = Table(cursed_datasets) + # Display wonky quanta + if wonky_quanta_table: + print("Wonky Quanta") + pprint.pprint(wonky_quanta_table) + # Display cursed datasets + if cursed_datasets: + print("Cursed Datasets") + curse_table.pprint_all() + if full_output_filename: with open(full_output_filename, "w") as stream: - yaml.safe_dump(summary.model_dump(), stream) + stream.write(summary.model_dump_json(indent=2)) + else: + if not brief: + if failed_quanta_table: + print("Failed Quanta") + pprint.pprint(failed_quanta_table) + if unsuccessful_datasets: + print("Unsuccessful Datasets") + pprint.pprint(unsuccessful_datasets) diff --git a/requirements.txt b/requirements.txt index 4623a213..025994df 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,6 +6,6 @@ networkx lsst-resources @ git+https://github.com/lsst/resources@main lsst-daf-butler @ git+https://github.com/lsst/daf_butler@main lsst-utils @ git+https://github.com/lsst/utils@main -lsst-pipe-base @ git+https://github.com/lsst/pipe_base@main +lsst-pipe-base @ git+https://github.com/lsst/pipe_base@tickets/DM-41711 lsst-pex-config @ git+https://github.com/lsst/pex_config@main sqlalchemy diff --git a/tests/test_cliCmdReport.py b/tests/test_cliCmdReport.py index 4710b226..f16ddc09 100644 --- a/tests/test_cliCmdReport.py +++ b/tests/test_cliCmdReport.py @@ -34,6 +34,7 @@ from lsst.ctrl.mpexec.cli.pipetask import cli as pipetask_cli from lsst.daf.butler.cli.utils import LogCliRunner, clickResultMsg from lsst.daf.butler.tests.utils import makeTestTempDir, removeTestTempDir +from lsst.pipe.base.quantum_provenance_graph import DatasetTypeSummary, Summary, TaskSummary from lsst.pipe.base.tests.simpleQGraph import makeSimpleQGraph from lsst.pipe.base.tests.util import check_output_run from yaml.loader import SafeLoader @@ -52,7 +53,7 @@ def tearDown(self) -> None: removeTestTempDir(self.root) def test_report(self): - """Test for making a report on the produced and missing expected + """Test for making a report on the produced, missing and expected datasets in a quantum graph. 
""" metadata = {"output_run": "run"} @@ -86,7 +87,7 @@ def test_report(self): result_hr = self.runner.invoke( pipetask_cli, - ["report", self.root, graph_uri, "--no-logs", "--show-errors"], + ["report", self.root, graph_uri, "--no-logs"], input="no", ) @@ -102,6 +103,390 @@ def test_report(self): self.assertIn("Expected", result_hr.stdout) self.assertIn("Succeeded", result_hr.stdout) + # Check brief option for pipetask report + result_brief = self.runner.invoke( + pipetask_cli, + ["report", self.root, graph_uri, "--no-logs", "--brief"], + input="no", + ) + self.assertIsInstance(result_brief.stdout, str) + + # Check that task0 and the failed quanta for task0 exist in the string + self.assertIn("task0", result_brief.stdout) + self.assertIn("Failed", result_brief.stdout) + self.assertIn("Expected", result_brief.stdout) + self.assertIn("Succeeded", result_brief.stdout) + + # Test cli for the QPG + result_v2_terminal_out = self.runner.invoke( + pipetask_cli, + ["report", self.root, graph_uri, "--no-logs", "--force-v2"], + input="no", + ) + + # Check that we can read from the command line + self.assertEqual(result_v2_terminal_out.exit_code, 0, clickResultMsg(result_v2_terminal_out)) + + # Check that we get string output + self.assertIsInstance(result_v2_terminal_out.stdout, str) + + # Check that task0 and the quanta for task0 exist in the string + self.assertIn("task0", result_v2_terminal_out.stdout) + self.assertIn("Not Attempted", result_v2_terminal_out.stdout) + self.assertIn("Successful", result_v2_terminal_out.stdout) + self.assertIn("Blocked", result_v2_terminal_out.stdout) + self.assertIn("Failed", result_v2_terminal_out.stdout) + self.assertIn("Wonky", result_v2_terminal_out.stdout) + self.assertIn("TOTAL", result_v2_terminal_out.stdout) + self.assertIn("EXPECTED", result_v2_terminal_out.stdout) + + # Check that title from the error summary appears + self.assertIn("Unsuccessful Datasets", result_v2_terminal_out.stdout) + + # Test cli for the QPG brief option + result_v2_brief = self.runner.invoke( + pipetask_cli, + ["report", self.root, graph_uri, "--no-logs", "--force-v2", "--brief"], + input="no", + ) + + # Check that we can read from the command line + self.assertEqual(result_v2_brief.exit_code, 0, clickResultMsg(result_v2_brief)) + + # Check that we get string output + self.assertIsInstance(result_v2_brief.stdout, str) + + # Check that task0 and the quanta for task0 exist in the string + self.assertIn("task0", result_v2_brief.stdout) + self.assertIn("Not Attempted", result_v2_brief.stdout) + self.assertIn("Successful", result_v2_brief.stdout) + self.assertIn("Blocked", result_v2_brief.stdout) + self.assertIn("Failed", result_v2_brief.stdout) + self.assertIn("Wonky", result_v2_brief.stdout) + self.assertIn("TOTAL", result_v2_brief.stdout) + self.assertIn("EXPECTED", result_v2_brief.stdout) + + # Check that the full output option works + test_filename_v2 = os.path.join(self.root, "report_test.json") + result_v2_full = self.runner.invoke( + pipetask_cli, + [ + "report", + self.root, + graph_uri, + "--no-logs", + "--full-output-filename", + test_filename_v2, + "--force-v2", + ], + input="no", + ) + + self.assertEqual(result.exit_code, 0, clickResultMsg(result_v2_full)) + # Check the "brief" output that prints to the terminal first: + # Check that we get string output + self.assertIsInstance(result_v2_full.stdout, str) + + # Check that task0 and the quanta for task0 exist in the string + self.assertIn("task0", result_v2_full.stdout) + self.assertIn("Not Attempted", result_v2_full.stdout) 
+ self.assertIn("Successful", result_v2_full.stdout) + self.assertIn("Blocked", result_v2_full.stdout) + self.assertIn("Failed", result_v2_full.stdout) + self.assertIn("Wonky", result_v2_full.stdout) + self.assertIn("TOTAL", result_v2_full.stdout) + self.assertIn("EXPECTED", result_v2_full.stdout) + + # Then validate the full output json file: + with open(test_filename_v2) as f: + output = f.read() + model = Summary.model_validate_json(output) + self.assertDictEqual( + model.tasks, + { + "task0": TaskSummary( + n_successful=0, + n_blocked=0, + n_not_attempted=1, + n_expected=1, + failed_quanta=[], + recovered_quanta=[], + wonky_quanta=[], + n_wonky=0, + n_failed=0, + ), + "task1": TaskSummary( + n_successful=0, + n_blocked=0, + n_not_attempted=1, + n_expected=1, + failed_quanta=[], + recovered_quanta=[], + wonky_quanta=[], + n_wonky=0, + n_failed=0, + ), + "task2": TaskSummary( + n_successful=0, + n_blocked=0, + n_not_attempted=1, + n_expected=1, + failed_quanta=[], + recovered_quanta=[], + wonky_quanta=[], + n_wonky=0, + n_failed=0, + ), + "task3": TaskSummary( + n_successful=0, + n_blocked=0, + n_not_attempted=1, + n_expected=1, + failed_quanta=[], + recovered_quanta=[], + wonky_quanta=[], + n_wonky=0, + n_failed=0, + ), + "task4": TaskSummary( + n_successful=0, + n_blocked=0, + n_not_attempted=1, + n_expected=1, + failed_quanta=[], + recovered_quanta=[], + wonky_quanta=[], + n_wonky=0, + n_failed=0, + ), + }, + ) + self.assertDictEqual( + model.datasets, + { + "add_dataset1": DatasetTypeSummary( + producer="task0", + n_published=0, + n_unpublished=0, + n_predicted_only=0, + n_expected=1, + cursed_datasets=[], + unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], + n_cursed=0, + n_unsuccessful=1, + ), + "add2_dataset1": DatasetTypeSummary( + producer="task0", + n_published=0, + n_unpublished=0, + n_predicted_only=0, + n_expected=1, + cursed_datasets=[], + unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], + n_cursed=0, + n_unsuccessful=1, + ), + "task0_metadata": DatasetTypeSummary( + producer="task0", + n_published=0, + n_unpublished=0, + n_predicted_only=0, + n_expected=1, + cursed_datasets=[], + unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], + n_cursed=0, + n_unsuccessful=1, + ), + "task0_log": DatasetTypeSummary( + producer="task0", + n_published=0, + n_unpublished=0, + n_predicted_only=0, + n_expected=1, + cursed_datasets=[], + unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], + n_cursed=0, + n_unsuccessful=1, + ), + "add_dataset2": DatasetTypeSummary( + producer="task1", + n_published=0, + n_unpublished=0, + n_predicted_only=0, + n_expected=1, + cursed_datasets=[], + unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], + n_cursed=0, + n_unsuccessful=1, + ), + "add2_dataset2": DatasetTypeSummary( + producer="task1", + n_published=0, + n_unpublished=0, + n_predicted_only=0, + n_expected=1, + cursed_datasets=[], + unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], + n_cursed=0, + n_unsuccessful=1, + ), + "task1_metadata": DatasetTypeSummary( + producer="task1", + n_published=0, + n_unpublished=0, + n_predicted_only=0, + n_expected=1, + cursed_datasets=[], + unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], + n_cursed=0, + n_unsuccessful=1, + ), + "task1_log": DatasetTypeSummary( + producer="task1", + n_published=0, + n_unpublished=0, + n_predicted_only=0, + n_expected=1, + cursed_datasets=[], + unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], + n_cursed=0, + n_unsuccessful=1, + 
), + "add_dataset3": DatasetTypeSummary( + producer="task2", + n_published=0, + n_unpublished=0, + n_predicted_only=0, + n_expected=1, + cursed_datasets=[], + unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], + n_cursed=0, + n_unsuccessful=1, + ), + "add2_dataset3": DatasetTypeSummary( + producer="task2", + n_published=0, + n_unpublished=0, + n_predicted_only=0, + n_expected=1, + cursed_datasets=[], + unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], + n_cursed=0, + n_unsuccessful=1, + ), + "task2_metadata": DatasetTypeSummary( + producer="task2", + n_published=0, + n_unpublished=0, + n_predicted_only=0, + n_expected=1, + cursed_datasets=[], + unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], + n_cursed=0, + n_unsuccessful=1, + ), + "task2_log": DatasetTypeSummary( + producer="task2", + n_published=0, + n_unpublished=0, + n_predicted_only=0, + n_expected=1, + cursed_datasets=[], + unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], + n_cursed=0, + n_unsuccessful=1, + ), + "add_dataset4": DatasetTypeSummary( + producer="task3", + n_published=0, + n_unpublished=0, + n_predicted_only=0, + n_expected=1, + cursed_datasets=[], + unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], + n_cursed=0, + n_unsuccessful=1, + ), + "add2_dataset4": DatasetTypeSummary( + producer="task3", + n_published=0, + n_unpublished=0, + n_predicted_only=0, + n_expected=1, + cursed_datasets=[], + unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], + n_cursed=0, + n_unsuccessful=1, + ), + "task3_metadata": DatasetTypeSummary( + producer="task3", + n_published=0, + n_unpublished=0, + n_predicted_only=0, + n_expected=1, + cursed_datasets=[], + unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], + n_cursed=0, + n_unsuccessful=1, + ), + "task3_log": DatasetTypeSummary( + producer="task3", + n_published=0, + n_unpublished=0, + n_predicted_only=0, + n_expected=1, + cursed_datasets=[], + unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], + n_cursed=0, + n_unsuccessful=1, + ), + "add_dataset5": DatasetTypeSummary( + producer="task4", + n_published=0, + n_unpublished=0, + n_predicted_only=0, + n_expected=1, + cursed_datasets=[], + unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], + n_cursed=0, + n_unsuccessful=1, + ), + "add2_dataset5": DatasetTypeSummary( + producer="task4", + n_published=0, + n_unpublished=0, + n_predicted_only=0, + n_expected=1, + cursed_datasets=[], + unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], + n_cursed=0, + n_unsuccessful=1, + ), + "task4_metadata": DatasetTypeSummary( + producer="task4", + n_published=0, + n_unpublished=0, + n_predicted_only=0, + n_expected=1, + cursed_datasets=[], + unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], + n_cursed=0, + n_unsuccessful=1, + ), + "task4_log": DatasetTypeSummary( + producer="task4", + n_published=0, + n_unpublished=0, + n_predicted_only=0, + n_expected=1, + cursed_datasets=[], + unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], + n_cursed=0, + n_unsuccessful=1, + ), + }, + ) + if __name__ == "__main__": unittest.main() From ec725dfdeda8e40a6af9c288fa4ec2c5eda21f12 Mon Sep 17 00:00:00 2001 From: Orion Eiger Date: Thu, 1 Aug 2024 12:23:47 -0700 Subject: [PATCH 3/8] Separate and pydantify printing QuantumProvenanceGraph.Summary object --- python/lsst/ctrl/mpexec/cli/script/report.py | 134 +++++++++++-------- tests/test_cliCmdReport.py | 2 +- 2 files changed, 76 insertions(+), 60 deletions(-) diff --git 
a/python/lsst/ctrl/mpexec/cli/script/report.py b/python/lsst/ctrl/mpexec/cli/script/report.py index c9b34396..bb7c33f1 100644 --- a/python/lsst/ctrl/mpexec/cli/script/report.py +++ b/python/lsst/ctrl/mpexec/cli/script/report.py @@ -32,7 +32,7 @@ from lsst.daf.butler import Butler from lsst.pipe.base import QuantumGraph from lsst.pipe.base.execution_reports import QuantumGraphExecutionReport -from lsst.pipe.base.quantum_provenance_graph import QuantumProvenanceGraph +from lsst.pipe.base.quantum_provenance_graph import QuantumProvenanceGraph, Summary def report( @@ -181,99 +181,115 @@ def report_v2( butler, collections=collections_sequence, where=where, curse_failed_logs=curse_failed_logs ) summary = qpg.to_summary(butler, do_store_logs=logs) - summary_dict = summary.model_dump() + print_summary(summary, full_output_filename, brief) + + +def print_summary(summary: Summary, full_output_filename: str | None, brief: bool = False) -> None: + """Take a `QuantumProvenanceGraph.Summary` object and write it to a file + and/or the screen. + + Parameters + ---------- + summary : `QuantumProvenanceGraph.Summary` + This object contains all the information derived from the + `QuantumProvenanceGraph`. + full_output_filename : `str | None` + Name of the JSON file in which to store summary information, if + passed. + brief : `bool` + Only display short (counts-only) summary on stdout. This includes + counts and not error messages or data_ids (similar to BPS report). + This option will still report all `cursed` datasets and `wonky` + quanta. + """ quanta_table = [] failed_quanta_table = [] wonky_quanta_table = [] - for task in summary_dict["tasks"].keys(): - if summary_dict["tasks"][task]["n_wonky"] > 0: + for label, task_summary in summary.tasks.items(): + if task_summary.n_wonky > 0: print( - f"{task} has produced wonky quanta. Recommend processing" "cease until the issue is resolved." + f"{label} has produced wonky quanta. Recommend processing cease until the issue is resolved." 
) - j = 0 - for data_id in summary_dict["tasks"][task]["wonky_quanta"]: + for quantum_summary in task_summary.wonky_quanta: wonky_quanta_table.append( { - "Task": task, - "Data ID": summary_dict["tasks"][task]["wonky_quanta"][j]["data_id"], - "Runs and Status": summary_dict["tasks"][task]["wonky_quanta"][j]["runs"], - "Messages": summary_dict["tasks"][task]["wonky_quanta"][j]["messages"], + "Task": label, + "Data ID": quantum_summary.data_id, + "Runs and Status": quantum_summary.runs, + "Messages": quantum_summary.messages, } ) - j += 1 quanta_table.append( { - "Task": task, - "Not Attempted": summary_dict["tasks"][task]["n_not_attempted"], - "Successful": summary_dict["tasks"][task]["n_successful"], - "Blocked": summary_dict["tasks"][task]["n_blocked"], - "Failed": summary_dict["tasks"][task]["n_failed"], - "Wonky": summary_dict["tasks"][task]["n_wonky"], + "Task": label, + "Not Attempted": task_summary.n_not_attempted, + "Successful": task_summary.n_successful, + "Blocked": task_summary.n_blocked, + "Failed": task_summary.n_failed, + "Wonky": task_summary.n_wonky, "TOTAL": sum( [ - summary_dict["tasks"][task]["n_successful"], - summary_dict["tasks"][task]["n_not_attempted"], - summary_dict["tasks"][task]["n_blocked"], - summary_dict["tasks"][task]["n_failed"], - summary_dict["tasks"][task]["n_wonky"], + task_summary.n_successful, + task_summary.n_not_attempted, + task_summary.n_blocked, + task_summary.n_failed, + task_summary.n_wonky, ] ), - "EXPECTED": summary_dict["tasks"][task]["n_expected"], + "EXPECTED": task_summary.n_expected, } ) - if summary_dict["tasks"][task]["failed_quanta"]: - i = 0 - for data_id in summary_dict["tasks"][task]["failed_quanta"]: + if task_summary.failed_quanta: + for quantum_summary in task_summary.failed_quanta: failed_quanta_table.append( { - "Task": task, - "Data ID": summary_dict["tasks"][task]["failed_quanta"][i]["data_id"], - "Runs and Status": summary_dict["tasks"][task]["failed_quanta"][i]["runs"], - "Messages": summary_dict["tasks"][task]["failed_quanta"][i]["messages"], + "Task": label, + "Data ID": quantum_summary.data_id, + "Runs and Status": quantum_summary.runs, + "Messages": quantum_summary.messages, } ) - i += 1 quanta = Table(quanta_table) quanta.pprint_all() # Dataset loop dataset_table = [] cursed_datasets = [] unsuccessful_datasets = {} - for dataset in summary_dict["datasets"]: + for dataset_type_name, dataset_type_summary in summary.datasets.items(): dataset_table.append( { - "Dataset": dataset, - "Published": summary_dict["datasets"][dataset]["n_published"], - "Unpublished": summary_dict["datasets"][dataset]["n_unpublished"], - "Predicted Only": summary_dict["datasets"][dataset]["n_predicted_only"], - "Unsuccessful": summary_dict["datasets"][dataset]["n_unsuccessful"], - "Cursed": summary_dict["datasets"][dataset]["n_cursed"], + "Dataset": dataset_type_name, + "Published": dataset_type_summary.n_published, + "Unpublished": dataset_type_summary.n_unpublished, + "Predicted Only": dataset_type_summary.n_predicted_only, + "Unsuccessful": dataset_type_summary.n_unsuccessful, + "Cursed": dataset_type_summary.n_cursed, "TOTAL": sum( [ - summary_dict["datasets"][dataset]["n_published"], - summary_dict["datasets"][dataset]["n_unpublished"], - summary_dict["datasets"][dataset]["n_predicted_only"], - summary_dict["datasets"][dataset]["n_unsuccessful"], - summary_dict["datasets"][dataset]["n_cursed"], + dataset_type_summary.n_published, + dataset_type_summary.n_unpublished, + dataset_type_summary.n_predicted_only, + 
dataset_type_summary.n_unsuccessful, + dataset_type_summary.n_cursed, ] ), - "EXPECTED": summary_dict["datasets"][dataset]["n_expected"], + "EXPECTED": dataset_type_summary.n_expected, } ) - if summary_dict["datasets"][dataset]["n_cursed"] > 0: - print( - f"{dataset} has cursed quanta with message(s) " - "{summary_dict[task]['cursed_datasets']['messages']}. " - "Recommend processing cease until the issue is resolved." - ) - cursed_datasets.append( - { - "Dataset Type": dataset, - "Parent Data Id": summary_dict["datasets"][dataset]["cursed_datasets"]["parent_data_id"], - } - ) - if summary_dict["datasets"][dataset]["n_unsuccessful"] > 0: - unsuccessful_datasets[dataset] = summary_dict["datasets"][dataset]["unsuccessful_datasets"] + if dataset_type_summary.n_cursed > 0: + for cursed_dataset in dataset_type_summary.cursed_datasets: + print( + f"{dataset_type_name} has cursed quanta with message(s) {cursed_dataset.messages}. " + "Recommend processing cease until the issue is resolved." + ) + cursed_datasets.append( + { + "Dataset Type": dataset_type_name, + "Producer Data Id": cursed_dataset.producer_data_id, + } + ) + if dataset_type_summary.n_unsuccessful > 0: + unsuccessful_datasets[dataset_type_name] = dataset_type_summary.unsuccessful_datasets datasets = Table(dataset_table) datasets.pprint_all() curse_table = Table(cursed_datasets) diff --git a/tests/test_cliCmdReport.py b/tests/test_cliCmdReport.py index f16ddc09..87f35a52 100644 --- a/tests/test_cliCmdReport.py +++ b/tests/test_cliCmdReport.py @@ -182,7 +182,7 @@ def test_report(self): input="no", ) - self.assertEqual(result.exit_code, 0, clickResultMsg(result_v2_full)) + self.assertEqual(result_v2_full.exit_code, 0, clickResultMsg(result_v2_full)) # Check the "brief" output that prints to the terminal first: # Check that we get string output self.assertIsInstance(result_v2_full.stdout, str) From 1383e594a14861e1bcdd0651e2069b39e95bc066 Mon Sep 17 00:00:00 2001 From: Orion Eiger Date: Tue, 10 Sep 2024 15:55:28 -0700 Subject: [PATCH 4/8] Change not_attempted to unknown, published to visible and unpublished to shadowed --- python/lsst/ctrl/mpexec/cli/script/report.py | 12 +-- tests/test_cliCmdReport.py | 96 ++++++++++---------- 2 files changed, 54 insertions(+), 54 deletions(-) diff --git a/python/lsst/ctrl/mpexec/cli/script/report.py b/python/lsst/ctrl/mpexec/cli/script/report.py index bb7c33f1..8c6e5c47 100644 --- a/python/lsst/ctrl/mpexec/cli/script/report.py +++ b/python/lsst/ctrl/mpexec/cli/script/report.py @@ -222,7 +222,7 @@ def print_summary(summary: Summary, full_output_filename: str | None, brief: boo quanta_table.append( { "Task": label, - "Not Attempted": task_summary.n_not_attempted, + "Unknown": task_summary.n_unknown, "Successful": task_summary.n_successful, "Blocked": task_summary.n_blocked, "Failed": task_summary.n_failed, @@ -230,7 +230,7 @@ def print_summary(summary: Summary, full_output_filename: str | None, brief: boo "TOTAL": sum( [ task_summary.n_successful, - task_summary.n_not_attempted, + task_summary.n_unknown, task_summary.n_blocked, task_summary.n_failed, task_summary.n_wonky, @@ -259,15 +259,15 @@ def print_summary(summary: Summary, full_output_filename: str | None, brief: boo dataset_table.append( { "Dataset": dataset_type_name, - "Published": dataset_type_summary.n_published, - "Unpublished": dataset_type_summary.n_unpublished, + "Visible": dataset_type_summary.n_visible, + "Shadowed": dataset_type_summary.n_shadowed, "Predicted Only": dataset_type_summary.n_predicted_only, "Unsuccessful": 
dataset_type_summary.n_unsuccessful, "Cursed": dataset_type_summary.n_cursed, "TOTAL": sum( [ - dataset_type_summary.n_published, - dataset_type_summary.n_unpublished, + dataset_type_summary.n_visible, + dataset_type_summary.n_shadowed, dataset_type_summary.n_predicted_only, dataset_type_summary.n_unsuccessful, dataset_type_summary.n_cursed, diff --git a/tests/test_cliCmdReport.py b/tests/test_cliCmdReport.py index 87f35a52..2ddd1982 100644 --- a/tests/test_cliCmdReport.py +++ b/tests/test_cliCmdReport.py @@ -132,7 +132,7 @@ def test_report(self): # Check that task0 and the quanta for task0 exist in the string self.assertIn("task0", result_v2_terminal_out.stdout) - self.assertIn("Not Attempted", result_v2_terminal_out.stdout) + self.assertIn("Unknown", result_v2_terminal_out.stdout) self.assertIn("Successful", result_v2_terminal_out.stdout) self.assertIn("Blocked", result_v2_terminal_out.stdout) self.assertIn("Failed", result_v2_terminal_out.stdout) @@ -158,7 +158,7 @@ def test_report(self): # Check that task0 and the quanta for task0 exist in the string self.assertIn("task0", result_v2_brief.stdout) - self.assertIn("Not Attempted", result_v2_brief.stdout) + self.assertIn("Unknown", result_v2_brief.stdout) self.assertIn("Successful", result_v2_brief.stdout) self.assertIn("Blocked", result_v2_brief.stdout) self.assertIn("Failed", result_v2_brief.stdout) @@ -189,7 +189,7 @@ def test_report(self): # Check that task0 and the quanta for task0 exist in the string self.assertIn("task0", result_v2_full.stdout) - self.assertIn("Not Attempted", result_v2_full.stdout) + self.assertIn("Unknown", result_v2_full.stdout) self.assertIn("Successful", result_v2_full.stdout) self.assertIn("Blocked", result_v2_full.stdout) self.assertIn("Failed", result_v2_full.stdout) @@ -207,7 +207,7 @@ def test_report(self): "task0": TaskSummary( n_successful=0, n_blocked=0, - n_not_attempted=1, + n_unknown=1, n_expected=1, failed_quanta=[], recovered_quanta=[], @@ -218,7 +218,7 @@ def test_report(self): "task1": TaskSummary( n_successful=0, n_blocked=0, - n_not_attempted=1, + n_unknown=1, n_expected=1, failed_quanta=[], recovered_quanta=[], @@ -229,7 +229,7 @@ def test_report(self): "task2": TaskSummary( n_successful=0, n_blocked=0, - n_not_attempted=1, + n_unknown=1, n_expected=1, failed_quanta=[], recovered_quanta=[], @@ -240,7 +240,7 @@ def test_report(self): "task3": TaskSummary( n_successful=0, n_blocked=0, - n_not_attempted=1, + n_unknown=1, n_expected=1, failed_quanta=[], recovered_quanta=[], @@ -251,7 +251,7 @@ def test_report(self): "task4": TaskSummary( n_successful=0, n_blocked=0, - n_not_attempted=1, + n_unknown=1, n_expected=1, failed_quanta=[], recovered_quanta=[], @@ -266,8 +266,8 @@ def test_report(self): { "add_dataset1": DatasetTypeSummary( producer="task0", - n_published=0, - n_unpublished=0, + n_visible=0, + n_shadowed=0, n_predicted_only=0, n_expected=1, cursed_datasets=[], @@ -277,8 +277,8 @@ def test_report(self): ), "add2_dataset1": DatasetTypeSummary( producer="task0", - n_published=0, - n_unpublished=0, + n_visible=0, + n_shadowed=0, n_predicted_only=0, n_expected=1, cursed_datasets=[], @@ -288,8 +288,8 @@ def test_report(self): ), "task0_metadata": DatasetTypeSummary( producer="task0", - n_published=0, - n_unpublished=0, + n_visible=0, + n_shadowed=0, n_predicted_only=0, n_expected=1, cursed_datasets=[], @@ -299,8 +299,8 @@ def test_report(self): ), "task0_log": DatasetTypeSummary( producer="task0", - n_published=0, - n_unpublished=0, + n_visible=0, + n_shadowed=0, n_predicted_only=0, 
n_expected=1, cursed_datasets=[], @@ -310,8 +310,8 @@ def test_report(self): ), "add_dataset2": DatasetTypeSummary( producer="task1", - n_published=0, - n_unpublished=0, + n_visible=0, + n_shadowed=0, n_predicted_only=0, n_expected=1, cursed_datasets=[], @@ -321,8 +321,8 @@ def test_report(self): ), "add2_dataset2": DatasetTypeSummary( producer="task1", - n_published=0, - n_unpublished=0, + n_visible=0, + n_shadowed=0, n_predicted_only=0, n_expected=1, cursed_datasets=[], @@ -332,8 +332,8 @@ def test_report(self): ), "task1_metadata": DatasetTypeSummary( producer="task1", - n_published=0, - n_unpublished=0, + n_visible=0, + n_shadowed=0, n_predicted_only=0, n_expected=1, cursed_datasets=[], @@ -343,8 +343,8 @@ def test_report(self): ), "task1_log": DatasetTypeSummary( producer="task1", - n_published=0, - n_unpublished=0, + n_visible=0, + n_shadowed=0, n_predicted_only=0, n_expected=1, cursed_datasets=[], @@ -354,8 +354,8 @@ def test_report(self): ), "add_dataset3": DatasetTypeSummary( producer="task2", - n_published=0, - n_unpublished=0, + n_visible=0, + n_shadowed=0, n_predicted_only=0, n_expected=1, cursed_datasets=[], @@ -365,8 +365,8 @@ def test_report(self): ), "add2_dataset3": DatasetTypeSummary( producer="task2", - n_published=0, - n_unpublished=0, + n_visible=0, + n_shadowed=0, n_predicted_only=0, n_expected=1, cursed_datasets=[], @@ -376,8 +376,8 @@ def test_report(self): ), "task2_metadata": DatasetTypeSummary( producer="task2", - n_published=0, - n_unpublished=0, + n_visible=0, + n_shadowed=0, n_predicted_only=0, n_expected=1, cursed_datasets=[], @@ -387,8 +387,8 @@ def test_report(self): ), "task2_log": DatasetTypeSummary( producer="task2", - n_published=0, - n_unpublished=0, + n_visible=0, + n_shadowed=0, n_predicted_only=0, n_expected=1, cursed_datasets=[], @@ -398,8 +398,8 @@ def test_report(self): ), "add_dataset4": DatasetTypeSummary( producer="task3", - n_published=0, - n_unpublished=0, + n_visible=0, + n_shadowed=0, n_predicted_only=0, n_expected=1, cursed_datasets=[], @@ -409,8 +409,8 @@ def test_report(self): ), "add2_dataset4": DatasetTypeSummary( producer="task3", - n_published=0, - n_unpublished=0, + n_visible=0, + n_shadowed=0, n_predicted_only=0, n_expected=1, cursed_datasets=[], @@ -420,8 +420,8 @@ def test_report(self): ), "task3_metadata": DatasetTypeSummary( producer="task3", - n_published=0, - n_unpublished=0, + n_visible=0, + n_shadowed=0, n_predicted_only=0, n_expected=1, cursed_datasets=[], @@ -431,8 +431,8 @@ def test_report(self): ), "task3_log": DatasetTypeSummary( producer="task3", - n_published=0, - n_unpublished=0, + n_visible=0, + n_shadowed=0, n_predicted_only=0, n_expected=1, cursed_datasets=[], @@ -442,8 +442,8 @@ def test_report(self): ), "add_dataset5": DatasetTypeSummary( producer="task4", - n_published=0, - n_unpublished=0, + n_visible=0, + n_shadowed=0, n_predicted_only=0, n_expected=1, cursed_datasets=[], @@ -453,8 +453,8 @@ def test_report(self): ), "add2_dataset5": DatasetTypeSummary( producer="task4", - n_published=0, - n_unpublished=0, + n_visible=0, + n_shadowed=0, n_predicted_only=0, n_expected=1, cursed_datasets=[], @@ -464,8 +464,8 @@ def test_report(self): ), "task4_metadata": DatasetTypeSummary( producer="task4", - n_published=0, - n_unpublished=0, + n_visible=0, + n_shadowed=0, n_predicted_only=0, n_expected=1, cursed_datasets=[], @@ -475,8 +475,8 @@ def test_report(self): ), "task4_log": DatasetTypeSummary( producer="task4", - n_published=0, - n_unpublished=0, + n_visible=0, + n_shadowed=0, n_predicted_only=0, n_expected=1, 
cursed_datasets=[], From 205e45175c4dee33e77e8b584d119005c26a0f3e Mon Sep 17 00:00:00 2001 From: Orion Eiger Date: Tue, 10 Sep 2024 16:19:31 -0700 Subject: [PATCH 5/8] Use butler collections and where options --- python/lsst/ctrl/mpexec/cli/cmd/commands.py | 17 +++++++++-------- python/lsst/ctrl/mpexec/cli/script/report.py | 4 +++- tests/test_cliCmdReport.py | 5 +++++ 3 files changed, 17 insertions(+), 9 deletions(-) diff --git a/python/lsst/ctrl/mpexec/cli/cmd/commands.py b/python/lsst/ctrl/mpexec/cli/cmd/commands.py index f7cce6e6..4641253e 100644 --- a/python/lsst/ctrl/mpexec/cli/cmd/commands.py +++ b/python/lsst/ctrl/mpexec/cli/cmd/commands.py @@ -38,12 +38,14 @@ from lsst.ctrl.mpexec import Report from lsst.ctrl.mpexec.showInfo import ShowInfo from lsst.daf.butler.cli.opt import ( + collections_option, config_file_option, config_option, confirm_option, options_file_option, processes_option, repo_argument, + where_option, ) from lsst.daf.butler.cli.utils import MWCtxObj, catch_and_exit, option_section, unwrap @@ -342,13 +344,8 @@ def update_graph_run( @click.command(cls=PipetaskCommand) @repo_argument() @click.argument("qgraphs", nargs=-1) -@click.option( - "--collections", - default=None, - help="Collection(s) associated with said graphs/processing." - " For use in `lsst.daf.butler.registry.queryDatasets` if paring down the query would be useful.", -) -@click.option("--where", default="", help="A 'where' string to use to constrain the collections, if passed.") +@collections_option() +@where_option() @click.option( "--full-output-filename", default="", @@ -399,12 +396,16 @@ def report( (default). If the terminal is overwhelmed with data_ids from failures try the `--brief` option. + Butler `collections` and `where` options are for use in + `lsst.daf.butler.queryDatasets` if paring down the collections would be + useful. By default the collections and query will match the graphs. + REPO is the location of the butler/registry config file. QGRAPHS is a Sequence of links to serialized Quantum Graphs which have been executed and are to be analyzed. 
""" - if force_v2 or len(qgraphs) > 1 or collections is not None: + if force_v2 or len(qgraphs) > 1 or collections: script.report_v2( repo, qgraphs, collections, where, full_output_filename, logs, brief, curse_failed_logs ) diff --git a/python/lsst/ctrl/mpexec/cli/script/report.py b/python/lsst/ctrl/mpexec/cli/script/report.py index 8c6e5c47..502aa4e3 100644 --- a/python/lsst/ctrl/mpexec/cli/script/report.py +++ b/python/lsst/ctrl/mpexec/cli/script/report.py @@ -175,8 +175,10 @@ def report_v2( qpg.add_new_graph(butler, qgraph) output_runs.append(qgraph.metadata["output_run"]) collections_sequence: Sequence[Any] # to appease mypy - if collections is None: + if not collections: collections_sequence = list(reversed(output_runs)) + else: + collections_sequence = collections qpg.resolve_duplicates( butler, collections=collections_sequence, where=where, curse_failed_logs=curse_failed_logs ) diff --git a/tests/test_cliCmdReport.py b/tests/test_cliCmdReport.py index 2ddd1982..38b14c01 100644 --- a/tests/test_cliCmdReport.py +++ b/tests/test_cliCmdReport.py @@ -75,12 +75,17 @@ def test_report(self): ["report", self.root, graph_uri, "--full-output-filename", test_filename, "--no-logs"], input="no", ) + # Check that we can read from the command line self.assertEqual(result.exit_code, 0, clickResultMsg(result)) # Check that we can open and read the file produced by make_reports with open(test_filename) as f: report_output_dict = yaml.load(f, Loader=SafeLoader) + + with open("delete_me.yaml", "w") as f: + yaml.safe_dump(report_output_dict, f) + self.assertIsNotNone(report_output_dict["task0"]) self.assertIsNotNone(report_output_dict["task0"]["failed_quanta"]) self.assertIsInstance(report_output_dict["task0"]["n_expected"], int) From 134a3ae9dc8f10280eeb9522488e75d3438c9f77 Mon Sep 17 00:00:00 2001 From: Orion Eiger Date: Fri, 13 Sep 2024 17:07:35 -0700 Subject: [PATCH 6/8] Make add_new_graph and resolve_duplicates private methods and require user to pass graphs in order --- python/lsst/ctrl/mpexec/cli/script/report.py | 16 ++-------------- 1 file changed, 2 insertions(+), 14 deletions(-) diff --git a/python/lsst/ctrl/mpexec/cli/script/report.py b/python/lsst/ctrl/mpexec/cli/script/report.py index 502aa4e3..300cde96 100644 --- a/python/lsst/ctrl/mpexec/cli/script/report.py +++ b/python/lsst/ctrl/mpexec/cli/script/report.py @@ -26,7 +26,6 @@ # along with this program. If not, see . 
import pprint from collections.abc import Sequence -from typing import Any from astropy.table import Table from lsst.daf.butler import Butler @@ -169,19 +168,8 @@ def report_v2( """ butler = Butler.from_config(butler_config, writeable=False) qpg = QuantumProvenanceGraph() - output_runs = [] - for qgraph_uri in qgraph_uris: - qgraph = QuantumGraph.loadUri(qgraph_uri) - qpg.add_new_graph(butler, qgraph) - output_runs.append(qgraph.metadata["output_run"]) - collections_sequence: Sequence[Any] # to appease mypy - if not collections: - collections_sequence = list(reversed(output_runs)) - else: - collections_sequence = collections - qpg.resolve_duplicates( - butler, collections=collections_sequence, where=where, curse_failed_logs=curse_failed_logs - ) + qgraphs = [QuantumGraph.loadUri(qgraph_uri) for qgraph_uri in qgraph_uris] + qpg.assemble_quantum_provenance_graph(butler, qgraphs, collections, where, curse_failed_logs) summary = qpg.to_summary(butler, do_store_logs=logs) print_summary(summary, full_output_filename, brief) From d37fa737e5c7c46d9098c6071f03646e6fb1ec55 Mon Sep 17 00:00:00 2001 From: Orion Eiger Date: Wed, 18 Sep 2024 16:25:00 -0700 Subject: [PATCH 7/8] Clean up and document --- python/lsst/ctrl/mpexec/cli/cmd/commands.py | 15 ++++++++------- python/lsst/ctrl/mpexec/cli/script/report.py | 14 ++++++++------ 2 files changed, 16 insertions(+), 13 deletions(-) diff --git a/python/lsst/ctrl/mpexec/cli/cmd/commands.py b/python/lsst/ctrl/mpexec/cli/cmd/commands.py index 4641253e..edb2e3a2 100644 --- a/python/lsst/ctrl/mpexec/cli/cmd/commands.py +++ b/python/lsst/ctrl/mpexec/cli/cmd/commands.py @@ -388,9 +388,10 @@ def report( ) -> None: """Summarize the state of executed quantum graph(s), with counts of failed, successful and expected quanta, as well as counts of output datasets and - their publish states. Analyze one or more attempts at the same - processing on the same dataquery-identified "group" and resolve recoveries - and persistent failures. Identify mismatch errors between groups. + their query (visible/shadowed) states. Analyze one or more attempts at the + same processing on the same dataquery-identified "group" and resolve + recoveries and persistent failures. Identify mismatch errors between + attempts. Save the report as a file (`--full-output-filename`) or print it to stdout (default). If the terminal is overwhelmed with data_ids from failures try @@ -398,14 +399,14 @@ def report( Butler `collections` and `where` options are for use in `lsst.daf.butler.queryDatasets` if paring down the collections would be - useful. By default the collections and query will match the graphs. + useful. By default the collections and query be taken from the graphs. REPO is the location of the butler/registry config file. - QGRAPHS is a Sequence of links to serialized Quantum Graphs which have been - executed and are to be analyzed. + QGRAPHS is a `Sequence` of links to serialized Quantum Graphs which have + been executed and are to be analyzed. 
""" - if force_v2 or len(qgraphs) > 1 or collections: + if any([force_v2, len(qgraphs) > 1, collections, where, curse_failed_logs]): script.report_v2( repo, qgraphs, collections, where, full_output_filename, logs, brief, curse_failed_logs ) diff --git a/python/lsst/ctrl/mpexec/cli/script/report.py b/python/lsst/ctrl/mpexec/cli/script/report.py index 300cde96..db890e0e 100644 --- a/python/lsst/ctrl/mpexec/cli/script/report.py +++ b/python/lsst/ctrl/mpexec/cli/script/report.py @@ -42,7 +42,8 @@ def report( brief: bool = False, ) -> None: """Summarize the produced, missing and expected quanta and - datasets belonging to an executed quantum graph. + datasets belonging to an executed quantum graph using the + `QuantumGraphExecutionReport`. Parameters ---------- @@ -126,7 +127,7 @@ def report_v2( ) -> None: """Summarize the state of executed quantum graph(s), with counts of failed, successful and expected quanta, as well as counts of output datasets and - their publish states. Analyze one or more attempts at the same + their visible/shadowed states. Analyze one or more attempts at the same processing on the same dataquery-identified "group" and resolve recoveries and persistent failures. Identify mismatch errors between groups. @@ -162,9 +163,10 @@ def report_v2( Mark log datasets as `cursed` if they are published in the final output collection. Note that a campaign-level collection must be used here for `collections` if `curse_failed_logs` is `True`; if - `resolve_duplicates` is run on a list of group-level collections - then each will show logs from their own failures as published - the datasets will show as cursed regardless of this flag. + `lsst.pipe.base.QuantumProvenanceGraph.__resolve_duplicates` + is run on a list of group-level collections, then each will only + show log datasets from their own failures as visible and datasets + from others will be marked as cursed. """ butler = Butler.from_config(butler_config, writeable=False) qpg = QuantumProvenanceGraph() @@ -181,7 +183,7 @@ def print_summary(summary: Summary, full_output_filename: str | None, brief: boo Parameters ---------- summary : `QuantumProvenanceGraph.Summary` - This object contains all the information derived from the + This `Pydantic` model contains all the information derived from the `QuantumProvenanceGraph`. full_output_filename : `str | None` Name of the JSON file in which to store summary information, if From 53d339a2cb9feb20032120b02349e74ae0b8d461 Mon Sep 17 00:00:00 2001 From: Orion Eiger Date: Fri, 20 Sep 2024 21:03:24 -0700 Subject: [PATCH 8/8] Add check for graphs passed out of order and clarify docs --- python/lsst/ctrl/mpexec/cli/cmd/commands.py | 9 +++++--- python/lsst/ctrl/mpexec/cli/script/report.py | 24 +++++++++++++++++--- requirements.txt | 2 +- tests/test_cliCmdReport.py | 3 --- 4 files changed, 28 insertions(+), 10 deletions(-) diff --git a/python/lsst/ctrl/mpexec/cli/cmd/commands.py b/python/lsst/ctrl/mpexec/cli/cmd/commands.py index edb2e3a2..5d26938a 100644 --- a/python/lsst/ctrl/mpexec/cli/cmd/commands.py +++ b/python/lsst/ctrl/mpexec/cli/cmd/commands.py @@ -373,7 +373,8 @@ def update_graph_run( is_flag=True, default=False, help="Use the QuantumProvenanceGraph instead of the QuantumGraphExecutionReport, " - "even when there is only one qgraph.", + "even when there is only one qgraph. 
Otherwise, the `QuantumGraphExecutionReport` "
+    "will run on one graph by default.",
 )
 def report(
     repo: str,
@@ -399,12 +400,14 @@ def report(
     Butler `collections` and `where` options are for use in
     `lsst.daf.butler.queryDatasets` if paring down the collections would be
-    useful. By default the collections and query be taken from the graphs.
+    useful. Pass collections in order of most to least recent. By default the
+    collections and query will be taken from the graphs.
 
     REPO is the location of the butler/registry config file.
 
     QGRAPHS is a `Sequence` of links to serialized Quantum Graphs which have
-    been executed and are to be analyzed.
+    been executed and are to be analyzed. Pass the graphs in order of first to
+    last executed.
     """
     if any([force_v2, len(qgraphs) > 1, collections, where, curse_failed_logs]):
         script.report_v2(
             repo, qgraphs, collections, where, full_output_filename, logs, brief, curse_failed_logs
diff --git a/python/lsst/ctrl/mpexec/cli/script/report.py b/python/lsst/ctrl/mpexec/cli/script/report.py
index db890e0e..62d88e79 100644
--- a/python/lsst/ctrl/mpexec/cli/script/report.py
+++ b/python/lsst/ctrl/mpexec/cli/script/report.py
@@ -136,9 +136,9 @@ def report_v2(
     butler_config : `str`
         The Butler used for this report. This should match the Butler used
         for the run associated with the executed quantum graph.
-    qgraph_uris : `Sequence[str]`
+    qgraph_uris : `Sequence` [`str`]
         One or more uris to the serialized Quantum Graph(s).
-    collections : `Sequence[str] | None`
+    collections : `Sequence` [`str`] | `None`
         Collection(s) associated with said graphs/processing. For use in
         `lsst.daf.butler.registry.queryDatasets` if paring down the query
         would be useful.
@@ -170,7 +170,25 @@ def report_v2(
     """
     butler = Butler.from_config(butler_config, writeable=False)
     qpg = QuantumProvenanceGraph()
-    qgraphs = [QuantumGraph.loadUri(qgraph_uri) for qgraph_uri in qgraph_uris]
+    qgraphs = []
+    for qgraph_uri in qgraph_uris:
+        qgraph = QuantumGraph.loadUri(qgraph_uri)
+        assert qgraph.metadata is not None, "Saved QGs always have metadata."
+        qgraphs.append(qgraph)
+    # If any graph's timestamp is earlier than its predecessor's, the graphs
+    # were passed out of order; raise a RuntimeError.
+    for count, qgraph in enumerate(qgraphs):
+        if count > 0:
+            previous_graph = qgraphs[count - 1]
+            if qgraph.metadata["time"] < previous_graph.metadata["time"]:
+                raise RuntimeError(
+                    f"""Quantum graphs must be passed in the order they
+                    were created. Please call again, passing your
+                    graphs in order. Time of the earlier-passed graph:
+                    {previous_graph.metadata["time"]} > time of the
+                    graph passed after it: {qgraph.metadata["time"]}"""
+                )
     qpg.assemble_quantum_provenance_graph(butler, qgraphs, collections, where, curse_failed_logs)
     summary = qpg.to_summary(butler, do_store_logs=logs)
     print_summary(summary, full_output_filename, brief)
diff --git a/requirements.txt b/requirements.txt
index 025994df..4623a213 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -6,6 +6,6 @@ networkx
 lsst-resources @ git+https://github.com/lsst/resources@main
 lsst-daf-butler @ git+https://github.com/lsst/daf_butler@main
 lsst-utils @ git+https://github.com/lsst/utils@main
-lsst-pipe-base @ git+https://github.com/lsst/pipe_base@tickets/DM-41711
+lsst-pipe-base @ git+https://github.com/lsst/pipe_base@main
 lsst-pex-config @ git+https://github.com/lsst/pex_config@main
 sqlalchemy
diff --git a/tests/test_cliCmdReport.py b/tests/test_cliCmdReport.py
index 38b14c01..d3b8bb6b 100644
--- a/tests/test_cliCmdReport.py
+++ b/tests/test_cliCmdReport.py
@@ -83,9 +83,6 @@ def test_report(self):
         with open(test_filename) as f:
             report_output_dict = yaml.load(f, Loader=SafeLoader)
 
-        with open("delete_me.yaml", "w") as f:
-            yaml.safe_dump(report_output_dict, f)
-
         self.assertIsNotNone(report_output_dict["task0"])
         self.assertIsNotNone(report_output_dict["task0"]["failed_quanta"])
         self.assertIsInstance(report_output_dict["task0"]["n_expected"], int)
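
As a quick sanity check of the new code path, the `QuantumProvenanceGraph` can be driven directly, mirroring what the final `report_v2` above does. A minimal sketch, assuming a butler repo at `/repo/main` and two executed graphs with hypothetical file names, passed in the order they were created:

    from lsst.daf.butler import Butler
    from lsst.pipe.base import QuantumGraph
    from lsst.pipe.base.quantum_provenance_graph import QuantumProvenanceGraph

    # Hypothetical inputs: a butler repo and two executed attempts at the
    # same processing, loaded first-to-last as `pipetask report` requires.
    butler = Butler.from_config("/repo/main", writeable=False)
    qgraphs = [QuantumGraph.loadUri(uri) for uri in ("attempt1.qgraph", "attempt2.qgraph")]

    qpg = QuantumProvenanceGraph()
    # Positional arguments follow the call in PATCH 6/8: butler, qgraphs,
    # collections, where, curse_failed_logs. collections=None and where=""
    # let the graphs' own output runs drive the query, as in report_v2.
    qpg.assemble_quantum_provenance_graph(butler, qgraphs, None, "", False)
    summary = qpg.to_summary(butler, do_store_logs=True)
    print(summary.model_dump_json(indent=2))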
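The JSON written by `--full-output-filename` round-trips through the pydantic `Summary` model, exactly as `tests/test_cliCmdReport.py` validates it. A sketch, with `report_test.json` standing in for a real report file:

    from lsst.pipe.base.quantum_provenance_graph import Summary

    # "report_test.json" is a placeholder for a file written by
    # `pipetask report --force-v2 --full-output-filename report_test.json ...`
    with open("report_test.json") as f:
        summary = Summary.model_validate_json(f.read())

    # A counts-only view, comparable to the --brief terminal output.
    for label, task_summary in summary.tasks.items():
        print(label, "expected:", task_summary.n_expected, "failed:", task_summary.n_failed)
    for name, dataset_summary in summary.datasets.items():
        print(name, "visible:", dataset_summary.n_visible, "expected:", dataset_summary.n_expected)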