From e5d3d756fe483daaac26245f9c6f673d22319564 Mon Sep 17 00:00:00 2001
From: Orion Eiger
Date: Fri, 20 Sep 2024 20:52:25 -0700
Subject: [PATCH] Respond to review feedback

---
 .../pipe/base/quantum_provenance_graph.py | 131 +++++++++++-------
 tests/test_quantum_provenance_graph.py    |  51 +++----
 2 files changed, 106 insertions(+), 76 deletions(-)

diff --git a/python/lsst/pipe/base/quantum_provenance_graph.py b/python/lsst/pipe/base/quantum_provenance_graph.py
index 1881df59..58f036e5 100644
--- a/python/lsst/pipe/base/quantum_provenance_graph.py
+++ b/python/lsst/pipe/base/quantum_provenance_graph.py
@@ -85,7 +85,7 @@ class QuantumKey(NamedTuple):
 class DatasetKey(NamedTuple):
     """Identifier type for dataset keys in a `QuantumProvenanceGraph`."""

-    parent_dataset_type_name: str
+    dataset_type_name: str
     """Name of the dataset type (never a component)."""

     data_id_values: tuple[DataIdValue, ...]
@@ -117,7 +117,7 @@ class PrerequisiteDatasetKey(NamedTuple):
     dataset ID (UUID) instead.
     """

-    parent_dataset_type_name: str
+    dataset_type_name: str
     """Name of the dataset type (never a component)."""

     dataset_id_bytes: bytes
@@ -180,8 +180,9 @@ class QuantumInfoStatus(Enum):
     state; it is impossible to exit and requires human intervention to
     proceed with processing. Currently, a quantum enters a wonky state for
     one of three reasons:
-    - Its `QuantumInfoStatus` exits a successful state. Something that
-      initially succeeded fails on subsequent attempts.
+    - Its overall `QuantumInfoStatus` moves from a successful state (as a
+      result of a successful run) to any other state. In other words,
+      something that initially succeeded fails on subsequent attempts.
     - A `QuantumRun` is missing logs.
     - There are multiple runs associated with a dataset, and this comes up
       in a findFirst search. This means that a dataset which will be used
@@ -258,7 +259,7 @@ class DatasetRun(pydantic.BaseModel):
     """

     produced: bool = False
-    """Whether the specific run produced the dataset.
+    """Whether the specific run wrote the dataset.
     """

     visible: bool = False
@@ -271,13 +272,20 @@ def _validate(self) -> DatasetRun:
         """Validate the model for `DatasetRun` by asserting that no visible
         `DatasetRun` is also not produced (this should be impossible).
