From 402b5303effc35148c0a9027954481d138906401 Mon Sep 17 00:00:00 2001 From: bnb32 Date: Fri, 15 Nov 2024 16:26:34 -0700 Subject: [PATCH 1/5] level, for level interpolation changed from array to float. This enables use of Deriver on 0D xarray datasets (e.g. those returned by ``.sel(south_north=..., west_east=..., time=...)``). --- sup3r/preprocessing/derivers/base.py | 10 +++++----- sup3r/utilities/era_downloader.py | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/sup3r/preprocessing/derivers/base.py b/sup3r/preprocessing/derivers/base.py index 03a6867d0..9a7670528 100644 --- a/sup3r/preprocessing/derivers/base.py +++ b/sup3r/preprocessing/derivers/base.py @@ -137,8 +137,10 @@ def check_registry( if not missing: logger.debug(msg, inputs) return self._run_compute(feature, method) - msg = ('Some of the method inputs reference %s itself. We will ' - 'try height interpolation instead.') + msg = ( + 'Some of the method inputs reference %s itself. We will ' + 'try height interpolation instead.' + ) if not can_derive: logger.debug(msg, feature) return None @@ -328,9 +330,7 @@ def do_level_interpolation( attrs = self.data[feat].attrs level = ( - [fstruct.height] - if fstruct.height is not None - else [fstruct.pressure] + fstruct.height if fstruct.height is not None else fstruct.pressure ) if ml_var is not None and sl_var is None: diff --git a/sup3r/utilities/era_downloader.py b/sup3r/utilities/era_downloader.py index 113661f34..1925b4dcf 100644 --- a/sup3r/utilities/era_downloader.py +++ b/sup3r/utilities/era_downloader.py @@ -198,7 +198,7 @@ def prep_var_lists(self, variables): msg = ( 'Both surface and pressure level variables were requested ' 'without requesting "orog" and "zg". Adding these to the ' - 'download' + 'download.' ) logger.info(msg) self.sfc_file_variables.append('geopotential') From dd69f429cbc9e8a26eb9224b3d07f77358996560 Mon Sep 17 00:00:00 2001 From: bnb32 Date: Tue, 19 Nov 2024 10:12:27 -0700 Subject: [PATCH 2/5] some cleanup in bias_transforms.py. Enabled use of dask for these methods. --- sup3r/bias/bias_transforms.py | 522 ++++++++++--------- sup3r/bias/utilities.py | 34 +- sup3r/preprocessing/cachers/base.py | 46 +- sup3r/preprocessing/data_handlers/factory.py | 3 +- sup3r/preprocessing/loaders/nc.py | 5 +- sup3r/utilities/era_downloader.py | 7 +- tests/bias/test_presrat_bias_correction.py | 3 +- tests/bias/test_qdm_bias_correction.py | 13 +- 8 files changed, 355 insertions(+), 278 deletions(-) diff --git a/sup3r/bias/bias_transforms.py b/sup3r/bias/bias_transforms.py index c6da18011..07b39c4fe 100644 --- a/sup3r/bias/bias_transforms.py +++ b/sup3r/bias/bias_transforms.py @@ -14,6 +14,7 @@ import dask.array as da import numpy as np +import xarray as xr from rex.utilities.bc_utils import QuantileDeltaMapping from scipy.ndimage import gaussian_filter @@ -71,7 +72,7 @@ def _get_factors(target, shape, var_names, bias_fp, threshold=0.1): msg = f'Missing {" and ".join(missing)} in resource: {bias_fp}.' assert missing == [], msg # pylint: disable=E1136 - out = {k: res[var_names[k].lower()][...] for k in var_names} + out = {k: res[var_names[k].lower()].data for k in var_names} out['cfg'] = res.global_attrs return out @@ -472,12 +473,149 @@ def monthly_local_linear_bc( return out +def _apply_qdm( + subset, + base_params, + bias_params, + bias_fut_params, + current_time_idx, + dist='empirical', + sampling='linear', + log_base=10, + relative=True, + no_trend=False, + delta_denom_min=None, + delta_denom_zero=None, + delta_range=None, + max_workers=1, +): + """Run QuantileDeltaMapping routine for the given time index. Used in local + qdm correction and presrat. + + Parameters + ---------- + subset : np.ndarray | da.core.array + Subset of Sup3r input data to be bias corrected, assumed to be 3D with + shape (spatial, spatial, temporal) for a single feature. + base_params : np.ndarray + 4D array of **observed historical** distribution parameters created + from a multi-year set of data where the shape is + (space, space, time, N). This can be the + output of a parametric distribution fit like + ``scipy.stats.weibull_min.fit()`` where N is the number of parameters + for that distribution, or this can define the x-values of N points from + an empirical CDF that will be linearly interpolated between. If this is + an empirical CDF, this must include the 0th and 100th percentile values + and have even percentile spacing between values. + bias_params : np.ndarray + Same requirements as params_oh. This input arg is for the **modeled + historical distribution**. + bias_fut_params : np.ndarray | None + Same requirements as params_oh. This input arg is for the **modeled + future distribution**. If this is None, this defaults to params_mh + (no future data, just corrected to modeled historical distribution) + current_time_idx : int + Time index for the current qdm application. + dist : str + Probability distribution name to use to model the data which + determines how the param args are used. This can "empirical" or any + continuous distribution name from ``scipy.stats``. + sampling : str | np.ndarray + If dist="empirical", this is an option for how the quantiles were + sampled to produce the params inputs, e.g., how to sample the + y-axis of the distribution (see sampling functions in + ``rex.utilities.bc_utils``). "linear" will do even spacing, "log" + will concentrate samples near quantile=0, and "invlog" will + concentrate samples near quantile=1. Can also be a 1D array of dist + inputs if being used from reV, but they must all be the same + option. + log_base : int | float | np.ndarray + Log base value if sampling is "log" or "invlog". A higher value + will concentrate more samples at the extreme sides of the + distribution. Can also be a 1D array of dist inputs if being used + from reV, but they must all be the same option. + relative : bool | np.ndarray + Flag to preserve relative rather than absolute changes in + quantiles. relative=False (default) will multiply by the change in + quantiles while relative=True will add. See Equations 4-6 from + Cannon et al., 2015 for more details. Can also be a 1D array of + dist inputs if being used from reV, but they must all be the same + option. + no_trend : bool, default=False + An option to ignore the trend component of the correction, thus + resulting in an ordinary Quantile Mapping, i.e. corrects the bias by + comparing the distributions of the biased dataset with a reference + datasets, without reinforcing the zero rate or applying the k-factor. + See ``params_mf`` of + :class:`rex.utilities.bc_utils.QuantileDeltaMapping`. Note that this + assumes that params_mh is the data distribution representative for the + target data. + delta_denom_min : float | None + Option to specify a minimum value for the denominator term in the + calculation of a relative delta value. This prevents division by a + very small number making delta blow up and resulting in very large + output bias corrected values. See equation 4 of Cannon et al., 2015 + for the delta term. If this is not set, the ``zero_rate_threshold`` + calculated as part of the presrat bias calculation will be used + delta_range : tuple | None + Option to set a (min, max) on the delta term in QDM. This can help + prevent QDM from making non-realistic increases/decreases in + otherwise physical values. See equation 4 of Cannon et al., 2015 for + the delta term. + delta_denom_zero : float | None + Option to specify a value to replace zeros in the denominator term + in the calculation of a relative delta value. This prevents + division by a very small number making delta blow up and resulting + in very large output bias corrected values. See equation 4 of + Cannon et al., 2015 for the delta term. + max_workers : int | None + Max number of workers to use for QDM process pool + """ + + # Naming following the paper: observed historical + oh = base_params[:, :, current_time_idx] + # Modeled historical + mh = bias_params[:, :, current_time_idx] + # Modeled future + mf = bias_fut_params[:, :, current_time_idx] + + # This satisfies the rex's QDM design + mf = None if no_trend else np.reshape(mf, (-1, mf.shape[-1])) + # The distributions at this point, after selected the respective + # time window with `window_idx`, are 3D (space, space, N-params) + # Collapse 3D (space, space, N) into 2D (space**2, N) + QDM = QuantileDeltaMapping( + params_oh=np.reshape(oh, (-1, oh.shape[-1])), + params_mh=np.reshape(mh, (-1, mh.shape[-1])), + params_mf=mf, + dist=dist, + relative=relative, + sampling=sampling, + log_base=log_base, + delta_denom_min=delta_denom_min, + delta_denom_zero=delta_denom_zero, + delta_range=delta_range, + ) + + # input 3D shape (spatial, spatial, temporal) + # QDM expects input arr with shape (time, space) + tmp = np.reshape(subset.data, (-1, subset.shape[-1])).T + # Apply QDM correction + tmp = da.blockwise( + QDM, 'ij', tmp, 'ij', dtype=np.float32, max_workers=max_workers + ) + + # Reorgnize array back from (time, space) + # to (spatial, spatial, temporal) + return da.reshape(tmp.T, subset.shape) + + def local_qdm_bc( - data: np.ndarray, + data: xr.DataArray, lat_lon: np.ndarray, base_dset: str, feature_name: str, - bias_fp, + bias_fp: str, date_range_kwargs: dict, lr_padded_slice=None, threshold=0.1, @@ -499,7 +637,7 @@ def local_qdm_bc( Parameters ---------- - data : Union[np.ndarray, da.core.Array] + data : xr.DataArray Sup3r input data to be bias corrected, assumed to be 3D with shape (spatial, spatial, temporal) for a single feature. lat_lon : np.ndarray @@ -606,9 +744,11 @@ def local_qdm_bc( msg = f'data was expected to be a 3D array but got shape {data.shape}' assert data.ndim == 3, msg time_index = make_time_index_from_kws(date_range_kwargs) - msg = (f'Time should align with data 3rd dimension but got data ' - f'{data.shape} and time_index length ' - f'{time_index.size}: {time_index}') + msg = ( + f'Time should align with data 3rd dimension but got data ' + f'{data.shape} and time_index length ' + f'{time_index.size}: {time_index}' + ) assert data.shape[-1] == time_index.size, msg params = _get_spatial_bc_quantiles( @@ -618,74 +758,58 @@ def local_qdm_bc( bias_fp=bias_fp, threshold=threshold, ) - data = np.asarray(data) - base = np.asarray(params['base']) - bias = np.asarray(params['bias']) - bias_fut = np.asarray(params['bias_fut']) + cfg = params['cfg'] + base_params = params['base'] + bias_params = params['bias'] + bias_fut_params = params['bias_fut'] if lr_padded_slice is not None: spatial_slice = (lr_padded_slice[0], lr_padded_slice[1]) - base = base[spatial_slice] - bias = bias[spatial_slice] - bias_fut = bias_fut[spatial_slice] - - output = np.full_like(data, np.nan) - nearest_window_idx = [ - np.argmin(abs(d - cfg['time_window_center'])) - for d in time_index.day_of_year - ] - for window_idx in set(nearest_window_idx): - # Naming following the paper: observed historical - oh = base[:, :, window_idx] - # Modeled historical - mh = bias[:, :, window_idx] - # Modeled future - mf = bias_fut[:, :, window_idx] - - # This satisfies the rex's QDM design - mf = None if no_trend else mf.reshape(-1, mf.shape[-1]) - # The distributions at this point, after selected the respective - # time window with `window_idx`, are 3D (space, space, N-params) - # Collapse 3D (space, space, N) into 2D (space**2, N) - QDM = QuantileDeltaMapping( - oh.reshape(-1, oh.shape[-1]), - mh.reshape(-1, mh.shape[-1]), - mf, - dist=cfg['dist'], + base_params = base_params[spatial_slice] + bias_params = bias_params[spatial_slice] + bias_fut_params = bias_fut_params[spatial_slice] + + data_unbiased = da.full_like(data, np.nan) + closest_time_idx = abs( + cfg['time_window_center'][:, np.newaxis] + - np.array(time_index.day_of_year) + ) + closest_time_idx = closest_time_idx.argmin(axis=0) + + for nt in set(closest_time_idx): + subset_idx = closest_time_idx == nt + subset = _apply_qdm( + subset=data[:, :, subset_idx], + base_params=base_params, + bias_params=bias_params, + bias_fut_params=bias_fut_params, + current_time_idx=nt, + dist=cfg.get('dist', 'empirical'), + sampling=cfg.get('sampling', 'linear'), + log_base=cfg.get('log_base', 10), relative=relative, - sampling=cfg['sampling'], - log_base=cfg['log_base'], + no_trend=no_trend, delta_denom_min=delta_denom_min, delta_denom_zero=delta_denom_zero, delta_range=delta_range, + max_workers=max_workers, ) - - subset_idx = nearest_window_idx == window_idx - subset = data[:, :, subset_idx] - # input 3D shape (spatial, spatial, temporal) - # QDM expects input arr with shape (time, space) - tmp = subset.reshape(-1, subset.shape[-1]).T - # Apply QDM correction - tmp = QDM(tmp, max_workers=max_workers) - # Reorgnize array back from (time, space) - # to (spatial, spatial, temporal) - tmp = tmp.T.reshape(subset.shape) - # Position output respecting original time axis sequence - output[:, :, subset_idx] = tmp + data_unbiased[:, :, subset_idx] = subset if out_range is not None: - output = np.maximum(output, np.min(out_range)) - output = np.minimum(output, np.max(out_range)) + data_unbiased = np.maximum(data_unbiased, np.min(out_range)) + data_unbiased = np.minimum(data_unbiased, np.max(out_range)) - if np.isnan(output).any(): - msg = ('QDM bias correction resulted in NaN values! If this is a ' - 'relative QDM, you may try setting ``delta_denom_min`` or ' - '``delta_denom_zero``') + if da.isnan(data_unbiased).any(): + msg = ( + 'QDM bias correction resulted in NaN values! If this is a ' + 'relative QDM, you may try setting ``delta_denom_min`` or ' + '``delta_denom_zero``' + ) logger.error(msg) raise RuntimeError(msg) - - return output + return data_unbiased def _get_spatial_bc_presrat( @@ -819,178 +943,24 @@ def _get_spatial_bc_presrat( ) -def _apply_presrat_bc(data, time_index, base_params, bias_params, - bias_fut_params, bias_tau_fut, k_factor, - time_window_center, dist='empirical', sampling='invlog', - log_base=10, relative=True, no_trend=False, - delta_denom_min=None, delta_range=None, out_range=None, - max_workers=1): - """Run PresRat to bias correct data from input parameters and not from bias - correction file on disk. - - Parameters - ---------- - data : np.ndarray - Sup3r input data to be bias corrected, assumed to be 3D with shape - (spatial, spatial, temporal) for a single feature. - time_index : pd.DatetimeIndex - A DatetimeIndex object associated with the input data temporal axis - (assumed 3rd axis e.g. axis=2). - base_params : np.ndarray - 4D array of **observed historical** distribution parameters created - from a multi-year set of data where the shape is - (space, space, time, N). This can be the - output of a parametric distribution fit like - ``scipy.stats.weibull_min.fit()`` where N is the number of parameters - for that distribution, or this can define the x-values of N points from - an empirical CDF that will be linearly interpolated between. If this is - an empirical CDF, this must include the 0th and 100th percentile values - and have even percentile spacing between values. - bias_params : np.ndarray - Same requirements as params_oh. This input arg is for the **modeled - historical distribution**. - bias_fut_params : np.ndarray | None - Same requirements as params_oh. This input arg is for the **modeled - future distribution**. If this is None, this defaults to params_mh - (no future data, just corrected to modeled historical distribution) - bias_tau_fut : np.ndarray - Zero precipitation threshold for future data calculated from presrat - without temporal dependence with shape (spatial, spatial, 1) - k_factor : np.ndarray - K factor from the presrat method with shape (spatial, spatial, N) where - N is the number of time observations at which the bias correction is - calculated - time_window_center : np.ndarray - Sequence of days of the year equally spaced and shifted by half - window size, thus `ntimes`=12 results in approximately [15, 45, - ...]. It includes the fraction of a day, thus 15.5 is equivalent - to January 15th, 12:00h. Shape is (N,) - dist : str - Probability distribution name to use to model the data which - determines how the param args are used. This can "empirical" or any - continuous distribution name from ``scipy.stats``. - sampling : str | np.ndarray - If dist="empirical", this is an option for how the quantiles were - sampled to produce the params inputs, e.g., how to sample the - y-axis of the distribution (see sampling functions in - ``rex.utilities.bc_utils``). "linear" will do even spacing, "log" - will concentrate samples near quantile=0, and "invlog" will - concentrate samples near quantile=1. Can also be a 1D array of dist - inputs if being used from reV, but they must all be the same - option. - log_base : int | float | np.ndarray - Log base value if sampling is "log" or "invlog". A higher value - will concentrate more samples at the extreme sides of the - distribution. Can also be a 1D array of dist inputs if being used - from reV, but they must all be the same option. - relative : bool | np.ndarray - Flag to preserve relative rather than absolute changes in - quantiles. relative=False (default) will multiply by the change in - quantiles while relative=True will add. See Equations 4-6 from - Cannon et al., 2015 for more details. Can also be a 1D array of - dist inputs if being used from reV, but they must all be the same - option. - no_trend : bool, default=False - An option to ignore the trend component of the correction, thus - resulting in an ordinary Quantile Mapping, i.e. corrects the bias by - comparing the distributions of the biased dataset with a reference - datasets, without reinforcing the zero rate or applying the k-factor. - See ``params_mf`` of - :class:`rex.utilities.bc_utils.QuantileDeltaMapping`. Note that this - assumes that params_mh is the data distribution representative for the - target data. - delta_denom_min : float | None - Option to specify a minimum value for the denominator term in the - calculation of a relative delta value. This prevents division by a - very small number making delta blow up and resulting in very large - output bias corrected values. See equation 4 of Cannon et al., 2015 - for the delta term. If this is not set, the ``zero_rate_threshold`` - calculated as part of the presrat bias calculation will be used - delta_range : tuple | None - Option to set a (min, max) on the delta term in QDM. This can help - prevent QDM from making non-realistic increases/decreases in - otherwise physical values. See equation 4 of Cannon et al., 2015 for - the delta term. - out_range : None | tuple - Option to set floor/ceiling values on the output data. - max_workers : int | None - Max number of workers to use for QDM process pool - """ - - data_unbiased = np.full_like(data, np.nan) - closest_time_idx = abs(time_window_center[:, np.newaxis] - - np.array(time_index.day_of_year)) - closest_time_idx = closest_time_idx.argmin(axis=0) - - for nt in set(closest_time_idx): - subset_idx = closest_time_idx == nt - subset = data[:, :, subset_idx] - oh = base_params[:, :, nt] - mh = bias_params[:, :, nt] - mf = bias_fut_params[:, :, nt] - - mf = None if no_trend else mf.reshape(-1, mf.shape[-1]) - - # The distributions are 3D (space, space, N-params) - # Collapse 3D (space, space, N) into 2D (space**2, N) - QDM = QuantileDeltaMapping(oh.reshape(-1, oh.shape[-1]), - mh.reshape(-1, mh.shape[-1]), - mf, - dist=dist, - relative=relative, - sampling=sampling, - log_base=log_base, - delta_denom_min=delta_denom_min, - delta_range=delta_range, - ) - - # input 3D shape (spatial, spatial, temporal) - # QDM expects input arr with shape (time, space) - tmp = subset.reshape(-1, subset.shape[-1]).T - # Apply QDM correction - tmp = QDM(tmp, max_workers=max_workers) - # Reorgnize array back from (time, space) - # to (spatial, spatial, temporal) - subset = tmp.T.reshape(subset.shape) - - # If no trend, it doesn't make sense to correct for zero rate or - # apply the k-factor, but limit to QDM only. - if not no_trend: - subset = np.where(subset < bias_tau_fut, 0, subset) - subset = subset * np.asarray(k_factor[:, :, nt:nt + 1]) - - data_unbiased[:, :, subset_idx] = subset - - if out_range is not None: - data_unbiased = np.maximum(data_unbiased, np.min(out_range)) - data_unbiased = np.minimum(data_unbiased, np.max(out_range)) - - if np.isnan(data_unbiased).any(): - msg = ('Presrat bias correction resulted in NaN values! If this is a ' - 'relative QDM, you may try setting ``delta_denom_min`` or ' - '``delta_denom_zero``') - logger.error(msg) - raise RuntimeError(msg) - - return data_unbiased - - -def local_presrat_bc(data: np.ndarray, - lat_lon: np.ndarray, - base_dset: str, - feature_name: str, - bias_fp, - date_range_kwargs: dict, - lr_padded_slice=None, - threshold=0.1, - relative=True, - no_trend=False, - delta_denom_min=None, - delta_range=None, - k_range=None, - out_range=None, - max_workers=1, - ): +def local_presrat_bc( + data: np.ndarray, + lat_lon: np.ndarray, + base_dset: str, + feature_name: str, + bias_fp, + date_range_kwargs: dict, + lr_padded_slice=None, + threshold=0.1, + relative=True, + no_trend=False, + delta_denom_min=None, + delta_denom_zero=None, + delta_range=None, + k_range=None, + out_range=None, + max_workers=1, +): """Bias correction using PresRat Parameters @@ -1050,6 +1020,12 @@ def local_presrat_bc(data: np.ndarray, output bias corrected values. See equation 4 of Cannon et al., 2015 for the delta term. If this is not set, the ``zero_rate_threshold`` calculated as part of the presrat bias calculation will be used + delta_denom_zero : float | None + Option to specify a value to replace zeros in the denominator term + in the calculation of a relative delta value. This prevents + division by a very small number making delta blow up and resulting + in very large output bias corrected values. See equation 4 of + Cannon et al., 2015 for the delta term. delta_range : tuple | None Option to set a (min, max) on the delta term in QDM. This can help prevent QDM from making non-realistic increases/decreases in @@ -1065,25 +1041,22 @@ def local_presrat_bc(data: np.ndarray, time_index = make_time_index_from_kws(date_range_kwargs) msg = f'data was expected to be a 3D array but got shape {data.shape}' assert data.ndim == 3, msg - msg = (f'Time should align with data 3rd dimension but got data ' - f'{data.shape} and time_index length ' - f'{time_index.size}: {time_index}') + msg = ( + f'Time should align with data 3rd dimension but got data ' + f'{data.shape} and time_index length ' + f'{time_index.size}: {time_index}' + ) assert data.shape[-1] == time_index.size, msg params = _get_spatial_bc_presrat( lat_lon, base_dset, feature_name, bias_fp, threshold ) cfg = params['cfg'] - time_window_center = cfg['time_window_center'] - data = np.asarray(data) - base_params = np.asarray(params['base']) - bias_params = np.asarray(params['bias']) - bias_fut_params = np.asarray(params['bias_fut']) + base_params = params['base'] + bias_params = params['bias'] + bias_fut_params = params['bias_fut'] bias_tau_fut = np.asarray(params['bias_tau_fut']) k_factor = params['k_factor'] - dist = cfg['dist'] - sampling = cfg['sampling'] - log_base = cfg['log_base'] zero_rate_threshold = cfg['zero_rate_threshold'] delta_denom_min = delta_denom_min or zero_rate_threshold @@ -1091,8 +1064,10 @@ def local_presrat_bc(data: np.ndarray, k_factor = np.maximum(k_factor, np.min(k_range)) k_factor = np.minimum(k_factor, np.max(k_range)) - logger.debug(f'Presrat K Factor has shape {k_factor.shape} and ranges ' - f'from {k_factor.min()} to {k_factor.max()}') + logger.debug( + f'Presrat K Factor has shape {k_factor.shape} and ranges ' + f'from {k_factor.min()} to {k_factor.max()}' + ) if lr_padded_slice is not None: spatial_slice = (lr_padded_slice[0], lr_padded_slice[1]) @@ -1100,15 +1075,50 @@ def local_presrat_bc(data: np.ndarray, bias_params = bias_params[spatial_slice] bias_fut_params = bias_fut_params[spatial_slice] - data_unbiased = _apply_presrat_bc(data, time_index, base_params, - bias_params, bias_fut_params, - bias_tau_fut, k_factor, - time_window_center, dist=dist, - sampling=sampling, log_base=log_base, - relative=relative, no_trend=no_trend, - delta_denom_min=delta_denom_min, - delta_range=delta_range, - out_range=out_range, - max_workers=max_workers) + data_unbiased = da.full_like(data, np.nan) + closest_time_idx = abs( + cfg['time_window_center'][:, np.newaxis] + - np.array(time_index.day_of_year) + ) + closest_time_idx = closest_time_idx.argmin(axis=0) + + for nt in set(closest_time_idx): + subset_idx = closest_time_idx == nt + subset = _apply_qdm( + subset=data[:, :, subset_idx], + base_params=base_params, + bias_params=bias_params, + bias_fut_params=bias_fut_params, + current_time_idx=nt, + dist=cfg.get('dist', 'empirical'), + sampling=cfg.get('sampling', 'linear'), + log_base=cfg.get('log_base', 10), + relative=relative, + no_trend=no_trend, + delta_denom_min=delta_denom_min, + delta_denom_zero=delta_denom_zero, + delta_range=delta_range, + max_workers=max_workers, + ) + # If no trend, it doesn't make sense to correct for zero rate or + # apply the k-factor, but limit to QDM only. + if not no_trend: + subset = np.where(subset < bias_tau_fut, 0, subset) + subset *= k_factor[:, :, nt : nt + 1] + + data_unbiased[:, :, subset_idx] = subset + + if out_range is not None: + data_unbiased = np.maximum(data_unbiased, np.min(out_range)) + data_unbiased = np.minimum(data_unbiased, np.max(out_range)) + + if da.isnan(data_unbiased).any(): + msg = ( + 'Presrat bias correction resulted in NaN values! If this is a ' + 'relative QDM, you may try setting ``delta_denom_min`` or ' + '``delta_denom_zero``' + ) + logger.error(msg) + raise RuntimeError(msg) return data_unbiased diff --git a/sup3r/bias/utilities.py b/sup3r/bias/utilities.py index ffa0a73e5..817620dde 100644 --- a/sup3r/bias/utilities.py +++ b/sup3r/bias/utilities.py @@ -107,6 +107,11 @@ def qdm_bc( relative=True, threshold=0.1, no_trend=False, + delta_denom_min=None, + delta_denom_zero=None, + delta_range=None, + out_range=None, + max_workers=1 ): """Bias Correction using Quantile Delta Mapping @@ -149,6 +154,27 @@ def qdm_bc( Note that this assumes that "bias_{feature}_params" (``params_mh``) is the data distribution representative for the target data. + delta_denom_min : float | None + Option to specify a minimum value for the denominator term in the + calculation of a relative delta value. This prevents division by a + very small number making delta blow up and resulting in very large + output bias corrected values. See equation 4 of Cannon et al., 2015 + for the delta term. + delta_denom_zero : float | None + Option to specify a value to replace zeros in the denominator term + in the calculation of a relative delta value. This prevents + division by a very small number making delta blow up and resulting + in very large output bias corrected values. See equation 4 of + Cannon et al., 2015 for the delta term. + delta_range : tuple | None + Option to set a (min, max) on the delta term in QDM. This can help + prevent QDM from making non-realistic increases/decreases in + otherwise physical values. See equation 4 of Cannon et al., 2015 for + the delta term. + out_range : None | tuple + Option to set floor/ceiling values on the output data. + max_workers: int | None + Max number of workers to use for QDM process pool """ if isinstance(bc_files, str): @@ -172,7 +198,7 @@ def qdm_bc( ) ) handler.data[feature] = local_qdm_bc( - handler.data[feature][...], + handler.data[feature], handler.lat_lon, bias_feature, feature, @@ -181,6 +207,12 @@ def qdm_bc( threshold=threshold, relative=relative, no_trend=no_trend, + delta_denom_min=delta_denom_min, + delta_denom_zero=delta_denom_zero, + delta_range=delta_range, + out_range=out_range, + max_workers=max_workers + ) completed.append(feature) diff --git a/sup3r/preprocessing/cachers/base.py b/sup3r/preprocessing/cachers/base.py index 3792c5eec..2a19fc3c7 100644 --- a/sup3r/preprocessing/cachers/base.py +++ b/sup3r/preprocessing/cachers/base.py @@ -208,7 +208,19 @@ def parse_chunks(feature, chunks, dims): @classmethod def get_chunksizes(cls, dset, data, chunks): """Get chunksizes after rechunking (could be undetermined beforehand - if ``chunks == 'auto'``) and return rechunked data.""" + if ``chunks == 'auto'``) and return rechunked data. + + Parameters + ---------- + dset : str + Name of feature to get chunksizes for. + data : Sup3rX | xr.Dataset + ``Sup3rX`` or ``xr.Dataset`` containing data to be cached. + chunks : dict | None | 'auto' + Dictionary of chunksizes either to use for all features or, if the + dictionary includes feature keys, feature specific chunksizes. Can + also be None or 'auto'. + """ data_var = data.coords[dset] if dset in data.coords else data[dset] fchunk = cls.parse_chunks(dset, chunks, data_var.dims) if isinstance(fchunk, dict): @@ -233,10 +245,23 @@ def get_chunksizes(cls, dset, data, chunks): return data_var, chunksizes @classmethod - def add_coord_meta(cls, out_file, data): + def add_coord_meta(cls, out_file, data, meta=None): """Add flattened coordinate meta to out_file. This is used for h5 - caching.""" - meta = pd.DataFrame() + caching. + + Parameters + ---------- + out_file : str + Name of output file. + data : Sup3rX | xr.Dataset + Data being written to the given ``out_file``. + meta : pd.DataFrame | None + Optional additional meta information to be written to the given + ``out_file``. If this is None then only coordinate info will be + included in the meta written to the ``out_file`` + """ + if meta is None or (isinstance(meta, dict) and not meta): + meta = pd.DataFrame() for coord in Dimension.coords_2d(): if coord in data: meta[coord] = data[coord].data.flatten() @@ -280,18 +305,21 @@ def write_h5( attrs : dict | None Optional attributes to write to file. Can specify dataset specific attributes by adding a dictionary with the dataset name as a key. - e.g. {**global_attrs, dset: {...}} + e.g. {**global_attrs, dset: {...}}. Can also include a global meta + dataframe that will then be added to the coordinate meta. verbose : bool Dummy arg to match ``write_netcdf`` signature """ - if len(data.dims) == 3: + if len(data.dims) == 3 and Dimension.TIME in data.dims: data = data.transpose(Dimension.TIME, *Dimension.dims_2d()) if features == 'all': features = list(data.data_vars) features = features if isinstance(features, list) else [features] chunks = chunks or 'auto' global_attrs = data.attrs.copy() - global_attrs.update(attrs or {}) + attrs = attrs or {} + meta = attrs.pop('meta', {}) + global_attrs.update(attrs) attrs = {k: safe_cast(v) for k, v in global_attrs.items()} with h5py.File(out_file, mode) as f: for k, v in attrs.items(): @@ -335,7 +363,7 @@ def write_h5( scheduler='threads', num_workers=max_workers, ) - cls.add_coord_meta(out_file=out_file, data=data) + cls.add_coord_meta(out_file=out_file, data=data, meta=meta) @staticmethod def get_chunk_slices(chunks, shape): @@ -383,7 +411,7 @@ def write_netcdf_chunks( _mem_check(), ) for i, chunk_slice in enumerate(chunk_slices): - msg = f'Writing chunk {i} / {len(chunk_slices)} to {out_file}' + msg = f'Writing chunk {i + 1} / {len(chunk_slices)} to {out_file}' msg = None if not verbose else msg chunk = data_var.data[chunk_slice] task = dask.delayed(cls.write_chunk)( diff --git a/sup3r/preprocessing/data_handlers/factory.py b/sup3r/preprocessing/data_handlers/factory.py index 99ef6d184..1115ec792 100644 --- a/sup3r/preprocessing/data_handlers/factory.py +++ b/sup3r/preprocessing/data_handlers/factory.py @@ -111,7 +111,8 @@ def __init__( Keyword arguments for nan handling. If 'mask', time steps with nans will be dropped. Otherwise this should be a dict of kwargs which will be passed to - :py:meth:`sup3r.preprocessing.accessor.Sup3rX.interpolate_na`. + :py:meth:`sup3r.preprocessing.accessor.Sup3rX.interpolate_na`. e.g. + {'method': 'linear', 'dim': 'time'} BaseLoader : Callable Base level file loader wrapped by :class:`~sup3r.preprocessing.loaders.Loader`. This is usually diff --git a/sup3r/preprocessing/loaders/nc.py b/sup3r/preprocessing/loaders/nc.py index dc80bbebb..205e47d02 100644 --- a/sup3r/preprocessing/loaders/nc.py +++ b/sup3r/preprocessing/loaders/nc.py @@ -124,7 +124,10 @@ def _rechunk_dsets(self, res): res.data_vars.""" for dset in [*list(res.coords), *list(res.data_vars)]: chunks = self._parse_chunks(dims=res[dset].dims, feature=dset) - if chunks != 'auto': + + # specifying chunks to xarray.open_mfdataset doesn't automatically + # apply to coordinates so we do that here + if chunks != 'auto' or dset in Dimension.coords_2d(): res[dset] = res[dset].chunk(chunks) return res diff --git a/sup3r/utilities/era_downloader.py b/sup3r/utilities/era_downloader.py index 1925b4dcf..9593c0db9 100644 --- a/sup3r/utilities/era_downloader.py +++ b/sup3r/utilities/era_downloader.py @@ -323,6 +323,11 @@ def download_file( Whether to overwrite existing file """ if os.path.exists(out_file) and not cls._can_skip_file(out_file): + logger.info( + 'Previous download of %s failed. Removing %s.', + out_file, + out_file, + ) os.remove(out_file) if not cls._can_skip_file(out_file) or overwrite: @@ -807,7 +812,7 @@ def _can_skip_file(cls, file): try: _ = Loader(file) except Exception as e: - msg = 'Could not open %s. %s Will redownload.' + msg = 'Could not open %s. %s. Will redownload.' logger.warning(msg, file, e) warn(msg % (file, e)) openable = False diff --git a/tests/bias/test_presrat_bias_correction.py b/tests/bias/test_presrat_bias_correction.py index 593521646..7137965ef 100644 --- a/tests/bias/test_presrat_bias_correction.py +++ b/tests/bias/test_presrat_bias_correction.py @@ -642,7 +642,8 @@ def test_presrat_transform(presrat_params, precip_fut): - unbiased zero rate is not smaller the input zero rate """ # local_presrat_bc expects time in the last dimension. - data = precip_fut.transpose('lat', 'lon', 'time').values + + data = precip_fut.transpose('lat', 'lon', 'time') time = pd.to_datetime(precip_fut.time) latlon = np.stack( xr.broadcast(precip_fut['lat'], precip_fut['lon'] - 360), axis=-1 diff --git a/tests/bias/test_qdm_bias_correction.py b/tests/bias/test_qdm_bias_correction.py index d2dcc11b0..d5722d43b 100644 --- a/tests/bias/test_qdm_bias_correction.py +++ b/tests/bias/test_qdm_bias_correction.py @@ -312,23 +312,20 @@ def test_qdm_transform_notrend(tmp_path, dist_params): assert np.allclose(corrected, unbiased, equal_nan=True) -def test_handler_qdm_bc(fp_fut_cc, dist_params): - """qdm_bc() method from DataHandler - - WIP: Confirm it runs, but don't verify much yet. - """ +def test_qdm_bc_method(fp_fut_cc, dist_params): + """Tesat qdm_bc standalone method""" Handler = DataHandler(fp_fut_cc, 'rsds') original = Handler.data.as_array().copy() qdm_bc(Handler, dist_params, 'ghi') corrected = Handler.data.as_array() + original = compute_if_dask(original) + corrected = compute_if_dask(corrected) assert not np.isnan(corrected).all(), "Can't compare if only NaN" idx = ~(np.isnan(original) | np.isnan(corrected)) # Where it is not NaN, it must have differences. - assert not np.allclose( - compute_if_dask(original)[idx], compute_if_dask(corrected)[idx] - ) + assert not np.allclose(original[idx], corrected[idx]) def test_bc_identity(tmp_path, fp_fut_cc, dist_params): From 9e400154b524802c4a475062f301b109326f2065 Mon Sep 17 00:00:00 2001 From: bnb32 Date: Tue, 19 Nov 2024 15:09:52 -0700 Subject: [PATCH 3/5] blockwise operation causing issues with compatible shapes --- sup3r/bias/bias_transforms.py | 71 +++++++++++++++-------------------- 1 file changed, 31 insertions(+), 40 deletions(-) diff --git a/sup3r/bias/bias_transforms.py b/sup3r/bias/bias_transforms.py index 07b39c4fe..dbfaf10f4 100644 --- a/sup3r/bias/bias_transforms.py +++ b/sup3r/bias/bias_transforms.py @@ -478,7 +478,6 @@ def _apply_qdm( base_params, bias_params, bias_fut_params, - current_time_idx, dist='empirical', sampling='linear', log_base=10, @@ -514,8 +513,6 @@ def _apply_qdm( Same requirements as params_oh. This input arg is for the **modeled future distribution**. If this is None, this defaults to params_mh (no future data, just corrected to modeled historical distribution) - current_time_idx : int - Time index for the current qdm application. dist : str Probability distribution name to use to model the data which determines how the param args are used. This can "empirical" or any @@ -572,22 +569,20 @@ def _apply_qdm( Max number of workers to use for QDM process pool """ - # Naming following the paper: observed historical - oh = base_params[:, :, current_time_idx] - # Modeled historical - mh = bias_params[:, :, current_time_idx] - # Modeled future - mf = bias_fut_params[:, :, current_time_idx] - # This satisfies the rex's QDM design - mf = None if no_trend else np.reshape(mf, (-1, mf.shape[-1])) + bias_fut_params = ( + None + if no_trend + else np.reshape(bias_fut_params, (-1, bias_fut_params.shape[-1])) + ) # The distributions at this point, after selected the respective # time window with `window_idx`, are 3D (space, space, N-params) # Collapse 3D (space, space, N) into 2D (space**2, N) + QDM = QuantileDeltaMapping( - params_oh=np.reshape(oh, (-1, oh.shape[-1])), - params_mh=np.reshape(mh, (-1, mh.shape[-1])), - params_mf=mf, + params_oh=np.reshape(base_params, (-1, base_params.shape[-1])), + params_mh=np.reshape(bias_params, (-1, bias_params.shape[-1])), + params_mf=bias_fut_params, dist=dist, relative=relative, sampling=sampling, @@ -596,15 +591,11 @@ def _apply_qdm( delta_denom_zero=delta_denom_zero, delta_range=delta_range, ) - # input 3D shape (spatial, spatial, temporal) # QDM expects input arr with shape (time, space) tmp = np.reshape(subset.data, (-1, subset.shape[-1])).T # Apply QDM correction - tmp = da.blockwise( - QDM, 'ij', tmp, 'ij', dtype=np.float32, max_workers=max_workers - ) - + tmp = QDM(tmp, max_workers=max_workers) # Reorgnize array back from (time, space) # to (spatial, spatial, temporal) return da.reshape(tmp.T, subset.shape) @@ -762,29 +753,29 @@ def local_qdm_bc( cfg = params['cfg'] base_params = params['base'] bias_params = params['bias'] - bias_fut_params = params['bias_fut'] + bias_fut_params = params.get('bias_fut', None) if lr_padded_slice is not None: spatial_slice = (lr_padded_slice[0], lr_padded_slice[1]) base_params = base_params[spatial_slice] bias_params = bias_params[spatial_slice] - bias_fut_params = bias_fut_params[spatial_slice] + if bias_fut_params is not None: + bias_fut_params = bias_fut_params[spatial_slice] data_unbiased = da.full_like(data, np.nan) - closest_time_idx = abs( - cfg['time_window_center'][:, np.newaxis] - - np.array(time_index.day_of_year) - ) - closest_time_idx = closest_time_idx.argmin(axis=0) + closest_time_idx = [ + np.argmin(abs(d - cfg['time_window_center'])) + for d in time_index.day_of_year + ] for nt in set(closest_time_idx): subset_idx = closest_time_idx == nt + mf = None if bias_fut_params is None else bias_fut_params[:, :, nt] subset = _apply_qdm( subset=data[:, :, subset_idx], - base_params=base_params, - bias_params=bias_params, - bias_fut_params=bias_fut_params, - current_time_idx=nt, + base_params=base_params[:, :, nt], + bias_params=bias_params[:, :, nt], + bias_fut_params=mf, dist=cfg.get('dist', 'empirical'), sampling=cfg.get('sampling', 'linear'), log_base=cfg.get('log_base', 10), @@ -1073,23 +1064,23 @@ def local_presrat_bc( spatial_slice = (lr_padded_slice[0], lr_padded_slice[1]) base_params = base_params[spatial_slice] bias_params = bias_params[spatial_slice] - bias_fut_params = bias_fut_params[spatial_slice] + if bias_fut_params is not None: + bias_fut_params = bias_fut_params[spatial_slice] data_unbiased = da.full_like(data, np.nan) - closest_time_idx = abs( - cfg['time_window_center'][:, np.newaxis] - - np.array(time_index.day_of_year) - ) - closest_time_idx = closest_time_idx.argmin(axis=0) + closest_time_idx = [ + np.argmin(abs(d - cfg['time_window_center'])) + for d in time_index.day_of_year + ] for nt in set(closest_time_idx): subset_idx = closest_time_idx == nt + mf = None if bias_fut_params is None else bias_fut_params[:, :, nt] subset = _apply_qdm( subset=data[:, :, subset_idx], - base_params=base_params, - bias_params=bias_params, - bias_fut_params=bias_fut_params, - current_time_idx=nt, + base_params=base_params[:, :, nt], + bias_params=bias_params[:, :, nt], + bias_fut_params=mf, dist=cfg.get('dist', 'empirical'), sampling=cfg.get('sampling', 'linear'), log_base=cfg.get('log_base', 10), From 420c2296de02d90f4d9ae6a7fae72959539a8910 Mon Sep 17 00:00:00 2001 From: bnb32 Date: Wed, 20 Nov 2024 14:08:05 -0700 Subject: [PATCH 4/5] use np.reshape instead of da.reshape incase input is np.ndarray rather than dask array. this will preserve dask array chunks. --- sup3r/bias/bias_transforms.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sup3r/bias/bias_transforms.py b/sup3r/bias/bias_transforms.py index dbfaf10f4..6d9b6f98c 100644 --- a/sup3r/bias/bias_transforms.py +++ b/sup3r/bias/bias_transforms.py @@ -598,7 +598,7 @@ def _apply_qdm( tmp = QDM(tmp, max_workers=max_workers) # Reorgnize array back from (time, space) # to (spatial, spatial, temporal) - return da.reshape(tmp.T, subset.shape) + return np.reshape(tmp.T, subset.shape) def local_qdm_bc( From 8027b19bcdb693e45ae2cd28da776c52eedd4a8b Mon Sep 17 00:00:00 2001 From: bnb32 Date: Wed, 20 Nov 2024 15:47:51 -0700 Subject: [PATCH 5/5] self multiplication issue with dask array --- sup3r/bias/bias_transforms.py | 5 +++-- sup3r/utilities/interpolation.py | 12 +++++++++++- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/sup3r/bias/bias_transforms.py b/sup3r/bias/bias_transforms.py index 6d9b6f98c..6229cf010 100644 --- a/sup3r/bias/bias_transforms.py +++ b/sup3r/bias/bias_transforms.py @@ -1094,8 +1094,9 @@ def local_presrat_bc( # If no trend, it doesn't make sense to correct for zero rate or # apply the k-factor, but limit to QDM only. if not no_trend: - subset = np.where(subset < bias_tau_fut, 0, subset) - subset *= k_factor[:, :, nt : nt + 1] + subset = np.where( + subset < bias_tau_fut, 0, subset * k_factor[:, :, nt : nt + 1] + ) data_unbiased[:, :, subset_idx] = subset diff --git a/sup3r/utilities/interpolation.py b/sup3r/utilities/interpolation.py index fa3da71cd..dd0d8c1bb 100644 --- a/sup3r/utilities/interpolation.py +++ b/sup3r/utilities/interpolation.py @@ -65,7 +65,17 @@ def _lin_interp(cls, lev_samps, var_samps, level): 0, da.map_blocks(lambda x, y: x / y, (level - lev_samps[0]), diff), ) - return (1 - alpha) * var_samps[0] + alpha * var_samps[1] + indices = 'ijk'[:lev_samps[0].ndim] + return da.blockwise( + lambda x, y, a: x * (1 - a) + y * a, + indices, + var_samps[0], + indices, + var_samps[1], + indices, + alpha, + indices, + ) @classmethod def _log_interp(cls, lev_samps, var_samps, level):