Add dataset tables
eigerx committed Jul 17, 2024
1 parent bc6303a commit c4711de
Showing 2 changed files with 104 additions and 50 deletions.
2 changes: 1 addition & 1 deletion python/lsst/ctrl/mpexec/cli/cmd/commands.py
@@ -389,5 +389,5 @@ def report(
repo, qgraphs, collections, where, full_output_filename, logs, show_errors, curse_failed_logs
)
else:
-        assert(len(qgraphs) == 1, "Cannot make a report without a quantum graph.")
+        assert len(qgraphs) == 1, "Cannot make a report without a quantum graph."
script.report(repo, qgraphs[0], full_output_filename, logs, show_errors)
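
The one-line fix above dodges a classic Python pitfall: `assert(condition, message)` parenthesizes a two-element tuple, and a non-empty tuple is always truthy, so the assertion can never fire; CPython even emits a SyntaxWarning for the pattern. A minimal sketch of the difference:

```python
qgraphs: list[str] = []

# Buggy form: asserts a non-empty tuple, which is always truthy, so no
# error is raised even though the list is empty. CPython warns:
# "SyntaxWarning: assertion is always true, perhaps remove parentheses?"
assert (len(qgraphs) == 1, "Cannot make a report without a quantum graph.")

# Fixed form: condition and message are separate, so the empty list
# raises AssertionError with the intended message.
try:
    assert len(qgraphs) == 1, "Cannot make a report without a quantum graph."
except AssertionError as err:
    print(err)
```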
152 changes: 103 additions & 49 deletions python/lsst/ctrl/mpexec/cli/script/report.py
@@ -24,8 +24,10 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-from collections.abc import Sequence
-import pprint, time
+import pprint
+import time
+from collections.abc import Sequence, Iterable

Check failure on line 29 in python/lsst/ctrl/mpexec/cli/script/report.py (GitHub Actions / call-workflow / lint): F401 'collections.abc.Iterable' imported but unused
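
F401 means `Iterable` is imported but never referenced in the module. Assuming the import was speculative rather than needed by later code, the usual remedies are to drop the name or, if it is kept deliberately (e.g. for re-export), to silence the check explicitly:

```python
# Remedy 1: import only the names the module actually uses.
from collections.abc import Sequence

# Remedy 2: keep the name but mark the "unused" import as intentional.
from collections.abc import Iterable  # noqa: F401
```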
from typing import Any

import yaml
from astropy.table import Table
@@ -133,18 +135,18 @@ def report_v2(
show_errors: bool = False,
curse_failed_logs: bool = False,
) -> None:
"""Docstring
"""
"""Docstring."""
butler = Butler.from_config(butler_config, writeable=False)
qpg = QuantumProvenanceGraph()
output_runs = []
for qgraph_uri in qgraph_uris:
qgraph = QuantumGraph.loadUri(qgraph_uri)
qpg.add_new_graph(butler, qgraph)
output_runs.append(qgraph.metadata["output_run"])
collections_sequence: Sequence[Any] # to appease mypy

Check failure on line 146 in python/lsst/ctrl/mpexec/cli/script/report.py (GitHub Actions / call-workflow / lint): E261 at least two spaces before inline comment
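
E261 only asks for whitespace: at least two spaces must separate code from an inline `#` comment. A sketch of the flagged annotation with the spacing corrected (imports added to make it self-contained):

```python
from collections.abc import Sequence
from typing import Any

collections_sequence: Sequence[Any]  # to appease mypy (two spaces before "#")
```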
if collections is None:
-        collections = reversed(output_runs)
-    qpg.resolve_duplicates(butler, collections=collections, where=where, curse_failed_logs=curse_failed_logs)
+        collections_sequence = list(reversed(output_runs))
+    qpg.resolve_duplicates(butler, collections=collections_sequence, where=where, curse_failed_logs=curse_failed_logs)

Check failure on line 149 in python/lsst/ctrl/mpexec/cli/script/report.py (GitHub Actions / call-workflow / lint): E501 line too long (118 > 110 characters)
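
E501 is just the repository's 110-character line limit. A minimal reflow of the flagged `resolve_duplicates` call, same arguments and behavior, one keyword per line:

```python
qpg.resolve_duplicates(
    butler,
    collections=collections_sequence,
    where=where,
    curse_failed_logs=curse_failed_logs,
)
```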
summary = qpg.to_summary(butler, do_store_logs=logs)
summary_dict = summary.model_dump()
if full_output_filename:
@@ -156,16 +158,21 @@
wonky_quanta_table = []
for task in summary_dict["tasks"].keys():
if summary_dict["tasks"][task]["n_wonky"] > 0:
print(f"{task} has produced wonky quanta. Recommend processing cease until the issue is resolved.")
j=0
print(
f"{task} has produced wonky quanta. Recommend processing"
"cease until the issue is resolved."
)
j = 0
for data_id in summary_dict["tasks"][task]["wonky_quanta"]:
-                wonky_quanta_table.append({
-                    "Task": task,
-                    "Data ID": summary_dict["tasks"][task]["wonky_quanta"][j]["data_id"],
-                    "Runs and Status": summary_dict["tasks"][task]["wonky_quanta"][j]["runs"],
-                    "Messages": summary_dict["tasks"][task]["wonky_quanta"][j]["messages"],
-                })
-                j+=1
+                wonky_quanta_table.append(
+                    {
+                        "Task": task,
+                        "Data ID": summary_dict["tasks"][task]["wonky_quanta"][j]["data_id"],
+                        "Runs and Status": summary_dict["tasks"][task]["wonky_quanta"][j]["runs"],
+                        "Messages": summary_dict["tasks"][task]["wonky_quanta"][j]["messages"],
+                    }
+                )
+                j += 1
quanta_table.append(
{
"Task": task,
@@ -174,55 +181,102 @@
"Blocked": summary_dict["tasks"][task]["n_blocked"],
"Failed": summary_dict["tasks"][task]["n_failed"],
"Wonky": summary_dict["tasks"][task]["n_wonky"],
"TOTAL": sum([
summary_dict["tasks"][task]["n_successful"],
summary_dict["tasks"][task]["n_not_attempted"],
summary_dict["tasks"][task]["n_blocked"],
summary_dict["tasks"][task]["n_failed"],
summary_dict["tasks"][task]["n_wonky"],
]),
"EXPECTED": summary_dict["tasks"][task]["n_expected"]
"TOTAL": sum(
[
summary_dict["tasks"][task]["n_successful"],
summary_dict["tasks"][task]["n_not_attempted"],
summary_dict["tasks"][task]["n_blocked"],
summary_dict["tasks"][task]["n_failed"],
summary_dict["tasks"][task]["n_wonky"],
]
),
"EXPECTED": summary_dict["tasks"][task]["n_expected"],
}
)
if summary_dict["tasks"][task]["failed_quanta"]:
-            i=0
+            i = 0
for data_id in summary_dict["tasks"][task]["failed_quanta"]:
-                failed_quanta_table.append({
-                    "Task": task,
-                    "Data ID": summary_dict["tasks"][task]["failed_quanta"][i]["data_id"],
-                    "Runs and Status": summary_dict["tasks"][task]["failed_quanta"][i]["runs"],
-                    "Messages": summary_dict["tasks"][task]["failed_quanta"][i]["messages"],
-                })
-                i+=1
+                failed_quanta_table.append(
+                    {
+                        "Task": task,
+                        "Data ID": summary_dict["tasks"][task]["failed_quanta"][i]["data_id"],
+                        "Runs and Status": summary_dict["tasks"][task]["failed_quanta"][i]["runs"],
+                        "Messages": summary_dict["tasks"][task]["failed_quanta"][i]["messages"],
+                    }
+                )
+                i += 1
quanta = Table(quanta_table)
quanta.pprint_all()
# Dataset loop
# dataset_dict = {}
# for dataset in summary_dict["datasets"].keys():
# for producer_task in summary_dict["datasets"][dataset]["producer"]:
# dataset_dict[producer_task] = summary_dict["datasets"][dataset]
# pprint.pprint(dataset_dict)
# for producer_task in dataset_dict:
# task_table = []
# for task in dataset_dict[producer_task].keys():
# print(task)

# If there are wonky quanta, print them to the screen. People should
# be confronted with them immediately.
dataset_table = []
cursed_datasets = []
unsuccessful_datasets = {}
for dataset in summary_dict["datasets"]:
dataset_table.append(
{
"Dataset": dataset,
"Published": summary_dict["datasets"][dataset]["n_published"],
"Unpublished": summary_dict["datasets"][dataset]["n_unpublished"],
"Predicted Only": summary_dict["datasets"][dataset]["n_predicted_only"],
"Unsuccessful": summary_dict["datasets"][dataset]["n_unsuccessful"],
"Cursed": summary_dict["datasets"][dataset]["n_cursed"],
"TOTAL": sum(
[
summary_dict["datasets"][dataset]["n_published"],
summary_dict["datasets"][dataset]["n_unpublished"],
summary_dict["datasets"][dataset]["n_predicted_only"],
summary_dict["datasets"][dataset]["n_unsuccessful"],
summary_dict["datasets"][dataset]["n_cursed"],
]
),
"EXPECTED": summary_dict["datasets"][dataset]["n_expected"],
}
)
if summary_dict["datasets"][dataset]["n_cursed"] > 0:
            print(
                f"{dataset} has cursed quanta with message(s) "
                f"{summary_dict['datasets'][dataset]['cursed_datasets']['messages']}. "
                "Recommend processing cease until the issue is resolved."
            )
cursed_datasets.append(
{
"Dataset Type": dataset,
"Parent Data Id": summary_dict["datasets"][dataset]["cursed_datasets"][
"parent_data_id"
],
}
)
if summary_dict["datasets"][dataset]["n_unsuccessful"] > 0:
unsuccessful_datasets[dataset] = summary_dict["datasets"][dataset]["unsuccessful_datasets"]
datasets = Table(dataset_table)
datasets.pprint_all()
curse_table = Table(cursed_datasets)
# Display wonky quanta
if wonky_quanta_table:
print("Wonky Quanta")
pprint.pprint(wonky_quanta_table)
# Display cursed datasets
if cursed_datasets:
print("Cursed Datasets")
curse_table.pprint_all()
if show_errors:
print("Failed Quanta")
pprint.pprint(failed_quanta_table)
if failed_quanta_table:
print("Failed Quanta")
pprint.pprint(failed_quanta_table)
if unsuccessful_datasets:
print("Unsuccessful Datasets")
pprint.pprint(unsuccessful_datasets)
elif not show_errors:
-        if failed_quanta_table or wonky_quanta_table:
-            timestr = time.strftime("%Y%m%d-%H%M%S")
+        timestr = time.strftime("%Y%m%d-%H%M%S")
if failed_quanta_table:
with open(f"qpg_failed_quanta_{timestr}.yaml", "w") as stream:
yaml.safe_dump(failed_quanta_table, stream)
if wonky_quanta_table:
with open(f"qpg_wonky_quanta_{timestr}.yaml", "w") as stream:
yaml.safe_dump(wonky_quanta_table, stream)


if unsuccessful_datasets:
with open(f"qpg_unsuccessful_datasets_{timestr}.yaml", "w") as stream:
yaml.safe_dump(unsuccessful_datasets, stream)
        if cursed_datasets:
            with open(f"qpg_cursed_datasets_{timestr}.yaml", "w") as stream:
                yaml.safe_dump(cursed_datasets, stream)
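
A note on the pattern used throughout this change: `astropy.table.Table` accepts a list of row dicts directly, which is what lets `quanta_table`, `dataset_table`, and `cursed_datasets` be accumulated as plain dicts and rendered in one call. A minimal self-contained sketch (column names and values here are illustrative, not from the pipeline):

```python
from astropy.table import Table

# Each dict is one row; the keys become column names.
rows = [
    {"Task": "isr", "Successful": 10, "Failed": 0, "EXPECTED": 10},
    {"Task": "calibrate", "Successful": 9, "Failed": 1, "EXPECTED": 10},
]
table = Table(rows)
table.pprint_all()  # print all rows and columns without truncation
```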

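One caveat on the YAML dumps: `yaml.safe_dump` serializes only plain Python scalars and containers, which is why the cursed-dataset file is written from the `cursed_datasets` list of dicts rather than from the `astropy.table.Table` built on it. A sketch of the distinction (the row content is illustrative):

```python
import yaml
from astropy.table import Table

rows = [{"Dataset Type": "calexp", "Parent Data Id": "visit=12345"}]

# A list of dicts round-trips cleanly through the safe dumper.
print(yaml.safe_dump(rows))

# An arbitrary object like Table does not: safe_dump has no representer for it.
try:
    yaml.safe_dump(Table(rows))
except yaml.representer.RepresenterError as err:
    print(err)
```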