Skip to content

Commit

Permalink
support usm_ndarray in onedal.spmd (#1216)
Browse files Browse the repository at this point in the history
support usm_ndarray in onedal
  • Loading branch information
samir-nasibli authored Mar 24, 2023
1 parent 93577ee commit a72a6f8
Show file tree
Hide file tree
Showing 13 changed files with 175 additions and 21 deletions.
6 changes: 5 additions & 1 deletion examples/sklearnex/basic_statistics_spmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from mpi4py import MPI

from dpctl import SyclQueue
import dpctl.tensor as dpt
from sklearnex.spmd.basic_statistics import BasicStatistics as BasicStatisticsSpmd


Expand Down Expand Up @@ -51,11 +52,14 @@ def generate_data(par, size, seed=777):
data, weights = generate_data(params_spmd, size)
weighted_data = np.diag(weights) @ data

dpt_data = dpt.asarray(data, usm_type="device", sycl_queue=q)
dpt_weights = dpt.asarray(weights, usm_type="device", sycl_queue=q)

gtr_mean = np.mean(weighted_data, axis=0)
gtr_std = np.std(weighted_data, axis=0)

bss = BasicStatisticsSpmd(["mean", "standard_deviation"])
res = bss.compute(data, weights, queue=q)
res = bss.compute(dpt_data, dpt_weights)

print(f"Computed mean on rank {rank}:\n", res["mean"])
print(f"Computed std on rank {rank}:\n", res["standard_deviation"])
14 changes: 10 additions & 4 deletions examples/sklearnex/knn_bf_classification_spmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from warnings import warn
from mpi4py import MPI
import dpctl
import dpctl.tensor as dpt
from sklearnex.spmd.neighbors import KNeighborsClassifier


Expand Down Expand Up @@ -48,18 +49,23 @@ def generate_X_y(par, seed):
X_train, y_train = generate_X_y(params_train, rank)
X_test, y_test = generate_X_y(params_test, rank + 99)

dpt_X_train = dpt.asarray(X_train, usm_type="device", sycl_queue=q)
dpt_y_train = dpt.asarray(y_train, usm_type="device", sycl_queue=q)
dpt_X_test = dpt.asarray(X_test, usm_type="device", sycl_queue=q)
dpt_y_test = dpt.asarray(y_test, usm_type="device", sycl_queue=q)

model_spmd = KNeighborsClassifier(algorithm='brute',
n_neighbors=20,
weights='uniform',
p=2,
metric='minkowski')
model_spmd.fit(X_train, y_train, queue=q)
model_spmd.fit(dpt_X_train, dpt_y_train)

y_predict = model_spmd.predict(X_test, queue=q)
y_predict = model_spmd.predict(dpt_X_test)

print("Brute Force Distributed kNN classification results:")
print("Ground truth (first 5 observations on rank {}):\n{}".format(rank, y_test[:5]))
print("Classification results (first 5 observations on rank {}):\n{}"
.format(rank, y_predict[:5]))
.format(rank, dpt.to_numpy(y_predict)[:5]))
print("Accuracy for entire rank {} (256 classes): {}\n"
.format(rank, accuracy_score(y_test, y_predict)))
.format(rank, accuracy_score(y_test, dpt.to_numpy(y_predict))))
14 changes: 10 additions & 4 deletions examples/sklearnex/knn_bf_regression_spmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from warnings import warn
from mpi4py import MPI
import dpctl
import dpctl.tensor as dpt
from numpy.testing import assert_allclose
from sklearnex.spmd.neighbors import KNeighborsRegressor

Expand Down Expand Up @@ -52,20 +53,25 @@ def generate_X_y(par, coef_seed, data_seed):
X_train, y_train, coef_train = generate_X_y(params_train, 10, rank)
X_test, y_test, coef_test = generate_X_y(params_test, 10, rank + 99)

dpt_X_train = dpt.asarray(X_train, usm_type="device", sycl_queue=q)
dpt_y_train = dpt.asarray(y_train, usm_type="device", sycl_queue=q)
dpt_X_test = dpt.asarray(X_test, usm_type="device", sycl_queue=q)
# dpt_y_test = dpt.asarray(y_test, usm_type="device", sycl_queue=q)

