hsload fix for large contiguous datasets #231

Merged 2 commits on Nov 7, 2024
8 changes: 7 additions & 1 deletion h5pyd/_apps/hsload.py
@@ -23,12 +23,18 @@
sys.exit(1)

try:
# this package is used when reading HDF5 files on S3
import s3fs

S3FS_IMPORT = True
except ImportError:
S3FS_IMPORT = False

try:
# this package is useful for HDF5 compressed with blosc, etc.
import hdf5plugin
except ImportError:
pass

if __name__ == "__main__":
from config import Config
from utillib import load_file, load_h5image
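The hdf5plugin import above is used purely for its side effect: importing the package registers additional HDF5 compression filters (blosc, zstd, and so on) with h5py, so hsload can read source files that use them whenever the package is installed. A minimal sketch of the same pattern (file and dataset names are hypothetical):

import h5py

try:
    import hdf5plugin  # noqa: F401 - importing it registers the extra filters
except ImportError:
    pass  # gzip/szip sources still load without it

with h5py.File("example_blosc.h5", "r") as f:  # hypothetical blosc-compressed file
    data = f["/some_dataset"][...]  # readable only if the filter is registered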
161 changes: 142 additions & 19 deletions h5pyd/_apps/utillib.py
@@ -26,7 +26,8 @@

# adjust chunk shape to fit between min and max chunk sizes when possible
MIN_CHUNK_SIZE = 1 * 1024 * 1024
MAC_CHUNK_SIZE = 8 * 1024 * 1024
MAX_CHUNK_SIZE = 8 * 1024 * 1024
CHUNK_BASE = 64 * 1024 # Multiplier by which chunks are adjusted

H5Z_FILTER_MAP = {
32001: "blosclz",
@@ -235,6 +236,139 @@ def convert_dtype(srcdt, ctx):
return tgt_dt


def guess_chunk(shape, typesize):
""" Guess an appropriate chunk layout for a dataset, given its shape and
the size of each element in bytes. Will allocate chunks only as large
as MAX_CHUNK_SIZE. Chunks are generally close to some power-of-2 fraction of
each axis, slightly favoring bigger values for the last index.

Undocumented and subject to change without warning.
"""

# For unlimited dimensions we have to guess 1024
shape = tuple((x if x != 0 else 1024) for i, x in enumerate(shape))

ndims = len(shape)
if ndims == 0:
raise ValueError("Chunks not allowed for scalar datasets.")

chunks = np.array(shape, dtype='=f8')
if not np.all(np.isfinite(chunks)):
raise ValueError("Illegal value in chunk tuple")

# Determine the optimal chunk size in bytes using a PyTables expression.
# This is kept as a float.
dset_size = np.prod(chunks) * typesize
target_size = CHUNK_BASE * (2 ** np.log10(dset_size / (1024. * 1024)))

if target_size > MIN_CHUNK_SIZE:
target_size = MAX_CHUNK_SIZE
elif target_size < MIN_CHUNK_SIZE:
target_size = MIN_CHUNK_SIZE

idx = 0
while True:
# Repeatedly loop over the axes, dividing them by 2. Stop when:
# 1a. We're smaller than the target chunk size, OR
# 1b. We're within 50% of the target chunk size, AND
# 2. The chunk is smaller than the maximum chunk size

chunk_bytes = np.prod(chunks) * typesize

if (chunk_bytes < target_size or abs(chunk_bytes - target_size) / target_size < 0.5) and \
chunk_bytes < MAX_CHUNK_SIZE:
break

if np.prod(chunks) == 1:
break # Element size larger than MAX_CHUNK_SIZE
chunks[idx % ndims] = np.ceil(chunks[idx % ndims] / 2.0)
idx += 1

return tuple(int(x) for x in chunks)
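# Example (illustrative, values computed from the constants above): a
# contiguous 100000 x 2048 float64 dataset (~1.6 GB) gets pseudo-chunks of
# just under a MiB:
#
#     guess_chunk((100000, 2048), 8)  # -> (1563, 64), about 0.8 MiB per chunk
#
# Zero-extent (unlimited) dimensions are treated as 1024 when estimating.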


class ChunkIterator(object):
"""
Class to iterate through list of chunks of a given dataset
"""

def __init__(self, dset, source_sel=None):
self._shape = dset.shape
rank = len(dset.shape)

if not dset.chunks:
# contiguous layout - create some pseudo-chunks so we don't
# try to read too much data in one selection
self._layout = guess_chunk(dset.shape, dset.dtype.itemsize)
elif isinstance(dset.chunks, dict):
self._layout = dset.chunks["dims"]
else:
self._layout = dset.chunks

if source_sel is None:
# select over entire dataset
slices = []
for dim in range(rank):
slices.append(slice(0, self._shape[dim]))
self._sel = tuple(slices)
else:
if isinstance(source_sel, slice):
self._sel = (source_sel,)
else:
self._sel = source_sel
if len(self._sel) != rank:
raise ValueError(
"Invalid selection - selection region must have same rank as dataset"
)
self._chunk_index = []
for dim in range(rank):
s = self._sel[dim]
if s.start < 0 or s.stop > self._shape[dim] or s.stop <= s.start:
msg = "Invalid selection - selection region must be within dataset space"
raise ValueError(msg)
index = s.start // self._layout[dim]
self._chunk_index.append(index)

def __iter__(self):
return self

def __next__(self):
rank = len(self._shape)
slices = []
if rank == 0 or self._chunk_index[0] * self._layout[0] >= self._sel[0].stop:
# ran past the last chunk, end iteration
raise StopIteration()

for dim in range(rank):
s = self._sel[dim]
start = self._chunk_index[dim] * self._layout[dim]
stop = (self._chunk_index[dim] + 1) * self._layout[dim]
# adjust the start if this is an edge chunk
if start < s.start:
start = s.start
if stop > s.stop:
stop = s.stop # trim to end of the selection
s = slice(start, stop, 1)
slices.append(s)

# bump up the last index and carry forward if we run outside the selection
dim = rank - 1
while dim >= 0:
s = self._sel[dim]
self._chunk_index[dim] += 1

chunk_end = self._chunk_index[dim] * self._layout[dim]
if chunk_end < s.stop:
# we still have room to extend along this dimension
return tuple(slices)

if dim > 0:
# reset to the start and continue iterating with higher dimension
self._chunk_index[dim] = 0
dim -= 1
return tuple(slices)
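# Example (illustrative): ChunkIterator only touches the shape, chunks, and
# dtype attributes of its argument, so it can walk an h5py dataset with a
# contiguous layout (dset.chunks is None) one pseudo-chunk at a time:
#
#     for sel in ChunkIterator(dset):
#         block = dset[sel]  # at most a few MiB per read for contiguous sources
#
# For chunked datasets the selections follow the dataset's own chunk layout.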


# ----------------------------------------------------------------------------------
def copy_element(val, src_dt, tgt_dt, ctx):
msg = f"copy_element, val: {val} "
@@ -1262,8 +1396,12 @@ def create_dataset(dobj, ctx):
# supported non-standard compressor
kwargs["compression"] = filter_name
logging.info(f"using compressor: {filter_name} for {dobj.name}")
kwargs["compression_opts"] = filter_opts
logging.info(f"compression_opts: {filter_opts}")
if isinstance(filter_opts, int) or isinstance(filter_opts, dict):
kwargs["compression_opts"] = filter_opts
logging.info(f"compression_opts: {filter_opts}")
else:
msg = f"ignoring compression_opts for filter: {filter_name}"
logging.warning(msg)
else:
logging.warning(f"filter id {filter_id} for {dobj.name} not supported")

@@ -1402,26 +1540,11 @@ def write_dataset(src, tgt, ctx):
if ctx["verbose"]:
print(msg)
try:
if src.chunks is None:
# contiguous dataset, fake an iterator by creating a list
# with one slice
slices = []
for dim in range(rank):
extent = src.shape[dim]
s = slice(0, extent, 1)
slices.append(s)
slices = tuple(slices)
if rank == 1:
it = [slices[0],]
else:
it = [slices,]
else:
it = src.iter_chunks()

logging.debug(f"src dtype: {src.dtype}")
logging.debug(f"des dtype: {tgt.dtype}")

empty_arr = None
it = ChunkIterator(src)
for src_s in it:
logging.debug(f"src selection: {src_s}")
if rank == 1 and isinstance(src_s, slice):
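With this change write_dataset no longer special-cases contiguous sources: ChunkIterator yields bounded selections for both chunked and contiguous layouts, so a very large contiguous dataset is copied piece by piece rather than in one huge read. A simplified sketch of the copy loop (the real code above also handles dtype conversion and empty chunks):

for src_s in ChunkIterator(src):
    arr = src[src_s]  # read one pseudo-chunk from the HDF5 source
    tgt[src_s] = arr  # write the same region to the target domain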
2 changes: 1 addition & 1 deletion h5pyd/_hl/dataset.py
@@ -265,7 +265,7 @@ def make_new_dset(
maxshape = tuple(m if m is not None else 0 for m in maxshape)
body["maxdims"] = maxshape
else:
print("Warning: maxshape provided but no shape")
print("maxshape provided but no shape")

req = "/datasets"

13 changes: 5 additions & 8 deletions h5pyd/_hl/filters.py
@@ -255,14 +255,11 @@ def rq_tuple(tpl, name):
for k in compression_opts:
filter[k] = compression_opts[k]
else:
if compression_opts in range(10):
level = compression_opts
else:
raise ValueError(
"compression setting must be a dict or integer from 0-9, not {}".format(
compression_opts
)
)
if compression_opts not in range(10):
msg = "compression setting must be a dict or integer from 0-9, "
msg += f"not {compression_opts}"
raise ValueError(msg)
level = compression_opts
filter["level"] = level
filters.append(filter)

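The reworked check accepts compression_opts either as a dict of filter parameters or as an integer level from 0 to 9; anything else raises the ValueError above. A hedged usage sketch (f is an open h5pyd File; names and values are hypothetical):

# integer level in range(10)
f.create_dataset("a", shape=(1000,), dtype="f4",
                 compression="gzip", compression_opts=4)

# dict of filter parameters, copied into the filter spec key by key
f.create_dataset("b", shape=(1000,), dtype="f4",
                 compression="gzip", compression_opts={"level": 5})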