Add Pipeline.get_data_id.
TallJimbo committed Jun 9, 2023
1 parent 3766d9f commit 7dcc699
Showing 2 changed files with 33 additions and 13 deletions.
python/lsst/pipe/base/graphBuilder.py (15 changes: 3 additions & 12 deletions)
@@ -53,7 +53,6 @@
 from lsst.daf.butler.registry import MissingCollectionError, MissingDatasetTypeError
 from lsst.daf.butler.registry.queries import DataCoordinateQueryResults
 from lsst.daf.butler.registry.wildcards import CollectionWildcard
-from lsst.utils import doImportType
 
 # -----------------------------
 # Imports for other modules --
@@ -1606,18 +1605,10 @@ def makeGraph(
         scaffolding = _PipelineScaffolding(pipeline, registry=self.registry)
         if not collections and (scaffolding.initInputs or scaffolding.inputs or scaffolding.prerequisites):
             raise ValueError("Pipeline requires input datasets but no input collections provided.")
-        instrument_class: Optional[Any] = None
-        if isinstance(pipeline, Pipeline):
-            instrument_class_name = pipeline.getInstrument()
-            if instrument_class_name is not None:
-                instrument_class = doImportType(instrument_class_name)
-            pipeline = list(pipeline.toExpandedPipeline())
-        if instrument_class is not None:
-            dataId = DataCoordinate.standardize(
-                dataId, instrument=instrument_class.getName(), universe=self.registry.dimensions
-            )
-        elif dataId is None:
+        if dataId is None:
             dataId = DataCoordinate.makeEmpty(self.registry.dimensions)
+        if isinstance(pipeline, Pipeline):
+            dataId = pipeline.get_data_id(self.registry.dimensions).union(dataId)
         with scaffolding.connectDataIds(
             self.registry, collections, userQuery, dataId, datasetQueryConstraint, bind
         ) as commonDataIds:
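For context, the new merging logic in makeGraph can be exercised on its own. A minimal sketch, assuming the default butler dimension universe; the "HSC" instrument value is illustrative, standing in for whatever Pipeline.get_data_id would return:

from lsst.daf.butler import DataCoordinate, DimensionUniverse

universe = DimensionUniverse()  # default dimension configuration

# The kind of data ID Pipeline.get_data_id produces for a pipeline that
# declares an instrument ("HSC" is illustrative).
pipeline_data_id = DataCoordinate.standardize(instrument="HSC", universe=universe)

# makeGraph now falls back to an empty data ID when the caller passes None...
user_data_id = DataCoordinate.makeEmpty(universe)

# ...and folds in the pipeline's constraints, rather than importing the
# instrument class itself at this layer.
merged = pipeline_data_id.union(user_data_id)
assert merged["instrument"] == "HSC"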
python/lsst/pipe/base/pipeline.py (31 changes: 30 additions & 1 deletion)
@@ -55,7 +55,14 @@
 
 # -----------------------------
 # Imports for other modules --
-from lsst.daf.butler import DatasetType, NamedValueSet, Registry, SkyPixDimension
+from lsst.daf.butler import (
+    DataCoordinate,
+    DatasetType,
+    DimensionUniverse,
+    NamedValueSet,
+    Registry,
+    SkyPixDimension,
+)
 from lsst.resources import ResourcePath, ResourcePathExpression
 from lsst.utils import doImportType
 from lsst.utils.introspection import get_full_type_name
@@ -613,6 +620,28 @@ def getInstrument(self) -> Optional[str]:
         """
         return self._pipelineIR.instrument
 
+    def get_data_id(self, universe: DimensionUniverse) -> DataCoordinate:
+        """Return a data ID with all dimension constraints embedded in the
+        pipeline.
+
+        Parameters
+        ----------
+        universe : `lsst.daf.butler.DimensionUniverse`
+            Object that defines all dimensions.
+
+        Returns
+        -------
+        data_id : `lsst.daf.butler.DataCoordinate`
+            Data ID with all dimension constraints embedded in the
+            pipeline.
+        """
+        instrument_class_name = self._pipelineIR.instrument
+        if instrument_class_name is not None:
+            instrument_class = doImportType(instrument_class_name)
+            if instrument_class is not None:
+                return DataCoordinate.standardize(instrument=instrument_class.getName(), universe=universe)
+        return DataCoordinate.makeEmpty(universe)
+
     def addTask(self, task: Union[Type[PipelineTask], str], label: str) -> None:
         """Add a new task to the pipeline, or replace a task that is already
         associated with the supplied label.
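A minimal usage sketch for the new method; the pipeline file path is hypothetical, and the result depends on whether the pipeline declares an instrument:

from lsst.daf.butler import DimensionUniverse
from lsst.pipe.base import Pipeline

# Load a pipeline definition (the path here is hypothetical).
pipeline = Pipeline.fromFile("pipelines/example.yaml")

# If the pipeline declares an instrument, get_data_id imports that class and
# constrains the instrument dimension to its short name; otherwise it returns
# an empty data ID.
data_id = pipeline.get_data_id(DimensionUniverse())
print(data_id)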