assert_allclose(coef_train, coef_test)

model_spmd = KNeighborsRegressor(algorithm='brute',
n_neighbors=5,
weights='uniform',
p=2,
metric='minkowski')
model_spmd.fit(X_train, y_train, queue=q)
model_spmd.fit(dpt_X_train, dpt_y_train)

y_predict = model_spmd.predict(X_test, queue=q)
y_predict = model_spmd.predict(dpt_X_test)

print("Brute Force Distributed kNN regression results:")
print("Ground truth (first 5 observations on rank {}):\n{}".format(rank, y_test[:5]))
print("Regression results (first 5 observations on rank {}):\n{}"
.format(rank, y_predict[:5]))
.format(rank, dpt.to_numpy(y_predict)[:5]))
print("RMSE for entire rank {}: {}\n"
.format(rank, mean_squared_error(y_test, y_predict, squared=False)))
.format(rank, mean_squared_error(y_test, dpt.to_numpy(y_predict), squared=False)))
11 changes: 8 additions & 3 deletions examples/sklearnex/linear_regression_spmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

from mpi4py import MPI
from dpctl import SyclQueue
import dpctl.tensor as dpt
from sklearnex.spmd.linear_model import LinearRegression


Expand Down Expand Up @@ -56,13 +57,17 @@ def get_test_data(rank):

queue = SyclQueue("gpu")

model = LinearRegression().fit(X, y, queue)
dpt_X = dpt.asarray(X, usm_type="device", sycl_queue=queue)
dpt_y = dpt.asarray(y, usm_type="device", sycl_queue=queue)

model = LinearRegression().fit(dpt_X, dpt_y)

print(f"Coefficients on rank {rank}:\n", model.coef_)
print(f"Intercept on rank {rank}:\n", model.intercept_)

X_test, _ = get_test_data(rank)
dpt_X_test = dpt.asarray(X_test, usm_type="device", sycl_queue=queue)

result = model.predict(X_test, queue)
result = model.predict(dpt_X_test)

print(f"Result on rank {rank}:\n", result)
print(f"Result on rank {rank}:\n", dpt.to_numpy(result))
4 changes: 3 additions & 1 deletion examples/sklearnex/pca_spmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import numpy as np
from mpi4py import MPI
import dpctl
import dpctl.tensor as dpt
from sklearnex.spmd.decomposition import PCA


Expand All @@ -33,8 +34,9 @@ def get_data(data_seed):
size = comm.Get_size()

X = get_data(rank)
dpt_X = dpt.asarray(X, usm_type="device", sycl_queue=q)

pca = PCA(n_components=2).fit(X, q)
pca = PCA(n_components=2).fit(dpt_X)

print(f"Singular values on rank {rank}:\n", pca.singular_values_)
print(f"Explained variance Ratio on rank {rank}:\n", pca.explained_variance_ratio_)
1 change: 0 additions & 1 deletion examples/sklearnex/random_forest_classifier_spmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ def generate_X_y(par, seed):
dpt_X_train = dpt.asarray(X_train, usm_type="device", sycl_queue=q)
dpt_y_train = dpt.asarray(y_train, usm_type="device", sycl_queue=q)
dpt_X_test = dpt.asarray(X_test, usm_type="device", sycl_queue=q)
dpt_y_test = dpt.asarray(y_test, usm_type="device", sycl_queue=q)

rf = RandomForestClassifier(max_depth=2, random_state=0).fit(dpt_X_train, dpt_y_train)

Expand Down
1 change: 0 additions & 1 deletion examples/sklearnex/random_forest_regressor_spmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ def generate_X_y(par, coef_seed, data_seed):
dpt_X_train = dpt.asarray(X_train, usm_type="device", sycl_queue=q)
dpt_y_train = dpt.asarray(y_train, usm_type="device", sycl_queue=q)
dpt_X_test = dpt.asarray(X_test, usm_type="device", sycl_queue=q)
# dpt_y_test = dpt.asarray(y_test, usm_type="device", sycl_queue=q)

