diff --git a/docs/src/explanations/index.rst b/docs/src/explanations/index.rst
index 8643a7235..77eb9ffa0 100644
--- a/docs/src/explanations/index.rst
+++ b/docs/src/explanations/index.rst
@@ -13,3 +13,4 @@ all about.
    concepts
    soap
+   rotation_adapted
\ No newline at end of file
diff --git a/docs/src/explanations/rotation_adapted.rst b/docs/src/explanations/rotation_adapted.rst
new file mode 100644
index 000000000..e98bb332d
--- /dev/null
+++ b/docs/src/explanations/rotation_adapted.rst
@@ -0,0 +1,88 @@
+Rotation-Adapted Features
+=========================
+
+Equivariance
+------------
+
+Descriptors like SOAP are translation, rotation, and permutation invariant.
+Such invariances are extremely useful if one wants to learn an invariant target (e.g., the energy):
+since they are already encoded in the descriptor, the learning algorithm does not have to learn this physical requirement from data.
+
+The situation is different if the target is not invariant. For example, one may want to learn a dipole. The dipole rotates with the molecule, so invariant descriptors do not have the required symmetries for this task.
+
+Instead, one needs a rotation equivariant descriptor.
+Rotation equivariance means that first rotating the structure and then computing the descriptor gives the same result as first computing the descriptor and then applying the rotation, i.e., the descriptor behaves consistently under rotations.
+Denoting a structure as :math:`A`, the function computing the descriptor as :math:`f(\cdot)`, and the rotation operator as :math:`\hat{R}`, rotation equivariance can be expressed as:
+
+.. math::
+   :name: eq:equivariance
+
+   f(\hat{R} A) = \hat{R} f(A)
+
+Of course, invariance is a special case of equivariance.
+
+
+Rotation Equivariance of the Spherical Expansion
+------------------------------------------------
+
+The spherical expansion is a rotation equivariant descriptor.
+Let's consider the expansion coefficients of :math:`\rho_i(\mathbf{r})`.
+We have:
+
+.. math::
+
+   \hat{R} \rho_i(\mathbf{r}) &= \sum_{nlm} c_{nlm}^{i} R_n(r) \hat{R} Y_l^m(\hat{\mathbf{r}}) \nonumber \\
+   &= \sum_{nlmm'} c_{nlm}^{i} R_n(r) D_{m,m'}^{l}(\hat{R}) Y_l^{m'}(\hat{\mathbf{r}}) \nonumber \\
+   &= \sum_{nlm} \left( \sum_{m'} D_{m',m}^l(\hat{R}) c_{nlm'}^{i}\right) R_n(r) Y_l^m(\hat{\mathbf{r}}) \nonumber
+
+and noting that :math:`Y_l^m(\hat{R} \hat{\mathbf{r}}) = \hat{R} Y_l^m(\hat{\mathbf{r}})` and :math:`\hat{R}r = r`, equation :ref:`(1) <eq:equivariance>` is satisfied and we conclude that the expansion coefficients :math:`c_{nlm}^{i}` are rotation equivariant.
+Indeed, each :math:`c_{nlm}^{i}` transforms under rotation as the spherical harmonic :math:`Y_l^m(\hat{\mathbf{r}})`.
+
+Using the Dirac notation, the coefficient :math:`c_{nlm}^{i}` can be expressed as :math:`\braket{nlm\vert\rho_i}`.
+Equivalently, and to stress the fact that this coefficient describes something that transforms under rotation as the spherical harmonic :math:`Y_l^m(\hat{\mathbf{r}})`, it is sometimes written as :math:`\braket{n\vert\rho_i;lm}`, i.e., the atomic density is "tagged" with a label that tells how it transforms under rotations.
+
+
+Completeness Relations of Spherical Harmonics
+---------------------------------------------
+
+Spherical harmonics can be combined using the standard rules of angular momentum coupling:
+
+.. math::
+   :name: eq:cg_coupling
+
+   \ket{lm} \propto \ket{l_1 l_2 l m} = \sum_{m_1 m_2} C_{m_1 m_2 m}^{l_1 l_2 l} \ket{l_1 m_1} \ket{l_2 m_2}
+
+where :math:`C_{m_1 m_2 m}^{l_1 l_2 l}` is a Clebsch-Gordan (CG) coefficient.
+
+Thanks to the one-to-one correspondence (under rotation) between :math:`c_{nlm}^{i}` and :math:`Y_l^m`,
+:ref:`(2) <eq:cg_coupling>` means that one can take products of two spherical expansion coefficients (which amounts to considering density correlations), and combine them with CG coefficients to get new coefficients that transform as a single spherical harmonic.
+This process is known as coupling, from the uncoupled basis of angular momentum (formed by the product of rotation eigenstates) to a coupled basis (a single rotation eigenstate).
+
+One can also write the inverse of :ref:`(2) <eq:cg_coupling>`:
+
+.. math::
+   :name: eq:cg_decoupling
+
+   \ket{l_1 m_1} \ket{l_2 m_2} = \sum_{l m} C_{m_1 m_2 m}^{l_1 l_2 l} \ket{l_1 l_2 l m}
+
+which expresses the product of two rotation eigenstates in terms of coupled ones. This process is known as decoupling.
+
+Example: :math:`\lambda`-SOAP
+-----------------------------
+
+A straightforward application of :ref:`(2) <eq:cg_coupling>` is the construction of :math:`\lambda`-SOAP features.
+Indeed, :math:`\lambda`-SOAP was created in order to have a rotation and inversion equivariant version of the 3-body density correlations.
+The :math:`\lambda` represents the degree of a spherical harmonic, :math:`Y_{\lambda}^{\mu}(\hat{\mathbf{r}})`,
+and it indicates that this descriptor transforms under rotations as a spherical harmonic, i.e., it is rotation equivariant.
+
+It is obtained by considering two expansion coefficients of the atomic density, and combining them with a CG iteration to a coupled basis,
+as in :ref:`(2) <eq:cg_coupling>`.
+The :math:`\lambda`-SOAP descriptor is then:
+
+.. math::
+
+   \braket{n_1 l_1 n_2 l_2\vert\overline{\rho_i^{\otimes 2}, \sigma, \lambda \mu}} =
+   \frac{\delta_{\sigma, (-1)^{l_1 + l_2 + \lambda}}}{\sqrt{2 \lambda + 1}}
+   \sum_{m} C_{m (\mu-m) \mu}^{l_1 l_2 \lambda} c_{n_1 l_1 m}^{i} c_{n_2 l_2 (\mu - m)}^{i}
+
+where we have assumed real spherical harmonics coefficients.
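+
+Computing :math:`\lambda`-SOAP with rascaline
+---------------------------------------------
+
+As a minimal sketch, a :math:`\lambda`-SOAP descriptor can be computed with the
+:py:func:`rascaline.utils.correlate_density` function introduced in this
+package, by correlating a spherical expansion with itself once. The structure
+file name below is a placeholder, and the hyperparameters are only an example:
+
+.. code-block:: python
+
+    import ase.io
+    import rascaline
+    from rascaline.utils import correlate_density
+
+    # read any structures of interest ("structures.xyz" is hypothetical)
+    frames = ase.io.read("structures.xyz", ":")
+
+    # compute the rotation equivariant density coefficients c_nlm
+    calculator = rascaline.SphericalExpansion(
+        cutoff=3.0,
+        max_radial=6,
+        max_angular=4,
+        atomic_gaussian_width=0.2,
+        radial_basis={"Gto": {}},
+        cutoff_function={"ShiftedCosine": {"width": 0.5}},
+        center_atom_weight=1.0,
+    )
+    density = calculator.compute(frames)
+
+    # one CG iteration of the density with itself gives the nu=2
+    # (i.e. 3-body) equivariants, which is exactly lambda-SOAP
+    lsoap = correlate_density(density, correlation_order=2)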
\ No newline at end of file
diff --git a/pyproject.toml b/pyproject.toml
index c319086d0..36feada37 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -27,6 +27,7 @@ classifiers = [
 dependencies = [
     "metatensor-core >=0.1.0,<0.2.0",
+    "wigners",
 ]
 
 [project.urls]
diff --git a/python/rascaline/rascaline/utils/__init__.py b/python/rascaline/rascaline/utils/__init__.py
index 8f0f588a8..8309ac44c 100644
--- a/python/rascaline/rascaline/utils/__init__.py
+++ b/python/rascaline/rascaline/utils/__init__.py
@@ -1,5 +1,6 @@
 import os
 
+from .clebsch_gordan import *  # noqa
 from .power_spectrum import PowerSpectrum  # noqa
 from .splines import (  # noqa
     AtomicDensityBase,
diff --git a/python/rascaline/rascaline/utils/clebsch_gordan/__init__.py b/python/rascaline/rascaline/utils/clebsch_gordan/__init__.py
new file mode 100644
index 000000000..38777d18e
--- /dev/null
+++ b/python/rascaline/rascaline/utils/clebsch_gordan/__init__.py
@@ -0,0 +1,7 @@
+from .clebsch_gordan import correlate_density, correlate_density_metadata  # noqa
+
+
+__all__ = [
+    "correlate_density",
+    "correlate_density_metadata",
+]
diff --git a/python/rascaline/rascaline/utils/clebsch_gordan/_cg_cache.py b/python/rascaline/rascaline/utils/clebsch_gordan/_cg_cache.py
new file mode 100644
index 000000000..8ad082fb8
--- /dev/null
+++ b/python/rascaline/rascaline/utils/clebsch_gordan/_cg_cache.py
@@ -0,0 +1,551 @@
+"""
+Module that stores the ClebschGordanReal class for computing and caching
+Clebsch-Gordan coefficients for use in CG combinations.
+"""
+from typing import Union
+
+import numpy as np
+import wigners
+
+from . import _dispatch
+
+
+try:
+    from mops import sparse_accumulation_of_products as sap  # noqa F401
+
+    HAS_MOPS = True
+except ImportError:
+    HAS_MOPS = False
+
+try:
+    from torch import Tensor as TorchTensor
+except ImportError:
+
+    class TorchTensor:
+        pass
+
+
+UNKNOWN_ARRAY_TYPE = (
+    "unknown array type, only numpy arrays and torch tensors are supported"
+)
+
+
+# =================================
+# ===== ClebschGordanReal class
+# =================================
+
+
+class ClebschGordanReal:
+    """
+    Class for computing Clebsch-Gordan coefficients for real spherical
+    harmonics.
+
+    Stores the coefficients in a dictionary in the `self.coeffs` attribute,
+    which is built at initialization. There are 3 current use cases for the
+    format of these coefficients. By default, sparse accumulation of products is
+    performed, whether or not Mops is installed.
+
+    Case 1: standard sparse format.
+
+    Each dictionary entry is a dictionary with entries for each (m1, m2, mu)
+    combination.
+
+    {
+        (l1, l2, lambda): {
+            (m1, m2, mu): cg_{m1, m2, mu}^{l1, l2, lambda}
+            for m1 in range(-l1, l1 + 1),
+            for m2 in range(-l2, l2 + 1),
+        },
+        ...
+        for l1 in range(0, lambda_max + 1)
+        for l2 in range(0, lambda_max + 1)
+        for lambda in range(|l1 - l2|, ..., min(lambda_max, l1 + l2))
+    }
+
+    Case 2: standard dense format.
+
+    Each dictionary entry is a dense array with shape (2 * l1 + 1, 2 * l2 + 1, 2
+    * lambda + 1).
+
+    {
+        (l1, l2, lambda):
+            array(
+                cg_{m1, m2, mu}^{l1, l2, lambda}
+                ...
+                for m1 in range(-l1, l1 + 1),
+                for m2 in range(-l2, l2 + 1),
+                for mu in range(-lambda, lambda + 1),
+
+                shape=(2 * l1 + 1, 2 * l2 + 1, 2 * lambda + 1),
+            )
+        ...
+        for l1 in range(0, lambda_max + 1)
+        for l2 in range(0, lambda_max + 1)
+        for lambda in range(|l1 - l2|, ..., min(lambda_max, l1 + l2))
+    }
+
+    Case 3: MOPS sparse format.
+
+    Each dictionary entry contains a tuple with four 1D arrays, corresponding to
+    the CG coeffs and m1, m2, mu indices respectively.
+    All of these arrays are sorted according to the mu index. This format is
+    used for Sparse Accumulation of Products (SAP) as implemented in MOPS. See
+    https://github.com/lab-cosmo/mops .
+
+    {
+        (l1, l2, lambda):
+            (
+                [
+                    cg_{m1, m2, mu}^{l1, l2, lambda}
+                    ...
+                    for m1 in range(-l1, l1 + 1),
+                    for m2 in range(-l2, l2 + 1),
+                    for mu in range(-lambda, lambda + 1)
+                ],
+                [
+                    m1 for m1 in range(-l1, l1 + 1),
+                ],
+                [
+                    m2 for m2 in range(-l2, l2 + 1),
+                ],
+                [
+                    mu for mu in range(-lambda, lambda + 1),
+                ],
+            )
+    }
+
+    where `cg_{m1, m2, mu}^{l1, l2, lambda}` is the Clebsch-Gordan coefficient
+    that describes the combination of the `m1` irreducible component of the `l1`
+    angular channel and the `m2` irreducible component of the `l2` angular
+    channel into the irreducible tensor of order `lambda`. In all cases, these
+    correspond to the non-zero CG coefficients, i.e. those in the range {-l,
+    ..., +l} for each angular order l in {l1, l2, lambda}.
+
+    :param lambda_max: maximum lambda value to compute CG coefficients for.
+    :param sparse: whether to store the CG coefficients in sparse format.
+    :param use_mops: whether to store the CG coefficients in MOPS sparse format.
+        This is recommended as the default for sparse accumulation, but can only
+        be used if Mops is installed.
+    """
+
+    def __init__(self, lambda_max: int, sparse: bool = True, use_mops: bool = HAS_MOPS):
+        self._lambda_max = lambda_max
+        self._sparse = sparse
+
+        if sparse:
+            if not HAS_MOPS:
+                # TODO: provide a warning once Mops is fully ready
+                # import warnings
+                # warnings.warn(
+                #     "It is recommended to use MOPS for sparse accumulation. "
+                #     " This can be installed with ``pip install"
+                #     " git+https://github.com/lab-cosmo/mops``."
+                #     " Falling back to numpy for now."
+                # )
+                self._use_mops = False
+            else:
+                self._use_mops = True
+
+        else:
+            # TODO: provide a warning once Mops is fully ready
+            # if HAS_MOPS:
+            #     import warnings
+            #     warnings.warn(
+            #         "Mops is installed, but not being used"
+            #         " as dense operations chosen."
+            #     )
+            self._use_mops = False
+
+        self._coeffs = ClebschGordanReal.build_coeff_dict(
+            self._lambda_max,
+            self._sparse,
+            self._use_mops,
+        )
+
+    @property
+    def lambda_max(self):
+        return self._lambda_max
+
+    @property
+    def sparse(self):
+        return self._sparse
+
+    @property
+    def use_mops(self):
+        return self._use_mops
+
+    @property
+    def coeffs(self):
+        return self._coeffs
+
+    @staticmethod
+    def build_coeff_dict(lambda_max: int, sparse: bool, use_mops: bool):
+        """
+        Builds a dictionary of Clebsch-Gordan coefficients for all possible
+        combinations of l1 and l2, up to lambda_max.
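+
+        As an illustrative sketch (sparse dict format; note that the
+        (m1, m2, mu) keys are 0-based array indices along each component
+        axis, as produced by ``np.where``, rather than signed m values):
+
+            coeffs = ClebschGordanReal.build_coeff_dict(
+                lambda_max=2, sparse=True, use_mops=False
+            )
+            block = coeffs[(1, 1, 2)]  # dict of {(m1, m2, mu): float}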
+ """ + # real-to-complex and complex-to-real transformations as matrices + r2c = {} + c2r = {} + coeff_dict = {} + for lambda_ in range(0, lambda_max + 1): + c2r[lambda_] = _complex2real(lambda_) + r2c[lambda_] = _real2complex(lambda_) + + for l1 in range(lambda_max + 1): + for l2 in range(lambda_max + 1): + for lambda_ in range( + max(l1, l2) - min(l1, l2), min(lambda_max, (l1 + l2)) + 1 + ): + complex_cg = _complex_clebsch_gordan_matrix(l1, l2, lambda_) + + real_cg = (r2c[l1].T @ complex_cg.reshape(2 * l1 + 1, -1)).reshape( + complex_cg.shape + ) + + real_cg = real_cg.swapaxes(0, 1) + real_cg = (r2c[l2].T @ real_cg.reshape(2 * l2 + 1, -1)).reshape( + real_cg.shape + ) + real_cg = real_cg.swapaxes(0, 1) + + real_cg = real_cg @ c2r[lambda_].T + + if (l1 + l2 + lambda_) % 2 == 0: + cg_l1l2lam = np.real(real_cg) + else: + cg_l1l2lam = np.imag(real_cg) + + if sparse: + # Find the m1, m2, mu idxs of the nonzero CG coeffs + nonzeros_cg_coeffs_idx = np.where(np.abs(cg_l1l2lam) > 1e-15) + if use_mops: + # Store CG coeffs in a specific format for use in + # MOPS. Here we need the m1, m2, mu, and CG coeffs + # to be stored as separate 1D arrays. + m1_arr, m2_arr, mu_arr, C_arr = [], [], [], [] + for m1, m2, mu in zip(*nonzeros_cg_coeffs_idx): + m1_arr.append(m1) + m2_arr.append(m2) + mu_arr.append(mu) + C_arr.append(cg_l1l2lam[m1, m2, mu]) + + # Reorder the arrays based on sorted mu values + mu_idxs = np.argsort(mu_arr) + m1_arr = np.array(m1_arr)[mu_idxs] + m2_arr = np.array(m2_arr)[mu_idxs] + mu_arr = np.array(mu_arr)[mu_idxs] + C_arr = np.array(C_arr)[mu_idxs] + cg_l1l2lam = (C_arr, m1_arr, m2_arr, mu_arr) + else: + # Otherwise fall back to torch/numpy and store as + # sparse dicts. + cg_l1l2lam = { + (m1, m2, mu): cg_l1l2lam[m1, m2, mu] + for m1, m2, mu in zip(*nonzeros_cg_coeffs_idx) + } + + # Store + coeff_dict[(l1, l2, lambda_)] = cg_l1l2lam + + return coeff_dict + + +# ============================ +# ===== Helper functions +# ============================ + + +def _real2complex(lambda_: int) -> np.ndarray: + """ + Computes a matrix that can be used to convert from real to complex-valued + spherical harmonics(coefficients) of order ``lambda_``. + + This is meant to be applied to the left: ``real2complex @ [-lambda_, ..., + +lambda_]``. + + See https://en.wikipedia.org/wiki/Spherical_harmonics#Real_form for details + on the convention for how these tranformations are defined. + """ + result = np.zeros((2 * lambda_ + 1, 2 * lambda_ + 1), dtype=np.complex128) + inv_sqrt_2 = 1.0 / np.sqrt(2) + i_sqrt_2 = 1j / np.sqrt(2) + for m in range(-lambda_, lambda_ + 1): + if m < 0: + # Positve part + result[lambda_ + m, lambda_ + m] = +i_sqrt_2 + # Negative part + result[lambda_ - m, lambda_ + m] = -i_sqrt_2 * ((-1) ** m) + + if m == 0: + result[lambda_, lambda_] = +1.0 + + if m > 0: + # Negative part + result[lambda_ - m, lambda_ + m] = +inv_sqrt_2 + # Positive part + result[lambda_ + m, lambda_ + m] = +inv_sqrt_2 * ((-1) ** m) + + return result + + +def _complex2real(lambda_: int) -> np.ndarray: + """ + Converts from complex to real spherical harmonics. This is just given by the + conjugate tranpose of the real->complex transformation matrices. + """ + return np.conjugate(_real2complex(lambda_)).T + + +def _complex_clebsch_gordan_matrix(l1, l2, lambda_): + r"""clebsch-gordan matrix + Computes the Clebsch-Gordan (CG) matrix for + transforming complex-valued spherical harmonics. 
+    The CG matrix is computed as a 3D array of elements
+        < l1 m1 l2 m2 | lambda_ mu >
+    where the first axis loops over m1, the second loops over m2,
+    and the third one loops over mu. The matrix is real.
+    For example, using the relation:
+        | l1 l2 lambda_ mu > =
+            \sum_{m1, m2} < l1 m1 l2 m2 | lambda_ mu > | l1 m1 > | l2 m2 >
+    (https://en.wikipedia.org/wiki/Clebsch–Gordan_coefficients, section
+    "Formal definition of Clebsch-Gordan coefficients", eq 2)
+    one can obtain the spherical harmonics lambda_ from two sets of
+    spherical harmonics with l1 and l2 (up to a normalization factor).
+    Args:
+        l1: l number for the first set of spherical harmonics
+        l2: l number for the second set of spherical harmonics
+        lambda_: l number for the third set of spherical harmonics
+    Returns:
+        cg: CG matrix for transforming complex-valued spherical harmonics
+    >>> from scipy.special import sph_harm
+    >>> import numpy as np
+    >>> import wigners
+    >>> C_112 = _complex_clebsch_gordan_matrix(1, 1, 2)
+    >>> comp_sph_1 = np.array([sph_harm(m, 1, 0.2, 0.2) for m in range(-1, 1 + 1)])
+    >>> comp_sph_2 = np.array([sph_harm(m, 1, 0.2, 0.2) for m in range(-1, 1 + 1)])
+    >>> # obtain the (unnormalized) spherical harmonics
+    >>> # with l = 2 by contraction over m1 and m2
+    >>> comp_sph_2_u = np.einsum("ijk,i,j->k", C_112, comp_sph_1, comp_sph_2)
+    >>> # we can check that they differ from the spherical harmonics
+    >>> # by a constant factor
+    >>> comp_sph_2 = np.array([sph_harm(m, 2, 0.2, 0.2) for m in range(-2, 2 + 1)])
+    >>> ratio = comp_sph_2 / comp_sph_2_u
+    >>> np.allclose(ratio[0], ratio)
+    True
+    """
+    if np.abs(l1 - l2) > lambda_ or np.abs(l1 + l2) < lambda_:
+        return np.zeros((2 * l1 + 1, 2 * l2 + 1, 2 * lambda_ + 1), dtype=np.double)
+    else:
+        return wigners.clebsch_gordan_array(l1, l2, lambda_)
+
+
+# =================================================
+# ===== Functions for performing CG combinations
+# =================================================
+
+
+def combine_arrays(
+    arr_1: Union[np.ndarray, TorchTensor],
+    arr_2: Union[np.ndarray, TorchTensor],
+    lambda_: int,
+    cg_cache,
+    return_empty_array: bool = False,
+) -> Union[np.ndarray, TorchTensor]:
+    """
+    Couples arrays `arr_1` and `arr_2` corresponding to the irreducible
+    spherical components of 2 angular channels l1 and l2 using the appropriate
+    Clebsch-Gordan coefficients. As l1 and l2 can be combined to form multiple
+    lambda channels, this function returns the coupling to a single specified
+    channel `lambda`. The angular channels l1 and l2 are inferred from the size
+    of the components axis (axis 1) of the input arrays.
+
+    `arr_1` has shape (n_i, 2 * l1 + 1, n_p) and `arr_2` has shape (n_i, 2 * l2
+    + 1, n_q). n_i is the number of samples, n_p and n_q are the number of
+    properties in each array. The number of samples in each array must be the
+    same.
+
+    The output array has shape (n_i, 2 * lambda + 1, n_p * n_q), where lambda is
+    the input parameter `lambda_`.
+
+    The Clebsch-Gordan coefficients are cached in `cg_cache`. Currently, these
+    must be produced by the ClebschGordanReal class in this module. These
+    coefficients can be stored in either sparse dictionaries or dense arrays.
+
+    The combination operation is dispatched such that numpy arrays or torch
+    tensors are automatically handled.
+
+    `return_empty_array` can be used to return an empty array of the correct
+    shape, without performing the CG combination step.
+    This can be useful for probing the outputs of CG iterations in terms of
+    metadata without the computational cost of performing the CG combinations,
+    i.e. as done by the function :py:func:`correlate_density_metadata`.
+
+    :param arr_1: array with the m values for l1 with shape [n_samples, 2 * l1
+        + 1, n_q_properties]
+    :param arr_2: array with the m values for l2 with shape [n_samples, 2 * l2
+        + 1, n_p_properties]
+    :param lambda_: int value of the resulting coupled channel
+    :param cg_cache: either a sparse dictionary with keys (m1, m2, mu) and
+        float values, or a dense array of shape [(2 * l1 + 1), (2 * l2 + 1),
+        (2 * lambda_ + 1)].
+
+    :returns: array of shape [n_samples, (2*lambda_+1), q_properties * p_properties]
+    """
+    # If just precomputing metadata, return an empty array
+    if return_empty_array:
+        return sparse_combine(arr_1, arr_2, lambda_, cg_cache, return_empty_array=True)
+
+    # Otherwise, perform the CG combination
+    # Sparse CG cache
+    if cg_cache.sparse:
+        return sparse_combine(arr_1, arr_2, lambda_, cg_cache, return_empty_array=False)
+
+    # Dense CG cache
+    return dense_combine(arr_1, arr_2, lambda_, cg_cache)
+
+
+def sparse_combine(
+    arr_1: Union[np.ndarray, TorchTensor],
+    arr_2: Union[np.ndarray, TorchTensor],
+    lambda_: int,
+    cg_cache,
+    return_empty_array: bool = False,
+) -> Union[np.ndarray, TorchTensor]:
+    """
+    Performs a Clebsch-Gordan combination step on 2 arrays using sparse
+    operations. The angular channel of each block is inferred from the size of
+    its component axis, and the blocks are combined to the desired output
+    angular channel `lambda_` using the appropriate Clebsch-Gordan coefficients.
+
+    :param arr_1: array with the m values for l1 with shape [n_samples, 2 * l1
+        + 1, n_q_properties]
+    :param arr_2: array with the m values for l2 with shape [n_samples, 2 * l2
+        + 1, n_p_properties]
+    :param lambda_: int value of the resulting coupled channel
+    :param cg_cache: sparse dictionary with keys (m1, m2, mu) and float values
+
+    :returns: array of shape [n_samples, (2*lambda_+1), q_properties * p_properties]
+    """
+    # Samples dimensions must be the same
+    assert arr_1.shape[0] == arr_2.shape[0]
+
+    # Infer l1 and l2 from the length of axis 1 of each tensor
+    l1 = (arr_1.shape[1] - 1) // 2
+    l2 = (arr_2.shape[1] - 1) // 2
+
+    # Define other useful dimensions
+    n_i = arr_1.shape[0]  # number of samples
+    n_p = arr_1.shape[2]  # number of properties in arr_1
+    n_q = arr_2.shape[2]  # number of properties in arr_2
+
+    if return_empty_array:  # used when only computing metadata
+        return _dispatch.zeros_like((n_i, 2 * lambda_ + 1, n_p * n_q), like=arr_1)
+
+    if isinstance(arr_1, np.ndarray) and HAS_MOPS:
+        # Reshape
+        arr_1 = np.repeat(arr_1[:, :, :, None], n_q, axis=3).reshape(
+            n_i, 2 * l1 + 1, n_p * n_q
+        )
+        arr_2 = np.repeat(arr_2[:, :, None, :], n_p, axis=2).reshape(
+            n_i, 2 * l2 + 1, n_p * n_q
+        )
+
+        arr_1 = _dispatch.swapaxes(arr_1, 1, 2).reshape(n_i * n_p * n_q, 2 * l1 + 1)
+        arr_2 = _dispatch.swapaxes(arr_2, 1, 2).reshape(n_i * n_p * n_q, 2 * l2 + 1)
+
+        # Do SAP
+        arr_out = sap(
+            arr_1,
+            arr_2,
+            *cg_cache._coeffs[(l1, l2, lambda_)],
+            output_size=2 * lambda_ + 1,
+        )
+        assert arr_out.shape == (n_i * n_p * n_q, 2 * lambda_ + 1)
+
+        # Reshape back
+        arr_out = arr_out.reshape(n_i, n_p * n_q, 2 * lambda_ + 1)
+        arr_out = _dispatch.swapaxes(arr_out, 1, 2)
+
+        return arr_out
+
+    if isinstance(arr_1, np.ndarray) or isinstance(arr_1, TorchTensor):
+        # Initialise output array
+        arr_out = _dispatch.zeros_like((n_i, 2 * lambda_ + 1, n_p * n_q), like=arr_1)
+
+        # Get the corresponding Clebsch-Gordan coefficients
+        cg_coeffs = cg_cache.coeffs[(l1, l2, lambda_)]
+
+        # Fill in each mu component of the output array in turn
+        for m1, m2, mu in cg_coeffs.keys():
+            # Broadcast arrays, multiply together and with CG coeff
+            arr_out[:, mu, :] += (
+                arr_1[:, m1, :, None] * arr_2[:, m2, None, :] * cg_coeffs[(m1, m2, mu)]
+            ).reshape(n_i, n_p * n_q)
+
+        return arr_out
+
+    else:
+        raise TypeError(UNKNOWN_ARRAY_TYPE)
+
+
+def dense_combine(
+    arr_1: Union[np.ndarray, TorchTensor],
+    arr_2: Union[np.ndarray, TorchTensor],
+    lambda_: int,
+    cg_cache,
+) -> Union[np.ndarray, TorchTensor]:
+    """
+    Performs a Clebsch-Gordan combination step on 2 arrays using a dense
+    operation. The angular channel of each block is inferred from the size of
+    its component axis, and the blocks are combined to the desired output
+    angular channel `lambda_` using the appropriate Clebsch-Gordan coefficients.
+
+    :param arr_1: array with the m values for l1 with shape [n_samples, 2 * l1
+        + 1, n_q_properties]
+    :param arr_2: array with the m values for l2 with shape [n_samples, 2 * l2
+        + 1, n_p_properties]
+    :param lambda_: int value of the resulting coupled channel
+    :param cg_cache: dense array of shape [(2 * l1 + 1), (2 * l2 + 1),
+        (2 * lambda_ + 1)]
+
+    :returns: array of shape [n_samples, (2*lambda_+1), q_properties * p_properties]
+    """
+    if isinstance(arr_1, np.ndarray) or isinstance(arr_1, TorchTensor):
+        # Infer l1 and l2 from the length of axis 1 of each tensor
+        l1 = (arr_1.shape[1] - 1) // 2
+        l2 = (arr_2.shape[1] - 1) // 2
+        cg_coeffs = cg_cache.coeffs[(l1, l2, lambda_)]
+
+        # (samples None None l1_mu q) * (samples l2_mu p None None)
+        # -> (samples l2_mu p l1_mu q) we broadcast it in this way
+        # so we only need to do one swapaxes in the next step
+        arr_out = arr_1[:, None, None, :, :] * arr_2[:, :, :, None, None]
+
+        # (samples l2_mu p l1_mu q) -> (samples q p l1_mu l2_mu)
+        arr_out = _dispatch.swapaxes(arr_out, 1, 4)
+
+        # samples (q p l1_mu l2_mu) -> (samples (q p) (l1_mu l2_mu))
+        arr_out = arr_out.reshape(
+            -1,
+            arr_1.shape[2] * arr_2.shape[2],
+            arr_1.shape[1] * arr_2.shape[1],
+        )
+
+        # (l1_mu l2_mu lam_mu) -> ((l1_mu l2_mu) lam_mu)
+        cg_coeffs = cg_coeffs.reshape(-1, 2 * lambda_ + 1)
+
+        # (samples (q p) (l1_mu l2_mu)) @ ((l1_mu l2_mu) lam_mu)
+        # -> samples (q p) lam_mu
+        arr_out = arr_out @ cg_coeffs
+
+        # (samples (q p) lam_mu) -> (samples lam_mu (q p))
+        return _dispatch.swapaxes(arr_out, 1, 2)
+
+    else:
+        raise TypeError(UNKNOWN_ARRAY_TYPE)
diff --git a/python/rascaline/rascaline/utils/clebsch_gordan/_dispatch.py b/python/rascaline/rascaline/utils/clebsch_gordan/_dispatch.py
new file mode 100644
index 000000000..f44aa3713
--- /dev/null
+++ b/python/rascaline/rascaline/utils/clebsch_gordan/_dispatch.py
@@ -0,0 +1,133 @@
+"""
+Module containing dispatch functions for numpy/torch CG combination operations.
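+
+Each function inspects the type of its array argument(s) at runtime and calls
+the corresponding numpy or torch implementation: for example,
+``swapaxes(array, 1, 2)`` resolves to ``torch.swapaxes`` for a torch Tensor
+and to ``np.swapaxes`` for a numpy array.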
+""" +from typing import List, Optional + +import numpy as np + + +try: + import torch + from torch import Tensor as TorchTensor +except ImportError: + + class TorchTensor: + pass + + +UNKNOWN_ARRAY_TYPE = ( + "unknown array type, only numpy arrays and torch tensors are supported" +) + + +def unique(array, axis: Optional[int] = None): + """Find the unique elements of an array.""" + if isinstance(array, TorchTensor): + return torch.unique(array, dim=axis) + elif isinstance(array, np.ndarray): + return np.unique(array, axis=axis) + + +def int_range_like(min_val, max_val, like): + """Returns an array of integers from min to max, non-inclusive, based on the + type of `like`""" + if isinstance(like, TorchTensor): + return torch.arange(min_val, max_val, dtype=torch.int64, device=like.device) + elif isinstance(like, np.ndarray): + return np.arange(min_val, max_val).astype(np.int64) + else: + raise TypeError(UNKNOWN_ARRAY_TYPE) + + +def int_array_like(int_list: List[int], like): + """ + Converts the input list of int to a numpy array or torch tensor + based on the type of `like`. + """ + if isinstance(like, TorchTensor): + return torch.tensor(int_list, dtype=torch.int64, device=like.device) + elif isinstance(like, np.ndarray): + return np.array(int_list).astype(np.int64) + else: + raise TypeError(UNKNOWN_ARRAY_TYPE) + + +def concatenate(arrays, axis: Optional[int] = 0): + """Concatenate arrays along an axis.""" + if isinstance(arrays[0], TorchTensor): + return torch.cat(arrays, dim=axis) + elif isinstance(arrays[0], np.ndarray): + return np.concatenate(arrays, axis=axis) + else: + raise TypeError(UNKNOWN_ARRAY_TYPE) + + +def all(array, axis: Optional[int] = None): + """Test whether all array elements along a given axis evaluate to True. + + This function has the same behavior as + ``np.all(array,axis=axis)``. + """ + if isinstance(array, bool): + array = np.array(array) + if isinstance(array, list): + array = np.array(array) + + if isinstance(array, TorchTensor): + # torch.all has two implementation, and picks one depending if more than one + # parameter is given. The second one does not supports setting dim to `None` + if axis is None: + return torch.all(input=array) + else: + return torch.all(input=array, dim=axis) + elif isinstance(array, np.ndarray): + return np.all(a=array, axis=axis) + else: + raise TypeError(UNKNOWN_ARRAY_TYPE) + + +def any(array): + """Test whether any array elements along a given axis evaluate to True. + + This function has the same behavior as + ``np.any(array)``. + """ + if isinstance(array, bool): + array = np.array(array) + if isinstance(array, list): + array = np.array(array) + if isinstance(array, TorchTensor): + return torch.any(array) + elif isinstance(array, np.ndarray): + return np.any(array) + else: + raise TypeError(UNKNOWN_ARRAY_TYPE) + + +def zeros_like(shape, like): + """Return an array of zeros with the same shape and type as a given array. + + This function has the same behavior as + ``np.zeros_like(array)``. 
+ """ + if isinstance(like, TorchTensor): + return torch.zeros( + shape, + requires_grad=like.requires_grad, + dtype=like.dtype, + device=like.device, + ) + elif isinstance(like, np.ndarray): + return np.zeros(shape, dtype=like.dtype) + else: + raise TypeError(UNKNOWN_ARRAY_TYPE) + + +def swapaxes(array, axis0: int, axis1: int): + """Swaps axes of an array.""" + if isinstance(array, TorchTensor): + return torch.swapaxes(array, axis0, axis1) + elif isinstance(array, np.ndarray): + return np.swapaxes(array, axis0, axis1) + else: + raise TypeError(UNKNOWN_ARRAY_TYPE) diff --git a/python/rascaline/rascaline/utils/clebsch_gordan/clebsch_gordan.py b/python/rascaline/rascaline/utils/clebsch_gordan/clebsch_gordan.py new file mode 100644 index 000000000..5572c7598 --- /dev/null +++ b/python/rascaline/rascaline/utils/clebsch_gordan/clebsch_gordan.py @@ -0,0 +1,772 @@ +""" +Module for computing Clebsch-gordan iterations with metatensor TensorMaps. +""" +import itertools +from typing import List, Optional, Tuple, Union + +from metatensor import Labels, TensorBlock, TensorMap + +from . import _cg_cache, _dispatch + + +# ====================================================================== +# ===== Public API functions +# ====================================================================== + + +def correlate_density( + density: TensorMap, + correlation_order: int, + angular_cutoff: Optional[int] = None, + selected_keys: Optional[Union[Labels, List[Labels]]] = None, + skip_redundant: Optional[Union[bool, List[bool]]] = False, + output_selection: Optional[Union[bool, List[bool]]] = None, +) -> Union[TensorMap, List[TensorMap]]: + """ + Takes iterative Clebsch-Gordan (CG) tensor products of a density descriptor + with itself up to the desired correlation order. Returns + :py:class:`TensorMap`(s) corresponding to the density correlations output + from the specified iteration(s). + + A density descriptor necessarily is body order 2 (i.e. correlation order 1), + but can be single- or multi-center. The output is a :py:class:`list` of + density correlations for each iteration specified in `output_selection`, up + to the target order passed in `correlation_order`. By default only the last + correlation (i.e. the correlation of order ``correlation_order``) is + returned. + + This function is an iterative special case of the more general + :py:func:`correlate_tensors`. As a density is being correlated with itself, + some redundant CG tensor products can be skipped with the `skip_redundant` + keyword. + + Selections on the angular and parity channels at each iteration can also be + controlled with arguments `angular_cutoff`, `angular_selection` and + `parity_selection`. + + :param density: A density descriptor of body order 2 (correlation order 1), + in :py:class:`TensorMap` format. This may be, for example, a rascaline + :py:class:`SphericalExpansion` or :py:class:`LodeSphericalExpansion`. + Alternatively, this could be multi-center descriptor, such as a pair + density. + :param correlation_order: The desired correlation order of the output + descriptor. Must be >= 1. + :param angular_cutoff: The maximum angular channel to compute at any given + CG iteration, applied globally to all iterations until the target + correlation order is reached. + :param selected_keys: :py:class:`Labels` or `List[:py:class:`Labels`]` + specifying the angular and/or parity channels to output at each + iteration. All :py:class:`Labels` objects passed here must only contain + key names "spherical_harmonics_l" and "inversion_sigma". 
+        If a single
+        :py:class:`Labels` object is passed, this is applied to the final
+        iteration only. If a :py:class:`list` of :py:class:`Labels` objects is
+        passed, each is applied to its corresponding iteration. If None is
+        passed, all angular and parity channels are output at each iteration,
+        with the global `angular_cutoff` applied if specified.
+    :param skip_redundant: Whether to skip redundant CG combinations. Defaults
+        to False, which means all combinations are performed. If a
+        :py:class:`list` of :py:class:`bool` is passed, this is applied to each
+        iteration. If a single :py:class:`bool` is passed, this is applied to
+        all iterations.
+    :param output_selection: A :py:class:`list` of :py:class:`bool` specifying
+        whether to output a :py:class:`TensorMap` for each iteration. If a
+        single :py:class:`bool` is passed as True, outputs from all iterations
+        will be returned. If a :py:class:`list` of :py:class:`bool` is passed,
+        this controls the output at each corresponding iteration. If None is
+        passed, only the final iteration is output.
+
+    :return: A :py:class:`list` of :py:class:`TensorMap` corresponding to the
+        density correlations output from the specified iterations. If the
+        output from a single iteration is requested, a :py:class:`TensorMap` is
+        returned instead.
+    """
+    return _correlate_density(
+        density,
+        correlation_order,
+        angular_cutoff,
+        selected_keys,
+        skip_redundant,
+        output_selection,
+        compute_metadata_only=False,
+        sparse=True,  # sparse CG cache by default
+    )
+
+
+def correlate_density_metadata(
+    density: TensorMap,
+    correlation_order: int,
+    angular_cutoff: Optional[int] = None,
+    selected_keys: Optional[Union[Labels, List[Labels]]] = None,
+    skip_redundant: Optional[Union[bool, List[bool]]] = False,
+    output_selection: Optional[Union[bool, List[bool]]] = None,
+) -> Union[TensorMap, List[TensorMap]]:
+    """
+    Returns the metadata-only :py:class:`TensorMap`(s) that would be output by
+    the function :py:func:`correlate_density` under the same settings, without
+    performing the actual Clebsch-Gordan tensor products. See this function for
+    full documentation.
+    """
+
+    return _correlate_density(
+        density,
+        correlation_order,
+        angular_cutoff,
+        selected_keys,
+        skip_redundant,
+        output_selection,
+        compute_metadata_only=True,
+    )
+
+
+# ====================================================================
+# ===== Private functions that do the work on the TensorMap level
+# ====================================================================
+
+
+def _correlate_density(
+    density: TensorMap,
+    correlation_order: int,
+    angular_cutoff: Optional[int] = None,
+    selected_keys: Optional[Union[Labels, List[Labels]]] = None,
+    skip_redundant: Optional[Union[bool, List[bool]]] = False,
+    output_selection: Optional[Union[bool, List[bool]]] = None,
+    compute_metadata_only: bool = False,
+    sparse: bool = True,
+) -> Union[TensorMap, List[TensorMap]]:
+    """
+    Performs the density correlations for public functions
+    :py:func:`correlate_density` and :py:func:`correlate_density_metadata`.
+    """
+    # Check inputs
+    if correlation_order <= 1:
+        raise ValueError("`correlation_order` must be > 1")
+    # TODO: implement combinations of gradients too
+    if _dispatch.any([len(list(block.gradients())) > 0 for block in density]):
+        raise NotImplementedError(
+            "Clebsch-Gordan combinations with gradients are not yet implemented."
+            " Use metatensor.remove_gradients to remove gradients from the input."
+        )
+    # Check metadata
+    if not (
+        _dispatch.all(density.keys.names == ["spherical_harmonics_l", "species_center"])
+        or _dispatch.all(
+            density.keys.names
+            == ["spherical_harmonics_l", "species_center", "species_neighbor"]
+        )
+    ):
+        raise ValueError(
+            "input `density` must have key names"
+            ' ["spherical_harmonics_l", "species_center"] or'
+            ' ["spherical_harmonics_l", "species_center", "species_neighbor"]'
+        )
+    if not _dispatch.all(density.component_names == ["spherical_harmonics_m"]):
+        raise ValueError(
+            "input `density` must have a single component"
+            " axis with name `spherical_harmonics_m`"
+        )
+    n_iterations = correlation_order - 1  # num iterations
+    density = _standardize_keys(density)  # standardize metadata
+    density_correlation = density  # rolling result, combined with `density` each iteration
+
+    # Parse the selected keys
+    selected_keys = _parse_selected_keys(
+        n_iterations=n_iterations,
+        angular_cutoff=angular_cutoff,
+        selected_keys=selected_keys,
+        like=density.keys.values,
+    )
+    # Parse the bool flags that control skipping of redundant CG combinations
+    # and TensorMap output from each iteration
+    skip_redundant, output_selection = _parse_bool_iteration_filters(
+        n_iterations,
+        skip_redundant=skip_redundant,
+        output_selection=output_selection,
+    )
+
+    # Pre-compute the keys needed to perform each CG iteration
+    key_metadata = _precompute_keys(
+        density.keys,
+        density.keys,
+        n_iterations=n_iterations,
+        selected_keys=selected_keys,
+        skip_redundant=skip_redundant,
+    )
+    # Compute CG coefficient cache
+    if compute_metadata_only:
+        cg_cache = None
+    else:
+        angular_max = max(
+            _dispatch.concatenate(
+                [density.keys.column("spherical_harmonics_l")]
+                + [mdata[2].column("spherical_harmonics_l") for mdata in key_metadata]
+            )
+        )
+        # TODO: keys have been precomputed, so perhaps we don't need to
+        # compute all CG coefficients up to angular_max here.
+        # TODO: use sparse cache by default until we understand under which
+        # circumstances (and if) dense is faster.
+        cg_cache = _cg_cache.ClebschGordanReal(angular_max, sparse=sparse)
+
+    # Perform iterative CG tensor products
+    density_correlations = []
+    for iteration in range(n_iterations):
+        # Define the correlation order of the current iteration
+        correlation_order_it = iteration + 2
+
+        # Combine block pairs
+        blocks_out = []
+        for key_1, key_2, key_out in zip(*key_metadata[iteration]):
+            block_out = _combine_blocks_same_samples(
+                density_correlation[key_1],
+                density[key_2],
+                key_out["spherical_harmonics_l"],
+                cg_cache,
+                compute_metadata_only=compute_metadata_only,
+            )
+            blocks_out.append(block_out)
+        keys_out = key_metadata[iteration][2]
+        density_correlation = TensorMap(keys=keys_out, blocks=blocks_out)
+
+        # If this tensor is to be included in the output, move the [l1, l2, ...]
+        # keys to properties and store
+        if output_selection[iteration]:
+            density_correlations.append(
+                density_correlation.keys_to_properties(
+                    [f"l{i}" for i in range(1, correlation_order_it + 1)]
+                    + [f"k{i}" for i in range(2, correlation_order_it)]
+                )
+            )
+
+    # Drop redundant key names. TODO: these should be part of the global
+    # metadata associated with the TensorMap. Awaiting this functionality in
+    # metatensor.
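+    # e.g. if every key shares order_nu=2 and inversion_sigma=1, these
+    # dimensions carry no information and are removed from the keys below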
+    for i, tensor in enumerate(density_correlations):
+        keys = tensor.keys
+        if len(_dispatch.unique(tensor.keys.column("order_nu"))) == 1:
+            keys = keys.remove(name="order_nu")
+        if len(_dispatch.unique(tensor.keys.column("inversion_sigma"))) == 1:
+            keys = keys.remove(name="inversion_sigma")
+        density_correlations[i] = TensorMap(
+            keys=keys, blocks=[b.copy() for b in tensor.blocks()]
+        )
+
+    # Return a single TensorMap in the simple case
+    if len(density_correlations) == 1:
+        return density_correlations[0]
+
+    # Otherwise return a list of TensorMaps
+    return density_correlations
+
+
+# ==================================================================
+# ===== Functions to handle metadata
+# ==================================================================
+
+
+def _standardize_keys(tensor: TensorMap) -> TensorMap:
+    """
+    Takes a nu=1 tensor and standardizes its metadata. This involves: 1) moving
+    the "species_neighbor" key to properties, if present as a dimension in the
+    keys, and 2) adding dimensions in the keys for tracking the body order
+    ("order_nu") and parity ("inversion_sigma") of the blocks.
+
+    Checking for the presence of the "species_neighbor" key in the keys allows
+    the user to pre-move this key to the properties before calling
+    :py:func:`correlate_density`, allowing sparsity in a set of global
+    neighbors to be created if desired.
+
+    Assumes that the input `tensor` is nu=1, and has only even parity blocks.
+    """
+    if "species_neighbor" in tensor.keys.names:
+        tensor = tensor.keys_to_properties(keys_to_move="species_neighbor")
+    keys = tensor.keys.insert(
+        name="order_nu",
+        values=_dispatch.int_array_like([1], like=tensor.keys.values),
+        index=0,
+    )
+    keys = keys.insert(
+        name="inversion_sigma",
+        values=_dispatch.int_array_like([1], like=tensor.keys.values),
+        index=1,
+    )
+    return TensorMap(keys=keys, blocks=[b.copy() for b in tensor.blocks()])
+
+
+def _parse_selected_keys(
+    n_iterations: int,
+    angular_cutoff: Optional[int] = None,
+    selected_keys: Optional[Union[Labels, List[Labels]]] = None,
+    like=None,
+) -> List[Union[None, Labels]]:
+    """
+    Parses the `selected_keys` argument passed to public functions. Checks the
+    values and returns a :py:class:`list` of :py:class:`Labels` objects, one for
+    each iteration of CG combination.
+
+    `like` is required if a new :py:class:`Labels` object is to be created by
+    :py:mod:`_dispatch`.
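+
+    For example (illustrative), with ``n_iterations=2`` and a single
+    ``Labels`` object passed as ``selected_keys``, the parsed result is
+    ``[None, selected_keys]``: the selection is applied at the final
+    iteration only.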
+ """ + # Check angular_cutoff arg + if angular_cutoff is not None: + if not isinstance(angular_cutoff, int): + raise TypeError("`angular_cutoff` must be passed as an int") + if angular_cutoff < 1: + raise ValueError("`angular_cutoff` must be >= 1") + + if selected_keys is None: + if angular_cutoff is None: # no selections at all + selected_keys = [None] * n_iterations + else: + # Create a key selection with all angular channels <= the specified + # angular cutoff + selected_keys = [ + Labels( + names=["spherical_harmonics_l"], + values=_dispatch.int_range_like( + 0, angular_cutoff, like=like + ).reshape(-1, 1), + ) + ] * n_iterations + + if isinstance(selected_keys, Labels): + # Create a list, but only apply a key selection at the final iteration + selected_keys = [None] * (n_iterations - 1) + [selected_keys] + + # Check the selected_keys + if not isinstance(selected_keys, List): + raise TypeError( + "`selected_keys` must be a `Labels` or List[Union[None, `Labels`]]" + ) + if not len(selected_keys) == n_iterations: + raise ValueError( + "`selected_keys` must be a List[Union[None, Labels]] of length" + " `correlation_order` - 1" + ) + if not _dispatch.all( + [isinstance(val, (Labels, type(None))) for val in selected_keys] + ): + raise TypeError("`selected_keys` must be a Labels or List[Union[None, Labels]]") + + # Now iterate over each of the Labels (or None) in the list and check + for slct in selected_keys: + if slct is None: + continue + assert isinstance(slct, Labels) + if not _dispatch.all( + [ + name in ["spherical_harmonics_l", "inversion_sigma"] + for name in slct.names + ] + ): + raise ValueError( + "specified key names in `selected_keys` must be either" + " 'spherical_harmonics_l' or 'inversion_sigma'" + ) + if "spherical_harmonics_l" in slct.names: + if angular_cutoff is not None: + if not _dispatch.all( + slct.column("spherical_harmonics_l") <= angular_cutoff + ): + raise ValueError( + "specified angular channels in `selected_keys` must be <= the" + " specified `angular_cutoff`" + ) + if not _dispatch.all( + [angular_l >= 0 for angular_l in slct.column("spherical_harmonics_l")] + ): + raise ValueError( + "specified angular channels in `selected_keys` must be >= 0" + ) + if "inversion_sigma" in slct.names: + if not _dispatch.all( + [parity_s in [-1, +1] for parity_s in slct.column("inversion_sigma")] + ): + raise ValueError( + "specified parities in `selected_keys` must be -1 or +1" + ) + + return selected_keys + + +def _parse_bool_iteration_filters( + n_iterations: int, + skip_redundant: Optional[Union[bool, List[bool]]] = False, + output_selection: Optional[Union[bool, List[bool]]] = None, +) -> List[List[bool]]: + """ + Parses the `skip_redundant` and `output_selection` arguments passed to + public functions. 
+ """ + if isinstance(skip_redundant, bool): + skip_redundant = [skip_redundant] * n_iterations + if not _dispatch.all([isinstance(val, bool) for val in skip_redundant]): + raise TypeError("`skip_redundant` must be a `bool` or `list` of `bool`") + if not len(skip_redundant) == n_iterations: + raise ValueError( + "`skip_redundant` must be a bool or `list` of `bool` of length" + " `correlation_order` - 1" + ) + if output_selection is None: + output_selection = [False] * (n_iterations - 1) + [True] + else: + if isinstance(output_selection, bool): + output_selection = [output_selection] * n_iterations + if not isinstance(output_selection, List): + raise TypeError("`output_selection` must be passed as `list` of `bool`") + + if not len(output_selection) == n_iterations: + raise ValueError( + "`output_selection` must be a ``list`` of ``bool`` of length" + " corresponding to the number of CG iterations" + ) + if not _dispatch.all([isinstance(v, bool) for v in output_selection]): + raise TypeError("`output_selection` must be passed as a `list` of `bool`") + if not _dispatch.all([isinstance(v, bool) for v in output_selection]): + raise TypeError("`output_selection` must be passed as a `list` of `bool`") + + return skip_redundant, output_selection + + +def _precompute_keys( + keys_1: Labels, + keys_2: Labels, + n_iterations: int, + selected_keys: List[Union[None, Labels]], + skip_redundant: List[bool], +) -> List[Tuple[Labels, List[List[int]]]]: + """ + Computes all the keys metadata needed to perform `n_iterations` of CG + combination steps. + + At each iteration, a full product of the keys of two tensors, i.e. `keys_1` + and `keys_2` is computed. Then, key selections are applied according to the + user-defined settings: the maximum angular channel cutoff + (`angular_cutoff`), and angular and/or parity selections specified in + `selected_keys`. + + If `skip_redundant` is True, then keys that represent redundant CG + operations are not included in the output keys at each step. + """ + keys_metadata = [] + keys_out = keys_1 + for iteration in range(n_iterations): + # Get the keys metadata for the combination of the 2 tensors + keys_1_entries, keys_2_entries, keys_out = _precompute_keys_full_product( + keys_1=keys_out, + keys_2=keys_2, + ) + if selected_keys[iteration] is not None: + keys_1_entries, keys_2_entries, keys_out = _apply_key_selection( + keys_1_entries, + keys_2_entries, + keys_out, + selected_keys=selected_keys[iteration], + ) + + if skip_redundant[iteration]: + keys_1_entries, keys_2_entries, keys_out = _remove_redundant_keys( + keys_1_entries, keys_2_entries, keys_out + ) + + # Check that some keys are produced as a result of the combination + if len(keys_out) == 0: + raise ValueError( + f"invalid selections: iteration {iteration + 1} produces no" + " valid combinations. Check the `angular_cutoff` and" + " `selected_keys` args and try again." + ) + + keys_metadata.append((keys_1_entries, keys_2_entries, keys_out)) + + return keys_metadata + + +def _precompute_keys_full_product( + keys_1: Labels, keys_2: Labels +) -> Tuple[List, List, Labels]: + """ + Given the keys of 2 TensorMaps, returns the keys that would be present after + a full CG product of these TensorMaps. + + Assumes that `keys_1` corresponds to a TensorMap with arbitrary body order, + while `keys_2` corresponds to a TensorMap with body order 1. `keys_1` must + follow the key name convention: + + ["order_nu", "inversion_sigma", "spherical_harmonics_l", "species_center", + "l1", "l2", ..., f"l{`nu`}", "k2", ..., f"k{`nu`-1}"]. 
The "lx" columns + track the l values of the nu=1 blocks that were previously combined. The + "kx" columns tracks the intermediate lambda values of nu > 1 blocks that + have been combined. + + For instance, a TensorMap of body order nu=4 will have key names + ["order_nu", "inversion_sigma", "spherical_harmonics_l", "species_center", + "l1", "l2", "l3", "l4", "k2", "k3"]. Two nu=1 TensorMaps with blocks of + order "l1" and "l2" were combined to form a nu=2 TensorMap with blocks of + order "k2". This was combined with a nu=1 TensorMap with blocks of order + "l3" to form a nu=3 TensorMap with blocks of order "k3". Finally, this was + combined with a nu=1 TensorMap with blocks of order "l4" to form a nu=4. + + .. math :: + + \\bra{ + n_1 l_1 ; n_2 l_2 k_2 ; ... ; + n_{\nu-1} l_{\\nu-1} k_{\\nu-1} ; + n_{\\nu} l_{\\nu} k_{\\nu}; \\lambda + } + \\ket{ \\rho^{\\otimes \\nu}; \\lambda M } + + `keys_2` must follow the key name convention: ["order_nu", + "inversion_sigma", "spherical_harmonics_l", "species_center"] + + Returned is Tuple[List, List, Labels]. The first two lists correspond to the + LabelsEntry objects of the keys being combined. The third element is a + Labels object corresponding to the keys of the output TensorMap. Each entry + in this Labels object corresponds to the keys is formed by combination of + the pair of blocks indexed by correspoding key pairs in the first two lists. + """ + # Get the correlation order of the first TensorMap. + unique_nu = _dispatch.unique(keys_1.column("order_nu")) + if len(unique_nu) > 1: + raise ValueError( + "keys_1 must correspond to a tensor of a single correlation order." + f" Found {len(unique_nu)} body orders: {unique_nu}" + ) + nu1 = unique_nu[0] + + # Define new correlation order of output TensorMap + nu = nu1 + 1 + + # The correlation order of the second TensorMap should be nu = 1. + assert _dispatch.all(keys_2.column("order_nu") == 1) + + # If nu1 = 1, the key names don't yet have any "lx" columns + if nu1 == 1: + l_list_names = [] + new_l_list_names = ["l1", "l2"] + else: + l_list_names = [f"l{angular_l}" for angular_l in range(1, nu1 + 1)] + new_l_list_names = l_list_names + [f"l{nu}"] + + # Check key names + assert _dispatch.all( + keys_1.names + == ["order_nu", "inversion_sigma", "spherical_harmonics_l", "species_center"] + + l_list_names + + [f"k{k}" for k in range(2, nu1)] + ) + assert _dispatch.all( + keys_2.names + == ["order_nu", "inversion_sigma", "spherical_harmonics_l", "species_center"] + ) + + # Define key names of output Labels (i.e. for combined TensorMap) + new_names = ( + ["order_nu", "inversion_sigma", "spherical_harmonics_l", "species_center"] + + new_l_list_names + + [f"k{k}" for k in range(2, nu)] + ) + + new_key_values = [] + keys_1_entries = [] + keys_2_entries = [] + for key_1, key_2 in itertools.product(keys_1, keys_2): + # Unpack relevant key values + sig1, lam1, a = key_1.values[1:4] + sig2, lam2, a2 = key_2.values[1:4] + + # Only combine blocks of the same chemical species + if a != a2: + continue + + # First calculate the possible non-zero angular channels that can be + # formed from combination of blocks of order `lam1` and `lam2`. 
+        # corresponds to values in the inclusive range { |lam1 - lam2|, ...,
+        # |lam1 + lam2| }
+        nonzero_lams = _dispatch.int_range_like(
+            abs(lam1 - lam2), abs(lam1 + lam2) + 1, like=key_1.values
+        )
+
+        # Now iterate over the non-zero angular channels and apply the custom
+        # selections
+        for lambda_ in nonzero_lams:
+            # Calculate new sigma
+            sig = sig1 * sig2 * (-1) ** (lam1 + lam2 + lambda_)
+
+            # Extract the l and k lists from keys_1
+            l_list = key_1.values[4 : 4 + nu1].tolist()
+            k_list = key_1.values[4 + nu1 :].tolist()
+
+            # Build the new keys values. l{nu} is `lam2` (i.e.
+            # "spherical_harmonics_l" of the key from `keys_2`). k{nu-1} is
+            # `lam1` (i.e. "spherical_harmonics_l" of the key from `keys_1`).
+            new_vals = [nu, sig, lambda_, a] + l_list + [lam2] + k_list + [lam1]
+            new_key_values.append(new_vals)
+            keys_1_entries.append(key_1)
+            keys_2_entries.append(key_2)
+
+    # Define new keys as the full product of keys_1 and keys_2
+    keys_out = Labels(
+        names=new_names,
+        values=_dispatch.int_array_like(new_key_values, like=keys_1.values),
+    )
+
+    return keys_1_entries, keys_2_entries, keys_out
+
+
+def _apply_key_selection(
+    keys_1_entries: List, keys_2_entries: List, keys_out: Labels, selected_keys: Labels
+) -> Tuple[List, List, Labels]:
+    """
+    Applies a selection according to `selected_keys` to the keys of an output
+    TensorMap `keys_out` produced by combination of blocks indexed by the key
+    entries in the `keys_1_entries` and `keys_2_entries` lists.
+
+    After application of the selections, returned is a reduced set of keys and
+    the set of corresponding parent key entries.
+
+    If a selection in `selected_keys` is not valid based on the keys in
+    `keys_out`, an error is raised.
+    """
+    # Extract the relevant columns from `selected_keys` that the selection will
+    # be performed on
+    keys_out_vals = [[k[name] for name in selected_keys.names] for k in keys_out]
+
+    # First check that all of the selected keys exist in the output keys
+    for slct in selected_keys.values:
+        if not _dispatch.any([_dispatch.all(slct == k) for k in keys_out_vals]):
+            raise ValueError(
+                f"selected key {selected_keys.names} = {slct} not found"
+                " in the output keys. Check the `selected_keys` argument."
+            )
+
+    # Build a mask of the selected keys
+    mask = [
+        _dispatch.any([_dispatch.all(i == j) for j in selected_keys.values])
+        for i in keys_out_vals
+    ]
+
+    # Apply the mask to key entries and keys and return
+    keys_1_entries = [k for k, isin in zip(keys_1_entries, mask) if isin]
+    keys_2_entries = [k for k, isin in zip(keys_2_entries, mask) if isin]
+    keys_out = Labels(names=keys_out.names, values=keys_out.values[mask])
+
+    return keys_1_entries, keys_2_entries, keys_out
+
+
+def _remove_redundant_keys(
+    keys_1_entries: List, keys_2_entries: List, keys_out: Labels
+) -> Tuple[List, List, Labels]:
+    """
+    For a Labels object `keys_out` that corresponds to the keys of a TensorMap
+    formed by combination of the blocks described by the entries in the lists
+    `keys_1_entries` and `keys_2_entries`, removes redundant keys.
+
+    These are the keys that correspond to blocks that have the same sorted l
+    list. The block where the l values are already sorted (i.e. l1 <= l2 <= ...
+    <= ln) is kept.
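+
+    For example (illustrative), of two keys with (l1, l2) lists (2, 1) and
+    (1, 2), only the key with the already-sorted list (1, 2) is kept.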
+ """ + # Get and check the correlation order of the input keys + nu1 = keys_1_entries[0]["order_nu"] + nu2 = keys_2_entries[0]["order_nu"] + assert nu2 == 1 + + # Get the correlation order of the output TensorMap + nu = nu1 + 1 + + # Identify keys of redundant blocks and remove them + key_idxs_to_keep = [] + for key_idx, key in enumerate(keys_out): + # Get the important key values. This is all of the keys, excpet the k + # list + key_vals_slice = key.values[: 4 + (nu + 1)].tolist() + first_part, l_list = key_vals_slice[:4], key_vals_slice[4:] + + # Sort the l list + l_list_sorted = sorted(l_list) + + # Compare the sliced key with the one recreated when the l list is + # sorted. If they are identical, this is the key of the block that we + # want to compute a CG combination for. + key_slice_tuple = tuple(first_part + l_list) + key_slice_sorted_tuple = tuple(first_part + l_list_sorted) + if _dispatch.all(key_slice_tuple == key_slice_sorted_tuple): + key_idxs_to_keep.append(key_idx) + + # Build a reduced Labels object for the combined keys, with redundancies removed + keys_out_red = Labels( + names=keys_out.names, + values=_dispatch.int_array_like( + [keys_out[idx].values for idx in key_idxs_to_keep], + like=keys_1_entries[0].values, + ), + ) + + # Store the list of reduced entries that combine to form the reduced output keys + keys_1_entries_red = [keys_1_entries[idx] for idx in key_idxs_to_keep] + keys_2_entries_red = [keys_2_entries[idx] for idx in key_idxs_to_keep] + + return keys_1_entries_red, keys_2_entries_red, keys_out_red + + +# ================================================================== +# ===== Functions to perform the CG combinations of blocks +# ================================================================== + + +def _combine_blocks_same_samples( + block_1: TensorBlock, + block_2: TensorBlock, + lambda_: int, + cg_cache, + compute_metadata_only: bool = False, +) -> TensorBlock: + """ + For a given pair of TensorBlocks and desired angular channel, combines the + values arrays and returns a new TensorBlock. + """ + + # Do the CG combination - single center so no shape pre-processing required + if compute_metadata_only: + combined_values = _cg_cache.combine_arrays( + block_1.values, block_2.values, lambda_, cg_cache, return_empty_array=True + ) + else: + combined_values = _cg_cache.combine_arrays( + block_1.values, block_2.values, lambda_, cg_cache, return_empty_array=False + ) + + # Infer the new nu value: block 1's properties are nu pairs of + # "species_neighbor_x" and "nx". 
+ combined_nu = int((len(block_1.properties.names) / 2) + 1) + + # Define the new property names for "nx" and "species_neighbor_x" + n_names = [f"n{i}" for i in range(1, combined_nu + 1)] + neighbor_names = [f"species_neighbor_{i}" for i in range(1, combined_nu + 1)] + prop_names = [item for i in zip(neighbor_names, n_names) for item in i] + + # Create a TensorBlock + combined_block = TensorBlock( + values=combined_values, + samples=block_1.samples, + components=[ + Labels( + names=["spherical_harmonics_m"], + values=_dispatch.int_range_like( + min_val=-lambda_, max_val=lambda_ + 1, like=block_1.values + ).reshape(-1, 1), + ), + ], + properties=Labels( + names=prop_names, + values=_dispatch.int_array_like( + [ + _dispatch.concatenate((b2, b1)) + for b2 in block_2.properties.values + for b1 in block_1.properties.values + ], + like=block_1.values, + ), + ), + ) + + return combined_block diff --git a/python/rascaline/tests/utils/clebsch_gordan.py b/python/rascaline/tests/utils/clebsch_gordan.py new file mode 100644 index 000000000..38b7fab32 --- /dev/null +++ b/python/rascaline/tests/utils/clebsch_gordan.py @@ -0,0 +1,540 @@ +# -*- coding: utf-8 -*- +import os +from typing import List + +import metatensor +import numpy as np +import pytest +from metatensor import Labels, TensorBlock, TensorMap + +import rascaline +from rascaline.utils import PowerSpectrum +from rascaline.utils.clebsch_gordan._cg_cache import ClebschGordanReal +from rascaline.utils.clebsch_gordan.clebsch_gordan import ( + _correlate_density, + _standardize_keys, + correlate_density, + correlate_density_metadata, +) + + +# Try to import some modules +ase = pytest.importorskip("ase") +import ase.io # noqa: E402 + + +try: + import metatensor.operations + + HAS_METATENSOR_OPERATIONS = True +except ImportError: + HAS_METATENSOR_OPERATIONS = False +try: + import sympy # noqa: F401 + + HAS_SYMPY = True +except ImportError: + HAS_SYMPY = False + +if HAS_SYMPY: + from .rotations import WignerDReal, transform_frame_o3, transform_frame_so3 + + +DATA_ROOT = os.path.join(os.path.dirname(__file__), "data") + +SPHEX_HYPERS = { + "cutoff": 3.0, # Angstrom + "max_radial": 6, # Exclusive + "max_angular": 4, # Inclusive + "atomic_gaussian_width": 0.2, + "radial_basis": {"Gto": {}}, + "cutoff_function": {"ShiftedCosine": {"width": 0.5}}, + "center_atom_weight": 1.0, +} + +SPHEX_HYPERS_SMALL = { + "cutoff": 3.0, # Angstrom + "max_radial": 1, # Exclusive + "max_angular": 2, # Inclusive + "atomic_gaussian_width": 0.2, + "radial_basis": {"Gto": {}}, + "cutoff_function": {"ShiftedCosine": {"width": 0.5}}, + "center_atom_weight": 1.0, +} + + +# ============ Pytest fixtures ============ + + +@pytest.fixture() +def cg_cache_sparse(): + return ClebschGordanReal(lambda_max=5, sparse=True) + + +@pytest.fixture() +def cg_cache_dense(): + return ClebschGordanReal(lambda_max=5, sparse=False) + + +# ============ Helper functions ============ + + +def h2_isolated(): + return ase.io.read(os.path.join(DATA_ROOT, "h2_isolated.xyz"), ":") + + +def h2o_isolated(): + return ase.io.read(os.path.join(DATA_ROOT, "h2o_isolated.xyz"), ":") + + +def h2o_periodic(): + return ase.io.read(os.path.join(DATA_ROOT, "h2o_periodic.xyz"), ":") + + +def wigner_d_matrices(lmax: int): + return WignerDReal(lmax=lmax) + + +def spherical_expansion(frames: List[ase.Atoms]): + """Returns a rascaline SphericalExpansion""" + calculator = rascaline.SphericalExpansion(**SPHEX_HYPERS) + return calculator.compute(frames) + + +def spherical_expansion_small(frames: List[ase.Atoms]): + """Returns a 
rascaline SphericalExpansion"""
+    calculator = rascaline.SphericalExpansion(**SPHEX_HYPERS_SMALL)
+    return calculator.compute(frames)
+
+
+def power_spectrum(frames: List[ase.Atoms]):
+    """Returns a rascaline PowerSpectrum constructed from a
+    SphericalExpansion"""
+    return PowerSpectrum(rascaline.SphericalExpansion(**SPHEX_HYPERS)).compute(frames)
+
+
+def power_spectrum_small(frames: List[ase.Atoms]):
+    """Returns a rascaline PowerSpectrum constructed from a
+    SphericalExpansion"""
+    return PowerSpectrum(rascaline.SphericalExpansion(**SPHEX_HYPERS_SMALL)).compute(
+        frames
+    )
+
+
+def get_norm(tensor: TensorMap):
+    """
+    Calculates the squared norm used in the CG iteration tests. Assumes that
+    the TensorMap is sliced to a single sample.
+
+    For a given atomic sample, the squared norm of the feature vector is
+    accumulated as a sum over the (lambda, sigma) blocks and their m
+    components.
+    """
+    # Check that there is only one sample
+    assert (
+        len(
+            metatensor.unique_metadata(
+                tensor, "samples", ["structure", "center", "species_center"]
+            ).values
+        )
+        == 1
+    )
+    norm = 0.0
+    for key, block in tensor.items():  # Sum over lambda and sigma
+        angular_l = key["spherical_harmonics_l"]
+        # m runs from -l to l; negative indices wrap around the components
+        # axis (of size 2l + 1), so each component is visited exactly once
+        norm += np.sum(
+            [
+                np.linalg.norm(block.values[0, m, :]) ** 2
+                for m in range(-angular_l, angular_l + 1)
+            ]
+        )
+
+    return norm
+
+
+# ============ Test equivariance ============
+
+
+@pytest.mark.skipif(
+    not HAS_SYMPY or not HAS_METATENSOR_OPERATIONS,
+    reason="SymPy or metatensor-operations are not installed",
+)
+@pytest.mark.parametrize(
+    "frames, nu_target, angular_cutoff, selected_keys",
+    [
+        (
+            h2_isolated(),
+            3,
+            None,
+            Labels(
+                names=["spherical_harmonics_l", "inversion_sigma"],
+                values=np.array([[0, 1], [4, 1], [5, 1]]),
+            ),
+        ),
+        (h2o_isolated(), 2, 5, None),
+        (h2o_periodic(), 2, 5, None),
+    ],
+)
+def test_so3_equivariance(frames, nu_target, angular_cutoff, selected_keys):
+    """
+    Tests that the output of :py:func:`correlate_density` is equivariant under
+    SO(3) transformations.
+    """
+    wig = wigner_d_matrices(nu_target * SPHEX_HYPERS["max_angular"])
+    frames_so3 = [transform_frame_so3(frame, wig.angles) for frame in frames]
+
+    nu_1 = spherical_expansion(frames)
+    nu_1_so3 = spherical_expansion(frames_so3)
+
+    nu_3 = correlate_density(
+        density=nu_1,
+        correlation_order=nu_target,
+        angular_cutoff=angular_cutoff,
+        selected_keys=selected_keys,
+    )
+    nu_3_so3 = correlate_density(
+        density=nu_1_so3,
+        correlation_order=nu_target,
+        angular_cutoff=angular_cutoff,
+        selected_keys=selected_keys,
+    )
+
+    nu_3_transf = wig.transform_tensormap_so3(nu_3)
+    assert metatensor.allclose(nu_3_transf, nu_3_so3)
+
+
+@pytest.mark.skipif(
+    not HAS_SYMPY or not HAS_METATENSOR_OPERATIONS,
+    reason="SymPy or metatensor-operations are not installed",
+)
+@pytest.mark.parametrize(
+    "frames, nu_target, angular_cutoff, selected_keys",
+    [
+        (
+            h2_isolated(),
+            3,
+            None,
+            Labels(
+                names=["spherical_harmonics_l"],
+                values=np.array([0, 4, 5]).reshape(-1, 1),
+            ),
+        ),
+        (h2o_isolated(), 2, 5, None),
+        (h2o_periodic(), 2, 5, None),
+    ],
+)
+def test_o3_equivariance(frames, nu_target, angular_cutoff, selected_keys):
+    """
+    Tests that the output of :py:func:`correlate_density` is equivariant under
+    O(3) transformations.
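+
+    Concretely, for a block of angular order ``l`` and (if present) parity
+    ``sigma``, the assertion below amounts to the sketch (in the notation of
+    this test file):
+
+        f(R_inv A) = sigma * (-1)^l * D^l(R) f(A)
+
+    where ``R_inv`` denotes the rotation followed by an inversion, and
+    ``D^l(R)`` is the real Wigner-D matrix applied by
+    ``transform_tensormap_o3``.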
+ """ + wig = wigner_d_matrices(nu_target * SPHEX_HYPERS["max_angular"]) + frames_o3 = [transform_frame_o3(frame, wig.angles) for frame in frames] + + nu_1 = spherical_expansion(frames) + nu_1_o3 = spherical_expansion(frames_o3) + + nu_3 = correlate_density( + density=nu_1, + correlation_order=nu_target, + angular_cutoff=angular_cutoff, + selected_keys=selected_keys, + ) + nu_3_o3 = correlate_density( + density=nu_1_o3, + correlation_order=nu_target, + angular_cutoff=angular_cutoff, + selected_keys=selected_keys, + ) + + nu_3_transf = wig.transform_tensormap_o3(nu_3) + assert metatensor.allclose(nu_3_transf, nu_3_o3) + + +# ============ Test lambda-SOAP vs PowerSpectrum ============ + + +@pytest.mark.skipif( + not HAS_METATENSOR_OPERATIONS, reason="metatensor-operations is not installed" +) +@pytest.mark.parametrize("frames", [h2_isolated()]) +@pytest.mark.parametrize( + "sphex_powspec", + [ + (spherical_expansion, power_spectrum), + (spherical_expansion_small, power_spectrum_small), + ], +) +def test_lambda_soap_vs_powerspectrum(frames, sphex_powspec): + """ + Tests for exact equivalence between the invariant block of a generated + lambda-SOAP equivariant and the Python implementation of PowerSpectrum in + rascaline utils. + """ + # Build a PowerSpectrum + ps = sphex_powspec[1](frames) + + # Build a lambda-SOAP + density = sphex_powspec[0](frames) + lsoap = correlate_density( + density=density, + correlation_order=2, + selected_keys=Labels( + names=["spherical_harmonics_l"], values=np.array([0]).reshape(-1, 1) + ), + ) + keys = lsoap.keys.remove(name="spherical_harmonics_l") + lsoap = TensorMap(keys=keys, blocks=[b.copy() for b in lsoap.blocks()]) + + # Manipulate metadata to match that of PowerSpectrum: + # 1) remove components axis + # 2) change "l1" and "l2" properties dimensions to just "l" (as l1 == l2) + blocks = [] + for block in lsoap.blocks(): + n_samples, n_props = block.values.shape[0], block.values.shape[2] + new_props = block.properties + new_props = new_props.remove(name="l1") + new_props = new_props.rename(old="l2", new="l") + blocks.append( + TensorBlock( + values=block.values.reshape((n_samples, n_props)), + samples=block.samples, + components=[], + properties=new_props, + ) + ) + lsoap = TensorMap(keys=lsoap.keys, blocks=blocks) + + # Compare metadata + assert metatensor.equal_metadata(lsoap, ps) + + # allclose on values + assert metatensor.allclose(lsoap, ps) + + +# ============ Test norm preservation ============ + + +@pytest.mark.skipif( + not HAS_METATENSOR_OPERATIONS, reason="metatensor-operations is not installed" +) +@pytest.mark.parametrize("frames", [h2_isolated(), h2o_periodic()]) +@pytest.mark.parametrize("correlation_order", [2, 3, 4]) +def test_correlate_density_norm(frames, correlation_order): + """ + Checks \\|ρ^\\nu\\| = \\|ρ\\|^\\nu in the case where l lists are not + sorted. If l lists are sorted, thus saving computation of redundant block + combinations, the norm check will not hold for target body order greater + than 2. 
+ """ + + # Build nu=1 SphericalExpansion + nu1 = spherical_expansion_small(frames) + + # Build higher body order tensor without sorting the l lists + nux = correlate_density( + nu1, + correlation_order=correlation_order, + angular_cutoff=None, + selected_keys=None, + skip_redundant=False, + ) + # Build higher body order tensor *with* sorting the l lists + nux_sorted_l = correlate_density( + nu1, + correlation_order=correlation_order, + angular_cutoff=None, + selected_keys=None, + skip_redundant=True, + ) + + # Standardize the features by passing through the CG combination code but with + # no iterations (i.e. body order 1 -> 1) + nu1 = _standardize_keys(nu1) + + # Make only lambda and sigma part of keys + nu1 = nu1.keys_to_samples(["species_center"]) + nux = nux.keys_to_samples(["species_center"]) + nux_sorted_l = nux_sorted_l.keys_to_samples(["species_center"]) + + # The norm shoudl be calculated for each sample. First find the unqiue + # samples + uniq_samples = metatensor.unique_metadata( + nux, "samples", names=["structure", "center", "species_center"] + ) + grouped_labels = [ + Labels(names=nux.sample_names, values=uniq_samples.values[i].reshape(1, 3)) + for i in range(len(uniq_samples)) + ] + + # Calculate norms + norm_nu1 = 0.0 + norm_nux = 0.0 + norm_nux_sorted_l = 0.0 + for sample in grouped_labels: + # Slice the TensorMaps + nu1_sliced = metatensor.slice(nu1, "samples", labels=sample) + nux_sliced = metatensor.slice(nux, "samples", labels=sample) + nux_sorted_sliced = metatensor.slice(nux_sorted_l, "samples", labels=sample) + + # Calculate norms + norm_nu1 += get_norm(nu1_sliced) ** correlation_order + norm_nux += get_norm(nux_sliced) + norm_nux_sorted_l += get_norm(nux_sorted_sliced) + + # Without sorting the l list we should get the same norm + assert np.allclose(norm_nu1, norm_nux) + + # But with sorting the l list we should get a different norm + assert not np.allclose(norm_nu1, norm_nux_sorted_l) + + +# ============ Test CG cache ============ + + +@pytest.mark.parametrize("l1, l2", [(1, 2), (2, 3), (0, 5)]) +def test_clebsch_gordan_orthogonality(cg_cache_dense, l1, l2): + """ + Test orthogonality relationships of cached dense CG coefficients. + + See + https://en.wikipedia.org/wiki/Clebsch%E2%80%93Gordan_coefficients#Orthogonality_relations + for details. 
+ """ + lam_min = abs(l1 - l2) + lam_max = l1 + l2 + + # We test lam dimension + # \sum_{-m1 \leq l1 \leq m1, -m2 \leq l2 \leq m2} + # <λμ|l1m1,l2m2> = δ_μμ' + for lam in range(lam_min, lam_max): + cg_mat = cg_cache_dense.coeffs[(l1, l2, lam)].reshape(-1, 2 * lam + 1) + dot_product = cg_mat.T @ cg_mat + diag_mask = np.zeros(dot_product.shape, dtype=np.bool_) + diag_mask[np.diag_indices(len(dot_product))] = True + assert np.allclose( + dot_product[~diag_mask], np.zeros(dot_product.shape)[~diag_mask] + ) + assert np.allclose(dot_product[diag_mask], dot_product[diag_mask][0]) + + # We test l1 l2 dimension + # \sum_{|l1-l2| \leq λ \leq l1+l2} \sum_{-μ \leq λ \leq μ} + # <λμ|l1m1,l2m2> = δ_m1m1' δ_m2m2' + l1l2_dim = (2 * l1 + 1) * (2 * l2 + 1) + dot_product = np.zeros((l1l2_dim, l1l2_dim)) + for lam in range(lam_min, lam_max + 1): + cg_mat = cg_cache_dense.coeffs[(l1, l2, lam)].reshape(-1, 2 * lam + 1) + dot_product += cg_mat @ cg_mat.T + diag_mask = np.zeros(dot_product.shape, dtype=np.bool_) + diag_mask[np.diag_indices(len(dot_product))] = True + + assert np.allclose(dot_product[~diag_mask], np.zeros(dot_product.shape)[~diag_mask]) + assert np.allclose(dot_product[diag_mask], dot_product[diag_mask][0]) + + +@pytest.mark.skipif( + not HAS_METATENSOR_OPERATIONS, reason="metatensor-operations is not installed" +) +@pytest.mark.parametrize("frames", [h2_isolated(), h2o_isolated()]) +def test_correlate_density_dense_sparse_agree(frames): + """ + Tests for agreement between nu=3 tensors built using both sparse and dense + CG coefficient caches. + """ + density = spherical_expansion_small(frames) + + # NOTE: testing the private function here so we can control the use of + # sparse v dense CG cache + n_body_sparse = _correlate_density( + density, + correlation_order=3, + compute_metadata_only=False, + sparse=True, + ) + n_body_dense = _correlate_density( + density, + correlation_order=3, + compute_metadata_only=False, + sparse=False, + ) + + assert metatensor.allclose(n_body_sparse, n_body_dense, atol=1e-8, rtol=1e-8) + + +# ============ Test metadata ============ + + +@pytest.mark.skipif( + not HAS_METATENSOR_OPERATIONS, reason="metatensor-operations is not installed" +) +@pytest.mark.parametrize("frames", [h2o_isolated()]) +@pytest.mark.parametrize("correlation_order", [2, 3]) +@pytest.mark.parametrize("skip_redundant", [True, False]) +def test_correlate_density_metadata_agree(frames, correlation_order, skip_redundant): + """ + Tests that the metadata of outputs from :py:func:`correlate_density` and + :py:func:`correlate_density_metadata` agree. + """ + for nu1 in [spherical_expansion_small(frames), spherical_expansion(frames)]: + # Build higher body order tensor with CG computation + nux = correlate_density( + nu1, + correlation_order=correlation_order, + angular_cutoff=None, + selected_keys=None, + skip_redundant=skip_redundant, + ) + # Build higher body order tensor without CG computation - i.e. 
+        # only
+        nux_metadata_only = correlate_density_metadata(
+            nu1,
+            correlation_order=correlation_order,
+            angular_cutoff=None,
+            selected_keys=None,
+            skip_redundant=skip_redundant,
+        )
+        assert metatensor.equal_metadata(nux, nux_metadata_only)
+
+
+@pytest.mark.parametrize("frames", [h2o_isolated()])
+@pytest.mark.parametrize(
+    "selected_keys",
+    [
+        None,
+        Labels(
+            names=["spherical_harmonics_l"], values=np.array([1, 2, 4]).reshape(-1, 1)
+        ),
+    ],
+)
+@pytest.mark.parametrize("skip_redundant", [True, False])
+def test_correlate_density_angular_selection(
+    frames: List[ase.Atoms],
+    selected_keys: Labels,
+    skip_redundant: bool,
+):
+    """
+    Tests that the correct angular channels are output based on the specified
+    ``selected_keys``.
+    """
+    nu_1 = spherical_expansion(frames)
+
+    nu_2 = correlate_density(
+        density=nu_1,
+        correlation_order=2,
+        angular_cutoff=None,
+        selected_keys=selected_keys,
+        skip_redundant=skip_redundant,
+    )
+
+    if selected_keys is None:
+        assert np.all(
+            [
+                angular in np.arange(SPHEX_HYPERS["max_angular"] * 2 + 1)
+                for angular in np.unique(nu_2.keys.column("spherical_harmonics_l"))
+            ]
+        )
+
+    else:
+        assert np.all(
+            np.sort(np.unique(nu_2.keys.column("spherical_harmonics_l")))
+            == np.sort(selected_keys.column("spherical_harmonics_l"))
+        )
diff --git a/python/rascaline/tests/utils/data/h2_isolated.xyz b/python/rascaline/tests/utils/data/h2_isolated.xyz
new file mode 100644
index 000000000..ec5f59680
--- /dev/null
+++ b/python/rascaline/tests/utils/data/h2_isolated.xyz
@@ -0,0 +1,4 @@
+2
+pbc="F F F"
+H 1.97361700 1.73067300 2.47063400
+H 1.97361700 3.26932700 2.47063400
diff --git a/python/rascaline/tests/utils/data/h2o_isolated.xyz b/python/rascaline/tests/utils/data/h2o_isolated.xyz
new file mode 100644
index 000000000..fc876d2ba
--- /dev/null
+++ b/python/rascaline/tests/utils/data/h2o_isolated.xyz
@@ -0,0 +1,5 @@
+3
+pbc="F F F"
+O 2.56633400 2.50000000 2.50370100
+H 1.97361700 1.73067300 2.47063400
+H 1.97361700 3.26932700 2.47063400
diff --git a/python/rascaline/tests/utils/data/h2o_periodic.xyz b/python/rascaline/tests/utils/data/h2o_periodic.xyz
new file mode 100644
index 000000000..3374566e6
--- /dev/null
+++ b/python/rascaline/tests/utils/data/h2o_periodic.xyz
@@ -0,0 +1,5 @@
+3
+Lattice="5.0 0.0 0.0 0.0 5.0 0.0 0.0 0.0 5.0" pbc="T T T"
+O 2.56633400 2.50000000 2.50370100
+H 1.97361700 1.73067300 2.47063400
+H 1.97361700 3.26932700 2.47063400
diff --git a/python/rascaline/tests/utils/rotations.py b/python/rascaline/tests/utils/rotations.py
new file mode 100644
index 000000000..2abe22714
--- /dev/null
+++ b/python/rascaline/tests/utils/rotations.py
@@ -0,0 +1,330 @@
+"""
+Utilities for generating real Wigner-D matrices, and using them to rotate ASE
+frames and TensorMaps of density coefficients in the spherical basis.
+"""
+from typing import Sequence
+
+import pytest
+
+
+ase = pytest.importorskip("ase")
+
+import numpy as np  # noqa: E402
+from metatensor import TensorBlock, TensorMap  # noqa: E402
+from scipy.spatial.transform import Rotation  # noqa: E402
+
+
+try:
+    import torch  # noqa: E402
+    from torch import Tensor as TorchTensor  # noqa: E402
+except ImportError:
+
+    class TorchTensor:
+        pass
+
+
+# ===== Functions for transformations in the Cartesian basis =====
+
+
+def cartesian_rotation(angles: Sequence[float]):
+    """
+    Returns a Cartesian rotation matrix in the appropriate convention (ZYZ,
+    intrinsic rotations) to be consistent with the common Wigner D definition.
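+
+    As an illustration of the convention, ``cartesian_rotation((np.pi / 2, 0, 0))``
+    is a rotation by pi/2 about the z axis, which maps the x axis onto the
+    y axis.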
+
+    `angles` correspond to the alpha, beta, gamma Euler angles in the ZYZ
+    convention, in radians.
+    """
+    return Rotation.from_euler("ZYZ", angles).as_matrix()
+
+
+def transform_frame_so3(frame: ase.Atoms, angles: Sequence[float]) -> ase.Atoms:
+    """
+    Transforms the positions and cell coordinates of an ASE frame by an SO(3)
+    rigid rotation.
+    """
+    new_frame = frame.copy()
+
+    # Build cartesian rotation matrix
+    R = cartesian_rotation(angles)
+
+    # Rotate its positions and cell
+    new_frame.positions = new_frame.positions @ R.T
+    new_frame.cell = new_frame.cell @ R.T
+
+    return new_frame
+
+
+def transform_frame_o3(frame: ase.Atoms, angles: Sequence[float]) -> ase.Atoms:
+    """
+    Transforms the positions and cell coordinates of an ASE frame by an O(3)
+    rotation. This involves a rigid SO(3) rotation of the positions and cell
+    according to the Euler `angles`, then an inversion by multiplying just the
+    positions by -1.
+    """
+    new_frame = frame.copy()
+
+    # Build cartesian rotation matrix
+    R = cartesian_rotation(angles)
+
+    # Rotate its positions and cell
+    new_frame.positions = new_frame.positions @ R.T
+    new_frame.cell = new_frame.cell @ R.T
+
+    # Invert the atom positions
+    new_frame.positions *= -1
+
+    return new_frame
+
+
+# ===== WignerDReal for transformations in the spherical basis =====
+
+
+class WignerDReal:
+    """
+    A helper class to compute Wigner D matrices given the Euler angles of a rotation,
+    and apply them to spherical harmonics (or coefficients). Built to function with
+    real-valued coefficients.
+    """
+
+    def __init__(self, lmax: int, angles: Sequence[float] = None):
+        """
+        Initialize the WignerDReal class.
+
+        :param lmax: int, the maximum angular momentum channel for which the
+            Wigner D matrices are computed
+        :param angles: Sequence[float], the alpha, beta, gamma Euler angles, in
+            radians.
+        """
+        self.lmax = lmax
+        # Randomly generate Euler angles between 0 and 2 pi if none are provided
+        if angles is None:
+            angles = np.random.uniform(size=(3)) * 2 * np.pi
+        self.angles = angles
+        self.rotation = cartesian_rotation(angles)
+
+        r2c_mats = {}
+        c2r_mats = {}
+        for L in range(0, self.lmax + 1):
+            r2c_mats[L] = np.hstack(
+                [_r2c(np.eye(2 * L + 1)[i])[:, np.newaxis] for i in range(2 * L + 1)]
+            )
+            c2r_mats[L] = np.conjugate(r2c_mats[L]).T
+        self.matrices = {}
+        for L in range(0, self.lmax + 1):
+            wig = _wigner_d(L, self.angles)
+            self.matrices[L] = np.real(c2r_mats[L] @ np.conjugate(wig) @ r2c_mats[L])
+
+    def rotate_coeff_vector(
+        self,
+        frame: ase.Atoms,
+        coeffs: np.ndarray,
+        lmax: dict,
+        nmax: dict,
+    ) -> np.ndarray:
+        """
+        Rotates the irreducible spherical components (ISCs) of basis set
+        coefficients in the spherical basis, passed in as a flat vector.
+
+        The basis set definition, specified by ``lmax`` and ``nmax``, is
+        required. These are dicts of the form:
+
+            lmax = {symbol: lmax_value, ...}
+            nmax = {(symbol, l): nmax_value, ...}
+
+        where ``symbol`` is the chemical symbol of the atom and ``lmax_value``
+        is its corresponding max l channel value. For each combination of
+        species symbol and lmax, there exists a max radial channel value
+        ``nmax_value``.
+
+        The assumed ordering of basis function coefficients follows a
+        hierarchy, which can be read as nested loops over the various indices.
+        Be mindful that some index ranges run from 0 to x (exclusive) and
+        others from 0 to x + 1 (exclusive). The ranges reported below are
+        ordered.
+
+        1. Loop over atoms (index ``i``, of chemical species ``a``) in the
+        structure.
+        ``i`` takes values 0 to N (** exclusive **), where N is the number of
+        atoms in the structure.
+
+        2. Loop over spherical harmonics channel (index ``l``) for each atom.
+        ``l`` takes values from 0 to ``lmax[a] + 1`` (** exclusive **), where
+        ``a`` is the chemical species of atom ``i``, given by the chemical
+        symbol at the ``i``th position of ``symbol_list``.
+
+        3. Loop over radial channel (index ``n``) for each atom ``i`` and
+        spherical harmonics channel ``l`` combination. ``n`` takes values from 0
+        to ``nmax[(a, l)]`` (** exclusive **).
+
+        4. Loop over spherical harmonics component (index ``m``) for each atom.
+        ``m`` takes values from ``-l`` to ``l`` (** inclusive **).
+
+        :param frame: the atomic structure in ASE format for which the
+            coefficients are defined.
+        :param coeffs: the coefficients in the spherical basis, as a flat
+            vector.
+        :param lmax: dict containing the maximum spherical harmonics (l) value
+            for each atom type.
+        :param nmax: dict containing the maximum radial channel (n) value for
+            each combination of atom type and l.
+
+        :return: the rotated coefficients in the spherical basis, as a flat
+            vector with the same order as the input vector.
+        """
+        # Initialize empty vector for storing the rotated ISCs
+        rot_vect = np.empty_like(coeffs)
+
+        # Iterate over atomic species of the atoms in the frame
+        curr_idx = 0
+        for symbol in frame.get_chemical_symbols():
+            # Get the basis set lmax value for this species
+            sym_lmax = lmax[symbol]
+            for angular_l in range(sym_lmax + 1):
+                # Get the number of radial functions for this species and l value
+                sym_l_nmax = nmax[(symbol, angular_l)]
+                # Get the Wigner D matrix for this l value
+                wig_mat = self.matrices[angular_l].T
+                for _n in range(sym_l_nmax):
+                    # Retrieve the irreducible spherical component
+                    isc = coeffs[curr_idx : curr_idx + (2 * angular_l + 1)]
+                    # Rotate the ISC and store
+                    rot_isc = isc @ wig_mat
+                    rot_vect[curr_idx : curr_idx + (2 * angular_l + 1)][:] = rot_isc[:]
+                    # Update the start index for the next ISC
+                    curr_idx += 2 * angular_l + 1
+
+        return rot_vect
+
+    def rotate_tensorblock(self, angular_l: int, block: TensorBlock) -> TensorBlock:
+        """
+        Rotates a TensorBlock ``block``, represented in the spherical basis,
+        according to the real Wigner D matrix for the given ``l`` value.
+        Assumes the components of the block are [("spherical_harmonics_m",)].
+        """
+        # Get the Wigner matrix for this l value
+        wig = self.matrices[angular_l].T
+
+        # Copy the block
+        block_rotated = block.copy()
+        vals = block_rotated.values
+
+        # Perform the rotation, either with numpy or torch, by taking the
+        # tensordot product of the irreducible spherical components. Modify
+        # in-place the values of the copied TensorBlock.
+        if isinstance(vals, TorchTensor):
+            wig = torch.tensor(wig)
+            block_rotated.values[:] = torch.tensordot(
+                vals.swapaxes(1, 2), wig, dims=1
+            ).swapaxes(1, 2)
+        elif isinstance(block.values, np.ndarray):
+            block_rotated.values[:] = np.tensordot(
+                vals.swapaxes(1, 2), wig, axes=1
+            ).swapaxes(1, 2)
+        else:
+            raise TypeError("TensorBlock values must be a numpy array or torch tensor.")
+
+        return block_rotated
+
+    def transform_tensormap_so3(self, tensor: TensorMap) -> TensorMap:
+        """
+        Transforms a TensorMap by an SO(3) rigid rotation using Wigner-D
+        matrices.
+
+        Assumes the input tensor follows the metadata structure consistent with
+        those produced by rascaline.
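+
+        A minimal usage sketch (``tensor`` here stands for any TensorMap with
+        a "spherical_harmonics_l" key dimension, e.g. a rascaline
+        SphericalExpansion)::
+
+            wig = WignerDReal(lmax=4, angles=(0.1, 0.2, 0.3))
+            rotated = wig.transform_tensormap_so3(tensor)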
+ """ + # Retrieve the key and the position of the l value in the key names + keys = tensor.keys + idx_l_value = keys.names.index("spherical_harmonics_l") + + # Iterate over the blocks and rotate + rotated_blocks = [] + for key in keys: + # Retrieve the l value + angular_l = key[idx_l_value] + + # Rotate the block and store + rotated_blocks.append(self.rotate_tensorblock(angular_l, tensor[key])) + + return TensorMap(keys, rotated_blocks) + + def transform_tensormap_o3(self, tensor: TensorMap) -> TensorMap: + """ + Transforms a TensorMap by a by an O(3) transformation: this involves an + SO(3) rigid rotation using Wigner-D Matrices followed by an inversion. + + Assumes the input tensor follows the metadata structure consistent with + those produce by rascaline. + """ + # Retrieve the key and the position of the l value in the key names + keys = tensor.keys + idx_l_value = keys.names.index("spherical_harmonics_l") + + # Iterate over the blocks and rotate + new_blocks = [] + for key in keys: + # Retrieve the l value + angular_l = key[idx_l_value] + + # Rotate the block + new_block = self.rotate_tensorblock(angular_l, tensor[key]) + + # Work out the inversion multiplier according to the convention + inversion_multiplier = 1 + if key["spherical_harmonics_l"] % 2 == 1: + inversion_multiplier *= -1 + + # "inversion_sigma" may not be present if CG iterations haven't been + # performed (i.e. nu=1 rascaline SphericalExpansion) + if "inversion_sigma" in keys.names: + if key["inversion_sigma"] == -1: + inversion_multiplier *= -1 + + # Invert the block by applying the inversion multiplier + new_block = TensorBlock( + values=new_block.values * inversion_multiplier, + samples=new_block.samples, + components=new_block.components, + properties=new_block.properties, + ) + new_blocks.append(new_block) + + return TensorMap(keys, new_blocks) + + +# ===== Helper functions for WignerDReal + + +def _wigner_d(angular_l: int, angles: Sequence[float]) -> np.ndarray: + """ + Computes the Wigner D matrix: + D^l_{mm'}(alpha, beta, gamma) + from sympy and converts it to numerical values. + + `angles` are the alpha, beta, gamma Euler angles (radians, ZYZ convention) + and l the irrep. + """ + try: + from sympy.physics.wigner import wigner_d + except ModuleNotFoundError: + raise ModuleNotFoundError( + "Calculation of Wigner D matrices requires a sympy installation" + ) + return np.complex128(wigner_d(angular_l, *angles)) + + +def _r2c(sp): + """ + Real to complex SPH. Assumes a block with 2l+1 reals corresponding + to real SPH with m indices from -l to +l + """ + + i_sqrt_2 = 1.0 / np.sqrt(2) + + angular_l = (len(sp) - 1) // 2 # infers l from the vector size + rc = np.zeros(len(sp), dtype=np.complex128) + rc[angular_l] = sp[angular_l] + for m in range(1, angular_l + 1): + rc[angular_l + m] = ( + (sp[angular_l + m] + 1j * sp[angular_l - m]) * i_sqrt_2 * (-1) ** m + ) + rc[angular_l - m] = (sp[angular_l + m] - 1j * sp[angular_l - m]) * i_sqrt_2 + return rc diff --git a/tox.ini b/tox.ini index 2af417108..ed7ce7872 100644 --- a/tox.ini +++ b/tox.ini @@ -56,9 +56,14 @@ deps = {[testenv]metatensor-core-requirement} ase chemfiles + metatensor-operations pytest pytest-cov scipy + sympy + wigners + # TODO: add mops once support for windows available + # git+https://github.com/lab-cosmo/mops commands = pytest {[testenv]test_options} {posargs}