diff --git a/examples/oci/dataset-mount.yaml b/examples/oci/dataset-mount.yaml
new file mode 100644
index 00000000000..91bec9cda65
--- /dev/null
+++ b/examples/oci/dataset-mount.yaml
@@ -0,0 +1,35 @@
+name: cpu-task1
+
+resources:
+  cloud: oci
+  region: us-sanjose-1
+  cpus: 2
+  disk_size: 256
+  disk_tier: medium
+  use_spot: False
+
+file_mounts:
+  # Mount an existing OCI bucket.
+  /datasets-storage:
+    source: oci://skybucket
+    mode: MOUNT  # Either MOUNT or COPY. Optional.
+
+# Working directory (optional) containing the project codebase.
+# Its contents are synced to ~/sky_workdir/ on the cluster.
+workdir: .
+
+num_nodes: 1
+
+# Typical use: pip install -r requirements.txt
+# Invoked under the workdir (i.e., can use its files).
+setup: |
+  echo "*** Running setup for the task. ***"
+
+# Typical use: make use of resources, such as running training.
+# Invoked under the workdir (i.e., can use its files).
+run: |
+  echo "*** Running the task on OCI ***"
+  timestamp=$(date +%s)
+  ls -lthr /datasets-storage
+  echo "hi" >> /datasets-storage/foo.txt
+  ls -lthr /datasets-storage
diff --git a/examples/oci/dataset-upload-and-mount.yaml b/examples/oci/dataset-upload-and-mount.yaml
new file mode 100644
index 00000000000..13ddc4d2b35
--- /dev/null
+++ b/examples/oci/dataset-upload-and-mount.yaml
@@ -0,0 +1,47 @@
+name: cpu-task1
+
+resources:
+  cloud: oci
+  region: us-sanjose-1
+  cpus: 2
+  disk_size: 256
+  disk_tier: medium
+  use_spot: False
+
+file_mounts:
+  /datasets-storage:
+    name: skybucket  # Name of the storage; optional when source is a bucket URI.
+    source: ['./examples/oci']  # Source path; can be local or a bucket URI. Optional; omit to create an empty bucket.
+    store: oci  # E.g. 'oci', 's3', 'gcs'...; default: None. Optional.
+    persistent: True  # Defaults to True; can be set to False. Optional.
+    mode: MOUNT  # Either MOUNT or COPY. Optional.
+
+  /datasets-storage2:
+    name: skybucket2  # Name of the storage; optional when source is a bucket URI.
+    source: './examples/oci'  # Source path; can be local or a bucket URI. Optional; omit to create an empty bucket.
+    store: oci  # E.g. 'oci', 's3', 'gcs'...; default: None. Optional.
+    persistent: True  # Defaults to True; can be set to False. Optional.
+    mode: MOUNT  # Either MOUNT or COPY. Optional.
+
+# Working directory (optional) containing the project codebase.
+# Its contents are synced to ~/sky_workdir/ on the cluster.
+workdir: .
+
+num_nodes: 1
+
+# Typical use: pip install -r requirements.txt
+# Invoked under the workdir (i.e., can use its files).
+setup: |
+  echo "*** Running setup for the task. ***"
+
+# Typical use: make use of resources, such as running training.
+# Invoked under the workdir (i.e., can use its files).
+run: |
+  echo "*** Running the task on OCI ***"
+  ls -lthr /datasets-storage
+  echo "hi" >> /datasets-storage/foo.txt
+  ls -lthr /datasets-storage
+
+  ls -lthr /datasets-storage2
+  echo "hi" >> /datasets-storage2/foo2.txt
+  ls -lthr /datasets-storage2
diff --git a/examples/oci/oci-mounts.yaml b/examples/oci/oci-mounts.yaml
new file mode 100644
index 00000000000..6fd2aaf16eb
--- /dev/null
+++ b/examples/oci/oci-mounts.yaml
@@ -0,0 +1,26 @@
+resources:
+  cloud: oci
+
+file_mounts:
+  ~/tmpfile: ~/tmpfile
+  ~/a/b/c/tmpfile: ~/tmpfile
+  /tmp/workdir: ~/tmp-workdir
+
+  /mydir:
+    name: skybucket
+    source: ['~/tmp-workdir']
+    store: oci
+    mode: MOUNT
+
+setup: |
+  echo "*** Setup ***"
+
+run: |
+  echo "*** Run ***"
+
+  ls -lthr ~/tmpfile
+  ls -lthr ~/a/b/c
+  echo hi >> /tmp/workdir/new_file
+  ls -lthr /tmp/workdir
+
+  ls -lthr /mydir
diff --git a/sky/adaptors/oci.py b/sky/adaptors/oci.py
index 8fe09479a38..31712de414f 100644
--- a/sky/adaptors/oci.py
+++ b/sky/adaptors/oci.py
@@ -1,9 +1,11 @@
 """Oracle OCI cloud adaptor"""
 
+import functools
 import logging
 import os
 
 from sky.adaptors import common
+from sky.clouds.utils import oci_utils
 
 # Suppress OCI circuit breaker logging before lazy import, because
 # oci modules prints additional message during imports, i.e., the
@@ -30,10 +32,16 @@ def get_config_file() -> str:
 
 def get_oci_config(region=None, profile='DEFAULT'):
     conf_file_path = get_config_file()
+    if not profile or profile == 'DEFAULT':
+        config_profile = oci_utils.oci_config.get_profile()
+    else:
+        config_profile = profile
+
     oci_config = oci.config.from_file(file_location=conf_file_path,
-                                      profile_name=profile)
+                                      profile_name=config_profile)
     if region is not None:
         oci_config['region'] = region
+
     return oci_config
@@ -54,6 +62,29 @@ def get_identity_client(region=None, profile='DEFAULT'):
     return oci.identity.IdentityClient(get_oci_config(region, profile))
 
 
+def get_object_storage_client(region=None, profile='DEFAULT'):
+    return oci.object_storage.ObjectStorageClient(
+        get_oci_config(region, profile))
+
+
 def service_exception():
     """OCI service exception."""
     return oci.exceptions.ServiceError
+
+
+def with_oci_env(f):
+
+    @functools.wraps(f)
+    def wrapper(*args, **kwargs):
+        # pylint: disable=line-too-long
+        enter_env_cmds = [
+            'conda info --envs | grep "sky-oci-cli-env" || conda create -n sky-oci-cli-env python=3.10 -y',
+            '. $(conda info --base 2> /dev/null)/etc/profile.d/conda.sh > /dev/null 2>&1 || true',
+            'conda activate sky-oci-cli-env', 'pip install oci-cli',
+            'export OCI_CLI_SUPPRESS_FILE_PERMISSIONS_WARNING=True'
+        ]
+        operation_cmd = [f(*args, **kwargs)]
+        leave_env_cmds = ['conda deactivate']
+        return ' && '.join(enter_env_cmds + operation_cmd + leave_env_cmds)
+
+    return wrapper
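Reviewer note (not part of the diff): the `with_oci_env` decorator above turns a command-generating function into one that returns a single `&&`-chained shell line, so callers can hand the result straight to a shell. Below is a minimal, self-contained sketch of that chaining for local experimentation; the conda env name mirrors the diff, but the env-creation and `pip install oci-cli` bootstrap commands are deliberately trimmed, and `list_bucket_cmd` is a hypothetical helper, not part of the PR.

```python
# Sketch of the command chaining in sky/adaptors/oci.py's with_oci_env.
# Only the composition behavior is reproduced; the real decorator also
# creates the 'sky-oci-cli-env' conda env and installs oci-cli.
import functools


def with_oci_env(f):

    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        enter_env_cmds = ['conda activate sky-oci-cli-env']
        operation_cmd = [f(*args, **kwargs)]  # f returns a shell command str.
        leave_env_cmds = ['conda deactivate']
        return ' && '.join(enter_env_cmds + operation_cmd + leave_env_cmds)

    return wrapper


@with_oci_env
def list_bucket_cmd(bucket_name: str) -> str:
    # Hypothetical helper used only to demonstrate the decorator.
    return f'oci os object list --bucket-name {bucket_name}'


if __name__ == '__main__':
    # Prints: conda activate sky-oci-cli-env && oci os object list
    #         --bucket-name skybucket && conda deactivate
    print(list_bucket_cmd('skybucket'))
```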
""" +import os import shlex import subprocess import time @@ -18,6 +19,7 @@ from sky.adaptors import azure from sky.adaptors import cloudflare from sky.adaptors import ibm +from sky.adaptors import oci from sky.clouds import gcp from sky.data import data_utils from sky.data.data_utils import Rclone @@ -470,6 +472,64 @@ def make_sync_file_command(self, source: str, destination: str) -> str: return self.make_sync_dir_command(source, destination) +class OciCloudStorage(CloudStorage): + """OCI Cloud Storage.""" + + def is_directory(self, url: str) -> bool: + """Returns whether OCI 'url' is a directory. + In cloud object stores, a "directory" refers to a regular object whose + name is a prefix of other objects. + """ + bucket_name, path = data_utils.split_oci_path(url) + + client = oci.get_object_storage_client() + namespace = client.get_namespace( + compartment_id=oci.get_oci_config()['tenancy']).data + + objects = client.list_objects(namespace_name=namespace, + bucket_name=bucket_name, + prefix=path).data.objects + + if len(objects) == 0: + # A directory with few or no items + return True + + if len(objects) > 1: + # A directory with more than 1 items + return True + + object_name = objects[0].name + if path.endswith(object_name): + # An object path + return False + + # A directory with only 1 item + return True + + @oci.with_oci_env + def make_sync_dir_command(self, source: str, destination: str) -> str: + """Downloads using OCI CLI.""" + bucket_name, path = data_utils.split_oci_path(source) + + download_via_ocicli = (f'oci os object sync --no-follow-symlinks ' + f'--bucket-name {bucket_name} ' + f'--prefix "{path}" --dest-dir "{destination}"') + + return download_via_ocicli + + @oci.with_oci_env + def make_sync_file_command(self, source: str, destination: str) -> str: + """Downloads a file using OCI CLI.""" + bucket_name, path = data_utils.split_oci_path(source) + filename = os.path.basename(path) + destination = os.path.join(destination, filename) + + download_via_ocicli = (f'oci os object get --bucket-name {bucket_name} ' + f'--name "{path}" --file "{destination}"') + + return download_via_ocicli + + def get_storage_from_path(url: str) -> CloudStorage: """Returns a CloudStorage by identifying the scheme:// in a URL.""" result = urllib.parse.urlsplit(url) @@ -485,6 +545,7 @@ def get_storage_from_path(url: str) -> CloudStorage: 's3': S3CloudStorage(), 'r2': R2CloudStorage(), 'cos': IBMCosCloudStorage(), + 'oci': OciCloudStorage(), # TODO: This is a hack, as Azure URL starts with https://, we should # refactor the registry to be able to take regex, so that Azure blob can # be identified with `https://(.*?)\.blob\.core\.windows\.net` diff --git a/sky/data/data_transfer.py b/sky/data/data_transfer.py index 374871031cb..3ccc6f8fc0e 100644 --- a/sky/data/data_transfer.py +++ b/sky/data/data_transfer.py @@ -200,3 +200,40 @@ def _add_bucket_iam_member(bucket_name: str, role: str, member: str) -> None: bucket.set_iam_policy(policy) logger.debug(f'Added {member} with role {role} to {bucket_name}.') + + +def s3_to_oci(s3_bucket_name: str, oci_bucket_name: str) -> None: + """Creates a one-time transfer from Amazon S3 to OCI Object Storage. + Args: + s3_bucket_name: str; Name of the Amazon S3 Bucket + oci_bucket_name: str; Name of the OCI Bucket + """ + # TODO(HysunHe): Implement sync with other clouds (s3, gs) + raise NotImplementedError('Moving data directly from S3 to OCI bucket ' + 'is currently not supported. 
+                              'is currently not supported. Please specify '
+                              'a local source for the storage object.')
+
+
+def gcs_to_oci(gs_bucket_name: str, oci_bucket_name: str) -> None:
+    """Creates a one-time transfer from Google Cloud Storage to
+    OCI Object Storage.
+
+    Args:
+        gs_bucket_name: str; Name of the Google Cloud Storage Bucket
+        oci_bucket_name: str; Name of the OCI Bucket
+    """
+    # TODO(HysunHe): Implement sync with other clouds (s3, gs)
+    raise NotImplementedError('Moving data directly from GCS to OCI bucket '
+                              'is currently not supported. Please specify '
+                              'a local source for the storage object.')
+
+
+def r2_to_oci(r2_bucket_name: str, oci_bucket_name: str) -> None:
+    """Creates a one-time transfer from Cloudflare R2 to an OCI bucket.
+
+    Args:
+        r2_bucket_name: str; Name of the Cloudflare R2 Bucket
+        oci_bucket_name: str; Name of the OCI Bucket
+    """
+    raise NotImplementedError(
+        'Moving data directly from Cloudflare R2 to OCI '
+        'bucket is currently not supported. Please specify '
+        'a local source for the storage object.')
diff --git a/sky/data/data_utils.py b/sky/data/data_utils.py
index d66c79afeb0..05c2b42c844 100644
--- a/sky/data/data_utils.py
+++ b/sky/data/data_utils.py
@@ -730,3 +730,14 @@ def _remove_bucket_profile_rclone(bucket_name: str,
             lines_to_keep.append(line)
 
     return lines_to_keep
+
+
+def split_oci_path(oci_path: str) -> Tuple[str, str]:
+    """Splits an OCI path into bucket name and the path relative to it.
+
+    Args:
+        oci_path: str; OCI path, e.g. oci://imagenet/train/
+    """
+    path_parts = oci_path.replace('oci://', '').split('/')
+    bucket = path_parts.pop(0)
+    key = '/'.join(path_parts)
+    return bucket, key
diff --git a/sky/data/mounting_utils.py b/sky/data/mounting_utils.py
index 22b26c372c4..b713a1f1cc5 100644
--- a/sky/data/mounting_utils.py
+++ b/sky/data/mounting_utils.py
@@ -19,6 +19,7 @@
 _BLOBFUSE_CACHE_ROOT_DIR = '~/.sky/blobfuse2_cache'
 _BLOBFUSE_CACHE_DIR = ('~/.sky/blobfuse2_cache/'
                        '{storage_account_name}_{container_name}')
+RCLONE_VERSION = 'v1.68.2'
 
 
 def get_s3_mount_install_cmd() -> str:
@@ -158,6 +159,48 @@ def get_cos_mount_cmd(rclone_config_data: str, rclone_config_path: str,
     return mount_cmd
 
 
+def get_rclone_install_cmd() -> str:
+    """Rclone installation command for both deb-based and rpm-based distros;
+    common to all stores that mount via rclone.
+    """
+    # pylint: disable=line-too-long
+    install_cmd = (
+        f'(which dpkg > /dev/null 2>&1 && (which rclone > /dev/null || (cd ~ > /dev/null'
+        f' && curl -O https://downloads.rclone.org/{RCLONE_VERSION}/rclone-{RCLONE_VERSION}-linux-amd64.deb'
+        f' && sudo dpkg -i rclone-{RCLONE_VERSION}-linux-amd64.deb'
+        f' && rm -f rclone-{RCLONE_VERSION}-linux-amd64.deb)))'
+        f' || (which rclone > /dev/null || (cd ~ > /dev/null'
+        f' && curl -O https://downloads.rclone.org/{RCLONE_VERSION}/rclone-{RCLONE_VERSION}-linux-amd64.rpm'
+        f' && sudo yum --nogpgcheck install rclone-{RCLONE_VERSION}-linux-amd64.rpm -y'
+        f' && rm -f rclone-{RCLONE_VERSION}-linux-amd64.rpm))')
+    return install_cmd
+
+
+def get_oci_mount_cmd(mount_path: str, store_name: str, region: str,
+                      namespace: str, compartment: str, config_file: str,
+                      config_profile: str) -> str:
+    """OCI-specific rclone mount command for OCI object storage.
""" + # pylint: disable=line-too-long + mount_cmd = ( + f'sudo chown -R `whoami` {mount_path}' + f' && rclone config create oos_{store_name} oracleobjectstorage' + f' provider user_principal_auth namespace {namespace}' + f' compartment {compartment} region {region}' + f' oci-config-file {config_file}' + f' oci-config-profile {config_profile}' + f' && sed -i "s/oci-config-file/config_file/g;' + f' s/oci-config-profile/config_profile/g" ~/.config/rclone/rclone.conf' + f' && ([ ! -f /bin/fusermount3 ] && sudo ln -s /bin/fusermount /bin/fusermount3 || true)' + f' && (grep -q {mount_path} /proc/mounts || rclone mount oos_{store_name}:{store_name} {mount_path} --daemon --allow-non-empty)' + ) + return mount_cmd + + +def get_rclone_version_check_cmd() -> str: + """ RClone version check. This would be common command. """ + return f'rclone --version | grep -q {RCLONE_VERSION}' + + def _get_mount_binary(mount_cmd: str) -> str: """Returns mounting binary in string given as the mount command. diff --git a/sky/data/storage.py b/sky/data/storage.py index 73bdf3aff00..188c97b9545 100644 --- a/sky/data/storage.py +++ b/sky/data/storage.py @@ -24,6 +24,7 @@ from sky.adaptors import cloudflare from sky.adaptors import gcp from sky.adaptors import ibm +from sky.adaptors import oci from sky.data import data_transfer from sky.data import data_utils from sky.data import mounting_utils @@ -54,7 +55,9 @@ str(clouds.AWS()), str(clouds.GCP()), str(clouds.Azure()), - str(clouds.IBM()), cloudflare.NAME + str(clouds.IBM()), + str(clouds.OCI()), + cloudflare.NAME, ] # Maximum number of concurrent rsync upload processes @@ -115,6 +118,7 @@ class StoreType(enum.Enum): AZURE = 'AZURE' R2 = 'R2' IBM = 'IBM' + OCI = 'OCI' @classmethod def from_cloud(cls, cloud: str) -> 'StoreType': @@ -128,6 +132,8 @@ def from_cloud(cls, cloud: str) -> 'StoreType': return StoreType.R2 elif cloud.lower() == str(clouds.Azure()).lower(): return StoreType.AZURE + elif cloud.lower() == str(clouds.OCI()).lower(): + return StoreType.OCI elif cloud.lower() == str(clouds.Lambda()).lower(): with ux_utils.print_exception_no_traceback(): raise ValueError('Lambda Cloud does not provide cloud storage.') @@ -149,6 +155,8 @@ def from_store(cls, store: 'AbstractStore') -> 'StoreType': return StoreType.R2 elif isinstance(store, IBMCosStore): return StoreType.IBM + elif isinstance(store, OciStore): + return StoreType.OCI else: with ux_utils.print_exception_no_traceback(): raise ValueError(f'Unknown store type: {store}') @@ -165,6 +173,8 @@ def store_prefix(self) -> str: return 'r2://' elif self == StoreType.IBM: return 'cos://' + elif self == StoreType.OCI: + return 'oci://' else: with ux_utils.print_exception_no_traceback(): raise ValueError(f'Unknown store type: {self}') @@ -564,6 +574,8 @@ def __init__(self, self.add_store(StoreType.R2) elif self.source.startswith('cos://'): self.add_store(StoreType.IBM) + elif self.source.startswith('oci://'): + self.add_store(StoreType.OCI) @staticmethod def _validate_source( @@ -644,7 +656,7 @@ def _validate_local_source(local_source): 'using a bucket by writing : ' f'{source} in the file_mounts section of your YAML') is_local_source = True - elif split_path.scheme in ['s3', 'gs', 'https', 'r2', 'cos']: + elif split_path.scheme in ['s3', 'gs', 'https', 'r2', 'cos', 'oci']: is_local_source = False # Storage mounting does not support mounting specific files from # cloud store - ensure path points to only a directory @@ -668,7 +680,7 @@ def _validate_local_source(local_source): with ux_utils.print_exception_no_traceback(): 
                        raise exceptions.StorageSourceError(
                            f'Supported paths: local, s3://, gs://, https://, '
-                            f'r2://, cos://. Got: {source}')
+                            f'r2://, cos://, oci://. Got: {source}')
         return source, is_local_source
 
     def _validate_storage_spec(self, name: Optional[str]) -> None:
@@ -683,7 +695,7 @@ def validate_name(name):
             """
             prefix = name.split('://')[0]
             prefix = prefix.lower()
-            if prefix in ['s3', 'gs', 'https', 'r2', 'cos']:
+            if prefix in ['s3', 'gs', 'https', 'r2', 'cos', 'oci']:
                 with ux_utils.print_exception_no_traceback():
                     raise exceptions.StorageNameError(
                         'Prefix detected: `name` cannot start with '
@@ -798,6 +810,11 @@ def _add_store_from_metadata(
                     s_metadata,
                     source=self.source,
                     sync_on_reconstruction=self.sync_on_reconstruction)
+            elif s_type == StoreType.OCI:
+                store = OciStore.from_metadata(
+                    s_metadata,
+                    source=self.source,
+                    sync_on_reconstruction=self.sync_on_reconstruction)
             else:
                 with ux_utils.print_exception_no_traceback():
                     raise ValueError(f'Unknown store type: {s_type}')
@@ -886,6 +903,8 @@ def add_store(self,
             store_cls = R2Store
         elif store_type == StoreType.IBM:
             store_cls = IBMCosStore
+        elif store_type == StoreType.OCI:
+            store_cls = OciStore
         else:
             with ux_utils.print_exception_no_traceback():
                 raise exceptions.StorageSpecError(
@@ -1149,6 +1168,9 @@ def _validate(self):
                 assert data_utils.verify_ibm_cos_bucket(self.name), (
                     f'Source specified as {self.source}, a COS bucket. ',
                     'COS Bucket should exist.')
+            elif self.source.startswith('oci://'):
+                raise NotImplementedError(
+                    'Moving data from OCI to S3 is currently not supported.')
 
         # Validate name
         self.name = self.validate_name(self.name)
@@ -1260,6 +1282,8 @@ def upload(self):
                 self._transfer_to_s3()
             elif self.source.startswith('r2://'):
                 self._transfer_to_s3()
+            elif self.source.startswith('oci://'):
+                self._transfer_to_s3()
             else:
                 self.batch_aws_rsync([self.source])
         except exceptions.StorageUploadError:
@@ -1588,6 +1612,9 @@ def _validate(self):
                 assert data_utils.verify_ibm_cos_bucket(self.name), (
                     f'Source specified as {self.source}, a COS bucket. ',
                     'COS Bucket should exist.')
+            elif self.source.startswith('oci://'):
+                raise NotImplementedError(
+                    'Moving data from OCI to GCS is currently not supported.')
 
         # Validate name
         self.name = self.validate_name(self.name)
         # Check if the storage is enabled
@@ -1696,6 +1723,8 @@ def upload(self):
                 self._transfer_to_gcs()
             elif self.source.startswith('r2://'):
                 self._transfer_to_gcs()
+            elif self.source.startswith('oci://'):
+                self._transfer_to_gcs()
             else:
                 # If a single directory is specified in source, upload
                 # contents to root of bucket by suffixing /*.
@@ -2122,6 +2151,9 @@ def _validate(self):
                 assert data_utils.verify_ibm_cos_bucket(self.name), (
                     f'Source specified as {self.source}, a COS bucket. ',
                     'COS Bucket should exist.')
+            elif self.source.startswith('oci://'):
+                raise NotImplementedError(
+                    'Moving data from OCI to Azure Blob is not supported.')
 
         # Validate name
         self.name = self.validate_name(self.name)
@@ -2474,6 +2506,8 @@ def upload(self):
                 raise NotImplementedError(error_message.format('R2'))
             elif self.source.startswith('cos://'):
                 raise NotImplementedError(error_message.format('IBM COS'))
+            elif self.source.startswith('oci://'):
+                raise NotImplementedError(error_message.format('OCI'))
             else:
                 self.batch_az_blob_sync([self.source])
         except exceptions.StorageUploadError:
@@ -2833,6 +2867,10 @@ def _validate(self):
                 assert data_utils.verify_ibm_cos_bucket(self.name), (
                     f'Source specified as {self.source}, a COS bucket. ',
                    'COS Bucket should exist.')
+            elif self.source.startswith('oci://'):
+                raise NotImplementedError(
+                    'Moving data from OCI to R2 is currently not supported.')
+
         # Validate name
         self.name = S3Store.validate_name(self.name)
         # Check if the storage is enabled
@@ -2884,6 +2922,8 @@ def upload(self):
                 self._transfer_to_r2()
             elif self.source.startswith('r2://'):
                 pass
+            elif self.source.startswith('oci://'):
+                self._transfer_to_r2()
             else:
                 self.batch_aws_rsync([self.source])
         except exceptions.StorageUploadError:
@@ -3590,3 +3630,413 @@ def _delete_cos_bucket(self):
             if e.__class__.__name__ == 'NoSuchBucket':
                 logger.debug('bucket already removed')
         Rclone.delete_rclone_bucket_profile(self.name, Rclone.RcloneClouds.IBM)
+
+
+class OciStore(AbstractStore):
+    """OciStore inherits from AbstractStore and represents the backend
+    for OCI buckets.
+    """
+
+    _ACCESS_DENIED_MESSAGE = 'AccessDeniedException'
+
+    def __init__(self,
+                 name: str,
+                 source: str,
+                 region: Optional[str] = None,
+                 is_sky_managed: Optional[bool] = None,
+                 sync_on_reconstruction: Optional[bool] = True):
+        self.client: Any
+        self.bucket: StorageHandle
+        self.oci_config_file: str
+        self.config_profile: str
+        self.compartment: str
+        self.namespace: str
+
+        # The bucket region should be consistent with the OCI config file.
+        region = oci.get_oci_config()['region']
+
+        super().__init__(name, source, region, is_sky_managed,
+                         sync_on_reconstruction)
+
+    def _validate(self):
+        if self.source is not None and isinstance(self.source, str):
+            if self.source.startswith('oci://'):
+                assert self.name == data_utils.split_oci_path(self.source)[0], (
+                    'When an OCI bucket is specified as the source path, the '
+                    'storage name must match the OCI bucket name.')
+            elif not re.search(r'^\w+://', self.source):
+                # Treat it as local path.
+                pass
+            else:
+                raise NotImplementedError(
+                    f'Moving data from {self.source} to OCI is not supported.')
+
+        # Validate name
+        self.name = self.validate_name(self.name)
+        # Check if the storage is enabled
+        if not _is_storage_cloud_enabled(str(clouds.OCI())):
+            with ux_utils.print_exception_no_traceback():
+                raise exceptions.ResourcesUnavailableError(
+                    'Storage \'store: oci\' specified, but '
+                    'OCI access is disabled. To fix, enable '
+                    'OCI by running `sky check`. '
+                    'More info: https://skypilot.readthedocs.io/en/latest/getting-started/installation.html.'  # pylint: disable=line-too-long
+                )
+
+    @classmethod
+    def validate_name(cls, name) -> str:
+        """Validates the name of the OCI store.
+
+        Source for rules: https://docs.oracle.com/en-us/iaas/Content/Object/Tasks/managingbuckets.htm#Managing_Buckets  # pylint: disable=line-too-long
+        """
+
+        def _raise_no_traceback_name_error(err_str):
+            with ux_utils.print_exception_no_traceback():
+                raise exceptions.StorageNameError(err_str)
+
+        if name is not None and isinstance(name, str):
+            # Check for overall length
+            if not 1 <= len(name) <= 256:
+                _raise_no_traceback_name_error(
+                    f'Invalid store name: name {name} must contain 1-256 '
+                    'characters.')
+
+            # Check for valid characters and start/end with a number or letter
+            pattern = r'^[A-Za-z0-9-._]+$'
+            if not re.match(pattern, name):
+                _raise_no_traceback_name_error(
+                    f'Invalid store name: name {name} can only contain '
+                    'upper or lower case letters, numeric characters, hyphens '
+                    '(-), underscores (_), and dots (.). Spaces are not '
+                    'allowed. Names must start and end with a number or '
+                    'letter.')
+        else:
+            _raise_no_traceback_name_error('Store name must be specified.')
+        return name
+
+    def initialize(self):
+        """Initializes the OCI store object on the cloud.
+
+        Initialization involves fetching the bucket if it exists, or creating
+        it if it does not.
+
+        Raises:
+          StorageBucketCreateError: If bucket creation fails
+          StorageBucketGetError: If fetching existing bucket fails
+          StorageInitError: If general initialization fails.
+        """
+        # pylint: disable=import-outside-toplevel
+        from sky.clouds.utils import oci_utils
+        from sky.provision.oci.query_utils import query_helper
+
+        self.oci_config_file = oci.get_config_file()
+        self.config_profile = oci_utils.oci_config.get_profile()
+
+        ## pylint: disable=line-too-long
+        # What's a compartment? See https://docs.oracle.com/en/cloud/foundation/cloud_architecture/governance/compartments.html
+        self.compartment = query_helper.find_compartment(self.region)
+        self.client = oci.get_object_storage_client(
+            region=self.region, profile=self.config_profile)
+        self.namespace = self.client.get_namespace(
+            compartment_id=oci.get_oci_config()['tenancy']).data
+
+        self.bucket, is_new_bucket = self._get_bucket()
+        if self.is_sky_managed is None:
+            # If is_sky_managed is not specified, then this is a new storage
+            # object (i.e., did not exist in global_user_state) and we should
+            # set the is_sky_managed property.
+            # If is_sky_managed is specified, then we take no action.
+            self.is_sky_managed = is_new_bucket
+
+    def upload(self):
+        """Uploads source to the store bucket.
+
+        Upload must be called by the Storage handler - it is not called on
+        Store initialization.
+
+        Raises:
+            StorageUploadError: if upload fails.
+        """
+        try:
+            if isinstance(self.source, list):
+                self.batch_oci_rsync(self.source, create_dirs=True)
+            elif self.source is not None:
+                if self.source.startswith('oci://'):
+                    pass
+                else:
+                    self.batch_oci_rsync([self.source])
+        except exceptions.StorageUploadError:
+            raise
+        except Exception as e:
+            raise exceptions.StorageUploadError(
+                f'Upload failed for store {self.name}') from e
+
+    def delete(self) -> None:
+        deleted_by_skypilot = self._delete_oci_bucket(self.name)
+        if deleted_by_skypilot:
+            msg_str = f'Deleted OCI bucket {self.name}.'
+        else:
+            msg_str = (f'OCI bucket {self.name} may have been deleted '
+                       f'externally. Removing from local state.')
+        logger.info(f'{colorama.Fore.GREEN}{msg_str}'
+                    f'{colorama.Style.RESET_ALL}')
+
+    def get_handle(self) -> StorageHandle:
+        return self.client.get_bucket(namespace_name=self.namespace,
+                                      bucket_name=self.name).data
+
+    def batch_oci_rsync(self,
+                        source_path_list: List[Path],
+                        create_dirs: bool = False) -> None:
+        """Invokes the OCI CLI to batch-upload a list of local paths.
+
+        Uses OCI bulk operations to batch-process the file upload.
+
+        Args:
+            source_path_list: List of paths to local files or directories
+            create_dirs: If the local_path is a directory and this is set to
+                False, the contents of the directory are directly uploaded to
+                the root of the bucket. If the local_path is a directory and
+                this is set to True, the directory is created in the bucket
+                root and its contents are uploaded to it.
+ """ + + @oci.with_oci_env + def get_file_sync_command(base_dir_path, file_names): + includes = ' '.join( + [f'--include "{file_name}"' for file_name in file_names]) + sync_command = ( + 'oci os object bulk-upload --no-follow-symlinks --overwrite ' + f'--bucket-name {self.name} --namespace-name {self.namespace} ' + f'--src-dir "{base_dir_path}" {includes}') + + return sync_command + + @oci.with_oci_env + def get_dir_sync_command(src_dir_path, dest_dir_name): + if dest_dir_name and not str(dest_dir_name).endswith('/'): + dest_dir_name = f'{dest_dir_name}/' + + excluded_list = storage_utils.get_excluded_files(src_dir_path) + excluded_list.append('.git/*') + excludes = ' '.join([ + f'--exclude {shlex.quote(file_name)}' + for file_name in excluded_list + ]) + + # we exclude .git directory from the sync + sync_command = ( + 'oci os object bulk-upload --no-follow-symlinks --overwrite ' + f'--bucket-name {self.name} --namespace-name {self.namespace} ' + f'--object-prefix "{dest_dir_name}" --src-dir "{src_dir_path}" ' + f'{excludes} ') + + return sync_command + + # Generate message for upload + if len(source_path_list) > 1: + source_message = f'{len(source_path_list)} paths' + else: + source_message = source_path_list[0] + + log_path = sky_logging.generate_tmp_logging_file_path( + _STORAGE_LOG_FILE_NAME) + sync_path = f'{source_message} -> oci://{self.name}/' + with rich_utils.safe_status( + ux_utils.spinner_message(f'Syncing {sync_path}', + log_path=log_path)): + data_utils.parallel_upload( + source_path_list=source_path_list, + filesync_command_generator=get_file_sync_command, + dirsync_command_generator=get_dir_sync_command, + log_path=log_path, + bucket_name=self.name, + access_denied_message=self._ACCESS_DENIED_MESSAGE, + create_dirs=create_dirs, + max_concurrent_uploads=1) + + logger.info( + ux_utils.finishing_message(f'Storage synced: {sync_path}', + log_path)) + + def _get_bucket(self) -> Tuple[StorageHandle, bool]: + """Obtains the OCI bucket. + If the bucket exists, this method will connect to the bucket. + + If the bucket does not exist, there are three cases: + 1) Raise an error if the bucket source starts with oci:// + 2) Return None if bucket has been externally deleted and + sync_on_reconstruction is False + 3) Create and return a new bucket otherwise + + Return tuple (Bucket, Boolean): The first item is the bucket + json payload from the OCI API call, the second item indicates + if this is a new created bucket(True) or an existing bucket(False). + + Raises: + StorageBucketCreateError: If creating the bucket fails + StorageBucketGetError: If fetching a bucket fails + """ + try: + get_bucket_response = self.client.get_bucket( + namespace_name=self.namespace, bucket_name=self.name) + bucket = get_bucket_response.data + return bucket, False + except oci.service_exception() as e: + if e.status == 404: # Not Found + if isinstance(self.source, + str) and self.source.startswith('oci://'): + with ux_utils.print_exception_no_traceback(): + raise exceptions.StorageBucketGetError( + 'Attempted to connect to a non-existent bucket: ' + f'{self.source}') from e + else: + # If bucket cannot be found (i.e., does not exist), it is + # to be created by Sky. However, creation is skipped if + # Store object is being reconstructed for deletion. + if self.sync_on_reconstruction: + bucket = self._create_oci_bucket(self.name) + return bucket, True + else: + return None, False + elif e.status == 401: # Unauthorized + # AccessDenied error for buckets that are private and not + # owned by user. 
+                command = (
+                    f'oci os object list --namespace-name {self.namespace} '
+                    f'--bucket-name {self.name}')
+                with ux_utils.print_exception_no_traceback():
+                    raise exceptions.StorageBucketGetError(
+                        _BUCKET_FAIL_TO_CONNECT_MESSAGE.format(name=self.name) +
+                        f' To debug, consider running `{command}`.') from e
+            else:
+                # An unknown / unexpected error happened. This might happen
+                # when the Object Storage service itself is not functioning
+                # normally (e.g., a maintenance event causes an internal
+                # server error or a request timeout, etc.).
+                with ux_utils.print_exception_no_traceback():
+                    raise exceptions.StorageBucketGetError(
+                        f'Failed to connect to OCI bucket {self.name}') from e
+
+    def mount_command(self, mount_path: str) -> str:
+        """Returns the command to mount the bucket to the mount_path.
+
+        Uses rclone to mount the bucket.
+
+        Args:
+            mount_path: str; Path to mount the bucket to.
+        """
+        install_cmd = mounting_utils.get_rclone_install_cmd()
+        mount_cmd = mounting_utils.get_oci_mount_cmd(
+            mount_path=mount_path,
+            store_name=self.name,
+            region=str(self.region),
+            namespace=self.namespace,
+            compartment=self.bucket.compartment_id,
+            config_file=self.oci_config_file,
+            config_profile=self.config_profile)
+        version_check_cmd = mounting_utils.get_rclone_version_check_cmd()
+
+        return mounting_utils.get_mounting_command(mount_path, install_cmd,
+                                                   mount_cmd,
+                                                   version_check_cmd)
+
+    def _download_file(self, remote_path: str, local_path: str) -> None:
+        """Downloads a file from the OCI bucket to a local path.
+
+        Args:
+            remote_path: str; Remote path on the OCI bucket
+            local_path: str; Local path on the user's device
+        """
+        if remote_path.startswith(f'/{self.name}'):
+            # If the remote path is /bucket_name/..., we need to
+            # remove the leading /.
+            remote_path = remote_path.lstrip('/')
+
+        filename = os.path.basename(remote_path)
+        if not local_path.endswith(filename):
+            local_path = os.path.join(local_path, filename)
+
+        @oci.with_oci_env
+        def get_file_download_command(remote_path, local_path):
+            download_command = (f'oci os object get --bucket-name {self.name} '
+                                f'--namespace-name {self.namespace} '
+                                f'--name {remote_path} --file {local_path}')
+
+            return download_command
+
+        download_command = get_file_download_command(remote_path, local_path)
+
+        try:
+            with rich_utils.safe_status(
+                    f'[bold cyan]Downloading: {remote_path} -> {local_path}[/]'
+            ):
+                subprocess.check_output(download_command,
+                                        stderr=subprocess.STDOUT,
+                                        shell=True)
+        except subprocess.CalledProcessError as e:
+            logger.error(f'Download failed: {remote_path} -> {local_path}.\n'
+                         f'Detail errors: {e.output}')
+            with ux_utils.print_exception_no_traceback():
+                raise exceptions.StorageBucketDeleteError(
+                    f'Failed to download file {self.name}:{remote_path}.'
+                ) from e
+
+    def _create_oci_bucket(self, bucket_name: str) -> StorageHandle:
+        """Creates an OCI bucket with the given name in self.compartment.
+
+        Args:
+            bucket_name: str; Name of bucket
+        """
+        logger.debug(f'_create_oci_bucket: {bucket_name}')
+        try:
+            create_bucket_response = self.client.create_bucket(
+                namespace_name=self.namespace,
+                create_bucket_details=oci.oci.object_storage.models.
+                CreateBucketDetails(
+                    name=bucket_name,
+                    compartment_id=self.compartment,
+                ))
+            bucket = create_bucket_response.data
+            return bucket
+        except oci.service_exception() as e:
+            with ux_utils.print_exception_no_traceback():
+                raise exceptions.StorageBucketCreateError(
+                    f'Failed to create OCI bucket: {self.name}') from e
+
+    def _delete_oci_bucket(self, bucket_name: str) -> bool:
+        """Deletes an OCI bucket, including all objects in the bucket.
+
+        Args:
+            bucket_name: str; Name of bucket
+
+        Returns:
+            bool; True if the bucket was deleted; False if it was deleted
+            externally.
+        """
+        logger.debug(f'_delete_oci_bucket: {bucket_name}')
+
+        @oci.with_oci_env
+        def get_bucket_delete_command(bucket_name):
+            remove_command = (f'oci os bucket delete --bucket-name '
+                              f'{bucket_name} --empty --force')
+
+            return remove_command
+
+        remove_command = get_bucket_delete_command(bucket_name)
+
+        try:
+            with rich_utils.safe_status(
+                    f'[bold cyan]Deleting OCI bucket {bucket_name}[/]'):
+                # The command is a '&&'-chained shell line produced by
+                # with_oci_env, so it must run through a shell (splitting on
+                # spaces would pass '&&' as an argument to the first command).
+                subprocess.check_output(remove_command,
+                                        stderr=subprocess.STDOUT,
+                                        shell=True)
+        except subprocess.CalledProcessError as e:
+            if 'BucketNotFound' in e.output.decode('utf-8'):
+                logger.debug(
+                    _BUCKET_EXTERNALLY_DELETED_DEBUG_MESSAGE.format(
+                        bucket_name=bucket_name))
+                return False
+            else:
+                logger.error(e.output)
+                with ux_utils.print_exception_no_traceback():
+                    raise exceptions.StorageBucketDeleteError(
+                        f'Failed to delete OCI bucket {bucket_name}.')
+        return True
diff --git a/sky/task.py b/sky/task.py
index bd454216b0f..edd2fd211a3 100644
--- a/sky/task.py
+++ b/sky/task.py
@@ -1031,6 +1031,16 @@ def sync_storage_mounts(self) -> None:
                     storage.name, data_utils.Rclone.RcloneClouds.IBM)
                 blob_path = f'cos://{cos_region}/{storage.name}'
                 self.update_file_mounts({mnt_path: blob_path})
+            elif store_type is storage_lib.StoreType.OCI:
+                if storage.source is not None and not isinstance(
+                        storage.source,
+                        list) and storage.source.startswith('oci://'):
+                    blob_path = storage.source
+                else:
+                    blob_path = 'oci://' + storage.name
+                self.update_file_mounts({
+                    mnt_path: blob_path,
+                })
             else:
                 with ux_utils.print_exception_no_traceback():
                     raise ValueError(f'Storage Type {store_type} '
diff --git a/tests/smoke_tests/test_mount_and_storage.py b/tests/smoke_tests/test_mount_and_storage.py
index 93a5f22c274..aa61282aa11 100644
--- a/tests/smoke_tests/test_mount_and_storage.py
+++ b/tests/smoke_tests/test_mount_and_storage.py
@@ -85,6 +85,23 @@ def test_scp_file_mounts():
     smoke_tests_utils.run_one_test(test)
 
 
+@pytest.mark.oci  # For OCI object storage mounts and file mounts.
+def test_oci_mounts():
+    name = smoke_tests_utils.get_cluster_name()
+    test_commands = [
+        *smoke_tests_utils.STORAGE_SETUP_COMMANDS,
+        f'sky launch -y -c {name} --cloud oci --num-nodes 2 examples/oci/oci-mounts.yaml',
+        f'sky logs {name} 1 --status',  # Ensure the job succeeded.
+    ]
+    test = smoke_tests_utils.Test(
+        'oci_mounts',
+        test_commands,
+        f'sky down -y {name}',
+        timeout=20 * 60,  # 20 mins
+    )
+    smoke_tests_utils.run_one_test(test)
+
+
 @pytest.mark.no_fluidstack  # Requires GCP to be enabled
 def test_using_file_mounts_with_env_vars(generic_cloud: str):
     name = smoke_tests_utils.get_cluster_name()
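Reviewer note (not part of the diff): most of the path handling in this PR flows through `split_oci_path` (sky/data/data_utils.py) and the directory heuristic in `OciCloudStorage.is_directory` (sky/cloud_stores.py). Below is a self-contained sketch of both for local experimentation; the object-storage listing is replaced by a plain in-memory list of object names, so `is_directory` here is a stand-in that mirrors the diff's decision logic rather than the real method, which calls the OCI SDK.

```python
# Sketch of split_oci_path and the is_directory heuristic from this diff,
# with the OCI object listing stubbed out as a list of object-name strings.
from typing import List, Tuple


def split_oci_path(oci_path: str) -> Tuple[str, str]:
    """Splits 'oci://bucket/key...' into (bucket, key), as in the diff."""
    path_parts = oci_path.replace('oci://', '').split('/')
    bucket = path_parts.pop(0)
    key = '/'.join(path_parts)
    return bucket, key


def is_directory(path: str, listed_names: List[str]) -> bool:
    """Mirrors the diff's logic: a prefix matching zero or several objects
    is treated as a directory; a single match is a file only when the
    queried path ends with the matched object's name."""
    if len(listed_names) == 0:
        return True  # No matching objects: treated as a directory.
    if len(listed_names) > 1:
        return True  # Several objects share the prefix: a directory.
    return not path.endswith(listed_names[0])  # Exact single object: a file.


if __name__ == '__main__':
    assert split_oci_path('oci://imagenet/train/') == ('imagenet', 'train/')
    _, key = split_oci_path('oci://skybucket/data/foo.txt')
    assert is_directory(key, ['data/foo.txt']) is False        # exact match
    assert is_directory('data', ['data/a', 'data/b']) is True  # many matches
    assert is_directory('nonexistent', []) is True             # empty prefix
    print('ok')
```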