Skip to content

Commit

Permalink
Merge pull request #463 from lsst/tickets/DM-47730
Browse files Browse the repository at this point in the history
DM-47730: add "success caveats" system to report on NoWorkFound and similar cases
  • Loading branch information
TallJimbo authored Jan 21, 2025
2 parents 4e4daad + 5810ee9 commit 0c32b46
Show file tree
Hide file tree
Showing 5 changed files with 350 additions and 21 deletions.
4 changes: 4 additions & 0 deletions doc/changes/DM-47730.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Add the `QuantumSuccessCaveats` flag enum, which can be used to report on `NoWorkFound` and other qualified successes in execution.

This adds the flag enum itself and functionality in `QuantumProvenanceGraph` (which backs `pipetask report --force-v2`) to include it in reports.
It relies on additional changes in `lsst.ctrl.mpexec.SingleQuantumExecutor` to write the caveat flags into task metadata.
22 changes: 16 additions & 6 deletions python/lsst/pipe/base/_quantumContext.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,11 @@
from typing import Any

import astropy.units as u
from lsst.daf.butler import DatasetRef, DimensionUniverse, LimitedButler, Quantum
from lsst.daf.butler import DataCoordinate, DatasetRef, DatasetType, DimensionUniverse, LimitedButler, Quantum
from lsst.utils.introspection import get_full_type_name
from lsst.utils.logging import PeriodicLogger, getLogger

from .automatic_connection_constants import LOG_OUTPUT_CONNECTION_NAME, METADATA_OUTPUT_CONNECTION_NAME
from .connections import DeferredDatasetRef, InputQuantizedConnection, OutputQuantizedConnection
from .struct import Struct

Expand Down Expand Up @@ -139,7 +140,7 @@ def _reduce_kwargs(self) -> dict[str, Any]:
kwargs["max_mem"] = int(self.max_mem.value)
return kwargs

