[Storage] Continue storage deletion in sky storage delete when some fail #4454

Open
wants to merge 9 commits into
base: master
18 changes: 14 additions & 4 deletions sky/cli.py
@@ -3525,11 +3525,10 @@ def storage_delete(names: List[str], all: bool, yes: bool): # pylint: disable=r
     if sum([len(names) > 0, all]) != 1:
         raise click.UsageError('Either --all or a name must be specified.')
     if all:
-        storages = sky.storage_ls()
-        if not storages:
+        names = global_user_state.get_glob_storage_name('*')
+        if not names:
             click.echo('No storage(s) to delete.')
             return
-        names = [s['name'] for s in storages]
     else:
         names = _get_glob_storages(names)
     if names:
@@ -3543,7 +3542,18 @@ def storage_delete(names: List[str], all: bool, yes: bool): # pylint: disable=r
                 abort=True,
                 show_default=True)

-    subprocess_utils.run_in_parallel(sky.storage_delete, names)
+    def delete_storage(name: str) -> None:
+        try:
+            sky.storage_delete(name)
+        except (exceptions.StorageBucketDeleteError, PermissionError) as e:
+            click.secho(f'Error deleting storage {name}: {e}', fg='red')
+        except ValueError as e:
+            click.secho(f'Error deleting storage {name}: {e}', fg='red')
Collaborator

Suggested change
-        except (exceptions.StorageBucketDeleteError, PermissionError) as e:
-            click.secho(f'Error deleting storage {name}: {e}', fg='red')
-        except ValueError as e:
-            click.secho(f'Error deleting storage {name}: {e}', fg='red')
+        except (exceptions.StorageBucketDeleteError, PermissionError, ValueError) as e:
+            click.secho(f'Error deleting storage {name}: {e}', fg='red')

can we merge them?

Collaborator

Actually, it seems there is one exception that is not being caught here. On second thought, for simplicity, we can probably just use a broad except here.

(skypilot) (base) ➜  skypilot git:(remove_token) ✗ sky storage delete -a                                                   
Deleting 15 storages: skypilot-workdir-windsey-afa32d67, skypilot-workdir-windsey-81d2115f, skypilot-workdir-windsey-b8fd0382, skypilot-workdir-windsey-499178bb, skypilot-workdir-windsey-dd64cc00, skypilot-workdir-windsey-d04e17a8, skypilot-workdir-windsey-43925bf5, skypilot-workdir-windsey-3375424a, skypilot-workdir-windsey-5e824f17, skypilot-workdir-windsey-db755382, skypilot-workdir-windsey-25270ec6, skypilot-workdir-windsey-1cb4689a, skypilot-workdir-windsey-f40df433, skypilot-workdir-windsey-2ef8a0bf, skypilot-workdir-windsey-58878e24. Proceed? [Y/n]: 
No backing stores found. Deleting storage.
botocore.exceptions.ClientError: An error occurred (403) when calling the HeadBucket operation: Forbidden

The above exception was the direct cause of the following exception:

sky.exceptions.StorageBucketGetError: Failed to access existing bucket 'skypilot-workdir-windsey-afa32d67'. This is likely because it is a private bucket you do not have access to.
To fix: 
  1. If you are trying to create a new bucket: use a different name.
  2. If you are trying to connect to an existing bucket: make sure your cloud credentials have access to it. To debug, consider running `aws s3 ls skypilot-workdir-windsey-afa32d67`.
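
A minimal sketch of the broad-except idea from this comment, not the PR's code: each deletion is wrapped so that one failure is reported and the remaining deletions still run. `delete_all`, `safe_delete`, and `fake_delete` are hypothetical names; `fake_delete` only stands in for `sky.storage_delete`, and the pool size of 4 is an arbitrary assumption.

```python
from multiprocessing import pool
from typing import Callable, List


def delete_all(delete_one: Callable[[str], None],
               names: List[str]) -> List[str]:
    """Delete every name in parallel and return the names that failed."""
    failed: List[str] = []

    def safe_delete(name: str) -> None:
        try:
            delete_one(name)
        except Exception as e:  # pylint: disable=broad-except
            # Report the failure and keep going so other deletions still run.
            print(f'Error deleting storage {name}: {e}')
            failed.append(name)

    with pool.ThreadPool(processes=4) as p:
        # Consume the iterator so every task finishes before the pool closes.
        list(p.imap(safe_delete, names))
    return failed


def fake_delete(name: str) -> None:
    """Hypothetical stand-in for sky.storage_delete."""
    if name.startswith('private-'):
        raise PermissionError(f'Failed to access existing bucket {name!r}.')


failed_names = delete_all(fake_delete, ['a', 'private-b', 'c'])
```

Catching `Exception` rather than a fixed tuple means an unanticipated error type, such as the `StorageBucketGetError` in the output above, no longer aborts the whole batch.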

Contributor Author

Yeah... I agree

+        except Exception as e: # pylint: disable=broad-except
+            with ux_utils.print_exception_no_traceback():
+                raise e

Collaborator

Suggested change
-        except Exception as e: # pylint: disable=broad-except
-            with ux_utils.print_exception_no_traceback():
-                raise e

It seems like we don't need it? We can simply not catch it. And any exceptions raised by sky.storage_delete should already have the suppressed-traceback context?

Contributor Author

Why should they have the traceback context suppressed?

Collaborator

Because this is a user-facing API and we want to keep the error message simple. The content of the error message should be self-explanatory.
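
To illustrate what "traceback context suppressed" can mean in practice, here is a sketch of one way such a context manager could work, using Python's `sys.tracebacklimit`. This page does not show how `ux_utils.print_exception_no_traceback` is implemented, so `no_traceback` below is a hypothetical helper written under that assumption, and `storage_delete` is a toy function, not the one in `sky/core.py`.

```python
import contextlib
import sys
from typing import Iterator


@contextlib.contextmanager
def no_traceback() -> Iterator[None]:
    """Hypothetical helper: hide the traceback of an error raised inside."""
    original_limit = getattr(sys, 'tracebacklimit', 1000)
    sys.tracebacklimit = 0
    yield
    # Reached only if the body did not raise. When the body raises, the
    # limit stays at 0, so an uncaught error prints only its type and value.
    sys.tracebacklimit = original_limit


def storage_delete(name: str) -> None:
    """Toy user-facing function that always fails."""
    with no_traceback():
        raise ValueError(f'Storage name {name!r} not found.')


try:
    storage_delete('missing')
except ValueError as e:
    message = str(e)
    limit_after_raise = sys.tracebacklimit
finally:
    # Restore normal tracebacks for the rest of the program.
    sys.tracebacklimit = 1000
```

If the exception were left uncaught, the interpreter would print only `ValueError: Storage name 'missing' not found.`, which is the "self-explanatory message" behavior described in the comment above.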

+    subprocess_utils.run_in_parallel(delete_storage, names)


 @cli.group(cls=_NaturalOrderGroup)
13 changes: 8 additions & 5 deletions sky/core.py
@@ -915,8 +915,11 @@ def storage_delete(name: str) -> None:
     handle = global_user_state.get_handle_from_storage_name(name)
     if handle is None:
         raise ValueError(f'Storage name {name!r} not found.')
-    else:
-        storage_object = data.Storage(name=handle.storage_name,
-                                      source=handle.source,
-                                      sync_on_reconstruction=False)
-        storage_object.delete()
+
+    assert handle.storage_name == name, (
+        f'In global_user_state, storage name {name!r} does not match '
+        f'handle.storage_name {handle.storage_name!r}')
+    storage_object = data.Storage(name=handle.storage_name,
+                                  source=handle.source,
+                                  sync_on_reconstruction=False)
+    storage_object.delete()
21 changes: 15 additions & 6 deletions sky/data/storage.py
@@ -950,18 +950,16 @@ def delete(self, store_type: Optional[StoreType] = None) -> None:
         if not self.stores:
             logger.info('No backing stores found. Deleting storage.')
             global_user_state.remove_storage(self.name)
-        if store_type:
+        if store_type is not None:
             store = self.stores[store_type]
-            is_sky_managed = store.is_sky_managed
             # We delete a store from the cloud if it's sky managed. Else just
             # remove handle and return
-            if is_sky_managed:
+            if store.is_sky_managed:
                 self.handle.remove_store(store)
                 store.delete()
                 # Check remaining stores - if none is sky managed, remove
                 # the storage from global_user_state.
-                delete = all(
-                    s.is_sky_managed is False for s in self.stores.values())
+                delete = all(not s.is_sky_managed for s in self.stores.values())
                 if delete:
                     global_user_state.remove_storage(self.name)
             else:
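
The two condition changes in this hunk are not always pure style in Python. The sketch below shows the general difference with a hypothetical `Kind` enum; the definition of `StoreType` and the possible values of `is_sky_managed` are not visible on this page, so whether behavior changes in SkyPilot itself is an open assumption.

```python
import enum
from typing import Optional


class Kind(enum.IntEnum):
    """Hypothetical enum whose first member has the falsy value 0."""
    FIRST = 0
    SECOND = 1


def truthy_check(kind: Optional[Kind]) -> bool:
    # Old style: `if store_type:` also skips falsy non-None values.
    return bool(kind)


def none_check(kind: Optional[Kind]) -> bool:
    # New style: `if store_type is not None:` skips only None.
    return kind is not None


# `x is False` and `not x` disagree when a flag is None.
flags = [None, None]
old_style = all(flag is False for flag in flags)
new_style = all(not flag for flag in flags)
```

Here `truthy_check(Kind.FIRST)` is `False` while `none_check(Kind.FIRST)` is `True`, and `old_style` is `False` while `new_style` is `True`.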
Expand Down Expand Up @@ -1491,6 +1489,9 @@ def _delete_s3_bucket(self, bucket_name: str) -> bool:

Returns:
bool; True if bucket was deleted, False if it was deleted externally.

Raises:
StorageBucketDeleteError: If deleting the bucket fails.
"""
# Deleting objects is very slow programatically
# (i.e. bucket.objects.all().delete() is slow).
Expand Down Expand Up @@ -1934,6 +1935,11 @@ def _delete_gcs_bucket(self, bucket_name: str) -> bool:

Returns:
bool; True if bucket was deleted, False if it was deleted externally.

Raises:
StorageBucketDeleteError: If deleting the bucket fails.
PermissionError: If the bucket is external and the user is not
allowed to delete it.
"""

with rich_utils.safe_status(
Expand Down Expand Up @@ -3096,6 +3102,9 @@ def _delete_r2_bucket(self, bucket_name: str) -> bool:

Returns:
bool; True if bucket was deleted, False if it was deleted externally.

Raises:
StorageBucketDeleteError: If deleting the bucket fails.
"""
# Deleting objects is very slow programatically
# (i.e. bucket.objects.all().delete() is slow).
Expand Down Expand Up @@ -3532,7 +3541,7 @@ def _create_cos_bucket(self,

return self.bucket

def _delete_cos_bucket(self):
def _delete_cos_bucket(self) -> None:
bucket = self.s3_resource.Bucket(self.name)
try:
bucket_versioning = self.s3_resource.BucketVersioning(self.name)
Expand Down
2 changes: 1 addition & 1 deletion sky/global_user_state.py
@@ -827,7 +827,7 @@ def get_storage_names_start_with(starts_with: str) -> List[str]:


 def get_storage() -> List[Dict[str, Any]]:
-    rows = _DB.cursor.execute('select * from storage')
+    rows = _DB.cursor.execute('SELECT * FROM storage')
     records = []
     for name, launched_at, handle, last_use, status in rows:
         # TODO: use namedtuple instead of dict
16 changes: 8 additions & 8 deletions sky/utils/subprocess_utils.py
@@ -101,8 +101,6 @@ def run_in_parallel(func: Callable,
                     num_threads: Optional[int] = None) -> List[Any]:
     """Run a function in parallel on a list of arguments.

-    The function 'func' should raise a CommandError if the command fails.
-
     Args:
         func: The function to run in parallel
         args: Iterable of arguments to pass to func
@@ -111,14 +109,16 @@ def run_in_parallel(func: Callable,

Returns:
A list of the return values of the function func, in the same order as the
arguments.
arguments.

Raises:
Exception: The first exception encountered.
"""
# Reference: https://stackoverflow.com/questions/25790279/python-multiprocessing-early-termination # pylint: disable=line-too-long
Collaborator

Why not remove this reference?

Contributor Author

You mean why remove the reference? I think it's trivial now, and the reference doesn't add much value to the code.

-    processes = num_threads if num_threads is not None else get_parallel_threads(
-    )
+    processes = (num_threads
+                 if num_threads is not None else get_parallel_threads())
     with pool.ThreadPool(processes=processes) as p:
         # Run the function in parallel on the arguments, keeping the order.
-        return list(p.imap(func, args))
+        ordered_iterators = p.imap(func, args)
+        return list(ordered_iterators)


 def handle_returncode(returncode: int,
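
The updated `run_in_parallel` docstring says results come back in argument order and that the first exception encountered is raised. A small standalone sketch of that `ThreadPool.imap` behavior follows; `slow_square` and `fail_on_two` are hypothetical task functions, and the sleep durations are arbitrary values chosen so that earlier inputs finish last.

```python
import time
from multiprocessing import pool


def slow_square(x: int) -> int:
    # Earlier inputs sleep longer, so they finish last.
    time.sleep(0.05 * (3 - x))
    return x * x


def fail_on_two(x: int) -> int:
    if x == 2:
        raise RuntimeError(f'task {x} failed')
    return x


with pool.ThreadPool(processes=3) as p:
    # imap yields results in input order, not in completion order.
    ordered = list(p.imap(slow_square, [0, 1, 2]))

try:
    with pool.ThreadPool(processes=3) as p:
        # Consuming the iterator re-raises the worker's exception here.
        list(p.imap(fail_on_two, [1, 2, 3]))
    raised = None
except RuntimeError as e:
    raised = str(e)
```

Because the exception surfaces while the results are being consumed, a caller that wants every task to be attempted has to catch errors inside the task function, which is what the `delete_storage` wrapper in `sky/cli.py` does.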