From b6d745abb284f860487784117b681a6e6e98e6b2 Mon Sep 17 00:00:00 2001 From: Annette Stellema <40450353+stellema@users.noreply.github.com> Date: Tue, 21 Jan 2025 15:40:15 +1100 Subject: [PATCH] Update eva.py Add empirical return level/period functions, add GEV parameter spatial plot, update fit_gev for fixed params, check_gev_relative_fit and add option to drop the maximum value before fitting GEV parameters --- unseen/eva.py | 235 +++++++++++++++++++++++++++++++++++---- unseen/tests/test_eva.py | 7 +- 2 files changed, 219 insertions(+), 23 deletions(-) diff --git a/unseen/eva.py b/unseen/eva.py index 940f9c0..5b65b8d 100644 --- a/unseen/eva.py +++ b/unseen/eva.py @@ -1,6 +1,8 @@ """Extreme value analysis functions.""" import argparse +from cartopy.crs import PlateCarree +from cartopy.mpl.gridliner import LatitudeFormatter, LongitudeFormatter from lmoments3 import distr import matplotlib.pyplot as plt from matplotlib import colormaps @@ -8,7 +10,7 @@ from matplotlib.ticker import AutoMinorLocator import numpy as np from scipy.optimize import minimize -from scipy.stats import genextreme, ks_1samp, cramervonmises +from scipy.stats import genextreme, ecdf, ks_1samp, cramervonmises from scipy.stats.distributions import chi2 import warnings from xarray import apply_ufunc, DataArray @@ -64,11 +66,14 @@ def fit_gev( fitstart="LMM", loc1=0, scale1=0, - retry_fit=True, + fshape=None, + floc=None, + fscale=None, + retry_fit=False, assert_good_fit=False, pick_best_model=False, alpha=0.05, - method="Nelder-Mead", + optimizer="Nelder-Mead", goodness_of_fit_kwargs=dict(test="ks"), ): """Estimate stationary or nonstationary GEV distribution parameters. @@ -100,7 +105,7 @@ def fit_gev( Mutually exclusive with `stationary` and/or `assert_good_fit`. alpha : float, default 0.05 Fit test p-value threshold for stationary fit (relative/goodness of fit) - method : {'Nelder-Mead', 'L-BFGS-B', 'TNC', 'SLSQP', 'Powell', + optimizer : {'Nelder-Mead', 'L-BFGS-B', 'TNC', 'SLSQP', 'Powell', 'trust-constr', 'COBYLA'}, default 'Nelder-Mead' Optimization method for nonstationary fit goodness_of_fit_kwargs : dict, optional @@ -145,11 +150,14 @@ def _fit_1d( fitstart, loc1, scale1, + fshape, + floc, + fscale, retry_fit, assert_good_fit, pick_best_model, alpha, - method, + optimizer, goodness_of_fit_kwargs, ): """Estimate distribution parameters.""" @@ -210,23 +218,38 @@ def _fit_1d( if not stationary: # Temporarily reverse shape sign (scipy uses different sign convention) dparams_ns_i = [-dparams_i[0], dparams_i[1], loc1, dparams_i[2], scale1] - + dof = [3, 5] + for fixed in [fshape, floc, fscale]: + if fixed is not None: + dof[0] -= 1 # Optimisation bounds (scale parameter must be non-negative) + bounds = [(None, None)] * 5 - bounds[3] = (0, None) # Positive scale parameter + bounds[0] = (fshape, fshape) + bounds[1] = (floc, floc) + + if fscale is None: + # Allow positive scale parameter (not fixed) + bounds[3] = (0, None) + else: + bounds[3] = (fscale, fscale) + + # Trend parameters if loc1 is None: dparams_ns_i[2] = 0 bounds[2] = (0, 0) # Only allow trend in scale + dof[1] -= 1 if scale1 is None: dparams_ns_i[4] = 0 bounds[4] = (0, 0) # Only allow trend in location + dof[1] -= 1 # Minimise the negative log-likelihood function to get optimal dparams res = minimize( _gev_nllf, dparams_ns_i, args=(data, covariate), - method=method, + method=optimizer, bounds=bounds, ) dparams_ns = np.array([i for i in res.x], dtype="float64") @@ -236,7 +259,13 @@ def _fit_1d( # Stationary and nonstationary model relative goodness of fit if pick_best_model: dparams = get_best_GEV_model_1d( - data, dparams, dparams_ns, covariate, alpha, test=pick_best_model + data, + dparams, + dparams_ns, + covariate, + alpha, + test=pick_best_model, + dof=dof, ) else: dparams = dparams_ns @@ -554,7 +583,7 @@ def _fit_test_genextreme(data, dparams, **kwargs): return pvalue -def check_gev_relative_fit(data, L1, L2, test, alpha=0.05): +def check_gev_relative_fit(data, L1, L2, test, alpha=0.05, dof=[3, 5]): """Test relative fit of stationary and nonstationary GEV distribution. Parameters @@ -579,8 +608,7 @@ def check_gev_relative_fit(data, L1, L2, test, alpha=0.05): Hydrology, 547, 557-574. https://doi.org/10.1016/j.jhydrol.2017.02.005 """ - dof = [3, 5] # Degrees of freedom of each model - + # Degrees of freedom of each model if test.casefold() == "lrt": # Likelihood ratio test statistic LR = -2 * (L2 - L1) @@ -600,7 +628,9 @@ def check_gev_relative_fit(data, L1, L2, test, alpha=0.05): return result -def get_best_GEV_model_1d(data, dparams, dparams_ns, covariate, alpha, test): +def get_best_GEV_model_1d( + data, dparams, dparams_ns, covariate, alpha, test, dof=[3, 5] +): """Get the best GEV model based on a relative fit test.""" # Calculate the stationary GEV parameters shape, loc, scale = dparams @@ -609,7 +639,7 @@ def get_best_GEV_model_1d(data, dparams, dparams_ns, covariate, alpha, test): L1 = _gev_nllf([-shape, loc, scale], data) L2 = _gev_nllf([-dparams_ns[0], *dparams_ns[1:]], data, covariate) - result = check_gev_relative_fit(data, L1, L2, test=test, alpha=alpha) + result = check_gev_relative_fit(data, L1, L2, test=test, alpha=alpha, dof=dof) if not result: # Return the stationary parameters with no trend dparams = np.array([shape, loc, 0, scale, 0], dtype="float64") @@ -742,6 +772,79 @@ def get_return_level(return_period, dparams=None, covariate=None, **kwargs): return return_level +def get_empirical_return_period(da, event, core_dim="time"): + """Calculate the empirical return period of an event. + + Parameters + ---------- + da : xarray.DataArray + Input data + event : float + Return level of the event (e.g., 100 mm of rainfall) + core_dim : str, default "time" + The core dimension in which to estimate the return period + + Returns + ------- + ri : xarray.DataArray + The event recurrence interval (e.g., 100 year return period) + """ + + def _empirical_return_period(da, event): + """Empirical return period of an event (1D).""" + res = ecdf(da) + return 1 / res.sf.evaluate(event) + + ri = apply_ufunc( + _empirical_return_period, + da, + event, + input_core_dims=[[core_dim], []], + output_core_dims=[[]], + vectorize=True, + dask="parallelized", + output_dtypes=["float64"], + ) + return ri + + +def get_empirical_return_level(da, return_period, core_dim="time"): + """Calculate the empirical return period of an event. + + Parameters + ---------- + da : xarray.DataArray + Input data + return_period : float + Return period (e.g., 100 year return period) + core_dim : str, default "time" + The core dimension in which to estimate the return period + + Returns + ------- + return_level : xarray.DataArray + The event return level (e.g., 100 mm of rainfall) + """ + + def _empirical_return_level(da, period): + """Empirical return level of an event (1D).""" + sf = ecdf(da).sf + probability = 1 - (1 / period) + return np.interp(probability, sf.probabilities, sf.quantiles) + + return_level = apply_ufunc( + _empirical_return_level, + da, + return_period, + input_core_dims=[[core_dim], []], + output_core_dims=[[]], + vectorize=True, + dask="parallelized", + output_dtypes=["float64"], + ) + return return_level + + def aep_to_ari(aep): """Convert from aep (%) to ari (years) @@ -749,7 +852,7 @@ def aep_to_ari(aep): Stolen from https://github.com/climate-innovation-hub/frequency-analysis/blob/master/eva.py """ - assert aep < 100, "aep to be expressed as a percentage (must be < 100)" + assert np.all(aep < 100), "AEP to be expressed as a percentage (must be < 100)" aep = aep / 100 return 1 / (-np.log(1 - aep)) @@ -805,7 +908,7 @@ def gev_confidence_interval( ci_bounds : xarray.DataArray Confidence intervals with lower and upper bounds along dim 'quantile' """ - # todo: add max_shape_ratio + # todo: add max_shape_ratio? # Replace core dim with the one from the fit_kwargs if it exists core_dim = fit_kwargs.pop("core_dim", core_dim) @@ -814,8 +917,8 @@ def gev_confidence_interval( dparams = fit_gev(data, core_dim=core_dim, **fit_kwargs) shape, loc, scale = unpack_gev_params(dparams) - # Generate random indices for resampling if bootstrap_method == "parametric": + # Generate bootstrapped data using the GEV distribution boot_data = apply_ufunc( genextreme.rvs, shape, @@ -830,9 +933,12 @@ def gev_confidence_interval( boot_data = boot_data.transpose("k", core_dim, ...) elif bootstrap_method == "non-parametric": - # todo: replace with rng.choice - resample_indices = rng.integers( - 0, data[core_dim].size, (n_resamples, data[core_dim].size) + # Resample data with replacements + resample_indices = np.array( + [ + rng.choice(data[core_dim].size, size=data[core_dim].size, replace=True) + for i in range(n_resamples) + ] ) indexer = DataArray(resample_indices, dims=("k", core_dim)) boot_data = data.isel({core_dim: indexer}) @@ -1304,6 +1410,80 @@ def plot_stacked_histogram( return ax, bins +def spatial_plot_gev_parameters( + dparams, + dataset_name=None, + outfile=None, +): + """Plot spatial maps of GEV parameters (shape, loc, scale). + + Parameters + ---------- + dparams : xarray.DataArray + Stationary or nonstationary GEV parameters + dataset_name : str, optional + Name of the dataset + outfile : str, optional + Output file name + """ + # Rename shape parameter + params = ["shape", *list(dparams.dparams.values[1:])] + dparams["dparams_new"] = ("dparams", params) + dparams = ( + dparams.swap_dims({"dparams": "dparams_new"}) + .drop("dparams") + .rename(dparams_new="dparams") + ) + params = dparams.dparams.values + stationary = True if params.size == 3 else False + + # Adjust subplots for stationary or nonstationary parameters + nrows = 1 if stationary else 2 + figsize = (14, 6) if stationary else (12, 6) + + fig, axes = plt.subplots( + nrows, 3, figsize=figsize, subplot_kw={"projection": PlateCarree()} + ) + axes = axes.flat + + # Plot each parameter + for i, ax in enumerate(axes[: params.size]): + dparams.sel(dparams=params[i]).plot( + ax=ax, + transform=PlateCarree(), + robust=True, + cbar_kwargs=dict(label=params[i], fraction=0.038, pad=0.04), + ) + # Add coastlines and lat/lon labels + ax.coastlines() + ax.set_title(f"{params[i]}") + ax.set_xlabel(None) + ax.set_ylabel(None) + ax.xaxis.set_major_formatter(LongitudeFormatter()) + ax.yaxis.set_major_formatter(LatitudeFormatter()) + ax.xaxis.set_minor_locator(AutoMinorLocator()) + ax.yaxis.set_minor_locator(AutoMinorLocator()) + + # Fix xarray facet plot with cartopy bug (v2024.6.0) + subplotspec = ax.get_subplotspec() + ax.xaxis.set_visible(True) + if subplotspec.is_first_col(): + ax.yaxis.set_visible(True) + + if dataset_name: + fig.suptitle(f"{dataset_name} GEV parameters", y=0.8 if stationary else 0.99) + + if not stationary: + # Hide the empty subplot + axes[-1].set_visible(False) + + plt.tight_layout() + if outfile: + plt.savefig(outfile, bbox_inches="tight", dpi=200) + else: + plt.show() + + def _parse_command_line(): """Parse the command line for input arguments""" @@ -1348,6 +1528,7 @@ def _parse_command_line(): ), help="Initial guess method (or estimate) of the GEV parameters", ) + parser.add_argument( "--retry_fit", action="store_true", @@ -1389,6 +1570,12 @@ def _parse_command_line(): action=general_utils.store_dict, help="Keyword arguments for opening min_lead file", ) + parser.add_argument( + "--drop_max", + action="store_true", + default=False, + help="Drop the maximum value before fitting", + ) parser.add_argument( "--ensemble_dim", type=str, @@ -1454,8 +1641,16 @@ def _main(): # Stack dimensions along new "sample" dimension if all([dim in ds[args.var].dims for dim in args.stack_dims]): ds = ds.stack(**{"sample": args.stack_dims}, create_index=False) + ds = ds.chunk(dict(sample=-1)) # fixes CAFE large chunk error args.core_dim = "sample" + # Drop the maximum value before fitting + if args.drop_max: + # Drop the maximum value along each non-core dimension + ds[args.var] = ds[args.var].where( + ds[args.var].load() != ds[args.var].max(dim=args.core_dim).load() + ) + if args.nonstationary: covariate = _format_covariate(ds[args.var], ds[args.covariate], args.core_dim) else: diff --git a/unseen/tests/test_eva.py b/unseen/tests/test_eva.py index e436f3c..03a08ef 100644 --- a/unseen/tests/test_eva.py +++ b/unseen/tests/test_eva.py @@ -149,9 +149,10 @@ def test_fit_gev_1d_assert_good_fit(example_da_gev): @pytest.mark.parametrize("example_da_gev", ["xarray"], indirect=True) +@pytest.mark.parametrize("test", ["lrt", "aic", "bic"]) @pytest.mark.parametrize("trend", [False, True]) -def test_fit_ns_gev_1d_pick_best_model_bic(example_da_gev, trend): - """Run non-stationary GEV fit & check 'BIC' test returns nonstationary params.""" +def test_fit_ns_gev_1d_pick_best_model(example_da_gev, test, trend): + """Run non-stationary GEV fit & check 'lrt' test returns nonstationary params.""" data, _ = example_da_gev if trend: # Add a large positive linear trend @@ -164,7 +165,7 @@ def test_fit_ns_gev_1d_pick_best_model_bic(example_da_gev, trend): stationary=False, core_dim="time", covariate=covariate, - pick_best_model="bic", + pick_best_model=test, ) if trend: assert np.all(dparams[2] > 0) # Positive trend in location