diff --git a/ctapipe/containers.py b/ctapipe/containers.py index cb4282c5d4c..ae8d97101b0 100644 --- a/ctapipe/containers.py +++ b/ctapipe/containers.py @@ -34,6 +34,7 @@ "MorphologyContainer", "BaseHillasParametersContainer", "CameraHillasParametersContainer", + "ImageFitParametersContainer", "CameraTimingParametersContainer", "ParticleClassificationContainer", "PedestalContainer", @@ -308,6 +309,37 @@ class HillasParametersContainer(BaseHillasParametersContainer): psi = Field(nan * u.deg, "rotation angle of ellipse", unit=u.deg) +class ImageFitParametersContainer(HillasParametersContainer): + """ + Hillas Parameters after fitting. The cog position + is given in meter from the camera center. + """ + + skewness_uncertainty = Field(nan, "measure of skewness uncertainty") + likelihood = Field(nan, "measure of likelihood") + goodness_of_fit = Field( + nan, "measure of goodness of fit, mean likelihood subtracted to the likelihood" + ) + n_pixels = Field(-1, "number of pixels used in the fit") + free_parameters = Field(-1, "number of free parameters") + is_valid = Field(False, "True if the fit is valid") + is_accurate = Field( + False, "returns True if the fit is accurate. If False, the fit is not reliable." + ) + + fov_lon_uncertainty = Field(nan * u.deg, "centroid x uncertainty", unit=u.deg) + fov_lat_uncertainty = Field(nan * u.deg, "centroid y uncertainty", unit=u.deg) + r_uncertainty = Field(nan * u.deg, "centroid r uncertainty", unit=u.deg) + phi_uncertainty = Field( + nan * u.deg, "polar coordinate of centroid uncertainty", unit=u.deg + ) + psi_uncertainty = Field( + nan * u.deg, "Uncertainty in rotation angle of ellipse", unit=u.deg + ) + amplitude = Field(nan, "Amplitude of the fitted model") + amplitude_uncertainty = Field(nan, "error in amplitude from the fit") + + class LeakageContainer(Container): """ Fraction of signal in 1 or 2-pixel width border from the edge of the diff --git a/ctapipe/image/concentration.py b/ctapipe/image/concentration.py index fcd78cc5132..d9ede5c67d6 100644 --- a/ctapipe/image/concentration.py +++ b/ctapipe/image/concentration.py @@ -1,14 +1,14 @@ -import numpy as np import astropy.units as u +import numpy as np from ..containers import ( + CameraHillasParametersContainer, ConcentrationContainer, HillasParametersContainer, - CameraHillasParametersContainer, ) -from .hillas import camera_to_shower_coordinates from ..instrument import CameraGeometry from ..utils.quantities import all_to_value +from .hillas import camera_to_shower_coordinates __all__ = ["concentration_parameters"] @@ -53,7 +53,7 @@ def concentration_parameters(geom: CameraGeometry, image, hillas_parameters): delta_y = pix_y - y # take pixels within one pixel diameter from the cog - mask_cog = (delta_x ** 2 + delta_y ** 2) < pixel_width ** 2 + mask_cog = (delta_x**2 + delta_y**2) < pixel_width**2 conc_cog = np.sum(image[mask_cog]) / h.intensity if hillas_parameters.width.value != 0: @@ -61,7 +61,7 @@ def concentration_parameters(geom: CameraGeometry, image, hillas_parameters): longi, trans = camera_to_shower_coordinates( pix_x, pix_y, x, y, h.psi.to_value(u.rad) ) - mask_core = (longi ** 2 / length ** 2) + (trans ** 2 / width ** 2) <= 1.0 + mask_core = (longi**2 / length**2) + (trans**2 / width**2) <= 1.0 conc_core = image[mask_core].sum() / h.intensity else: conc_core = 0.0 diff --git a/ctapipe/image/ellipsoid.py b/ctapipe/image/ellipsoid.py new file mode 100644 index 00000000000..7f3eb1444aa --- /dev/null +++ b/ctapipe/image/ellipsoid.py @@ -0,0 +1,376 @@ +# Licensed under a 3-clause BSD style license - see LICENSE.rst +# -*- coding: UTF-8 -*- +""" +Shower image parametrization based on image fitting. +""" + +from enum import Enum + +import astropy.units as u +import numpy as np +from astropy.coordinates import Angle +from iminuit import Minuit + +from ctapipe.image.cleaning import dilate +from ctapipe.image.hillas import hillas_parameters +from ctapipe.image.pixel_likelihood import ( + mean_poisson_likelihood_gaussian, + neg_log_likelihood_approx, +) +from ctapipe.image.toymodel import SkewedGaussian + +from ..containers import ImageFitParametersContainer + +PED_TABLE = { + "LSTCam": 2.8, + "NectarCam": 2.3, + "FlashCam": 2.3, + "SST-Camera": 0.5, + "testcam": 0.5, +} +SPE_WIDTH = 0.5 + +__all__ = [ + "image_fit_parameters", + "ImageFitParameterizationError", + "PDFType", +] + + +class PDFType(Enum): + gaussian = "gaussian" + skewed = "skewed" + + +def create_initial_guess(geometry, hillas, size): + """ + This function computes the seeds of the fit with the Hillas parameters + + Parameters + ---------- + geometry: ctapipe.instrument.CameraGeometry + Camera geometry, the cleaning mask should be applied to improve performance + hillas: HillasParametersContainer + Hillas parameters + size: float/int + Total charge after cleaning and dilation + + Returns + ------- + initial_guess: dict + Seed parameters of the fit. + """ + unit = geometry.pix_x.unit + initial_guess = {} + + initial_guess["cog_x"] = hillas.fov_lon.to_value(unit) + initial_guess["cog_y"] = hillas.fov_lat.to_value(unit) + + initial_guess["length"] = hillas.length.to_value(unit) + initial_guess["width"] = hillas.width.to_value(unit) + initial_guess["psi"] = hillas.psi.value + + initial_guess["skewness"] = hillas.skewness + + if (hillas.width.value == 0) or (hillas.length.value == 0): + raise ImageFitParameterizationError("Hillas width and/or length is zero") + + initial_guess["amplitude"] = size + return initial_guess + + +def boundaries(geometry, image, dilated_mask, hillas, pdf): + """ + Computes the boundaries of the fit. + + Parameters + ---------- + geometry : ctapipe.instrument.CameraGeometry + Camera geometry + image : ndarray + Charge in each pixel, no cleaning mask should be applied + dilated_mask : ndarray + mask (array of booleans) after image cleaning and dilation + hillas : HillasParametersContainer + Hillas parameters + pdf: PDFType instance + e.g. PDFType("gaussian") + + Returns + ------- + list of boundaries + """ + x = geometry.pix_x.value + y = geometry.pix_y.value + unit = geometry.pix_x.unit + camera_radius = geometry.radius.to_value(unit) + + max_x = np.max(x[dilated_mask]) + min_x = np.min(x[dilated_mask]) + max_y = np.max(y[dilated_mask]) + min_y = np.min(y[dilated_mask]) + + ang_unit = hillas.psi.unit + psi_unc = 10 * unit + psi_min, psi_max = max( + hillas.psi.value - psi_unc.to_value(ang_unit), -np.pi / 2 + ), min(hillas.psi.value + psi_unc.to_value(ang_unit), np.pi / 2) + + cogx_min, cogx_max = np.sign(min_x) * min(np.abs(min_x), camera_radius), np.sign( + max_x + ) * min(np.abs(max_x), camera_radius) + + cogy_min, cogy_max = np.sign(min_y) * min(np.abs(min_y), camera_radius), np.sign( + max_y + ) * min(np.abs(max_y), camera_radius) + + long_dis = 2 * np.sqrt( + (max_x - min_x) ** 2 + (max_y - min_y) ** 2 + ) # maximum distance of the shower in the longitudinal direction + + width_unc = 0.1 * unit + length_min, length_max = hillas.length.to_value(unit), long_dis + width_min, width_max = hillas.width.to_value(unit), hillas.width.to_value( + unit + ) + width_unc.to_value(unit) + + scale = length_min / np.sqrt(1 - 2 / np.pi) + skew_min, skew_max = min(max(-0.99, hillas.skewness - 0.3), 0.99), max( + -0.99, min(0.99, hillas.skewness + 0.3) + ) # Guess from Hillas unit tests + + if pdf == PDFType.gaussian: + amplitude = np.sum(image[(dilated_mask) & (image > 0)]) / ( + 2 * np.pi * width_min * length_min + ) + else: + amplitude = ( + np.sum(image[(dilated_mask) & (image > 0)]) + / scale + / (2 * np.pi * width_min) + ) + + bounds = [ + (cogx_min, cogx_max), + (cogy_min, cogy_max), + (psi_min, psi_max), + (length_min, length_max), + (width_min, width_max), + (skew_min, skew_max), + (0, amplitude), + ] + + return bounds + + +class ImageFitParameterizationError(RuntimeError): + pass + + +def image_fit_parameters( + geom, + image, + n_row, + cleaned_mask, + pdf=PDFType.skewed, + bounds=None, +): + """ + Computes image parameters for a given shower image. + Implementation based on https://arxiv.org/pdf/1211.0254.pdf + + Parameters + ---------- + geom : ctapipe.instrument.CameraGeometry + Camera geometry + image : ndarray + Charge in each pixel, no cleaning mask should be applied + bounds : list + default format: [(low_x, high_x), (low_y, high_y), ...] + Boundary conditions. If bounds == None, boundaries function is applied as a default. + n_row : int + number of extra rows of neighbors added to the cleaning mask + cleaned_mask : ndarray + Cleaning mask (array of booleans) after cleaning + pdf: PDFType instance + e.g. PDFType("gaussian") + + Returns + ------- + ImageFitParametersContainer: + container of image parameters after fitting + """ + # For likelihood calculation we need the width of the + # pedestal distribution for each pixel + # currently this is not available from the calibration, + # so for now lets hard code it in a dict + + pedestal = PED_TABLE[geom.name] + pdf = PDFType(pdf) + pdf_dict = { + PDFType.gaussian: SkewedGaussian, + PDFType.skewed: SkewedGaussian, + } + + unit = geom.pix_x.unit + pix_x = geom.pix_x + pix_y = geom.pix_y + image = np.asanyarray(image, dtype=np.float64) + + if np.sum(image) == 0.0: + raise ImageFitParameterizationError("size=0, cannot calculate HillasParameters") + + if isinstance(image, np.ma.masked_array): + image = np.ma.filled(image, 0) + + if not (pix_x.shape == pix_y.shape == image.shape == cleaned_mask.shape): + raise ValueError("Image length and number of pixels do not match") + + cleaned_image = image.copy() + cleaned_image[~cleaned_mask] = 0.0 + cleaned_image[cleaned_image < 0] = 0.0 + + dilated_mask = cleaned_mask.copy() + for row in range(n_row): + dilated_mask = dilate(geom, dilated_mask) + + dilated_image = image.copy() + dilated_image[~dilated_mask] = 0.0 + dilated_image[dilated_image < 0] = 0.0 + size = np.sum(dilated_image) + + hillas = hillas_parameters(geom, cleaned_image) + + x0 = create_initial_guess(geom, hillas, size) # seeds + + if np.count_nonzero(image) <= len(x0): + raise ImageFitParameterizationError( + "The number of free parameters is higher than the number of pixels to fit, cannot perform fit" + ) + + def likelihood(cog_x, cog_y, psi, length, width, skewness, amplitude): + parameters = [ + cog_x * unit, + cog_y * unit, + length * unit, + width * unit, + psi * u.rad, + skewness, + amplitude, + ] + + prediction = pdf_dict[pdf](*parameters).pdf(geom.pix_x, geom.pix_y) + + prediction[np.isnan(prediction)] = 1e9 + like = neg_log_likelihood_approx(dilated_image, prediction, SPE_WIDTH, pedestal) + if np.isnan(like): + like = 1e9 + return like + + if pdf == PDFType.gaussian: + x0["skewness"] = 0 + + m = Minuit( + likelihood, + **x0, + ) + + if pdf == PDFType.gaussian: + m.fixed = np.zeros(len(x0.values())) + m.fixed[-2] = True + + if bounds is None: + bounds = boundaries(geom, image, dilated_mask, hillas, pdf) + m.limits = bounds + else: + m.limits = bounds + + m.errordef = 1 # neg log likelihood + m.simplex().migrad() + m.hesse() + + likelihood = m.fval + pars = m.values + errors = m.errors + + like_array = likelihood + like_array *= dilated_mask + goodness_of_fit = np.sum( + like_array[like_array > 0] + - mean_poisson_likelihood_gaussian( + pdf_dict[pdf]( + pars[0] * unit, + pars[1] * unit, + pars[3] * unit, + pars[4] * unit, + pars[2] * u.rad, + pars[5], + pars[6], + ).pdf(geom.pix_x, geom.pix_y), + SPE_WIDTH, + pedestal, + ) + ) + + fit_rcog = np.linalg.norm([pars[0], pars[1]]) + fit_phi = np.arctan2(pars[1], pars[0]) + + # The uncertainty in r and phi is calculated by propagating errors of the x and y coordinates + b = pars[1] ** 2 + pars[0] ** 2 + A = (-pars[1] / b) ** 2 + B = (pars[0] / b) ** 2 + fit_phi_err = np.sqrt(A * errors[0] ** 2 + B * errors[1] ** 2) + fit_rcog_err = np.sqrt( + pars[0] ** 2 / b * errors[0] ** 2 + pars[1] ** 2 / b * errors[1] ** 2 + ) + + delta_x = pix_x.value - pars[0] + delta_y = pix_y.value - pars[1] + + # calculate higher order moments along shower axes + longitudinal = delta_x * np.cos(pars[2]) + delta_y * np.sin(pars[2]) + + m3_long = np.average(longitudinal**3, weights=image) + skewness_ = m3_long / pars[3] ** 3 + + m4_long = np.average(longitudinal**4, weights=image) + kurtosis_long = m4_long / pars[3] ** 4 + + if pdf == PDFType.gaussian: + skewness_long = skewness_ + skewness_uncertainty = np.nan + else: + skewness_long = pars[5] + skewness_uncertainty = errors[5] + + amplitude = pars[6] + amplitude_uncertainty = errors[6] + + return ImageFitParametersContainer( + fov_lon=u.Quantity(pars[0], unit), + fov_lon_uncertainty=u.Quantity(errors[0], unit), + fov_lat=u.Quantity(pars[1], unit), + fov_lat_uncertainty=u.Quantity(errors[1], unit), + r=u.Quantity(fit_rcog, unit), + r_uncertainty=u.Quantity(fit_rcog_err, unit), + phi=Angle(fit_phi, unit=u.rad), + phi_uncertainty=Angle(fit_phi_err, unit=u.rad), + intensity=size, + amplitude=amplitude, + amplitude_uncertainty=amplitude_uncertainty, + length=u.Quantity(pars[3], unit), + length_uncertainty=u.Quantity(errors[3], unit), + width=u.Quantity(pars[4], unit), + width_uncertainty=u.Quantity(errors[4], unit), + psi=Angle(pars[2], unit=u.rad), + psi_uncertainty=Angle(errors[2], unit=u.rad), + skewness=skewness_long, + skewness_uncertainty=skewness_uncertainty, + kurtosis=kurtosis_long, + likelihood=likelihood, + goodness_of_fit=goodness_of_fit, + n_pixels=np.count_nonzero(cleaned_image), + free_parameters=m.nfit, + is_valid=m.valid, + is_accurate=m.accurate, + ) diff --git a/ctapipe/image/tests/test_ellipsoid.py b/ctapipe/image/tests/test_ellipsoid.py new file mode 100644 index 00000000000..25b1c8806a8 --- /dev/null +++ b/ctapipe/image/tests/test_ellipsoid.py @@ -0,0 +1,410 @@ +import numpy as np +import numpy.ma as ma +import pytest +from astropy import units as u +from astropy.coordinates import Angle +from numpy import isclose +from numpy.testing import assert_allclose +from pytest import approx + +from ctapipe.containers import ImageFitParametersContainer +from ctapipe.coordinates import TelescopeFrame +from ctapipe.image import hillas_parameters, tailcuts_clean, toymodel +from ctapipe.image.concentration import concentration_parameters +from ctapipe.image.ellipsoid import ( + ImageFitParameterizationError, + PDFType, + boundaries, + image_fit_parameters, +) +from ctapipe.instrument import CameraGeometry, SubarrayDescription + + +def create_sample_image( + psi="-30d", + x=0.05 * u.m, + y=0.05 * u.m, + width=0.1 * u.m, + length=0.2 * u.m, + intensity=1500, + geometry=None, +): + + if geometry is None: + s = SubarrayDescription.read("dataset://gamma_prod5.simtel.zst") + geometry = s.tel[1].camera.geometry + + # make a toymodel shower model + model = toymodel.Gaussian(x=x, y=y, width=width, length=length, psi=psi) + + # generate toymodel image in camera for this shower model. + rng = np.random.default_rng(0) + image, signal, noise = model.generate_image( + geometry, intensity=intensity, nsb_level_pe=3, rng=rng + ) + + # calculate pixels likely containing signal + clean_mask = tailcuts_clean(geometry, image, 10, 5) + + return image, clean_mask + + +def test_boundaries(prod5_lst): + # Test default functin for finding the boundaries of the fit + geom = prod5_lst.camera.geometry + telescope_frame = TelescopeFrame() + geom_nom = geom.transform_to(telescope_frame) + image, clean_mask = create_sample_image(geometry=geom) + + cleaned_image = image.copy() + cleaned_image[~clean_mask] = 0.0 + + hillas = hillas_parameters(geom_nom, cleaned_image) + bounds = boundaries(geom_nom, image, clean_mask, hillas, pdf=PDFType("gaussian")) + + for i in range(len(bounds)): + assert bounds[i][1] > bounds[i][0] # upper limit > lower limit + + +def compare_result(x, y): + if np.isnan(x) and np.isnan(y): + x = 0 + y = 0 + ux = u.Quantity(x) + uy = u.Quantity(y) + assert isclose(ux.value, uy.value) + assert ux.unit == uy.unit + + +def compare_fit_params(fit1, fit2): + fit1_dict = fit1.as_dict() + fit2_dict = fit2.as_dict() + for key in fit1_dict.keys(): + compare_result(fit1_dict[key], fit2_dict[key]) + + +def test_fit_selected(prod5_lst): + geom = prod5_lst.camera.geometry + telescope_frame = TelescopeFrame() + geom_nom = geom.transform_to(telescope_frame) + image, clean_mask = create_sample_image(geometry=geom) + + image_zeros = image.copy() + image_zeros[~clean_mask] = 0.0 + + image_selected = ma.masked_array(image.copy(), mask=~clean_mask) + + results = image_fit_parameters( + geom_nom, + image_zeros, + n_row=2, + cleaned_mask=clean_mask, + ) + results_selected = image_fit_parameters( + geom_nom, + image_selected, + n_row=2, + cleaned_mask=clean_mask, + ) + compare_fit_params(results, results_selected) + + +def test_dilation(prod5_lst): + geom = prod5_lst.camera.geometry + telescope_frame = TelescopeFrame() + geom_nom = geom.transform_to(telescope_frame) + image, clean_mask = create_sample_image(geometry=geom) + + results = image_fit_parameters( + geom_nom, + image, + n_row=0, + cleaned_mask=clean_mask, + ) + + assert_allclose(results.intensity, np.sum(image[clean_mask]), rtol=1e-4, atol=1e-4) + + results = image_fit_parameters( + geom_nom, + image, + n_row=2, + cleaned_mask=clean_mask, + ) + + assert results.intensity > np.sum(image[clean_mask]) + + +def test_imagefit_failure(prod5_lst): + geom = prod5_lst.camera.geometry + telescope_frame = TelescopeFrame() + geom_nom = geom.transform_to(telescope_frame) + blank_image = np.zeros(geom.n_pixels) + + with pytest.raises(ImageFitParameterizationError): + image_fit_parameters( + geom_nom, + blank_image, + n_row=2, + cleaned_mask=(blank_image == 1), + ) + + +def test_fit_container(prod5_lst): + geom = prod5_lst.camera.geometry + image, clean_mask = create_sample_image(psi="0d", geometry=geom) + + geom_telescope_frame = geom.transform_to(TelescopeFrame()) + params = image_fit_parameters( + geom_telescope_frame, + image, + n_row=2, + cleaned_mask=clean_mask, + ) + assert isinstance(params, ImageFitParametersContainer) + + +def test_truncated(prod5_lst): + geom = prod5_lst.camera.geometry + telescope_frame = TelescopeFrame() + geom_nom = geom.transform_to(telescope_frame) + widths = u.Quantity([0.05, 0.06], u.deg) + lengths = u.Quantity([0.4, 0.5], u.deg) + intensity = 5000 + + xs = u.Quantity([0.8, 0.9, -0.9, -0.8], u.deg) + ys = u.Quantity([-0.9, -0.8, 0.8, 0.9], u.deg) + + for x, y, width, length in zip(xs, ys, widths, lengths): + image, clean_mask = create_sample_image( + geometry=geom_nom, x=x, y=y, length=length, width=width, intensity=intensity + ) + cleaned_image = image.copy() + cleaned_image[~clean_mask] = 0.0 + + # Gaussian + result = image_fit_parameters( + geom_nom, image, n_row=2, cleaned_mask=clean_mask, pdf=PDFType("gaussian") + ) + + hillas = hillas_parameters(geom_nom, cleaned_image) + + assert result.length.value > hillas.length.value + + conc_fit = concentration_parameters(geom_nom, cleaned_image, result).core + conc_hillas = concentration_parameters(geom_nom, cleaned_image, hillas).core + + assert conc_fit > conc_hillas + assert conc_fit > 0.4 + + # Skewed + result = image_fit_parameters( + geom_nom, image, n_row=2, cleaned_mask=clean_mask, pdf=PDFType("skewed") + ) + + assert result.length.value > hillas.length.value + + conc_fit = concentration_parameters(geom_nom, cleaned_image, result).core + + assert conc_fit > conc_hillas + assert conc_fit > 0.4 + + +def test_percentage(prod5_lst): + geom = prod5_lst.camera.geometry + telescope_frame = TelescopeFrame() + geom_nom = geom.transform_to(telescope_frame) + + # Gaussian + image, clean_mask = create_sample_image(psi="0d", geometry=geom) + + fit = image_fit_parameters( + geom_nom, image, n_row=2, cleaned_mask=clean_mask, pdf=PDFType("gaussian") + ) + + cleaned_image = image.copy() + cleaned_image[~clean_mask] = 0.0 + conc = concentration_parameters(geom_nom, image, fit) + signal_inside_ellipse = conc.core + + if fit.is_valid and fit.is_accurate: + assert signal_inside_ellipse > 0.3 + + +def test_skewness(prod5_lst): + rng = np.random.default_rng(42) + + geom = prod5_lst.camera.geometry + telescope_frame = TelescopeFrame() + geom_nom = geom.transform_to(telescope_frame) + + widths = u.Quantity([0.04, 0.05, 0.06], u.deg) + lengths = u.Quantity([0.2, 0.3, 0.4], u.deg) + intensities = np.array([500, 1500, 2000]) + + xs = u.Quantity([0.9, 0.9, -0.9, -0.9], u.deg) + ys = u.Quantity([0.9, -0.9, 0.9, -0.9], u.deg) + psis = Angle([-60, -45, 0, 45, 60, 90], unit="deg") + skews = np.array([0, 0.5, -0.5, 0.8, -0.8, 0.9]) + + for x, y, skew, intensity, width, length in zip( + xs, ys, skews, intensities, widths, lengths + ): + for psi in psis: + + model_skewed = toymodel.SkewedGaussian( + x=x, y=y, width=width, length=length, psi=psi, skewness=skew + ) + + image, signal, noise = model_skewed.generate_image( + geom_nom, intensity=intensity, nsb_level_pe=0, rng=rng + ) + + clean_mask = np.array(signal) > 0 + result = image_fit_parameters( + geom_nom, + signal, + n_row=0, + cleaned_mask=clean_mask, + pdf=PDFType("skewed"), + ) + + if result.is_valid or result.is_accurate: + assert u.isclose(result.fov_lon, x, rtol=0.1) + assert u.isclose(result.fov_lat, y, rtol=0.1) + + assert u.isclose(result.width, width, rtol=0.1) + assert u.isclose(result.length, length, rtol=0.1) + + psi_same = result.psi.to_value(u.deg) == approx(psi.deg, abs=2) + psi_opposite = abs(result.psi.to_value(u.deg) - psi.deg) == approx( + 180.0, abs=2 + ) + assert psi_same or psi_opposite + + if psi_same: + assert result.skewness == approx(skew, abs=0.3) + else: + assert result.skewness == approx(-skew, abs=0.3) + + assert signal.sum() == result.intensity + + +def test_gaussian_skewness(prod5_lst): + rng = np.random.default_rng(42) + geom = prod5_lst.camera.geometry + telescope_frame = TelescopeFrame() + geom_nom = geom.transform_to(telescope_frame) + + model_gaussian = toymodel.Gaussian( + x=1.0 * u.deg, + y=1.0 * u.deg, + width=0.05 * u.deg, + length=0.3 * u.deg, + psi=20 * u.deg, + ) + + image, signal, noise = model_gaussian.generate_image( + geom_nom, intensity=1500, nsb_level_pe=0, rng=rng + ) + + clean_mask = np.array(signal) > 0 + result = image_fit_parameters( + geom_nom, signal, n_row=0, cleaned_mask=clean_mask, pdf=PDFType("gaussian") + ) + result_skew = image_fit_parameters( + geom_nom, signal, n_row=0, cleaned_mask=clean_mask, pdf=PDFType("skewed") + ) + assert result.skewness == approx(0, abs=0.1) + assert result_skew.skewness == approx(0, abs=0.1) + + +@pytest.mark.filterwarnings("error") +def test_single_pixel(): + x = y = np.arange(3) + x, y = np.meshgrid(x, y) + + geom = CameraGeometry( + name="testcam", + pix_id=np.arange(9), + pix_x=x.ravel() * u.deg, + pix_y=y.ravel() * u.deg, + pix_type="rectangular", + pix_area=np.full(9, 1.0) * u.deg**2, + ) + + image = np.zeros((3, 3)) + image[1, 1] = 10 + image = image.ravel() + + clean_mask = np.array(image) > 0 + + with pytest.raises(ImageFitParameterizationError): + image_fit_parameters( + geom, image, n_row=2, cleaned_mask=clean_mask, pdf=PDFType("skewed") + ) + + with pytest.raises(ImageFitParameterizationError): + image_fit_parameters( + geom, image, n_row=2, cleaned_mask=clean_mask, pdf=PDFType("gaussian") + ) + + +def test_reconstruction_in_telescope_frame(prod5_lst): + """ + Compare the reconstruction in the telescope + and camera frame. + """ + np.random.seed(42) + + geom = prod5_lst.camera.geometry + telescope_frame = TelescopeFrame() + geom_nom = geom.transform_to(telescope_frame) + + width = 0.04 * u.deg + length = 0.40 * u.deg + intensity = 5000 + + xs = u.Quantity([0.9, 0.9, -0.9, -0.9], u.deg) + ys = u.Quantity([0.9, -0.9, 0.9, -0.9], u.deg) + psis = Angle([-90, -45, 0, 45, 90], unit="deg") + skews = 0.0, 0.2, 0.5 + + for x, y in zip(xs, ys): + for psi in psis: + for skew in skews: + # generate a toy image + model = toymodel.SkewedGaussian( + x=x, y=y, width=width, length=length, psi=psi, skewness=skew + ) + image, signal, noise = model.generate_image( + geom_nom, intensity=intensity, nsb_level_pe=5 + ) + + telescope_result = image_fit_parameters( + geom_nom, + signal, + n_row=0, + cleaned_mask=(np.array(signal) > 0), + pdf=PDFType("skewed"), + ) + + if telescope_result.is_valid or telescope_result.is_accurate: + assert u.isclose(telescope_result.fov_lon, x, rtol=0.02) + assert u.isclose(telescope_result.fov_lat, y, rtol=0.02) + assert u.isclose(telescope_result.length, length, rtol=0.1) + assert u.isclose(telescope_result.width, width, rtol=0.1) + + psi_same = telescope_result.psi.to_value(u.deg) == approx( + psi.deg, abs=1 + ) + psi_opposite = abs( + telescope_result.psi.to_value(u.deg) - psi.deg + ) == approx(180.0, abs=1) + assert psi_same or psi_opposite + + if psi_same: + assert telescope_result.skewness == approx(skew, abs=0.3) + else: + assert telescope_result.skewness == approx(-skew, abs=0.3) + + assert signal.sum() == telescope_result.intensity diff --git a/ctapipe/image/tests/test_hillas.py b/ctapipe/image/tests/test_hillas.py index b3b148c8bc6..b136a53a7f8 100644 --- a/ctapipe/image/tests/test_hillas.py +++ b/ctapipe/image/tests/test_hillas.py @@ -76,7 +76,6 @@ def test_hillas_selected(prod5_lst): results = hillas_parameters(geom, image_zeros) results_selected = hillas_parameters(geom_selected, image_selected) - compare_hillas(results, results_selected) diff --git a/ctapipe/image/toymodel.py b/ctapipe/image/toymodel.py index 7c1575e75c4..67e9b6b1ba0 100644 --- a/ctapipe/image/toymodel.py +++ b/ctapipe/image/toymodel.py @@ -18,13 +18,16 @@ >>> print(image.shape) (400,) """ + from abc import ABCMeta, abstractmethod import astropy.units as u import numpy as np +import scipy +from astropy.coordinates import Angle from numpy.random import default_rng from scipy.ndimage import convolve1d -from scipy.stats import multivariate_normal, norm, skewnorm +from scipy.stats import norm, skewnorm from ctapipe.image.hillas import camera_to_shower_coordinates from ctapipe.utils import linalg @@ -240,7 +243,8 @@ def expected_signal(self, camera, intensity): class Gaussian(ImageModel): - def __init__(self, x, y, length, width, psi): + @u.quantity_input + def __init__(self, x, y, length, width, psi, amplitude=None): """Create 2D Gaussian model for a shower image in a camera. Parameters @@ -253,43 +257,52 @@ def __init__(self, x, y, length, width, psi): length of shower (major axis) psi : u.Quantity[angle] rotation angle about the centroid (0=x-axis) + amplitude : float + normalization amplitude Returns ------- - a `scipy.stats` object - + model : ndarray + 2D Gaussian distribution """ - self.unit = x.unit + self.x = x self.y = y self.width = width self.length = length self.psi = psi + self.amplitude = amplitude + self.unit = self.x.unit + + if self.amplitude is None: + self.amplitude = 1 / ( + 2 + * np.pi + * self.width.to_value(self.unit) + * self.length.to_value(self.unit) + ) + + @u.quantity_input + def pdf(self, x, y): + """2d probability for photon electrons in the camera plane""" + mu = u.Quantity([self.x, self.y]).to_value(self.unit) + rotation = linalg.rotation_matrix_2d(-Angle(self.psi)) + pos = np.column_stack([x.to_value(self.unit), y.to_value(self.unit)]) + long, trans = rotation @ (pos - mu).T - aligned_covariance = np.array( - [ - [self.length.to_value(self.unit) ** 2, 0], - [0, self.width.to_value(self.unit) ** 2], - ] - ) - # rotate by psi angle: C' = R C R+ - rotation = linalg.rotation_matrix_2d(self.psi) - rotated_covariance = rotation @ aligned_covariance @ rotation.T - self.dist = multivariate_normal( - mean=u.Quantity([self.x, self.y]).to_value(self.unit), - cov=rotated_covariance, + gaussian_pdf = self.amplitude * np.exp( + -0.5 * (long) ** 2 / self.length.to_value(self.unit) ** 2 + - 0.5 * (trans) ** 2 / self.width.to_value(self.unit) ** 2 ) - def pdf(self, x, y): - """2d probability for photon electrons in the camera plane""" - X = np.column_stack([x.to_value(self.unit), y.to_value(self.unit)]) - return self.dist.pdf(X) + return gaussian_pdf class SkewedGaussian(ImageModel): """A shower image that has a skewness along the major axis.""" - def __init__(self, x, y, length, width, psi, skewness): + @u.quantity_input + def __init__(self, x, y, length, width, psi, skewness, amplitude=None): """Create 2D skewed Gaussian model for a shower image in a camera. Skewness is only applied along the main shower axis. See https://en.wikipedia.org/wiki/Skew_normal_distribution and @@ -306,10 +319,15 @@ def __init__(self, x, y, length, width, psi, skewness): length of shower (major axis) psi : u.Quantity[angle] rotation angle about the centroid (0=x-axis) + skewness: float + skewness of the shower in longitudinal direction + amplitude : float + normalization amplitude Returns ------- - a `scipy.stats` object + model : ndarray + 2D Skewed Gaussian distribution """ self.unit = x.unit @@ -319,6 +337,8 @@ def __init__(self, x, y, length, width, psi, skewness): self.length = length self.psi = psi self.skewness = skewness + self.amplitude = amplitude + self.unit = self.x.unit a, loc, scale = self._moments_to_parameters() self.long_dist = skewnorm(a=a, loc=loc, scale=scale) @@ -339,11 +359,28 @@ def _moments_to_parameters(self): return a, loc, scale + @u.quantity_input def pdf(self, x, y): """2d probability for photon electrons in the camera plane.""" + mu = u.Quantity([self.x, self.y]).to_value(self.unit) + + rotation = linalg.rotation_matrix_2d(-Angle(self.psi)) pos = np.column_stack([x.to_value(self.unit), y.to_value(self.unit)]) - long, trans = self.rotation @ (pos - self.mu).T - return self.trans_dist.pdf(trans) * self.long_dist.pdf(long) + long, trans = rotation @ (pos - mu).T + + trans_pdf = np.exp(-1 / 2 * (trans) ** 2 / self.width.to_value(self.unit) ** 2) + + a, loc, scale = self._moments_to_parameters() + + if self.amplitude is None: + self.amplitude = 1 / (2 * np.pi * scale * self.width.value) + + return ( + self.amplitude + * trans_pdf + * np.exp(-1 / 2 * ((long - loc) / scale) ** 2) + * (1 + scipy.special.erf(a / np.sqrt(2) * (long - loc) / scale)) + ) class RingGaussian(ImageModel): diff --git a/ctapipe/instrument/camera/geometry.py b/ctapipe/instrument/camera/geometry.py index dd55b14425b..64b20df3592 100644 --- a/ctapipe/instrument/camera/geometry.py +++ b/ctapipe/instrument/camera/geometry.py @@ -232,6 +232,10 @@ def guess_radius(self): (self.pix_x[border] - cx) ** 2 + (self.pix_y[border] - cy) ** 2 ).mean() + @lazyproperty + def radius(self): + return self.guess_radius() + def transform_to(self, frame: BaseCoordinateFrame): """Transform the pixel coordinates stored in this geometry and the pixel and camera rotations to another camera coordinate frame. diff --git a/docs/changes/2275.feature.rst b/docs/changes/2275.feature.rst new file mode 100644 index 00000000000..a08aba229b8 --- /dev/null +++ b/docs/changes/2275.feature.rst @@ -0,0 +1 @@ +Add parametric model fitting algorithm for image parametrization