From 27f097791eef5f7cea52f8eb70df399ac79a72f2 Mon Sep 17 00:00:00 2001
From: JuanPedroGHM
Date: Thu, 15 Aug 2024 11:58:09 +0200
Subject: [PATCH 1/6] fix: explicit cast of counts and displacements to int

---
 heat/core/communication.py | 18 +++++++++++-------
 heat/core/manipulations.py |  2 +-
 setup.py                   |  2 +-
 3 files changed, 13 insertions(+), 9 deletions(-)

diff --git a/heat/core/communication.py b/heat/core/communication.py
index 98cc44682..cf1708529 100644
--- a/heat/core/communication.py
+++ b/heat/core/communication.py
@@ -258,7 +258,7 @@ def mpi_type_of(cls, dtype: torch.dtype) -> MPI.Datatype:
     def mpi_type_and_elements_of(
         cls,
         obj: Union[DNDarray, torch.Tensor],
-        counts: Tuple[int],
+        counts: Optional[Tuple[int]],
         displs: Tuple[int],
         is_contiguous: Optional[bool],
     ) -> Tuple[MPI.Datatype, Tuple[int, ...]]:
@@ -289,7 +289,7 @@ def mpi_type_and_elements_of(
         if is_contiguous:
             if counts is None:
                 return mpi_type, elements
-            factor = np.prod(obj.shape[1:])
+            factor = np.prod(obj.shape[1:], dtype=np.int32)
             return (
                 mpi_type,
                 (
@@ -317,7 +317,7 @@ def mpi_type_and_elements_of(
         return mpi_type, elements

     @classmethod
-    def as_mpi_memory(cls, obj) -> MPI.memory:
+    def as_mpi_memory(cls, obj) -> MPI.buffer:
         """
         Converts the passed ``torch.Tensor`` into an MPI compatible memory view.

         Parameters
         ----------
         obj : torch.Tensor
             The tensor to be converted into a MPI memory view.
         """
-        return MPI.memory.fromaddress(obj.data_ptr(), 0)
+        return MPI.buffer.fromaddress(obj.data_ptr(), 0)

     @classmethod
     def as_buffer(
         cls,
         obj: torch.Tensor,
-        counts: Tuple[int] = None,
-        displs: Tuple[int] = None,
+        counts: Optional[Tuple[int]] = None,
+        displs: Optional[Tuple[int]] = None,
         is_contiguous: Optional[bool] = None,
-    ) -> List[Union[MPI.memory, Tuple[int, int], MPI.Datatype]]:
+    ) -> List[Union[MPI.buffer, Tuple[int, int], MPI.Datatype]]:
         """
         Converts a passed ``torch.Tensor`` into a memory buffer object with associated number of
         elements and MPI data type.
@@ -356,6 +356,10 @@
             obj.unsqueeze_(-1)
             squ = True

+        if counts is not None:
+            counts = (int(c) for c in counts)
+        if displs is not None:
+            displs = (int(d) for d in displs)
         mpi_type, elements = cls.mpi_type_and_elements_of(obj, counts, displs, is_contiguous)
         mpi_mem = cls.as_mpi_memory(obj)
         if squ:
diff --git a/heat/core/manipulations.py b/heat/core/manipulations.py
index d1958c27a..0e72fecd3 100644
--- a/heat/core/manipulations.py
+++ b/heat/core/manipulations.py
@@ -3477,7 +3477,7 @@ def vsplit(x: DNDarray, indices_or_sections: Iterable) -> List[DNDarray, ...]:
     return split(x, indices_or_sections, 0)


-def resplit(arr: DNDarray, axis: int = None) -> DNDarray:
+def resplit(arr: DNDarray, axis: Optional[int] = None) -> DNDarray:
     """
     Out-of-place redistribution of the content of the `DNDarray`. Allows to "unsplit" (i.e. gather) all values from
     all nodes, as well as to define a new axis along which the array is split without changes to the values.
diff --git a/setup.py b/setup.py
index b98401ec1..60d1416e0 100644
--- a/setup.py
+++ b/setup.py
@@ -33,7 +33,7 @@
         "Topic :: Scientific/Engineering",
     ],
     install_requires=[
-        "mpi4py>=3.0.0, <4.0.0",
+        "mpi4py>=3.0.0",
         "numpy>=1.22.0, <2",
         "torch>=2.0.0, <2.3.2",
         "scipy>=1.10.0",

From 849d49a1d50845e619ee2709d33fc08e2553d23d Mon Sep 17 00:00:00 2001
From: JuanPedroGHM
Date: Thu, 15 Aug 2024 13:09:14 +0200
Subject: [PATCH 2/6] fix: casting done whenever counts and displacements are used

---
 heat/core/communication.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/heat/core/communication.py b/heat/core/communication.py
index cf1708529..bf907584e 100644
--- a/heat/core/communication.py
+++ b/heat/core/communication.py
@@ -357,9 +357,9 @@ def as_buffer(
             squ = True

         if counts is not None:
-            counts = (int(c) for c in counts)
+            counts = tuple(int(c) for c in counts)
         if displs is not None:
-            displs = (int(d) for d in displs)
+            displs = tuple(int(d) for d in displs)
         mpi_type, elements = cls.mpi_type_and_elements_of(obj, counts, displs, is_contiguous)
         mpi_mem = cls.as_mpi_memory(obj)
         if squ:

From 2c2d9a3b028d3301c83c7f87b574f3ca64592945 Mon Sep 17 00:00:00 2001
From: JuanPedroGHM
Date: Fri, 16 Aug 2024 14:28:01 +0200
Subject: [PATCH 3/6] feat: tests can be skipped based on correct number of nodes, skipped save_csv test

---
 heat/core/tests/test_io.py                | 9 +++++++--
 heat/core/tests/test_suites/basic_test.py | 12 ++++++++++++
 heat/optim/tests/test_dp_optimizer.py     | 9 +++++----
 3 files changed, 24 insertions(+), 6 deletions(-)

diff --git a/heat/core/tests/test_io.py b/heat/core/tests/test_io.py
index 5b8ece13b..444f7be74 100644
--- a/heat/core/tests/test_io.py
+++ b/heat/core/tests/test_io.py
@@ -5,6 +5,7 @@
 import random
 import time
 import fnmatch
+import unittest

 import heat as ht
 from .test_suites.basic_test import TestCase
@@ -147,6 +148,10 @@ def test_load_csv(self):
         with self.assertRaises(TypeError):
             ht.load_csv(self.CSV_PATH, header_lines="3", sep=";", split=0)

+    @unittest.skipIf(
+        len(TestCase.get_hostnames()) > 1,
+        "Test only works on single node, file creation is not synchronized across nodes",
+    )
     def test_save_csv(self):
         for rnd_type in [
             (ht.random.randint, ht.types.int32),
@@ -159,11 +164,11 @@ def test_save_csv(self):
                     for headers in [None, ["# This", "# is a", "# test."]]:
                         for shape in [(1, 1), (10, 10), (20, 1), (1, 20), (25, 4), (4, 25)]:
                             if rnd_type[0] == ht.random.randint:
-                                data = rnd_type[0](
+                                data: ht.DNDarray = rnd_type[0](
                                     -1000, 1000, size=shape, dtype=rnd_type[1], split=split
                                 )
                             else:
-                                data = rnd_type[0](
+                                data: ht.DNDarray = rnd_type[0](
                                     shape[0],
                                     shape[1],
                                     split=split,
diff --git a/heat/core/tests/test_suites/basic_test.py b/heat/core/tests/test_suites/basic_test.py
index 965eda75f..a222203d9 100644
--- a/heat/core/tests/test_suites/basic_test.py
+++ b/heat/core/tests/test_suites/basic_test.py
@@ -1,4 +1,5 @@
 import unittest
+import platform
 import os

 from heat.core import dndarray, MPICommunication, MPI, types, factories
@@ -12,6 +13,7 @@ class TestCase(unittest.TestCase):

     __comm = MPICommunication()
     __device = None
+    _hostnames: list[str] = None

     @property
     def comm(self):
@@ -62,6 +64,16 @@ def get_rank(self):
     def get_size(self):
         return self.comm.size

+    @classmethod
+    def get_hostnames(cls):
+        if not cls._hostnames:
+            if platform.system() == "Windows":
+                host = platform.uname().node
+            else:
+                host = os.uname()[1]
+            cls._hostnames = set(cls.__comm.handle.allgather(host))
+        return cls._hostnames
+
     def assert_array_equal(self, heat_array, expected_array):
         """
         Check if the heat_array is equivalent to the expected_array. Therefore first the split heat_array is compared to
diff --git a/heat/optim/tests/test_dp_optimizer.py b/heat/optim/tests/test_dp_optimizer.py
index 1ee7ab525..ccbe7bb64 100644
--- a/heat/optim/tests/test_dp_optimizer.py
+++ b/heat/optim/tests/test_dp_optimizer.py
@@ -7,11 +7,12 @@
 from heat.core.tests.test_suites.basic_test import TestCase


-@unittest.skipIf(
-    int(os.getenv("SLURM_NNODES", "1")) < 2 or torch.cuda.device_count() == 0,
-    "only supported for GPUs and at least two nodes",
-)
 class TestDASO(TestCase):
+
+    @unittest.skipUnless(
+        len(TestCase.get_hostnames()) >= 2 and torch.cuda.device_count() > 0,
+        f"only supported for GPUs and at least two nodes, Nodes = {TestCase.get_hostnames()}, torch.cuda.device_count() = {torch.cuda.device_count()}, rank = {ht.MPI_WORLD.rank}",
+    )
     def test_daso(self):
         import heat.nn.functional as F
         import heat.optim as optim

From eda45be531ef8a7ecfdbb7de5dd5e75e72713fa1 Mon Sep 17 00:00:00 2001
From: JuanPedroGHM
Date: Fri, 16 Aug 2024 15:07:32 +0200
Subject: [PATCH 4/6] skip daso test when HEAT_TEST_USE_DEVICE set to cpu

---
 heat/optim/tests/test_dp_optimizer.py | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/heat/optim/tests/test_dp_optimizer.py b/heat/optim/tests/test_dp_optimizer.py
index ccbe7bb64..42464b714 100644
--- a/heat/optim/tests/test_dp_optimizer.py
+++ b/heat/optim/tests/test_dp_optimizer.py
@@ -10,13 +10,19 @@ class TestDASO(TestCase):

     @unittest.skipUnless(
-        len(TestCase.get_hostnames()) >= 2 and torch.cuda.device_count() > 0,
+        len(TestCase.get_hostnames()) >= 2
+        and torch.cuda.device_count() > 1
+        and TestCase.device == "cuda",
         f"only supported for GPUs and at least two nodes, Nodes = {TestCase.get_hostnames()}, torch.cuda.device_count() = {torch.cuda.device_count()}, rank = {ht.MPI_WORLD.rank}",
     )
     def test_daso(self):
         import heat.nn.functional as F
         import heat.optim as optim

+        print(
+            f"rank = {ht.MPI_WORLD.rank}, host = {os.uname()[1]}, torch.cuda.device_count() = {torch.cuda.device_count()}, torch.cuda.current_device() = {torch.cuda.current_device()}, NNodes = {len(TestCase.get_hostnames())}"
+        )
+
         class Model(ht.nn.Module):
             def __init__(self):
                 super(Model, self).__init__()

From ac9126834f9744867151751775ce13b854d08c05 Mon Sep 17 00:00:00 2001
From: JuanPedroGHM
Date: Thu, 22 Aug 2024 09:40:18 +0200
Subject: [PATCH 5/6] fix: backwards compatibility of mpi4py and better skip conditions for save_csv

---
 heat/core/communication.py | 4 ++--
 heat/core/tests/test_io.py | 4 ++--
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/heat/core/communication.py b/heat/core/communication.py
index bf907584e..4bfb78feb 100644
--- a/heat/core/communication.py
+++ b/heat/core/communication.py
@@ -317,7 +317,7 @@ def mpi_type_and_elements_of(
         return mpi_type, elements

     @classmethod
-    def as_mpi_memory(cls, obj) -> MPI.buffer:
+    def as_mpi_memory(cls, obj) -> MPI.memory:
         """
         Converts the passed ``torch.Tensor`` into an MPI compatible memory view.

         Parameters
         ----------
         obj : torch.Tensor
             The tensor to be converted into a MPI memory view.
""" - return MPI.buffer.fromaddress(obj.data_ptr(), 0) + return MPI.memory.fromaddress(obj.data_ptr(), 0) @classmethod def as_buffer( diff --git a/heat/core/tests/test_io.py b/heat/core/tests/test_io.py index 444f7be74..ffab43088 100644 --- a/heat/core/tests/test_io.py +++ b/heat/core/tests/test_io.py @@ -149,8 +149,8 @@ def test_load_csv(self): ht.load_csv(self.CSV_PATH, header_lines="3", sep=";", split=0) @unittest.skipIf( - len(TestCase.get_hostnames()) > 1, - "Test only works on single node, file creation is not synchronized across nodes", + len(TestCase.get_hostnames()) > 1 and not os.environ.get("TMPDIR"), + "Requires the environment variable 'TMPDIR' to point to a globally accessible path. Otherwise the test will be skiped on multi-node setups.", ) def test_save_csv(self): for rnd_type in [ From 6d082627bdb00f8d066743b20f404a8c548936d6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Guti=C3=A9rrez=20Hermosillo=20Muriedas=2C=20Juan=20Pedro?= Date: Mon, 26 Aug 2024 09:47:49 +0200 Subject: [PATCH 6/6] fix: TODO comment warning of future deprecation --- heat/core/communication.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/heat/core/communication.py b/heat/core/communication.py index 4bfb78feb..8e41cff59 100644 --- a/heat/core/communication.py +++ b/heat/core/communication.py @@ -326,6 +326,7 @@ def as_mpi_memory(cls, obj) -> MPI.memory: obj : torch.Tensor The tensor to be converted into a MPI memory view. """ + # TODO: MPI.memory might be depraecated in future versions of mpi4py. The following code might need to be adapted and use MPI.buffer instead. return MPI.memory.fromaddress(obj.data_ptr(), 0) @classmethod @@ -335,7 +336,7 @@ def as_buffer( counts: Optional[Tuple[int]] = None, displs: Optional[Tuple[int]] = None, is_contiguous: Optional[bool] = None, - ) -> List[Union[MPI.buffer, Tuple[int, int], MPI.Datatype]]: + ) -> List[Union[MPI.memory, Tuple[int, int], MPI.Datatype]]: """ Converts a passed ``torch.Tensor`` into a memory buffer object with associated number of elements and MPI data type.