From 302105ca7738d3f9c1e0c8dea2f55872e18e640d Mon Sep 17 00:00:00 2001 From: John Readey Date: Thu, 31 Aug 2023 15:46:03 +0200 Subject: [PATCH] Azurefix (#258) * make azure-storage-blob default dependency * fix walk_blobs for azure driver * don't stop iteration at max_blobs_count * update quick start to use --nolint option --- README.md | 2 +- hsds/domain_sn.py | 8 ++-- hsds/util/azureBlobClient.py | 76 ++---------------------------------- pyproject.toml | 7 ++-- 4 files changed, 12 insertions(+), 81 deletions(-) diff --git a/README.md b/README.md index 0616fbdf..c11b2fc9 100755 --- a/README.md +++ b/README.md @@ -13,7 +13,7 @@ In addition, HSDS can be run in serverless mode with AWS Lambda or h5pyd local m Make sure you have Python 3 and Pip installed, then: -1. Run install: `$ ./build.sh` from source tree OR install from pypi: `$ pip install hsds` +1. Run install: `$ ./build.sh --nolint` from source tree OR install from pypi: `$ pip install hsds` 2. Create a directory the server will use to store data, example: `$ mkdir ~/hsds_data` 3. Start server: `$ hsds --root_dir ~/hsds_data` 4. Run the test suite. In a separate terminal run: diff --git a/hsds/domain_sn.py b/hsds/domain_sn.py index ee82e28b..05ad5cdf 100755 --- a/hsds/domain_sn.py +++ b/hsds/domain_sn.py @@ -526,7 +526,7 @@ async def get_domains(request): domainNames = domainNames.split(",") else: s3prefix = prefix[1:] - log.debug(f"get_domains - listing S3 keys for {s3prefix}") + log.debug(f"get_domains - listing keys for {s3prefix}") kwargs = { "include_stats": False, "prefix": s3prefix, @@ -534,7 +534,7 @@ async def get_domains(request): "bucket": bucket, } s3keys = await getStorKeys(app, **kwargs) - log.debug(f"get_domains - getS3Keys returned: {len(s3keys)} keys") + log.debug(f"get_domains - getStorKeys returned: {len(s3keys)} keys") for s3key in s3keys: if s3key[-1] != "/": @@ -543,7 +543,7 @@ async def get_domains(request): if len(s3key) > 1 and s3key[-2] == "/": # trim off double slash s3key = s3key[:-1] - log.debug(f"get_domains - got s3key: {s3key}") + log.debug(f"get_domains - got key: {s3key}") domain = "/" + s3key[:-1] if pattern: # do a pattern match on the basename @@ -1353,7 +1353,7 @@ async def DELETE_Domain(request): index = domain.find("/") nlen = index + 1 s3prefix = domain[nlen:] + "/" - log.info(f"checking s3key with prefix: {s3prefix} in bucket: {bucket}") + log.info(f"checking key with prefix: {s3prefix} in bucket: {bucket}") kwargs = { "include_stats": False, "prefix": s3prefix, diff --git a/hsds/util/azureBlobClient.py b/hsds/util/azureBlobClient.py index f035867a..e8c6313d 100644 --- a/hsds/util/azureBlobClient.py +++ b/hsds/util/azureBlobClient.py @@ -3,7 +3,6 @@ import datetime import time from azure.storage.blob.aio import BlobServiceClient -from azure.storage.blob import BlobPrefix from azure.core.exceptions import AzureError from .. import hsds_logger as log @@ -380,6 +379,7 @@ async def walk_blobs( deliminator="/", callback=None, ): + key_names = {} if include_stats else [] continuation_token = None count = 0 while True: @@ -389,7 +389,6 @@ async def walk_blobs( "results_per_page": CALLBACK_MAX_COUNT, } keyList = client.walk_blobs(**kwargs).by_page(continuation_token) - key_names = {} if include_stats else [] async for key in await keyList.__anext__(): key_name = key["name"] log.debug(f"walk_blobs got: {key_name}") @@ -411,8 +410,6 @@ async def walk_blobs( log.debug("skip name thaat doesn't end in '/'") # only return folders continue - if len(key_names) >= CALLBACK_MAX_COUNT: - break key_names.append(key_name) count += 1 if callback: @@ -422,86 +419,20 @@ async def walk_blobs( callback(self._app, key_names) key_names = {} if include_stats else [] token = keyList.continuation_token - if not token or len(key_names) >= CALLBACK_MAX_COUNT: + if not token: # got all the keys (or as many as requested) log.debug("walk_blobs complete") break else: # keep going continuation_token = keyList.continuation_token - log.info(f"walk_blob_hierarchy, returning {count} items") + log.info(f"walk_blobs, returning {count} items") if not callback and count != len(key_names): msg = f"expected {count} keys in return list " msg += f"but got {len(key_names)}" log.warning(msg) return key_names - async def walk_blob_hierarchy( - self, client, prefix="", include_stats=False, callback=None - ): - log.info(f"walk_blob_hierarchy, prefix: {prefix}") - - key_names = None - - async def do_callback(callback, keynames): - if iscoroutinefunction(callback): - await callback(self._app, key_names) - else: - callback(self._app, key_names) - - key_names = key_names = {} if include_stats else [] - count = 0 - async for item in client.walk_blobs(name_starts_with=prefix): - nlen = len(prefix) - short_name = item.name[nlen:] - if isinstance(item, BlobPrefix): - log.debug(f"walk_blob_hierarchy - BlobPrefix: {short_name}") - kwargs = { - "prefix": item.name, - "include_stats": include_stats, - "callback": callback, - } - key_names = await self.walk_blob_hierarchy(client, **kwargs) - else: - kwargs = {"nme_starts_with": item.name} - async for item in client.list_blobs(**kwargs): - key_name = item["name"] - log.debug(f"walk_blob_hierarchy - got name: {key_name}") - if include_stats: - ETag = item["etag"] - lastModified = int(item["last_modified"].timestamp()) - data_size = item["size"] - key_tags = { - "ETag": ETag, - "Size": data_size, - "LastModified": lastModified, - } - key_names[key_name] = key_tags - else: - # just add the blob name to the list - key_names.append(item["name"]) - count += 1 - if callback and len(key_names) >= CALLBACK_MAX_COUNT: - msg = "walk_blob_hierarchy, invoking callback " - msg += f"with {len(key_names)} items" - log.debug(msg) - await do_callback(callback, key_names) - key_names = key_names = {} if include_stats else [] - if callback: - msg = "walk_blob_hierarchy, invoking callback " - msg += f"with {len(key_names)} items" - log.debug(msg) - await do_callback(callback, key_names) - key_names = {} if include_stats else [] - - log.info(f"walk_blob_hierarchy, returning {count} items") - if not callback and count != len(key_names): - msg = f"expected {count} keys in return list " - msg += f"but got {len(key_names)}" - log.warning(msg) - - return key_names - async def list_keys( self, prefix="", @@ -539,7 +470,6 @@ async def list_keys( "callback": callback, } key_names = await self.walk_blobs(client, **kwargs) - # key_names = await self.walk_blob_hierarchy(client, **kwargs) except CancelledError as cle: self._azure_stats_increment("error_count") msg = f"azureBlobClient.CancelledError for list_keys: {cle}" diff --git a/pyproject.toml b/pyproject.toml index a06eb603..1ee9b941 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,10 +34,11 @@ requires-python = ">=3.8" version = "0.8.2" dependencies = [ - "aiohttp == 3.8.5", + "aiohttp == 3.8.5", "aiobotocore == 2.5.0", "aiohttp_cors", "aiofiles", + "azure-storage-blob", "botocore", "cryptography", "numcodecs", @@ -51,7 +52,7 @@ dependencies = [ ] [project.optional-dependencies] -azure = ["azure-storage-blob"] +azure = [] [project.readme] text = """\ @@ -60,7 +61,7 @@ Data can be stored in either a POSIX files system, or using object-based storage AWS S3, Azure Blob Storage, or [MinIO](https://min.io). HSDS can be run a single machine or on a cluster using Kubernetes (or AKS on Microsoft Azure). -In addition, HSDS can be run in serverless mode with AWS Lambda or h5pyd local mode. +In addition, HSDS can b e run in serverless mode with AWS Lambda or h5pyd local mode. """ content-type = "text/x-rst"