From 6f96e7a451714dd9180156df393b2cf8a10e5c1f Mon Sep 17 00:00:00 2001 From: Christopher Cooper Date: Tue, 3 Dec 2024 11:01:10 -0800 Subject: [PATCH] avoid catching ValueError during failover (#4432) * avoid catching ValueError during failover If the cloud api raises ValueError or a subclass of ValueError during instance termination, we will assume the cluster was downed. Fix this by introducing a new exception ClusterDoesNotExist that we can catch instead of the more general ValueError. * add unit test * lint --- sky/backends/backend_utils.py | 9 ++-- sky/core.py | 43 +++++++++++-------- sky/exceptions.py | 7 ++++ sky/execution.py | 5 ++- sky/jobs/recovery_strategy.py | 3 +- tests/unit_tests/test_recovery_strategy.py | 48 ++++++++++++++++++++++ 6 files changed, 90 insertions(+), 25 deletions(-) create mode 100644 tests/unit_tests/test_recovery_strategy.py diff --git a/sky/backends/backend_utils.py b/sky/backends/backend_utils.py index a116681da1b..9c56546234a 100644 --- a/sky/backends/backend_utils.py +++ b/sky/backends/backend_utils.py @@ -1612,14 +1612,14 @@ def check_can_clone_disk_and_override_task( The task to use and the resource handle of the source cluster. Raises: - ValueError: If the source cluster does not exist. + exceptions.ClusterDoesNotExist: If the source cluster does not exist. exceptions.NotSupportedError: If the source cluster is not valid or the task is not compatible to clone disk from the source cluster. """ source_cluster_status, handle = refresh_cluster_status_handle(cluster_name) if source_cluster_status is None: with ux_utils.print_exception_no_traceback(): - raise ValueError( + raise exceptions.ClusterDoesNotExist( f'Cannot find cluster {cluster_name!r} to clone disk from.') if not isinstance(handle, backends.CloudVmRayResourceHandle): @@ -2136,7 +2136,7 @@ def check_cluster_available( """Check if the cluster is available. Raises: - ValueError: if the cluster does not exist. + exceptions.ClusterDoesNotExist: if the cluster does not exist. exceptions.ClusterNotUpError: if the cluster is not UP. exceptions.NotSupportedError: if the cluster is not based on CloudVmRayBackend. @@ -2201,7 +2201,8 @@ def check_cluster_available( error_msg += message with ux_utils.print_exception_no_traceback(): - raise ValueError(f'{colorama.Fore.YELLOW}{error_msg}{reset}') + raise exceptions.ClusterDoesNotExist( + f'{colorama.Fore.YELLOW}{error_msg}{reset}') assert cluster_status is not None, 'handle is not None but status is None' backend = get_backend_from_handle(handle) if check_cloud_vm_ray_backend and not isinstance( diff --git a/sky/core.py b/sky/core.py index 4bb12f4a21a..9f1288d7fb6 100644 --- a/sky/core.py +++ b/sky/core.py @@ -268,7 +268,8 @@ def _start( cluster_status, handle = backend_utils.refresh_cluster_status_handle( cluster_name) if handle is None: - raise ValueError(f'Cluster {cluster_name!r} does not exist.') + raise exceptions.ClusterDoesNotExist( + f'Cluster {cluster_name!r} does not exist.') if not force and cluster_status == status_lib.ClusterStatus.UP: sky_logging.print(f'Cluster {cluster_name!r} is already up.') return handle @@ -359,12 +360,13 @@ def start( Useful for upgrading SkyPilot runtime. Raises: - ValueError: argument values are invalid: (1) the specified cluster does - not exist; (2) if ``down`` is set to True but - ``idle_minutes_to_autostop`` is None; (3) if the specified cluster is - the managed jobs controller, and either ``idle_minutes_to_autostop`` - is not None or ``down`` is True (omit them to use the default - autostop settings). + ValueError: argument values are invalid: (1) if ``down`` is set to True + but ``idle_minutes_to_autostop`` is None; (2) if the specified + cluster is the managed jobs controller, and either + ``idle_minutes_to_autostop`` is not None or ``down`` is True (omit + them to use the default autostop settings). + sky.exceptions.ClusterDoesNotExist: the specified cluster does not + exist. sky.exceptions.NotSupportedError: if the cluster to restart was launched using a non-default backend that does not support this operation. @@ -412,7 +414,8 @@ def stop(cluster_name: str, purge: bool = False) -> None: related resources. Raises: - ValueError: the specified cluster does not exist. + sky.exceptions.ClusterDoesNotExist: the specified cluster does not + exist. RuntimeError: failed to stop the cluster. sky.exceptions.NotSupportedError: if the specified cluster is a spot cluster, or a TPU VM Pod cluster, or the managed jobs controller. @@ -423,7 +426,8 @@ def stop(cluster_name: str, purge: bool = False) -> None: f'is not supported.') handle = global_user_state.get_handle_from_cluster_name(cluster_name) if handle is None: - raise ValueError(f'Cluster {cluster_name!r} does not exist.') + raise exceptions.ClusterDoesNotExist( + f'Cluster {cluster_name!r} does not exist.') backend = backend_utils.get_backend_from_handle(handle) @@ -467,14 +471,16 @@ def down(cluster_name: str, purge: bool = False) -> None: resources. Raises: - ValueError: the specified cluster does not exist. + sky.exceptions.ClusterDoesNotExist: the specified cluster does not + exist. RuntimeError: failed to tear down the cluster. sky.exceptions.NotSupportedError: the specified cluster is the managed jobs controller. """ handle = global_user_state.get_handle_from_cluster_name(cluster_name) if handle is None: - raise ValueError(f'Cluster {cluster_name!r} does not exist.') + raise exceptions.ClusterDoesNotExist( + f'Cluster {cluster_name!r} does not exist.') usage_lib.record_cluster_name_for_current_operation(cluster_name) backend = backend_utils.get_backend_from_handle(handle) @@ -521,7 +527,7 @@ def autostop( rather than autostop (restartable). Raises: - ValueError: if the cluster does not exist. + sky.exceptions.ClusterDoesNotExist: if the cluster does not exist. sky.exceptions.ClusterNotUpError: if the cluster is not UP. sky.exceptions.NotSupportedError: if the cluster is not based on CloudVmRayBackend or the cluster is TPU VM Pod. @@ -615,7 +621,7 @@ def queue(cluster_name: str, } ] raises: - ValueError: if the cluster does not exist. + sky.exceptions.ClusterDoesNotExist: if the cluster does not exist. sky.exceptions.ClusterNotUpError: if the cluster is not UP. sky.exceptions.NotSupportedError: if the cluster is not based on CloudVmRayBackend. @@ -674,7 +680,8 @@ def cancel( worker node is preempted in the spot cluster. Raises: - ValueError: if arguments are invalid, or the cluster does not exist. + ValueError: if arguments are invalid. + sky.exceptions.ClusterDoesNotExist: if the cluster does not exist. sky.exceptions.ClusterNotUpError: if the cluster is not UP. sky.exceptions.NotSupportedError: if the specified cluster is a controller that does not support this operation. @@ -750,8 +757,8 @@ def tail_logs(cluster_name: str, Please refer to the sky.cli.tail_logs for the document. Raises: - ValueError: arguments are invalid or the cluster is not supported or - the cluster does not exist. + ValueError: if arguments are invalid or the cluster is not supported. + sky.exceptions.ClusterDoesNotExist: if the cluster does not exist. sky.exceptions.ClusterNotUpError: if the cluster is not UP. sky.exceptions.NotSupportedError: if the cluster is not based on CloudVmRayBackend. @@ -793,7 +800,7 @@ def download_logs( Returns: Dict[str, str]: a mapping of job_id to local log path. Raises: - ValueError: if the cluster does not exist. + sky.exceptions.ClusterDoesNotExist: if the cluster does not exist. sky.exceptions.ClusterNotUpError: if the cluster is not UP. sky.exceptions.NotSupportedError: if the cluster is not based on CloudVmRayBackend. @@ -838,7 +845,7 @@ def job_status(cluster_name: str, If job_ids is None and there is no job on the cluster, it will return {None: None}. Raises: - ValueError: if the cluster does not exist. + sky.exceptions.ClusterDoesNotExist: if the cluster does not exist. sky.exceptions.ClusterNotUpError: if the cluster is not UP. sky.exceptions.NotSupportedError: if the cluster is not based on CloudVmRayBackend. diff --git a/sky/exceptions.py b/sky/exceptions.py index c1ade2eb02a..40d2b4d867b 100644 --- a/sky/exceptions.py +++ b/sky/exceptions.py @@ -132,6 +132,13 @@ class ClusterSetUpError(Exception): pass +class ClusterDoesNotExist(ValueError): + """Raise when trying to operate on a cluster that does not exist.""" + # This extends ValueError for compatibility reasons - we used to throw + # ValueError instead of this. + pass + + class NotSupportedError(Exception): """Raised when a feature is not supported.""" pass diff --git a/sky/execution.py b/sky/execution.py index 963e0356753..103dcf5ee83 100644 --- a/sky/execution.py +++ b/sky/execution.py @@ -581,8 +581,9 @@ def exec( # pylint: disable=redefined-builtin submitted. Raises: - ValueError: if the specified cluster does not exist or is not in UP - status. + ValueError: if the specified cluster is not in UP status. + sky.exceptions.ClusterDoesNotExist: if the specified cluster does not + exist. sky.exceptions.NotSupportedError: if the specified cluster is a controller that does not support this operation. diff --git a/sky/jobs/recovery_strategy.py b/sky/jobs/recovery_strategy.py index 09e4bd8ed6e..4fda1a07e08 100644 --- a/sky/jobs/recovery_strategy.py +++ b/sky/jobs/recovery_strategy.py @@ -50,8 +50,9 @@ def terminate_cluster(cluster_name: str, max_retry: int = 3) -> None: usage_lib.messages.usage.set_internal() sky.down(cluster_name) return - except ValueError: + except exceptions.ClusterDoesNotExist: # The cluster is already down. + logger.debug(f'The cluster {cluster_name} is already down.') return except Exception as e: # pylint: disable=broad-except retry_cnt += 1 diff --git a/tests/unit_tests/test_recovery_strategy.py b/tests/unit_tests/test_recovery_strategy.py new file mode 100644 index 00000000000..da8e8142da0 --- /dev/null +++ b/tests/unit_tests/test_recovery_strategy.py @@ -0,0 +1,48 @@ +from unittest import mock + +from sky.exceptions import ClusterDoesNotExist +from sky.jobs import recovery_strategy + + +@mock.patch('sky.down') +@mock.patch('sky.usage.usage_lib.messages.usage.set_internal') +def test_terminate_cluster_retry_on_value_error(mock_set_internal, + mock_sky_down) -> None: + # Set up mock to fail twice with ValueError, then succeed + mock_sky_down.side_effect = [ + ValueError('Mock error 1'), + ValueError('Mock error 2'), + None, + ] + + # Call should succeed after retries + recovery_strategy.terminate_cluster('test-cluster') + + # Verify sky.down was called 3 times + assert mock_sky_down.call_count == 3 + mock_sky_down.assert_has_calls([ + mock.call('test-cluster'), + mock.call('test-cluster'), + mock.call('test-cluster'), + ]) + + # Verify usage.set_internal was called before each sky.down + assert mock_set_internal.call_count == 3 + + +@mock.patch('sky.down') +@mock.patch('sky.usage.usage_lib.messages.usage.set_internal') +def test_terminate_cluster_handles_nonexistent_cluster(mock_set_internal, + mock_sky_down) -> None: + # Set up mock to raise ClusterDoesNotExist + mock_sky_down.side_effect = ClusterDoesNotExist('test-cluster') + + # Call should succeed silently + recovery_strategy.terminate_cluster('test-cluster') + + # Verify sky.down was called once + assert mock_sky_down.call_count == 1 + mock_sky_down.assert_called_once_with('test-cluster') + + # Verify usage.set_internal was called once + assert mock_set_internal.call_count == 1