rf = RandomForestRegressor(max_depth=2, random_state=0).fit(dpt_X_train, dpt_y_train)

Expand Down
77 changes: 77 additions & 0 deletions onedal/_device_offload.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
#===============================================================================
# Copyright 2023 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#===============================================================================

from functools import wraps

try:
from sklearnex._device_offload import (_get_global_queue,
_transfer_to_host,
_copy_to_usm)
_sklearnex_available = True
except ImportError:
import logging
logging.warning('Device support requires '
'Intel(R) Extension for Scikit-learn*.')
_sklearnex_available = False


def _get_host_inputs(*args, **kwargs):
    """Run every positional and keyword input through ``_transfer_to_host``.

    The queue returned by ``_get_global_queue()`` is the starting point. It is
    threaded through ``_transfer_to_host`` twice: first together with the
    positional arguments, then together with the keyword-argument values.

    Returns
    -------
    tuple
        ``(queue, hostargs, hostkwargs)``: the queue returned by the second
        ``_transfer_to_host`` call, the converted positional arguments, and a
        dict pairing each original keyword name with its converted value.
    """
    queue = _get_global_queue()
    queue, hostargs = _transfer_to_host(queue, *args)

    # Freeze the key order once so the converted values can be re-paired.
    names = list(kwargs)
    queue, hostvalues = _transfer_to_host(queue, *(kwargs[name] for name in names))
    hostkwargs = {name: value for name, value in zip(names, hostvalues)}
    return queue, hostargs, hostkwargs


def _extract_usm_iface(*args, **kwargs):
    """Return the ``__sycl_usm_array_interface__`` of the first argument.

    Positional arguments are considered before keyword values. Only the very
    first of them is inspected; later arguments are never looked at.

    Returns
    -------
    object or None
        The attribute value of the first argument, or ``None`` when no
        arguments were given or the first one lacks the attribute.
    """
    candidates = list(args)
    candidates.extend(kwargs.values())
    if not candidates:
        return None
    leading = candidates[0]
    return getattr(leading, '__sycl_usm_array_interface__', None)


def _run_on_device(func, obj=None, *args, **kwargs):
    """Call ``func``, prepending ``obj`` to the arguments when it is given.

    Parameters
    ----------
    func : callable
        Function or unbound method to invoke.
    obj : object, optional
        When not ``None`` it is passed as the first positional argument
        (the ``self`` of a method); when ``None`` it is omitted.
    *args, **kwargs
        Forwarded to ``func`` unchanged.

    Returns
    -------
    object
        Whatever ``func`` returns.
    """
    call_args = args if obj is None else (obj, *args)
    return func(*call_args, **kwargs)


def support_usm_ndarray(freefunc=False):
    """Create a decorator that routes a callable's inputs through the host.

    When the sklearnex device helpers were imported successfully, the wrapper:

    1. records the ``__sycl_usm_array_interface__`` of the first argument,
    2. converts all inputs with ``_get_host_inputs``,
    3. calls the wrapped callable with a ``queue`` keyword argument, and
    4. copies the result back with ``_copy_to_usm`` when the first argument
       exposed a USM interface and the result has ``__array_interface__``.

    When the helpers are unavailable the callable is invoked with its
    arguments untouched.

    The queue is always supplied by keyword, so the wrapped callable must
    accept a ``queue`` keyword and callers should not pass it positionally.

    Parameters
    ----------
    freefunc : bool, default=False
        ``True`` when decorating a plain function, ``False`` when decorating
        a method whose first parameter is ``self``.

    Returns
    -------
    callable
        The decorator to apply to the function or method.
    """
    def decorator(func):
        def wrapper_impl(obj, *args, **kwargs):
            if not _sklearnex_available:
                # Device helpers are missing: plain pass-through call.
                return _run_on_device(func, obj, *args, **kwargs)

            usm_iface = _extract_usm_iface(*args, **kwargs)
            data_queue, hostargs, hostkwargs = _get_host_inputs(*args, **kwargs)
            # Inject the queue derived from the inputs, but never replace a
            # queue the caller passed explicitly by keyword with ``None``;
            # doing so would silently discard the caller's device selection.
            if data_queue is not None or 'queue' not in hostkwargs:
                hostkwargs['queue'] = data_queue
            result = _run_on_device(func, obj, *hostargs, **hostkwargs)
            # Hand array-like results back as USM only for USM inputs.
            if usm_iface is not None and hasattr(result, '__array_interface__'):
                return _copy_to_usm(data_queue, result)
            return result

        if freefunc:
            @wraps(func)
            def wrapper_free(*args, **kwargs):
                return wrapper_impl(None, *args, **kwargs)
            return wrapper_free

        @wraps(func)
        def wrapper_with_self(self, *args, **kwargs):
            return wrapper_impl(self, *args, **kwargs)
        return wrapper_with_self
    return decorator
