From b9e81d1bf203f5726d91b3b4efb2346c10cf2efa Mon Sep 17 00:00:00 2001 From: Andy Lee Date: Tue, 10 Dec 2024 13:41:13 -0800 Subject: [PATCH 1/9] feat: support storage deletion recovery --- sky/cli.py | 7 ++++++- sky/utils/subprocess_utils.py | 36 ++++++++++++++++++++++++++--------- 2 files changed, 33 insertions(+), 10 deletions(-) diff --git a/sky/cli.py b/sky/cli.py index 1faf0003ff9..2faf9dc8dc8 100644 --- a/sky/cli.py +++ b/sky/cli.py @@ -3543,7 +3543,12 @@ def storage_delete(names: List[str], all: bool, yes: bool): # pylint: disable=r abort=True, show_default=True) - subprocess_utils.run_in_parallel(sky.storage_delete, names) + results: List[Union[None, Exception]] = subprocess_utils.run_in_parallel( + sky.storage_delete, names, continue_on_error=True) + + for idx, result in enumerate(results): + if isinstance(result, Exception): + click.secho(f'Failed to delete storage {names[idx]}: {result}', fg='red') @cli.group(cls=_NaturalOrderGroup) diff --git a/sky/utils/subprocess_utils.py b/sky/utils/subprocess_utils.py index 992c6bbe3ff..7da98132209 100644 --- a/sky/utils/subprocess_utils.py +++ b/sky/utils/subprocess_utils.py @@ -96,29 +96,47 @@ def get_parallel_threads(cloud_str: Optional[str] = None) -> int: return max(4, cpu_count - 1) * _get_thread_multiplier(cloud_str) +# TODO(andyl): Why this function returns a list of results? Why not yielding? def run_in_parallel(func: Callable, args: Iterable[Any], - num_threads: Optional[int] = None) -> List[Any]: + num_threads: Optional[int] = None, + continue_on_error: bool = False) -> List[Any]: """Run a function in parallel on a list of arguments. - The function 'func' should raise a CommandError if the command fails. - Args: func: The function to run in parallel args: Iterable of arguments to pass to func num_threads: Number of threads to use. If None, uses get_parallel_threads() + continue_on_error: If True, continues execution when errors occur + If False (default), raises the first error immediately Returns: A list of the return values of the function func, in the same order as the - arguments. + arguments. If continue_on_error=True, failed operations will have + their exceptions in the result list. """ - # Reference: https://stackoverflow.com/questions/25790279/python-multiprocessing-early-termination # pylint: disable=line-too-long - processes = num_threads if num_threads is not None else get_parallel_threads( - ) + processes = (num_threads + if num_threads is not None else get_parallel_threads()) with pool.ThreadPool(processes=processes) as p: - # Run the function in parallel on the arguments, keeping the order. - return list(p.imap(func, args)) + ordered_iterators = p.imap(func, args) + + # TODO(andyl): Is this list(ordered_iterators) clear? Maybe we should + # merge two cases, and move this logic deeper to + # `except e: results.append(e) if continue_on_error else raise e` + if not continue_on_error: + return list(ordered_iterators) + else: + results: List[Union[Any, Exception]] = [] + while True: + try: + result = next(ordered_iterators) + results.append(result) + except StopIteration: + break + except Exception as e: # pylint: disable=broad-except + results.append(e) + return results def handle_returncode(returncode: int, From 9f874049c69de0f239642a63d83cdd5501dbf293 Mon Sep 17 00:00:00 2001 From: Andy Lee Date: Tue, 10 Dec 2024 14:51:04 -0800 Subject: [PATCH 2/9] fix: check handle None later and add storage name consistency check --- sky/cli.py | 20 +++++++++++--------- sky/core.py | 13 ++++++++----- sky/global_user_state.py | 7 ++++++- 3 files changed, 25 insertions(+), 15 deletions(-) diff --git a/sky/cli.py b/sky/cli.py index 2faf9dc8dc8..60b8f6622c6 100644 --- a/sky/cli.py +++ b/sky/cli.py @@ -3525,11 +3525,10 @@ def storage_delete(names: List[str], all: bool, yes: bool): # pylint: disable=r if sum([len(names) > 0, all]) != 1: raise click.UsageError('Either --all or a name must be specified.') if all: - storages = sky.storage_ls() - if not storages: + names = global_user_state.get_storage_names() + if not names: click.echo('No storage(s) to delete.') return - names = [s['name'] for s in storages] else: names = _get_glob_storages(names) if names: @@ -3543,12 +3542,15 @@ def storage_delete(names: List[str], all: bool, yes: bool): # pylint: disable=r abort=True, show_default=True) - results: List[Union[None, Exception]] = subprocess_utils.run_in_parallel( - sky.storage_delete, names, continue_on_error=True) - - for idx, result in enumerate(results): - if isinstance(result, Exception): - click.secho(f'Failed to delete storage {names[idx]}: {result}', fg='red') + def delete_storage(name: str): + try: + sky.storage_delete(name) + except Exception as e: # pylint: disable=broad-except + click.secho(f'Error deleting storage {name}: {e}', fg='red') + + subprocess_utils.run_in_parallel(delete_storage, + names, + continue_on_error=True) @cli.group(cls=_NaturalOrderGroup) diff --git a/sky/core.py b/sky/core.py index 9f1288d7fb6..732d7202bc5 100644 --- a/sky/core.py +++ b/sky/core.py @@ -915,8 +915,11 @@ def storage_delete(name: str) -> None: handle = global_user_state.get_handle_from_storage_name(name) if handle is None: raise ValueError(f'Storage name {name!r} not found.') - else: - storage_object = data.Storage(name=handle.storage_name, - source=handle.source, - sync_on_reconstruction=False) - storage_object.delete() + + assert handle.storage_name == name, ( + f'In global_user_state, storage name {name!r} does not match ' + f'handle.storage_name {handle.storage_name!r}') + storage_object = data.Storage(name=handle.storage_name, + source=handle.source, + sync_on_reconstruction=False) + storage_object.delete() diff --git a/sky/global_user_state.py b/sky/global_user_state.py index 2a5cbc7eb3f..52dad495c3d 100644 --- a/sky/global_user_state.py +++ b/sky/global_user_state.py @@ -826,8 +826,13 @@ def get_storage_names_start_with(starts_with: str) -> List[str]: return [row[0] for row in rows] +def get_storage_names() -> List[str]: + rows = _DB.cursor.execute('SELECT name FROM storage') + return [row[0] for row in rows] + + def get_storage() -> List[Dict[str, Any]]: - rows = _DB.cursor.execute('select * from storage') + rows = _DB.cursor.execute('SELECT * FROM storage') records = [] for name, launched_at, handle, last_use, status in rows: # TODO: use namedtuple instead of dict From 4fd28bba066255d9ab0a5b9a101182b88c88f0bb Mon Sep 17 00:00:00 2001 From: Andy Lee Date: Wed, 11 Dec 2024 11:47:17 -0800 Subject: [PATCH 3/9] Apply suggestions from code review Co-authored-by: Tian Xia --- sky/cli.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sky/cli.py b/sky/cli.py index 60b8f6622c6..60e054b9128 100644 --- a/sky/cli.py +++ b/sky/cli.py @@ -3542,7 +3542,7 @@ def storage_delete(names: List[str], all: bool, yes: bool): # pylint: disable=r abort=True, show_default=True) - def delete_storage(name: str): + def delete_storage(name: str) -> None: try: sky.storage_delete(name) except Exception as e: # pylint: disable=broad-except From 36ca3a36698ea05cd3ac32a381056d8488ebfc82 Mon Sep 17 00:00:00 2001 From: Andy Lee Date: Wed, 11 Dec 2024 11:49:45 -0800 Subject: [PATCH 4/9] refactor: use storage_name=* to replace a trivial function --- sky/cli.py | 2 +- sky/global_user_state.py | 5 ----- 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/sky/cli.py b/sky/cli.py index 60e054b9128..5bba5021ce5 100644 --- a/sky/cli.py +++ b/sky/cli.py @@ -3525,7 +3525,7 @@ def storage_delete(names: List[str], all: bool, yes: bool): # pylint: disable=r if sum([len(names) > 0, all]) != 1: raise click.UsageError('Either --all or a name must be specified.') if all: - names = global_user_state.get_storage_names() + names = global_user_state.get_glob_storage_name('*') if not names: click.echo('No storage(s) to delete.') return diff --git a/sky/global_user_state.py b/sky/global_user_state.py index 52dad495c3d..2c1ecc33575 100644 --- a/sky/global_user_state.py +++ b/sky/global_user_state.py @@ -826,11 +826,6 @@ def get_storage_names_start_with(starts_with: str) -> List[str]: return [row[0] for row in rows] -def get_storage_names() -> List[str]: - rows = _DB.cursor.execute('SELECT name FROM storage') - return [row[0] for row in rows] - - def get_storage() -> List[Dict[str, Any]]: rows = _DB.cursor.execute('SELECT * FROM storage') records = [] From 8b7b0753318a2cc4b6a23c79e82096f2fceca1f0 Mon Sep 17 00:00:00 2001 From: Andy Lee Date: Wed, 11 Dec 2024 12:01:59 -0800 Subject: [PATCH 5/9] refactor: fine-grained catch --- sky/cli.py | 7 ++++++- sky/data/storage.py | 21 +++++++++++++++------ 2 files changed, 21 insertions(+), 7 deletions(-) diff --git a/sky/cli.py b/sky/cli.py index 5bba5021ce5..79a77130424 100644 --- a/sky/cli.py +++ b/sky/cli.py @@ -3545,8 +3545,13 @@ def storage_delete(names: List[str], all: bool, yes: bool): # pylint: disable=r def delete_storage(name: str) -> None: try: sky.storage_delete(name) - except Exception as e: # pylint: disable=broad-except + except (exceptions.StorageBucketDeleteError, PermissionError) as e: + click.secho(f'Error deleting storage {name}: {e}', fg='red') + except ValueError as e: click.secho(f'Error deleting storage {name}: {e}', fg='red') + except Exception as e: # pylint: disable=broad-except + with ux_utils.print_exception_no_traceback(): + raise e subprocess_utils.run_in_parallel(delete_storage, names, diff --git a/sky/data/storage.py b/sky/data/storage.py index 897f2f96b94..63544e1c353 100644 --- a/sky/data/storage.py +++ b/sky/data/storage.py @@ -950,18 +950,16 @@ def delete(self, store_type: Optional[StoreType] = None) -> None: if not self.stores: logger.info('No backing stores found. Deleting storage.') global_user_state.remove_storage(self.name) - if store_type: + if store_type is not None: store = self.stores[store_type] - is_sky_managed = store.is_sky_managed # We delete a store from the cloud if it's sky managed. Else just # remove handle and return - if is_sky_managed: + if store.is_sky_managed: self.handle.remove_store(store) store.delete() # Check remaining stores - if none is sky managed, remove # the storage from global_user_state. - delete = all( - s.is_sky_managed is False for s in self.stores.values()) + delete = all(not s.is_sky_managed for s in self.stores.values()) if delete: global_user_state.remove_storage(self.name) else: @@ -1491,6 +1489,9 @@ def _delete_s3_bucket(self, bucket_name: str) -> bool: Returns: bool; True if bucket was deleted, False if it was deleted externally. + + Raises: + StorageBucketDeleteError: If deleting the bucket fails. """ # Deleting objects is very slow programatically # (i.e. bucket.objects.all().delete() is slow). @@ -1934,6 +1935,11 @@ def _delete_gcs_bucket(self, bucket_name: str) -> bool: Returns: bool; True if bucket was deleted, False if it was deleted externally. + + Raises: + StorageBucketDeleteError: If deleting the bucket fails. + PermissionError: If the bucket is external and the user is not + allowed to delete it. """ with rich_utils.safe_status( @@ -3096,6 +3102,9 @@ def _delete_r2_bucket(self, bucket_name: str) -> bool: Returns: bool; True if bucket was deleted, False if it was deleted externally. + + Raises: + StorageBucketDeleteError: If deleting the bucket fails. """ # Deleting objects is very slow programatically # (i.e. bucket.objects.all().delete() is slow). @@ -3532,7 +3541,7 @@ def _create_cos_bucket(self, return self.bucket - def _delete_cos_bucket(self): + def _delete_cos_bucket(self) -> None: bucket = self.s3_resource.Bucket(self.name) try: bucket_versioning = self.s3_resource.BucketVersioning(self.name) From fa6037fc0d3e44d823995b364d4632c8a04fefaf Mon Sep 17 00:00:00 2001 From: Andy Lee Date: Wed, 11 Dec 2024 12:04:20 -0800 Subject: [PATCH 6/9] revert: run_in_parallel `continue_on_errors` --- sky/cli.py | 4 +--- sky/utils/subprocess_utils.py | 29 ++++++----------------------- 2 files changed, 7 insertions(+), 26 deletions(-) diff --git a/sky/cli.py b/sky/cli.py index 79a77130424..afbcec5e13e 100644 --- a/sky/cli.py +++ b/sky/cli.py @@ -3553,9 +3553,7 @@ def delete_storage(name: str) -> None: with ux_utils.print_exception_no_traceback(): raise e - subprocess_utils.run_in_parallel(delete_storage, - names, - continue_on_error=True) + subprocess_utils.run_in_parallel(delete_storage, names) @cli.group(cls=_NaturalOrderGroup) diff --git a/sky/utils/subprocess_utils.py b/sky/utils/subprocess_utils.py index 7da98132209..a83d9fa8262 100644 --- a/sky/utils/subprocess_utils.py +++ b/sky/utils/subprocess_utils.py @@ -99,8 +99,7 @@ def get_parallel_threads(cloud_str: Optional[str] = None) -> int: # TODO(andyl): Why this function returns a list of results? Why not yielding? def run_in_parallel(func: Callable, args: Iterable[Any], - num_threads: Optional[int] = None, - continue_on_error: bool = False) -> List[Any]: + num_threads: Optional[int] = None) -> List[Any]: """Run a function in parallel on a list of arguments. Args: @@ -108,35 +107,19 @@ def run_in_parallel(func: Callable, args: Iterable of arguments to pass to func num_threads: Number of threads to use. If None, uses get_parallel_threads() - continue_on_error: If True, continues execution when errors occur - If False (default), raises the first error immediately Returns: A list of the return values of the function func, in the same order as the - arguments. If continue_on_error=True, failed operations will have - their exceptions in the result list. + arguments. + + Raises: + Exception: The first exception encountered. """ processes = (num_threads if num_threads is not None else get_parallel_threads()) with pool.ThreadPool(processes=processes) as p: ordered_iterators = p.imap(func, args) - - # TODO(andyl): Is this list(ordered_iterators) clear? Maybe we should - # merge two cases, and move this logic deeper to - # `except e: results.append(e) if continue_on_error else raise e` - if not continue_on_error: - return list(ordered_iterators) - else: - results: List[Union[Any, Exception]] = [] - while True: - try: - result = next(ordered_iterators) - results.append(result) - except StopIteration: - break - except Exception as e: # pylint: disable=broad-except - results.append(e) - return results + return list(ordered_iterators) def handle_returncode(returncode: int, From 1d67da7ed0fbd0c7113ba268595ae2c589b24a43 Mon Sep 17 00:00:00 2001 From: Andy Lee Date: Wed, 11 Dec 2024 12:05:23 -0800 Subject: [PATCH 7/9] chore: remove TODO --- sky/utils/subprocess_utils.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sky/utils/subprocess_utils.py b/sky/utils/subprocess_utils.py index a83d9fa8262..e38ed3ec87a 100644 --- a/sky/utils/subprocess_utils.py +++ b/sky/utils/subprocess_utils.py @@ -96,7 +96,6 @@ def get_parallel_threads(cloud_str: Optional[str] = None) -> int: return max(4, cpu_count - 1) * _get_thread_multiplier(cloud_str) -# TODO(andyl): Why this function returns a list of results? Why not yielding? def run_in_parallel(func: Callable, args: Iterable[Any], num_threads: Optional[int] = None) -> List[Any]: From cee19ea097315290eba89d0823e71340b9a129c3 Mon Sep 17 00:00:00 2001 From: Andy Lee Date: Thu, 19 Dec 2024 22:05:18 -0800 Subject: [PATCH 8/9] style: comment on why * Co-authored-by: Tian Xia --- sky/cli.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sky/cli.py b/sky/cli.py index afbcec5e13e..9bfa0f06d0a 100644 --- a/sky/cli.py +++ b/sky/cli.py @@ -3525,7 +3525,8 @@ def storage_delete(names: List[str], all: bool, yes: bool): # pylint: disable=r if sum([len(names) > 0, all]) != 1: raise click.UsageError('Either --all or a name must be specified.') if all: - names = global_user_state.get_glob_storage_name('*') + # Use '*' to get all storages. + names = global_user_state.get_glob_storage_name(cluster_name='*') if not names: click.echo('No storage(s) to delete.') return From 47d01fa98a53d0201591e5aee89c2956cf2584bb Mon Sep 17 00:00:00 2001 From: Andy Lee Date: Fri, 20 Dec 2024 06:12:28 +0000 Subject: [PATCH 9/9] revert: use broad-except --- sky/cli.py | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/sky/cli.py b/sky/cli.py index 9bfa0f06d0a..66360d91847 100644 --- a/sky/cli.py +++ b/sky/cli.py @@ -3526,7 +3526,7 @@ def storage_delete(names: List[str], all: bool, yes: bool): # pylint: disable=r raise click.UsageError('Either --all or a name must be specified.') if all: # Use '*' to get all storages. - names = global_user_state.get_glob_storage_name(cluster_name='*') + names = global_user_state.get_glob_storage_name(storage_name='*') if not names: click.echo('No storage(s) to delete.') return @@ -3546,13 +3546,8 @@ def storage_delete(names: List[str], all: bool, yes: bool): # pylint: disable=r def delete_storage(name: str) -> None: try: sky.storage_delete(name) - except (exceptions.StorageBucketDeleteError, PermissionError) as e: - click.secho(f'Error deleting storage {name}: {e}', fg='red') - except ValueError as e: - click.secho(f'Error deleting storage {name}: {e}', fg='red') except Exception as e: # pylint: disable=broad-except - with ux_utils.print_exception_no_traceback(): - raise e + click.secho(f'Error deleting storage {name}: {e}', fg='red') subprocess_utils.run_in_parallel(delete_storage, names)