Support unique selections on each dset in a MultiManager #186

Merged: 4 commits, Apr 25, 2024
41 changes: 41 additions & 0 deletions docs/high/dataset.rst
@@ -436,6 +436,47 @@ The dtype of the dataset can be accessed via ``<dset>.dtype`` as per normal.
As empty datasets cannot be sliced, some methods of datasets such as
``read_direct`` will raise a ``TypeError`` exception if used on an empty dataset.

Reading and Writing to Multiple Datasets
----------------------------------------
The MultiManager interface enables reading from and writing to multiple datasets
in parallel. A MultiManager is constructed with a list of datasets to operate on, and
then accepts slice arguments for reading and writing like a typical Dataset.
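
The examples below assume a file is open and contains three one-dimensional
datasets. A minimal setup might look like the following sketch (the domain
path, dataset names, and initial values here are illustrative only):

>>> import numpy as np
>>> import h5pyd
>>> from h5pyd import MultiManager
>>> f = h5pyd.File("/home/test_user1/multi_example.h5", "w")
>>> dset1 = f.create_dataset("dset1", data=np.array([0, 1, 2, 3], dtype=np.int32))
>>> dset2 = f.create_dataset("dset2", data=np.array([0, 2, 3, 4], dtype=np.int32))
>>> dset3 = f.create_dataset("dset3", data=np.array([0, 5, 6, 7], dtype=np.int32))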

Reading datasets through a MultiManager returns a list where each entry is an array containing
the values read from the corresponding dataset.

>>> mm = MultiManager(datasets=[dset1, dset2, dset3])
>>> data = mm[...] # read all elements from each dataset
>>> data[0] # data read from dset1
[0, 1, 2, 3]
>>> data[1] # data read from dset2
[0, 2, 3, 4]

Writing to datasets through a MultiManager requires a list where each entry is an array containing
the values to be written to the corresponding dataset.

>>> mm[0] = [[1], [2], [3]] # write a different value to index 0 in each dataset
>>> data = mm[...]
>>> data[0]
[1, 1, 2, 3]
>>> data[1]
[2, 2, 3, 4]

A list of selections can be provided to read from or write to a different region of each dataset in the MultiManager.

>>> selections = [np.s_[0:2], np.s_[1:4], np.s_[2:4]]
>>> data = mm[selections]
>>> data[0]
[1, 1]
>>> data[1]
[2, 3, 4]
>>> mm[selections] = [[0, 1], [4, 5, 6], [7, 8]]
>>> data = mm[...]
>>> data[0]
[0, 1, 2, 3]
>>> data[1]
[2, 4, 5, 6]
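
When a list of selections is provided, it must contain either a single
selection (which is then applied to every dataset) or exactly one selection
per dataset; any other number of selections raises a ``ValueError``.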

Reference
---------

2 changes: 1 addition & 1 deletion h5pyd/__init__.py
@@ -21,7 +21,7 @@
from ._hl.files import File, is_hdf5
from ._hl.folders import Folder
from ._hl.group import Group, SoftLink, ExternalLink, UserDefinedLink, HardLink
- from ._hl.dataset import Dataset
+ from ._hl.dataset import Dataset, MultiManager
from ._hl.table import Table
from ._hl.datatype import Datatype
from ._hl.attrs import AttributeManager
43 changes: 35 additions & 8 deletions h5pyd/_hl/dataset.py
@@ -19,6 +19,7 @@
import base64
import numpy
import os
+ import logging
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed

@@ -1741,10 +1742,14 @@ class MultiManager():
    # Avoid overtaxing HSDS
    max_workers = 16

-   def __init__(self, datasets=None):
+   def __init__(self, datasets=None, logger=None):
        if (datasets is None) or (len(datasets) == 0):
            raise ValueError("MultiManager requires non-empty list of datasets")
        self.datasets = datasets
+       if logger is None:
+           self.log = logging
+       else:
+           self.log = logging.getLogger(logger)

    def read_dset_tl(self, args):
        """
@@ -1793,7 +1798,7 @@ def __getitem__(self, args):

        except Exception as e:
            msg = f"{e}: Defaulting Number of SN_COREs to 1"
-           self.log.warning(msg)
+           self.log.debug(msg)
            num_endpoints = 1

        if (num_endpoints > 1):
@@ -1809,11 +1814,22 @@
            if next_port > high_port:
                next_port = low_port

-       # TODO: Handle the case where some or all datasets share an HTTPConn object
+       # TODO: Handle the case where some or all datasets share an HTTPConn object

        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
-           read_futures = [executor.submit(self.read_dset_tl,
-                           (self.datasets[i], i, args)) for i in range(len(self.datasets))]
+           # Unwrap one-selection list
+           if (isinstance(args, list) and len(args) == 1):
+               args = args[0]
+
+           if not isinstance(args, list):
+               read_futures = [executor.submit(self.read_dset_tl,
+                               (self.datasets[i], i, args)) for i in range(len(self.datasets))]
+           elif isinstance(args, list) and len(args) == len(self.datasets):
+               read_futures = [executor.submit(self.read_dset_tl,
+                               (self.datasets[i], i, args[i])) for i in range(len(self.datasets))]
+           else:
+               raise ValueError("Number of selections must be one or equal to the number of datasets")

        ret_data = [None] * len(self.datasets)

        for future in as_completed(read_futures):
@@ -1848,7 +1864,8 @@ def __setitem__(self, args, vals):
raise ValueError("Malformed port range specification; must be sequential ports")

except Exception as e:
print(f"{e}: Defaulting Number of SNs to 1")
msg = f"{e}: Defaulting Number of SN_COREs to 1"
self.log.debug(msg)
num_endpoints = 1

        # TODO: Handle the case where some or all datasets share an HTTPConn object
@@ -1867,8 +1884,18 @@
                next_port = low_port

        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
-           write_futures = [executor.submit(self.write_dset_tl,
-                            (self.datasets[i], i, args, vals[i])) for i in range(len(self.datasets))]
+           # Unwrap one-selection list
+           if (isinstance(args, list) and len(args) == 1):
+               args = args[0]
+
+           if not isinstance(args, list):
+               write_futures = [executor.submit(self.write_dset_tl,
+                                (self.datasets[i], i, args, vals[i])) for i in range(len(self.datasets))]
+           elif isinstance(args, list) and len(args) == len(self.datasets):
+               write_futures = [executor.submit(self.write_dset_tl,
+                                (self.datasets[i], i, args[i], vals[i])) for i in range(len(self.datasets))]
+           else:
+               raise ValueError("Number of selections must be one or equal to the number of datasets")

        for future in as_completed(write_futures):
            try:
44 changes: 30 additions & 14 deletions test/hl/common.py
@@ -12,7 +12,6 @@

from __future__ import absolute_import

- import sys
import os
import os.path as op
import tempfile
@@ -39,6 +38,32 @@
del testfile


+ def getTestFileName(basename, subfolder=None):
+     """
+     Get filepath for a test case given a testname
+     """
+
+     if config.get("use_h5py"):
+         filename = "out"
+         if not op.isdir(filename):
+             os.mkdir(filename)
+         if subfolder:
+             filename = op.join(filename, subfolder)
+             if not op.isdir(filename):
+                 os.mkdir(filename)
+         filename = op.join(filename, f"{basename}.h5")
+     else:
+         if "H5PYD_TEST_FOLDER" in os.environ:
+             filename = os.environ["H5PYD_TEST_FOLDER"]
+         else:
+             # default to the root folder
+             filename = "/"
+         if subfolder:
+             filename = op.join(filename, subfolder)
+         filename = op.join(filename, f"{basename}.h5")
+     return filename


class TestCase(ut.TestCase):

"""
@@ -201,23 +226,14 @@ def assertNumpyBehavior(self, dset, arr, s):
            with self.assertRaises(exc):
                dset[s]

-   def getFileName(self, basename):
+   def getFileName(self, basename, subfolder=None):
        """
        Get filepath for a test case given a testname
        """

-       if config.get("use_h5py"):
-           if not op.isdir("out"):
-               os.mkdir("out")
-           filename = "out/" + basename + ".h5"
-       else:
-           if "H5PYD_TEST_FOLDER" in os.environ:
-               domain = os.environ["H5PYD_TEST_FOLDER"]
-           else:
-               # default to the root folder
-               domain = "/"
-           filename = op.join(domain, basename)
-           filename += ".h5"
+       # Just call the external function
+       filename = getTestFileName(basename, subfolder=subfolder)

        return filename

    def getPathFromDomain(self, domain):
19 changes: 16 additions & 3 deletions test/hl/multi_benchmark.py
@@ -1,13 +1,15 @@
import numpy as np
import time
+ import sys

from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed
import subprocess
import re

- from h5pyd._hl.dataset import MultiManager
+ from h5pyd import MultiManager
import h5pyd as h5py
+ from common import getTestFileName

# Flag to stop resource usage collection thread after a benchmark finishes
stop_stat_collection = False
@@ -249,7 +251,17 @@ def run_benchmark(test_name, test_func, stats, datasets, num_iters):
dt = np.int32
stats = {}

- fs = [h5py.File("/home/test_user1/h5pyd_multi_bm_" + str(i), mode='w') for i in range(count)]
+ fs = []
+
+ for i in range(count):
+     filename = getTestFileName(f"bm_{i:04d}", subfolder="multi_bm")
+     try:
+         f = h5py.File(filename, mode='w')
+     except IOError:
+         print(f"unable to create domain at: {filename} - does the parent folder exist?")
+         sys.exit(1)
+     fs.append(f)

data_in = np.zeros(shape, dtype=dt)
datasets = [f.create_dataset("data", shape, dtype=dt, data=data_in) for f in fs]

@@ -266,7 +278,8 @@

print("Testing with shared HTTP connection...")

f = h5py.File("/home/test_user1/h5pyd_multi_bm_shared", mode='w')
filename = getTestFileName("bm_shared", subfolder="multi_bm")
f = h5py.File(filename, mode='w')
datasets = [f.create_dataset("data" + str(i), data=data_in, dtype=dt) for i in range(count)]

run_benchmark("Read Multi (Shared HttpConn)", read_datasets_multi, stats, datasets, num_iters)
67 changes: 65 additions & 2 deletions test/hl/test_dataset.py
@@ -23,11 +23,10 @@
import sys
import numpy as np
import platform
- import warnings

from common import ut, TestCase
- from h5pyd._hl.dataset import MultiManager
import config
+ from h5pyd import MultiManager

if config.get("use_h5py"):
    from h5py import File, Dataset
@@ -39,6 +38,7 @@

def is_empty_dataspace(obj):
    shape_json = obj.shape_json
+
    if "class" not in shape_json:
        raise KeyError()
    if shape_json["class"] == 'H5S_NULL':
@@ -2290,6 +2290,69 @@ def test_multi_write_mixed_shapes(self):
            out = self.f["data" + str(i)][...]
            np.testing.assert_array_equal(out[sel_idx, sel_idx], data_in + i)

+    def test_multi_selection_rw(self):
+        """
+        Test reading and writing a unique selection in each dataset
+        """
+        shape = (10, 10, 10)
+        count = 3
+        dt = np.int32
+
+        # Create datasets
+        data_in = np.reshape(np.arange(np.prod(shape)), shape)
+        data_in_original = data_in.copy()
+        datasets = []
+
+        for i in range(count):
+            dset = self.f.create_dataset("data" + str(i), shape=shape,
+                                         dtype=dt, data=data_in)
+            datasets.append(dset)
+
+        mm = MultiManager(datasets=datasets)
+
+        # Selections to read from
+        sel = [np.s_[0:10, 0:10, 0:10], np.s_[0:5, 5:10, 1:4:2], np.s_[4, 5, 6]]
+        data_out = mm[sel]
+
+        for i in range(count):
+            np.testing.assert_array_equal(data_out[i], data_in[sel[i]])
+
+        # If selection list has only a single element, apply it to all dsets
+        sel = [np.s_[0:10, 0:10, 0:10]]
+        data_out = mm[sel[0]]
+
+        for d in data_out:
+            np.testing.assert_array_equal(d, data_in[sel[0]])
+
+        # Selections to write to
+        sel = [np.s_[0:10, 0:10, 0:10], np.s_[0:5, 0:5, 0:5], np.s_[0, 0, 0]]
+        data_in = [np.zeros_like(data_in), np.ones_like(data_in), np.full_like(data_in, 2)]
+        mm[sel] = data_in
+
+        for i in range(count):
+            np.testing.assert_array_equal(self.f["data" + str(i)][sel[i]], data_in[i][sel[i]])
+
+        # Check that unselected regions are unmodified
+        np.testing.assert_array_equal(self.f["data1"][5:, 5:, 5:], data_in_original[5:, 5:, 5:])
+        np.testing.assert_array_equal(self.f["data2"][1:, 1:, 1:], data_in_original[1:, 1:, 1:])
+
+        # Save for later comparison
+        data_in_original = mm[...]
+
+        # If selection list has only a single element, apply it to all dsets
+        sel = [np.s_[0:6, 0:6, 0:6]]
+        data_in = np.full(shape, 3, dtype=dt)
+        mm[sel] = [data_in[sel[0]]] * count
+
+        for i in range(count):
+            np.testing.assert_array_equal(self.f["data" + str(i)][sel[0]], data_in[sel[0]])
+
+        # Check that unselected regions are unmodified
+        data_out = mm[...]
+
+        for i in range(count):
+            np.testing.assert_array_equal(data_out[i][6:, 6:, 6:], data_in_original[i][6:, 6:, 6:])

"""
TBD - Field selection in h5pyd seems to work slightly different than in h5py
def test_multi_write_field_selection(self):
2 changes: 0 additions & 2 deletions test/hl/test_dataset_extend.py
@@ -11,8 +11,6 @@
##############################################################################

import logging
- import numpy as np
- import math

import config

Expand Down
1 change: 0 additions & 1 deletion test/hl/test_dataset_fancyselect.py
@@ -11,7 +11,6 @@
##############################################################################

import numpy as np
- import math

import config

1 change: 0 additions & 1 deletion test/hl/test_datatype.py
@@ -11,7 +11,6 @@
##############################################################################

import numpy as np
- import math
import logging
import config
