[Serve/Spot] Allow spot queue/cancel/logs during controller INIT state #3288

Merged · 23 commits · Mar 12, 2024
2 changes: 1 addition & 1 deletion .github/workflows/pytest.yml
@@ -27,7 +27,7 @@ jobs:
- tests/test_optimizer_random_dag.py
- tests/test_storage.py
- tests/test_wheels.py
- tests/test_spot.py
- tests/test_spot_serve.py
- tests/test_yaml_parser.py
runs-on: ubuntu-latest
steps:
144 changes: 97 additions & 47 deletions sky/backends/backend_utils.py
@@ -8,6 +8,7 @@
import re
import shlex
import subprocess
import sys
import tempfile
import textwrap
import time
@@ -1832,7 +1833,6 @@ def run_ray_status_to_check_ray_cluster_healthy() -> bool:
return record

# All cases below are transitioning the cluster to non-UP states.

if len(node_statuses) > handle.launched_nodes:
# Unexpected: in the queried region more than 1 cluster with the same
# constructed name tag returned. This will typically not happen unless
@@ -1953,8 +1953,10 @@ def run_ray_status_to_check_ray_cluster_healthy() -> bool:


def _update_cluster_status(
cluster_name: str,
acquire_per_cluster_status_lock: bool) -> Optional[Dict[str, Any]]:
cluster_name: str,
acquire_per_cluster_status_lock: bool,
cluster_status_lock_timeout: int = CLUSTER_STATUS_LOCK_TIMEOUT_SECONDS
) -> Optional[Dict[str, Any]]:
"""Update the cluster status.

The cluster status is updated by checking ray cluster and real status from
@@ -1967,9 +1969,9 @@ def _update_cluster_status(
Args:
cluster_name: The name of the cluster.
acquire_per_cluster_status_lock: Whether to acquire the per-cluster lock
before updating the status.
need_owner_identity_check: Whether to check the owner identity before
updating
before updating the status.
cluster_status_lock_timeout: The timeout to acquire the per-cluster
lock.

Returns:
If the cluster is terminated or does not exist, return None. Otherwise
@@ -1988,23 +1990,22 @@ def _update_cluster_status(
return _update_cluster_status_no_lock(cluster_name)

try:
# TODO(mraheja): remove pylint disabling when filelock
# version updated
# pylint: disable=abstract-class-instantiated
with filelock.FileLock(CLUSTER_STATUS_LOCK_PATH.format(cluster_name),
CLUSTER_STATUS_LOCK_TIMEOUT_SECONDS):
timeout=cluster_status_lock_timeout):
return _update_cluster_status_no_lock(cluster_name)
except filelock.Timeout:
logger.debug('Refreshing status: Failed to get the lock for cluster '
f'{cluster_name!r}. Using the cached status.')
return global_user_state.get_cluster_from_name(cluster_name)
record = global_user_state.get_cluster_from_name(cluster_name)
return record
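The hunk above makes the per-cluster status lock timeout configurable and falls back to the cached record when the lock cannot be acquired in time. Below is a minimal standalone sketch of that pattern using the `filelock` package; it is not SkyPilot code, and `recompute_record`, `CACHED_RECORD`, and the lock path are hypothetical stand-ins.

```python
import filelock

CACHED_RECORD = {'status': 'UP (cached)'}
LOCK_PATH = '/tmp/.example-cluster.lock'  # hypothetical lock file


def recompute_record():
    # Stand-in for the expensive status refresh done while holding the lock.
    return {'status': 'UP (refreshed)'}


def get_record(lock_timeout: float):
    try:
        # timeout=0 tries the lock once and raises filelock.Timeout
        # immediately if another process already holds it.
        with filelock.FileLock(LOCK_PATH, timeout=lock_timeout):
            return recompute_record()
    except filelock.Timeout:
        # Another command is refreshing this cluster; use the cached record.
        return CACHED_RECORD


print(get_record(lock_timeout=0))
```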


def refresh_cluster_record(
cluster_name: str,
*,
force_refresh_statuses: Optional[Set[status_lib.ClusterStatus]] = None,
acquire_per_cluster_status_lock: bool = True
cluster_name: str,
*,
force_refresh_statuses: Optional[Set[status_lib.ClusterStatus]] = None,
acquire_per_cluster_status_lock: bool = True,
cluster_status_lock_timeout: int = CLUSTER_STATUS_LOCK_TIMEOUT_SECONDS
) -> Optional[Dict[str, Any]]:
"""Refresh the cluster, and return the possibly updated record.

@@ -2015,13 +2016,15 @@ def refresh_cluster_record(
Args:
cluster_name: The name of the cluster.
force_refresh_statuses: if specified, refresh the cluster if it has one of
the specified statuses. Additionally, clusters satisfying the
following conditions will always be refreshed no matter the
argument is specified or not:
1. is a spot cluster, or
2. is a non-spot cluster, is not STOPPED, and autostop is set.
the specified statuses. Additionally, clusters satisfying the
following conditions will always be refreshed no matter the
argument is specified or not:
1. is a spot cluster, or
2. is a non-spot cluster, is not STOPPED, and autostop is set.
acquire_per_cluster_status_lock: Whether to acquire the per-cluster lock
before updating the status.
before updating the status.
cluster_status_lock_timeout: The timeout to acquire the per-cluster
lock. If timeout, the function will use the cached status.

Returns:
If the cluster is terminated or does not exist, return None.
@@ -2052,7 +2055,8 @@
if force_refresh_for_cluster or has_autostop or use_spot:
record = _update_cluster_status(
cluster_name,
acquire_per_cluster_status_lock=acquire_per_cluster_status_lock)
acquire_per_cluster_status_lock=acquire_per_cluster_status_lock,
cluster_status_lock_timeout=cluster_status_lock_timeout)
return record


@@ -2062,6 +2066,7 @@ def refresh_cluster_status_handle(
*,
force_refresh_statuses: Optional[Set[status_lib.ClusterStatus]] = None,
acquire_per_cluster_status_lock: bool = True,
cluster_status_lock_timeout: int = CLUSTER_STATUS_LOCK_TIMEOUT_SECONDS
) -> Tuple[Optional[status_lib.ClusterStatus],
Optional[backends.ResourceHandle]]:
"""Refresh the cluster, and return the possibly updated status and handle.
@@ -2073,7 +2078,8 @@
record = refresh_cluster_record(
cluster_name,
force_refresh_statuses=force_refresh_statuses,
acquire_per_cluster_status_lock=acquire_per_cluster_status_lock)
acquire_per_cluster_status_lock=acquire_per_cluster_status_lock,
cluster_status_lock_timeout=cluster_status_lock_timeout)
if record is None:
return None, None
return record['status'], record['handle']
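A hedged usage sketch of the new keyword threaded through `refresh_cluster_record` and `refresh_cluster_status_handle` above: a caller that must not block on another process's refresh can pass `cluster_status_lock_timeout=0` and accept a possibly cached status. The cluster name is hypothetical, and this assumes a SkyPilot development checkout where `sky.backends.backend_utils` is importable.

```python
from sky.backends import backend_utils

# Hypothetical cluster name; a stale cached status may be returned if the
# per-cluster lock is currently held by another command.
status, handle = backend_utils.refresh_cluster_status_handle(
    'sky-spot-controller-abcd1234',
    force_refresh_statuses=None,    # keep the default refresh conditions
    cluster_status_lock_timeout=0,  # do not wait for the per-cluster lock
)
print(status, handle)
```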
@@ -2219,50 +2225,65 @@ def check_cluster_available(


# TODO(tian): Refactor to controller_utils. Current blocker: circular import.
def is_controller_up(
def is_controller_accessible(
controller_type: controller_utils.Controllers,
stopped_message: str,
non_existent_message: Optional[str] = None,
) -> Tuple[Optional[status_lib.ClusterStatus],
Optional['backends.CloudVmRayResourceHandle']]:
exit_if_not_accessible: bool = False,
) -> 'backends.CloudVmRayResourceHandle':
"""Check if the spot/serve controller is up.

It can be used to check the actual controller status (since the autostop is
set for the controller) before the spot/serve commands interact with the
The controller is accessible when it is in UP or INIT state, and the ssh
connection is successful.

It can be used to check if the controller is accessible (since the autostop
is set for the controller) before the spot/serve commands interact with the
controller.

ClusterNotUpError will be raised whenever the controller cannot be accessed.

Args:
type: Type of the controller.
stopped_message: Message to print if the controller is STOPPED.
non_existent_message: Message to show if the controller does not exist.
exit_if_not_accessible: Whether to exit directly if the controller is not
accessible. If False, the function will raise ClusterNotUpError.

Returns:
controller_status: The status of the controller. If it fails during
refreshing the status, it will be the cached status. None if the
controller does not exist.
handle: The ResourceHandle of the controller. None if the
controller is not UP or does not exist.
handle: The ResourceHandle of the controller.

Raises:
exceptions.ClusterOwnerIdentityMismatchError: if the current user is not
the same as the user who created the cluster.
exceptions.CloudUserIdentityError: if we fail to get the current user
identity.
exceptions.ClusterNotUpError: if the controller is not accessible or
the ssh connection to it fails.
"""
if non_existent_message is None:
non_existent_message = (
controller_type.value.default_hint_if_non_existent)
cluster_name = controller_type.value.cluster_name
controller_name = controller_type.value.name.replace(' controller', '')
need_connection_check = False
controller_status, handle = None, None
try:
# Set force_refresh_statuses=None to make sure the refresh only happens
# when the controller is INIT/UP (triggered in these statuses as the
# autostop is always set for the controller). This optimization avoids
# unnecessary costly refresh when the controller is already stopped.
# This optimization is based on the assumption that the user will not
# start the controller manually from the cloud console.
#
# The cluster_status_lock_timeout is set to 0 to avoid hanging the
# command when multiple spot_launch commands are running at the same
# time. If the refresh fails to get an accurate status because the lock
# is held elsewhere, the code below falls back to checking whether the
# controller is accessible by directly testing the ssh connection to it.
controller_status, handle = refresh_cluster_status_handle(
cluster_name, force_refresh_statuses=None)
cluster_name,
force_refresh_statuses=None,
cluster_status_lock_timeout=0)
except exceptions.ClusterStatusFetchingError as e:
# We do not catch the exceptions related to the cluster owner identity
# mismatch, please refer to the comment in
@@ -2273,22 +2294,51 @@
'stale information, when the controller is not up.\n'
f' Details: {common_utils.format_exception(e, use_bracket=True)}')
record = global_user_state.get_cluster_from_name(cluster_name)
controller_status, handle = None, None
if record is not None:
controller_status, handle = record['status'], record['handle']
# We check the connection even if the cluster has a cached status UP
# to make sure the controller is actually accessible, as the cached
# status might be stale.
need_connection_check = True

error_msg = None
if controller_status == status_lib.ClusterStatus.STOPPED:
error_msg = stopped_message
elif controller_status is None or handle is None or handle.head_ip is None:
# We check whether the controller is STOPPED before checking for a None
# handle.head_ip, because when the controller is STOPPED handle.head_ip
# can also be None; we only want to catch the case where the controller
# is being provisioned for the first time and has no head_ip yet.
error_msg = non_existent_message
elif (controller_status == status_lib.ClusterStatus.INIT or
need_connection_check):
# We check access to the controller in the INIT state, or when we failed
# to fetch its status, because the lock can be held by another `sky spot
# launch` or `sky serve up` while head_ip is still available. In those
# cases we can allow access to the controller.
ssh_credentials = ssh_credential_from_yaml(handle.cluster_yaml,
handle.docker_user,
handle.ssh_user)

runner = command_runner.SSHCommandRunner(handle.head_ip,
**ssh_credentials,
port=handle.head_ssh_port)
if not runner.check_connection():
error_msg = controller_type.value.connection_error_hint
else:
assert controller_status == status_lib.ClusterStatus.UP, handle

if controller_status is None:
sky_logging.print(non_existent_message)
elif controller_status != status_lib.ClusterStatus.UP:
msg = (f'{controller_name.capitalize()} controller {cluster_name} '
f'is {controller_status.value}.')
if controller_status == status_lib.ClusterStatus.STOPPED:
msg += f'\n{stopped_message}'
if controller_status == status_lib.ClusterStatus.INIT:
msg += '\nPlease wait for the controller to be ready.'
sky_logging.print(msg)
handle = None
return controller_status, handle
if error_msg is not None:
if exit_if_not_accessible:
sky_logging.print(error_msg)
sys.exit(1)
with ux_utils.print_exception_no_traceback():
raise exceptions.ClusterNotUpError(error_msg,
cluster_status=controller_status,
handle=handle)
assert handle is not None and handle.head_ip is not None, (
handle, controller_status)
return handle
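A hedged sketch of how a spot/serve command might call the renamed helper after this change. The module path for `controller_utils`, the `Controllers.SPOT_CONTROLLER` member, and the message string are assumptions based on identifiers visible in this diff, not code taken from the PR.

```python
from sky import exceptions
from sky.backends import backend_utils
from sky.utils import controller_utils  # assumed module path

try:
    handle = backend_utils.is_controller_accessible(
        controller_type=controller_utils.Controllers.SPOT_CONTROLLER,  # assumed member
        stopped_message='The spot controller is stopped.',  # illustrative message
    )
except exceptions.ClusterNotUpError as e:
    # Raised when the controller is STOPPED, does not exist, or the ssh
    # connection check fails; the caller can surface the message to the user.
    print(e)
else:
    # handle.head_ip is guaranteed non-None here, so queue/cancel/logs can
    # proceed even while the controller is still in INIT.
    print(handle.head_ip)
```

With `exit_if_not_accessible=True`, the helper prints the message and exits instead of raising, which suits CLI call sites that do not want a traceback.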


class CloudFilter(enum.Enum):