Skip to content

Commit

Permalink
combine reserved prefix & name
Browse files Browse the repository at this point in the history
  • Loading branch information
cblmemo committed Sep 14, 2023
1 parent 78d7342 commit baf62f9
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 107 deletions.
77 changes: 54 additions & 23 deletions sky/backends/backend_utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Util constants/functions for the backends."""
import copy
import dataclasses
from datetime import datetime
import difflib
import enum
Expand All @@ -14,7 +15,8 @@
import textwrap
import time
import typing
from typing import Any, Dict, List, Optional, Sequence, Set, Tuple, Union
from typing import (Any, Callable, Dict, List, Optional, Sequence, Set, Tuple,
Union)
import uuid

import colorama
Expand Down Expand Up @@ -101,18 +103,52 @@
# Note: This value cannot be too small, otherwise OOM issue may occur.
DEFAULT_TASK_CPU_DEMAND = 0.5

# Mapping from reserved cluster names to the corresponding group name (logging
# purpose).
# NOTE: each group can only have one reserved cluster name for now.
SKY_RESERVED_CLUSTER_NAMES: Dict[str, str] = {
spot_lib.SPOT_CONTROLLER_NAME: 'Managed spot controller'
}

# Mapping from reserved cluster prefixes to the corresponding group name
# (logging purpose).
SKY_RESERVED_CLUSTER_PREFIXES: Dict[str, str] = {
serve_lib.CONTROLLER_PREFIX: 'SkyServe controller',
}
@dataclasses.dataclass
class ReservedClusterRecord:
"""Record for reserved cluster group."""
group_name: str
check: Callable[[str], bool]
sky_status_hint: str
decline_stop_hint: str
decline_cancel_hint: str
check_cluster_name_hint: str


class ReservedClusterGroup(enum.Enum):
"""Reserved cluster groups for skypilot."""
# NOTE(dev): Keep this align with
# sky/cli.py::_RESERVED_CLUSTER_GROUP_TO_HINT_OR_RAISE
SPOT_CONTROLLER = ReservedClusterRecord(
group_name='Managed spot controller',
check=lambda name: name == spot_lib.SPOT_CONTROLLER_NAME,
sky_status_hint=(
f'* To see detailed spot job status: {colorama.Style.BRIGHT}'
f'sky spot queue{colorama.Style.RESET_ALL}'),
decline_stop_hint=('Spot controller will be auto-stopped after all '
'spot jobs finish.'),
check_cluster_name_hint=(
f'Cluster {spot_lib.SPOT_CONTROLLER_NAME} is reserved for '
'managed spot controller. '))
SKY_SERVE_CONTROLLER = ReservedClusterRecord(
group_name='Sky Serve controller',
check=lambda name: name.startswith(serve_lib.CONTROLLER_PREFIX),
sky_status_hint=(
f'* To see detailed service status: {colorama.Style.BRIGHT}'
f'sky serve status{colorama.Style.RESET_ALL}'),
decline_stop_hint=(f'To teardown a service, use {colorama.Style.BRIGHT}'
f'sky serve down{colorama.Style.RESET_ALL}.'),
check_cluster_name_hint=(
f'Cluster prefix {serve_lib.CONTROLLER_PREFIX} is reserved for '
'sky serve controller. '))

@classmethod
def get_group(cls, name: str) -> Optional['ReservedClusterGroup']:
for group in cls:
if group.value.check(name):
return group
return None


# Filelocks for the cluster status change.
CLUSTER_STATUS_LOCK_PATH = os.path.expanduser('~/.sky/.{}.lock')
Expand Down Expand Up @@ -2534,7 +2570,7 @@ def get_clusters(
if not include_reserved:
records = [
record for record in records
if record['name'] not in SKY_RESERVED_CLUSTER_NAMES
if ReservedClusterGroup.get_group(record['name']) is None
]

yellow = colorama.Fore.YELLOW
Expand Down Expand Up @@ -2875,16 +2911,11 @@ def check_cluster_name_not_reserved(
Returns:
None, if the cluster name is not reserved.
"""
msg = None
if cluster_name in SKY_RESERVED_CLUSTER_NAMES:
msg = (f'Cluster {cluster_name!r} is reserved for the '
f'{SKY_RESERVED_CLUSTER_NAMES[cluster_name].lower()}.')
for prefix in SKY_RESERVED_CLUSTER_PREFIXES:
if cluster_name is not None and cluster_name.startswith(prefix):
msg = (f'Cluster prefix {prefix!r} is reserved for the '
f'{SKY_RESERVED_CLUSTER_PREFIXES[prefix].lower()}.')
break
if msg is not None:
if cluster_name is None:
return
group = ReservedClusterGroup.get_group(cluster_name)
if group is not None:
msg = group.value.check_cluster_name_hint
if operation_str is not None:
msg += f' {operation_str} is not allowed.'
with ux_utils.print_exception_no_traceback():
Expand Down
131 changes: 49 additions & 82 deletions sky/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -1754,38 +1754,30 @@ def status(all: bool, refresh: bool, show_spot_jobs: bool, clusters: List[str]):
query_clusters = _get_glob_clusters(clusters)
cluster_records = core.status(cluster_names=query_clusters,
refresh=refresh)
hints = []
nonreserved_cluster_records = []
reserved_clusters = []
# TODO(tian): Rename this variable if other reserved prefix are added.
skyserve_controllers = []
for cluster_record in cluster_records:
cluster_name = cluster_record['name']
if cluster_name in backend_utils.SKY_RESERVED_CLUSTER_NAMES:
group = backend_utils.ReservedClusterGroup.get_group(cluster_name)
if group is not None:
reserved_clusters.append(cluster_record)
hints.append(group.value.sky_status_hint)
else:
for prefix in backend_utils.SKY_RESERVED_CLUSTER_PREFIXES:
if cluster_name.startswith(prefix):
skyserve_controllers.append(cluster_record)
break
else:
nonreserved_cluster_records.append(cluster_record)
nonreserved_cluster_records.append(cluster_record)
local_clusters = onprem_utils.check_and_get_local_clusters(
suppress_error=True)

