Skip to content

Commit

Permalink
Move trailed source filtering to filterDiaSourceCatalog
Browse files Browse the repository at this point in the history
Move the trailed source filtering into filterDiaSourceCatalog. Additionally, update filtering so only trails that extend of the edge or which have long trails and are near the edge are filtered out, along with the sources which exceed the max trail length.
  • Loading branch information
bsmartradio committed May 8, 2024
1 parent b860c6a commit 19cb1d5
Show file tree
Hide file tree
Showing 10 changed files with 214 additions and 395 deletions.
2 changes: 1 addition & 1 deletion data/association-flag-map.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -102,4 +102,4 @@ columns:
doc: Fake source template injection in source footprint
- name: base_PixelFlags_flag_injected_templateCenter
bit: 33
doc: Fake source template injection center in source footprint
doc: Fake source template injection center in source footprint
1 change: 0 additions & 1 deletion python/lsst/ap/association/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,5 @@
from .loadDiaCatalogs import *
from .packageAlerts import *
from .diaPipe import *
from .trailedSourceFilter import *
from .transformDiaSourceCatalog import *
from .version import *
44 changes: 3 additions & 41 deletions python/lsst/ap/association/association.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import lsst.pex.config as pexConfig
import lsst.pipe.base as pipeBase
from lsst.utils.timer import timeMethod
from .trailedSourceFilter import TrailedSourceFilterTask

# Enforce an error for unsafe column/array value setting in pandas.
pd.options.mode.chained_assignment = 'raise'
Expand All @@ -49,19 +48,6 @@ class AssociationConfig(pexConfig.Config):
default=1.0,
)

trailedSourceFilter = pexConfig.ConfigurableField(
target=TrailedSourceFilterTask,
doc="Subtask to remove long trailed sources based on catalog source "
"morphological measurements.",
)

doTrailedSourceFilter = pexConfig.Field(
doc="Run traildeSourceFilter to remove long trailed sources from "
"output catalog.",
dtype=bool,
default=True,
)


