From 683cf22edfe632103b36343c95d94eb2bd8e5f26 Mon Sep 17 00:00:00 2001 From: "Nguyen Xuan, Tu" <tunguyenxuan152@gmail.com> Date: Tue, 2 Jul 2024 14:30:14 +0200 Subject: [PATCH 01/16] Now with docstrings --- heat/core/io.py | 66 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) diff --git a/heat/core/io.py b/heat/core/io.py index 779d783dc..f27d7f692 100644 --- a/heat/core/io.py +++ b/heat/core/io.py @@ -8,6 +8,7 @@ import torch import warnings import fnmatch +import pandas as pd from typing import Dict, Iterable, List, Optional, Tuple, Union @@ -1202,3 +1203,68 @@ def load_npy_from_path( x = factories.array(larray, dtype=dtype, device=device, is_split=split, comm=comm) return x + + +def load_csv_from_folder( + path: str, + dtype: datatype = types.int32, + split: int = 0, + device: Optional[str] = None, + comm: Optional[Communication] = None, + func: Optional[pd.DataFrame] = None, +) -> DNDarray: + """ + Loads multiple .csv files into one DNDarray which will be returned. The data will be concatenated along the split axis provided as input. + + Parameters + ---------- + path : str + Path to the directory in which .csv-files are located. + dtype : datatype, optional + Data type of the resulting array. + split : int + Along which axis the loaded arrays should be concatenated. + device : str, optional + The device id on which to place the data, defaults to globally set default device. + comm : Communication, optional + The communication to use for the data distribution, default is 'heat.MPI_WORLD' + func : pandas.DataFrame, optional + The function the files have to go through before being added to the array. + """ + process_number = MPI_WORLD.size + file_list = [] + for file in os.listdir(path): + if fnmatch.fnmatch(file, "*.csv"): + file_list.append(file) + n_files = len(file_list) + + if n_files == 0: + raise ValueError("No .csv Files were found") + if (n_files < process_number) and (process_number > 1): + raise RuntimeError("Number of processes can't exceed number of files") + + rank = MPI_WORLD.rank + if rank + 1 != process_number: + n_for_procs = n_files // process_number + else: + n_for_procs = (n_files // process_number) + (n_files % process_number) + + local_list = [ + file_list[i] + for i in range( + rank * (n_files // process_number), rank * (n_files // process_number) + n_for_procs + ) + ] + + array_list = [] + for element in local_list: + df = pd.read_csv(element) + if (func is not None) and callable(func): + xf = func.to_numpy() + array_list.append(xf) + else: + array_list.append(df.to_numpy()) + larray = np.concatenate(array_list, split) + larray = torch.from_numpy(larray) + x = factories.array(larray, dtype=dtype, device=device, is_split=split, comm=comm) + return x From 4c1a242e2dcfd375df3fcd5eab7f4d2dc4c3f8b9 Mon Sep 17 00:00:00 2001 From: "Nguyen Xuan, Tu" <tunguyenxuan152@gmail.com> Date: Wed, 3 Jul 2024 12:37:30 +0200 Subject: [PATCH 02/16] Removed blank line after docstring --- heat/core/io.py | 13 ++++++--- heat/core/tests/test_io.py | 54 ++++++++++++++++++++++++++++++++++++-- 2 files changed, 62 insertions(+), 5 deletions(-) diff --git a/heat/core/io.py b/heat/core/io.py index f27d7f692..dd38315af 100644 --- a/heat/core/io.py +++ b/heat/core/io.py @@ -37,6 +37,7 @@ "supports_hdf5", "supports_netcdf", "load_npy_from_path", + "load_csv_from_folder", ] try: @@ -1211,7 +1212,7 @@ def load_csv_from_folder( split: int = 0, device: Optional[str] = None, comm: Optional[Communication] = None, - func: Optional[pd.DataFrame] = None, + func: Optional[callable] = None, ) -> DNDarray: """ Loads multiple .csv files into one DNDarray which will be returned. The data will be concatenated along the split axis provided as input. @@ -1231,6 +1232,11 @@ def load_csv_from_folder( func : pandas.DataFrame, optional The function the files have to go through before being added to the array. """ + if not isinstance(path, str): + raise TypeError(f"path must be str, not {type(path)}") + elif split is not None and not isinstance(split, int): + raise TypeError(f"split must be None or int, not {type(split)}") + process_number = MPI_WORLD.size file_list = [] for file in os.listdir(path): @@ -1258,12 +1264,13 @@ def load_csv_from_folder( array_list = [] for element in local_list: - df = pd.read_csv(element) + df = pd.read_csv(path + "/" + element) if (func is not None) and callable(func): - xf = func.to_numpy() + xf = func(df).to_numpy() array_list.append(xf) else: array_list.append(df.to_numpy()) + larray = np.concatenate(array_list, split) larray = torch.from_numpy(larray) x = factories.array(larray, dtype=dtype, device=device, is_split=split, comm=comm) diff --git a/heat/core/tests/test_io.py b/heat/core/tests/test_io.py index ce747a488..ce7c725ac 100644 --- a/heat/core/tests/test_io.py +++ b/heat/core/tests/test_io.py @@ -2,8 +2,10 @@ import os import torch import tempfile -import random import time +import random +import shutil +import pandas as pd import fnmatch import heat as ht @@ -753,7 +755,6 @@ def test_load_npy_int(self): crea_array.append(x) int_array = np.concatenate(crea_array) ht.MPI_WORLD.Barrier() - load_array = ht.load_npy_from_path( os.path.join(os.getcwd(), "heat/datasets"), dtype=ht.int32, split=0 ) @@ -800,3 +801,52 @@ def test_load_npy_exception(self): if ht.MPI_WORLD.size > 1: with self.assertRaises(RuntimeError): ht.load_npy_from_path("heat/datasets/npy_dummy", dtype=ht.int64, split=0) + + def test_load_multiple_csv(self): + csv_path = os.path.join(os.getcwd(), "heat/datasets/csv_tests") + if ht.MPI_WORLD.rank == 0: + nplist = [] + os.mkdir(csv_path) + for i in range(0, 20): + a = np.random.randint(100, size=(5)) + b = np.random.randint(100, size=(5)) + c = np.random.randint(100, size=(5)) + + data = {"A": a, "B": b, "C": c} + df = pd.DataFrame(data) + nplist.append(df.to_numpy()) + df.to_csv((os.path.join(csv_path, f"csv_test_{i}.csv")), index=False) + + nparray = np.concatenate(nplist) + ht.MPI_WORLD.Barrier() + + load_array = ht.load_csv_from_folder(csv_path, dtype=ht.int32, split=0) + + load_array_npy = load_array.numpy() + self.assertIsInstance(load_array, ht.DNDarray) + self.assertEqual(load_array.dtype, ht.int32) + + if ht.MPI_WORLD.rank == 0: + self.assertTrue((load_array_npy == nparray).all) + shutil.rmtree(csv_path) + + def test_load_multiple_csv_exception(self): + with self.assertRaises(TypeError): + ht.load_csv_from_folder(path=1, split=0) + with self.assertRaises(TypeError): + ht.load_csv_from_folder("heat/datasets", split="ABC") + with self.assertRaises(ValueError): + ht.load_csv_from_folder(path="heat", dtype=ht.int64, split=0) + if ht.MPI_WORLD.size > 1: + if ht.MPI_WORLD.rank == 0: + os.mkdir(os.path.join(os.getcwd(), "heat/datasets/csv_tests")) + df = pd.DataFrame({"A": [0, 0, 0]}) + df.to_csv( + (os.path.join(os.getcwd(), "heat/datasets/csv_tests", "fail.csv")), index=False + ) + ht.MPI_WORLD.Barrier() + + with self.assertRaises(RuntimeError): + ht.load_csv_from_folder("heat/datasets/csv_tests", dtype=ht.int64, split=0) + if ht.MPI_WORLD.rank == 0: + shutil.rmtree(os.path.join(os.getcwd(), "heat/datasets/csv_tests")) From 98052b3e2a6a24951f6dd3f541468c67a589aa8d Mon Sep 17 00:00:00 2001 From: "Nguyen Xuan, Tu" <tunguyenxuan152@gmail.com> Date: Thu, 4 Jul 2024 16:00:11 +0200 Subject: [PATCH 03/16] Testing for callable function --- heat/core/io.py | 5 +++-- heat/core/tests/test_io.py | 15 +++++++++++++++ 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/heat/core/io.py b/heat/core/io.py index dd38315af..833faaf06 100644 --- a/heat/core/io.py +++ b/heat/core/io.py @@ -1266,8 +1266,9 @@ def load_csv_from_folder( for element in local_list: df = pd.read_csv(path + "/" + element) if (func is not None) and callable(func): - xf = func(df).to_numpy() - array_list.append(xf) + xf = func(df) + print(xf) + array_list.append(xf.to_numpy()) else: array_list.append(df.to_numpy()) diff --git a/heat/core/tests/test_io.py b/heat/core/tests/test_io.py index ce7c725ac..9698828a6 100644 --- a/heat/core/tests/test_io.py +++ b/heat/core/tests/test_io.py @@ -806,6 +806,7 @@ def test_load_multiple_csv(self): csv_path = os.path.join(os.getcwd(), "heat/datasets/csv_tests") if ht.MPI_WORLD.rank == 0: nplist = [] + npdroplist = [] os.mkdir(csv_path) for i in range(0, 20): a = np.random.randint(100, size=(5)) @@ -813,21 +814,35 @@ def test_load_multiple_csv(self): c = np.random.randint(100, size=(5)) data = {"A": a, "B": b, "C": c} + data2 = {"B": b, "C": c} df = pd.DataFrame(data) + df2 = pd.DataFrame(data2) nplist.append(df.to_numpy()) + npdroplist.append(df2.to_numpy()) df.to_csv((os.path.join(csv_path, f"csv_test_{i}.csv")), index=False) nparray = np.concatenate(nplist) + npdroparray = np.concatenate(npdroplist) ht.MPI_WORLD.Barrier() + def delete_first_col(dataf): + dataf.drop(dataf.columns[0], axis=1, inplace=True) + return dataf + load_array = ht.load_csv_from_folder(csv_path, dtype=ht.int32, split=0) + load_func_array = ht.load_csv_from_folder( + csv_path, dtype=ht.int32, split=0, func=delete_first_col + ) load_array_npy = load_array.numpy() + load_func_array_npy = load_func_array.numpy() + self.assertIsInstance(load_array, ht.DNDarray) self.assertEqual(load_array.dtype, ht.int32) if ht.MPI_WORLD.rank == 0: self.assertTrue((load_array_npy == nparray).all) + self.assertTrue((load_func_array_npy == npdroparray).all) shutil.rmtree(csv_path) def test_load_multiple_csv_exception(self): From 4c6f255f9d7b7d29cb6543ebc52d61ab8350cf06 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 4 Jul 2024 15:01:37 +0000 Subject: [PATCH 04/16] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- heat/core/tests/test_io.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/heat/core/tests/test_io.py b/heat/core/tests/test_io.py index 72f067b65..34465b673 100644 --- a/heat/core/tests/test_io.py +++ b/heat/core/tests/test_io.py @@ -871,4 +871,4 @@ def test_load_multiple_csv_exception(self): with self.assertRaises(RuntimeError): ht.load_csv_from_folder("heat/datasets/csv_tests", dtype=ht.int64, split=0) if ht.MPI_WORLD.rank == 0: - shutil.rmtree(os.path.join(os.getcwd(), "heat/datasets/csv_tests")) \ No newline at end of file + shutil.rmtree(os.path.join(os.getcwd(), "heat/datasets/csv_tests")) From c6fbdf034fc716e308f92523b7a43c74b0de8de8 Mon Sep 17 00:00:00 2001 From: Fabian Hoppe <112093564+mrfh92@users.noreply.github.com> Date: Thu, 4 Jul 2024 17:06:38 +0200 Subject: [PATCH 05/16] Update setup.py --- setup.py | 1 + 1 file changed, 1 insertion(+) diff --git a/setup.py b/setup.py index 668261891..c287456d1 100644 --- a/setup.py +++ b/setup.py @@ -39,6 +39,7 @@ "scipy>=1.10.0", "pillow>=6.0.0", "torchvision>=0.12.0", + "pandas" ], extras_require={ "docutils": ["docutils>=0.16"], From 82d45d83748f6a6b68edee97f6542844fca54836 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 4 Jul 2024 15:07:02 +0000 Subject: [PATCH 06/16] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index c287456d1..323a00d51 100644 --- a/setup.py +++ b/setup.py @@ -39,7 +39,7 @@ "scipy>=1.10.0", "pillow>=6.0.0", "torchvision>=0.12.0", - "pandas" + "pandas", ], extras_require={ "docutils": ["docutils>=0.16"], From 0b61f8fdbe553dd54e92c22131911c45dca9272c Mon Sep 17 00:00:00 2001 From: "Nguyen Xuan, Tu" <tunguyenxuan152@gmail.com> Date: Tue, 9 Jul 2024 10:11:22 +0200 Subject: [PATCH 07/16] Added test for other dtype and list comprehension commented --- heat/core/io.py | 147 +++++++++++++++++++++---------------- heat/core/tests/test_io.py | 7 +- 2 files changed, 88 insertions(+), 66 deletions(-) diff --git a/heat/core/io.py b/heat/core/io.py index 833faaf06..02c3ede20 100644 --- a/heat/core/io.py +++ b/heat/core/io.py @@ -8,7 +8,6 @@ import torch import warnings import fnmatch -import pandas as pd from typing import Dict, Iterable, List, Optional, Tuple, Union @@ -1206,73 +1205,93 @@ def load_npy_from_path( return x -def load_csv_from_folder( - path: str, - dtype: datatype = types.int32, - split: int = 0, - device: Optional[str] = None, - comm: Optional[Communication] = None, - func: Optional[callable] = None, -) -> DNDarray: - """ - Loads multiple .csv files into one DNDarray which will be returned. The data will be concatenated along the split axis provided as input. - - Parameters - ---------- - path : str - Path to the directory in which .csv-files are located. - dtype : datatype, optional - Data type of the resulting array. - split : int - Along which axis the loaded arrays should be concatenated. - device : str, optional - The device id on which to place the data, defaults to globally set default device. - comm : Communication, optional - The communication to use for the data distribution, default is 'heat.MPI_WORLD' - func : pandas.DataFrame, optional - The function the files have to go through before being added to the array. - """ - if not isinstance(path, str): - raise TypeError(f"path must be str, not {type(path)}") - elif split is not None and not isinstance(split, int): - raise TypeError(f"split must be None or int, not {type(split)}") +try: + import pandas as pd +except ImportError: + # netCDF4 support is optional + def supports_pandas() -> bool: + """ + Returns ``True`` if Heat supports pandas library, ``False`` otherwise. + """ + return False - process_number = MPI_WORLD.size - file_list = [] - for file in os.listdir(path): - if fnmatch.fnmatch(file, "*.csv"): - file_list.append(file) - n_files = len(file_list) +else: - if n_files == 0: - raise ValueError("No .csv Files were found") - if (n_files < process_number) and (process_number > 1): - raise RuntimeError("Number of processes can't exceed number of files") + __all__.extend("load_csv_from_folder") - rank = MPI_WORLD.rank - if rank + 1 != process_number: - n_for_procs = n_files // process_number - else: - n_for_procs = (n_files // process_number) + (n_files % process_number) + def load_csv_from_folder( + path: str, + dtype: datatype = types.int32, + split: int = 0, + device: Optional[str] = None, + comm: Optional[Communication] = None, + func: Optional[callable] = None, + ) -> DNDarray: + """ + Loads multiple .csv files into one DNDarray which will be returned. The data will be concatenated along the split axis provided as input. - local_list = [ - file_list[i] - for i in range( - rank * (n_files // process_number), rank * (n_files // process_number) + n_for_procs - ) - ] + Parameters + ---------- + path : str + Path to the directory in which .csv-files are located. + dtype : datatype, optional + Data type of the resulting array. + split : int + Along which axis the loaded arrays should be concatenated. + device : str, optional + The device id on which to place the data, defaults to globally set default device. + comm : Communication, optional + The communication to use for the data distribution, default is 'heat.MPI_WORLD' + func : pandas.DataFrame, optional + The function the files have to go through before being added to the array. + """ + if not isinstance(path, str): + raise TypeError(f"path must be str, not {type(path)}") + elif split is not None and not isinstance(split, int): + raise TypeError(f"split must be None or int, not {type(split)}") - array_list = [] - for element in local_list: - df = pd.read_csv(path + "/" + element) - if (func is not None) and callable(func): - xf = func(df) - print(xf) - array_list.append(xf.to_numpy()) + process_number = MPI_WORLD.size + file_list = [] + for file in os.listdir(path): + if fnmatch.fnmatch(file, "*.csv"): + file_list.append(file) + n_files = len(file_list) + + if n_files == 0: + raise ValueError("No .csv Files were found") + if (n_files < process_number) and (process_number > 1): + raise RuntimeError("Number of processes can't exceed number of files") + + rank = MPI_WORLD.rank + if rank + 1 != process_number: + n_for_procs = n_files // process_number else: - array_list.append(df.to_numpy()) + n_for_procs = (n_files // process_number) + (n_files % process_number) - larray = np.concatenate(array_list, split) - larray = torch.from_numpy(larray) - x = factories.array(larray, dtype=dtype, device=device, is_split=split, comm=comm) - return x + local_list = [ + file_list[i] + for i in range( + rank * (n_files // process_number), rank * (n_files // process_number) + n_for_procs + ) + ] + + array_list = [] + for element in local_list: + df = pd.read_csv(path + "/" + element) + if (func is not None) and callable(func): + xf = func(df) + array_list.append(xf.to_numpy()) + else: + array_list.append(df.to_numpy()) + + """n_for_procs = n_files // process_number + idx = rank * n_for_procs + if rank + 1 == process_number: + n_for_procs += n_files % process_number + array_list = [array_list.append(func(pd.read_csv(path + "/" + element)).to_numpy()) if ((func is not None)and(callable(func))) else array_list.append(pd.read_csv(path + "/" + element).to_numpy())for element in file_list[idx : idx + n_for_procs]] + """ + + larray = np.concatenate(array_list, split) + larray = torch.from_numpy(larray) + x = factories.array(larray, dtype=dtype, device=device, is_split=split, comm=comm) + return x diff --git a/heat/core/tests/test_io.py b/heat/core/tests/test_io.py index 9698828a6..3a667e258 100644 --- a/heat/core/tests/test_io.py +++ b/heat/core/tests/test_io.py @@ -802,7 +802,7 @@ def test_load_npy_exception(self): with self.assertRaises(RuntimeError): ht.load_npy_from_path("heat/datasets/npy_dummy", dtype=ht.int64, split=0) - def test_load_multiple_csv(self): + """def test_load_multiple_csv(self): csv_path = os.path.join(os.getcwd(), "heat/datasets/csv_tests") if ht.MPI_WORLD.rank == 0: nplist = [] @@ -833,12 +833,15 @@ def delete_first_col(dataf): load_func_array = ht.load_csv_from_folder( csv_path, dtype=ht.int32, split=0, func=delete_first_col ) + load_array_float = ht.load_csv_from_folder(csv_path, dtype=ht.float32, split = 0) load_array_npy = load_array.numpy() load_func_array_npy = load_func_array.numpy() self.assertIsInstance(load_array, ht.DNDarray) self.assertEqual(load_array.dtype, ht.int32) + self.assertEqual(load_array_float.dtype, ht.float32) + if ht.MPI_WORLD.rank == 0: self.assertTrue((load_array_npy == nparray).all) @@ -864,4 +867,4 @@ def test_load_multiple_csv_exception(self): with self.assertRaises(RuntimeError): ht.load_csv_from_folder("heat/datasets/csv_tests", dtype=ht.int64, split=0) if ht.MPI_WORLD.rank == 0: - shutil.rmtree(os.path.join(os.getcwd(), "heat/datasets/csv_tests")) + shutil.rmtree(os.path.join(os.getcwd(), "heat/datasets/csv_tests"))""" From fd2a007e910ff0e4af9e3cd8bc0d9a93768adf7a Mon Sep 17 00:00:00 2001 From: "Nguyen Xuan, Tu" <tunguyenxuan152@gmail.com> Date: Tue, 9 Jul 2024 11:02:39 +0200 Subject: [PATCH 08/16] List compr, TypError when non-callable, pandas optional --- heat/core/io.py | 46 ++++++-------- heat/core/tests/test_io.py | 120 +++++++++++++++++++------------------ 2 files changed, 81 insertions(+), 85 deletions(-) diff --git a/heat/core/io.py b/heat/core/io.py index a07bd86ff..c28c1f9ff 100644 --- a/heat/core/io.py +++ b/heat/core/io.py @@ -36,7 +36,6 @@ "supports_hdf5", "supports_netcdf", "load_npy_from_path", - "load_csv_from_folder", ] try: @@ -1202,13 +1201,19 @@ def load_npy_from_path( # netCDF4 support is optional def supports_pandas() -> bool: """ - Returns ``True`` if Heat supports pandas library, ``False`` otherwise. + Returns ``True`` if pandas is installed , ``False`` otherwise. """ return False else: + # add functions to visible exports + __all__.extend(["load_csv_from_folder"]) - __all__.extend("load_csv_from_folder") + def supports_pandas() -> bool: + """ + Returns ``True`` if pandas is installed, ``False`` otherwise. + """ + return True def load_csv_from_folder( path: str, @@ -1240,6 +1245,8 @@ def load_csv_from_folder( raise TypeError(f"path must be str, not {type(path)}") elif split is not None and not isinstance(split, int): raise TypeError(f"split must be None or int, not {type(split)}") + elif (func is not None) and not callable(func): + raise TypeError("func needs to be a callable function or None") process_number = MPI_WORLD.size file_list = [] @@ -1254,33 +1261,18 @@ def load_csv_from_folder( raise RuntimeError("Number of processes can't exceed number of files") rank = MPI_WORLD.rank - if rank + 1 != process_number: - n_for_procs = n_files // process_number - else: - n_for_procs = (n_files // process_number) + (n_files % process_number) - - local_list = [ - file_list[i] - for i in range( - rank * (n_files // process_number), rank * (n_files // process_number) + n_for_procs - ) - ] - - array_list = [] - for element in local_list: - df = pd.read_csv(path + "/" + element) - if (func is not None) and callable(func): - xf = func(df) - array_list.append(xf.to_numpy()) - else: - array_list.append(df.to_numpy()) - - """n_for_procs = n_files // process_number + n_for_procs = n_files // process_number idx = rank * n_for_procs if rank + 1 == process_number: n_for_procs += n_files % process_number - array_list = [array_list.append(func(pd.read_csv(path + "/" + element)).to_numpy()) if ((func is not None)and(callable(func))) else array_list.append(pd.read_csv(path + "/" + element).to_numpy())for element in file_list[idx : idx + n_for_procs]] - """ + array_list = [ + ( + (func(pd.read_csv(path + "/" + element))).to_numpy() + if ((func is not None) and (callable(func))) + else (pd.read_csv(path + "/" + element)).to_numpy() + ) + for element in file_list[idx : idx + n_for_procs] + ] larray = np.concatenate(array_list, split) larray = torch.from_numpy(larray) diff --git a/heat/core/tests/test_io.py b/heat/core/tests/test_io.py index d6d52a3d8..c0764ef7b 100644 --- a/heat/core/tests/test_io.py +++ b/heat/core/tests/test_io.py @@ -809,69 +809,73 @@ def test_load_npy_exception(self): if ht.MPI_WORLD.rank == 0: os.remove(os.path.join(os.getcwd(), "heat/datasets", "float_data.npy")) - """def test_load_multiple_csv(self): - csv_path = os.path.join(os.getcwd(), "heat/datasets/csv_tests") - if ht.MPI_WORLD.rank == 0: - nplist = [] - npdroplist = [] - os.mkdir(csv_path) - for i in range(0, 20): - a = np.random.randint(100, size=(5)) - b = np.random.randint(100, size=(5)) - c = np.random.randint(100, size=(5)) - - data = {"A": a, "B": b, "C": c} - data2 = {"B": b, "C": c} - df = pd.DataFrame(data) - df2 = pd.DataFrame(data2) - nplist.append(df.to_numpy()) - npdroplist.append(df2.to_numpy()) - df.to_csv((os.path.join(csv_path, f"csv_test_{i}.csv")), index=False) - - nparray = np.concatenate(nplist) - npdroparray = np.concatenate(npdroplist) - ht.MPI_WORLD.Barrier() - - def delete_first_col(dataf): - dataf.drop(dataf.columns[0], axis=1, inplace=True) - return dataf + def test_load_multiple_csv(self): + if ht.io.supports_pandas: + csv_path = os.path.join(os.getcwd(), "heat/datasets/csv_tests") + if ht.MPI_WORLD.rank == 0: + nplist = [] + npdroplist = [] + os.mkdir(csv_path) + for i in range(0, 20): + a = np.random.randint(100, size=(5)) + b = np.random.randint(100, size=(5)) + c = np.random.randint(100, size=(5)) + + data = {"A": a, "B": b, "C": c} + data2 = {"B": b, "C": c} + df = pd.DataFrame(data) + df2 = pd.DataFrame(data2) + nplist.append(df.to_numpy()) + npdroplist.append(df2.to_numpy()) + df.to_csv((os.path.join(csv_path, f"csv_test_{i}.csv")), index=False) + + nparray = np.concatenate(nplist) + npdroparray = np.concatenate(npdroplist) + ht.MPI_WORLD.Barrier() - load_array = ht.load_csv_from_folder(csv_path, dtype=ht.int32, split=0) - load_func_array = ht.load_csv_from_folder( - csv_path, dtype=ht.int32, split=0, func=delete_first_col - ) - load_array_float = ht.load_csv_from_folder(csv_path, dtype=ht.float32, split = 0) + def delete_first_col(dataf): + dataf.drop(dataf.columns[0], axis=1, inplace=True) + return dataf - load_array_npy = load_array.numpy() - load_func_array_npy = load_func_array.numpy() + load_array = ht.load_csv_from_folder(csv_path, dtype=ht.int32, split=0) + load_func_array = ht.load_csv_from_folder( + csv_path, dtype=ht.int32, split=0, func=delete_first_col + ) + load_array_float = ht.load_csv_from_folder(csv_path, dtype=ht.float32, split=0) - self.assertIsInstance(load_array, ht.DNDarray) - self.assertEqual(load_array.dtype, ht.int32) - self.assertEqual(load_array_float.dtype, ht.float32) + load_array_npy = load_array.numpy() + load_func_array_npy = load_func_array.numpy() + self.assertIsInstance(load_array, ht.DNDarray) + self.assertEqual(load_array.dtype, ht.int32) + self.assertEqual(load_array_float.dtype, ht.float32) - if ht.MPI_WORLD.rank == 0: - self.assertTrue((load_array_npy == nparray).all) - self.assertTrue((load_func_array_npy == npdroparray).all) - shutil.rmtree(csv_path) + if ht.MPI_WORLD.rank == 0: + self.assertTrue((load_array_npy == nparray).all) + self.assertTrue((load_func_array_npy == npdroparray).all) + shutil.rmtree(csv_path) def test_load_multiple_csv_exception(self): - with self.assertRaises(TypeError): - ht.load_csv_from_folder(path=1, split=0) - with self.assertRaises(TypeError): - ht.load_csv_from_folder("heat/datasets", split="ABC") - with self.assertRaises(ValueError): - ht.load_csv_from_folder(path="heat", dtype=ht.int64, split=0) - if ht.MPI_WORLD.size > 1: - if ht.MPI_WORLD.rank == 0: - os.mkdir(os.path.join(os.getcwd(), "heat/datasets/csv_tests")) - df = pd.DataFrame({"A": [0, 0, 0]}) - df.to_csv( - (os.path.join(os.getcwd(), "heat/datasets/csv_tests", "fail.csv")), index=False - ) - ht.MPI_WORLD.Barrier() + if ht.io.supports_pandas: + with self.assertRaises(TypeError): + ht.load_csv_from_folder(path=1, split=0) + with self.assertRaises(TypeError): + ht.load_csv_from_folder("heat/datasets", split="ABC") + with self.assertRaises(TypeError): + ht.load_csv_from_folder(path="heat/datasets", func=1) + with self.assertRaises(ValueError): + ht.load_csv_from_folder(path="heat", dtype=ht.int64, split=0) + if ht.MPI_WORLD.size > 1: + if ht.MPI_WORLD.rank == 0: + os.mkdir(os.path.join(os.getcwd(), "heat/datasets/csv_tests")) + df = pd.DataFrame({"A": [0, 0, 0]}) + df.to_csv( + (os.path.join(os.getcwd(), "heat/datasets/csv_tests", "fail.csv")), + index=False, + ) + ht.MPI_WORLD.Barrier() - with self.assertRaises(RuntimeError): - ht.load_csv_from_folder("heat/datasets/csv_tests", dtype=ht.int64, split=0) - if ht.MPI_WORLD.rank == 0: - shutil.rmtree(os.path.join(os.getcwd(), "heat/datasets/csv_tests"))""" + with self.assertRaises(RuntimeError): + ht.load_csv_from_folder("heat/datasets/csv_tests", dtype=ht.int64, split=0) + if ht.MPI_WORLD.rank == 0: + shutil.rmtree(os.path.join(os.getcwd(), "heat/datasets/csv_tests")) From aa54ed648df7c9066597bdce7b77728bdb800998 Mon Sep 17 00:00:00 2001 From: "Nguyen Xuan, Tu" <tunguyenxuan152@gmail.com> Date: Tue, 9 Jul 2024 14:48:26 +0200 Subject: [PATCH 09/16] pandas optional requirement --- heat/core/io.py | 2 +- heat/core/tests/test_io.py | 4 ++++ setup.py | 2 +- 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/heat/core/io.py b/heat/core/io.py index c28c1f9ff..ef71138ef 100644 --- a/heat/core/io.py +++ b/heat/core/io.py @@ -1198,7 +1198,7 @@ def load_npy_from_path( try: import pandas as pd except ImportError: - # netCDF4 support is optional + # pandas support is optional def supports_pandas() -> bool: """ Returns ``True`` if pandas is installed , ``False`` otherwise. diff --git a/heat/core/tests/test_io.py b/heat/core/tests/test_io.py index c0764ef7b..5b4f88f63 100644 --- a/heat/core/tests/test_io.py +++ b/heat/core/tests/test_io.py @@ -854,6 +854,8 @@ def delete_first_col(dataf): self.assertTrue((load_array_npy == nparray).all) self.assertTrue((load_func_array_npy == npdroparray).all) shutil.rmtree(csv_path) + else: + self.skipTest("Requires pandas") def test_load_multiple_csv_exception(self): if ht.io.supports_pandas: @@ -879,3 +881,5 @@ def test_load_multiple_csv_exception(self): ht.load_csv_from_folder("heat/datasets/csv_tests", dtype=ht.int64, split=0) if ht.MPI_WORLD.rank == 0: shutil.rmtree(os.path.join(os.getcwd(), "heat/datasets/csv_tests")) + else: + self.skipTest("Requires pandas") diff --git a/setup.py b/setup.py index 323a00d51..84e251bf6 100644 --- a/setup.py +++ b/setup.py @@ -39,7 +39,6 @@ "scipy>=1.10.0", "pillow>=6.0.0", "torchvision>=0.12.0", - "pandas", ], extras_require={ "docutils": ["docutils>=0.16"], @@ -48,5 +47,6 @@ "dev": ["pre-commit>=1.18.3"], "examples": ["scikit-learn>=0.24.0", "matplotlib>=3.1.0"], "cb": ["perun>=0.2.0"], + "pandas": ["pandas>=2.2.2"], }, ) From c9cfe24603956e17c35670ab1709f9843257689d Mon Sep 17 00:00:00 2001 From: "Nguyen Xuan, Tu" <tunguyenxuan152@gmail.com> Date: Tue, 9 Jul 2024 16:31:13 +0200 Subject: [PATCH 10/16] Fixed issue with pandas --- heat/core/io.py | 2 +- heat/core/tests/test_io.py | 131 ++++++++++++++++++------------------- 2 files changed, 66 insertions(+), 67 deletions(-) diff --git a/heat/core/io.py b/heat/core/io.py index ef71138ef..34730e2f9 100644 --- a/heat/core/io.py +++ b/heat/core/io.py @@ -1197,7 +1197,7 @@ def load_npy_from_path( try: import pandas as pd -except ImportError: +except ModuleNotFoundError: # pandas support is optional def supports_pandas() -> bool: """ diff --git a/heat/core/tests/test_io.py b/heat/core/tests/test_io.py index 5b4f88f63..a6e0e6315 100644 --- a/heat/core/tests/test_io.py +++ b/heat/core/tests/test_io.py @@ -5,7 +5,6 @@ import time import random import shutil -import pandas as pd import fnmatch import heat as ht @@ -810,76 +809,76 @@ def test_load_npy_exception(self): os.remove(os.path.join(os.getcwd(), "heat/datasets", "float_data.npy")) def test_load_multiple_csv(self): - if ht.io.supports_pandas: - csv_path = os.path.join(os.getcwd(), "heat/datasets/csv_tests") - if ht.MPI_WORLD.rank == 0: - nplist = [] - npdroplist = [] - os.mkdir(csv_path) - for i in range(0, 20): - a = np.random.randint(100, size=(5)) - b = np.random.randint(100, size=(5)) - c = np.random.randint(100, size=(5)) - - data = {"A": a, "B": b, "C": c} - data2 = {"B": b, "C": c} - df = pd.DataFrame(data) - df2 = pd.DataFrame(data2) - nplist.append(df.to_numpy()) - npdroplist.append(df2.to_numpy()) - df.to_csv((os.path.join(csv_path, f"csv_test_{i}.csv")), index=False) - - nparray = np.concatenate(nplist) - npdroparray = np.concatenate(npdroplist) - ht.MPI_WORLD.Barrier() + if not ht.io.supports_pandas(): + self.skipTest("Requires pandas") - def delete_first_col(dataf): - dataf.drop(dataf.columns[0], axis=1, inplace=True) - return dataf + csv_path = os.path.join(os.getcwd(), "heat/datasets/csv_tests") + if ht.MPI_WORLD.rank == 0: + nplist = [] + npdroplist = [] + os.mkdir(csv_path) + for i in range(0, 20): + a = np.random.randint(100, size=(5)) + b = np.random.randint(100, size=(5)) + c = np.random.randint(100, size=(5)) + + data = {"A": a, "B": b, "C": c} + data2 = {"B": b, "C": c} + df = pd.DataFrame(data) # noqa F821 + df2 = pd.DataFrame(data2) # noqa F821 + nplist.append(df.to_numpy()) + npdroplist.append(df2.to_numpy()) + df.to_csv((os.path.join(csv_path, f"csv_test_{i}.csv")), index=False) + + nparray = np.concatenate(nplist) + npdroparray = np.concatenate(npdroplist) + ht.MPI_WORLD.Barrier() - load_array = ht.load_csv_from_folder(csv_path, dtype=ht.int32, split=0) - load_func_array = ht.load_csv_from_folder( - csv_path, dtype=ht.int32, split=0, func=delete_first_col - ) - load_array_float = ht.load_csv_from_folder(csv_path, dtype=ht.float32, split=0) + def delete_first_col(dataf): + dataf.drop(dataf.columns[0], axis=1, inplace=True) + return dataf - load_array_npy = load_array.numpy() - load_func_array_npy = load_func_array.numpy() + load_array = ht.load_csv_from_folder(csv_path, dtype=ht.int32, split=0) + load_func_array = ht.load_csv_from_folder( + csv_path, dtype=ht.int32, split=0, func=delete_first_col + ) + load_array_float = ht.load_csv_from_folder(csv_path, dtype=ht.float32, split=0) - self.assertIsInstance(load_array, ht.DNDarray) - self.assertEqual(load_array.dtype, ht.int32) - self.assertEqual(load_array_float.dtype, ht.float32) + load_array_npy = load_array.numpy() + load_func_array_npy = load_func_array.numpy() - if ht.MPI_WORLD.rank == 0: - self.assertTrue((load_array_npy == nparray).all) - self.assertTrue((load_func_array_npy == npdroparray).all) - shutil.rmtree(csv_path) - else: - self.skipTest("Requires pandas") + self.assertIsInstance(load_array, ht.DNDarray) + self.assertEqual(load_array.dtype, ht.int32) + self.assertEqual(load_array_float.dtype, ht.float32) - def test_load_multiple_csv_exception(self): - if ht.io.supports_pandas: - with self.assertRaises(TypeError): - ht.load_csv_from_folder(path=1, split=0) - with self.assertRaises(TypeError): - ht.load_csv_from_folder("heat/datasets", split="ABC") - with self.assertRaises(TypeError): - ht.load_csv_from_folder(path="heat/datasets", func=1) - with self.assertRaises(ValueError): - ht.load_csv_from_folder(path="heat", dtype=ht.int64, split=0) - if ht.MPI_WORLD.size > 1: - if ht.MPI_WORLD.rank == 0: - os.mkdir(os.path.join(os.getcwd(), "heat/datasets/csv_tests")) - df = pd.DataFrame({"A": [0, 0, 0]}) - df.to_csv( - (os.path.join(os.getcwd(), "heat/datasets/csv_tests", "fail.csv")), - index=False, - ) - ht.MPI_WORLD.Barrier() + if ht.MPI_WORLD.rank == 0: + self.assertTrue((load_array_npy == nparray).all) + self.assertTrue((load_func_array_npy == npdroparray).all) + shutil.rmtree(csv_path) - with self.assertRaises(RuntimeError): - ht.load_csv_from_folder("heat/datasets/csv_tests", dtype=ht.int64, split=0) - if ht.MPI_WORLD.rank == 0: - shutil.rmtree(os.path.join(os.getcwd(), "heat/datasets/csv_tests")) - else: + def test_load_multiple_csv_exception(self): + if not ht.io.supports_pandas(): self.skipTest("Requires pandas") + + with self.assertRaises(TypeError): + ht.load_csv_from_folder(path=1, split=0) + with self.assertRaises(TypeError): + ht.load_csv_from_folder("heat/datasets", split="ABC") + with self.assertRaises(TypeError): + ht.load_csv_from_folder(path="heat/datasets", func=1) + with self.assertRaises(ValueError): + ht.load_csv_from_folder(path="heat", dtype=ht.int64, split=0) + if ht.MPI_WORLD.size > 1: + if ht.MPI_WORLD.rank == 0: + os.mkdir(os.path.join(os.getcwd(), "heat/datasets/csv_tests")) + df = pd.DataFrame({"A": [0, 0, 0]}) # noqa F821 + df.to_csv( + (os.path.join(os.getcwd(), "heat/datasets/csv_tests", "fail.csv")), + index=False, + ) + ht.MPI_WORLD.Barrier() + + with self.assertRaises(RuntimeError): + ht.load_csv_from_folder("heat/datasets/csv_tests", dtype=ht.int64, split=0) + if ht.MPI_WORLD.rank == 0: + shutil.rmtree(os.path.join(os.getcwd(), "heat/datasets/csv_tests")) From 30745e4d80b06d7f65e98919a7ac65c63d7e1b86 Mon Sep 17 00:00:00 2001 From: "Nguyen Xuan, Tu" <tunguyenxuan152@gmail.com> Date: Wed, 10 Jul 2024 12:45:16 +0200 Subject: [PATCH 11/16] Importing pandas when available --- heat/core/tests/test_io.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/heat/core/tests/test_io.py b/heat/core/tests/test_io.py index a6e0e6315..c4f2c7b52 100644 --- a/heat/core/tests/test_io.py +++ b/heat/core/tests/test_io.py @@ -812,6 +812,8 @@ def test_load_multiple_csv(self): if not ht.io.supports_pandas(): self.skipTest("Requires pandas") + import pandas as pd + csv_path = os.path.join(os.getcwd(), "heat/datasets/csv_tests") if ht.MPI_WORLD.rank == 0: nplist = [] @@ -860,6 +862,8 @@ def test_load_multiple_csv_exception(self): if not ht.io.supports_pandas(): self.skipTest("Requires pandas") + import pandas as pd + with self.assertRaises(TypeError): ht.load_csv_from_folder(path=1, split=0) with self.assertRaises(TypeError): From ab0ba7b899fb4243e8ba54fc057681aa9df3eb6a Mon Sep 17 00:00:00 2001 From: Fabian Hoppe <112093564+mrfh92@users.noreply.github.com> Date: Thu, 11 Jul 2024 16:24:03 +0200 Subject: [PATCH 12/16] Update ci.yaml added pandas in the CI-matrix with options --- .github/workflows/ci.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index ab0810911..b9923f6fc 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -17,7 +17,7 @@ jobs: - '3.10' - 3.11 mpi: [ 'openmpi' ] - install-options: [ '.', '.[hdf5,netcdf]' ] + install-options: [ '.', '.[hdf5,netcdf,pandas]' ] pytorch-version: - 'torch==1.12.1+cpu torchvision==0.13.1+cpu torchaudio==0.12.1' - 'torch==1.13.1+cpu torchvision==0.14.1+cpu torchaudio==0.13.1' From 7d9bdcc8304eee07e0857bee9a0c33fb626f663b Mon Sep 17 00:00:00 2001 From: Fabian Hoppe <112093564+mrfh92@users.noreply.github.com> Date: Thu, 11 Jul 2024 16:41:03 +0200 Subject: [PATCH 13/16] Update setup.py support also older pandas --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 84e251bf6..9ec1507c3 100644 --- a/setup.py +++ b/setup.py @@ -47,6 +47,6 @@ "dev": ["pre-commit>=1.18.3"], "examples": ["scikit-learn>=0.24.0", "matplotlib>=3.1.0"], "cb": ["perun>=0.2.0"], - "pandas": ["pandas>=2.2.2"], + "pandas": ["pandas>=1.4"], }, ) From a79c3cdc39903a205e18bfe30c7e015fd39b121d Mon Sep 17 00:00:00 2001 From: "Nguyen Xuan, Tu" <tunguyenxuan152@gmail.com> Date: Tue, 16 Jul 2024 14:57:35 +0200 Subject: [PATCH 14/16] Added a ht.MPI_WORLD.Barrier() where needed --- heat/core/tests/test_io.py | 1 + 1 file changed, 1 insertion(+) diff --git a/heat/core/tests/test_io.py b/heat/core/tests/test_io.py index c4f2c7b52..aa5f62222 100644 --- a/heat/core/tests/test_io.py +++ b/heat/core/tests/test_io.py @@ -884,5 +884,6 @@ def test_load_multiple_csv_exception(self): with self.assertRaises(RuntimeError): ht.load_csv_from_folder("heat/datasets/csv_tests", dtype=ht.int64, split=0) + ht.MPI_WORLD.Barrier() if ht.MPI_WORLD.rank == 0: shutil.rmtree(os.path.join(os.getcwd(), "heat/datasets/csv_tests")) From 5ac77b851512a69a28594baff52be6503e899117 Mon Sep 17 00:00:00 2001 From: "Nguyen Xuan, Tu" <tunguyenxuan152@gmail.com> Date: Mon, 12 Aug 2024 14:46:40 +0200 Subject: [PATCH 15/16] Fixed unbalanced Loading for csv. files --- heat/core/io.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/heat/core/io.py b/heat/core/io.py index a1b9e9543..427c7b8d4 100644 --- a/heat/core/io.py +++ b/heat/core/io.py @@ -1264,10 +1264,12 @@ def load_csv_from_folder( raise RuntimeError("Number of processes can't exceed number of files") rank = MPI_WORLD.rank - n_for_procs = n_files // process_number - idx = rank * n_for_procs - if rank + 1 == process_number: - n_for_procs += n_files % process_number + if rank < (n_files % process_number): + n_for_procs = n_files // process_number + 1 + idx = rank * n_for_procs + else: + n_for_procs = n_files // process_number + idx = rank * n_for_procs + (n_files % process_number) array_list = [ ( (func(pd.read_csv(path + "/" + element))).to_numpy() From a0b037a09d4618c3e8d802df14fc445563e34e63 Mon Sep 17 00:00:00 2001 From: "Nguyen Xuan, Tu" <tunguyenxuan152@gmail.com> Date: Tue, 13 Aug 2024 14:21:27 +0200 Subject: [PATCH 16/16] Tests with uneven amount of files --- heat/core/tests/test_io.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/heat/core/tests/test_io.py b/heat/core/tests/test_io.py index aa5f62222..db1d2af8e 100644 --- a/heat/core/tests/test_io.py +++ b/heat/core/tests/test_io.py @@ -748,7 +748,7 @@ def test_load_npy_int(self): # testing for int arrays if ht.MPI_WORLD.rank == 0: crea_array = [] - for i in range(0, 20): + for i in range(0, ht.MPI_WORLD.size * 5): x = np.random.randint(1000, size=(random.randint(0, 30), 6, 11)) np.save(os.path.join(os.getcwd(), "heat/datasets", "int_data") + str(i), x) crea_array.append(x) @@ -771,7 +771,7 @@ def test_load_npy_float(self): # testing for float arrays and split dimension other than 0 if ht.MPI_WORLD.rank == 0: crea_array = [] - for i in range(0, 20): + for i in range(0, ht.MPI_WORLD.size * 5 + 1): x = np.random.rand(2, random.randint(1, 10), 11) np.save(os.path.join(os.getcwd(), "heat/datasets", "float_data") + str(i), x) crea_array.append(x) @@ -819,7 +819,7 @@ def test_load_multiple_csv(self): nplist = [] npdroplist = [] os.mkdir(csv_path) - for i in range(0, 20): + for i in range(0, ht.MPI_WORLD.size * 5 + 1): a = np.random.randint(100, size=(5)) b = np.random.randint(100, size=(5)) c = np.random.randint(100, size=(5))