diff --git a/docs/changes/2422.feature.rst b/docs/changes/2422.feature.rst new file mode 100644 index 00000000000..8eb28e7f3fd --- /dev/null +++ b/docs/changes/2422.feature.rst @@ -0,0 +1,2 @@ +Implement the overburden-to height a.s.l. transformation function in the atmosphere module +and test that round-trip returns original value. diff --git a/src/ctapipe/atmosphere.py b/src/ctapipe/atmosphere.py index 884e03965a5..0fcd865054c 100644 --- a/src/ctapipe/atmosphere.py +++ b/src/ctapipe/atmosphere.py @@ -10,12 +10,13 @@ """ import abc +import warnings from dataclasses import dataclass from functools import partial import numpy as np from astropy import units as u -from astropy.table import Table +from astropy.table import QTable, Table from scipy.interpolate import interp1d __all__ = [ @@ -29,6 +30,16 @@ 1, } +GRAMMAGE_UNIT = u.g / u.cm**2 +DENSITY_UNIT = u.g / u.cm**3 + + +class SlantDepthZenithRangeWarning(UserWarning): + """ + Issued when the zenith angle of an event is beyond + the range where the slant depth computation is correct (approx. 70 deg) + """ + class AtmosphereDensityProfile(abc.ABC): """ @@ -58,20 +69,37 @@ def integral(self, height: u.Quantity) -> u.Quantity: """ - def line_of_sight_integral( - self, distance: u.Quantity, zenith_angle=0 * u.deg, output_units=u.g / u.cm**2 + @abc.abstractmethod + def height_from_overburden(self, overburden: u.Quantity) -> u.Quantity: + r"""Get the height a.s.l. from the mass overburden in the atmosphere. + Inverse of the integral function + + Returns + ------- + u.Quantity["m"]: + Height a.s.l. for given overburden + + """ + + def slant_depth_from_height( + self, + height: u.Quantity, + zenith_angle=0 * u.deg, + output_units=GRAMMAGE_UNIT, ): - r"""Line-of-sight integral from the shower distance to infinity, along + r"""Line-of-sight integral from height to infinity, along the direction specified by the zenith angle. This is sometimes called the *slant depth*. The atmosphere here is assumed to be Cartesian, the - curvature of the Earth is not taken into account. + curvature of the Earth is not taken into account. This approximation breaks down + for large zenith angles (>70 deg), in which case this function + does not give correct results. Inverse of height_from_slant_depth function. - .. math:: X(h, \Psi) = \int_{h}^{\infty} \rho(h' \cos{\Psi}) dh' + .. math:: X(h, \Psi) = \int_{h}^{\infty} \rho(h') dh' / \cos{\Psi} Parameters ---------- - distance: u.Quantity["length"] - line-of-site distance from observer to point + height: u.Quantity["length"] + height a.s.l. at which to start integral zenith_angle: u.Quantity["angle"] zenith angle of observation output_units: u.Unit @@ -79,9 +107,43 @@ def line_of_sight_integral( """ - return ( - self.integral(distance * np.cos(zenith_angle)) / np.cos(zenith_angle) - ).to(output_units) + if np.any(zenith_angle > 70 * u.deg): + warnings.warn( + "Zenith angle too high for accurate slant depth", + SlantDepthZenithRangeWarning, + ) + + return (self.integral(height) / np.cos(zenith_angle)).to(output_units) + + def height_from_slant_depth( + self, + slant_depth: u.Quantity, + zenith_angle=0 * u.deg, + output_units=u.m, + ): + r"""Calculates height a.s.l. in the atmosphere from traversed slant depth + taking into account the shower zenith angle. + + Parameters + ---------- + slant_depth: u.Quantity["grammage"] + line-of-site distance from observer to point + zenith_angle: u.Quantity["angle"] + zenith angle of observation + output_units: u.Unit + unit to output (must be convertible to m) + + """ + + if np.any(zenith_angle > 70 * u.deg): + warnings.warn( + "Zenith angle too high for accurate slant depth", + SlantDepthZenithRangeWarning, + ) + + return (self.height_from_overburden(slant_depth * np.cos(zenith_angle))).to( + output_units + ) def peek(self): """ @@ -102,21 +164,21 @@ def peek(self): axis[0].set_ylabel(f"Density / {density.unit.to_string('latex')}") axis[0].grid(True) - distance = np.linspace(1, 100, 500) * u.km + heights = np.linspace(1, 100, 500) * u.km for zenith_angle in [0, 40, 50, 70] * u.deg: - column_density = self.line_of_sight_integral(distance, zenith_angle) - axis[1].plot(distance, column_density, label=f"$\\Psi$={zenith_angle}") + column_density = self.slant_depth_from_height(heights, zenith_angle) + axis[1].plot(heights, column_density, label=f"$\\Psi$={zenith_angle}") axis[1].legend(loc="best") - axis[1].set_xlabel(f"Distance / {distance.unit.to_string('latex')}") + axis[1].set_xlabel(f"Distance / {heights.unit.to_string('latex')}") axis[1].set_ylabel(f"Column Density / {column_density.unit.to_string('latex')}") axis[1].set_yscale("log") axis[1].grid(True) - zenith_angle = np.linspace(0, 80, 20) * u.deg - for distance in [0, 5, 10, 20] * u.km: - column_density = self.line_of_sight_integral(distance, zenith_angle) - axis[2].plot(zenith_angle, column_density, label=f"Height={distance}") + zenith_angle = np.linspace(0, 70, 20) * u.deg + for height in [0, 5, 10, 20] * u.km: + column_density = self.slant_depth_from_height(height, zenith_angle) + axis[2].plot(zenith_angle, column_density, label=f"Height={height}") axis[2].legend(loc="best") axis[2].set_xlabel( @@ -134,6 +196,8 @@ def from_table(cls, table: Table): """return a subclass of AtmosphereDensityProfile from a serialized table""" + table = QTable(table, copy=False) + if "TAB_TYPE" not in table.meta: raise ValueError("expected a TAB_TYPE metadata field") @@ -177,19 +241,36 @@ class ExponentialAtmosphereDensityProfile(AtmosphereDensityProfile): """ scale_height: u.Quantity = 8 * u.km - scale_density: u.Quantity = 0.00125 * u.g / (u.cm**3) + scale_density: u.Quantity = 0.00125 * u.g / u.cm**3 @u.quantity_input(height=u.m) def __call__(self, height) -> u.Quantity: - return self.scale_density * np.exp(-height / self.scale_height) + return np.where( + height >= 0 * u.m, + self.scale_density * np.exp(-height / self.scale_height), + np.nan, + ) @u.quantity_input(height=u.m) def integral( self, height, ) -> u.Quantity: - return ( - self.scale_density * self.scale_height * np.exp(-height / self.scale_height) + return np.where( + height >= 0 * u.m, + self.scale_density + * self.scale_height + * np.exp(-height / self.scale_height), + np.nan, + ) + + @u.quantity_input(overburden=u.g / u.cm**2) + def height_from_overburden(self, overburden) -> u.Quantity: + return np.where( + overburden <= self.scale_height * self.scale_density, + -self.scale_height + * np.log(overburden / (self.scale_height * self.scale_density)), + np.nan, ) @@ -231,7 +312,7 @@ def __init__(self, table: Table): """ Parameters ---------- - table: Table + table : Table Table with columns `height`, `density`, and `column_density` """ @@ -239,23 +320,32 @@ def __init__(self, table: Table): if col not in table.colnames: raise ValueError(f"Missing expected column: {col} in table") - self.table = table[ + valid = ( (table["height"] >= 0) & (table["density"] > 0) & (table["column_density"] > 0) - ] + ) + self.table = QTable(table[valid], copy=False) # interpolation is done in log-y to minimize spline wobble + log_density = np.log10(self.table["density"].to_value(DENSITY_UNIT)) + log_column_density = np.log10( + self.table["column_density"].to_value(GRAMMAGE_UNIT) + ) + height_km = self.table["height"].to_value(u.km) - self._density_interp = interp1d( - self.table["height"].to("km").value, - np.log10(self.table["density"].to("g cm-3").value), + interp_kwargs = dict( kind="cubic", + bounds_error=False, + ) + self._density_interp = interp1d( + height_km, log_density, fill_value=(np.nan, -np.inf), **interp_kwargs ) self._col_density_interp = interp1d( - self.table["height"].to("km").value, - np.log10(self.table["column_density"].to("g cm-2").value), - kind="cubic", + height_km, log_column_density, fill_value=(np.nan, -np.inf), **interp_kwargs + ) + self._height_interp = interp1d( + log_column_density, height_km, fill_value=(np.inf, np.nan), **interp_kwargs ) # ensure it can be read back @@ -264,15 +354,18 @@ def __init__(self, table: Table): @u.quantity_input(height=u.m) def __call__(self, height) -> u.Quantity: - return u.Quantity( - 10 ** self._density_interp(height.to_value(u.km)), u.g / u.cm**3 - ) + log_density = self._density_interp(height.to_value(u.km)) + return u.Quantity(10**log_density, DENSITY_UNIT, copy=False) @u.quantity_input(height=u.m) def integral(self, height) -> u.Quantity: - return u.Quantity( - 10 ** self._col_density_interp(height.to_value(u.km)), u.g / u.cm**2 - ) + log_col_density = self._col_density_interp(height.to_value(u.km)) + return u.Quantity(10**log_col_density, GRAMMAGE_UNIT, copy=False) + + @u.quantity_input(overburden=u.g / (u.cm * u.cm)) + def height_from_overburden(self, overburden) -> u.Quantity: + log_overburden = np.log10(overburden.to_value(GRAMMAGE_UNIT)) + return u.Quantity(self._height_interp(log_overburden), u.km, copy=False) def __repr__(self): return ( @@ -290,6 +383,11 @@ def _exponential(h, a, b, c): return a + b * np.exp(-h / c) +def _inv_exponential(T, a, b, c): + "inverse function for exponential atmosphere" + return -c * np.log((T - a) / b) + + def _d_exponential(h, a, b, c): """derivative of exponential atmosphere""" return -b / c * np.exp(-h / c) @@ -297,7 +395,12 @@ def _d_exponential(h, a, b, c): def _linear(h, a, b, c): """linear atmosphere""" - return a - b * h / c + return np.where(h < a * c / b, a - b * h / c, 0) + + +def _inv_linear(T, a, b, c): + "inverse function for linear atmosphere" + return np.where(T > 0, -c / b * (T - a), np.inf) def _d_linear(h, a, b, c): @@ -305,6 +408,10 @@ def _d_linear(h, a, b, c): return -b / c +def _nan_func(x, unit): + return np.nan * unit + + class FiveLayerAtmosphereDensityProfile(AtmosphereDensityProfile): r""" CORSIKA 5-layer atmosphere model @@ -324,21 +431,28 @@ class FiveLayerAtmosphereDensityProfile(AtmosphereDensityProfile): """ def __init__(self, table: Table): - self.table = table + self.table = QTable(table, copy=False) - param_a = self.table["a"].to("g/cm2") - param_b = self.table["b"].to("g/cm2") - param_c = self.table["c"].to("km") + param_a = self.table["a"].to(GRAMMAGE_UNIT) + param_b = self.table["b"].to(GRAMMAGE_UNIT) + param_c = self.table["c"].to(u.km) # build list of column density functions and their derivatives: self._funcs = [ partial(f, a=param_a[i], b=param_b[i], c=param_c[i]) for i, f in enumerate([_exponential] * 4 + [_linear]) ] + self._funcs.insert(0, partial(_nan_func, unit=GRAMMAGE_UNIT)) + self._inv_funcs = [ + partial(f, a=param_a[4 - i], b=param_b[4 - i], c=param_c[4 - i]) + for i, f in enumerate([_inv_linear] + 4 * [_inv_exponential]) + ] + self._inv_funcs.append(partial(_nan_func, unit=u.m)) self._d_funcs = [ partial(f, a=param_a[i], b=param_b[i], c=param_c[i]) for i, f in enumerate([_d_exponential] * 4 + [_d_linear]) ] + self._d_funcs.insert(0, partial(_nan_func, unit=DENSITY_UNIT)) @classmethod def from_array(cls, array: np.ndarray, metadata: dict = None): @@ -350,7 +464,7 @@ def from_array(cls, array: np.ndarray, metadata: dict = None): if array.shape != (5, 5): raise ValueError("expected ndarray with shape (5,5)") - table = Table( + table = QTable( array, names=["height", "a", "b", "c", "1/c"], units=["cm", "g/cm2", "g/cm2", "cm", "cm-1"], @@ -367,8 +481,8 @@ def from_array(cls, array: np.ndarray, metadata: dict = None): @u.quantity_input(height=u.m) def __call__(self, height) -> u.Quantity: - which_func = np.digitize(height, self.table["height"]) - 1 - condlist = [which_func == i for i in range(5)] + which_func = np.digitize(height, self.table["height"]) + condlist = [which_func == i for i in range(6)] return u.Quantity( -1 * np.piecewise( @@ -376,19 +490,32 @@ def __call__(self, height) -> u.Quantity: condlist=condlist, funclist=self._d_funcs, ) - ).to(u.g / u.cm**3) + ).to(DENSITY_UNIT) @u.quantity_input(height=u.m) def integral(self, height) -> u.Quantity: - which_func = np.digitize(height, self.table["height"]) - 1 - condlist = [which_func == i for i in range(5)] + which_func = np.digitize(height, self.table["height"]) + condlist = [which_func == i for i in range(6)] return u.Quantity( np.piecewise( x=height, condlist=condlist, funclist=self._funcs, ) - ).to(u.g / u.cm**2) + ).to(GRAMMAGE_UNIT) + + @u.quantity_input(overburden=u.g / (u.cm * u.cm)) + def height_from_overburden(self, overburden) -> u.Quantity: + overburdens_at_heights = np.flip(self.integral(self.table["height"].to(u.km))) + which_func = np.digitize(overburden, overburdens_at_heights) + condlist = [which_func == i for i in range(6)] + return u.Quantity( + np.piecewise( + x=overburden, + condlist=condlist, + funclist=self._inv_funcs, + ) + ).to(u.km) def __repr__(self): return ( diff --git a/src/ctapipe/tests/test_atmosphere.py b/src/ctapipe/tests/test_atmosphere.py index 2fc0648488c..64013336451 100644 --- a/src/ctapipe/tests/test_atmosphere.py +++ b/src/ctapipe/tests/test_atmosphere.py @@ -26,12 +26,29 @@ def get_simtel_profile_from_eventsource(): return source.atmosphere_density_profile -@pytest.fixture(scope="session", name="table_profile") -def fixture_table_profile(): +@pytest.fixture(scope="session") +def table_profile(): """a table profile for testing""" return get_simtel_profile_from_eventsource() +@pytest.fixture(scope="session") +def layer5_profile(): + """a 5 layer profile for testing""" + fit_reference = np.array( + [ + [0.00 * 100000, -140.508, 1178.05, 994186, 0], + [9.75 * 100000, -18.4377, 1265.08, 708915, 0], + [19.00 * 100000, 0.217565, 1349.22, 636143, 0], + [46.00 * 100000, -0.000201796, 703.745, 721128, 0], + [106.00 * 100000, 0.000763128, 1, 1.57247e10, 0], + ] + ) + + profile_5 = atmo.FiveLayerAtmosphereDensityProfile.from_array(fit_reference) + return profile_5 + + def get_simtel_fivelayer_profile(): """ get a sample 3-layer profile @@ -65,7 +82,7 @@ def test_models(density_model): # check we can also compute the integral column_density = density_model.integral(10 * u.km) - assert column_density.unit.is_equivalent(u.g / u.cm**2) + assert column_density.unit.is_equivalent(atmo.GRAMMAGE_UNIT) assert np.isclose( density_model.integral(1 * u.km), density_model.integral(1000 * u.m) @@ -79,9 +96,9 @@ def test_exponential_model(): """check exponential models""" density_model = atmo.ExponentialAtmosphereDensityProfile( - scale_height=10 * u.m, scale_density=0.00125 * u.g / u.cm**3 + scale_height=10 * u.m, scale_density=0.00125 * atmo.DENSITY_UNIT ) - assert np.isclose(density_model(1_000_000 * u.km), 0 * u.g / u.cm**3) + assert np.isclose(density_model(1_000_000 * u.km), 0 * atmo.DENSITY_UNIT) assert np.isclose(density_model(0 * u.km), density_model.scale_density) @@ -98,7 +115,7 @@ def test_table_model_interpolation(table_profile): assert np.isfinite(table_profile.integral(height_fine)).all() -def test_against_reference(): +def test_against_reference(layer5_profile): """ Test five-layer and table methods against a reference analysis from SimTelArray. Data from communication with K. Bernloehr. @@ -111,25 +128,180 @@ def test_against_reference(): "atmosphere_profile_comparison_from_simtelarray" ) - fit_reference = np.array( - [ - [0.00 * 100000, -140.508, 1178.05, 994186, 0], - [9.75 * 100000, -18.4377, 1265.08, 708915, 0], - [19.00 * 100000, 0.217565, 1349.22, 636143, 0], - [46.00 * 100000, -0.000201796, 703.745, 721128, 0], - [106.00 * 100000, 0.000763128, 1, 1.57247e10, 0], - ] - ) - - profile_5 = atmo.FiveLayerAtmosphereDensityProfile.from_array(fit_reference) - height = reference_table["Altitude_km"].to("km") np.testing.assert_allclose( - 1.0 - profile_5(height) / reference_table["rho_5"], 0, atol=1e-5 + 1.0 - layer5_profile(height) / reference_table["rho_5"], 0, atol=1e-5 ) np.testing.assert_allclose( - 1.0 - profile_5.line_of_sight_integral(height) / reference_table["thick_5"], + 1.0 + - layer5_profile.slant_depth_from_height(height) / reference_table["thick_5"], 0, atol=1e-5, ) + + +def test_slant_depth_from_height(table_profile): + """ + Test observer altitude and zenith angle dependence of LoS integral function + """ + + assert table_profile.slant_depth_from_height( + 20 * u.km + ) < table_profile.slant_depth_from_height(10 * u.km) + + assert table_profile.slant_depth_from_height( + 10 * u.km, zenith_angle=30 * u.deg + ) > table_profile.slant_depth_from_height(10 * u.km, zenith_angle=0 * u.deg) + + los_integral_z_0 = table_profile.slant_depth_from_height( + table_profile.table["height"].to("km")[5:15] + ) + + assert np.allclose( + los_integral_z_0, + table_profile.table["column_density"].to("g cm-2")[5:15], + rtol=1e-3, + ) + + los_integral_z_20 = table_profile.slant_depth_from_height( + table_profile.table["height"].to("km")[5:15], + zenith_angle=20 * u.deg, + ) + + assert np.allclose( + los_integral_z_20, + table_profile.table["column_density"].to("g cm-2")[5:15] + / np.cos(np.deg2rad(20)), + rtol=1e-3, + ) + + +def test_height_overburden_roundtrip(table_profile, layer5_profile): + """ + Test that successive application of height to overburden + and overburden to height functions return original values + """ + + layer_5_heights = u.Quantity([5, 15, 30, 70, 110] * u.km) + + for height in layer_5_heights: + circle_height_5_layer = layer5_profile.height_from_overburden( + layer5_profile.integral(height) + ) + + assert np.allclose(circle_height_5_layer, height, rtol=0.005) + + # Exponential atmosphere + density_model = atmo.ExponentialAtmosphereDensityProfile( + scale_height=10 * u.km, scale_density=0.00125 * atmo.DENSITY_UNIT + ) + + circle_height_exponential = density_model.height_from_overburden( + density_model.integral(47 * u.km) + ) + + assert np.allclose(circle_height_exponential, 47 * u.km, rtol=0.0001) + + circle_height_table = table_profile.height_from_overburden( + table_profile.integral(47 * u.km) + ) + + assert np.allclose(circle_height_table, 47 * u.km, rtol=0.0001) + + +def test_height_slant_depth_roundtrip(table_profile, layer5_profile): + """ + Test that successive application of height to overburden + and overburden to height functions return original values + """ + + layer_5_heights = u.Quantity([5, 15, 30, 70, 110] * u.km) + + for height in layer_5_heights: + circle_height_5_layer = layer5_profile.height_from_slant_depth( + layer5_profile.slant_depth_from_height(height) + ) + + assert np.allclose(circle_height_5_layer, height, rtol=0.005) + + for height in layer_5_heights: + circle_height_5_layer_z_20 = layer5_profile.height_from_slant_depth( + layer5_profile.slant_depth_from_height(height, zenith_angle=20 * u.deg), + zenith_angle=20 * u.deg, + ) + + assert np.allclose(circle_height_5_layer_z_20, height, rtol=0.005) + + # Exponential atmosphere + density_model = atmo.ExponentialAtmosphereDensityProfile( + scale_height=10 * u.km, scale_density=0.00125 * atmo.DENSITY_UNIT + ) + + circle_height_exponential = density_model.height_from_slant_depth( + density_model.slant_depth_from_height(47 * u.km) + ) + + assert np.allclose(circle_height_exponential, 47 * u.km, rtol=0.0001) + + circle_height_exponential_z_20 = density_model.height_from_slant_depth( + density_model.slant_depth_from_height(47 * u.km, zenith_angle=20 * u.deg), + zenith_angle=20 * u.deg, + ) + + assert np.allclose(circle_height_exponential_z_20, 47 * u.km, rtol=0.0001) + + circle_height_table = table_profile.height_from_slant_depth( + table_profile.slant_depth_from_height(47 * u.km) + ) + + assert np.allclose(circle_height_table, 47 * u.km, rtol=0.0001) + + circle_height_table_z_20 = table_profile.height_from_slant_depth( + table_profile.slant_depth_from_height(47 * u.km, zenith_angle=20 * u.deg), + zenith_angle=20 * u.deg, + ) + + assert np.allclose(circle_height_table_z_20, 47 * u.km, rtol=0.0001) + + +def test_out_of_range_table(table_profile): + with pytest.warns(RuntimeWarning, match="divide by zero"): + assert np.isposinf(table_profile.height_from_overburden(0 * atmo.GRAMMAGE_UNIT)) + + assert np.isnan(table_profile.height_from_overburden(2000 * atmo.GRAMMAGE_UNIT)) + + assert table_profile(150 * u.km).value == 0.0 + assert np.isnan(table_profile(-0.1 * u.km)) + + assert table_profile.integral(150 * u.km).value == 0.0 + assert np.isnan(table_profile.integral(-0.1 * u.km)) + + +def test_out_of_range_exponential(): + density_model = atmo.ExponentialAtmosphereDensityProfile( + scale_height=10 * u.km, scale_density=0.00125 * atmo.DENSITY_UNIT + ) + + with pytest.warns(RuntimeWarning, match="divide by zero"): + assert np.isposinf(density_model.height_from_overburden(0 * atmo.GRAMMAGE_UNIT)) + + assert np.isnan(density_model.height_from_overburden(2000 * atmo.GRAMMAGE_UNIT)) + + assert np.isnan(density_model(-0.1 * u.km)) + + assert np.isnan(density_model.integral(-0.1 * u.km)) + + +def test_out_of_range_five_layer(layer5_profile): + # Five-layer atmosphere + + assert np.isposinf(layer5_profile.height_from_overburden(0 * atmo.GRAMMAGE_UNIT)) + + assert np.isnan(layer5_profile.height_from_overburden(2000 * atmo.GRAMMAGE_UNIT)) + + assert np.allclose(layer5_profile(150 * u.km).value, 0.0, atol=1e-9) + assert np.isnan(layer5_profile(-0.1 * u.km)) + + assert np.allclose(layer5_profile.integral(150 * u.km).value, 0.0, atol=1e-9) + assert np.isnan(layer5_profile.integral(-0.1 * u.km))