
Commit

cleanup code for checkin
jreadey committed Feb 14, 2024
1 parent 0a6433c commit b4be21d
Showing 8 changed files with 53 additions and 76 deletions.
1 change: 0 additions & 1 deletion hsds/async_lib.py
@@ -165,7 +165,6 @@ async def updateDatasetInfo(app, dset_id, dataset_info, bucket=None):
chunktable_type_json = chunktable_json["type"]
chunktable_dt = createDataType(chunktable_type_json)
filters = getFilters(chunktable_json)
log.debug(f"tbd chunktable_json: {chunktable_json}")
kwargs = {"dtype": chunktable_dt, "chunk_shape": dims}
chunktable_filter_ops = getFilterOps(app, chunktable_id, filters, **kwargs)

11 changes: 1 addition & 10 deletions hsds/chunk_crawl.py
@@ -197,7 +197,6 @@ async def read_chunk_hyperslab(
return
chunk_info = chunk_map[chunk_id]
log.debug(f"using chunk_map entry for {chunk_id}")
log.debug(f"tbd: chunk_info: {chunk_info}")
if "points" in chunk_info:
points = chunk_info["points"]
log.debug(f"chunkinfo {len(points)} points")
@@ -291,15 +290,7 @@ async def read_chunk_hyperslab(
# convert to colon separated string
hyper_dims = ":".join(map(str, hyper_dims))
params["hyper_dims"] = hyper_dims
"""
if "hyper_index" in chunk_info:
hyper_index = chunk_info["hyper_index"]
if isinstance(hyper_index, list):
# convert to colon seperated string
hyper_index = ":".join(map(str, hyper_index))
params["hyper_index"] = hyper_index
log.debug(f"tbd: hyper_index_param: {hyper_index}")
"""

if len(select_dtype) < len(dset_dt):
# field selection, pass in the field names
fields_param = ":".join(select_dtype.names)
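
Not part of the diff: a minimal sketch of the colon-separated encoding read_chunk_hyperslab uses for the hyper_dims request parameter, assuming the receiving data node simply splits on ":" and converts each field back to an int.

# hedged sketch (illustration only): round-trip of the hyper_dims parameter
hyper_dims = [100, 50]
hyper_dims_param = ":".join(map(str, hyper_dims))              # "100:50"
parsed = [int(field) for field in hyper_dims_param.split(":")]
assert parsed == hyper_dims
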
2 changes: 0 additions & 2 deletions hsds/chunk_dn.py
@@ -462,8 +462,6 @@ async def GET_Chunk(request):
kwargs["s3size"] = s3size
if hyper_dims:
kwargs["hyper_dims"] = hyper_dims
#if hyper_index:
# kwargs["hyper_index"] = hyper_index
else:
kwargs["bucket"] = bucket

12 changes: 4 additions & 8 deletions hsds/datanode_lib.py
@@ -801,7 +801,6 @@ async def run_chunk_initializer(
return chunk_arr



async def get_chunk_bytes(
app,
s3key,
@@ -888,24 +887,22 @@ async def get_chunk_bytes(
msg = f"get_chunk_bytes - got more than expected hyperchunks: {num_chunks}"
log.warn(msg)
raise HTTPBadRequest(reason=msg)

# create a numpy array for the hsds chunk and arrange h5 chunks within it
if fill_value is not None:
chunk_arr = np.empty(chunk_dims, dtype=dtype, order="C")
chunk_arr[...] = fill_value
else:
chunk_arr = np.zeros(chunk_dims, dtype=dtype, order="C")

# create a list of the hyperchunks to be fetched
chunk_list = []
for i in range(num_chunks):
if length[i] == 0:
# ignore empty range get requests
continue
hyper_index = getHyperChunkIndex(i, table_factors)
log.debug(f"tbd: got hyper_index: {hyper_index}")
chunk_location = ChunkLocation(hyper_index, offset[i], length[i])
log.debug(f"add ChunkLocation : {chunk_location}")
chunk_list.append(chunk_location)

if len(chunk_list) == 0:
@@ -929,7 +926,7 @@ async def get_chunk_bytes(

log.debug(f"getStorBytes processing chunk_locations {chunk_locations}")
# get the byte range we'll read from storage

kwargs = {
"filter_ops": filter_ops,
"chunk_locations": chunk_locations,
@@ -980,8 +977,7 @@ async def get_chunk(
log.debug(f" s3path: {s3path} s3offset: {s3offset} s3size: {s3size}")
if hyper_dims is not None:
log.debug(f" hyper_dims: {hyper_dims}")
#if hyper_index is not None:
# log.debug(f" hyper_index: {hyper_index}")

chunk_cache = app["chunk_cache"]
if chunk_init and s3offset > 0:
msg = f"unable to initialize chunk {chunk_id} for reference layouts "
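
As a reading aid (not in the commit), a sketch of the allocation step get_chunk_bytes performs before copying hyperchunks in: the chunk array is seeded with the dataset fill value when one is defined, otherwise zero-filled. The helper name init_chunk_arr is made up for illustration.

import numpy as np

def init_chunk_arr(chunk_dims, dtype, fill_value=None):
    # mirrors the fill-value handling shown in the hunk above
    if fill_value is not None:
        chunk_arr = np.empty(chunk_dims, dtype=dtype, order="C")
        chunk_arr[...] = fill_value
    else:
        chunk_arr = np.zeros(chunk_dims, dtype=dtype, order="C")
    return chunk_arr

# e.g. a (100, 50) float32 chunk pre-filled with -999.0
arr = init_chunk_arr((100, 50), np.float32, fill_value=-999.0)
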
54 changes: 25 additions & 29 deletions hsds/dset_lib.py
@@ -68,28 +68,37 @@ def getFillValue(dset_json):


def _get_arr_pts(arr_points, arr_index, pt, dim=0, chunk_index=None, factors=None):
""" recursive function to fill in chunk locations for a hyperchunk """
""" recursive function to fill in chunk locations for hyperchunk selection.
arr_points: numpy array of shape (num_chunks, rank)
arr_index: row of arr_points for the first hyper chunk pt
pt: list of [0,] * rank - one element will get set for each recursive call
dim: the current dimension - only set in recursion
chunk_index: the HSDS chunk index
factors: the hyper scaling factors
on return arr_points rows arr_index[arr_index*N:(arr_index+1)*N-1] will be
set to the values needed to do a point selection on the chunk table, where
N is the ratioo of chunks to hyperchunks - np.prod(factors)
"""

log.debug(f"get_arr_pts = arr_index: {arr_index}, dim: {dim}, pt: {pt}")
log.debug(f"chunk_index: {chunk_index}")

index = chunk_index[dim]
factor = factors[dim]
rank = len(chunk_index)
for i in range(factor):
pt[dim] = (index * factor + i)
if dim + 1 == rank:
idx = int(arr_index) + i
#print(f"idx: {idx}, type: {type(idx)}")
#print("arr_points.shape:", arr_points.shape)
#print("pt:", pt)
if rank == 1:
arr_points[idx] = pt[0] # need to set 1d arrays with an int index
else:
arr_points[idx] = pt
arr_points[idx] = pt
else:
kwargs = {"dim": dim+1, "chunk_index": chunk_index, "factors": factors}
next_index = arr_index + i*np.prod(factors[1:])
kwargs = {"dim": dim + 1, "chunk_index": chunk_index, "factors": factors}
next_index = arr_index + (i * np.prod(factors[1:]))
_get_arr_pts(arr_points, next_index, pt, **kwargs)


@@ -193,10 +202,8 @@ def getChunkItem(chunkid):
log.debug(f"cpl layout: {layout}")
s3path = layout["file_uri"]
chunks = layout["chunks"]
log.debug(f"tbd: chunks: {chunks}")
for chunk_id in chunk_ids:
chunk_item = getChunkItem(chunk_id)
log.debug(f"tbd: {chunk_id} chunk_item: {chunk_item}")
s3offset = 0
s3size = 0
chunk_key = getChunkSuffix(chunk_id)
@@ -240,7 +247,7 @@ def getChunkItem(chunkid):
log.debug(f"default_chunktable_dims: {default_chunktable_dims}")
if "hyper_dims" in layout:
hyper_dims = layout["hyper_dims"]

else:
# assume 1 to 1 matching
hyper_dims = chunk_dims
@@ -260,21 +267,19 @@ def getChunkItem(chunkid):
msg += f" of {chunk_dims[dim]}"
log.warn(msg)
raise HTTPBadRequest(reason=msg)

ref_num_chunks = num_chunks * np.prod(table_factors)

log.debug(f"ref_num_chunks: {ref_num_chunks}")
log.debug(f"hyper_dims: {hyper_dims}")

arr_points_shape = (ref_num_chunks, rank)
arr_points = np.zeros(arr_points_shape, dtype=np.dtype("u8"))
log.debug(f"tbd: arr_points: {arr_points}")


if ref_num_chunks == num_chunks:
for i in range(num_chunks):
chunk_id = chunk_ids[i]
indx = getChunkIndex(chunk_id)
log.debug(f"tbd: chunkindex: {indx}")
arr_points[i] = indx
else:
# hyper chunking
@@ -286,11 +291,9 @@ def getChunkItem(chunkid):
arr_index = i * chunks_per_hyperchunk
kwargs = {"chunk_index": chunk_index, "factors": table_factors}
_get_arr_pts(arr_points, arr_index, pt, **kwargs)
log.debug(f"tbd: arr_points: {arr_points}")


msg = f"got chunktable - {len(arr_points)} entries, calling getSelectionData"
log.debug(msg)
log.debug(f"tbd: arr_points: {arr_points}")
# this call won't lead to a circular loop of calls since we've checked
# that the chunktable layout is not H5D_CHUNKED_REF_INDIRECT
kwargs = {"points": arr_points, "bucket": bucket}
@@ -331,23 +334,16 @@ def getChunkItem(chunkid):
factor = ref_num_chunks // num_chunks
s3offsets = []
s3sizes = []
# hyper_index = []
index_base = chunk_index[0]
log.debug(f"tbd - index_base: {index_base}")
for j in range(factor):
item = point_data[i * factor + j]
s3offset = int(item[0])
s3offsets.append(s3offset)
s3size = int(item[1])
s3sizes.append(s3size)
# index = int(arr_points[i * factor + j]) % index_base
#index = getHyperChunkIndex(j, table_factors)
# hyper_index.append(index)
s3sizes.append(s3size)

chunk_item["s3offset"] = s3offsets
chunk_item["s3size"] = s3sizes
chunk_item["hyper_dims"] = hyper_dims
# chunk_item["hyper_index"] = hyper_index
log.debug(f"tbd - chunk_item: {chunk_item}")

else:
log.error(f"Unexpected chunk layout: {layout['class']}")
@@ -358,7 +354,7 @@ def getChunkItem(chunkid):


def get_chunkmap_selections(chunk_map, chunk_ids, slices, dset_json):
"""Update chunk_map with chunk and data selections for the
""" Update chunk_map with chunk and data selections for the
given set of slices
"""
log.debug(f"get_chunkmap_selections - {len(chunk_ids)} chunk_ids")
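
Not part of the commit: a worked illustration of the _get_arr_pts recursion above, assuming a single HSDS chunk that spans a 2 x 2 block of hyperchunks (table_factors == [2, 2], so ref_num_chunks = num_chunks * np.prod(table_factors) = 4).

import numpy as np

# hedged example: fill chunktable points for HSDS chunk index (1, 0)
arr_points = np.zeros((4, 2), dtype=np.dtype("u8"))  # (ref_num_chunks, rank)
pt = [0, 0]
_get_arr_pts(arr_points, 0, pt, chunk_index=(1, 0), factors=[2, 2])
# arr_points now holds [[2, 0], [2, 1], [3, 0], [3, 1]] - the four chunktable
# rows covered by HSDS chunk (1, 0)
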
9 changes: 5 additions & 4 deletions hsds/util/rangegetUtil.py
@@ -20,6 +20,7 @@ def _chunk_start(c):
start = c.offset
return start


def getHyperChunkFactors(chunk_dims, hyper_dims):
""" return list of rations betwen chunk and hyperchunkdims """

@@ -36,21 +37,21 @@ def getHyperChunkFactors(chunk_dims, hyper_dims):
factors.append(factor)
return factors


def getHyperChunkIndex(i, factors):
""" return index of ith hyperchunk based on the chunk factors
""" return index of ith hyperchunk based on the chunk factors
e.g. for factors: [2,3,4], the 5th index will be: 0_1_1
"""

rank = len(factors)
index = []
for dim in range(rank):
factor = int(np.prod(factors[(dim+1):]))
n = (i // factor) % factors[dim]
factor = int(np.prod(factors[(dim + 1):]))
n = (i // factor) % factors[dim]
index.append(n)
return tuple(index)



def _chunk_end(c):
""" return end of byte range for given chunk or chunk list """
end = None
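
For orientation (not in the diff), a hedged usage sketch of the two helpers above, assuming getHyperChunkFactors returns the per-dimension chunk-to-hyperchunk ratio as its docstring describes.

factors = getHyperChunkFactors([200, 100], [50, 25])   # -> [4, 4]
getHyperChunkIndex(0, factors)    # -> (0, 0), the first hyperchunk
getHyperChunkIndex(5, factors)    # -> (1, 1)
getHyperChunkIndex(15, factors)   # -> (3, 3), the last of the 16 hyperchunks
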
32 changes: 14 additions & 18 deletions hsds/util/storUtil.py
@@ -461,7 +461,7 @@ async def getStorBytes(app,
continue
# add to the list
chunk_bytes.append(h5_bytes)

return chunk_bytes
elif filter_ops:
# uncompress and return
@@ -472,14 +472,14 @@


async def getHyperChunks(app,
key,
chunk_arr=None,
hyper_dims=None,
filter_ops=None,
chunk_locations=None,
bucket=None
):
key,
chunk_arr=None,
hyper_dims=None,
filter_ops=None,
chunk_locations=None,
bucket=None
):

min_offset = None
max_offset = None
rank = len(chunk_arr.shape)
@@ -501,39 +501,35 @@ async def getHyperChunks(app,
log.debug(f"getHyperChunks: read {len(data)} bytes")
if len(data) < item_length:
log.warn(f"getHyperChunks, requested: {item_length}, but got: {len(data)} bytes")

# slot in the data
for item in chunk_locations:
log.debug(f"tbd - got item: {item}")
chunk_offset = item.offset - min_offset
if chunk_offset + item.length > len(data):
# edge chunk
chunk_size = len(data) - chunk_offset
h5_bytes = bytearray(h5_size)
h5_bytes[:chunk_size] = data[chunk_offset:chunk_offset+chunk_size]
h5_bytes[:chunk_size] = data[chunk_offset:chunk_offset + chunk_size]
else:
h5_bytes = data[chunk_offset: chunk_offset+item.length]
h5_bytes = data[chunk_offset:chunk_offset + item.length]
if filter_ops:
h5_bytes = _uncompress(h5_bytes, **filter_ops)
hyper_chunk = np.frombuffer(h5_bytes, dtype=chunk_arr.dtype)
hyper_chunk = hyper_chunk.reshape(hyper_dims)
log.debug(f"tbd - hyper_dims: {hyper_dims}")
hyper_index = item.index
slices = []
for i in range(rank):
extent = hyper_dims[i]
index = hyper_index[i]
log.debug(f"getting slice for hyperchunk: {index} with extent: {extent}")
start = extent * index
end = start + extent
s = slice(start, end, 1)
log.debug(f"tbd - adding slice: {s}")
slices.append(s)
slices = tuple(slices) # need tuple to use as numpy index
log.debug(f"tbd - hyper index {hyper_index} slice: {slices}")
chunk_arr[slices] = hyper_chunk[...]
log.debug(f"read {len(chunk_locations)} hyperchunks")



async def putStorBytes(app, key, data, filter_ops=None, bucket=None):
"""Store byte string as S3 object with given key"""

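
Not part of the commit: a sketch of the slice arithmetic getHyperChunks uses to place one decompressed hyperchunk into the larger HSDS chunk array - each dimension's slice starts at hyper_dims[i] * hyper_index[i] and spans hyper_dims[i] elements.

import numpy as np

# hedged sketch: slot a (50, 25) hyperchunk at hyper index (1, 2) into a
# (200, 100) HSDS chunk array
chunk_arr = np.zeros((200, 100), dtype="i4")
hyper_dims = (50, 25)
hyper_index = (1, 2)
hyper_chunk = np.ones(hyper_dims, dtype=chunk_arr.dtype)

slices = []
for i in range(len(hyper_dims)):
    start = hyper_dims[i] * hyper_index[i]
    slices.append(slice(start, start + hyper_dims[i], 1))
chunk_arr[tuple(slices)] = hyper_chunk[...]   # fills rows 50:100, cols 50:75
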
8 changes: 4 additions & 4 deletions tests/integ/value_test.py
@@ -3163,7 +3163,6 @@ def testIntelligentRangeGet2D(self):
chunk_dims = (50, 25) # ~5KB chunk size
chunks_per_col = dset_shape[0] // chunk_dims[0]
chunks_per_row = dset_shape[1] // chunk_dims[1]
num_chunks = chunks_per_col * chunks_per_row

# get domain
req = helper.getEndpoint() + "/"
@@ -3211,7 +3210,7 @@ def testIntelligentRangeGet2D(self):
# make the dataset chunk a multiple of linked chunk shape
hyper_dims = chunk_dims
chunk_dims = [chunk_dims[0] * 4, chunk_dims[1] * 4]

layout = {
"class": "H5D_CHUNKED_REF_INDIRECT",
"file_uri": file_uri,
@@ -3247,7 +3246,8 @@ def testIntelligentRangeGet2D(self):
start = 1234
stop = start + 10
col_index = 123
params = {"select": f"[{start}:{stop}, {col_index}]"} # read 10 element, starting at index 1234567
# read 10 elements, starting at index 1234
params = {"select": f"[{start}:{stop}, {col_index}]"}
params["nonstrict"] = 1 # enable SN to invoke lambda func

# read the selection
@@ -3260,7 +3260,7 @@
# should get ten elements back
self.assertEqual(len(value), 10)
# data values for element (i,j) should be i*1000+j
expected = [[(start+i)*1000+col_index,] for i in range(10)]
expected = [[(start + i) * 1000 + col_index,] for i in range(10)]
self.assertEqual(value, expected)

def testIntelligentRangeGetFillValue(self):
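
Not part of the diff: a quick check of the expected values computed in the test above, assuming each element (i, j) of the linked dataset stores i * 1000 + j.

start, col_index = 1234, 123
expected = [[(start + i) * 1000 + col_index, ] for i in range(10)]
# expected[0] == [1234123], expected[-1] == [1243123]
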
