Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Azurefix #259

Merged
merged 5 commits into from
Sep 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ In addition, HSDS can be run in serverless mode with AWS Lambda or h5pyd local m

Make sure you have Python 3 and Pip installed, then:

1. Run install: `$ ./build.sh` from source tree OR install from pypi: `$ pip install hsds`
1. Run install: `$ ./build.sh --nolint` from source tree OR install from pypi: `$ pip install hsds`
2. Create a directory the server will use to store data, example: `$ mkdir ~/hsds_data`
3. Start server: `$ hsds --root_dir ~/hsds_data`
4. Run the test suite. In a separate terminal run:
Expand Down
2 changes: 1 addition & 1 deletion hsds/basenode.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
from .util.k8sClient import getDnLabelSelector, getPodIps
from . import hsds_logger as log

HSDS_VERSION = "0.8.2"
HSDS_VERSION = "0.8.3"


def getVersion():
Expand Down
8 changes: 4 additions & 4 deletions hsds/domain_sn.py
Original file line number Diff line number Diff line change
Expand Up @@ -526,15 +526,15 @@ async def get_domains(request):
domainNames = domainNames.split(",")
else:
s3prefix = prefix[1:]
log.debug(f"get_domains - listing S3 keys for {s3prefix}")
log.debug(f"get_domains - listing keys for {s3prefix}")
kwargs = {
"include_stats": False,
"prefix": s3prefix,
"deliminator": "/",
"bucket": bucket,
}
s3keys = await getStorKeys(app, **kwargs)
log.debug(f"get_domains - getS3Keys returned: {len(s3keys)} keys")
log.debug(f"get_domains - getStorKeys returned: {len(s3keys)} keys")

