diff --git a/hsds/async_lib.py b/hsds/async_lib.py
index 36e41e43..f5f45d9b 100755
--- a/hsds/async_lib.py
+++ b/hsds/async_lib.py
@@ -165,7 +165,6 @@ async def updateDatasetInfo(app, dset_id, dataset_info, bucket=None):
         chunktable_type_json = chunktable_json["type"]
         chunktable_dt = createDataType(chunktable_type_json)
         filters = getFilters(chunktable_json)
-        log.debug(f"tbd chunktable_json: {chunktable_json}")
         kwargs = {"dtype": chunktable_dt, "chunk_shape": dims}
         chunktable_filter_ops = getFilterOps(app, chunktable_id, filters, **kwargs)
diff --git a/hsds/chunk_crawl.py b/hsds/chunk_crawl.py
index 13b04bbf..015a78b5 100755
--- a/hsds/chunk_crawl.py
+++ b/hsds/chunk_crawl.py
@@ -197,7 +197,6 @@ async def read_chunk_hyperslab(
             return
     chunk_info = chunk_map[chunk_id]
     log.debug(f"using chunk_map entry for {chunk_id}")
-    log.debug(f"tbd: chunk_info: {chunk_info}")
     if "points" in chunk_info:
         points = chunk_info["points"]
         log.debug(f"chunkinfo {len(points)} points")
@@ -291,15 +290,7 @@ async def read_chunk_hyperslab(
                 # convert to colon seperated string
                 hyper_dims = ":".join(map(str, hyper_dims))
             params["hyper_dims"] = hyper_dims
-        """
-        if "hyper_index" in chunk_info:
-            hyper_index = chunk_info["hyper_index"]
-            if isinstance(hyper_index, list):
-                # convert to colon seperated string
-                hyper_index = ":".join(map(str, hyper_index))
-            params["hyper_index"] = hyper_index
-            log.debug(f"tbd: hyper_index_param: {hyper_index}")
-        """
+
     if len(select_dtype) < len(dset_dt):
         # field selection, pass in the field names
         fields_param = ":".join(select_dtype.names)
diff --git a/hsds/chunk_dn.py b/hsds/chunk_dn.py
index 60e1169b..fa5da378 100644
--- a/hsds/chunk_dn.py
+++ b/hsds/chunk_dn.py
@@ -462,8 +462,6 @@ async def GET_Chunk(request):
             kwargs["s3size"] = s3size
         if hyper_dims:
             kwargs["hyper_dims"] = hyper_dims
-        #if hyper_index:
-        #    kwargs["hyper_index"] = hyper_index
     else:
         kwargs["bucket"] = bucket
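Note: as the context lines above show, hyper_dims travels from the SN to the DN
as a colon-separated query parameter. A minimal round-trip sketch; the decode
side is illustrative, not taken from this patch:

# --- reviewer sketch (not part of the patch) ---
hyper_dims = [50, 25]
params = {"hyper_dims": ":".join(map(str, hyper_dims))}  # encodes as "50:25"
decoded = [int(x) for x in params["hyper_dims"].split(":")]  # DN-side decode
assert decoded == hyper_dims
# --- end sketch ---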
diff --git a/hsds/datanode_lib.py b/hsds/datanode_lib.py
index 868d6d17..ac2453a1 100644
--- a/hsds/datanode_lib.py
+++ b/hsds/datanode_lib.py
@@ -801,7 +801,6 @@ async def run_chunk_initializer(
     return chunk_arr
 
 
-
 async def get_chunk_bytes(
     app,
     s3key,
@@ -888,14 +887,14 @@ async def get_chunk_bytes(
         msg = f"get_chunk_bytes - got more than expected hyperchunks: {num_chunks}"
         log.warn(msg)
         raise HTTPBadRequest(reason=msg)
-    
+
    # create an numpy array for the hsds chunk and arrange h5 chunks within it
    if fill_value is not None:
        chunk_arr = np.empty(chunk_dims, dtype=dtype, order="C")
        chunk_arr[...] = fill_value
    else:
        chunk_arr = np.zeros(chunk_dims, dtype=dtype, order="C")
-    
+
    # create a list of the hyperchunks to be fetched
    chunk_list = []
    for i in range(num_chunks):
@@ -903,9 +902,7 @@ async def get_chunk_bytes(
            # ignore empty range get requests
            continue
        hyper_index = getHyperChunkIndex(i, table_factors)
-        log.debug(f"tbd: got hyper_index: {hyper_index}")
        chunk_location = ChunkLocation(hyper_index, offset[i], length[i])
-        log.debug(f"add ChunkLocation : {chunk_location}")
        chunk_list.append(chunk_location)

    if len(chunk_list) == 0:
@@ -929,7 +926,7 @@ async def get_chunk_bytes(
    log.debug(f"getStorBytes processing chunk_locations {chunk_locations}")

    # get the byte range we'll read from storage
-    
+
    kwargs = {
        "filter_ops": filter_ops,
        "chunk_locations": chunk_locations,
@@ -980,8 +977,7 @@ async def get_chunk(
        log.debug(f" s3path: {s3path} s3offset: {s3offset} s3size: {s3size}")
    if hyper_dims is not None:
        log.debug(f" hyper_dims: {hyper_dims}")
-    #if hyper_index is not None:
-    #    log.debug(f" hyper_index: {hyper_index}")
+
    chunk_cache = app["chunk_cache"]
    if chunk_init and s3offset > 0:
        msg = f"unable to initialize chunk {chunk_id} for reference layouts "
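Note: get_chunk_bytes builds one ChunkLocation per non-empty hyperchunk and
hands the batch to getStorBytes, which can then cover the whole batch with a
single ranged read. A sketch of the idea; the ChunkLocation fields here are
inferred from how getHyperChunks uses them (item.index, item.offset,
item.length), and the real definition lives in rangegetUtil.py:

# --- reviewer sketch (not part of the patch) ---
from collections import namedtuple

ChunkLocation = namedtuple("ChunkLocation", ["index", "offset", "length"])

locations = [
    ChunkLocation(index=(0, 0), offset=4096, length=1200),
    ChunkLocation(index=(0, 1), offset=5296, length=1200),
]
# one range request spanning [min offset, max offset + length) covers them all
start = min(loc.offset for loc in locations)
end = max(loc.offset + loc.length for loc in locations)
print(f"single ranged read: bytes {start}:{end}")
# --- end sketch ---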
diff --git a/hsds/dset_lib.py b/hsds/dset_lib.py
index 3c05b136..05578100 100755
--- a/hsds/dset_lib.py
+++ b/hsds/dset_lib.py
@@ -68,11 +68,23 @@ def getFillValue(dset_json):
 
 
 def _get_arr_pts(arr_points, arr_index, pt, dim=0, chunk_index=None, factors=None):
-    """ recursive function to fill in chunk locations for a hyperchunk """
+    """ recursive function to fill in chunk locations for hyperchunk selection.
+        arr_points: numpy array of shape (num_chunks, rank)
+        arr_index: row of arr_points for the first hyper chunk pt
+        pt: list of [0,] * rank - one element will get set for each recursive call
+        dim: the current dimension - only set in recursion
+        chunk_index: the HSDS chunk index
+        factors: the hyper scaling factors
+
+        on return, arr_points rows arr_index through arr_index + N - 1 will be
+        set to the values needed to do a point selection on the chunk table, where
+        N is the ratio of chunks to hyperchunks - np.prod(factors)
+
+    """
     log.debug(f"get_arr_pts = arr_index: {arr_index}, dim: {dim}, pt: {pt}")
     log.debug(f"chunk_index: {chunk_index}")
-    
+
     index = chunk_index[dim]
     factor = factors[dim]
     rank = len(chunk_index)
@@ -80,16 +92,13 @@ def _get_arr_pts(arr_points, arr_index, pt, dim=0, chunk_index=None, factors=Non
         pt[dim] = (index * factor + i)
         if dim + 1 == rank:
             idx = int(arr_index) + i
-            #print(f"idx: {idx}, type: {type(idx)}")
-            #print("arr_points.shape:", arr_points.shape)
-            #print("pt:", pt)
             if rank == 1:
                 arr_points[idx] = pt[0]  # need to set 1d arrays with an int index
             else:
-                arr_points[idx] = pt 
+                arr_points[idx] = pt
         else:
-            kwargs = {"dim": dim+1, "chunk_index": chunk_index, "factors": factors}
-            next_index = arr_index + i*np.prod(factors[1:])
+            kwargs = {"dim": dim + 1, "chunk_index": chunk_index, "factors": factors}
+            next_index = arr_index + (i * np.prod(factors[1:]))
             _get_arr_pts(arr_points, next_index, pt, **kwargs)
@@ -193,10 +202,8 @@ def getChunkItem(chunkid):
         log.debug(f"cpl layout: {layout}")
         s3path = layout["file_uri"]
         chunks = layout["chunks"]
-        log.debug(f"tbd: chunks: {chunks}")
         for chunk_id in chunk_ids:
             chunk_item = getChunkItem(chunk_id)
-            log.debug(f"tbd: {chunk_id} chunk_item: {chunk_item}")
             s3offset = 0
             s3size = 0
             chunk_key = getChunkSuffix(chunk_id)
@@ -240,7 +247,7 @@ def getChunkItem(chunkid):
         log.debug(f"default_chunktable_dims: {default_chunktable_dims}")
         if "hyper_dims" in layout:
             hyper_dims = layout["hyper_dims"]
-            
+
         else:
             # assume 1 to 1 matching
             hyper_dims = chunk_dims
@@ -260,21 +267,19 @@ def getChunkItem(chunkid):
                 msg += f" of {chunk_dims[dim]}"
                 log.warn(msg)
                 raise HTTPBadRequest(reason=msg)
-        
+
         ref_num_chunks = num_chunks * np.prod(table_factors)
-        
+
         log.debug(f"ref_num_chunks: {ref_num_chunks}")
         log.debug(f"hyper_dims: {hyper_dims}")
         arr_points_shape = (ref_num_chunks, rank)
         arr_points = np.zeros(arr_points_shape, dtype=np.dtype("u8"))
-        log.debug(f"tbd: arr_points: {arr_points}")
-        
+
         if ref_num_chunks == num_chunks:
             for i in range(num_chunks):
                 chunk_id = chunk_ids[i]
                 indx = getChunkIndex(chunk_id)
-                log.debug(f"tbd: chunkindex: {indx}")
                 arr_points[i] = indx
         else:
             # hyper chunking
@@ -286,11 +291,9 @@ def getChunkItem(chunkid):
                 arr_index = i * chunks_per_hyperchunk
                 kwargs = {"chunk_index": chunk_index, "factors": table_factors}
                 _get_arr_pts(arr_points, arr_index, pt, **kwargs)
-                log.debug(f"tbd: arr_points: {arr_points}")
-        
+
         msg = f"got chunktable - {len(arr_points)} entries, calling getSelectionData"
         log.debug(msg)
-        log.debug(f"tbd: arr_points: {arr_points}")
         # this call won't lead to a circular loop of calls since we've checked
         # that the chunktable layout is not H5D_CHUNKED_REF_INDIRECT
         kwargs = {"points": arr_points, "bucket": bucket}
@@ -331,23 +334,16 @@ def getChunkItem(chunkid):
             factor = ref_num_chunks // num_chunks
             s3offsets = []
             s3sizes = []
-            # hyper_index = []
-            index_base = chunk_index[0]
-            log.debug(f"tbd - index_base: {index_base}")
             for j in range(factor):
                 item = point_data[i * factor + j]
                 s3offset = int(item[0])
                 s3offsets.append(s3offset)
                 s3size = int(item[1])
-                s3sizes.append(s3size) 
-                # index = int(arr_points[i * factor + j]) % index_base
-                #index = getHyperChunkIndex(j, table_factors)
-                # hyper_index.append(index)
+                s3sizes.append(s3size)
+
             chunk_item["s3offset"] = s3offsets
             chunk_item["s3size"] = s3sizes
             chunk_item["hyper_dims"] = hyper_dims
-            # chunk_item["hyper_index"] = hyper_index
-            log.debug(f"tbd - chunk_item: {chunk_item}")
         else:
             log.error(f"Unexpected chunk layout: {layout['class']}")
@@ -358,7 +354,7 @@ def getChunkItem(chunkid):
 
 def get_chunkmap_selections(chunk_map, chunk_ids, slices, dset_json):
-    """Update chunk_map with chunk and data selections for the
+    """ Update chunk_map with chunk and data selections for the
     given set of slices
     """
     log.debug(f"get_chunkmap_selections - {len(chunk_ids)} chunk_ids")
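Note: per the docstring added to _get_arr_pts above, the rows it generates are
the cartesian product of the per-dimension ranges
[index * factor, (index + 1) * factor). A non-recursive sketch of the same
expansion for a 2-d dataset; hyper_chunk_points is an illustrative name, not
part of the patch:

# --- reviewer sketch (not part of the patch) ---
import itertools
import numpy as np

def hyper_chunk_points(chunk_index, factors):
    # each HSDS chunk expands to np.prod(factors) chunk-table coordinates
    ranges = [range(ci * f, (ci + 1) * f) for ci, f in zip(chunk_index, factors)]
    return np.array(list(itertools.product(*ranges)), dtype=np.dtype("u8"))

print(hyper_chunk_points((1, 0), (2, 3)))
# -> [[2 0] [2 1] [2 2] [3 0] [3 1] [3 2]]
# --- end sketch ---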
diff --git a/hsds/util/rangegetUtil.py b/hsds/util/rangegetUtil.py
index 6740118f..877b5d99 100644
--- a/hsds/util/rangegetUtil.py
+++ b/hsds/util/rangegetUtil.py
@@ -20,6 +20,7 @@ def _chunk_start(c):
     start = c.offset
     return start
 
+
 def getHyperChunkFactors(chunk_dims, hyper_dims):
     """ return list of rations betwen chunk and hyperchunkdims """
@@ -36,21 +37,21 @@
     factors.append(factor)
     return factors
 
+
 def getHyperChunkIndex(i, factors):
-    """ return index of ith hyperchunk based on the chunk factors 
+    """ return index of ith hyperchunk based on the chunk factors
     e.g. for factors: [2,3,4], the 5th index will be: 0_1_1 """
     rank = len(factors)
     index = []
     for dim in range(rank):
-        factor = int(np.prod(factors[(dim+1):]))
-        n = (i // factor) % factors[dim] 
+        factor = int(np.prod(factors[(dim + 1):]))
+        n = (i // factor) % factors[dim]
         index.append(n)
     return tuple(index)
 
-
 def _chunk_end(c):
     """ return end of byte range for given chunk or chunk list """
     end = None
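Note: getHyperChunkIndex is a mixed-radix decomposition of the linear
hyperchunk number: the stride for each dimension is the product of the factors
after it. A standalone check of the docstring example; the helper name is
illustrative:

# --- reviewer sketch (not part of the patch) ---
import numpy as np

def hyper_chunk_index(i, factors):
    index = []
    for dim in range(len(factors)):
        stride = int(np.prod(factors[(dim + 1):]))  # row-major stride
        index.append((i // stride) % factors[dim])
    return tuple(index)

assert hyper_chunk_index(5, [2, 3, 4]) == (0, 1, 1)  # matches the docstring
# --- end sketch ---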
log.debug(f"read {len(chunk_locations)} hyperchunks") - + + async def putStorBytes(app, key, data, filter_ops=None, bucket=None): """Store byte string as S3 object with given key""" diff --git a/tests/integ/value_test.py b/tests/integ/value_test.py index 691cc8a0..99f7a30b 100755 --- a/tests/integ/value_test.py +++ b/tests/integ/value_test.py @@ -3163,7 +3163,6 @@ def testIntelligentRangeGet2D(self): chunk_dims = (50, 25) # ~5KB chunk size chunks_per_col = dset_shape[0] // chunk_dims[0] chunks_per_row = dset_shape[1] // chunk_dims[1] - num_chunks = chunks_per_col * chunks_per_row # get domain req = helper.getEndpoint() + "/" @@ -3211,7 +3210,7 @@ def testIntelligentRangeGet2D(self): # make the dataset chunk a multiple of linked chunk shape hyper_dims = chunk_dims chunk_dims = [chunk_dims[0] * 4, chunk_dims[1] * 4] - + layout = { "class": "H5D_CHUNKED_REF_INDIRECT", "file_uri": file_uri, @@ -3247,7 +3246,8 @@ def testIntelligentRangeGet2D(self): start = 1234 stop = start + 10 col_index = 123 - params = {"select": f"[{start}:{stop}, {col_index}]"} # read 10 element, starting at index 1234567 + # read 10 element, starting at index 1234 + params = {"select": f"[{start}:{stop}, {col_index}]"} params["nonstrict"] = 1 # enable SN to invoke lambda func # read the selection @@ -3260,7 +3260,7 @@ def testIntelligentRangeGet2D(self): # should get ten elements back self.assertEqual(len(value), 10) # data values for element (i,j) should be i*1000+j - expected = [[(start+i)*1000+col_index,] for i in range(10)] + expected = [[(start + i) * 1000 + col_index,] for i in range(10)] self.assertEqual(value, expected) def testIntelligentRangeGetFillValue(self):