[Serve/Spot] Allow spot queue/cancel/logs during controller INIT state #3288

Merged · 23 commits · Mar 12, 2024
90 changes: 45 additions & 45 deletions sky/backends/backend_utils.py
@@ -1955,7 +1955,7 @@ def run_ray_status_to_check_ray_cluster_healthy() -> bool:
def _update_cluster_status(
cluster_name: str,
acquire_per_cluster_status_lock: bool,
acquire_lock_timeout: int = CLUSTER_STATUS_LOCK_TIMEOUT_SECONDS
cluster_status_lock_timeout: int = CLUSTER_STATUS_LOCK_TIMEOUT_SECONDS
) -> Optional[Dict[str, Any]]:
"""Update the cluster status.

@@ -1969,9 +1969,9 @@ def _update_cluster_status(
Args:
cluster_name: The name of the cluster.
acquire_per_cluster_status_lock: Whether to acquire the per-cluster lock
before updating the status.
need_owner_identity_check: Whether to check the owner identity before
updating
before updating the status.
cluster_status_lock_timeout: The timeout to acquire the per-cluster
lock.

Returns:
If the cluster is terminated or does not exist, return None. Otherwise
@@ -1991,7 +1991,7 @@ def _update_cluster_status(

try:
with filelock.FileLock(CLUSTER_STATUS_LOCK_PATH.format(cluster_name),
timeout=acquire_lock_timeout):
timeout=cluster_status_lock_timeout):
return _update_cluster_status_no_lock(cluster_name)
except filelock.Timeout:
logger.debug('Refreshing status: Failed to get the lock for cluster '
@@ -2005,8 +2005,7 @@ def refresh_cluster_record(
*,
force_refresh_statuses: Optional[Set[status_lib.ClusterStatus]] = None,
acquire_per_cluster_status_lock: bool = True,
acquire_per_cluster_status_lock_timeout:
int = CLUSTER_STATUS_LOCK_TIMEOUT_SECONDS
cluster_status_lock_timeout: int = CLUSTER_STATUS_LOCK_TIMEOUT_SECONDS
) -> Optional[Dict[str, Any]]:
"""Refresh the cluster, and return the possibly updated record.

@@ -2017,16 +2016,15 @@ def refresh_cluster_record(
Args:
cluster_name: The name of the cluster.
force_refresh_statuses: if specified, refresh the cluster if it has one of
the specified statuses. Additionally, clusters satisfying the
following conditions will always be refreshed no matter the
argument is specified or not:
1. is a spot cluster, or
2. is a non-spot cluster, is not STOPPED, and autostop is set.
the specified statuses. Additionally, clusters satisfying the
following conditions will always be refreshed whether or not this
argument is specified:
1. is a spot cluster, or
2. is a non-spot cluster, is not STOPPED, and autostop is set.
acquire_per_cluster_status_lock: Whether to acquire the per-cluster lock
before updating the status.
acquire_per_cluster_status_lock_timeout: The timeout to acquire the
per-cluster lock. If timeout, the function will use the cached
status.
before updating the status.
cluster_status_lock_timeout: The timeout to acquire the per-cluster
lock. If the lock times out, the function will use the cached status.

Returns:
If the cluster is terminated or does not exist, return None.
@@ -2058,7 +2056,7 @@ def refresh_cluster_record(
record = _update_cluster_status(
cluster_name,
acquire_per_cluster_status_lock=acquire_per_cluster_status_lock,
acquire_lock_timeout=acquire_per_cluster_status_lock_timeout)
cluster_status_lock_timeout=cluster_status_lock_timeout)
return record
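For context, a minimal standalone sketch of the per-cluster locking pattern these functions rely on: try the file lock with a configurable timeout (0 means a single, non-blocking attempt) and fall back to the cached record on contention. The lock path and the two helpers below are illustrative placeholders, not SkyPilot APIs.

```python
import filelock

LOCK_PATH = '/tmp/.sky_example.{}.lock'  # illustrative path, not SkyPilot's


def _expensive_refresh(cluster_name: str) -> dict:
    # Placeholder for the real status query against the cloud provider.
    return {'name': cluster_name, 'status': 'UP', 'source': 'refreshed'}


def _cached_record(cluster_name: str) -> dict:
    # Placeholder for reading the locally cached cluster record.
    return {'name': cluster_name, 'status': 'INIT', 'source': 'cache'}


def refresh_or_use_cache(cluster_name: str, lock_timeout: int = 10) -> dict:
    """Try the per-cluster lock; on timeout, fall back to the cached record."""
    try:
        with filelock.FileLock(LOCK_PATH.format(cluster_name),
                               timeout=lock_timeout):
            return _expensive_refresh(cluster_name)
    except filelock.Timeout:
        # Another process (e.g. a concurrent `sky spot launch`) holds the lock.
        return _cached_record(cluster_name)


# lock_timeout=0 tries the lock exactly once and never blocks, which is the
# value is_controller_accessible() passes later in this diff.
print(refresh_or_use_cache('my-cluster', lock_timeout=0))
```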


@@ -2068,8 +2066,7 @@ def refresh_cluster_status_handle(
*,
force_refresh_statuses: Optional[Set[status_lib.ClusterStatus]] = None,
acquire_per_cluster_status_lock: bool = True,
acquire_per_cluster_status_lock_timeout:
int = CLUSTER_STATUS_LOCK_TIMEOUT_SECONDS
cluster_status_lock_timeout: int = CLUSTER_STATUS_LOCK_TIMEOUT_SECONDS
) -> Tuple[Optional[status_lib.ClusterStatus],
Optional[backends.ResourceHandle]]:
"""Refresh the cluster, and return the possibly updated status and handle.
@@ -2082,8 +2079,7 @@ def refresh_cluster_status_handle(
cluster_name,
force_refresh_statuses=force_refresh_statuses,
acquire_per_cluster_status_lock=acquire_per_cluster_status_lock,
acquire_per_cluster_status_lock_timeout=
acquire_per_cluster_status_lock_timeout)
cluster_status_lock_timeout=cluster_status_lock_timeout)
if record is None:
return None, None
return record['status'], record['handle']
@@ -2233,25 +2229,25 @@ def is_controller_accessible(
controller_type: controller_utils.Controllers,
stopped_message: str,
non_existent_message: Optional[str] = None,
exit_on_error: bool = False,
exit_if_not_accessible: bool = False,
) -> 'backends.CloudVmRayResourceHandle':
"""Check if the spot/serve controller is up.

The controller is accessible when it is in UP or INIT state, and the ssh
connection is successful.

It can be used to check if the controller is accessible (since the autostop
is set for the controller) before the spot/serve commands interact with the
controller.

The controller can be accessed when it is in UP or INIT state, and the ssh
connection is successful.

ClusterNotUpError will be raised whenever the controller cannot be accessed.

Args:
controller_type: Type of the controller.
stopped_message: Message to print if the controller is STOPPED.
non_existent_message: Message to show if the controller does not exist.
exit_on_error: Whether to exit directly if the controller is not UP. If
False, the function will raise ClusterNotUpError.
exit_if_not_accessible: Whether to exit directly if the controller is not
accessible. If False, the function will raise ClusterNotUpError.

Returns:
handle: The ResourceHandle of the controller.
@@ -2261,15 +2257,16 @@ def is_controller_accessible(
the same as the user who created the cluster.
exceptions.CloudUserIdentityError: if we fail to get the current user
identity.
exceptions.ClusterNotUpError: if the controller is not UP, or failed to
be connected.
exceptions.ClusterNotUpError: if the controller is not accessible, or
failed to be connected.
"""
if non_existent_message is None:
non_existent_message = (
controller_type.value.default_hint_if_non_existent)
cluster_name = controller_type.value.cluster_name
controller_name = controller_type.value.name.replace(' controller', '')
need_connection_check = False
controller_status, handle = None, None
try:
# Set force_refresh_statuses=None to make sure the refresh only happens
# when the controller is INIT/UP (triggered in these statuses as the
@@ -2279,14 +2276,14 @@ def is_controller_accessible(
# start the controller manually from the cloud console.
#
# The acquire_lock_timeout is set to 0 to avoid hanging the command when
# multiple spot_launch commands are running at the same time. It should
# be safe to set it to 0 (try once to get the lock), as in all the all
# callers will get the requested information from the controller by ssh
# with best effort.
# multiple spot_launch commands are running at the same time. If we fail
# to get an accurate status of the controller, the code below checks
# whether it is accessible by directly checking the ssh connection to the
# controller.
controller_status, handle = refresh_cluster_status_handle(
cluster_name,
force_refresh_statuses=None,
acquire_per_cluster_status_lock_timeout=0)
cluster_status_lock_timeout=0)
except exceptions.ClusterStatusFetchingError as e:
# We do not catch the exceptions related to the cluster owner identity
# mismatch, please refer to the comment in
@@ -2297,7 +2294,6 @@ def is_controller_accessible(
'stale information, when the controller is not up.\n'
f' Details: {common_utils.format_exception(e, use_bracket=True)}')
record = global_user_state.get_cluster_from_name(cluster_name)
controller_status, handle = None, None
if record is not None:
controller_status, handle = record['status'], record['handle']
# We check the connection even if the cluster has a cached status UP
@@ -2308,14 +2304,18 @@ def is_controller_accessible(
error_msg = None
if controller_status == status_lib.ClusterStatus.STOPPED:
error_msg = stopped_message
elif controller_status is None or handle.head_ip is None:
elif controller_status is None or handle is None or handle.head_ip is None:
# We check whether the controller is STOPPED before checking
# handle.head_ip, because when the controller is STOPPED, handle.head_ip
# can also be None; we only want to catch the case where the controller is
# being provisioned for the first time and has no head_ip.
error_msg = non_existent_message
elif (controller_status == status_lib.ClusterStatus.INIT or
need_connection_check):
# We do not directly check for UP because the controller may be in INIT
# state during another `sky spot launch` or `sky serve up`, but still
# have head_ip available. In this case, we can still try to ssh into the
# controller and fetch the status.
# We check access to the controller when it is in INIT state or when we
# failed to fetch its status, as its lock can be held by another
# `sky spot launch` or `sky serve up` while head_ip is still available.
# In those cases, access to the controller can still be allowed.
ssh_credentials = ssh_credential_from_yaml(handle.cluster_yaml,
handle.docker_user,
handle.ssh_user)
@@ -2324,20 +2324,20 @@ def is_controller_accessible(
**ssh_credentials,
port=handle.head_ssh_port)
if not runner.check_connection():
error_msg = controller_type.value.hint_for_connection_error
error_msg = controller_type.value.connection_error_hint
else:
assert (controller_status == status_lib.ClusterStatus.UP and
handle.head_ip is not None), (controller_status, handle)
assert controller_status == status_lib.ClusterStatus.UP, handle

if error_msg is not None:
if exit_on_error:
if exit_if_not_accessible:
sky_logging.print(error_msg)
sys.exit(1)
with ux_utils.print_exception_no_traceback():
raise exceptions.ClusterNotUpError(error_msg,
cluster_status=controller_status,
handle=handle)

assert handle is not None and handle.head_ip is not None, (
handle, controller_status)
return handle
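As a reviewer aid, a condensed, self-contained sketch of the decision flow described in the docstring above: a STOPPED or non-existent controller fails fast, INIT (or a failed status fetch) falls back to an SSH reachability probe, and UP is accepted directly. The `Status`/`Handle` stand-ins and the function name are illustrative, not SkyPilot types; the real function additionally honors `exit_if_not_accessible=True` by printing the message and exiting instead of raising.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Callable, Optional


class Status(Enum):
    INIT = 'INIT'
    UP = 'UP'
    STOPPED = 'STOPPED'


@dataclass
class Handle:
    head_ip: Optional[str]


class ControllerNotAccessible(Exception):
    pass


def check_controller(status: Optional[Status],
                     handle: Optional[Handle],
                     ssh_ok: Callable[[Handle], bool],
                     fetch_failed: bool = False) -> Handle:
    """Mirrors the STOPPED / non-existent / INIT / UP handling above."""
    if status is Status.STOPPED:
        raise ControllerNotAccessible('controller is stopped')
    if status is None or handle is None or handle.head_ip is None:
        raise ControllerNotAccessible('controller does not exist')
    if status is Status.INIT or fetch_failed:
        # INIT (or a failed status fetch) is acceptable as long as the head
        # node is actually reachable over SSH.
        if not ssh_ok(handle):
            raise ControllerNotAccessible('cannot connect to controller')
    return handle


# Example: an INIT controller with a reachable head node is considered usable.
print(check_controller(Status.INIT, Handle('1.2.3.4'), ssh_ok=lambda h: True))
```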


26 changes: 12 additions & 14 deletions sky/cli.py
@@ -2889,16 +2889,16 @@ def _hint_or_raise_for_down_spot_controller(controller_name: str):
controller = controller_utils.Controllers.from_name(controller_name)
assert controller is not None, controller_name

with rich_utils.safe_status('[bold cyan]Checking for in-progress '
f'{controller.value.managing_name}[/]'):
with rich_utils.safe_status(
'[bold cyan]Checking for in-progress spot jobs[/]'):
try:
spot_jobs = core.spot_queue(refresh=False, skip_finished=True)
except exceptions.ClusterNotUpError as e:
if controller.value.hint_for_connection_error in str(e):
if controller.value.connection_error_hint in str(e):
with ux_utils.print_exception_no_traceback():
raise exceptions.NotSupportedError(
controller.value.
decline_down_in_when_failed_to_fetch_hint)
decline_down_when_failed_to_fetch_status_hint)
if e.cluster_status is None:
click.echo(
'Managed spot controller has already been torn down.')
@@ -2923,24 +2923,22 @@ def _hint_or_raise_for_down_spot_controller(controller_name: str):
with ux_utils.print_exception_no_traceback():
raise exceptions.NotSupportedError(msg)
else:
click.echo(
f' * No in-progress {controller.value.managing_name} found. It '
'should be safe to terminate (see caveats above).')
click.echo(' * No in-progress spot jobs found. It should be safe to '
'terminate (see caveats above).')


def _hint_or_raise_for_down_sky_serve_controller(controller_name: str):
controller = controller_utils.Controllers.from_name(controller_name)
assert controller is not None, controller_name
with rich_utils.safe_status('[bold cyan]Checking for in-progress '
f'{controller.value.managing_name}[/]'):
with rich_utils.safe_status('[bold cyan]Checking for live services[/]'):
try:
services = serve_lib.status()
except exceptions.ClusterNotUpError as e:
if controller.value.hint_for_connection_error in str(e):
if controller.value.connection_error_hint in str(e):
with ux_utils.print_exception_no_traceback():
raise exceptions.NotSupportedError(
controller.value.
decline_down_in_when_failed_to_fetch_hint)
decline_down_when_failed_to_fetch_status_hint)
if e.cluster_status is None:
click.echo('Serve controller has already been torn down.')
sys.exit(0)
@@ -4072,7 +4070,7 @@ def spot_cancel(name: Optional[str], job_ids: Tuple[int], all: bool, yes: bool):
backend_utils.is_controller_accessible(
controller_type=controller_utils.Controllers.SPOT_CONTROLLER,
stopped_message='All managed spot jobs should have finished.',
exit_on_error=True)
exit_if_not_accessible=True)

job_id_str = ','.join(map(str, job_ids))
if sum([len(job_ids) > 0, name is not None, all]) != 1:
@@ -4156,7 +4154,7 @@ def spot_dashboard(port: Optional[int]):
controller_type=controller_utils.Controllers.SPOT_CONTROLLER,
stopped_message=hint,
non_existent_message=hint,
exit_on_error=True)
exit_if_not_accessible=True)

# SSH forward a free local port to remote's dashboard port.
remote_port = constants.SPOT_DASHBOARD_REMOTE_PORT
@@ -4663,7 +4661,7 @@ def serve_down(service_names: List[str], all: bool, purge: bool, yes: bool):
backend_utils.is_controller_accessible(
controller_type=controller_utils.Controllers.SKY_SERVE_CONTROLLER,
stopped_message='All services should have been terminated.',
exit_on_error=True)
exit_if_not_accessible=True)

if not yes:
quoted_service_names = [f'{name!r}' for name in service_names]
7 changes: 4 additions & 3 deletions sky/serve/core.py
@@ -553,9 +553,10 @@ def status(
raise RuntimeError(
'Failed to refresh service status due to network error.') from e

controller_type = controller_utils.Controllers.SKY_SERVE_CONTROLLER
handle = backend_utils.is_controller_accessible(
controller_type=controller_utils.Controllers.SKY_SERVE_CONTROLLER,
stopped_message='No in-progress services.')
controller_type=controller_type,
stopped_message=controller_type.value.default_hint_if_non_existent)

backend = backend_utils.get_backend_from_handle(handle)
assert isinstance(backend, backends.CloudVmRayBackend)
@@ -639,7 +640,7 @@ def tail_logs(
'target=CONTROLLER/LOAD_BALANCER.')
handle = backend_utils.is_controller_accessible(
controller_type=controller_utils.Controllers.SKY_SERVE_CONTROLLER,
stopped_message='No in-progress services.')
stopped_message='No live services.')

backend = backend_utils.get_backend_from_handle(handle)
assert isinstance(backend, backends.CloudVmRayBackend), backend
2 changes: 1 addition & 1 deletion sky/spot/spot_utils.py
@@ -657,7 +657,7 @@ def format_job_table(
if status_str:
status_str = f'In progress tasks: {status_str}'
else:
status_str = 'No in progress tasks.'
status_str = 'No in-progress spot jobs.'
output = status_str
if str(job_table):
output += f'\n{job_table}'
2 changes: 1 addition & 1 deletion sky/utils/command_runner.py
@@ -47,7 +47,7 @@ def ssh_options_list(
*,
ssh_proxy_command: Optional[str] = None,
docker_ssh_proxy_command: Optional[str] = None,
connect_timeout: Optional[int] = 30,
connect_timeout: Optional[int] = None,
port: int = 22,
disable_control_master: Optional[bool] = False,
) -> List[str]:
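The default here changes from 30 to None, presumably so callers can distinguish "use the library default" from an explicit override (e.g. a short timeout for the controller liveness probe). A hypothetical sketch of how such an optional timeout can be resolved into ssh options; this is an illustration under that assumption, not the actual `ssh_options_list` body, and the 30-second fallback is assumed.

```python
from typing import List, Optional

_DEFAULT_CONNECT_TIMEOUT = 30  # assumed fallback, in seconds


def build_ssh_options(port: int = 22,
                      connect_timeout: Optional[int] = None) -> List[str]:
    # None means "use the default"; an explicit value overrides it.
    if connect_timeout is None:
        connect_timeout = _DEFAULT_CONNECT_TIMEOUT
    return [
        '-o', f'ConnectTimeout={connect_timeout}',
        '-o', 'StrictHostKeyChecking=no',
        '-p', str(port),
    ]


print(build_ssh_options())                   # default timeout
print(build_ssh_options(connect_timeout=5))  # quick liveness probe
```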