Skip to content

DM-45569: Add color from fgcm for fitting a PSF color term in Piff. #1111

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 129 additions & 6 deletions python/lsst/pipe/tasks/finalizeCharacterization.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@
]

import astropy.table
import astropy.units as u
import numpy as np
import esutil
from smatch.matcher import Matcher


import lsst.pex.config as pexConfig
Expand Down Expand Up @@ -81,6 +83,21 @@ class FinalizeCharacterizationConnectionsBase(
deferLoad=True,
multiple=True,
)
fgcm_standard_star = pipeBase.connectionTypes.Input(
doc=('Catalog of fgcm for color corrections, and indexes to the '
'isolated_star_cats catalogs.'),
name='fgcm_standard_star',
storageClass='ArrowAstropy',
dimensions=('instrument', 'tract', 'skymap'),
deferLoad=True,
multiple=True,
)

def __init__(self, *, config=None):
super().__init__(config=config)

if not self.config.psf_determiner['piff'].useColor:
self.inputs.remove("fgcm_standard_star")


class FinalizeCharacterizationConnections(
Expand Down Expand Up @@ -327,6 +344,9 @@ def __init__(self, initInputs=None, **kwargs):

# Only log warning and fatal errors from the source_selector
self.source_selector.log.setLevel(self.source_selector.log.WARN)
self.isPsfDeterminerPiff = False
if isinstance(self.psf_determiner, lsst.meas.extensions.piff.piffPsfDeterminer.PiffPsfDeterminerTask):
self.isPsfDeterminerPiff = True

def _make_output_schema_mapper(self, input_schema):
"""Make the schema mapper from the input schema to the output schema.
Expand Down Expand Up @@ -401,6 +421,17 @@ def _make_output_schema_mapper(self, input_schema):
type=np.int32,
doc='Detector number for the sources.',
)
output_schema.addField(
'psfColorValue',
type=np.float32,
doc="Color used in PSF fit."
)
output_schema.addField(
'psfColorType',
type=str,
size=10,
doc="Color used in PSF fit."
)

alias_map = input_schema.getAliasMap()
alias_map_output = afwTable.AliasMap()
Expand Down Expand Up @@ -434,6 +465,18 @@ def _make_selection_schema_mapper(self, input_schema):

selection_schema.setAliasMap(input_schema.getAliasMap())

selection_schema.addField(
'psfColorValue',
type=np.float32,
doc="Color used in PSF fit."
)
selection_schema.addField(
'psfColorType',
type=str,
size=10,
doc="Color used in PSF fit."
)

return mapper, selection_schema

def concat_isolated_star_cats(self, band, isolated_star_cat_dict, isolated_star_source_dict):
Expand Down Expand Up @@ -542,7 +585,31 @@ def concat_isolated_star_cats(self, band, isolated_star_cat_dict, isolated_star_

return isolated_table, isolated_source_table

def compute_psf_and_ap_corr_map(self, visit, detector, exposure, src, isolated_source_table):
def add_src_colors(self, srcCat, fgcmCat, band):

if self.isPsfDeterminerPiff and fgcmCat is not None:

raSrc = (srcCat['coord_ra'] * u.radian).to(u.degree).value
decSrc = (srcCat['coord_dec'] * u.radian).to(u.degree).value

with Matcher(raSrc, decSrc) as matcher:
idx, idxSrcCat, idxColorCat, d = matcher.query_radius(
fgcmCat["ra"],
fgcmCat["dec"],
1. / 3600.0,
return_indices=True,
)

magStr1 = self.psf_determiner.config.color[band][0]
magStr2 = self.psf_determiner.config.color[band][2]
colors = fgcmCat[f'mag_{magStr1}'] - fgcmCat[f'mag_{magStr2}']

for idSrc, idColor in zip(idxSrcCat, idxColorCat):
srcCat[idSrc]['psfColorValue'] = colors[idColor]
srcCat[idSrc]['psfColorType'] = f"{magStr1}-{magStr2}"

def compute_psf_and_ap_corr_map(self, visit, detector, exposure, src,
isolated_source_table, fgcm_standard_star_cat):
"""Compute psf model and aperture correction map for a single exposure.

Parameters
Expand Down Expand Up @@ -622,6 +689,12 @@ def compute_psf_and_ap_corr_map(self, visit, detector, exposure, src, isolated_s
# We need to copy over the calib_psf flags because they were not in the mapper
measured_src['calib_psf_candidate'] = selected_src['calib_psf_candidate']
measured_src['calib_psf_reserved'] = selected_src['calib_psf_reserved']
if hasattr(exposure.filter, "bandLabel"):
band = exposure.filter.bandLabel
else:
band = None
self.add_src_colors(selected_src, fgcm_standard_star_cat, band)
self.add_src_colors(measured_src, fgcm_standard_star_cat, band)

# Select the psf candidates from the selection catalog
try:
Expand All @@ -639,6 +712,7 @@ def compute_psf_and_ap_corr_map(self, visit, detector, exposure, src, isolated_s
in zip(psf_selection_result.psfCandidates,
~psf_cand_cat['calib_psf_reserved']) if use]
flag_key = psf_cand_cat.schema['calib_psf_used'].asKey()

try:
psf, cell_set = self.psf_determiner.determinePsf(exposure,
psf_determiner_list,
Expand All @@ -649,7 +723,7 @@ def compute_psf_and_ap_corr_map(self, visit, detector, exposure, src, isolated_s
visit, detector, e)
return None, None, measured_src
# Verify that the PSF is usable by downstream tasks
sigma = psf.computeShape(psf.getAveragePosition()).getDeterminantRadius()
sigma = psf.computeShape(psf.getAveragePosition(), psf.getAverageColor()).getDeterminantRadius()
if np.isnan(sigma):
self.log.warning('Failed to determine psf for visit %d, detector %d: '
'Computed final PSF size is NAN.',
Expand Down Expand Up @@ -727,19 +801,29 @@ def runQuantum(self, butlerQC, inputRefs, outputRefs):
isolated_star_source_dict = {tract: isolated_star_source_dict_temp[tract] for
tract in sorted(isolated_star_source_dict_temp.keys())}

if self.config.psf_determiner['piff'].useColor:
fgcm_standard_star_dict_temp = {handle.dataId['tract']: handle
for handle in input_handle_dict['fgcm_standard_star']}
fgcm_standard_star_dict = {tract: fgcm_standard_star_dict_temp[tract] for
tract in sorted(fgcm_standard_star_dict_temp.keys())}
else:
fgcm_standard_star_dict = None

struct = self.run(
visit,
band,
isolated_star_cat_dict,
isolated_star_source_dict,
fgcm_standard_star_dict,
src_dict,
calexp_dict,
)

butlerQC.put(struct.psf_ap_corr_cat, outputRefs.finalized_psf_ap_corr_cat)
butlerQC.put(struct.output_table, outputRefs.finalized_src_table)

def run(self, visit, band, isolated_star_cat_dict, isolated_star_source_dict, src_dict, calexp_dict):
def run(self, visit, band, isolated_star_cat_dict, isolated_star_source_dict,
fgcm_standard_star_dict, src_dict, calexp_dict):
"""
Run the FinalizeCharacterizationTask.

Expand Down Expand Up @@ -807,6 +891,19 @@ def run(self, visit, band, isolated_star_cat_dict, isolated_star_source_dict, sr
measured_src_tables = []
measured_src_table = None

if fgcm_standard_star_dict is not None:

fgcm_standard_star_cat = []

for tract in fgcm_standard_star_dict:
astropy_fgcm = fgcm_standard_star_dict[tract].get()
table_fgcm = np.asarray(astropy_fgcm)
fgcm_standard_star_cat.append(table_fgcm)

fgcm_standard_star_cat = np.concatenate(fgcm_standard_star_cat)
else:
fgcm_standard_star_cat = None

self.log.info("Running finalizeCharacterization on %d detectors.", len(detector_keys))
for detector in detector_keys:
self.log.info("Starting finalizeCharacterization on detector ID %d.", detector)
Expand All @@ -818,7 +915,8 @@ def run(self, visit, band, isolated_star_cat_dict, isolated_star_source_dict, sr
detector,
exposure,
src,
isolated_source_table
isolated_source_table,
fgcm_standard_star_cat,
)

# And now we package it together...
Expand Down Expand Up @@ -870,12 +968,21 @@ def runQuantum(self, butlerQC, inputRefs, outputRefs):
isolated_star_source_dict = {tract: isolated_star_source_dict_temp[tract] for
tract in sorted(isolated_star_source_dict_temp.keys())}

if self.config.psf_determiner['piff'].useColor:
fgcm_standard_star_dict_temp = {handle.dataId['tract']: handle
for handle in input_handle_dict['fgcm_standard_star']}
fgcm_standard_star_dict = {tract: fgcm_standard_star_dict_temp[tract] for
tract in sorted(fgcm_standard_star_dict_temp.keys())}
else:
fgcm_standard_star_dict = None

struct = self.run(
visit,
band,
detector,
isolated_star_cat_dict,
isolated_star_source_dict,
fgcm_standard_star_dict,
input_handle_dict['src'],
input_handle_dict['calexp'],
)
Expand All @@ -889,7 +996,8 @@ def runQuantum(self, butlerQC, inputRefs, outputRefs):
outputRefs.finalized_src_detector_table,
)

def run(self, visit, band, detector, isolated_star_cat_dict, isolated_star_source_dict, src, exposure):
def run(self, visit, band, detector, isolated_star_cat_dict, isolated_star_source_dict,
fgcm_standard_star_dict, src, exposure):
"""
Run the FinalizeCharacterizationDetectorTask.

Expand All @@ -905,6 +1013,8 @@ def run(self, visit, band, detector, isolated_star_cat_dict, isolated_star_sourc
Per-tract dict of isolated star catalog handles.
isolated_star_source_dict : `dict`
Per-tract dict of isolated star source catalog handles.
fgcm_standard_star_dict : `dict`
Per-tract dict of fgcm photometry catalog handles.
src : `lsst.afw.table.SourceCatalog`
Src catalog.
exposure : `lsst.afw.image.Exposure`
Expand All @@ -925,7 +1035,7 @@ def run(self, visit, band, detector, isolated_star_cat_dict, isolated_star_sourc
_, isolated_source_table = self.concat_isolated_star_cats(
band,
isolated_star_cat_dict,
isolated_star_source_dict
isolated_star_source_dict,
)

if isolated_source_table is None:
Expand All @@ -943,12 +1053,25 @@ def run(self, visit, band, detector, isolated_star_cat_dict, isolated_star_sourc

self.log.info("Starting finalizeCharacterization on detector ID %d.", detector)

if fgcm_standard_star_dict is not None:
fgcm_standard_star_cat = []

for tract in fgcm_standard_star_dict:
astropy_fgcm = fgcm_standard_star_dict[tract].get()
table_fgcm = np.asarray(astropy_fgcm)
fgcm_standard_star_cat.append(table_fgcm)

fgcm_standard_star_cat = np.concatenate(fgcm_standard_star_cat)
else:
fgcm_standard_star_cat = None

psf, ap_corr_map, measured_src = self.compute_psf_and_ap_corr_map(
visit,
detector,
exposure,
src,
isolated_source_table,
fgcm_standard_star_cat,
)

# And now we package it together...
Expand Down
30 changes: 27 additions & 3 deletions tests/test_finalizeCharacterization.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,13 @@ def compute_psf_and_ap_corr_map(
exposure,
src,
isolated_src_table,
fgcm_standard_star_cat,
use_super=False,
):
"""A mocked version of this method."""
if use_super:
return super().compute_psf_and_ap_corr_map(visit, detector, exposure, src, isolated_src_table)
return super().compute_psf_and_ap_corr_map(visit, detector, exposure, src,
isolated_src_table, fgcm_standard_star_cat)

return _make_dummy_psf_and_ap_corr_map()

Expand All @@ -98,6 +100,7 @@ def compute_psf_and_ap_corr_map(
exposure,
src,
isolated_src_table,
fgcm_standard_star_cat,
use_super=False,
):
"""A mocked version of this method."""
Expand Down Expand Up @@ -125,7 +128,7 @@ def setUp(self):
config=config_det,
)

self.isolated_star_cat_dict, self.isolated_star_source_dict = self._make_isocats()
self.isolated_star_cat_dict, self.isolated_star_source_dict, self.fgcm_cat = self._make_isocats()

def _make_isocats(self):
"""Make test isolated star catalogs.
Expand Down Expand Up @@ -153,8 +156,16 @@ def _make_isocats(self):
dtype_source = [('sourceId', 'i8'),
('obj_index', 'i4')]

dtype_fgcm = [
('ra', 'f8'),
('dec', 'f8'),
('mag_g', 'f8'),
('mag_i', 'f8'),
]

isolated_star_cat_dict = {}
isolated_star_source_dict = {}
fgcm_cat_dict = {}

np.random.seed(12345)

Expand Down Expand Up @@ -212,14 +223,22 @@ def _make_isocats(self):

counter += nsource_per_band_per_star

fgcm_cat = np.zeros(nstar, dtype=dtype_fgcm)
fgcm_cat['ra'] = ra
fgcm_cat['dec'] = dec
fgcm_cat['mag_g'] = np.random.uniform(low=16, high=20, size=nstar)
fgcm_cat['mag_i'] = np.random.uniform(low=16, high=20, size=nstar)

source_cat = np.concatenate(source_cats)

isolated_star_cat_dict[tract] = pipeBase.InMemoryDatasetHandle(astropy.table.Table(cat),
storageClass="ArrowAstropy")
isolated_star_source_dict[tract] = pipeBase.InMemoryDatasetHandle(astropy.table.Table(source_cat),
storageClass="ArrowAstropy")
fgcm_cat_dict[tract] = pipeBase.InMemoryDatasetHandle(astropy.table.Table(fgcm_cat),
storageClass="ArrowAstropy")

return isolated_star_cat_dict, isolated_star_source_dict
return isolated_star_cat_dict, isolated_star_source_dict, fgcm_cat_dict

def test_concat_isolated_star_cats(self):
"""Test concatenation and reservation of the isolated star catalogs.
Expand Down Expand Up @@ -290,13 +309,15 @@ def test_compute_psf_and_ap_corr_map_no_sources(self):
detector = 0
exposure = None
isolated_source_table = None
fgcm_cat = None
with self.assertLogs(level=logging.WARNING) as cm:
psf, ap_corr_map, measured_src = self.finalizeCharacterizationTask.compute_psf_and_ap_corr_map(
visit,
detector,
exposure,
src,
isolated_source_table,
fgcm_cat,
use_super=True,
)
self.assertIn(
Expand Down Expand Up @@ -333,6 +354,7 @@ def test_run_visit(self):
band,
self.isolated_star_cat_dict,
self.isolated_star_source_dict,
self.fgcm_cat,
src_dict,
calexp_dict,
)
Expand Down Expand Up @@ -369,6 +391,7 @@ def test_run_detectors(self):
detector0,
self.isolated_star_cat_dict,
self.isolated_star_source_dict,
self.fgcm_cat,
src0,
calexp0,
)
Expand All @@ -379,6 +402,7 @@ def test_run_detectors(self):
detector1,
self.isolated_star_cat_dict,
self.isolated_star_source_dict,
self.fgcm_cat,
src1,
calexp1,
)
Expand Down