Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support unique selections on each dset in a MultiManager #186

Merged
merged 4 commits into from
Apr 25, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions docs/high/dataset.rst
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,47 @@ The dtype of the dataset can be accessed via ``<dset>.dtype`` as per normal.
As empty datasets cannot be sliced, some methods of datasets such as
``read_direct`` will raise a ``TypeError`` exception if used on an empty dataset.

Reading and Writing to Multiple Datasets
------------------------------------------------------------
The MultiManager interface enables reading and writing to multiple datasets
in parallel. A MultiManager requires a list of datasets to operate on, and then accepts
slice arguments for reading and writing like a typical Dataset.

Reading datasets through a MultiManager returns a list where each entry is an array containing
the values read from the corresponding dataset.

>>> mm = MultiManager(datasets=[dset1, dset2, dset3])
>>> data = mm[...] # read all elements from each dataset
>>> data[0] # data read from dset1
[0, 1, 2, 3]
>>> data[1] # data read from dset2
[0, 2, 3, 4]

Writing to datasets through a MultiManager requires a list where each entry is an array containing
the values to be written to each dataset.

>>> mm[0] = [[1], [2], [3]] # write a different element to index 0 in each dataset
>>> data = mm[...]
>>> data[0]
[1, 1, 2, 3]
>>> data[1]
[2, 2, 3, 4]

Multiple selections can be provided to read or write to a different region on each dataset in the MultiManager.

>>> selections = [np.s_[0:2], np.s_[1:4], np.s_[2:4]]
>>> data = mm[selections]
>>> data[0]
[1, 1]
>>> data[1]
[2, 3, 4]
>>> mm[selections] = [[0, 1], [4, 5, 6], [7, 8]]  # write a different region on each dataset
>>> data = mm[...]
>>> data[0]
[0, 1, 2, 3]
>>> data[1]
[2, 4, 5, 6]

Reference
---------

Expand Down
34 changes: 28 additions & 6 deletions h5pyd/_hl/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -1809,11 +1809,22 @@ def __getitem__(self, args):
if next_port > high_port:
next_port = low_port

# TODO: Handle the case where some or all datasets share an HTTPConn object
# TODO: Handle the case where some or all datasets share an HTTPConn object

with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
read_futures = [executor.submit(self.read_dset_tl,
(self.datasets[i], i, args)) for i in range(len(self.datasets))]
# Unwrap one-selection list
if (isinstance(args, list) and len(args) == 1):
args = args[0]

if not isinstance(args, list):
read_futures = [executor.submit(self.read_dset_tl,
(self.datasets[i], i, args)) for i in range(len(self.datasets))]
elif isinstance(args, list) and len(args) == len(self.datasets):
read_futures = [executor.submit(self.read_dset_tl,
(self.datasets[i], i, args[i])) for i in range(len(self.datasets))]
else:
raise ValueError("Number of selections must be one or equal number of datasets")

ret_data = [None] * len(self.datasets)

for future in as_completed(read_futures):
Expand Down Expand Up @@ -1848,7 +1859,8 @@ def __setitem__(self, args, vals):
raise ValueError("Malformed port range specification; must be sequential ports")

except Exception as e:
print(f"{e}: Defaulting Number of SNs to 1")
msg = f"{e}: Defaulting Number of SN_COREs to 1"
self.log.warning(msg)
num_endpoints = 1

# TODO: Handle the case where some or all datasets share an HTTPConn object
Expand All @@ -1867,8 +1879,18 @@ def __setitem__(self, args, vals):
next_port = low_port

with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
write_futures = [executor.submit(self.write_dset_tl,
(self.datasets[i], i, args, vals[i])) for i in range(len(self.datasets))]
# Unwrap one-selection list
if (isinstance(args, list) and len(args) == 1):
args = args[0]

if not isinstance(args, list):
write_futures = [executor.submit(self.write_dset_tl,
(self.datasets[i], i, args, vals[i])) for i in range(len(self.datasets))]
elif isinstance(args, list) and len(args) == len(self.datasets):
write_futures = [executor.submit(self.write_dset_tl,
(self.datasets[i], i, args[i], vals[i])) for i in range(len(self.datasets))]
else:
raise ValueError("Number of selections must be one or equal to number of datasets")

for future in as_completed(write_futures):
try:
Expand Down
63 changes: 63 additions & 0 deletions test/hl/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -2290,6 +2290,69 @@ def test_multi_write_mixed_shapes(self):
out = self.f["data" + str(i)][...]
np.testing.assert_array_equal(out[sel_idx, sel_idx], data_in + i)

def test_multi_selection_rw(self):
    """Verify that a MultiManager can read/write a distinct selection per dataset."""
    shape = (10, 10, 10)
    count = 3
    dt = np.int32

    # Seed every dataset with the same ramp of values.
    base = np.reshape(np.arange(np.prod(shape)), shape)
    untouched = base.copy()
    dsets = [self.f.create_dataset("data" + str(n), shape=shape,
                                   dtype=dt, data=base) for n in range(count)]

    mm = MultiManager(datasets=dsets)

    # One read selection per dataset
    selections = [np.s_[0:10, 0:10, 0:10], np.s_[0:5, 5:10, 1:4:2], np.s_[4, 5, 6]]
    out = mm[selections]

    for region, arr in zip(selections, out):
        np.testing.assert_array_equal(arr, base[region])

    # A single (non-list) selection is applied to every dataset on read
    whole = np.s_[0:10, 0:10, 0:10]
    for arr in mm[whole]:
        np.testing.assert_array_equal(arr, base[whole])

    # One write selection per dataset
    selections = [np.s_[0:10, 0:10, 0:10], np.s_[0:5, 0:5, 0:5], np.s_[0, 0, 0]]
    payloads = [np.zeros_like(base), np.ones_like(base), np.full_like(base, 2)]
    mm[selections] = payloads

    for n in range(count):
        np.testing.assert_array_equal(self.f["data" + str(n)][selections[n]],
                                      payloads[n][selections[n]])

    # Regions outside each write selection must be unmodified
    np.testing.assert_array_equal(self.f["data1"][5:, 5:, 5:], untouched[5:, 5:, 5:])
    np.testing.assert_array_equal(self.f["data2"][1:, 1:, 1:], untouched[1:, 1:, 1:])

    # Snapshot the current contents for the final comparison
    snapshot = mm[...]

    # A one-element selection list is broadcast to every dataset on write
    region = np.s_[0:6, 0:6, 0:6]
    fill = np.full(shape, 3, dtype=dt)
    mm[[region]] = [fill[region]] * count

    for n in range(count):
        np.testing.assert_array_equal(self.f["data" + str(n)][region], fill[region])

    # Regions outside the shared selection must be unmodified
    final = mm[...]

    for before, after in zip(snapshot, final):
        np.testing.assert_array_equal(after[6:, 6:, 6:], before[6:, 6:, 6:])

"""
TBD - Field selection in h5pyd seems to work slightly different than in h5py
def test_multi_write_field_selection(self):
Expand Down