Update eva.py
Add empirical return level/period functions, add a GEV parameter spatial plot, update fit_gev and check_gev_relative_fit to support fixed parameters, and add an option to drop the maximum value before fitting GEV parameters
stellema committed Jan 21, 2025
1 parent 5289730 commit b6d745a
Showing 2 changed files with 219 additions and 23 deletions.
235 changes: 215 additions & 20 deletions unseen/eva.py
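As context for the diff below, here is a minimal usage sketch of the updated `fit_gev` with one of the new fixed-parameter keywords; the DataArray `da` of block maxima and the import path are assumptions, and all other keywords keep their defaults:

```python
import numpy as np
import xarray as xr
from unseen.eva import fit_gev  # assuming the package layout shown above

# Hypothetical annual-maximum series (values are made up)
da = xr.DataArray(np.random.default_rng(0).gumbel(100, 20, size=50), dims="time")

# Pin the GEV shape parameter via the new `fshape` keyword; location and scale are still estimated
dparams = fit_gev(da, core_dim="time", fshape=0.0)
```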
@@ -1,14 +1,16 @@
"""Extreme value analysis functions."""

import argparse
from cartopy.crs import PlateCarree
from cartopy.mpl.gridliner import LatitudeFormatter, LongitudeFormatter
from lmoments3 import distr
import matplotlib.pyplot as plt
from matplotlib import colormaps
from matplotlib.dates import date2num
from matplotlib.ticker import AutoMinorLocator
import numpy as np
from scipy.optimize import minimize
from scipy.stats import genextreme, ks_1samp, cramervonmises
from scipy.stats import genextreme, ecdf, ks_1samp, cramervonmises
from scipy.stats.distributions import chi2
import warnings
from xarray import apply_ufunc, DataArray
@@ -64,11 +66,14 @@ def fit_gev(
fitstart="LMM",
loc1=0,
scale1=0,
retry_fit=True,
fshape=None,
floc=None,
fscale=None,
retry_fit=False,
assert_good_fit=False,
pick_best_model=False,
alpha=0.05,
method="Nelder-Mead",
optimizer="Nelder-Mead",
goodness_of_fit_kwargs=dict(test="ks"),
):
"""Estimate stationary or nonstationary GEV distribution parameters.
@@ -100,7 +105,7 @@
Mutually exclusive with `stationary` and/or `assert_good_fit`.
alpha : float, default 0.05
Fit test p-value threshold for stationary fit (relative/goodness of fit)
method : {'Nelder-Mead', 'L-BFGS-B', 'TNC', 'SLSQP', 'Powell',
optimizer : {'Nelder-Mead', 'L-BFGS-B', 'TNC', 'SLSQP', 'Powell',
'trust-constr', 'COBYLA'}, default 'Nelder-Mead'
Optimization method for nonstationary fit
goodness_of_fit_kwargs : dict, optional
@@ -145,11 +150,14 @@ def _fit_1d(
fitstart,
loc1,
scale1,
fshape,
floc,
fscale,
retry_fit,
assert_good_fit,
pick_best_model,
alpha,
method,
optimizer,
goodness_of_fit_kwargs,
):
"""Estimate distribution parameters."""
@@ -210,23 +218,38 @@ def _fit_1d(
if not stationary:
# Temporarily reverse shape sign (scipy uses different sign convention)
dparams_ns_i = [-dparams_i[0], dparams_i[1], loc1, dparams_i[2], scale1]

dof = [3, 5]
for fixed in [fshape, floc, fscale]:
if fixed is not None:
dof[0] -= 1
# Optimisation bounds (scale parameter must be non-negative)

bounds = [(None, None)] * 5
bounds[3] = (0, None) # Positive scale parameter
bounds[0] = (fshape, fshape)
bounds[1] = (floc, floc)

if fscale is None:
# Allow positive scale parameter (not fixed)
bounds[3] = (0, None)
else:
bounds[3] = (fscale, fscale)

# Trend parameters
if loc1 is None:
dparams_ns_i[2] = 0
bounds[2] = (0, 0) # Only allow trend in scale
dof[1] -= 1
if scale1 is None:
dparams_ns_i[4] = 0
bounds[4] = (0, 0) # Only allow trend in location
dof[1] -= 1

# Minimise the negative log-likelihood function to get optimal dparams
res = minimize(
_gev_nllf,
dparams_ns_i,
args=(data, covariate),
method=method,
method=optimizer,
bounds=bounds,
)
dparams_ns = np.array([i for i in res.x], dtype="float64")
@@ -236,7 +259,13 @@ def _fit_1d(
# Stationary and nonstationary model relative goodness of fit
if pick_best_model:
dparams = get_best_GEV_model_1d(
data, dparams, dparams_ns, covariate, alpha, test=pick_best_model
data,
dparams,
dparams_ns,
covariate,
alpha,
test=pick_best_model,
dof=dof,
)
else:
dparams = dparams_ns
@@ -554,7 +583,7 @@ def _fit_test_genextreme(data, dparams, **kwargs):
return pvalue


def check_gev_relative_fit(data, L1, L2, test, alpha=0.05):
def check_gev_relative_fit(data, L1, L2, test, alpha=0.05, dof=[3, 5]):
"""Test relative fit of stationary and nonstationary GEV distribution.
Parameters
@@ -579,8 +608,7 @@ def check_gev_relative_fit(data, L1, L2, test, alpha=0.05):
Hydrology, 547, 557-574. https://doi.org/10.1016/j.jhydrol.2017.02.005
"""

dof = [3, 5] # Degrees of freedom of each model

# Degrees of freedom of each model
if test.casefold() == "lrt":
# Likelihood ratio test statistic
LR = -2 * (L2 - L1)
@@ -600,7 +628,9 @@ def check_gev_relative_fit(data, L1, L2, test, alpha=0.05):
return result
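For reference, the likelihood ratio test above compares nested models via the difference in their (negative) log-likelihoods against a chi-squared distribution whose degrees of freedom is the difference in the number of fitted parameters. A standalone sketch of that standard test, not the commit's exact code (which is collapsed in this view):

```python
from scipy.stats.distributions import chi2

def lrt_pvalue(L1, L2, dof=(3, 5)):
    """p-value of a likelihood ratio test for nested models.

    L1 and L2 are the negative log-likelihoods of the simpler (dof[0] parameters)
    and more complex (dof[1] parameters) model, respectively.
    """
    LR = -2 * (L2 - L1)  # equals 2 * (loglik_complex - loglik_simple)
    return chi2.sf(LR, df=dof[1] - dof[0])
```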


def get_best_GEV_model_1d(data, dparams, dparams_ns, covariate, alpha, test):
def get_best_GEV_model_1d(
data, dparams, dparams_ns, covariate, alpha, test, dof=[3, 5]
):
"""Get the best GEV model based on a relative fit test."""
# Calculate the stationary GEV parameters
shape, loc, scale = dparams
@@ -609,7 +639,7 @@ def get_best_GEV_model_1d(data, dparams, dparams_ns, covariate, alpha, test):
L1 = _gev_nllf([-shape, loc, scale], data)
L2 = _gev_nllf([-dparams_ns[0], *dparams_ns[1:]], data, covariate)

result = check_gev_relative_fit(data, L1, L2, test=test, alpha=alpha)
result = check_gev_relative_fit(data, L1, L2, test=test, alpha=alpha, dof=dof)
if not result:
# Return the stationary parameters with no trend
dparams = np.array([shape, loc, 0, scale, 0], dtype="float64")
@@ -742,14 +772,87 @@ def get_return_level(return_period, dparams=None, covariate=None, **kwargs):
return return_level


def get_empirical_return_period(da, event, core_dim="time"):
"""Calculate the empirical return period of an event.
Parameters
----------
da : xarray.DataArray
Input data
event : float
Return level of the event (e.g., 100 mm of rainfall)
core_dim : str, default "time"
The core dimension in which to estimate the return period
Returns
-------
ri : xarray.DataArray
The event recurrence interval (e.g., 100 year return period)
"""

def _empirical_return_period(da, event):
"""Empirical return period of an event (1D)."""
res = ecdf(da)
return 1 / res.sf.evaluate(event)

ri = apply_ufunc(
_empirical_return_period,
da,
event,
input_core_dims=[[core_dim], []],
output_core_dims=[[]],
vectorize=True,
dask="parallelized",
output_dtypes=["float64"],
)
return ri
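The empirical return period is the reciprocal of the empirical exceedance probability from `scipy.stats.ecdf`. A small standalone numeric sketch with made-up sample values:

```python
import numpy as np
from scipy.stats import ecdf

annual_max = np.array([80.0, 95.0, 102.0, 110.0, 88.0, 120.0, 97.0, 105.0, 90.0, 115.0])
sf = ecdf(annual_max).sf        # empirical survival (exceedance) function
p_exceed = sf.evaluate(100.0)   # 5 of the 10 values exceed 100, so 0.5
print(1 / p_exceed)             # empirical return period: 2.0 (in years, for one value per year)
```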


def get_empirical_return_level(da, return_period, core_dim="time"):
"""Calculate the empirical return period of an event.
Parameters
----------
da : xarray.DataArray
Input data
return_period : float
Return period (e.g., 100 year return period)
core_dim : str, default "time"
The core dimension in which to estimate the return period
Returns
-------
return_level : xarray.DataArray
The event return level (e.g., 100 mm of rainfall)
"""

def _empirical_return_level(da, period):
"""Empirical return level of an event (1D)."""
sf = ecdf(da).sf
probability = 1 - (1 / period)
return np.interp(probability, sf.probabilities, sf.quantiles)

return_level = apply_ufunc(
_empirical_return_level,
da,
return_period,
input_core_dims=[[core_dim], []],
output_core_dims=[[]],
vectorize=True,
dask="parallelized",
output_dtypes=["float64"],
)
return return_level
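A hedged usage sketch of the two new helpers on an xarray DataArray of block maxima; `da`, the event level of 150, and the 20-year return period are illustrative:

```python
import numpy as np
import xarray as xr
from unseen.eva import get_empirical_return_period, get_empirical_return_level  # assumed import path

da = xr.DataArray(np.random.default_rng(0).gumbel(100, 20, size=50), dims="time")

ri = get_empirical_return_period(da, event=150, core_dim="time")            # years
level = get_empirical_return_level(da, return_period=20, core_dim="time")   # same units as da
```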


def aep_to_ari(aep):
"""Convert from aep (%) to ari (years)
Details: http://www.bom.gov.au/water/designRainfalls/ifd-arr87/glossary.shtml
Stolen from https://github.com/climate-innovation-hub/frequency-analysis/blob/master/eva.py
"""

assert aep < 100, "aep to be expressed as a percentage (must be < 100)"
assert np.all(aep < 100), "AEP to be expressed as a percentage (must be < 100)"
aep = aep / 100

return 1 / (-np.log(1 - aep))
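A worked example of the conversion above; a 1% AEP corresponds to roughly a 99.5-year ARI (the import path is assumed):

```python
import numpy as np
from unseen.eva import aep_to_ari

aep_to_ari(1.0)                        # 1 / (-ln(1 - 0.01)) ≈ 99.5 years
aep_to_ari(np.array([1.0, 2.0, 5.0]))  # element-wise, now that the assert uses np.all
```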
@@ -805,7 +908,7 @@ def gev_confidence_interval(
ci_bounds : xarray.DataArray
Confidence intervals with lower and upper bounds along dim 'quantile'
"""
# todo: add max_shape_ratio
# todo: add max_shape_ratio?
# Replace core dim with the one from the fit_kwargs if it exists
core_dim = fit_kwargs.pop("core_dim", core_dim)

@@ -814,8 +917,8 @@
dparams = fit_gev(data, core_dim=core_dim, **fit_kwargs)
shape, loc, scale = unpack_gev_params(dparams)

# Generate random indices for resampling
if bootstrap_method == "parametric":
# Generate bootstrapped data using the GEV distribution
boot_data = apply_ufunc(
genextreme.rvs,
shape,
@@ -830,9 +933,12 @@ def gev_confidence_interval(
boot_data = boot_data.transpose("k", core_dim, ...)

elif bootstrap_method == "non-parametric":
# todo: replace with rng.choice
resample_indices = rng.integers(
0, data[core_dim].size, (n_resamples, data[core_dim].size)
# Resample data with replacements
resample_indices = np.array(
[
rng.choice(data[core_dim].size, size=data[core_dim].size, replace=True)
for i in range(n_resamples)
]
)
indexer = DataArray(resample_indices, dims=("k", core_dim))
boot_data = data.isel({core_dim: indexer})
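The removed todo asked for `rng.choice`; the loop above could also be collapsed into one vectorised call. A sketch under the assumption that `rng` is a `numpy.random.Generator` and the surrounding variables are as in the function:

```python
# One call producing an (n_resamples, n) index array, each row sampled with replacement
n = data[core_dim].size
resample_indices = rng.choice(n, size=(n_resamples, n), replace=True)
```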
@@ -1304,6 +1410,80 @@ def plot_stacked_histogram(
return ax, bins


def spatial_plot_gev_parameters(
dparams,
dataset_name=None,
outfile=None,
):
"""Plot spatial maps of GEV parameters (shape, loc, scale).
Parameters
----------
dparams : xarray.DataArray
Stationary or nonstationary GEV parameters
dataset_name : str, optional
Name of the dataset
outfile : str, optional
Output file name
"""
# Rename shape parameter
params = ["shape", *list(dparams.dparams.values[1:])]
dparams["dparams_new"] = ("dparams", params)
dparams = (
dparams.swap_dims({"dparams": "dparams_new"})
.drop("dparams")
.rename(dparams_new="dparams")
)
params = dparams.dparams.values
stationary = True if params.size == 3 else False

# Adjust subplots for stationary or nonstationary parameters
nrows = 1 if stationary else 2
figsize = (14, 6) if stationary else (12, 6)

fig, axes = plt.subplots(
nrows, 3, figsize=figsize, subplot_kw={"projection": PlateCarree()}
)
axes = axes.flat

# Plot each parameter
for i, ax in enumerate(axes[: params.size]):
dparams.sel(dparams=params[i]).plot(
ax=ax,
transform=PlateCarree(),
robust=True,
cbar_kwargs=dict(label=params[i], fraction=0.038, pad=0.04),
)
# Add coastlines and lat/lon labels
ax.coastlines()
ax.set_title(f"{params[i]}")
ax.set_xlabel(None)
ax.set_ylabel(None)
ax.xaxis.set_major_formatter(LongitudeFormatter())
ax.yaxis.set_major_formatter(LatitudeFormatter())
ax.xaxis.set_minor_locator(AutoMinorLocator())
ax.yaxis.set_minor_locator(AutoMinorLocator())

# Fix xarray facet plot with cartopy bug (v2024.6.0)
subplotspec = ax.get_subplotspec()
ax.xaxis.set_visible(True)
if subplotspec.is_first_col():
ax.yaxis.set_visible(True)

if dataset_name:
fig.suptitle(f"{dataset_name} GEV parameters", y=0.8 if stationary else 0.99)

if not stationary:
# Hide the empty subplot
axes[-1].set_visible(False)

plt.tight_layout()
if outfile:
plt.savefig(outfile, bbox_inches="tight", dpi=200)
else:
plt.show()
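A hedged usage sketch for the new plotting helper; the dataset name and output path are made up, and `dparams` is assumed to come from `fit_gev` applied over gridded (e.g. time, lat, lon) data:

```python
from unseen.eva import spatial_plot_gev_parameters  # assumed import path

# dparams: GEV parameters with a "dparams" coordinate and lat/lon dimensions
spatial_plot_gev_parameters(dparams, dataset_name="AGCD", outfile="gev_params_map.png")
```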


def _parse_command_line():
"""Parse the command line for input arguments"""

@@ -1348,6 +1528,7 @@ def _parse_command_line():
),
help="Initial guess method (or estimate) of the GEV parameters",
)

parser.add_argument(
"--retry_fit",
action="store_true",
@@ -1389,6 +1570,12 @@ def _parse_command_line():
action=general_utils.store_dict,
help="Keyword arguments for opening min_lead file",
)
parser.add_argument(
"--drop_max",
action="store_true",
default=False,
help="Drop the maximum value before fitting",
)
parser.add_argument(
"--ensemble_dim",
type=str,
@@ -1454,8 +1641,16 @@ def _main():
# Stack dimensions along new "sample" dimension
if all([dim in ds[args.var].dims for dim in args.stack_dims]):
ds = ds.stack(**{"sample": args.stack_dims}, create_index=False)
ds = ds.chunk(dict(sample=-1)) # fixes CAFE large chunk error
args.core_dim = "sample"

# Drop the maximum value before fitting
if args.drop_max:
# Drop the maximum value along each non-core dimension
ds[args.var] = ds[args.var].where(
ds[args.var].load() != ds[args.var].max(dim=args.core_dim).load()
)
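The `--drop_max` path masks (rather than removes) the largest value along the core dimension by setting it to NaN. A standalone illustration of the same `.where` pattern on made-up data:

```python
import xarray as xr

da = xr.DataArray([3.0, 9.0, 5.0], dims="time")
da.where(da != da.max(dim="time"))  # -> [3.0, nan, 5.0]
```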

if args.nonstationary:
covariate = _format_covariate(ds[args.var], ds[args.covariate], args.core_dim)
else: