From 0a22e0482fcfd0c3b3ecdd4d2e2f3dadce0e72cc Mon Sep 17 00:00:00 2001
From: Alexander Goscinski
Date: Mon, 18 Sep 2023 20:38:03 +0200
Subject: [PATCH] checkpoint: energy prediction works

---
 examples/low-rank-kernel-model.py             |  63 +++
 examples/sparse-kernel-model.py               |  34 --
 src/equisolve/numpy/kernels/__init__.py       |   5 +
 .../numpy/kernels/_aggregate_kernel.py        | 171 +++++++
 src/equisolve/numpy/models/__init__.py        |   3 +-
 .../numpy/models/sor_kernel_ridge.py          | 426 ++++++++++++++++++
 src/equisolve/numpy/models/sparse_kernel.py   | 216 ---------
 .../numpy/models/sor_kernel_ridge.py          | 235 ++++++++++
 8 files changed, 902 insertions(+), 251 deletions(-)
 create mode 100644 examples/low-rank-kernel-model.py
 delete mode 100644 examples/sparse-kernel-model.py
 create mode 100644 src/equisolve/numpy/kernels/__init__.py
 create mode 100644 src/equisolve/numpy/kernels/_aggregate_kernel.py
 create mode 100644 src/equisolve/numpy/models/sor_kernel_ridge.py
 delete mode 100644 src/equisolve/numpy/models/sparse_kernel.py
 create mode 100644 tests/equisolve_tests/numpy/models/sor_kernel_ridge.py

diff --git a/examples/low-rank-kernel-model.py b/examples/low-rank-kernel-model.py
new file mode 100644
index 0000000..e11f705
--- /dev/null
+++ b/examples/low-rank-kernel-model.py
@@ -0,0 +1,63 @@
+"""
+Computing a SoR Kernel Model
+============================
+
+.. start-body
+
+In this tutorial we compute a kernel model using the subset of regressors
+(SoR) approximation of the kernel.
+"""
+
+import ase.io
+import metatensor
+from rascaline import SoapPowerSpectrum
+
+from equisolve.numpy.models import SorKernelRidge
+from equisolve.numpy.sample_selection import FPS
+from equisolve.utils import ase_to_tensormap
+
+
+frames = ase.io.read("dataset.xyz", ":20")
+y = ase_to_tensormap(frames, energy="energy")
+n_to_select = 100
+degree = 3
+
+HYPER_PARAMETERS = {
+    "cutoff": 5.0,
+    "max_radial": 6,
+    "max_angular": 4,
+    "atomic_gaussian_width": 0.3,
+    "center_atom_weight": 1.0,
+    "radial_basis": {
+        "Gto": {},
+    },
+    "cutoff_function": {
+        "ShiftedCosine": {"width": 0.5},
+    },
+}
+
+calculator = SoapPowerSpectrum(**HYPER_PARAMETERS)
+
+descriptor = calculator.compute(frames, gradients=[])
+
+descriptor = descriptor.keys_to_samples("species_center")
+descriptor = descriptor.keys_to_properties(["species_neighbor_1", "species_neighbor_2"])
+
+pseudo_points = FPS(n_to_select=n_to_select).fit_transform(descriptor)
+
+clf = SorKernelRidge()
+clf.fit(
+    descriptor,
+    pseudo_points,
+    y,
+    kernel_type="polynomial",
+    kernel_kwargs={"degree": degree, "aggregate_names": ["center", "species_center"]},
+)
+y_pred = clf.predict(descriptor)
+
+print(
+    "MAE:",
+    metatensor.mean_over_samples(
+        metatensor.abs(metatensor.subtract(y_pred, y)), "structure"
+    )[0].values[0, 0],
+)
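The example above evaluates on its own training structures; a held-out split gives a more honest error estimate. A minimal sketch reusing the objects defined above (the `soap` helper and the 15/5 split are hypothetical illustrations, not part of the patch):

    def soap(subset):
        # same key moves as in the example above
        d = calculator.compute(subset, gradients=[])
        d = d.keys_to_samples("species_center")
        return d.keys_to_properties(["species_neighbor_1", "species_neighbor_2"])

    X_train, X_test = soap(frames[:15]), soap(frames[15:])
    y_train = ase_to_tensormap(frames[:15], energy="energy")
    y_test = ase_to_tensormap(frames[15:], energy="energy")

    clf = SorKernelRidge()
    clf.fit(
        X_train,
        FPS(n_to_select=50).fit_transform(X_train),
        y_train,
        kernel_type="polynomial",
        kernel_kwargs={"degree": degree, "aggregate_names": ["center", "species_center"]},
    )
    test_mae = metatensor.mean_over_samples(
        metatensor.abs(metatensor.subtract(clf.predict(X_test), y_test)), "structure"
    )[0].values[0, 0]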
diff --git a/examples/sparse-kernel-model.py b/examples/sparse-kernel-model.py
deleted file mode 100644
index dd2571d..0000000
--- a/examples/sparse-kernel-model.py
+++ /dev/null
@@ -1,34 +0,0 @@
-# Example
-
-import ase.io
-import metatensor
-from metatenso import TensorMap
-
-from equisolve.numpy.models.sparse_kernel import (
-    SparseKernelRidge,
-    compute_sparse_kernel,
-)
-from equisolve.numpy.sample_section import FPS
-from equisolve.utils import ase_to_tensormap
-
-
-frames = ase.io.read("dataset.xyz", ":20")
-y = ase_to_tensormap(frames)
-ps = TensorMap()
-n_to_select = 100
-degree = 3
-
-
-pseudo_points = FPS(n_to_select=n_to_select).fit_transform(ps)
-
-k_mm = compute_sparse_kernel(
-    tensor=pseudo_points, pseudo_points=pseudo_points, degree=degree
-)
-k_nm = compute_sparse_kernel(tensor=ps, pseudo_points=pseudo_points, degree=degree)
-k_y = compute_sparse_kernel(tensor=y, pseudo_points=pseudo_points, degree=degree)
-
-# User can do their structure sum
-k_nm = metatensor.sum_over_samples(k_nm, samples_names="species_center")
-
-clf = SparseKernelRidge()
-clf.fit(k_mm=k_mm, k_nm=k_nm, y=k_y)
diff --git a/src/equisolve/numpy/kernels/__init__.py b/src/equisolve/numpy/kernels/__init__.py
new file mode 100644
index 0000000..7dfd985
--- /dev/null
+++ b/src/equisolve/numpy/kernels/__init__.py
@@ -0,0 +1,5 @@
+from ._aggregate_kernel import AggregateKernel  # noqa: F401
+from ._aggregate_kernel import AggregateLinear, AggregatePolynomial
+
+
+__all__ = ["AggregateLinear", "AggregatePolynomial"]
diff --git a/src/equisolve/numpy/kernels/_aggregate_kernel.py b/src/equisolve/numpy/kernels/_aggregate_kernel.py
new file mode 100644
index 0000000..e200468
--- /dev/null
+++ b/src/equisolve/numpy/kernels/_aggregate_kernel.py
@@ -0,0 +1,171 @@
+from typing import List, Tuple, Union
+
+import metatensor
+from metatensor import TensorMap
+
+
+try:
+    import torch
+
+    HAS_TORCH = True
+    Module = torch.nn.Module
+except ImportError:
+    HAS_TORCH = False
+    import abc
+
+    # TODO: move this fallback class to a separate module
+    class Module(metaclass=abc.ABCMeta):
+        @abc.abstractmethod
+        def forward(self, *args, **kwargs):
+            pass
+
+        def __call__(self, *args, **kwargs):
+            return self.forward(*args, **kwargs)
+
+        @abc.abstractmethod
+        def export_torch(self):
+            pass
+
+
+class AggregateKernel(Module):
+    """
+    A kernel that aggregates the values of an underlying kernel over the
+    sample dimensions given by ``aggregate_names``, using the aggregation
+    function given by ``aggregate_type``.
+
+    :param aggregate_names:
+        names of the sample dimensions to aggregate over
+
+    :param aggregate_type:
+        aggregation function, one of ``"sum"`` or ``"mean"``
+    """
+
+    def __init__(
+        self,
+        aggregate_names: Union[str, List[str]] = "aggregate",
+        aggregate_type: str = "sum",
+        structurewise_aggregate: bool = False,
+    ):
+        valid_aggregate_types = ["sum", "mean"]
+        if aggregate_type not in valid_aggregate_types:
+            raise ValueError(
+                f"Given aggregate_type {aggregate_type!r} but only "
+                f"{valid_aggregate_types!r} are supported."
+            )
+        if structurewise_aggregate:
+            raise NotImplementedError(
+                "structurewise aggregation has not been implemented."
+            )
+
+        self._aggregate_names = aggregate_names
+        self._aggregate_type = aggregate_type
+        self._structurewise_aggregate = structurewise_aggregate
+
+    def aggregate_features(self, tensor: TensorMap) -> TensorMap:
+        if self._aggregate_type == "sum":
+            return metatensor.sum_over_samples(
+                tensor, samples_names=self._aggregate_names
+            )
+        elif self._aggregate_type == "mean":
+            return metatensor.mean_over_samples(
+                tensor, samples_names=self._aggregate_names
+            )
+        else:
+            raise NotImplementedError(
+                f"aggregate_type {self._aggregate_type!r} has not been implemented."
+            )
+
+    def aggregate_kernel(
+        self, kernel: TensorMap, are_pseudo_points: Tuple[bool, bool] = (False, False)
+    ) -> TensorMap:
+        if self._aggregate_type == "sum":
+            if not are_pseudo_points[0]:
+                kernel = metatensor.sum_over_samples(kernel, self._aggregate_names)
+            if not are_pseudo_points[1]:
+                # TODO {sum,mean}_over_properties does not exist
+                raise NotImplementedError(
+                    "properties dimension cannot be aggregated for the moment"
+                )
+                kernel = metatensor.sum_over_properties(kernel, self._aggregate_names)
+            return kernel
+        elif self._aggregate_type == "mean":
+            if not are_pseudo_points[0]:
+                kernel = metatensor.mean_over_samples(kernel, self._aggregate_names)
+            if not are_pseudo_points[1]:
+                # TODO {sum,mean}_over_properties does not exist
+                raise NotImplementedError(
+                    "properties dimension cannot be aggregated for the moment"
+                )
+                kernel = metatensor.mean_over_properties(kernel, self._aggregate_names)
+            return kernel
+        else:
+            raise NotImplementedError(
+                f"aggregate_type {self._aggregate_type!r} has not been implemented."
+            )
+
+    def forward(
+        self,
+        tensor1: TensorMap,
+        tensor2: TensorMap,
+        are_pseudo_points: Tuple[bool, bool] = (False, False),
+    ) -> TensorMap:
+        return self.aggregate_kernel(
+            self.compute_kernel(tensor1, tensor2), are_pseudo_points
+        )
+
+    def compute_kernel(self, tensor1: TensorMap, tensor2: TensorMap) -> TensorMap:
+        raise NotImplementedError("compute_kernel needs to be implemented.")
+
+
+class AggregateLinear(AggregateKernel):
+    def __init__(
+        self,
+        aggregate_names: Union[str, List[str]] = "aggregate",
+        aggregate_type: str = "sum",
+        structurewise_aggregate: bool = False,
+    ):
+        super().__init__(aggregate_names, aggregate_type, structurewise_aggregate)
+
+    def forward(
+        self,
+        tensor1: TensorMap,
+        tensor2: TensorMap,
+        are_pseudo_points: Tuple[bool, bool] = (False, False),
+    ) -> TensorMap:
+        # we override the default behavior because for linear kernels the
+        # aggregation can be done before computing the kernel, which is more
+        # memory efficient
+        if not are_pseudo_points[0]:
+            tensor1 = self.aggregate_features(tensor1)
+        if not are_pseudo_points[1]:
+            tensor2 = self.aggregate_features(tensor2)
+        return self.compute_kernel(tensor1, tensor2)
+
+    def compute_kernel(self, tensor1: TensorMap, tensor2: TensorMap) -> TensorMap:
+        return metatensor.dot(tensor1, tensor2)
+
+    def export_torch(self):
+        raise NotImplementedError("export_torch has not been implemented")
+        # the idea is to do something along the lines of
+        # return equisolve.torch.kernels.AggregateLinear(
+        #     self._aggregate_names,
+        #     self._aggregate_type)
+
+
+class AggregatePolynomial(AggregateKernel):
+    def __init__(
+        self,
+        aggregate_names: Union[str, List[str]] = "aggregate",
+        aggregate_type: str = "sum",
+        structurewise_aggregate: bool = False,
+        degree: int = 2,
+    ):
+        super().__init__(aggregate_names, aggregate_type, structurewise_aggregate)
+        self._degree = degree
+
+    def compute_kernel(self, tensor1: TensorMap, tensor2: TensorMap):
+        return metatensor.pow(metatensor.dot(tensor1, tensor2), self._degree)
+
+    def export_torch(self):
+        raise NotImplementedError("export_torch has not been implemented")
+        # the idea is to do something along the lines of
+        # return equisolve.torch.kernels.AggregatePolynomial(
+        #     self._aggregate_names,
+        #     self._aggregate_type,
+        #     self._degree)
diff --git a/src/equisolve/numpy/models/__init__.py b/src/equisolve/numpy/models/__init__.py
index cc1042d..13aba4b 100644
--- a/src/equisolve/numpy/models/__init__.py
+++ b/src/equisolve/numpy/models/__init__.py
@@ -13,6 +13,7 @@
 from .linear_model import Ridge
+from .sor_kernel_ridge import SorKernelRidge

-__all__ = ["Ridge"]
+__all__ = ["Ridge", "SorKernelRidge"]
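As context for the model file below, a minimal sketch of how the exported kernels are meant to be called, assuming `descriptor` and `pseudo_points` TensorMaps as in the example above (this mirrors what `SorKernelRidge.fit` does internally):

    from equisolve.numpy.kernels import AggregateLinear

    kernel = AggregateLinear(aggregate_names=["center", "species_center"])

    # k_mm: pseudo x pseudo; both sides are already aggregated
    k_mm = kernel(pseudo_points, pseudo_points, are_pseudo_points=(True, True))
    # k_nm: structures x pseudo; the sample side is aggregated over atoms
    k_nm = kernel(descriptor, pseudo_points, are_pseudo_points=(False, True))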
diff --git a/src/equisolve/numpy/models/sor_kernel_ridge.py b/src/equisolve/numpy/models/sor_kernel_ridge.py
new file mode 100644
index 0000000..f4731ee
--- /dev/null
+++ b/src/equisolve/numpy/models/sor_kernel_ridge.py
@@ -0,0 +1,426 @@
+from typing import List, Union
+
+import metatensor
+import numpy as np
+import scipy
+from metatensor import Labels, TensorBlock, TensorMap
+
+from ..kernels import AggregateKernel, AggregateLinear, AggregatePolynomial
+
+
+class SorKernelRidge:
+    """
+    Uses the subset of regressors (SoR) approximation of the kernel
+
+    .. math::
+
+        w = (K_{mn} K_{nm} + K_{mm})^{-1} K_{mn} y
+
+    plus regularization
+
+    Reference
+    ---------
+    Quinonero-Candela, J., & Rasmussen, C. E. (2005). A unifying view of sparse
+    approximate Gaussian process regression. The Journal of Machine Learning
+    Research, 6, 1939-1959.
+    """
+
+    def __init__(
+        self,
+    ) -> None:
+        self._weights = None
+
+    def _set_kernel(self, kernel: Union[str, AggregateKernel], **kernel_kwargs):
+        valid_kernels = ["linear", "polynomial", "precomputed"]
+        aggregate_type = kernel_kwargs.get("aggregate_type", "sum")
+        if aggregate_type != "sum":
+            raise ValueError(
+                f'aggregate_type={aggregate_type!r} found but must be "sum"'
+            )
+        if kernel == "linear":
+            self._kernel = AggregateLinear(aggregate_type="sum", **kernel_kwargs)
+        elif kernel == "polynomial":
+            self._kernel = AggregatePolynomial(aggregate_type="sum", **kernel_kwargs)
+        elif kernel == "precomputed":
+            self._kernel = None
+        elif isinstance(kernel, AggregateKernel):
+            self._kernel = kernel
+        else:
+            raise ValueError(
+                f"kernel type {kernel!r} is not supported. Please use one "
+                f"of the valid kernels {valid_kernels!r}"
+            )
+
+    def fit(
+        self,
+        X: TensorMap,
+        X_pseudo: TensorMap,
+        y: TensorMap,
+        kernel_type: Union[str, AggregateKernel] = "linear",
+        kernel_kwargs: dict = None,
+        accumulate_key_names: Union[str, List[str], None] = None,
+        alpha: Union[float, TensorMap] = 1.0,
+        solver: str = "RKHS-QR",
+        rcond: float = None,
+    ):
+        r"""
+        :param X:
+            features;
+            if kernel type "precomputed" is used, the kernel k_nm is assumed
+        :param X_pseudo:
+            pseudo points;
+            if kernel type "precomputed" is used, the kernel k_mm is assumed
+        :param y:
+            targets
+        :param kernel_type:
+            type of kernel used
+        :param kernel_kwargs:
+            additional keyword arguments for the specific kernel
+            - **linear** None
+            - **polynomial** degree
+        :param accumulate_key_names:
+            a string or list of strings that specify which key names should be
+            accumulated into one kernel. This is intended for key columns
+            inducing sparsity in the properties (e.g. neighbour species)
+        :param alpha:
+            regularization
+        :param solver:
+            determines which solver to use
+            ... TODO doc ...
+        :param rcond:
+            argument for the solver lstsq


+        TODO move to developer doc

+        Derivation
+        ----------
+
+        We take equation (16b) (the mean expression)
+
+        .. math::
+
+            \sigma^{-2} K_{tm}\Sigma K_{mn}y
+
+        and insert :math:`\Sigma = (\sigma^{-2}K_{mn}K_{nm} + K_{mm})^{-1}`
+
+        .. math::
+
+            \sigma^{-2} K_{tm}(\sigma^{-2}K_{mn}K_{nm}+K_{mm})^{-1} K_{mn}y
+
+        We can move the :math:`\sigma`'s around
+
+        .. math::
+
+            K_{tm}((K_{mn}\sigma^{-1})(\sigma^{-1}K_{nm})+K_{mm})^{-1}
+            (K_{mn}\sigma^{-1})(y\sigma^{-1})
+
+        so the building blocks in the code are :math:`K_{mn}\sigma^{-1}` and
+        :math:`y\sigma^{-1}`.
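+
+        For illustration, a minimal dense-NumPy sketch of the regularized
+        normal equations solved per block (a hypothetical standalone example,
+        not the metatensor-based implementation below)::
+
+            import numpy as np
+
+            def sor_weights(k_nm, k_mm, y, sigma):
+                # scale K_nm and y by 1/sigma, then solve
+                # (K_mn K_nm / sigma^2 + K_mm) w = K_mn y / sigma^2
+                k_nm_reg = k_nm / sigma
+                y_reg = y / sigma
+                return np.linalg.solve(
+                    k_nm_reg.T @ k_nm_reg + k_mm, k_nm_reg.T @ y_reg
+                )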
+        """
+        # TODO: store the backend so the result can be brought back to the
+        # original backend (e.g. torch) at the end of the calculation
+        X = metatensor.to(X, backend="numpy")
+        X_pseudo = metatensor.to(X_pseudo, backend="numpy")
+        y = metatensor.to(y, backend="numpy")
+
+        if type(alpha) is float:
+            alpha_is_zero = alpha == 0.0
+
+            alpha_tensor = metatensor.ones_like(y)
+
+            samples = Labels(
+                names=y.samples_names,
+                values=np.zeros([1, len(y.samples_names)], dtype=int),
+            )
+
+            alpha_tensor = metatensor.slice(
+                alpha_tensor, axis="samples", labels=samples
+            )
+            alpha = metatensor.multiply(alpha_tensor, alpha)
+        elif isinstance(alpha, TensorMap):
+            raise NotImplementedError("TensorMaps are not yet supported")
+        else:
+            raise ValueError("alpha must either be a float or a TensorMap")
+
+        if kernel_kwargs is None:
+            kernel_kwargs = {}
+        self._set_kernel(kernel_type, **kernel_kwargs)
+        self._kernel_type = kernel_type
+
+        if self._kernel_type == "precomputed":
+            k_nm = X
+            k_mm = X_pseudo
+        else:
+            k_mm = self._kernel(X_pseudo, X_pseudo, are_pseudo_points=(True, True))
+            k_nm = self._kernel(X, X_pseudo, are_pseudo_points=(False, True))
+
+        if accumulate_key_names is not None:
+            raise NotImplementedError(
+                "accumulate_key_names only supports None for the moment"
+            )
+
+        # solve
+        weight_blocks = []
+        for key, y_block in y.items():
+            k_nm_block = k_nm.block(key)
+            k_mm_block = k_mm.block(key)
+            X_block = X.block(key)
+            structures = metatensor.operations._dispatch.unique(
+                k_nm_block.samples["structure"]
+            )
+            n_atoms_per_structure = []
+            for structure in structures:
+                # TODO dispatch sum
+                n_atoms = np.sum(X_block.samples["structure"] == structure)
+                n_atoms_per_structure.append(float(n_atoms))
+
+            # PR COMMENT: removed delta because I would say this is part of
+            # the standardizer
+            n_atoms_per_structure = np.array(n_atoms_per_structure)
+            normalization = metatensor.operations._dispatch.sqrt(n_atoms_per_structure)
+            alpha_values = alpha.block(key).values
+            if not alpha_is_zero:
+                normalization /= alpha_values[0, 0]
+            normalization = normalization[:, None]
+
+            k_nm_reg = k_nm_block.values * normalization
+            y_reg = y_block.values * normalization
+
+            self._solver = _SorKernelSolver(
+                k_mm_block.values, regularizer=1, jitter=0, solver=solver
+            )
+
+            if rcond is None:
+                # PR COMMENT: maybe we should get this from a base class
+                # RidgeBase; Ridge uses the same default
+                rcond_ = max(k_nm_reg.shape) * np.finfo(k_nm_reg.dtype.char.lower()).eps
+            else:
+                rcond_ = rcond
+            self._solver.fit(k_nm_reg, y_reg, rcond=rcond_)
+
+            weight_block = TensorBlock(
+                values=self._solver.weights.T,
+                samples=y_block.properties,
+                components=k_nm_block.components,
+                properties=k_nm_block.properties,
+            )
+            weight_blocks.append(weight_block)
+
+        self._weights = TensorMap(y.keys, weight_blocks)
+
+        self._X_pseudo = X_pseudo.copy()
+
+    @property
+    def weights(self) -> TensorMap:
+        return self._weights
+
+    def predict(self, T: TensorMap) -> TensorMap:
+        """
+        :param T:
+            features;
+            if kernel type "precomputed" is used, the kernel k_tm is assumed
+        """
+        if self._kernel_type == "precomputed":
+            k_tm = T
+        else:
+            k_tm = self._kernel(T, self._X_pseudo, are_pseudo_points=(False, True))
+        return metatensor.dot(k_tm, self._weights)
+
+    def forward(self, tensor: TensorMap) -> TensorMap:
+        return self.predict(tensor)
+
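+# For reference, a minimal sketch of how the solver below is driven
+# (hypothetical standalone usage; _SorKernelSolver is internal API):
+#
+#     import numpy as np
+#
+#     rng = np.random.default_rng(0)
+#     X = rng.random((40, 8))
+#     X_m = X[:5]  # pseudo (inducing) points
+#     k_nm, k_mm = X @ X_m.T, X_m @ X_m.T  # linear kernels
+#     y = rng.random((40, 1))
+#
+#     solver = _SorKernelSolver(k_mm, regularizer=1e-3, solver="RKHS-QR")
+#     solver.fit(k_nm, y)
+#     y_pred = solver.predict(k_nm)  # shape (40, 1)
+
+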
+class _SorKernelSolver:
+    """
+    A few quick implementation notes, docs to be done.
+
+    This is meant to solve the subset of regressors (SoR) problem:
+
+    .. math::
+
+        w = (K_{nm}^T K_{nm} + \mathrm{reg}\, K_{mm})^{-1} K_{nm}^T y
+
+    The inverse needs to be stabilized by applying a numerical jitter,
+    which is expressed as a fraction of the largest eigenvalue of KMM.
+
+    :param KMM:
+        KMM matrix
+    :param regularizer:
+        regularizer
+    :param jitter:
+        numerical jitter to stabilize fit
+    :param solver:
+        Method to solve the sparse KRR equations.
+
+        * RKHS-QR: TBD
+        * RKHS: Compute first the reproducing kernel features by diagonalizing
+          K_MM and computing `P_NM = K_NM @ U_MM @ Lam_MM^(-1/2)` and then
+          solve the linear problem for those (which is usually better
+          conditioned)::
+
+              (P_NM.T@P_NM + reg*I)^(-1) P_NM.T@Y
+
+        * QR: TBD
+        * solve: Uses `scipy.linalg.solve` for the normal equations::
+
+              (K_NM.T@K_NM + reg*K_MM)^(-1) K_NM.T@Y
+
+        * lstsq: requires an rcond value. Uses `numpy.linalg.lstsq(rcond=rcond)`
+          for the normal equations::
+
+              (K_NM.T@K_NM + reg*K_MM)^(-1) K_NM.T@Y
+
+    Reference
+    ---------
+    Foster, L., Waagen, A., Aijaz, N., Hurley, M., Luis, A., Rinsky, J., ... &
+    Srivastava, A. (2009). Stable and Efficient Gaussian Process Calculations.
+    Journal of Machine Learning Research, 10(4).
+    """
+
+    def __init__(
+        self,
+        KMM: np.ndarray,
+        regularizer: float = 1.0,
+        jitter: float = 0.0,
+        solver: str = "RKHS",
+        relative_jitter: bool = True,
+    ):
+        self.solver = solver
+        self.KMM = KMM
+        self.relative_jitter = relative_jitter
+
+        self._nM = len(KMM)
+        if self.solver == "RKHS" or self.solver == "RKHS-QR":
+            self._vk, self._Uk = scipy.linalg.eigh(KMM)
+            self._vk = self._vk[::-1]
+            self._Uk = self._Uk[:, ::-1]
+        elif self.solver == "QR" or self.solver == "solve" or self.solver == "lstsq":
+            # gets maximum eigenvalue of KMM to scale the numerical jitter
+            self._KMM_maxeva = scipy.sparse.linalg.eigsh(
+                KMM, k=1, return_eigenvectors=False
+            )[0]
+        else:
+            raise ValueError(
+                f"Solver {solver} not supported. Possible values "
+                "are 'RKHS', 'RKHS-QR', 'QR', 'solve' or 'lstsq'."
+            )
+        if relative_jitter:
+            if self.solver == "RKHS" or self.solver == "RKHS-QR":
+                self._jitter_scale = self._vk[0]
+            elif (
+                self.solver == "QR" or self.solver == "solve" or self.solver == "lstsq"
+            ):
+                self._jitter_scale = self._KMM_maxeva
+        else:
+            self._jitter_scale = 1.0
+        self.set_regularizers(regularizer, jitter)
+
+    def set_regularizers(self, regularizer=1.0, jitter=0.0):
+        self.regularizer = regularizer
+        self.jitter = jitter
+        if self.solver == "RKHS" or self.solver == "RKHS-QR":
+            self._nM = len(np.where(self._vk > self.jitter * self._jitter_scale)[0])
+            self._PKPhi = self._Uk[:, : self._nM] * 1 / np.sqrt(self._vk[: self._nM])
+        elif self.solver == "QR":
+            self._VMM = scipy.linalg.cholesky(
+                self.regularizer * self.KMM
+                + np.eye(self._nM) * self._jitter_scale * self.jitter
+            )
+        self._Cov = np.zeros((self._nM, self._nM))
+        self._KY = None
+
+    def fit(self, KNM, Y, rcond=None):
+        if len(Y.shape) == 1:
+            Y = Y[:, np.newaxis]
+        if self.solver == "RKHS":
+            Phi = KNM @ self._PKPhi
+            self._weights = self._PKPhi @ scipy.linalg.solve(
+                Phi.T @ Phi + np.eye(self._nM) * self.regularizer,
+                Phi.T @ Y,
+                assume_a="pos",
+            )
+        elif self.solver == "RKHS-QR":
+            A = np.vstack(
+                [KNM @ self._PKPhi, np.sqrt(self.regularizer) * np.eye(self._nM)]
+            )
+            Q, R = np.linalg.qr(A)
+            self._weights = self._PKPhi @ scipy.linalg.solve_triangular(
+                R, Q.T @ np.vstack([Y, np.zeros((self._nM, Y.shape[1]))])
+            )
+        elif self.solver == "QR":
+            A = np.vstack([KNM, self._VMM])
+            Q, R = np.linalg.qr(A)
+            self._weights = scipy.linalg.solve_triangular(
+                R, Q.T @ np.vstack([Y, np.zeros((KNM.shape[1], Y.shape[1]))])
+            )
+        elif self.solver == "solve":
+            self._weights = scipy.linalg.solve(
+                KNM.T @ KNM
+                + self.regularizer * self.KMM
+                + np.eye(self._nM) * self.jitter * self._jitter_scale,
+                KNM.T @ Y,
+                assume_a="pos",
+            )
+        elif self.solver == "lstsq":
+            self._weights = np.linalg.lstsq(
+                KNM.T @ KNM
+                + self.regularizer * self.KMM
+                + np.eye(self._nM) * self.jitter * self._jitter_scale,
+                KNM.T @ Y,
+                rcond=rcond,
+            )[0]
+        else:
+            raise ValueError("solver not implemented")
+
+    def partial_fit(self, KNM, Y, accumulate_only=False, rcond=None):
+        if len(Y) > 0:
+            # only accumulate if we are passing data
+            if len(Y.shape) == 1:
+                Y = Y[:, np.newaxis]
+            if self.solver == "RKHS":
+                Phi = KNM @ self._PKPhi
+            elif self.solver == "solve" or self.solver == "lstsq":
+                Phi = KNM
+            else:
+                raise ValueError(
+                    "Partial fit can only be realized with "
+                    "solver = 'RKHS', 'solve' or 'lstsq'"
+                )
+            if self._KY is None:
+                self._KY = np.zeros((self._nM, Y.shape[1]))
+
+            self._Cov += Phi.T @ Phi
+            self._KY += Phi.T @ Y
+
+        # do the actual fit if called with an empty array or if asked
+        if len(Y) == 0 or (not accumulate_only):
+            if self.solver == "RKHS":
+                self._weights = self._PKPhi @ scipy.linalg.solve(
+                    self._Cov + np.eye(self._nM) * self.regularizer,
+                    self._KY,
+                    assume_a="pos",
+                )
+            elif self.solver == "solve":
+                self._weights = scipy.linalg.solve(
+                    self._Cov
+                    + self.regularizer * self.KMM
+                    + np.eye(self.KMM.shape[0]) * self.jitter * self._jitter_scale,
+                    self._KY,
+                    assume_a="pos",
+                )
+            elif self.solver == "lstsq":
+                self._weights = np.linalg.lstsq(
+                    self._Cov
+                    + self.regularizer * self.KMM
+                    + np.eye(self.KMM.shape[0]) * self.jitter * self._jitter_scale,
+                    self._KY,
+                    rcond=rcond,
+                )[0]
+
+    @property
+    def weights(self):
+        return self._weights
+
+    def predict(self, KTM):
+        return KTM @ self._weights
diff --git a/src/equisolve/numpy/models/sparse_kernel.py b/src/equisolve/numpy/models/sparse_kernel.py
deleted file mode 100644
index 
a57454a..0000000 --- a/src/equisolve/numpy/models/sparse_kernel.py +++ /dev/null @@ -1,216 +0,0 @@ -from typing import Union - -import metatensor -import numpy as np -import scipy -from metatensor import Labels, TensorBlock, TensorMap -from metatensor.operations import dot, multiply, ones_like, slice - -from ..utils import block_to_array, dict_to_tensor_map, tensor_map_to_dict - - -def compute_sparse_kernel( - tensor: TensorMap, pseudo_points: TensorMap, degree: int -) -> TensorMap: - metatensor.pow(metatensor.dot(tensor, pseudo_points), degree) - - -class SparseKernelRidge: - def __init__( - self, - parameter_keys: Union[List[str], str] = None, - ) -> None: - if type(parameter_keys) not in (list, tuple, np.ndarray): - self.parameter_keys = [parameter_keys] - else: - self.parameter_keys = parameter_keys - - self._weights = None - - def fit( - self, - k_mm: TensorMap, - k_nm: TensorMap, - k_y: TensorMap, - alpha: Union[float, TensorMap] = 1.0, - jitter: float = 1e-13, - solver: str = "auto", - cond: float = None, - ): - if type(alpha) is float: - alpha_tensor = ones_like(k_y) - - samples = Labels( - names=k_y.sample_names, - values=np.zeros([1, len(k_y.sample_names)], dtype=int), - ) - - alpha_tensor = slice(alpha_tensor, samples=samples) - alpha = multiply(alpha_tensor, alpha) - elif type(alpha) is not TensorMap: - raise ValueError("alpha must either be a float or a TensorMap") - - for key, yblock in k_y: - k_nm_block = k_nm.block(key) - k_mm_block = k_mm.block(key) - structures = np.unique(k_nm_block.samples["structure"]) - n_atoms_per_structure = [] - for structure in structures: - n_atoms = np.sum(k_nm_block.samples["structure"] == structure) - n_atoms_per_structure.append(float(n_atoms)) - - delta = np.std(yblock.values) - energy_regularizer = np.sqrt(n_atoms_per_structure) - # yblock.values[:] /= energy_regularizer - - _alpha = block_to_array( - alpha.block(key), parameter_keys=self.parameter_keys - ) - Y = block_to_array(yblock, parameter_keys=self.parameter_keys) - n_centers = yblock.value.shape[0] - Y[:n_centers] /= energy_regularizer[:, None] - Y /= _alpha / delta - - KNM = block_to_array(k_nm_block, parameter_keys=self.parameter_keys) - KNM[:n_centers] /= energy_regularizer[:, None] - KNM /= _alpha / delta - - KMM = block_to_array(k_mm_block, parameter_keys=self.parameter_keys) - - # check that alpha has the the same samples as k_nm and only one property - - def predict(self): - pass - - -class SparseGPRSolver: - """ - A few quick implementation notes, docs to be done. - - This is meant to solve the sparse GPR problem:: - - b = (KNM.T@KNM + reg*KMM)^-1 @ KNM.T@y - - The inverse needs to be stabilized with application of a numerical jitter, - that is expressed as a fraction of the largest eigenvalue of KMM - - Parameters - ---------- - KMM : numpy.ndarray - KNM matrix - regularizer : float - regularizer - jitter : float - numerical jitter to stabilize fit - solver : {'RKHS-QR', 'RKHS', 'QR', 'solve', 'lstsq'} - Method to solve the sparse KRR equations. - - * RKHS-QR: TBD - * RKHS: Compute first the reproducing kernel features by diagonalizing K_MM - and computing `P_NM = K_NM @ U_MM @ Lam_MM^(-1.2)` and then solves the linear - problem for those (which is usually better conditioned):: - - (P_NM.T@P_NM + 1)^(-1) P_NM.T@Y - - * QR: TBD - * solve: Uses `scipy.linalg.solve` for the normal equations:: - - (K_NM.T@K_NM + K_MM)^(-1) K_NM.T@Y - - * lstsq: require rcond value. 
Use `numpy.linalg.solve(rcond=rcond)` for the normal equations:: - - (K_NM.T@K_NM + K_MM)^(-1) K_NM.T@Y - """ - - def __init__( - self, KMM, regularizer=1, jitter=0, solver="RKHS", relative_jitter=True - ): - self.solver = solver - self.KMM = KMM - self.relative_jitter = relative_jitter - - self._nM = len(KMM) - if self.solver == "RKHS" or self.solver == "RKHS-QR": - self._vk, self._Uk = scipy.linalg.eigh(KMM) - self._vk = self._vk[::-1] - self._Uk = self._Uk[:, ::-1] - elif self.solver == "QR" or self.solver == "solve" or self.solver == "lstsq": - # gets maximum eigenvalue of KMM to scale the numerical jitter - self._KMM_maxeva = scipy.sparse.linalg.eigsh( - KMM, k=1, return_eigenvectors=False - )[0] - else: - raise ValueError( - f"Solver {solver} not supported. Possible values " - "are 'RKHS', 'RKHS-QR', 'QR', 'solve' or lstsq." - ) - if relative_jitter: - if self.solver == "RKHS" or self.solver == "RKHS-QR": - self._jitter_scale = self._vk[0] - elif ( - self.solver == "QR" or self.solver == "solve" or self.solver == "lstsq" - ): - self._jitter_scale = self._KMM_maxeva - else: - self._jitter_scale = 1.0 - self.set_regularizers(regularizer, jitter) - - def set_regularizers(self, regularizer=1.0, jitter=0.0): - self.regularizer = regularizer - self.jitter = jitter - if self.solver == "RKHS" or self.solver == "RKHS-QR": - self._nM = len(np.where(self._vk > self.jitter * self._jitter_scale)[0]) - self._PKPhi = self._Uk[:, : self._nM] * 1 / np.sqrt(self._vk[: self._nM]) - elif self.solver == "QR": - self._VMM = scipy.linalg.cholesky( - self.regularizer * self.KMM - + np.eye(self._nM) * self._jitter_scale * self.jitter - ) - self._Cov = np.zeros((self._nM, self._nM)) - self._KY = None - - def fit(self, KNM, Y, rcond=None): - if len(Y.shape) == 1: - Y = Y[:, np.newaxis] - if self.solver == "RKHS": - Phi = KNM @ self._PKPhi - self._weights = self._PKPhi @ scipy.linalg.solve( - Phi.T @ Phi + np.eye(self._nM) * self.regularizer, - Phi.T @ Y, - assume_a="pos", - ) - elif self.solver == "RKHS-QR": - A = np.vstack( - [KNM @ self._PKPhi, np.sqrt(self.regularizer) * np.eye(self._nM)] - ) - Q, R = np.linalg.qr(A) - self._weights = self._PKPhi @ scipy.linalg.solve_triangular( - R, Q.T @ np.vstack([Y, np.zeros((self._nM, Y.shape[1]))]) - ) - elif self.solver == "QR": - A = np.vstack([KNM, self._VMM]) - Q, R = np.linalg.qr(A) - self._weights = scipy.linalg.solve_triangular( - R, Q.T @ np.vstack([Y, np.zeros((KNM.shape[1], Y.shape[1]))]) - ) - elif self.solver == "solve": - self._weights = scipy.linalg.solve( - KNM.T @ KNM - + self.regularizer * self.KMM - + np.eye(self._nM) * self.jitter * self._jitter_scale, - KNM.T @ Y, - assume_a="pos", - ) - elif self.solver == "lstsq": - self._weights = np.linalg.lstsq( - KNM.T @ KNM - + self.regularizer * self.KMM - + np.eye(self._nM) * self.jitter * self._jitter_scale, - KNM.T @ Y, - rcond=rcond, - )[0] - else: - ValueError("solver not implemented") - - def predict(self, KTM): - return KTM @ self._weights diff --git a/tests/equisolve_tests/numpy/models/sor_kernel_ridge.py b/tests/equisolve_tests/numpy/models/sor_kernel_ridge.py new file mode 100644 index 0000000..354356a --- /dev/null +++ b/tests/equisolve_tests/numpy/models/sor_kernel_ridge.py @@ -0,0 +1,235 @@ +from typing import List, Tuple, Union + +import metatensor +import numpy as np +import pytest +from metatensor import Labels, TensorBlock, TensorMap + +from equisolve.numpy.models import SorKernelRidge + +from ..utilities import tensor_to_tensormap + + +def random_structured_tensor( + n_blocks: int, + 
n_structures: int,
+    n_aggregate: Union[int, List[int]],
+    n_properties: int,
+    rng: np.random.Generator = None,
+) -> TensorMap:
+    if rng is None:
+        rng = np.random.default_rng()
+
+    if isinstance(n_aggregate, list):
+        raise NotImplementedError("list support for n_aggregate not implemented yet")
+
+    n_samples = n_structures * n_aggregate
+    X_arr = rng.random([n_blocks, n_samples, n_properties])
+    X = tensor_to_tensormap(X_arr)
+
+    sample_indices = np.repeat(np.arange(n_structures), n_samples // n_structures)
+    structure_indices = sample_indices.copy()
+    atom_indices = np.tile(np.arange(n_aggregate), n_samples // n_aggregate)
+    samples_values = np.vstack((sample_indices, structure_indices, atom_indices)).T
+    new_samples = Labels(
+        names=["sample", "structure", "aggregate"], values=samples_values
+    )
+    new_blocks = []
+    for block in X.blocks():
+        # TODO support components
+        assert len(block.components) == 0, "components not yet supported"
+        new_block = TensorBlock(
+            values=block.values,
+            samples=new_samples,
+            components=block.components,
+            properties=block.properties,
+        )
+        # TODO: create forces with finite differences, otherwise they are
+        # unlearnable; I don't have a better idea
+        for parameter, grad_block in block.gradients():
+            new_block.add_gradient(parameter, grad_block)
+        new_blocks.append(new_block)
+    return TensorMap(X.keys, new_blocks)
+
+
+def random_structured_target(
+    tensor: TensorMap, rng: np.random.Generator = None
+) -> Tuple[TensorMap, TensorMap]:
+    """
+    One cannot simply use random values as targets, because then there would
+    be no linear relationship between input and target.
+
+    :param tensor:
+        input tensor
+    """
+    # TODO: this function should also take an accumulate_key_names argument,
+    # as SorKernelRidge does
+    if rng is None:
+        rng = np.random.default_rng()
+
+    weight_blocks = []
+    for block in tensor.blocks():
+        # TODO components are assumed to be zero in this construction;
+        # add components
+        assert len(block.components) == 0, "components not yet supported"
+        weight_block = TensorBlock(
+            values=rng.random([1, len(block.properties)]),
+            samples=Labels.range("property", 1),
+            components=[],
+            properties=block.properties,
+        )
+        weight_blocks.append(weight_block)
+    weight = TensorMap(tensor.keys, weight_blocks)
+    target = metatensor.sum_over_samples(metatensor.dot(tensor, weight), "aggregate")
+    return weight, target
+
+
+class TestSorKernelRidge:
+    """Test the SorKernelRidge class."""
+
+    @pytest.fixture(autouse=True)
+    def set_random_generator(self):
+        """Set the random generator to the same seed before each test is run.
+        Otherwise the test behaviour depends on the order of the tests in this
+        file and on the number of parameters of each test.
+        """
+        self.rng = np.random.default_rng(0x1225787418FBDFD12)
+
+    @pytest.mark.parametrize("alpha", np.geomspace(1e-5, 1e5, 3).tolist())
+    @pytest.mark.parametrize("solver", ["solve", "lstsq", "RKHS", "RKHS-QR", "QR"])
+    @pytest.mark.parametrize(
+        "kernel",
+        [
+            {"type": "linear", "kwargs": {}},
+            {"type": "polynomial", "kwargs": {"degree": 2}},
+        ],
+    )
+    def test_fit_and_predict(self, alpha, solver, kernel):
+        """Test that fitting and predicting works and all shapes are converted
+        correctly. The test is performed for two blocks.
+ """ + num_structures = 5 + num_aggregates = 10 # PR COMMENT this is a more generic name for atoms + # num_samples = num_structures * num_aggregates + num_properties = 5 + num_pseudo_points = 5 + + # Create input values + y_arr = self.rng.random([2, num_structures, 1]) + + X = random_structured_tensor( + 2, num_structures, num_aggregates, num_properties, self.rng + ) + y = tensor_to_tensormap(y_arr) + + X_samples_values = np.vstack([block.samples.values for block in X.blocks()]) + X_samples_values = np.unique(X_samples_values, axis=0) + selected_points = self.rng.choice( + X_samples_values, num_pseudo_points, replace=False + ) + pseudo_samples = Labels(names=X[0].samples.names, values=selected_points) + X_pseudo = metatensor.slice(X, axis="samples", labels=pseudo_samples) + + sample_indices = np.arange(num_structures) + structure_indices = sample_indices.copy() + samples_values = np.vstack((sample_indices, structure_indices)).T + new_samples = Labels(names=["sample", "structure"], values=samples_values) + new_blocks = [] + for block in y.blocks(): + new_block = TensorBlock( + values=block.values, + samples=new_samples, + components=block.components, + properties=block.properties, + ) + for parameter, grad_block in block.gradients(): + new_block.add_gradient(parameter, grad_block) + new_blocks.append(new_block) + y = TensorMap(y.keys, new_blocks) + + clf = SorKernelRidge() + clf.fit( + X=X, + X_pseudo=X_pseudo, + y=y, + alpha=alpha, + solver=solver, + kernel_type=kernel["type"], + kernel_kwargs=kernel["kwargs"], + ) + assert len(clf.weights) == 2 + assert clf.weights.block(0).values.shape[1] == num_pseudo_points + assert clf.weights.block(1).values.shape[1] == num_pseudo_points + + y_pred = clf.predict(X) + + for key in y.keys: + assert y_pred.block(key).values.shape == y.block(key).values.shape + assert y_pred.block(key).samples == y.block(key).samples + assert y_pred.block(key).components == y.block(key).components + assert y_pred.block(key).properties == y.block(key).properties + + @pytest.mark.parametrize( + "args", + [ + (1e-9, "lstsq", 1e-12, 1e-12), + (1e-9, "RKHS", 1e-6, 1e-6), # RKHS is a bit less accurate + (1e-9, "RKHS-QR", 1e-12, 1e-12), + (0.0, "lstsq", 1e-2, 1e-2), + (0.0, "RKHS", 1e-2, 1e-2), + (0.0, "RKHS-QR", 1e-2, 1e-2), + ], + ) # RKHS and RKHS-QR are less accurate for zero alpha + def test_exact(self, args): + """Test if ridge is working and all shapes are converted correctly. + Test is performed for two blocks. 
+ """ + alpha, solver, atol, rtol = args + + num_structures = 5 + num_aggregates = 10 + num_samples = num_structures * num_aggregates + num_properties = 3 + num_pseudo_points = num_samples + + # Create input values + X = random_structured_tensor( + 2, num_structures, num_aggregates, num_properties, self.rng + ) + _, y = random_structured_target(X, self.rng) + + X_samples_values = np.vstack([block.samples.values for block in X.blocks()]) + X_samples_values = np.unique(X_samples_values, axis=0) + selected_points = self.rng.choice( + X_samples_values, num_pseudo_points, replace=False + ) + pseudo_samples = Labels(names=X[0].samples.names, values=selected_points) + X_pseudo = metatensor.slice(X, axis="samples", labels=pseudo_samples) + + sample_indices = np.arange(num_structures) + structure_indices = sample_indices.copy() + samples_values = np.vstack((sample_indices, structure_indices)).T + new_samples = Labels(names=["sample", "structure"], values=samples_values) + new_blocks = [] + for block in y.blocks(): + new_block = TensorBlock( + values=block.values, + samples=new_samples, + components=block.components, + properties=block.properties, + ) + for parameter, grad_block in block.gradients(): + new_block.add_gradient(parameter, grad_block) + new_blocks.append(new_block) + y = TensorMap(y.keys, new_blocks) + + clf = SorKernelRidge() + clf.fit(X=X, X_pseudo=X_pseudo, y=y, alpha=alpha, solver=solver) + assert len(clf.weights) == 2 + assert clf.weights.block(0).values.shape[1] == len(pseudo_samples) + assert clf.weights.block(1).values.shape[1] == len(pseudo_samples) + + y_pred = clf.predict(X) + + # TODO remove print, but useful to see for now + print(alpha, solver, np.mean(y[0].values / y_pred[0].values)) + metatensor.allclose_raise(y, y_pred, rtol=rtol, atol=atol)