From 4289fd018249f83eac551a29d36c0045fd004850 Mon Sep 17 00:00:00 2001 From: Jim Bosch Date: Sat, 11 Jan 2025 10:06:35 -0500 Subject: [PATCH 01/10] Fix decorator bug in ExecutionResources pickling. If it has a cls arg, it's a classmethod, not a staticmethod. --- python/lsst/pipe/base/_quantumContext.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/python/lsst/pipe/base/_quantumContext.py b/python/lsst/pipe/base/_quantumContext.py index c13b35efb..27ea424cf 100644 --- a/python/lsst/pipe/base/_quantumContext.py +++ b/python/lsst/pipe/base/_quantumContext.py @@ -139,7 +139,7 @@ def _reduce_kwargs(self) -> dict[str, Any]: kwargs["max_mem"] = int(self.max_mem.value) return kwargs - @staticmethod + @classmethod def _unpickle_via_factory( cls: type[ExecutionResources], args: Sequence[Any], kwargs: dict[str, Any] ) -> ExecutionResources: @@ -153,11 +153,11 @@ def _unpickle_via_factory( def __reduce__( self, ) -> tuple[ - Callable[[type[ExecutionResources], Sequence[Any], dict[str, Any]], ExecutionResources], - tuple[type[ExecutionResources], Sequence[Any], dict[str, Any]], + Callable[[Sequence[Any], dict[str, Any]], ExecutionResources], + tuple[Sequence[Any], dict[str, Any]], ]: """Pickler.""" - return self._unpickle_via_factory, (self.__class__, [], self._reduce_kwargs()) + return self._unpickle_via_factory, ([], self._reduce_kwargs()) class QuantumContext: From f0d8888658a0b6d7830dee34e1191bbfd248d4e3 Mon Sep 17 00:00:00 2001 From: Jim Bosch Date: Sat, 11 Jan 2025 10:07:09 -0500 Subject: [PATCH 02/10] Track which outputs are 'put' by QuantumContext. 
--- python/lsst/pipe/base/_quantumContext.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/python/lsst/pipe/base/_quantumContext.py b/python/lsst/pipe/base/_quantumContext.py index 27ea424cf..7a1bc19d3 100644 --- a/python/lsst/pipe/base/_quantumContext.py +++ b/python/lsst/pipe/base/_quantumContext.py @@ -39,7 +39,7 @@ from typing import Any import astropy.units as u -from lsst.daf.butler import DatasetRef, DimensionUniverse, LimitedButler, Quantum +from lsst.daf.butler import DataCoordinate, DatasetRef, DatasetType, DimensionUniverse, LimitedButler, Quantum from lsst.utils.introspection import get_full_type_name from lsst.utils.logging import PeriodicLogger, getLogger @@ -205,6 +205,7 @@ def __init__( for refs in quantum.outputs.values(): for ref in refs: self.allOutputs.add((ref.datasetType, ref.dataId)) + self.outputsPut: set[tuple[DatasetType, DataCoordinate]] = set() self.__butler = butler def _get(self, ref: DeferredDatasetRef | DatasetRef | None) -> Any: @@ -223,6 +224,7 @@ def _put(self, value: Any, ref: DatasetRef) -> None: """Store data in butler.""" self._checkMembership(ref, self.allOutputs) self.__butler.put(value, ref) + self.outputsPut.add((ref.datasetType, ref.dataId)) def get( self, From 45bdd8a8e5299b209542be9a17fb3b7379d08943 Mon Sep 17 00:00:00 2001 From: Jim Bosch Date: Mon, 13 Jan 2025 12:32:04 -0500 Subject: [PATCH 03/10] Don't include log and metadata in QuantumContext.allOutputs. 
--- python/lsst/pipe/base/_quantumContext.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/python/lsst/pipe/base/_quantumContext.py b/python/lsst/pipe/base/_quantumContext.py index 7a1bc19d3..abb0235b1 100644 --- a/python/lsst/pipe/base/_quantumContext.py +++ b/python/lsst/pipe/base/_quantumContext.py @@ -43,6 +43,7 @@ from lsst.utils.introspection import get_full_type_name from lsst.utils.logging import PeriodicLogger, getLogger +from .automatic_connection_constants import LOG_OUTPUT_CONNECTION_NAME, METADATA_OUTPUT_CONNECTION_NAME from .connections import DeferredDatasetRef, InputQuantizedConnection, OutputQuantizedConnection from .struct import Struct @@ -202,7 +203,14 @@ def __init__( for refs in quantum.inputs.values(): for ref in refs: self.allInputs.add((ref.datasetType, ref.dataId)) - for refs in quantum.outputs.values(): + for dataset_type, refs in quantum.outputs.items(): + if dataset_type.name.endswith(METADATA_OUTPUT_CONNECTION_NAME) or dataset_type.name.endswith( + LOG_OUTPUT_CONNECTION_NAME + ): + # Don't consider log and metadata datasets to be outputs in + # this context, because we don't want the task to be able to + # write them itself; that's for the execution system to do. + continue for ref in refs: self.allOutputs.add((ref.datasetType, ref.dataId)) self.outputsPut: set[tuple[DatasetType, DataCoordinate]] = set() From 83962f497431b8126c3c96976ec607adb7ca0593 Mon Sep 17 00:00:00 2001 From: Jim Bosch Date: Sat, 11 Jan 2025 10:07:31 -0500 Subject: [PATCH 04/10] Add Flag enum for caveats for "successful" quanta. These will be set in the task metadata by lsst.ctrl.mpexec.SingleQuantumExecutor and interpreted by QuantumProvenaceGraph. Ideally we'll also find a way to pass this information to BPS for use in its reporting as well. 
--- python/lsst/pipe/base/_status.py | 136 +++++++++++++++++++++++++- tests/test_quantum_success_caveats.py | 63 ++++++++++++ 2 files changed, 198 insertions(+), 1 deletion(-) create mode 100644 tests/test_quantum_success_caveats.py diff --git a/python/lsst/pipe/base/_status.py b/python/lsst/pipe/base/_status.py index 4ef8e6d4b..3bea7d623 100644 --- a/python/lsst/pipe/base/_status.py +++ b/python/lsst/pipe/base/_status.py @@ -28,14 +28,16 @@ from __future__ import annotations import abc +import enum import logging -from typing import Protocol +from typing import ClassVar, Protocol from lsst.utils import introspection from ._task_metadata import GetSetDictMetadata, NestedMetadataDict __all__ = ( + "QuantumSuccessCaveats", "UnprocessableDataError", "AnnotatedPartialOutputsError", "NoWorkFound", @@ -46,6 +48,130 @@ ) + +class QuantumSuccessCaveats(enum.Flag): + """Flags that add caveats to a "successful" quantum. + + Quanta can be considered successful even if they do not produce some of + their expected outputs (and even if they do not produce all of their + expected outputs), as long as the condition is sufficiently well understood + that downstream processing should succeed. + """ + + NO_CAVEATS = 0 + """All outputs were produced and no exceptions were raised.""" + + ANY_OUTPUTS_MISSING = enum.auto() + """At least one predicted output was not produced.""" + + ALL_OUTPUTS_MISSING = enum.auto() + """No predicted outputs (except logs and metadata) were produced. + + `ANY_OUTPUTS_MISSING` is also set whenever this flag is set. + """ + + NO_WORK = enum.auto() + """A subclass of `NoWorkFound` was raised. + + This does not necessarily imply that `ANY_OUTPUTS_MISSING` is not set, + since a `PipelineTask.runQuantum` implementation could raise it after + directly writing all of its predicted outputs. + """ + + ADJUST_QUANTUM_RAISED = enum.auto() + """`NoWorkFound` was raised by `PipelineTaskConnections.adjustQuantum`. 
+ + This indicates that if a new `QuantumGraph` had been generated immediately + before running this quantum, that quantum would not have even been + included, because required inputs that were expected to exist by the time + it was run (in the original `QuantumGraph`) were not actually produced. + + `NO_WORK` and `ALL_OUTPUTS_MISSING` are also set whenever this flag is set. + """ + + UPSTREAM_FAILURE_NO_WORK = enum.auto() + """`UpstreamFailureNoWorkFound` was raised by `PipelineTask.runQuantum`. + + This exception is raised by downstream tasks when an upstream task's + outputs were incomplete in a way that blocks it from running, often + because the upstream task raised `AnnotatedPartialOutputsError`. + + `NO_WORK` is also set whenever this flag is set. + """ + + UNPROCESSABLE_DATA = enum.auto() + """`UnprocessableDataError` was raised by `PipelineTask.runQuantum`. + + `NO_WORK` is also set whenever this flag is set. + """ + + PARTIAL_OUTPUTS_ERROR = enum.auto() + """`AnnotatedPartialOutputsError` was raised by `PipelineTask.runQuantum` + and the execution system was instructed to consider this a qualified + success. + """ + + @classmethod + def from_adjust_quantum_no_work(cls) -> QuantumSuccessCaveats: + """Return the set of flags appropriate for a quantum for which + `PipelineTaskConnections.adjustQuantum` raised `NoWorkFound`. + """ + return cls.NO_WORK | cls.ADJUST_QUANTUM_RAISED | cls.ANY_OUTPUTS_MISSING | cls.ALL_OUTPUTS_MISSING + + def concise(self) -> str: + """Return a concise string representation of the flags. + + Returns + ------- + s : `str` + Two-character string representation, with the first character + indicating whether any predicted outputs were missing and the + second representing any exceptions raised. This representation is + not always complete; some rare combinations of flags are displayed + as if only one of the flags was set. + + Notes + ----- + The `legend` method returns a description of the returned codes. 
+ """ + char1 = "" + if self & QuantumSuccessCaveats.ALL_OUTPUTS_MISSING: + char1 = "*" + elif self & QuantumSuccessCaveats.ANY_OUTPUTS_MISSING: + char1 = "+" + char2 = "" + if self & QuantumSuccessCaveats.ADJUST_QUANTUM_RAISED: + char2 = "A" + elif self & QuantumSuccessCaveats.UNPROCESSABLE_DATA: + char2 = "D" + elif self & QuantumSuccessCaveats.UPSTREAM_FAILURE_NO_WORK: + char2 = "U" + elif self & QuantumSuccessCaveats.PARTIAL_OUTPUTS_ERROR: + char2 = "P" + elif self & QuantumSuccessCaveats.NO_WORK: + char2 = "N" + return char1 + char2 + + @staticmethod + def legend() -> dict[str, str]: + """Return a `dict` with human-readable descriptions of the characters + used in `concise`. + + Returns + ------- + legend : `dict` [ `str`, `str` ] + Mapping from character code to description. + """ + return { + "+": "at least one predicted output is missing, but not all", + "*": "all predicated outputs were missing (besides logs and metadata)", + "A": "adjustQuantum raised NoWorkFound; an updated QG would not include this quantum", + "D": "algorithm considers data too bad to be processable", + "U": "one or more input dataset as incomplete due to an upstream failure", + "P": "task failed but wrote partial outputs; considered a partial success", + "N": "runQuantum raised NoWorkFound", + } + + class GetSetDictMetadataHolder(Protocol): """Protocol for objects that have a ``metadata`` attribute that satisfies `GetSetDictMetadata`. @@ -67,6 +193,8 @@ class NoWorkFound(BaseException): logic to trap it. """ + FLAGS: ClassVar = QuantumSuccessCaveats.NO_WORK + class UpstreamFailureNoWorkFound(NoWorkFound): """A specialization of `NoWorkFound` that indicates that an upstream task @@ -74,6 +202,8 @@ class UpstreamFailureNoWorkFound(NoWorkFound): from bringing down an entire visit). 
""" + FLAGS: ClassVar = QuantumSuccessCaveats.NO_WORK | QuantumSuccessCaveats.UPSTREAM_FAILURE_NO_WORK + class RepeatableQuantumError(RuntimeError): """Exception that may be raised by PipelineTasks (and code they delegate @@ -145,6 +275,8 @@ class UnprocessableDataError(NoWorkFound): situation. """ + FLAGS: ClassVar = QuantumSuccessCaveats.NO_WORK | QuantumSuccessCaveats.UNPROCESSABLE_DATA + class AnnotatedPartialOutputsError(RepeatableQuantumError): """Exception that runQuantum raises when the (partial) outputs it has @@ -161,6 +293,8 @@ class AnnotatedPartialOutputsError(RepeatableQuantumError): invalidate any outputs that are already written. """ + FLAGS: ClassVar = QuantumSuccessCaveats.PARTIAL_OUTPUTS_ERROR + @classmethod def annotate( cls, error: Exception, *args: GetSetDictMetadataHolder | None, log: logging.Logger diff --git a/tests/test_quantum_success_caveats.py b/tests/test_quantum_success_caveats.py new file mode 100644 index 000000000..5ac022706 --- /dev/null +++ b/tests/test_quantum_success_caveats.py @@ -0,0 +1,63 @@ +# This file is part of pipe_base. +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (https://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This software is dual licensed under the GNU General Public License and also +# under a 3-clause BSD license. Recipients may choose which of these licenses +# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, +# respectively. If you choose the GPL option then the following text applies +# (but note that there is still no warranty even if you opt for BSD instead): +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. 
+# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <https://www.gnu.org/licenses/>. + +import unittest + +from lsst.pipe.base import QuantumSuccessCaveats + + +class QuantumSuccessCaveatsTestCase(unittest.TestCase): + """Tests for the QuantumSuccessCaveats flag enum.""" + + def test_from_adjust_quantum_no_work(self): + """Test caveats for the case where adjustQuantum raises NoWorkFound.""" + caveats = QuantumSuccessCaveats.from_adjust_quantum_no_work() + self.assertEqual(caveats.concise(), "*A") + self.assertLessEqual(set(caveats.concise()), caveats.legend().keys()) + + def test_concise(self): + """Test the concise representation of caveats.""" + caveats = QuantumSuccessCaveats.ANY_OUTPUTS_MISSING | QuantumSuccessCaveats.UPSTREAM_FAILURE_NO_WORK + self.assertEqual(caveats.concise(), "+U") + self.assertLessEqual(set(caveats.concise()), caveats.legend().keys()) + caveats = ( + QuantumSuccessCaveats.ALL_OUTPUTS_MISSING + | QuantumSuccessCaveats.ANY_OUTPUTS_MISSING + | QuantumSuccessCaveats.UNPROCESSABLE_DATA + ) + self.assertEqual(caveats.concise(), "*D") + self.assertLessEqual(set(caveats.concise()), caveats.legend().keys()) + caveats = QuantumSuccessCaveats.ANY_OUTPUTS_MISSING | QuantumSuccessCaveats.PARTIAL_OUTPUTS_ERROR + self.assertEqual(caveats.concise(), "+P") + self.assertLessEqual(set(caveats.concise()), caveats.legend().keys()) + caveats = QuantumSuccessCaveats.NO_WORK + self.assertEqual(caveats.concise(), "N") + self.assertLessEqual(set(caveats.concise()), caveats.legend().keys()) + + +if __name__ == "__main__": + unittest.main() From 113e8486ef008c6543f826f2ad2d2351dc3406cc Mon Sep 17 00:00:00 2001 From: Jim Bosch Date: Sat, 11 Jan 2025 10:18:25 -0500 Subject: [PATCH 05/10] Fix 
loop-variable bug in QPG. --- python/lsst/pipe/base/quantum_provenance_graph.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/lsst/pipe/base/quantum_provenance_graph.py b/python/lsst/pipe/base/quantum_provenance_graph.py index e2674cfae..063fcf662 100644 --- a/python/lsst/pipe/base/quantum_provenance_graph.py +++ b/python/lsst/pipe/base/quantum_provenance_graph.py @@ -1053,6 +1053,7 @@ def __resolve_duplicates( # Loop over each dataset in the outputs of a single quantum. for dataset_key in self.iter_outputs_of(quantum_key): dataset_info = self.get_dataset_info(dataset_key) + dataset_type_name = dataset_key.dataset_type_name visible_runs.update( run for run, dataset_run in dataset_info["runs"].items() if dataset_run.visible ) From 685158cf77807db3577a45244dbf702ddff90fdb Mon Sep 17 00:00:00 2001 From: Jim Bosch Date: Sat, 11 Jan 2025 11:00:52 -0500 Subject: [PATCH 06/10] Include success caveats in QPG. --- .../pipe/base/quantum_provenance_graph.py | 110 +++++++++++++++--- 1 file changed, 97 insertions(+), 13 deletions(-) diff --git a/python/lsst/pipe/base/quantum_provenance_graph.py b/python/lsst/pipe/base/quantum_provenance_graph.py index 063fcf662..16db86c8d 100644 --- a/python/lsst/pipe/base/quantum_provenance_graph.py +++ b/python/lsst/pipe/base/quantum_provenance_graph.py @@ -48,10 +48,11 @@ import networkx import pydantic -from lsst.daf.butler import Butler, DataCoordinate, DataIdValue +from lsst.daf.butler import Butler, DataCoordinate, DataIdValue, DatasetRef from lsst.resources import ResourcePathExpression from lsst.utils.logging import getLogger +from ._status import QuantumSuccessCaveats from .graph import QuantumGraph if TYPE_CHECKING: @@ -160,6 +161,8 @@ class QuantumRunStatus(Enum): class QuantumRun(pydantic.BaseModel): """Information about a quantum in a given run collection.""" + model_config = pydantic.ConfigDict(arbitrary_types_allowed=True) # for DatasetRef attrs. 
+ id: uuid.UUID """The quantum graph node ID associated with the dataId in a specific run. """ @@ -168,6 +171,19 @@ class QuantumRun(pydantic.BaseModel): """The status of the quantum in that run. """ + caveats: QuantumSuccessCaveats | None = None + """Flags that describe possibly-qualified successes. + + This is `None` when `status` is not `SUCCESSFUL` or `LOGS_MISSING`. It + may also be `None` if metadata was not loaded or had no success flags. + """ + + metadata_ref: DatasetRef + """Predicted DatasetRef for the metadata dataset.""" + + log_ref: DatasetRef + """Predicted DatasetRef for the log dataset.""" + class QuantumInfoStatus(Enum): """The status of a quantum (a particular task run on a particular dataID) @@ -232,6 +248,13 @@ class QuantumInfo(TypedDict): wonky state. """ + caveats: QuantumSuccessCaveats | None + """Flags that describe possibly-qualified successes. + + This is `None` when `status` is not `SUCCESSFUL`. It may also be `None` + if metadata was not loaded or had no success flags. + """ + recovered: bool """The quantum was originally not successful but was ultimately successful. """ @@ -417,6 +440,17 @@ def n_failed(self) -> int: """Return a count of `failed` quanta.""" return len(self.failed_quanta) + caveats: dict[str, list[dict[str, DataIdValue]]] = pydantic.Field(default_factory=dict) + """Quanta that were successful with caveats. + + Keys are 2-character codes returned by `QuantumSuccessCaveats.concise`; + values are lists of data IDs of quanta with those caveats. Quanta that were + unqualified successes are not included. + + Quanta for which success flags were not read from metadata will not be + included. + """ + failed_quanta: list[UnsuccessfulQuantumSummary] = pydantic.Field(default_factory=list) """A list of all `UnsuccessfulQuantumSummary` objects associated with the FAILED quanta. 
This is a report containing their data IDs, the status @@ -458,23 +492,27 @@ def add_quantum_info(self, info: QuantumInfo, butler: Butler, do_store_logs: boo self.n_successful += 1 if info["recovered"]: self.recovered_quanta.append(dict(info["data_id"].required)) + if caveats := info["caveats"]: + code = caveats.concise() + self.caveats.setdefault(code, []).append(dict(info["data_id"].required)) case QuantumInfoStatus.WONKY: self.wonky_quanta.append(UnsuccessfulQuantumSummary.from_info(info)) case QuantumInfoStatus.BLOCKED: self.n_blocked += 1 case QuantumInfoStatus.FAILED: failed_quantum_summary = UnsuccessfulQuantumSummary.from_info(info) - log_key = info["log"] if do_store_logs: - for run in info["runs"]: + for quantum_run in info["runs"].values(): try: - # should probably upgrade this to use a dataset - # ref - log = butler.get(log_key.dataset_type_name, info["data_id"], collections=run) + log = butler.get(quantum_run.log_ref) except LookupError: - failed_quantum_summary.messages.append(f"Logs not ingested for {run!r}") + failed_quantum_summary.messages.append( + f"Logs not ingested for {quantum_run.log_ref!r}" + ) except FileNotFoundError: - failed_quantum_summary.messages.append(f"Logs missing or corrupt for {run!r}") + failed_quantum_summary.messages.append( + f"Logs missing or corrupt for {quantum_run.log_ref!r}" + ) else: failed_quantum_summary.messages.extend( [record.message for record in log if record.levelno >= logging.ERROR] @@ -498,7 +536,8 @@ def add_data_id_group(self, other_summary: TaskSummary) -> None: self.n_blocked += other_summary.n_blocked self.n_unknown += other_summary.n_unknown self.n_expected += other_summary.n_expected - + for code in self.caveats.keys() | other_summary.caveats.keys(): + self.caveats.setdefault(code, []).extend(other_summary.caveats.get(code, [])) self.wonky_quanta.extend(other_summary.wonky_quanta) self.recovered_quanta.extend(other_summary.recovered_quanta) self.failed_quanta.extend(other_summary.failed_quanta) @@ 
-775,7 +814,12 @@ def get_dataset_info(self, key: DatasetKey) -> DatasetInfo: """ return self._xgraph.nodes[key] - def __add_new_graph(self, butler: Butler, qgraph: QuantumGraph | ResourcePathExpression) -> None: + def __add_new_graph( + self, + butler: Butler, + qgraph: QuantumGraph | ResourcePathExpression, + read_caveats: Literal["lazy", "exhaustive"] | None, + ) -> None: """Add a new quantum graph to the `QuantumProvenanceGraph`. Notes @@ -803,10 +847,15 @@ def __add_new_graph(self, butler: Butler, qgraph: QuantumGraph | ResourcePathExp butler : `lsst.daf.butler.Butler` The Butler used for this report. This should match the Butler used for the run associated with the executed quantum graph. - qgraph : `QuantumGraph` | `ResourcePathExpression` Either the associated quantum graph object or the uri of the location of said quantum graph. + read_caveats : `str` or `None` + Whether to read metadata files to get flags that describe qualified + successes. If `None`, no metadata files will be read and all + ``caveats`` fields will be `None`. If "exhaustive", all + metadata files will be read. If "lazy", only metadata files where + at least one predicted output is missing will be read. """ # first we load the quantum graph and associated output run collection if not isinstance(qgraph, QuantumGraph): @@ -831,13 +880,15 @@ def __add_new_graph(self, butler: Butler, qgraph: QuantumGraph | ResourcePathExp quantum_info.setdefault("recovered", False) new_quanta.append(quantum_key) self._quanta.setdefault(quantum_key.task_label, set()).add(quantum_key) + metadata_ref = node.quantum.outputs[f"{node.taskDef.label}_metadata"][0] + log_ref = node.quantum.outputs[f"{node.taskDef.label}_log"][0] # associate run collections with specific quanta. this is important # if the same quanta are processed in multiple runs as in recovery # workflows. quantum_runs = quantum_info.setdefault("runs", {}) # the `QuantumRun` here is the specific quantum-run collection # combination. 
- quantum_runs[output_run] = QuantumRun(id=node.nodeId) + quantum_runs[output_run] = QuantumRun(id=node.nodeId, metadata_ref=metadata_ref, log_ref=log_ref) # For each of the outputs of the quanta (datasets) make a key to # refer to the dataset. for ref in itertools.chain.from_iterable(node.quantum.outputs.values()): @@ -859,8 +910,10 @@ def __add_new_graph(self, butler: Butler, qgraph: QuantumGraph | ResourcePathExp # save metadata and logs for easier status interpretation later if dataset_key.dataset_type_name.endswith("_metadata"): quantum_info["metadata"] = dataset_key + quantum_runs[output_run].metadata_ref = ref if dataset_key.dataset_type_name.endswith("_log"): quantum_info["log"] = dataset_key + quantum_runs[output_run].log_ref = ref for ref in itertools.chain.from_iterable(node.quantum.inputs.values()): dataset_key = DatasetKey(ref.datasetType.nameAndComponent()[0], ref.dataId.required_values) if dataset_key in self._xgraph: @@ -897,6 +950,19 @@ def __add_new_graph(self, butler: Butler, qgraph: QuantumGraph | ResourcePathExp # infrastructure for transferring the logs to the datastore # failed. quantum_run.status = QuantumRunStatus.LOGS_MISSING + # If requested, read caveats from metadata. + if read_caveats == "exhaustive" or ( + read_caveats == "lazy" + and not all( + self.get_dataset_info(dataset_key)["runs"][output_run].produced + for dataset_key in self._xgraph.successors(quantum_key) + ) + ): + md = butler.get(quantum_run.metadata_ref) + try: + quantum_run.caveats = QuantumSuccessCaveats(md["quantum"]["caveats"]) + except LookupError: + pass # missing metadata means that the task did not finish. else: # if we have logs and no metadata, the task not finishing is @@ -929,6 +995,7 @@ def __add_new_graph(self, butler: Butler, qgraph: QuantumGraph | ResourcePathExp # A quantum can never escape a WONKY state. 
case (QuantumInfoStatus.WONKY, _): new_status = QuantumInfoStatus.WONKY + quantum_info["caveats"] = None # Any transition to a success (excluding from WONKY) is # a success; any transition from a failed state is also a # recovery. @@ -976,6 +1043,10 @@ def __add_new_graph(self, butler: Butler, qgraph: QuantumGraph | ResourcePathExp new_status = QuantumInfoStatus.FAILED # Update `QuantumInfo.status` for this quantum. quantum_info["status"] = new_status + if new_status is QuantumInfoStatus.SUCCESSFUL: + quantum_info["caveats"] = quantum_run.caveats + else: + quantum_info["caveats"] = None def __resolve_duplicates( self, @@ -1107,6 +1178,7 @@ def __resolve_duplicates( quantum_info["messages"].append( f"Outputs from different runs of the same quanta were visible: {visible_runs}." ) + quantum_info["caveats"] = None for dataset_key in self.iter_outputs_of(quantum_key): dataset_info = self.get_dataset_info(dataset_key) quantum_info["messages"].append( @@ -1125,6 +1197,7 @@ def assemble_quantum_provenance_graph( collections: Sequence[str] | None = None, where: str = "", curse_failed_logs: bool = False, + read_caveats: Literal["lazy", "exhaustive"] | None = "exhaustive", ) -> None: """Assemble the quantum provenance graph from a list of all graphs corresponding to processing attempts. @@ -1154,12 +1227,23 @@ def assemble_quantum_provenance_graph( `__resolve_duplicates` is run on a list of group-level collections then each will only show log datasets from their own failures as visible and datasets from others will be marked as cursed. + read_caveats : `str` or `None`, optional + Whether to read metadata files to get flags that describe qualified + successes. If `None`, no metadata files will be read and all + ``caveats`` fields will be `None`. If "exhaustive", all + metadata files will be read. If "lazy", only metadata files where + at least one predicted output is missing will be read. 
""" + if read_caveats not in ("lazy", "exhaustive", None): + raise TypeError( + f"Invalid option {read_caveats!r} for read_caveats; " + "should be 'lazy', 'exhaustive', or None." + ) output_runs = [] for graph in qgraphs: qgraph = graph if isinstance(graph, QuantumGraph) else QuantumGraph.loadUri(graph) assert qgraph.metadata is not None, "Saved QGs always have metadata." - self.__add_new_graph(butler, qgraph) + self.__add_new_graph(butler, qgraph, read_caveats=read_caveats) output_runs.append(qgraph.metadata["output_run"]) # If the user has not passed a `collections` variable if not collections: From cb0f2a26300df0184976300db9552c3d8ba20904 Mon Sep 17 00:00:00 2001 From: Jim Bosch Date: Tue, 14 Jan 2025 15:50:57 -0500 Subject: [PATCH 07/10] Add properties to QuantumProvenanceGraph. --- python/lsst/pipe/base/quantum_provenance_graph.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/python/lsst/pipe/base/quantum_provenance_graph.py b/python/lsst/pipe/base/quantum_provenance_graph.py index 16db86c8d..098984185 100644 --- a/python/lsst/pipe/base/quantum_provenance_graph.py +++ b/python/lsst/pipe/base/quantum_provenance_graph.py @@ -42,7 +42,7 @@ import itertools import logging import uuid -from collections.abc import Iterator, Sequence +from collections.abc import Iterator, Mapping, Sequence, Set from enum import Enum from typing import TYPE_CHECKING, ClassVar, Literal, NamedTuple, TypedDict, cast @@ -780,6 +780,16 @@ def __init__(self) -> None: # to True when resolve_duplicates completes. 
self._finalized: bool = False + @property + def quanta(self) -> Mapping[str, Set[QuantumKey]]: + """A mapping from task label to a set of keys for its quanta.""" + return self._quanta + + @property + def datasets(self) -> Mapping[str, Set[DatasetKey]]: + """A mapping from dataset type name to a set of keys for datasets.""" + return self._datasets + def get_quantum_info(self, key: QuantumKey) -> QuantumInfo: """Get a `QuantumInfo` object from the `QuantumProvenanceGraph` using a `QuantumKey`. From 9a3219c8678188ee0172ed15dc17887fc087cfba Mon Sep 17 00:00:00 2001 From: Jim Bosch Date: Tue, 14 Jan 2025 15:48:41 -0500 Subject: [PATCH 08/10] Add iter_downstream method to QuantumProvenanceGraph. --- .../pipe/base/quantum_provenance_graph.py | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/python/lsst/pipe/base/quantum_provenance_graph.py b/python/lsst/pipe/base/quantum_provenance_graph.py index 098984185..d169fb87b 100644 --- a/python/lsst/pipe/base/quantum_provenance_graph.py +++ b/python/lsst/pipe/base/quantum_provenance_graph.py @@ -1340,3 +1340,23 @@ def get_producer_of(self, dataset_key: DatasetKey) -> QuantumKey: """ (result,) = self._xgraph.predecessors(dataset_key) return result + + def iter_downstream( + self, key: QuantumKey | DatasetKey + ) -> Iterator[tuple[QuantumKey, QuantumInfo] | tuple[DatasetKey, DatasetInfo]]: + """Iterate over the quanta and datasets that are downstream of a + quantum or dataset. + + Parameters + ---------- + key : `QuantumKey` or `DatasetKey` + Starting node. + + Returns + ------- + iter : `~collections.abc.Iterator` [ `tuple` ] + An iterator over pairs of (`QuantumKey`, `QuantumInfo`) or + (`DatasetKey`, `DatasetInfo`). + """ + for key in networkx.dag.descendants(self._xgraph, key): + yield (key, self._xgraph.nodes[key]) # type: ignore From d601a8e8fb009028173e1f08b7e99d60e9a62bad Mon Sep 17 00:00:00 2001 From: Jim Bosch Date: Wed, 15 Jan 2025 09:42:04 -0500 Subject: [PATCH 09/10] Add changelog entry. 
--- doc/changes/DM-47730.feature.md | 4 ++++ 1 file changed, 4 insertions(+) create mode 100644 doc/changes/DM-47730.feature.md diff --git a/doc/changes/DM-47730.feature.md b/doc/changes/DM-47730.feature.md new file mode 100644 index 000000000..445420f9a --- /dev/null +++ b/doc/changes/DM-47730.feature.md @@ -0,0 +1,4 @@ +Add the `QuantumSuccessCaveats` flag enum, which can be used to report on `NoWorkFound` and other qualified successes in execution. + +This adds the flag enum itself and functionality in `QuantumProvenanceGraph` (which backs `pipetask report --force-v2`) to include it in reports. +It relies on additional changes in `lsst.ctrl.mpexec.SingleQuantumExecutor` to write the caveat flags into task metadata. From 5810ee95502648bf59bfd6cf49e6733cb17b7af9 Mon Sep 17 00:00:00 2001 From: Jim Bosch Date: Thu, 16 Jan 2025 08:43:53 -0500 Subject: [PATCH 10/10] Guard against spurious int->float conversions in TaskMetadata. --- python/lsst/pipe/base/quantum_provenance_graph.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/python/lsst/pipe/base/quantum_provenance_graph.py b/python/lsst/pipe/base/quantum_provenance_graph.py index d169fb87b..32c1d131a 100644 --- a/python/lsst/pipe/base/quantum_provenance_graph.py +++ b/python/lsst/pipe/base/quantum_provenance_graph.py @@ -970,7 +970,10 @@ def __add_new_graph( ): md = butler.get(quantum_run.metadata_ref) try: - quantum_run.caveats = QuantumSuccessCaveats(md["quantum"]["caveats"]) + # Int conversion guards against spurious conversion to + # float that can apparently sometimes happen in + # TaskMetadata. + quantum_run.caveats = QuantumSuccessCaveats(int(md["quantum"]["caveats"])) except LookupError: pass # missing metadata means that the task did not finish.