Skip to content

Commit

Permalink
rebase main branch and reformatting
Browse files Browse the repository at this point in the history
  • Loading branch information
clara-escanuela committed Mar 16, 2023
1 parent 30abb64 commit bfef038
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 75 deletions.
200 changes: 136 additions & 64 deletions ctapipe/image/ellipsoid.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,20 @@
import numpy as np
from astropy.coordinates import Angle
from ctapipe.image.cleaning import dilate
import scipy.optimize as opt
from ctapipe.image.hillas import hillas_parameters
from ctapipe.image.pixel_likelihood import neg_log_likelihood_approx
from ctapipe.image.toymodel import Gaussian, SkewedCauchy, SkewedGaussian
from iminuit import Minuit
from ctapipe.image.pixel_likelihood import chi_squared, neg_log_likelihood_approx, neg_log_likelihood_numeric
from ctapipe.image.hillas import hillas_parameters, camera_to_shower_coordinates
from ctapipe.image.toymodel import SkewedCauchy, SkewedGaussian, Gaussian

from ..containers import CameraImageFitParametersContainer, ImageFitParametersContainer
from itertools import combinations
from ctapipe.image.leakage import leakage_parameters
from ctapipe.image.concentration import concentration_parameters

__all__ = ["image_fit_parameters", "ImageFitParameterizationError"]


def create_initial_guess(geometry, image, pdf):
"""
This function computes the initial guess for the image fit using the Hillas parameters
Parameters
----------
geometry : ctapipe.instrument.CameraGeometry
Expand All @@ -36,10 +34,10 @@ def create_initial_guess(geometry, image, pdf):
Returns
-------
initial_guess : Hillas parameters
initial_guess : Hillas parameters
"""
hillas = hillas_parameters(geometry, image)

initial_guess = {}
initial_guess["x"] = hillas.x
initial_guess["y"] = hillas.y
Expand All @@ -48,22 +46,31 @@ def create_initial_guess(geometry, image, pdf):
initial_guess["psi"] = hillas.psi
initial_guess["skewness"] = hillas.skewness
skew23 = np.abs(hillas.skewness) ** (2 / 3)
delta = np.sign(hillas.skewness) * np.sqrt((np.pi / 2 * skew23) / (skew23 + (0.5 * (4 - np.pi)) ** (2 / 3)))
scale_skew = hillas.length.to_value(u.m) / np.sqrt(1 - 2 * delta ** 2 / np.pi)

delta = np.sign(hillas.skewness) * np.sqrt(
(np.pi / 2 * skew23) / (skew23 + (0.5 * (4 - np.pi)) ** (2 / 3))
)
scale_skew = hillas.length.to_value(u.m) / np.sqrt(1 - 2 * delta**2 / np.pi)

if pdf == "Gaussian":
initial_guess["amplitude"] = hillas.intensity/(2*np.pi*hillas.length.value*hillas.width.value)
initial_guess["amplitude"] = hillas.intensity / (
2 * np.pi * hillas.length.value * hillas.width.value
)
if pdf == "Cauchy":
initial_guess["amplitude"] = hillas.intensity/(np.pi*scale_skew*hillas.width.value)
initial_guess["amplitude"] = hillas.intensity / (
np.pi * scale_skew * hillas.width.value
)
if pdf == "Skewed":
initial_guess["amplitude"] = hillas.intensity/(np.sqrt(2*np.pi)*scale_skew*hillas.width.value)
initial_guess["amplitude"] = hillas.intensity / (
np.sqrt(2 * np.pi) * scale_skew * hillas.width.value
)

return initial_guess


def extra_rows(n, cleaned_mask, geometry):
"""
This function adds n extra rows to the cleaning mask for a better fit of the shower tail
Parameters
----------
n : int
Expand All @@ -82,6 +89,7 @@ def extra_rows(n, cleaned_mask, geometry):

return mask


def boundaries(geometry, image, cleaning_mask, clean_row_mask, x0, pdf):
"""
Computes the boundaries of the fit.
Expand All @@ -100,7 +108,7 @@ def boundaries(geometry, image, cleaning_mask, clean_row_mask, x0, pdf):
seeds of the fit
pdf: str
PDF name
Returns
-------
Limits of the fit for each free parameter
Expand All @@ -109,40 +117,69 @@ def boundaries(geometry, image, cleaning_mask, clean_row_mask, x0, pdf):
row_image[~clean_row_mask] = 0.0
row_image[row_image < 0] = 0.0

cleaned_image= image.copy()
cleaned_image = image.copy()
cleaned_image[~cleaning_mask] = 0.0

