diff --git a/doc/changes/DM-41711.feature.md b/doc/changes/DM-41711.feature.md new file mode 100644 index 00000000..8e7dba37 --- /dev/null +++ b/doc/changes/DM-41711.feature.md @@ -0,0 +1,6 @@ +Upgrade QuantumGraphExecutionReport to handle multiple overlapping graphs. + +Update the `pipetask report` command-line interface to accommodate the new +`QuantumProvenanceGraph`. This allows for resolving outcomes of processing over +multiple attempts (and associated graphs) over the same dataquery, providing +information on recoveries, persistent issues and mismatch errors. diff --git a/python/lsst/ctrl/mpexec/cli/cmd/commands.py b/python/lsst/ctrl/mpexec/cli/cmd/commands.py index 79162861..5d26938a 100644 --- a/python/lsst/ctrl/mpexec/cli/cmd/commands.py +++ b/python/lsst/ctrl/mpexec/cli/cmd/commands.py @@ -26,7 +26,7 @@ # along with this program. If not, see . import sys -from collections.abc import Iterator +from collections.abc import Iterator, Sequence from contextlib import contextmanager from functools import partial from tempfile import NamedTemporaryFile @@ -38,12 +38,14 @@ from lsst.ctrl.mpexec import Report from lsst.ctrl.mpexec.showInfo import ShowInfo from lsst.daf.butler.cli.opt import ( + collections_option, config_file_option, config_option, confirm_option, options_file_option, processes_option, repo_argument, + where_option, ) from lsst.daf.butler.cli.utils import MWCtxObj, catch_and_exit, option_section, unwrap @@ -341,25 +343,76 @@ def update_graph_run( @click.command(cls=PipetaskCommand) @repo_argument() -@ctrlMpExecOpts.qgraph_argument() -@click.option("--full-output-filename", default="", help="Summarize report in a yaml file") +@click.argument("qgraphs", nargs=-1) +@collections_option() +@where_option() +@click.option( + "--full-output-filename", + default="", + help="Output report as a file with this name. " + "For pipetask report on one graph, this should be a yaml file. For multiple graphs " + "or when using the --force-v2 option, this should be a json file. We will be " + "deprecating the single-graph-only (QuantumGraphExecutionReport) option soon.", +) @click.option("--logs/--no-logs", default=True, help="Get butler log datasets for extra information.") @click.option( - "--show-errors", + "--brief", + default=False, + is_flag=True, + help="Only show counts in report (a brief summary). Note that counts are" + " also printed to the screen when using the --full-output-filename option.", +) +@click.option( + "--curse-failed-logs", is_flag=True, default=False, - help="Pretty-print a dict of errors from failed" - " quanta to the screen. Note: the default is to output a yaml file with error information" - " (data_ids and associated messages) to the current working directory instead.", + help="If log datasets are missing in v2 (QuantumProvenanceGraph), mark them as cursed", +) +@click.option( + "--force-v2", + is_flag=True, + default=False, + help="Use the QuantumProvenanceGraph instead of the QuantumGraphExecutionReport, " + "even when there is only one qgraph. Otherwise, the `QuantumGraphExecutionReport` " + "will run on one graph by default.", ) def report( - repo: str, qgraph: str, full_output_filename: str = "", logs: bool = True, show_errors: bool = False + repo: str, + qgraphs: Sequence[str], + collections: Sequence[str] | None, + where: str, + full_output_filename: str = "", + logs: bool = True, + brief: bool = False, + curse_failed_logs: bool = False, + force_v2: bool = False, ) -> None: - """Write a yaml file summarizing the produced and missing expected datasets - in a quantum graph. + """Summarize the state of executed quantum graph(s), with counts of failed, + successful and expected quanta, as well as counts of output datasets and + their query (visible/shadowed) states. Analyze one or more attempts at the + same processing on the same dataquery-identified "group" and resolve + recoveries and persistent failures. Identify mismatch errors between + attempts. + + Save the report as a file (`--full-output-filename`) or print it to stdout + (default). If the terminal is overwhelmed with data_ids from failures try + the `--brief` option. + + Butler `collections` and `where` options are for use in + `lsst.daf.butler.queryDatasets` if paring down the collections would be + useful. Pass collections in order of most to least recent. By default the + collections and query will be taken from the graphs. REPO is the location of the butler/registry config file. - QGRAPH is the URL to a serialized Quantum Graph file. + QGRAPHS is a `Sequence` of links to serialized Quantum Graphs which have + been executed and are to be analyzed. Pass the graphs in order of first to + last executed. """ - script.report(repo, qgraph, full_output_filename, logs, show_errors) + if any([force_v2, len(qgraphs) > 1, collections, where, curse_failed_logs]): + script.report_v2( + repo, qgraphs, collections, where, full_output_filename, logs, brief, curse_failed_logs + ) + else: + assert len(qgraphs) == 1, "Cannot make a report without a quantum graph." + script.report(repo, qgraphs[0], full_output_filename, logs, brief) diff --git a/python/lsst/ctrl/mpexec/cli/script/__init__.py b/python/lsst/ctrl/mpexec/cli/script/__init__.py index 10767d14..dc14543e 100644 --- a/python/lsst/ctrl/mpexec/cli/script/__init__.py +++ b/python/lsst/ctrl/mpexec/cli/script/__init__.py @@ -31,7 +31,7 @@ from .pre_exec_init_qbb import pre_exec_init_qbb from .purge import PurgeResult, purge from .qgraph import qgraph -from .report import report +from .report import report, report_v2 from .run import run from .run_qbb import run_qbb from .update_graph_run import update_graph_run diff --git a/python/lsst/ctrl/mpexec/cli/script/report.py b/python/lsst/ctrl/mpexec/cli/script/report.py index 39b4df1d..62d88e79 100644 --- a/python/lsst/ctrl/mpexec/cli/script/report.py +++ b/python/lsst/ctrl/mpexec/cli/script/report.py @@ -25,12 +25,13 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . import pprint +from collections.abc import Sequence -import yaml from astropy.table import Table from lsst.daf.butler import Butler from lsst.pipe.base import QuantumGraph from lsst.pipe.base.execution_reports import QuantumGraphExecutionReport +from lsst.pipe.base.quantum_provenance_graph import QuantumProvenanceGraph, Summary def report( @@ -38,9 +39,11 @@ def report( qgraph_uri: str, full_output_filename: str | None, logs: bool = True, - show_errors: bool = False, + brief: bool = False, ) -> None: - """Summarize the produced and missing expected dataset in a quantum graph. + """Summarize the produced, missing and expected quanta and + datasets belonging to an executed quantum graph using the + `QuantumGraphExecutionReport`. Parameters ---------- @@ -58,11 +61,9 @@ def report( command-line instead. logs : `bool` Get butler log datasets for extra information (error messages). - show_errors : `bool` - If no output yaml is provided, print error messages to the - command-line along with the report. By default, these messages and - their associated data ids are stored in a yaml file with format - `{run timestamp}_err.yaml` in the working directory instead. + brief : `bool` + List only the counts (or data_ids if number of failures < 5). This + option is good for those who just want to see totals. """ butler = Butler.from_config(butler_config, writeable=False) qgraph = QuantumGraph.loadUri(qgraph_uri) @@ -106,16 +107,218 @@ def report( datasets.add_column(data_products, index=0, name="DatasetType") quanta.pprint_all() print("\n") - if show_errors: + if not brief: pprint.pprint(error_summary) print("\n") - else: - assert qgraph.metadata is not None, "Saved QGs always have metadata." - collection = qgraph.metadata["output_run"] - collection = str(collection) - run_name = collection.split("/")[-1] - with open(f"{run_name}_err.yaml", "w") as stream: - yaml.safe_dump(error_summary, stream) datasets.pprint_all() else: report.write_summary_yaml(butler, full_output_filename, do_store_logs=logs) + + +def report_v2( + butler_config: str, + qgraph_uris: Sequence[str], + collections: Sequence[str] | None, + where: str, + full_output_filename: str | None, + logs: bool = True, + brief: bool = False, + curse_failed_logs: bool = False, +) -> None: + """Summarize the state of executed quantum graph(s), with counts of failed, + successful and expected quanta, as well as counts of output datasets and + their visible/shadowed states. Analyze one or more attempts at the same + processing on the same dataquery-identified "group" and resolve recoveries + and persistent failures. Identify mismatch errors between groups. + + Parameters + ---------- + butler_config : `str` + The Butler used for this report. This should match the Butler used + for the run associated with the executed quantum graph. + qgraph_uris : `Sequence` [`str`] + One or more uris to the serialized Quantum Graph(s). + collections : `Sequence` [`str`] | None` + Collection(s) associated with said graphs/processing. For use in + `lsst.daf.butler.registry.queryDatasets` if paring down the query + would be useful. + where : `str` + A "where" string to use to constrain the collections, if passed. + full_output_filename : `str` + Output the full pydantic model `QuantumProvenanceGraph.Summary` + object into a JSON file. This is ideal for error-matching and + cataloguing tools such as the ones used by Campaign Management + software and pilots, and for searching and counting specific kinds + or instances of failures. This option will also print a "brief" + (counts-only) summary to stdout. + logs : `bool` + Store error messages from Butler logs associated with failed quanta + if `True`. + brief : `bool` + Only display short (counts-only) summary on stdout. This includes + counts and not error messages or data_ids (similar to BPS report). + This option will still report all `cursed` datasets and `wonky` + quanta. + curse_failed_logs : `bool` + Mark log datasets as `cursed` if they are published in the final + output collection. Note that a campaign-level collection must be + used here for `collections` if `curse_failed_logs` is `True`; if + `lsst.pipe.base.QuantumProvenanceGraph.__resolve_duplicates` + is run on a list of group-level collections, then each will only + show log datasets from their own failures as visible and datasets + from others will be marked as cursed. + """ + butler = Butler.from_config(butler_config, writeable=False) + qpg = QuantumProvenanceGraph() + qgraphs = [] + for qgraph_uri in qgraph_uris: + qgraph = QuantumGraph.loadUri(qgraph_uri) + assert qgraph.metadata is not None, "Saved QGs always have metadata." + qgraphs.append(qgraph) + # If the most recent graph's timestamp was earlier than any of the + # previous graphs, raise a RuntimeError. + for count, qgraph in enumerate(qgraphs): + if len(qgraphs) > 1: + previous_graph = qgraphs[count - 1] + if qgraph.metadata["time"] < previous_graph.metadata["time"]: + raise RuntimeError( + f"""add_new_graph may only be called on graphs + which are passed in the order they were + created. Please call again, passing your + graphs in order. Time of first graph: + {qgraph.metadata["time"]} > + time of second graph: {previous_graph.metadata["time"]}""" + ) + qpg.assemble_quantum_provenance_graph(butler, qgraphs, collections, where, curse_failed_logs) + summary = qpg.to_summary(butler, do_store_logs=logs) + print_summary(summary, full_output_filename, brief) + + +def print_summary(summary: Summary, full_output_filename: str | None, brief: bool = False) -> None: + """Take a `QuantumProvenanceGraph.Summary` object and write it to a file + and/or the screen. + + Parameters + ---------- + summary : `QuantumProvenanceGraph.Summary` + This `Pydantic` model contains all the information derived from the + `QuantumProvenanceGraph`. + full_output_filename : `str | None` + Name of the JSON file in which to store summary information, if + passed. + brief : `bool` + Only display short (counts-only) summary on stdout. This includes + counts and not error messages or data_ids (similar to BPS report). + This option will still report all `cursed` datasets and `wonky` + quanta. + """ + quanta_table = [] + failed_quanta_table = [] + wonky_quanta_table = [] + for label, task_summary in summary.tasks.items(): + if task_summary.n_wonky > 0: + print( + f"{label} has produced wonky quanta. Recommend processing cease until the issue is resolved." + ) + for quantum_summary in task_summary.wonky_quanta: + wonky_quanta_table.append( + { + "Task": label, + "Data ID": quantum_summary.data_id, + "Runs and Status": quantum_summary.runs, + "Messages": quantum_summary.messages, + } + ) + quanta_table.append( + { + "Task": label, + "Unknown": task_summary.n_unknown, + "Successful": task_summary.n_successful, + "Blocked": task_summary.n_blocked, + "Failed": task_summary.n_failed, + "Wonky": task_summary.n_wonky, + "TOTAL": sum( + [ + task_summary.n_successful, + task_summary.n_unknown, + task_summary.n_blocked, + task_summary.n_failed, + task_summary.n_wonky, + ] + ), + "EXPECTED": task_summary.n_expected, + } + ) + if task_summary.failed_quanta: + for quantum_summary in task_summary.failed_quanta: + failed_quanta_table.append( + { + "Task": label, + "Data ID": quantum_summary.data_id, + "Runs and Status": quantum_summary.runs, + "Messages": quantum_summary.messages, + } + ) + quanta = Table(quanta_table) + quanta.pprint_all() + # Dataset loop + dataset_table = [] + cursed_datasets = [] + unsuccessful_datasets = {} + for dataset_type_name, dataset_type_summary in summary.datasets.items(): + dataset_table.append( + { + "Dataset": dataset_type_name, + "Visible": dataset_type_summary.n_visible, + "Shadowed": dataset_type_summary.n_shadowed, + "Predicted Only": dataset_type_summary.n_predicted_only, + "Unsuccessful": dataset_type_summary.n_unsuccessful, + "Cursed": dataset_type_summary.n_cursed, + "TOTAL": sum( + [ + dataset_type_summary.n_visible, + dataset_type_summary.n_shadowed, + dataset_type_summary.n_predicted_only, + dataset_type_summary.n_unsuccessful, + dataset_type_summary.n_cursed, + ] + ), + "EXPECTED": dataset_type_summary.n_expected, + } + ) + if dataset_type_summary.n_cursed > 0: + for cursed_dataset in dataset_type_summary.cursed_datasets: + print( + f"{dataset_type_name} has cursed quanta with message(s) {cursed_dataset.messages}. " + "Recommend processing cease until the issue is resolved." + ) + cursed_datasets.append( + { + "Dataset Type": dataset_type_name, + "Producer Data Id": cursed_dataset.producer_data_id, + } + ) + if dataset_type_summary.n_unsuccessful > 0: + unsuccessful_datasets[dataset_type_name] = dataset_type_summary.unsuccessful_datasets + datasets = Table(dataset_table) + datasets.pprint_all() + curse_table = Table(cursed_datasets) + # Display wonky quanta + if wonky_quanta_table: + print("Wonky Quanta") + pprint.pprint(wonky_quanta_table) + # Display cursed datasets + if cursed_datasets: + print("Cursed Datasets") + curse_table.pprint_all() + if full_output_filename: + with open(full_output_filename, "w") as stream: + stream.write(summary.model_dump_json(indent=2)) + else: + if not brief: + if failed_quanta_table: + print("Failed Quanta") + pprint.pprint(failed_quanta_table) + if unsuccessful_datasets: + print("Unsuccessful Datasets") + pprint.pprint(unsuccessful_datasets) diff --git a/tests/test_cliCmdReport.py b/tests/test_cliCmdReport.py index 4710b226..d3b8bb6b 100644 --- a/tests/test_cliCmdReport.py +++ b/tests/test_cliCmdReport.py @@ -34,6 +34,7 @@ from lsst.ctrl.mpexec.cli.pipetask import cli as pipetask_cli from lsst.daf.butler.cli.utils import LogCliRunner, clickResultMsg from lsst.daf.butler.tests.utils import makeTestTempDir, removeTestTempDir +from lsst.pipe.base.quantum_provenance_graph import DatasetTypeSummary, Summary, TaskSummary from lsst.pipe.base.tests.simpleQGraph import makeSimpleQGraph from lsst.pipe.base.tests.util import check_output_run from yaml.loader import SafeLoader @@ -52,7 +53,7 @@ def tearDown(self) -> None: removeTestTempDir(self.root) def test_report(self): - """Test for making a report on the produced and missing expected + """Test for making a report on the produced, missing and expected datasets in a quantum graph. """ metadata = {"output_run": "run"} @@ -74,19 +75,21 @@ def test_report(self): ["report", self.root, graph_uri, "--full-output-filename", test_filename, "--no-logs"], input="no", ) + # Check that we can read from the command line self.assertEqual(result.exit_code, 0, clickResultMsg(result)) # Check that we can open and read the file produced by make_reports with open(test_filename) as f: report_output_dict = yaml.load(f, Loader=SafeLoader) + self.assertIsNotNone(report_output_dict["task0"]) self.assertIsNotNone(report_output_dict["task0"]["failed_quanta"]) self.assertIsInstance(report_output_dict["task0"]["n_expected"], int) result_hr = self.runner.invoke( pipetask_cli, - ["report", self.root, graph_uri, "--no-logs", "--show-errors"], + ["report", self.root, graph_uri, "--no-logs"], input="no", ) @@ -102,6 +105,390 @@ def test_report(self): self.assertIn("Expected", result_hr.stdout) self.assertIn("Succeeded", result_hr.stdout) + # Check brief option for pipetask report + result_brief = self.runner.invoke( + pipetask_cli, + ["report", self.root, graph_uri, "--no-logs", "--brief"], + input="no", + ) + self.assertIsInstance(result_brief.stdout, str) + + # Check that task0 and the failed quanta for task0 exist in the string + self.assertIn("task0", result_brief.stdout) + self.assertIn("Failed", result_brief.stdout) + self.assertIn("Expected", result_brief.stdout) + self.assertIn("Succeeded", result_brief.stdout) + + # Test cli for the QPG + result_v2_terminal_out = self.runner.invoke( + pipetask_cli, + ["report", self.root, graph_uri, "--no-logs", "--force-v2"], + input="no", + ) + + # Check that we can read from the command line + self.assertEqual(result_v2_terminal_out.exit_code, 0, clickResultMsg(result_v2_terminal_out)) + + # Check that we get string output + self.assertIsInstance(result_v2_terminal_out.stdout, str) + + # Check that task0 and the quanta for task0 exist in the string + self.assertIn("task0", result_v2_terminal_out.stdout) + self.assertIn("Unknown", result_v2_terminal_out.stdout) + self.assertIn("Successful", result_v2_terminal_out.stdout) + self.assertIn("Blocked", result_v2_terminal_out.stdout) + self.assertIn("Failed", result_v2_terminal_out.stdout) + self.assertIn("Wonky", result_v2_terminal_out.stdout) + self.assertIn("TOTAL", result_v2_terminal_out.stdout) + self.assertIn("EXPECTED", result_v2_terminal_out.stdout) + + # Check that title from the error summary appears + self.assertIn("Unsuccessful Datasets", result_v2_terminal_out.stdout) + + # Test cli for the QPG brief option + result_v2_brief = self.runner.invoke( + pipetask_cli, + ["report", self.root, graph_uri, "--no-logs", "--force-v2", "--brief"], + input="no", + ) + + # Check that we can read from the command line + self.assertEqual(result_v2_brief.exit_code, 0, clickResultMsg(result_v2_brief)) + + # Check that we get string output + self.assertIsInstance(result_v2_brief.stdout, str) + + # Check that task0 and the quanta for task0 exist in the string + self.assertIn("task0", result_v2_brief.stdout) + self.assertIn("Unknown", result_v2_brief.stdout) + self.assertIn("Successful", result_v2_brief.stdout) + self.assertIn("Blocked", result_v2_brief.stdout) + self.assertIn("Failed", result_v2_brief.stdout) + self.assertIn("Wonky", result_v2_brief.stdout) + self.assertIn("TOTAL", result_v2_brief.stdout) + self.assertIn("EXPECTED", result_v2_brief.stdout) + + # Check that the full output option works + test_filename_v2 = os.path.join(self.root, "report_test.json") + result_v2_full = self.runner.invoke( + pipetask_cli, + [ + "report", + self.root, + graph_uri, + "--no-logs", + "--full-output-filename", + test_filename_v2, + "--force-v2", + ], + input="no", + ) + + self.assertEqual(result_v2_full.exit_code, 0, clickResultMsg(result_v2_full)) + # Check the "brief" output that prints to the terminal first: + # Check that we get string output + self.assertIsInstance(result_v2_full.stdout, str) + + # Check that task0 and the quanta for task0 exist in the string + self.assertIn("task0", result_v2_full.stdout) + self.assertIn("Unknown", result_v2_full.stdout) + self.assertIn("Successful", result_v2_full.stdout) + self.assertIn("Blocked", result_v2_full.stdout) + self.assertIn("Failed", result_v2_full.stdout) + self.assertIn("Wonky", result_v2_full.stdout) + self.assertIn("TOTAL", result_v2_full.stdout) + self.assertIn("EXPECTED", result_v2_full.stdout) + + # Then validate the full output json file: + with open(test_filename_v2) as f: + output = f.read() + model = Summary.model_validate_json(output) + self.assertDictEqual( + model.tasks, + { + "task0": TaskSummary( + n_successful=0, + n_blocked=0, + n_unknown=1, + n_expected=1, + failed_quanta=[], + recovered_quanta=[], + wonky_quanta=[], + n_wonky=0, + n_failed=0, + ), + "task1": TaskSummary( + n_successful=0, + n_blocked=0, + n_unknown=1, + n_expected=1, + failed_quanta=[], + recovered_quanta=[], + wonky_quanta=[], + n_wonky=0, + n_failed=0, + ), + "task2": TaskSummary( + n_successful=0, + n_blocked=0, + n_unknown=1, + n_expected=1, + failed_quanta=[], + recovered_quanta=[], + wonky_quanta=[], + n_wonky=0, + n_failed=0, + ), + "task3": TaskSummary( + n_successful=0, + n_blocked=0, + n_unknown=1, + n_expected=1, + failed_quanta=[], + recovered_quanta=[], + wonky_quanta=[], + n_wonky=0, + n_failed=0, + ), + "task4": TaskSummary( + n_successful=0, + n_blocked=0, + n_unknown=1, + n_expected=1, + failed_quanta=[], + recovered_quanta=[], + wonky_quanta=[], + n_wonky=0, + n_failed=0, + ), + }, + ) + self.assertDictEqual( + model.datasets, + { + "add_dataset1": DatasetTypeSummary( + producer="task0", + n_visible=0, + n_shadowed=0, + n_predicted_only=0, + n_expected=1, + cursed_datasets=[], + unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], + n_cursed=0, + n_unsuccessful=1, + ), + "add2_dataset1": DatasetTypeSummary( + producer="task0", + n_visible=0, + n_shadowed=0, + n_predicted_only=0, + n_expected=1, + cursed_datasets=[], + unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], + n_cursed=0, + n_unsuccessful=1, + ), + "task0_metadata": DatasetTypeSummary( + producer="task0", + n_visible=0, + n_shadowed=0, + n_predicted_only=0, + n_expected=1, + cursed_datasets=[], + unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], + n_cursed=0, + n_unsuccessful=1, + ), + "task0_log": DatasetTypeSummary( + producer="task0", + n_visible=0, + n_shadowed=0, + n_predicted_only=0, + n_expected=1, + cursed_datasets=[], + unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], + n_cursed=0, + n_unsuccessful=1, + ), + "add_dataset2": DatasetTypeSummary( + producer="task1", + n_visible=0, + n_shadowed=0, + n_predicted_only=0, + n_expected=1, + cursed_datasets=[], + unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], + n_cursed=0, + n_unsuccessful=1, + ), + "add2_dataset2": DatasetTypeSummary( + producer="task1", + n_visible=0, + n_shadowed=0, + n_predicted_only=0, + n_expected=1, + cursed_datasets=[], + unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], + n_cursed=0, + n_unsuccessful=1, + ), + "task1_metadata": DatasetTypeSummary( + producer="task1", + n_visible=0, + n_shadowed=0, + n_predicted_only=0, + n_expected=1, + cursed_datasets=[], + unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], + n_cursed=0, + n_unsuccessful=1, + ), + "task1_log": DatasetTypeSummary( + producer="task1", + n_visible=0, + n_shadowed=0, + n_predicted_only=0, + n_expected=1, + cursed_datasets=[], + unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], + n_cursed=0, + n_unsuccessful=1, + ), + "add_dataset3": DatasetTypeSummary( + producer="task2", + n_visible=0, + n_shadowed=0, + n_predicted_only=0, + n_expected=1, + cursed_datasets=[], + unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], + n_cursed=0, + n_unsuccessful=1, + ), + "add2_dataset3": DatasetTypeSummary( + producer="task2", + n_visible=0, + n_shadowed=0, + n_predicted_only=0, + n_expected=1, + cursed_datasets=[], + unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], + n_cursed=0, + n_unsuccessful=1, + ), + "task2_metadata": DatasetTypeSummary( + producer="task2", + n_visible=0, + n_shadowed=0, + n_predicted_only=0, + n_expected=1, + cursed_datasets=[], + unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], + n_cursed=0, + n_unsuccessful=1, + ), + "task2_log": DatasetTypeSummary( + producer="task2", + n_visible=0, + n_shadowed=0, + n_predicted_only=0, + n_expected=1, + cursed_datasets=[], + unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], + n_cursed=0, + n_unsuccessful=1, + ), + "add_dataset4": DatasetTypeSummary( + producer="task3", + n_visible=0, + n_shadowed=0, + n_predicted_only=0, + n_expected=1, + cursed_datasets=[], + unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], + n_cursed=0, + n_unsuccessful=1, + ), + "add2_dataset4": DatasetTypeSummary( + producer="task3", + n_visible=0, + n_shadowed=0, + n_predicted_only=0, + n_expected=1, + cursed_datasets=[], + unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], + n_cursed=0, + n_unsuccessful=1, + ), + "task3_metadata": DatasetTypeSummary( + producer="task3", + n_visible=0, + n_shadowed=0, + n_predicted_only=0, + n_expected=1, + cursed_datasets=[], + unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], + n_cursed=0, + n_unsuccessful=1, + ), + "task3_log": DatasetTypeSummary( + producer="task3", + n_visible=0, + n_shadowed=0, + n_predicted_only=0, + n_expected=1, + cursed_datasets=[], + unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], + n_cursed=0, + n_unsuccessful=1, + ), + "add_dataset5": DatasetTypeSummary( + producer="task4", + n_visible=0, + n_shadowed=0, + n_predicted_only=0, + n_expected=1, + cursed_datasets=[], + unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], + n_cursed=0, + n_unsuccessful=1, + ), + "add2_dataset5": DatasetTypeSummary( + producer="task4", + n_visible=0, + n_shadowed=0, + n_predicted_only=0, + n_expected=1, + cursed_datasets=[], + unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], + n_cursed=0, + n_unsuccessful=1, + ), + "task4_metadata": DatasetTypeSummary( + producer="task4", + n_visible=0, + n_shadowed=0, + n_predicted_only=0, + n_expected=1, + cursed_datasets=[], + unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], + n_cursed=0, + n_unsuccessful=1, + ), + "task4_log": DatasetTypeSummary( + producer="task4", + n_visible=0, + n_shadowed=0, + n_predicted_only=0, + n_expected=1, + cursed_datasets=[], + unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], + n_cursed=0, + n_unsuccessful=1, + ), + }, + ) + if __name__ == "__main__": unittest.main()