diff --git a/dasf/pipeline/executors/dask.py b/dasf/pipeline/executors/dask.py
index 6adc78a..e428542 100644
--- a/dasf/pipeline/executors/dask.py
+++ b/dasf/pipeline/executors/dask.py
@@ -235,8 +235,7 @@ def __init__(
             self.dtype = TaskExecutorType.single_cpu
 
         # Share dtype attribute to client
-        if not hasattr(self.client, "dtype"):
-            setattr(self.client, "dtype", self.dtype)
+        setattr(self.client, "dtype", self.dtype)
 
         self._tasks_map = dict()
 
@@ -245,6 +244,7 @@ def pre_run(self, pipeline):
         # TODO: we need to consider other branches for complex pipelines
         dag_paths = nx.all_simple_paths(pipeline._dag, nodes[0], nodes[-1])
 
+        all_paths = []
         for path in dag_paths:
             all_paths.append(path)
 
diff --git a/tests/ml/cluster/test_agglomerative.py b/tests/ml/cluster/test_agglomerative.py
index 32d30dc..49d5809 100644
--- a/tests/ml/cluster/test_agglomerative.py
+++ b/tests/ml/cluster/test_agglomerative.py
@@ -8,6 +8,7 @@
 except ImportError:
     pass
 
+from mock import patch, Mock
 from sklearn.datasets import make_blobs
 
 from dasf.ml.cluster import AgglomerativeClustering
@@ -66,3 +67,27 @@ def test_agglomerative_gpu(self):
         y1, y2 = self.__match_randomly_labels_created(y.get(), self.y)
 
         self.assertTrue(np.array_equal(y1, y2, equal_nan=True))
+
+    @patch('dasf.ml.cluster.agglomerative.is_gpu_supported', Mock(return_value=False))
+    def test_agglomerative_cpu_labels(self):
+        sc = AgglomerativeClustering(n_clusters=self.centers)
+
+        y = sc._fit_cpu(self.X)
+
+        y1, y2 = self.__match_randomly_labels_created(y.labels_, self.y)
+
+        self.assertTrue(np.array_equal(y1, y2, equal_nan=True))
+
+    @unittest.skipIf(not is_gpu_supported(),
+                     "CUDA is not supported on this platform")
+    def test_agglomerative_gpu_labels(self):
+        # For GPUs we need to specify which array type we are handling via `output_type`.
+        sc = AgglomerativeClustering(n_clusters=self.centers, output_type='cupy')
+
+        cp_X = cp.asarray(self.X)
+
+        y = sc._fit_gpu(cp_X)
+
+        y1, y2 = self.__match_randomly_labels_created(y.labels_.get(), self.y)
+
+        self.assertTrue(np.array_equal(y1, y2, equal_nan=True))
diff --git a/tests/ml/cluster/test_spectral.py b/tests/ml/cluster/test_spectral.py
index 9bb13a4..a56a4ac 100644
--- a/tests/ml/cluster/test_spectral.py
+++ b/tests/ml/cluster/test_spectral.py
@@ -49,8 +49,7 @@ def test_spectral_cpu(self):
 
     def test_spectral_mcpu(self):
         sc = SpectralClustering(n_clusters=self.centers,
-                                random_state=self.random_state,
-                                n_components=250)
+                                random_state=self.random_state)
 
         da_X = da.from_array(self.X)
 
@@ -65,3 +64,14 @@ def test_spectral_mcpu(self):
 
         # Check if the accuracy is higher than 99%.
         self.assertTrue(len(np.setdiff1d(y1, y2)) <= int(self.size*0.01))
+
+    def test_spectral_cpu_labels(self):
+        sc = SpectralClustering(n_clusters=2,
+                                random_state=self.random_state,
+                                assign_labels='discretize')
+
+        y = sc._fit_cpu(self.X)
+
+        y1, y2 = self.__match_randomly_labels_created(y.labels_, self.y)
+
+        self.assertTrue(np.array_equal(y1, y2, equal_nan=True))
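Note on the new `*_labels` tests above: cluster ids returned by `_fit_cpu`/`_fit_gpu` are arbitrary, so the private helper `__match_randomly_labels_created` has to align them with the ground truth before `np.array_equal` can be used. The helper itself is not part of this diff; the sketch below shows one common way to implement such a permutation-invariant comparison (a Hungarian assignment over the contingency table). The function name and signature here are illustrative, not dasf's.

```python
import numpy as np
from scipy.optimize import linear_sum_assignment

def match_cluster_labels(y_pred, y_true):
    """Relabel y_pred so its arbitrary cluster ids best match y_true.

    Builds the contingency table between predicted and true ids and
    solves a maximum-weight assignment over it (Hungarian algorithm).
    """
    pred_ids = np.unique(y_pred)
    true_ids = np.unique(y_true)
    overlap = np.zeros((len(pred_ids), len(true_ids)), dtype=np.int64)
    for i, p in enumerate(pred_ids):
        for j, t in enumerate(true_ids):
            overlap[i, j] = np.sum((y_pred == p) & (y_true == t))
    # linear_sum_assignment minimizes cost, so negate to maximize overlap.
    rows, cols = linear_sum_assignment(-overlap)
    mapping = {pred_ids[r]: true_ids[c] for r, c in zip(rows, cols)}
    return np.array([mapping[p] for p in y_pred]), np.asarray(y_true)
```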
diff --git a/tests/pipeline/executors/test_dask.py b/tests/pipeline/executors/test_dask.py
index 95620c1..e9b667e 100644
--- a/tests/pipeline/executors/test_dask.py
+++ b/tests/pipeline/executors/test_dask.py
@@ -4,13 +4,15 @@
 import tempfile
 import unittest
 import urllib.parse
+import networkx as nx
 
-from mock import patch
+from mock import patch, Mock
 
 from dask.distributed import Client, LocalCluster
 
 from dasf.utils.funcs import is_gpu_supported
 from dasf.pipeline.executors import DaskPipelineExecutor
+from dasf.pipeline.executors import DaskTasksPipelineExecutor
 from dasf.pipeline.executors.dask import setup_dask_protocol
 
 
@@ -144,6 +146,7 @@ def test_dask_executor_local_gpu_and_unknown_allocator(self):
             dask.shutdown(gracefully=True)
             dask.close()
 
+        self.assertTrue('\'foo\' GPU Memory allocator is not known' in str(context.exception))
         self.assertFalse(dask.is_connected)
 
     def test_dask_executor_scheduler_file(self):
@@ -172,3 +175,76 @@ def test_dask_executor_scheduler_file(self):
     def tearDown(self):
         if os.path.isfile(self.scheduler_file) or os.path.islink(self.scheduler_file):
             os.remove(self.scheduler_file)
+
+
+class TestDaskTasksPipelineExecutor(unittest.TestCase):
+    def setUp(self):
+        self.scheduler_file = os.path.abspath(f"{tempfile.gettempdir()}/scheduler.json")
+
+    def test_dask_tasks_executor_remote(self):
+
+        with LocalCluster() as cluster:
+            conn = urllib.parse.urlsplit(cluster.scheduler.address)
+
+            dask = DaskTasksPipelineExecutor(address=conn.hostname, port=conn.port, use_gpu=False)
+
+            # Compute everything to shut down gracefully
+            dask.shutdown(gracefully=True)
+            dask.close()
+
+            self.assertFalse(dask.is_connected)
+
+    @unittest.skipIf(not is_gpu_supported(),
+                     "CUDA is not supported on this platform")
+    def test_dask_tasks_executor_local_gpu(self):
+        with patch.dict(os.environ, {'CUDA_VISIBLE_DEVICES': '0'}):
+            dask = DaskTasksPipelineExecutor(local=True, use_gpu=True)
+
+            # Compute everything to shut down gracefully
+            dask.shutdown(gracefully=False)
+            dask.close()
+
+            self.assertFalse(dask.is_connected)
+
+    def test_dask_tasks_executor_local_execution(self):
+
+        dask = DaskTasksPipelineExecutor(local=True, use_gpu=False)
+
+        def func1():
+            return 2
+
+        def func2(X):
+            return X + 4
+
+        def func3(X):
+            return X - 4
+
+        def func4(X, Y):
+            return X + Y
+
+        pipeline = Mock()
+        pipeline._dag = nx.DiGraph([(hash(func1), hash(func2)),
+                                    (hash(func1), hash(func3)),
+                                    (hash(func2), hash(func4)),
+                                    (hash(func3), hash(func4))])
+
+        dask.pre_run(pipeline)
+
+        X_1 = dask.execute(func1)
+        X_2 = dask.execute(func2, X_1)
+        X_3 = dask.execute(func3, X_1)
+        X_4 = dask.execute(func4, X=X_2, Y=X_3)
+
+        self.assertEqual(X_4.result(), 4)
+
+        dask.post_run(pipeline)
+
+        # Compute everything to shut down gracefully
+        dask.shutdown(gracefully=True)
+        dask.close()
+
+        self.assertFalse(dask.is_connected)
+
+    def tearDown(self):
+        if os.path.isfile(self.scheduler_file) or os.path.islink(self.scheduler_file):
+            os.remove(self.scheduler_file)
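For context on `test_dask_tasks_executor_local_execution`: the four `dask.execute(...)` calls build a diamond-shaped task graph, fanning out from `func1` and fanning in at `func4`, which is also why `pre_run` needs the `all_paths = []` initialization fixed in the first hunk (there are two simple paths through the DAG, and `nx.all_simple_paths` only yields a generator to accumulate). The sketch below reproduces the same shape with plain `dask.distributed` futures, as a rough illustration of what the executor ends up submitting to its client; it is not dasf's implementation.

```python
from dask.distributed import Client, LocalCluster

# Diamond DAG: func1 fans out to func2/func3, which fan in at func4.
def func1():
    return 2

def func2(x):
    return x + 4

def func3(x):
    return x - 4

def func4(x, y):
    return x + y

if __name__ == "__main__":
    with LocalCluster(n_workers=1) as cluster, Client(cluster) as client:
        f1 = client.submit(func1)
        f2 = client.submit(func2, f1)   # futures passed as arguments
        f3 = client.submit(func3, f1)   # become task dependencies
        f4 = client.submit(func4, f2, f3)
        # (2 + 4) + (2 - 4) == 4, matching the new test's assertion.
        assert f4.result() == 4
```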