diff --git a/src/eddymotion/model/_dipy.py b/src/eddymotion/model/_dipy.py index a74c6ced..b4e1971d 100644 --- a/src/eddymotion/model/_dipy.py +++ b/src/eddymotion/model/_dipy.py @@ -1,7 +1,7 @@ # emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*- # vi: set ft=python sts=4 ts=4 sw=4 et: # -# Copyright The NiPreps Developers +# © The NiPreps Developers # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -20,106 +20,42 @@ # # https://www.nipreps.org/community/licensing/ # -r""" -DIPY-like models (a sandbox to trial them out before upstreaming to DIPY). - -Gaussian Process Model: Pairwise orientation angles ---------------------------------------------------- -Squared Exponential covariance kernel -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -Kernel based on a squared exponential function for Gaussian processes on -multi-shell DWI data following to eqs. 14 and 16 in [Andersson15]_. -For a 2-shell case, the :math:`\mathbf{K}` kernel can be written as: - -.. math:: - \begin{equation} - \mathbf{K} = \left[ - \begin{matrix} - \lambda C_{\theta}(\theta (\mathbf{G}_{1}); a) + \sigma_{1}^{2} \mathbf{I} & - \lambda C_{\theta}(\theta (\mathbf{G}_{2}, \mathbf{G}_{1}); a) C_{b}(b_{2}, b_{1}; \ell) \\ - \lambda C_{\theta}(\theta (\mathbf{G}_{1}, \mathbf{G}_{2}); a) C_{b}(b_{1}, b_{2}; \ell) & - \lambda C_{\theta}(\theta (\mathbf{G}_{2}); a) + \sigma_{2}^{2} \mathbf{I} \\ - \end{matrix} - \right] - \end{equation} - -**Squared exponential shell-wise covariance kernel**: -Compute the squared exponential smooth function describing how the -covariance changes along the b direction. -It uses the log of the b-values as the measure of distance along the -b-direction according to eq. 15 in [Andersson15]_. - -.. math:: - C_{b}(b, b'; \ell) = \exp\left( - \frac{(\log b - \log b')^2}{2 \ell^2} \right). - -**Squared exponential covariance kernel**: -Compute the squared exponential covariance matrix following to eq. 14 in [Andersson15]_. - -.. math:: - k(\textbf{x}, \textbf{x'}) = C_{\theta}(\mathbf{g}, \mathbf{g'}; a) C_{b}(|b - b'|; \ell) - -where :math:`C_{\theta}` is given by: - -.. math:: - \begin{equation} - C(\theta) = - \begin{cases} - 1 - \frac{3 \theta}{2 a} + \frac{\theta^3}{2 a^3} & \textnormal{if} \; \theta \leq a \\ - 0 & \textnormal{if} \; \theta > a - \end{cases} - \end{equation} - -:math:`\theta` being computed as: - -.. math:: - \theta(\mathbf{g}, \mathbf{g'}) = \arccos(|\langle \mathbf{g}, \mathbf{g'} \rangle|) - -and :math:`C_{b}` is given by: - -.. math:: - C_{b}(b, b'; \ell) = \exp\left( - \frac{(\log b - \log b')^2}{2 \ell^2} \right) - -:math:`b` and :math:`b'` being the b-values, and :math:`\mathbf{g}` and -:math:`\mathbf{g'}` the unit diffusion-encoding gradient unit vectors of the -shells; and :math:`{a, \ell}` some hyperparameters. - -""" +"""DIPY-like models (a sandbox to trial them out before upstreaming to DIPY).""" from __future__ import annotations import warnings -from sys import modules import numpy as np -from dipy.core.gradients import GradientTable, gradient_table +from dipy.core.gradients import GradientTable from dipy.reconst.base import ReconstModel from sklearn.gaussian_process import GaussianProcessRegressor -from sklearn.gaussian_process.kernels import ( - DotProduct, - Hyperparameter, - Kernel, - WhiteKernel, + +from eddymotion.model._sklearn import ( + EddyMotionGPR, + ExponentialKriging, + SphericalKriging, ) def gp_prediction( - model_gtab: GradientTable, gtab: GradientTable | np.ndarray, model: GaussianProcessRegressor, mask: np.ndarray | None = None, + return_std: bool = False, ) -> np.ndarray: """ Predicts one or more DWI orientations given a model. This function checks if the model is fitted and then extracts - orientations and potentially b-values from the gtab. It predicts the mean + orientations and potentially b-values from the X. It predicts the mean and standard deviation of the DWI signal using the model. Parameters ---------- model: :obj:`~sklearn.gaussian_process.GaussianProcessRegressor` A fitted GaussianProcessRegressor model. - gtab: :obj:`~dipy.core.gradients.GradientTable` + gtab : :obj:`~dipy.core.gradients.GradientTable` or :obj:`~np.ndarray` Gradient table with one or more orientations at which the GP will be evaluated. mask: :obj:`numpy.ndarray` A boolean mask indicating which voxels to use (optional). @@ -131,14 +67,16 @@ def gp_prediction( """ + X = gtab.bvecs.T if hasattr("bvecs") else np.asarray(gtab) + # Check it's fitted as they do in sklearn internally # https://github.com/scikit-learn/scikit-learn/blob/972e17fe1aa12d481b120ad4a3dc076bae736931/\ # sklearn/gaussian_process/_gpr.py#L410C9-L410C42 if not hasattr(model, "X_train_"): raise RuntimeError("Model is not yet fitted.") - # Extract orientations from gtab, and highly likely, the b-value too. - return model.predict(gtab, return_std=False) + # Extract orientations from bvecs, and highly likely, the b-value too. + return model.predict(X, return_std=return_std) class GaussianProcessModel(ReconstModel): @@ -162,7 +100,7 @@ def __init__( Parameters ---------- - kernel_model : :obj:`str` + kernel : :obj:`~sklearn.gaussian_process.kernels.Kernel` Kernel model to calculate the GP's covariance matrix. References @@ -176,21 +114,19 @@ def __init__( """ ReconstModel.__init__(self, None) - self.kernel = ( - PairwiseOrientationKernel( - weighting=kernel_model, - a=a, - lambda_s=lambda_s, - sigma_sq=sigma_sq, - ) - if kernel_model != "test" - else DotProduct() + WhiteKernel() + + self.sigma_sq = sigma_sq + + KernelType = SphericalKriging if kernel_model == "spherical" else ExponentialKriging + self.kernel = KernelType( + a=a, + lambda_s=lambda_s, ) def fit( self, data: np.ndarray, - gtab: GradientTable | None = None, + gtab: GradientTable | np.ndarray, mask: np.ndarray[bool] | None = None, random_state: int = 0, ) -> GPFit: @@ -198,10 +134,10 @@ def fit( Parameters ---------- - data : :obj:`~numpy.ndarray` - The measured signal from one voxel. - gtab : :obj:`~dipy.core.gradients.GradientTable` + gtab : :obj:`~dipy.core.gradients.GradientTable` or :obj:`~np.ndarray` The gradient table corresponding to the training data. + y : :obj:`~numpy.ndarray` + The measured signal from one voxel. mask : :obj:`~numpy.ndarray` A boolean array used to mark the coordinates in the data that should be analyzed that has the shape data.shape[:-1] @@ -213,38 +149,37 @@ def fit( """ + # Extract b-vecs: Scikit learn wants (n_samples, n_features) + # where n_features is 3, and n_samples the different diffusion orientations. + X = gtab.bvecs.T if hasattr("bvecs") else np.asarray(gtab) + + # Data must be shapes (n_samples, n_targets) where n_samples is + # the number of diffusion orientations, and n_targets is number of voxels. y = ( data[mask[..., None]] if mask is not None else np.reshape(data, (-1, data.shape[-1])) ).T - # sklearn wants (n_samples, n_features) as X's shape - X = gtab.bvecs - - # sklearn wants (n_samples, n_targets) for Y, where n_targets = n_voxels to simulate. - if (signal_dirs := y.shape[0]) != (grad_dirs := X.shape[0]): + if (grad_dirs := X.shape[0]) != (signal_dirs := y.shape[0]): raise ValueError( - f"Mismatched data {signal_dirs} and gradient table {grad_dirs} sizes." + f"Mismatched gradient directions in data ({signal_dirs}) " + f"and gradient table ({grad_dirs})." ) - gpr = GaussianProcessRegressor( + gpr = EddyMotionGPR( kernel=self.kernel, random_state=random_state, n_targets=y.shape[1], + alpha=self.sigma_sq, ) self._modelfit = GPFit( - gtab=gtab, model=gpr.fit(X, y), mask=mask, ) return self._modelfit - # @multi_voxel_fit - # def multi_fit(self, data_thres, mask=None, **kwargs): - # return GPFit(gpr.fit(self.gtab, data_thres)) - def predict( self, - gtab: GradientTable, + gtab: GradientTable | np.ndarray, **kwargs, ) -> np.ndarray: """ @@ -252,8 +187,8 @@ def predict( Parameters ---------- - gtab : :obj:`~dipy.core.gradients.GradientTable` - One or more gradient orientations at which the GP will be evaluated. + gtab : :obj:`~dipy.core.gradients.GradientTable` or :obj:`~np.ndarray` + Gradient table with one or more orientations at which the GP will be evaluated. Returns ------- @@ -273,8 +208,6 @@ class GPFit: Attributes ---------- - fitted_gtab : :obj:`~dipy.core.gradients.GradientTable` - The original gradient table with which the GP has been fitted. model: :obj:`~sklearn.gaussian_process.GaussianProcessRegressor` The fitted Gaussian process regressor object. mask: :obj:`~numpy.ndarray` @@ -284,7 +217,6 @@ class GPFit: def __init__( self, - gtab: GradientTable, model: GaussianProcessRegressor, mask: np.ndarray | None = None, ) -> None: @@ -293,28 +225,25 @@ def __init__( Parameters ---------- - gtab : :obj:`~dipy.core.gradients.GradientTable` - The gradient table with which the GP has been fitted. model: :obj:`~sklearn.gaussian_process.GaussianProcessRegressor` The fitted Gaussian process regressor object. mask: :obj:`~numpy.ndarray` The boolean mask used during fitting (can be ``None``). """ - self.fitted_gtab = gtab self.model = model self.mask = mask def predict( self, - gtab: GradientTable, + gtab: GradientTable | np.ndarray, ) -> np.ndarray: """ Generate DWI signal based on a fitted Gaussian Process. Parameters ---------- - gtab: :obj:`~dipy.core.gradients.GradientTable` + gtab : :obj:`~dipy.core.gradients.GradientTable` or :obj:`~np.ndarray` Gradient table with one or more orientations at which the GP will be evaluated. Returns @@ -323,374 +252,7 @@ def predict( A 3D or 4D array with the simulated gradient(s). """ - return gp_prediction(self.fitted_gtab, gtab, self.model, mask=self.mask) - - -def _ensure_positive_scale( - a: float, -) -> None: - if a <= 0: - raise ValueError(f"a must be strictly positive. Provided: {a}") - - -def compute_exponential_covariance( - theta: np.ndarray, - a: float, -) -> np.ndarray: - r"""Compute the exponential covariance matrix following eq. 9 in [Andersson15]_. - - .. math:: - - C(\theta) = \exp(- \frac{\theta}{a}) - - Parameters - ---------- - theta : :obj:`~numpy.ndarray` - Pairwise angles across diffusion gradient encoding directions. - a : :obj:`float` - Positive scale parameter that here determines the "distance" at which θ - the covariance goes to zero. - - Returns - ------- - :obj:`~numpy.ndarray` - Exponential covariance function values. - - """ - - _ensure_positive_scale(a) - - return np.exp(-theta / a) - - -def compute_spherical_covariance( - theta: np.ndarray, - a: float, -) -> np.ndarray: - r"""Compute the spherical covariance matrix following eq. 10 in [Andersson15]_. - - .. math:: - - C(\theta) = \begin{cases} - 1 - \frac{3 \theta}{2 a} + \frac{\theta^3}{2 a^3} & \textnormal{if} \; \theta \leq a \\ - 0 & \textnormal{if} \; \theta > a - \end{cases} - - Parameters - ---------- - theta : :obj:`~numpy.ndarray` - Pairwise angles across diffusion gradient encoding directions. - a : :obj:`float` - Positive scale parameter that here determines the "distance" at which θ - the covariance goes to zero. - - Returns - ------- - :obj:`~numpy.ndarray` - Spherical covariance matrix. - - """ - - _ensure_positive_scale(a) - - return np.where(theta <= a, 1 - 3 * theta / (2 * a) + theta**3 / (2 * a**3), 0) - - -def compute_derivative( - theta: np.ndarray, - kernel: np.ndarray, - weighting: str, - params: dict[float], -): - """ - Compute the analytical derivative of the kernel. - - Parameters - ---------- - theta : :obj:`~numpy.ndarray` - Pairwise angles across diffusion gradient encoding directions. - kernel : :obj:`~numpy.ndarray` - Current kernel. - weighting : :obj:`str` - The kind of kernel which derivatives will be calculated. - params : :obj:`dict` - Current parameter values. - - Returns - ------- - :obj:`~numpy.ndarray` - Gradients of the kernel. - - """ - - n_partials = len(params) - - a = params.pop("a") - lambda_s = params.pop("lambda_s") - - min_angles = theta > a - - if weighting == "spherical": - deriv_a = 1.5 * (theta[min_angles] / a**2 - theta[min_angles] ** 3 / a**4) - elif weighting == "exponential": - deriv_a = np.exp(-theta[min_angles] / a) * theta[min_angles] / a**2 - else: - raise ValueError(f"Unknown kernel weighting '{weighting}'.") - - K_gradient = np.zeros((*theta.shape, n_partials)) - K_gradient[..., 0] = kernel / lambda_s - K_gradient[..., 1][min_angles] = lambda_s * deriv_a - K_gradient[..., 2][theta > 1e-5] = 2 - - return K_gradient - - -def compute_pairwise_angles( - gtab_X: GradientTable | np.ndarray, - gtab_Y: GradientTable | np.ndarray | None = None, - closest_polarity: bool = True, -) -> np.ndarray: - r"""Compute pairwise angles across diffusion gradient encoding directions. - - Following [Andersson15]_, it computes the smallest of the angles between - each pair if ``closest_polarity`` is ``True``, i.e. - - .. math:: - - \theta(\mathbf{g}, \mathbf{g'}) = \arccos(|\langle \mathbf{g}, \mathbf{g'} \rangle|) - - Parameters - ---------- - gtab_X: :obj:`~dipy.core.gradients.GradientTable` - Gradient table - gtab_Y: :obj:`~dipy.core.gradients.GradientTable` - Gradient table - closest_polarity : :obj:`bool` - ``True`` to consider the smallest of the two angles between the crossing - lines resulting from reversing each vector pair. - - Returns - ------- - :obj:`~numpy.ndarray` - Pairwise angles across diffusion gradient encoding directions. - - Examples - -------- - >>> from dipy.core.gradients import gradient_table - >>> bvecs = np.asarray([(1.0, -1.0), (0.0, 0.0), (0.0, 0.0)]) - >>> gtab = gradient_table([1000] * bvecs.shape[-1], bvecs) - >>> compute_pairwise_angles(gtab, closest_polarity=False)[0, 1] # doctest: +ELLIPSIS - 3.1415... - >>> bvecs1 = np.asarray([(1.0, -1.0), (0.0, 0.0), (0.0, 0.0)]) - >>> bvecs2 = np.asarray([(1.0, -1.0), (0.0, 0.0), (0.0, 0.0)]) - >>> gtab1 = gradient_table([1000] * bvecs1.shape[-1], bvecs1) - >>> gtab2 = gradient_table([1000] * bvecs2.shape[-1], bvecs2) - >>> compute_pairwise_angles(gtab1, gtab2, closest_polarity=False)[0, 1] # doctest: +ELLIPSIS - 3.1415... - >>> bvecs = np.asarray([(1.0, -1.0), (0.0, 0.0), (0.0, 0.0)]) - >>> gtab = gradient_table([1000] * bvecs.shape[-1], bvecs) - >>> compute_pairwise_angles(gtab)[0, 1] - 0.0 - - References - ---------- - .. [Andersson15] J. L. R. Andersson. et al., An integrated approach to - correction for off-resonance effects and subject movement in diffusion MR - imaging, NeuroImage 125 (2016) 1063–1078 - - """ - - bvecs_X = getattr(gtab_X, "bvecs", gtab_X) - bvecs_X = np.array(bvecs_X.T) / np.linalg.norm(bvecs_X, axis=1) - - if gtab_Y is None: - bvecs_Y = bvecs_X - else: - bvecs_Y = np.array(getattr(gtab_Y, "bvecs", gtab_Y)) - if bvecs_Y.ndim == 1: - bvecs_Y = bvecs_Y[np.newaxis, ...] - bvecs_Y = bvecs_Y.T / np.linalg.norm(bvecs_Y, axis=1) - - cosines = np.clip(bvecs_X.T @ bvecs_Y, -1.0, 1.0) - return np.arccos(np.abs(cosines) if closest_polarity else cosines) - - -class PairwiseOrientationKernel(Kernel): - """A scikit-learn's kernel for DWI signals.""" - - def __init__( - self, - weighting: str = "exponential", - lambda_s: float = 2.0, - a: float = 0.1, - sigma_sq: float = 1.0, - lambda_s_bounds: tuple[float, float] = (1e-5, 1e4), - a_bounds: tuple[float, float] = (1e-5, np.pi), - sigma_sq_bounds: tuple[float, float] = (1e-5, 1e4), - ): - r""" - Initialize a kernel with pairwise angles. - - Parameters - ---------- - weighting : :obj:`str` - The type of kernel to build (either "exponential", "sphere", or "test"). - lambda_s : :obj:`float` - The :math:`\lambda_s` hyperparameter. - a : :obj:`float` - Minimum angle in rads. - sigma_sq : :obj:`float` - Error allowed in collinear orientations. - lambda_s_bounds : :obj:`tuple` - Bounds for the :math:`\lambda_s` hyperparameter. - a_bounds : :obj:`tuple` - Bounds for the a parameter. - sigma_sq_bounds : :obj:`tuple` - Bounds for the error parameter. - - """ - self._weighting = weighting # For __repr__ - self.lambda_s = lambda_s - self.a = a - self.sigma_sq = sigma_sq - self.lambda_s_bounds = lambda_s_bounds - self.a_bounds = a_bounds - self.sigma_sq_bounds = sigma_sq_bounds - - @property - def hyperparameter_lambda_s(self): - return Hyperparameter("lambda_s", "numeric", self.lambda_s_bounds) - - @property - def hyperparameter_a(self): - return Hyperparameter("a", "numeric", self.a_bounds) - - @property - def hyperparameter_sigma_sq(self): - return Hyperparameter("sigma_sq", "numeric", self.sigma_sq_bounds) - - def __call__(self, gtab_X, gtab_Y=None, eval_gradient=False): - """ - Return the kernel K(gtab_X, gtab_Y) and optionally its gradient. - - Parameters - ---------- - gtab_X: :obj:`~dipy.core.gradients.GradientTable` - Gradient table (X) - gtab_Y: :obj:`~dipy.core.gradients.GradientTable`, optional - Gradient table (Y, optional) - eval_gradient : :obj:`bool`, optional - Determines whether the gradient with respect to the log of - the kernel hyperparameter is computed. - Only supported when gtab_Y is ``None``. - - Returns - ------- - K : ndarray of shape (n_samples_X, n_samples_Y) - Kernel k(X, Y) - - K_gradient : ndarray of shape (n_samples_X, n_samples_X, n_dims),\ - optional - The gradient of the kernel k(X, X) with respect to the log of the - hyperparameter of the kernel. Only returned when `eval_gradient` - is True. - - """ - thetas = compute_pairwise_angles(gtab_X, gtab_Y) - collinear = np.abs(thetas) < 1e-5 - thetas[collinear] = 0.0 - - covfunc = getattr(modules[__name__], f"compute_{self._weighting}_covariance") - - K = self.lambda_s * covfunc(thetas, self.a) - K[collinear] += self.sigma_sq - - if not eval_gradient: - return K - - if gtab_Y is not None: - raise RuntimeError("Gradients should not be calculated in inference time") - - K_gradient = compute_derivative( - thetas, - K, - weighting=self._weighting, - params=self.get_params(), - ) - - return K, K_gradient - - def diag(self, X): - """Returns the diagonal of the kernel k(X, X). - - The result of this method is identical to np.diag(self(X)); however, - it can be evaluated more efficiently since only the diagonal is - evaluated. - - Parameters - ---------- - X : ndarray of shape (n_samples_X, n_features) - Left argument of the returned kernel k(X, Y) - - Returns - ------- - K_diag : ndarray of shape (n_samples_X,) - Diagonal of kernel k(X, X) - """ - try: - n = len(X) - except TypeError: - n = len(X.bvals) - - covfunc = getattr(modules[__name__], f"compute_{self._weighting}_covariance") - return self.lambda_s * covfunc(np.zeros(n), self.a) + self.sigma_sq - - def is_stationary(self): - """Returns whether the kernel is stationary.""" - return True - - def __repr__(self): - return ( - f"{self._weighting} kernel" - f"(a={self.a}, lambda_s={self.lambda_s}, sigma_sq={self.sigma_sq})" - ) - - def get_params(self, deep=True): - """ - Get parameters of the kernel. - - Parameters - ---------- - deep : :obj:`bool` - Whether to return the parameters of the contained subobjects. - - Returns - ------- - params : :obj:`dict` - Parameter names mapped to their values. - - """ - return {"lambda_s": self.lambda_s, "a": self.a, "sigma_sq": self.sigma_sq} - - def set_params(self, **params): - """ - Set parameters of the kernel. - - Parameters - ---------- - params : :obj:`dict` - Kernel parameters. - - Returns - ------- - self : :obj:`object` - Returns self. - - """ - self.lambda_s = params.get("lambda_s", self.lambda_s) - self.a = params.get("a", self.a) - self.sigma_sq = params.get("sigma_sq", self.sigma_sq) - return self + return gp_prediction(gtab, self.model, mask=self.mask) def _rasb2dipy(gradient): @@ -706,6 +268,8 @@ def _rasb2dipy(gradient): print("Warning: make sure gradient information is not transposed!") with warnings.catch_warnings(): + from dipy.core.gradients import gradient_table + warnings.filterwarnings("ignore", category=UserWarning) retval = gradient_table(gradient[3, :], gradient[:3, :].T) return retval diff --git a/src/eddymotion/model/_sklearn.py b/src/eddymotion/model/_sklearn.py new file mode 100644 index 00000000..8e69cc6d --- /dev/null +++ b/src/eddymotion/model/_sklearn.py @@ -0,0 +1,435 @@ +# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*- +# vi: set ft=python sts=4 ts=4 sw=4 et: +# +# © The NiPreps Developers +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# We support and encourage derived works from this project, please read +# about our expectations at +# +# https://www.nipreps.org/community/licensing/ +# +r""" +Derivations from scikit-learn for Gaussian Processes. + +Gaussian Process Model: Pairwise orientation angles +--------------------------------------------------- +Squared Exponential covariance kernel +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +Kernel based on a squared exponential function for Gaussian processes on +multi-shell DWI data following to eqs. 14 and 16 in [Andersson15]_. +For a 2-shell case, the :math:`\mathbf{K}` kernel can be written as: + +.. math:: + \begin{equation} + \mathbf{K} = \left[ + \begin{matrix} + \lambda C_{\theta}(\theta (\mathbf{G}_{1}); a) + \sigma_{1}^{2} \mathbf{I} & + \lambda C_{\theta}(\theta (\mathbf{G}_{2}, \mathbf{G}_{1}); a) C_{b}(b_{2}, b_{1}; \ell) \\ + \lambda C_{\theta}(\theta (\mathbf{G}_{1}, \mathbf{G}_{2}); a) C_{b}(b_{1}, b_{2}; \ell) & + \lambda C_{\theta}(\theta (\mathbf{G}_{2}); a) + \sigma_{2}^{2} \mathbf{I} \\ + \end{matrix} + \right] + \end{equation} + +**Squared exponential shell-wise covariance kernel**: +Compute the squared exponential smooth function describing how the +covariance changes along the b direction. +It uses the log of the b-values as the measure of distance along the +b-direction according to eq. 15 in [Andersson15]_. + +.. math:: + C_{b}(b, b'; \ell) = \exp\left( - \frac{(\log b - \log b')^2}{2 \ell^2} \right). + +**Squared exponential covariance kernel**: +Compute the squared exponential covariance matrix following to eq. 14 in [Andersson15]_. + +.. math:: + k(\textbf{x}, \textbf{x'}) = C_{\theta}(\mathbf{g}, \mathbf{g'}; a) C_{b}(|b - b'|; \ell) + +where :math:`C_{\theta}` is given by: + +.. math:: + \begin{equation} + C(\theta) = + \begin{cases} + 1 - \frac{3 \theta}{2 a} + \frac{\theta^3}{2 a^3} & \textnormal{if} \; \theta \leq a \\ + 0 & \textnormal{if} \; \theta > a + \end{cases} + \end{equation} + +:math:`\theta` being computed as: + +.. math:: + \theta(\mathbf{g}, \mathbf{g'}) = \arccos(|\langle \mathbf{g}, \mathbf{g'} \rangle|) + +and :math:`C_{b}` is given by: + +.. math:: + C_{b}(b, b'; \ell) = \exp\left( - \frac{(\log b - \log b')^2}{2 \ell^2} \right) + +:math:`b` and :math:`b'` being the b-values, and :math:`\mathbf{g}` and +:math:`\mathbf{g'}` the unit diffusion-encoding gradient unit vectors of the +shells; and :math:`{a, \ell}` some hyperparameters. + +""" + +from __future__ import annotations + +from typing import Callable, Sequence + +import numpy as np +from scipy import optimize +from scipy.optimize._minimize import Bounds +from sklearn.gaussian_process import GaussianProcessRegressor +from sklearn.gaussian_process.kernels import ( + Hyperparameter, + Kernel, +) +from sklearn.metrics.pairwise import cosine_similarity + +BOUNDS_A: tuple[float, float] = (1e-4, np.pi) +BOUNDS_LAMBDA_S: tuple[float, float] = (1e-4, 1e4) +THETA_EPSILON: float = 1e-5 + + +class EddyMotionGPR(GaussianProcessRegressor): + def __init__( + self, + kernel: Kernel | None = None, + *, + alpha: float = 1e-10, + optimizer: str | Callable | None = "fmin_l_bfgs_b", + n_restarts_optimizer: int = 0, + normalize_y: bool = True, + copy_X_train: bool = True, + n_targets: int | None = None, + random_state: int | None = None, + max_iter: int = 2e05, + gtol: float = 1e-06, + ): + self.max_iter = max_iter + self.gtol = gtol + + super().__init__( + kernel, + alpha=alpha, + optimizer=optimizer, + n_restarts_optimizer=n_restarts_optimizer, + normalize_y=normalize_y, + copy_X_train=copy_X_train, + n_targets=n_targets, + random_state=random_state, + ) + + def _constrained_optimization( + self, + obj_func: Callable, + initial_theta: np.ndarray, + bounds: Sequence[tuple[float, float]] | Bounds, + ) -> tuple[float, float]: + from sklearn.utils.optimize import _check_optimize_result + + opt_res = optimize.minimize( + obj_func, + initial_theta, + method="L-BFGS-B", + jac=True, + bounds=bounds, + options={"maxiter": self.max_iter, "gtol": self.gtol}, + ) + _check_optimize_result("lbfgs", opt_res) + return opt_res.x, opt_res.fun + + +class ExponentialKriging(Kernel): + """A scikit-learn's kernel for DWI signals.""" + + def __init__( + self, + a: float = 0.01, + lambda_s: float = 2.0, + a_bounds: tuple[float, float] = BOUNDS_A, + lambda_s_bounds: tuple[float, float] = BOUNDS_LAMBDA_S, + ): + r""" + Initialize an exponential Kriging kernel. + + Parameters + ---------- + a : :obj:`float` + Minimum angle in rads. + lambda_s : :obj:`float` + The :math:`\lambda_s` hyperparameter. + a_bounds : :obj:`tuple` + Bounds for the a parameter. + lambda_s_bounds : :obj:`tuple` + Bounds for the :math:`\lambda_s` hyperparameter. + + """ + self.a = a + self.lambda_s = lambda_s + self.a_bounds = a_bounds + self.lambda_s_bounds = lambda_s_bounds + + @property + def hyperparameter_a(self) -> Hyperparameter: + return Hyperparameter("a", "numeric", self.a_bounds) + + @property + def hyperparameter_lambda_s(self) -> Hyperparameter: + return Hyperparameter("lambda_s", "numeric", self.lambda_s_bounds) + + def __call__( + self, X: np.ndarray, Y: np.ndarray | None = None, eval_gradient: bool = False + ) -> np.ndarray | tuple[np.ndarray, np.ndarray]: + """ + Return the kernel K(X, Y) and optionally its gradient. + + Parameters + ---------- + X: :obj:`~numpy.ndarray` + Gradient table (X) + Y: :obj:`~numpy.ndarray`, optional + Gradient table (Y, optional) + eval_gradient : :obj:`bool`, optional + Determines whether the gradient with respect to the log of + the kernel hyperparameter is computed. + Only supported when Y is ``None``. + + Returns + ------- + K : ndarray of shape (n_samples_X, n_samples_Y) + Kernel k(X, Y) + + K_gradient : ndarray of shape (n_samples_X, n_samples_X, n_dims),\ + optional + The gradient of the kernel k(X, X) with respect to the log of the + hyperparameter of the kernel. Only returned when `eval_gradient` + is True. + + """ + thetas = compute_pairwise_angles(X, Y) + thetas[np.abs(thetas) < THETA_EPSILON] = 0.0 + C_theta = np.exp(-thetas / self.a) + + if not eval_gradient: + return self.lambda_s * C_theta + + K_gradient = np.zeros((*thetas.shape, 2)) + K_gradient[..., 0] = self.lambda_s * C_theta * thetas / self.a**2 # Derivative w.r.t. a + K_gradient[..., 1] = C_theta + + return self.lambda_s * C_theta, K_gradient + + def diag(self, X: np.ndarray) -> np.ndarray: + """Returns the diagonal of the kernel k(X, X). + + The result of this method is identical to np.diag(self(X)); however, + it can be evaluated more efficiently since only the diagonal is + evaluated. + + Parameters + ---------- + X : ndarray of shape (n_samples_X, n_features) + Left argument of the returned kernel k(X, Y) + + Returns + ------- + K_diag : ndarray of shape (n_samples_X,) + Diagonal of kernel k(X, X) + """ + return self.lambda_s * np.ones(X.shape[0]) + + def is_stationary(self) -> bool: + """Returns whether the kernel is stationary.""" + return True + + def __repr__(self) -> str: + return f"ExponentialKriging (a={self.a}, λₛ={self.lambda_s})" + + +class SphericalKriging(Kernel): + """A scikit-learn's kernel for DWI signals.""" + + def __init__( + self, + a: float = 0.01, + lambda_s: float = 2.0, + a_bounds: tuple[float, float] = BOUNDS_A, + lambda_s_bounds: tuple[float, float] = BOUNDS_LAMBDA_S, + ): + r""" + Initialize a spherical Kriging kernel. + + Parameters + ---------- + a : :obj:`float` + Minimum angle in rads. + lambda_s : :obj:`float` + The :math:`\lambda_s` hyperparameter. + a_bounds : :obj:`tuple` + Bounds for the a parameter. + lambda_s_bounds : :obj:`tuple` + Bounds for the :math:`\lambda_s` hyperparameter. + + """ + self.a = a + self.lambda_s = lambda_s + self.a_bounds = a_bounds + self.lambda_s_bounds = lambda_s_bounds + + @property + def hyperparameter_a(self) -> Hyperparameter: + return Hyperparameter("a", "numeric", self.a_bounds) + + @property + def hyperparameter_lambda_s(self) -> Hyperparameter: + return Hyperparameter("lambda_s", "numeric", self.lambda_s_bounds) + + def __call__( + self, X: np.ndarray, Y: np.ndarray | None = None, eval_gradient: bool = False + ) -> np.ndarray | tuple[np.ndarray, np.ndarray]: + """ + Return the kernel K(X, Y) and optionally its gradient. + + Parameters + ---------- + X: :obj:`~numpy.ndarray` + Gradient table (X) + Y: :obj:`~numpy.ndarray`, optional + Gradient table (Y, optional) + eval_gradient : :obj:`bool`, optional + Determines whether the gradient with respect to the log of + the kernel hyperparameter is computed. + Only supported when Y is ``None``. + + Returns + ------- + K : ndarray of shape (n_samples_X, n_samples_Y) + Kernel k(X, Y) + + K_gradient : ndarray of shape (n_samples_X, n_samples_X, n_dims),\ + optional + The gradient of the kernel k(X, X) with respect to the log of the + hyperparameter of the kernel. Only returned when `eval_gradient` + is True. + + """ + thetas = compute_pairwise_angles(X, Y) + thetas[np.abs(thetas) < THETA_EPSILON] = 0.0 + + nonzero = thetas <= self.a + + C_theta = np.zeros_like(thetas) + C_theta[nonzero] = ( + 1 - 1.5 * thetas[nonzero] / self.a + 0.5 * thetas[nonzero] ** 3 / self.a**3 + ) + + if not eval_gradient: + return self.lambda_s * C_theta + + deriv_a = np.zeros_like(thetas) + deriv_a[nonzero] = ( + 1.5 * self.lambda_s * (thetas[nonzero] / self.a**2 - thetas[nonzero] ** 3 / self.a**4) + ) + K_gradient = np.dstack((deriv_a, C_theta)) + + return self.lambda_s * C_theta, K_gradient + + def diag(self, X: np.ndarray) -> np.ndarray: + """Returns the diagonal of the kernel k(X, X). + + The result of this method is identical to np.diag(self(X)); however, + it can be evaluated more efficiently since only the diagonal is + evaluated. + + Parameters + ---------- + X : ndarray of shape (n_samples_X, n_features) + Left argument of the returned kernel k(X, Y) + + Returns + ------- + K_diag : ndarray of shape (n_samples_X,) + Diagonal of kernel k(X, X) + """ + return self.lambda_s * np.ones(X.shape[0]) + + def is_stationary(self) -> bool: + """Returns whether the kernel is stationary.""" + return True + + def __repr__(self) -> str: + return f"SphericalKriging (a={self.a}, λₛ={self.lambda_s})" + + +def compute_pairwise_angles( + X: np.ndarray, + Y: np.ndarray | None = None, + closest_polarity: bool = True, + dense_output: bool = True, +) -> np.ndarray: + r"""Compute pairwise angles across diffusion gradient encoding directions. + + Following [Andersson15]_, it computes the smallest of the angles between + each pair if ``closest_polarity`` is ``True``, i.e., + + .. math:: + + \theta(\mathbf{g}, \mathbf{g'}) = \arccos(|\langle \mathbf{g}, \mathbf{g'} \rangle|) + + Parameters + ---------- + X : {array-like, sparse matrix} of shape (n_samples_X, n_features) + Input data. + Y : {array-like, sparse matrix} of shape (n_samples_Y, n_features), \ + default=None + Input data. If ``None``, the output will be the pairwise + similarities between all samples in ``X``. + dense_output : :obj:`bool`, default=True + Whether to return dense output even when the input is sparse. If + ``False``, the output is sparse if both input arrays are sparse. + closest_polarity : :obj:`bool`, default=True + ``True`` to consider the smallest of the two angles between the crossing + lines resulting from reversing each vector pair. + + Returns + ------- + :obj:`~numpy.ndarray` + Pairwise angles across diffusion gradient encoding directions. + + Examples + -------- + >>> X = np.asarray([(1.0, -1.0), (0.0, 0.0), (0.0, 0.0)]).T + >>> compute_pairwise_angles(X, closest_polarity=False)[0, 1] # doctest: +ELLIPSIS + 3.1415... + >>> X = np.asarray([(1.0, -1.0), (0.0, 0.0), (0.0, 0.0)]).T + >>> Y = np.asarray([(1.0, -1.0), (0.0, 0.0), (0.0, 0.0)]).T + >>> compute_pairwise_angles(X, Y, closest_polarity=False)[0, 1] # doctest: +ELLIPSIS + 3.1415... + >>> X = np.asarray([(1.0, -1.0), (0.0, 0.0), (0.0, 0.0)]).T + >>> compute_pairwise_angles(X)[0, 1] + 0.0 + + References + ---------- + .. [Andersson15] J. L. R. Andersson. et al., An integrated approach to + correction for off-resonance effects and subject movement in diffusion MR + imaging, NeuroImage 125 (2016) 1063-11078 + + """ + + cosines = np.clip(cosine_similarity(X, Y, dense_output=dense_output), -1.0, 1.0) + return np.arccos(np.abs(cosines)) if closest_polarity else np.arccos(cosines) diff --git a/test/test_model.py b/test/test_model.py index 117b2d93..4bb8de65 100644 --- a/test/test_model.py +++ b/test/test_model.py @@ -118,7 +118,7 @@ def test_gp_model(): bvecs = X.T / np.linalg.norm(X.T, axis=0) gtab = gradient_table([1000] * bvecs.shape[-1], bvecs) - gpfit = gp.fit(y, gtab) + gpfit = gp.fit(gtab, y) X_qry = bvecs[:, :2].T prediction = gpfit.predict(X_qry) diff --git a/test/test_dipy.py b/test/test_sklearn.py similarity index 82% rename from test/test_dipy.py rename to test/test_sklearn.py index 34fb352f..277ba0b9 100644 --- a/test/test_dipy.py +++ b/test/test_sklearn.py @@ -24,15 +24,9 @@ import numpy as np import pytest -from dipy.core.gradients import gradient_table from dipy.io import read_bvals_bvecs -from eddymotion.model._dipy import ( - PairwiseOrientationKernel, - compute_exponential_covariance, - compute_pairwise_angles, - compute_spherical_covariance, -) +from eddymotion.model import _sklearn as ems GradientTablePatch = namedtuple("gtab", ["bvals", "bvecs"]) @@ -263,63 +257,42 @@ ) def test_compute_pairwise_angles(bvecs1, bvecs2, closest_polarity, expected): # DIPY requires the vectors to be normalized - _bvecs1 = bvecs1 / np.linalg.norm(bvecs1, axis=0) - gtab1 = gradient_table([1000] * _bvecs1.shape[-1], _bvecs1) - + _bvecs1 = (bvecs1 / np.linalg.norm(bvecs1, axis=0)).T _bvecs2 = None - gtab2 = None + if bvecs2 is not None: - _bvecs2 = bvecs2 / np.linalg.norm(bvecs2, axis=0) - gtab2 = gradient_table([1000] * _bvecs2.shape[-1], _bvecs2) + _bvecs2 = (bvecs2 / np.linalg.norm(bvecs2, axis=0)).T - obtained = compute_pairwise_angles(gtab1, gtab2, closest_polarity) + obtained = ems.compute_pairwise_angles(_bvecs1, _bvecs2, closest_polarity) if _bvecs2 is not None: - assert (_bvecs1.shape[-1], _bvecs2.shape[-1]) == obtained.shape + assert (_bvecs1.shape[0], _bvecs2.shape[0]) == obtained.shape assert obtained.shape == expected.shape np.testing.assert_array_almost_equal(obtained, expected, decimal=2) -def test_compute_exponential_covariance(): - obtained = compute_exponential_covariance(THETAS, 0.5) - assert np.allclose(obtained, EXPECTED_EXPONENTIAL) - - -def test_compute_spherical_covariance(): - obtained = compute_spherical_covariance(THETAS, 1.23) - assert np.allclose(obtained, EXPECTED_SPHERICAL) - - -def test_kernel(repodata): +@pytest.mark.parametrize("covariance", ["Spherical", "Exponential"]) +def test_kernel(repodata, covariance): """Check kernel construction.""" bvals, bvecs = read_bvals_bvecs( str(repodata / "ds000114_singleshell.bval"), str(repodata / "ds000114_singleshell.bvec"), ) - gtab_original = gradient_table(bvals, bvecs) - - bvals = gtab_original.bvals[~gtab_original.b0s_mask] - bvecs = gtab_original.bvecs[~gtab_original.b0s_mask, :] - gtab = gradient_table(bvals, bvecs) - kernel = PairwiseOrientationKernel() + bvecs = bvecs[bvals > 10] - K = kernel(gtab) + KernelType = getattr(ems, f"{covariance}Kriging") + kernel = KernelType() + K = kernel(bvecs) - assert K.shape == (len(bvals), len(bvals)) - assert np.allclose(np.diagonal(K), kernel.diag(gtab)) + assert K.shape == (bvecs.shape[0],) * 2 - # DIPY bug - gradient tables cannot be created with just one bvec/bval - # https://github.com/dipy/dipy/issues/3283 - gtab_Y = GradientTablePatch(bvals[10], bvecs[10, ...]) + assert np.allclose(np.diagonal(K), kernel.diag(bvecs)) - K_predict = kernel(gtab, gtab_Y) + K_predict = kernel(bvecs, [bvecs[10, ...]]) assert K_predict.shape == (K.shape[0], 1) - gtab_Y = gradient_table(bvals[10:14], bvecs[10:14, ...]) - - K_predict = kernel(gtab, gtab_Y) - + K_predict = kernel(bvecs, bvecs[10:14, ...]) assert K_predict.shape == (K.shape[0], 4)