Skip to content

Commit

Permalink
[Jobs] Allowing to specify intermediate bucket for file upload (#4257)
Browse files Browse the repository at this point in the history
* debug

* support workdir_bucket_name config on yaml file

* change the match statement to if else due to mypy limit

* pass mypy

* yapf format fix

* reformat

* remove debug line

* all dir to same bucket

* private member function

* fix mypy

* support sub dir config to separate to different directory

* rename and add smoke test

* bucketname

* support sub dir mount

* private member for _bucket_sub_path and smoke test fix

* support copy mount for sub dir

* support gcs, s3 delete folder

* doc

* r2 remove_objects_from_sub_path

* support azure remove directory and cos remove

* doc string for remove_objects_from_sub_path

* fix sky jobs subdir issue

* test case update

* rename to _bucket_sub_path

* change the config schema

* setter

* bug fix and test update

* delete bucket depends on user config or sky generated

* add test case

* smoke test bug fix

* robust smoke test

* fix comment

* bug fix

* set the storage manually

* better structure

* fix mypy

* Update docs/source/reference/config.rst

Co-authored-by: Romil Bhardwaj <[email protected]>

* Update docs/source/reference/config.rst

Co-authored-by: Romil Bhardwaj <[email protected]>

* limit creation for bucket and delete sub dir only

* resolve comment

* Update docs/source/reference/config.rst

Co-authored-by: Romil Bhardwaj <[email protected]>

* Update sky/utils/controller_utils.py

Co-authored-by: Romil Bhardwaj <[email protected]>

* resolve PR comment

* bug fix

* bug fix

* fix test case

* bug fix

* fix

* fix test case

* bug fix

* support is_sky_managed param in config

* pass param intermediate_bucket_is_sky_managed

* resolve PR comment

* Update sky/utils/controller_utils.py

Co-authored-by: Romil Bhardwaj <[email protected]>

* hide bucket creation log

* reset green color

* rename is_sky_managed to _is_sky_managed

* bug fix

* retrieve _is_sky_managed from stores

* propagate the log

---------

Co-authored-by: Romil Bhardwaj <[email protected]>
  • Loading branch information
zpoint and romilbhardwaj authored Dec 30, 2024
1 parent 7ae2d25 commit 7e40bcd
Show file tree
Hide file tree
Showing 11 changed files with 862 additions and 188 deletions.
4 changes: 4 additions & 0 deletions docs/source/reference/config.rst
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ Available fields and semantics:
#
# Ref: https://docs.skypilot.co/en/latest/examples/managed-jobs.html#customizing-job-controller-resources
jobs:
# Bucket to store managed jobs mount files and tmp files. Bucket must already exist.
# Optional. If not set, SkyPilot will create a new bucket for each managed job launch.
# Supports s3://, gs://, https://<azure_storage_account>.blob.core.windows.net/<container>, r2://, cos://<region>/<bucket>
bucket: s3://my-bucket/
controller:
resources: # same spec as 'resources' in a task YAML
cloud: gcp
Expand Down
63 changes: 49 additions & 14 deletions sky/data/mounting_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,19 @@ def get_s3_mount_install_cmd() -> str:
return install_cmd


def get_s3_mount_cmd(bucket_name: str, mount_path: str) -> str:
# pylint: disable=invalid-name
def get_s3_mount_cmd(bucket_name: str,
                     mount_path: str,
                     _bucket_sub_path: Optional[str] = None) -> str:
    """Returns a command to mount an S3 bucket using goofys."""
    # goofys addresses a prefix inside a bucket as '<bucket>:<prefix>'.
    suffix = '' if _bucket_sub_path is None else f':{_bucket_sub_path}'
    return ('goofys -o allow_other '
            f'--stat-cache-ttl {_STAT_CACHE_TTL} '
            f'--type-cache-ttl {_TYPE_CACHE_TTL} '
            f'{bucket_name}{suffix} {mount_path}')


Expand All @@ -50,15 +57,20 @@ def get_gcs_mount_install_cmd() -> str:
return install_cmd


def get_gcs_mount_cmd(bucket_name: str, mount_path: str) -> str:
# pylint: disable=invalid-name
def get_gcs_mount_cmd(bucket_name: str,
                      mount_path: str,
                      _bucket_sub_path: Optional[str] = None) -> str:
    """Returns a command to mount a GCS bucket using gcsfuse."""
    flags = [
        'gcsfuse -o allow_other',
        '--implicit-dirs',
        f'--stat-cache-capacity {_STAT_CACHE_CAPACITY}',
        f'--stat-cache-ttl {_STAT_CACHE_TTL}',
        f'--type-cache-ttl {_TYPE_CACHE_TTL}',
        f'--rename-dir-limit {_RENAME_DIR_LIMIT}',
    ]
    if _bucket_sub_path:
        # Restrict the mount to the given directory within the bucket.
        flags.append(f'--only-dir {_bucket_sub_path}')
    flags.append(f'{bucket_name} {mount_path}')
    return ' '.join(flags)

Expand All @@ -79,10 +91,12 @@ def get_az_mount_install_cmd() -> str:
return install_cmd


# pylint: disable=invalid-name
def get_az_mount_cmd(container_name: str,
storage_account_name: str,
mount_path: str,
storage_account_key: Optional[str] = None) -> str:
storage_account_key: Optional[str] = None,
_bucket_sub_path: Optional[str] = None) -> str:
"""Returns a command to mount an AZ Container using blobfuse2.
Args:
Expand All @@ -91,6 +105,7 @@ def get_az_mount_cmd(container_name: str,
belongs to.
mount_path: Path where the container will be mounting.
storage_account_key: Access key for the given storage account.
_bucket_sub_path: Sub path of the mounting container.
Returns:
str: Command used to mount AZ container with blobfuse2.
Expand All @@ -107,25 +122,38 @@ def get_az_mount_cmd(container_name: str,
cache_path = _BLOBFUSE_CACHE_DIR.format(
storage_account_name=storage_account_name,
container_name=container_name)
if _bucket_sub_path is None:
bucket_sub_path_arg = ''
else:
bucket_sub_path_arg = f'--subdirectory={_bucket_sub_path}/ '
mount_cmd = (f'AZURE_STORAGE_ACCOUNT={storage_account_name} '
f'{key_env_var} '
f'blobfuse2 {mount_path} --allow-other --no-symlinks '
'-o umask=022 -o default_permissions '
f'--tmp-path {cache_path} '
f'{bucket_sub_path_arg}'
f'--container-name {container_name}')
return mount_cmd


def get_r2_mount_cmd(r2_credentials_path: str, r2_profile_name: str,
endpoint_url: str, bucket_name: str,
mount_path: str) -> str:
# pylint: disable=invalid-name
def get_r2_mount_cmd(r2_credentials_path: str,
                     r2_profile_name: str,
                     endpoint_url: str,
                     bucket_name: str,
                     mount_path: str,
                     _bucket_sub_path: Optional[str] = None) -> str:
    """Returns a command to mount an R2 bucket using goofys."""
    # goofys addresses a prefix inside a bucket as '<bucket>:<prefix>'.
    suffix = '' if _bucket_sub_path is None else f':{_bucket_sub_path}'
    return (f'AWS_SHARED_CREDENTIALS_FILE={r2_credentials_path} '
            f'AWS_PROFILE={r2_profile_name} goofys -o allow_other '
            f'--stat-cache-ttl {_STAT_CACHE_TTL} '
            f'--type-cache-ttl {_TYPE_CACHE_TTL} '
            f'--endpoint {endpoint_url} '
            f'{bucket_name}{suffix} {mount_path}')


Expand All @@ -137,9 +165,12 @@ def get_cos_mount_install_cmd() -> str:
return install_cmd


def get_cos_mount_cmd(rclone_config_data: str, rclone_config_path: str,
bucket_rclone_profile: str, bucket_name: str,
mount_path: str) -> str:
def get_cos_mount_cmd(rclone_config_data: str,
                      rclone_config_path: str,
                      bucket_rclone_profile: str,
                      bucket_name: str,
                      mount_path: str,
                      _bucket_sub_path: Optional[str] = None) -> str:
    """Returns a command to mount an IBM COS bucket using rclone.

    Args:
        rclone_config_data: Contents to append to the rclone config file.
        rclone_config_path: Path of the rclone config file on the node.
        bucket_rclone_profile: Name of the rclone profile for the bucket.
        bucket_name: Name of the bucket to mount.
        mount_path: Path where the bucket will be mounted.
        _bucket_sub_path: Optional sub-directory of the bucket to mount
            instead of the bucket root.
    """
    # creates a fusermount soft link on older (<22) Ubuntu systems for
    # rclone's mount utility.
    set_fuser3_soft_link = ('[ ! -f /bin/fusermount3 ] && '
                            'sudo ln -s /bin/fusermount /bin/fusermount3 || '
                            'true')
    # stores the bucket profile in the rclone config file on the node.
    configure_rclone_profile = (f'{set_fuser3_soft_link}; '
                                'mkdir -p ~/.config/rclone/ && '
                                f'echo "{rclone_config_data}" >> '
                                f'{rclone_config_path}')
    # Fix: the original branches were swapped — with no sub path it mounted
    # '<bucket>/None' (interpolating the None), and with a sub path it
    # produced '/<bucket>', silently dropping the sub path.
    if _bucket_sub_path is None:
        sub_path_arg = f'{bucket_name}'
    else:
        sub_path_arg = f'{bucket_name}/{_bucket_sub_path}'
    # --daemon will keep the mounting process running in the background.
    mount_cmd = (f'{configure_rclone_profile} && '
                 'rclone mount '
                 f'{bucket_rclone_profile}:{sub_path_arg} {mount_path} '
                 '--daemon')
    return mount_cmd

Expand Down Expand Up @@ -252,7 +287,7 @@ def get_mounting_script(
script = textwrap.dedent(f"""
#!/usr/bin/env bash
set -e
{command_runner.ALIAS_SUDO_TO_EMPTY_FOR_ROOT_CMD}
MOUNT_PATH={mount_path}
Expand Down
Loading

0 comments on commit 7e40bcd

Please sign in to comment.