+
+        Returns
+        -------
+        self : `DatasetRun`
+            The `DatasetRun` object, validated.
         """
         assert not (self.visible and not self.produced)
         return self


 class DatasetInfoStatus(Enum):
-    """Status of the the DatasetType-dataID pair over all runs.
+    """Status of the DatasetType-dataID pair over all runs. This depends
+    not only on the presence of the dataset itself, but also on metadata,
+    logs, and the state of its producer quantum.

     Possible Statuses
     -----------------
@@ -341,12 +349,13 @@ class UnsuccessfulQuantumSummary(pydantic.BaseModel):
     """The data_id of the unsuccessful quantum.
     """
     runs: dict[str, str]
-    """A dictionary including the enum name of the `QuantumRunStatus` of each
-    run associated with an attempt to process the unsuccessful quantum.
+    """A dictionary mapping the name of each output run collection to the
+    enum name of the `QuantumRunStatus` for that run's attempt to process
+    the unsuccessful quantum.
     """
     messages: list[str]
     """Any messages associated with the unsuccessful quantum (any clues as to
-    why the quantum may be in a `FAILED` or `WONKY` state).
+    why the quantum may be in a FAILED or WONKY state).
""" @classmethod @@ -358,6 +367,17 @@ def from_info(cls, info: QuantumInfo) -> UnsuccessfulQuantumSummary: ---------- info : `QuantumInfo` The `QuantumInfo` object for the unsuccessful quantum. + + Returns + ------- + summary : `UnsuccessfulQuantumSummary` + A Pydantic model containing the dataID, run collection names (and + each of their `QuantumRunStatus` enum names) as well as messages + which may point to any clues about the nature of the problem. For + failed quanta, these are usually error messages from the butler + logs. For wonky quanta, these can be messages generated during the + assembly of the `QuantumProvenanceGraph` that describe why it was + marked as wonky. """ return cls( data_id=dict(info["data_id"].required), @@ -404,8 +424,8 @@ def n_failed(self) -> int: associated with the failures when applicable. """ recovered_quanta: list[dict[str, DataIdValue]] = pydantic.Field(default_factory=list) - """A list of the quanta which moved from an unsuccessful to SUCCESSFUL - state. + """A list of dataIDs (key->value) which moved from an unsuccessful to + successful state. """ wonky_quanta: list[UnsuccessfulQuantumSummary] = pydantic.Field(default_factory=list) """A list of all `UnsuccessfulQuantumSummary` objects associated with the @@ -450,9 +470,7 @@ def add_quantum_info(self, info: QuantumInfo, butler: Butler, do_store_logs: boo try: # should probably upgrade this to use a dataset # ref - log = butler.get( - log_key.parent_dataset_type_name, info["data_id"], collections=run - ) + log = butler.get(log_key.dataset_type_name, info["data_id"], collections=run) except LookupError: failed_quantum_summary.messages.append(f"Logs not ingested for {run!r}") except FileNotFoundError: @@ -503,6 +521,17 @@ def from_info(cls, info: DatasetInfo, producer_info: QuantumInfo) -> CursedDatas producer_info : `QuantumInfo` All relevant information on the producer task. This is used to report the data_id of the producer task. + + Returns + ------- + summary : `CursedDatasetSummary` + A Pydantic model containing the dataID of the task which produced + this cursed dataset, the dataID associated with the cursed dataset, + run collection names (and their `DatasetRun` information) as well + as any messages which may point to any clues about the nature of + the problem. These are be messages generated during the assembly of + the `QuantumProvenanceGraph` that describe why it was marked as + cursed. """ runs_visible = {k for k, v in info["runs"].items() if v.visible} return cls( @@ -650,6 +679,12 @@ def get_quantum_info(self, key: QuantumKey) -> QuantumInfo: ---------- key : `QuantumKey` The key used to refer to the node on the graph. + + Returns + ------- + quantum_info : `QuantumInfo` + The `TypedDict` with information on the task label-dataID pair + across all runs. """ return self._xgraph.nodes[key] @@ -661,15 +696,23 @@ def get_dataset_info(self, key: DatasetKey) -> DatasetInfo: ---------- key : `DatasetKey` The key used to refer to the node on the graph. + + Returns + ------- + dataset_info : `DatasetInfo` + The `TypedDict` with information about the `DatasetType`-dataID + pair across all runs. """ return self._xgraph.nodes[key] def __add_new_graph(self, butler: Butler, qgraph: QuantumGraph | ResourcePathExpression) -> None: """Add a new quantum graph to the `QuantumProvenanceGraph`. - Step through the quantum graph. 
-        (`QuantumProvenanceGraph._xgraph`) with all of the relevant
-        information: quanta, dataset types and their associated run
+        Notes
+        -----
+        The algorithm: step through the quantum graph. Annotate a
+        `networkx.DiGraph` (`QuantumProvenanceGraph._xgraph`) with all of the
+        relevant information: quanta, dataset types and their associated run
         collections (these unique quanta- and dataset type-run collection
         combinations are encapsulated in the classes `DatasetRun` and
         `QuantumRun`). For each new quantum, annotate
@@ -702,7 +745,8 @@ def __add_new_graph(self, butler: Butler, qgraph: QuantumGraph | ResourcePathExp
         output_run = qgraph.metadata["output_run"]
         new_quanta = []
         for node in qgraph:
-            # make a key to refer to the quantum and add it to the graph.
+            # make a key to refer to the quantum and add it to the quantum
+            # provenance graph.
             quantum_key = QuantumKey(
                 node.taskDef.label, cast(DataCoordinate, node.quantum.dataId).required_values
             )
@@ -737,15 +781,15 @@ def __add_new_graph(self, butler: Butler, qgraph: QuantumGraph | ResourcePathExp
                 dataset_info.setdefault("data_id", ref.dataId)
                 dataset_info.setdefault("status", DatasetInfoStatus.PREDICTED_ONLY)
                 dataset_info.setdefault("messages", [])
-                self._datasets.setdefault(dataset_key.parent_dataset_type_name, set()).add(dataset_key)
+                self._datasets.setdefault(dataset_key.dataset_type_name, set()).add(dataset_key)
                 dataset_runs = dataset_info.setdefault("runs", {})
                 # make a `DatasetRun` for the specific dataset-run
                 # collection combination.
                 dataset_runs[output_run] = DatasetRun(id=ref.id)
                 # save metadata and logs for easier status interpretation later
-                if dataset_key.parent_dataset_type_name.endswith("_metadata"):
+                if dataset_key.dataset_type_name.endswith("_metadata"):
                     quantum_info["metadata"] = dataset_key
-                if dataset_key.parent_dataset_type_name.endswith("_log"):
+                if dataset_key.dataset_type_name.endswith("_log"):
                     quantum_info["log"] = dataset_key
             for ref in itertools.chain.from_iterable(node.quantum.inputs.values()):
                 dataset_key = DatasetKey(ref.datasetType.nameAndComponent()[0], ref.dataId.required_values)
@@ -776,12 +820,12 @@ def __add_new_graph(self, butler: Butler, qgraph: QuantumGraph | ResourcePathExp
                     # if we also have logs, this is a success.
                     if log_dataset_run.produced:
                         quantum_run.status = QuantumRunStatus.SUCCESSFUL
-                    # if we have metadata and no logs, this is a very rare
-                    # case. either the task ran successfully and the datastore
-                    # died immediately afterwards, or some supporting
-                    # infrastructure for transferring the logs to the datastore
-                    # failed.
                     else:
+                        # if we have metadata and no logs, this is a very rare
+                        # case. either the task ran successfully and the datastore
+                        # died immediately afterwards, or some supporting
+                        # infrastructure for transferring the logs to the datastore
+                        # failed.
                         quantum_run.status = QuantumRunStatus.LOGS_MISSING
                 # missing metadata means that the task did not finish.
                 else:
@@ -889,7 +933,7 @@ def __resolve_duplicates(
             The Butler used for this report. This should match the Butler used
             for the run associated with the executed quantum graph.

-        collections : `Sequence[str]` | `None`
+        collections : `Sequence` [`str`] | `None`
             Collections to use in `lsst.daf.butler.registry.queryDatasets` if
             paring down the query would be useful.
@@ -995,7 +1039,7 @@ def __resolve_duplicates(
             for dataset_key in self.iter_outputs_of(quantum_key):
                 dataset_info = self.get_dataset_info(dataset_key)
                 quantum_info["messages"].append(
-                    f"{dataset_key.parent_dataset_type_name}"
+                    f"{dataset_key.dataset_type_name} "
                     + f"from {str(dataset_info['runs'])};"
                     + f"{str(dataset_info['status'])}"
                 )
@@ -1024,10 +1068,10 @@ def assemble_quantum_provenance_graph(
         butler : `lsst.daf.butler.Butler`
             The Butler used for this report. This should match the Butler used
             for the run associated with the executed quantum graph.
-        qgraphs : `Sequence`[`QuantumGraph` | `ResourcePathExpression`]
+        qgraphs : `Sequence` [`QuantumGraph` | `ResourcePathExpression`]
             A list of either quantum graph objects or their uri's, to be used
             to assemble the `QuantumProvenanceGraph`.
-        collections : `Sequence[str]` | `None`
+        collections : `Sequence` [`str`] | `None`
             Collections to use in `lsst.daf.butler.registry.queryDatasets` if
             paring down the query would be useful.
         where : `str`
@@ -1041,29 +1085,15 @@ def assemble_quantum_provenance_graph(
             visible and datasets from others will be marked as cursed.
         """
         output_runs = []
-        for count, graph in enumerate(qgraphs):
+        for graph in qgraphs:
             qgraph = graph if isinstance(graph, QuantumGraph) else QuantumGraph.loadUri(graph)
             assert qgraph.metadata is not None, "Saved QGs always have metadata."
-            # If the most recent graph's timestamp was earlier than any of the
-            # previous graphs, raise a RuntimeError.
-            if len(qgraphs) > 1:
-                for previous_graph in qgraphs[: count - 1]:
-                    previous_graph = (
-                        previous_graph
-                        if isinstance(previous_graph, QuantumGraph)
-                        else QuantumGraph.loadUri(previous_graph)
-                    )
-                    if qgraph.metadata["time"] < previous_graph.metadata["time"]:
-                        raise RuntimeError(
-                            """add_new_graph may only be called on graphs
-                            which are passed in the order they were
-                            created. Please call again, passing your
-                            graphs in order."""
-                        )
             self.__add_new_graph(butler, qgraph)
             output_runs.append(qgraph.metadata["output_run"])
         # If the user has not passed a `collections` variable
         if not collections:
+            # We reverse the order of the associated output runs because the
+            # query in __resolve_duplicates must be done most recent-first.
             collections = list(reversed(output_runs))
         assert (
             not curse_failed_logs
@@ -1082,7 +1112,7 @@ def to_summary(self, butler: Butler, do_store_logs: bool = True) -> Summary:

         Returns
         -------
-        summary : `Summary`
+        result : `Summary`
             A struct containing counts of quanta and datasets in each of the
             overall states defined in `QuantumInfo` and `DatasetInfo`, as well
             as diagnostic information and error messages for failed quanta
@@ -1137,6 +1167,11 @@ def get_producer_of(self, dataset_key: DatasetKey) -> QuantumKey:
         ----------
         dataset_key : `DatasetKey`
             The key for the dataset whose producer quantum is needed.
+
+        Returns
+        -------
+        result : `QuantumKey`
+            The key for the quantum which produced the dataset.
""" (result,) = self._xgraph.predecessors(dataset_key) return result diff --git a/tests/test_quantum_provenance_graph.py b/tests/test_quantum_provenance_graph.py index f237674a..cdb2c2c9 100644 --- a/tests/test_quantum_provenance_graph.py +++ b/tests/test_quantum_provenance_graph.py @@ -31,7 +31,7 @@ import unittest -from lsst.pipe.base.quantum_provenance_graph import QuantumProvenanceGraph, TaskSummary +from lsst.pipe.base.quantum_provenance_graph import QuantumProvenanceGraph from lsst.pipe.base.tests import simpleQGraph from lsst.utils.tests import temporaryDirectory @@ -61,20 +61,15 @@ def test_qpg_reports(self) -> None: # We know that we have one expected task that was not run. # As such, the following dictionary should describe all of # the mock tasks. - self.assertEqual( - task_summary, - TaskSummary( - n_successful=0, - n_blocked=0, - n_unknown=1, - n_expected=1, - failed_quanta=[], - recovered_quanta=[], - wonky_quanta=[], - n_wonky=0, - n_failed=0, - ), - ) + self.assertEqual(task_summary.n_successful, 0) + self.assertEqual(task_summary.n_blocked, 0) + self.assertEqual(task_summary.n_unknown, 1) + self.assertEqual(task_summary.n_expected, 1) + self.assertListEqual(task_summary.failed_quanta, []) + self.assertListEqual(task_summary.recovered_quanta, []) + self.assertListEqual(task_summary.wonky_quanta, []) + self.assertEqual(task_summary.n_wonky, 0) + self.assertEqual(task_summary.n_failed, 0) expected_mock_datasets = [ "add_dataset1", "add2_dataset1", @@ -115,16 +110,16 @@ def test_qpg_reports(self) -> None: self.assertListEqual(dataset_type_summary.cursed_datasets, []) # Make sure we have the right datasets based on our mock self.assertIn(dataset_type_name, expected_mock_datasets) - # Make sure the expected datasets were produced by the expected - # tasks - match dataset_type_name: - case name if name in ["add_dataset1", "add2_dataset1", "task0_metadata", "task0_log"]: - self.assertEqual(dataset_type_summary.producer, "task0") - case name if name in ["add_dataset2", "add2_dataset2", "task1_metadata", "task1_log"]: - self.assertEqual(dataset_type_summary.producer, "task1") - case name if name in ["add_dataset3", "add2_dataset3", "task2_metadata", "task2_log"]: - self.assertEqual(dataset_type_summary.producer, "task2") - case name if name in ["add_dataset4", "add2_dataset4", "task3_metadata", "task3_log"]: - self.assertEqual(dataset_type_summary.producer, "task3") - case name if name in ["add_dataset5", "add2_dataset5", "task4_metadata", "task4_log"]: - self.assertEqual(dataset_type_summary.producer, "task4") + # Make sure the expected datasets were produced by the expected + # tasks + match dataset_type_name: + case name if name in ["add_dataset1", "add2_dataset1", "task0_metadata", "task0_log"]: + self.assertEqual(dataset_type_summary.producer, "task0") + case name if name in ["add_dataset2", "add2_dataset2", "task1_metadata", "task1_log"]: + self.assertEqual(dataset_type_summary.producer, "task1") + case name if name in ["add_dataset3", "add2_dataset3", "task2_metadata", "task2_log"]: + self.assertEqual(dataset_type_summary.producer, "task2") + case name if name in ["add_dataset4", "add2_dataset4", "task3_metadata", "task3_log"]: + self.assertEqual(dataset_type_summary.producer, "task3") + case name if name in ["add_dataset5", "add2_dataset5", "task4_metadata", "task4_log"]: + self.assertEqual(dataset_type_summary.producer, "task4")