diff --git a/docs/conf.py b/docs/conf.py
index fa6916c3..a86fbc13 100644
--- a/docs/conf.py
+++ b/docs/conf.py
@@ -62,7 +62,16 @@
 autosummary_generate = True
 autodoc_member_order = "bysource"
-autodoc_mock_imports = ["cudf", "cuml", "cugraph", "cupy", "cupyx", "pylibraft", "cuvs"]
+autodoc_mock_imports = [
+    "cudf",
+    "cuml",
+    "cugraph",
+    "cupy",
+    "cupyx",
+    "pylibraft",
+    "dask",
+    "cuvs",
+]
 default_role = "literal"
 napoleon_google_docstring = False
 napoleon_numpy_docstring = True
@@ -108,6 +117,7 @@
     "rmm": ("https://docs.rapids.ai/api/rmm/stable/", None),
     "statsmodels": ("https://www.statsmodels.org/stable/", None),
     "omnipath": ("https://omnipath.readthedocs.io/en/latest/", None),
+    "dask": ("https://docs.dask.org/en/stable/", None),
 }
 
 # List of patterns, relative to source directory, that match files and
diff --git a/docs/release-notes/0.11.0.md b/docs/release-notes/0.11.0.md
new file mode 100644
index 00000000..9d1bf581
--- /dev/null
+++ b/docs/release-notes/0.11.0.md
@@ -0,0 +1,14 @@
+### 0.11.0 {small}`the-future`
+
+```{rubric} Features
+```
+* Adds multi-GPU out-of-core support through Dask {pr}`179` {smaller}`S Dicks, I Gold & P Angerer`
+```{rubric} Performance
+```
+
+
+```{rubric} Bug fixes
+```
+
+```{rubric} Misc
+```
diff --git a/docs/release-notes/index.md b/docs/release-notes/index.md
index 1faf4152..2df902c1 100644
--- a/docs/release-notes/index.md
+++ b/docs/release-notes/index.md
@@ -2,6 +2,10 @@
 
 # Release notes
 
+## Version 0.11.0
+```{include} /release-notes/0.11.0.md
+```
+
 ## Version 0.10.0
 ```{include} /release-notes/0.10.12.md
 ```
@@ -33,27 +37,20 @@
 ## Version 0.9.0
 ```{include} /release-notes/0.9.6.md
 ```
-
 ```{include} /release-notes/0.9.5.md
 ```
-
 ```{include} /release-notes/0.9.4.md
 ```
-
 ```{include} /release-notes/0.9.3.md
 ```
-
 ```{include} /release-notes/0.9.2.md
 ```
-
 ```{include} /release-notes/0.9.1.md
 ```
-
 ```{include} /release-notes/0.9.0.md
 ```
 
 ## Version 0.8.0
-
 ```{include} /release-notes/0.8.1.md
 ```
 ```{include} /release-notes/0.8.0.md
diff --git a/pyproject.toml b/pyproject.toml
index d33bc1d2..bff1b868 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -27,7 +27,7 @@ dependencies = [
 
 [project.optional-dependencies]
 rapids11 = ["cupy-cuda11x","cudf-cu11==24.10.*", "cuml-cu11==24.10.*", "cugraph-cu11==24.10.*"]
-rapids12 = ["cupy-cuda12x","cudf-cu12==24.10.*", "cuml-cu12==24.10.*", "cugraph-cu12==24.10.*"]
+rapids12 = ["cupy-cuda12x","cudf-cu12==24.8.*", "cuml-cu12==24.8.*", "cugraph-cu12==24.8.*"]
 doc = [
     "sphinx>=4.5.0",
     "sphinx-copybutton",
@@ -36,6 +36,7 @@ doc = [
     "scanpydoc[typehints,theme]>=0.9.4",
     "readthedocs-sphinx-ext",
     "sphinx_copybutton",
+    "dask",
     "pytest",
 ]
 test = [
diff --git a/src/rapids_singlecell/_compat.py b/src/rapids_singlecell/_compat.py
new file mode 100644
index 00000000..f569504b
--- /dev/null
+++ b/src/rapids_singlecell/_compat.py
@@ -0,0 +1,13 @@
+from __future__ import annotations
+
+import cupy as cp
+from cupyx.scipy.sparse import csr_matrix
+from dask.array import Array as DaskArray  # noqa: F401
+
+
+def _meta_dense(dtype):
+    return cp.zeros([0], dtype=dtype)
+
+
+def _meta_sparse(dtype):
+    return csr_matrix(cp.array((1.0,), dtype=dtype))
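A minimal sketch (illustrative, not part of the diff) of what these two `meta` helpers are for: Dask infers the chunk container type of an array from its `._meta`, so the `map_blocks` calls throughout this PR pass `_meta_dense`/`_meta_sparse` to keep that inference on the GPU instead of letting Dask probe the mapped function with a dummy NumPy block. The toy array below assumes a CUDA-capable environment.

```python
# Sketch only: keeping dask's type inference CuPy-backed via the helpers above.
import cupy as cp
import dask.array as da

from rapids_singlecell._compat import _meta_dense

x = da.from_array(cp.random.rand(100, 10), chunks=(50, 10))
# Without `meta`, dask would call cp.log1p on a dummy block to infer the
# output type; `_meta_dense` declares it up front as a cupy.ndarray.
y = x.map_blocks(cp.log1p, meta=_meta_dense(x.dtype))
assert isinstance(y._meta, cp.ndarray)
```

diff --git a/src/rapids_singlecell/_testing.py b/src/rapids_singlecell/_testing.py
index eee65f78..4e73261d 100644
--- a/src/rapids_singlecell/_testing.py
+++ b/src/rapids_singlecell/_testing.py
@@ -4,8 +4,10 @@
 from typing import TYPE_CHECKING
 
+import cupy as cp
 import pytest
-from anndata.tests.helpers import asarray
+from anndata.tests.helpers import 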
as_dense_dask_array, as_sparse_dask_array, asarray +from cupyx.scipy import sparse as cusparse from scipy import sparse if TYPE_CHECKING: @@ -38,3 +40,17 @@ def param_with( ARRAY_TYPES_MEM = tuple( at for (strg, _), ats in MAP_ARRAY_TYPES.items() if strg == "mem" for at in ats ) + + +def as_sparse_cupy_dask_array(X): + da = as_sparse_dask_array(X) + da = da.rechunk((da.shape[0] // 2, da.shape[1])) + da = da.map_blocks(cusparse.csr_matrix, dtype=X.dtype) + return da + + +def as_dense_cupy_dask_array(X): + X = as_dense_dask_array(X) + X = X.map_blocks(cp.array) + X = X.rechunk((X.shape[0] // 2, X.shape[1])) + return X diff --git a/src/rapids_singlecell/_utils/__init__.py b/src/rapids_singlecell/_utils/__init__.py index 201809b6..8076b7f1 100644 --- a/src/rapids_singlecell/_utils/__init__.py +++ b/src/rapids_singlecell/_utils/__init__.py @@ -2,6 +2,13 @@ from typing import TYPE_CHECKING, Union +import cupy as cp import numpy as np +from cupyx.scipy.sparse import csc_matrix, csr_matrix +from dask.array import Array as DaskArray AnyRandom = Union[int, np.random.RandomState, None] # noqa: UP007 + + +ArrayTypes = Union[cp.ndarray, csc_matrix, csr_matrix] # noqa: UP007 +ArrayTypesDask = Union[cp.ndarray, csc_matrix, csr_matrix, DaskArray] # noqa: UP007 diff --git a/src/rapids_singlecell/preprocessing/__init__.py b/src/rapids_singlecell/preprocessing/__init__.py index 125c27e4..005ba396 100644 --- a/src/rapids_singlecell/preprocessing/__init__.py +++ b/src/rapids_singlecell/preprocessing/__init__.py @@ -5,11 +5,11 @@ from ._neighbors import bbknn, neighbors from ._normalize import log1p, normalize_pearson_residuals, normalize_total from ._pca import pca +from ._qc import calculate_qc_metrics from ._regress_out import regress_out from ._scale import scale from ._scrublet import scrublet, scrublet_simulate_doublets from ._simple import ( - calculate_qc_metrics, filter_cells, filter_genes, filter_highly_variable, diff --git a/src/rapids_singlecell/preprocessing/_hvg.py b/src/rapids_singlecell/preprocessing/_hvg.py index a4332af9..6db78c74 100644 --- a/src/rapids_singlecell/preprocessing/_hvg.py +++ b/src/rapids_singlecell/preprocessing/_hvg.py @@ -9,10 +9,12 @@ import cupy as cp import numpy as np import pandas as pd -from cupyx.scipy.sparse import issparse, isspmatrix_csc +from cupyx.scipy.sparse import csr_matrix, issparse, isspmatrix_csc from scanpy.get import _get_obs_rep -from ._simple import calculate_qc_metrics +from rapids_singlecell._compat import DaskArray, _meta_dense, _meta_sparse + +from ._qc import calculate_qc_metrics from ._utils import _check_gpu_X, _check_nonnegative_integers, _get_mean_var if TYPE_CHECKING: @@ -188,7 +190,11 @@ def highly_variable_genes( if batch_key is None: df = _highly_variable_genes_single_batch( - adata, layer=layer, cutoff=cutoff, n_bins=n_bins, flavor=flavor + adata, + layer=layer, + cutoff=cutoff, + n_bins=n_bins, + flavor=flavor, ) else: df = _highly_variable_genes_batched( @@ -260,6 +266,21 @@ def in_bounds( ) +def _hvg_expm1(X): + if isinstance(X, DaskArray): + if isinstance(X._meta, cp.ndarray): + X = X.map_blocks(lambda X: cp.expm1(X), meta=_meta_dense(X.dtype)) + elif isinstance(X._meta, csr_matrix): + X = X.map_blocks(lambda X: X.expm1(), meta=_meta_sparse(X.dtype)) + else: + X = X.copy() + if issparse(X): + X = X.expm1() + else: + X = cp.expm1(X) + return X + + def _highly_variable_genes_single_batch( adata: AnnData, *, @@ -277,18 +298,18 @@ def _highly_variable_genes_single_batch( `highly_variable`, `means`, `dispersions`, and `dispersions_norm`. 
""" X = _get_obs_rep(adata, layer=layer) - + _check_gpu_X(X, allow_dask=True) if hasattr(X, "_view_args"): # AnnData array view - # For compatibility with anndata<0.9 - X = X.copy() # Doesn't actually copy memory, just removes View class wrapper + X = X.copy() if flavor == "seurat": - X = X.copy() - if issparse(X): - X = X.expm1() - else: - X = cp.expm1(X) + X = _hvg_expm1(X) + mean, var = _get_mean_var(X, axis=0) + if isinstance(X, DaskArray): + import dask + + mean, var = dask.compute(mean, var) mean[mean == 0] = 1e-12 disp = var / mean if flavor == "seurat": # logarithmized mean as in Seurat @@ -420,7 +441,11 @@ def _highly_variable_genes_batched( adata_subset = adata_subset[:, filt] hvg = _highly_variable_genes_single_batch( - adata_subset, layer=layer, cutoff=cutoff, n_bins=n_bins, flavor=flavor + adata_subset, + layer=layer, + cutoff=cutoff, + n_bins=n_bins, + flavor=flavor, ) hvg.reset_index(drop=False, inplace=True, names=["gene"]) diff --git a/src/rapids_singlecell/preprocessing/_kernels/_qc_kernels_dask.py b/src/rapids_singlecell/preprocessing/_kernels/_qc_kernels_dask.py new file mode 100644 index 00000000..01172407 --- /dev/null +++ b/src/rapids_singlecell/preprocessing/_kernels/_qc_kernels_dask.py @@ -0,0 +1,102 @@ +from __future__ import annotations + +from cuml.common.kernel_utils import cuda_kernel_factory + +_sparse_qc_kernel_csr_dask_cells = r""" + (const int *indptr,const int *index,const {0} *data, + {0}* sums_cells, int* cell_ex, + int n_cells) { + int cell = blockDim.x * blockIdx.x + threadIdx.x; + if(cell >= n_cells){ + return; + } + int start_idx = indptr[cell]; + int stop_idx = indptr[cell+1]; + + {0} sums_cells_i = 0; + int cell_ex_i = 0; + for(int gene = start_idx; gene < stop_idx; gene++){ + {0} value = data[gene]; + int gene_number = index[gene]; + sums_cells_i += value; + cell_ex_i += 1; + } + sums_cells[cell] = sums_cells_i; + cell_ex[cell] = cell_ex_i; + } +""" + + +_sparse_qc_kernel_csr_dask_genes = r""" + (const int *index,const {0} *data, + {0}* sums_genes, int* gene_ex, + int nnz) { + int idx = blockDim.x * blockIdx.x + threadIdx.x; + if(idx >= nnz){ + return; + } + int minor_pos = index[idx]; + atomicAdd(&sums_genes[minor_pos], data[idx]); + atomicAdd(&gene_ex[minor_pos], 1); + } + """ + +_sparse_qc_kernel_dense_cells = r""" + (const {0} *data, + {0}* sums_cells, int* cell_ex, + int n_cells,int n_genes) { + int cell = blockDim.x * blockIdx.x + threadIdx.x; + int gene = blockDim.y * blockIdx.y + threadIdx.y; + if(cell >= n_cells || gene >=n_genes){ + return; + } + long long int index = static_cast(cell) * n_genes + gene; + {0} value = data[index]; + if (value>0.0){ + atomicAdd(&sums_cells[cell], value); + atomicAdd(&cell_ex[cell], 1); + } + } +""" + +_sparse_qc_kernel_dense_genes = r""" + (const {0} *data, + {0}* sums_genes,int* gene_ex, + int n_cells,int n_genes) { + int cell = blockDim.x * blockIdx.x + threadIdx.x; + int gene = blockDim.y * blockIdx.y + threadIdx.y; + if(cell >= n_cells || gene >=n_genes){ + return; + } + long long int index = static_cast(cell) * n_genes + gene; + {0} value = data[index]; + if (value>0.0){ + atomicAdd(&sums_genes[gene], value); + atomicAdd(&gene_ex[gene], 1); + } + } +""" + + +def _sparse_qc_csr_dask_cells(dtype): + return cuda_kernel_factory( + _sparse_qc_kernel_csr_dask_cells, (dtype,), "_sparse_qc_kernel_csr_dask_cells" + ) + + +def _sparse_qc_csr_dask_genes(dtype): + return cuda_kernel_factory( + _sparse_qc_kernel_csr_dask_genes, (dtype,), "_sparse_qc_kernel_csr_dask_genes" + ) + + +def 
_sparse_qc_dense_cells(dtype): + return cuda_kernel_factory( + _sparse_qc_kernel_dense_cells, (dtype,), "_sparse_qc_kernel_dense_cells" + ) + + +def _sparse_qc_dense_genes(dtype): + return cuda_kernel_factory( + _sparse_qc_kernel_dense_genes, (dtype,), "_sparse_qc_kernel_dense_genes" + ) diff --git a/src/rapids_singlecell/preprocessing/_kernels/_scale_kernel.py b/src/rapids_singlecell/preprocessing/_kernels/_scale_kernel.py index f44388cd..07f8e512 100644 --- a/src/rapids_singlecell/preprocessing/_kernels/_scale_kernel.py +++ b/src/rapids_singlecell/preprocessing/_kernels/_scale_kernel.py @@ -20,7 +20,7 @@ """ _csr_scale_diff_kernel = r""" -(const int *indptr, const int *indices, {0} *data, const double * std, const int *mask, {0} clipper,int nrows) { +(const int *indptr, const int *indices, {0} *data, const {0} * std, const int *mask, {0} clipper,int nrows) { int row = blockIdx.x; if(row >= nrows){ diff --git a/src/rapids_singlecell/preprocessing/_normalize.py b/src/rapids_singlecell/preprocessing/_normalize.py index 54b5a8cd..1acf9dd3 100644 --- a/src/rapids_singlecell/preprocessing/_normalize.py +++ b/src/rapids_singlecell/preprocessing/_normalize.py @@ -8,12 +8,20 @@ from cupyx.scipy import sparse from scanpy.get import _get_obs_rep, _set_obs_rep +from rapids_singlecell._compat import ( + DaskArray, + _meta_dense, + _meta_sparse, +) + from ._utils import _check_gpu_X, _check_nonnegative_integers if TYPE_CHECKING: from anndata import AnnData from cupyx.scipy.sparse import csr_matrix, spmatrix + from rapids_singlecell._utils import ArrayTypesDask + def normalize_total( adata: AnnData, @@ -43,7 +51,6 @@ def normalize_total( copy Whether to return a copy or update `adata`. Not compatible with inplace=False. - Returns ------- Returns a normalized copy or updates `adata` with a normalized version of @@ -55,7 +62,7 @@ def normalize_total( adata = adata.copy() X = _get_obs_rep(adata, layer=layer) - _check_gpu_X(X) + _check_gpu_X(X, allow_dask=True) if not inplace: X = X.copy() @@ -63,30 +70,24 @@ def normalize_total( if sparse.isspmatrix_csc(X): X = X.tocsr() if not target_sum: - if sparse.issparse(X): - from ._kernels._norm_kernel import _get_sparse_sum_major + target_sum = _get_target_sum(X) - counts_per_cell = cp.zeros(X.shape[0], dtype=X.dtype) - sum_kernel = _get_sparse_sum_major(X.dtype) - sum_kernel( - (X.shape[0],), - (64,), - (X.indptr, X.data, counts_per_cell, X.shape[0]), - ) - else: - counts_per_cell = X.sum(axis=1) - target_sum = cp.median(counts_per_cell) + X = _normalize_total(X, target_sum) - if sparse.isspmatrix_csr(X): - from ._kernels._norm_kernel import _mul_csr + if inplace: + _set_obs_rep(adata, X, layer=layer) + + if copy: + return adata + elif not inplace: + return X - mul_kernel = _mul_csr(X.dtype) - mul_kernel( - (math.ceil(X.shape[0] / 128),), - (128,), - (X.indptr, X.data, X.shape[0], int(target_sum)), - ) +def _normalize_total(X: ArrayTypesDask, target_sum: int): + if isinstance(X, sparse.csr_matrix): + return _normalize_total_csr(X, target_sum) + elif isinstance(X, DaskArray): + return _normalize_total_dask(X, target_sum) else: from ._kernels._norm_kernel import _mul_dense @@ -98,14 +99,114 @@ def normalize_total( (128,), (X, X.shape[0], X.shape[1], int(target_sum)), ) + return X - if inplace: - _set_obs_rep(adata, X, layer=layer) - if copy: - return adata - elif not inplace: - return X +def _normalize_total_csr(X: sparse.csr_matrix, target_sum: int) -> sparse.csr_matrix: + from ._kernels._norm_kernel import _mul_csr + + mul_kernel = _mul_csr(X.dtype) + 
mul_kernel(
+        (math.ceil(X.shape[0] / 128),),
+        (128,),
+        (X.indptr, X.data, X.shape[0], int(target_sum)),
+    )
+    return X
+
+
+def _normalize_total_dask(X: DaskArray, target_sum: int) -> DaskArray:
+    if isinstance(X._meta, sparse.csr_matrix):
+        from ._kernels._norm_kernel import _mul_csr
+
+        mul_kernel = _mul_csr(X.dtype)
+        mul_kernel.compile()
+
+        def __mul(X_part):
+            mul_kernel(
+                (math.ceil(X_part.shape[0] / 32),),
+                (32,),
+                (X_part.indptr, X_part.data, X_part.shape[0], int(target_sum)),
+            )
+            return X_part
+
+        X = X.map_blocks(__mul, meta=_meta_sparse(X.dtype))
+    elif isinstance(X._meta, cp.ndarray):
+        from ._kernels._norm_kernel import _mul_dense
+
+        mul_kernel = _mul_dense(X.dtype)
+        mul_kernel.compile()
+
+        def __mul(X_part):
+            mul_kernel(
+                (math.ceil(X_part.shape[0] / 128),),
+                (128,),
+                (X_part, X_part.shape[0], X_part.shape[1], int(target_sum)),
+            )
+            return X_part
+
+        X = X.map_blocks(__mul, meta=_meta_dense(X.dtype))
+    else:
+        raise ValueError(f"Cannot normalize {type(X)}")
+    return X
+
+
+def _get_target_sum(X: ArrayTypesDask) -> int:
+    if isinstance(X, sparse.csr_matrix):
+        return _get_target_sum_csr(X)
+    elif isinstance(X, DaskArray):
+        return _get_target_sum_dask(X)
+    else:
+        return cp.median(X.sum(axis=1))
+
+
+def _get_target_sum_csr(X: sparse.csr_matrix) -> int:
+    from ._kernels._norm_kernel import _get_sparse_sum_major
+
+    counts_per_cell = cp.zeros(X.shape[0], dtype=X.dtype)
+    sum_kernel = _get_sparse_sum_major(X.dtype)
+    sum_kernel(
+        (X.shape[0],),
+        (64,),
+        (X.indptr, X.data, counts_per_cell, X.shape[0]),
+    )
+    counts_per_cell = counts_per_cell[counts_per_cell > 0]
+    target_sum = cp.median(counts_per_cell)
+    return target_sum
+
+
+def _get_target_sum_dask(X: DaskArray) -> int:
+    if isinstance(X._meta, sparse.csr_matrix):
+        from ._kernels._norm_kernel import _get_sparse_sum_major
+
+        sum_kernel = _get_sparse_sum_major(X.dtype)
+        sum_kernel.compile()
+
+        def __sum(X_part):
+            counts_per_cell = cp.zeros(X_part.shape[0], dtype=X_part.dtype)
+            sum_kernel(
+                (X_part.shape[0],),
+                (64,),
+                (X_part.indptr, X_part.data, counts_per_cell, X_part.shape[0]),
+            )
+            return counts_per_cell
+
+    elif isinstance(X._meta, cp.ndarray):
+
+        def __sum(X_part):
+            return X_part.sum(axis=1)
+    else:
+        raise ValueError(f"Cannot compute target sum for {type(X)}")
+    target_sum_chunk_matrices = X.map_blocks(
+        __sum,
+        meta=cp.array((1.0,), dtype=X.dtype),
+        dtype=X.dtype,
+        chunks=(X.chunksize[0],),
+        drop_axis=1,
+    )
+    counts_per_cell = target_sum_chunk_matrices.compute()
+    counts_per_cell = counts_per_cell[counts_per_cell > 0]
+    target_sum = cp.median(counts_per_cell)
+    return target_sum
 
 
 def log1p(
@@ -146,13 +247,20 @@
         adata = adata.copy()
 
     X = _get_obs_rep(adata, layer=layer, obsm=obsm)
-    _check_gpu_X(X)
+    _check_gpu_X(X, allow_dask=True)
+
+    if not inplace:
+        X = X.copy()
 
     if isinstance(X, cp.ndarray):
        X = cp.log1p(X)
-    else:
+    elif sparse.issparse(X):
        X = X.log1p()
-
+    elif isinstance(X, DaskArray):
+        if isinstance(X._meta, cp.ndarray):
+            X = X.map_blocks(lambda x: cp.log1p(x), meta=_meta_dense(X.dtype))
+        elif isinstance(X._meta, sparse.csr_matrix):
+            X = X.map_blocks(lambda x: x.log1p(), meta=_meta_sparse(X.dtype))
     adata.uns["log1p"] = {"base": None}
     if inplace:
         _set_obs_rep(adata, X, layer=layer, obsm=obsm)
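As a usage sketch (illustrative; assumes a GPU and the `_testing.py` helpers from earlier in this diff), the Dask branch of `normalize_total` computes the median target sum eagerly in one pass over the chunks, while the scaling itself stays lazy:

```python
# Illustrative only: normalize_total/log1p on a CuPy-backed dask AnnData.
import scanpy as sc

import rapids_singlecell as rsc
from rapids_singlecell._testing import as_dense_cupy_dask_array

adata = sc.datasets.pbmc3k()
adata.X = as_dense_cupy_dask_array(adata.X.toarray())  # CuPy chunks on the GPU
rsc.pp.normalize_total(adata)  # target_sum=None -> median counts per cell
rsc.pp.log1p(adata)
adata.X = adata.X.compute()  # materialize the scaled chunks
```

diff --git a/src/rapids_singlecell/preprocessing/_pca.py b/src/rapids_singlecell/preprocessing/_pca.py
index 592d805a..c970dc7b 100644
--- a/src/rapids_singlecell/preprocessing/_pca.py
+++ b/src/rapids_singlecell/preprocessing/_pca.py
@@ -5,7 +5,6 @@
 import cupy as cp
 import 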
numpy as np -from cuml.decomposition import PCA, TruncatedSVD from cuml.internals.input_utils import sparse_scipy_to_cp from cupyx.scipy.sparse import csr_matrix, isspmatrix_csr from cupyx.scipy.sparse import issparse as cpissparse @@ -13,8 +12,11 @@ from scanpy.preprocessing._pca import _handle_mask_var from scipy.sparse import issparse +from rapids_singlecell._compat import DaskArray from rapids_singlecell.get import _get_obs_rep +from ._utils import _check_gpu_X + if TYPE_CHECKING: from anndata import AnnData from numpy.typing import NDArray @@ -28,7 +30,7 @@ def pca( zero_center: bool = True, svd_solver: str | None = None, random_state: int | None = 0, - mask_var: NDArray[np.bool_] | str | None | Empty = _empty, + mask_var: NDArray[np.bool] | str | None | Empty = _empty, use_highly_variable: bool | None = None, dtype: str = "float32", chunked: bool = False, @@ -137,51 +139,80 @@ def pca( n_comps = min_dim - 1 else: n_comps = 50 + if isinstance(X, DaskArray): + if chunked: + raise ValueError( + "Dask arrays are not supported for chunked PCA computation." + ) + _check_gpu_X(X, allow_dask=True) + if not zero_center: + raise ValueError("Dask arrays do not support non-zero centered PCA.") + if isinstance(X._meta, cp.ndarray): + from cuml.dask.decomposition import PCA + + if svd_solver == "auto": + svd_solver = "jacobi" + pca_func = PCA(n_components=n_comps, svd_solver=svd_solver, whiten=False) + X_pca = pca_func.fit_transform(X) + # cuml-issue #5883 + X_pca = X_pca.compute_chunk_sizes() + elif isinstance(X._meta, csr_matrix): + from ._sparse_pca._dask_sparse_pca import PCA_sparse_dask - if chunked: - from cuml.decomposition import IncrementalPCA + pca_func = PCA_sparse_dask(n_components=n_comps) + pca_func = pca_func.fit(X) + X_pca = pca_func.transform(X) - X_pca = np.zeros((X.shape[0], n_comps), X.dtype) + elif zero_center: + if chunked: + from cuml.decomposition import IncrementalPCA - pca_func = IncrementalPCA( - n_components=n_comps, output_type="numpy", batch_size=chunk_size - ) - pca_func.fit(X) - - n_batches = math.ceil(X.shape[0] / chunk_size) - for batch in range(n_batches): - start_idx = batch * chunk_size - stop_idx = min(batch * chunk_size + chunk_size, X.shape[0]) - chunk = X[start_idx:stop_idx, :] - if issparse(chunk) or cpissparse(chunk): - chunk = chunk.toarray() - X_pca[start_idx:stop_idx] = pca_func.transform(chunk) - else: - if zero_center: - if cpissparse(X) or issparse(X): - if issparse(X): - X = sparse_scipy_to_cp(X, dtype=X.dtype) - X = csr_matrix(X) - pca_func = PCA_sparse(n_components=n_comps) - X_pca = pca_func.fit_transform(X) - else: - pca_func = PCA( - n_components=n_comps, - svd_solver=svd_solver, - random_state=random_state, - output_type="numpy", - ) - X_pca = pca_func.fit_transform(X) - - elif not zero_center: - pca_func = TruncatedSVD( + X_pca = np.zeros((X.shape[0], n_comps), X.dtype) + + pca_func = IncrementalPCA( + n_components=n_comps, output_type="numpy", batch_size=chunk_size + ) + pca_func.fit(X) + + n_batches = math.ceil(X.shape[0] / chunk_size) + for batch in range(n_batches): + start_idx = batch * chunk_size + stop_idx = min(batch * chunk_size + chunk_size, X.shape[0]) + chunk = X[start_idx:stop_idx, :] + if issparse(chunk) or cpissparse(chunk): + chunk = chunk.toarray() + X_pca[start_idx:stop_idx] = pca_func.transform(chunk) + elif cpissparse(X) or issparse(X): + if issparse(X): + X = sparse_scipy_to_cp(X, dtype=X.dtype) + from ._sparse_pca._sparse_pca import PCA_sparse + + if not isspmatrix_csr(X): + X = X.tocsr() + pca_func = 
PCA_sparse(n_components=n_comps) + X_pca = pca_func.fit_transform(X) + else: + from cuml.decomposition import PCA + + pca_func = PCA( n_components=n_comps, + svd_solver=svd_solver, random_state=random_state, - algorithm=svd_solver, output_type="numpy", ) X_pca = pca_func.fit_transform(X) + else: # not zero_center + from cuml.decomposition import TruncatedSVD + + pca_func = TruncatedSVD( + n_components=n_comps, + random_state=random_state, + algorithm=svd_solver, + output_type="numpy", + ) + X_pca = pca_func.fit_transform(X) + if X_pca.dtype.descr != np.dtype(dtype).descr: X_pca = X_pca.astype(dtype) @@ -196,198 +227,21 @@ def pca( "mask_var": mask_var_param, **({"layer": layer} if layer is not None else {}), }, - "variance": pca_func.explained_variance_, - "variance_ratio": pca_func.explained_variance_ratio_, + "variance": _as_numpy(pca_func.explained_variance_), + "variance_ratio": _as_numpy(pca_func.explained_variance_ratio_), } if mask_var is not None: adata.varm[key_varm] = np.zeros(shape=(adata.n_vars, n_comps)) - adata.varm[key_varm][mask_var] = pca_func.components_.T + adata.varm[key_varm][mask_var] = _as_numpy(pca_func.components_.T) else: - adata.varm[key_varm] = pca_func.components_.T + adata.varm[key_varm] = _as_numpy(pca_func.components_.T) + if copy: return adata -class PCA_sparse: - def __init__(self, n_components) -> None: - self.n_components = n_components - - def fit(self, x): - if self.n_components is None: - n_rows = x.shape[0] - n_cols = x.shape[1] - self.n_components_ = min(n_rows, n_cols) - else: - self.n_components_ = self.n_components - - if not isspmatrix_csr(x): - x = x.tocsr() - _check_matrix_for_zero_genes(x) - self.n_samples_ = x.shape[0] - self.n_features_in_ = x.shape[1] if x.ndim == 2 else 1 - self.dtype = x.data.dtype - - covariance, self.mean_, _ = _cov_sparse(x=x, return_mean=True) - - self.explained_variance_, self.components_ = cp.linalg.eigh( - covariance, UPLO="U" - ) - - # NOTE: We reverse the eigen vector and eigen values here - # because cupy provides them in ascending order. Make a copy otherwise - # it is not C_CONTIGUOUS anymore and would error when converting to - # CumlArray - self.explained_variance_ = self.explained_variance_[::-1] - - self.components_ = cp.flip(self.components_, axis=1) - - self.components_ = self.components_.T[: self.n_components_, :] - - self.explained_variance_ratio_ = self.explained_variance_ / cp.sum( - self.explained_variance_ - ) - - self.explained_variance_ = self.explained_variance_[: self.n_components_] - - self.explained_variance_ratio_ = self.explained_variance_ratio_[ - : self.n_components_ - ] - - return self - - def transform(self, X): - precomputed_mean_impact = self.mean_ @ self.components_.T - mean_impact = cp.ones((X.shape[0], 1)) @ precomputed_mean_impact.reshape(1, -1) - X_transformed = X.dot(self.components_.T) - mean_impact - self.components_ = self.components_.get() - self.explained_variance_ = self.explained_variance_.get() - self.explained_variance_ratio_ = self.explained_variance_ratio_.get() - return X_transformed.get() - - def fit_transform(self, X, y=None): - return self.fit(X).transform(X) - - -def _cov_sparse(x, return_gram=False, return_mean=False): - """ - Computes the mean and the covariance of matrix X of - the form Cov(X, X) = E(XX) - E(X)E(X) - - This is a temporary fix for - cuml issue #5475 and cupy issue #7699, - where the operation `x.T.dot(x)` did not work for - larger sparse matrices. 
- - Parameters - ---------- - - x : cupyx.scipy.sparse of size (m, n) - return_gram : boolean (default = False) - If True, gram matrix of the form (1 / n) * X.T.dot(X) - will be returned. - When True, a copy will be created - to store the results of the covariance. - When False, the local gram matrix result - will be overwritten - return_mean: boolean (default = False) - If True, the Maximum Likelihood Estimate used to - calculate the mean of X and X will be returned, - of the form (1 / n) * mean(X) and (1 / n) * mean(X) - - Returns - ------- - - result : cov(X, X) when return_gram and return_mean are False - cov(X, X), gram(X, X) when return_gram is True, - return_mean is False - cov(X, X), mean(X), mean(X) when return_gram is False, - return_mean is True - cov(X, X), gram(X, X), mean(X), mean(X) - when return_gram is True and return_mean is True - """ - - from ._kernels._pca_sparse_kernel import ( - _copy_kernel, - _cov_kernel, - _gramm_kernel_csr, - ) - - gram_matrix = cp.zeros((x.shape[1], x.shape[1]), dtype=x.data.dtype) - - block = (128,) - grid = (x.shape[0],) - compute_mean_cov = _gramm_kernel_csr(x.data.dtype) - compute_mean_cov( - grid, - block, - ( - x.indptr, - x.indices, - x.data, - x.shape[0], - x.shape[1], - gram_matrix, - ), - ) - - copy_gram = _copy_kernel(x.data.dtype) - block = (32, 32) - grid = (math.ceil(x.shape[1] / block[0]), math.ceil(x.shape[1] / block[1])) - copy_gram( - grid, - block, - (gram_matrix, x.shape[1]), - ) - - mean_x = x.sum(axis=0) * (1 / x.shape[0]) - gram_matrix *= 1 / x.shape[0] - - if return_gram: - cov_result = cp.zeros( - (gram_matrix.shape[0], gram_matrix.shape[0]), - dtype=gram_matrix.dtype, - ) +def _as_numpy(X): + if isinstance(X, cp.ndarray): + return X.get() else: - cov_result = gram_matrix - - compute_cov = _cov_kernel(x.dtype) - - block_size = (32, 32) - grid_size = (math.ceil(gram_matrix.shape[0] / 8),) * 2 - compute_cov( - grid_size, - block_size, - (cov_result, gram_matrix, mean_x, mean_x, gram_matrix.shape[0]), - ) - - if not return_gram and not return_mean: - return cov_result - elif return_gram and not return_mean: - return cov_result, gram_matrix - elif not return_gram and return_mean: - return cov_result, mean_x, mean_x - elif return_gram and return_mean: - return cov_result, gram_matrix, mean_x, mean_x - - -def _check_matrix_for_zero_genes(X): - gene_ex = cp.zeros(X.shape[1], dtype=cp.int32) - - from ._kernels._pca_sparse_kernel import _zero_genes_kernel - - block = (32,) - grid = (int(math.ceil(X.nnz / block[0])),) - _zero_genes_kernel( - grid, - block, - ( - X.indices, - gene_ex, - X.nnz, - ), - ) - if cp.any(gene_ex == 0): - raise ValueError( - "There are genes with zero expression. " - "Please remove them before running PCA." 
- ) + return X diff --git a/src/rapids_singlecell/preprocessing/_qc.py b/src/rapids_singlecell/preprocessing/_qc.py new file mode 100644 index 00000000..e14618c3 --- /dev/null +++ b/src/rapids_singlecell/preprocessing/_qc.py @@ -0,0 +1,431 @@ +from __future__ import annotations + +import math +from typing import TYPE_CHECKING + +import cupy as cp +from cuml.internals.memory_utils import with_cupy_rmm +from cupyx.scipy import sparse +from scanpy.get import _get_obs_rep + +from rapids_singlecell._compat import DaskArray + +from ._utils import _check_gpu_X + +if TYPE_CHECKING: + from anndata import AnnData + + from rapids_singlecell._utils import ArrayTypesDask + + +def calculate_qc_metrics( + adata: AnnData, + *, + expr_type: str = "counts", + var_type: str = "genes", + qc_vars: str | list = None, + log1p: bool = True, + layer: str = None, +) -> None: + """\ + Calculates basic qc Parameters. Calculates number of genes per cell (n_genes) and number of counts per cell (n_counts). + Loosely based on calculate_qc_metrics from scanpy [Wolf et al. 2018]. Updates :attr:`~anndata.AnnData.obs` and :attr:`~anndata.AnnData.var` with columns with qc data. + + Parameters + ---------- + adata + AnnData object + expr_type + Name of kind of values in X. + var_type + The kind of thing the variables are. + qc_vars + Keys for boolean columns of :attr:`~anndata.AnnData.var` which identify variables you could want to control for (e.g. Mito). + Run flag_gene_family first + log1p + Set to `False` to skip computing `log1p` transformed annotations. + layer + If provided, use :attr:`~anndata.AnnData.layers` for expression values instead of :attr:`~anndata.AnnData.X`. + + Returns + ------- + adds the following columns in :attr:`~anndata.AnnData.obs` : + `total_{var_type}_by_{expr_type}` + E.g. 'total_genes_by_counts'. Number of genes with positive counts in a cell. + `total_{expr_type}` + E.g. 'total_counts'. Total number of counts for a cell. + for `qc_var` in `qc_vars` + `total_{expr_type}_{qc_var}` + number of counts per qc_var (e.g total counts mitochondrial genes) + `pct_{expr_type}_{qc_var}` + Proportion of counts of qc_var (percent of counts mitochondrial genes) + + adds the following columns in :attr:`~anndata.AnnData.var` : + `total_{expr_type}` + E.g. 'total_counts'. Sum of counts for a gene. + `n_genes_by_{expr_type}` + E.g. 'n_cells_by_counts'. Number of cells this expression is measured in. + `mean_{expr_type}` + E.g. "mean_counts". Mean expression over all cells. + `pct_dropout_by_{expr_type}` + E.g. 'pct_dropout_by_counts'. Percentage of cells this feature does not appear in. 
+ + """ + + X = _get_obs_rep(adata, layer=layer) + + _check_gpu_X(X, allow_dask=True) + + sums_cells, sums_genes, genes_per_cell, cells_per_gene = _basic_qc(X) + # .var + adata.var[f"n_cells_by_{expr_type}"] = cp.asnumpy(cells_per_gene) + adata.var[f"total_{expr_type}"] = cp.asnumpy(sums_genes) + mean_array = sums_genes / adata.n_obs + adata.var[f"mean_{expr_type}"] = cp.asnumpy(mean_array) + adata.var[f"pct_dropout_by_{expr_type}"] = cp.asnumpy( + (1 - cells_per_gene / adata.n_obs) * 100 + ) + if log1p: + adata.var[f"log1p_total_{expr_type}"] = cp.asnumpy(cp.log1p(sums_genes)) + adata.var[f"log1p_mean_{expr_type}"] = cp.asnumpy(cp.log1p(mean_array)) + # .obs + adata.obs[f"n_{var_type}_by_{expr_type}"] = cp.asnumpy(genes_per_cell) + adata.obs[f"total_{expr_type}"] = cp.asnumpy(sums_cells) + if log1p: + adata.obs[f"log1p_n_{var_type}_by_{expr_type}"] = cp.asnumpy( + cp.log1p(genes_per_cell) + ) + adata.obs[f"log1p_total_{expr_type}"] = cp.asnumpy(cp.log1p(sums_cells)) + + if qc_vars: + if isinstance(qc_vars, str): + qc_vars = [qc_vars] + for qc_var in qc_vars: + mask = cp.array(adata.var[qc_var], dtype=cp.bool_) + sums_cells_sub = _geneset_qc(X, mask) + + adata.obs[f"total_{expr_type}_{qc_var}"] = cp.asnumpy(sums_cells_sub) + adata.obs[f"pct_{expr_type}_{qc_var}"] = cp.asnumpy( + sums_cells_sub / sums_cells * 100 + ) + if log1p: + adata.obs[f"log1p_total_{expr_type}_{qc_var}"] = cp.asnumpy( + cp.log1p(sums_cells_sub) + ) + + +def _basic_qc( + X: ArrayTypesDask, +) -> tuple[cp.ndarray, cp.ndarray, cp.ndarray, cp.ndarray]: + if isinstance(X, DaskArray): + return _basic_qc_dask(X) + + sums_cells = cp.zeros(X.shape[0], dtype=X.dtype) + sums_genes = cp.zeros(X.shape[1], dtype=X.dtype) + genes_per_cell = cp.zeros(X.shape[0], dtype=cp.int32) + cells_per_gene = cp.zeros(X.shape[1], dtype=cp.int32) + if sparse.issparse(X): + if sparse.isspmatrix_csr(X): + from ._kernels._qc_kernels import _sparse_qc_csr + + block = (32,) + grid = (int(math.ceil(X.shape[0] / block[0])),) + call_shape = X.shape[0] + sparse_qc_kernel = _sparse_qc_csr(X.data.dtype) + + elif sparse.isspmatrix_csc(X): + from ._kernels._qc_kernels import _sparse_qc_csc + + block = (32,) + grid = (int(math.ceil(X.shape[1] / block[0])),) + call_shape = X.shape[1] + sparse_qc_kernel = _sparse_qc_csc(X.data.dtype) + + else: + raise ValueError("Please use a csr or csc matrix") + sparse_qc_kernel( + grid, + block, + ( + X.indptr, + X.indices, + X.data, + sums_cells, + sums_genes, + genes_per_cell, + cells_per_gene, + call_shape, + ), + ) + else: + from ._kernels._qc_kernels import _sparse_qc_dense + + if not X.flags.c_contiguous: + X = cp.asarray(X, order="C") + block = (16, 16) + grid = ( + int(math.ceil(X.shape[0] / block[0])), + int(math.ceil(X.shape[1] / block[1])), + ) + sparse_qc_dense = _sparse_qc_dense(X.dtype) + sparse_qc_dense( + grid, + block, + ( + X, + sums_cells, + sums_genes, + genes_per_cell, + cells_per_gene, + X.shape[0], + X.shape[1], + ), + ) + return sums_cells, sums_genes, genes_per_cell, cells_per_gene + + +@with_cupy_rmm +def _basic_qc_dask( + X: DaskArray, +) -> tuple[cp.ndarray, cp.ndarray, cp.ndarray, cp.ndarray]: + import dask + + if isinstance(X._meta, sparse.csr_matrix): + from ._kernels._qc_kernels_dask import ( + _sparse_qc_csr_dask_cells, + _sparse_qc_csr_dask_genes, + ) + + sparse_qc_csr_cells = _sparse_qc_csr_dask_cells(X.dtype) + sparse_qc_csr_cells.compile() + + def __qc_calc_1(X_part): + sums_cells = cp.zeros(X_part.shape[0], dtype=X_part.dtype) + genes_per_cell = cp.zeros(X_part.shape[0], dtype=cp.int32) + 
block = (32,) + grid = (int(math.ceil(X_part.shape[0] / block[0])),) + + sparse_qc_csr_cells( + grid, + block, + ( + X_part.indptr, + X_part.indices, + X_part.data, + sums_cells, + genes_per_cell, + X_part.shape[0], + ), + ) + return cp.stack([sums_cells, genes_per_cell.astype(X_part.dtype)], axis=1) + + sparse_qc_csr_genes = _sparse_qc_csr_dask_genes(X.dtype) + sparse_qc_csr_genes.compile() + + def __qc_calc_2(X_part): + sums_genes = cp.zeros(X_part.shape[1], dtype=X_part.dtype) + cells_per_gene = cp.zeros(X_part.shape[1], dtype=cp.int32) + block = (32,) + grid = (int(math.ceil(X_part.nnz / block[0])),) + sparse_qc_csr_genes( + grid, + block, + ( + X_part.indices, + X_part.data, + sums_genes, + cells_per_gene, + X_part.nnz, + ), + ) + return cp.vstack([sums_genes, cells_per_gene.astype(X_part.dtype)])[ + None, ... + ] + + elif isinstance(X._meta, cp.ndarray): + from ._kernels._qc_kernels_dask import ( + _sparse_qc_dense_cells, + _sparse_qc_dense_genes, + ) + + sparse_qc_dense_cells = _sparse_qc_dense_cells(X.dtype) + sparse_qc_dense_cells.compile() + + def __qc_calc_1(X_part): + sums_cells = cp.zeros(X_part.shape[0], dtype=X_part.dtype) + genes_per_cell = cp.zeros(X_part.shape[0], dtype=cp.int32) + if not X_part.flags.c_contiguous: + X_part = cp.asarray(X_part, order="C") + block = (16, 16) + grid = ( + int(math.ceil(X_part.shape[0] / block[0])), + int(math.ceil(X_part.shape[1] / block[1])), + ) + sparse_qc_dense_cells( + grid, + block, + ( + X_part, + sums_cells, + genes_per_cell, + X_part.shape[0], + X_part.shape[1], + ), + ) + return cp.stack([sums_cells, genes_per_cell.astype(X_part.dtype)], axis=1) + + sparse_qc_dense_genes = _sparse_qc_dense_genes(X.dtype) + sparse_qc_dense_genes.compile() + + def __qc_calc_2(X_part): + sums_genes = cp.zeros((X_part.shape[1]), dtype=X_part.dtype) + cells_per_gene = cp.zeros((X_part.shape[1]), dtype=cp.int32) + if not X_part.flags.c_contiguous: + X_part = cp.asarray(X_part, order="C") + block = (16, 16) + grid = ( + int(math.ceil(X_part.shape[0] / block[0])), + int(math.ceil(X_part.shape[1] / block[1])), + ) + sparse_qc_dense_genes( + grid, + block, + ( + X_part, + sums_genes, + cells_per_gene, + X_part.shape[0], + X_part.shape[1], + ), + ) + return cp.vstack([sums_genes, cells_per_gene.astype(X_part.dtype)])[ + None, ... + ] + else: + raise ValueError( + "Please use a cupy csr_matrix or cp.ndarray. csc_matrix are not supported with dask." 
+ ) + + cell_results = X.map_blocks( + __qc_calc_1, + chunks=(X.chunks[0], (2,)), + dtype=X.dtype, + meta=cp.empty((0, 2), dtype=X.dtype), + ) + sums_cells = cell_results[:, 0] + genes_per_cell = cell_results[:, 1] + + n_blocks = X.blocks.size + sums_genes, cells_per_gene = X.map_blocks( + __qc_calc_2, + new_axis=(1,), + chunks=((1,) * n_blocks, (2,), (X.shape[1],)), + dtype=X.dtype, + meta=cp.array([]), + ).sum(axis=0) + + sums_cells, genes_per_cell, sums_genes, cells_per_gene = dask.compute( + sums_cells, genes_per_cell, sums_genes, cells_per_gene + ) + + return ( + sums_cells.ravel(), + sums_genes.ravel(), + genes_per_cell.ravel().astype(cp.int32), + cells_per_gene.ravel().astype(cp.int32), + ) + + +def _geneset_qc(X: ArrayTypesDask, mask: cp.ndarray) -> cp.ndarray: + if isinstance(X, DaskArray): + return _geneset_qc_dask(X, mask) + sums_cells_sub = cp.zeros(X.shape[0], dtype=X.dtype) + if sparse.issparse(X): + if sparse.isspmatrix_csr(X): + from ._kernels._qc_kernels import _sparse_qc_csr_sub + + block = (32,) + grid = (int(math.ceil(X.shape[0] / block[0])),) + call_shape = X.shape[0] + sparse_qc_sub = _sparse_qc_csr_sub(X.data.dtype) + + elif sparse.isspmatrix_csc(X): + from ._kernels._qc_kernels import _sparse_qc_csc_sub + + block = (32,) + grid = (int(math.ceil(X.shape[1] / block[0])),) + call_shape = X.shape[1] + sparse_qc_sub = _sparse_qc_csc_sub(X.data.dtype) + + sparse_qc_sub( + grid, + block, + (X.indptr, X.indices, X.data, sums_cells_sub, mask, call_shape), + ) + else: + from ._kernels._qc_kernels import _sparse_qc_dense_sub + + block = (16, 16) + grid = ( + int(math.ceil(X.shape[0] / block[0])), + int(math.ceil(X.shape[1] / block[1])), + ) + sparse_qc_dense_sub = _sparse_qc_dense_sub(X.dtype) + sparse_qc_dense_sub( + grid, block, (X, sums_cells_sub, mask, X.shape[0], X.shape[1]) + ) + return sums_cells_sub + + +@with_cupy_rmm +def _geneset_qc_dask(X: DaskArray, mask: cp.ndarray) -> cp.ndarray: + if isinstance(X._meta, sparse.csr_matrix): + from ._kernels._qc_kernels import _sparse_qc_csr_sub + + sparse_qc_csr = _sparse_qc_csr_sub(X.dtype) + sparse_qc_csr.compile() + + def __qc_calc(X_part): + sums_cells_sub = cp.zeros(X_part.shape[0], dtype=X_part.dtype) + block = (32,) + grid = (int(math.ceil(X_part.shape[0] / block[0])),) + sparse_qc_csr( + grid, + block, + ( + X_part.indptr, + X_part.indices, + X_part.data, + sums_cells_sub, + mask, + X_part.shape[0], + ), + ) + return sums_cells_sub + + elif isinstance(X._meta, cp.ndarray): + from ._kernels._qc_kernels import _sparse_qc_dense_sub + + sparse_qc_dense = _sparse_qc_dense_sub(X.dtype) + sparse_qc_dense.compile() + + def __qc_calc(X_part): + sums_cells_sub = cp.zeros(X_part.shape[0], dtype=X_part.dtype) + if not X_part.flags.c_contiguous: + X_part = cp.asarray(X_part, order="C") + block = (16, 16) + grid = ( + int(math.ceil(X_part.shape[0] / block[0])), + int(math.ceil(X_part.shape[1] / block[1])), + ) + sparse_qc_dense( + grid, + block, + (X_part, sums_cells_sub, mask, X_part.shape[0], X_part.shape[1]), + ) + return sums_cells_sub + + sums_cells_sub = X.map_blocks( + __qc_calc, dtype=X.dtype, meta=cp.array([]), drop_axis=1 + ).compute() + return sums_cells_sub.ravel() diff --git a/src/rapids_singlecell/preprocessing/_scale.py b/src/rapids_singlecell/preprocessing/_scale.py index 37f6cc4b..13678258 100644 --- a/src/rapids_singlecell/preprocessing/_scale.py +++ b/src/rapids_singlecell/preprocessing/_scale.py @@ -4,11 +4,18 @@ from typing import Union import cupy as cp +import dask +import dask.array as da import numpy as np 
from anndata import AnnData from cupyx.scipy import sparse from scanpy._utils import view_to_actual +from rapids_singlecell._compat import ( + DaskArray, + _meta_dense, + _meta_sparse, +) from rapids_singlecell.get import _check_mask, _get_obs_rep, _set_obs_rep from rapids_singlecell.preprocessing._utils import ( _check_gpu_X, @@ -61,6 +68,7 @@ def scale( inplace If True, update AnnData with results. Otherwise, return results. See below for details of what is returned. + Returns ------- Returns a scaled copy or updates `adata` with a scaled version of the original :attr:`~anndata.AnnData.X` and `adata.layers['layer']`, \ @@ -76,7 +84,7 @@ def scale( view_to_actual(adata) X = _get_obs_rep(adata, layer=layer, obsm=obsm) - _check_gpu_X(X) + _check_gpu_X(X, allow_dask=True) str_mean_std = ("mean", "std") if mask_obs is not None: @@ -86,7 +94,16 @@ def scale( str_mean_std = ("mean with mask", "std with mask") mask_obs = _check_mask(adata, mask_obs, "obs") - if isinstance(X, cp.ndarray): + if isinstance(X, DaskArray): + X, means, std = _scale_dask( + X, + mask_obs=mask_obs, + zero_center=zero_center, + inplace=inplace, + max_value=max_value, + ) + + elif isinstance(X, cp.ndarray): X, means, std = _scale_array( X, mask_obs=mask_obs, @@ -245,12 +262,182 @@ def _scale_sparse_csr( scale_csr( (X.shape[0],), (64,), - (X.indptr, X.indices, X.data, std, mask_array, max_value, X.shape[0]), + ( + X.indptr, + X.indices, + X.data, + std.astype(X.dtype), + mask_array, + max_value, + X.shape[0], + ), ) return X, mean, std +def _scale_dask(X, *, mask_obs=None, zero_center=True, inplace=True, max_value=None): + if not inplace: + X = X.copy() + mean, var = dask.compute( + *_get_mean_var(X[mask_obs if mask_obs is not None else slice(None), :]) + ) + if mask_obs is None: + mask_array = cp.ones(X.shape[0], dtype=cp.int32) + else: + mask_array = cp.array(mask_obs).astype(cp.int32) + std = cp.sqrt(var) + std[std == 0] = 1 + max_value = _get_max_value(max_value, X.dtype) + + mask_array = da.from_array( + mask_array, chunks=(X.chunks[0],), meta=_meta_dense(mask_array.dtype) + ) + + if isinstance(X._meta, sparse.csr_matrix) and zero_center: + from ._kernels._sparse2dense import _sparse2densekernel + + kernel = _sparse2densekernel(X.dtype) + kernel.compile() + + def __dense(X_part): + major, minor = X_part.shape + dense = cp.zeros(X_part.shape, order="C", dtype=X_part.dtype) + max_nnz = cp.diff(X_part.indptr).max() + tpb = (32, 32) + bpg_x = math.ceil(major / tpb[0]) + bpg_y = math.ceil(max_nnz / tpb[1]) + bpg = (bpg_x, bpg_y) + + kernel( + bpg, + tpb, + (X_part.indptr, X_part.indices, X_part.data, dense, major, minor, 1), + ) + return dense + + X = X.map_blocks( + lambda x: __dense(x), + dtype=X.dtype, + meta=_meta_dense(X.dtype), + ) + scale = _scale_dask_array_zc + elif isinstance(X._meta, sparse.csr_matrix) and not zero_center: + scale = _scale_sparse_csr_dask + elif isinstance(X._meta, cp.ndarray) and zero_center: + scale = _scale_dask_array_zc + elif isinstance(X._meta, cp.ndarray) and not zero_center: + scale = _scale_dask_array_nzc + else: + raise ValueError( + "Invalid `._meta` type only supports `cupyx.scipy.sparse.csr_matrix` and `cp.ndarray`" + ) + return scale(X, mask_array=mask_array, mean=mean, std=std, max_value=max_value) + + +def _scale_dask_array_zc(X, *, mask_array, mean, std, max_value): + from ._kernels._scale_kernel import _dense_center_scale_kernel + + scale_kernel_center = _dense_center_scale_kernel(X.dtype) + scale_kernel_center.compile() + + mean_ = mean.astype(X.dtype) + std_ = 
std.astype(X.dtype) + + def __scale_kernel_center(X_part, mask_part): + scale_kernel_center( + (math.ceil(X_part.shape[0] / 32), math.ceil(X_part.shape[1] / 32)), + (32, 32), + ( + X_part, + mean_, + std_, + mask_part, + max_value, + X_part.shape[0], + X_part.shape[1], + ), + ) + return X_part + + X = da.blockwise( + __scale_kernel_center, + "ij", + X, + "ij", + mask_array, + "i", + meta=_meta_dense(X.dtype), + dtype=X.dtype, + ) + return X, mean, std + + +def _scale_dask_array_nzc(X, *, mask_array, mean, std, max_value): + from ._kernels._scale_kernel import _dense_scale_kernel + + scale_kernel = _dense_scale_kernel(X.dtype) + scale_kernel.compile() + std_ = std.astype(X.dtype) + + def __scale_kernel(X_part, mask_part): + scale_kernel( + (math.ceil(X_part.shape[0] / 32), math.ceil(X_part.shape[1] / 32)), + (32, 32), + (X_part, std_, mask_part, max_value, X_part.shape[0], X_part.shape[1]), + ) + + return X_part + + X = da.blockwise( + __scale_kernel, + "ij", + X, + "ij", + mask_array, + "i", + meta=_meta_dense(X.dtype), + dtype=X.dtype, + ) + return X, mean, std + + +def _scale_sparse_csr_dask(X, *, mask_array, mean, std, max_value): + from ._kernels._scale_kernel import _csr_scale_kernel + + scale_kernel_csr = _csr_scale_kernel(X.dtype) + scale_kernel_csr.compile() + std_ = std.astype(X.dtype) + + def __scale_kernel_csr(X_part, mask_part): + scale_kernel_csr( + (X_part.shape[0],), + (64,), + ( + X_part.indptr, + X_part.indices, + X_part.data, + std_, + mask_part, + max_value, + X_part.shape[0], + ), + ) + return X_part + + X = da.blockwise( + __scale_kernel_csr, + "ij", + X, + "ij", + mask_array, + "i", + meta=_meta_sparse(X.dtype), + dtype=X.dtype, + ) + return X, mean, std + + def _get_max_value(val, dtype): if val is None: val = np.inf diff --git a/src/rapids_singlecell/preprocessing/_simple.py b/src/rapids_singlecell/preprocessing/_simple.py index e0db508f..3e63445f 100644 --- a/src/rapids_singlecell/preprocessing/_simple.py +++ b/src/rapids_singlecell/preprocessing/_simple.py @@ -1,211 +1,16 @@ from __future__ import annotations -import math from typing import TYPE_CHECKING import cupy as cp import numpy as np -from cupyx.scipy import sparse -from scanpy.get import _get_obs_rep -from ._utils import _check_gpu_X +from ._qc import calculate_qc_metrics if TYPE_CHECKING: from anndata import AnnData -def calculate_qc_metrics( - adata: AnnData, - *, - expr_type: str = "counts", - var_type: str = "genes", - qc_vars: str | list = None, - log1p: bool = True, - layer: str = None, -) -> None: - """\ - Calculates basic qc Parameters. Calculates number of genes per cell (n_genes) and number of counts per cell (n_counts). - Loosely based on calculate_qc_metrics from scanpy [Wolf et al. 2018]. Updates :attr:`~anndata.AnnData.obs` and :attr:`~anndata.AnnData.var` with columns with qc data. - - Parameters - ---------- - adata - AnnData object - expr_type - Name of kind of values in X. - var_type - The kind of thing the variables are. - qc_vars - Keys for boolean columns of :attr:`~anndata.AnnData.var` which identify variables you could want to control for (e.g. Mito). - Run flag_gene_family first - log1p - Set to `False` to skip computing `log1p` transformed annotations. - layer - If provided, use :attr:`~anndata.AnnData.layers` for expression values instead of :attr:`~anndata.AnnData.X`. - - Returns - ------- - adds the following columns in :attr:`~anndata.AnnData.obs` : - `total_{var_type}_by_{expr_type}` - E.g. 'total_genes_by_counts'. Number of genes with positive counts in a cell. 
- `total_{expr_type}` - E.g. 'total_counts'. Total number of counts for a cell. - for `qc_var` in `qc_vars` - `total_{expr_type}_{qc_var}` - number of counts per qc_var (e.g total counts mitochondrial genes) - `pct_{expr_type}_{qc_var}` - Proportion of counts of qc_var (percent of counts mitochondrial genes) - - adds the following columns in :attr:`~anndata.AnnData.var` : - `total_{expr_type}` - E.g. 'total_counts'. Sum of counts for a gene. - `n_genes_by_{expr_type}` - E.g. 'n_cells_by_counts'. Number of cells this expression is measured in. - `mean_{expr_type}` - E.g. "mean_counts". Mean expression over all cells. - `pct_dropout_by_{expr_type}` - E.g. 'pct_dropout_by_counts'. Percentage of cells this feature does not appear in. - - """ - - X = _get_obs_rep(adata, layer=layer) - - _check_gpu_X(X) - - sums_cells = cp.zeros(X.shape[0], dtype=X.dtype) - sums_genes = cp.zeros(X.shape[1], dtype=X.dtype) - cell_ex = cp.zeros(X.shape[0], dtype=cp.int32) - gene_ex = cp.zeros(X.shape[1], dtype=cp.int32) - if sparse.issparse(X): - if sparse.isspmatrix_csr(X): - from ._kernels._qc_kernels import _sparse_qc_csr - - block = (32,) - grid = (int(math.ceil(X.shape[0] / block[0])),) - sparse_qc_csr = _sparse_qc_csr(X.data.dtype) - sparse_qc_csr( - grid, - block, - ( - X.indptr, - X.indices, - X.data, - sums_cells, - sums_genes, - cell_ex, - gene_ex, - X.shape[0], - ), - ) - elif sparse.isspmatrix_csc(X): - from ._kernels._qc_kernels import _sparse_qc_csc - - block = (32,) - grid = (int(math.ceil(X.shape[1] / block[0])),) - sparse_qc_csc = _sparse_qc_csc(X.data.dtype) - sparse_qc_csc( - grid, - block, - ( - X.indptr, - X.indices, - X.data, - sums_cells, - sums_genes, - cell_ex, - gene_ex, - X.shape[1], - ), - ) - else: - raise ValueError("Please use a csr or csc matrix") - else: - from ._kernels._qc_kernels import _sparse_qc_dense - - if not X.flags.c_contiguous: - X = cp.asarray(X, order="C") - block = (16, 16) - grid = ( - int(math.ceil(X.shape[0] / block[0])), - int(math.ceil(X.shape[1] / block[1])), - ) - sparse_qc_dense = _sparse_qc_dense(X.dtype) - sparse_qc_dense( - grid, - block, - (X, sums_cells, sums_genes, cell_ex, gene_ex, X.shape[0], X.shape[1]), - ) - - # .var - adata.var[f"n_cells_by_{expr_type}"] = cp.asnumpy(gene_ex) - adata.var[f"total_{expr_type}"] = cp.asnumpy(sums_genes) - mean_array = sums_genes / adata.n_obs - adata.var[f"mean_{expr_type}"] = cp.asnumpy(mean_array) - adata.var[f"pct_dropout_by_{expr_type}"] = cp.asnumpy( - (1 - gene_ex / adata.n_obs) * 100 - ) - if log1p: - adata.var[f"log1p_total_{expr_type}"] = cp.asnumpy(cp.log1p(sums_genes)) - adata.var[f"log1p_mean_{expr_type}"] = cp.asnumpy(cp.log1p(mean_array)) - # .obs - adata.obs[f"n_{var_type}_by_{expr_type}"] = cp.asnumpy(cell_ex) - adata.obs[f"total_{expr_type}"] = cp.asnumpy(sums_cells) - if log1p: - adata.obs[f"log1p_n_{var_type}_by_{expr_type}"] = cp.asnumpy(cp.log1p(cell_ex)) - adata.obs[f"log1p_total_{expr_type}"] = cp.asnumpy(cp.log1p(sums_cells)) - - if qc_vars: - if isinstance(qc_vars, str): - qc_vars = [qc_vars] - for qc_var in qc_vars: - sums_cells_sub = cp.zeros(X.shape[0], dtype=X.dtype) - mask = cp.array(adata.var[qc_var], dtype=cp.bool_) - if sparse.issparse(X): - if sparse.isspmatrix_csr(X): - from ._kernels._qc_kernels import _sparse_qc_csr_sub - - block = (32,) - grid = (int(math.ceil(X.shape[0] / block[0])),) - sparse_qc_csr_sub = _sparse_qc_csr_sub(X.data.dtype) - sparse_qc_csr_sub( - grid, - block, - (X.indptr, X.indices, X.data, sums_cells_sub, mask, X.shape[0]), - ) - elif sparse.isspmatrix_csc(X): - 
from ._kernels._qc_kernels import _sparse_qc_csc_sub - - block = (32,) - grid = (int(math.ceil(X.shape[1] / block[0])),) - sparse_qc_csc_sub = _sparse_qc_csc_sub(X.data.dtype) - sparse_qc_csc_sub( - grid, - block, - (X.indptr, X.indices, X.data, sums_cells_sub, mask, X.shape[1]), - ) - - else: - from ._kernels._qc_kernels import _sparse_qc_dense_sub - - block = (16, 16) - grid = ( - int(math.ceil(X.shape[0] / block[0])), - int(math.ceil(X.shape[1] / block[1])), - ) - sparse_qc_dense_sub = _sparse_qc_dense_sub(X.dtype) - sparse_qc_dense_sub( - grid, block, (X, sums_cells_sub, mask, X.shape[0], X.shape[1]) - ) - adata.obs[f"total_{expr_type}_{qc_var}"] = cp.asnumpy(sums_cells_sub) - adata.obs[f"pct_{expr_type}_{qc_var}"] = cp.asnumpy( - sums_cells_sub / sums_cells * 100 - ) - if log1p: - adata.obs[f"log1p_total_{expr_type}_{qc_var}"] = cp.asnumpy( - cp.log1p(sums_cells_sub) - ) - - def flag_gene_family( adata: AnnData, *, diff --git a/src/rapids_singlecell/preprocessing/_sparse_pca/_dask_sparse_pca.py b/src/rapids_singlecell/preprocessing/_sparse_pca/_dask_sparse_pca.py new file mode 100644 index 00000000..41001eb7 --- /dev/null +++ b/src/rapids_singlecell/preprocessing/_sparse_pca/_dask_sparse_pca.py @@ -0,0 +1,200 @@ +from __future__ import annotations + +import math + +import cupy as cp +from cuml.internals.memory_utils import with_cupy_rmm + +from rapids_singlecell._compat import ( + _meta_dense, +) +from rapids_singlecell.preprocessing._utils import _get_mean_var + + +class PCA_sparse_dask: + def __init__(self, n_components) -> None: + self.n_components = n_components + + def fit(self, x): + if self.n_components is None: + n_rows = x.shape[0] + n_cols = x.shape[1] + self.n_components_ = min(n_rows, n_cols) + else: + self.n_components_ = self.n_components + + self.n_samples_ = x.shape[0] + self.n_features_in_ = x.shape[1] if x.ndim == 2 else 1 + self.dtype = x.dtype + covariance, self.mean_, _ = _cov_sparse_dask(x=x, return_mean=True) + self.explained_variance_, self.components_ = cp.linalg.eigh( + covariance, UPLO="U" + ) + # NOTE: We reverse the eigen vector and eigen values here + # because cupy provides them in ascending order. 
Make a copy otherwise + # it is not C_CONTIGUOUS anymore and would error when converting to + # CumlArray + self.explained_variance_ = self.explained_variance_[::-1] + + self.components_ = cp.flip(self.components_, axis=1) + + self.components_ = self.components_.T[: self.n_components_, :] + + self.explained_variance_ratio_ = self.explained_variance_ / cp.sum( + self.explained_variance_ + ) + if self.n_components_ < min(self.n_samples_, self.n_features_in_): + self.noise_variance_ = self.explained_variance_[self.n_components_ :].mean() + else: + self.noise_variance_ = cp.array([0.0]) + self.explained_variance_ = self.explained_variance_[: self.n_components_] + + self.explained_variance_ratio_ = self.explained_variance_ratio_[ + : self.n_components_ + ] + return self + + def transform(self, X): + def _transform(X_part, mean_, components_): + pre_mean = mean_ @ components_.T + mean_impact = cp.ones( + (X_part.shape[0], 1), dtype=X_part.dtype + ) @ pre_mean.reshape(1, -1) + X_transformed = X_part.dot(components_.T) - mean_impact + return X_transformed + + X_pca = X.map_blocks( + _transform, + mean_=self.mean_, + components_=self.components_, + dtype=X.dtype, + chunks=(X.chunks[0], self.n_components_), + meta=_meta_dense(X.dtype), + ) + + self.components_ = self.components_.get() + self.explained_variance_ = self.explained_variance_.get() + self.explained_variance_ratio_ = self.explained_variance_ratio_.get() + return X_pca + + def fit_transform(self, X, y=None): + return self.fit(X).transform(X) + + +@with_cupy_rmm +def _cov_sparse_dask(x, return_gram=False, return_mean=False): + """ + Computes the mean and the covariance of matrix X of + the form Cov(X, X) = E(XX) - E(X)E(X) + + This is a temporary fix for + cuml issue #5475 and cupy issue #7699, + where the operation `x.T.dot(x)` did not work for + larger sparse matrices. + + Parameters + ---------- + + x : cupyx.scipy.sparse of size (m, n) + return_gram : boolean (default = False) + If True, gram matrix of the form (1 / n) * X.T.dot(X) + will be returned. + When True, a copy will be created + to store the results of the covariance. + When False, the local gram matrix result + will be overwritten + return_mean: boolean (default = False) + If True, the Maximum Likelihood Estimate used to + calculate the mean of X and X will be returned, + of the form (1 / n) * mean(X) and (1 / n) * mean(X) + + Returns + ------- + + result : cov(X, X) when return_gram and return_mean are False + cov(X, X), gram(X, X) when return_gram is True, + return_mean is False + cov(X, X), mean(X), mean(X) when return_gram is False, + return_mean is True + cov(X, X), gram(X, X), mean(X), mean(X) + when return_gram is True and return_mean is True + """ + import dask + + from ._kernels._pca_sparse_kernel import ( + _copy_kernel, + _cov_kernel, + _gramm_kernel_csr, + ) + + compute_mean_cov = _gramm_kernel_csr(x.dtype) + compute_mean_cov.compile() + n_cols = x.shape[1] + + def __gram_block(x_part): + gram_matrix = cp.zeros((n_cols, n_cols), dtype=x.dtype) + + block = (128,) + grid = (x_part.shape[0],) + compute_mean_cov( + grid, + block, + ( + x_part.indptr, + x_part.indices, + x_part.data, + x_part.shape[0], + n_cols, + gram_matrix, + ), + ) + return gram_matrix[None, ...] 
# need new axis for summing + + n_blocks = x.blocks.size + gram_matrix = x.map_blocks( + __gram_block, + new_axis=(1,), + chunks=((1,) * n_blocks, (x.shape[1],), (x.shape[1],)), + meta=cp.array([]), + dtype=x.dtype, + ).sum(axis=0) + mean_x, _ = _get_mean_var(x) + gram_matrix, mean_x = dask.compute(gram_matrix, mean_x) + mean_x = mean_x.astype(x.dtype) + copy_gram = _copy_kernel(x.dtype) + block = (32, 32) + grid = (math.ceil(n_cols / block[0]), math.ceil(n_cols / block[1])) + copy_gram( + grid, + block, + (gram_matrix, n_cols), + ) + + gram_matrix *= 1 / x.shape[0] + + if return_gram: + cov_result = cp.zeros( + (gram_matrix.shape[0], gram_matrix.shape[0]), + dtype=gram_matrix.dtype, + ) + else: + cov_result = gram_matrix + + compute_cov = _cov_kernel(gram_matrix.dtype) + + block_size = (32, 32) + grid_size = (math.ceil(gram_matrix.shape[0] / 8),) * 2 + compute_cov( + grid_size, + block_size, + (cov_result, gram_matrix, mean_x, mean_x, gram_matrix.shape[0]), + ) + + if not return_gram and not return_mean: + return cov_result + elif return_gram and not return_mean: + return cov_result, gram_matrix + elif not return_gram and return_mean: + return cov_result, mean_x, mean_x + elif return_gram and return_mean: + return cov_result, gram_matrix, mean_x, mean_x diff --git a/src/rapids_singlecell/preprocessing/_kernels/_pca_sparse_kernel.py b/src/rapids_singlecell/preprocessing/_sparse_pca/_kernels/_pca_sparse_kernel.py similarity index 99% rename from src/rapids_singlecell/preprocessing/_kernels/_pca_sparse_kernel.py rename to src/rapids_singlecell/preprocessing/_sparse_pca/_kernels/_pca_sparse_kernel.py index 11aa197d..878d79de 100644 --- a/src/rapids_singlecell/preprocessing/_kernels/_pca_sparse_kernel.py +++ b/src/rapids_singlecell/preprocessing/_sparse_pca/_kernels/_pca_sparse_kernel.py @@ -61,6 +61,7 @@ } """ + _zero_genes_kernel = cp.RawKernel(check_zero_genes, "check_zero_genes") diff --git a/src/rapids_singlecell/preprocessing/_sparse_pca/_sparse_pca.py b/src/rapids_singlecell/preprocessing/_sparse_pca/_sparse_pca.py new file mode 100644 index 00000000..2f9d5117 --- /dev/null +++ b/src/rapids_singlecell/preprocessing/_sparse_pca/_sparse_pca.py @@ -0,0 +1,192 @@ +from __future__ import annotations + +import math + +import cupy as cp + + +class PCA_sparse: + def __init__(self, n_components) -> None: + self.n_components = n_components + + def fit(self, x): + if self.n_components is None: + n_rows = x.shape[0] + n_cols = x.shape[1] + self.n_components_ = min(n_rows, n_cols) + else: + self.n_components_ = self.n_components + + _check_matrix_for_zero_genes(x) + self.n_samples_ = x.shape[0] + self.n_features_in_ = x.shape[1] if x.ndim == 2 else 1 + self.dtype = x.data.dtype + + covariance, self.mean_, _ = _cov_sparse(x=x, return_mean=True) + + self.explained_variance_, self.components_ = cp.linalg.eigh( + covariance, UPLO="U" + ) + + # NOTE: We reverse the eigen vector and eigen values here + # because cupy provides them in ascending order. 
+    def transform(self, X):
+        precomputed_mean_impact = self.mean_ @ self.components_.T
+        mean_impact = cp.ones(
+            (X.shape[0], 1), dtype=X.dtype
+        ) @ precomputed_mean_impact.reshape(1, -1)
+        X_transformed = X.dot(self.components_.T) - mean_impact
+        # equivalent to centering first: (X - mean_) @ components_.T
+        self.components_ = self.components_.get()
+        self.explained_variance_ = self.explained_variance_.get()
+        self.explained_variance_ratio_ = self.explained_variance_ratio_.get()
+        return X_transformed.get()
+
+    def fit_transform(self, X, y=None):
+        return self.fit(X).transform(X)
+
+
+def _cov_sparse(x, return_gram=False, return_mean=False):
+    """
+    Computes the mean and the covariance of matrix X, of
+    the form Cov(X, X) = E(X^T X) - E(X)^T E(X).
+
+    This is a temporary fix for
+    cuml issue #5475 and cupy issue #7699,
+    where the operation `x.T.dot(x)` did not work for
+    larger sparse matrices.
+
+    Parameters
+    ----------
+
+    x : cupyx.scipy.sparse of size (m, n)
+    return_gram : boolean (default = False)
+        If True, the gram matrix of the form (1 / n) * X.T.dot(X)
+        will be returned.
+        When True, a copy is created to store the covariance
+        result; when False, the gram matrix buffer is
+        overwritten in place.
+    return_mean : boolean (default = False)
+        If True, the maximum-likelihood estimate of the column
+        means of X, of the form (1 / n) * sum(X), will be
+        returned (twice, once for each factor of the covariance).
+
+    Returns
+    -------
+
+    result : cov(X, X) when return_gram and return_mean are False
+        cov(X, X), gram(X, X) when return_gram is True,
+        return_mean is False
+        cov(X, X), mean(X), mean(X) when return_gram is False,
+        return_mean is True
+        cov(X, X), gram(X, X), mean(X), mean(X)
+        when return_gram is True and return_mean is True
+    """
+
+    from ._kernels._pca_sparse_kernel import (
+        _copy_kernel,
+        _cov_kernel,
+        _gramm_kernel_csr,
+    )
+
+    gram_matrix = cp.zeros((x.shape[1], x.shape[1]), dtype=x.data.dtype)
+
+    block = (128,)
+    grid = (x.shape[0],)
+    compute_mean_cov = _gramm_kernel_csr(x.data.dtype)
+    compute_mean_cov(
+        grid,
+        block,
+        (
+            x.indptr,
+            x.indices,
+            x.data,
+            x.shape[0],
+            x.shape[1],
+            gram_matrix,
+        ),
+    )
+
+    copy_gram = _copy_kernel(x.data.dtype)
+    block = (32, 32)
+    grid = (math.ceil(x.shape[1] / block[0]), math.ceil(x.shape[1] / block[1]))
+    copy_gram(
+        grid,
+        block,
+        (gram_matrix, x.shape[1]),
+    )
+
+    mean_x = x.sum(axis=0) * (1 / x.shape[0])
+    gram_matrix *= 1 / x.shape[0]
+
+    if return_gram:
+        cov_result = cp.zeros(
+            (gram_matrix.shape[0], gram_matrix.shape[0]),
+            dtype=gram_matrix.dtype,
+        )
+    else:
+        cov_result = gram_matrix
+
+    compute_cov = _cov_kernel(x.dtype)
+
+    block_size = (32, 32)
+    grid_size = (math.ceil(gram_matrix.shape[0] / 8),) * 2
+    compute_cov(
+        grid_size,
+        block_size,
+        (cov_result, gram_matrix, mean_x, mean_x, gram_matrix.shape[0]),
+    )
+
+    if not return_gram and not return_mean:
+        return cov_result
+    elif return_gram and not return_mean:
+        return cov_result, gram_matrix
+    elif not return_gram and return_mean:
+        return cov_result, mean_x, mean_x
+    elif return_gram and return_mean:
+        return cov_result, gram_matrix, mean_x, mean_x
+
+
+def _check_matrix_for_zero_genes(X):
+    gene_ex = cp.zeros(X.shape[1], dtype=cp.int32)
+
+    from ._kernels._pca_sparse_kernel import _zero_genes_kernel
+
+    block = (32,)
+    grid = (int(math.ceil(X.nnz / block[0])),)
+    _zero_genes_kernel(
+        grid,
+        block,
+        (
+            X.indices,
+            gene_ex,
+            X.nnz,
+        ),
+    )
+    if cp.any(gene_ex == 0):
+        raise ValueError(
+            "There are genes with zero expression. "
+            "Please remove them before running PCA."
+        )
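The eigendecomposition route taken by `PCA_sparse` is easy to verify against plain NumPy: diagonalize the biased covariance with `eigh`, flip its ascending output to descending, then project and subtract the mean's projection after the matmul rather than centering the input first. A small CPU sketch of the same algebra (toy data; `w`, `ratio`, and the seed are illustrative names, not part of the diff):

```python
import numpy as np

rng = np.random.default_rng(0)
x = rng.random((100, 5))

mean = x.mean(axis=0)
cov = (x.T @ x) / x.shape[0] - np.outer(mean, mean)  # biased covariance

evals, evecs = np.linalg.eigh(cov)     # ascending order, like cp.linalg.eigh
evals = evals[::-1]                    # descending eigenvalues
components = np.flip(evecs, axis=1).T  # rows are principal axes

n_components = 2
ratio = evals / evals.sum()            # explained-variance ratio, as in fit()
assert np.isclose(ratio.sum(), 1.0)

w = components[:n_components]
# Project without centering, then subtract the mean's projection:
# exactly the mean_impact trick used in transform().
x_pca = x @ w.T - mean @ w.T

np.testing.assert_allclose(x_pca, (x - mean) @ w.T, atol=1e-12)
```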
diff --git a/src/rapids_singlecell/preprocessing/_utils.py b/src/rapids_singlecell/preprocessing/_utils.py
index 1bb533f2..09e8eb43 100644
--- a/src/rapids_singlecell/preprocessing/_utils.py
+++ b/src/rapids_singlecell/preprocessing/_utils.py
@@ -5,8 +5,11 @@
 
 import cupy as cp
 import numpy as np
+from cuml.internals.memory_utils import with_cupy_rmm
 from cupyx.scipy.sparse import issparse, isspmatrix_csc, isspmatrix_csr, spmatrix
 
+from rapids_singlecell._compat import DaskArray
+
 if TYPE_CHECKING:
     from anndata import AnnData
 
@@ -75,6 +78,127 @@ def _mean_var_minor(X, major, minor):
     return mean, var
 
+
+@with_cupy_rmm
+def _mean_var_minor_dask(X, major, minor):
+    """
+    Computes the mean and variance along the minor axis of a Dask array
+    whose blocks are CuPy sparse CSR matrices.
+    """
+
+    from rapids_singlecell.preprocessing._kernels._mean_var_kernel import (
+        _get_mean_var_minor,
+    )
+
+    get_mean_var_minor = _get_mean_var_minor(X.dtype)
+    get_mean_var_minor.compile()
+
+    def __mean_var(X_part):
+        mean = cp.zeros(minor, dtype=cp.float64)
+        var = cp.zeros(minor, dtype=cp.float64)
+        block = (32,)
+        grid = (int(math.ceil(X_part.nnz / block[0])),)
+        get_mean_var_minor(
+            grid, block, (X_part.indices, X_part.data, mean, var, major, X_part.nnz)
+        )
+        return cp.vstack([mean, var])[None, ...]  # a new axis so the blocks can be summed
+
+    n_blocks = X.blocks.size
+    mean, var = X.map_blocks(
+        __mean_var,
+        new_axis=(1,),
+        chunks=((1,) * n_blocks, (2,), (minor,)),
+        dtype=cp.float64,
+        meta=cp.array([]),
+    ).sum(axis=0)
+    var = (var - mean**2) * (major / (major - 1))
+    return mean, var
+
+
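+# The variance above uses the usual shortcut: the kernel accumulates the
+# per-column mean and mean of squares, so Var(x) = E[x^2] - E[x]^2,
+# rescaled by major / (major - 1) for the unbiased (ddof=1) estimate.
+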
+# todo: Implement this dynamically for csc matrix as well
+@with_cupy_rmm
+def _mean_var_major_dask(X, major, minor):
+    """
+    Computes the mean and variance along the major (row) axis of a Dask
+    array whose blocks are CuPy sparse CSR matrices.
+    """
+
+    from rapids_singlecell.preprocessing._kernels._mean_var_kernel import (
+        _get_mean_var_major,
+    )
+
+    get_mean_var_major = _get_mean_var_major(X.dtype)
+    get_mean_var_major.compile()
+
+    def __mean_var(X_part):
+        mean = cp.zeros(X_part.shape[0], dtype=cp.float64)
+        var = cp.zeros(X_part.shape[0], dtype=cp.float64)
+        block = (64,)
+        grid = (X_part.shape[0],)
+        get_mean_var_major(
+            grid,
+            block,
+            (
+                X_part.indptr,
+                X_part.indices,
+                X_part.data,
+                mean,
+                var,
+                X_part.shape[0],
+                minor,
+            ),
+        )
+        return cp.vstack([mean, var])
+
+    mean, var = X.map_blocks(
+        __mean_var,
+        chunks=((2,), X.chunks[0]),
+        dtype=cp.float64,
+        meta=cp.array([]),
+    )
+
+    mean = mean / minor
+    var = var / minor
+    var -= cp.power(mean, 2)
+    var *= minor / (minor - 1)
+    return mean, var
+
+
+@with_cupy_rmm
+def _mean_var_dense_dask(X, axis):
+    """
+    Computes the mean and variance along the given axis of a Dask array
+    whose blocks are dense CuPy arrays.
+    """
+    from ._kernels._mean_var_kernel import mean_sum, sq_sum
+
+    def __mean_var(X_part):
+        var = sq_sum(X_part, axis=axis)
+        mean = mean_sum(X_part, axis=axis)
+        if axis == 0:
+            mean = mean.reshape(-1, 1)
+            var = var.reshape(-1, 1)
+        # axis == 0: add a leading axis so the per-block results can be
+        # summed across row-blocks; axis == 1: return the stack as-is.
+        return cp.vstack([mean.ravel(), var.ravel()])[
+            None if 1 - axis else slice(None, None), ...
+        ]
+
+    n_blocks = X.blocks.size
+    mean_var = X.map_blocks(
+        __mean_var,
+        new_axis=(1,) if axis - 1 else None,
+        chunks=((2,), X.chunks[0]) if axis else ((1,) * n_blocks, (2,), (X.shape[1],)),
+        dtype=cp.float64,
+        meta=cp.array([]),
+    )
+
+    if axis == 0:
+        mean, var = mean_var.sum(axis=0)
+    else:
+        mean, var = mean_var
+
+    mean = mean / X.shape[axis]
+    var = var / X.shape[axis]
+    var -= mean**2
+    var *= X.shape[axis] / (X.shape[axis] - 1)
+    return mean, var
+
+
 def _mean_var_dense(X, axis):
     from ._kernels._mean_var_kernel import mean_sum, sq_sum
 
@@ -107,6 +231,18 @@ def _get_mean_var(X, axis=0):
         major = X.shape[1]
         minor = X.shape[0]
         mean, var = _mean_var_minor(X, major, minor)
+    elif isinstance(X, DaskArray):
+        if isspmatrix_csr(X._meta):
+            if axis == 0:
+                major = X.shape[0]
+                minor = X.shape[1]
+                mean, var = _mean_var_minor_dask(X, major, minor)
+            elif axis == 1:
+                major = X.shape[0]
+                minor = X.shape[1]
+                mean, var = _mean_var_major_dask(X, major, minor)
+        elif isinstance(X._meta, cp.ndarray):
+            mean, var = _mean_var_dense_dask(X, axis)
     else:
         mean, var = _mean_var_dense(X, axis)
     return mean, var
@@ -127,11 +263,27 @@ def _check_nonnegative_integers(X):
     return True
 
 
-def _check_gpu_X(X, require_cf=False):
-    if isinstance(X, cp.ndarray):
+def _check_gpu_X(X, require_cf=False, allow_dask=False, allow_csc=True):
+    if isinstance(X, DaskArray):
+        if allow_dask:
+            return _check_gpu_X(X._meta, allow_csc=False)
+        else:
+            raise TypeError(
+                "The input is a DaskArray. "
+                "Rapids-singlecell doesn't support DaskArray in this function, "
+                "so your input must be a CuPy ndarray or a CuPy sparse matrix.
" + ) + elif isinstance(X, cp.ndarray): return True - elif issparse(X): - if X.has_canonical_format or not require_cf: + elif isspmatrix_csc(X) or isspmatrix_csr(X): + if not allow_csc and isspmatrix_csc(X): + raise TypeError( + "When using Dask, only CuPy ndarrays and CSR matrices are supported as " + "meta arrays. Please convert your data to CSR format if it is in CSC." + ) + elif not require_cf: + return True + elif X.has_canonical_format: return True else: X.sort_indices() diff --git a/tests/dask/conftest.py b/tests/dask/conftest.py new file mode 100644 index 00000000..c2ded911 --- /dev/null +++ b/tests/dask/conftest.py @@ -0,0 +1,25 @@ +from __future__ import annotations + +import pytest +from dask.distributed import Client +from dask_cuda import LocalCUDACluster +from dask_cuda.utils_test import IncreasedCloseTimeoutNanny + + +@pytest.fixture(scope="module") +def cluster(): + cluster = LocalCUDACluster( + CUDA_VISIBLE_DEVICES="0", + protocol="tcp", + scheduler_port=0, + worker_class=IncreasedCloseTimeoutNanny, + ) + yield cluster + cluster.close() + + +@pytest.fixture(scope="function") +def client(cluster): + client = Client(cluster) + yield client + client.close() diff --git a/tests/dask/test_dask_pca.py b/tests/dask/test_dask_pca.py new file mode 100644 index 00000000..08d6d5d9 --- /dev/null +++ b/tests/dask/test_dask_pca.py @@ -0,0 +1,98 @@ +from __future__ import annotations + +import cupy as cp +import numpy as np +import pytest +from cupyx.scipy import sparse as cusparse +from scanpy.datasets import pbmc3k, pbmc3k_processed +from scipy import sparse + +import rapids_singlecell as rsc +from rapids_singlecell._testing import ( + as_dense_cupy_dask_array, + as_sparse_cupy_dask_array, +) + + +@pytest.mark.parametrize("data_kind", ["sparse", "dense"]) +def test_pca_dask(client, data_kind): + adata_1 = pbmc3k_processed() + adata_2 = pbmc3k_processed() + + if data_kind == "sparse": + adata_1.X = sparse.csr_matrix(adata_1.X.astype(np.float64)) + adata_2.X = as_sparse_cupy_dask_array(adata_2.X.astype(np.float64)) + elif data_kind == "dense": + adata_1.X = cp.array(adata_1.X.astype(np.float64)) + adata_2.X = as_dense_cupy_dask_array(adata_2.X.astype(np.float64)) + else: + raise ValueError(f"Unknown data_kind {data_kind}") + + rsc.pp.pca(adata_1, svd_solver="full") + rsc.pp.pca(adata_2, svd_solver="full") + + cp.testing.assert_allclose( + np.abs(adata_1.obsm["X_pca"]), + cp.abs(adata_2.obsm["X_pca"].compute()), + rtol=1e-7, + atol=1e-6, + ) + + cp.testing.assert_allclose( + np.abs(adata_1.varm["PCs"]), + np.abs(adata_2.varm["PCs"]), + rtol=1e-7, + atol=1e-6, + ) + + cp.testing.assert_allclose( + np.abs(adata_1.uns["pca"]["variance_ratio"]), + np.abs(adata_2.uns["pca"]["variance_ratio"]), + rtol=1e-7, + atol=1e-6, + ) + + +@pytest.mark.parametrize("data_kind", ["sparse", "dense"]) +def test_pca_dask_full_pipeline(client, data_kind): + adata_1 = pbmc3k() + adata_2 = pbmc3k() + + if data_kind == "sparse": + adata_1.X = cusparse.csr_matrix(sparse.csr_matrix(adata_1.X.astype(np.float64))) + adata_2.X = as_sparse_cupy_dask_array(adata_2.X.astype(np.float64)) + elif data_kind == "dense": + adata_1.X = cp.array(adata_1.X.astype(np.float64).toarray()) + adata_2.X = as_dense_cupy_dask_array(adata_2.X.astype(np.float64).toarray()) + else: + raise ValueError(f"Unknown data_kind {data_kind}") + + rsc.pp.filter_genes(adata_1, min_count=500) + rsc.pp.filter_genes(adata_2, min_count=500) + + rsc.pp.normalize_total(adata_1, target_sum=1e4) + rsc.pp.normalize_total(adata_2, target_sum=1e4) + + 
rsc.pp.log1p(adata_1) + rsc.pp.log1p(adata_2) + + rsc.pp.pca(adata_1, svd_solver="full") + rsc.pp.pca(adata_2, svd_solver="full") + + cp.testing.assert_allclose( + np.abs(adata_1.obsm["X_pca"]), + cp.abs(adata_2.obsm["X_pca"].compute()), + rtol=1e-7, + atol=1e-6, + ) + + cp.testing.assert_allclose( + np.abs(adata_1.varm["PCs"]), np.abs(adata_2.varm["PCs"]), rtol=1e-7, atol=1e-6 + ) + + cp.testing.assert_allclose( + np.abs(adata_1.uns["pca"]["variance_ratio"]), + np.abs(adata_2.uns["pca"]["variance_ratio"]), + rtol=1e-7, + atol=1e-6, + ) diff --git a/tests/dask/test_hvg_dask.py b/tests/dask/test_hvg_dask.py new file mode 100644 index 00000000..2cc5913d --- /dev/null +++ b/tests/dask/test_hvg_dask.py @@ -0,0 +1,76 @@ +from __future__ import annotations + +import cupy as cp +import pandas as pd +import pytest +import scanpy as sc +from cupyx.scipy import sparse as cusparse +from scanpy.datasets import pbmc3k + +import rapids_singlecell as rsc +from rapids_singlecell._testing import ( + as_dense_cupy_dask_array, + as_sparse_cupy_dask_array, +) + + +def _get_anndata(): + adata = pbmc3k() + sc.pp.filter_cells(adata, min_genes=100) + sc.pp.filter_genes(adata, min_cells=3) + sc.pp.normalize_total(adata) + sc.pp.log1p(adata) + return adata + + +@pytest.mark.parametrize("data_kind", ["sparse", "dense"]) +@pytest.mark.parametrize("flavor", ["seurat", "cell_ranger"]) +def test_highly_variable_genes(client, data_kind, flavor): + adata = _get_anndata() + adata.X = adata.X.astype("float64") + dask_data = adata.copy() + + if data_kind == "dense": + dask_data.X = as_dense_cupy_dask_array(dask_data.X).persist() + adata.X = cp.array(adata.X.toarray()) + elif data_kind == "sparse": + dask_data.X = as_sparse_cupy_dask_array(dask_data.X).persist() + adata.X = cusparse.csr_matrix(adata.X) + else: + raise ValueError(f"Unknown data_kind: {data_kind}") + + rsc.pp.highly_variable_genes(adata, flavor=flavor) + rsc.pp.highly_variable_genes(dask_data, flavor=flavor) + + cp.testing.assert_allclose(adata.var["means"], dask_data.var["means"]) + cp.testing.assert_allclose(adata.var["dispersions"], dask_data.var["dispersions"]) + cp.testing.assert_allclose( + adata.var["dispersions_norm"], dask_data.var["dispersions_norm"] + ) + + +@pytest.mark.parametrize("data_kind", ["sparse", "dense"]) +@pytest.mark.parametrize("flavor", ["seurat", "cell_ranger"]) +def test_highly_variable_genes_batched(client, data_kind, flavor): + adata = _get_anndata() + adata.obs["batch"] = ( + "source_" + pd.array([*range(1, 6), 5]).repeat(500).astype("string") + )[: adata.n_obs] + dask_data = adata.copy() + + if data_kind == "dense": + dask_data.X = as_dense_cupy_dask_array(dask_data.X).persist() + adata.X = cp.array(adata.X.toarray()) + elif data_kind == "sparse": + dask_data.X = as_sparse_cupy_dask_array(dask_data.X).persist() + adata.X = cusparse.csr_matrix(adata.X) + else: + raise ValueError(f"Unknown data_kind: {data_kind}") + + rsc.pp.highly_variable_genes(adata, batch_key="batch") + rsc.pp.highly_variable_genes(dask_data, batch_key="batch") + cp.testing.assert_allclose(adata.var["means"], dask_data.var["means"]) + cp.testing.assert_allclose(adata.var["dispersions"], dask_data.var["dispersions"]) + cp.testing.assert_allclose( + adata.var["dispersions_norm"], dask_data.var["dispersions_norm"] + ) diff --git a/tests/dask/test_normalize_dask.py b/tests/dask/test_normalize_dask.py new file mode 100644 index 00000000..5a76111a --- /dev/null +++ b/tests/dask/test_normalize_dask.py @@ -0,0 +1,72 @@ +from __future__ import annotations + +import cupy 
as cp +import pytest +import scanpy as sc +from cupyx.scipy import sparse as cusparse +from scanpy.datasets import pbmc3k + +import rapids_singlecell as rsc +from rapids_singlecell._testing import ( + as_dense_cupy_dask_array, + as_sparse_cupy_dask_array, +) + + +@pytest.mark.parametrize("data_kind", ["sparse", "dense"]) +def test_normalize_total(client, data_kind): + adata = pbmc3k() + sc.pp.filter_cells(adata, min_genes=100) + sc.pp.filter_genes(adata, min_cells=3) + dask_data = adata.copy() + + if data_kind == "sparse": + dask_data.X = as_sparse_cupy_dask_array(dask_data.X) + adata.X = cusparse.csr_matrix(adata.X) + elif data_kind == "dense": + dask_data.X = as_dense_cupy_dask_array(dask_data.X) + adata.X = cp.array(adata.X.toarray()) + else: + raise ValueError(f"Unknown data_kind {data_kind}") + + rsc.pp.normalize_total(adata) + rsc.pp.normalize_total(dask_data) + + if data_kind == "sparse": + adata_X = adata.X.toarray() + dask_X = dask_data.X.compute().toarray() + else: + adata_X = adata.X + dask_X = dask_data.X.compute() + + cp.testing.assert_allclose(adata_X, dask_X) + + +@pytest.mark.parametrize("data_kind", ["sparse", "dense"]) +def test_log1p(client, data_kind): + adata = pbmc3k() + sc.pp.filter_cells(adata, min_genes=100) + sc.pp.filter_genes(adata, min_cells=3) + sc.pp.normalize_total(adata) + dask_data = adata.copy() + + if data_kind == "sparse": + dask_data.X = as_sparse_cupy_dask_array(dask_data.X) + adata.X = cusparse.csr_matrix(adata.X) + elif data_kind == "dense": + dask_data.X = as_dense_cupy_dask_array(dask_data.X) + adata.X = cp.array(adata.X.toarray()) + else: + raise ValueError(f"Unknown data_kind {data_kind}") + + rsc.pp.log1p(adata) + rsc.pp.log1p(dask_data) + + if data_kind == "sparse": + adata_X = adata.X.toarray() + dask_X = dask_data.X.compute().toarray() + else: + adata_X = adata.X + dask_X = dask_data.X.compute() + + cp.testing.assert_allclose(adata_X, dask_X) diff --git a/tests/dask/test_qc_dask.py b/tests/dask/test_qc_dask.py new file mode 100644 index 00000000..3b350dbe --- /dev/null +++ b/tests/dask/test_qc_dask.py @@ -0,0 +1,64 @@ +from __future__ import annotations + +import cupy as cp +import numpy as np +import pytest +from cupyx.scipy import sparse as cusparse +from scanpy.datasets import pbmc3k + +import rapids_singlecell as rsc +from rapids_singlecell._testing import ( + as_dense_cupy_dask_array, + as_sparse_cupy_dask_array, +) + + +@pytest.mark.parametrize("data_kind", ["sparse", "dense"]) +def test_qc_metrics(client, data_kind): + adata = pbmc3k() + adata.var["mt"] = adata.var_names.str.startswith("MT-") + dask_data = adata.copy() + if data_kind == "sparse": + dask_data.X = as_sparse_cupy_dask_array(dask_data.X) + adata.X = cusparse.csr_matrix(adata.X) + elif data_kind == "dense": + dask_data.X = as_dense_cupy_dask_array(dask_data.X) + adata.X = cp.array(adata.X.toarray()) + else: + raise ValueError(f"Unknown data_kind {data_kind}") + + rsc.pp.calculate_qc_metrics(adata, qc_vars=["mt"], log1p=True) + rsc.pp.calculate_qc_metrics(dask_data, qc_vars=["mt"], log1p=True) + np.testing.assert_allclose( + adata.obs["n_genes_by_counts"], dask_data.obs["n_genes_by_counts"] + ) + np.testing.assert_allclose(adata.obs["total_counts"], dask_data.obs["total_counts"]) + np.testing.assert_allclose( + adata.obs["log1p_n_genes_by_counts"], dask_data.obs["log1p_n_genes_by_counts"] + ) + np.testing.assert_allclose( + adata.obs["log1p_total_counts"], dask_data.obs["log1p_total_counts"] + ) + np.testing.assert_allclose( + adata.obs["pct_counts_mt"], 
dask_data.obs["pct_counts_mt"] + ) + np.testing.assert_allclose( + adata.obs["total_counts_mt"], dask_data.obs["total_counts_mt"] + ) + np.testing.assert_allclose( + adata.obs["log1p_total_counts_mt"], dask_data.obs["log1p_total_counts_mt"] + ) + np.testing.assert_allclose( + adata.var["n_cells_by_counts"], dask_data.var["n_cells_by_counts"] + ) + np.testing.assert_allclose(adata.var["total_counts"], dask_data.var["total_counts"]) + np.testing.assert_allclose(adata.var["mean_counts"], dask_data.var["mean_counts"]) + np.testing.assert_allclose( + adata.var["pct_dropout_by_counts"], dask_data.var["pct_dropout_by_counts"] + ) + np.testing.assert_allclose( + adata.var["log1p_total_counts"], dask_data.var["log1p_total_counts"] + ) + np.testing.assert_allclose( + adata.var["log1p_mean_counts"], dask_data.var["log1p_mean_counts"] + ) diff --git a/tests/dask/test_scale_dask.py b/tests/dask/test_scale_dask.py new file mode 100644 index 00000000..d8c3eb1f --- /dev/null +++ b/tests/dask/test_scale_dask.py @@ -0,0 +1,51 @@ +from __future__ import annotations + +import cupy as cp +import numpy as np +import pytest +import scanpy as sc +from cupyx.scipy import sparse as cusparse +from scanpy.datasets import pbmc3k + +import rapids_singlecell as rsc +from rapids_singlecell._testing import ( + as_dense_cupy_dask_array, + as_sparse_cupy_dask_array, +) + + +def _get_anndata(): + adata = pbmc3k() + sc.pp.filter_cells(adata, min_genes=100) + sc.pp.filter_genes(adata, min_cells=3) + sc.pp.normalize_total(adata) + sc.pp.log1p(adata) + sc.pp.highly_variable_genes(adata, n_top_genes=1000, subset=True) + return adata.copy() + + +@pytest.mark.parametrize("data_kind", ["sparse", "dense"]) +@pytest.mark.parametrize("zero_center", [True, False]) +def test_scale(client, data_kind, zero_center): + adata = _get_anndata() + mask = np.random.randint(0, 2, adata.shape[0], dtype=bool) + dask_data = adata.copy() + + if data_kind == "sparse": + dask_data.X = as_sparse_cupy_dask_array(dask_data.X.astype(np.float64)) + adata.X = cusparse.csr_matrix(adata.X.astype(np.float64)) + elif data_kind == "dense": + dask_data.X = as_dense_cupy_dask_array(dask_data.X.astype(np.float64)) + adata.X = cp.array(adata.X.toarray().astype(np.float64)) + else: + raise ValueError(f"Unknown data_kind {data_kind}") + + rsc.pp.scale(adata, zero_center=zero_center, mask_obs=mask, max_value=10) + rsc.pp.scale(dask_data, zero_center=zero_center, mask_obs=mask, max_value=10) + if data_kind == "sparse" and not zero_center: + adata_X = adata.X.toarray() + dask_X = dask_data.X.compute().toarray() + else: + adata_X = adata.X + dask_X = dask_data.X.compute() + cp.testing.assert_allclose(adata_X, dask_X)
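Taken together, the tests above all follow one recipe: build a reference AnnData with in-memory GPU arrays, build its Dask twin with `as_dense_cupy_dask_array` / `as_sparse_cupy_dask_array`, run the same `rsc.pp` call on both, and compare after `.compute()`. A hypothetical end-to-end sketch of that flow outside pytest (assumes a `dask_cuda` install and one visible GPU; the pipeline mirrors `test_pca_dask_full_pipeline` and the `conftest.py` fixtures above, and is not part of the diff):

```python
import rapids_singlecell as rsc
from dask.distributed import Client
from dask_cuda import LocalCUDACluster
from scanpy.datasets import pbmc3k

from rapids_singlecell._testing import as_sparse_cupy_dask_array

if __name__ == "__main__":
    # One worker on GPU 0, as in the conftest fixture.
    cluster = LocalCUDACluster(CUDA_VISIBLE_DEVICES="0")
    client = Client(cluster)

    adata = pbmc3k()
    # Wrap X as a Dask array of CuPy CSR chunks, like the tests do.
    adata.X = as_sparse_cupy_dask_array(adata.X)

    rsc.pp.filter_genes(adata, min_count=500)
    rsc.pp.normalize_total(adata, target_sum=1e4)
    rsc.pp.log1p(adata)
    rsc.pp.pca(adata, svd_solver="full")

    # Materialize the lazy result only at the end.
    print(adata.obsm["X_pca"].compute()[:3])

    client.close()
    cluster.close()
```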