Skip to content

Commit

Permalink
fix walk_blobs for azure driver
Browse files Browse the repository at this point in the history
  • Loading branch information
jreadey committed Aug 31, 2023
1 parent 12ef9cc commit 355f617
Showing 1 changed file with 2 additions and 70 deletions.
72 changes: 2 additions & 70 deletions hsds/util/azureBlobClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import datetime
import time
from azure.storage.blob.aio import BlobServiceClient
from azure.storage.blob import BlobPrefix
from azure.core.exceptions import AzureError
from .. import hsds_logger as log

Expand Down Expand Up @@ -380,6 +379,7 @@ async def walk_blobs(
deliminator="/",
callback=None,
):
key_names = {} if include_stats else []
continuation_token = None
count = 0
while True:
Expand All @@ -389,7 +389,6 @@ async def walk_blobs(
"results_per_page": CALLBACK_MAX_COUNT,
}
keyList = client.walk_blobs(**kwargs).by_page(continuation_token)
key_names = {} if include_stats else []
async for key in await keyList.__anext__():
key_name = key["name"]
log.debug(f"walk_blobs got: {key_name}")
Expand Down Expand Up @@ -429,79 +428,13 @@ async def walk_blobs(
else:
# keep going
continuation_token = keyList.continuation_token
log.info(f"walk_blob_hierarchy, returning {count} items")
log.info(f"walk_blobs, returning {count} items")
if not callback and count != len(key_names):
msg = f"expected {count} keys in return list "
msg += f"but got {len(key_names)}"
log.warning(msg)
return key_names

async def walk_blob_hierarchy(
    self, client, prefix="", include_stats=False, callback=None
):
    """Recursively collect the blob names found under ``prefix``.

    Items yielded by ``client.walk_blobs`` that are ``BlobPrefix`` instances
    are descended into recursively; for every other item the blobs whose
    names start with that item's name are listed with ``client.list_blobs``.

    Args:
        client: object providing async-iterable ``walk_blobs`` and
            ``list_blobs`` methods (presumably an Azure container
            client - verify against the caller).
        prefix: only names starting with this string are visited.
        include_stats: when true, the result is a dict mapping each name to
            ``{"ETag", "Size", "LastModified"}``; otherwise it is a list
            of names.
        callback: optional function or coroutine function invoked as
            ``callback(self._app, key_names)`` each time at least
            ``CALLBACK_MAX_COUNT`` names have accumulated, and once more
            with whatever remains at the end.

    Returns:
        The collected names (dict or list, see ``include_stats``).  When a
        callback is supplied every batch is handed to it instead and an
        empty container is returned.
    """
    log.info(f"walk_blob_hierarchy, prefix: {prefix}")

    async def do_callback(callback, keynames):
        # Pass on the batch that was handed in rather than reaching for
        # the enclosing variable, which may be rebound by the caller.
        if iscoroutinefunction(callback):
            await callback(self._app, keynames)
        else:
            callback(self._app, keynames)

    key_names = {} if include_stats else []
    count = 0
    nlen = len(prefix)  # loop invariant, used to shorten names for logging
    # NOTE(review): BlobPrefix is imported from the synchronous
    # azure.storage.blob package while the client is created from
    # azure.storage.blob.aio - confirm this isinstance check matches the
    # prefix objects the async client actually yields.
    async for item in client.walk_blobs(name_starts_with=prefix):
        if isinstance(item, BlobPrefix):
            short_name = item.name[nlen:]
            log.debug(f"walk_blob_hierarchy - BlobPrefix: {short_name}")
            kwargs = {
                "prefix": item.name,
                "include_stats": include_stats,
                "callback": callback,
            }
            sub_keys = await self.walk_blob_hierarchy(client, **kwargs)
            # Merge the sub-tree results instead of replacing what has
            # already been gathered at this level.  With a callback the
            # recursive call has already delivered its names and returns
            # an empty container, so nothing is added here.
            if include_stats:
                key_names.update(sub_keys)
            else:
                key_names.extend(sub_keys)
            count += len(sub_keys)
        else:
            # NOTE(review): this is a prefix match, so a blob whose name
            # is a prefix of a sibling's name will also return that
            # sibling here - confirm whether that is intended.
            kwargs = {"name_starts_with": item.name}
            async for blob in client.list_blobs(**kwargs):
                key_name = blob["name"]
                log.debug(f"walk_blob_hierarchy - got name: {key_name}")
                if include_stats:
                    key_names[key_name] = {
                        "ETag": blob["etag"],
                        "Size": blob["size"],
                        "LastModified": int(
                            blob["last_modified"].timestamp()
                        ),
                    }
                else:
                    # just add the blob name to the list
                    key_names.append(key_name)
                count += 1
                if callback and len(key_names) >= CALLBACK_MAX_COUNT:
                    msg = "walk_blob_hierarchy, invoking callback "
                    msg += f"with {len(key_names)} items"
                    log.debug(msg)
                    await do_callback(callback, key_names)
                    # start a fresh batch; the callback owns the old one
                    key_names = {} if include_stats else []

    if callback:
        # flush whatever is left over from the last partial batch
        msg = "walk_blob_hierarchy, invoking callback "
        msg += f"with {len(key_names)} items"
        log.debug(msg)
        await do_callback(callback, key_names)
        key_names = {} if include_stats else []

    log.info(f"walk_blob_hierarchy, returning {count} items")
    if not callback and count != len(key_names):
        msg = f"expected {count} keys in return list "
        msg += f"but got {len(key_names)}"
        log.warning(msg)

    return key_names

async def list_keys(
self,
prefix="",
Expand Down Expand Up @@ -539,7 +472,6 @@ async def list_keys(
"callback": callback,
}
key_names = await self.walk_blobs(client, **kwargs)
# key_names = await self.walk_blob_hierarchy(client, **kwargs)
except CancelledError as cle:
self._azure_stats_increment("error_count")
msg = f"azureBlobClient.CancelledError for list_keys: {cle}"
Expand Down

0 comments on commit 355f617

Please sign in to comment.