[Release] Release 0.7.1 #4438

Open
wants to merge 19 commits into base: releases/0.7.1_pure
6 changes: 3 additions & 3 deletions examples/multi_echo.py
@@ -31,13 +31,13 @@ def run(cluster: Optional[str] = None, cloud: Optional[str] = None):

# Submit multiple tasks in parallel to trigger queueing behaviors.
def _exec(i):
- task = sky.Task(run=f'echo {i}; sleep 5')
- resources = sky.Resources(accelerators={'T4': 0.5})
+ task = sky.Task(run=f'echo {i}; sleep 60')
+ resources = sky.Resources(accelerators={'T4': 0.05})
task.set_resources(resources)
sky.exec(task, cluster_name=cluster, detach_run=True)

with pool.ThreadPool(8) as p:
- list(p.imap(_exec, range(32)))
+ list(p.imap(_exec, range(150)))


if __name__ == '__main__':
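For context, with `accelerators={'T4': 0.05}` each task now claims 5% of a T4, so up to 20 tasks can share one GPU and the 150 submissions exercise SkyPilot's queueing much harder than the previous 32. A minimal standalone version of the per-task submission (illustrative, not part of this PR; assumes an existing cluster named 'my-cluster' with a T4 attached):

```python
import sky

# Each task asks for 5% of a T4, so up to 20 such tasks can share one GPU;
# the rest wait in the cluster's job queue.
task = sky.Task(run='echo hello; sleep 60')
task.set_resources(sky.Resources(accelerators={'T4': 0.05}))
sky.exec(task, cluster_name='my-cluster', detach_run=True)
```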
58 changes: 45 additions & 13 deletions sky/backends/backend_utils.py
@@ -111,6 +111,16 @@
_ENDPOINTS_RETRY_MESSAGE = ('If the cluster was recently started, '
'please retry after a while.')

# If a cluster is less than LAUNCH_DOUBLE_CHECK_WINDOW seconds old, and we don't
# see any instances in the cloud, the instances might be in the process of
# being created. We will wait LAUNCH_DOUBLE_CHECK_DELAY seconds and then double
# check to make sure there are still no instances. LAUNCH_DOUBLE_CHECK_DELAY
# should be set longer than the delay between (sending the create instance
# request) and (the instances appearing on the cloud).
# See https://github.com/skypilot-org/skypilot/issues/4431.
_LAUNCH_DOUBLE_CHECK_WINDOW = 60
_LAUNCH_DOUBLE_CHECK_DELAY = 1
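A condensed, illustrative sketch of how these two constants are used by the status-refresh change further down in this file (the real code also gates on the cloud's STATUS_VERSION, which is omitted here):

```python
import time

def _nodes_with_launch_double_check(query_fn, record):
    # query_fn stands in for _query_cluster_status_via_cloud_api(handle).
    node_statuses = query_fn()
    recently_launched = (time.time() - record['launched_at'] <
                         _LAUNCH_DOUBLE_CHECK_WINDOW)
    if (not node_statuses and
            record['status'] == status_lib.ClusterStatus.INIT and
            recently_launched):
        # The create-instance request may not have propagated to the cloud
        # API yet; wait briefly and look once more before concluding that
        # the cluster has no instances.
        time.sleep(_LAUNCH_DOUBLE_CHECK_DELAY)
        node_statuses = query_fn()
    return node_statuses
```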

# Include the fields that will be used for generating tags that distinguish
# the cluster in ray, to avoid the stopped cluster being discarded due to
# updates in the yaml template.
@@ -1567,14 +1577,14 @@ def check_can_clone_disk_and_override_task(
The task to use and the resource handle of the source cluster.

Raises:
- ValueError: If the source cluster does not exist.
+ exceptions.ClusterDoesNotExist: If the source cluster does not exist.
exceptions.NotSupportedError: If the source cluster is not valid or the
task is not compatible with cloning the disk from the source cluster.
"""
source_cluster_status, handle = refresh_cluster_status_handle(cluster_name)
if source_cluster_status is None:
with ux_utils.print_exception_no_traceback():
- raise ValueError(
+ raise exceptions.ClusterDoesNotExist(
f'Cannot find cluster {cluster_name!r} to clone disk from.')

if not isinstance(handle, backends.CloudVmRayResourceHandle):
@@ -1732,13 +1742,12 @@ def run_ray_status_to_check_ray_cluster_healthy() -> bool:
logger.debug(
f'Refreshing status ({cluster_name!r}) failed to get IPs.')
except RuntimeError as e:
- logger.debug(str(e))
+ logger.debug(common_utils.format_exception(e))
except Exception as e: # pylint: disable=broad-except
# This can be raised by `external_ssh_ports()`, due to the
# underlying call to kubernetes API.
- logger.debug(
- f'Refreshing status ({cluster_name!r}) failed to get IPs.'
- f'{common_utils.format_exception(e, use_bracket=True)}')
+ logger.debug(f'Refreshing status ({cluster_name!r}) failed: ',
+ exc_info=e)
return False

# Determining if the cluster is healthy (UP):
@@ -1765,6 +1774,24 @@ def run_ray_status_to_check_ray_cluster_healthy() -> bool:
return record

# All cases below are transitioning the cluster to non-UP states.

if (not node_statuses and handle.launched_resources.cloud.STATUS_VERSION >=
clouds.StatusVersion.SKYPILOT):
# Note: launched_at is set during sky launch, even on an existing
# cluster. This will catch the case where the cluster was terminated on
# the cloud and restarted by sky launch.
time_since_launch = time.time() - record['launched_at']
if (record['status'] == status_lib.ClusterStatus.INIT and
time_since_launch < _LAUNCH_DOUBLE_CHECK_WINDOW):
# It's possible the instances for this cluster were just created,
# and haven't appeared yet in the cloud API/console. Wait for a bit
# and check again. This is a best-effort leak prevention check.
# See https://github.com/skypilot-org/skypilot/issues/4431.
time.sleep(_LAUNCH_DOUBLE_CHECK_DELAY)
node_statuses = _query_cluster_status_via_cloud_api(handle)
# Note: even if all the node_statuses are UP now, we will still
# consider this cluster abnormal, and its status will be INIT.

if len(node_statuses) > handle.launched_nodes:
# Unexpected: in the queried region more than 1 cluster with the same
# constructed name tag returned. This will typically not happen unless
@@ -1793,13 +1820,15 @@ def run_ray_status_to_check_ray_cluster_healthy() -> bool:
f'{colorama.Style.RESET_ALL}')
assert len(node_statuses) <= handle.launched_nodes

- # If the node_statuses is empty, all the nodes are terminated. We can
- # safely set the cluster status to TERMINATED. This handles the edge case
- # where the cluster is terminated by the user manually through the UI.
+ # If the node_statuses is empty, it should mean that all the nodes are
+ # terminated and we can set the cluster status to TERMINATED. This handles
+ # the edge case where the cluster is terminated by the user manually through
+ # the UI.
to_terminate = not node_statuses

- # A cluster is considered "abnormal", if not all nodes are TERMINATED or
- # not all nodes are STOPPED. We check that with the following logic:
+ # A cluster is considered "abnormal", if some (but not all) nodes are
+ # TERMINATED, or not all nodes are STOPPED. We check that with the following
+ # logic:
# * Not all nodes are terminated and there's at least one node
# terminated; or
# * Any of the non-TERMINATED nodes is in a non-STOPPED status.
@@ -1811,6 +1840,8 @@ def run_ray_status_to_check_ray_cluster_healthy() -> bool:
# cluster is probably down.
# * The cluster is partially terminated or stopped should be considered
# abnormal.
# * The cluster is partially or completely in the INIT state, which means
# that provisioning was interrupted. This is considered abnormal.
#
# An abnormal cluster will transition to INIT and have any autostop setting
# reset (unless it's autostopping/autodowning).
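The same abnormality rule, restated as an illustrative predicate rather than the exact source (here `node_statuses` only contains non-terminated nodes, so terminated nodes show up as missing entries):

```python
def _is_abnormal(node_statuses, launched_nodes):
    # Partially terminated: some, but not all, of the launched nodes are gone.
    partially_terminated = 0 < len(node_statuses) < launched_nodes
    # Any surviving node is not cleanly STOPPED (e.g. stuck in INIT after an
    # interrupted provision, or UP without a healthy ray cluster).
    any_not_stopped = any(status != status_lib.ClusterStatus.STOPPED
                          for status in node_statuses)
    return partially_terminated or any_not_stopped
```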
@@ -2060,7 +2091,7 @@ def check_cluster_available(
"""Check if the cluster is available.

Raises:
- ValueError: if the cluster does not exist.
+ exceptions.ClusterDoesNotExist: if the cluster does not exist.
exceptions.ClusterNotUpError: if the cluster is not UP.
exceptions.NotSupportedError: if the cluster is not based on
CloudVmRayBackend.
@@ -2125,7 +2156,8 @@ def check_cluster_available(
error_msg += message

with ux_utils.print_exception_no_traceback():
- raise ValueError(f'{colorama.Fore.YELLOW}{error_msg}{reset}')
+ raise exceptions.ClusterDoesNotExist(
+ f'{colorama.Fore.YELLOW}{error_msg}{reset}')
assert cluster_status is not None, 'handle is not None but status is None'
backend = get_backend_from_handle(handle)
if check_cloud_vm_ray_backend and not isinstance(
58 changes: 49 additions & 9 deletions sky/backends/cloud_vm_ray_backend.py
@@ -98,6 +98,11 @@
# The maximum retry count for fetching IP address.
_FETCH_IP_MAX_ATTEMPTS = 3

# How many times to query the cloud provider to make sure instances are
# stopping/terminating, and how long to wait between each query.
_TEARDOWN_WAIT_MAX_ATTEMPTS = 10
_TEARDOWN_WAIT_BETWEEN_ATTEMPS_SECONDS = 1

_TEARDOWN_FAILURE_MESSAGE = (
f'\n{colorama.Fore.RED}Failed to terminate '
'{cluster_name}. {extra_reason}'
@@ -2357,15 +2362,17 @@ def get_command_runners(self,
zip(ip_list, port_list), **ssh_credentials)
return runners
if self.cached_cluster_info is None:
- # We have `or self.cached_external_ips is None` here, because
+ # We have `and self.cached_external_ips is None` here, because
# when a cluster's cloud is just upgraded to the new provisioner,
# although it has the cached_external_ips, the cached_cluster_info
# can be None. We need to update it here, even when force_cached is
# set to True.
# TODO: We can remove `self.cached_external_ips is None` after
# version 0.8.0.
- assert not force_cached or self.cached_external_ips is not None, (
- force_cached, self.cached_external_ips)
+ if force_cached and self.cached_external_ips is None:
+ raise RuntimeError(
+ 'Tried to use cached cluster info, but it\'s missing for '
+ f'cluster "{self.cluster_name}"')
self._update_cluster_info()
assert self.cached_cluster_info is not None, self
runners = provision_lib.get_command_runners(
@@ -2784,9 +2791,6 @@ def _provision(
if e.no_failover:
error_message = str(e)
else:
- # Clean up the cluster's entry in `sky status`.
- global_user_state.remove_cluster(cluster_name,
- terminate=True)
usage_lib.messages.usage.update_final_cluster_status(
None)
error_message = (
@@ -3928,7 +3932,6 @@ def teardown_no_lock(self,
limit=1000).get_result()['items']
vpc_id = None
try:
- # pylint: disable=line-too-long
vpc_id = vpcs_filtered_by_tags_and_region[0]['crn'].rsplit(
':', 1)[-1]
vpc_found = True
@@ -3937,7 +3940,6 @@
returncode = -1

if vpc_found:
- # pylint: disable=line-too-long E1136
# Delete VPC and its associated resources
vpc_provider = IBMVPCProvider(
config_provider['resource_group_id'], region,
@@ -4058,6 +4060,7 @@ def post_teardown_cleanup(self,
* Removing the terminated cluster's scripts and ray yaml files.
"""
cluster_name_on_cloud = handle.cluster_name_on_cloud
cloud = handle.launched_resources.cloud

if (terminate and handle.launched_resources.is_image_managed is True):
# Delete the image when terminating a "cloned" cluster, i.e.,
@@ -4078,7 +4081,6 @@
'remove it manually to avoid image leakage. Details: '
f'{common_utils.format_exception(e, use_bracket=True)}')
if terminate:
- cloud = handle.launched_resources.cloud
config = common_utils.read_yaml(handle.cluster_yaml)
try:
cloud.check_features_are_supported(
@@ -4105,6 +4107,44 @@
config = common_utils.read_yaml(handle.cluster_yaml)
backend_utils.SSHConfigHelper.remove_cluster(handle.cluster_name)

# Confirm that instances have actually transitioned state before
# updating the state database. We do this immediately before removing
# the state from the database, so that we can guarantee that this is
# always called before the state is removed. We considered running this
# check as part of provisioner.teardown_cluster or
# provision.terminate_instances, but it would open the door to code paths
# that successfully call this function but do not first call
# teardown_cluster or terminate_instances. See
# https://github.com/skypilot-org/skypilot/pull/4443#discussion_r1872798032
attempts = 0
while True:
logger.debug(f'instance statuses attempt {attempts + 1}')
node_status_dict = provision_lib.query_instances(
repr(cloud),
cluster_name_on_cloud,
config['provider'],
non_terminated_only=False)

unexpected_node_state: Optional[Tuple[str, str]] = None
for node_id, node_status in node_status_dict.items():
logger.debug(f'{node_id} status: {node_status}')
# FIXME(cooperc): Some clouds (e.g. GCP) do not distinguish
# between "stopping/stopped" and "terminating/terminated", so we
# allow for either status instead of casing on `terminate`.
if node_status not in [None, status_lib.ClusterStatus.STOPPED]:
unexpected_node_state = (node_id, node_status)

if unexpected_node_state is None:
break

attempts += 1
if attempts < _TEARDOWN_WAIT_MAX_ATTEMPTS:
time.sleep(_TEARDOWN_WAIT_BETWEEN_ATTEMPS_SECONDS)
else:
(node_id, node_status) = unexpected_node_state
raise RuntimeError(f'Instance {node_id} in unexpected state '
f'{node_status}.')

global_user_state.remove_cluster(handle.cluster_name,
terminate=terminate)
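With the defaults above, this verification adds at most about ten seconds (roughly _TEARDOWN_WAIT_MAX_ATTEMPTS * _TEARDOWN_WAIT_BETWEEN_ATTEMPS_SECONDS) to a teardown that has genuinely failed, and exits on the first clean check otherwise. The same wait-then-fail pattern in isolation, as a generic sketch rather than SkyPilot API:

```python
import time

def wait_until(predicate, max_attempts=10, delay_seconds=1):
    """Poll `predicate` a bounded number of times, then fail loudly."""
    for attempt in range(max_attempts):
        if predicate():
            return
        if attempt + 1 < max_attempts:
            time.sleep(delay_seconds)
    raise RuntimeError(f'Condition not met after {max_attempts} attempts.')
```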

43 changes: 25 additions & 18 deletions sky/core.py
@@ -268,7 +268,8 @@ def _start(
cluster_status, handle = backend_utils.refresh_cluster_status_handle(
cluster_name)
if handle is None:
- raise ValueError(f'Cluster {cluster_name!r} does not exist.')
+ raise exceptions.ClusterDoesNotExist(
+ f'Cluster {cluster_name!r} does not exist.')
if not force and cluster_status == status_lib.ClusterStatus.UP:
sky_logging.print(f'Cluster {cluster_name!r} is already up.')
return handle
@@ -359,12 +360,13 @@ def start(
Useful for upgrading SkyPilot runtime.

Raises:
- ValueError: argument values are invalid: (1) the specified cluster does
- not exist; (2) if ``down`` is set to True but
- ``idle_minutes_to_autostop`` is None; (3) if the specified cluster is
- the managed jobs controller, and either ``idle_minutes_to_autostop``
- is not None or ``down`` is True (omit them to use the default
- autostop settings).
+ ValueError: argument values are invalid: (1) if ``down`` is set to True
+ but ``idle_minutes_to_autostop`` is None; (2) if the specified
+ cluster is the managed jobs controller, and either
+ ``idle_minutes_to_autostop`` is not None or ``down`` is True (omit
+ them to use the default autostop settings).
+ sky.exceptions.ClusterDoesNotExist: the specified cluster does not
+ exist.
sky.exceptions.NotSupportedError: if the cluster to restart was
launched using a non-default backend that does not support this
operation.
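Since this and the docstring updates below change the advertised exception from a bare ValueError to sky.exceptions.ClusterDoesNotExist, callers can now catch the more specific type. A minimal sketch (the cluster name is made up); note that upstream SkyPilot defines the new exception as a ValueError subclass, so pre-existing handlers that catch ValueError should keep working:

```python
import sky
from sky import exceptions

try:
    sky.start('no-such-cluster')
except exceptions.ClusterDoesNotExist:
    # Raised, per this change, when the named cluster is not found.
    print('cluster does not exist')
```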
@@ -412,7 +414,8 @@ def stop(cluster_name: str, purge: bool = False) -> None:
related resources.

Raises:
- ValueError: the specified cluster does not exist.
+ sky.exceptions.ClusterDoesNotExist: the specified cluster does not
+ exist.
RuntimeError: failed to stop the cluster.
sky.exceptions.NotSupportedError: if the specified cluster is a spot
cluster, or a TPU VM Pod cluster, or the managed jobs controller.
@@ -423,7 +426,8 @@
f'is not supported.')
handle = global_user_state.get_handle_from_cluster_name(cluster_name)
if handle is None:
- raise ValueError(f'Cluster {cluster_name!r} does not exist.')
+ raise exceptions.ClusterDoesNotExist(
+ f'Cluster {cluster_name!r} does not exist.')

backend = backend_utils.get_backend_from_handle(handle)

@@ -467,14 +471,16 @@ def down(cluster_name: str, purge: bool = False) -> None:
resources.

Raises:
- ValueError: the specified cluster does not exist.
+ sky.exceptions.ClusterDoesNotExist: the specified cluster does not
+ exist.
RuntimeError: failed to tear down the cluster.
sky.exceptions.NotSupportedError: the specified cluster is the managed
jobs controller.
"""
handle = global_user_state.get_handle_from_cluster_name(cluster_name)
if handle is None:
- raise ValueError(f'Cluster {cluster_name!r} does not exist.')
+ raise exceptions.ClusterDoesNotExist(
+ f'Cluster {cluster_name!r} does not exist.')

usage_lib.record_cluster_name_for_current_operation(cluster_name)
backend = backend_utils.get_backend_from_handle(handle)
@@ -521,7 +527,7 @@ def autostop(
rather than autostop (restartable).

Raises:
- ValueError: if the cluster does not exist.
+ sky.exceptions.ClusterDoesNotExist: if the cluster does not exist.
sky.exceptions.ClusterNotUpError: if the cluster is not UP.
sky.exceptions.NotSupportedError: if the cluster is not based on
CloudVmRayBackend or the cluster is TPU VM Pod.
@@ -615,7 +621,7 @@ def queue(cluster_name: str,
}
]
raises:
- ValueError: if the cluster does not exist.
+ sky.exceptions.ClusterDoesNotExist: if the cluster does not exist.
sky.exceptions.ClusterNotUpError: if the cluster is not UP.
sky.exceptions.NotSupportedError: if the cluster is not based on
CloudVmRayBackend.
@@ -674,7 +680,8 @@ def cancel(
worker node is preempted in the spot cluster.

Raises:
- ValueError: if arguments are invalid, or the cluster does not exist.
+ ValueError: if arguments are invalid.
+ sky.exceptions.ClusterDoesNotExist: if the cluster does not exist.
sky.exceptions.ClusterNotUpError: if the cluster is not UP.
sky.exceptions.NotSupportedError: if the specified cluster is a
controller that does not support this operation.
@@ -749,8 +756,8 @@ def tail_logs(cluster_name: str,
Please refer to sky.cli.tail_logs for the documentation.

Raises:
- ValueError: arguments are invalid or the cluster is not supported or
- the cluster does not exist.
+ ValueError: if arguments are invalid or the cluster is not supported.
+ sky.exceptions.ClusterDoesNotExist: if the cluster does not exist.
sky.exceptions.ClusterNotUpError: if the cluster is not UP.
sky.exceptions.NotSupportedError: if the cluster is not based on
CloudVmRayBackend.
@@ -792,7 +799,7 @@ def download_logs(
Returns:
Dict[str, str]: a mapping of job_id to local log path.
Raises:
- ValueError: if the cluster does not exist.
+ sky.exceptions.ClusterDoesNotExist: if the cluster does not exist.
sky.exceptions.ClusterNotUpError: if the cluster is not UP.
sky.exceptions.NotSupportedError: if the cluster is not based on
CloudVmRayBackend.
@@ -837,7 +844,7 @@ def job_status(cluster_name: str,
If job_ids is None and there is no job on the cluster, it will return
{None: None}.
Raises:
- ValueError: if the cluster does not exist.
+ sky.exceptions.ClusterDoesNotExist: if the cluster does not exist.
sky.exceptions.ClusterNotUpError: if the cluster is not UP.
sky.exceptions.NotSupportedError: if the cluster is not based on
CloudVmRayBackend.
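For completeness, a small usage sketch of the query APIs whose docstrings are updated in this file (cluster name hypothetical; return shapes follow the docstrings above):

```python
import sky

jobs = sky.queue('my-cluster')           # list of job records for the cluster
statuses = sky.job_status('my-cluster')  # {job_id: JobStatus or None}
```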