
Implement MultiManager (#167)
* Implement MultiManager

* Add multi RW benchmark to tests

* Implement preliminary spread of requests over SNs

* Add resource usage collection to benchmark
mattjala authored Mar 13, 2024
1 parent 5cdd09b commit fdf1e40
Showing 4 changed files with 845 additions and 2 deletions.
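
For context, a minimal usage sketch of the new class (the domain and dataset names here are hypothetical, and a running HSDS endpoint is assumed):

import numpy as np
import h5pyd
from h5pyd._hl.dataset import MultiManager

# hypothetical domain containing three existing 1-D datasets of length 100
f = h5pyd.File("/home/test/multi.h5", "a")
dsets = [f["dset1"], f["dset2"], f["dset3"]]

mm = MultiManager(dsets)

# write the same selection to every dataset; one value array per dataset
mm[0:100] = [np.arange(100, dtype="i4") for _ in dsets]

# read the same selection back from all datasets in parallel threads
data = mm[0:100]  # list of arrays, ordered like the input dataset list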
147 changes: 147 additions & 0 deletions h5pyd/_hl/dataset.py
@@ -18,6 +18,9 @@
import time
import base64
import numpy
import os
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed

from .base import HLObject, jsonToArray, bytesToArray, arrayToBytes
from .base import Empty, guess_dtype
@@ -1742,3 +1745,147 @@ def toTuple(self, data):
            return tuple(self.toTuple(x) for x in data)
        else:
            return data


class MultiManager:
    """
    High-level object to support slicing operations
    that map to H5Dread_multi/H5Dwrite_multi
    """
    # Avoid overtaxing HSDS
    max_workers = 16

    def __init__(self, datasets=None):
        if (datasets is None) or (len(datasets) == 0):
            raise ValueError("MultiManager requires non-empty list of datasets")
        self.datasets = datasets

    def read_dset_tl(self, args):
        """
        Worker method, run in a pool thread, to read from a single dataset
        """
        dset, idx, read_args = args
        return (idx, dset[read_args])

    def write_dset_tl(self, args):
        """
        Worker method, run in a pool thread, to write to a single dataset
        """
        dset, idx, write_args, write_vals = args
        dset[write_args] = write_vals

    def __getitem__(self, args):
        """
        Read the same slice from each of the datasets
        managed by this MultiManager.
        """
        # Spread requests out evenly among all available SNs

        # TODO: This should eventually be handled at the config/HTTPConn level
        try:
            num_endpoints = int(os.environ["SN_CORES"])
            port_range = os.environ["SN_PORT_RANGE"]
            ports = port_range.split('-')

            if len(ports) != 2:
                raise ValueError("Malformed SN_PORT_RANGE")

            low_port = int(ports[0])
            high_port = int(ports[1])

            if (high_port - low_port) != num_endpoints - 1:
                raise ValueError("Malformed port range specification; must be sequential ports")

        except Exception as e:
            print(f"{e}: defaulting number of SNs to 1")
            num_endpoints = 1

        if num_endpoints > 1:
            # Round-robin the datasets' connections over the SN ports.
            # Assumes each endpoint ends with an explicit ":<port>" suffix.
            next_port = low_port

            for dset in self.datasets:
                endpt = dset.id.http_conn._endpoint
                dset.id.http_conn._endpoint = endpt.rsplit(':', 1)[0] + ':' + str(next_port)
                next_port += 1

                if next_port > high_port:
                    next_port = low_port

        # TODO: Handle the case where some or all datasets share an HTTPConn object

        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            read_futures = [executor.submit(self.read_dset_tl, (dset, i, args))
                            for i, dset in enumerate(self.datasets)]
            ret_data = [None] * len(self.datasets)

            for future in as_completed(read_futures):
                try:
                    idx, dset_data = future.result()
                    ret_data[idx] = dset_data
                except Exception as exc:
                    executor.shutdown(wait=False)
                    raise ValueError(f"Error during multi-read: {exc}") from exc
        return ret_data

    def __setitem__(self, args, vals):
        """
        Write to the provided slice of each dataset
        managed by this MultiManager.
        """
        # TODO: This should eventually be handled at the config/HTTPConn level
        try:
            num_endpoints = int(os.environ["SN_CORES"])
            port_range = os.environ["SN_PORT_RANGE"]
            ports = port_range.split('-')

            if len(ports) != 2:
                raise ValueError("Malformed SN_PORT_RANGE")

            low_port = int(ports[0])
            high_port = int(ports[1])

            if (high_port - low_port) != num_endpoints - 1:
                raise ValueError("Malformed port range specification; must be sequential ports")

        except Exception as e:
            print(f"{e}: defaulting number of SNs to 1")
            num_endpoints = 1

        # TODO: Handle the case where some or all datasets share an HTTPConn object
        # For now, assume each connection is distinct
        if num_endpoints > 1:
            # Round-robin the datasets' connections over the SN ports.
            # Assumes each endpoint ends with an explicit ":<port>" suffix.
            next_port = low_port

            for dset in self.datasets:
                endpt = dset.id.http_conn._endpoint
                dset.id.http_conn._endpoint = endpt.rsplit(':', 1)[0] + ':' + str(next_port)
                next_port += 1

                if next_port > high_port:
                    next_port = low_port

        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            write_futures = [executor.submit(self.write_dset_tl, (dset, i, args, vals[i]))
                             for i, dset in enumerate(self.datasets)]

            for future in as_completed(write_futures):
                try:
                    future.result()
                except Exception as exc:
                    executor.shutdown(wait=False)
                    raise ValueError(f"Error during multi-write: {exc}") from exc
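
For illustration, a sketch of how the preliminary SN spreading above behaves; the environment-variable values are examples, not defaults:

import os

# example configuration: 4 SN containers listening on sequential ports
os.environ["SN_CORES"] = "4"
os.environ["SN_PORT_RANGE"] = "5101-5104"

# Given datasets whose connections start at http://127.0.0.1:5101, the
# MultiManager rewrites each endpoint round-robin before issuing requests:
#   dataset 0 -> http://127.0.0.1:5101
#   dataset 1 -> http://127.0.0.1:5102
#   dataset 2 -> http://127.0.0.1:5103
#   dataset 3 -> http://127.0.0.1:5104
#   dataset 4 (if present) -> http://127.0.0.1:5101  (wraps back to low_port)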
4 changes: 2 additions & 2 deletions h5pyd/_hl/httpconn.py
Expand Up @@ -753,11 +753,11 @@ def session(self):

s.mount(
"http://",
HTTPAdapter(max_retries=retry),
HTTPAdapter(max_retries=retry, pool_connections=16, pool_maxsize=16),
)
s.mount(
"https://",
HTTPAdapter(max_retries=retry),
HTTPAdapter(max_retries=retry, pool_connections=16, pool_maxsize=16),
)
self._s = s
else:
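
The adapter change above sizes the session's connection pool to match MultiManager.max_workers, so up to 16 threads can reuse keep-alive connections instead of contending for requests' default pool size of 10. A standalone sketch of the same pattern (the retry policy shown is illustrative):

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

retry = Retry(total=3, backoff_factor=1)  # illustrative retry policy
s = requests.Session()

# pool_connections: number of per-host pools to cache;
# pool_maxsize: max keep-alive connections held per host pool
s.mount("http://", HTTPAdapter(max_retries=retry, pool_connections=16, pool_maxsize=16))
s.mount("https://", HTTPAdapter(max_retries=retry, pool_connections=16, pool_maxsize=16))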
