[Storage] Azure blob storage support (#3032)
* first commit

* nit

* implement fetching bucket

* update batch sync

* support syncing file names containing spaces

* support blobfuse2 mount/container name validate

* support container deletion

* support download from container to remote vm

* complete download from container to remote vm

* update mounting tool blobfuse2 download command

* update mounting command

* _CREDENTIALS_FILES list update

* add smoke test

* update storage comment

* update download commands to use account key

* add account-key for upload

* nit

* nit fix

* data_utils fix

* nit

* nit

* add comments

* nit smoke

* implement verify_az_bucket

* smoke test update and nit mounting_utils

* config schema update

* support public container usage

* nit fix for private bucket test

* update _get_bucket to use from_container_url

* add _download_file

* nit

* fix mounting blobfuse2 issues

* nit

* format

* nit

* container client fix

* smoke test update private_bucket

* azure get_client update to use exists()

* nit

* update fetch command for public containers

* nit

* update fetching command for public containers

* silence client logging when used with public containers

* az cli and blobfuse installation update

* update for faster container client fetch

* Handle private container without access

* update private container without access smoke test

* change due to merging master branch

* updates from merging master

* update mounting smoke test

* mounting smoke test update

* remove logger restriction

* update comments

* update verify_az_bucket to use for both private and public

* update comments and formatting

* update delete_az_bucket

* az cli installation versioning

* update logging silence logic for get_client

* support azcopy for fetching

* update sas token generation with az-cli

* propagation hold

* merge fix

* add support to assign role to access storage account

* nit

* silence logging from httpx request to get object_id

* checks existence of storage account and resource group before creation

* create storage account for different regions

* fix source name when translating local file mounts for spot sync

* smoke test update for storage account names

* removing az-cli installation from cloud_stores.py

* nit

* update sas token generation to use python sdk

* nit

* Update sky/data/storage.py

Co-authored-by: Tian Xia <[email protected]>

* move sas token generating functions from data_utils to adaptors.azure

* use constant string format to obtain container url

* nit

* add comment for '/' and azcopy syntax

* refactor AzureBlobCloudStorage methods

* nit

* format

* nit

* update test storage mount yaml j2

* added rich status message for storage account and resource group creation

* update rich status message when creating storage account and resource group

* nit

* Handle error when storage account creation has not yet propagated to the system

* comment update

* merge error output into exception message

* error comment

* additional error handling when creating storage account

* nit

* update to use raw container url endpoint instead of 'az://'

* update config.yaml interface

* remove resource group existence check

* add more comments for az mount command

* nit

* add more exception handling for storage account initialization

* Remove lru cache decorator from sas token generating functions

* nit

* nit

* Revert back to check if the resource group exists before running command to create.

* refactor function to obtain resource group and storage account

* nit

* add support for storage account under AzureBlobStoreMetadata

* set default file permission to be 755 for mounting

* Update sky/adaptors/azure.py

Co-authored-by: Tian Xia <[email protected]>

* nit

* nit fixes

* format and update error handling

* nit fixes

* set default storage account and resource group name as string constant

* update error handle.

* additional error handle for else branch

* Additional error handling

* nit

* update get_az_storage_account_key to replace try-exception with if statement

* nit

* nit

* nit

* format

* update public container example as not accessible anymore

* nit

* file_bucket_name update

* add StoreType method to retrieve bucket endpoint url

* format

* add azure storage blob dependency installation for controller

* fix fetching methods

* nit

* additional docstr for _get_storage_account_and_resource_group

* nit

* update blobfuse2 cache directory

* format

* refactor get_storage_account_key method

* update docker storage mounts smoke test

* sleep for storage account creation to propagate

* handle externally removed storage account being fetched

* format

* nit

* add logic to retry for role assignment

* add comment to _create_storage_account method

* additional error handling for role assignment

* format

* nit

* Update sky/adaptors/azure.py

Co-authored-by: Zhanghao Wu <[email protected]>

* additional installation check for azure blob storage dependencies

* format

* update step 7 from maybe_translate_local_file_mounts_and_sync_up method to format source correctly for azure

* additional comment on container_client.exists()

* explicitly check None for match

* Update sky/cloud_stores.py

Co-authored-by: Zhanghao Wu <[email protected]>

* [style] import module instead of class or function

* nit

* docstring nit updates

* nit

* error handle failure to run list blobs API from cloud_stores.py::is_directory()

* nit

* nit

* Add role assignment logic to handle edge case

* format

* remove redundant get_az_resource_group method from data_utils

* manage asyncio event loop lifecycle

* update constant values

* add logs when resource group and storage account is newly created

* Update sky/skylet/constants.py

Co-authored-by: Zhanghao Wu <[email protected]>

* add comment and move return True within the try except block

* reverse the order of two decorators for get_client method to allow cache_clear method

* revert error handling at _execute_file_mounts

* nit

* raise error when a non-existent storage account or container name is provided.

* format

* add comment for keeping decorator order

---------

Co-authored-by: Romil Bhardwaj <[email protected]>
Co-authored-by: Tian Xia <[email protected]>
Co-authored-by: Zhanghao Wu <[email protected]>
4 people authored Jul 17, 2024
1 parent 465d36c commit b6620b0
Showing 16 changed files with 2,032 additions and 147 deletions.
394 changes: 382 additions & 12 deletions sky/adaptors/azure.py

Large diffs are not rendered by default.
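The sky/adaptors/azure.py diff is collapsed above, but the commit messages note that the SAS-token-generating functions moved into this module and now use the Azure Python SDK instead of az-cli. A minimal sketch of what the container-level helper used later in the diff (azure.get_az_container_sas_token) might look like, assuming azure-storage-blob is available and a short fixed expiry; the actual implementation in the commit may differ:

import datetime

from azure.storage import blob


def get_az_container_sas_token(storage_account_name: str,
                               storage_account_key: str,
                               container_name: str) -> str:
    # Grant temporary read/list access to the entire container; AzCopy on
    # the remote VM appends this token to the container URL.
    return blob.generate_container_sas(
        account_name=storage_account_name,
        container_name=container_name,
        account_key=storage_account_key,
        permission=blob.ContainerSasPermissions(read=True, list=True),
        expiry=datetime.datetime.utcnow() + datetime.timedelta(hours=1))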

10 changes: 5 additions & 5 deletions sky/backends/cloud_vm_ray_backend.py
@@ -4423,13 +4423,13 @@ def _execute_file_mounts(self, handle: CloudVmRayResourceHandle,

storage = cloud_stores.get_storage_from_path(src)
if storage.is_directory(src):
sync = storage.make_sync_dir_command(source=src,
destination=wrapped_dst)
sync_cmd = (storage.make_sync_dir_command(
source=src, destination=wrapped_dst))
# It is a directory so make sure it exists.
mkdir_for_wrapped_dst = f'mkdir -p {wrapped_dst}'
else:
sync = storage.make_sync_file_command(source=src,
destination=wrapped_dst)
sync_cmd = (storage.make_sync_file_command(
source=src, destination=wrapped_dst))
# It is a file so make sure *its parent dir* exists.
mkdir_for_wrapped_dst = (
f'mkdir -p {os.path.dirname(wrapped_dst)}')
@@ -4438,7 +4438,7 @@ def _execute_file_mounts(self, handle: CloudVmRayResourceHandle,
# Ensure sync can write to wrapped_dst (e.g., '/data/').
mkdir_for_wrapped_dst,
# Both the wrapped and the symlink dir exist; sync.
sync,
sync_cmd,
]
command = ' && '.join(download_target_commands)
# dst is only used for message printing.
209 changes: 199 additions & 10 deletions sky/cloud_stores.py
@@ -7,15 +7,24 @@
* Better interface.
* Better implementation (e.g., fsspec, smart_open, using each cloud's SDK).
"""
import shlex
import subprocess
import time
import urllib.parse

from sky import exceptions as sky_exceptions
from sky import sky_logging
from sky.adaptors import aws
from sky.adaptors import azure
from sky.adaptors import cloudflare
from sky.adaptors import ibm
from sky.clouds import gcp
from sky.data import data_utils
from sky.data.data_utils import Rclone
from sky.skylet import constants
from sky.utils import ux_utils

logger = sky_logging.init_logger(__name__)


class CloudStorage:
@@ -153,6 +162,183 @@ def make_sync_file_command(self, source: str, destination: str) -> str:
return ' && '.join(all_commands)


class AzureBlobCloudStorage(CloudStorage):
"""Azure Blob Storage."""
# AzCopy is used to download data from Azure Blob Storage containers to
# remote systems because it performs significantly better than az-cli.
# While az-cli's `az storage blob sync` can synchronize data from local
# to a container, it does not support syncing from a container to a
# remote machine. Moreover, `az storage blob download-batch` in az-cli
# does not leverage AzCopy's efficient multi-threaded transfers, leading
# to slower performance.
#
# AzCopy requires SAS tokens to be appended directly to its commands, as
# it does not support using STORAGE_ACCOUNT_KEY, unlike az-cli, which can
# generate SAS tokens but lacks AzCopy's multi-threading. Hence, az-cli
# is run on the local machine to generate SAS tokens, while AzCopy is
# installed on the remote machine for efficient data transfer from
# containers to remote systems.
# Note that on Azure instances, both az-cli and AzCopy are typically
# pre-installed; both are installed only when an Azure container is used
# from non-Azure instances.
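# Example of the resulting transfer command (illustrative only; the
# account, container, path, and SAS token are hypothetical):
#   azcopy sync \
#     'https://<account>.blob.core.windows.net/<container>/<path>?<sas-token>' \
#     ~/dst/ --recursive --delete-destination=false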

_GET_AZCOPY = [
'azcopy --version > /dev/null 2>&1 || '
'(mkdir -p /usr/local/bin; '
'curl -L https://aka.ms/downloadazcopy-v10-linux -o azcopy.tar.gz; '
'sudo tar -xvzf azcopy.tar.gz --strip-components=1 -C /usr/local/bin --exclude=*.txt; ' # pylint: disable=line-too-long
'sudo chmod +x /usr/local/bin/azcopy; '
'rm azcopy.tar.gz)'
]

def is_directory(self, url: str) -> bool:
"""Returns whether 'url' of the AZ Container is a directory.
In cloud object stores, a "directory" refers to a regular object whose
name is a prefix of other objects.
Args:
url: Endpoint url of the container/blob.
Returns:
True if the url is an endpoint of a directory and False if it
is a blob(file).
Raises:
azure.core.exceptions.HttpResponseError: If the user's Azure
Azure account does not have sufficient IAM role for the given
storage account.
StorageBucketGetError: Provided container name does not exist.
TimeoutError: If unable to determine the container path status
in time.
"""
storage_account_name, container_name, path = data_utils.split_az_path(
url)

# If there is a path under the container, we need to check whether it
# points to a directory or a file.
container_url = data_utils.AZURE_CONTAINER_URL.format(
storage_account_name=storage_account_name,
container_name=container_name)
resource_group_name = azure.get_az_resource_group(storage_account_name)
role_assignment_start = time.time()
refresh_client = False
role_assigned = False

# 1. List blobs in the container_url to decide whether it is a directory
# 2. If it fails due to permission issues, try to assign a permissive
# role for the storage account to the current Azure account
# 3. Wait for the role assignment to propagate and retry.
while (time.time() - role_assignment_start <
constants.WAIT_FOR_STORAGE_ACCOUNT_ROLE_ASSIGNMENT):
container_client = data_utils.create_az_client(
client_type='container',
container_url=container_url,
storage_account_name=storage_account_name,
resource_group_name=resource_group_name,
refresh_client=refresh_client)

if not container_client.exists():
with ux_utils.print_exception_no_traceback():
raise sky_exceptions.StorageBucketGetError(
f'The provided container {container_name!r} from the '
f'passed endpoint url {url!r} does not exist. Please '
'check if the name is correct.')

# If there is no path under the container, i.e. the URL specifies only
# the storage account and container, it is a directory.
# Note: This must be run after the existence of the storage account is
# checked while obtaining the container client.
if not path:
return True

num_objects = 0
try:
for blob in container_client.list_blobs(name_starts_with=path):
if blob.name == path:
return False
num_objects += 1
if num_objects > 1:
return True
# A directory with few or no items
return True
except azure.exceptions().HttpResponseError as e:
# Handle case where user lacks sufficient IAM role for
# a private container in the same subscription. Attempt to
# assign appropriate role to current user.
if 'AuthorizationPermissionMismatch' in str(e):
if not role_assigned:
logger.info('Failed to list blobs in container '
f'{container_url!r}. This implies '
'insufficient IAM role for storage account'
f' {storage_account_name!r}.')
azure.assign_storage_account_iam_role(
storage_account_name=storage_account_name,
resource_group_name=resource_group_name)
role_assigned = True
refresh_client = True
else:
logger.info(
'Waiting due to the propagation delay of IAM '
'role assignment to the storage account '
f'{storage_account_name!r}.')
time.sleep(
constants.RETRY_INTERVAL_AFTER_ROLE_ASSIGNMENT)
continue
raise
else:
raise TimeoutError(
'Failed to determine the container path status within '
f'{constants.WAIT_FOR_STORAGE_ACCOUNT_ROLE_ASSIGNMENT}'
' seconds.')

def _get_azcopy_source(self, source: str, is_dir: bool) -> str:
"""Converts the source so it can be used as an argument for azcopy."""
storage_account_name, container_name, blob_path = (
data_utils.split_az_path(source))
storage_account_key = data_utils.get_az_storage_account_key(
storage_account_name)

if storage_account_key is None:
# public containers do not require SAS token for access
sas_token = ''
else:
if is_dir:
sas_token = azure.get_az_container_sas_token(
storage_account_name, storage_account_key, container_name)
else:
sas_token = azure.get_az_blob_sas_token(storage_account_name,
storage_account_key,
container_name,
blob_path)
# "?" is a delimiter character used when SAS token is attached to the
# container endpoint.
# Reference: https://learn.microsoft.com/en-us/azure/ai-services/translator/document-translation/how-to-guides/create-sas-tokens?tabs=Containers # pylint: disable=line-too-long
converted_source = f'{source}?{sas_token}' if sas_token else source

return shlex.quote(converted_source)

def make_sync_dir_command(self, source: str, destination: str) -> str:
"""Fetches a directory using AZCOPY from storage to remote instance."""
source = self._get_azcopy_source(source, is_dir=True)
# destination is guaranteed to not have '/' at the end of the string
# by tasks.py::set_file_mounts(). It is necessary to append it here
# due to the syntax of azcopy.
destination = f'{destination}/'
download_command = (f'azcopy sync {source} {destination} '
'--recursive --delete-destination=false')
all_commands = list(self._GET_AZCOPY)
all_commands.append(download_command)
return ' && '.join(all_commands)

def make_sync_file_command(self, source: str, destination: str) -> str:
"""Fetches a file using AZCOPY from storage to remote instance."""
source = self._get_azcopy_source(source, is_dir=False)
download_command = f'azcopy copy {source} {destination}'
all_commands = list(self._GET_AZCOPY)
all_commands.append(download_command)
return ' && '.join(all_commands)
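# Hypothetical end-to-end usage of this class (account, container, and
# destination are made up; the backend composes commands like this in
# cloud_vm_ray_backend.py::_execute_file_mounts above):
#
#   storage = AzureBlobCloudStorage()
#   src = 'https://myaccount.blob.core.windows.net/mycontainer/data'
#   if storage.is_directory(src):
#       sync_cmd = storage.make_sync_dir_command(source=src,
#                                                destination='~/data')
#   else:
#       sync_cmd = storage.make_sync_file_command(source=src,
#                                                 destination='~/data')
#   # sync_cmd installs AzCopy if it is missing, then runs
#   # `azcopy sync`/`azcopy copy` with a freshly generated SAS token
#   # appended to the source URL.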


class R2CloudStorage(CloudStorage):
"""Cloudflare Cloud Storage."""

@@ -218,16 +404,6 @@ def make_sync_file_command(self, source: str, destination: str) -> str:
return ' && '.join(all_commands)


def get_storage_from_path(url: str) -> CloudStorage:
"""Returns a CloudStorage by identifying the scheme:// in a URL."""
result = urllib.parse.urlsplit(url)

if result.scheme not in _REGISTRY:
assert False, (f'Scheme {result.scheme} not found in'
f' supported storage ({_REGISTRY.keys()}); path {url}')
return _REGISTRY[result.scheme]


class IBMCosCloudStorage(CloudStorage):
"""IBM Cloud Storage."""
# install rclone if package isn't already installed
@@ -294,10 +470,23 @@ def make_sync_file_command(self, source: str, destination: str) -> str:
return self.make_sync_dir_command(source, destination)


def get_storage_from_path(url: str) -> CloudStorage:
"""Returns a CloudStorage by identifying the scheme:// in a URL."""
result = urllib.parse.urlsplit(url)
if result.scheme not in _REGISTRY:
assert False, (f'Scheme {result.scheme} not found in'
f' supported storage ({_REGISTRY.keys()}); path {url}')
return _REGISTRY[result.scheme]


# Maps a bucket URI's prefix (scheme) to its corresponding storage class
_REGISTRY = {
'gs': GcsCloudStorage(),
's3': S3CloudStorage(),
'r2': R2CloudStorage(),
'cos': IBMCosCloudStorage(),
# TODO: This is a hack. As Azure URLs start with https://, we should
# refactor the registry to accept regexes so that Azure blob storage can
# be identified with `https://(.*?)\.blob\.core\.windows\.net`.
'https': AzureBlobCloudStorage()
}
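A sketch of the regex-based registry the TODO above describes (not part of this commit; the pattern list and lookup logic are assumptions about one possible refactor):

import re

# Hypothetical: map URL regexes to storage classes so that schemes shared
# by multiple providers (e.g. https:// for Azure blob endpoints) can be
# disambiguated by the full URL rather than by the scheme alone.
_URL_PATTERN_REGISTRY = [
    (re.compile(r'^gs://'), GcsCloudStorage()),
    (re.compile(r'^s3://'), S3CloudStorage()),
    (re.compile(r'^r2://'), R2CloudStorage()),
    (re.compile(r'^cos://'), IBMCosCloudStorage()),
    (re.compile(r'^https://.+\.blob\.core\.windows\.net'),
     AzureBlobCloudStorage()),
]


def get_storage_from_path(url: str) -> CloudStorage:
    """Returns the CloudStorage whose URL pattern matches `url`."""
    for pattern, storage in _URL_PATTERN_REGISTRY:
        if pattern.match(url):
            return storage
    raise ValueError(f'Unsupported storage path: {url}')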
13 changes: 13 additions & 0 deletions sky/clouds/azure.py
@@ -475,6 +475,19 @@ def check_credentials(cls) -> Tuple[bool, Optional[str]]:
return False, (f'Getting user\'s Azure identity failed.{help_str}\n'
f'{cls._INDENT_PREFIX}Details: '
f'{common_utils.format_exception(e)}')

# Check if the azure blob storage dependencies are installed.
try:
# pylint: disable=redefined-outer-name, import-outside-toplevel, unused-import
from azure.storage import blob
import msgraph
except ImportError as e:
return False, (
'Azure blob storage dependencies are not installed. '
'Run the following command:'
f'\n{cls._INDENT_PREFIX} $ pip install skypilot[azure]'
f'\n{cls._INDENT_PREFIX}Details: '
f'{common_utils.format_exception(e)}')
return True, None

def get_credential_file_mounts(self) -> Dict[str, str]:
8 changes: 4 additions & 4 deletions sky/core.py
@@ -831,7 +831,7 @@ def storage_delete(name: str) -> None:
if handle is None:
raise ValueError(f'Storage name {name!r} not found.')
else:
store_object = data.Storage(name=handle.storage_name,
source=handle.source,
sync_on_reconstruction=False)
store_object.delete()
storage_object = data.Storage(name=handle.storage_name,
source=handle.source,
sync_on_reconstruction=False)
storage_object.delete()