Merge pull request #308 from lsst/tickets/DM-37786-v24
DM-37786-v24: backport task control over dataset-query-constraints defaults
TallJimbo authored Feb 23, 2023
2 parents 641d719 + ac18f79 commit fe0c299
Showing 5 changed files with 119 additions and 5 deletions.
1 change: 1 addition & 0 deletions doc/changes/DM-37786.bugfix.md
@@ -0,0 +1 @@
Fix an error message that incorrectly reports that repository state has changed during QuantumGraph generation when init-input datasets are simply missing.
1 change: 1 addition & 0 deletions doc/changes/DM-37786.feature.md
@@ -0,0 +1 @@
Allow `PipelineTasks` to provide defaults for the `--dataset-query-constraints` option for the `pipetask` tool.
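
As context for this feature (not part of the diff): the `--dataset-query-constraints` option is parsed into a `DatasetQueryConstraintVariant`, and this change makes the `ALL` variant defer to per-task defaults. A minimal hedged sketch, assuming `lsst.pipe.base` exports `DatasetQueryConstraintVariant` and its `fromExpression` parser, with illustrative dataset type names:

```python
# Hedged sketch: the three expression forms the option accepts.
from lsst.pipe.base import DatasetQueryConstraintVariant

all_variant = DatasetQueryConstraintVariant.fromExpression("all")  # task defaults apply
off_variant = DatasetQueryConstraintVariant.fromExpression("off")  # no dataset constraints
# A comma-separated list names specific dataset types to constrain on
# ("deepCoadd" and "skyMap" are illustrative names, not from this PR):
list_variant = DatasetQueryConstraintVariant.fromExpression("deepCoadd,skyMap")

assert all_variant == DatasetQueryConstraintVariant.ALL
assert off_variant == DatasetQueryConstraintVariant.OFF
```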
56 changes: 56 additions & 0 deletions python/lsst/pipe/base/connectionTypes.py
@@ -244,6 +244,62 @@ def __post_init__(self) -> None:

@dataclasses.dataclass(frozen=True)
class Input(BaseInput):
    """Class used for declaring PipelineTask input connections.

    Parameters
    ----------
    name : `str`
        The default name used to identify the dataset type.
    storageClass : `str`
        The storage class used when (un)persisting the dataset type.
    multiple : `bool`
        Indicates if this connection should expect to contain multiple objects
        of the given dataset type.  Tasks with more than one connection with
        ``multiple=True`` with the same dimensions may want to implement
        `PipelineTaskConnections.adjustQuantum` to ensure those datasets are
        consistent (i.e. zip-iterable) in `PipelineTask.runQuantum` and notify
        the execution system as early as possible of outputs that will not be
        produced because the corresponding input is missing.
    dimensions : iterable of `str`
        The `lsst.daf.butler.Registry` dimensions used to identify the dataset
        type with the given name.
    deferLoad : `bool`
        Indicates that this dataset type will be loaded as a
        `lsst.daf.butler.DeferredDatasetHandle`.  PipelineTasks can use this
        object to load the object at a later time.
    minimum : `int`
        Minimum number of datasets required for this connection, per quantum.
        This is checked in the base implementation of
        `PipelineTaskConnections.adjustQuantum`, which raises `NoWorkFound` if
        the minimum is not met for `Input` connections (causing the quantum to
        be pruned, skipped, or never created, depending on the context), and
        `FileNotFoundError` for `PrerequisiteInput` connections (causing
        QuantumGraph generation to fail).  `PipelineTask` implementations may
        provide custom `~PipelineTaskConnections.adjustQuantum` implementations
        for more fine-grained or configuration-driven constraints, as long as
        they are compatible with this minimum.
    deferGraphConstraint : `bool`, optional
        If `True`, do not include this dataset type's existence in the initial
        query that starts the QuantumGraph generation process.  This can be
        used to make QuantumGraph generation faster by avoiding redundant
        dataset constraints, and in certain cases it can (along with careful
        attention to which tasks are included in the same QuantumGraph) be
        used to work around the QuantumGraph generation algorithm's inflexible
        handling of spatial overlaps.  This option has no effect when the
        connection is not an overall input of the pipeline (or subset thereof)
        for which a graph is being created, and it never affects the ordering
        of quanta.

    Raises
    ------
    TypeError
        Raised if ``minimum`` is greater than one but ``multiple=False``.
    NotImplementedError
        Raised if ``minimum`` is zero for a regular `Input` connection; this
        is not currently supported by our QuantumGraph generation algorithm.
    """

    deferGraphConstraint: bool = False

    def __post_init__(self) -> None:
        super().__post_init__()
        if self.minimum == 0:
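
To make the new flag concrete, here is a hedged sketch of a connections class that opts one input out of the initial graph-constraint query. The task, dataset type names, and storage classes below are illustrative, not from this PR:

```python
# Illustrative only: hypothetical connections showing deferGraphConstraint.
import lsst.pipe.base as pipeBase
import lsst.pipe.base.connectionTypes as cT


class ExampleConnections(
    pipeBase.PipelineTaskConnections, dimensions=("tract", "patch", "skymap")
):
    coadd = cT.Input(
        doc="Coadded image to process; constrains the initial query as usual.",
        name="deepCoadd",
        storageClass="ExposureF",
        dimensions=("tract", "patch", "band", "skymap"),
        multiple=True,
    )
    background = cT.Input(
        doc="Input whose existence should not constrain the initial "
        "QuantumGraph data ID query.",
        name="skyCorr",
        storageClass="Background",
        dimensions=("instrument", "visit", "detector"),
        multiple=True,
        deferGraphConstraint=True,
    )
```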
34 changes: 31 additions & 3 deletions python/lsst/pipe/base/graphBuilder.py
@@ -46,6 +46,7 @@
    DimensionGraph,
    DimensionUniverse,
    NamedKeyDict,
    NamedValueSet,
    Quantum,
    Registry,
)
@@ -552,6 +553,7 @@ def __init__(self, pipeline: Union[Pipeline, Iterable[TaskDef]], *, registry: Re
                attr,
                _DatasetDict.fromDatasetTypes(getattr(datasetTypes, attr), universe=registry.dimensions),
            )
        self.defaultDatasetQueryConstraints = datasetTypes.queryConstraints
        # Aggregate all dimensions for all non-init, non-prerequisite
        # DatasetTypes. These are the ones we'll include in the big join
        # query.
@@ -613,6 +615,11 @@ def __repr__(self) -> str:
    per-Quantum when generating the graph (`_DatasetDict`).
    """

    defaultDatasetQueryConstraints: NamedValueSet[DatasetType]
    """Datasets that should be used as constraints in the initial query,
    according to tasks (`NamedValueSet`).
    """

    dimensions: DimensionGraph
    """All dimensions used by any regular input, intermediate, or output
    (not prerequisite) dataset; the set of dimensions used in the "Big Join
@@ -675,15 +682,21 @@ def connectDataIds(
        # inputs and outputs. We limit the query to only dimensions that are
        # associated with the input dataset types, but don't (yet) try to
        # obtain the dataset_ids for those inputs.
-        _LOG.debug("Submitting data ID query and materializing results.")
+        _LOG.debug(
+            "Submitting data ID query over dimensions %s and materializing results.",
+            list(self.dimensions.names),
+        )
        queryArgs: Dict[str, Any] = {
            "dimensions": self.dimensions,
            "where": userQuery,
            "dataId": externalDataId,
        }
        if datasetQueryConstraint == DatasetQueryConstraintVariant.ALL:
-            _LOG.debug("Constraining graph query using all datasets in pipeline.")
-            queryArgs["datasets"] = list(self.inputs)
+            _LOG.debug(
+                "Constraining graph query using default of %s.",
+                list(self.defaultDatasetQueryConstraints.names),
+            )
+            queryArgs["datasets"] = list(self.defaultDatasetQueryConstraints)
            queryArgs["collections"] = collections
        elif datasetQueryConstraint == DatasetQueryConstraintVariant.OFF:
            _LOG.debug("Not using dataset existence to constrain query.")
@@ -845,6 +858,16 @@ def resolveDatasetRefs(
            collectionTypes=CollectionType.RUN,
        )

        # Updating constrainedByAllDatasets here is not ideal, but we have a
        # few different code paths that each transfer different pieces of
        # information about what dataset query constraints were applied here,
        # and none of them has the complete picture until we get here.  We're
        # long overdue for a QG generation rewrite that will make this go away
        # entirely anyway.
        constrainedByAllDatasets = (
            constrainedByAllDatasets and self.defaultDatasetQueryConstraints == self.inputs.keys()
        )

        # Look up [init] intermediate and output datasets in the output
        # collection, if there is an output collection.
        if run is not None or skipCollections is not None:
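
The intent of that update, restated as a hedged toy with plain sets standing in for the real `NamedValueSet`/`_DatasetDict` types and illustrative dataset names:

```python
# Toy restatement: the flag survives only if the task-provided default
# constraints covered every regular input of the pipeline.
inputs = {"calexp", "skyMap"}
default_constraints = {"calexp"}  # e.g. "skyMap" set deferGraphConstraint=True
constrained_by_all = True  # e.g. the caller requested the ALL variant

constrained_by_all = constrained_by_all and default_constraints == inputs
assert not constrained_by_all  # one input was deferred, so not "all datasets"
```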
@@ -913,6 +936,11 @@
                            f"or the input collections have been modified since "
                            f"QuantumGraph generation began."
                        )
                    elif not datasetType.dimensions:
                        raise RuntimeError(
                            f"Dataset {datasetType.name!r} (with no dimensions) could not be found in "
                            f"collections {collections}."
                        )
                    else:
                        # if the common dataIds were not constrained using all the
                        # input dataset types, it is possible that some data ids
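
Why the new `elif` branch above is separate: a dataset type with no dimensions has exactly one possible data ID, so its absence can never be explained by data-ID constraints. A hedged sketch of the property the check relies on, assuming the `DatasetType` constructor and `DimensionUniverse.empty` of contemporary `daf_butler`; the dataset type name is hypothetical:

```python
# Hedged sketch: a dimensionless dataset type has an empty dimension
# graph, which evaluates False -- the condition used by the new branch.
from lsst.daf.butler import DatasetType, DimensionUniverse

universe = DimensionUniverse()
summary = DatasetType("globalSummary", universe.empty, "StructuredDataDict")
assert not summary.dimensions  # empty DimensionGraph is falsy
```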
32 changes: 30 additions & 2 deletions python/lsst/pipe/base/pipeline.py
Expand Up @@ -67,6 +67,7 @@
from .config import PipelineTaskConfig
from .configOverrides import ConfigOverrides
from .connections import iterConnections
from .connectionTypes import Input
from .pipelineTask import PipelineTask
from .task import _TASK_METADATA_TYPE

@@ -773,6 +774,12 @@ class TaskDatasetTypes:
    at the Pipeline level.
    """

    queryConstraints: NamedValueSet[DatasetType]
    """Regular inputs that should be used as constraints on the initial
    QuantumGraph generation data ID query, according to their tasks
    (`NamedValueSet`).
    """

    prerequisites: NamedValueSet[DatasetType]
    """Dataset types that are prerequisite inputs to this Task.
@@ -1011,10 +1018,18 @@ def makeDatasetTypesSet(

        outputs.freeze()

        inputs = makeDatasetTypesSet("inputs", is_input=True)
        queryConstraints = NamedValueSet(
            inputs[c.name]
            for c in cast(Iterable[Input], iterConnections(taskDef.connections, "inputs"))
            if not c.deferGraphConstraint
        )

        return cls(
            initInputs=makeDatasetTypesSet("initInputs", is_input=True),
            initOutputs=initOutputs,
-            inputs=makeDatasetTypesSet("inputs", is_input=True),
+            inputs=inputs,
+            queryConstraints=queryConstraints,
            prerequisites=makeDatasetTypesSet("prerequisiteInputs", is_input=True),
            outputs=outputs,
        )
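
The filter above, restated as a toy; `FakeInput` is a hypothetical stand-in for a connection, since the real code iterates `iterConnections` and indexes a `NamedValueSet` by connection name:

```python
# Toy restatement of the queryConstraints filter: only connections that
# did NOT set deferGraphConstraint contribute to the task's constraints.
from dataclasses import dataclass


@dataclass
class FakeInput:  # hypothetical stand-in for an Input connection
    name: str
    deferGraphConstraint: bool = False


connections = [FakeInput("deepCoadd"), FakeInput("skyCorr", deferGraphConstraint=True)]
queryConstraints = {c.name for c in connections if not c.deferGraphConstraint}
assert queryConstraints == {"deepCoadd"}
```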
@@ -1061,6 +1076,12 @@ class PipelineDatasetTypes:
    produced.
    """

    queryConstraints: NamedValueSet[DatasetType]
    """Regular inputs that should be used as constraints on the initial
    QuantumGraph generation data ID query, according to their tasks
    (`NamedValueSet`).
    """

    prerequisites: NamedValueSet[DatasetType]
    """Dataset types that are prerequisite inputs for the full Pipeline.
@@ -1133,6 +1154,7 @@ def fromPipeline(
        allInitInputs = NamedValueSet[DatasetType]()
        allInitOutputs = NamedValueSet[DatasetType]()
        prerequisites = NamedValueSet[DatasetType]()
        queryConstraints = NamedValueSet[DatasetType]()
        byTask = dict()
        if include_packages:
            allInitOutputs.add(
@@ -1161,6 +1183,9 @@
            allInitInputs.update(thisTask.initInputs)
            allInitOutputs.update(thisTask.initOutputs)
            allInputs.update(thisTask.inputs)
            # Inputs are query constraints if any task considers them a query
            # constraint.
            queryConstraints.update(thisTask.queryConstraints)
            prerequisites.update(thisTask.prerequisites)
            allOutputs.update(thisTask.outputs)
            byTask[taskDef.label] = thisTask
@@ -1213,11 +1238,14 @@ def frozen(s: AbstractSet[DatasetType]) -> NamedValueSet[DatasetType]:
            s.freeze()
            return s

        inputs = frozen(allInputs - allOutputs - intermediateComponents)

        return cls(
            initInputs=frozen(allInitInputs - allInitOutputs),
            initIntermediates=frozen(allInitInputs & allInitOutputs),
            initOutputs=frozen(allInitOutputs - allInitInputs),
-            inputs=frozen(allInputs - allOutputs - intermediateComponents),
+            inputs=inputs,
+            queryConstraints=frozen(queryConstraints & inputs),
            # If there are storage class differences in inputs and outputs
            # the intermediates have to choose priority.  Here choose that
            # inputs to tasks must match the requested storage class by
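
The pipeline-level semantics above, restated as a toy with plain sets and illustrative dataset names: constraints are unioned across tasks, then intersected with the overall inputs so intermediates produced inside the pipeline drop out:

```python
# Toy restatement of the pipeline-level set algebra.
task_a = {"raw"}                    # task A constrains on its raw input
task_b = {"calexp", "skyMap"}       # task B; calexp is produced by task A
overall_inputs = {"raw", "skyMap"}  # allInputs - allOutputs - components

queryConstraints = (task_a | task_b) & overall_inputs
assert queryConstraints == {"raw", "skyMap"}  # calexp dropped: intermediate
```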