for s3key in s3keys:
if s3key[-1] != "/":
Expand All @@ -543,7 +543,7 @@ async def get_domains(request):
if len(s3key) > 1 and s3key[-2] == "/":
# trim off double slash
s3key = s3key[:-1]
log.debug(f"get_domains - got s3key: {s3key}")
log.debug(f"get_domains - got key: {s3key}")
domain = "/" + s3key[:-1]
if pattern:
# do a pattern match on the basename
Expand Down Expand Up @@ -1353,7 +1353,7 @@ async def DELETE_Domain(request):
index = domain.find("/")
nlen = index + 1
s3prefix = domain[nlen:] + "/"
log.info(f"checking s3key with prefix: {s3prefix} in bucket: {bucket}")
log.info(f"checking key with prefix: {s3prefix} in bucket: {bucket}")
kwargs = {
"include_stats": False,
"prefix": s3prefix,
Expand Down
76 changes: 3 additions & 73 deletions hsds/util/azureBlobClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import datetime
import time
from azure.storage.blob.aio import BlobServiceClient
from azure.storage.blob import BlobPrefix
from azure.core.exceptions import AzureError
from .. import hsds_logger as log

Expand Down Expand Up @@ -380,6 +379,7 @@ async def walk_blobs(
deliminator="/",
callback=None,
):
key_names = {} if include_stats else []
continuation_token = None
count = 0
while True:
Expand All @@ -389,7 +389,6 @@ async def walk_blobs(
"results_per_page": CALLBACK_MAX_COUNT,
}
keyList = client.walk_blobs(**kwargs).by_page(continuation_token)
key_names = {} if include_stats else []
async for key in await keyList.__anext__():
key_name = key["name"]
log.debug(f"walk_blobs got: {key_name}")
Expand All @@ -411,8 +410,6 @@ async def walk_blobs(
log.debug("skip name thaat doesn't end in '/'")
# only return folders
continue
if len(key_names) >= CALLBACK_MAX_COUNT:
break
key_names.append(key_name)
count += 1
if callback:
Expand All @@ -422,86 +419,20 @@ async def walk_blobs(
callback(self._app, key_names)
key_names = {} if include_stats else []
token = keyList.continuation_token
if not token or len(key_names) >= CALLBACK_MAX_COUNT:
if not token:
# got all the keys (or as many as requested)
log.debug("walk_blobs complete")
break
else:
# keep going
continuation_token = keyList.continuation_token
log.info(f"walk_blob_hierarchy, returning {count} items")
log.info(f"walk_blobs, returning {count} items")
if not callback and count != len(key_names):
msg = f"expected {count} keys in return list "
msg += f"but got {len(key_names)}"
log.warning(msg)
return key_names

async def walk_blob_hierarchy(
    self, client, prefix="", include_stats=False, callback=None
):
    """Recursively walk the blob hierarchy under *prefix*.

    Parameters:
        client: Azure async ContainerClient used for walk_blobs/list_blobs.
        prefix: key prefix limiting the walk (folder-style, '/'-delimited).
        include_stats: when True, collect {key: {ETag, Size, LastModified}}
            dicts; when False, collect a plain list of key names.
        callback: optional sync or async function invoked as
            callback(self._app, key_names) with batches of at most
            CALLBACK_MAX_COUNT keys; when set, the accumulated container is
            flushed after each invocation.

    Returns:
        dict or list of the collected keys (empty when a callback consumed
        them all).
    """
    log.info(f"walk_blob_hierarchy, prefix: {prefix}")

    async def do_callback(callback, key_names):
        # support both coroutine and plain-function callbacks
        if iscoroutinefunction(callback):
            await callback(self._app, key_names)
        else:
            callback(self._app, key_names)

    key_names = {} if include_stats else []
    count = 0
    nlen = len(prefix)  # invariant - hoisted out of the loop
    async for item in client.walk_blobs(name_starts_with=prefix):
        short_name = item.name[nlen:]
        if isinstance(item, BlobPrefix):
            log.debug(f"walk_blob_hierarchy - BlobPrefix: {short_name}")
            kwargs = {
                "prefix": item.name,
                "include_stats": include_stats,
                "callback": callback,
            }
            # recurse into the sub-folder and MERGE the results; the old
            # code assigned the return value, discarding keys already
            # gathered at this level and never counting the sub-keys
            sub_keys = await self.walk_blob_hierarchy(client, **kwargs)
            if include_stats:
                key_names.update(sub_keys)
            else:
                key_names.extend(sub_keys)
            count += len(sub_keys)
        else:
            # fixed typo: was "nme_starts_with", which would raise a
            # TypeError from list_blobs
            kwargs = {"name_starts_with": item.name}
            async for blob in kwargs and client.list_blobs(**kwargs):
                key_name = blob["name"]
                log.debug(f"walk_blob_hierarchy - got name: {key_name}")
                if include_stats:
                    key_tags = {
                        "ETag": blob["etag"],
                        "Size": blob["size"],
                        "LastModified": int(blob["last_modified"].timestamp()),
                    }
                    key_names[key_name] = key_tags
                else:
                    # just add the blob name to the list
                    key_names.append(key_name)
                count += 1
                if callback and len(key_names) >= CALLBACK_MAX_COUNT:
                    msg = "walk_blob_hierarchy, invoking callback "
                    msg += f"with {len(key_names)} items"
                    log.debug(msg)
                    await do_callback(callback, key_names)
                    key_names = {} if include_stats else []
    if callback:
        # flush any remaining keys to the callback
        msg = "walk_blob_hierarchy, invoking callback "
        msg += f"with {len(key_names)} items"
        log.debug(msg)
        await do_callback(callback, key_names)
        key_names = {} if include_stats else []

    log.info(f"walk_blob_hierarchy, returning {count} items")
    if not callback and count != len(key_names):
        msg = f"expected {count} keys in return list "
        msg += f"but got {len(key_names)}"
        log.warning(msg)

    return key_names

async def list_keys(
self,
prefix="",
Expand Down Expand Up @@ -539,7 +470,6 @@ async def list_keys(
"callback": callback,
}
key_names = await self.walk_blobs(client, **kwargs)
# key_names = await self.walk_blob_hierarchy(client, **kwargs)
except CancelledError as cle:
self._azure_stats_increment("error_count")
msg = f"azureBlobClient.CancelledError for list_keys: {cle}"
Expand Down
9 changes: 5 additions & 4 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,14 @@ classifiers = [
"Topic :: Software Development :: Libraries :: Python Modules",
]
requires-python = ">=3.8"
version = "0.8.2"
version = "0.8.3"

dependencies = [
"aiohttp == 3.8.5",
"aiohttp == 3.8.5",
"aiobotocore == 2.5.0",
"aiohttp_cors",
"aiofiles",
"azure-storage-blob",
"botocore",
"cryptography",
"numcodecs",
Expand All @@ -51,7 +52,7 @@ dependencies = [
]

[project.optional-dependencies]
azure = ["azure-storage-blob"]
azure = []

[project.readme]
text = """\
Expand All @@ -60,7 +61,7 @@ Data can be stored in either a POSIX files system, or using object-based storage
AWS S3, Azure Blob Storage, or [MinIO](https://min.io).
HSDS can be run on a single machine or on a cluster using Kubernetes (or AKS on Microsoft Azure).

In addition, HSDS can be run in serverless mode with AWS Lambda or h5pyd local mode.
In addition, HSDS can be run in serverless mode with AWS Lambda or h5pyd local mode.
"""
content-type = "text/x-rst"

Expand Down
Loading