diff --git a/examples/sklearnex/basic_statistics_spmd.py b/examples/sklearnex/basic_statistics_spmd.py index 71b2651e9b..c64a583a82 100644 --- a/examples/sklearnex/basic_statistics_spmd.py +++ b/examples/sklearnex/basic_statistics_spmd.py @@ -18,6 +18,7 @@ from mpi4py import MPI from dpctl import SyclQueue +import dpctl.tensor as dpt from sklearnex.spmd.basic_statistics import BasicStatistics as BasicStatisticsSpmd @@ -51,11 +52,14 @@ def generate_data(par, size, seed=777): data, weights = generate_data(params_spmd, size) weighted_data = np.diag(weights) @ data +dpt_data = dpt.asarray(data, usm_type="device", sycl_queue=q) +dpt_weights = dpt.asarray(weights, usm_type="device", sycl_queue=q) + gtr_mean = np.mean(weighted_data, axis=0) gtr_std = np.std(weighted_data, axis=0) bss = BasicStatisticsSpmd(["mean", "standard_deviation"]) -res = bss.compute(data, weights, queue=q) +res = bss.compute(dpt_data, dpt_weights) print(f"Computed mean on rank {rank}:\n", res["mean"]) print(f"Computed std on rank {rank}:\n", res["standard_deviation"]) diff --git a/examples/sklearnex/knn_bf_classification_spmd.py b/examples/sklearnex/knn_bf_classification_spmd.py index b2a1e40e5d..501bec3242 100644 --- a/examples/sklearnex/knn_bf_classification_spmd.py +++ b/examples/sklearnex/knn_bf_classification_spmd.py @@ -19,6 +19,7 @@ from warnings import warn from mpi4py import MPI import dpctl +import dpctl.tensor as dpt from sklearnex.spmd.neighbors import KNeighborsClassifier @@ -48,18 +49,23 @@ def generate_X_y(par, seed): X_train, y_train = generate_X_y(params_train, rank) X_test, y_test = generate_X_y(params_test, rank + 99) +dpt_X_train = dpt.asarray(X_train, usm_type="device", sycl_queue=q) +dpt_y_train = dpt.asarray(y_train, usm_type="device", sycl_queue=q) +dpt_X_test = dpt.asarray(X_test, usm_type="device", sycl_queue=q) +dpt_y_test = dpt.asarray(y_test, usm_type="device", sycl_queue=q) + model_spmd = KNeighborsClassifier(algorithm='brute', n_neighbors=20, weights='uniform', p=2, metric='minkowski') -model_spmd.fit(X_train, y_train, queue=q) +model_spmd.fit(dpt_X_train, dpt_y_train) -y_predict = model_spmd.predict(X_test, queue=q) +y_predict = model_spmd.predict(dpt_X_test) print("Brute Force Distributed kNN classification results:") print("Ground truth (first 5 observations on rank {}):\n{}".format(rank, y_test[:5])) print("Classification results (first 5 observations on rank {}):\n{}" - .format(rank, y_predict[:5])) + .format(rank, dpt.to_numpy(y_predict)[:5])) print("Accuracy for entire rank {} (256 classes): {}\n" - .format(rank, accuracy_score(y_test, y_predict))) + .format(rank, accuracy_score(y_test, dpt.to_numpy(y_predict)))) diff --git a/examples/sklearnex/knn_bf_regression_spmd.py b/examples/sklearnex/knn_bf_regression_spmd.py index 223cd910dd..ac0e8ad555 100644 --- a/examples/sklearnex/knn_bf_regression_spmd.py +++ b/examples/sklearnex/knn_bf_regression_spmd.py @@ -19,6 +19,7 @@ from warnings import warn from mpi4py import MPI import dpctl +import dpctl.tensor as dpt from numpy.testing import assert_allclose from sklearnex.spmd.neighbors import KNeighborsRegressor @@ -52,6 +53,11 @@ def generate_X_y(par, coef_seed, data_seed): X_train, y_train, coef_train = generate_X_y(params_train, 10, rank) X_test, y_test, coef_test = generate_X_y(params_test, 10, rank + 99) +dpt_X_train = dpt.asarray(X_train, usm_type="device", sycl_queue=q) +dpt_y_train = dpt.asarray(y_train, usm_type="device", sycl_queue=q) +dpt_X_test = dpt.asarray(X_test, usm_type="device", sycl_queue=q) +# dpt_y_test = dpt.asarray(y_test, usm_type="device", sycl_queue=q) + assert_allclose(coef_train, coef_test) model_spmd = KNeighborsRegressor(algorithm='brute', @@ -59,13 +65,13 @@ def generate_X_y(par, coef_seed, data_seed): weights='uniform', p=2, metric='minkowski') -model_spmd.fit(X_train, y_train, queue=q) +model_spmd.fit(dpt_X_train, dpt_y_train) -y_predict = model_spmd.predict(X_test, queue=q) +y_predict = model_spmd.predict(dpt_X_test) print("Brute Force Distributed kNN regression results:") print("Ground truth (first 5 observations on rank {}):\n{}".format(rank, y_test[:5])) print("Regression results (first 5 observations on rank {}):\n{}" - .format(rank, y_predict[:5])) + .format(rank, dpt.to_numpy(y_predict)[:5])) print("RMSE for entire rank {}: {}\n" - .format(rank, mean_squared_error(y_test, y_predict, squared=False))) + .format(rank, mean_squared_error(y_test, dpt.to_numpy(y_predict), squared=False))) diff --git a/examples/sklearnex/linear_regression_spmd.py b/examples/sklearnex/linear_regression_spmd.py index 17982c46dc..a37f7f2686 100755 --- a/examples/sklearnex/linear_regression_spmd.py +++ b/examples/sklearnex/linear_regression_spmd.py @@ -19,6 +19,7 @@ from mpi4py import MPI from dpctl import SyclQueue +import dpctl.tensor as dpt from sklearnex.spmd.linear_model import LinearRegression @@ -56,13 +57,17 @@ def get_test_data(rank): queue = SyclQueue("gpu") -model = LinearRegression().fit(X, y, queue) +dpt_X = dpt.asarray(X, usm_type="device", sycl_queue=queue) +dpt_y = dpt.asarray(y, usm_type="device", sycl_queue=queue) + +model = LinearRegression().fit(dpt_X, dpt_y) print(f"Coefficients on rank {rank}:\n", model.coef_) print(f"Intercept on rank {rank}:\n", model.intercept_) X_test, _ = get_test_data(rank) +dpt_X_test = dpt.asarray(X_test, usm_type="device", sycl_queue=queue) -result = model.predict(X_test, queue) +result = model.predict(dpt_X_test) -print(f"Result on rank {rank}:\n", result) +print(f"Result on rank {rank}:\n", dpt.to_numpy(result)) diff --git a/examples/sklearnex/pca_spmd.py b/examples/sklearnex/pca_spmd.py index 3972e9547e..64e61a5a19 100644 --- a/examples/sklearnex/pca_spmd.py +++ b/examples/sklearnex/pca_spmd.py @@ -17,6 +17,7 @@ import numpy as np from mpi4py import MPI import dpctl +import dpctl.tensor as dpt from sklearnex.spmd.decomposition import PCA @@ -33,8 +34,9 @@ def get_data(data_seed): size = comm.Get_size() X = get_data(rank) +dpt_X = dpt.asarray(X, usm_type="device", sycl_queue=q) -pca = PCA(n_components=2).fit(X, q) +pca = PCA(n_components=2).fit(dpt_X) print(f"Singular values on rank {rank}:\n", pca.singular_values_) print(f"Explained variance Ratio on rank {rank}:\n", pca.explained_variance_ratio_) diff --git a/examples/sklearnex/random_forest_classifier_spmd.py b/examples/sklearnex/random_forest_classifier_spmd.py index 799bb63928..83e6539ec3 100644 --- a/examples/sklearnex/random_forest_classifier_spmd.py +++ b/examples/sklearnex/random_forest_classifier_spmd.py @@ -53,7 +53,6 @@ def generate_X_y(par, seed): dpt_X_train = dpt.asarray(X_train, usm_type="device", sycl_queue=q) dpt_y_train = dpt.asarray(y_train, usm_type="device", sycl_queue=q) dpt_X_test = dpt.asarray(X_test, usm_type="device", sycl_queue=q) -dpt_y_test = dpt.asarray(y_test, usm_type="device", sycl_queue=q) rf = RandomForestClassifier(max_depth=2, random_state=0).fit(dpt_X_train, dpt_y_train) diff --git a/examples/sklearnex/random_forest_regressor_spmd.py b/examples/sklearnex/random_forest_regressor_spmd.py index 367a8557d9..1fe1954a02 100644 --- a/examples/sklearnex/random_forest_regressor_spmd.py +++ b/examples/sklearnex/random_forest_regressor_spmd.py @@ -59,7 +59,6 @@ def generate_X_y(par, coef_seed, data_seed): dpt_X_train = dpt.asarray(X_train, usm_type="device", sycl_queue=q) dpt_y_train = dpt.asarray(y_train, usm_type="device", sycl_queue=q) dpt_X_test = dpt.asarray(X_test, usm_type="device", sycl_queue=q) -# dpt_y_test = dpt.asarray(y_test, usm_type="device", sycl_queue=q) rf = RandomForestRegressor(max_depth=2, random_state=0).fit(dpt_X_train, dpt_y_train) diff --git a/onedal/_device_offload.py b/onedal/_device_offload.py new file mode 100644 index 0000000000..09cd48f681 --- /dev/null +++ b/onedal/_device_offload.py @@ -0,0 +1,77 @@ +#=============================================================================== +# Copyright 2023 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +#=============================================================================== + +from functools import wraps + +try: + from sklearnex._device_offload import (_get_global_queue, + _transfer_to_host, + _copy_to_usm) + _sklearnex_available = True +except ImportError: + import logging + logging.warning('Device support requires ' + 'Intel(R) Extension for Scikit-learn*.') + _sklearnex_available = False + + +def _get_host_inputs(*args, **kwargs): + q = _get_global_queue() + q, hostargs = _transfer_to_host(q, *args) + q, hostvalues = _transfer_to_host(q, *kwargs.values()) + hostkwargs = dict(zip(kwargs.keys(), hostvalues)) + return q, hostargs, hostkwargs + + +def _extract_usm_iface(*args, **kwargs): + allargs = (*args, *kwargs.values()) + if len(allargs) == 0: + return None + return getattr(allargs[0], + '__sycl_usm_array_interface__', + None) + + +def _run_on_device(func, obj=None, *args, **kwargs): + if obj is not None: + return func(obj, *args, **kwargs) + return func(*args, **kwargs) + + +def support_usm_ndarray(freefunc=False): + def decorator(func): + def wrapper_impl(obj, *args, **kwargs): + if _sklearnex_available: + usm_iface = _extract_usm_iface(*args, **kwargs) + data_queue, hostargs, hostkwargs = _get_host_inputs(*args, **kwargs) + hostkwargs['queue'] = data_queue + result = _run_on_device(func, obj, *hostargs, **hostkwargs) + if usm_iface is not None and hasattr(result, '__array_interface__'): + return _copy_to_usm(data_queue, result) + return result + return _run_on_device(func, obj, *args, **kwargs) + + if freefunc: + @wraps(func) + def wrapper_free(*args, **kwargs): + return wrapper_impl(None, *args, **kwargs) + return wrapper_free + + @wraps(func) + def wrapper_with_self(self, *args, **kwargs): + return wrapper_impl(self, *args, **kwargs) + return wrapper_with_self + return decorator diff --git a/onedal/spmd/basic_statistics/basic_statistics.py b/onedal/spmd/basic_statistics/basic_statistics.py index 6fa3b6422b..af4a5e2429 100644 --- a/onedal/spmd/basic_statistics/basic_statistics.py +++ b/onedal/spmd/basic_statistics/basic_statistics.py @@ -16,6 +16,7 @@ from abc import ABC from ...common._spmd_policy import _get_spmd_policy +from ..._device_offload import support_usm_ndarray from onedal.basic_statistics import BasicStatistics as BasicStatistics_Batch @@ -25,4 +26,7 @@ def _get_policy(self, queue, *data): class BasicStatistics(BaseBasicStatisticsSPMD, BasicStatistics_Batch): - pass + + @support_usm_ndarray() + def compute(self, data, weights=None, queue=None): + return super().compute(data, weights, queue) diff --git a/onedal/spmd/decomposition/pca.py b/onedal/spmd/decomposition/pca.py index 5790089313..a511170ec2 100644 --- a/onedal/spmd/decomposition/pca.py +++ b/onedal/spmd/decomposition/pca.py @@ -16,6 +16,7 @@ from ...common._spmd_policy import _get_spmd_policy +from ..._device_offload import support_usm_ndarray from onedal.decomposition.pca import PCA as PCABatch @@ -25,4 +26,11 @@ def _get_policy(self, queue, *data): class PCA(BasePCASPMD, PCABatch): - pass + + @support_usm_ndarray() + def fit(self, X, queue): + return super().fit(X, queue) + + @support_usm_ndarray() + def predict(self, X, queue): + return super().predict(X, queue) diff --git a/onedal/spmd/linear_model/linear_model.py b/onedal/spmd/linear_model/linear_model.py index 72a77d28c3..d07eb7df28 100644 --- a/onedal/spmd/linear_model/linear_model.py +++ b/onedal/spmd/linear_model/linear_model.py @@ -16,6 +16,7 @@ from abc import ABC from ...common._spmd_policy import _get_spmd_policy +from ..._device_offload import support_usm_ndarray from onedal.linear_model import LinearRegression as LinearRegression_Batch @@ -25,4 +26,11 @@ def _get_policy(self, queue, *data): class LinearRegression(BaseLinearRegressionSPMD, LinearRegression_Batch): - pass + + @support_usm_ndarray() + def fit(self, X, y, queue=None): + return super().fit(X, y, queue) + + @support_usm_ndarray() + def predict(self, X, queue=None): + return super().predict(X, queue) diff --git a/onedal/spmd/neighbors/neighbors.py b/onedal/spmd/neighbors/neighbors.py index 067bac6d3c..02981599b9 100644 --- a/onedal/spmd/neighbors/neighbors.py +++ b/onedal/spmd/neighbors/neighbors.py @@ -16,6 +16,7 @@ from abc import ABC from ...common._spmd_policy import _get_spmd_policy +from ..._device_offload import support_usm_ndarray from onedal.neighbors import KNeighborsClassifier as KNeighborsClassifier_Batch from onedal.neighbors import KNeighborsRegressor as KNeighborsRegressor_Batch @@ -26,11 +27,27 @@ def _get_policy(self, queue, *data): class KNeighborsClassifier(NeighborsCommonBaseSPMD, KNeighborsClassifier_Batch): + + @support_usm_ndarray() + def fit(self, X, y, queue=None): + return super().fit(X, y, queue) + + @support_usm_ndarray() + def predict(self, X, queue=None): + return super().predict(X, queue) + + @support_usm_ndarray() def predict_proba(self, X, queue=None): raise NotImplementedError("predict_proba not supported in distributed mode.") + @support_usm_ndarray() + def kneighbors(self, X=None, n_neighbors=None, + return_distance=True, queue=None): + return super().kneighbors(X, n_neighbors, return_distance, queue) + class KNeighborsRegressor(NeighborsCommonBaseSPMD, KNeighborsRegressor_Batch): + @support_usm_ndarray() def fit(self, X, y, queue=None): if queue is not None and queue.sycl_device.is_gpu: return super()._fit(X, y, queue=queue) @@ -38,6 +55,12 @@ def fit(self, X, y, queue=None): raise ValueError('SPMD version of kNN is not implemented for ' 'CPU. Consider running on it on GPU.') + @support_usm_ndarray() + def kneighbors(self, X=None, n_neighbors=None, + return_distance=True, queue=None): + return super().kneighbors(X, n_neighbors, return_distance, queue) + + @support_usm_ndarray() def predict(self, X, queue=None): return self._predict_gpu(X, queue=queue) @@ -49,4 +72,12 @@ def _get_onedal_params(self, X, y=None): class NearestNeighbors(NeighborsCommonBaseSPMD): - pass + + @support_usm_ndarray() + def fit(self, X, y, queue=None): + return super().fit(X, y, queue) + + @support_usm_ndarray() + def kneighbors(self, X=None, n_neighbors=None, + return_distance=True, queue=None): + return super().kneighbors(X, n_neighbors, return_distance, queue) diff --git a/setup_sklearnex.py b/setup_sklearnex.py index d8ca24d152..f8289b4b72 100755 --- a/setup_sklearnex.py +++ b/setup_sklearnex.py @@ -89,8 +89,13 @@ if build_distribute: packages_with_tests += [ 'sklearnex.spmd', - 'sklearnex.spmd.ensemble', - 'sklearnex.spmd.linear_model'] + 'sklearnex.spmd.decomposition', + 'sklearnex.spmd.ensemble'] + if ONEDAL_VERSION >= 20230100: + packages_with_tests += [ + 'sklearnex.spmd.basic_statistics', + 'sklearnex.spmd.linear_model', + 'sklearnex.spmd.neighbors'] # sklearnex setup setup(name="scikit-learn-intelex",