diff --git a/skpro/distributions/qpd.py b/skpro/distributions/qpd.py index dc724c524..4b7c7c4c0 100644 --- a/skpro/distributions/qpd.py +++ b/skpro/distributions/qpd.py @@ -9,13 +9,8 @@ "setoguchi-naoki", ] # interface only. Cyclic boosting authors in cyclic_boosting package -import typing -import warnings from typing import Sequence -if typing.TYPE_CHECKING: - from cyclic_boosting.quantile_matching import J_QPD_S, J_QPD_B - import numpy as np import pandas as pd from scipy.stats import logistic, norm @@ -85,7 +80,6 @@ class QPD_Johnson(_DelegatedDistribution): # -------------- "authors": ["setoguchi-naoki", "felix-wick", "fkiraly"], "maintainers": ["setoguchi-naoki"], - "python_dependencies": ["cyclic_boosting>=1.4.0", "findiff"], # estimator tags # -------------- "capabilities:approx": ["pdfnorm", "energy"], @@ -158,9 +152,9 @@ def get_test_params(cls, parameter_set="default"): params2 = { "alpha": 0.1, "version": "normal", - "qv_low": [0.2, 0.2, 0.2], - "qv_median": [0.5, 0.5, 0.5], - "qv_high": [0.8, 0.8, 0.8], + "qv_low": [[-0.3], [-0.2], [-0.1]], + "qv_median": [[-0.1], [0.0], [0.1]], + "qv_high": [[0.2], [0.3], [0.4]], "index": pd.Index([1, 2, 5]), "columns": pd.Index(["a"]), } @@ -231,14 +225,22 @@ class QPD_S(BaseDistribution): _tags = { # packaging info # -------------- - "authors": ["setoguchi-naoki", "felix-wick"], + "authors": ["setoguchi-naoki", "felix-wick", "fkiraly"], "maintainers": ["setoguchi-naoki"], - "python_dependencies": ["cyclic_boosting>=1.4.0", "findiff"], # estimator tags # -------------- "capabilities:approx": ["pdfnorm", "energy"], "capabilities:exact": ["mean", "var", "cdf", "ppf", "pdf"], "distr:measuretype": "continuous", + "broadcast_init": "on", + "broadcast_params": [ + "alpha", + "qv_low", + "qv_median", + "qv_high", + "lower", + "upper", + ], } def __init__( @@ -263,138 +265,78 @@ def __init__( self.index = index self.columns = columns - from cyclic_boosting.quantile_matching import J_QPD_S + super().__init__(index=index, columns=columns) - qv_low, qv_median, qv_high = _prep_qpd_params(qv_low, qv_median, qv_high) + # precompute parameters for methods + phi = _resolve_phi(version) + self.phi = phi - if index is None: - index = pd.RangeIndex(qv_low.shape[0]) - self.index = index + qpd_params = _prep_qpd_vars(phi=phi, mode="S", **self._bc_params) + self._qpd_params = qpd_params - if columns is None: - columns = pd.RangeIndex(1) - self.columns = columns + def _ppf(self, p: np.ndarray): + """Quantile function = percent point function = inverse cdf.""" + lower = self._bc_params["lower"] + delta = self._qpd_params["delta"] + kappa = self._qpd_params["kappa"] + c = self._qpd_params["c"] + n = self._qpd_params["n"] + theta = self._qpd_params["theta"] - self._shape = (qv_low.shape[0], 1) + phi = self.phi - if version == "normal": - self.phi = norm() - elif version == "logistic": - self.phi = logistic() - else: - raise Exception("Invalid version.") - - if (np.any(qv_low > qv_median)) or np.any(qv_high < qv_median): - warnings.warn( - "The SPT values are not monotonically increasing, " - "each SPT is sorted by value", - stacklevel=2, - ) - idx = np.where((qv_low > qv_median), True, False) + np.where( - (qv_high < qv_median), True, False - ) - un_orderd_idx = np.argwhere(idx > 0).tolist() - warnings.warn(f"sorted index {un_orderd_idx}", stacklevel=2) - for idx in un_orderd_idx: - low, mid, high = sorted([qv_low[idx], qv_median[idx], qv_high[idx]]) - qv_low[idx] = low - qv_median[idx] = mid - qv_high[idx] = high - - self.qpd = J_QPD_S( - alpha=alpha, - qv_low=qv_low, - qv_median=qv_median, - qv_high=qv_high, - l=self.lower, - version=version, - ) - super().__init__(index=index, columns=columns) + in_sinh = np.arcsinh(phi.ppf(p) * delta) + np.arcsinh(n * c * delta) + in_exp = kappa * np.sinh(in_sinh) + ppf_arr = lower + theta * np.exp(in_exp) - def _mean(self): - """Return expected value of the distribution. - - Please set the upper and lower limits of the random variable correctly. - - Returns - ------- - pd.DataFrame with same rows, columns as `self` - expected value of distribution (entry-wise) - """ - params = self.get_params(deep=False) - lower = params["lower"] - upper = params["upper"] - index = params["index"] - x = np.linspace(lower, upper, num=int(1e3)) - cdf = self.qpd.cdf(x) - if cdf.ndim < 2: - cdf = cdf[:, np.newaxis] - loc = exp_func(x, cdf.T, index.shape[0]) - return loc - - def _var(self): - """Return element/entry-wise variance of the distribution. - - Please set the upper and lower limits of the random variable correctly. - - Returns - ------- - pd.DataFrame with same rows, columns as `self` - variance of distribution (entry-wise) - """ - params = self.get_params(deep=False) - lower = params["lower"] - upper = params["upper"] - index = params["index"] - mean = self.mean().values - x = np.linspace(lower, upper, num=int(1e3)) - cdf = self.qpd.cdf(x) - if cdf.ndim < 2: - cdf = cdf[:, np.newaxis] - var = var_func(x, mean, cdf.T, index.shape[0]) - return var + return ppf_arr def _pdf(self, x: np.ndarray): - """Probability density function. + """Probability density function.""" + lower = self._bc_params["lower"] + delta = self._qpd_params["delta"] + kappa = self._qpd_params["kappa"] + c = self._qpd_params["c"] + n = self._qpd_params["n"] + theta = self._qpd_params["theta"] - this fucntion transform cdf to pdf - because j-qpd's pdf calculation is bit complex - """ - return pdf_func(x, self.qpd) + phi = self.phi - def _ppf(self, p: np.ndarray): - """Quantile function = percent point function = inverse cdf.""" - params = self.get_params(deep=False) - index = params["index"] - columns = params["columns"] - qv_low = params["qv_low"] - p_unique = np.unique(p) # de-broadcast - ppf_all = ppf_func(p_unique, self.qpd) - ppf_map = np.tile(p_unique, (qv_low.size, 1)).T - ppf = np.zeros((index.shape[0], len(columns))) - for r in range(p.shape[0]): - for c in range(p.shape[1]): - t = np.where(ppf_map[:, c] == p[r][c]) - ppf_part = ppf_all[t][c] - ppf[r][c] = ppf_part - return ppf + # we work through the chain rule for the entire nested expression in cdf + x_ = (x - lower) / theta + x_der = 1 / theta + + in_arcsinh = np.log(x_) / kappa + in_arcsinh_der = x_der / (kappa * x_) + + in_sinh = np.arcsinh(in_arcsinh) - np.arcsinh(n * c * delta) + in_sinh_der = arcsinh_der(in_arcsinh) * in_arcsinh_der + + in_cdf = np.sinh(in_sinh) / delta + in_cdf_der = np.cosh(in_sinh) * in_sinh_der / delta + + # cdf_arr = phi.cdf(in_cdf) + cdf_arr_der = phi.pdf(in_cdf) * in_cdf_der + + pdf_arr = cdf_arr_der + return pdf_arr def _cdf(self, x: np.ndarray): """Cumulative distribution function.""" - params = self.get_params(deep=False) - index = params["index"] - columns = params["columns"] - qv_low = params["qv_low"] - x_unique = np.unique(x) # de-broadcast - cdf_all = cdf_func(x_unique, self.qpd) - cdf_map = np.tile(x_unique, (qv_low.size, 1)).T - cdf = np.zeros((index.shape[0], len(columns))) - for r in range(x.shape[0]): - for c in range(x.shape[1]): - t = np.where(cdf_map[:, c] == x[r][c]) - cdf_part = cdf_all[t][c] - cdf[r][c] = cdf_part - return cdf + lower = self._bc_params["lower"] + delta = self._qpd_params["delta"] + kappa = self._qpd_params["kappa"] + c = self._qpd_params["c"] + n = self._qpd_params["n"] + theta = self._qpd_params["theta"] + + phi = self.phi + + in_arcsinh = np.log((x - lower) / theta) / kappa + in_sinh = np.arcsinh(in_arcsinh) - np.arcsinh(n * c * delta) + cdf_arr = phi.cdf(np.sinh(in_sinh) / delta) + + return cdf_arr @classmethod def get_test_params(cls, parameter_set="default"): @@ -412,9 +354,9 @@ def get_test_params(cls, parameter_set="default"): params2 = { "alpha": 0.2, "version": "normal", - "qv_low": [-0.3, -0.3, -0.3], - "qv_median": [0.0, 0.0, 0.0], - "qv_high": [0.3, 0.3, 0.3], + "qv_low": [[-0.3], [-0.2], [-0.1]], + "qv_median": [[-0.1], [0.0], [0.1]], + "qv_high": [[0.2], [0.3], [0.4]], "lower": -0.5, "index": pd.RangeIndex(3), "columns": pd.Index(["a"]), @@ -433,7 +375,7 @@ class QPD_B(BaseDistribution): Parameters ---------- - alpha : float + alpha : float or array_like[float] lower quantile of SPT (upper is ``1 - alpha``) qv_low : float or array_like[float] quantile function value of ``alpha`` @@ -441,11 +383,11 @@ class QPD_B(BaseDistribution): quantile function value of quantile 0.5 qv_high : float or array_like[float] quantile function value of quantile ``1 - alpha`` - lower : float + lower : float or array_like[float] lower bound of semi-bounded range. This is used when estimating QPD and calculating expectation and variance - upper : float + upper : float or array_like[float] upper bound of semi-bounded range. This is used when estimating QPD and calculating expectation and variance @@ -471,14 +413,22 @@ class QPD_B(BaseDistribution): _tags = { # packaging info # -------------- - "authors": ["setoguchi-naoki", "felix-wick"], + "authors": ["setoguchi-naoki", "felix-wick", "fkiraly"], "maintainers": ["setoguchi-naoki"], - "python_dependencies": ["cyclic_boosting>=1.4.0", "findiff"], # estimator tags # -------------- "capabilities:approx": ["pdfnorm", "energy"], "capabilities:exact": ["mean", "var", "cdf", "ppf", "pdf"], "distr:measuretype": "continuous", + "broadcast_init": "on", + "broadcast_params": [ + "alpha", + "qv_low", + "qv_median", + "qv_high", + "lower", + "upper", + ], } def __init__( @@ -503,137 +453,80 @@ def __init__( self.index = index self.columns = columns - from cyclic_boosting.quantile_matching import J_QPD_B + super().__init__(index=index, columns=columns) - qv_low, qv_median, qv_high = _prep_qpd_params(qv_low, qv_median, qv_high) + # precompute parameters for methods + phi = _resolve_phi(version) + self.phi = phi - if index is None: - index = pd.RangeIndex(qv_low.shape[0]) - self.index = index + qpd_params = _prep_qpd_vars(phi=phi, mode="B", **self._bc_params) + self._qpd_params = qpd_params - if columns is None: - columns = pd.RangeIndex(1) - self.columns = columns + def _ppf(self, p: np.ndarray): + """Quantile function = percent point function = inverse cdf.""" + lower = self._bc_params["lower"] + rnge = self._qpd_params["rnge"] + delta = self._qpd_params["delta"] + kappa = self._qpd_params["kappa"] + c = self._qpd_params["c"] + n = self._qpd_params["n"] + xi = self._qpd_params["xi"] - if version == "normal": - self.phi = norm() - elif version == "logistic": - self.phi = logistic() - else: - raise Exception("Invalid version.") - - if (np.any(qv_low > qv_median)) or np.any(qv_high < qv_median): - warnings.warn( - "The SPT values are not monotonically increasing, " - "each SPT is sorted by value", - stacklevel=2, - ) - idx = np.where((qv_low > qv_median), True, False) + np.where( - (qv_high < qv_median), True, False - ) - un_orderd_idx = np.argwhere(idx > 0).tolist() - warnings.warn(f"sorted index {un_orderd_idx}", stacklevel=2) - for idx in un_orderd_idx: - low, mid, high = sorted([qv_low[idx], qv_median[idx], qv_high[idx]]) - qv_low[idx] = low - qv_median[idx] = mid - qv_high[idx] = high - - self.qpd = J_QPD_B( - alpha=alpha, - qv_low=qv_low, - qv_median=qv_median, - qv_high=qv_high, - l=self.lower, - u=self.upper, - version=version, - ) - super().__init__(index=index, columns=columns) + phi = self.phi - def _mean(self): - """Return expected value of the distribution. - - Please set the upper and lower limits of the random variable correctly. - - Returns - ------- - pd.DataFrame with same rows, columns as `self` - expected value of distribution (entry-wise) - """ - params = self.get_params(deep=False) - lower = params["lower"] - upper = params["upper"] - index = params["index"] - x = np.linspace(lower, upper, num=int(1e3)) - cdf = self.qpd.cdf(x) - if cdf.ndim < 2: - cdf = cdf[:, np.newaxis] - loc = exp_func(x, cdf.T, index.shape[0]) - return loc - - def _var(self): - """Return element/entry-wise variance of the distribution. - - Please set the upper and lower limits of the random variable correctly. - - Returns - ------- - pd.DataFrame with same rows, columns as `self` - variance of distribution (entry-wise) - """ - params = self.get_params(deep=False) - lower = params["lower"] - upper = params["upper"] - index = params["index"] - mean = self.mean().values - x = np.linspace(lower, upper, num=int(1e3)) - cdf = self.qpd.cdf(x) - if cdf.ndim < 2: - cdf = cdf[:, np.newaxis] - var = var_func(x, mean, cdf.T, index.shape[0]) - return var + in_cdf = xi + kappa * np.sinh(delta * (phi.ppf(p) + n * c)) + ppf_arr = lower + rnge * phi.cdf(in_cdf) + return ppf_arr def _pdf(self, x: np.ndarray): - """Probability density function. + """Probability density function.""" + lower = self._bc_params["lower"] + rnge = self._qpd_params["rnge"] + delta = self._qpd_params["delta"] + kappa = self._qpd_params["kappa"] + c = self._qpd_params["c"] + n = self._qpd_params["n"] + xi = self._qpd_params["xi"] - this fucntion transform cdf to pdf - because j-qpd's pdf calculation is bit complex - """ - return pdf_func(x, self.qpd) + phi = self.phi - def _ppf(self, p: np.ndarray): - """Quantile function = percent point function = inverse cdf.""" - params = self.get_params(deep=False) - index = params["index"] - columns = params["columns"] - qv_low = params["qv_low"] - p_unique = np.unique(p) # de-broadcast - ppf_all = ppf_func(p_unique, self.qpd) - ppf_map = np.tile(p_unique, (qv_low.size, 1)).T - ppf = np.zeros((index.shape[0], len(columns))) - for r in range(p.shape[0]): - for c in range(p.shape[1]): - t = np.where(ppf_map[:, c] == p[r][c]) - ppf_part = ppf_all[t][c] - ppf[r][c] = ppf_part - return ppf + # we work through the chain rule for the entire nested expression in cdf + x_ = (x - lower) / rnge + x_der = 1 / rnge + + phi_ppf = phi.ppf(x_) + # derivative of ppf at z is 1 / pdf(ppf(z)) + phi_ppf_der = x_der / phi.pdf(phi.ppf(x_)) + + in_arcsinh = (phi_ppf - xi) / kappa + in_arcsinh_der = phi_ppf_der / kappa + + in_cdf = np.arcsinh(in_arcsinh) / delta - n * c + in_cdf_der = arcsinh_der(in_arcsinh) * in_arcsinh_der / delta + + # cdf_arr = phi.cdf(in_cdf) + cdf_arr_der = phi.pdf(in_cdf) * in_cdf_der + + pdf_arr = cdf_arr_der + return pdf_arr def _cdf(self, x: np.ndarray): """Cumulative distribution function.""" - params = self.get_params(deep=False) - index = params["index"] - columns = params["columns"] - qv_low = params["qv_low"] - x_unique = np.unique(x) # de-broadcast - cdf_all = cdf_func(x_unique, self.qpd) - cdf_map = np.tile(x_unique, (qv_low.size, 1)).T - cdf = np.zeros((index.shape[0], len(columns))) - for r in range(x.shape[0]): - for c in range(x.shape[1]): - t = np.where(cdf_map[:, c] == x[r][c]) - cdf_part = cdf_all[t][c] - cdf[r][c] = cdf_part - return cdf + lower = self._bc_params["lower"] + rnge = self._qpd_params["rnge"] + delta = self._qpd_params["delta"] + kappa = self._qpd_params["kappa"] + c = self._qpd_params["c"] + n = self._qpd_params["n"] + xi = self._qpd_params["xi"] + + phi = self.phi + + phi_ppf = phi.ppf((x - lower) / rnge) + in_cdf = np.arcsinh((phi_ppf - xi) / kappa) / delta - n * c + cdf_arr = phi.cdf(in_cdf) + + return cdf_arr @classmethod def get_test_params(cls, parameter_set="default"): @@ -652,9 +545,9 @@ def get_test_params(cls, parameter_set="default"): params2 = { "alpha": 0.2, "version": "normal", - "qv_low": [-0.3, -0.3, -0.3], - "qv_median": [0.0, 0.0, 0.0], - "qv_high": [0.3, 0.3, 0.3], + "qv_low": [[-0.3], [-0.2], [-0.1]], + "qv_median": [[-0.1], [0.0], [0.1]], + "qv_high": [[0.2], [0.3], [0.4]], "lower": -0.5, "upper": 0.5, "index": pd.RangeIndex(3), @@ -712,14 +605,22 @@ class QPD_U(BaseDistribution): _tags = { # packaging info # -------------- - "authors": ["setoguchi-naoki", "felix-wick"], + "authors": ["setoguchi-naoki", "felix-wick", "fkiraly"], "maintainers": ["setoguchi-naoki"], - "python_dependencies": ["cyclic_boosting>=1.4.0", "findiff"], # estimator tags # -------------- "capabilities:approx": ["pdfnorm", "energy"], "capabilities:exact": ["mean", "var", "cdf", "ppf", "pdf"], "distr:measuretype": "continuous", + "broadcast_init": "on", + "broadcast_params": [ + "alpha", + "qv_low", + "qv_median", + "qv_high", + "lower", + "upper", + ], } def __init__( @@ -747,146 +648,65 @@ def __init__( self.index = index self.columns = columns - from cyclic_boosting.quantile_matching import J_QPD_extended_U + super().__init__(index=index, columns=columns) - qv_low, qv_median, qv_high = _prep_qpd_params(qv_low, qv_median, qv_high) + # precompute parameters for methods + phi = _resolve_phi(version) + self.phi = phi - if index is None: - index = pd.RangeIndex(qv_low.shape[0]) - self.index = index + qpd_params = _prep_qpd_vars(phi=phi, mode="U", **self._bc_params) + self._qpd_params = qpd_params - if columns is None: - columns = pd.RangeIndex(1) - self.columns = columns + def _ppf(self, p: np.ndarray): + """Quantile function = percent point function = inverse cdf.""" + alpha = self._bc_params["alpha"] + xi = self._qpd_params["xi"] + gamma = self._qpd_params["gamma"] + delta = self._qpd_params["delta"] + kappa = self._qpd_params["kappa"] - if version == "normal": - self.phi = norm() - elif version == "logistic": - self.phi = logistic() - else: - raise Exception("Invalid version.") - - if (np.any(qv_low > qv_median)) or np.any(qv_high < qv_median): - warnings.warn( - "The SPT values are not monotonically increasing, " - "each SPT is sorted by value", - stacklevel=2, - ) - idx = np.where((qv_low > qv_median), True, False) + np.where( - (qv_high < qv_median), True, False - ) - un_orderd_idx = np.argwhere(idx > 0).tolist() - warnings.warn(f"sorted index {un_orderd_idx}", stacklevel=2) - for idx in un_orderd_idx: - low, mid, high = sorted([qv_low[idx], qv_median[idx], qv_high[idx]]) - qv_low[idx] = low - qv_median[idx] = mid - qv_high[idx] = high - - iter = np.nditer(qv_low, flags=["c_index"]) - for _i in iter: - jqpd = J_QPD_extended_U( - alpha=alpha, - qv_low=qv_low[iter.index], - qv_median=qv_median[iter.index], - qv_high=qv_high[iter.index], - version=version, - shape=dist_shape, - ) - self.qpd.append(jqpd) + phi = self.phi - super().__init__(index=index, columns=columns) + width = phi.ppf(1 - alpha) + qs = phi.ppf(p) / width - def _mean(self): - """Return expected value of the distribution. - - Please set the upper and lower limits of the random variable correctly. - - Returns - ------- - pd.DataFrame with same rows, columns as `self` - expected value of distribution (entry-wise) - """ - params = self.get_params(deep=False) - lower = params["lower"] - upper = params["upper"] - index = params["index"] - cdf_arr = [] - x = np.linspace(lower, upper, num=int(1e3)) - for qpd in self.qpd: - cdf_arr.append(qpd.cdf(x)) - cdf = np.asarray(cdf_arr) - if cdf.ndim < 2: - cdf = cdf[:, np.newaxis] - loc = exp_func(x, cdf, index.shape[0]) - return loc - - def _var(self): - """Return element/entry-wise variance of the distribution. - - Please set the upper and lower limits of the random variable correctly. - - Returns - ------- - pd.DataFrame with same rows, columns as `self` - variance of distribution (entry-wise) - """ - params = self.get_params(deep=False) - lower = params["lower"] - upper = params["upper"] - index = params["index"] - mean = self.mean().values - cdf_list = [] - x = np.linspace(lower, upper, num=int(1e3)) - for qpd in self.qpd: - cdf_list.append(qpd.cdf(x)) - cdf = np.asarray(cdf_list) - if cdf.ndim < 2: - cdf = cdf[:, np.newaxis] - var = var_func(x, mean, cdf, index.shape[0]) - return var + ppf_arr = xi + kappa * np.sinh((qs - gamma) / delta) + return ppf_arr def _pdf(self, x: np.ndarray): - """Probability density function. + """Probability density function.""" + alpha = self._bc_params["alpha"] + xi = self._qpd_params["xi"] + gamma = self._qpd_params["gamma"] + delta = self._qpd_params["delta"] + kappa = self._qpd_params["kappa"] - this fucntion transform cdf to pdf - because j-qpd's pdf calculation is bit complex - """ - return pdf_func(x, self.qpd) + phi = self.phi - def _ppf(self, p: np.ndarray): - """Quantile function = percent point function = inverse cdf.""" - params = self.get_params(deep=False) - index = params["index"] - columns = params["columns"] - qv_low = params["qv_low"] - p_unique = np.unique(p) # de-broadcast - ppf_all = ppf_func(p_unique, self.qpd) - ppf_map = np.tile(p_unique, (qv_low.size, 1)).T - ppf = np.zeros((index.shape[0], len(columns))) - for r in range(p.shape[0]): - for c in range(p.shape[1]): - t = np.where(ppf_map[:, c] == p[r][c]) - ppf_part = ppf_all[t][c] - ppf[r][c] = ppf_part - return ppf + width = phi.ppf(1 - alpha) + + qs = gamma + delta * np.arcsinh((x - xi) / kappa) + qs_der = delta * arcsinh_der((x - xi) / kappa) / kappa + + # cdf_arr = phi.cdf(qs * width) + pdf_arr = phi.pdf(qs * width) * qs_der + return pdf_arr def _cdf(self, x: np.ndarray): """Cumulative distribution function.""" - params = self.get_params(deep=False) - index = params["index"] - columns = params["columns"] - qv_low = params["qv_low"] - x_unique = np.unique(x) # de-broadcast - cdf_all = cdf_func(x_unique, self.qpd) - cdf_map = np.tile(x_unique, (qv_low.size, 1)).T - cdf = np.zeros((index.shape[0], len(columns))) - for r in range(x.shape[0]): - for c in range(x.shape[1]): - t = np.where(cdf_map[:, c] == x[r][c]) - cdf_part = cdf_all[t][c] - cdf[r][c] = cdf_part - return cdf + alpha = self._bc_params["alpha"] + xi = self._qpd_params["xi"] + gamma = self._qpd_params["gamma"] + delta = self._qpd_params["delta"] + kappa = self._qpd_params["kappa"] + + phi = self.phi + + width = phi.ppf(1 - alpha) + qs = gamma + delta * np.arcsinh((x - xi) / kappa) + + cdf_arr = phi.cdf(qs * width) + return cdf_arr @classmethod def get_test_params(cls, parameter_set="default"): @@ -903,99 +723,142 @@ def get_test_params(cls, parameter_set="default"): params2 = { "alpha": 0.2, "version": "normal", - "qv_low": [-0.3, -0.3, -0.3], - "qv_median": [0.0, 0.0, 0.0], - "qv_high": [0.3, 0.3, 0.3], + "qv_low": [[-0.3], [-0.2], [-0.1]], + "qv_median": [[-0.1], [0.0], [0.1]], + "qv_high": [[0.2], [0.3], [0.4]], "index": pd.RangeIndex(3), "columns": pd.Index(["a"]), } return [params1, params2] -def calc_pdf(cdf: np.ndarray) -> np.ndarray: - """Return pdf value for all samples.""" - from findiff import FinDiff - - dx = 1e-6 - derivative = FinDiff(1, dx, 1) - pdf = np.asarray(derivative(cdf)) - return pdf - - -def exp_func(x: np.ndarray, cdf: np.ndarray, size: int): - """Return Expectation.""" - pdf = calc_pdf(cdf) - x = np.tile(x, (size, 1)) - loc = np.trapz(x * pdf, x, dx=1e-6, axis=1) - return loc - - -def var_func(x: np.ndarray, mu: np.ndarray, cdf: np.ndarray, size: int): - """Return Variance.""" - pdf = calc_pdf(cdf) - x = np.tile(x, (size, 1)) - var = np.trapz(((x - mu) ** 2) * pdf, x, dx=1e-6, axis=1) - return var - - -def pdf_func(x: np.ndarray, qpd: J_QPD_S | J_QPD_B | list): - """Return pdf value.""" - pdf = np.zeros_like(x) - for r in range(x.shape[0]): - for c in range(x.shape[1]): - element = x[r][c] - x0 = np.linspace(element, element + 1e-3, num=3) - if isinstance(qpd, list): - cdf = np.asarray([func.cdf(x0) for func in qpd]) - cdf = cdf.reshape(cdf.shape[0], -1) - else: - cdf = qpd.cdf(x0) - if cdf.ndim < 2: - for _ in range(2 - cdf.ndim): - cdf = cdf[:, np.newaxis] - cdf = cdf.T - pdf_part = calc_pdf(cdf) - pdf[r][c] = pdf_part[0][0] - return pdf - - -def ppf_func(x: np.ndarray, qpd: J_QPD_S | J_QPD_B | list): - """Return ppf value.""" - if isinstance(qpd, list): - ppf = np.asarray([func.ppf(x) for func in qpd]) - ppf = ppf.reshape(ppf.shape[0], -1) - else: - ppf = qpd.ppf(x) - if ppf.ndim < 2: - for _ in range(2 - ppf.ndim): - ppf = ppf[np.newaxis] - ppf = ppf.T - return ppf - - -def cdf_func(x: np.ndarray, qpd: J_QPD_S | J_QPD_B | list): - """Return cdf value.""" - if isinstance(qpd, list): - cdf = np.asarray([func.cdf(x) for func in qpd]) - cdf = cdf.reshape(cdf.shape[0], -1) +def _resolve_phi(phi): + """Resolve base distribution.""" + if phi == "normal": + return norm() + elif phi == "logistic": + return logistic() else: - cdf = qpd.cdf(x) - if cdf.ndim < 2: - for _ in range(2 - cdf.ndim): - cdf = cdf[np.newaxis] - cdf = cdf.T - return cdf - - -def _prep_qpd_params(qv_low, qv_median, qv_high): - """Prepare parameters for Johnson Quantile-Parameterized Distributions.""" - qv = [qv_low, qv_median, qv_high] - for i, instance in enumerate(qv): - if isinstance(instance, float): - qv[i] = np.array([qv[i]]) - elif isinstance(instance, Sequence): - qv[i] = np.asarray(qv[i]) - qv_low = qv[0].flatten() - qv_median = qv[1].flatten() - qv_high = qv[2].flatten() - return qv_low, qv_median, qv_high + return phi + + +def _prep_qpd_vars( + alpha, + qv_low, + qv_median, + qv_high, + lower, + upper, + phi, + mode="B", + **kwargs, +): + """Prepare parameters for Johnson Quantile-Parameterized Distributions. + + Parameters + ---------- + alpha : 2D np.array + lower quantile of SPT (upper is ``1 - alpha``) + qv_low : 2D np.array + quantile function value of ``alpha`` + qv_median : 2D np.array + quantile function value of quantile 0.5 + qv_high : 2D np.array + quantile function value of quantile ``1 - alpha`` + lower : 2D np.array + lower bound of range. + upper : 2D np.array + upper bound of range. + phi : scipy.stats.rv_continuous + base distribution + mode : str + options are ``B`` (default) or ``S`` + B = bounded mode, S = lower semi-bounded mode + """ + c = phi.ppf(1 - alpha) + + if mode == "U": + lower = 0 + + qll = qv_low - lower + qml = qv_median - lower + qhl = qv_high - lower + + if mode == "B": + rnge = upper - lower + + def tfun(x): + return phi.ppf(x / rnge) + + elif mode == "S": + tfun = np.log + elif mode == "U": + + def tfun(x): + return x + + L = tfun(qll) + H = tfun(qhl) + B = tfun(qml) + HL = H - L + BL = B - L + HB = H - B + LH2B = L + H - 2 * B + + HBL = np.where(BL < HB, BL, HB) + + n = np.where(LH2B > 0, 1, -1) + n = np.where(LH2B == 0, 0, n) + + if mode in ["B", "U"]: + xi = np.where(LH2B > 0, L, H) + xi = np.where(LH2B == 0, B, xi) + if mode == "S": + theta = np.where(LH2B > 0, qll, qhl) + theta = np.where(LH2B == 0, qml, theta) + if mode == "U": + theta = np.where(LH2B > 0, BL / HL, HB / HL) + + if mode in ["B", "S"]: + in_arccosh = HL / (2 * HBL) + delta_unn = np.arccosh(in_arccosh) + if mode == "S": + delta_unn = np.sinh(delta_unn) + delta = delta_unn / c + elif mode == "U": + delta = 1.0 / np.arccosh(1 / (2.0 * theta)) + delta = np.where(LH2B == 0, 1, delta) + + if mode == "B": + kappa = HL / np.sinh(2 * delta * c) + elif mode == "S": + kappa = HBL / (delta * c) + elif mode == "U": + kappa = HL / np.sinh(2.0 / delta) + kappa = np.where(LH2B == 0, HB, kappa) + + params = { + "c": c, + "L": L, + "H": H, + "B": B, + "n": n, + "delta": delta, + "kappa": kappa, + } + + if mode == "S": + params["theta"] = theta + if mode == "B": + params["rnge"] = rnge + if mode in ["B", "U"]: + params["xi"] = xi + if mode == "U": + params["gamma"] = -np.sign(LH2B) + + return params + + +def arcsinh_der(x): + """Return derivative of arcsinh.""" + return 1 / np.sqrt(1 + x**2) diff --git a/skpro/distributions/tests/test_qpd.py b/skpro/distributions/tests/test_qpd.py index 78c418c1a..0bf070516 100644 --- a/skpro/distributions/tests/test_qpd.py +++ b/skpro/distributions/tests/test_qpd.py @@ -1,5 +1,6 @@ """Tests for quantile-parameterized distributions.""" +import numpy as np import pytest from skpro.distributions.qpd import QPD_B, QPD_S, QPD_U @@ -24,6 +25,22 @@ def test_qpd_b_simple_use(): qpd.mean() +def test_qpd_b_pdf(): + """Test pdf of qpd with bounded mode.""" + # these parameters should produce a uniform on -0.5, 0.5 + qpd_linear = QPD_B( + alpha=0.2, + qv_low=-0.3, + qv_median=0, + qv_high=0.3, + lower=-0.5, + upper=0.5, + ) + x = np.linspace(-0.45, 0.45, 100) + pdf_vals = [qpd_linear.pdf(x_) for x_ in x] + np.testing.assert_allclose(pdf_vals, 1.0, rtol=1e-5) + + @pytest.mark.skipif( not run_test_for_class(QPD_S), reason="run test only if softdeps are present and incrementally (if requested)",