diff --git a/hsds/basenode.py b/hsds/basenode.py
index 6fddd3f0..48286b8e 100644
--- a/hsds/basenode.py
+++ b/hsds/basenode.py
@@ -567,6 +567,7 @@ def baseInit(node_type):
     app["start_time"] = int(time.time())  # seconds after epoch
     app["register_time"] = 0
     app["max_task_count"] = config.get("max_task_count")
+    app["storage_clients"] = {}  # storage client drivers

     is_standalone = config.getCmdLineArg("standalone")
diff --git a/hsds/chunk_dn.py b/hsds/chunk_dn.py
index b9153b89..ad8d1895 100644
--- a/hsds/chunk_dn.py
+++ b/hsds/chunk_dn.py
@@ -279,7 +279,12 @@ async def PUT_Chunk(request):
         log.error(msg)
         raise HTTPInternalServerError()

-    input_arr = bytesToArray(input_bytes, select_dt, [num_elements, ])
+    try:
+        input_arr = bytesToArray(input_bytes, select_dt, [num_elements, ])
+    except ValueError as ve:
+        log.error(f"bytesToArray threw ValueError: {ve}")
+        raise HTTPInternalServerError()
+
     if bcshape:
         input_arr = input_arr.reshape(bcshape)
         log.debug(f"broadcasting {bcshape} to mshape {mshape}")
diff --git a/hsds/datanode_lib.py b/hsds/datanode_lib.py
index 2607bbdd..83ab2f23 100644
--- a/hsds/datanode_lib.py
+++ b/hsds/datanode_lib.py
@@ -334,7 +334,9 @@ async def get_metadata_obj(app, obj_id, bucket=None):
     if not bucket:
         bucket = domain_bucket
-    if not bucket:
+    if bucket:
+        log.debug(f"get_metadata_obj - using bucket: {bucket}")
+    else:
         log.warn("get_metadata_obj - bucket is None")

     # don't call validateInPartition since this is used to pull in
@@ -355,6 +357,7 @@ async def get_metadata_obj(app, obj_id, bucket=None):
         obj_json = meta_cache[obj_id]
     else:
         s3_key = getS3Key(obj_id)
+        log.debug(f"get_metadata_obj - using s3_key: {s3_key}")
         pending_s3_read = app["pending_s3_read"]
         if obj_id in pending_s3_read:
             # already a read in progress, wait for it to complete
@@ -364,12 +367,10 @@ async def get_metadata_obj(app, obj_id, bucket=None):
             log.info(msg)
             store_read_timeout = float(config.get("store_read_timeout", default=2.0))
             log.debug(f"store_read_timeout: {store_read_timeout}")
-            store_read_sleep_interval = float(
-                config.get("store_read_sleep_interval", default=0.1)
-            )
+            store_read_sleep = float(config.get("store_read_sleep_interval", default=0.1))
             while time.time() - read_start_time < store_read_timeout:
                 log.debug(f"waiting for pending s3 read {s3_key}, sleeping")
-                await asyncio.sleep(store_read_sleep_interval)  # sleep for sub-second?
+                await asyncio.sleep(store_read_sleep)
                 if obj_id in meta_cache:
                     log.info(f"object {obj_id} has arrived!")
                     obj_json = meta_cache[obj_id]
@@ -1027,13 +1028,18 @@ async def get_chunk(
     log.debug(f"filter_ops: {filter_ops}")

     if s3path:
+        if s3path.startswith("s3://"):
+            bucket = "s3://"
+        else:
+            bucket = ""
         try:
-            bucket = getBucketFromStorURI(s3path)
+            bucket += getBucketFromStorURI(s3path)
             s3key = getKeyFromStorURI(s3path)
         except ValueError as ve:
             log.error(f"Invalid URI path: {s3path} exception: {ve}")
             raise  # raise HTTPInternalServerError()
+
         msg = f"Using s3path bucket: {bucket} and s3key: {s3key} "
         msg += f"offset: {s3offset} length: {s3size}"
         log.debug(msg)
diff --git a/hsds/util/domainUtil.py b/hsds/util/domainUtil.py
index 3958527f..cc60ef2e 100644
--- a/hsds/util/domainUtil.py
+++ b/hsds/util/domainUtil.py
@@ -51,22 +51,103 @@ def isIPAddress(s):
     return True


+def _stripProtocol(uri):
+    """ returns part of the uri or bucket name after any protocol specification:
+        'xyz://' or 'https://myaccount.blob.core.windows.net/'
+    """
+
+    if not uri or uri.startswith("/"):
+        return uri
+
+    n = uri.find("://")
+    if n < 0:
+        return uri
+
+    uri = uri[(n + 3):]
+    parts = uri.split("/")
+    if len(parts) == 1:
+        return uri
+    if parts[0].endswith(".blob.core.windows.net"):
+        # part of the URI to indicate azure blob storage, skip it
+        parts = parts[1:]
+    return "/".join(parts)
+
+
+def isValidBucketName(bucket):
+    """
+    Check whether the given bucket name is valid
+    """
+    is_valid = True
+
+    if bucket is None:
+        return True
+
+    bucket = _stripProtocol(bucket)
+
+    # Bucket names must contain at least 1 character
+    if len(bucket) < 1:
+        is_valid = False
+
+    # Bucket names can consist only of alphanumeric characters,
+    # underscores, dots, and hyphens
+    if not re.fullmatch("[a-zA-Z0-9_\\.\\-]+", bucket):
+        is_valid = False
+
+    return is_valid
+
+
 def getBucketForDomain(domain):
     """get the bucket for the domain or None if no bucket is given
     """
     if not domain:
         return None
-    if domain[0] == "/":
+
+    # strip s3://, file://, etc
+    domain_path = _stripProtocol(domain)
+    if domain_path.startswith("/"):
         # no bucket specified
         return None
-    index = domain.find("/")
-    if index < 0:
+
+    nchars = len(domain) - len(domain_path)
+    protocol = domain[:nchars]  # save this so we can re-attach to the bucket name
+
+    parts = domain_path.split("/")
+    if len(parts) < 2:  # invalid domain?
+ msg = f"invalid domain: {domain}" + raise HTTPBadRequest(reason=msg) + bucket_name = parts[0] + if not isValidBucketName(bucket_name): return None - if not isValidBucketName(domain[:index]): + + # fit back the protocol prefix if set + if protocol: + bucket = protocol + else: + bucket = "" + bucket += bucket_name + return bucket + + +def getPathForDomain(domain): + """ + Return the non-bucket part of the domain + """ + if not domain: return None - return domain[:index] + + domain_path = _stripProtocol(domain) + if domain_path.startswith("/"): + # no bucket + return domain_path + + nindex = domain_path.find("/") + if nindex > 0: + # don't include the bucket + domain_path = domain_path[nindex:] + + return domain_path def getParentDomain(domain): @@ -161,7 +242,7 @@ def validateDomainPath(path): if len(path) < 1: raise ValueError("Domain path too short") if path == "/": - return # default buckete, root folder + return # default bucket, root folder if path[:-1].find("/") == -1: msg = "Domain path should have at least one '/' before trailing slash" raise ValueError(msg) @@ -262,25 +343,13 @@ def getDomainFromRequest(request, validate=True, allow_dns=True): pass # no bucket specified if bucket and validate: - if (bucket.find("/") >= 0) or (not isValidBucketName(bucket)): + if not isValidBucketName(bucket): raise ValueError(f"bucket name: {bucket} is not valid") if domain[0] == "/": domain = bucket + domain return domain -def getPathForDomain(domain): - """ - Return the non-bucket part of the domain - """ - if not domain: - return None - index = domain.find("/") - if index < 1: - return domain # no bucket - return domain[(index):] - - def verifyRoot(domain_json): """Throw bad request if we are expecting a domain, but got a folder instead @@ -300,23 +369,3 @@ def getLimits(): limits["max_request_size"] = int(config.get("max_request_size")) return limits - - -def isValidBucketName(bucket): - """ - Check whether the given bucket name is valid - """ - is_valid = True - - if bucket is None: - return True - - # Bucket names must contain at least 1 character - if len(bucket) < 1: - is_valid = False - - # Bucket names can consist only of alphanumeric characters, underscores, dots, and hyphens - if not re.fullmatch("[a-zA-Z0-9_\\.\\-]+", bucket): - is_valid = False - - return is_valid diff --git a/hsds/util/idUtil.py b/hsds/util/idUtil.py index f25d1cb5..68639db3 100644 --- a/hsds/util/idUtil.py +++ b/hsds/util/idUtil.py @@ -21,6 +21,43 @@ from .. import hsds_logger as log +S3_URI = "s3://" +FILE_URI = "file://" +AZURE_URI = "blob.core.windows.net/" # preceded with "https://" + + +def _getStorageProtocol(uri): + """ returns 's3://', 'file://', or 'https://...net/' prefix if present. 
+        If the prefix is in the form: https://myaccount.blob.core.windows.net/mycontainer
+        (references Azure blob storage), return: https://myaccount.blob.core.windows.net/
+        otherwise None """
+
+    if not uri:
+        protocol = None
+    elif uri.startswith(S3_URI):
+        protocol = S3_URI
+    elif uri.startswith(FILE_URI):
+        protocol = FILE_URI
+    elif uri.startswith("https://") and uri.find(AZURE_URI) > 0:
+        n = uri.find(AZURE_URI) + len(AZURE_URI)
+        protocol = uri[:n]
+    elif uri.find("://") >= 0:
+        raise ValueError(f"storage uri: {uri} not supported")
+    else:
+        protocol = None
+    return protocol
+
+
+def _getBaseName(uri):
+    """ Return the part of the URI after the storage protocol (if any) """
+
+    protocol = _getStorageProtocol(uri)
+    if not protocol:
+        return uri
+    else:
+        return uri[len(protocol):]
+
+
 def getIdHash(id):
     """Return md5 prefix based on id value"""
     m = hashlib.new("md5")
@@ -146,14 +183,19 @@ def getS3Key(id):
     Chunk ids have the chunk index added after the slash:
     "db/id[0:16]/d/id[16:32]/x_y_z

-    For domain id's return a key with the .domain suffix and no
-    preceeding slash
+    For domain id's:
+    Return a key with the .domain suffix and no preceding slash.
+    For non-default buckets, use the format: /s3_key
+    If the id has a storage specifier ("s3://", "file://", etc.)
+    include that along with the bucket name.  e.g.: "s3://mybucket/a_folder/a_file.h5"
     """
-    if id.find("/") > 0:
+
+    base_id = _getBaseName(id)  # strip any s3://, etc.
+    if base_id.find("/") > 0:
         # a domain id
         domain_suffix = ".domain.json"
-        index = id.find("/") + 1
-        key = id[index:]
+        index = base_id.find("/") + 1
+        key = base_id[index:]
         if not key.endswith(domain_suffix):
             if key[-1] != "/":
                 key += "/"
diff --git a/hsds/util/s3Client.py b/hsds/util/s3Client.py
index 698af580..3503e207 100644
--- a/hsds/util/s3Client.py
+++ b/hsds/util/s3Client.py
@@ -26,6 +26,8 @@
 from .. import hsds_logger as log
 from .. import config

+S3_URI = "s3://"
+

 class S3Client:
     """
@@ -268,16 +270,20 @@ async def get_object(self, key, bucket=None, offset=0, length=-1):
         """
         range = ""
-        if length > 0:
-            range = f"bytes={offset} - {offset + length - 1}"
-            log.info(f"storage range request: {range}")

         if not bucket:
             log.error("get_object - bucket not set")
             raise HTTPInternalServerError()

+        # remove s3:// prefix if present
+        if bucket.startswith(S3_URI):
+            bucket = bucket[len(S3_URI):]
+
         start_time = time.time()
-        log.debug(f"s3Client.get_object({bucket}/{key}) start: {start_time}")
+        if length > 0:
+            range = f"bytes={offset}-{offset + length - 1}"
+            log.info(f"storage range request: {range}")
+        log.debug(f"s3Client.get_object({bucket}/{key}) range: {range} start: {start_time}")
         session = self._app["session"]
         self._renewToken()
         kwargs = self._get_client_kwargs()
@@ -342,6 +348,10 @@ async def put_object(self, key, data, bucket=None):
             log.error("put_object - bucket not set")
             raise HTTPInternalServerError()

+        # remove s3:// prefix if present
+        if bucket.startswith(S3_URI):
+            bucket = bucket[len(S3_URI):]
+
         start_time = time.time()
         log.debug(f"s3Client.put_object({bucket}/{key} start: {start_time}")
         session = self._app["session"]
@@ -391,10 +401,15 @@ async def put_object(self, key, data, bucket=None):

     async def delete_object(self, key, bucket=None):
         """Deletes the object at the given key"""
+
         if not bucket:
             log.error("delete_object - bucket not set")
             raise HTTPInternalServerError()

+        # remove s3:// prefix if present
+        if bucket.startswith(S3_URI):
+            bucket = bucket[len(S3_URI):]
+
         start_time = time.time()
         log.debug(f"s3Client.delete_object({bucket}/{key} start: {start_time}")
         session = self._app["session"]
@@ -437,6 +452,11 @@ async def is_object(self, key, bucket=None):
         if not bucket:
             log.error("is_object - bucket not set")
             raise HTTPInternalServerError()
+
+        # remove s3:// prefix if present
+        if bucket.startswith(S3_URI):
+            bucket = bucket[len(S3_URI):]
+
         start_time = time.time()
         found = False
         session = self._app["session"]
@@ -473,6 +493,15 @@ async def is_object(self, key, bucket=None):

     async def get_key_stats(self, key, bucket=None):
         """Get ETag, size, and last modified time for given object"""
+
+        if not bucket:
+            log.error("get_key_stats - bucket not set")
+            raise HTTPInternalServerError()
+
+        # remove s3:// prefix if present
+        if bucket.startswith(S3_URI):
+            bucket = bucket[len(S3_URI):]
+
         start_time = time.time()
         session = self._app["session"]
         self._renewToken()
@@ -577,6 +606,11 @@ async def list_keys(
         if not bucket:
             log.error("list_keys - bucket not set")
             raise HTTPInternalServerError()
+
+        # remove s3:// prefix if present
+        if bucket.startswith(S3_URI):
+            bucket = bucket[len(S3_URI):]
+
         msg = f"list_keys('{prefix}','{deliminator}','{suffix}', "
         msg += f"include_stats={include_stats}, "
         msg += f"callback {'set' if callback is not None else 'not set'}"
diff --git a/hsds/util/storUtil.py b/hsds/util/storUtil.py
index 75da2449..0b3d3c6a 100644
--- a/hsds/util/storUtil.py
+++ b/hsds/util/storUtil.py
@@ -277,58 +277,93 @@ def _compress(data, compressor=None, level=5, shuffle=0, dtype=None, chunk_shape
     return data


-def _getStorageClient(app):
-    """get storage client s3 or azure blob"""
+def _getStorageDriverName(app, bucket=None):
+    """Return name of storage driver that is being used"""
+    driver = None
+    if bucket:
+        if bucket.startswith("s3://"):
+            driver = "S3Client"
+        elif bucket.startswith("file://"):
+            driver = "FileClient"
+        elif bucket.startswith("https://") and bucket.find(".blob.core.windows.net/") > 0:
driver = "AzureBlobClient" + else: + pass # will determine the default driver below + if not driver: + # select a default driver based on config settings + if config.get("root_dir"): + driver = "FileClient" + elif config.get("aws_s3_gateway"): + driver = "S3Client" + elif config.get("azure_connection_string"): + driver = "AzureBlobClient" + else: + driver = "FileClient" + return driver + + +def _getStorageClient(app, bucket=None): + """get storage client posix, or s3 or azure blob""" - if "storage_client" in app: - return app["storage_client"] + driver = _getStorageDriverName(app, bucket=bucket) - if config.get("aws_s3_gateway"): + storage_clients = app["storage_clients"] + if driver in storage_clients: + return storage_clients[driver] + + # initialize a new client + if driver == "S3Client": log.debug("_getStorageClient getting S3Client") client = S3Client(app) - elif config.get("azure_connection_string"): + elif driver == "FileClient": + log.debug("_getStorageClient getting FileClient") + client = FileClient(app) + elif driver == "AzureBlobClient": log.debug("_getStorageClient getting AzureBlobClient") client = AzureBlobClient(app) else: - log.debug("_getStorageClient getting FileClient") - client = FileClient(app) - # save client so we don't neeed to recreate each time - app["storage_client"] = client - return client + msg = f"_getStorageClient - unexpected storage driver: {driver}" + log.error(msg) + raise HTTPInternalServerError() + # save client so we don't neeed to recreate each time + storage_clients[driver] = client -def getStorageDriverName(app): - """Return name of storage driver that is being used""" - if config.get("aws_s3_gateway"): - driver = "S3Client" - elif config.get("azure_connection_string"): - driver = "AzureBlobClient" - else: - driver = "FileClient" - return driver + return client async def releaseStorageClient(app): """release the client storage connection (Used for cleanup on application exit) """ - client = _getStorageClient(app) - await client.releaseClient() - if "storage_client" in app: - del app["storage_client"] + storage_clients = app["storage_clients"] + drivers = list(storage_clients) + for driver in drivers: + log.debug(f"releasing storage client: {driver}") + client = storage_clients[driver] + await client.releaseClient() + del storage_clients[driver] def _getURIParts(uri): """return tuple of (bucket, path) for given URI""" - if uri.startswith("s3://"): - uri = uri[5:] + S3_URI = "s3://" + FILE_URI = "file://" + AZURE_URI = "blob.core.windows.net/" # preceded with "https://" + if uri.startswith(S3_URI): + uri = uri[len(S3_URI):] + elif uri.startswith(FILE_URI): + uri = uri[len(FILE_URI):] + elif uri.startswith("https://") and uri.find(AZURE_URI) > 0: + n = uri.find(AZURE_URI) + len(AZURE_URI) + uri = uri[n:] if uri.startswith("/"): raise ValueError("invalid uri") n = uri.find("/") - if n < 0: + if n <= 0: raise ValueError("invalid uri") - fields = (uri[:n], uri[n:]) + fields = (uri[:n], uri[n + 1:]) return fields @@ -336,6 +371,7 @@ def getBucketFromStorURI(uri): """Return a bucket name given a storage URI Examples: s3://mybucket/folder/object.json -> mybucket + https://myaccount.blob.core.windows.net/mybucket/folder/object.json" -> mybucket mybucket/folder/object.json -> mybucket mybucket -> ValueError # no slash /mybucket/folder/object.json -> ValueError # not expecting abs path @@ -350,8 +386,8 @@ def getBucketFromStorURI(uri): def getKeyFromStorURI(uri): """Return a key (path within a bucket) given a storage URI Examples: - 
-      s3://mybucket/folder/object.json -> mybucket
-      mybucket/folder/object.json -> mybucket
+      s3://mybucket/folder/object.json -> folder/object.json
+      mybucket/folder/object.json -> folder/object.json
       mybucket -> ValueError  # no slash
       /mybucket/folder/object.json -> ValueError  # not expecting abs path
     """
@@ -364,7 +400,7 @@ def getURIFromKey(app, bucket=None, key=None):
     """ return URI for given bucket and key """
-    client = _getStorageClient(app)
+    client = _getStorageClient(app, bucket=bucket)
     if not bucket:
         bucket = app["bucket_name"]
     if key[0] == "/":
@@ -376,7 +412,7 @@ async def getStorJSONObj(app, key, bucket=None):
     """Get object identified by key and read as JSON"""
-    client = _getStorageClient(app)
+    client = _getStorageClient(app, bucket=bucket)
     if not bucket:
         bucket = app["bucket_name"]
     if key[0] == "/":
@@ -411,7 +447,7 @@ async def getStorBytes(app,
                         ):
     """Get object identified by key and read as bytes"""
-    client = _getStorageClient(app)
+    client = _getStorageClient(app, bucket=bucket)
     if not bucket:
         bucket = app["bucket_name"]
     if key[0] == "/":
@@ -533,7 +569,7 @@ async def getHyperChunks(app,

 async def putStorBytes(app, key, data, filter_ops=None, bucket=None):
     """Store byte string as S3 object with given key"""
-    client = _getStorageClient(app)
+    client = _getStorageClient(app, bucket=bucket)
     if not bucket:
         bucket = app["bucket_name"]
     if key[0] == "/":
@@ -552,7 +588,7 @@ async def putStorBytes(app, key, data, filter_ops=None, bucket=None):

 async def putStorJSONObj(app, key, json_obj, bucket=None):
     """Store JSON data as storage object with given key"""
-    client = _getStorageClient(app)
+    client = _getStorageClient(app, bucket=bucket)
     if not bucket:
         bucket = app["bucket_name"]
     if key[0] == "/":
@@ -569,7 +605,7 @@ async def putStorJSONObj(app, key, json_obj, bucket=None):

 async def deleteStorObj(app, key, bucket=None):
     """Delete storage object identfied by given key"""
-    client = _getStorageClient(app)
+    client = _getStorageClient(app, bucket=bucket)
     if not bucket:
         bucket = app["bucket_name"]
     if key[0] == "/":
@@ -585,7 +621,7 @@ async def getStorObjStats(app, key, bucket=None):
     """Return etag, size, and last modified time for given object"""
     # TBD - will need to be refactored to handle azure responses
-    client = _getStorageClient(app)
+    client = _getStorageClient(app, bucket=bucket)
     if not bucket:
         bucket = app["bucket_name"]
     stats = {}
@@ -606,7 +642,7 @@ async def isStorObj(app, key, bucket=None):
     """Test if the given key maps to S3 object"""
     found = False
-    client = _getStorageClient(app)
+    client = _getStorageClient(app, bucket=bucket)
     if not bucket:
         bucket = app["bucket_name"]
     else:
@@ -630,7 +666,7 @@ async def getStorKeys(
     limit=None,
 ):
     # return keys matching the arguments
-    client = _getStorageClient(app)
+    client = _getStorageClient(app, bucket=bucket)
     if not bucket:
         bucket = app["bucket_name"]
     msg = f"getStorKeys('{prefix}','{deliminator}','{suffix}', "
diff --git a/runall.sh b/runall.sh
index 228fcb19..6a1a4ab2 100755
--- a/runall.sh
+++ b/runall.sh
@@ -143,19 +143,9 @@ fi

 [[ -z ${HSDS_ENDPOINT} ]] && echo "HSDS_ENDPOINT is not set" && exit 1

-if [[ ${AWS_S3_GATEWAY} ]]; then
-  COMPOSE_FILE="admin/docker/docker-compose.aws.yml"
-  echo "AWS_S3_GATEWAY set, using ${BUCKET_NAME} S3 Bucket (verify that this bucket exists)"
-elif [[ ${AZURE_CONNECTION_STRING} ]]; then
-  COMPOSE_FILE="admin/docker/docker-compose.azure.yml"
"AZURE_CONNECTION_STRING set, using ${BUCKET_NAME} Azure Storage Container (verify that the container exsits)" -else +if [[ ${ROOT_DIR} ]]; then COMPOSE_FILE="admin/docker/docker-compose.posix.yml" - echo "no AWS or AZURE env set, using POSIX storage" - if [[ -z ${ROOT_DIR} ]]; then - export ROOT_DIR=$PWD/data - echo "no ROOT_DIR env set, using $ROOT_DIR directory for storage" - fi + echo "ROOT_DIR set, using POSIX storage" if [[ ! -d ${ROOT_DIR} ]]; then echo "creating directory ${ROOT_DIR}" mkdir ${ROOT_DIR} @@ -164,6 +154,15 @@ else echo "creating directory ${ROOT_DIR}/${BUCKET_NAME}" mkdir ${ROOT_DIR}/${BUCKET_NAME} fi +elif [[ ${AWS_S3_GATEWAY} ]]; then + COMPOSE_FILE="admin/docker/docker-compose.aws.yml" + echo "AWS_S3_GATEWAY set, using ${BUCKET_NAME} S3 Bucket (verify that this bucket exists)" +elif [[ ${AZURE_CONNECTION_STRING} ]]; then + COMPOSE_FILE="admin/docker/docker-compose.azure.yml" + echo "AZURE_CONNECTION_STRING set, using ${BUCKET_NAME} Azure Storage Container (verify that the container exsits)" +else + echo "no storage setting defined (set at least one of ROOT_DIR, AWS_S3_GATEWAY or AZURE_CONNECTION_STRING)" + exit 1 fi if [[ -z $AWS_IAM_ROLE ]] && [[ $AWS_S3_GATEWAY ]]; then diff --git a/tests/integ/pointsel_test.py b/tests/integ/pointsel_test.py index f4c85534..c8dda7e6 100755 --- a/tests/integ/pointsel_test.py +++ b/tests/integ/pointsel_test.py @@ -516,7 +516,7 @@ def testPostContiguousDataset(self): root_uuid = rspJson["root"] # create dataset fodr /g1/g1.1/dset1.1.2 - s3path = "s3://" + hdf5_sample_bucket + "/data/hdf5test" + "/tall.h5" + s3path = hdf5_sample_bucket + "/data/hdf5test" + "/tall.h5" data = {"type": "H5T_STD_I32BE", "shape": 20} layout = { "class": "H5D_CONTIGUOUS_REF", @@ -578,7 +578,6 @@ def testPostContiguousDataset(self): msg = f"s3object: {s3path} not found, " msg += "skipping point read chunk reference contiguous test" print(msg) - return self.assertEqual(rsp.status_code, 200) @@ -586,9 +585,8 @@ def testPostContiguousDataset(self): self.assertTrue("value" in rspJson) ret_value = rspJson["value"] self.assertEqual(len(ret_value), len(points)) - self.assertEqual( - ret_value, points - ) # get back the points since the dataset in the range 0-20 + self.assertEqual(ret_value, points) + # get back the points since the dataset in the range 0-20 # do a point selection read on dset22 req = self.endpoint + "/datasets/" + dset22_id + "/value" @@ -610,7 +608,7 @@ def testPostChunkedRefDataset(self): print("hdf5_sample_bucket config not set, skipping testChunkedRefDataset") return - s3path = "s3://" + hdf5_sample_bucket + "/data/hdf5test" + "/snp500.h5" + s3path = hdf5_sample_bucket + "/data/hdf5test" + "/snp500.h5" SNP500_ROWS = 3207353 snp500_json = helper.getHDF5JSON("snp500.json") @@ -618,17 +616,13 @@ def testPostChunkedRefDataset(self): print("snp500.json file not found, skipping testPostChunkedRefDataset") return - if "snp500.h5" not in snp500_json: - self.assertTrue(False) + self.assertTrue("snp500.h5" in snp500_json) - chunk_dims = [ - 60000, - ] # chunk layout used in snp500.h5 file + chunk_dims = [60000, ] # chunk layout used in snp500.h5 file chunk_info = snp500_json["snp500.h5"] dset_info = chunk_info["/dset"] - if "byteStreams" not in dset_info: - self.assertTrue(False) + self.assertTrue("byteStreams" in dset_info) byteStreams = dset_info["byteStreams"] # construct map of chunks @@ -675,9 +669,7 @@ def testPostChunkedRefDataset(self): data = { "type": datatype, - "shape": [ - SNP500_ROWS, - ], + "shape": [SNP500_ROWS, ], } layout = { "class": 
"H5D_CHUNKED_REF", @@ -731,7 +723,7 @@ def testPostChunkedRefIndirectDataset(self): print(msg) return - s3path = "s3://" + hdf5_sample_bucket + "/data/hdf5test" + "/snp500.h5" + s3path = hdf5_sample_bucket + "/data/hdf5test" + "/snp500.h5" SNP500_ROWS = 3207353 snp500_json = helper.getHDF5JSON("snp500.json") @@ -739,16 +731,14 @@ def testPostChunkedRefIndirectDataset(self): print("snp500.json file not found, skipping testChunkedRefDataset") return - if "snp500.h5" not in snp500_json: - self.assertTrue(False) + self.assertTrue("snp500.h5" in snp500_json) chunk_dims = [60000,] # chunk layout used in snp500.h5 file num_chunks = (SNP500_ROWS // chunk_dims[0]) + 1 chunk_info = snp500_json["snp500.h5"] dset_info = chunk_info["/dset"] - if "byteStreams" not in dset_info: - self.assertTrue(False) + self.assertTrue("byteStreams" in dset_info) byteStreams = dset_info["byteStreams"] self.assertEqual(len(byteStreams), num_chunks) @@ -777,9 +767,7 @@ def testPostChunkedRefIndirectDataset(self): chunkinfo_type = {"class": "H5T_COMPOUND", "fields": fields} req = self.endpoint + "/datasets" # Store 40 chunk locations - chunkinfo_dims = [ - num_chunks, - ] + chunkinfo_dims = [num_chunks, ] payload = {"type": chunkinfo_type, "shape": chunkinfo_dims} req = self.endpoint + "/datasets" rsp = self.session.post(req, data=json.dumps(payload), headers=headers) @@ -832,9 +820,7 @@ def testPostChunkedRefIndirectDataset(self): data = { "type": datatype, - "shape": [ - SNP500_ROWS, - ], + "shape": [SNP500_ROWS, ], } layout = { "class": "H5D_CHUNKED_REF_INDIRECT", diff --git a/tests/integ/query_test.py b/tests/integ/query_test.py index 965f3104..437ae021 100644 --- a/tests/integ/query_test.py +++ b/tests/integ/query_test.py @@ -273,7 +273,7 @@ def testChunkedRefIndirectDataset(self): ) return - s3path = "s3://" + hdf5_sample_bucket + "/data/hdf5test" + "/snp500.h5" + s3path = hdf5_sample_bucket + "/data/hdf5test" + "/snp500.h5" SNP500_ROWS = 3207353 snp500_json = helper.getHDF5JSON("snp500.json") @@ -281,16 +281,14 @@ def testChunkedRefIndirectDataset(self): print("snp500.json file not found, skipping testChunkedRefDataset") return - if "snp500.h5" not in snp500_json: - self.assertTrue(False) + self.assertTrue("snp500.h5" in snp500_json) chunk_dims = [60000, ] # chunk layout used in snp500.h5 file num_chunks = (SNP500_ROWS // chunk_dims[0]) + 1 chunk_info = snp500_json["snp500.h5"] dset_info = chunk_info["/dset"] - if "byteStreams" not in dset_info: - self.assertTrue(False) + self.assertTrue("byteStreams" in dset_info) byteStreams = dset_info["byteStreams"] self.assertEqual(len(byteStreams), num_chunks) @@ -319,9 +317,7 @@ def testChunkedRefIndirectDataset(self): chunkinfo_type = {"class": "H5T_COMPOUND", "fields": fields} req = self.endpoint + "/datasets" # Store 40 chunk locations - chunkinfo_dims = [ - num_chunks, - ] + chunkinfo_dims = [num_chunks, ] payload = {"type": chunkinfo_type, "shape": chunkinfo_dims} req = self.endpoint + "/datasets" rsp = self.session.post(req, data=json.dumps(payload), headers=headers) @@ -374,9 +370,7 @@ def testChunkedRefIndirectDataset(self): data = { "type": datatype, - "shape": [ - SNP500_ROWS, - ], + "shape": [SNP500_ROWS, ], } layout = { "class": "H5D_CHUNKED_REF_INDIRECT", diff --git a/tests/integ/vlen_test.py b/tests/integ/vlen_test.py index 708e2e8e..4fdc2dcc 100755 --- a/tests/integ/vlen_test.py +++ b/tests/integ/vlen_test.py @@ -664,8 +664,6 @@ def testPutVLenCompoundBinary(self): rsp = self.session.get(req, headers=headers_bin_rsp) self.assertEqual(rsp.status_code, 200) 
self.assertEqual(rsp.headers["Content-Type"], "application/octet-stream") - for k in rsp.headers: - print(f"{k}: {rsp.headers[k]}") data = rsp.content self.assertEqual(len(data), 192) arr_rsp = bytesToArray(data, dt_compound, [count,]) diff --git a/tests/unit/domain_util_test.py b/tests/unit/domain_util_test.py index 17b90066..b3b01096 100755 --- a/tests/unit/domain_util_test.py +++ b/tests/unit/domain_util_test.py @@ -19,7 +19,7 @@ isValidDomainPath, getBucketForDomain, getPathForDomain, - isValidBucketName + isValidBucketName, ) @@ -52,7 +52,11 @@ def testValidDomain(self): for domain in invalid_domains: self.assertFalse(isValidDomain(domain)) - valid_domains = ("/gov/nasa/nex", "/home") + azure_path = "https://myaccount.blob.core.windows.net//home" + s3_path = "s3://mybucket/home" + file_path = "file://mybucket/home" + + valid_domains = ("/gov/nasa/nex", "/home", s3_path, file_path, azure_path) for domain in valid_domains: self.assertTrue(isValidDomain(domain)) @@ -121,6 +125,24 @@ def testGetDomainFragments(self): bucket = getBucketForDomain(domain) self.assertEqual(bucket, "mybucket") + domain = "s3://nasaeos/gov/nasa/nex/climate.h5" + domain_path = getPathForDomain(domain) + self.assertEqual("/gov/nasa/nex/climate.h5", domain_path) + bucket = getBucketForDomain(domain) + self.assertEqual(bucket, "s3://nasaeos") + + domain = "file://nasaeos/gov/nasa/nex/climate.h5" + domain_path = getPathForDomain(domain) + self.assertEqual("/gov/nasa/nex/climate.h5", domain_path) + bucket = getBucketForDomain(domain) + self.assertEqual(bucket, "file://nasaeos") + + domain = "https://myaccount.blob.core.windows.net/nasaeos/gov/nasa/nex/climate.h5" + domain_path = getPathForDomain(domain) + self.assertEqual("/gov/nasa/nex/climate.h5", domain_path) + bucket = getBucketForDomain(domain) + self.assertEqual(bucket, "https://myaccount.blob.core.windows.net/nasaeos") + def testIsValidBucketName(self): # Illegal characters self.assertFalse(isValidBucketName("bucket;")) @@ -130,6 +152,7 @@ def testIsValidBucketName(self): self.assertFalse(isValidBucketName("bucket ")) self.assertFalse(isValidBucketName("bucket>")) self.assertFalse(isValidBucketName("")) + self.assertFalse(isValidBucketName("s3:/mybucket")) self.assertTrue(isValidBucketName("bucket")) self.assertTrue(isValidBucketName("bucket_")) @@ -143,6 +166,9 @@ def testIsValidBucketName(self): self.assertTrue(isValidBucketName("buck-et")) self.assertTrue(isValidBucketName("bucket-1.bucket1-.1")) + self.assertTrue(isValidBucketName("s3://mybucket")) + self.assertTrue(isValidBucketName("file://mybucket")) + if __name__ == "__main__": # setup test files diff --git a/tests/unit/id_util_test.py b/tests/unit/id_util_test.py index cb2c2005..06f974c4 100755 --- a/tests/unit/id_util_test.py +++ b/tests/unit/id_util_test.py @@ -43,38 +43,78 @@ def testCreateObjId(self): pass # expected def testIsValidUuid(self): - group_id = "g-314d61b8-9954-11e6-a733-3c15c2da029e" - dataset_id = "d-4c48f3ae-9954-11e6-a3cd-3c15c2da029e" - ctype_id = "t-8c785f1c-9953-11e6-9bc2-0242ac110005" - chunk_id = "c-8c785f1c-9953-11e6-9bc2-0242ac110005_7_2" + group1_id = "g-314d61b8-9954-11e6-a733-3c15c2da029e" # orig schema + group2_id = "g-314d61b8-995411e6-a733-3c15c2-da029e" + root_id = "g-f9aaa28e-d42e10e5-7122-2a065c-a6986d" + dataset1_id = "d-4c48f3ae-9954-11e6-a3cd-3c15c2da029e" # orig schema + dataset2_id = "d-4c48f3ae-995411e6-a3cd-3c15c2-da029e" + ctype1_id = "t-8c785f1c-9953-11e6-9bc2-0242ac110005" # orig schema + ctype2_id = "t-8c785f1c-995311e6-9bc2-0242ac-110005" + chunk1_id = 
"c-8c785f1c-9953-11e6-9bc2-0242ac110005_7_2" # orig schema + chunk2_id = "c-8c785f1c-995311e6-9bc2-0242ac-110005_7_2" domain_id = "mybucket/bob/mydata.h5" - valid_ids = (group_id, dataset_id, ctype_id, chunk_id, domain_id) + s3_domain_id = "s3://mybucket/bob/mydata.h5" + file_domain_id = "file://mybucket/bob/mydata.h5" + azure_domain_id = "https://myaccount.blob.core.windows.net/mybucket/bob/mydata.h5" + valid_id_map = { + group1_id: "a49be-g-314d61b8-9954-11e6-a733-3c15c2da029e", + group2_id: "db/314d61b8-995411e6/g/a733-3c15c2-da029e/.group.json", + dataset1_id: "26928-d-4c48f3ae-9954-11e6-a3cd-3c15c2da029e", + dataset2_id: "db/4c48f3ae-995411e6/d/a3cd-3c15c2-da029e/.dataset.json", + ctype1_id: "5a9cf-t-8c785f1c-9953-11e6-9bc2-0242ac110005", + ctype2_id: "db/8c785f1c-995311e6/t/9bc2-0242ac-110005/.datatype.json", + chunk1_id: "dc4ce-c-8c785f1c-9953-11e6-9bc2-0242ac110005_7_2", + chunk2_id: "db/8c785f1c-995311e6/d/9bc2-0242ac-110005/7_2", + domain_id: "bob/mydata.h5/.domain.json", + s3_domain_id: "bob/mydata.h5/.domain.json", + file_domain_id: "bob/mydata.h5/.domain.json", + azure_domain_id: "bob/mydata.h5/.domain.json", } + bad_ids = ("g-1e76d862", "/bob/mydata.h5") - self.assertTrue(isValidUuid(group_id)) - self.assertFalse(isSchema2Id(group_id)) - self.assertTrue(isValidUuid(group_id, obj_class="Group")) - self.assertTrue(isValidUuid(group_id, obj_class="group")) - self.assertTrue(isValidUuid(group_id, obj_class="groups")) - self.assertTrue(isValidUuid(dataset_id, obj_class="datasets")) - self.assertFalse(isSchema2Id(dataset_id)) - self.assertTrue(isValidUuid(ctype_id, obj_class="datatypes")) - self.assertFalse(isSchema2Id(ctype_id)) - self.assertTrue(isValidUuid(chunk_id, obj_class="chunks")) - self.assertFalse(isSchema2Id(chunk_id)) - validateUuid(group_id) + self.assertTrue(isValidUuid(group1_id)) + self.assertFalse(isSchema2Id(group1_id)) + self.assertTrue(isValidUuid(group1_id, obj_class="Group")) + self.assertTrue(isValidUuid(group1_id, obj_class="group")) + self.assertTrue(isValidUuid(group1_id, obj_class="groups")) + self.assertTrue(isSchema2Id(root_id)) + self.assertTrue(isValidUuid(root_id, obj_class="Group")) + self.assertTrue(isValidUuid(root_id, obj_class="group")) + self.assertTrue(isValidUuid(root_id, obj_class="groups")) + self.assertTrue(isRootObjId(root_id)) + self.assertTrue(isValidUuid(dataset1_id, obj_class="datasets")) + self.assertFalse(isSchema2Id(dataset1_id)) + self.assertTrue(isValidUuid(ctype1_id, obj_class="datatypes")) + self.assertFalse(isSchema2Id(ctype1_id)) + self.assertTrue(isValidUuid(chunk1_id, obj_class="chunks")) + self.assertFalse(isSchema2Id(chunk1_id)) + self.assertTrue(isValidUuid(group2_id)) + self.assertTrue(isSchema2Id(group2_id)) + self.assertTrue(isValidUuid(group2_id, obj_class="Group")) + self.assertTrue(isValidUuid(group2_id, obj_class="group")) + self.assertTrue(isValidUuid(group2_id, obj_class="groups")) + self.assertFalse(isRootObjId(group2_id)) + self.assertTrue(isValidUuid(dataset2_id, obj_class="datasets")) + self.assertTrue(isSchema2Id(dataset2_id)) + self.assertTrue(isValidUuid(ctype2_id, obj_class="datatypes")) + self.assertTrue(isSchema2Id(ctype2_id)) + self.assertTrue(isValidUuid(chunk2_id, obj_class="chunks")) + self.assertTrue(isSchema2Id(chunk2_id)) + validateUuid(group1_id) try: - isRootObjId(group_id) + isRootObjId(group1_id) self.assertTrue(False) except ValueError: # only works for v2 schema pass # expected - for item in valid_ids: + for item in valid_id_map: self.assertTrue(isObjId(item)) s3key = getS3Key(item) 
self.assertTrue(s3key[0] != "/") self.assertTrue(isS3ObjKey(s3key)) + expected = valid_id_map[item] + self.assertEqual(s3key, expected) if item.find("/") > 0: continue # bucket name gets lost when domain ids get converted to s3keys objid = getObjId(s3key) diff --git a/tests/unit/stor_util_test.py b/tests/unit/stor_util_test.py index 77aa6671..c0074bf2 100755 --- a/tests/unit/stor_util_test.py +++ b/tests/unit/stor_util_test.py @@ -23,7 +23,7 @@ from hsds.util.storUtil import getStorJSONObj, putStorJSONObj, putStorBytes from hsds.util.storUtil import getStorBytes, isStorObj from hsds.util.storUtil import getStorObjStats, getStorKeys, releaseStorageClient -from hsds.util.storUtil import getStorageDriverName +from hsds.util.storUtil import _getStorageDriverName, getBucketFromStorURI, getKeyFromStorURI class StorUtilTest(unittest.TestCase): @@ -31,10 +31,29 @@ def __init__(self, *args, **kwargs): super(StorUtilTest, self).__init__(*args, **kwargs) # main + def s3path_test(self): + uri = "s3://mybucket/afolder/afile.h5" + self.assertEqual(getBucketFromStorURI(uri), "mybucket") + self.assertEqual(getKeyFromStorURI(uri), "afolder/afile.h5") + uri = "file://mybucket/afolder/afile.h5" + self.assertEqual(getBucketFromStorURI(uri), "mybucket") + self.assertEqual(getKeyFromStorURI(uri), "afolder/afile.h5") + uri = "https://myaccount.blob.core.windows.net/mybucket/afolder/afile.h5" + self.assertEqual(getBucketFromStorURI(uri), "mybucket") + self.assertEqual(getKeyFromStorURI(uri), "afolder/afile.h5") + async def stor_util_test(self, app): + default_storage_driver = _getStorageDriverName(app) + print(f"Default storage driver: {default_storage_driver}") + uri = "mybucket/afolder/afile.h5" + self.assertEqual(_getStorageDriverName(app, bucket=uri), default_storage_driver) + uri = "s3://mybucket/afolder/afile.h5" + self.assertEqual(_getStorageDriverName(app, bucket=uri), "S3Client") + uri = "file://mybucket/afolder/afile.h5" + self.assertEqual(_getStorageDriverName(app, bucket=uri), "FileClient") + uri = "https://myaccount.blob.core.windows.net/mybucket/afolder/afile.h5" + self.assertEqual(_getStorageDriverName(app, bucket=uri), "AzureBlobClient") - storage_driver = getStorageDriverName(app) - print(f"Using storage driver: {storage_driver}") try: await getStorKeys(app) except HTTPNotFound: @@ -104,7 +123,6 @@ async def stor_util_test(self, app): for i in range(nchars): bucket_name[i] = ord("a") + random.randint(0, 25) bucket_name = bucket_name.decode("ascii") - print("bucket name:", bucket_name) try: await getStorBytes(app, f"{key_folder}/bogus", bucket=bucket_name) @@ -145,8 +163,6 @@ async def stor_util_test(self, app): # list keys in folder - get all subkeys key_list = await getStorKeys(app, prefix=key_folder + "/", deliminator="") - for key in key_list: - print("got key:", key) self.assertEqual(len(key_list), 7) self.assertTrue(f"{key_folder}/obj_json_1" in key_list) @@ -159,8 +175,6 @@ async def stor_util_test(self, app): # get just sub-folders key_list = await getStorKeys(app, prefix=key_folder + "/", deliminator="/") - for key in key_list: - print("got delim key:", key) self.assertEqual(len(key_list), 1) self.assertTrue(f"{subkey_folder}/" in key_list) @@ -172,10 +186,6 @@ async def stor_util_test(self, app): # get keys with obj etag, size, last modified key_dict = await getStorKeys(app, prefix=key_folder + "/", include_stats=True) - for k in key_dict: - v = key_dict[k] - print(f"{k}: {v}") - now = time.time() for k in key_dict: v = key_dict[k] @@ -214,9 +224,9 @@ async def stor_util_test(self, app): 
         await releaseStorageClient(app)

     def testStorUtil(self):
+        # run synchronous tests
+        self.s3path_test()

-        cors_domain = config.get("cors_domain")
-        print(f"cors_domain: [{cors_domain}]")
         bucket = config.get("hsds_unit_test_bucket")
         if not bucket:
             msg = "No bucket configured, create bucket and export "
@@ -227,11 +237,12 @@ def testStorUtil(self):

         # we need to setup a asyncio loop to query s3
         loop = asyncio.get_event_loop()
-        session = get_session(loop=loop)
+        session = get_session()

         app = {}
         app["session"] = session
         app["bucket_name"] = bucket
+        app["storage_clients"] = {}
         app["loop"] = loop

         loop.run_until_complete(self.stor_util_test(app))
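
Note (not part of the patch): a minimal usage sketch of the URI-aware helpers this change introduces, mirroring the cases exercised in domain_util_test.py above. It assumes the hsds package from this patch is importable; the bucket and domain strings are illustrative examples only.

    # Illustrative only: shows how bucket names may now carry a storage protocol prefix.
    from hsds.util.domainUtil import getBucketForDomain, getPathForDomain

    domain = "s3://nasaeos/gov/nasa/nex/climate.h5"

    # the bucket keeps its protocol prefix, so storUtil can later pick the matching driver
    # (s3:// -> S3Client, file:// -> FileClient, https://...blob.core.windows.net/ -> AzureBlobClient)
    print(getBucketForDomain(domain))    # -> "s3://nasaeos"

    # the path is everything after the bucket
    print(getPathForDomain(domain))      # -> "/gov/nasa/nex/climate.h5"

    # a bare bucket name still works and falls back to the default driver chosen from
    # config (root_dir / aws_s3_gateway / azure_connection_string)
    print(getBucketForDomain("mybucket/bob/mydata.h5"))    # -> "mybucket"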