DM-29761: start saving per-quantum provenance and propagating nothing-to-do cases #183

Draft · wants to merge 5 commits into base: main
16 changes: 15 additions & 1 deletion python/lsst/pipe/base/connectionTypes.py
@@ -28,7 +28,7 @@

import dataclasses
import typing
from typing import Callable, Iterable, Optional
from typing import Callable, ClassVar, Iterable, Optional

from lsst.daf.butler import (
CollectionSearch,
@@ -194,6 +194,12 @@ class BaseInput(DimensionedConnection):
"""
deferLoad: bool = False

optional: ClassVar[bool] = False
"""Quanta are not generated and never (fully) executed when there are no
datasets for a non-optional input connection, and most input connections
cannot be made optional.
"""


@dataclasses.dataclass(frozen=True)
class Input(BaseInput):
@@ -225,6 +231,12 @@ class PrerequisiteInput(BaseInput):
using the DatasetType, registry, quantum dataId, and input collections
passed to it. If no function is specified, the default temporal spatial
lookup will be used.
optional : `bool`, optional
If `True` (`False` is default), generate quanta for this task even
when no input datasets for this connection exist. Missing optional
inputs will cause `ButlerQuantumContext.get` to return `None` or a
smaller container (never a same-size container with some `None`
elements) during execution.

Notes
-----
@@ -249,10 +261,12 @@ class PrerequisiteInput(BaseInput):
between dimensions are not desired (e.g. a task that wants all detectors
in each visit for which the visit overlaps a tract, not just those where
that detector+visit combination overlaps the tract).
- Prerequisite inputs may be optional (regular inputs are never optional).

"""
lookupFunction: Optional[Callable[[DatasetType, Registry, DataCoordinate, CollectionSearch],
Iterable[DatasetRef]]] = None
optional: bool = False


@dataclasses.dataclass(frozen=True)
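The `optional` pattern above — a `ClassVar[bool] = False` on `BaseInput` that only `PrerequisiteInput` shadows with a real instance field — can be sketched in isolation. The classes below are simplified stand-ins for the real `lsst.pipe.base.connectionTypes` dataclasses (which carry many more fields); only the optional/ClassVar mechanics come from this diff:

```python
import dataclasses
from typing import ClassVar

@dataclasses.dataclass(frozen=True)
class BaseInput:
    """Stand-in for connectionTypes.BaseInput (simplified)."""
    name: str
    multiple: bool = False
    # Class-level constant, not a dataclass field: regular inputs
    # can never be made optional.
    optional: ClassVar[bool] = False

@dataclasses.dataclass(frozen=True)
class PrerequisiteInput(BaseInput):
    # Shadows the ClassVar with a real instance field, so only
    # prerequisite inputs can opt in to being optional.
    optional: bool = False

# Only PrerequisiteInput accepts optional= at construction time.
ref_cat = PrerequisiteInput(name="ref_cat", optional=True)
raw = BaseInput(name="raw")
```

Because `ClassVar` annotations are excluded from dataclass fields, `BaseInput.__init__` never accepts `optional`, while the subclass's plain annotation turns it back into a constructor argument.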
130 changes: 110 additions & 20 deletions python/lsst/pipe/base/connections.py
@@ -35,8 +35,9 @@

from . import config as configMod
from .connectionTypes import (InitInput, InitOutput, Input, PrerequisiteInput,
Output, BaseConnection)
from lsst.daf.butler import DatasetRef, DatasetType, NamedKeyDict, Quantum
Output, BaseConnection, BaseInput)
from .executed_quantum import NoWorkQuantum
from lsst.daf.butler import DataCoordinate, DatasetRef, DatasetType, NamedKeyMapping, Quantum

if typing.TYPE_CHECKING:
from .config import PipelineTaskConfig
@@ -194,8 +195,8 @@ def __init__(cls, name, bases, dct, **kwargs):


class QuantizedConnection(SimpleNamespace):
"""A Namespace to map defined variable names of connections to their
`lsst.daf.buter.DatasetRef`s
"""A Namespace to map defined variable names of connections to the
associated `lsst.daf.butler.DatasetRef` objects.

This class maps the names used to define a connection on a
PipelineTaskConnectionsClass to the corresponding
@@ -456,26 +457,38 @@ def buildDatasetRefs(self, quantum: Quantum) -> typing.Tuple[InputQuantizedConne
"in input quantum")
return inputDatasetRefs, outputDatasetRefs

def adjustQuantum(self, datasetRefMap: NamedKeyDict[DatasetType, typing.Set[DatasetRef]]
) -> NamedKeyDict[DatasetType, typing.Set[DatasetRef]]:
def adjustQuantum(
self,
inputs: Iterable[typing.Tuple[str, BaseInput, typing.FrozenSet[DatasetRef]]],
label: str,
dataId: DataCoordinate,
) -> typing.Iterable[typing.Tuple[str, BaseInput, typing.AbstractSet[DatasetRef]]]:
"""Override to make adjustments to `lsst.daf.butler.DatasetRef` objects
in the `lsst.daf.butler.core.Quantum` during the graph generation stage
of the activator.

The base class implementation simply checks that input connections with
``multiple`` set to `False` have no more than one dataset.

Parameters
----------
datasetRefMap : `NamedKeyDict`
Mapping from dataset type to a `set` of
`lsst.daf.butler.DatasetRef` objects
inputs : `Iterable` of `tuple`
Three-element tuples, each a connection name, the connection
instance, and a `frozenset` of associated
`lsst.daf.butler.DatasetRef` objects, for all input and
prerequisite input connections. Guaranteed to support multi-pass
iteration.
label : `str`
Label for this task in the pipeline (should be used in all
diagnostic messages).
dataId : `lsst.daf.butler.DataCoordinate`
Data ID for this quantum in the pipeline (should be used in all
diagnostic messages).

Returns
-------
datasetRefMap : `NamedKeyDict`
Modified mapping of input with possibly adjusted
`lsst.daf.butler.DatasetRef` objects.
adjusted : `Iterable` of `tuple`
Iterable of tuples of the same form as ``inputs``, with adjusted
sets of `lsst.daf.butler.DatasetRef` objects (datasets may be
removed, but not added). Connections not returned at all will be
considered to be unchanged.

Raises
------
@@ -486,16 +499,93 @@ def adjustQuantum(self, datasetRefMap: NamedKeyDict[DatasetType, typing.Set[Data
Overrides of this function may raise an exception if an input does
not satisfy a need of the corresponding PipelineTask, e.g. when no
reference catalogs are found.
NoWorkQuantum
Raised to indicate that this quantum should not be run; one or more
of its expected inputs do not exist, and if possible, should be
pruned from the QuantumGraph.

Notes
-----
The base class implementation performs useful checks and should be
called via `super` by most custom implementations. It always returns
an empty iterable, because it makes no changes.
"""
for connection in itertools.chain(iterConnections(self, "inputs"),
iterConnections(self, "prerequisiteInputs")):
refs = datasetRefMap[connection.name]
for name, connection, refs in inputs:
if not connection.multiple and len(refs) > 1:
raise ScalarError(
f"Found multiple datasets {', '.join(str(r.dataId) for r in refs)} "
f"for scalar connection {connection.name} ({refs[0].datasetType.name})."
f"for scalar connection {label}.{name} ({connection.name}) "
f"for quantum data ID {dataId}."
)
if not connection.optional and not refs:
if isinstance(connection, PrerequisiteInput):
# This branch should only be possible during QG generation,
# or if someone deleted the dataset between making the QG
# and trying to run it. Either one should be a hard error.
raise FileNotFoundError(
f"No datasets found for non-optional connection {label}.{name} ({connection.name}) "
f"for quantum data ID {dataId}."
)
else:
# This branch should be impossible during QG generation,
# because that algorithm can only make quanta whose inputs
# are either already present or should be created during
# execution. It can trigger during execution if the input
# wasn't actually created by an upstream task in the same
# graph.
raise NoWorkQuantum(label, name, connection)
return ()
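The base-class checks and the "return only changed connections" contract above can be sketched without the lsst stack. Everything here is a simplified stand-in: connections are plain dicts, refs are strings, and the trim-to-3 rule in the override is an invented example, not anything this PR does:

```python
class NoWorkQuantum(Exception):
    """Stand-in: this quantum has no work and should be pruned."""

class ScalarError(TypeError):
    """Stand-in: multiple datasets for a scalar connection."""

def base_adjust_quantum(inputs, label, data_id):
    """Mimic the base-class behavior: validate, possibly raise, change nothing."""
    for name, connection, refs in inputs:
        if not connection["multiple"] and len(refs) > 1:
            raise ScalarError(f"{label}.{name}: multiple refs for scalar connection "
                              f"for quantum data ID {data_id}")
        if not connection["optional"] and not refs:
            if connection["prerequisite"]:
                # Missing prerequisite: hard error.
                raise FileNotFoundError(f"{label}.{name}: no datasets found")
            # Missing regular input: prune this quantum instead of failing.
            raise NoWorkQuantum(label, name)
    return ()  # empty iterable: nothing was adjusted

def subclass_adjust_quantum(inputs, label, data_id):
    """Example override: keep at most 3 refs for the 'calexps' connection."""
    inputs = list(inputs)           # inputs supports multi-pass iteration
    base_adjust_quantum(inputs, label, data_id)   # run base checks first
    adjusted = []
    for name, connection, refs in inputs:
        if name == "calexps" and len(refs) > 3:
            adjusted.append((name, connection, frozenset(sorted(refs)[:3])))
    return adjusted                 # only changed connections are returned
```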

def translateAdjustQuantumInputs(
self,
datasets: NamedKeyMapping[DatasetType, typing.Set[DatasetRef]],
) -> typing.List[typing.Tuple[str, BaseInput, typing.FrozenSet[DatasetRef]]]:
"""Translate a mapping of input datasets keyed on dataset type to the
form expected by the ``input`` argument to `adjustQuantum`.

Parameters
----------
datasets : `lsst.daf.butler.NamedKeyMapping`
Mapping from `DatasetType` to a set of `DatasetRef`. Need not
include all input dataset types; those missing will be mapped to
empty sets in the result.

Returns
-------
translated : `list` of `tuple`
List of 3-element tuples of the form expected as the ``inputs``
argument to `adjustQuantum`. Includes all input and prerequisite
inputs, even when there are no associated datasets.
"""
results = []
for connectionName in itertools.chain(self.inputs, self.prerequisiteInputs):
connectionInstance = getattr(self, connectionName)
results.append(
(
connectionName,
connectionInstance,
frozenset(datasets.get(connectionInstance.name, frozenset())),
)
)
return results
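The translation step reduces to: walk every input connection, look its dataset type up in the mapping, and default to an empty set. A stand-alone sketch with plain dicts standing in for the connection descriptors and the `NamedKeyMapping` (the real method walks attributes on `self` instead):

```python
def translate_adjust_quantum_inputs(connections, datasets):
    """connections: {attribute name -> dataset type name};
    datasets: {dataset type name -> set of refs} (may omit types)."""
    results = []
    for attr_name, dataset_type_name in connections.items():
        # Missing dataset types map to empty frozensets in the result.
        refs = frozenset(datasets.get(dataset_type_name, frozenset()))
        results.append((attr_name, dataset_type_name, refs))
    return results
```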

def hasPostWriteLogic(self):
[Contributor] Why is this a method?

[Member, author] 🤷‍♂️ , I just never know what to do with boolean flag getters; I feel like "starts with verb" implies "it's a method", but there's pretty much no other way to name boolean properties. I'm not even consistent about that within this commit. Everybody else in the world, please just agree on what we should do in this case and I'll happily follow along.

[Member] Is the implied rest of the question "and not a property"?

[Contributor] Or just an attribute. That's the great thing about properties: they can be substituted in place of an attribute if needed.

[Member, author] This one shouldn't be an attribute, because I think the right specialization pattern is "override method [or property]".

[Contributor] Why is that better than setting a class-level attribute? I really don't understand why this is such an uncommon thing. Are you trying to protect it from changing? If so, other code would be needed here.
"""Test whether this `PipelineTask` can fail even after all outputs
have been written.

When this returns `False` (the default base class behavior), execution
harnesses and QuantumGraph generation algorithms may assume that:

- any quantum execution that yielded all predicted outputs was a
success, without checking actual exit status.

- any quantum execution that yields no predicted outputs can be
treated as if `NoWorkQuantum` was raised.

These assumptions enable important optimizations in code that attempts
to quickly determine the status of an executed quantum.
"""
return False
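The two optimizations the docstring describes can be made concrete with a small classifier. This harness function is invented for illustration; only the flag's semantics (all outputs written implies success, no outputs written implies no-work, unless the task has post-write logic) come from the docstring above:

```python
def classify_quantum(has_post_write_logic, predicted, written, exit_ok):
    """Hypothetical harness logic: classify an executed quantum from the
    set of predicted outputs, the set actually written, and exit status."""
    if has_post_write_logic:
        # No shortcuts allowed: must trust the recorded exit status.
        return "success" if exit_ok else "failed"
    if written == predicted:
        return "success"   # all predicted outputs exist; skip status check
    if not written:
        return "no_work"   # treated as if NoWorkQuantum was raised
    return "failed"        # partial outputs: something went wrong
```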


def iterConnections(connections: PipelineTaskConnections,