[Serve/Spot] Allow spot queue/cancel/logs during controller INIT state #3288

Merged
merged 23 commits on Mar 12, 2024

Changes from 14 commits
2 changes: 1 addition & 1 deletion .github/workflows/pytest.yml
@@ -27,7 +27,7 @@ jobs:
- tests/test_optimizer_random_dag.py
- tests/test_storage.py
- tests/test_wheels.py
- tests/test_spot.py
- tests/test_spot_serve.py
- tests/test_yaml_parser.py
runs-on: ubuntu-latest
steps:
124 changes: 87 additions & 37 deletions sky/backends/backend_utils.py
@@ -8,6 +8,7 @@
import re
import shlex
import subprocess
import sys
import tempfile
import textwrap
import time
@@ -1832,7 +1833,6 @@ def run_ray_status_to_check_ray_cluster_healthy() -> bool:
return record

# All cases below are transitioning the cluster to non-UP states.

if len(node_statuses) > handle.launched_nodes:
# Unexpected: in the queried region more than 1 cluster with the same
# constructed name tag returned. This will typically not happen unless
@@ -1953,8 +1953,10 @@ def run_ray_status_to_check_ray_cluster_healthy() -> bool:


def _update_cluster_status(
cluster_name: str,
acquire_per_cluster_status_lock: bool) -> Optional[Dict[str, Any]]:
cluster_name: str,
acquire_per_cluster_status_lock: bool,
acquire_lock_timeout: int = CLUSTER_STATUS_LOCK_TIMEOUT_SECONDS
) -> Optional[Dict[str, Any]]:
"""Update the cluster status.

The cluster status is updated by checking ray cluster and real status from
@@ -1988,23 +1990,23 @@ def _update_cluster_status(
return _update_cluster_status_no_lock(cluster_name)

try:
# TODO(mraheja): remove pylint disabling when filelock
# version updated
# pylint: disable=abstract-class-instantiated
with filelock.FileLock(CLUSTER_STATUS_LOCK_PATH.format(cluster_name),
CLUSTER_STATUS_LOCK_TIMEOUT_SECONDS):
timeout=acquire_lock_timeout):
return _update_cluster_status_no_lock(cluster_name)
except filelock.Timeout:
logger.debug('Refreshing status: Failed to get the lock for cluster '
f'{cluster_name!r}. Using the cached status.')
return global_user_state.get_cluster_from_name(cluster_name)
record = global_user_state.get_cluster_from_name(cluster_name)
return record


def refresh_cluster_record(
cluster_name: str,
*,
force_refresh_statuses: Optional[Set[status_lib.ClusterStatus]] = None,
acquire_per_cluster_status_lock: bool = True
cluster_name: str,
*,
force_refresh_statuses: Optional[Set[status_lib.ClusterStatus]] = None,
acquire_per_cluster_status_lock: bool = True,
acquire_per_cluster_status_lock_timeout:
int = CLUSTER_STATUS_LOCK_TIMEOUT_SECONDS
) -> Optional[Dict[str, Any]]:
"""Refresh the cluster, and return the possibly updated record.

@@ -2022,6 +2024,9 @@ def refresh_cluster_record(
2. is a non-spot cluster, is not STOPPED, and autostop is set.
acquire_per_cluster_status_lock: Whether to acquire the per-cluster lock
before updating the status.
acquire_per_cluster_status_lock_timeout: The timeout to acquire the
per-cluster lock. If the lock acquisition times out, the function
will use the cached status.

Returns:
If the cluster is terminated or does not exist, return None.
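The lock-timeout fallback added above follows the standard `filelock` pattern: try to acquire the per-cluster lock with a timeout, and fall back to the cached record on `filelock.Timeout`. Below is a minimal, self-contained sketch of that pattern, with a hypothetical lock path and stub helpers standing in for the real refresh and cache lookups (not the actual SkyPilot internals):

```python
import filelock

CLUSTER_STATUS_LOCK_PATH = '/tmp/.sky_status_{}.lock'  # illustrative path only


def _refresh_status_no_lock(cluster_name: str) -> dict:
    # Stand-in for the real (expensive) refresh against the cloud provider.
    return {'name': cluster_name, 'status': 'UP', 'source': 'refreshed'}


def _get_cached_record(cluster_name: str) -> dict:
    # Stand-in for reading the record cached in the local state database.
    return {'name': cluster_name, 'status': 'UP', 'source': 'cache'}


def get_status_with_lock(cluster_name: str, lock_timeout: int = 20) -> dict:
    """Refresh the status under a per-cluster lock, falling back to the cache.

    With lock_timeout=0 the lock is attempted exactly once, so the caller
    never hangs behind another process that is already refreshing.
    """
    lock = filelock.FileLock(CLUSTER_STATUS_LOCK_PATH.format(cluster_name),
                             timeout=lock_timeout)
    try:
        with lock:
            return _refresh_status_no_lock(cluster_name)
    except filelock.Timeout:
        # Another process holds the lock; use the cached record instead.
        return _get_cached_record(cluster_name)
```

Passing a timeout of 0 turns the acquisition into a single non-blocking attempt, which is what the `is_controller_accessible` change below relies on to avoid hanging behind a concurrent `sky spot launch`.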
@@ -2052,7 +2057,8 @@ def refresh_cluster_record(
if force_refresh_for_cluster or has_autostop or use_spot:
record = _update_cluster_status(
cluster_name,
acquire_per_cluster_status_lock=acquire_per_cluster_status_lock)
acquire_per_cluster_status_lock=acquire_per_cluster_status_lock,
acquire_lock_timeout=acquire_per_cluster_status_lock_timeout)
return record


@@ -2062,6 +2068,8 @@ def refresh_cluster_status_handle(
*,
force_refresh_statuses: Optional[Set[status_lib.ClusterStatus]] = None,
acquire_per_cluster_status_lock: bool = True,
acquire_per_cluster_status_lock_timeout:
int = CLUSTER_STATUS_LOCK_TIMEOUT_SECONDS
) -> Tuple[Optional[status_lib.ClusterStatus],
Optional[backends.ResourceHandle]]:
"""Refresh the cluster, and return the possibly updated status and handle.
@@ -2073,7 +2081,9 @@ def refresh_cluster_status_handle(
record = refresh_cluster_record(
cluster_name,
force_refresh_statuses=force_refresh_statuses,
acquire_per_cluster_status_lock=acquire_per_cluster_status_lock)
acquire_per_cluster_status_lock=acquire_per_cluster_status_lock,
acquire_per_cluster_status_lock_timeout=
acquire_per_cluster_status_lock_timeout)
if record is None:
return None, None
return record['status'], record['handle']
@@ -2219,50 +2229,64 @@ def check_cluster_available(


# TODO(tian): Refactor to controller_utils. Current blocker: circular import.
def is_controller_up(
def is_controller_accessible(
controller_type: controller_utils.Controllers,
stopped_message: str,
non_existent_message: Optional[str] = None,
) -> Tuple[Optional[status_lib.ClusterStatus],
Optional['backends.CloudVmRayResourceHandle']]:
exit_on_error: bool = False,
) -> 'backends.CloudVmRayResourceHandle':
"""Check if the spot/serve controller is up.

It can be used to check the actual controller status (since the autostop is
set for the controller) before the spot/serve commands interact with the
It can be used to check if the controller is accessible (since the autostop
is set for the controller) before the spot/serve commands interact with the
controller.

The controller can be accessed when it is in UP or INIT state, and the ssh
connection is successful.

ClusterNotUpError will be raised whenever the controller cannot be accessed.

Args:
controller_type: Type of the controller.
stopped_message: Message to print if the controller is STOPPED.
non_existent_message: Message to show if the controller does not exist.
exit_on_error: Whether to exit directly if the controller is not UP. If
False, the function will raise ClusterNotUpError.

Returns:
controller_status: The status of the controller. If it fails during
refreshing the status, it will be the cached status. None if the
controller does not exist.
handle: The ResourceHandle of the controller. None if the
controller is not UP or does not exist.
handle: The ResourceHandle of the controller.

Raises:
exceptions.ClusterOwnerIdentityMismatchError: if the current user is not
the same as the user who created the cluster.
exceptions.CloudUserIdentityError: if we fail to get the current user
identity.
exceptions.ClusterNotUpError: if the controller is not UP, or cannot be
connected to.
"""
if non_existent_message is None:
non_existent_message = (
controller_type.value.default_hint_if_non_existent)
cluster_name = controller_type.value.cluster_name
controller_name = controller_type.value.name.replace(' controller', '')
need_connection_check = False
try:
# Set force_refresh_statuses=None to make sure the refresh only happens
# when the controller is INIT/UP (triggered in these statuses as the
# autostop is always set for the controller). This optimization avoids
# unnecessary costly refresh when the controller is already stopped.
# This optimization is based on the assumption that the user will not
# start the controller manually from the cloud console.
#
# The acquire_lock_timeout is set to 0 to avoid hanging the command when
# multiple spot_launch commands are running at the same time. It should
# be safe to set it to 0 (try once to get the lock), as all the callers
# will get the requested information from the controller by ssh with
# best effort.
controller_status, handle = refresh_cluster_status_handle(
cluster_name, force_refresh_statuses=None)
cluster_name,
force_refresh_statuses=None,
acquire_per_cluster_status_lock_timeout=0)
except exceptions.ClusterStatusFetchingError as e:
# We do not catch the exceptions related to the cluster owner identity
# mismatch, please refer to the comment in
@@ -2276,19 +2300,45 @@ def is_controller_up(
controller_status, handle = None, None
if record is not None:
controller_status, handle = record['status'], record['handle']
# We check the connection even if the cluster has a cached status UP
# to make sure the controller is actually accessible, as the cached
# status might be stale.
need_connection_check = True

error_msg = None
if controller_status == status_lib.ClusterStatus.STOPPED:
error_msg = stopped_message
elif controller_status is None or handle.head_ip is None:
error_msg = non_existent_message
elif (controller_status == status_lib.ClusterStatus.INIT or
need_connection_check):
# We do not directly check for UP because the controller may be in INIT
# state during another `sky spot launch` or `sky serve up`, but still
# have head_ip available. In this case, we can still try to ssh into the
# controller and fetch the status.
ssh_credentials = ssh_credential_from_yaml(handle.cluster_yaml,
handle.docker_user,
handle.ssh_user)

runner = command_runner.SSHCommandRunner(handle.head_ip,
**ssh_credentials,
port=handle.head_ssh_port)
if not runner.check_connection():
error_msg = controller_type.value.hint_for_connection_error
else:
assert (controller_status == status_lib.ClusterStatus.UP and
handle.head_ip is not None), (controller_status, handle)

if controller_status is None:
sky_logging.print(non_existent_message)
elif controller_status != status_lib.ClusterStatus.UP:
msg = (f'{controller_name.capitalize()} controller {cluster_name} '
f'is {controller_status.value}.')
if controller_status == status_lib.ClusterStatus.STOPPED:
msg += f'\n{stopped_message}'
if controller_status == status_lib.ClusterStatus.INIT:
msg += '\nPlease wait for the controller to be ready.'
sky_logging.print(msg)
handle = None
return controller_status, handle
if error_msg is not None:
if exit_on_error:
sky_logging.print(error_msg)
sys.exit(1)
with ux_utils.print_exception_no_traceback():
raise exceptions.ClusterNotUpError(error_msg,
cluster_status=controller_status,
handle=handle)

return handle


class CloudFilter(enum.Enum):
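Taken together, the new `is_controller_accessible` contract is: return the controller's `CloudVmRayResourceHandle` when the controller is reachable (UP, or INIT with a working ssh connection), and raise `exceptions.ClusterNotUpError` otherwise. A hedged sketch of how a spot/serve command might consume it follows; the import paths, the `Controllers.SPOT_CONTROLLER` member, and the messages are illustrative assumptions, not code from this PR:

```python
# Illustrative caller only: module paths and the enum member name are
# assumptions. is_controller_accessible() returns the controller's resource
# handle, or raises exceptions.ClusterNotUpError when the controller is
# STOPPED, non-existent, or unreachable over ssh.
from sky import exceptions
from sky.backends import backend_utils
from sky.utils import controller_utils


def show_spot_queue() -> None:
    try:
        handle = backend_utils.is_controller_accessible(
            controller_type=controller_utils.Controllers.SPOT_CONTROLLER,
            stopped_message='No in-progress spot jobs.')
    except exceptions.ClusterNotUpError as e:
        # The exception is constructed with the cached status and handle at
        # the raise site, so the caller can print a hint and return instead
        # of crashing with a traceback.
        print(e)
        return
    # With a live handle, the command can ssh into the controller even while
    # its cached status is INIT (e.g. during a concurrent `sky spot launch`).
    print(f'Querying controller at {handle.head_ip}...')
```

This is why `sky spot queue/cancel/logs` can now proceed while the controller is still in the INIT state: the connection check, rather than the cached cluster status alone, decides accessibility.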