class AssociationTask(pipeBase.Task):
"""Associate DIAOSources into existing DIAObjects.
Expand All @@ -75,16 +61,10 @@ class AssociationTask(pipeBase.Task):
ConfigClass = AssociationConfig
_DefaultName = "association"

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if self.config.doTrailedSourceFilter:
self.makeSubtask("trailedSourceFilter")

@timeMethod
def run(self,
diaSources,
diaObjects,
exposure_time=None):
diaObjects):
"""Associate the new DiaSources with existing DiaObjects.
Parameters
Expand All @@ -93,8 +73,6 @@ def run(self,
New DIASources to be associated with existing DIAObjects.
diaObjects : `pandas.DataFrame`
Existing diaObjects from the Apdb.
exposure_time : `float`, optional
Exposure time from difference image.
Returns
-------
Expand All @@ -112,30 +90,15 @@ def run(self,
matched to new DiaSources. (`int`)
- ``nUnassociatedDiaObjects`` : Number of DiaObjects that were
not matched a new DiaSource. (`int`)
- ``longTrailedSources`` : DiaSources which have trail lengths
greater than max_trail_length/second*exposure_time.
(`pandas.DataFrame``)
"""
diaSources = self.check_dia_source_radec(diaSources)

if self.config.doTrailedSourceFilter:
diaTrailedResult = self.trailedSourceFilter.run(diaSources, exposure_time)
diaSources = diaTrailedResult.diaSources
longTrailedSources = diaTrailedResult.longTrailedDiaSources

self.log.info("%i DiaSources exceed max_trail_length, dropping from source "
"catalog." % len(diaTrailedResult.longTrailedDiaSources))
self.metadata.add("num_filtered", len(diaTrailedResult.longTrailedDiaSources))
else:
longTrailedSources = pd.DataFrame(columns=diaSources.columns)

if len(diaObjects) == 0:
return pipeBase.Struct(
matchedDiaSources=pd.DataFrame(columns=diaSources.columns),
unAssocDiaSources=diaSources,
nUpdatedDiaObjects=0,
nUnassociatedDiaObjects=0,
longTrailedSources=longTrailedSources)
nUnassociatedDiaObjects=0)

matchResult = self.associate_sources(diaObjects, diaSources)

Expand All @@ -145,8 +108,7 @@ def run(self,
matchedDiaSources=matchResult.diaSources[mask].reset_index(drop=True),
unAssocDiaSources=matchResult.diaSources[~mask].reset_index(drop=True),
nUpdatedDiaObjects=matchResult.nUpdatedDiaObjects,
nUnassociatedDiaObjects=matchResult.nUnassociatedDiaObjects,
longTrailedSources=longTrailedSources)
nUnassociatedDiaObjects=matchResult.nUnassociatedDiaObjects)

def check_dia_source_radec(self, dia_sources):
"""Check that all DiaSources have non-NaN values for RA/DEC.
Expand Down
26 changes: 7 additions & 19 deletions python/lsst/ap/association/diaPipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,23 +35,22 @@

import warnings

import numpy as np
import pandas as pd

from lsst.daf.base import DateTime
import lsst.dax.apdb as daxApdb
from lsst.meas.base import DetectorVisitIdGeneratorConfig, DiaObjectCalculationTask
import lsst.pex.config as pexConfig
import lsst.pipe.base as pipeBase
import lsst.pipe.base.connectionTypes as connTypes
from lsst.utils.timer import timeMethod

import numpy as np
import pandas as pd
from lsst.ap.association import (
AssociationTask,
DiaForcedSourceTask,
LoadDiaCatalogsTask,
PackageAlertsTask)
from lsst.ap.association.ssoAssociation import SolarSystemAssociationTask
from lsst.daf.base import DateTime
from lsst.meas.base import DetectorVisitIdGeneratorConfig, \
DiaObjectCalculationTask
from lsst.utils.timer import timeMethod


class DiaPipelineConnections(
Expand Down Expand Up @@ -119,12 +118,6 @@ class DiaPipelineConnections(
storageClass="DataFrame",
dimensions=("instrument", "visit", "detector"),
)
longTrailedSources = pipeBase.connectionTypes.Output(
doc="Optional output temporarily storing long trailed diaSources.",
dimensions=("instrument", "visit", "detector"),
storageClass="DataFrame",
name="{fakesType}{coaddName}Diff_longTrailedSrc",
)

def __init__(self, *, config=None):
super().__init__(config=config)
Expand All @@ -137,8 +130,6 @@ def __init__(self, *, config=None):
self.outputs.remove("diaForcedSources")
if not config.doSolarSystemAssociation:
self.inputs.remove("solarSystemObjectTable")
if not config.associator.doTrailedSourceFilter:
self.outputs.remove("longTrailedSources")

def adjustQuantum(self, inputs, outputs, label, dataId):
"""Override to make adjustments to `lsst.daf.butler.DatasetRef` objects
Expand Down Expand Up @@ -438,10 +429,8 @@ def run(self,
buffer=self.config.imagePixelMargin)
else:
diaObjects = loaderResult.diaObjects

# Associate new DiaSources with existing DiaObjects.
assocResults = self.associator.run(diaSourceTable, diaObjects,
exposure_time=diffIm.visitInfo.exposureTime)
assocResults = self.associator.run(diaSourceTable, diaObjects)

if self.config.doSolarSystemAssociation:
ssoAssocResult = self.solarSystemAssociator.run(
Expand Down Expand Up @@ -627,7 +616,6 @@ def run(self,
associatedDiaSources=associatedDiaSources,
diaForcedSources=diaForcedSources,
diaObjects=diaObjects,
longTrailedSources=assocResults.longTrailedSources
)

def createNewDiaObjects(self, unAssocDiaSources):
Expand Down
118 changes: 112 additions & 6 deletions python/lsst/ap/association/filterDiaSourceCatalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,28 @@ class FilterDiaSourceCatalogConnections(
dimensions={"instrument", "visit", "detector"},
)

diffImVisitInfo = connTypes.Input(
doc="VisitInfo of diffIm.",
name="{fakesType}{coaddName}Diff_differenceExp.visitInfo",
storageClass="VisitInfo",
dimensions=("instrument", "visit", "detector"),
)

longTrailedSources = connTypes.Output(
doc="Optional output temporarily storing long trailed diaSources.",
dimensions=("instrument", "visit", "detector"),
storageClass="ArrowAstropy",
name="{fakesType}{coaddName}Diff_longTrailedSrc",
)

def __init__(self, *, config=None):
super().__init__(config=config)
if not self.config.doWriteRejectedSources:
if not self.config.doWriteRejectedSkySources:
self.outputs.remove("rejectedDiaSources")
if not self.config.doTrailedSourceFilter:
self.outputs.remove("longTrailedSources")
if not self.config.doWriteTrailedSources:
self.outputs.remove("longTrailedSources")


class FilterDiaSourceCatalogConfig(
Expand All @@ -79,13 +97,37 @@ class FilterDiaSourceCatalogConfig(
"removed before storing the output DiaSource catalog.",
)

doWriteRejectedSources = pexConfig.Field(
doWriteRejectedSkySources = pexConfig.Field(
dtype=bool,
default=True,
doc="Store the output DiaSource catalog containing all the rejected "
"sky sources."
)

doTrailedSourceFilter = pexConfig.Field(
doc="Run trailedSourceFilter to remove long trailed sources from the"
"diaSource output catalog.",
dtype=bool,
default=True,
)

doWriteTrailedSources = pexConfig.Field(
doc="Write trailed diaSources sources to a table.",
dtype=bool,
default=True,
deprecated="Trailed sources will not be written out during production."
)

max_trail_length = pexConfig.Field(
dtype=float,
doc="Length of long trailed sources to remove from the input catalog, "
"in arcseconds per second. Default comes from DMTN-199, which "
"requires removal of sources with trails longer than 10 "
"degrees/day, which is 36000/3600/24 arcsec/second, or roughly"
"0.416 arcseconds per second.",
default=36000/3600.0/24.0,
)


class FilterDiaSourceCatalogTask(pipeBase.PipelineTask):
"""Filter out sky sources from a DiaSource catalog."""
Expand All @@ -94,13 +136,15 @@ class FilterDiaSourceCatalogTask(pipeBase.PipelineTask):
_DefaultName = "filterDiaSourceCatalog"

@timeMethod
def run(self, diaSourceCat):
def run(self, diaSourceCat, diffImVisitInfo):
"""Filter sky sources from the supplied DiaSource catalog.
Parameters
----------
diaSourceCat : `lsst.afw.table.SourceCatalog`
Catalog of sources measured on the difference image.
diffImVisitInfo: `lsst.afw.image.VisitInfo`
VisitInfo for the difference image corresponding to diaSourceCat.
Returns
-------
Expand All @@ -109,9 +153,13 @@ def run(self, diaSourceCat):
``filteredDiaSourceCat`` : `lsst.afw.table.SourceCatalog`
The catalog of filtered sources.
``rejectedDiaSources`` : `lsst.afw.table.SourceCatalog`
The catalog of rejected sources.
The catalog of rejected sky sources.
``longTrailedDiaSources`` : `astropy.table.Table`
DiaSources which have trail lengths greater than
max_trail_length*exposure_time.
"""
rejectedSkySources = None
exposure_time = diffImVisitInfo.exposureTime
if self.config.doRemoveSkySources:
sky_source_column = diaSourceCat["sky_source"]
num_sky_sources = np.sum(sky_source_column)
Expand All @@ -120,6 +168,64 @@ def run(self, diaSourceCat):
self.log.info(f"Filtered {num_sky_sources} sky sources.")
if not rejectedSkySources:
rejectedSkySources = SourceCatalog(diaSourceCat.getSchema())
filterResults = pipeBase.Struct(filteredDiaSourceCat=diaSourceCat,
rejectedDiaSources=rejectedSkySources)

if self.config.doTrailedSourceFilter:
trail_mask = self._check_dia_source_trail(diaSourceCat, exposure_time)
longTrailedDiaSources = diaSourceCat[trail_mask].copy(deep=True)
diaSourceCat = diaSourceCat[~trail_mask]

self.log.info("%i DiaSources exceed max_trail_length %f arcseconds per second, "
"dropping from source catalog."
% (self.config.max_trail_length, len(diaSourceCat)))
self.metadata.add("num_filtered", len(longTrailedDiaSources))

if self.config.doWriteTrailedSources:
filterResults = pipeBase.Struct(filteredDiaSourceCat=diaSourceCat,
rejectedDiaSources=rejectedSkySources,
longTrailedSources=longTrailedDiaSources.asAstropy())
else:
filterResults = pipeBase.Struct(filteredDiaSourceCat=diaSourceCat,
rejectedDiaSources=rejectedSkySources)

else:
filterResults = pipeBase.Struct(filteredDiaSourceCat=diaSourceCat,
rejectedDiaSources=rejectedSkySources)

return filterResults

def _check_dia_source_trail(self, dia_sources, exposure_time):
"""Find DiaSources that have long trails or trails with indeterminant
end points.
Return a mask of sources with lengths greater than
(``config.max_trail_length`` multiplied by the exposure time)
arcseconds.
Additionally, set mask if
``ext_trailedSources_Naive_flag_off_image`` is set or if
``ext_trailedSources_Naive_flag_suspect_long_trail`` and
``ext_trailedSources_Naive_flag_edge`` are both set.
Parameters
----------
dia_sources : `pandas.DataFrame`
Input diaSources to check for trail lengths.
exposure_time : `float`
Exposure time from difference image.
Returns
-------
trail_mask : `pandas.DataFrame`
Boolean mask for diaSources which are greater than the
Boolean mask for diaSources which are greater than the
cutoff length or have trails which extend beyond the edge of the
detector (off_image set). Also checks if both
suspect_long_trail and edge are set and masks those sources out.
"""
print(dia_sources.getSchema())

This comment has been minimized.

Copy link
@kfindeisen

kfindeisen May 9, 2024

Member

@bsmartradio, was this intended to be a permanent part of the task? If so, can I change it to a debug log?

The bare print is getting shredded in Loki, and that's making a mess of PP's dashboards (especially since some of the field descriptions have keywords like "error" or "fatal").

This comment has been minimized.

Copy link
@bsmartradio

bsmartradio May 9, 2024

Author Contributor

Oh darn it, no. It was definitely not meant to be there and was missed for removal.

This comment has been minimized.

Copy link
@kfindeisen

kfindeisen May 9, 2024

Member

Filed as DM-44279.

trail_mask = (dia_sources["ext_trailedSources_Naive_length"]
>= (self.config.max_trail_length*exposure_time))
trail_mask |= dia_sources['ext_trailedSources_Naive_flag_off_image']
trail_mask |= (dia_sources['ext_trailedSources_Naive_flag_suspect_long_trail']
& dia_sources['ext_trailedSources_Naive_flag_edge'])

return trail_mask
Loading

0 comments on commit 19cb1d5

Please sign in to comment.