[Serve/Spot] Allow spot queue/cancel/logs during controller INIT state #3288

Merged
merged 23 commits on Mar 12, 2024
Changes from 8 commits
2 changes: 1 addition & 1 deletion .github/workflows/pytest.yml
@@ -27,7 +27,7 @@ jobs:
- tests/test_optimizer_random_dag.py
- tests/test_storage.py
- tests/test_wheels.py
- tests/test_spot.py
- tests/test_spot_serve.py
- tests/test_yaml_parser.py
runs-on: ubuntu-latest
steps:
54 changes: 41 additions & 13 deletions sky/backends/backend_utils.py
@@ -2005,7 +2005,8 @@ def refresh_cluster_record(
*,
force_refresh_statuses: Optional[Set[status_lib.ClusterStatus]] = None,
acquire_per_cluster_status_lock: bool = True,
acquire_lock_timeout: int = CLUSTER_STATUS_LOCK_TIMEOUT_SECONDS
acquire_per_cluster_status_lock_timeout:
int = CLUSTER_STATUS_LOCK_TIMEOUT_SECONDS
) -> Optional[Dict[str, Any]]:
"""Refresh the cluster, and return the possibly updated record.

@@ -2023,6 +2024,9 @@ def refresh_cluster_record(
2. is a non-spot cluster, is not STOPPED, and autostop is set.
acquire_per_cluster_status_lock: Whether to acquire the per-cluster lock
before updating the status.
acquire_per_cluster_status_lock_timeout: The timeout (in seconds) to
acquire the per-cluster lock. If the lock is not acquired within the
timeout, the function will use the cached status.

Returns:
If the cluster is terminated or does not exist, return None.
@@ -2054,7 +2058,7 @@
record = _update_cluster_status(
cluster_name,
acquire_per_cluster_status_lock=acquire_per_cluster_status_lock,
acquire_lock_timeout=acquire_lock_timeout)
acquire_lock_timeout=acquire_per_cluster_status_lock_timeout)
return record


@@ -2064,7 +2068,8 @@ def refresh_cluster_status_handle(
*,
force_refresh_statuses: Optional[Set[status_lib.ClusterStatus]] = None,
acquire_per_cluster_status_lock: bool = True,
acquire_lock_timeout: int = CLUSTER_STATUS_LOCK_TIMEOUT_SECONDS
acquire_per_cluster_status_lock_timeout:
int = CLUSTER_STATUS_LOCK_TIMEOUT_SECONDS
) -> Tuple[Optional[status_lib.ClusterStatus],
Optional[backends.ResourceHandle]]:
"""Refresh the cluster, and return the possibly updated status and handle.
@@ -2077,7 +2082,8 @@
cluster_name,
force_refresh_statuses=force_refresh_statuses,
acquire_per_cluster_status_lock=acquire_per_cluster_status_lock,
acquire_lock_timeout=acquire_lock_timeout)
acquire_per_cluster_status_lock_timeout=
acquire_per_cluster_status_lock_timeout)
if record is None:
return None, None
return record['status'], record['handle']
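
For reference, a minimal usage sketch of the renamed keyword argument (the cluster name is hypothetical). A zero timeout means the per-cluster lock is tried once and the cached status is used if it cannot be acquired, mirroring the call added in `is_controller_accessible` below:

```python
from sky.backends import backend_utils

# Hypothetical controller cluster name, for illustration only.
status, handle = backend_utils.refresh_cluster_status_handle(
    'sky-spot-controller-abcd1234',
    force_refresh_statuses=None,  # only refresh when the cached status is INIT/UP
    acquire_per_cluster_status_lock_timeout=0)  # try the lock once, else use cache
if status is None:
    # The cluster is terminated or does not exist.
    print('Controller not found.')
```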
@@ -2223,24 +2229,29 @@ def check_cluster_available(


# TODO(tian): Refactor to controller_utils. Current blocker: circular import.
def is_controller_up(
def is_controller_accessible(
controller_type: controller_utils.Controllers,
stopped_message: str,
non_existent_message: Optional[str] = None,
exit_on_error: bool = False,
) -> 'backends.CloudVmRayResourceHandle':
"""Check if the spot/serve controller is up.

It can be used to check the actual controller status (since the autostop is
set for the controller) before the spot/serve commands interact with the
It can be used to check if the controller is accessible (since the autostop
is set for the controller) before the spot/serve commands interact with the
controller.

The controller can be accessed when it is in UP or INIT state, and the ssh
connection is successful.

ClusterNotUpError will be raised whenever the controller cannot be accessed.

Args:
controller_type: Type of the controller.
stopped_message: Message to print if the controller is STOPPED.
non_existent_message: Message to show if the controller does not exist.
exit_on_error: Whether to exit directly if the controller is not UP. If
False, the function will raise ClusterNotUpError.

Returns:
handle: The ResourceHandle of the controller.
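
A sketch of the intended call pattern, mirroring the `sky spot cancel` change later in this diff; the import paths and messages here are assumed for illustration:

```python
from sky import exceptions
from sky.backends import backend_utils
from sky.utils import controller_utils

try:
    handle = backend_utils.is_controller_accessible(
        controller_type=controller_utils.Controllers.SPOT_CONTROLLER,
        stopped_message='All managed spot jobs should have finished.')
except exceptions.ClusterNotUpError as e:
    # The exception message already explains whether the controller is
    # stopped, non-existent, or unreachable over ssh.
    print(e)
else:
    print(f'Controller is accessible at {handle.head_ip}.')
```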
@@ -2258,6 +2269,7 @@ def is_controller_up(
controller_type.value.default_hint_if_non_existent)
cluster_name = controller_type.value.cluster_name
controller_name = controller_type.value.name.replace(' controller', '')
need_connection_check = False
try:
# Set force_refresh_statuses=None to make sure the refresh only happens
# when the controller is INIT/UP (triggered in these statuses as the
@@ -2268,9 +2280,13 @@
#
# The acquire_lock_timeout is set to 0 to avoid hanging the command when
# multiple spot_launch commands are running at the same time. It should
# be safe to set it to 0 (try once to get the lock).
# be safe to set it to 0 (try once to get the lock), as all the
# callers will get the requested information from the controller by ssh
# with best effort.
controller_status, handle = refresh_cluster_status_handle(
cluster_name, force_refresh_statuses=None, acquire_lock_timeout=0)
cluster_name,
force_refresh_statuses=None,
acquire_per_cluster_status_lock_timeout=0)
except exceptions.ClusterStatusFetchingError as e:
# We do not catch the exceptions related to the cluster owner identity
# mismatch, please refer to the comment in
@@ -2284,13 +2300,22 @@
controller_status, handle = None, None
if record is not None:
controller_status, handle = record['status'], record['handle']
# We check the connection even if the cluster has a cached status UP
# to make sure the controller is actually accessible, as the cached
# status might be stale.
need_connection_check = True

error_msg = None
if controller_status is None or handle.head_ip is None:
error_msg = non_existent_message
elif controller_status == status_lib.ClusterStatus.STOPPED:
if controller_status == status_lib.ClusterStatus.STOPPED:
error_msg = stopped_message
elif controller_status == status_lib.ClusterStatus.INIT:
elif controller_status is None or handle.head_ip is None:
error_msg = non_existent_message
elif (controller_status == status_lib.ClusterStatus.INIT or
need_connection_check):
# We do not directly check for UP because the controller may be in INIT
# state during another `sky spot launch` or `sky serve up`, but still
# have head_ip available. In this case, we can still try to ssh into the
# controller and fetch the status.
ssh_credentials = ssh_credential_from_yaml(handle.cluster_yaml,
handle.docker_user,
handle.ssh_user)
@@ -2300,6 +2325,9 @@
port=handle.head_ssh_port)
if not runner.check_connection():
error_msg = controller_type.value.hint_for_connection_error
else:
assert (controller_status == status_lib.ClusterStatus.UP and
handle.head_ip is not None), (controller_status, handle)

if error_msg is not None:
if exit_on_error:
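
To illustrate the "try once to get the lock" comment above, a rough sketch of how a zero lock timeout can fall back to the cached record, assuming a filelock-based implementation; the helper names are stand-ins, not SkyPilot APIs:

```python
import filelock


def _query_cloud_and_update_cache():
    ...  # stand-in for the real cloud status query


def _read_cached_record():
    ...  # stand-in for reading the locally cached record


def refresh_with_lock(lock_path: str, timeout: float):
    try:
        # timeout=0 attempts the lock exactly once instead of blocking.
        with filelock.FileLock(lock_path, timeout=timeout):
            return _query_cloud_and_update_cache()
    except filelock.Timeout:
        # A concurrent `sky spot launch` / `sky serve up` holds the lock;
        # fall back to the cached status rather than hanging this command.
        return _read_cached_record()
```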
172 changes: 80 additions & 92 deletions sky/cli.py
@@ -1648,15 +1648,10 @@ def _get_spot_jobs(
num_in_progress_jobs = len(spot_jobs)
except exceptions.ClusterNotUpError as e:
controller_status = e.cluster_status
if controller_status == status_lib.ClusterStatus.INIT:
msg = ('Controller\'s latest status is INIT; jobs '
'will not be shown until it becomes UP.')
else:
assert controller_status in [None, status_lib.ClusterStatus.STOPPED]
msg = 'No in progress jobs.'
if controller_status is None:
msg += (f' (See: {colorama.Style.BRIGHT}sky spot -h'
f'{colorama.Style.RESET_ALL})')
msg = str(e)
if controller_status is None:
msg += (f' (See: {colorama.Style.BRIGHT}sky spot -h'
f'{colorama.Style.RESET_ALL})')
except RuntimeError as e:
msg = ('Failed to query spot jobs due to connection '
'issues. Try again later. '
@@ -1703,14 +1698,10 @@ def _get_services(service_names: Optional[List[str]],
num_services = len(service_records)
except exceptions.ClusterNotUpError as e:
controller_status = e.cluster_status
if controller_status == status_lib.ClusterStatus.INIT:
msg = 'Controller is initializing. Please wait for a while.'
else:
assert controller_status in [None, status_lib.ClusterStatus.STOPPED]
msg = 'No existing services. '
if controller_status is None:
msg += (f'(See: {colorama.Style.BRIGHT}sky serve -h'
f'{colorama.Style.RESET_ALL})')
msg = str(e)
if controller_status is None:
msg += (f'(See: {colorama.Style.BRIGHT}sky serve -h'
f'{colorama.Style.RESET_ALL})')
except RuntimeError as e:
msg = ('Failed to fetch service statuses due to connection issues. '
'Please try again later. Details: '
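
For context, a sketch of the exception contract these two handlers rely on, as implied by the diff; the real class lives in sky/exceptions.py and may carry more fields. The message is user-facing, so the CLI can print `str(e)` directly and append a hint when `cluster_status` is None:

```python
class ClusterNotUpError(Exception):
    """Raised when a (controller) cluster is not accessible.

    Illustrative only: the message is user-facing so callers can show
    ``str(e)`` as-is, and the cached cluster status (None if the cluster
    does not exist) is attached for callers that need to branch on it.
    """

    def __init__(self, message: str, cluster_status=None):
        super().__init__(message)
        self.cluster_status = cluster_status
```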
@@ -2895,86 +2886,76 @@ def down(


def _hint_or_raise_for_down_spot_controller(controller_name: str):
# spot_jobs will be empty when the spot cluster is not running.
cluster_status, _ = backend_utils.refresh_cluster_status_handle(
controller_name)
if cluster_status is None:
click.echo('Managed spot controller has already been torn down.')
return

controller = controller_utils.Controllers.from_name(controller_name)
assert controller is not None, controller_name
if cluster_status == status_lib.ClusterStatus.INIT:
with ux_utils.print_exception_no_traceback():
raise exceptions.NotSupportedError(
controller.value.decline_down_in_init_status_hint)

with rich_utils.safe_status('[bold cyan]Checking for in-progress '
f'{controller.value.managing_name}[/]'):
try:
spot_jobs = core.spot_queue(refresh=False, skip_finished=True)
except exceptions.ClusterNotUpError as e:
if controller.value.hint_for_connection_error in str(e):
with ux_utils.print_exception_no_traceback():
raise exceptions.NotSupportedError(
controller.value.
decline_down_in_when_failed_to_fetch_hint)
if e.cluster_status is None:
click.echo(
'Managed spot controller has already been torn down.')
sys.exit(0)
# At this point, fetching the spot jobs failed because the controller
# is STOPPED or is being launched for the first time, i.e., there are
# no in-progress spot jobs.
spot_jobs = []

msg = (f'{colorama.Fore.YELLOW}WARNING: Tearing down the managed '
f'spot controller ({cluster_status.value}). Please be '
f'aware of the following:{colorama.Style.RESET_ALL}'
'spot controller. Please be aware of the following:'
f'{colorama.Style.RESET_ALL}'
'\n * All logs and status information of the spot '
'jobs (output of `sky spot queue`) will be lost.')
click.echo(msg)
if cluster_status == status_lib.ClusterStatus.UP:
with rich_utils.safe_status(
'[bold cyan]Checking for in-progress spot jobs[/]'):
try:
spot_jobs = core.spot_queue(refresh=False)
except exceptions.ClusterNotUpError:
# The spot controller cluster status changed during querying
# the spot jobs, use the latest cluster status, so that the
# message for INIT and STOPPED states will be correctly
# added to the message.
cluster_status = backend_utils.refresh_cluster_status_handle(
controller_name)
spot_jobs = []

# Find in-progress spot jobs, and hint users to cancel them.
non_terminal_jobs = [
job for job in spot_jobs if not job['status'].is_terminal()
]
if (cluster_status == status_lib.ClusterStatus.UP and
non_terminal_jobs):
job_table = spot_lib.format_job_table(non_terminal_jobs,
show_all=False)
msg = controller.value.decline_down_for_dirty_controller_hint
# Add prefix to each line to align with the bullet point.
msg += '\n'.join(
[' ' + line for line in job_table.split('\n') if line != ''])
with ux_utils.print_exception_no_traceback():
raise exceptions.NotSupportedError(msg)
else:
click.echo(' * No in-progress spot jobs found. It should be safe '
'to terminate (see caveats above).')
if spot_jobs:
job_table = spot_lib.format_job_table(spot_jobs, show_all=False)
msg = controller.value.decline_down_for_dirty_controller_hint
# Add prefix to each line to align with the bullet point.
msg += '\n'.join(
[' ' + line for line in job_table.split('\n') if line != ''])
with ux_utils.print_exception_no_traceback():
raise exceptions.NotSupportedError(msg)
else:
click.echo(
f' * No in-progress {controller.value.managing_name} found. It '
'should be safe to terminate (see caveats above).')


def _hint_or_raise_for_down_sky_serve_controller(controller_name: str):
cluster_status, _ = backend_utils.refresh_cluster_status_handle(
controller_name)
if cluster_status is None:
click.echo('Sky serve controller has already been torn down.')
return

controller = controller_utils.Controllers.from_name(controller_name)
assert controller is not None, controller_name
if cluster_status == status_lib.ClusterStatus.INIT:
with rich_utils.safe_status('[bold cyan]Checking for in-progress '
f'{controller.value.managing_name}[/]'):
try:
services = serve_lib.status()
except exceptions.ClusterNotUpError as e:
if controller.value.hint_for_connection_error in str(e):
with ux_utils.print_exception_no_traceback():
raise exceptions.NotSupportedError(
controller.value.
decline_down_in_when_failed_to_fetch_hint)
if e.cluster_status is None:
click.echo('Serve controller has already been torn down.')
sys.exit(0)
# At this point, fetching the services failed because the controller
# is STOPPED or is being launched for the first time, i.e., there are
# no in-progress services.
services = []

if services:
service_names = [service['name'] for service in services]
with ux_utils.print_exception_no_traceback():
raise exceptions.NotSupportedError(
controller.value.decline_down_in_init_status_hint)
elif cluster_status == status_lib.ClusterStatus.UP:
with rich_utils.safe_status(
'[bold cyan]Checking for running services[/]'):
try:
services = serve_lib.status()
except exceptions.ClusterNotUpError:
cluster_status = backend_utils.refresh_cluster_status_handle(
controller_name)
services = []
if services:
service_names = [service['name'] for service in services]
with ux_utils.print_exception_no_traceback():
msg = (controller.value.decline_down_for_dirty_controller_hint.
format(service_names=', '.join(service_names)))
raise exceptions.NotSupportedError(msg)
msg = (
controller.value.decline_down_for_dirty_controller_hint.format(
service_names=', '.join(service_names)))
raise exceptions.NotSupportedError(msg)
# Do nothing for STOPPED state, as it is safe to terminate the cluster.
click.echo(f'Terminate sky serve controller: {controller_name}.')

@@ -3064,6 +3045,13 @@ def _down_or_stop_clusters(
assert controller is not None
hint_or_raise = _CONTROLLER_TO_HINT_OR_RAISE[controller]
try:
# TODO(zhwu): This hint-or-raise check is not transactional: even if
# it passes with no in-progress spot jobs or services and the
# termination confirmation is prompted, a user could still run
# `sky spot launch` or `sky serve up` before confirming the deletion,
# leaking a spot job or service. We should make this check atomic
# with the termination.
hint_or_raise(controller_name)
except exceptions.ClusterOwnerIdentityMismatchError as e:
if purge:
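
One possible direction for the TODO above, purely as an illustration and not part of this PR: take a single lock around both the dirty-check and the teardown (launch paths would need to honor the same lock); all names below are hypothetical.

```python
import filelock

# Hypothetical lock path; not a real SkyPilot constant.
_CONTROLLER_TEARDOWN_LOCK = '/tmp/.sky/controller_teardown.lock'


def checked_teardown(controller_name, hint_or_raise, teardown):
    # Holding the lock across both steps prevents a concurrent
    # `sky spot launch` / `sky serve up` from sneaking in between the
    # in-progress check and the actual teardown.
    with filelock.FileLock(_CONTROLLER_TEARDOWN_LOCK):
        hint_or_raise(controller_name)  # raises if in-progress work is found
        teardown(controller_name)
```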
@@ -4081,7 +4069,7 @@ def spot_cancel(name: Optional[str], job_ids: Tuple[int], all: bool, yes: bool):
# Cancel managed spot jobs with IDs 1, 2, 3
$ sky spot cancel 1 2 3
"""
backend_utils.is_controller_up(
backend_utils.is_controller_accessible(
controller_type=controller_utils.Controllers.SPOT_CONTROLLER,
stopped_message='All managed spot jobs should have finished.',
exit_on_error=True)
@@ -4138,8 +4126,8 @@ def spot_logs(name: Optional[str], job_id: Optional[int], follow: bool,
follow=follow)
else:
core.spot_tail_logs(name=name, job_id=job_id, follow=follow)
except exceptions.ClusterNotUpError:
# Hint messages already printed by the call above.
except exceptions.ClusterNotUpError as e:
click.echo(e)
sys.exit(1)


@@ -4164,7 +4152,7 @@ def spot_dashboard(port: Optional[int]):
hint = (
'Dashboard is not available if spot controller is not up. Run a spot '
'job first.')
backend_utils.is_controller_up(
backend_utils.is_controller_accessible(
controller_type=controller_utils.Controllers.SPOT_CONTROLLER,
stopped_message=hint,
non_existent_message=hint,
@@ -4672,7 +4660,7 @@ def serve_down(service_names: List[str], all: bool, purge: bool, yes: bool):
'Can only specify one of SERVICE_NAMES or --all. '
f'Provided {argument_str!r}.')

backend_utils.is_controller_up(
backend_utils.is_controller_accessible(
controller_type=controller_utils.Controllers.SKY_SERVE_CONTROLLER,
stopped_message='All services should have been terminated.',
exit_on_error=True)
@@ -4755,8 +4743,8 @@ def serve_logs(
target=target_component,
replica_id=replica_id,
follow=follow)
except exceptions.ClusterNotUpError:
# Hint messages already printed by the call above.
except exceptions.ClusterNotUpError as e:
click.echo(e)
sys.exit(1)

