diff --git a/dasf/pipeline/executors/dask.py b/dasf/pipeline/executors/dask.py index f345484..6adc78a 100644 --- a/dasf/pipeline/executors/dask.py +++ b/dasf/pipeline/executors/dask.py @@ -111,8 +111,8 @@ def __init__( rmm.reinitialize(managed_memory=True) cp.cuda.set_allocator(rmm.rmm_cupy_allocator) else: - raise Exception(f"'{gpu_allocator}' GPU Memory allocator is not " - "known") + raise ValueError(f"'{gpu_allocator}' GPU Memory allocator is not " + "known") else: self.dtype = TaskExecutorType.multi_cpu @@ -141,7 +141,7 @@ def __init__( @property def ngpus(self): - return len(get_dask_gpu_count()) + return get_dask_gpu_count() @property def is_connected(self): @@ -181,6 +181,9 @@ def shutdown(self, gracefully=True): else: self.client.shutdown() + def close(self): + self.client.close() + class DaskTasksPipelineExecutor(DaskPipelineExecutor): """ @@ -293,6 +296,9 @@ def shutdown(self, gracefully=True): else: self.client.shutdown() + def close(self): + self.client.close() + class DaskPBSPipelineExecutor(Executor): def __init__(self, **kwargs): diff --git a/dasf/transforms/operations.py b/dasf/transforms/operations.py index cbf2842..cf55051 100644 --- a/dasf/transforms/operations.py +++ b/dasf/transforms/operations.py @@ -69,6 +69,9 @@ def transform(self, X): if self.x > 1 or self.y > 1 or self.z > 1: raise Exception("Percentages cannot be higher than 100% (1.0)") + if self.x <= 0 or self.y <= 0 or self.z <= 0: + raise Exception("Percentages cannot be negative or 0") + if X.ndim == 1: return X[0 : int(self.x * X.shape[0])] elif X.ndim == 2: diff --git a/dasf/utils/funcs.py b/dasf/utils/funcs.py index 43cb796..d7ac495 100644 --- a/dasf/utils/funcs.py +++ b/dasf/utils/funcs.py @@ -65,7 +65,12 @@ def get_worker_info(client) -> list: Returns a list of workers (sorted), and the DNS name for the master host The master is the 0th worker's host """ - workers = client.scheduler_info()["workers"] + info = client.scheduler_info() + + if "workers" not in info: + return [] + + workers = info["workers"] worker_keys = sorted(workers.keys()) workers_by_host = {} for key in worker_keys: diff --git a/tests/pipeline/executors/test_dask.py b/tests/pipeline/executors/test_dask.py index be53fd5..95620c1 100644 --- a/tests/pipeline/executors/test_dask.py +++ b/tests/pipeline/executors/test_dask.py @@ -5,9 +5,30 @@ import unittest import urllib.parse +from mock import patch + from dask.distributed import Client, LocalCluster +from dasf.utils.funcs import is_gpu_supported from dasf.pipeline.executors import DaskPipelineExecutor +from dasf.pipeline.executors.dask import setup_dask_protocol + + +class TestDaskProtocol(unittest.TestCase): + def test_setup_dask_protocol_none(self): + self.assertEqual(setup_dask_protocol(), "tcp://") + + def test_setup_dask_protocol_tcp(self): + self.assertEqual(setup_dask_protocol("tcp"), "tcp://") + + def test_setup_dask_protocol_ucx(self): + self.assertEqual(setup_dask_protocol("ucx"), "ucx://") + + def test_setup_dask_protocol_foo(self): + with self.assertRaises(Exception) as context: + setup_dask_protocol("foo") + + self.assertTrue('Protocol foo is not supported.' in str(context.exception)) class TestDaskExecutor(unittest.TestCase): @@ -15,13 +36,17 @@ def setUp(self): self.scheduler_file = os.path.abspath(f"{tempfile.gettempdir()}/scheduler.json") def test_dask_executor_remote(self): + with LocalCluster() as cluster: conn = urllib.parse.urlsplit(cluster.scheduler.address) dask = DaskPipelineExecutor(address=conn.hostname, port=conn.port) # Compute everything to gracefully shutdown - dask.client.close() + dask.shutdown(gracefully=True) + dask.close() + + self.assertFalse(dask.is_connected) def test_dask_executor_local_no_args(self): dask = DaskPipelineExecutor() @@ -32,18 +57,94 @@ def test_dask_executor_local_no_args(self): # Compute everything to gracefully shutdown client.close() - dask.client.close() + dask.shutdown(gracefully=True) + dask.close() + + self.assertFalse(dask.is_connected) + + def test_dask_executor_local_no_args_no_gracefully(self): + dask = DaskPipelineExecutor() + + client = Client.current() + + self.assertEqual(hash(dask.client), hash(client)) + + # Compute everything to gracefully shutdown + client.close() + dask.shutdown(gracefully=False) + dask.close() + + self.assertFalse(dask.is_connected) def test_dask_executor_local(self): dask = DaskPipelineExecutor(local=True) client = Client.current() + self.assertTrue(dask.is_connected) self.assertEqual(hash(dask.client), hash(client)) # Compute everything to gracefully shutdown client.close() - dask.client.close() + dask.shutdown(gracefully=True) + dask.close() + + self.assertFalse(dask.is_connected) + + @unittest.skipIf(not is_gpu_supported(), + "not supported CUDA in this platform") + def test_dask_executor_local_gpu(self): + with patch.dict(os.environ, {'CUDA_VISIBLE_DEVICES': '0'}): + + dask = DaskPipelineExecutor(local=True, use_gpu=True) + + client = Client.current() + + self.assertEqual(hash(dask.client), hash(client)) + self.assertGreater(dask.ngpus, 0) + + # Compute everything to gracefully shutdown + client.close() + dask.shutdown(gracefully=True) + dask.close() + + self.assertFalse(dask.is_connected) + + @unittest.skipIf(not is_gpu_supported(), + "not supported CUDA in this platform") + def test_dask_executor_local_gpu_and_rmm(self): + with patch.dict(os.environ, {'CUDA_VISIBLE_DEVICES': '0'}): + + dask = DaskPipelineExecutor(local=True, use_gpu=True, gpu_allocator="rmm") + + client = Client.current() + + self.assertEqual(hash(dask.client), hash(client)) + + # Compute everything to gracefully shutdown + client.close() + dask.shutdown(gracefully=True) + dask.close() + + self.assertFalse(dask.is_connected) + + @unittest.skipIf(not is_gpu_supported(), + "not supported CUDA in this platform") + def test_dask_executor_local_gpu_and_unknown_allocator(self): + with self.assertRaises(ValueError) as context: + + dask = DaskPipelineExecutor(local=True, use_gpu=True, gpu_allocator="foo") + + client = Client.current() + + self.assertEqual(hash(dask.client), hash(client)) + + # Compute everything to gracefully shutdown + client.close() + dask.shutdown(gracefully=True) + dask.close() + + self.assertFalse(dask.is_connected) def test_dask_executor_scheduler_file(self): with LocalCluster() as cluster: @@ -63,7 +164,10 @@ def test_dask_executor_scheduler_file(self): self.assertEqual(hash(dask.client), hash(client)) # Compute everything to gracefully shutdown - dask.client.close() + dask.shutdown(gracefully=True) + dask.close() + + self.assertFalse(dask.is_connected) def tearDown(self): if os.path.isfile(self.scheduler_file) or os.path.islink(self.scheduler_file): diff --git a/tests/transforms/test_operations.py b/tests/transforms/test_operations.py index 38d46ae..cc1ce65 100644 --- a/tests/transforms/test_operations.py +++ b/tests/transforms/test_operations.py @@ -23,6 +23,7 @@ from dasf.utils.funcs import is_gpu_supported from dasf.transforms.operations import Reshape from dasf.transforms.operations import SliceArray +from dasf.transforms.operations import SliceArrayByPercent class TestReshape(unittest.TestCase): @@ -317,3 +318,180 @@ def test_slice_array_unknown_dim(self): y = slice_t.transform(data) self.assertTrue('The dimmension is not known' in str(context.exception)) + + +class TestSliceArrayByPercent(unittest.TestCase): + def test_slice_array_cpu_1d(self): + data = np.random.random((40,)) + + slice_t = SliceArrayByPercent(x=25.0) + + y = slice_t.transform(data) + + self.assertTrue(is_cpu_array(y)) + self.assertEqual(y.shape, (10,)) + + def test_slice_dask_array_cpu_1d(self): + data = da.random.random((40,), chunks=(5)) + + slice_t = SliceArrayByPercent(x=25.0) + + y = slice_t.transform(data) + + self.assertTrue(is_dask_cpu_array(y)) + self.assertEqual(y.shape, (10,)) + + @unittest.skipIf(not is_gpu_supported(), + "not supported CUDA in this platform") + def test_slice_array_gpu_1d(self): + data = cp.random.random((40,)) + + slice_t = SliceArrayByPercent(x=25.0) + + y = slice_t.transform(data) + + self.assertTrue(is_gpu_array(y)) + self.assertEqual(y.shape, (10,)) + + @unittest.skipIf(not is_gpu_supported(), + "not supported CUDA in this platform") + def test_slice_dask_array_gpu_1d(self): + data = cp.random.random((40,)) + data = da.from_array(data, chunks=(5)) + + slice_t = SliceArrayByPercent(x=25.0) + + y = slice_t.transform(data) + + self.assertTrue(is_dask_gpu_array(y)) + self.assertEqual(y.shape, (10,)) + + def test_slice_array_cpu_2d(self): + data = np.random.random((40, 40)) + + slice_t = SliceArrayByPercent(x=25.0, y=25.0) + + y = slice_t.transform(data) + + self.assertTrue(is_cpu_array(y)) + self.assertEqual(y.shape, (10, 10)) + + def test_slice_dask_array_cpu_2d(self): + data = da.random.random((40, 40), chunks=(5, 5)) + + slice_t = SliceArrayByPercent(x=25.0, y=25.0) + + y = slice_t.transform(data) + + self.assertTrue(is_dask_cpu_array(y)) + self.assertEqual(y.shape, (10, 10)) + + @unittest.skipIf(not is_gpu_supported(), + "not supported CUDA in this platform") + def test_slice_array_gpu_2d(self): + data = cp.random.random((40, 40)) + + slice_t = SliceArrayByPercent(x=25.0, y=25.0) + + y = slice_t.transform(data) + + self.assertTrue(is_gpu_array(y)) + self.assertEqual(y.shape, (10, 10)) + + @unittest.skipIf(not is_gpu_supported(), + "not supported CUDA in this platform") + def test_slice_dask_array_gpu_2d(self): + data = cp.random.random((40, 40)) + data = da.from_array(data, chunks=(5, 5)) + + slice_t = SliceArrayByPercent(x=25.0, y=25.0) + + y = slice_t.transform(data) + + self.assertTrue(is_dask_gpu_array(y)) + self.assertEqual(y.shape, (10, 10)) + + def test_slice_array_cpu_3d(self): + data = np.random.random((40, 40, 40)) + + slice_t = SliceArrayByPercent(x=25.0, y=25.0, z=25.0) + + y = slice_t.transform(data) + + self.assertTrue(is_cpu_array(y)) + self.assertEqual(y.shape, (10, 10, 10)) + + def test_slice_dask_array_cpu_3d(self): + data = da.random.random((40, 40, 40), chunks=(5, 5, 5)) + + slice_t = SliceArrayByPercent(x=25.0, y=25.0, z=25.0) + + y = slice_t.transform(data) + + self.assertTrue(is_dask_cpu_array(y)) + self.assertEqual(y.shape, (10, 10, 10)) + + @unittest.skipIf(not is_gpu_supported(), + "not supported CUDA in this platform") + def test_slice_array_gpu_3d(self): + data = cp.random.random((40, 40, 40)) + + slice_t = SliceArrayByPercent(x=25.0, y=25.0, z=25.0) + + y = slice_t.transform(data) + + self.assertTrue(is_gpu_array(y)) + self.assertEqual(y.shape, (10, 10, 10)) + + @unittest.skipIf(not is_gpu_supported(), + "not supported CUDA in this platform") + def test_slice_dask_array_gpu_3d(self): + data = cp.random.random((40, 40, 40)) + data = da.from_array(data, chunks=(5, 5, 5)) + + slice_t = SliceArrayByPercent(x=25.0, y=25.0, z=25.0) + + y = slice_t.transform(data) + + self.assertTrue(is_dask_gpu_array(y)) + self.assertEqual(y.shape, (10, 10, 10)) + + def test_slice_array_unknown_dim(self): + data = np.random.random((2, 2, 2, 2)) + + slice_t = SliceArrayByPercent(x=50.0, y=50.0, z=50.0) + + with self.assertRaises(Exception) as context: + y = slice_t.transform(data) + + self.assertTrue('The dimmension is not known' in str(context.exception)) + + def test_slice_array_exceeding_percentage(self): + data = np.random.random((4, 4, 4)) + + slice_t = SliceArrayByPercent(x=150.0, y=150.0, z=50.0) + + with self.assertRaises(Exception) as context: + y = slice_t.transform(data) + + self.assertTrue('Percentages cannot be higher than 100% (1.0)' in str(context.exception)) + + def test_slice_array_zero_percentage(self): + data = np.random.random((4, 4, 4)) + + slice_t = SliceArrayByPercent(x=50.0, y=0.0, z=50.0) + + with self.assertRaises(Exception) as context: + y = slice_t.transform(data) + + self.assertTrue('Percentages cannot be negative or 0' in str(context.exception)) + + def test_slice_array_negative_percentage(self): + data = np.random.random((4, 4, 4)) + + slice_t = SliceArrayByPercent(x=-50.0, y=50.0, z=50.0) + + with self.assertRaises(Exception) as context: + y = slice_t.transform(data) + + self.assertTrue('Percentages cannot be negative or 0' in str(context.exception))