Commit
Merge pull request #2554 from cta-observatory/stats_extractor
Add StatisticsAggregator API
maxnoe authored Aug 2, 2024
2 parents 9fd15c2 + ea058a0 commit 447f44c
Showing 10 changed files with 412 additions and 26 deletions.
11 changes: 11 additions & 0 deletions docs/api-reference/monitoring/aggregator.rst
@@ -0,0 +1,11 @@
.. _stats_aggregator:

*********************
Statistics Aggregator
*********************


Reference/API
=============

.. automodapi:: ctapipe.monitoring.aggregator
30 changes: 30 additions & 0 deletions docs/api-reference/monitoring/index.rst
@@ -0,0 +1,30 @@
.. _monitoring:

**********************************
Monitoring (`~ctapipe.monitoring`)
**********************************

.. currentmodule:: ctapipe.monitoring

Monitoring data are time-series used to monitor the status or quality of hardware, software algorithms, the environment, or other data products. These contain values recorded periodically at different rates, and can be thought of as a set of tables with rows identified by a time-stamp. They are potentially acquired during the day or nighttime operation of the array and during subsequent data processing, but at average rates much slower than Event data and faster than once per typical observation block. Examples include telescope tracking positions, trigger rates, camera sensor conditions, weather conditions, and the status or quality-control data of a particular hardware or software component.

This module provides code to help generate monitoring data from processed event data, particularly for the purposes of calibration and data quality assessment.

Currently, only code related to :ref:`stats_aggregator` is implemented here.
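
A minimal usage sketch (assuming an already-loaded ``SubarrayDescription`` named ``subarray`` and an event table ``event_table`` with ``time_mono``, ``event_id``, and ``image`` columns):

.. code-block:: python

    from ctapipe.monitoring.aggregator import PlainAggregator

    aggregator = PlainAggregator(subarray=subarray, chunk_size=2500)
    monitoring_table = aggregator(event_table, col_name="image")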


Submodules
==========

.. toctree::
:maxdepth: 1
:glob:

aggregator


Reference/API
=============

.. automodapi:: ctapipe.monitoring
:no-inheritance-diagram:
1 change: 1 addition & 0 deletions docs/changes/2554.feature.rst
@@ -0,0 +1 @@
Add an API to extract statistics from a sequence of images.
36 changes: 18 additions & 18 deletions docs/conf.py
@@ -392,24 +392,24 @@ def setup(app):

# Configuration for intersphinx
intersphinx_mapping = {
"astropy": ("https://docs.astropy.org/en/stable/", None),
"bokeh": ("https://docs.bokeh.org/en/latest/", None),
"cython": ("https://docs.cython.org/en/stable/", None),
"iminuit": ("https://scikit-hep.org/iminuit/", None),
"ipywidgets": ("https://ipywidgets.readthedocs.io/en/stable/", None),
"joblib": ("https://joblib.readthedocs.io/en/stable/", None),
"matplotlib": ("https://matplotlib.org/stable/", None),
"numba": ("https://numba.readthedocs.io/en/stable/", None),
"numpy": ("https://numpy.org/doc/stable/", None),
"pandas": ("https://pandas.pydata.org/pandas-docs/stable/", None),
"psutil": ("https://psutil.readthedocs.io/en/stable/", None),
"pytables": ("https://www.pytables.org/", None),
"pytest": ("https://docs.pytest.org/en/stable/", None),
"python": ("https://docs.python.org/3/", None),
"scipy": ("https://docs.scipy.org/doc/scipy/", None),
"setuptools": ("https://setuptools.pypa.io/en/stable/", None),
"sklearn": ("https://scikit-learn.org/stable/", None),
"traitlets": ("https://traitlets.readthedocs.io/en/stable/", None),
"astropy": ("https://docs.astropy.org/en/stable", None),
"bokeh": ("https://docs.bokeh.org/en/latest", None),
"cython": ("https://docs.cython.org/en/stable", None),
"iminuit": ("https://scikit-hep.org/iminuit", None),
"ipywidgets": ("https://ipywidgets.readthedocs.io/en/stable", None),
"joblib": ("https://joblib.readthedocs.io/en/stable", None),
"matplotlib": ("https://matplotlib.org/stable", None),
"numba": ("https://numba.readthedocs.io/en/stable", None),
"numpy": ("https://numpy.org/doc/stable", None),
"pandas": ("https://pandas.pydata.org/pandas-docs/stable", None),
"psutil": ("https://psutil.readthedocs.io/en/stable", None),
"pytables": ("https://www.pytables.org", None),
"pytest": ("https://docs.pytest.org/en/stable", None),
"python": ("https://docs.python.org/3", None),
"scipy": ("https://docs.scipy.org/doc/scipy", None),
"setuptools": ("https://setuptools.pypa.io/en/stable", None),
"sklearn": ("https://scikit-learn.org/stable", None),
"traitlets": ("https://traitlets.readthedocs.io/en/stable", None),
}


28 changes: 25 additions & 3 deletions src/ctapipe/containers.py
@@ -57,6 +57,7 @@
"TriggerContainer",
"WaveformCalibrationContainer",
"StatisticsContainer",
"ImageStatisticsContainer",
"IntensityStatisticsContainer",
"PeakTimeStatisticsContainer",
"SchedulingBlockContainer",
@@ -421,7 +422,28 @@ class MorphologyContainer(Container):


class StatisticsContainer(Container):
-    """Store descriptive statistics"""
+    """Store descriptive statistics of a chunk of images"""

    n_events = Field(-1, "number of events used for the extraction of the statistics")
    mean = Field(
        None,
        "mean of a pixel-wise quantity for each channel. "
        "Type: float; Shape: (n_channels, n_pixel)",
    )
    median = Field(
        None,
        "median of a pixel-wise quantity for each channel. "
        "Type: float; Shape: (n_channels, n_pixel)",
    )
    std = Field(
        None,
        "standard deviation of a pixel-wise quantity for each channel. "
        "Type: float; Shape: (n_channels, n_pixel)",
    )


class ImageStatisticsContainer(Container):
    """Store descriptive image statistics"""

    max = Field(np.float32(nan), "value of pixel with maximum intensity")
    min = Field(np.float32(nan), "value of pixel with minimum intensity")
@@ -431,11 +453,11 @@ class StatisticsContainer(Container):
    kurtosis = Field(nan, "kurtosis of intensity")


-class IntensityStatisticsContainer(StatisticsContainer):
+class IntensityStatisticsContainer(ImageStatisticsContainer):
    default_prefix = "intensity"


-class PeakTimeStatisticsContainer(StatisticsContainer):
+class PeakTimeStatisticsContainer(ImageStatisticsContainer):
    default_prefix = "peak_time"


6 changes: 3 additions & 3 deletions src/ctapipe/image/statistics.py
@@ -3,7 +3,7 @@
import numpy as np
from numba import float32, float64, guvectorize, int64, njit

-from ..containers import StatisticsContainer
+from ..containers import ImageStatisticsContainer

__all__ = [
"arg_n_largest",
@@ -88,8 +88,8 @@ def kurtosis(data, mean=None, std=None, fisher=True):


def descriptive_statistics(
-    values, container_class=StatisticsContainer
-) -> StatisticsContainer:
+    values, container_class=ImageStatisticsContainer
+) -> ImageStatisticsContainer:
    """compute intensity statistics of an image"""
    mean = values.mean()
    std = values.std()
4 changes: 2 additions & 2 deletions src/ctapipe/image/tests/test_statistics.py
@@ -49,14 +49,14 @@ def test_kurtosis():


def test_return_type():
-    from ctapipe.containers import PeakTimeStatisticsContainer, StatisticsContainer
+    from ctapipe.containers import ImageStatisticsContainer, PeakTimeStatisticsContainer
    from ctapipe.image import descriptive_statistics

    rng = np.random.default_rng(0)
    data = rng.normal(5, 2, 1000)

    stats = descriptive_statistics(data)
-    assert isinstance(stats, StatisticsContainer)
+    assert isinstance(stats, ImageStatisticsContainer)

    stats = descriptive_statistics(data, container_class=PeakTimeStatisticsContainer)
    assert isinstance(stats, PeakTimeStatisticsContainer)
177 changes: 177 additions & 0 deletions src/ctapipe/monitoring/aggregator.py
@@ -0,0 +1,177 @@
"""
Algorithms to compute aggregated time-series statistics from columns of an event table.

These classes take as input an event table, divide it into time chunks, which
may optionally overlap, and compute various aggregated statistics for each
chunk. The statistics include the count, mean, median, and standard deviation. The result
is a monitoring table with columns describing the start and stop time of the chunk
and the aggregated statistic values.
"""

__all__ = [
"StatisticsAggregator",
"PlainAggregator",
"SigmaClippingAggregator",
]

from abc import abstractmethod
from collections import defaultdict

import numpy as np
from astropy.stats import sigma_clipped_stats
from astropy.table import Table

from ctapipe.containers import StatisticsContainer
from ctapipe.core import TelescopeComponent
from ctapipe.core.traits import Int


class StatisticsAggregator(TelescopeComponent):
"""
Base component to handle the computation of aggregated statistic values from a table
containing e.g. charges, peak times and/or charge variances (images).
"""

chunk_size = Int(
2500,
help="Size of the chunk used for the computation of aggregated statistic values",
).tag(config=True)

    def __call__(
        self,
        table,
        masked_pixels_of_sample=None,
        chunk_shift=None,
        col_name="image",
    ) -> Table:
"""
Divide table into chunks and compute aggregated statistic values.
This function divides the input table into overlapping or non-overlapping chunks of size ``chunk_size``
and call the relevant function of the particular aggregator to compute aggregated statistic values.
The chunks are generated in a way that ensures they do not overflow the bounds of the table.
- If ``chunk_shift`` is None, chunks will not overlap, but the last chunk is ensured to be
of size `chunk_size`, even if it means the last two chunks will overlap.
- If ``chunk_shift`` is provided, it will determine the number of samples to shift between the start
of consecutive chunks resulting in an overlap of chunks. Chunks that overflows the bounds
of the table are not considered.
Parameters
----------
table : astropy.table.Table
table with images of shape (n_images, n_channels, n_pix)
and timestamps of shape (n_images, )
masked_pixels_of_sample : ndarray, optional
boolean array of masked pixels of shape (n_pix, ) that are not available for processing
chunk_shift : int, optional
number of samples to shift between the start of consecutive chunks
col_name : string
column name in the table
Returns
-------
astropy.table.Table
table containing the start and end values as timestamps and event IDs
as well as the aggregated statistic values (mean, median, std) for each chunk
"""

        # Check whether the table is long enough to compute at least one complete chunk.
        if len(table) < self.chunk_size:
            raise ValueError(
                f"The provided table is too short ({len(table)} rows) to fill a single chunk of size {self.chunk_size}."
            )
        # Check that the chunk_shift is not larger than the chunk_size
        if chunk_shift is not None and chunk_shift > self.chunk_size:
            raise ValueError(
                f"The chunk_shift ({chunk_shift}) must not be larger than the chunk_size ({self.chunk_size})."
            )

        # Function to split the table into appropriate chunks
        def _get_chunks(table, chunk_shift):
            # Calculate the range step: use chunk_shift if provided, otherwise chunk_size
            step = chunk_shift or self.chunk_size

            # Generate chunks that do not overflow the bounds of the table
            for i in range(0, len(table) - self.chunk_size + 1, step):
                yield table[i : i + self.chunk_size]

            # If chunk_shift is None, ensure the last chunk is of size chunk_size, if needed
            if chunk_shift is None and len(table) % self.chunk_size != 0:
                yield table[-self.chunk_size :]
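
        # Worked example (hypothetical numbers): for len(table) == 6000 with
        # chunk_size == 2500 and chunk_shift == None, _get_chunks yields
        # table[0:2500], table[2500:5000], and the trailing chunk table[3500:6000],
        # i.e. the last two chunks overlap by 1500 rows.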

        # Compute aggregated statistic values for each chunk of images
        units = {col: table[col_name].unit for col in ("mean", "median", "std")}
        data = defaultdict(list)
        for chunk in _get_chunks(table, chunk_shift):
            stats = self.compute_stats(chunk[col_name].data, masked_pixels_of_sample)
            data["time_start"].append(chunk["time_mono"][0])
            data["time_end"].append(chunk["time_mono"][-1])
            data["event_id_start"].append(chunk["event_id"][0])
            data["event_id_end"].append(chunk["event_id"][-1])
            data["n_events"].append(stats.n_events)
            data["mean"].append(stats.mean)
            data["median"].append(stats.median)
            data["std"].append(stats.std)

        return Table(data, units=units)

    @abstractmethod
    def compute_stats(self, images, masked_pixels_of_sample) -> StatisticsContainer:
        pass


class PlainAggregator(StatisticsAggregator):
    """
    Compute aggregated statistic values from a chunk of images using numpy functions
    """

    def compute_stats(self, images, masked_pixels_of_sample) -> StatisticsContainer:
        # Mask broken pixels
        masked_images = np.ma.array(images, mask=masked_pixels_of_sample)

        # Compute the mean, median, and std over the chunk per pixel
        pixel_mean = np.ma.mean(masked_images, axis=0).filled(np.nan)
        pixel_median = np.ma.median(masked_images, axis=0).filled(np.nan)
        pixel_std = np.ma.std(masked_images, axis=0).filled(np.nan)

        return StatisticsContainer(
            n_events=masked_images.shape[0],
            mean=pixel_mean,
            median=pixel_median,
            std=pixel_std,
        )


class SigmaClippingAggregator(StatisticsAggregator):
    """
    Compute aggregated statistic values from a chunk of images using astropy's sigma clipping functions
    """

    max_sigma = Int(
        default_value=4,
        help="Maximal value for the sigma clipping outlier removal",
    ).tag(config=True)
    iterations = Int(
        default_value=5,
        help="Number of iterations for the sigma clipping outlier removal",
    ).tag(config=True)

    def compute_stats(self, images, masked_pixels_of_sample) -> StatisticsContainer:
        # Mask broken pixels
        masked_images = np.ma.array(images, mask=masked_pixels_of_sample)

        # Compute the mean, median, and std over the chunk per pixel
        pixel_mean, pixel_median, pixel_std = sigma_clipped_stats(
            masked_images,
            sigma=self.max_sigma,
            maxiters=self.iterations,
            cenfunc="mean",
            axis=0,
        )

        return StatisticsContainer(
            n_events=masked_images.shape[0],
            mean=pixel_mean,
            median=pixel_median,
            std=pixel_std,
        )
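
A minimal end-to-end sketch of the new API (not part of the diff: the event table is synthetic, ``subarray`` stands in for an already-loaded SubarrayDescription, and all numbers are made up):

    import numpy as np
    from astropy.table import Table
    from astropy.time import Time

    from ctapipe.monitoring.aggregator import SigmaClippingAggregator

    # Synthetic event table: 3000 images of shape (n_channels=1, n_pix=1855),
    # with the time_mono and event_id columns the aggregator expects
    rng = np.random.default_rng(0)
    n_images, n_channels, n_pix = 3000, 1, 1855
    table = Table(
        {
            "time_mono": Time(60000.0 + np.arange(n_images) / 86400.0, format="mjd"),
            "event_id": np.arange(n_images),
            "image": rng.normal(100.0, 10.0, size=(n_images, n_channels, n_pix)),
        }
    )

    # Aggregators are TelescopeComponents, so a subarray description is required
    aggregator = SigmaClippingAggregator(subarray=subarray, chunk_size=2500)

    # Overlapping chunks: start a new chunk every 500 events
    monitoring_table = aggregator(table, chunk_shift=500, col_name="image")
    print(monitoring_table["time_start", "time_end", "n_events"])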
Empty file.