Skip to content

Commit

Permalink
updates
Browse files Browse the repository at this point in the history
  • Loading branch information
landscapepainter committed Sep 17, 2023
1 parent 2bec928 commit 6e1e9f2
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 83 deletions.
2 changes: 1 addition & 1 deletion sky/backends/backend_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2897,7 +2897,7 @@ def wait_and_terminate_csync(cluster_name: str) -> None:
handle.docker_user)
runners = command_runner.SSHCommandRunner.make_runner_list(
ip_list, port_list=port_list, **ssh_credentials)
csync_terminate_cmd = ('python -m sky.data.skystorage terminate -a '
csync_terminate_cmd = ('python -m sky.data.storage_csync terminate -a '
'>/dev/null')

def _run_csync_terminate(runner):
Expand Down
17 changes: 9 additions & 8 deletions sky/backends/cloud_vm_ray_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -3009,8 +3009,10 @@ def _sync_file_mounts(
) -> None:
"""Mounts all user files to the remote nodes."""
self._execute_file_mounts(handle, all_file_mounts)
self._execute_storage_mounts(handle, storage_mounts, storage_utils.StorageMode.MOUNT)
self._execute_storage_mounts(handle, storage_mounts, storage_utils.StorageMode.CSYNC)
self._execute_storage_mounts(handle, storage_mounts,
storage_utils.StorageMode.MOUNT)
self._execute_storage_mounts(handle, storage_mounts,
storage_utils.StorageMode.CSYNC)
self._set_cluster_storage_mounts_metadata(handle.cluster_name,
storage_mounts)

Expand Down Expand Up @@ -4414,8 +4416,7 @@ def _symlink_node(runner: command_runner.SSHCommandRunner):
logger.debug(f'File mount sync took {end - start} seconds.')

def _execute_storage_mounts(self, handle: CloudVmRayResourceHandle,
storage_mounts: Dict[Path,
storage_lib.Storage],
storage_mounts: Dict[Path, storage_lib.Storage],
mount_mode: storage_utils.StorageMode):
"""Executes storage mounts: installing mounting tools and mounting."""
# Process only MOUNT mode objects here. COPY mode objects have been
Expand All @@ -4441,7 +4442,7 @@ def _execute_storage_mounts(self, handle: CloudVmRayResourceHandle,
if mount_mode == storage_utils.StorageMode.MOUNT:
mode_str = 'mount'
action_message = 'Mounting'
else: # CSYNC mode
else: # CSYNC mode
mode_str = 'csync'
action_message = 'Setting up CSYNC'

Expand All @@ -4468,8 +4469,9 @@ def _execute_storage_mounts(self, handle: CloudVmRayResourceHandle,
store = list(storage_obj.stores.values())[0]
if mount_mode == storage_utils.StorageMode.MOUNT:
mount_cmd = store.mount_command(dst)
else: # CSYNC mode
mount_cmd= store.csync_command(dst)
else: # CSYNC mode
mount_cmd = store.csync_command(dst,
storage_obj.interval_seconds)
src_print = (storage_obj.source
if storage_obj.source else storage_obj.name)
if isinstance(src_print, list):
Expand Down Expand Up @@ -4510,7 +4512,6 @@ def _execute_storage_mounts(self, handle: CloudVmRayResourceHandle,
end = time.time()
logger.debug(f'Setting storage {mode_str} took {end - start} seconds.')


def _set_cluster_storage_mounts_metadata(
self, cluster_name: str,
storage_mounts: Dict[Path, storage_lib.Storage]) -> None:
Expand Down
34 changes: 17 additions & 17 deletions sky/data/mounting_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ def get_mounting_command(
MOUNT and CSYNC
mount_path: Path to mount the bucket at.
mount_cmd: Command to mount the bucket. Should be single line.
install_cmd: Command to install the mounting utility. Should be
single line.
install_cmd: Command to install the mounting utility for MOUNT mode.
Should be single line.
Returns:
str: Mounting command with the mounting script as a heredoc.
Expand All @@ -58,17 +58,17 @@ def get_mounting_command(
MOUNT_BINARY={mount_binary}
# Check if path is already mounted
if grep -q $MOUNT_PATH /proc/mounts ; then
echo "Path already mounted - unmounting..."
fusermount -uz "$MOUNT_PATH"
echo "Successfully unmounted $MOUNT_PATH."
echo "Path already mounted - unmounting..."
fusermount -uz "$MOUNT_PATH"
echo "Successfully unmounted $MOUNT_PATH."
fi
# Install MOUNT_BINARY if not already installed
if {installed_check}; then
echo "$MOUNT_BINARY already installed. Proceeding..."
echo "$MOUNT_BINARY already installed. Proceeding..."
else
echo "Installing $MOUNT_BINARY..."
{install_cmd}
echo "Installing $MOUNT_BINARY..."
{install_cmd}
fi
fi
Expand All @@ -81,21 +81,21 @@ def get_mounting_command(
# Check if mount path contains files for MOUNT mode only
if [ "$MOUNT_MODE" = "MOUNT" ]; then
if [ "$(ls -A $MOUNT_PATH)" ]; then
echo "Mount path $MOUNT_PATH is not empty. Please mount to another path or remove it first."
exit {exceptions.MOUNT_PATH_NON_EMPTY_CODE}
echo "Mount path $MOUNT_PATH is not empty. Please mount to another path or remove it first."
exit {exceptions.MOUNT_PATH_NON_EMPTY_CODE}
fi
fi
fi
if [ "$MOUNT_MODE" = "MOUNT" ]; then
echo "Mounting source bucket to $MOUNT_PATH with $MOUNT_BINARY..."
{mount_cmd}
echo "Mounting done."
echo "Mounting source bucket to $MOUNT_PATH with $MOUNT_BINARY..."
{mount_cmd}
echo "Mounting done."
else
# running CSYNC cmd
echo "Setting up CSYNC on $MOUNT_PATH to source bucket..."
setsid {mount_cmd} >/dev/null 2>&1 &
echo "CSYNC is set."
# running CSYNC cmd
echo "Setting up CSYNC on $MOUNT_PATH to source bucket..."
setsid {mount_cmd} >/dev/null 2>&1 &
echo "CSYNC is set."
fi
""")

Expand Down
18 changes: 11 additions & 7 deletions sky/data/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,8 @@ def __init__(self,
self.persistent = persistent
self.mode = mode
assert mode in StorageMode
self.interval_seconds = interval_seconds
if self.mode != StorageMode.CSYNC:
assert interval_seconds is None
self.sync_on_reconstruction = sync_on_reconstruction

# TODO(romilb, zhwu): This is a workaround to support storage deletion
Expand Down Expand Up @@ -746,7 +747,8 @@ def _add_store_from_metadata(self, sky_stores) -> None:
self._add_store(store, is_reconstructed=True)

@classmethod
def from_metadata(cls, metadata: StorageMetadata, **override_args):
def from_metadata(cls, metadata: StorageMetadata,
**override_args) -> 'Storage':
"""Create Storage from a StorageMetadata object.
Used when reconstructing Storage and Store objects from
Expand Down Expand Up @@ -988,6 +990,7 @@ class S3Store(AbstractStore):
"""

_ACCESS_DENIED_MESSAGE = 'Access Denied'
_CSYNC_DEFAULT_INTERVAL_SECONDS = 600

def __init__(self,
name: str,
Expand Down Expand Up @@ -1308,15 +1311,15 @@ def mount_command(self, mount_path: str) -> str:

def csync_command(self,
csync_path: str,
interval_seconds: Optional[int] = 600) -> str:
interval_seconds: Optional[int] = None) -> str:
"""Returns command to mount CSYNC with Storage bucket on CSYNC_PATH.
Args:
csync_path: str; Path to continuously sync the bucket to.
interval_seconds: int; runs the sync command every INTERVAL seconds
"""
if interval_seconds is None:
interval_seconds = 600
interval_seconds = self._CSYNC_DEFAULT_INTERVAL_SECONDS
if data_utils.is_cloud_store_url(self.source):
if self.source is not None:
if isinstance(self.source, (str, Path)):
Expand All @@ -1327,7 +1330,7 @@ def csync_command(self,
)
else:
dst = self.bucket.name
csync_cmd = (f'python -m sky.data.skystorage csync {csync_path} '
csync_cmd = (f'python -m sky.data.storage_csync csync {csync_path} '
f's3 {dst} --interval-seconds {interval_seconds} '
'--delete --no-follow-symlinks')
return mounting_utils.get_mounting_command(StorageMode.CSYNC,
Expand Down Expand Up @@ -1407,6 +1410,7 @@ class GcsStore(AbstractStore):

_ACCESS_DENIED_MESSAGE = 'AccessDeniedException'
GCSFUSE_VERSION = '1.0.1'
_CSYNC_DEFAULT_INTERVAL_SECONDS = 600

def __init__(self,
name: str,
Expand Down Expand Up @@ -1776,7 +1780,7 @@ def csync_command(self,
interval_seconds: int; runs the sync command every INTERVAL seconds
"""
if interval_seconds is None:
interval_seconds = 600
interval_seconds = self._CSYNC_DEFAULT_INTERVAL_SECONDS
if data_utils.is_cloud_store_url(self.source):
if self.source is not None:
if isinstance(self.source, (str, Path)):
Expand All @@ -1787,7 +1791,7 @@ def csync_command(self,
)
else:
dst = self.bucket.name
csync_cmd = (f'python -m sky.data.skystorage csync {csync_path} '
csync_cmd = (f'python -m sky.data.storage_csync csync {csync_path} '
f'gcs {dst} --interval-seconds {interval_seconds} '
'--delete --no-follow-symlinks')
return mounting_utils.get_mounting_command(StorageMode.CSYNC,
Expand Down
Loading

0 comments on commit 6e1e9f2

Please sign in to comment.