diff --git a/doc/changes/DM-40243.feature.md b/doc/changes/DM-40243.feature.md new file mode 100644 index 000000000..e644365cf --- /dev/null +++ b/doc/changes/DM-40243.feature.md @@ -0,0 +1,3 @@ +When looking up prerequisite inputs with skypix data IDs (e.g. reference catalogs) for a quantum whose data ID is not spatial, use the union of the spatial regions of the input and output datasets as a constraint. + +This keeps global sequence-point tasks from being given all such datasets in the input collections. diff --git a/python/lsst/pipe/base/graphBuilder.py b/python/lsst/pipe/base/graphBuilder.py index 33b846f72..c3784bcbe 100644 --- a/python/lsst/pipe/base/graphBuilder.py +++ b/python/lsst/pipe/base/graphBuilder.py @@ -50,10 +50,12 @@ NamedValueSet, Quantum, Registry, + SkyPixDimension, ) from lsst.daf.butler.registry import MissingCollectionError, MissingDatasetTypeError from lsst.daf.butler.registry.queries import DataCoordinateQueryResults from lsst.daf.butler.registry.wildcards import CollectionWildcard +from lsst.sphgeom import PixelizationABC, RangeSet # ----------------------------- # Imports for other modules -- @@ -415,6 +417,27 @@ def __repr__(self) -> str: inputs to this quantum. """ + def computeSpatialExtent(self, pixelization: PixelizationABC) -> RangeSet: + """Return the spatial extent of this quantum's inputs and outputs in + a skypix system. + + Parameters + ---------- + pixelization : `lsst.sphgeom.PixelizationABC` + Pixelization system. + + Returns + ------- + extent : `lsst.sphgeom.RangeSet` + Ranges of sky pixels that touch this quantum's inputs and outputs. + """ + result = RangeSet() + for dataset_type, datasets in itertools.chain(self.inputs.items(), self.outputs.items()): + if dataset_type.dimensions.spatial: + for data_id in datasets.keys(): + result |= pixelization.envelope(data_id.region) + return result + def makeQuantum(self, datastore_records: Mapping[str, DatastoreRecordData] | None = None) -> Quantum: """Transform the scaffolding object into a true `Quantum` instance. @@ -1318,6 +1341,30 @@ def resolveDatasetRefs( # which just means there are no datasets here. prereq_refs = [] else: + where = "" + bind: dict[str, Any] = {} + if not quantum.dataId.graph.spatial: + # This has skypix dimensions (probably a reference + # catalog), but the quantum's data is not spatial + # (it's probably a full-survey sequence point). + # Try to limit the spatial extent to the union of + # the spatial extent of the inputs and outputs. + for dimension in datasetType.dimensions: + if isinstance(dimension, SkyPixDimension): + extent = quantum.computeSpatialExtent(dimension.pixelization) + pixels: list[int] = [] + for begin, end in extent: + pixels.extend(range(begin, end)) + if not pixels: + _LOG.warning( + "Prerequisite input %r to task %r may be unbounded.", + datasetType.name, + quantum.task.taskDef.label, + ) + else: + bind["quantum_extent"] = pixels + where = f"{dimension.name} IN (quantum_extent)" + break # Most general case. prereq_refs = [ prereq_ref if component is None else prereq_ref.makeComponentRef(component) @@ -1326,6 +1373,8 @@ def resolveDatasetRefs( collections=collections, dataId=quantum.dataId, findFirst=True, + where=where, + bind=bind, ).expanded() ]