diff --git a/docs/api-reference/monitoring/aggregator.rst b/docs/api-reference/monitoring/aggregator.rst
new file mode 100644
index 00000000000..0420d543afa
--- /dev/null
+++ b/docs/api-reference/monitoring/aggregator.rst
@@ -0,0 +1,11 @@
+.. _stats_aggregator:
+
+*********************
+Statistics Aggregator
+*********************
+
+
+Reference/API
+=============
+
+.. automodapi:: ctapipe.monitoring.aggregator
diff --git a/docs/api-reference/monitoring/index.rst b/docs/api-reference/monitoring/index.rst
new file mode 100644
index 00000000000..9d1fbc3890e
--- /dev/null
+++ b/docs/api-reference/monitoring/index.rst
@@ -0,0 +1,30 @@
+.. _monitoring:
+
+**********************************
+Monitoring (`~ctapipe.monitoring`)
+**********************************
+
+.. currentmodule:: ctapipe.monitoring
+
+Monitoring data are time series used to monitor the status or quality of hardware, software algorithms, the environment, or other data products. They contain values recorded periodically at different rates, and can be thought of as a set of tables whose rows are identified by a timestamp. They are potentially acquired during the day or nighttime operation of the array and during subsequent data processing, but at average rates much slower than Event data and faster than the length of a typical observation block. Examples include telescope tracking positions, trigger rates, camera sensor conditions, weather conditions, and the status or quality-control data of a particular hardware or software component.
+
+This module provides code to help generate monitoring data from processed event data, particularly for the purposes of calibration and data quality assessment.
+
+Currently, only code related to :ref:`stats_aggregator` is implemented here.
+
+
+Submodules
+==========
+
+.. toctree::
+    :maxdepth: 1
+    :glob:
+
+    aggregator
+
+
+Reference/API
+=============
+
+.. automodapi:: ctapipe.monitoring
+    :no-inheritance-diagram:
diff --git a/docs/changes/2554.feature.rst b/docs/changes/2554.feature.rst
new file mode 100644
index 00000000000..2e6a6356b3a
--- /dev/null
+++ b/docs/changes/2554.feature.rst
@@ -0,0 +1 @@
+Add API to extract statistics from a sequence of images.
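Reviewer note (not part of the diff): the new docs above introduce the statistics aggregator without a usage example. The following minimal sketch is distilled from the tests added later in this diff; ``subarray`` is assumed to be an existing `~ctapipe.instrument.SubarrayDescription`, and the column names ``time_mono``, ``event_id``, and ``image`` follow the conventions expected by ``StatisticsAggregator.__call__``::

    import numpy as np
    from astropy.table import Table
    from astropy.time import Time

    from ctapipe.monitoring.aggregator import SigmaClippingAggregator

    # Dummy pedestal-like events: 5000 images, 2 gain channels, 1855 pixels
    rng = np.random.default_rng(0)
    n_events = 5000
    table = Table(
        [
            Time(np.linspace(60117.911, 60117.9258, num=n_events), scale="tai", format="mjd"),
            np.arange(n_events),
            rng.normal(2.0, 5.0, size=(n_events, 2, 1855)),
        ],
        names=("time_mono", "event_id", "image"),
    )

    # Aggregate per-pixel statistics over non-overlapping chunks of 2500 events
    aggregator = SigmaClippingAggregator(subarray=subarray, chunk_size=2500)
    monitoring_table = aggregator(table=table)
    # Resulting columns: time_start, time_end, event_id_start, event_id_end,
    # n_events, mean, median, std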
diff --git a/docs/conf.py b/docs/conf.py
index e302a8c3e0c..8941db319e4
--- a/docs/conf.py
+++ b/docs/conf.py
@@ -392,24 +392,24 @@ def setup(app):
 
 
 # Configuration for intersphinx
 intersphinx_mapping = {
-    "astropy": ("https://docs.astropy.org/en/stable/", None),
-    "bokeh": ("https://docs.bokeh.org/en/latest/", None),
-    "cython": ("https://docs.cython.org/en/stable/", None),
-    "iminuit": ("https://scikit-hep.org/iminuit/", None),
-    "ipywidgets": ("https://ipywidgets.readthedocs.io/en/stable/", None),
-    "joblib": ("https://joblib.readthedocs.io/en/stable/", None),
-    "matplotlib": ("https://matplotlib.org/stable/", None),
-    "numba": ("https://numba.readthedocs.io/en/stable/", None),
-    "numpy": ("https://numpy.org/doc/stable/", None),
-    "pandas": ("https://pandas.pydata.org/pandas-docs/stable/", None),
-    "psutil": ("https://psutil.readthedocs.io/en/stable/", None),
-    "pytables": ("https://www.pytables.org/", None),
-    "pytest": ("https://docs.pytest.org/en/stable/", None),
-    "python": ("https://docs.python.org/3/", None),
-    "scipy": ("https://docs.scipy.org/doc/scipy/", None),
-    "setuptools": ("https://setuptools.pypa.io/en/stable/", None),
-    "sklearn": ("https://scikit-learn.org/stable/", None),
-    "traitlets": ("https://traitlets.readthedocs.io/en/stable/", None),
+    "astropy": ("https://docs.astropy.org/en/stable", None),
+    "bokeh": ("https://docs.bokeh.org/en/latest", None),
+    "cython": ("https://docs.cython.org/en/stable", None),
+    "iminuit": ("https://scikit-hep.org/iminuit", None),
+    "ipywidgets": ("https://ipywidgets.readthedocs.io/en/stable", None),
+    "joblib": ("https://joblib.readthedocs.io/en/stable", None),
+    "matplotlib": ("https://matplotlib.org/stable", None),
+    "numba": ("https://numba.readthedocs.io/en/stable", None),
+    "numpy": ("https://numpy.org/doc/stable", None),
+    "pandas": ("https://pandas.pydata.org/pandas-docs/stable", None),
+    "psutil": ("https://psutil.readthedocs.io/en/stable", None),
+    "pytables": ("https://www.pytables.org", None),
+    "pytest": ("https://docs.pytest.org/en/stable", None),
+    "python": ("https://docs.python.org/3", None),
+    "scipy": ("https://docs.scipy.org/doc/scipy", None),
+    "setuptools": ("https://setuptools.pypa.io/en/stable", None),
+    "sklearn": ("https://scikit-learn.org/stable", None),
+    "traitlets": ("https://traitlets.readthedocs.io/en/stable", None),
 }
 
diff --git a/src/ctapipe/containers.py b/src/ctapipe/containers.py
index 6dbf74049d0..311142311a3
--- a/src/ctapipe/containers.py
+++ b/src/ctapipe/containers.py
@@ -57,6 +57,7 @@
     "TriggerContainer",
     "WaveformCalibrationContainer",
     "StatisticsContainer",
+    "ImageStatisticsContainer",
     "IntensityStatisticsContainer",
     "PeakTimeStatisticsContainer",
     "SchedulingBlockContainer",
@@ -421,7 +422,28 @@ class MorphologyContainer(Container):
 
 
 class StatisticsContainer(Container):
-    """Store descriptive statistics"""
+    """Store descriptive statistics of a chunk of images"""
+
+    n_events = Field(-1, "number of events used for the extraction of the statistics")
+    mean = Field(
+        None,
+        "mean of a pixel-wise quantity for each channel; "
+        "Type: float; Shape: (n_channels, n_pixel)",
+    )
+    median = Field(
+        None,
+        "median of a pixel-wise quantity for each channel; "
+        "Type: float; Shape: (n_channels, n_pixel)",
+    )
+    std = Field(
+        None,
+        "standard deviation of a pixel-wise quantity for each channel; "
+        "Type: float; Shape: (n_channels, n_pixel)",
+    )
+
+
+class ImageStatisticsContainer(Container):
+    """Store descriptive image statistics"""
 
     max = Field(np.float32(nan), "value of pixel with maximum intensity")
     min = Field(np.float32(nan), "value of pixel with minimum intensity")
@@ -431,11 +453,11 @@ class StatisticsContainer(Container):
     kurtosis = Field(nan, "kurtosis of intensity")
 
 
-class IntensityStatisticsContainer(StatisticsContainer):
+class IntensityStatisticsContainer(ImageStatisticsContainer):
     default_prefix = "intensity"
 
 
-class PeakTimeStatisticsContainer(StatisticsContainer):
+class PeakTimeStatisticsContainer(ImageStatisticsContainer):
     default_prefix = "peak_time"
 
 
diff --git a/src/ctapipe/image/statistics.py b/src/ctapipe/image/statistics.py
index bd38f02a377..deb209efd08
--- a/src/ctapipe/image/statistics.py
+++ b/src/ctapipe/image/statistics.py
@@ -3,7 +3,7 @@
 import numpy as np
 from numba import float32, float64, guvectorize, int64, njit
 
-from ..containers import StatisticsContainer
+from ..containers import ImageStatisticsContainer
 
 __all__ = [
     "arg_n_largest",
@@ -88,8 +88,8 @@ def kurtosis(data, mean=None, std=None, fisher=True):
 
 
 def descriptive_statistics(
-    values, container_class=StatisticsContainer
-) -> StatisticsContainer:
+    values, container_class=ImageStatisticsContainer
+) -> ImageStatisticsContainer:
     """compute intensity statistics of an image"""
     mean = values.mean()
     std = values.std()
diff --git a/src/ctapipe/image/tests/test_statistics.py b/src/ctapipe/image/tests/test_statistics.py
index 006c9c0dbce..4403e05ca0a
--- a/src/ctapipe/image/tests/test_statistics.py
+++ b/src/ctapipe/image/tests/test_statistics.py
@@ -49,14 +49,14 @@ def test_kurtosis():
 
 
 def test_return_type():
-    from ctapipe.containers import PeakTimeStatisticsContainer, StatisticsContainer
+    from ctapipe.containers import ImageStatisticsContainer, PeakTimeStatisticsContainer
     from ctapipe.image import descriptive_statistics
 
     rng = np.random.default_rng(0)
     data = rng.normal(5, 2, 1000)
 
     stats = descriptive_statistics(data)
-    assert isinstance(stats, StatisticsContainer)
+    assert isinstance(stats, ImageStatisticsContainer)
 
     stats = descriptive_statistics(data, container_class=PeakTimeStatisticsContainer)
     assert isinstance(stats, PeakTimeStatisticsContainer)
diff --git a/src/ctapipe/monitoring/aggregator.py b/src/ctapipe/monitoring/aggregator.py
new file mode 100644
index 00000000000..5f2e5933bbf
--- /dev/null
+++ b/src/ctapipe/monitoring/aggregator.py
@@ -0,0 +1,177 @@
+"""
+Algorithms to compute aggregated time-series statistics from columns of an event table.
+
+These classes take an event table as input, divide it into time chunks, which
+may optionally overlap, and compute various aggregated statistics for each
+chunk. The statistics include the count, mean, median, and standard deviation. The result
+is a monitoring table with columns describing the start and stop time of the chunk
+and the aggregated statistic values.
+"""
+
+__all__ = [
+    "StatisticsAggregator",
+    "PlainAggregator",
+    "SigmaClippingAggregator",
+]
+
+from abc import abstractmethod
+from collections import defaultdict
+
+import numpy as np
+from astropy.stats import sigma_clipped_stats
+from astropy.table import Table
+
+from ctapipe.containers import StatisticsContainer
+from ctapipe.core import TelescopeComponent
+from ctapipe.core.traits import Int
+
+
+class StatisticsAggregator(TelescopeComponent):
+    """
+    Base component to handle the computation of aggregated statistic values from a table
+    containing e.g. charges, peak times and/or charge variances (images).
+ """ + + chunk_size = Int( + 2500, + help="Size of the chunk used for the computation of aggregated statistic values", + ).tag(config=True) + + def __call__( + self, + table, + masked_pixels_of_sample=None, + chunk_shift=None, + col_name="image", + ) -> Table: + """ + Divide table into chunks and compute aggregated statistic values. + + This function divides the input table into overlapping or non-overlapping chunks of size ``chunk_size`` + and call the relevant function of the particular aggregator to compute aggregated statistic values. + The chunks are generated in a way that ensures they do not overflow the bounds of the table. + - If ``chunk_shift`` is None, chunks will not overlap, but the last chunk is ensured to be + of size `chunk_size`, even if it means the last two chunks will overlap. + - If ``chunk_shift`` is provided, it will determine the number of samples to shift between the start + of consecutive chunks resulting in an overlap of chunks. Chunks that overflows the bounds + of the table are not considered. + + Parameters + ---------- + table : astropy.table.Table + table with images of shape (n_images, n_channels, n_pix) + and timestamps of shape (n_images, ) + masked_pixels_of_sample : ndarray, optional + boolean array of masked pixels of shape (n_pix, ) that are not available for processing + chunk_shift : int, optional + number of samples to shift between the start of consecutive chunks + col_name : string + column name in the table + + Returns + ------- + astropy.table.Table + table containing the start and end values as timestamps and event IDs + as well as the aggregated statistic values (mean, median, std) for each chunk + """ + + # Check if the statistics of the table is sufficient to compute at least one complete chunk. + if len(table) < self.chunk_size: + raise ValueError( + f"The length of the provided table ({len(table)}) is insufficient to meet the required statistics for a single chunk of size ({self.chunk_size})." + ) + # Check if the chunk_shift is smaller than the chunk_size + if chunk_shift is not None and chunk_shift > self.chunk_size: + raise ValueError( + f"The chunk_shift ({chunk_shift}) must be smaller than the chunk_size ({self.chunk_size})." 
+            )
+
+        # Function to split the table into appropriate chunks
+        def _get_chunks(table, chunk_shift):
+            # Calculate the range step: use chunk_shift if provided, otherwise chunk_size
+            step = chunk_shift or self.chunk_size
+
+            # Generate chunks that do not overflow the bounds of the table
+            for i in range(0, len(table) - self.chunk_size + 1, step):
+                yield table[i : i + self.chunk_size]
+
+            # If chunk_shift is None, ensure the last chunk is of size chunk_size, if needed
+            if chunk_shift is None and len(table) % self.chunk_size != 0:
+                yield table[-self.chunk_size :]
+
+        # Compute aggregated statistic values for each chunk of images
+        units = {col: table[col_name].unit for col in ("mean", "median", "std")}
+        data = defaultdict(list)
+        for chunk in _get_chunks(table, chunk_shift):
+            stats = self.compute_stats(chunk[col_name].data, masked_pixels_of_sample)
+            data["time_start"].append(chunk["time_mono"][0])
+            data["time_end"].append(chunk["time_mono"][-1])
+            data["event_id_start"].append(chunk["event_id"][0])
+            data["event_id_end"].append(chunk["event_id"][-1])
+            data["n_events"].append(stats.n_events)
+            data["mean"].append(stats.mean)
+            data["median"].append(stats.median)
+            data["std"].append(stats.std)
+
+        return Table(data, units=units)
+
+    @abstractmethod
+    def compute_stats(self, images, masked_pixels_of_sample) -> StatisticsContainer:
+        pass
+
+
+class PlainAggregator(StatisticsAggregator):
+    """
+    Compute aggregated statistic values from a chunk of images using numpy functions
+    """
+
+    def compute_stats(self, images, masked_pixels_of_sample) -> StatisticsContainer:
+        # Mask broken pixels
+        masked_images = np.ma.array(images, mask=masked_pixels_of_sample)
+
+        # Compute the mean, median, and std over the chunk per pixel
+        pixel_mean = np.ma.mean(masked_images, axis=0).filled(np.nan)
+        pixel_median = np.ma.median(masked_images, axis=0).filled(np.nan)
+        pixel_std = np.ma.std(masked_images, axis=0).filled(np.nan)
+
+        return StatisticsContainer(
+            n_events=masked_images.shape[0],
+            mean=pixel_mean,
+            median=pixel_median,
+            std=pixel_std,
+        )
+
+
+class SigmaClippingAggregator(StatisticsAggregator):
+    """
+    Compute aggregated statistic values from a chunk of images using astropy's sigma clipping functions
+    """
+
+    max_sigma = Int(
+        default_value=4,
+        help="Maximal value for the sigma clipping outlier removal",
+    ).tag(config=True)
+    iterations = Int(
+        default_value=5,
+        help="Number of iterations for the sigma clipping outlier removal",
+    ).tag(config=True)
+
+    def compute_stats(self, images, masked_pixels_of_sample) -> StatisticsContainer:
+        # Mask broken pixels
+        masked_images = np.ma.array(images, mask=masked_pixels_of_sample)
+
+        # Compute the mean, median, and std over the chunk per pixel
+        pixel_mean, pixel_median, pixel_std = sigma_clipped_stats(
+            masked_images,
+            sigma=self.max_sigma,
+            maxiters=self.iterations,
+            cenfunc="mean",
+            axis=0,
+        )
+
+        return StatisticsContainer(
+            n_events=masked_images.shape[0],
+            mean=pixel_mean,
+            median=pixel_median,
+            std=pixel_std,
+        )
diff --git a/src/ctapipe/monitoring/tests/__init__.py b/src/ctapipe/monitoring/tests/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/src/ctapipe/monitoring/tests/test_aggregator.py b/src/ctapipe/monitoring/tests/test_aggregator.py
new file mode 100644
index 00000000000..8d15b2a7c8a
--- /dev/null
+++ b/src/ctapipe/monitoring/tests/test_aggregator.py
@@ -0,0 +1,145 @@
+"""
+Tests for StatisticsAggregator and related functions
+"""
+
+import numpy as np
+import pytest
+from astropy.table import Table
+from astropy.time import Time
+
+from ctapipe.monitoring.aggregator import PlainAggregator, SigmaClippingAggregator
+
+
+def test_aggregators(example_subarray):
+    """test basic functionality of the StatisticsAggregators"""
+
+    # Create dummy data for testing
+    times = Time(
+        np.linspace(60117.911, 60117.9258, num=5000), scale="tai", format="mjd"
+    )
+    event_ids = np.linspace(35, 725000, num=5000, dtype=int)
+    rng = np.random.default_rng(0)
+    ped_data = rng.normal(2.0, 5.0, size=(5000, 2, 1855))
+    charge_data = rng.normal(77.0, 10.0, size=(5000, 2, 1855))
+    time_data = rng.normal(18.0, 5.0, size=(5000, 2, 1855))
+    # Create tables
+    ped_table = Table(
+        [times, event_ids, ped_data],
+        names=("time_mono", "event_id", "image"),
+    )
+    charge_table = Table(
+        [times, event_ids, charge_data],
+        names=("time_mono", "event_id", "image"),
+    )
+    time_table = Table(
+        [times, event_ids, time_data],
+        names=("time_mono", "event_id", "peak_time"),
+    )
+    # Initialize the aggregators
+    chunk_size = 2500
+    ped_aggregator = SigmaClippingAggregator(
+        subarray=example_subarray, chunk_size=chunk_size
+    )
+    ff_charge_aggregator = SigmaClippingAggregator(
+        subarray=example_subarray, chunk_size=chunk_size
+    )
+    ff_time_aggregator = PlainAggregator(
+        subarray=example_subarray, chunk_size=chunk_size
+    )
+
+    # Compute the statistical values
+    ped_stats = ped_aggregator(table=ped_table)
+    charge_stats = ff_charge_aggregator(table=charge_table)
+    time_stats = ff_time_aggregator(table=time_table, col_name="peak_time")
+
+    # Check if the start and end values are properly set for the timestamps and event IDs,
+    # and if the number of events used for the computation of aggregated statistic values equals the size of the chunk
+    assert ped_stats[0]["time_start"] == times[0]
+    assert time_stats[0]["event_id_start"] == event_ids[0]
+    assert ped_stats[1]["time_end"] == times[-1]
+    assert time_stats[1]["event_id_end"] == event_ids[-1]
+    np.testing.assert_allclose(ped_stats["n_events"], chunk_size)
+
+    # Check if the calculated statistical values are reasonable
+    # for a camera with two gain channels
+    np.testing.assert_allclose(ped_stats[0]["mean"], 2.0, atol=1.5)
+    np.testing.assert_allclose(charge_stats[0]["mean"], 77.0, atol=1.5)
+    np.testing.assert_allclose(time_stats[0]["mean"], 18.0, atol=1.5)
+
+    np.testing.assert_allclose(ped_stats[1]["median"], 2.0, atol=1.5)
+    np.testing.assert_allclose(charge_stats[1]["median"], 77.0, atol=1.5)
+    np.testing.assert_allclose(time_stats[1]["median"], 18.0, atol=1.5)
+
+    np.testing.assert_allclose(ped_stats[0]["std"], 5.0, atol=1.5)
+    np.testing.assert_allclose(charge_stats[0]["std"], 10.0, atol=1.5)
+    np.testing.assert_allclose(time_stats[0]["std"], 5.0, atol=1.5)
+
+
+def test_chunk_shift(example_subarray):
+    """test the chunk shift option and the boundary case for the last chunk"""
+
+    # Create dummy data for testing
+    times = Time(
+        np.linspace(60117.911, 60117.9258, num=5500), scale="tai", format="mjd"
+    )
+    event_ids = np.linspace(35, 725000, num=5500, dtype=int)
+    rng = np.random.default_rng(0)
+    charge_data = rng.normal(77.0, 10.0, size=(5500, 2, 1855))
+    # Create table
+    charge_table = Table(
+        [times, event_ids, charge_data],
+        names=("time_mono", "event_id", "image"),
+    )
+    # Initialize the aggregator
+    aggregator = SigmaClippingAggregator(subarray=example_subarray, chunk_size=2500)
+    # Compute aggregated statistic values
+    chunk_stats = aggregator(table=charge_table)
+    chunk_stats_shift = aggregator(table=charge_table, chunk_shift=2000)
+    # Check if three chunks are used, since the last chunk is anchored at the end of the table to keep its full size
+    assert len(chunk_stats) == 3
+    # Check if two chunks are used, since the last (overflowing) chunk is dropped
+    assert len(chunk_stats_shift) == 2
+    # Check if ValueError is raised when the chunk_size is larger than the length of the table
+    with pytest.raises(ValueError):
+        _ = aggregator(table=charge_table[1000:1500])
+    # Check if ValueError is raised when the chunk_shift is larger than the chunk_size
+    with pytest.raises(ValueError):
+        _ = aggregator(table=charge_table, chunk_shift=3000)
+
+
+def test_with_outliers(example_subarray):
+    """test the robustness of the aggregators in the presence of outliers"""
+
+    # Create dummy data for testing
+    times = Time(
+        np.linspace(60117.911, 60117.9258, num=5000), scale="tai", format="mjd"
+    )
+    event_ids = np.linspace(35, 725000, num=5000, dtype=int)
+    rng = np.random.default_rng(0)
+    ped_data = rng.normal(2.0, 5.0, size=(5000, 2, 1855))
+    # Insert fake outliers that will skew the mean value
+    ped_data[12, 0, :] = 100000.0
+    ped_data[16, 0, :] = 100000.0
+    ped_data[18, 1, :] = 100000.0
+    ped_data[28, 1, :] = 100000.0
+    # Create table
+    ped_table = Table(
+        [times, event_ids, ped_data],
+        names=("time_mono", "event_id", "image"),
+    )
+    # Initialize the aggregators
+    sigmaclipping_aggregator = SigmaClippingAggregator(
+        subarray=example_subarray, chunk_size=2500
+    )
+    plain_aggregator = PlainAggregator(subarray=example_subarray, chunk_size=2500)
+
+    # Compute aggregated statistic values
+    sigmaclipping_chunk_stats = sigmaclipping_aggregator(table=ped_table)
+    plain_chunk_stats = plain_aggregator(table=ped_table)
+
+    # Check if SigmaClippingAggregator is robust to a few fake outliers as expected
+    np.testing.assert_allclose(sigmaclipping_chunk_stats[0]["mean"], 2.0, atol=1.5)
+
+    # Check if PlainAggregator is not robust to a few fake outliers as expected
+    with pytest.raises(AssertionError):
+        np.testing.assert_allclose(plain_chunk_stats[0]["mean"], 2.0, atol=1.5)
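Reviewer note (not part of the diff): the chunking rules implemented in ``StatisticsAggregator.__call__`` can be illustrated with a standalone sketch. The printed index pairs match the chunk counts asserted in ``test_chunk_shift`` above (three chunks without ``chunk_shift``, two with ``chunk_shift=2000``); ``chunk_bounds`` is a hypothetical helper written for this note only::

    def chunk_bounds(n_rows, chunk_size, chunk_shift=None):
        """Yield (start, stop) row indices following the aggregator's rules."""
        step = chunk_shift or chunk_size
        # chunks that fit entirely within the table
        for i in range(0, n_rows - chunk_size + 1, step):
            yield i, i + chunk_size
        # without chunk_shift, reuse trailing rows so the last chunk is full-size
        if chunk_shift is None and n_rows % chunk_size != 0:
            yield n_rows - chunk_size, n_rows

    print(list(chunk_bounds(5500, 2500)))
    # [(0, 2500), (2500, 5000), (3000, 5500)]
    print(list(chunk_bounds(5500, 2500, chunk_shift=2000)))
    # [(0, 2500), (2000, 4500)]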