diff --git a/src/ctapipe/calib/camera/extractor.py b/src/ctapipe/calib/camera/extractor.py deleted file mode 100644 index 7093d057f20..00000000000 --- a/src/ctapipe/calib/camera/extractor.py +++ /dev/null @@ -1,233 +0,0 @@ -""" -Extraction algorithms to compute the statistics from a sequence of images -""" - -__all__ = [ - "StatisticsExtractor", - "PlainExtractor", - "SigmaClippingExtractor", -] - -from abc import abstractmethod - -import numpy as np -from astropy.stats import sigma_clipped_stats - -from ctapipe.containers import StatisticsContainer -from ctapipe.core import TelescopeComponent -from ctapipe.core.traits import ( - Int, - List, -) - - -class StatisticsExtractor(TelescopeComponent): - sample_size = Int(2500, help="sample size").tag(config=True) - image_median_cut_outliers = List( - [-0.3, 0.3], - help="Interval of accepted image values (fraction with respect to camera median value)", - ).tag(config=True) - image_std_cut_outliers = List( - [-3, 3], - help="Interval (number of std) of accepted image standard deviation around camera median value", - ).tag(config=True) - - def __init__(self, subarray, config=None, parent=None, **kwargs): - """ - Base component to handle the extraction of the statistics - from a sequence of charges and pulse times (images). - - Parameters - ---------- - kwargs - """ - super().__init__(subarray=subarray, config=config, parent=parent, **kwargs) - - @abstractmethod - def __call__( - self, dl1_table, masked_pixels_of_sample=None, col_name="image" - ) -> list: - """ - Call the relevant functions to extract the statistics - for the particular extractor. - - Parameters - ---------- - dl1_table : ndarray - dl1 table with images and times stored in a numpy array of shape - (n_images, n_channels, n_pix). - col_name : string - column name in the dl1 table - - Returns - ------- - List StatisticsContainer: - List of extracted statistics and validity ranges - """ - - -class PlainExtractor(StatisticsExtractor): - """ - Extractor the statistics from a sequence of images - using numpy and scipy functions - """ - - def __call__( - self, dl1_table, masked_pixels_of_sample=None, col_name="image" - ) -> list: - # in python 3.12 itertools.batched can be used - image_chunks = ( - dl1_table[col_name].data[i : i + self.sample_size] - for i in range(0, len(dl1_table[col_name].data), self.sample_size) - ) - time_chunks = ( - dl1_table["time"][i : i + self.sample_size] - for i in range(0, len(dl1_table["time"]), self.sample_size) - ) - - # Calculate the statistics from a sequence of images - stats_list = [] - for images, times in zip(image_chunks, time_chunks): - stats_list.append( - self._plain_extraction(images, times, masked_pixels_of_sample) - ) - return stats_list - - def _plain_extraction( - self, images, times, masked_pixels_of_sample - ) -> StatisticsContainer: - # ensure numpy array - masked_images = np.ma.array(images, mask=masked_pixels_of_sample) - - # median over the sample per pixel - pixel_median = np.ma.median(masked_images, axis=0) - - # mean over the sample per pixel - pixel_mean = np.ma.mean(masked_images, axis=0) - - # std over the sample per pixel - pixel_std = np.ma.std(masked_images, axis=0) - - # median of the median over the camera - # median_of_pixel_median = np.ma.median(pixel_median, axis=1) - - # outliers from median - image_median_outliers = np.logical_or( - pixel_median < self.image_median_cut_outliers[0], - pixel_median > self.image_median_cut_outliers[1], - ) - - return StatisticsContainer( - validity_start=times[0], - 
validity_stop=times[-1], - mean=pixel_mean.filled(np.nan), - median=pixel_median.filled(np.nan), - median_outliers=image_median_outliers.filled(True), - std=pixel_std.filled(np.nan), - ) - - -class SigmaClippingExtractor(StatisticsExtractor): - """ - Extractor the statistics from a sequence of images - using astropy's sigma clipping functions - """ - - sigma_clipping_max_sigma = Int( - default_value=4, - help="Maximal value for the sigma clipping outlier removal", - ).tag(config=True) - sigma_clipping_iterations = Int( - default_value=5, - help="Number of iterations for the sigma clipping outlier removal", - ).tag(config=True) - - def __call__( - self, dl1_table, masked_pixels_of_sample=None, col_name="image" - ) -> list: - # in python 3.12 itertools.batched can be used - image_chunks = ( - dl1_table[col_name].data[i : i + self.sample_size] - for i in range(0, len(dl1_table[col_name].data), self.sample_size) - ) - time_chunks = ( - dl1_table["time"][i : i + self.sample_size] - for i in range(0, len(dl1_table["time"]), self.sample_size) - ) - - # Calculate the statistics from a sequence of images - stats_list = [] - for images, times in zip(image_chunks, time_chunks): - stats_list.append( - self._sigmaclipping_extraction(images, times, masked_pixels_of_sample) - ) - return stats_list - - def _sigmaclipping_extraction( - self, images, times, masked_pixels_of_sample - ) -> StatisticsContainer: - # ensure numpy array - masked_images = np.ma.array(images, mask=masked_pixels_of_sample) - - # median of the event images - # image_median = np.ma.median(masked_images, axis=-1) - - # mean, median, and std over the sample per pixel - max_sigma = self.sigma_clipping_max_sigma - pixel_mean, pixel_median, pixel_std = sigma_clipped_stats( - masked_images, - sigma=max_sigma, - maxiters=self.sigma_clipping_iterations, - cenfunc="mean", - axis=0, - ) - - # mask pixels without defined statistical values - pixel_mean = np.ma.array(pixel_mean, mask=np.isnan(pixel_mean)) - pixel_median = np.ma.array(pixel_median, mask=np.isnan(pixel_median)) - pixel_std = np.ma.array(pixel_std, mask=np.isnan(pixel_std)) - - unused_values = np.abs(masked_images - pixel_mean) > (max_sigma * pixel_std) - - # only warn for values discard in the sigma clipping, not those from before - # outliers = unused_values & (~masked_images.mask) - - # add outliers identified by sigma clipping for following operations - masked_images.mask |= unused_values - - # median of the median over the camera - median_of_pixel_median = np.ma.median(pixel_median, axis=1) - - # median of the std over the camera - median_of_pixel_std = np.ma.median(pixel_std, axis=1) - - # std of the std over camera - std_of_pixel_std = np.ma.std(pixel_std, axis=1) - - # outliers from median - image_deviation = pixel_median - median_of_pixel_median[:, np.newaxis] - image_median_outliers = np.logical_or( - image_deviation - < self.image_median_cut_outliers[0] * median_of_pixel_median[:, np.newaxis], - image_deviation - > self.image_median_cut_outliers[1] * median_of_pixel_median[:, np.newaxis], - ) - - # outliers from standard deviation - deviation = pixel_std - median_of_pixel_std[:, np.newaxis] - image_std_outliers = np.logical_or( - deviation - < self.image_std_cut_outliers[0] * std_of_pixel_std[:, np.newaxis], - deviation - > self.image_std_cut_outliers[1] * std_of_pixel_std[:, np.newaxis], - ) - - return StatisticsContainer( - validity_start=times[0], - validity_stop=times[-1], - mean=pixel_mean.filled(np.nan), - median=pixel_median.filled(np.nan), - 
median_outliers=image_median_outliers.filled(True), - std=pixel_std.filled(np.nan), - std_outliers=image_std_outliers.filled(True), - ) diff --git a/src/ctapipe/calib/camera/pointing.py b/src/ctapipe/calib/camera/pointing.py index d36e1a5b9cd..d5673b17693 100644 --- a/src/ctapipe/calib/camera/pointing.py +++ b/src/ctapipe/calib/camera/pointing.py @@ -14,7 +14,6 @@ from astroquery.vizier import Vizier # discuss this dependency with max etc. from scipy.odr import ODR, RealData -from ctapipe.calib.camera.extractor import StatisticsExtractor from ctapipe.containers import StarContainer from ctapipe.coordinates import EngineeringCameraFrame from ctapipe.core import TelescopeComponent @@ -28,7 +27,8 @@ from ctapipe.image import tailcuts_clean from ctapipe.image.psf_model import PSFModel from ctapipe.instrument import CameraGeometry -from ctapipe.io import FlatFieldInterpolator, PointingInterpolator +from ctapipe.monitoring.aggregator import StatisticsAggregator +from ctapipe.monitoring.interpolation import FlatFieldInterpolator, PointingInterpolator __all__ = ["PointingCalculator", "StarImageGenerator"] @@ -359,16 +359,16 @@ class PointingCalculator(TelescopeComponent): Attributes ---------- - stats_extractor: str - The name of the StatisticsExtractor subclass to be used to calculate the statistics of an image set + stats_aggregator: str + The name of the StatisticsAggregator subclass to be used to calculate the statistics of an image set telescope_location: dict The location of the telescope for which the pointing correction is to be calculated """ - stats_extractor = TelescopeParameter( - trait=ComponentName(StatisticsExtractor, default_value="Plain"), - default_value="Plain", - help="Name of the StatisticsExtractor Subclass to be used.", + stats_aggregator = TelescopeParameter( + trait=ComponentName(StatisticsAggregator, default_value="PlainAggregator"), + default_value="PlainAggregator", + help="Name of the StatisticsAggregator Subclass to be used.", ).tag(config=True) telescope_location = Dict( @@ -444,8 +444,8 @@ def __init__( height=self.telescope_location["elevation"] * u.m, ) - self.stats_aggregator = StatisticsExtractor.from_name( - self.stats_extractor, subarray=self.subarray, parent=self + self.image_aggregator = StatisticsAggregator.from_name( + self.stats_aggregator, subarray=self.subarray, parent=self ) self.set_camera(geometry) @@ -662,14 +662,14 @@ def _get_accumulated_images(self, data_table): np.square(relative_gains(data_table["time"][i]).median), ) - # then turn it into a table that the extractor can read + # then turn it into a table that the aggregator can read variance_image_table = QTable( [data_table["time"], variance_images], names=["time", "image"] ) - # get the cumulative variance images using the statistics extractor and return the value + # get the cumulative variance images using the statistics aggregator and return the value - variance_statistics = self.stats_aggregator( + variance_statistics = self.image_aggregator( variance_image_table, col_name="image" ) diff --git a/src/ctapipe/io/interpolation.py b/src/ctapipe/io/interpolation.py deleted file mode 100644 index e0e27470c99..00000000000 --- a/src/ctapipe/io/interpolation.py +++ /dev/null @@ -1,369 +0,0 @@ -from abc import ABCMeta, abstractmethod -from typing import Any - -import astropy.units as u -import numpy as np -import tables -from astropy.time import Time -from scipy.interpolate import interp1d - -from ctapipe.core import Component, traits - -from .astropy_helpers import read_table - - -class 
ChunkFunction: - - """ - Chunk Interpolator for the gain and pedestals - Interpolates data so that for each time the value from the latest starting - valid chunk is given or the earliest available still valid chunk for any - pixels without valid data. - - Parameters - ---------- - values : None | np.array - Numpy array of the data that is to be interpolated. - The first dimension needs to be an index over time - times : None | np.array - Time values over which data are to be interpolated - need to be sorted and have same length as first dimension of values - """ - - def __init__( - self, - start_times, - end_times, - values, - bounds_error=True, - fill_value="extrapolate", - assume_sorted=True, - copy=False, - ): - self.values = values - self.start_times = start_times - self.end_times = end_times - self.bounds_error = bounds_error - self.fill_value = fill_value - - def __call__(self, point): - if point < self.start_times[0]: - if self.bounds_error: - raise ValueError("below the interpolation range") - - if self.fill_value == "extrapolate": - return self.values[0] - - else: - a = np.empty(self.values[0].shape) - a[:] = np.nan - return a - - elif point > self.end_times[-1]: - if self.bounds_error: - raise ValueError("above the interpolation range") - - if self.fill_value == "extrapolate": - return self.values[-1] - - else: - a = np.empty(self.values[0].shape) - a[:] = np.nan - return a - - else: - i = np.searchsorted( - self.start_times, point, side="left" - ) # Latest valid chunk - j = np.searchsorted( - self.end_times, point, side="left" - ) # Earliest valid chunk - return np.where( - np.isnan(self.values[i - 1]), self.values[j], self.values[i - 1] - ) # Give value for latest chunk unless its nan. If nan give earliest chunk value - - -class Interpolator(Component, metaclass=ABCMeta): - """ - Interpolator parent class. - - Parameters - ---------- - h5file : None | tables.File - A open hdf5 file with read access. - """ - - bounds_error = traits.Bool( - default_value=True, - help="If true, raises an exception when trying to extrapolate out of the given table", - ).tag(config=True) - - extrapolate = traits.Bool( - help="If bounds_error is False, this flag will specify whether values outside" - "the available values are filled with nan (False) or extrapolated (True).", - default_value=False, - ).tag(config=True) - - telescope_data_group = None - required_columns = set() - expected_units = {} - - def __init__(self, h5file=None, **kwargs): - super().__init__(**kwargs) - - if h5file is not None and not isinstance(h5file, tables.File): - raise TypeError("h5file must be a tables.File") - self.h5file = h5file - - self.interp_options: dict[str, Any] = dict(assume_sorted=True, copy=False) - if self.bounds_error: - self.interp_options["bounds_error"] = True - elif self.extrapolate: - self.interp_options["bounds_error"] = False - self.interp_options["fill_value"] = "extrapolate" - else: - self.interp_options["bounds_error"] = False - self.interp_options["fill_value"] = np.nan - - self._interpolators = {} - - @abstractmethod - def add_table(self, tel_id, input_table): - """ - Add a table to this interpolator - This method reads input tables and creates instances of the needed interpolators - to be added to _interpolators. 
The first index of _interpolators needs to be - tel_id, the second needs to be the name of the parameter that is to be interpolated - - Parameters - ---------- - tel_id : int - Telescope id - input_table : astropy.table.Table - Table of pointing values, expected columns - are always ``time`` as ``Time`` column and - other columns for the data that is to be interpolated - """ - - pass - - def _check_tables(self, input_table): - missing = self.required_columns - set(input_table.colnames) - if len(missing) > 0: - raise ValueError(f"Table is missing required column(s): {missing}") - for col in self.expected_units: - unit = input_table[col].unit - if unit is None: - if self.expected_units[col] is not None: - raise ValueError( - f"{col} must have units compatible with '{self.expected_units[col].name}'" - ) - elif not self.expected_units[col].is_equivalent(unit): - if self.expected_units[col] is None: - raise ValueError(f"{col} must have units compatible with 'None'") - else: - raise ValueError( - f"{col} must have units compatible with '{self.expected_units[col].name}'" - ) - - def _check_interpolators(self, tel_id): - if tel_id not in self._interpolators: - if self.h5file is not None: - self._read_parameter_table(tel_id) # might need to be removed - else: - raise KeyError(f"No table available for tel_id {tel_id}") - - def _read_parameter_table(self, tel_id): - input_table = read_table( - self.h5file, - f"{self.telescope_data_group}/tel_{tel_id:03d}", - ) - self.add_table(tel_id, input_table) - - -class PointingInterpolator(Interpolator): - """ - Interpolator for pointing and pointing correction data - """ - - telescope_data_group = "/dl0/monitoring/telescope/pointing" - required_columns = frozenset(["time", "azimuth", "altitude"]) - expected_units = {"azimuth": u.rad, "altitude": u.rad} - - def __call__(self, tel_id, time): - """ - Interpolate alt/az for given time and tel_id. - - Parameters - ---------- - tel_id : int - telescope id - time : astropy.time.Time - time for which to interpolate the pointing - - Returns - ------- - altitude : astropy.units.Quantity[deg] - interpolated altitude angle - azimuth : astropy.units.Quantity[deg] - interpolated azimuth angle - """ - - self._check_interpolators(tel_id) - - mjd = time.tai.mjd - az = u.Quantity(self._interpolators[tel_id]["az"](mjd), u.rad, copy=False) - alt = u.Quantity(self._interpolators[tel_id]["alt"](mjd), u.rad, copy=False) - return alt, az - - def add_table(self, tel_id, input_table): - """ - Add a table to this interpolator - - Parameters - ---------- - tel_id : int - Telescope id - input_table : astropy.table.Table - Table of pointing values, expected columns - are ``time`` as ``Time`` column, ``azimuth`` and ``altitude`` - as quantity columns for pointing and pointing correction data. - """ - - self._check_tables(input_table) - - if not isinstance(input_table["time"], Time): - raise TypeError("'time' column of pointing table must be astropy.time.Time") - - input_table = input_table.copy() - input_table.sort("time") - - az = input_table["azimuth"].quantity.to_value(u.rad) - # prepare azimuth for interpolation by "unwrapping": i.e. turning - # [359, 1] into [359, 361]. This assumes that if we get values like - # [359, 1] the telescope moved 2 degrees through 0, not 358 degrees - # the other way around. This should be true for all telescopes given - # the sampling speed of pointing values and their maximum movement speed. - # No telescope can turn more than 180° in 2 seconds. 
- az = np.unwrap(az) - alt = input_table["altitude"].quantity.to_value(u.rad) - mjd = input_table["time"].tai.mjd - self._interpolators[tel_id] = {} - self._interpolators[tel_id]["az"] = interp1d(mjd, az, **self.interp_options) - self._interpolators[tel_id]["alt"] = interp1d(mjd, alt, **self.interp_options) - - -class FlatFieldInterpolator(Interpolator): - """ - Interpolator for flatfield data - """ - - telescope_data_group = "dl1/calibration/gain" # TBD - required_columns = frozenset(["start_time", "end_time", "gain"]) - expected_units = {"gain": u.one} - - def __call__(self, tel_id, time): - """ - Interpolate flatfield data for a given time and tel_id. - - Parameters - ---------- - tel_id : int - telescope id - time : astropy.time.Time - time for which to interpolate the calibration data - - Returns - ------- - ffield : array [float] - interpolated flatfield data - """ - - self._check_interpolators(tel_id) - - ffield = self._interpolators[tel_id]["gain"](time) - return ffield - - def add_table(self, tel_id, input_table): - """ - Add a table to this interpolator - - Parameters - ---------- - tel_id : int - Telescope id - input_table : astropy.table.Table - Table of pointing values, expected columns - are ``time`` as ``Time`` column and "gain" - for the flatfield data - """ - - self._check_tables(input_table) - - input_table = input_table.copy() - input_table.sort("start_time") - start_time = input_table["start_time"] - end_time = input_table["end_time"] - gain = input_table["gain"] - self._interpolators[tel_id] = {} - self._interpolators[tel_id]["gain"] = ChunkFunction( - start_time, end_time, gain, **self.interp_options - ) - - -class PedestalInterpolator(Interpolator): - """ - Interpolator for Pedestal data - """ - - telescope_data_group = "dl1/calibration/pedestal" # TBD - required_columns = frozenset(["start_time", "end_time", "pedestal"]) - expected_units = {"pedestal": u.one} - - def __call__(self, tel_id, time): - """ - Interpolate pedestal or gain for a given time and tel_id. - - Parameters - ---------- - tel_id : int - telescope id - time : astropy.time.Time - time for which to interpolate the calibration data - - Returns - ------- - pedestal : array [float] - interpolated pedestal values - """ - - self._check_interpolators(tel_id) - - pedestal = self._interpolators[tel_id]["pedestal"](time) - return pedestal - - def add_table(self, tel_id, input_table): - """ - Add a table to this interpolator - - Parameters - ---------- - tel_id : int - Telescope id - input_table : astropy.table.Table - Table of pointing values, expected columns - are ``time`` as ``Time`` column and "pedestal" - for the pedestal data - """ - - self._check_tables(input_table) - - input_table = input_table.copy() - input_table.sort("start_time") - start_time = input_table["start_time"] - end_time = input_table["end_time"] - pedestal = input_table["pedestal"] - self._interpolators[tel_id] = {} - self._interpolators[tel_id]["pedestal"] = ChunkFunction( - start_time, end_time, pedestal, **self.interp_options - ) diff --git a/src/ctapipe/monitoring/interpolation.py b/src/ctapipe/monitoring/interpolation.py index 84064cbc1a3..caceccd8c43 100644 --- a/src/ctapipe/monitoring/interpolation.py +++ b/src/ctapipe/monitoring/interpolation.py @@ -15,6 +15,77 @@ ] +class ChunkFunction: + + """ + Chunk Interpolator for the gain and pedestals + Interpolates data so that for each time the value from the latest starting + valid chunk is given or the earliest available still valid chunk for any + pixels without valid data. 
+ + Parameters + ---------- + values : None | np.array + Numpy array of the data that is to be interpolated. + The first dimension needs to be an index over time + times : None | np.array + Time values over which data are to be interpolated + need to be sorted and have same length as first dimension of values + """ + + def __init__( + self, + start_times, + end_times, + values, + bounds_error=True, + fill_value="extrapolate", + assume_sorted=True, + copy=False, + ): + self.values = values + self.start_times = start_times + self.end_times = end_times + self.bounds_error = bounds_error + self.fill_value = fill_value + + def __call__(self, point): + if point < self.start_times[0]: + if self.bounds_error: + raise ValueError("below the interpolation range") + + if self.fill_value == "extrapolate": + return self.values[0] + + else: + a = np.empty(self.values[0].shape) + a[:] = np.nan + return a + + elif point > self.end_times[-1]: + if self.bounds_error: + raise ValueError("above the interpolation range") + + if self.fill_value == "extrapolate": + return self.values[-1] + + else: + a = np.empty(self.values[0].shape) + a[:] = np.nan + return a + + else: + i = np.searchsorted( + self.start_times, point, side="left" + ) # Latest valid chunk + j = np.searchsorted( + self.end_times, point, side="left" + ) # Earliest valid chunk + return np.where( + np.isnan(self.values[i - 1]), self.values[j], self.values[i - 1] + ) # Give value for latest chunk unless its nan. If nan give earliest chunk value + + class Interpolator(Component, metaclass=ABCMeta): """ Interpolator parent class. @@ -186,3 +257,119 @@ def add_table(self, tel_id, input_table): self._interpolators[tel_id] = {} self._interpolators[tel_id]["az"] = interp1d(mjd, az, **self.interp_options) self._interpolators[tel_id]["alt"] = interp1d(mjd, alt, **self.interp_options) + + +class FlatFieldInterpolator(Interpolator): + """ + Interpolator for flatfield data + """ + + telescope_data_group = "dl1/calibration/gain" # TBD + required_columns = frozenset(["start_time", "end_time", "gain"]) + expected_units = {"gain": u.one} + + def __call__(self, tel_id, time): + """ + Interpolate flatfield data for a given time and tel_id. 
+ + Parameters + ---------- + tel_id : int + telescope id + time : astropy.time.Time + time for which to interpolate the calibration data + + Returns + ------- + ffield : array [float] + interpolated flatfield data + """ + + self._check_interpolators(tel_id) + + ffield = self._interpolators[tel_id]["gain"](time) + return ffield + + def add_table(self, tel_id, input_table): + """ + Add a table to this interpolator + + Parameters + ---------- + tel_id : int + Telescope id + input_table : astropy.table.Table + Table of pointing values, expected columns + are ``time`` as ``Time`` column and "gain" + for the flatfield data + """ + + self._check_tables(input_table) + + input_table = input_table.copy() + input_table.sort("start_time") + start_time = input_table["start_time"] + end_time = input_table["end_time"] + gain = input_table["gain"] + self._interpolators[tel_id] = {} + self._interpolators[tel_id]["gain"] = ChunkFunction( + start_time, end_time, gain, **self.interp_options + ) + + +class PedestalInterpolator(Interpolator): + """ + Interpolator for Pedestal data + """ + + telescope_data_group = "dl1/calibration/pedestal" # TBD + required_columns = frozenset(["start_time", "end_time", "pedestal"]) + expected_units = {"pedestal": u.one} + + def __call__(self, tel_id, time): + """ + Interpolate pedestal or gain for a given time and tel_id. + + Parameters + ---------- + tel_id : int + telescope id + time : astropy.time.Time + time for which to interpolate the calibration data + + Returns + ------- + pedestal : array [float] + interpolated pedestal values + """ + + self._check_interpolators(tel_id) + + pedestal = self._interpolators[tel_id]["pedestal"](time) + return pedestal + + def add_table(self, tel_id, input_table): + """ + Add a table to this interpolator + + Parameters + ---------- + tel_id : int + Telescope id + input_table : astropy.table.Table + Table of pointing values, expected columns + are ``time`` as ``Time`` column and "pedestal" + for the pedestal data + """ + + self._check_tables(input_table) + + input_table = input_table.copy() + input_table.sort("start_time") + start_time = input_table["start_time"] + end_time = input_table["end_time"] + pedestal = input_table["pedestal"] + self._interpolators[tel_id] = {} + self._interpolators[tel_id]["pedestal"] = ChunkFunction( + start_time, end_time, pedestal, **self.interp_options + )
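
A minimal, self-contained sketch (not part of the diff) of the chunk-lookup behaviour that ChunkFunction implements after being moved into ctapipe/monitoring/interpolation.py: for a query time it returns the values of the latest-starting chunk, and falls back to the earliest chunk that is still valid for any pixel whose value in that chunk is NaN. The chunk boundaries, toy gain values, and three-pixel camera below are made up for illustration, and the example assumes ChunkFunction stays importable at module level.

import numpy as np

from ctapipe.monitoring.interpolation import ChunkFunction

# Two overlapping calibration chunks (arbitrary time units):
# chunk 0 is valid over [0, 20], chunk 1 over [10, 30].
start_times = np.array([0.0, 10.0])
end_times = np.array([20.0, 30.0])

# One value per pixel and chunk for a hypothetical 3-pixel camera;
# pixel 2 has no valid data in the second chunk.
values = np.array([
    [1.00, 1.02, 0.98],
    [1.01, 1.03, np.nan],
])

gain = ChunkFunction(start_times, end_times, values)

# At t=15 both chunks are valid: pixels 0 and 1 take the values of the
# later-starting chunk 1, while the NaN pixel falls back to chunk 0.
print(gain(15.0))  # -> [1.01 1.03 0.98]

# Outside [0, 30] the default bounds_error=True raises a ValueError;
# with bounds_error=False and fill_value="extrapolate" the nearest
# chunk's values would be returned instead.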