Skip to content

Commit

Permalink
Azurefix (#258)
Browse files Browse the repository at this point in the history
* make azure-storage-blob default dependency

* fix walk_blobs for azure driver

* don't stop iteration at max_blobs_count

* update quick start to use --nolint option
  • Loading branch information
jreadey authored Aug 31, 2023
1 parent c63c9ba commit 302105c
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 81 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ In addition, HSDS can be run in serverless mode with AWS Lambda or h5pyd local m

Make sure you have Python 3 and Pip installed, then:

1. Run install: `$ ./build.sh` from source tree OR install from pypi: `$ pip install hsds`
1. Run install: `$ ./build.sh --nolint` from source tree OR install from pypi: `$ pip install hsds`
2. Create a directory the server will use to store data, example: `$ mkdir ~/hsds_data`
3. Start server: `$ hsds --root_dir ~/hsds_data`
4. Run the test suite. In a separate terminal run:
Expand Down
8 changes: 4 additions & 4 deletions hsds/domain_sn.py
Original file line number Diff line number Diff line change
Expand Up @@ -526,15 +526,15 @@ async def get_domains(request):
domainNames = domainNames.split(",")
else:
s3prefix = prefix[1:]
log.debug(f"get_domains - listing S3 keys for {s3prefix}")
log.debug(f"get_domains - listing keys for {s3prefix}")
kwargs = {
"include_stats": False,
"prefix": s3prefix,
"deliminator": "/",
"bucket": bucket,
}
s3keys = await getStorKeys(app, **kwargs)
log.debug(f"get_domains - getS3Keys returned: {len(s3keys)} keys")
log.debug(f"get_domains - getStorKeys returned: {len(s3keys)} keys")

for s3key in s3keys:
if s3key[-1] != "/":
Expand All @@ -543,7 +543,7 @@ async def get_domains(request):
if len(s3key) > 1 and s3key[-2] == "/":
# trim off double slash
s3key = s3key[:-1]
log.debug(f"get_domains - got s3key: {s3key}")
log.debug(f"get_domains - got key: {s3key}")
domain = "/" + s3key[:-1]
if pattern:
# do a pattern match on the basename
Expand Down Expand Up @@ -1353,7 +1353,7 @@ async def DELETE_Domain(request):
index = domain.find("/")
nlen = index + 1
s3prefix = domain[nlen:] + "/"
log.info(f"checking s3key with prefix: {s3prefix} in bucket: {bucket}")
log.info(f"checking key with prefix: {s3prefix} in bucket: {bucket}")
kwargs = {
"include_stats": False,
"prefix": s3prefix,
Expand Down
76 changes: 3 additions & 73 deletions hsds/util/azureBlobClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import datetime
import time
from azure.storage.blob.aio import BlobServiceClient
from azure.storage.blob import BlobPrefix
from azure.core.exceptions import AzureError
from .. import hsds_logger as log

Expand Down Expand Up @@ -380,6 +379,7 @@ async def walk_blobs(
deliminator="/",
callback=None,
):
key_names = {} if include_stats else []
continuation_token = None
count = 0
while True:
Expand All @@ -389,7 +389,6 @@ async def walk_blobs(
"results_per_page": CALLBACK_MAX_COUNT,
}
keyList = client.walk_blobs(**kwargs).by_page(continuation_token)
key_names = {} if include_stats else []
async for key in await keyList.__anext__():
key_name = key["name"]
log.debug(f"walk_blobs got: {key_name}")
Expand All @@ -411,8 +410,6 @@ async def walk_blobs(
log.debug("skip name thaat doesn't end in '/'")
# only return folders
continue
if len(key_names) >= CALLBACK_MAX_COUNT:
break
key_names.append(key_name)
count += 1
if callback:
Expand All @@ -422,86 +419,20 @@ async def walk_blobs(
callback(self._app, key_names)
key_names = {} if include_stats else []
token = keyList.continuation_token
if not token or len(key_names) >= CALLBACK_MAX_COUNT:
if not token:
# got all the keys (or as many as requested)
log.debug("walk_blobs complete")
break
else:
# keep going
continuation_token = keyList.continuation_token
log.info(f"walk_blob_hierarchy, returning {count} items")
log.info(f"walk_blobs, returning {count} items")
if not callback and count != len(key_names):
msg = f"expected {count} keys in return list "
msg += f"but got {len(key_names)}"
log.warning(msg)
return key_names

async def walk_blob_hierarchy(
    self, client, prefix="", include_stats=False, callback=None
):
    """Recursively list the blobs found under ``prefix``.

    Virtual directories (``BlobPrefix`` items) yielded by
    ``client.walk_blobs`` are descended into recursively; every other
    item is listed via ``client.list_blobs``.

    Args:
        client: container client exposing async ``walk_blobs`` and
            ``list_blobs`` iterators.
        prefix: only names starting with this string are visited.
        include_stats: when true, results are collected in a dict that
            maps each blob name to its ``ETag``/``Size``/``LastModified``;
            otherwise a plain list of names is built.
        callback: optional function or coroutine function called as
            ``callback(self._app, batch)`` whenever a batch reaches
            ``CALLBACK_MAX_COUNT`` entries, and once more at the end
            with whatever is left (possibly an empty batch).

    Returns:
        The collected names (list or dict, see ``include_stats``).  When
        a callback is given the batches are handed to the callback and
        the returned container is empty.
    """
    log.info(f"walk_blob_hierarchy, prefix: {prefix}")

    def new_batch():
        # container type depends on whether per-key stats were requested
        return {} if include_stats else []

    async def do_callback(batch):
        # hand the batch passed in (not a closure variable) to the callback
        if iscoroutinefunction(callback):
            await callback(self._app, batch)
        else:
            callback(self._app, batch)

    key_names = new_batch()
    count = 0
    nlen = len(prefix)
    async for item in client.walk_blobs(name_starts_with=prefix):
        if isinstance(item, BlobPrefix):
            short_name = item.name[nlen:]
            log.debug(f"walk_blob_hierarchy - BlobPrefix: {short_name}")
            kwargs = {
                "prefix": item.name,
                "include_stats": include_stats,
                "callback": callback,
            }
            child_names = await self.walk_blob_hierarchy(client, **kwargs)
            # merge the sub-tree into what has been gathered so far rather
            # than replacing it; with a callback the child has already
            # delivered its keys and returns an empty container
            if include_stats:
                key_names.update(child_names)
            else:
                key_names.extend(child_names)
            count += len(child_names)
            continue

        # NOTE(review): list_blobs filters by name prefix, so a blob whose
        # name is a prefix of a sibling's name would presumably list that
        # sibling as well - confirm whether duplicates can occur here.
        async for blob in client.list_blobs(name_starts_with=item.name):
            key_name = blob["name"]
            log.debug(f"walk_blob_hierarchy - got name: {key_name}")
            if include_stats:
                key_names[key_name] = {
                    "ETag": blob["etag"],
                    "Size": blob["size"],
                    "LastModified": int(blob["last_modified"].timestamp()),
                }
            else:
                # just add the blob name to the list
                key_names.append(key_name)
            count += 1
            if callback and len(key_names) >= CALLBACK_MAX_COUNT:
                msg = "walk_blob_hierarchy, invoking callback "
                msg += f"with {len(key_names)} items"
                log.debug(msg)
                await do_callback(key_names)
                key_names = new_batch()

    if callback:
        # flush whatever is left after the last full batch
        msg = "walk_blob_hierarchy, invoking callback "
        msg += f"with {len(key_names)} items"
        log.debug(msg)
        await do_callback(key_names)
        key_names = new_batch()

    log.info(f"walk_blob_hierarchy, returning {count} items")
    if not callback and count != len(key_names):
        msg = f"expected {count} keys in return list "
        msg += f"but got {len(key_names)}"
        log.warning(msg)

    return key_names

async def list_keys(
self,
prefix="",
Expand Down Expand Up @@ -539,7 +470,6 @@ async def list_keys(
"callback": callback,
}
key_names = await self.walk_blobs(client, **kwargs)
# key_names = await self.walk_blob_hierarchy(client, **kwargs)
except CancelledError as cle:
self._azure_stats_increment("error_count")
msg = f"azureBlobClient.CancelledError for list_keys: {cle}"
Expand Down
7 changes: 4 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,11 @@ requires-python = ">=3.8"
version = "0.8.2"

dependencies = [
"aiohttp == 3.8.5",
"aiohttp == 3.8.5",
"aiobotocore == 2.5.0",
"aiohttp_cors",
"aiofiles",
"azure-storage-blob",
"botocore",
"cryptography",
"numcodecs",
Expand All @@ -51,7 +52,7 @@ dependencies = [
]

[project.optional-dependencies]
azure = ["azure-storage-blob"]
azure = []

[project.readme]
text = """\
Expand All @@ -60,7 +61,7 @@ Data can be stored in either a POSIX files system, or using object-based storage
AWS S3, Azure Blob Storage, or [MinIO](https://min.io).
HSDS can be run on a single machine or on a cluster using Kubernetes (or AKS on Microsoft Azure).
In addition, HSDS can be run in serverless mode with AWS Lambda or h5pyd local mode.
In addition, HSDS can be run in serverless mode with AWS Lambda or h5pyd local mode.
"""
content-type = "text/x-rst"

Expand Down

0 comments on commit 302105c

Please sign in to comment.