Skip to content

Commit

Permalink
Move loadDiaCatalogs to a standalone pipelineTask
Browse files Browse the repository at this point in the history
  • Loading branch information
isullivan committed Aug 14, 2024
1 parent 19031e9 commit 9f2e272
Show file tree
Hide file tree
Showing 4 changed files with 158 additions and 122 deletions.
64 changes: 38 additions & 26 deletions python/lsst/ap/association/diaPipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
Additionally performs forced photometry on the calibrated and difference
images at the updated locations of DiaObjects.
Currently loads directly from the Apdb rather than pre-loading.
"""

__all__ = ("DiaPipelineConfig",
Expand All @@ -44,7 +42,6 @@
from lsst.ap.association import (
AssociationTask,
DiaForcedSourceTask,
LoadDiaCatalogsTask,
PackageAlertsTask)
from lsst.ap.association.ssoAssociation import SolarSystemAssociationTask
from lsst.ap.association.utils import convertTableToSdmSchema, readSchemaFromApdb, dropEmptyColumns, \
Expand Down Expand Up @@ -94,6 +91,24 @@ class DiaPipelineConnections(
storageClass="ExposureF",
name="{fakesType}{coaddName}Diff_templateExp",
)
preloadedDiaObjects = connTypes.Input(
doc="DiaObjects preloaded from the APDB.",
name="preloaded_diaObjects",
storageClass="DataFrame",
dimensions=("instrument", "group", "detector"),
)
preloadedDiaSources = connTypes.Input(
doc="DiaSources preloaded from the APDB.",
name="preloaded_diaSources",
storageClass="DataFrame",
dimensions=("instrument", "group", "detector"),
)
preloadedDiaForcedSources = connTypes.Input(
doc="DiaForcedSources preloaded from the APDB.",
name="preloaded_diaForcedSources",
storageClass="DataFrame",
dimensions=("instrument", "group", "detector"),
)
apdbMarker = connTypes.Output(
doc="Marker dataset storing the configuration of the Apdb for each "
"visit/detector. Used to signal the completion of the pipeline.",
Expand Down Expand Up @@ -232,10 +247,6 @@ class DiaPipelineConfig(pipeBase.PipelineTaskConfig,
"band not on this list, the appropriate band specific columns "
"must be added to the Apdb schema in dax_apdb.",
)
diaCatalogLoader = pexConfig.ConfigurableField(
target=LoadDiaCatalogsTask,
doc="Task to load DiaObjects and DiaSources from the Apdb.",
)
associator = pexConfig.ConfigurableField(
target=AssociationTask,
doc="Task used to associate DiaSources with DiaObjects.",
Expand All @@ -253,15 +264,6 @@ class DiaPipelineConfig(pipeBase.PipelineTaskConfig,
target=DiaObjectCalculationTask,
doc="Task to compute summary statistics for DiaObjects.",
)
doLoadForcedSources = pexConfig.Field(
dtype=bool,
default=True,
deprecated="Added to allow disabling forced sources for performance "
"reasons during the ops rehearsal. "
"It is expected to be removed.",
doc="Load forced DiaSource history from the APDB? "
"This should only be turned off for debugging purposes.",
)
doRunForcedMeasurement = pexConfig.Field(
dtype=bool,
default=True,
Expand Down Expand Up @@ -360,7 +362,6 @@ def __init__(self, initInputs=None, **kwargs):
else:
self.apdb = daxApdb.Apdb.from_uri(self.config.apdb_config_url)
self.schema = readSchemaFromApdb(self.apdb)
self.makeSubtask("diaCatalogLoader")
self.makeSubtask("associator")
self.makeSubtask("diaCalculation")
if self.config.doRunForcedMeasurement:
Expand Down Expand Up @@ -388,6 +389,9 @@ def run(self,
diffIm,
exposure,
template,
preloadedDiaObjects,
preloadedDiaSources,
preloadedDiaForcedSources,
band,
idGenerator):
"""Process DiaSources and DiaObjects.
Expand All @@ -411,6 +415,12 @@ def run(self,
``diffIm``.
template : `lsst.afw.image.ExposureF`
Template exposure used to create diffIm.
preloadedDiaObjects : `pandas.DataFrame`
Previously detected DiaObjects, loaded from the APDB.
preloadedDiaSources : `pandas.DataFrame`
Previously detected DiaSources, loaded from the APDB.
preloadedDiaForcedSources : `pandas.DataFrame`
Catalog of previously detected forced DiaSources, from the APDB
band : `str`
The band in which the new DiaSources were detected.
idGenerator : `lsst.meas.base.IdGenerator`
Expand All @@ -429,15 +439,17 @@ def run(self,
- ``diaForcedSources`` : Catalog of new and previously detected
forced DiaSources. (`pandas.DataFrame`)
- ``diaObjects`` : Updated table of DiaObjects. (`pandas.DataFrame`)
Raises
------
RuntimeError
Raised if duplicate DiaObjects or duplicate DiaSources are found.
"""
# Load the DiaObjects and DiaSource history.
loaderResult = self.diaCatalogLoader.run(diffIm, self.apdb,
doLoadForcedSources=self.config.doLoadForcedSources)
if not loaderResult.diaObjects.empty:
diaObjects = self.purgeDiaObjects(diffIm.getBBox(), diffIm.getWcs(), loaderResult.diaObjects,
if not preloadedDiaObjects.empty:
diaObjects = self.purgeDiaObjects(diffIm.getBBox(), diffIm.getWcs(), preloadedDiaObjects,
buffer=self.config.imagePixelMargin)
else:
diaObjects = loaderResult.diaObjects
diaObjects = preloadedDiaObjects
# Associate new DiaSources with existing DiaObjects.
assocResults = self.associator.run(diaSourceTable, diaObjects)

Expand Down Expand Up @@ -500,7 +512,7 @@ def run(self,
"failure in Association while matching and creating new "
"DiaObjects and should be reported. Exiting.")

mergedDiaSourceHistory = self.mergeCatalogs(associatedDiaSources, loaderResult.diaSources,
mergedDiaSourceHistory = self.mergeCatalogs(preloadedDiaSources, associatedDiaSources,
"preloadedDiaSources")

# Test for DiaSource duplication first. If duplicates are found,
Expand Down Expand Up @@ -569,7 +581,7 @@ def run(self,
self.log.info("APDB updated.")

if self.config.doPackageAlerts:
diaForcedSources = self.mergeCatalogs(diaForcedSources, loaderResult.diaForcedSources,
diaForcedSources = self.mergeCatalogs(preloadedDiaForcedSources, diaForcedSources,
"preloadedDiaForcedSources")
if self.testDataFrameIndex(diaForcedSources):
self.log.warning(
Expand All @@ -588,7 +600,7 @@ def run(self,
inplace=True)
self.alertPackager.run(associatedDiaSources,
diaCalResult.diaObjectCat,
loaderResult.diaSources,
preloadedDiaSources,
diaForcedSources,
diffIm,
exposure,
Expand Down
Loading

0 comments on commit 9f2e272

Please sign in to comment.