6 changes: 5 additions & 1 deletion onedal/spmd/basic_statistics/basic_statistics.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

from abc import ABC
from ...common._spmd_policy import _get_spmd_policy
from ..._device_offload import support_usm_ndarray
from onedal.basic_statistics import BasicStatistics as BasicStatistics_Batch


Expand All @@ -25,4 +26,7 @@ def _get_policy(self, queue, *data):


class BasicStatistics(BaseBasicStatisticsSPMD, BasicStatistics_Batch):
    """SPMD basic statistics layered on the batch implementation.

    ``compute`` is wrapped with ``support_usm_ndarray`` and otherwise defers
    to the inherited ``compute``.
    """

    @support_usm_ndarray()
    def compute(self, data, weights=None, queue=None):
        """Forward ``data``, ``weights`` and ``queue`` to the parent ``compute``."""
        statistics = super().compute(data, weights, queue)
        return statistics
10 changes: 9 additions & 1 deletion onedal/spmd/decomposition/pca.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@


from ...common._spmd_policy import _get_spmd_policy
from ..._device_offload import support_usm_ndarray
from onedal.decomposition.pca import PCA as PCABatch


Expand All @@ -25,4 +26,11 @@ def _get_policy(self, queue, *data):


class PCA(BasePCASPMD, PCABatch):
    """SPMD PCA layered on the batch implementation.

    ``fit`` and ``predict`` are wrapped with ``support_usm_ndarray`` and
    otherwise defer to the inherited methods.
    """

    @support_usm_ndarray()
    def fit(self, X, queue=None):
        """Forward ``X`` and ``queue`` to the parent ``fit``.

        ``queue`` defaults to ``None``, matching the other SPMD estimators,
        so that ``fit(X)`` stays callable even when the decorator does not
        inject a queue.
        """
        return super().fit(X, queue)

    @support_usm_ndarray()
    def predict(self, X, queue=None):
        """Forward ``X`` and ``queue`` to the parent ``predict``.

        ``queue`` defaults to ``None`` for the same reason as in ``fit``.
        """
        return super().predict(X, queue)
10 changes: 9 additions & 1 deletion onedal/spmd/linear_model/linear_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

from abc import ABC
from ...common._spmd_policy import _get_spmd_policy
from ..._device_offload import support_usm_ndarray
from onedal.linear_model import LinearRegression as LinearRegression_Batch


Expand All @@ -25,4 +26,11 @@ def _get_policy(self, queue, *data):


class LinearRegression(BaseLinearRegressionSPMD, LinearRegression_Batch):
    """SPMD linear regression layered on the batch implementation.

    ``fit`` and ``predict`` are wrapped with ``support_usm_ndarray`` and
    otherwise defer to the inherited methods.
    """

    @support_usm_ndarray()
    def fit(self, X, y, queue=None):
        """Forward ``X``, ``y`` and ``queue`` to the parent ``fit``."""
        fitted = super().fit(X, y, queue)
        return fitted

    @support_usm_ndarray()
    def predict(self, X, queue=None):
        """Forward ``X`` and ``queue`` to the parent ``predict``."""
        predictions = super().predict(X, queue)
        return predictions
33 changes: 32 additions & 1 deletion onedal/spmd/neighbors/neighbors.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