pix_area = geometry.pix_area.value[0]
area = pix_area * np.count_nonzero(row_image)

leakage = leakage_parameters(geometry, image, clean_row_mask) #TODO: length boundaries dependent on leakage or distance of centroid to centre of camera
fract_pix_border = leakage.pixels_width_2
fract_int_border = leakage.intensity_width_2

delta_x = geometry.pix_x.value - x0["x"].value
delta_y = geometry.pix_y.value - x0["y"].value
longitudinal = delta_x * np.cos(x0["psi"].value) + delta_y * np.sin(x0["psi"].value)
transverse = delta_x * -np.sin(x0["psi"].value) + delta_y * np.cos(x0["psi"].value)

cogx_min, cogx_max = np.min(geometry.pix_x.value[cleaned_image>0.3*np.max(cleaned_image)]), np.max(geometry.pix_x.value[cleaned_image>0.3*np.max(cleaned_image)])
cogy_min, cogy_max = np.min(geometry.pix_y.value[cleaned_image>0.3*np.max(cleaned_image)]), np.max(geometry.pix_y.value[cleaned_image>0.3*np.max(cleaned_image)])
cogx_min, cogx_max = np.min(
geometry.pix_x.value[cleaned_image > 0.3 * np.max(cleaned_image)]
), np.max(geometry.pix_x.value[cleaned_image > 0.3 * np.max(cleaned_image)])
cogy_min, cogy_max = np.min(
geometry.pix_y.value[cleaned_image > 0.3 * np.max(cleaned_image)]
), np.max(geometry.pix_y.value[cleaned_image > 0.3 * np.max(cleaned_image)])

x_dis = np.max(longitudinal[row_image>0]) - np.min(longitudinal[row_image>0])
y_dis = np.max(transverse[row_image>0]) - np.min(transverse[row_image>0])
length_min, length_max = np.sqrt(pix_area), x_dis
x_dis = np.max(longitudinal[row_image > 0]) - np.min(longitudinal[row_image > 0])
y_dis = np.max(transverse[row_image > 0]) - np.min(transverse[row_image > 0])
length_min, length_max = np.sqrt(pix_area), x_dis

width_min, width_max = np.sqrt(pix_area), y_dis
scale = length_min/ np.sqrt(1 - 2 / np.pi)
scale = length_min / np.sqrt(1 - 2 / np.pi)
skew_min, skew_max = -0.99, 0.99

if pdf == "Gaussian":
return [(cogx_min, cogx_max), (cogy_min, cogy_max), (-np.pi/2, np.pi/2), (length_min, length_max), (width_min, width_max), (0, np.sum(row_image)*1/(2*np.pi*width_min*length_min))]
return [
(cogx_min, cogx_max),
(cogy_min, cogy_max),
(-np.pi / 2, np.pi / 2),
(length_min, length_max),
(width_min, width_max),
(0, np.sum(row_image) * 1 / (2 * np.pi * width_min * length_min)),
]
if pdf == "Skewed":
return [(cogx_min, cogx_max), (cogy_min, cogy_max), (-np.pi/2, np.pi/2), (length_min, length_max), (width_min, width_max), (skew_min, skew_max), (0, np.sum(row_image) * 1/scale * 1/(np.sqrt(2*np.pi)*width_min))]
return [
(cogx_min, cogx_max),
(cogy_min, cogy_max),
(-np.pi / 2, np.pi / 2),
(length_min, length_max),
(width_min, width_max),
(skew_min, skew_max),
(0, np.sum(row_image) * 1 / scale * 1 / (np.sqrt(2 * np.pi) * width_min)),
]
if pdf == "Cauchy":
return [(cogx_min, cogx_max), (cogy_min, cogy_max), (-np.pi/2, np.pi/2), (length_min, length_max), (width_min, width_max), (skew_min, skew_max), (0, np.sum(row_image) * 1/scale * 1/(np.pi*width_min))]
return [
(cogx_min, cogx_max),
(cogy_min, cogy_max),
(-np.pi / 2, np.pi / 2),
(length_min, length_max),
(width_min, width_max),
(skew_min, skew_max),
(0, np.sum(row_image) * 1 / scale * 1 / (np.pi * width_min)),
]


class ImageFitParameterizationError(RuntimeError):
pass

def image_fit_parameters(geom, image, n, cleaned_mask, spe_width, pedestal, pdf="Cauchy", bounds=None):

