diff --git a/src/ctapipe/image/tests/test_statistics.py b/src/ctapipe/image/tests/test_statistics.py index b9773edc9a7..06581027c74 100644 --- a/src/ctapipe/image/tests/test_statistics.py +++ b/src/ctapipe/image/tests/test_statistics.py @@ -1,6 +1,14 @@ +import astropy.units as u import numpy as np import scipy.stats +from ctapipe.containers import ( + ArrayEventContainer, + HillasParametersContainer, + ImageParametersContainer, + MorphologyContainer, +) + def test_statistics(): from ctapipe.image import descriptive_statistics @@ -60,3 +68,29 @@ def test_return_type(): stats = descriptive_statistics(data, container_class=PeakTimeStatisticsContainer) assert isinstance(stats, PeakTimeStatisticsContainer) + + +def test_feature_aggregator(): + from ctapipe.image import FeatureAggregator + + event = ArrayEventContainer() + for tel_id, length, n_islands in zip((2, 7, 11), (0.3, 0.5, 0.4), (2, 1, 3)): + event.dl1.tel[tel_id].parameters = ImageParametersContainer( + hillas=HillasParametersContainer(length=length * u.deg), + morphology=MorphologyContainer(n_islands=n_islands), + ) + + features = [ + ("hillas", "length"), + ("morphology", "n_islands"), + ] + aggregate_featuers = FeatureAggregator(image_parameters=features) + aggregate_featuers(event) + assert event.dl1.aggregate["hillas_length"].max == 0.5 * u.deg + assert event.dl1.aggregate["hillas_length"].min == 0.3 * u.deg + assert u.isclose(event.dl1.aggregate["hillas_length"].mean, 0.4 * u.deg) + assert u.isclose(event.dl1.aggregate["hillas_length"].std, 0.081649658 * u.deg) + assert event.dl1.aggregate["morphology_n_islands"].max == 3 + assert event.dl1.aggregate["morphology_n_islands"].min == 1 + assert np.isclose(event.dl1.aggregate["morphology_n_islands"].mean, 2) + assert np.isclose(event.dl1.aggregate["morphology_n_islands"].std, 0.81649658) diff --git a/src/ctapipe/tools/tests/test_aggregate_features.py b/src/ctapipe/tools/tests/test_aggregate_features.py new file mode 100644 index 00000000000..ebc25b89f57 --- /dev/null +++ b/src/ctapipe/tools/tests/test_aggregate_features.py @@ -0,0 +1,81 @@ +import json + +import pytest + +from ctapipe.core import ToolConfigurationError, run_tool +from ctapipe.io import TableLoader + + +def test_aggregate_features(dl2_shower_geometry_file_lapalma, tmp_path): + from ctapipe.tools.aggregate_features import AggregateFeatures + + input_path = dl2_shower_geometry_file_lapalma + output_path = tmp_path / "aggregated.dl1.h5" + config_path = tmp_path / "config.json" + + with pytest.raises( + ToolConfigurationError, + match="No image parameters to aggregate are specified.", + ): + run_tool( + AggregateFeatures(), + argv=[ + f"--input={input_path}", + f"--output={output_path}", + ], + raises=True, + ) + + config = { + "FeatureAggregator": { + "FeatureGenerator": { + "features": [("hillas_abs_skewness", "abs(hillas_skewness)")] + }, + "image_parameters": [ + ("hillas", "length"), + ("timing", "slope"), + ("HillasReconstructor", "tel_impact_distance"), + ("hillas", "abs_skewness"), + ], + } + } + with config_path.open("w") as f: + json.dump(config, f) + + # test "overwrite" works + with pytest.raises(ToolConfigurationError, match="exists, but overwrite=False"): + run_tool( + AggregateFeatures(), + argv=[ + f"--input={input_path}", + f"--output={output_path}", + f"--config={config_path}", + ], + raises=True, + ) + + ret = run_tool( + AggregateFeatures(), + argv=[ + f"--input={input_path}", + f"--output={output_path}", + f"--config={config_path}", + "--overwrite", + ], + raises=True, + ) + assert ret == 0 + + with TableLoader(output_path) as loader: + events = loader.read_subarray_events( + dl1_aggregates=True, + dl2=False, + simulated=False, + ) + assert "hillas_length_max" in events.colnames + assert "hillas_length_min" in events.colnames + assert "hillas_length_mean" in events.colnames + assert "hillas_length_std" in events.colnames + assert "timing_slope_mean" in events.colnames + assert "HillasReconstructor_tel_impact_distance_mean" in events.colnames + assert "hillas_abs_skewness_mean" in events.colnames diff --git a/src/ctapipe/vectorization/tests/test_aggregate.py b/src/ctapipe/vectorization/tests/test_aggregate.py new file mode 100644 index 00000000000..14c3f578c4f --- /dev/null +++ b/src/ctapipe/vectorization/tests/test_aggregate.py @@ -0,0 +1,50 @@ +import numpy as np +from astropy.table import Table + +from ctapipe.io import TableLoader, read_table +from ctapipe.io.tests.test_table_loader import check_equal_array_event_order + + +def test_get_subarray_index(dl1_parameters_file): + from ctapipe.vectorization import get_subarray_index + + opts = dict(simulated=False, true_parameters=False, dl2=False, pointing=False) + with TableLoader(dl1_parameters_file, **opts) as loader: + tel_events = loader.read_telescope_events() + + subarray_obs_ids, subarray_event_ids, _, _ = get_subarray_index(tel_events) + trigger = read_table(dl1_parameters_file, "/dl1/event/subarray/trigger") + + assert len(subarray_obs_ids) == len(subarray_event_ids) + assert len(subarray_obs_ids) == len(trigger) + check_equal_array_event_order( + Table({"obs_id": subarray_obs_ids, "event_id": subarray_event_ids}), trigger + ) + + +def test_mean_std_ufunc(dl1_parameters_file): + from ctapipe.vectorization import get_subarray_index, weighted_mean_std_ufunc + + opts = dict(simulated=False, true_parameters=False, dl2=False, pointing=False) + with TableLoader(dl1_parameters_file, **opts) as loader: + tel_events = loader.read_telescope_events() + + valid = np.isfinite(tel_events["hillas_length"]) + obs_ids, event_ids, m, tel_to_subarray_idx = get_subarray_index(tel_events) + n_array_events = len(obs_ids) + + # test only default uniform weights, + # other weights are tested in test_stereo_combination + mean, std = weighted_mean_std_ufunc( + tel_events["hillas_length"], valid, n_array_events, tel_to_subarray_idx, m + ) + + # check if result is identical with np.mean/np.std + true_mean = np.full(n_array_events, np.nan) + true_std = np.full(n_array_events, np.nan) + grouped = tel_events.group_by(["obs_id", "event_id"]) + true_mean = grouped["hillas_length"].groups.aggregate(np.nanmean) + true_std = grouped["hillas_length"].groups.aggregate(np.nanstd) + + assert np.allclose(mean, true_mean, equal_nan=True) + assert np.allclose(std, true_std, equal_nan=True)