num_pending_autostop = 0
num_pending_autostop += status_utils.show_status_table(
nonreserved_cluster_records + reserved_clusters, all)
nonreserved_cluster_records, all)
status_utils.show_local_status_table(local_clusters)

hints = []
if skyserve_controllers:
if reserved_clusters:
click.echo(f'{colorama.Fore.CYAN}{colorama.Style.BRIGHT}\n'
f'SkyServe Controllers{colorama.Style.RESET_ALL}')
f'Controllers{colorama.Style.RESET_ALL}')
num_pending_autostop += status_utils.show_status_table(
skyserve_controllers, all)
hints.append(
f'* To see detailed service status: {colorama.Style.BRIGHT}'
f'sky serve status{colorama.Style.RESET_ALL}')
reserved_clusters, all)

if show_spot_jobs:
click.echo(f'\n{colorama.Fore.CYAN}{colorama.Style.BRIGHT}'
Expand Down Expand Up @@ -1879,9 +1871,9 @@ def cost_report(all: bool): # pylint: disable=redefined-builtin
reserved_clusters = dict()
for cluster_record in cluster_records:
cluster_name = cluster_record['name']
if cluster_name in backend_utils.SKY_RESERVED_CLUSTER_NAMES:
cluster_group_name = backend_utils.SKY_RESERVED_CLUSTER_NAMES[
cluster_name]
group = backend_utils.ReservedClusterGroup.get_group(cluster_name)
if group is not None:
cluster_group_name = group.value.group_name
# to display most recent entry for each reserved cluster
# TODO(sgurram): fix assumption of sorted order of clusters
if cluster_group_name not in reserved_clusters:
Expand Down Expand Up @@ -2442,7 +2434,8 @@ def start(
clusters = [
cluster['name']
for cluster in global_user_state.get_clusters()
if cluster['name'] not in backend_utils.SKY_RESERVED_CLUSTER_NAMES
if backend_utils.ReservedClusterGroup.get_group(cluster['name']) is
None
]

if not clusters:
Expand Down Expand Up @@ -2510,26 +2503,24 @@ def start(
# Checks for reserved clusters (spot controller).
reserved, non_reserved = [], []
for name in to_start:
if name in backend_utils.SKY_RESERVED_CLUSTER_NAMES:
if backend_utils.ReservedClusterGroup.get_group(name) is not None:
reserved.append(name)
else:
non_reserved.append(name)
if reserved and non_reserved:
assert len(reserved) == 1, reserved
# Keep this behavior the same as _down_or_stop_clusters().
raise click.UsageError(
'Starting the spot controller with other cluster(s) '
'Starting skypilot controllers with other cluster(s) '
'is currently not supported.\n'
'Please start the former independently.')
if reserved:
assert len(reserved) == 1, reserved
bold = backend_utils.BOLD
reset_bold = backend_utils.RESET_BOLD
if idle_minutes_to_autostop is not None:
raise click.UsageError(
'Autostop options are currently not allowed when starting the '
'spot controller. Use the default autostop settings by directly'
f' calling: {bold}sky start {reserved[0]}{reset_bold}')
'controllers. Use the default autostop settings by directly '
f'calling: {bold}sky start {" ".join(reserved)}{reset_bold}')

if not yes:
cluster_str = 'clusters' if len(to_start) > 1 else 'cluster'
Expand Down Expand Up @@ -2698,10 +2689,18 @@ def _hint_or_raise_for_down_sky_serve_controller(controller_name: str):
f'the service{plural} first with {colorama.Style.BRIGHT}sky '
f'serve down {" ".join(service_names)}'
f'{colorama.Style.RESET_ALL}.')
msg = (f'Tearing down sky serve controller: {controller_name}.')
msg = f'Tearing down sky serve controller: {controller_name}.'
click.echo(msg)


_RESERVED_CLUSTER_GROUP_TO_HINT_OR_RAISE = {
backend_utils.ReservedClusterGroup.SPOT_CONTROLLER:
(_hint_or_raise_for_down_spot_controller),
backend_utils.ReservedClusterGroup.SKY_SERVE_CONTROLLER:
(_hint_or_raise_for_down_sky_serve_controller),
}


def _down_or_stop_clusters(
names: List[str],
apply_to_all: Optional[bool],
Expand All @@ -2711,9 +2710,9 @@ def _down_or_stop_clusters(
idle_minutes_to_autostop: Optional[int] = None) -> None:
"""Tears down or (auto-)stops a cluster (or all clusters).
Reserved clusters (spot controller) can only be terminated if the cluster
name is explicitly and uniquely specified (not via glob) and purge is set
to True.
Reserved clusters (spot controller and sky serve controller) can only be
terminated if the cluster name is explicitly and uniquely specified (not
via glob) and purge is set to True.
"""
if down:
command = 'down'
Expand Down Expand Up @@ -2745,12 +2744,12 @@ def _down_or_stop_clusters(
if len(names) > 0:
reserved_clusters = [
name for name in names
if name in backend_utils.SKY_RESERVED_CLUSTER_NAMES
if backend_utils.ReservedClusterGroup.get_group(name) is not None
]
reserved_clusters_str = ', '.join(map(repr, reserved_clusters))
names = [
name for name in _get_glob_clusters(names)
if name not in backend_utils.SKY_RESERVED_CLUSTER_NAMES
if backend_utils.ReservedClusterGroup.get_group(name) is None
]
if not down:
local_clusters = onprem_utils.check_and_get_local_clusters()
Expand All @@ -2763,50 +2762,19 @@ def _down_or_stop_clusters(
f'Skipping local cluster {c}, as it does not support '
'`sky stop/autostop`.'))
]
name_to_reserved_prefix = dict()
for name in names:
for prefix in backend_utils.SKY_RESERVED_CLUSTER_PREFIXES:
if name.startswith(prefix):
name_to_reserved_prefix[name] = prefix
break
names = [name for name in names if name not in name_to_reserved_prefix]
reserve_prefix_str = ', '.join(
[f'{prefix}*' for prefix in name_to_reserved_prefix.values()])
if len(name_to_reserved_prefix) > 0:
if len(names) != 0:
names_str = ', '.join(map(repr, names))
raise click.UsageError(
f'{operation} cluster(s) with reserved prefix '
f'{reserve_prefix_str} with other cluster(s) '
f'{names_str} is currently not supported.\n'
'Please omit the cluster(s) with reserved prefix '
f'{name_to_reserved_prefix.values()}.')
if not down:
raise click.UsageError(
f'{operation} cluster(s) with reserved prefix '
f'{reserve_prefix_str} is not supported. To teardown a '
'service, please use `sky serve down`.')
else:
if len(name_to_reserved_prefix) > 1:
raise click.UsageError(
f'{operation} multiple clusters with reserved prefix '
f'{reserve_prefix_str} is currently not supported.\n'
'Please omit all but one of the clusters.')
# We can only teardown one reserved cluster (sky serve
# controller) for now.
_hint_or_raise_for_down_sky_serve_controller(
list(name_to_reserved_prefix.keys())[0])
confirm_str = 'delete'
user_input = click.prompt(
f'To proceed, please type {colorama.Style.BRIGHT}'
f'{confirm_str!r}{colorama.Style.RESET_ALL}',
type=str)
if user_input != confirm_str:
raise click.Abort()
no_confirm = True
# Make sure the reserved clusters are explicitly specified without other
# normal clusters.
if len(reserved_clusters) > 0:
name2group: Dict[str, backend_utils.ReservedClusterGroup] = dict()
for name in reserved_clusters:
group = backend_utils.ReservedClusterGroup.get_group(name)
assert group is not None
name2group[name] = group
# Use set to remove duplicated sky serve controller messages.
decline_stop_hints = set()
for group in name2group.values():
decline_stop_hints.add(group.value.decline_stop_hint)
decline_stop_hints = ' '.join(decline_stop_hints)
if len(names) != 0:
names_str = ', '.join(map(repr, names))
raise click.UsageError(
Expand All @@ -2818,12 +2786,12 @@ def _down_or_stop_clusters(
raise click.UsageError(
f'{operation} reserved cluster(s) '
f'{reserved_clusters_str} is currently not supported. '
'It will be auto-stopped after all spot jobs finish.')
f'{decline_stop_hints}')
else:
# TODO(zhwu): We can only have one reserved cluster (spot
# controller).
assert len(reserved_clusters) == 1, reserved_clusters
_hint_or_raise_for_down_spot_controller(reserved_clusters[0])
for reserved_cluster in reserved_clusters:
hint_or_raise = _RESERVED_CLUSTER_GROUP_TO_HINT_OR_RAISE[
name2group[reserved_cluster]]
hint_or_raise(reserved_cluster)
confirm_str = 'delete'
user_input = click.prompt(
f'To proceed, please check the warning above and type '
Expand All @@ -2845,9 +2813,8 @@ def _down_or_stop_clusters(
# Otherwise, it would be very easy to accidentally delete a reserved
# cluster.
names = [
record['name']
for record in all_clusters
if record['name'] not in backend_utils.SKY_RESERVED_CLUSTER_NAMES
record['name'] for record in all_clusters if
backend_utils.ReservedClusterGroup.get_group(record['name']) is None
]

clusters = []
Expand Down
4 changes: 2 additions & 2 deletions sky/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ def stop(cluster_name: str, purge: bool = False) -> None:
sky.exceptions.NotSupportedError: if the specified cluster is a spot
cluster, or a TPU VM Pod cluster, or the managed spot controller.
"""
if cluster_name in backend_utils.SKY_RESERVED_CLUSTER_NAMES:
if backend_utils.ReservedClusterGroup.get_group(cluster_name) is not None:
raise exceptions.NotSupportedError(
f'Stopping sky reserved cluster {cluster_name!r} '
f'is not supported.')
Expand Down Expand Up @@ -422,7 +422,7 @@ def autostop(
if is_cancel:
option_str = '{stop,down}'
operation = f'{verb} auto{option_str}'
if cluster_name in backend_utils.SKY_RESERVED_CLUSTER_NAMES:
if backend_utils.ReservedClusterGroup.get_group(cluster_name) is not None:
raise exceptions.NotSupportedError(
f'{operation} sky reserved cluster {cluster_name!r} '
f'is not supported.')
Expand Down

0 comments on commit baf62f9

Please sign in to comment.