def image_fit_parameters(
geom, image, n, cleaned_mask, spe_width, pedestal, pdf="Cauchy", bounds=None
):
"""
Computes image parameters for a given shower image.
Expand Down Expand Up @@ -177,10 +214,11 @@ def image_fit_parameters(geom, image, n, cleaned_mask, spe_width, pedestal, pdf=
pix_y = geom.pix_y
image = np.asanyarray(image, dtype=np.float64)

pdf_dict = {"Gaussian": Gaussian,
"Skewed": SkewedGaussian,
"Cauchy": SkewedCauchy,
}
pdf_dict = {
"Gaussian": Gaussian,
"Skewed": SkewedGaussian,
"Cauchy": SkewedCauchy,
}

if isinstance(image, np.ma.masked_array):
image = np.ma.filled(image, 0)
Expand All @@ -193,36 +231,70 @@ def image_fit_parameters(geom, image, n, cleaned_mask, spe_width, pedestal, pdf=
x0 = create_initial_guess(geom, prev_image, pdf)

if np.count_nonzero(image) <= len(x0):
raise ImageFitParameterizationError("The number of free parameters is higher than the number of pixels to fit, cannot perform fit")
raise ImageFitParameterizationError(
"The number of free parameters is higher than the number of pixels to fit, cannot perform fit"
)

mask = extra_rows(n, cleaned_mask, geom)
cleaned_image = image.copy()
cleaned_image = image.copy()
cleaned_image[~mask] = 0.0
cleaned_image[cleaned_image<0] = 0.0
cleaned_image[cleaned_image < 0] = 0.0
size = np.sum(cleaned_image)

def fit(cog_x, cog_y, psi, length, width, skewness, amplitude):
prediction = pdf_dict[pdf](cog_x*unit, cog_y*unit, length*unit, width*unit, psi*u.rad, skewness, amplitude).pdf(geom.pix_x, geom.pix_y)
prediction = pdf_dict[pdf](
cog_x * unit,
cog_y * unit,
length * unit,
width * unit,
psi * u.rad,
skewness,
amplitude,
).pdf(geom.pix_x, geom.pix_y)
return neg_log_likelihood_approx(cleaned_image, prediction, spe_width, pedestal)

def fit_gauss(cog_x, cog_y, psi, length, width, amplitude):
prediction = pdf_dict[pdf](cog_x*unit, cog_y*unit, length*unit, width*unit, psi*u.rad, amplitude).pdf(geom.pix_x, geom.pix_y)
prediction = pdf_dict[pdf](
cog_x * unit,
cog_y * unit,
length * unit,
width * unit,
psi * u.rad,
amplitude,
).pdf(geom.pix_x, geom.pix_y)
return neg_log_likelihood_approx(cleaned_image, prediction, spe_width, pedestal)

if pdf != "Gaussian":
m = Minuit(fit, cog_x=x0['x'].value, cog_y=x0['y'].value, psi=x0['psi'].value, length=x0['length'].value, width=x0['width'].value, skewness=x0["skewness"], amplitude=x0["amplitude"])
m = Minuit(
fit,
cog_x=x0["x"].value,
cog_y=x0["y"].value,
psi=x0["psi"].value,
length=x0["length"].value,
width=x0["width"].value,
skewness=x0["skewness"],
amplitude=x0["amplitude"],
)
else:
m = Minuit(fit_gauss, cog_x=x0['x'].value, cog_y=x0['y'].value, psi=x0['psi'].value, length=x0['length'].value, width=x0['width'].value, amplitude=x0["amplitude"])
m = Minuit(
fit_gauss,
cog_x=x0["x"].value,
cog_y=x0["y"].value,
psi=x0["psi"].value,
length=x0["length"].value,
width=x0["width"].value,
amplitude=x0["amplitude"],
)

bounds = boundaries(geom, image, cleaned_mask, mask, x0, pdf)

if bounds == None:
if bounds is None:
bounds = boundaries(geom, image, cleaned_mask, mask, x0, pdf)
m.limits = bounds
if bounds != None:
if bounds is not None:
m.limits = bounds
m.errordef=1 #neg log likelihood

m.errordef = 1 # neg log likelihood
m.migrad()
m.hesse()

Expand All @@ -233,29 +305,31 @@ def fit_gauss(cog_x, cog_y, psi, length, width, amplitude):
fit_rcog = np.linalg.norm([pars[0], pars[1]])
fit_phi = np.arctan2(pars[1], pars[0])

b = pars[1]**2 + pars[0]**2
A = (-pars[1]/(b))**2
B = (pars[0]/(b))**2
fit_phi_err = np.sqrt(A*errors[0]**2 + B*errors[1]**2)
fit_rcog_err = np.sqrt(pars[0]**2/b*errors[0]**2 + pars[1]**2/b*errors[1]**2)
b = pars[1] ** 2 + pars[0] ** 2
A = (-pars[1] / (b)) ** 2
B = (pars[0] / (b)) ** 2
fit_phi_err = np.sqrt(A * errors[0] ** 2 + B * errors[1] ** 2)
fit_rcog_err = np.sqrt(
pars[0] ** 2 / b * errors[0] ** 2 + pars[1] ** 2 / b * errors[1] ** 2
)

delta_x = geom.pix_x.value - pars[0]
delta_y = geom.pix_y.value - pars[1]

longitudinal = delta_x * np.cos(pars[2]) + delta_y * np.sin(pars[2])

m4_long = np.average(longitudinal**4, weights=cleaned_image)
kurtosis_long = m4_long / pars[3]**4
kurtosis_long = m4_long / pars[3] ** 4

if pdf != "Gaussian":
skewness_long = pars[5]
amplitude=pars[6],
amplitude_uncertainty=errors[6]
amplitude = (pars[6],)
amplitude_uncertainty = errors[6]
else:
m3_long = np.average(longitudinal**3, weights=image)
skewness_long = m3_long / pars[3]**3
amplitude=pars[5],
amplitude_uncertainty=errors[5]
skewness_long = m3_long / pars[3] ** 3
amplitude = (pars[5],)
amplitude_uncertainty = errors[5]

if unit.is_equivalent(u.m):
return CameraImageFitParametersContainer(
Expand Down Expand Up @@ -283,7 +357,7 @@ def fit_gauss(cog_x, cog_y, psi, length, width, amplitude):
n_free_par=len(x0),
is_valid=m.valid,
is_accurate=m.accurate,
)
)
return ImageFitParametersContainer(
fov_lon=u.Quantity(pars[0], unit),
fov_lon_uncertainty=u.Quantity(errors[0], unit),
Expand All @@ -309,6 +383,4 @@ def fit_gauss(cog_x, cog_y, psi, length, width, amplitude):
n_free_par=len(x0),
is_valid=m.valid,
is_accurate=m.accurate,
)


)
1 change: 0 additions & 1 deletion ctapipe/image/image_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
from .cleaning import ImageCleaner
from .concentration import concentration_parameters
from .hillas import hillas_parameters
from .ellipsoid import image_fit_parameters
from .leakage import leakage_parameters
from .modifications import ImageModifier
from .morphology import morphology_parameters
Expand Down
19 changes: 9 additions & 10 deletions ctapipe/image/tests/test_ellipsoid.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,17 @@
import pytest
from astropy import units as u
from astropy.coordinates import Angle, SkyCoord
from numpy import isclose
from pytest import approx

from ctapipe.containers import (
CameraHillasParametersContainer,
HillasParametersContainer,
)
from ctapipe.containers import (CameraHillasParametersContainer,
HillasParametersContainer)
from ctapipe.coordinates import TelescopeFrame
from ctapipe.image import tailcuts_clean, toymodel
from ctapipe.image.ellipsoid import (ImageFitParameterizationError,
image_fit_parameters)
from ctapipe.image.hillas import HillasParameterizationError, hillas_parameters
from ctapipe.image.ellipsoid import ImageFitParameterizationError, image_fit_parameters
from ctapipe.instrument import CameraGeometry, SubarrayDescription
from numpy import isclose
from pytest import approx


def create_sample_image(
psi="-30d",
Expand Down Expand Up @@ -50,7 +49,7 @@ def test_imagefit_failure(prod5_lst):
blank_image = np.zeros(geom.n_pixels)

with pytest.raises(ImageFitParameterizationError):
image_fit_parameters(geom, blank_image)
image_fit_parameters(geom, blank_image, n=2, blank_image==1, spe_width=0.6, pesdestal=200)

def test_hillas_similarity(prod5_lst):
geom = prod5_lst.camera.geometry
Expand All @@ -59,7 +58,7 @@ def test_hillas_similarity(prod5_lst):
cleaned_image = image.copy()
cleaned_image[~clean_mask] = 0

imagefit = image_fit_parameters(geom, image_zeros)
imagefit = image_fit_parameters(geom, image_zeros, n=2, clean_mask, spe_width=0.6, pedestal=200) #TODO: include proper numbers from simulation
hillas = hillas_parameters(geom, image_zeros)

assert_allclose(imagefit.r, hillas.r, rtol=0.2)
Expand Down

0 comments on commit bfef038

Please sign in to comment.