Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-41711: Upgrade QuantumGraphExecutionReport to handle multiple overlapping graphs #294

Merged
merged 8 commits into from
Sep 23, 2024
6 changes: 6 additions & 0 deletions doc/changes/DM-41711.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
Upgrade QuantumGraphExecutionReport to handle multiple overlapping graphs.

Update the `pipetask report` command-line interface to accommodate the new
`QuantumProvenanceGraph`. This allows for resolving outcomes of processing over
multiple attempts (and associated graphs) over the same dataquery, providing
information on recoveries, persistent issues and mismatch errors.
77 changes: 65 additions & 12 deletions python/lsst/ctrl/mpexec/cli/cmd/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import sys
from collections.abc import Iterator
from collections.abc import Iterator, Sequence
from contextlib import contextmanager
from functools import partial
from tempfile import NamedTemporaryFile
Expand All @@ -38,12 +38,14 @@
from lsst.ctrl.mpexec import Report
from lsst.ctrl.mpexec.showInfo import ShowInfo
from lsst.daf.butler.cli.opt import (
collections_option,
config_file_option,
config_option,
confirm_option,
options_file_option,
processes_option,
repo_argument,
where_option,
)
from lsst.daf.butler.cli.utils import MWCtxObj, catch_and_exit, option_section, unwrap

