Skip to content

Commit

Permalink
Merge pull request #186 from mattjala/multi_selections
Browse files Browse the repository at this point in the history
Support unique selections on each dset in a MultiManager
  • Loading branch information
mattjala authored Apr 25, 2024
2 parents a54d95a + 5c70f56 commit 0cc166e
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 6 deletions.
41 changes: 41 additions & 0 deletions docs/high/dataset.rst
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,47 @@ The dtype of the dataset can be accessed via ``<dset>.dtype`` as per normal.
As empty datasets cannot be sliced, some methods of datasets such as
``read_direct`` will raise a ``TypeError`` exception if used on a empty dataset.

Reading and Writing to Multiple Datasets
------------------------------------------------------------
The MultiManager interface enables reading and writing to multiple datasets
in parallel. A MultiManager requires a list of datasets to operate on, and then accepts
slice arguments for reading and writing like a typical Dataset.

Reading datasets through a MultiManager returns a list where each entry is an array containing
the values read from the corresponding dataset.

>>> mm = MultiManager(datasets=[dset1, dset2, dset3])
>>> data = mm[...] # read all elements from each dataset
>>> data[0] # data read from dset1
[0, 1, 2, 3]
>>> data[1] # data read from dset2
[0, 2, 3, 4]

Writing to datasets through a MultiManager requires a list where each entry is an array containing
the values to be written to each dataset.

>>> mm[0] = [[1], [2], [3]] # write a different element to index 0 in each dataset
>>> data = mm[...]
>>> data[0]
[1, 1, 2, 3]
>>> data[1]
[2, 2, 3, 4]

Multiple selections can be provided to read or write to a different region on each dataset in the MultiManager.

>>> selections = [np.s_[0:2], np.s_[1:4], np.s_[2:4]]
>>> data = mm[selections]
>>> data[0]
[1, 1]
>>> data[1]
[2, 3, 4]
>>> mm[selections] = [[0, 1], [4, 5, 6], [7, 8]]
>>> data = mm[...]
>>> data[0]
[0, 1, 2, 3]
>>> data[1]
[2, 4, 5, 6]

Reference
---------

Expand Down
34 changes: 28 additions & 6 deletions h5pyd/_hl/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -1814,11 +1814,22 @@ def __getitem__(self, args):
if next_port > high_port:
next_port = low_port

# TODO: Handle the case where some or all datasets share an HTTPConn object

with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
read_futures = [executor.submit(self.read_dset_tl,
(self.datasets[i], i, args)) for i in range(len(self.datasets))]
# Unwrap one-selection list
if (isinstance(args, list) and len(args) == 1):
args = args[0]

if not isinstance(args, list):
read_futures = [executor.submit(self.read_dset_tl,
(self.datasets[i], i, args)) for i in range(len(self.datasets))]
elif isinstance(args, list) and len(args) == len(self.datasets):
read_futures = [executor.submit(self.read_dset_tl,
(self.datasets[i], i, args[i])) for i in range(len(self.datasets))]
else:
raise ValueError("Number of selections must be one or equal number of datasets")

ret_data = [None] * len(self.datasets)

for future in as_completed(read_futures):
Expand Down Expand Up @@ -1853,7 +1864,8 @@ def __setitem__(self, args, vals):
raise ValueError("Malformed port range specification; must be sequential ports")

except Exception as e:
self.log.debug(f"{e}: Defaulting Number of SNs to 1")
msg = f"{e}: Defaulting Number of SN_COREs to 1"
self.log.debug(msg)
num_endpoints = 1

# TODO: Handle the case where some or all datasets share an HTTPConn object
Expand All @@ -1872,8 +1884,18 @@ def __setitem__(self, args, vals):
next_port = low_port

with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
write_futures = [executor.submit(self.write_dset_tl,
(self.datasets[i], i, args, vals[i])) for i in range(len(self.datasets))]
# Unwrap one-selection list
if (isinstance(args, list) and len(args) == 1):
args = args[0]

if not isinstance(args, list):
write_futures = [executor.submit(self.write_dset_tl,
(self.datasets[i], i, args, vals[i])) for i in range(len(self.datasets))]
elif isinstance(args, list) and len(args) == len(self.datasets):
write_futures = [executor.submit(self.write_dset_tl,
(self.datasets[i], i, args[i], vals[i])) for i in range(len(self.datasets))]
else:
raise ValueError("Number of selections must be one or equal to number of datasets")

for future in as_completed(write_futures):
try:
Expand Down
63 changes: 63 additions & 0 deletions test/hl/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -2290,6 +2290,69 @@ def test_multi_write_mixed_shapes(self):
out = self.f["data" + str(i)][...]
np.testing.assert_array_equal(out[sel_idx, sel_idx], data_in + i)

def test_multi_selection_rw(self):
    """
    Verify that a MultiManager can read and write a distinct
    selection on each of its datasets, and that a one-element
    selection list is broadcast to every dataset.
    """
    shape = (10, 10, 10)
    count = 3
    dt = np.int32

    # Seed every dataset with the same arange data
    base = np.reshape(np.arange(np.prod(shape)), shape)
    pristine = base.copy()

    dsets = []
    for idx in range(count):
        dsets.append(self.f.create_dataset("data" + str(idx), shape=shape,
                                           dtype=dt, data=base))

    mm = MultiManager(datasets=dsets)

    # Read a different region from each dataset
    read_sels = [np.s_[0:10, 0:10, 0:10], np.s_[0:5, 5:10, 1:4:2], np.s_[4, 5, 6]]
    out = mm[read_sels]

    for got, region in zip(out, read_sels):
        np.testing.assert_array_equal(got, base[region])

    # A single selection (unwrapped from a one-element list) applies to all dsets
    lone = [np.s_[0:10, 0:10, 0:10]]
    out = mm[lone[0]]

    for got in out:
        np.testing.assert_array_equal(got, base[lone[0]])

    # Write a different region and different values to each dataset
    write_sels = [np.s_[0:10, 0:10, 0:10], np.s_[0:5, 0:5, 0:5], np.s_[0, 0, 0]]
    payloads = [np.zeros_like(base), np.ones_like(base), np.full_like(base, 2)]
    mm[write_sels] = payloads

    for idx in range(count):
        np.testing.assert_array_equal(
            self.f["data" + str(idx)][write_sels[idx]],
            payloads[idx][write_sels[idx]])

    # Regions outside each write selection must keep their original contents
    np.testing.assert_array_equal(self.f["data1"][5:, 5:, 5:], pristine[5:, 5:, 5:])
    np.testing.assert_array_equal(self.f["data2"][1:, 1:, 1:], pristine[1:, 1:, 1:])

    # Snapshot current contents for comparison after the broadcast write
    snapshot = mm[...]

    # A one-element selection list is applied to every dataset on write, too
    bcast = [np.s_[0:6, 0:6, 0:6]]
    fill = np.full(shape, 3, dtype=dt)
    mm[bcast] = [fill[bcast[0]]] * count

    for idx in range(count):
        np.testing.assert_array_equal(self.f["data" + str(idx)][bcast[0]],
                                      fill[bcast[0]])

    # Everything outside the broadcast selection is untouched
    latest = mm[...]

    for before, after in zip(snapshot, latest):
        np.testing.assert_array_equal(after[6:, 6:, 6:], before[6:, 6:, 6:])

"""
TBD - Field selection in h5pyd seems to work slightly different than in h5py
def test_multi_write_field_selection(self):
Expand Down

0 comments on commit 0cc166e

Please sign in to comment.