diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index daec3fac6..8d6194d91 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -17,7 +17,7 @@ jobs: - 3.11 - 3.12 mpi: [ 'openmpi' ] - install-options: [ '.', '.[hdf5,netcdf]' ] + install-options: [ '.', '.[hdf5,netcdf,pandas]' ] pytorch-version: - 'torch==2.0.1 torchvision==0.15.2 torchaudio==2.0.2' - 'torch==2.1.2 torchvision==0.16.2 torchaudio==2.1.2' diff --git a/heat/core/io.py b/heat/core/io.py index 3135ee18d..427c7b8d4 100644 --- a/heat/core/io.py +++ b/heat/core/io.py @@ -1196,3 +1196,90 @@ def load_npy_from_path( x = factories.array(larray, dtype=dtype, device=device, is_split=split, comm=comm) return x + + +try: + import pandas as pd +except ModuleNotFoundError: + # pandas support is optional + def supports_pandas() -> bool: + """ + Returns ``True`` if pandas is installed, ``False`` otherwise. + """ + return False + +else: + # add functions to visible exports + __all__.extend(["load_csv_from_folder"]) + + def supports_pandas() -> bool: + """ + Returns ``True`` if pandas is installed, ``False`` otherwise. + """ + return True + + def load_csv_from_folder( + path: str, + dtype: datatype = types.int32, + split: int = 0, + device: Optional[str] = None, + comm: Optional[Communication] = None, + func: Optional[callable] = None, + ) -> DNDarray: + """ + Loads multiple .csv files into one DNDarray which will be returned. The data will be concatenated along the split axis provided as input. + + Parameters + ---------- + path : str + Path to the directory in which .csv-files are located. + dtype : datatype, optional + Data type of the resulting array. + split : int + Along which axis the loaded arrays should be concatenated. + device : str, optional + The device id on which to place the data, defaults to globally set default device.
+ comm : Communication, optional + The communication to use for the data distribution, default is 'heat.MPI_WORLD' + func : callable, optional + The function the files have to go through before being added to the array. + """ + if not isinstance(path, str): + raise TypeError(f"path must be str, not {type(path)}") + elif split is not None and not isinstance(split, int): + raise TypeError(f"split must be None or int, not {type(split)}") + elif (func is not None) and not callable(func): + raise TypeError("func needs to be a callable function or None") + + process_number = MPI_WORLD.size + file_list = [] + for file in os.listdir(path): + if fnmatch.fnmatch(file, "*.csv"): + file_list.append(file) + n_files = len(file_list) + + if n_files == 0: + raise ValueError("No .csv Files were found") + if (n_files < process_number) and (process_number > 1): + raise RuntimeError("Number of processes can't exceed number of files") + + rank = MPI_WORLD.rank + if rank < (n_files % process_number): + n_for_procs = n_files // process_number + 1 + idx = rank * n_for_procs + else: + n_for_procs = n_files // process_number + idx = rank * n_for_procs + (n_files % process_number) + array_list = [ + ( + (func(pd.read_csv(path + "/" + element))).to_numpy() + if ((func is not None) and (callable(func))) + else (pd.read_csv(path + "/" + element)).to_numpy() + ) + for element in file_list[idx : idx + n_for_procs] + ] + + larray = np.concatenate(array_list, split) + larray = torch.from_numpy(larray) + x = factories.array(larray, dtype=dtype, device=device, is_split=split, comm=comm) + return x diff --git a/heat/core/tests/test_io.py b/heat/core/tests/test_io.py index 5b8ece13b..db1d2af8e 100644 --- a/heat/core/tests/test_io.py +++ b/heat/core/tests/test_io.py @@ -2,8 +2,9 @@ import os import torch import tempfile -import random import time +import random +import shutil import fnmatch import heat as ht @@ -747,13 +748,12 @@ def test_load_npy_int(self): # testing for int arrays if
ht.MPI_WORLD.rank == 0: crea_array = [] - for i in range(0, 20): + for i in range(0, ht.MPI_WORLD.size * 5): x = np.random.randint(1000, size=(random.randint(0, 30), 6, 11)) np.save(os.path.join(os.getcwd(), "heat/datasets", "int_data") + str(i), x) crea_array.append(x) int_array = np.concatenate(crea_array) ht.MPI_WORLD.Barrier() - load_array = ht.load_npy_from_path( os.path.join(os.getcwd(), "heat/datasets"), dtype=ht.int32, split=0 ) @@ -771,7 +771,7 @@ def test_load_npy_float(self): # testing for float arrays and split dimension other than 0 if ht.MPI_WORLD.rank == 0: crea_array = [] - for i in range(0, 20): + for i in range(0, ht.MPI_WORLD.size * 5 + 1): x = np.random.rand(2, random.randint(1, 10), 11) np.save(os.path.join(os.getcwd(), "heat/datasets", "float_data") + str(i), x) crea_array.append(x) @@ -807,3 +807,83 @@ def test_load_npy_exception(self): ht.MPI_WORLD.Barrier() if ht.MPI_WORLD.rank == 0: os.remove(os.path.join(os.getcwd(), "heat/datasets", "float_data.npy")) + + def test_load_multiple_csv(self): + if not ht.io.supports_pandas(): + self.skipTest("Requires pandas") + + import pandas as pd + + csv_path = os.path.join(os.getcwd(), "heat/datasets/csv_tests") + if ht.MPI_WORLD.rank == 0: + nplist = [] + npdroplist = [] + os.mkdir(csv_path) + for i in range(0, ht.MPI_WORLD.size * 5 + 1): + a = np.random.randint(100, size=(5)) + b = np.random.randint(100, size=(5)) + c = np.random.randint(100, size=(5)) + + data = {"A": a, "B": b, "C": c} + data2 = {"B": b, "C": c} + df = pd.DataFrame(data) # noqa F821 + df2 = pd.DataFrame(data2) # noqa F821 + nplist.append(df.to_numpy()) + npdroplist.append(df2.to_numpy()) + df.to_csv((os.path.join(csv_path, f"csv_test_{i}.csv")), index=False) + + nparray = np.concatenate(nplist) + npdroparray = np.concatenate(npdroplist) + ht.MPI_WORLD.Barrier() + + def delete_first_col(dataf): + dataf.drop(dataf.columns[0], axis=1, inplace=True) + return dataf + + load_array = ht.load_csv_from_folder(csv_path, dtype=ht.int32, 
split=0) + load_func_array = ht.load_csv_from_folder( + csv_path, dtype=ht.int32, split=0, func=delete_first_col + ) + load_array_float = ht.load_csv_from_folder(csv_path, dtype=ht.float32, split=0) + + load_array_npy = load_array.numpy() + load_func_array_npy = load_func_array.numpy() + + self.assertIsInstance(load_array, ht.DNDarray) + self.assertEqual(load_array.dtype, ht.int32) + self.assertEqual(load_array_float.dtype, ht.float32) + + if ht.MPI_WORLD.rank == 0: + self.assertTrue((load_array_npy == nparray).all) + self.assertTrue((load_func_array_npy == npdroparray).all) + shutil.rmtree(csv_path) + + def test_load_multiple_csv_exception(self): + if not ht.io.supports_pandas(): + self.skipTest("Requires pandas") + + import pandas as pd + + with self.assertRaises(TypeError): + ht.load_csv_from_folder(path=1, split=0) + with self.assertRaises(TypeError): + ht.load_csv_from_folder("heat/datasets", split="ABC") + with self.assertRaises(TypeError): + ht.load_csv_from_folder(path="heat/datasets", func=1) + with self.assertRaises(ValueError): + ht.load_csv_from_folder(path="heat", dtype=ht.int64, split=0) + if ht.MPI_WORLD.size > 1: + if ht.MPI_WORLD.rank == 0: + os.mkdir(os.path.join(os.getcwd(), "heat/datasets/csv_tests")) + df = pd.DataFrame({"A": [0, 0, 0]}) # noqa F821 + df.to_csv( + (os.path.join(os.getcwd(), "heat/datasets/csv_tests", "fail.csv")), + index=False, + ) + ht.MPI_WORLD.Barrier() + + with self.assertRaises(RuntimeError): + ht.load_csv_from_folder("heat/datasets/csv_tests", dtype=ht.int64, split=0) + ht.MPI_WORLD.Barrier() + if ht.MPI_WORLD.rank == 0: + shutil.rmtree(os.path.join(os.getcwd(), "heat/datasets/csv_tests")) diff --git a/setup.py b/setup.py index 78931ab36..fad920a5c 100644 --- a/setup.py +++ b/setup.py @@ -47,5 +47,6 @@ "dev": ["pre-commit>=1.18.3"], "examples": ["scikit-learn>=0.24.0", "matplotlib>=3.1.0"], "cb": ["perun>=0.2.0"], + "pandas": ["pandas>=1.4"], }, )