From f0a9b9bcb18c1087ad90a768e8a4af8fc6c26f46 Mon Sep 17 00:00:00 2001 From: Audrey Budlong Date: Thu, 15 Aug 2024 11:45:20 -0700 Subject: [PATCH 1/5] Cleanup code comment. --- python/lsst/ap/association/diaPipe.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/lsst/ap/association/diaPipe.py b/python/lsst/ap/association/diaPipe.py index b8c8ba46..5744dce6 100644 --- a/python/lsst/ap/association/diaPipe.py +++ b/python/lsst/ap/association/diaPipe.py @@ -464,6 +464,7 @@ def run(self, assocResults.unAssocDiaSources, solarSystemObjectTable, diffIm) + # Create new DiaObjects from unassociated diaSources. createResults = self.createNewDiaObjects( ssoAssocResult.unAssocDiaSources) toAssociate = [] @@ -476,6 +477,7 @@ def run(self, nTotalSsObjects = ssoAssocResult.nTotalSsObjects nAssociatedSsObjects = ssoAssocResult.nAssociatedSsObjects else: + # Create new DiaObjects from unassociated diaSources. createResults = self.createNewDiaObjects( assocResults.unAssocDiaSources) toAssociate = [] @@ -486,7 +488,6 @@ def run(self, nTotalSsObjects = 0 nAssociatedSsObjects = 0 - # Create new DiaObjects from unassociated diaSources. self._add_association_meta_data(assocResults.nUpdatedDiaObjects, assocResults.nUnassociatedDiaObjects, createResults.nNewDiaObjects, From 1b3192d2ddc0863b569d1ab3cb0b44aae8ef7b9e Mon Sep 17 00:00:00 2001 From: Audrey Budlong Date: Thu, 19 Sep 2024 13:09:43 -0700 Subject: [PATCH 2/5] Update random integer function --- tests/utils_tests.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/utils_tests.py b/tests/utils_tests.py index 312d9ce4..d3340512 100644 --- a/tests/utils_tests.py +++ b/tests/utils_tests.py @@ -103,7 +103,7 @@ def makeDiaSources(nSources, diaObjectIds, exposure, rng, randomizeObjects=False rand_x = rng.uniform(bbox.getMinX(), bbox.getMaxX(), size=nSources) rand_y = rng.uniform(bbox.getMinY(), bbox.getMaxY(), size=nSources) if randomizeObjects: - objectIds = diaObjectIds[rng.randint(len(diaObjectIds), size=nSources)] + objectIds = diaObjectIds[rng.integers(len(diaObjectIds), size=nSources)] else: objectIds = diaObjectIds[[i % len(diaObjectIds) for i in range(nSources)]] From f5a912473cf537374922f034e12585ab5b78bd03 Mon Sep 17 00:00:00 2001 From: Audrey Budlong Date: Thu, 19 Sep 2024 13:10:11 -0700 Subject: [PATCH 3/5] Attach real photoCalib to test exposure --- tests/utils_tests.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/utils_tests.py b/tests/utils_tests.py index d3340512..f7a3e43d 100644 --- a/tests/utils_tests.py +++ b/tests/utils_tests.py @@ -240,6 +240,7 @@ def makeExposure(flipX=False, flipY=False): exposure.setDetector(detector) exposure.info.setVisitInfo(visit) exposure.setFilter(afwImage.FilterLabel(band='g')) + exposure.setPhotoCalib(afwImage.PhotoCalib(1., 0., exposure.getBBox())) return exposure From 330a15a74d8f6fe531a7a8ac2d59661c5ae40f1c Mon Sep 17 00:00:00 2001 From: Audrey Budlong Date: Thu, 19 Sep 2024 13:14:47 -0700 Subject: [PATCH 4/5] Add solar system test sources --- tests/utils_tests.py | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/tests/utils_tests.py b/tests/utils_tests.py index f7a3e43d..f4badc23 100644 --- a/tests/utils_tests.py +++ b/tests/utils_tests.py @@ -133,6 +133,34 @@ def makeDiaSources(nSources, diaObjectIds, exposure, rng, randomizeObjects=False return pd.DataFrame(data=data) +def makeSolarSystemSources(nSources, diaObjectIds, exposure, rng, randomizeObjects=False): + """Make a test set of solar system sources. + + Parameters + ---------- + nSources : `int` + Number of sources to create. + diaObjectIds : `numpy.ndarray` + Integer Ids of diaobjects to "associate" with the DiaSources. + exposure : `lsst.afw.image.Exposure` + Exposure to create sources over. + randomizeObjects : `bool`, optional + If True, randomly draw from `diaObjectIds` to generate the ids in the + output catalog, otherwise just iterate through them, repeating as + necessary to get nSources objectIds. + + Returns + ------- + solarSystemSources : `pandas.DataFrame` + Solar system sources generated across the exposure. + """ + solarSystemSources = makeDiaSources(nSources, diaObjectIds, exposure, rng, randomizeObjects=False) + solarSystemSources["ssObjectId"] = rng.integers(0, high=2**63-1, size=nSources) + solarSystemSources["Err(arcsec)"] = rng.uniform(0.2, 0.4, size=nSources) + + return solarSystemSources + + def makeDiaForcedSources(nForcedSources, diaObjectIds, exposure, rng, randomizeObjects=False): """Make a test set of DiaSources. From f1c51032990bd32e559211746b92c354d9ae726a Mon Sep 17 00:00:00 2001 From: Audrey Budlong Date: Wed, 28 Aug 2024 11:57:33 -0700 Subject: [PATCH 5/5] Refactoring diaPipe.py --- python/lsst/ap/association/diaPipe.py | 356 ++++++++++++++++++-------- python/lsst/ap/association/utils.py | 15 +- tests/test_diaPipe.py | 20 +- 3 files changed, 272 insertions(+), 119 deletions(-) diff --git a/python/lsst/ap/association/diaPipe.py b/python/lsst/ap/association/diaPipe.py index 5744dce6..d311e5cc 100644 --- a/python/lsst/ap/association/diaPipe.py +++ b/python/lsst/ap/association/diaPipe.py @@ -45,7 +45,7 @@ PackageAlertsTask) from lsst.ap.association.ssoAssociation import SolarSystemAssociationTask from lsst.ap.association.utils import convertTableToSdmSchema, readSchemaFromApdb, dropEmptyColumns, \ - make_empty_catalog + make_empty_catalog, makeEmptyForcedSourceTable from lsst.daf.base import DateTime from lsst.meas.base import DetectorVisitIdGeneratorConfig, \ DiaObjectCalculationTask @@ -428,6 +428,8 @@ def run(self, The band in which the new DiaSources were detected. idGenerator : `lsst.meas.base.IdGenerator` Object that generates source IDs and random number generator seeds. + solarSystemObjectTable : `pandas.DataFrame` + Preloaded Solar System objects expected to be visible in the image. Returns ------- @@ -451,11 +453,156 @@ def run(self, # Accept either legacySolarSystemTable or optional solarSystemObjectTable. if legacySolarSystemTable is not None and solarSystemObjectTable is None: solarSystemObjectTable = legacySolarSystemTable + if not preloadedDiaObjects.empty: diaObjects = self.purgeDiaObjects(diffIm.getBBox(), diffIm.getWcs(), preloadedDiaObjects, buffer=self.config.imagePixelMargin) else: diaObjects = preloadedDiaObjects + + # Associate DiaSources with DiaObjects + associatedDiaSources, newDiaObjects = self.associateDiaSources( + diaSourceTable, solarSystemObjectTable, diffIm, diaObjects + ) + + # Merge associated diaSources + mergedDiaSourceHistory, mergedDiaObjects, updatedDiaObjectIds = self.mergeAssociatedCatalogs( + preloadedDiaSources, associatedDiaSources, diaObjects, newDiaObjects + ) + + # Compute DiaObject Summary statistics from their full DiaSource + # history. + diaCalResult = self.diaCalculation.run( + mergedDiaObjects, + mergedDiaSourceHistory, + updatedDiaObjectIds, + [band]) + + # Test for duplication in the updated DiaObjects. + if self.testDataFrameIndex(diaCalResult.diaObjectCat): + raise RuntimeError( + "Duplicate DiaObjects (loaded + updated) created after " + "DiaCalculation. This is unexpected behavior and should be " + "reported. Exiting.") + if self.testDataFrameIndex(diaCalResult.updatedDiaObjects): + raise RuntimeError( + "Duplicate DiaObjects (updated) created after " + "DiaCalculation. This is unexpected behavior and should be " + "reported. Exiting.") + + # Forced source measurement + if self.config.doRunForcedMeasurement: + diaForcedSources = self.runForcedMeasurement( + diaCalResult.diaObjectCat, diaCalResult.updatedDiaObjects, exposure, diffIm, idGenerator + ) + + else: + # alertPackager needs correct columns + diaForcedSources = makeEmptyForcedSourceTable(self.schema) + + # Write results to Alert Production Database (APDB) + self.writeToApdb(diaCalResult.updatedDiaObjects, associatedDiaSources, diaForcedSources) + + # Package alerts + if self.config.doPackageAlerts: + # Append new forced sources to the full history + diaForcedSourcesFull = self.mergeCatalogs(preloadedDiaForcedSources, diaForcedSources, + "preloadedDiaForcedSources") + if self.testDataFrameIndex(diaForcedSources): + self.log.warning( + "Duplicate DiaForcedSources created after merge with " + "history and new sources. This may cause downstream " + "problems. Dropping duplicates.") + # Drop duplicates via index and keep the first appearance. + # Reset due to the index shape being slight different than + # expected. + diaForcedSourcesFull = diaForcedSourcesFull.groupby( + diaForcedSourcesFull.index).first() + diaForcedSourcesFull.reset_index(drop=True, inplace=True) + diaForcedSourcesFull.set_index( + ["diaObjectId", "diaForcedSourceId"], + drop=False, + inplace=True) + self.alertPackager.run(associatedDiaSources, + diaCalResult.diaObjectCat, + preloadedDiaSources, + diaForcedSourcesFull, + diffIm, + exposure, + template, + doRunForcedMeasurement=self.config.doRunForcedMeasurement, + ) + + # For historical reasons, apdbMarker is a Config even if it's not meant to be read. + # A default Config is the cheapest way to satisfy the storage class. + marker = self.config.apdb.value if self.config.doConfigureApdb else pexConfig.Config() + return pipeBase.Struct(apdbMarker=marker, + associatedDiaSources=associatedDiaSources, + diaForcedSources=diaForcedSources, + diaObjects=diaCalResult.diaObjectCat, + ) + + def createNewDiaObjects(self, unAssocDiaSources): + """Loop through the set of DiaSources and create new DiaObjects + for unassociated DiaSources. + + Parameters + ---------- + unAssocDiaSources : `pandas.DataFrame` + Set of DiaSources to create new DiaObjects from. + + Returns + ------- + results : `lsst.pipe.base.Struct` + Results struct containing: + + - diaSources : `pandas.DataFrame` + DiaSource catalog with updated DiaObject ids. + - newDiaObjects : `pandas.DataFrame` + Newly created DiaObjects from the unassociated DiaSources. + - nNewDiaObjects : `int` + Number of newly created diaObjects. + """ + if len(unAssocDiaSources) == 0: + newDiaObjects = make_empty_catalog(self.schema, tableName="DiaObject") + else: + unAssocDiaSources["diaObjectId"] = unAssocDiaSources["diaSourceId"] + newDiaObjects = convertTableToSdmSchema(self.schema, unAssocDiaSources, + tableName="DiaObject") + return pipeBase.Struct(diaSources=unAssocDiaSources, + newDiaObjects=newDiaObjects, + nNewDiaObjects=len(newDiaObjects)) + + @timeMethod + def associateDiaSources(self, diaSourceTable, solarSystemObjectTable, diffIm, diaObjects): + """Associate DiaSources with DiaObjects. + + Associate new DiaSources with existing DiaObjects. Create new + DiaObjects fron unassociated DiaSources. Index DiaSource catalogue + after associations. Append new DiaObjects and DiaSources to their + previous history. Test for DiaSource and DiaObject duplications. + Compute DiaObject Summary statistics from their full DiaSource + history. Test for duplication in the updated DiaObjects. + + Parameters + ---------- + diaSourceTable : `pandas.DataFrame` + Newly detected DiaSources. + solarSystemObjectTable : `pandas.DataFrame` + Preloaded Solar System objects expected to be visible in the image. + diffIm : `lsst.afw.image.ExposureF` + Difference image exposure in which the sources in ``diaSourceCat`` + were detected. + diaObjects : `pandas.DataFrame` + Table of DiaObjects from preloaded DiaObjects. + + Returns + ------- + associatedDiaSources : `pandas.DataFrame` + Associated DiaSources with DiaObjects. + newDiaObjects : `pandas.DataFrame` + Table of new DiaObjects after association. + """ # Associate new DiaSources with existing DiaObjects. assocResults = self.associator.run(diaSourceTable, diaObjects) @@ -493,6 +640,43 @@ def run(self, createResults.nNewDiaObjects, nTotalSsObjects, nAssociatedSsObjects) + self.log.info("%i updated and %i unassociated diaObjects. Creating %i new diaObjects", + assocResults.nUpdatedDiaObjects, + assocResults.nUnassociatedDiaObjects, + createResults.nNewDiaObjects, + ) + return (associatedDiaSources, createResults.newDiaObjects) + + @timeMethod + def mergeAssociatedCatalogs(self, preloadedDiaSources, associatedDiaSources, diaObjects, newDiaObjects): + """Merge the associated diaSource and diaObjects to their previous history. + + Parameters + ---------- + preloadedDiaSources : `pandas.DataFrame` + Previously detected DiaSources, loaded from the APDB. + associatedDiaSources : `pandas.DataFrame` + Associated DiaSources with DiaObjects. + diaObjects : `pandas.DataFrame` + Table of DiaObjects from preloaded DiaObjects. + newDiaObjects : `pandas.DataFrame` + Table of new DiaObjects after association. + + Raises + ------ + RuntimeError + Raised if duplicate DiaObjects or duplicate DiaSources are found. + + Returns + ------- + mergedDiaSourceHistory : `pandas.DataFrame` + The combined catalog, with all of the rows from preloadedDiaSources + catalog ordered before the rows of associatedDiaSources catalog. + mergedDiaObjects : `pandas.DataFrame` + Table of new DiaObjects merged with their history. + updatedDiaObjectIds : `numpy.Array` + Object Id's from associated diaSources. + """ # Index the DiaSource catalog for this visit after all associations # have been made. updatedDiaObjectIds = associatedDiaSources["diaObjectId"][ @@ -505,12 +689,14 @@ def run(self, # Append new DiaObjects and DiaSources to their previous history. if diaObjects.empty: - diaObjects = createResults.newDiaObjects.set_index("diaObjectId", drop=False) - elif not createResults.newDiaObjects.empty: - diaObjects = pd.concat( + mergedDiaObjects = newDiaObjects.set_index("diaObjectId", drop=False) + elif not newDiaObjects.empty: + mergedDiaObjects = pd.concat( [diaObjects, - createResults.newDiaObjects.set_index("diaObjectId", drop=False)], + newDiaObjects.set_index("diaObjectId", drop=False)], sort=True) + else: + mergedDiaObjects = diaObjects if self.testDataFrameIndex(diaObjects): raise RuntimeError( "Duplicate DiaObjects created after association. This is " @@ -532,50 +718,71 @@ def run(self, "already populated Apdb. If this was not the case then there " "was an unexpected failure in Association while matching " "sources to objects, and should be reported. Exiting.") + return (mergedDiaSourceHistory, mergedDiaObjects, updatedDiaObjectIds) - # Compute DiaObject Summary statistics from their full DiaSource - # history. - diaCalResult = self.diaCalculation.run( - diaObjects, - mergedDiaSourceHistory, - updatedDiaObjectIds, - [band]) - # Test for duplication in the updated DiaObjects. - if self.testDataFrameIndex(diaCalResult.diaObjectCat): - raise RuntimeError( - "Duplicate DiaObjects (loaded + updated) created after " - "DiaCalculation. This is unexpected behavior and should be " - "reported. Exiting.") - if self.testDataFrameIndex(diaCalResult.updatedDiaObjects): - raise RuntimeError( - "Duplicate DiaObjects (updated) created after " - "DiaCalculation. This is unexpected behavior and should be " - "reported. Exiting.") + @timeMethod + def runForcedMeasurement(self, diaObjects, updatedDiaObjects, exposure, diffIm, idGenerator): + """Forced Source Measurement - if self.config.doRunForcedMeasurement: - # Force photometer on the Difference and Calibrated exposures using - # the new and updated DiaObject locations. - diaForcedSources = self.diaForcedSource.run( - diaCalResult.diaObjectCat, - diaCalResult.updatedDiaObjects.loc[:, "diaObjectId"].to_numpy(), - exposure, - diffIm, - idGenerator=idGenerator) - else: - # alertPackager needs correct columns - diaForcedSources = pd.DataFrame(columns=[ - "diaForcedSourceId", "diaObjectID", "ccdVisitID", "psfFlux", "psfFluxErr", - "ra", "dec", "midpointMjdTai", "band", - ]) + Forced photometry on the difference and calibrated exposures using the + new and updated DiaObject locations. - # Store DiaSources, updated DiaObjects, and DiaForcedSources in the - # Apdb. + Parameters + ---------- + diaObjects : `pandas.DataFrame` + Catalog of DiaObjects. + updatedDiaObjects : `pandas.DataFrame` + Catalog of updated DiaObjects. + exposure : `lsst.afw.image.ExposureF` + Calibrated exposure differenced with a template to create + ``diffIm``. + diffIm : `lsst.afw.image.ExposureF` + Difference image exposure in which the sources in ``diaSourceCat`` + were detected. + idGenerator : `lsst.meas.base.IdGenerator` + Object that generates source IDs and random number generator seeds. + + Returns + ------- + diaForcedSources : `pandas.DataFrame` + Catalog of calibrated forced photometered fluxes on both the + difference and direct images at DiaObject locations. + """ + # Force photometer on the Difference and Calibrated exposures using + # the new and updated DiaObject locations. + diaForcedSources = self.diaForcedSource.run( + diaObjects, + updatedDiaObjects.loc[:, "diaObjectId"].to_numpy(), + exposure, + diffIm, + idGenerator=idGenerator) self.log.info(f"Updating {len(diaForcedSources)} diaForcedSources in the APDB") diaForcedSources = convertTableToSdmSchema(self.schema, diaForcedSources, tableName="DiaForcedSource", ) + return diaForcedSources + + @timeMethod + def writeToApdb(self, updatedDiaObjects, associatedDiaSources, diaForcedSources): + """Write to the Alert Production Database (Apdb). + + Store DiaSources, updated DiaObjects, and DiaForcedSources in the + Alert Production Database (Apdb). + + Parameters + ---------- + updatedDiaObjects : `pandas.DataFrame` + Catalog of updated DiaObjects. + associatedDiaSources : `pandas.DataFrame` + Associated DiaSources with DiaObjects. + diaForcedSources : `pandas.DataFrame` + Catalog of calibrated forced photometered fluxes on both the + difference and direct images at DiaObject locations. + """ + # Store DiaSources, updated DiaObjects, and DiaForcedSources in the + # Apdb. # Drop empty columns that are nullable in the APDB. - diaObjectStore = dropEmptyColumns(self.schema, diaCalResult.updatedDiaObjects, tableName="DiaObject") + diaObjectStore = dropEmptyColumns(self.schema, updatedDiaObjects, tableName="DiaObject") diaSourceStore = dropEmptyColumns(self.schema, associatedDiaSources, tableName="DiaSource") diaForcedSourceStore = dropEmptyColumns(self.schema, diaForcedSources, tableName="DiaForcedSource") self.apdb.store( @@ -585,73 +792,6 @@ def run(self, diaForcedSourceStore) self.log.info("APDB updated.") - if self.config.doPackageAlerts: - diaForcedSources = self.mergeCatalogs(preloadedDiaForcedSources, diaForcedSources, - "preloadedDiaForcedSources") - if self.testDataFrameIndex(diaForcedSources): - self.log.warning( - "Duplicate DiaForcedSources created after merge with " - "history and new sources. This may cause downstream " - "problems. Dropping duplicates.") - # Drop duplicates via index and keep the first appearance. - # Reset due to the index shape being slight different than - # expected. - diaForcedSources = diaForcedSources.groupby( - diaForcedSources.index).first() - diaForcedSources.reset_index(drop=True, inplace=True) - diaForcedSources.set_index( - ["diaObjectId", "diaForcedSourceId"], - drop=False, - inplace=True) - self.alertPackager.run(associatedDiaSources, - diaCalResult.diaObjectCat, - preloadedDiaSources, - diaForcedSources, - diffIm, - exposure, - template, - doRunForcedMeasurement=self.config.doRunForcedMeasurement, - ) - - # For historical reasons, apdbMarker is a Config even if it's not meant to be read. - # A default Config is the cheapest way to satisfy the storage class. - marker = self.config.apdb.value if self.config.doConfigureApdb else pexConfig.Config() - return pipeBase.Struct(apdbMarker=marker, - associatedDiaSources=associatedDiaSources, - diaForcedSources=diaForcedSources, - diaObjects=diaObjects, - ) - - def createNewDiaObjects(self, unAssocDiaSources): - """Loop through the set of DiaSources and create new DiaObjects - for unassociated DiaSources. - - Parameters - ---------- - unAssocDiaSources : `pandas.DataFrame` - Set of DiaSources to create new DiaObjects from. - - Returns - ------- - results : `lsst.pipe.base.Struct` - Results struct containing: - - - ``diaSources`` : DiaSource catalog with updated DiaObject ids. - (`pandas.DataFrame`) - - ``newDiaObjects`` : Newly created DiaObjects from the - unassociated DiaSources. (`pandas.DataFrame`) - - ``nNewDiaObjects`` : Number of newly created diaObjects.(`int`) - """ - if len(unAssocDiaSources) == 0: - newDiaObjects = make_empty_catalog(self.schema, tableName="DiaObject") - else: - unAssocDiaSources["diaObjectId"] = unAssocDiaSources["diaSourceId"] - newDiaObjects = convertTableToSdmSchema(self.schema, unAssocDiaSources, - tableName="DiaObject") - return pipeBase.Struct(diaSources=unAssocDiaSources, - newDiaObjects=newDiaObjects, - nNewDiaObjects=len(newDiaObjects)) - def testDataFrameIndex(self, df): """Test the sorted DataFrame index for duplicates. diff --git a/python/lsst/ap/association/utils.py b/python/lsst/ap/association/utils.py index dc87f740..60a89fc1 100644 --- a/python/lsst/ap/association/utils.py +++ b/python/lsst/ap/association/utils.py @@ -22,7 +22,8 @@ """Utilities for working with the APDB. """ __all__ = ("convertTableToSdmSchema", "readSdmSchemaFile", "readSchemaFromApdb", - "dropEmptyColumns", "make_empty_catalog", "getMidpointFromTimespan") + "dropEmptyColumns", "make_empty_catalog", "getMidpointFromTimespan", + "makeEmptyForcedSourceTable") from collections.abc import Mapping import os @@ -329,3 +330,15 @@ def ssObjectID_to_objID(ssObjectID): objID = ''.join([chr((ssObjectID >> (8 * i)) % 256) for i in reversed(range(0, 7))]) return objID, ssObjectID >> (8 * 7) % 256 + + +def makeEmptyForcedSourceTable(schema): + """Return a dataframe with the correct columns for diaForcedSources table. + + Returns + ------- + diaForcedSources : `pandas.DataFrame` + Empty dataframe. + """ + diaForcedSources = convertTableToSdmSchema(schema, pd.DataFrame(), tableName="DiaForcedSource") + return diaForcedSources diff --git a/tests/test_diaPipe.py b/tests/test_diaPipe.py index d56af3ea..8c5d8d60 100644 --- a/tests/test_diaPipe.py +++ b/tests/test_diaPipe.py @@ -36,7 +36,8 @@ from lsst.pipe.base.testUtils import assertValidOutput from lsst.ap.association import DiaPipelineTask -from utils_tests import makeExposure, makeDiaObjects, makeDiaSources, makeDiaForcedSources +from utils_tests import makeExposure, makeDiaObjects, makeDiaSources, makeDiaForcedSources, \ + makeSolarSystemSources def _makeMockDataFrame(): @@ -57,15 +58,11 @@ def _makeMockDataFrame(): class TestDiaPipelineTask(unittest.TestCase): @classmethod - def _makeDefaultConfig(cls, - config_file, - doPackageAlerts=False, - doSolarSystemAssociation=False): + def _makeDefaultConfig(cls, config_file, **kwargs): config = DiaPipelineTask.ConfigClass() config.doConfigureApdb = False config.apdb_config_url = config_file - config.doPackageAlerts = doPackageAlerts - config.doSolarSystemAssociation = doSolarSystemAssociation + config.update(**kwargs) return config def setUp(self): @@ -84,6 +81,8 @@ def setUp(self): 100, self.diaObjects["diaObjectId"].to_numpy(), self.exposure, rng) self.diaForcedSources = makeDiaForcedSources( 200, self.diaObjects["diaObjectId"].to_numpy(), self.exposure, rng) + self.ssSources = makeSolarSystemSources( + 20, self.diaObjects["diaObjectId"].to_numpy(), self.exposure, rng) apdb_config = daxApdb.ApdbSql.init_database(db_url="sqlite://") self.config_file = tempfile.NamedTemporaryFile() @@ -145,13 +144,15 @@ def testRunWithoutAlertsOrSolarSystem(self): """ self._testRun(doPackageAlerts=False, doSolarSystemAssociation=False) - def _testRun(self, doPackageAlerts=False, doSolarSystemAssociation=False): + def _testRun(self, doPackageAlerts=False, doSolarSystemAssociation=False, doRunForcedMeasurement=False): """Test the normal workflow of each ap_pipe step. """ config = self._makeDefaultConfig( config_file=self.config_file.name, doPackageAlerts=doPackageAlerts, - doSolarSystemAssociation=doSolarSystemAssociation) + doSolarSystemAssociation=doSolarSystemAssociation, + doRunForcedMeasurement=doRunForcedMeasurement, + ) task = DiaPipelineTask(config=config) # Set DataFrame index testing to always return False. Mocks return # true for this check otherwise. @@ -167,7 +168,6 @@ def _testRun(self, doPackageAlerts=False, doSolarSystemAssociation=False): # appropriately. subtasksToMock = [ "diaCalculation", - "diaForcedSource", ] if doPackageAlerts: subtasksToMock.append("alertPackager")