From f45babec4b8318f46dd34ee08037a03b7f1df82e Mon Sep 17 00:00:00 2001 From: Maximilian Linhoff Date: Tue, 12 Dec 2023 10:28:53 +0100 Subject: [PATCH] Add docstrings and tests --- ctapipe/conftest.py | 35 ++++++- ctapipe/io/hdf5eventsource.py | 1 + ctapipe/io/pointing.py | 55 +++++++++-- ctapipe/io/tests/test_hdf5eventsource.py | 11 +++ ctapipe/io/tests/test_pointing.py | 113 +++++++++++++++++++++++ ctapipe/io/tests/test_table_loader.py | 32 +------ 6 files changed, 208 insertions(+), 39 deletions(-) create mode 100644 ctapipe/io/tests/test_pointing.py diff --git a/ctapipe/conftest.py b/ctapipe/conftest.py index 7efaa9925c9..442bc670608 100644 --- a/ctapipe/conftest.py +++ b/ctapipe/conftest.py @@ -1,12 +1,14 @@ """ common pytest fixtures for tests in ctapipe """ - +import shutil from copy import deepcopy import astropy.units as u +import numpy as np import pytest from astropy.coordinates import EarthLocation +from astropy.table import Table from pytest_astropy_header.display import PYTEST_HEADER_MODULES from ctapipe.core import run_tool @@ -645,3 +647,34 @@ def disp_reconstructor_path(model_tmp_path, gamma_train_clf): def reference_location(): """a dummy EarthLocation to use for SubarrayDescriptions""" return EarthLocation(lon=-17 * u.deg, lat=28 * u.deg, height=2200 * u.m) + + +@pytest.fixture(scope="session") +def dl1_mon_pointing_file(dl1_file, dl1_tmp_path): + from ctapipe.instrument import SubarrayDescription + from ctapipe.io import read_table, write_table + + path = dl1_tmp_path / "dl1_mon_ponting.dl1.h5" + shutil.copy(dl1_file, path) + + events = read_table(path, "/dl1/event/subarray/trigger") + subarray = SubarrayDescription.from_hdf(path) + + # create some dummy monitoring data + time = events["time"] + start, stop = time[[0, -1]] + duration = (stop - start).to_value(u.s) + + # start a bit before, go a bit longer + dt = np.arange(-1, duration + 2, 1) * u.s + time_mon = start + dt + + alt = (69 + 2 * dt / dt[-1]) * u.deg + az = (180 + 5 * dt / dt[-1]) * u.deg + + table = Table({"time": time_mon, "azimuth": az, "altitude": alt}) + + for tel_id in subarray.tel: + write_table(table, path, f"/dl0/monitoring/telescope/pointing/tel_{tel_id:03d}") + + return path diff --git a/ctapipe/io/hdf5eventsource.py b/ctapipe/io/hdf5eventsource.py index ebea05a10bf..0e0e1cbfeb1 100644 --- a/ctapipe/io/hdf5eventsource.py +++ b/ctapipe/io/hdf5eventsource.py @@ -588,6 +588,7 @@ def _generator(self): h5file=self.file_, parent=self, ) + tel_pointing_finder = None else: pointing_interpolator = None diff --git a/ctapipe/io/pointing.py b/ctapipe/io/pointing.py index 9efcee831e3..4e108191344 100644 --- a/ctapipe/io/pointing.py +++ b/ctapipe/io/pointing.py @@ -3,6 +3,7 @@ import astropy.units as u import numpy as np import tables +from astropy.time import Time from scipy.interpolate import interp1d from ctapipe.core import Component, traits @@ -29,10 +30,10 @@ class PointingInterpolator(Component): default_value=False, ).tag(config=True) - def __init__(self, h5file, **kwargs): + def __init__(self, h5file=None, **kwargs): super().__init__(**kwargs) - if not isinstance(h5file, tables.File): + if h5file is not None and not isinstance(h5file, tables.File): raise TypeError("h5file must be a tables.File") self.h5file = h5file @@ -49,24 +50,57 @@ def __init__(self, h5file, **kwargs): self._alt_interpolators = {} self._az_interpolators = {} - def _read_pointing_table(self, tel_id): - pointing_table = read_table( - self.h5file, - f"/dl0/monitoring/telescope/pointing/tel_{tel_id:03d}", - ) + def add_table(self, tel_id, pointing_table): + """ + Add a table to this interpolator + + Parameters + ---------- + tel_id : int + Telescope id + pointing_table : astropy.table.Table + Table of pointing values, expected columns + are ``time`` as ``Time`` column, ``azimuth`` and ``altitude`` + as quantity columns. + """ + missing = {"time", "azimuth", "altitude"} - set(pointing_table.colnames) + if len(missing) > 0: + raise ValueError(f"Table is missing required column(s): {missing}") + + if not isinstance(pointing_table["time"], Time): + raise TypeError("'time' column of pointing table must be astropy.time.Time") + + for col in ("azimuth", "altitude"): + unit = pointing_table[col].unit + if unit is None or not u.rad.is_equivalent(unit): + raise ValueError(f"{col} must have units compatible with 'rad'") # sort first, so it's not done twice for each interpolator pointing_table.sort("time") + # interpolate in mjd TAI. Float64 mjd is precise enough for pointing + # and TAI is contiguous, so no issues with leap seconds. mjd = pointing_table["time"].tai.mjd az = pointing_table["azimuth"].quantity.to_value(u.rad) - # prepare azimuth for interpolation "unwrapping", i.e. turning 359, 1 into 359, 361 + # prepare azimuth for interpolation by "unwrapping": i.e. turning + # [359, 1] into [359, 361]. This assumes that if we get values like + # [359, 1] the telescope moved 2 degrees through 0, not 358 degrees + # the other way around. This should be true for all telescopes given + # the sampling speed of pointing values and their maximum movement speed. + # No telescope can turn more than 180° in 2 seconds. az = np.unwrap(az) alt = pointing_table["altitude"].quantity.to_value(u.rad) self._az_interpolators[tel_id] = interp1d(mjd, az, **self.interp_options) self._alt_interpolators[tel_id] = interp1d(mjd, alt, **self.interp_options) + def _read_pointing_table(self, tel_id): + pointing_table = read_table( + self.h5file, + f"/dl0/monitoring/telescope/pointing/tel_{tel_id:03d}", + ) + self.add_table(tel_id, pointing_table) + def __call__(self, tel_id, time): """ Interpolate alt/az for given time and tel_id. @@ -86,7 +120,10 @@ def __call__(self, tel_id, time): interpolated azimuth angle """ if tel_id not in self._az_interpolators: - self._read_pointing_table(tel_id) + if self.h5file is not None: + self._read_pointing_table(tel_id) + else: + raise KeyError(f"No pointing table available for tel_id {tel_id}") mjd = time.tai.mjd az = u.Quantity(self._az_interpolators[tel_id](mjd), u.rad, copy=False) diff --git a/ctapipe/io/tests/test_hdf5eventsource.py b/ctapipe/io/tests/test_hdf5eventsource.py index f426e024413..bc8b1aa33d9 100644 --- a/ctapipe/io/tests/test_hdf5eventsource.py +++ b/ctapipe/io/tests/test_hdf5eventsource.py @@ -251,3 +251,14 @@ def test_dl1_camera_frame(dl1_camera_frame_file): assert sim.true_parameters.hillas.intensity is not None assert tel_id is not None, "did not test any events" + + +def test_interpolate_pointing(dl1_mon_pointing_file): + from ctapipe.io import HDF5EventSource + + with HDF5EventSource(dl1_mon_pointing_file) as source: + for e in source: + assert set(e.pointing.tel.keys()) == set(e.trigger.tels_with_trigger) + for pointing in e.pointing.tel.values(): + assert not np.isnan(pointing.azimuth) + assert not np.isnan(pointing.altitude) diff --git a/ctapipe/io/tests/test_pointing.py b/ctapipe/io/tests/test_pointing.py new file mode 100644 index 00000000000..6817083944f --- /dev/null +++ b/ctapipe/io/tests/test_pointing.py @@ -0,0 +1,113 @@ +import astropy.units as u +import numpy as np +import pytest +import tables +from astropy.table import Table +from astropy.time import Time + + +def test_simple(): + """Test pointing interpolation""" + from ctapipe.io.pointing import PointingInterpolator + + t0 = Time("2022-01-01T00:00:00") + + table = Table( + { + "time": t0 + np.arange(0.0, 10.1, 2.0) * u.s, + "azimuth": np.linspace(0.0, 10.0, 6) * u.deg, + "altitude": np.linspace(70.0, 60.0, 6) * u.deg, + }, + ) + + interpolator = PointingInterpolator() + interpolator.add_table(1, table) + + alt, az = interpolator(tel_id=1, time=t0 + 1 * u.s) + assert u.isclose(alt, 69 * u.deg) + assert u.isclose(az, 1 * u.deg) + + with pytest.raises(KeyError): + interpolator(tel_id=2, time=t0 + 1 * u.s) + + +def test_azimuth_switchover(): + """Test pointing interpolation""" + from ctapipe.io.pointing import PointingInterpolator + + t0 = Time("2022-01-01T00:00:00") + + table = Table( + { + "time": t0 + [0, 1, 2] * u.s, + "azimuth": [359, 1, 3] * u.deg, + "altitude": [60, 61, 62] * u.deg, + }, + ) + + interpolator = PointingInterpolator() + interpolator.add_table(1, table) + + alt, az = interpolator(tel_id=1, time=t0 + 0.5 * u.s) + assert u.isclose(az, 360 * u.deg) + assert u.isclose(alt, 60.5 * u.deg) + + +def test_invalid_input(): + """Test invalid pointing tables raise nice errors""" + from ctapipe.io.pointing import PointingInterpolator + + wrong_time = Table( + { + "time": [1, 2, 3] * u.s, + "azimuth": [1, 2, 3] * u.deg, + "altitude": [1, 2, 3] * u.deg, + } + ) + + interpolator = PointingInterpolator() + with pytest.raises(TypeError, match="astropy.time.Time"): + interpolator.add_table(1, wrong_time) + + wrong_unit = Table( + { + "time": Time(1.7e9 + np.arange(3), format="unix"), + "azimuth": [1, 2, 3] * u.m, + "altitude": [1, 2, 3] * u.deg, + } + ) + with pytest.raises(ValueError, match="compatible with 'rad'"): + interpolator.add_table(1, wrong_unit) + + wrong_unit = Table( + { + "time": Time(1.7e9 + np.arange(3), format="unix"), + "azimuth": [1, 2, 3] * u.deg, + "altitude": [1, 2, 3], + } + ) + with pytest.raises(ValueError, match="compatible with 'rad'"): + interpolator.add_table(1, wrong_unit) + + +def test_hdf5(tmp_path): + from ctapipe.io import write_table + from ctapipe.io.pointing import PointingInterpolator + + t0 = Time("2022-01-01T00:00:00") + + table = Table( + { + "time": t0 + np.arange(0.0, 10.1, 2.0) * u.s, + "azimuth": np.linspace(0.0, 10.0, 6) * u.deg, + "altitude": np.linspace(70.0, 60.0, 6) * u.deg, + }, + ) + + path = tmp_path / "pointing.h5" + write_table(table, path, "/dl0/monitoring/telescope/pointing/tel_001") + with tables.open_file(path) as h5file: + interpolator = PointingInterpolator(h5file) + alt, az = interpolator(tel_id=1, time=t0 + 1 * u.s) + assert u.isclose(alt, 69 * u.deg) + assert u.isclose(az, 1 * u.deg) diff --git a/ctapipe/io/tests/test_table_loader.py b/ctapipe/io/tests/test_table_loader.py index 8f066c6d02c..15164a0f302 100644 --- a/ctapipe/io/tests/test_table_loader.py +++ b/ctapipe/io/tests/test_table_loader.py @@ -1,5 +1,3 @@ -import shutil - import astropy.units as u import numpy as np import pytest @@ -440,34 +438,10 @@ def test_order_merged(): check_equal_array_event_order(table, tel_trigger[mask]) -def test_interpolate_pointing(dl1_file, tmp_path): - from ctapipe.io import TableLoader, write_table - - path = tmp_path / dl1_file.name - shutil.copy(dl1_file, path) - - with TableLoader(dl1_file) as loader: - events = loader.read_telescope_events() - subarray = loader.subarray - - # create some dummy monitoring data - time = events["time"] - start, stop = time[[0, -1]] - duration = (stop - start).to_value(u.s) - - # start a bit before, go a bit longer - dt = np.arange(-1, duration + 2, 1) * u.s - time_mon = start + dt - - alt = (69 + 2 * dt / dt[-1]) * u.deg - az = (180 + 5 * dt / dt[-1]) * u.deg - - table = Table({"time": time_mon, "azimuth": az, "altitude": alt}) - - for tel_id in subarray.tel: - write_table(table, path, f"/dl0/monitoring/telescope/pointing/tel_{tel_id:03d}") +def test_interpolate_pointing(dl1_mon_pointing_file, tmp_path): + from ctapipe.io import TableLoader - with TableLoader(path, interpolate_pointing=True) as loader: + with TableLoader(dl1_mon_pointing_file, interpolate_pointing=True) as loader: events = loader.read_telescope_events([4]) assert len(events) > 0 assert "telescope_pointing_azimuth" in events.colnames