From 6e1e9f27ee196b34314f93f8709b9e1a0c81ce37 Mon Sep 17 00:00:00 2001 From: Doyoung Kim Date: Sun, 17 Sep 2023 05:03:20 +0000 Subject: [PATCH] updates --- sky/backends/backend_utils.py | 2 +- sky/backends/cloud_vm_ray_backend.py | 17 ++-- sky/data/mounting_utils.py | 34 +++---- sky/data/storage.py | 18 ++-- sky/data/{skystorage.py => storage_csync.py} | 93 +++++++++----------- 5 files changed, 81 insertions(+), 83 deletions(-) rename sky/data/{skystorage.py => storage_csync.py} (85%) diff --git a/sky/backends/backend_utils.py b/sky/backends/backend_utils.py index ab971a22407..71df9e86269 100644 --- a/sky/backends/backend_utils.py +++ b/sky/backends/backend_utils.py @@ -2897,7 +2897,7 @@ def wait_and_terminate_csync(cluster_name: str) -> None: handle.docker_user) runners = command_runner.SSHCommandRunner.make_runner_list( ip_list, port_list=port_list, **ssh_credentials) - csync_terminate_cmd = ('python -m sky.data.skystorage terminate -a ' + csync_terminate_cmd = ('python -m sky.data.storage_csync terminate -a ' '>/dev/null') def _run_csync_terminate(runner): diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py index ec8ce356026..ad256587914 100644 --- a/sky/backends/cloud_vm_ray_backend.py +++ b/sky/backends/cloud_vm_ray_backend.py @@ -3009,8 +3009,10 @@ def _sync_file_mounts( ) -> None: """Mounts all user files to the remote nodes.""" self._execute_file_mounts(handle, all_file_mounts) - self._execute_storage_mounts(handle, storage_mounts, storage_utils.StorageMode.MOUNT) - self._execute_storage_mounts(handle, storage_mounts, storage_utils.StorageMode.CSYNC) + self._execute_storage_mounts(handle, storage_mounts, + storage_utils.StorageMode.MOUNT) + self._execute_storage_mounts(handle, storage_mounts, + storage_utils.StorageMode.CSYNC) self._set_cluster_storage_mounts_metadata(handle.cluster_name, storage_mounts) @@ -4414,8 +4416,7 @@ def _symlink_node(runner: command_runner.SSHCommandRunner): logger.debug(f'File mount sync took {end - start} seconds.') def _execute_storage_mounts(self, handle: CloudVmRayResourceHandle, - storage_mounts: Dict[Path, - storage_lib.Storage], + storage_mounts: Dict[Path, storage_lib.Storage], mount_mode: storage_utils.StorageMode): """Executes storage mounts: installing mounting tools and mounting.""" # Process only MOUNT mode objects here. COPY mode objects have been @@ -4441,7 +4442,7 @@ def _execute_storage_mounts(self, handle: CloudVmRayResourceHandle, if mount_mode == storage_utils.StorageMode.MOUNT: mode_str = 'mount' action_message = 'Mounting' - else: # CSYNC mdoe + else: # CSYNC mdoe mode_str = 'csync' action_message = 'Setting up CSYNC' @@ -4468,8 +4469,9 @@ def _execute_storage_mounts(self, handle: CloudVmRayResourceHandle, store = list(storage_obj.stores.values())[0] if mount_mode == storage_utils.StorageMode.MOUNT: mount_cmd = store.mount_command(dst) - else: # CSYNC mode - mount_cmd= store.csync_command(dst) + else: # CSYNC mode + mount_cmd = store.csync_command(dst, + storage_obj.interval_seconds) src_print = (storage_obj.source if storage_obj.source else storage_obj.name) if isinstance(src_print, list): @@ -4510,7 +4512,6 @@ def _execute_storage_mounts(self, handle: CloudVmRayResourceHandle, end = time.time() logger.debug(f'Setting storage {mode_str} took {end - start} seconds.') - def _set_cluster_storage_mounts_metadata( self, cluster_name: str, storage_mounts: Dict[Path, storage_lib.Storage]) -> None: diff --git a/sky/data/mounting_utils.py b/sky/data/mounting_utils.py index 9559ee842bc..afc3fe670a6 100644 --- a/sky/data/mounting_utils.py +++ b/sky/data/mounting_utils.py @@ -31,8 +31,8 @@ def get_mounting_command( MOUNT and CSYNC mount_path: Path to mount the bucket at. mount_cmd: Command to mount the bucket. Should be single line. - install_cmd: Command to install the mounting utility. Should be - single line. + install_cmd: Command to install the mounting utility for MOUNT mode. + Should be single line. Returns: str: Mounting command with the mounting script as a heredoc. @@ -58,17 +58,17 @@ def get_mounting_command( MOUNT_BINARY={mount_binary} # Check if path is already mounted if grep -q $MOUNT_PATH /proc/mounts ; then - echo "Path already mounted - unmounting..." - fusermount -uz "$MOUNT_PATH" - echo "Successfully unmounted $MOUNT_PATH." + echo "Path already mounted - unmounting..." + fusermount -uz "$MOUNT_PATH" + echo "Successfully unmounted $MOUNT_PATH." fi # Install MOUNT_BINARY if not already installed if {installed_check}; then - echo "$MOUNT_BINARY already installed. Proceeding..." + echo "$MOUNT_BINARY already installed. Proceeding..." else - echo "Installing $MOUNT_BINARY..." - {install_cmd} + echo "Installing $MOUNT_BINARY..." + {install_cmd} fi fi @@ -81,21 +81,21 @@ def get_mounting_command( # Check if mount path contains files for MOUNT mode only if [ "$MOUNT_MODE" = "MOUNT" ]; then if [ "$(ls -A $MOUNT_PATH)" ]; then - echo "Mount path $MOUNT_PATH is not empty. Please mount to another path or remove it first." - exit {exceptions.MOUNT_PATH_NON_EMPTY_CODE} + echo "Mount path $MOUNT_PATH is not empty. Please mount to another path or remove it first." + exit {exceptions.MOUNT_PATH_NON_EMPTY_CODE} fi fi fi if [ "$MOUNT_MODE" = "MOUNT" ]; then - echo "Mounting source bucket to $MOUNT_PATH with $MOUNT_BINARY..." - {mount_cmd} - echo "Mounting done." + echo "Mounting source bucket to $MOUNT_PATH with $MOUNT_BINARY..." + {mount_cmd} + echo "Mounting done." else - # running CSYNC cmd - echo "Setting up CSYNC on $MOUNT_PATH to source bucket..." - setsid {mount_cmd} >/dev/null 2>&1 & - echo "CSYNC is set." + # running CSYNC cmd + echo "Setting up CSYNC on $MOUNT_PATH to source bucket..." + setsid {mount_cmd} >/dev/null 2>&1 & + echo "CSYNC is set." fi """) diff --git a/sky/data/storage.py b/sky/data/storage.py index c652cbbe7d4..e73842824c1 100644 --- a/sky/data/storage.py +++ b/sky/data/storage.py @@ -457,7 +457,8 @@ def __init__(self, self.persistent = persistent self.mode = mode assert mode in StorageMode - self.interval_seconds = interval_seconds + if self.mode != StorageMode.CSYNC: + assert interval_seconds is None self.sync_on_reconstruction = sync_on_reconstruction # TODO(romilb, zhwu): This is a workaround to support storage deletion @@ -746,7 +747,8 @@ def _add_store_from_metadata(self, sky_stores) -> None: self._add_store(store, is_reconstructed=True) @classmethod - def from_metadata(cls, metadata: StorageMetadata, **override_args): + def from_metadata(cls, metadata: StorageMetadata, + **override_args) -> 'Storage': """Create Storage from a StorageMetadata object. Used when reconstructing Storage and Store objects from @@ -988,6 +990,7 @@ class S3Store(AbstractStore): """ _ACCESS_DENIED_MESSAGE = 'Access Denied' + _CSYNC_DEFAULT_INTERVAL_SECONDS = 600 def __init__(self, name: str, @@ -1308,7 +1311,7 @@ def mount_command(self, mount_path: str) -> str: def csync_command(self, csync_path: str, - interval_seconds: Optional[int] = 600) -> str: + interval_seconds: Optional[int] = None) -> str: """Returns command to mount CSYNC with Storage bucket on CSYNC_PATH. Args: @@ -1316,7 +1319,7 @@ def csync_command(self, interval_seconds: int; runs the sync command every INTERVAL seconds """ if interval_seconds is None: - interval_seconds = 600 + interval_seconds = self._CSYNC_DEFAULT_INTERVAL_SECONDS if data_utils.is_cloud_store_url(self.source): if self.source is not None: if isinstance(self.source, (str, Path)): @@ -1327,7 +1330,7 @@ def csync_command(self, ) else: dst = self.bucket.name - csync_cmd = (f'python -m sky.data.skystorage csync {csync_path} ' + csync_cmd = (f'python -m sky.data.storage_csync csync {csync_path} ' f's3 {dst} --interval-seconds {interval_seconds} ' '--delete --no-follow-symlinks') return mounting_utils.get_mounting_command(StorageMode.CSYNC, @@ -1407,6 +1410,7 @@ class GcsStore(AbstractStore): _ACCESS_DENIED_MESSAGE = 'AccessDeniedException' GCSFUSE_VERSION = '1.0.1' + _CSYNC_DEFAULT_INTERVAL_SECONDS = 600 def __init__(self, name: str, @@ -1776,7 +1780,7 @@ def csync_command(self, interval_seconds: int; runs the sync command every INTERVAL seconds """ if interval_seconds is None: - interval_seconds = 600 + interval_seconds = self._CSYNC_DEFAULT_INTERVAL_SECONDS if data_utils.is_cloud_store_url(self.source): if self.source is not None: if isinstance(self.source, (str, Path)): @@ -1787,7 +1791,7 @@ def csync_command(self, ) else: dst = self.bucket.name - csync_cmd = (f'python -m sky.data.skystorage csync {csync_path} ' + csync_cmd = (f'python -m sky.data.storage_csync csync {csync_path} ' f'gcs {dst} --interval-seconds {interval_seconds} ' '--delete --no-follow-symlinks') return mounting_utils.get_mounting_command(StorageMode.CSYNC, diff --git a/sky/data/skystorage.py b/sky/data/storage_csync.py similarity index 85% rename from sky/data/skystorage.py rename to sky/data/storage_csync.py index f92df8eeb57..9bed4526f38 100644 --- a/sky/data/skystorage.py +++ b/sky/data/storage_csync.py @@ -1,4 +1,4 @@ -"""Skystorage module""" +"""CSYNC module""" import functools import os import pathlib @@ -17,6 +17,7 @@ logger = sky_logging.init_logger(__name__) _CSYNC_BASE_PATH = '~/.skystorage' +_CSYNC_DB_PATH = '~/.sky/storage_csync.db' _DB = None _CURSOR = None @@ -24,7 +25,7 @@ _BOOT_TIME = None -def db(func): +def connect_db(func): @functools.wraps(func) def wrapper(*args, **kwargs): @@ -41,8 +42,7 @@ def create_table(cursor, conn): conn.commit() if _DB is None: - db_path = os.path.expanduser('~/.sky/skystorage.db') - pathlib.Path(db_path).parents[0].mkdir(parents=True, exist_ok=True) + db_path = os.path.expanduser(_CSYNC_DB_PATH) _DB = db_utils.SQLiteConn(db_path, create_table) _CURSOR = _DB.cursor @@ -54,7 +54,7 @@ def create_table(cursor, conn): return wrapper -@db +@connect_db def _add_running_csync(csync_pid: int, source_path: str): """Given the process id of CSYNC, it should create a row with it""" assert _CURSOR is not None @@ -66,7 +66,7 @@ def _add_running_csync(csync_pid: int, source_path: str): _CONN.commit() -@db +@connect_db def _get_all_running_csync_pid() -> List[Any]: """Returns all the registerd pid of CSYNC processes""" assert _CURSOR is not None @@ -77,7 +77,7 @@ def _get_all_running_csync_pid() -> List[Any]: return csync_pids -@db +@connect_db def _get_running_csync_source_path() -> List[Any]: """Returns all the registerd source path of CSYNC processes""" assert _CURSOR is not None @@ -89,7 +89,7 @@ def _get_running_csync_source_path() -> List[Any]: return source_paths -@db +@connect_db def _set_running_csync_sync_pid(csync_pid: int, sync_pid: Optional[int]): """Given the process id of CSYNC, sets the sync_pid column value""" assert _CURSOR is not None @@ -100,7 +100,7 @@ def _set_running_csync_sync_pid(csync_pid: int, sync_pid: Optional[int]): _CONN.commit() -@db +@connect_db def _get_running_csync_sync_pid(csync_pid: int) -> Optional[int]: """Given the process id of CSYNC, returns the sync_pid column value""" assert _CURSOR is not None @@ -113,7 +113,7 @@ def _get_running_csync_sync_pid(csync_pid: int) -> Optional[int]: raise ValueError(f'CSYNC PID {csync_pid} not found.') -@db +@connect_db def _delete_running_csync(csync_pid: int): """Deletes the row with process id of CSYNC from running_csync table""" assert _CURSOR is not None @@ -123,7 +123,7 @@ def _delete_running_csync(csync_pid: int): _CONN.commit() -@db +@connect_db def _get_csync_pid_from_source_path(path: str) -> Optional[int]: """Given the path, returns process ID of csync running on it""" assert _CURSOR is not None @@ -146,7 +146,7 @@ def get_s3_upload_cmd(src_path: str, dst: str, num_threads: int, delete: bool, """Builds sync command for aws s3""" config_cmd = ('aws configure set default.s3.max_concurrent_requests ' f'{num_threads}') - subprocess.check_output(config_cmd, shell=True) + subprocess.run(config_cmd, shell=True) sync_cmd = f'aws s3 sync {src_path} s3://{dst}' if delete: sync_cmd += ' --delete' @@ -168,16 +168,9 @@ def get_gcs_upload_cmd(src_path: str, dst: str, num_threads: int, delete: bool, return sync_cmd -def run_sync(src: str, - storetype: str, - dst: str, - num_threads: int, - interval_seconds: int, - delete: bool, - no_follow_symlinks: bool, - csync_pid: int, - max_retries: int = 10, - backoff: Optional[common_utils.Backoff] = None): +def run_sync(src: str, storetype: str, dst: str, num_threads: int, + interval_seconds: int, delete: bool, no_follow_symlinks: bool, + csync_pid: int): """Runs the sync command to from src to storetype bucket""" #TODO(Doyoung): add enum type class to handle storetypes storetype = storetype.lower() @@ -199,40 +192,40 @@ def run_sync(src: str, log_path = os.path.expanduser(os.path.join(base_dir, log_file_name)) with open(log_path, 'a') as fout: - try: - with subprocess.Popen(sync_cmd, - stdout=fout, - stderr=fout, - start_new_session=True, - shell=True) as proc: - _set_running_csync_sync_pid(csync_pid, proc.pid) - proc.wait() - except subprocess.CalledProcessError: - src_to_bucket = (f'\'{src}\' to \'{dst}\' ' - f'at \'{storetype}\'') - if max_retries > 0: - if backoff is None: - # interval_seconds/2 is heuristically determined - # as initial backoff - backoff = common_utils.Backoff(int(interval_seconds / 2)) + max_retries = 10 + # interval_seconds/2 is heuristically determined + # as initial backoff + initial_backoff = int(interval_seconds / 2) + backoff = common_utils.Backoff(initial_backoff) + for _ in range(max_retries): + try: + with subprocess.Popen(sync_cmd, + stdout=fout, + stderr=fout, + start_new_session=True, + shell=True) as proc: + _set_running_csync_sync_pid(csync_pid, proc.pid) + proc.wait() + _set_running_csync_sync_pid(csync_pid, -1) + except subprocess.CalledProcessError: + # reset sync pid as the sync process is terminated + _set_running_csync_sync_pid(csync_pid, -1) + src_to_bucket = (f'\'{src}\' to \'{dst}\' ' + f'at \'{storetype}\'') wait_time = backoff.current_backoff() fout.write('Encountered an error while syncing ' - f'{src_to_bucket}. Retrying' - f' in {wait_time}s. {max_retries} more reattempts ' + f'{src_to_bucket}. Retrying sync ' + f'in {wait_time}s. {max_retries} more reattempts ' f'remaining. Check {log_path} for details.') time.sleep(wait_time) - # reset sync pid as the sync process is terminated - _set_running_csync_sync_pid(csync_pid, -1) - run_sync(src, storetype, dst, num_threads, interval_seconds, - delete, no_follow_symlinks, csync_pid, max_retries - 1, - backoff) else: - raise RuntimeError(f'Failed to sync {src_to_bucket} after ' - f'number of retries. Check {log_path} for' - 'details') from None + break + else: + raise RuntimeError(f'Failed to sync {src_to_bucket} after ' + f'{max_retries} number of retries. Check ' + f'{log_path} for more details') from None # run necessary post-processes - _set_running_csync_sync_pid(csync_pid, -1) if storetype == 's3': # set number of threads back to its default value config_cmd = \ @@ -304,7 +297,7 @@ def csync(source: str, storetype: str, destination: str, num_threads: int, # operation, we compute remaining time to wait before the next # sync operation. elapsed_time = int(end_time - start_time) - remaining_interval = max(0, interval_seconds-elapsed_time) + remaining_interval = max(0, interval_seconds - elapsed_time) # sync_pid column is set to 0 when sync is not running time.sleep(remaining_interval)