Skip to content

Commit

Permalink
Merge pull request #1551 from helmholtz-analytics/features/1550-Exten…
Browse files Browse the repository at this point in the history
…sion_of_load_npy_from_path_to_csv-fIles_with_same_functionality

Features/1550 extension of load npy from path to csv-files with same functionality
  • Loading branch information
mrfh92 authored Aug 29, 2024
2 parents 9d3af11 + 956e3d0 commit cbdf49a
Show file tree
Hide file tree
Showing 4 changed files with 173 additions and 5 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:
- 3.11
- 3.12
mpi: [ 'openmpi' ]
install-options: [ '.', '.[hdf5,netcdf]' ]
install-options: [ '.', '.[hdf5,netcdf,pandas]' ]
pytorch-version:
- 'torch==2.0.1 torchvision==0.15.2 torchaudio==2.0.2'
- 'torch==2.1.2 torchvision==0.16.2 torchaudio==2.1.2'
Expand Down
87 changes: 87 additions & 0 deletions heat/core/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -1196,3 +1196,90 @@ def load_npy_from_path(

x = factories.array(larray, dtype=dtype, device=device, is_split=split, comm=comm)
return x


try:
import pandas as pd
except ModuleNotFoundError:
# pandas support is optional
def supports_pandas() -> bool:
"""
Returns ``True`` if pandas is installed , ``False`` otherwise.
"""
return False

else:
# add functions to visible exports
__all__.extend(["load_csv_from_folder"])

def supports_pandas() -> bool:
"""
Returns ``True`` if pandas is installed, ``False`` otherwise.
"""
return True

def load_csv_from_folder(
path: str,
dtype: datatype = types.int32,
split: int = 0,
device: Optional[str] = None,
comm: Optional[Communication] = None,
func: Optional[callable] = None,
) -> DNDarray:
"""
Loads multiple .csv files into one DNDarray which will be returned. The data will be concatenated along the split axis provided as input.
Parameters
----------
path : str
Path to the directory in which .csv-files are located.
dtype : datatype, optional
Data type of the resulting array.
split : int
Along which axis the loaded arrays should be concatenated.
device : str, optional
The device id on which to place the data, defaults to globally set default device.
comm : Communication, optional
The communication to use for the data distribution, default is 'heat.MPI_WORLD'
func : pandas.DataFrame, optional
The function the files have to go through before being added to the array.
"""
if not isinstance(path, str):
raise TypeError(f"path must be str, not {type(path)}")
elif split is not None and not isinstance(split, int):
raise TypeError(f"split must be None or int, not {type(split)}")
elif (func is not None) and not callable(func):
raise TypeError("func needs to be a callable function or None")

process_number = MPI_WORLD.size
file_list = []
for file in os.listdir(path):
if fnmatch.fnmatch(file, "*.csv"):
file_list.append(file)
n_files = len(file_list)

if n_files == 0:
raise ValueError("No .csv Files were found")
if (n_files < process_number) and (process_number > 1):
raise RuntimeError("Number of processes can't exceed number of files")

rank = MPI_WORLD.rank
if rank < (n_files % process_number):
n_for_procs = n_files // process_number + 1
idx = rank * n_for_procs
else:
n_for_procs = n_files // process_number
idx = rank * n_for_procs + (n_files % process_number)
array_list = [
(
(func(pd.read_csv(path + "/" + element))).to_numpy()
if ((func is not None) and (callable(func)))
else (pd.read_csv(path + "/" + element)).to_numpy()
)
for element in file_list[idx : idx + n_for_procs]
]

larray = np.concatenate(array_list, split)
larray = torch.from_numpy(larray)
x = factories.array(larray, dtype=dtype, device=device, is_split=split, comm=comm)
return x
88 changes: 84 additions & 4 deletions heat/core/tests/test_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
import os
import torch
import tempfile
import random
import time
import random
import shutil
import fnmatch

import heat as ht
Expand Down Expand Up @@ -747,13 +748,12 @@ def test_load_npy_int(self):
# testing for int arrays
if ht.MPI_WORLD.rank == 0:
crea_array = []
for i in range(0, 20):
for i in range(0, ht.MPI_WORLD.size * 5):
x = np.random.randint(1000, size=(random.randint(0, 30), 6, 11))
np.save(os.path.join(os.getcwd(), "heat/datasets", "int_data") + str(i), x)
crea_array.append(x)
int_array = np.concatenate(crea_array)
ht.MPI_WORLD.Barrier()

load_array = ht.load_npy_from_path(
os.path.join(os.getcwd(), "heat/datasets"), dtype=ht.int32, split=0
)
Expand All @@ -771,7 +771,7 @@ def test_load_npy_float(self):
# testing for float arrays and split dimension other than 0
if ht.MPI_WORLD.rank == 0:
crea_array = []
for i in range(0, 20):
for i in range(0, ht.MPI_WORLD.size * 5 + 1):
x = np.random.rand(2, random.randint(1, 10), 11)
np.save(os.path.join(os.getcwd(), "heat/datasets", "float_data") + str(i), x)
crea_array.append(x)
Expand Down Expand Up @@ -807,3 +807,83 @@ def test_load_npy_exception(self):
ht.MPI_WORLD.Barrier()
if ht.MPI_WORLD.rank == 0:
os.remove(os.path.join(os.getcwd(), "heat/datasets", "float_data.npy"))

def test_load_multiple_csv(self):
if not ht.io.supports_pandas():
self.skipTest("Requires pandas")

import pandas as pd

csv_path = os.path.join(os.getcwd(), "heat/datasets/csv_tests")
if ht.MPI_WORLD.rank == 0:
nplist = []
npdroplist = []
os.mkdir(csv_path)
for i in range(0, ht.MPI_WORLD.size * 5 + 1):
a = np.random.randint(100, size=(5))
b = np.random.randint(100, size=(5))
c = np.random.randint(100, size=(5))

data = {"A": a, "B": b, "C": c}
data2 = {"B": b, "C": c}
df = pd.DataFrame(data) # noqa F821
df2 = pd.DataFrame(data2) # noqa F821
nplist.append(df.to_numpy())
npdroplist.append(df2.to_numpy())
df.to_csv((os.path.join(csv_path, f"csv_test_{i}.csv")), index=False)

nparray = np.concatenate(nplist)
npdroparray = np.concatenate(npdroplist)
ht.MPI_WORLD.Barrier()

def delete_first_col(dataf):
dataf.drop(dataf.columns[0], axis=1, inplace=True)
return dataf

load_array = ht.load_csv_from_folder(csv_path, dtype=ht.int32, split=0)
load_func_array = ht.load_csv_from_folder(
csv_path, dtype=ht.int32, split=0, func=delete_first_col
)
load_array_float = ht.load_csv_from_folder(csv_path, dtype=ht.float32, split=0)

load_array_npy = load_array.numpy()
load_func_array_npy = load_func_array.numpy()

self.assertIsInstance(load_array, ht.DNDarray)
self.assertEqual(load_array.dtype, ht.int32)
self.assertEqual(load_array_float.dtype, ht.float32)

if ht.MPI_WORLD.rank == 0:
self.assertTrue((load_array_npy == nparray).all)
self.assertTrue((load_func_array_npy == npdroparray).all)
shutil.rmtree(csv_path)

    def test_load_multiple_csv_exception(self):
        # Verify the error paths of ht.load_csv_from_folder.
        # pandas support is optional; skip rather than fail when it is absent.
        if not ht.io.supports_pandas():
            self.skipTest("Requires pandas")

        import pandas as pd

        # Invalid argument types must raise TypeError.
        with self.assertRaises(TypeError):
            ht.load_csv_from_folder(path=1, split=0)
        with self.assertRaises(TypeError):
            ht.load_csv_from_folder("heat/datasets", split="ABC")
        with self.assertRaises(TypeError):
            ht.load_csv_from_folder(path="heat/datasets", func=1)
        # A directory containing no .csv files must raise ValueError.
        with self.assertRaises(ValueError):
            ht.load_csv_from_folder(path="heat", dtype=ht.int64, split=0)
        # With more processes than files the loader must raise RuntimeError.
        # Only meaningful on multi-process runs; rank 0 builds a directory
        # holding a single csv file, all ranks then attempt the load.
        if ht.MPI_WORLD.size > 1:
            if ht.MPI_WORLD.rank == 0:
                os.mkdir(os.path.join(os.getcwd(), "heat/datasets/csv_tests"))
                df = pd.DataFrame({"A": [0, 0, 0]}) # noqa F821
                df.to_csv(
                    (os.path.join(os.getcwd(), "heat/datasets/csv_tests", "fail.csv")),
                    index=False,
                )
            # Barrier: the file must exist before any rank starts loading.
            ht.MPI_WORLD.Barrier()

            with self.assertRaises(RuntimeError):
                ht.load_csv_from_folder("heat/datasets/csv_tests", dtype=ht.int64, split=0)
            # Barrier: all ranks must be done before rank 0 removes the fixture.
            ht.MPI_WORLD.Barrier()
            if ht.MPI_WORLD.rank == 0:
                shutil.rmtree(os.path.join(os.getcwd(), "heat/datasets/csv_tests"))
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,5 +47,6 @@
"dev": ["pre-commit>=1.18.3"],
"examples": ["scikit-learn>=0.24.0", "matplotlib>=3.1.0"],
"cb": ["perun>=0.2.0"],
"pandas": ["pandas>=1.4"],
},
)

0 comments on commit cbdf49a

Please sign in to comment.