Expand Down Expand Up @@ -341,25 +343,76 @@ def update_graph_run(

@click.command(cls=PipetaskCommand)
@repo_argument()
@ctrlMpExecOpts.qgraph_argument()
@click.option("--full-output-filename", default="", help="Summarize report in a yaml file")
@click.argument("qgraphs", nargs=-1)
@collections_option()
@where_option()
@click.option(
"--full-output-filename",
default="",
help="Output report as a file with this name. "
"For pipetask report on one graph, this should be a yaml file. For multiple graphs "
"or when using the --force-v2 option, this should be a json file. We will be "
"deprecating the single-graph-only (QuantumGraphExecutionReport) option soon.",
)
@click.option("--logs/--no-logs", default=True, help="Get butler log datasets for extra information.")
@click.option(
"--show-errors",
"--brief",
default=False,
is_flag=True,
help="Only show counts in report (a brief summary). Note that counts are"
" also printed to the screen when using the --full-output-filename option.",
)
@click.option(
"--curse-failed-logs",
is_flag=True,
default=False,
help="Pretty-print a dict of errors from failed"
" quanta to the screen. Note: the default is to output a yaml file with error information"
" (data_ids and associated messages) to the current working directory instead.",
help="If log datasets are missing in v2 (QuantumProvenanceGraph), mark them as cursed",
)
@click.option(
"--force-v2",
is_flag=True,
default=False,
help="Use the QuantumProvenanceGraph instead of the QuantumGraphExecutionReport, "
"even when there is only one qgraph. Otherwise, the `QuantumGraphExecutionReport` "
"will run on one graph by default.",
)
def report(
repo: str, qgraph: str, full_output_filename: str = "", logs: bool = True, show_errors: bool = False
repo: str,
qgraphs: Sequence[str],
collections: Sequence[str] | None,
where: str,
full_output_filename: str = "",
logs: bool = True,
brief: bool = False,
curse_failed_logs: bool = False,
force_v2: bool = False,
) -> None:
"""Write a yaml file summarizing the produced and missing expected datasets
in a quantum graph.
"""Summarize the state of executed quantum graph(s), with counts of failed,
successful and expected quanta, as well as counts of output datasets and
their query (visible/shadowed) states. Analyze one or more attempts at the
same processing on the same dataquery-identified "group" and resolve
recoveries and persistent failures. Identify mismatch errors between
attempts.

Save the report as a file (`--full-output-filename`) or print it to stdout
(default). If the terminal is overwhelmed with data_ids from failures try
the `--brief` option.

Butler `collections` and `where` options are for use in
`lsst.daf.butler.queryDatasets` if paring down the collections would be
useful. Pass collections in order of most to least recent. By default the
collections and query will be taken from the graphs.

REPO is the location of the butler/registry config file.

QGRAPH is the URL to a serialized Quantum Graph file.
QGRAPHS is a `Sequence` of links to serialized Quantum Graphs which have
been executed and are to be analyzed. Pass the graphs in order of first to
last executed.
"""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure why it wasn't this way before, but doesn't this function need the LSST style docstring with PARAMETERS?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No because click formats this string as help text and doesn't understand numpydoc.

script.report(repo, qgraph, full_output_filename, logs, show_errors)
if any([force_v2, len(qgraphs) > 1, collections, where, curse_failed_logs]):
script.report_v2(
repo, qgraphs, collections, where, full_output_filename, logs, brief, curse_failed_logs
)
else:
assert len(qgraphs) == 1, "Cannot make a report without a quantum graph."
script.report(repo, qgraphs[0], full_output_filename, logs, brief)
2 changes: 1 addition & 1 deletion python/lsst/ctrl/mpexec/cli/script/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from .pre_exec_init_qbb import pre_exec_init_qbb
from .purge import PurgeResult, purge
from .qgraph import qgraph
from .report import report
from .report import report, report_v2
from .run import run
from .run_qbb import run_qbb
from .update_graph_run import update_graph_run
235 changes: 219 additions & 16 deletions python/lsst/ctrl/mpexec/cli/script/report.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,25 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import pprint
from collections.abc import Sequence

import yaml
from astropy.table import Table
from lsst.daf.butler import Butler
from lsst.pipe.base import QuantumGraph
from lsst.pipe.base.execution_reports import QuantumGraphExecutionReport
from lsst.pipe.base.quantum_provenance_graph import QuantumProvenanceGraph, Summary


def report(
butler_config: str,
qgraph_uri: str,
full_output_filename: str | None,
logs: bool = True,
show_errors: bool = False,
brief: bool = False,
) -> None:
"""Summarize the produced and missing expected dataset in a quantum graph.
"""Summarize the produced, missing and expected quanta and
datasets belonging to an executed quantum graph using the
`QuantumGraphExecutionReport`.

Parameters
----------
Expand All @@ -58,11 +61,9 @@ def report(
command-line instead.
logs : `bool`
Get butler log datasets for extra information (error messages).
show_errors : `bool`
If no output yaml is provided, print error messages to the
command-line along with the report. By default, these messages and
their associated data ids are stored in a yaml file with format
`{run timestamp}_err.yaml` in the working directory instead.
brief : `bool`
List only the counts (or data_ids if number of failures < 5). This
option is good for those who just want to see totals.
"""
butler = Butler.from_config(butler_config, writeable=False)
qgraph = QuantumGraph.loadUri(qgraph_uri)
Expand Down Expand Up @@ -106,16 +107,218 @@ def report(
datasets.add_column(data_products, index=0, name="DatasetType")
quanta.pprint_all()
print("\n")
if show_errors:
if not brief:
pprint.pprint(error_summary)
print("\n")
else:
assert qgraph.metadata is not None, "Saved QGs always have metadata."
collection = qgraph.metadata["output_run"]
collection = str(collection)
run_name = collection.split("/")[-1]
with open(f"{run_name}_err.yaml", "w") as stream:
yaml.safe_dump(error_summary, stream)
datasets.pprint_all()
else:
report.write_summary_yaml(butler, full_output_filename, do_store_logs=logs)


def report_v2(
butler_config: str,
qgraph_uris: Sequence[str],
collections: Sequence[str] | None,
where: str,
full_output_filename: str | None,
logs: bool = True,
brief: bool = False,
curse_failed_logs: bool = False,
) -> None:
"""Summarize the state of executed quantum graph(s), with counts of failed,
successful and expected quanta, as well as counts of output datasets and
their visible/shadowed states. Analyze one or more attempts at the same
processing on the same dataquery-identified "group" and resolve recoveries
and persistent failures. Identify mismatch errors between groups.

Parameters
----------
butler_config : `str`
The Butler used for this report. This should match the Butler used
for the run associated with the executed quantum graph.
qgraph_uris : `Sequence` [`str`]
One or more uris to the serialized Quantum Graph(s).
collections : `Sequence` [`str`] | None`
Collection(s) associated with said graphs/processing. For use in
`lsst.daf.butler.registry.queryDatasets` if paring down the query
would be useful.
where : `str`
A "where" string to use to constrain the collections, if passed.
full_output_filename : `str`
Output the full pydantic model `QuantumProvenanceGraph.Summary`
object into a JSON file. This is ideal for error-matching and
cataloguing tools such as the ones used by Campaign Management
software and pilots, and for searching and counting specific kinds
or instances of failures. This option will also print a "brief"
(counts-only) summary to stdout.
logs : `bool`
Store error messages from Butler logs associated with failed quanta
if `True`.
brief : `bool`
Only display short (counts-only) summary on stdout. This includes
counts and not error messages or data_ids (similar to BPS report).
This option will still report all `cursed` datasets and `wonky`
quanta.
curse_failed_logs : `bool`
Mark log datasets as `cursed` if they are published in the final
output collection. Note that a campaign-level collection must be
used here for `collections` if `curse_failed_logs` is `True`; if
`lsst.pipe.base.QuantumProvenanceGraph.__resolve_duplicates`
is run on a list of group-level collections, then each will only
show log datasets from their own failures as visible and datasets
from others will be marked as cursed.
"""
butler = Butler.from_config(butler_config, writeable=False)
qpg = QuantumProvenanceGraph()
qgraphs = []
for qgraph_uri in qgraph_uris:
qgraph = QuantumGraph.loadUri(qgraph_uri)
assert qgraph.metadata is not None, "Saved QGs always have metadata."
qgraphs.append(qgraph)
# If the most recent graph's timestamp was earlier than any of the
# previous graphs, raise a RuntimeError.
for count, qgraph in enumerate(qgraphs):
if len(qgraphs) > 1:
previous_graph = qgraphs[count - 1]
if qgraph.metadata["time"] < previous_graph.metadata["time"]:
raise RuntimeError(
f"""add_new_graph may only be called on graphs
which are passed in the order they were
created. Please call again, passing your
graphs in order. Time of first graph:
{qgraph.metadata["time"]} >
time of second graph: {previous_graph.metadata["time"]}"""
)
qpg.assemble_quantum_provenance_graph(butler, qgraphs, collections, where, curse_failed_logs)
summary = qpg.to_summary(butler, do_store_logs=logs)
print_summary(summary, full_output_filename, brief)


def print_summary(summary: Summary, full_output_filename: str | None, brief: bool = False) -> None:
"""Take a `QuantumProvenanceGraph.Summary` object and write it to a file
and/or the screen.

Parameters
----------
summary : `QuantumProvenanceGraph.Summary`
This `Pydantic` model contains all the information derived from the
`QuantumProvenanceGraph`.
full_output_filename : `str | None`
Name of the JSON file in which to store summary information, if
passed.
brief : `bool`
Only display short (counts-only) summary on stdout. This includes
counts and not error messages or data_ids (similar to BPS report).
This option will still report all `cursed` datasets and `wonky`
quanta.
"""
quanta_table = []
failed_quanta_table = []
wonky_quanta_table = []
for label, task_summary in summary.tasks.items():
if task_summary.n_wonky > 0:
print(
f"{label} has produced wonky quanta. Recommend processing cease until the issue is resolved."
)
for quantum_summary in task_summary.wonky_quanta:
wonky_quanta_table.append(
{
"Task": label,
"Data ID": quantum_summary.data_id,
"Runs and Status": quantum_summary.runs,
"Messages": quantum_summary.messages,
}
)
quanta_table.append(
{
"Task": label,
"Unknown": task_summary.n_unknown,
"Successful": task_summary.n_successful,
"Blocked": task_summary.n_blocked,
"Failed": task_summary.n_failed,
"Wonky": task_summary.n_wonky,
"TOTAL": sum(
[
task_summary.n_successful,
task_summary.n_unknown,
task_summary.n_blocked,
task_summary.n_failed,
task_summary.n_wonky,
]
),
"EXPECTED": task_summary.n_expected,
}
)
if task_summary.failed_quanta:
for quantum_summary in task_summary.failed_quanta:
failed_quanta_table.append(
{
"Task": label,
"Data ID": quantum_summary.data_id,
"Runs and Status": quantum_summary.runs,
"Messages": quantum_summary.messages,
}
)
quanta = Table(quanta_table)
quanta.pprint_all()
# Dataset loop
dataset_table = []
cursed_datasets = []
unsuccessful_datasets = {}
for dataset_type_name, dataset_type_summary in summary.datasets.items():
dataset_table.append(
{
"Dataset": dataset_type_name,
"Visible": dataset_type_summary.n_visible,
"Shadowed": dataset_type_summary.n_shadowed,
"Predicted Only": dataset_type_summary.n_predicted_only,
"Unsuccessful": dataset_type_summary.n_unsuccessful,
"Cursed": dataset_type_summary.n_cursed,
"TOTAL": sum(
[
dataset_type_summary.n_visible,
dataset_type_summary.n_shadowed,
dataset_type_summary.n_predicted_only,
dataset_type_summary.n_unsuccessful,
dataset_type_summary.n_cursed,
]
),
"EXPECTED": dataset_type_summary.n_expected,
}
)
if dataset_type_summary.n_cursed > 0:
for cursed_dataset in dataset_type_summary.cursed_datasets:
print(
f"{dataset_type_name} has cursed quanta with message(s) {cursed_dataset.messages}. "
"Recommend processing cease until the issue is resolved."
)
cursed_datasets.append(
{
"Dataset Type": dataset_type_name,
"Producer Data Id": cursed_dataset.producer_data_id,
}
)
if dataset_type_summary.n_unsuccessful > 0:
unsuccessful_datasets[dataset_type_name] = dataset_type_summary.unsuccessful_datasets
datasets = Table(dataset_table)
datasets.pprint_all()
curse_table = Table(cursed_datasets)
# Display wonky quanta
if wonky_quanta_table:
print("Wonky Quanta")
pprint.pprint(wonky_quanta_table)
# Display cursed datasets
if cursed_datasets:
print("Cursed Datasets")
curse_table.pprint_all()
if full_output_filename:
with open(full_output_filename, "w") as stream:
stream.write(summary.model_dump_json(indent=2))
else:
if not brief:
if failed_quanta_table:
print("Failed Quanta")
pprint.pprint(failed_quanta_table)
if unsuccessful_datasets:
print("Unsuccessful Datasets")
pprint.pprint(unsuccessful_datasets)
Loading
Loading