Enhancing the tests
Signed-off-by: Julio Faracco <[email protected]>
jcfaracco committed Mar 13, 2024
1 parent ac58888 commit 41e6d4b
Showing 4 changed files with 116 additions and 5 deletions.
4 changes: 2 additions & 2 deletions dasf/pipeline/executors/dask.py
@@ -235,8 +235,7 @@ def __init__(
self.dtype = TaskExecutorType.single_cpu

# Share dtype attribute to client
-        if not hasattr(self.client, "dtype"):
-            setattr(self.client, "dtype", self.dtype)
+        setattr(self.client, "dtype", self.dtype)

self._tasks_map = dict()

@@ -245,6 +244,7 @@ def pre_run(self, pipeline):

# TODO: we need to consider other branches for complex pipelines
dag_paths = nx.all_simple_paths(pipeline._dag, nodes[0], nodes[-1])

all_paths = []
for path in dag_paths:
all_paths.append(path)
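For context (not part of the commit): a minimal sketch of how nx.all_simple_paths enumerates every branch of a pipeline DAG between its first and last node. The node names and the use of topological_sort here are illustrative assumptions, not dasf code.

import networkx as nx

# Toy DAG with two branches, similar in shape to the one built in the new
# DaskTasksPipelineExecutor test further down.
dag = nx.DiGraph([("load", "filter_a"), ("load", "filter_b"),
                  ("filter_a", "merge"), ("filter_b", "merge")])

nodes = list(nx.topological_sort(dag))  # assumed ordering; pre_run receives `nodes` elsewhere
for path in nx.all_simple_paths(dag, nodes[0], nodes[-1]):
    print(path)
# ['load', 'filter_a', 'merge']
# ['load', 'filter_b', 'merge']   (order may vary)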
25 changes: 25 additions & 0 deletions tests/ml/cluster/test_agglomerative.py
@@ -8,6 +8,7 @@
except ImportError:
pass

from mock import patch, Mock
from sklearn.datasets import make_blobs

from dasf.ml.cluster import AgglomerativeClustering
@@ -66,3 +67,27 @@ def test_agglomerative_gpu(self):
y1, y2 = self.__match_randomly_labels_created(y.get(), self.y)

self.assertTrue(np.array_equal(y1, y2, equal_nan=True))

@patch('dasf.ml.cluster.agglomerative.is_gpu_supported', Mock(return_value=False))
def test_agglomerative_cpu_labels(self):
sc = AgglomerativeClustering(n_clusters=self.centers)

y = sc._fit_cpu(self.X)

y1, y2 = self.__match_randomly_labels_created(y.labels_, self.y)

self.assertTrue(np.array_equal(y1, y2, equal_nan=True))

@unittest.skipIf(not is_gpu_supported(),
"not supported CUDA in this platform")
def test_agglomerative_gpu_labels(self):
# For GPUs we need to specify which data we are handling with `output_type`.
sc = AgglomerativeClustering(n_clusters=self.centers, output_type='cupy')

cp_X = cp.asarray(self.X)

y = sc._fit_gpu(cp_X)

y1, y2 = self.__match_randomly_labels_created(y.labels_.get(), self.y)

self.assertTrue(np.array_equal(y1, y2, equal_nan=True))
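The new assertions rely on the suite's private __match_randomly_labels_created helper, whose body is not shown in this diff. A plausible, purely illustrative sketch of what such a matcher can do (majority-vote remapping, so that cluster-label permutations do not break the comparison):

import numpy as np

def match_labels(pred, true):
    # Hypothetical stand-in for __match_randomly_labels_created: map each
    # predicted cluster id to the most frequent true label inside that cluster.
    remapped = np.empty_like(true)
    for cluster_id in np.unique(pred):
        mask = pred == cluster_id
        values, counts = np.unique(true[mask], return_counts=True)
        remapped[mask] = values[np.argmax(counts)]
    return remapped, true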
14 changes: 12 additions & 2 deletions tests/ml/cluster/test_spectral.py
@@ -49,8 +49,7 @@ def test_spectral_cpu(self):

def test_spectral_mcpu(self):
sc = SpectralClustering(n_clusters=self.centers,
-                                random_state=self.random_state,
-                                n_components=250)
+                                random_state=self.random_state)

da_X = da.from_array(self.X)

@@ -65,3 +64,14 @@ def test_spectral_mcpu(self):

# Check if the accuracy is at least 99%.
self.assertTrue(len(np.setdiff1d(y1, y2)) <= int(self.size*0.01))

def test_spectral_cpu_labels(self):
sc = SpectralClustering(n_clusters=2,
random_state=self.random_state,
assign_labels='discretize')

y = sc._fit_cpu(self.X)

y1, y2 = self.__match_randomly_labels_created(y.labels_, self.y)

self.assertTrue(np.array_equal(y1, y2, equal_nan=True))
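For reference, assign_labels='discretize' mirrors the scikit-learn SpectralClustering parameter of the same name. A minimal standalone sketch using plain scikit-learn (not the dasf wrapper):

from sklearn.cluster import SpectralClustering
from sklearn.datasets import make_blobs

X, _ = make_blobs(n_samples=200, centers=2, random_state=42)

# 'discretize' derives labels by discretizing the spectral embedding instead
# of running k-means on it, which tends to be less sensitive to initialization.
labels = SpectralClustering(n_clusters=2,
                            assign_labels='discretize',
                            random_state=42).fit_predict(X)
print(labels[:10])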
78 changes: 77 additions & 1 deletion tests/pipeline/executors/test_dask.py
@@ -4,13 +4,15 @@
import tempfile
import unittest
import urllib.parse
import networkx as nx

-from mock import patch
+from mock import patch, Mock

from dask.distributed import Client, LocalCluster

from dasf.utils.funcs import is_gpu_supported
from dasf.pipeline.executors import DaskPipelineExecutor
from dasf.pipeline.executors import DaskTasksPipelineExecutor
from dasf.pipeline.executors.dask import setup_dask_protocol


@@ -144,6 +146,7 @@ def test_dask_executor_local_gpu_and_unknown_allocator(self):
dask.shutdown(gracefully=True)
dask.close()

self.assertTrue('\'foo\' GPU Memory allocator is not known' in str(context.exception))
self.assertFalse(dask.is_connected)

def test_dask_executor_scheduler_file(self):
@@ -172,3 +175,76 @@ def test_dask_executor_scheduler_file(self):
def tearDown(self):
if os.path.isfile(self.scheduler_file) or os.path.islink(self.scheduler_file):
os.remove(self.scheduler_file)


class TestDaskTasksPipelineExecutor(unittest.TestCase):
def setUp(self):
self.scheduler_file = os.path.abspath(f"{tempfile.gettempdir()}/scheduler.json")

def test_dask_tasks_executor_remote(self):

with LocalCluster() as cluster:
conn = urllib.parse.urlsplit(cluster.scheduler.address)

dask = DaskTasksPipelineExecutor(address=conn.hostname, port=conn.port, use_gpu=False)

# Compute everything to gracefully shutdown
dask.shutdown(gracefully=True)
dask.close()

self.assertFalse(dask.is_connected)

@unittest.skipIf(not is_gpu_supported(),
"not supported CUDA in this platform")
def test_dask_tasks_executor_local_gpu(self):
with patch.dict(os.environ, {'CUDA_VISIBLE_DEVICES': '0'}):
dask = DaskTasksPipelineExecutor(local=True, use_gpu=True)

# Compute everything to gracefully shutdown
dask.shutdown(gracefully=False)
dask.close()

self.assertFalse(dask.is_connected)

def test_dask_tasks_executor_local_execution(self):

dask = DaskTasksPipelineExecutor(local=True, use_gpu=False)

def func1():
return 2

def func2(X):
return X + 4

def func3(X):
return X - 4

def func4(X, Y):
return X + Y

pipeline = Mock()
pipeline._dag = nx.DiGraph([(hash(func1), hash(func2)),
(hash(func1), hash(func3)),
(hash(func2), hash(func4)),
(hash(func3), hash(func4))])

dask.pre_run(pipeline)

X_1 = dask.execute(func1)
X_2 = dask.execute(func2, X_1)
X_3 = dask.execute(func3, X_1)
X_4 = dask.execute(func4, X=X_2, Y=X_3)

self.assertEqual(X_4.result(), 4)

dask.post_run(pipeline)

# Compute everything to gracefully shutdown
dask.shutdown(gracefully=True)
dask.close()

self.assertFalse(dask.is_connected)

def tearDown(self):
if os.path.isfile(self.scheduler_file) or os.path.islink(self.scheduler_file):
os.remove(self.scheduler_file)
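The expected value in the new local-execution test follows from the diamond DAG: func1() = 2, func2(2) = 6, func3(2) = -2, and func4(6, -2) = 4. A minimal sketch of the raw dask.distributed pattern this presumably maps onto (assumption: the executor's execute() submits futures much like Client.submit):

from dask.distributed import Client

def func1():
    return 2

def func2(x):
    return x + 4

def func3(x):
    return x - 4

def func4(x, y):
    return x + y

with Client(processes=False) as client:     # in-process local workers
    f1 = client.submit(func1)
    f2 = client.submit(func2, f1)           # futures can be passed as arguments
    f3 = client.submit(func3, f1)
    f4 = client.submit(func4, f2, f3)
    assert f4.result() == 4                 # 6 + (-2)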
