Skip to content

Commit

Permalink
avoid catching ValueError during failover (#4432)
Browse files Browse the repository at this point in the history
* avoid catching ValueError during failover

If the cloud api raises ValueError or a subclass of ValueError during instance
termination, we will assume the cluster was downed. Fix this by introducing a
new exception ClusterDoesNotExist that we can catch instead of the more general
ValueError.

* add unit test

* lint
  • Loading branch information
cg505 authored Dec 3, 2024
1 parent 747382a commit 6f96e7a
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 25 deletions.
9 changes: 5 additions & 4 deletions sky/backends/backend_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1612,14 +1612,14 @@ def check_can_clone_disk_and_override_task(
The task to use and the resource handle of the source cluster.
Raises:
ValueError: If the source cluster does not exist.
exceptions.ClusterDoesNotExist: If the source cluster does not exist.
exceptions.NotSupportedError: If the source cluster is not valid or the
task is not compatible to clone disk from the source cluster.
"""
source_cluster_status, handle = refresh_cluster_status_handle(cluster_name)
if source_cluster_status is None:
with ux_utils.print_exception_no_traceback():
raise ValueError(
raise exceptions.ClusterDoesNotExist(
f'Cannot find cluster {cluster_name!r} to clone disk from.')

if not isinstance(handle, backends.CloudVmRayResourceHandle):
Expand Down Expand Up @@ -2136,7 +2136,7 @@ def check_cluster_available(
"""Check if the cluster is available.
Raises:
ValueError: if the cluster does not exist.
exceptions.ClusterDoesNotExist: if the cluster does not exist.
exceptions.ClusterNotUpError: if the cluster is not UP.
exceptions.NotSupportedError: if the cluster is not based on
CloudVmRayBackend.
Expand Down Expand Up @@ -2201,7 +2201,8 @@ def check_cluster_available(
error_msg += message

with ux_utils.print_exception_no_traceback():
raise ValueError(f'{colorama.Fore.YELLOW}{error_msg}{reset}')
raise exceptions.ClusterDoesNotExist(
f'{colorama.Fore.YELLOW}{error_msg}{reset}')
assert cluster_status is not None, 'handle is not None but status is None'
backend = get_backend_from_handle(handle)
if check_cloud_vm_ray_backend and not isinstance(
Expand Down
43 changes: 25 additions & 18 deletions sky/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,8 @@ def _start(
cluster_status, handle = backend_utils.refresh_cluster_status_handle(
cluster_name)
if handle is None:
raise ValueError(f'Cluster {cluster_name!r} does not exist.')
raise exceptions.ClusterDoesNotExist(
f'Cluster {cluster_name!r} does not exist.')
if not force and cluster_status == status_lib.ClusterStatus.UP:
sky_logging.print(f'Cluster {cluster_name!r} is already up.')
return handle
Expand Down Expand Up @@ -359,12 +360,13 @@ def start(
Useful for upgrading SkyPilot runtime.
Raises:
ValueError: argument values are invalid: (1) the specified cluster does
not exist; (2) if ``down`` is set to True but
``idle_minutes_to_autostop`` is None; (3) if the specified cluster is
the managed jobs controller, and either ``idle_minutes_to_autostop``
is not None or ``down`` is True (omit them to use the default
autostop settings).
ValueError: argument values are invalid: (1) if ``down`` is set to True
but ``idle_minutes_to_autostop`` is None; (2) if the specified
cluster is the managed jobs controller, and either
``idle_minutes_to_autostop`` is not None or ``down`` is True (omit
them to use the default autostop settings).
sky.exceptions.ClusterDoesNotExist: the specified cluster does not
exist.
sky.exceptions.NotSupportedError: if the cluster to restart was
launched using a non-default backend that does not support this
operation.
Expand Down Expand Up @@ -412,7 +414,8 @@ def stop(cluster_name: str, purge: bool = False) -> None:
related resources.
Raises:
ValueError: the specified cluster does not exist.
sky.exceptions.ClusterDoesNotExist: the specified cluster does not
exist.
RuntimeError: failed to stop the cluster.
sky.exceptions.NotSupportedError: if the specified cluster is a spot
cluster, or a TPU VM Pod cluster, or the managed jobs controller.
Expand All @@ -423,7 +426,8 @@ def stop(cluster_name: str, purge: bool = False) -> None:
f'is not supported.')
handle = global_user_state.get_handle_from_cluster_name(cluster_name)
if handle is None:
raise ValueError(f'Cluster {cluster_name!r} does not exist.')
raise exceptions.ClusterDoesNotExist(
f'Cluster {cluster_name!r} does not exist.')

backend = backend_utils.get_backend_from_handle(handle)

Expand Down Expand Up @@ -467,14 +471,16 @@ def down(cluster_name: str, purge: bool = False) -> None:
resources.
Raises:
ValueError: the specified cluster does not exist.
sky.exceptions.ClusterDoesNotExist: the specified cluster does not
exist.
RuntimeError: failed to tear down the cluster.
sky.exceptions.NotSupportedError: the specified cluster is the managed
jobs controller.
"""
handle = global_user_state.get_handle_from_cluster_name(cluster_name)
if handle is None:
raise ValueError(f'Cluster {cluster_name!r} does not exist.')
raise exceptions.ClusterDoesNotExist(
f'Cluster {cluster_name!r} does not exist.')

usage_lib.record_cluster_name_for_current_operation(cluster_name)
backend = backend_utils.get_backend_from_handle(handle)
Expand Down Expand Up @@ -521,7 +527,7 @@ def autostop(
rather than autostop (restartable).
Raises:
ValueError: if the cluster does not exist.
sky.exceptions.ClusterDoesNotExist: if the cluster does not exist.
sky.exceptions.ClusterNotUpError: if the cluster is not UP.
sky.exceptions.NotSupportedError: if the cluster is not based on
CloudVmRayBackend or the cluster is TPU VM Pod.
Expand Down Expand Up @@ -615,7 +621,7 @@ def queue(cluster_name: str,
}
]
raises:
ValueError: if the cluster does not exist.
sky.exceptions.ClusterDoesNotExist: if the cluster does not exist.
sky.exceptions.ClusterNotUpError: if the cluster is not UP.
sky.exceptions.NotSupportedError: if the cluster is not based on
CloudVmRayBackend.
Expand Down Expand Up @@ -674,7 +680,8 @@ def cancel(
worker node is preempted in the spot cluster.
Raises:
ValueError: if arguments are invalid, or the cluster does not exist.
ValueError: if arguments are invalid.
sky.exceptions.ClusterDoesNotExist: if the cluster does not exist.
sky.exceptions.ClusterNotUpError: if the cluster is not UP.
sky.exceptions.NotSupportedError: if the specified cluster is a
controller that does not support this operation.
Expand Down Expand Up @@ -750,8 +757,8 @@ def tail_logs(cluster_name: str,
Please refer to the sky.cli.tail_logs for the document.
Raises:
ValueError: arguments are invalid or the cluster is not supported or
the cluster does not exist.
ValueError: if arguments are invalid or the cluster is not supported.
sky.exceptions.ClusterDoesNotExist: if the cluster does not exist.
sky.exceptions.ClusterNotUpError: if the cluster is not UP.
sky.exceptions.NotSupportedError: if the cluster is not based on
CloudVmRayBackend.
Expand Down Expand Up @@ -793,7 +800,7 @@ def download_logs(
Returns:
Dict[str, str]: a mapping of job_id to local log path.
Raises:
ValueError: if the cluster does not exist.
sky.exceptions.ClusterDoesNotExist: if the cluster does not exist.
sky.exceptions.ClusterNotUpError: if the cluster is not UP.
sky.exceptions.NotSupportedError: if the cluster is not based on
CloudVmRayBackend.
Expand Down Expand Up @@ -838,7 +845,7 @@ def job_status(cluster_name: str,
If job_ids is None and there is no job on the cluster, it will return
{None: None}.
Raises:
ValueError: if the cluster does not exist.
sky.exceptions.ClusterDoesNotExist: if the cluster does not exist.
sky.exceptions.ClusterNotUpError: if the cluster is not UP.
sky.exceptions.NotSupportedError: if the cluster is not based on
CloudVmRayBackend.
Expand Down
7 changes: 7 additions & 0 deletions sky/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,13 @@ class ClusterSetUpError(Exception):
pass


class ClusterDoesNotExist(ValueError):
"""Raise when trying to operate on a cluster that does not exist."""
# This extends ValueError for compatibility reasons - we used to throw
# ValueError instead of this.
pass


class NotSupportedError(Exception):
"""Raised when a feature is not supported."""
pass
Expand Down
5 changes: 3 additions & 2 deletions sky/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -581,8 +581,9 @@ def exec( # pylint: disable=redefined-builtin
submitted.
Raises:
ValueError: if the specified cluster does not exist or is not in UP
status.
ValueError: if the specified cluster is not in UP status.
sky.exceptions.ClusterDoesNotExist: if the specified cluster does not
exist.
sky.exceptions.NotSupportedError: if the specified cluster is a
controller that does not support this operation.
Expand Down
3 changes: 2 additions & 1 deletion sky/jobs/recovery_strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,9 @@ def terminate_cluster(cluster_name: str, max_retry: int = 3) -> None:
usage_lib.messages.usage.set_internal()
sky.down(cluster_name)
return
except ValueError:
except exceptions.ClusterDoesNotExist:
# The cluster is already down.
logger.debug(f'The cluster {cluster_name} is already down.')
return
except Exception as e: # pylint: disable=broad-except
retry_cnt += 1
Expand Down
48 changes: 48 additions & 0 deletions tests/unit_tests/test_recovery_strategy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
from unittest import mock

from sky.exceptions import ClusterDoesNotExist
from sky.jobs import recovery_strategy


@mock.patch('sky.down')
@mock.patch('sky.usage.usage_lib.messages.usage.set_internal')
def test_terminate_cluster_retry_on_value_error(mock_set_internal,
mock_sky_down) -> None:
# Set up mock to fail twice with ValueError, then succeed
mock_sky_down.side_effect = [
ValueError('Mock error 1'),
ValueError('Mock error 2'),
None,
]

# Call should succeed after retries
recovery_strategy.terminate_cluster('test-cluster')

# Verify sky.down was called 3 times
assert mock_sky_down.call_count == 3
mock_sky_down.assert_has_calls([
mock.call('test-cluster'),
mock.call('test-cluster'),
mock.call('test-cluster'),
])

# Verify usage.set_internal was called before each sky.down
assert mock_set_internal.call_count == 3


@mock.patch('sky.down')
@mock.patch('sky.usage.usage_lib.messages.usage.set_internal')
def test_terminate_cluster_handles_nonexistent_cluster(mock_set_internal,
mock_sky_down) -> None:
# Set up mock to raise ClusterDoesNotExist
mock_sky_down.side_effect = ClusterDoesNotExist('test-cluster')

# Call should succeed silently
recovery_strategy.terminate_cluster('test-cluster')

# Verify sky.down was called once
assert mock_sky_down.call_count == 1
mock_sky_down.assert_called_once_with('test-cluster')

# Verify usage.set_internal was called once
assert mock_set_internal.call_count == 1

0 comments on commit 6f96e7a

Please sign in to comment.