from abc import ABC
from ...common._spmd_policy import _get_spmd_policy
from ..._device_offload import support_usm_ndarray
from onedal.neighbors import KNeighborsClassifier as KNeighborsClassifier_Batch
from onedal.neighbors import KNeighborsRegressor as KNeighborsRegressor_Batch

Expand All @@ -26,18 +27,40 @@ def _get_policy(self, queue, *data):


class KNeighborsClassifier(NeighborsCommonBaseSPMD, KNeighborsClassifier_Batch):
    """SPMD k-nearest-neighbors classifier layered on the batch classifier.

    ``fit``, ``predict`` and ``kneighbors`` defer to the inherited methods;
    ``predict_proba`` always raises. Every method is wrapped with
    ``support_usm_ndarray``.
    """

    @support_usm_ndarray()
    def fit(self, X, y, queue=None):
        """Forward ``X``, ``y`` and ``queue`` to the parent ``fit``."""
        fitted = super().fit(X, y, queue)
        return fitted

    @support_usm_ndarray()
    def predict(self, X, queue=None):
        """Forward ``X`` and ``queue`` to the parent ``predict``."""
        labels = super().predict(X, queue)
        return labels

    @support_usm_ndarray()
    def predict_proba(self, X, queue=None):
        """Always raise ``NotImplementedError``."""
        raise NotImplementedError("predict_proba not supported in distributed mode.")

    @support_usm_ndarray()
    def kneighbors(self, X=None, n_neighbors=None,
                   return_distance=True, queue=None):
        """Forward all arguments positionally to the parent ``kneighbors``."""
        neighbors = super().kneighbors(X, n_neighbors, return_distance, queue)
        return neighbors


class KNeighborsRegressor(NeighborsCommonBaseSPMD, KNeighborsRegressor_Batch):
@support_usm_ndarray()
def fit(self, X, y, queue=None):
if queue is not None and queue.sycl_device.is_gpu:
return super()._fit(X, y, queue=queue)
else:
raise ValueError('SPMD version of kNN is not implemented for '
'CPU. Consider running on it on GPU.')

@support_usm_ndarray()
def kneighbors(self, X=None, n_neighbors=None,
return_distance=True, queue=None):
return super().kneighbors(X, n_neighbors, return_distance, queue)

@support_usm_ndarray()
def predict(self, X, queue=None):
return self._predict_gpu(X, queue=queue)

Expand All @@ -49,4 +72,12 @@ def _get_onedal_params(self, X, y=None):


class NearestNeighbors(NeighborsCommonBaseSPMD):
    """SPMD nearest-neighbors search.

    Both methods are wrapped with ``support_usm_ndarray`` and defer to the
    next class in the MRO.
    """
    # NOTE(review): the only base visible here is NeighborsCommonBaseSPMD;
    # confirm that it (or its bases) actually provides fit() and kneighbors().

    @support_usm_ndarray()
    def fit(self, X, y, queue=None):
        """Forward ``X``, ``y`` and ``queue`` to the parent ``fit``."""
        fitted = super().fit(X, y, queue)
        return fitted

    @support_usm_ndarray()
    def kneighbors(self, X=None, n_neighbors=None,
                   return_distance=True, queue=None):
        """Forward all arguments positionally to the parent ``kneighbors``."""
        neighbors = super().kneighbors(X, n_neighbors, return_distance, queue)
        return neighbors
9 changes: 7 additions & 2 deletions setup_sklearnex.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,13 @@
if build_distribute:
packages_with_tests += [
'sklearnex.spmd',
'sklearnex.spmd.ensemble',
'sklearnex.spmd.linear_model']
'sklearnex.spmd.decomposition',
'sklearnex.spmd.ensemble']
if ONEDAL_VERSION >= 20230100:
packages_with_tests += [
'sklearnex.spmd.basic_statistics',
'sklearnex.spmd.linear_model',
'sklearnex.spmd.neighbors']

# sklearnex setup
setup(name="scikit-learn-intelex",
Expand Down

0 comments on commit a72a6f8

Please sign in to comment.