@staticmethod
@classmethod
def _unpickle_via_factory(
cls: type[ExecutionResources], args: Sequence[Any], kwargs: dict[str, Any]
) -> ExecutionResources:
Expand All @@ -153,11 +154,11 @@ def _unpickle_via_factory(
def __reduce__(
self,
) -> tuple[
Callable[[type[ExecutionResources], Sequence[Any], dict[str, Any]], ExecutionResources],
tuple[type[ExecutionResources], Sequence[Any], dict[str, Any]],
Callable[[Sequence[Any], dict[str, Any]], ExecutionResources],
tuple[Sequence[Any], dict[str, Any]],
]:
"""Pickler."""
return self._unpickle_via_factory, (self.__class__, [], self._reduce_kwargs())
return self._unpickle_via_factory, ([], self._reduce_kwargs())


class QuantumContext:
Expand Down Expand Up @@ -202,9 +203,17 @@ def __init__(
for refs in quantum.inputs.values():
for ref in refs:
self.allInputs.add((ref.datasetType, ref.dataId))
for refs in quantum.outputs.values():
for dataset_type, refs in quantum.outputs.items():
if dataset_type.name.endswith(METADATA_OUTPUT_CONNECTION_NAME) or dataset_type.name.endswith(
LOG_OUTPUT_CONNECTION_NAME
):
# Don't consider log and metadata datasets to be outputs in
# this context, because we don't want the task to be able to
# write them itself; that's for the execution system to do.
continue
for ref in refs:
self.allOutputs.add((ref.datasetType, ref.dataId))
self.outputsPut: set[tuple[DatasetType, DataCoordinate]] = set()
self.__butler = butler

def _get(self, ref: DeferredDatasetRef | DatasetRef | None) -> Any:
Expand All @@ -223,6 +232,7 @@ def _put(self, value: Any, ref: DatasetRef) -> None:
"""Store data in butler."""
self._checkMembership(ref, self.allOutputs)
self.__butler.put(value, ref)
self.outputsPut.add((ref.datasetType, ref.dataId))

def get(
self,
Expand Down
136 changes: 135 additions & 1 deletion python/lsst/pipe/base/_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,16 @@
from __future__ import annotations

import abc
import enum
import logging
from typing import Protocol
from typing import ClassVar, Protocol

from lsst.utils import introspection

from ._task_metadata import GetSetDictMetadata, NestedMetadataDict

__all__ = (
"QuantumSuccessCaveats",
"UnprocessableDataError",
"AnnotatedPartialOutputsError",
"NoWorkFound",
Expand All @@ -46,6 +48,130 @@
)


class QuantumSuccessCaveats(enum.Flag):
"""Flags that add caveats to a "successful" quantum.
Quanta can be considered successful even if they do not produce some of
their expected outputs (and even if they do not produce all of their
expected outputs), as long as the condition is sufficiently well understood
that downstream processing should succeed.
"""

NO_CAVEATS = 0
"""All outputs were produced and no exceptions were raised."""

ANY_OUTPUTS_MISSING = enum.auto()
"""At least one predicted output was not produced."""

ALL_OUTPUTS_MISSING = enum.auto()
"""No predicted outputs (except logs and metadata) were produced.
`ANY_OUTPUTS_MISSING` is also set whenever this flag it set.
"""

NO_WORK = enum.auto()
"""A subclass of `NoWorkFound` was raised.
This does not necessarily imply that `ANY_OUTPUTS_MISSING` is not set,
since a `PipelineTask.runQuantum` implementation could raise it after
directly writing all of its predicted outputs.
"""

ADJUST_QUANTUM_RAISED = enum.auto()
"""`NoWorkFound` was raised by `PipelineTaskConnnections.adjustQuantum`.
This indicates that if a new `QuantumGraph` had been generated immediately
before running this quantum, that quantum would not have even been
included, because required inputs that were expected to exist by the time
it was run (in the original `QuantumGraph`) were not actually produced.
`NO_WORK` and `ALL_OUTPUTS_MISSING` are also set whenever this flag is set.
"""

UPSTREAM_FAILURE_NO_WORK = enum.auto()
"""`UpstreamFailureNoWorkFound` was raised by `PipelineTask.runQuantum`.
This exception is raised by downstream tasks when an upstream task's
outputs were incomplete in a way that blocks it from running, often
because the upstream task raised `AnnotatedPartialOutputsError`.
`NO_WORK` is also set whenever this flag is set.
"""

UNPROCESSABLE_DATA = enum.auto()
"""`UnprocessableDataError` was raised by `PipelineTask.runQuantum`.
`NO_WORK` is also set whenever this flag is set.
"""

PARTIAL_OUTPUTS_ERROR = enum.auto()
"""`AnnotatedPartialOutputsError` was raised by `PipelineTask.runQuantum`
and the execution system was instructed to consider this a qualified
success.
"""

@classmethod
def from_adjust_quantum_no_work(cls) -> QuantumSuccessCaveats:
"""Return the set of flags appropriate for a quantum for which
`PipelineTaskConnections.adjustdQuantum` raised `NoWorkFound`.
"""
return cls.NO_WORK | cls.ADJUST_QUANTUM_RAISED | cls.ANY_OUTPUTS_MISSING | cls.ALL_OUTPUTS_MISSING

def concise(self) -> str:
"""Return a concise string representation of the flags.
Returns
-------
s : `str`
Two-character string representation, with the first character
indicating whether any predicted outputs were missing and the
second representing any exceptions raised. This representation is
not always complete; some rare combinations of flags are displayed
as if only one of the flags was set.
Notes
-----
The `legend` method returns a description of the returned codes.
"""
char1 = ""
if self & QuantumSuccessCaveats.ALL_OUTPUTS_MISSING:
char1 = "*"
elif self & QuantumSuccessCaveats.ANY_OUTPUTS_MISSING:
char1 = "+"
char2 = ""
if self & QuantumSuccessCaveats.ADJUST_QUANTUM_RAISED:
char2 = "A"
elif self & QuantumSuccessCaveats.UNPROCESSABLE_DATA:
char2 = "D"
elif self & QuantumSuccessCaveats.UPSTREAM_FAILURE_NO_WORK:
char2 = "U"
elif self & QuantumSuccessCaveats.PARTIAL_OUTPUTS_ERROR:
char2 = "P"
elif self & QuantumSuccessCaveats.NO_WORK:
char2 = "N"
return char1 + char2

@staticmethod
def legend() -> dict[str, str]:
"""Return a `dict` with human-readable descriptions of the characters
used in `concise`.
Returns
-------
legend : `dict` [ `str`, `str` ]
Mapping from character code to description.
"""
return {
"+": "at least one predicted output is missing, but not all",
"*": "all predicated outputs were missing (besides logs and metadata)",
"A": "adjustQuantum raised NoWorkFound; an updated QG would not include this quantum",
"D": "algorithm considers data too bad to be processable",
"U": "one or more input dataset as incomplete due to an upstream failure",
"P": "task failed but wrote partial outputs; considered a partial success",
"N": "runQuantum raised NoWorkFound",
}


class GetSetDictMetadataHolder(Protocol):
"""Protocol for objects that have a ``metadata`` attribute that satisfies
`GetSetDictMetadata`.
Expand All @@ -67,13 +193,17 @@ class NoWorkFound(BaseException):
logic to trap it.
"""

FLAGS: ClassVar = QuantumSuccessCaveats.NO_WORK


class UpstreamFailureNoWorkFound(NoWorkFound):
"""A specialization of `NoWorkFound` that indicates that an upstream task
had a problem that was ignored (e.g. to prevent a single-detector failure
from bringing down an entire visit).
"""

FLAGS: ClassVar = QuantumSuccessCaveats.NO_WORK | QuantumSuccessCaveats.UPSTREAM_FAILURE_NO_WORK


class RepeatableQuantumError(RuntimeError):
"""Exception that may be raised by PipelineTasks (and code they delegate
Expand Down Expand Up @@ -145,6 +275,8 @@ class UnprocessableDataError(NoWorkFound):
situation.
"""

FLAGS: ClassVar = QuantumSuccessCaveats.NO_WORK | QuantumSuccessCaveats.UNPROCESSABLE_DATA


class AnnotatedPartialOutputsError(RepeatableQuantumError):
"""Exception that runQuantum raises when the (partial) outputs it has
Expand All @@ -161,6 +293,8 @@ class AnnotatedPartialOutputsError(RepeatableQuantumError):
invalidate any outputs that are already written.
"""

FLAGS: ClassVar = QuantumSuccessCaveats.PARTIAL_OUTPUTS_ERROR

@classmethod
def annotate(
cls, error: Exception, *args: GetSetDictMetadataHolder | None, log: logging.Logger
Expand Down
Loading

0 comments on commit 0c32b46

Please sign in to comment.