
Hyperchunk2d #310

Merged
merged 6 commits on Feb 16, 2024
1 change: 1 addition & 0 deletions admin/config/config.yml
@@ -83,6 +83,7 @@ xss_protection: "1; mode=block" # Include in response headers if set
allow_any_bucket_read: true # enable reads to buckets other than default bucket
allow_any_bucket_write: true # enable writes to buckets other than default bucket
bit_shuffle_default_blocksize: 2048 # default blocksize for bitshuffle filter
max_rangeget_gap: 1024 # max gap in bytes for intelligent range get requests
# DEPRECATED - the remaining config values are not currently used but are kept for backward compatibility with older container images
aws_lambda_chunkread_function: null # name of aws lambda function for chunk reading
aws_lambda_threshold: 4 # number of chunks per node per request to reach before using lambda
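For context, the new `max_rangeget_gap` value bounds how far apart two hyperchunk byte ranges may be while still being coalesced into a single storage request (it is read via `config.get` in `datanode_lib.py` below and passed to `chunkMunge`). A minimal sketch of the coalescing idea, using a hypothetical `merge_ranges` helper rather than the real `chunkMunge` implementation:

```python
def merge_ranges(ranges, max_gap=1024):
    """Coalesce (offset, length) byte ranges whose gaps are <= max_gap.

    Illustrative only -- the real logic lives in chunkMunge() and operates
    on ChunkLocation items rather than plain tuples.
    """
    merged = []
    for offset, length in sorted(ranges):
        if merged and offset - (merged[-1][0] + merged[-1][1]) <= max_gap:
            # extend the previous range to cover this one (plus the gap)
            prev_offset, _ = merged[-1]
            merged[-1] = (prev_offset, offset + length - prev_offset)
        else:
            merged.append((offset, length))
    return merged

# two 512-byte chunks separated by a 100-byte gap collapse into one request
print(merge_ranges([(0, 512), (612, 512)], max_gap=1024))  # [(0, 1124)]
```

Reading a small gap of unwanted bytes is usually cheaper than issuing an extra storage request, which is why the gap threshold is configurable per deployment.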
7 changes: 4 additions & 3 deletions hsds/async_lib.py
@@ -21,7 +21,7 @@
from .util.chunkUtil import getDatasetId, getNumChunks, ChunkIterator
from .util.hdf5dtype import getItemSize, createDataType
from .util.arrayUtil import getNumElements, bytesToArray
from .util.dsetUtil import getHyperslabSelection, getFilterOps, getChunkDims
from .util.dsetUtil import getHyperslabSelection, getFilterOps, getChunkDims, getFilters
from .util.dsetUtil import getDatasetLayoutClass, getDatasetLayout, getShapeDims

from .util.storUtil import getStorKeys, putStorJSONObj, getStorJSONObj
@@ -163,9 +163,10 @@ async def updateDatasetInfo(app, dset_id, dataset_info, bucket=None):
return
dims = chunktable_layout["dims"]
chunktable_type_json = chunktable_json["type"]
chunktable_item_size = getItemSize(chunktable_type_json)
chunktable_dt = createDataType(chunktable_type_json)
chunktable_filter_ops = getFilterOps(app, chunktable_json, chunktable_item_size)
filters = getFilters(chunktable_json)
kwargs = {"dtype": chunktable_dt, "chunk_shape": dims}
chunktable_filter_ops = getFilterOps(app, chunktable_id, filters, **kwargs)

# read chunktable one chunk at a time - this can be slow if there
# are a lot of chunks, but this is only used by the async bucket
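Based on this hunk, `getFilterOps` now takes the dataset id plus an explicit filter list (from `getFilters`) together with `dtype` and `chunk_shape` keywords, instead of the dataset JSON and item size. A sketch of the adapted call pattern; `make_filter_ops` is a hypothetical wrapper, and the keyword names are taken from this hunk only:

```python
from hsds.util.dsetUtil import getFilters, getFilterOps
from hsds.util.hdf5dtype import createDataType

def make_filter_ops(app, dset_id, dset_json, chunk_shape):
    # new-style call: pass the filter list and dtype/chunk_shape keywords
    filters = getFilters(dset_json)
    dt = createDataType(dset_json["type"])
    kwargs = {"dtype": dt, "chunk_shape": chunk_shape}
    return getFilterOps(app, dset_id, filters, **kwargs)
```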
5 changes: 5 additions & 0 deletions hsds/chunk_crawl.py
@@ -271,13 +271,17 @@ async def read_chunk_hyperslab(
if isinstance(s3offset, list):
# convert to a colon-separated string
s3offset = ":".join(map(str, s3offset))
else:
s3offset = int(s3offset)
params["s3offset"] = s3offset

if "s3size" in chunk_info:
s3size = chunk_info["s3size"]
if isinstance(s3size, list):
# convert to a colon-separated string
s3size = ":".join(map(str, s3size))
else:
s3size = int(s3size)
params["s3size"] = s3size

if "hyper_dims" in chunk_info:
@@ -286,6 +290,7 @@
# convert to colon-separated string
hyper_dims = ":".join(map(str, hyper_dims))
params["hyper_dims"] = hyper_dims

if len(select_dtype) < len(dset_dt):
# field selection, pass in the field names
fields_param = ":".join(select_dtype.names)
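With hyperchunking, `s3offset` and `s3size` may each be a single integer or a per-hyperchunk list; lists are serialized as colon-separated strings in the request parameters and parsed back on the data node (see `GET_Chunk` in `chunk_dn.py` below). A small round-trip sketch with hypothetical helper names:

```python
def encode_param(value):
    """Encode an int or list of ints the way read_chunk_hyperslab does."""
    if isinstance(value, list):
        return ":".join(map(str, value))   # e.g. [0, 4096, 8192] -> "0:4096:8192"
    return str(int(value))

def decode_param(text):
    """Decode back to an int or list of ints, mirroring GET_Chunk in chunk_dn.py."""
    if text.find(":") > 0:
        return list(map(int, text.split(":")))
    return int(text)

assert decode_param(encode_param([0, 4096, 8192])) == [0, 4096, 8192]
assert decode_param(encode_param(2048)) == 2048
```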
12 changes: 10 additions & 2 deletions hsds/chunk_dn.py
@@ -348,7 +348,6 @@ async def GET_Chunk(request):

if "s3offset" in params:
param_s3offset = params["s3offset"]
log.debug(f"s3offset param: {param_s3offset}")
try:
if param_s3offset.find(":") > 0:
# colon-separated index values, convert to list
@@ -362,7 +361,6 @@

if "s3size" in params:
param_s3size = params["s3size"]
log.debug(f"s3size param: {param_s3size}")
try:
if param_s3size.find(":") > 0:
s3size = list(map(int, param_s3size.split(":")))
@@ -384,6 +382,16 @@
log.error(f"invalid hyper_dims params: {param_hyper_dims}")
raise HTTPBadRequest()
log.debug(f"hyper_dims: {hyper_dims}")
"""
if "hyper_index" in params:
param_hyper_index = params["hyper_index"]
log.debug(f"tbd param_hyper_index: {param_hyper_index}")
if param_hyper_index.find(":") > 0:
hyper_index = param_hyper_index.split(":")
else:
hyper_index = param_hyper_index
log.debug(f"hyper_index: {hyper_index}")
"""
Contributor: Unused

Member Author: Removed

if "query" in params:
query = params["query"]
130 changes: 72 additions & 58 deletions hsds/datanode_lib.py
@@ -24,7 +24,7 @@
from .util.idUtil import isValidChunkId, getDataNodeUrl, isSchema2Id
from .util.idUtil import getRootObjId, isRootObjId
from .util.storUtil import getStorJSONObj, putStorJSONObj, putStorBytes
from .util.storUtil import getStorBytes, isStorObj, deleteStorObj
from .util.storUtil import getStorBytes, isStorObj, deleteStorObj, getHyperChunks
from .util.storUtil import getBucketFromStorURI, getKeyFromStorURI, getURIFromKey
from .util.domainUtil import isValidDomain, getBucketForDomain
from .util.attrUtil import getRequestCollectionName
@@ -34,7 +34,7 @@
from .util.chunkUtil import getDatasetId, getChunkSelection, getChunkIndex
from .util.arrayUtil import arrayToBytes, bytesToArray, jsonToArray
from .util.hdf5dtype import createDataType
from .util.rangegetUtil import ChunkLocation, chunkMunge
from .util.rangegetUtil import ChunkLocation, chunkMunge, getHyperChunkIndex, getHyperChunkFactors

from . import config
from . import hsds_logger as log
@@ -811,11 +811,15 @@ async def get_chunk_bytes(
dtype=None,
chunk_id=None,
chunk_dims=None,
hyper_dims=None
layout_class=None,
hyper_dims=None,
fill_value=None,
):
""" For regular chunk reads, just call getStorBytes.
"""
item_size = dtype.itemsize
chunk_size = np.prod(chunk_dims) * item_size
rank = len(chunk_dims)

msg = f"get_chunk_bytes({chunk_id}, bucket={bucket}, offset={offset}, length={length}, "
msg += f"item_size={item_size}, chunk_dims={chunk_dims}, hyper_dims={hyper_dims}"
@@ -831,58 +835,85 @@
}

chunk_bytes = await getStorBytes(app, s3key, **kwargs)
return chunk_bytes
if chunk_bytes is None:
msg = f"read {chunk_id} bucket: {bucket} returned None"
raise ValueError(msg)
if layout_class == "H5D_CONTIGUOUS_REF":
if len(chunk_bytes) < chunk_size:
# we may get less than expected bytes if this chunk
# is close to the end of the file
# expand to expected number of bytes
msg = "extending returned bytearray for "
msg += "H5D_CONTIGUOUS layout from "
msg += f"{len(chunk_bytes)} to {chunk_size}"
log.info(msg)
tmp_buffer = bytearray(chunk_size)
tmp_buffer[: len(chunk_bytes)] = chunk_bytes
chunk_bytes = bytes(tmp_buffer)
chunk_arr = bytesToArray(chunk_bytes, dtype, chunk_dims)
return chunk_arr

# intelligent range get request
log.debug("intelligent range get")

if hyper_dims is None:
log.error("get_chunk_bytes - expected hyper_dims parameter to be set")
raise HTTPInternalServerError()
msg = "expected hyper_dims parameter to be set"
log.warn(msg)
raise HTTPBadRequest(reason=msg)
if len(hyper_dims) != rank:
msg = f"invalid hyper_dims: {hyper_dims}"
log.warn(msg)
raise HTTPBadRequest(reason=msg)

rank = len(chunk_dims)
if rank != 1:
msg = "get_chunk_bytes - only one-dimensional datasets are supported currently "
msg += "for intelligent range gets"
log.error(msg)
raise HTTPInternalServerError()
table_factors = getHyperChunkFactors(chunk_dims, hyper_dims)

num_chunks = len(offset)
num_chunks = len(offset) # offset is a list of chunk offsets
log.debug(f"get_chunk_bytes - num_chunks: {num_chunks}")
if len(length) != num_chunks:
log.error("get_chunk_bytes - expecting length and num_chunks to have same value")
raise HTTPInternalServerError()
if np.prod(table_factors) != num_chunks:
msg = f"unexpected number of hyperchunks: {num_chunks}"
log.warn(msg)
raise HTTPBadRequest(reason=msg)
if not isinstance(length, list) or len(length) != num_chunks:
msg = "get_chunk_bytes - expecting length count and num_chunks to have same value"
log.warn(msg)
raise HTTPBadRequest(reason=msg)

# create a buffer for the hsds chunk and arrange h5 chunks within it
chunk_size = np.prod(chunk_dims) * item_size
# number of bytes in the h5 chunk
# hyper_dims = [4000,] # test
h5_size = np.prod(hyper_dims) * item_size
log.debug(f"h5 chunk size: {h5_size}")
chunk_bytes = bytearray(chunk_size)

if num_chunks > chunk_size // h5_size:
# shouldn't have more than this many hyperchunks
msg = f"get_chunk_bytes - got more than expected hyperchunks: {num_chunks}"
log.error(msg)
raise HTTPInternalServerError()
log.warn(msg)
raise HTTPBadRequest(reason=msg)

# create a numpy array for the hsds chunk and arrange h5 chunks within it
if fill_value is not None:
chunk_arr = np.empty(chunk_dims, dtype=dtype, order="C")
chunk_arr[...] = fill_value
else:
chunk_arr = np.zeros(chunk_dims, dtype=dtype, order="C")

# create a list of the chunk to be fetched
# create a list of the hyperchunks to be fetched
chunk_list = []
for i in range(num_chunks):
if length[i] == 0:
# ignore empty range get requests
continue
chunk_list.append(ChunkLocation(i, offset[i], length[i]))
hyper_index = getHyperChunkIndex(i, table_factors)
chunk_location = ChunkLocation(hyper_index, offset[i], length[i])
chunk_list.append(chunk_location)

if len(chunk_list) == 0:
# nothing to fetch, return zero-initialized array
return chunk_bytes

# munge adjacent chunks to reduce the number of storage
# requests needed
chunk_list = chunkMunge(chunk_list, max_gap=1024)

log.info(f"get_chunk_butes - stor get requests reduced from {num_chunks} to {len(chunk_list)}")
max_gap = int(config.get("max_rangeget_gap", default=1024))
chunk_list = chunkMunge(chunk_list, max_gap=max_gap)
log.info(f"get_chunk_bytes - get requests reduced from {num_chunks} to {len(chunk_list)}")

# gather all the individual h5 chunk reads into a list of tasks
tasks = []
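
For orientation between these hunks: the 2-D support relies on mapping each flat entry of the offset/length lists onto an n-dimensional position inside the HSDS chunk. A sketch of what `getHyperChunkFactors` and `getHyperChunkIndex` plausibly compute (assumed semantics; the real implementations live in `rangegetUtil.py`, which is not shown in this diff):

```python
import numpy as np

chunk_dims = [1000, 1000]   # one HSDS chunk
hyper_dims = [500, 250]     # each h5 chunk inside it

# assumed: per-dimension count of h5 chunks per HSDS chunk -> [2, 4]
table_factors = [c // h for c, h in zip(chunk_dims, hyper_dims)]

num_chunks = int(np.prod(table_factors))   # 8 offsets/lengths expected per request

# assumed: flat index i -> n-dimensional hyperchunk index, row-major order
for i in range(num_chunks):
    hyper_index = np.unravel_index(i, table_factors)
    # hyper_index * hyper_dims gives the element offset of this h5 chunk
    element_offset = [hi * hd for hi, hd in zip(hyper_index, hyper_dims)]
    print(i, tuple(hyper_index), element_offset)
```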
@@ -896,22 +927,17 @@

log.debug(f"getStorBytes processing chunk_locations {chunk_locations}")
# get the byte range we'll read from storage
item_offset = chunk_locations[0].offset
item_length = chunk_locations[-1].offset + chunk_locations[-1].length - item_offset

kwargs = {
"filter_ops": filter_ops,
"offset": item_offset,
"length": item_length,
"chunk_locations": chunk_locations,
"bucket": bucket,
"chunk_bytes": chunk_bytes,
"h5_size": h5_size,
"chunk_arr": chunk_arr,
"hyper_dims": hyper_dims,
}
msg = f"get_chunk_bytes - {len(chunk_locations)} h5 chunks, "
msg += f"offset: {item_offset}, length: {item_length}"
msg = f"get_chunk_bytes - {len(chunk_locations)} h5 chunks"
log.debug(msg)
tasks.append(getStorBytes(app, s3key, **kwargs))
tasks.append(getHyperChunks(app, s3key, **kwargs))

log.debug(f"running asyncio.gather on {len(tasks)} tasks")
results = await asyncio.gather(*tasks)
@@ -922,8 +948,11 @@
log.error(msg)
raise HTTPInternalServerError()

log.debug("get_chunk_bytes done")
return chunk_bytes
msg = "get_chunk_bytes done for hyperchunks "
msg += f"arr min: {chunk_arr.min()}, arr_max: {chunk_arr.max()}"
log.debug(msg)

return chunk_arr


async def get_chunk(
@@ -949,6 +978,7 @@ async def get_chunk(
log.debug(f" s3path: {s3path} s3offset: {s3offset} s3size: {s3size}")
if hyper_dims is not None:
log.debug(f" hyper_dims: {hyper_dims}")

chunk_cache = app["chunk_cache"]
if chunk_init and s3offset > 0:
msg = f"unable to initialize chunk {chunk_id} for reference layouts "
@@ -982,6 +1012,7 @@
layout_json = dset_json["layout"]
layout_class = layout_json.get("class")
chunk_dims = getChunkLayout(dset_json)
fill_value = getFillValue(dset_json)

# note - officially we should follow the order in which the filters are
# defined in the filter_list,
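
The fill value is now resolved up front via `getFillValue` and handed to `get_chunk_bytes`, so the hyperchunk array can be seeded before the partial reads are copied in and any uncovered gaps keep the dataset's fill value. A standalone restatement of that seeding pattern; `init_chunk_arr` is a hypothetical helper, not part of the PR:

```python
import numpy as np

def init_chunk_arr(chunk_dims, dtype, fill_value=None):
    # seed the whole HSDS chunk so regions not covered by any h5 chunk
    # still hold the dataset's fill value (or zeros when none is defined)
    if fill_value is not None:
        chunk_arr = np.empty(chunk_dims, dtype=dtype, order="C")
        chunk_arr[...] = fill_value
    else:
        chunk_arr = np.zeros(chunk_dims, dtype=dtype, order="C")
    return chunk_arr

arr = init_chunk_arr([4, 4], np.dtype("int32"), fill_value=-999)
```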
@@ -1053,10 +1084,12 @@
"dtype": dt,
"chunk_dims": chunk_dims,
"hyper_dims": hyper_dims,
"fill_value": fill_value,
"layout_class": layout_class,
"bucket": bucket,
}

chunk_bytes = await get_chunk_bytes(app, s3key, **kwargs)
chunk_arr = await get_chunk_bytes(app, s3key, **kwargs)

if chunk_id in pending_s3_read:
# read complete - remove from pending map
@@ -1066,23 +1099,6 @@
msg = f"expected to find {chunk_id} in "
msg += "pending_s3_read map"
log.warn(msg)
if chunk_bytes is None:
msg = f"read {chunk_id} bucket: {bucket} returned None"
raise ValueError(msg)
if layout_class == "H5D_CONTIGUOUS_REF":
if len(chunk_bytes) < s3size:
# we may get less than expected bytes if this chunk
# is close to the end of the file
# expand to expected number of bytes
msg = "extending returned bytearray for "
msg += "H5D_CONTIGUOUS layout from "
msg += f"{len(chunk_bytes)} to {s3size}"
log.info(msg)
tmp_buffer = bytearray(s3size)
tmp_buffer[: len(chunk_bytes)] = chunk_bytes
chunk_bytes = bytes(tmp_buffer)
chunk_arr = bytesToArray(chunk_bytes, dt, dims)
log.debug(f"chunk size: {chunk_arr.size}")
except HTTPNotFound:
if not chunk_init:
log.info(f"chunk not found for id: {chunk_id}")
@@ -1122,8 +1138,6 @@

if chunk_arr is None:
# normal fill value based init or initializer failed
fill_value = getFillValue(dset_json)

if fill_value is not None:
chunk_arr = np.empty(dims, dtype=dt, order="C")
chunk_arr[...] = fill_value