diff --git a/examples/multi_echo.py b/examples/multi_echo.py index a9fe49fad2d..7f310a4bb6b 100644 --- a/examples/multi_echo.py +++ b/examples/multi_echo.py @@ -31,13 +31,13 @@ def run(cluster: Optional[str] = None, cloud: Optional[str] = None): # Submit multiple tasks in parallel to trigger queueing behaviors. def _exec(i): - task = sky.Task(run=f'echo {i}; sleep 5') - resources = sky.Resources(accelerators={'T4': 0.5}) + task = sky.Task(run=f'echo {i}; sleep 60') + resources = sky.Resources(accelerators={'T4': 0.05}) task.set_resources(resources) sky.exec(task, cluster_name=cluster, detach_run=True) with pool.ThreadPool(8) as p: - list(p.imap(_exec, range(32))) + list(p.imap(_exec, range(150))) if __name__ == '__main__': diff --git a/sky/backends/backend_utils.py b/sky/backends/backend_utils.py index da1e68991a4..c45524cbe5a 100644 --- a/sky/backends/backend_utils.py +++ b/sky/backends/backend_utils.py @@ -111,6 +111,16 @@ _ENDPOINTS_RETRY_MESSAGE = ('If the cluster was recently started, ' 'please retry after a while.') +# If a cluster is less than LAUNCH_DOUBLE_CHECK_WINDOW seconds old, and we don't +# see any instances in the cloud, the instances might be in the proccess of +# being created. We will wait LAUNCH_DOUBLE_CHECK_DELAY seconds and then double +# check to make sure there are still no instances. LAUNCH_DOUBLE_CHECK_DELAY +# should be set longer than the delay between (sending the create instance +# request) and (the instances appearing on the cloud). +# See https://github.com/skypilot-org/skypilot/issues/4431. +_LAUNCH_DOUBLE_CHECK_WINDOW = 60 +_LAUNCH_DOUBLE_CHECK_DELAY = 1 + # Include the fields that will be used for generating tags that distinguishes # the cluster in ray, to avoid the stopped cluster being discarded due to # updates in the yaml template. @@ -1567,14 +1577,14 @@ def check_can_clone_disk_and_override_task( The task to use and the resource handle of the source cluster. Raises: - ValueError: If the source cluster does not exist. + exceptions.ClusterDoesNotExist: If the source cluster does not exist. exceptions.NotSupportedError: If the source cluster is not valid or the task is not compatible to clone disk from the source cluster. """ source_cluster_status, handle = refresh_cluster_status_handle(cluster_name) if source_cluster_status is None: with ux_utils.print_exception_no_traceback(): - raise ValueError( + raise exceptions.ClusterDoesNotExist( f'Cannot find cluster {cluster_name!r} to clone disk from.') if not isinstance(handle, backends.CloudVmRayResourceHandle): @@ -1732,13 +1742,12 @@ def run_ray_status_to_check_ray_cluster_healthy() -> bool: logger.debug( f'Refreshing status ({cluster_name!r}) failed to get IPs.') except RuntimeError as e: - logger.debug(str(e)) + logger.debug(common_utils.format_exception(e)) except Exception as e: # pylint: disable=broad-except # This can be raised by `external_ssh_ports()`, due to the # underlying call to kubernetes API. - logger.debug( - f'Refreshing status ({cluster_name!r}) failed: ' - f'{common_utils.format_exception(e, use_bracket=True)}') + logger.debug(f'Refreshing status ({cluster_name!r}) failed: ', + exc_info=e) return False # Determining if the cluster is healthy (UP): @@ -1765,6 +1774,24 @@ def run_ray_status_to_check_ray_cluster_healthy() -> bool: return record # All cases below are transitioning the cluster to non-UP states. 
+ + if (not node_statuses and handle.launched_resources.cloud.STATUS_VERSION >= + clouds.StatusVersion.SKYPILOT): + # Note: launched_at is set during sky launch, even on an existing + # cluster. This will catch the case where the cluster was terminated on + # the cloud and restarted by sky launch. + time_since_launch = time.time() - record['launched_at'] + if (record['status'] == status_lib.ClusterStatus.INIT and + time_since_launch < _LAUNCH_DOUBLE_CHECK_WINDOW): + # It's possible the instances for this cluster were just created, + # and haven't appeared yet in the cloud API/console. Wait for a bit + # and check again. This is a best-effort leak prevention check. + # See https://github.com/skypilot-org/skypilot/issues/4431. + time.sleep(_LAUNCH_DOUBLE_CHECK_DELAY) + node_statuses = _query_cluster_status_via_cloud_api(handle) + # Note: even if all the node_statuses are UP now, we will still + # consider this cluster abnormal, and its status will be INIT. + if len(node_statuses) > handle.launched_nodes: # Unexpected: in the queried region more than 1 cluster with the same # constructed name tag returned. This will typically not happen unless @@ -1793,13 +1820,15 @@ def run_ray_status_to_check_ray_cluster_healthy() -> bool: f'{colorama.Style.RESET_ALL}') assert len(node_statuses) <= handle.launched_nodes - # If the node_statuses is empty, all the nodes are terminated. We can - # safely set the cluster status to TERMINATED. This handles the edge case - # where the cluster is terminated by the user manually through the UI. + # If the node_statuses is empty, it should mean that all the nodes are + # terminated and we can set the cluster status to TERMINATED. This handles + # the edge case where the cluster is terminated by the user manually through + # the UI. to_terminate = not node_statuses - # A cluster is considered "abnormal", if not all nodes are TERMINATED or - # not all nodes are STOPPED. We check that with the following logic: + # A cluster is considered "abnormal", if some (but not all) nodes are + # TERMINATED, or not all nodes are STOPPED. We check that with the following + # logic: # * Not all nodes are terminated and there's at least one node # terminated; or # * Any of the non-TERMINATED nodes is in a non-STOPPED status. @@ -1811,6 +1840,8 @@ def run_ray_status_to_check_ray_cluster_healthy() -> bool: # cluster is probably down. # * The cluster is partially terminated or stopped should be considered # abnormal. + # * The cluster is partially or completely in the INIT state, which means + # that provisioning was interrupted. This is considered abnormal. # # An abnormal cluster will transition to INIT and have any autostop setting # reset (unless it's autostopping/autodowning). @@ -2060,7 +2091,7 @@ def check_cluster_available( """Check if the cluster is available. Raises: - ValueError: if the cluster does not exist. + exceptions.ClusterDoesNotExist: if the cluster does not exist. exceptions.ClusterNotUpError: if the cluster is not UP. exceptions.NotSupportedError: if the cluster is not based on CloudVmRayBackend. 
@@ -2125,7 +2156,8 @@ def check_cluster_available( error_msg += message with ux_utils.print_exception_no_traceback(): - raise ValueError(f'{colorama.Fore.YELLOW}{error_msg}{reset}') + raise exceptions.ClusterDoesNotExist( + f'{colorama.Fore.YELLOW}{error_msg}{reset}') assert cluster_status is not None, 'handle is not None but status is None' backend = get_backend_from_handle(handle) if check_cloud_vm_ray_backend and not isinstance( diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py index a256277085f..07bdaa5cfb9 100644 --- a/sky/backends/cloud_vm_ray_backend.py +++ b/sky/backends/cloud_vm_ray_backend.py @@ -98,6 +98,11 @@ # The maximum retry count for fetching IP address. _FETCH_IP_MAX_ATTEMPTS = 3 +# How many times to query the cloud provider to make sure instances are +# stopping/terminating, and how long to wait between each query. +_TEARDOWN_WAIT_MAX_ATTEMPTS = 10 +_TEARDOWN_WAIT_BETWEEN_ATTEMPS_SECONDS = 1 + _TEARDOWN_FAILURE_MESSAGE = ( f'\n{colorama.Fore.RED}Failed to terminate ' '{cluster_name}. {extra_reason}' @@ -2357,15 +2362,17 @@ def get_command_runners(self, zip(ip_list, port_list), **ssh_credentials) return runners if self.cached_cluster_info is None: - # We have `or self.cached_external_ips is None` here, because + # We have `and self.cached_external_ips is None` here, because # when a cluster's cloud is just upgraded to the new provsioner, # although it has the cached_external_ips, the cached_cluster_info # can be None. We need to update it here, even when force_cached is # set to True. # TODO: We can remove `self.cached_external_ips is None` after # version 0.8.0. - assert not force_cached or self.cached_external_ips is not None, ( - force_cached, self.cached_external_ips) + if force_cached and self.cached_external_ips is None: + raise RuntimeError( + 'Tried to use cached cluster info, but it\'s missing for ' + f'cluster "{self.cluster_name}"') self._update_cluster_info() assert self.cached_cluster_info is not None, self runners = provision_lib.get_command_runners( @@ -2784,9 +2791,6 @@ def _provision( if e.no_failover: error_message = str(e) else: - # Clean up the cluster's entry in `sky status`. - global_user_state.remove_cluster(cluster_name, - terminate=True) usage_lib.messages.usage.update_final_cluster_status( None) error_message = ( @@ -3928,7 +3932,6 @@ def teardown_no_lock(self, limit=1000).get_result()['items'] vpc_id = None try: - # pylint: disable=line-too-long vpc_id = vpcs_filtered_by_tags_and_region[0]['crn'].rsplit( ':', 1)[-1] vpc_found = True @@ -3937,7 +3940,6 @@ def teardown_no_lock(self, returncode = -1 if vpc_found: - # pylint: disable=line-too-long E1136 # Delete VPC and it's associated resources vpc_provider = IBMVPCProvider( config_provider['resource_group_id'], region, @@ -4058,6 +4060,7 @@ def post_teardown_cleanup(self, * Removing the terminated cluster's scripts and ray yaml files. """ cluster_name_on_cloud = handle.cluster_name_on_cloud + cloud = handle.launched_resources.cloud if (terminate and handle.launched_resources.is_image_managed is True): # Delete the image when terminating a "cloned" cluster, i.e., @@ -4078,7 +4081,6 @@ def post_teardown_cleanup(self, 'remove it manually to avoid image leakage. 
Details: ' f'{common_utils.format_exception(e, use_bracket=True)}') if terminate: - cloud = handle.launched_resources.cloud config = common_utils.read_yaml(handle.cluster_yaml) try: cloud.check_features_are_supported( @@ -4105,6 +4107,44 @@ def post_teardown_cleanup(self, config = common_utils.read_yaml(handle.cluster_yaml) backend_utils.SSHConfigHelper.remove_cluster(handle.cluster_name) + # Confirm that instances have actually transitioned state before + # updating the state database. We do this immediately before removing + # the state from the database, so that we can guarantee that this is + # always called before the state is removed. We considered running this + # check as part of provisioner.teardown_cluster or + # provision.terminate_instances, but it would open the door code paths + # that successfully call this function but do not first call + # teardown_cluster or terminate_instances. See + # https://github.com/skypilot-org/skypilot/pull/4443#discussion_r1872798032 + attempts = 0 + while True: + logger.debug(f'instance statuses attempt {attempts + 1}') + node_status_dict = provision_lib.query_instances( + repr(cloud), + cluster_name_on_cloud, + config['provider'], + non_terminated_only=False) + + unexpected_node_state: Optional[Tuple[str, str]] = None + for node_id, node_status in node_status_dict.items(): + logger.debug(f'{node_id} status: {node_status}') + # FIXME(cooperc): Some clouds (e.g. GCP) do not distinguish + # between "stopping/stopped" and "terminating/terminated", so we + # allow for either status instead of casing on `terminate`. + if node_status not in [None, status_lib.ClusterStatus.STOPPED]: + unexpected_node_state = (node_id, node_status) + + if unexpected_node_state is None: + break + + attempts += 1 + if attempts < _TEARDOWN_WAIT_MAX_ATTEMPTS: + time.sleep(_TEARDOWN_WAIT_BETWEEN_ATTEMPS_SECONDS) + else: + (node_id, node_status) = unexpected_node_state + raise RuntimeError(f'Instance {node_id} in unexpected state ' + f'{node_status}.') + global_user_state.remove_cluster(handle.cluster_name, terminate=terminate) diff --git a/sky/core.py b/sky/core.py index 496b8b8ad5e..35bce9fa7eb 100644 --- a/sky/core.py +++ b/sky/core.py @@ -268,7 +268,8 @@ def _start( cluster_status, handle = backend_utils.refresh_cluster_status_handle( cluster_name) if handle is None: - raise ValueError(f'Cluster {cluster_name!r} does not exist.') + raise exceptions.ClusterDoesNotExist( + f'Cluster {cluster_name!r} does not exist.') if not force and cluster_status == status_lib.ClusterStatus.UP: sky_logging.print(f'Cluster {cluster_name!r} is already up.') return handle @@ -359,12 +360,13 @@ def start( Useful for upgrading SkyPilot runtime. Raises: - ValueError: argument values are invalid: (1) the specified cluster does - not exist; (2) if ``down`` is set to True but - ``idle_minutes_to_autostop`` is None; (3) if the specified cluster is - the managed jobs controller, and either ``idle_minutes_to_autostop`` - is not None or ``down`` is True (omit them to use the default - autostop settings). + ValueError: argument values are invalid: (1) if ``down`` is set to True + but ``idle_minutes_to_autostop`` is None; (2) if the specified + cluster is the managed jobs controller, and either + ``idle_minutes_to_autostop`` is not None or ``down`` is True (omit + them to use the default autostop settings). + sky.exceptions.ClusterDoesNotExist: the specified cluster does not + exist. 
sky.exceptions.NotSupportedError: if the cluster to restart was launched using a non-default backend that does not support this operation. @@ -412,7 +414,8 @@ def stop(cluster_name: str, purge: bool = False) -> None: related resources. Raises: - ValueError: the specified cluster does not exist. + sky.exceptions.ClusterDoesNotExist: the specified cluster does not + exist. RuntimeError: failed to stop the cluster. sky.exceptions.NotSupportedError: if the specified cluster is a spot cluster, or a TPU VM Pod cluster, or the managed jobs controller. @@ -423,7 +426,8 @@ def stop(cluster_name: str, purge: bool = False) -> None: f'is not supported.') handle = global_user_state.get_handle_from_cluster_name(cluster_name) if handle is None: - raise ValueError(f'Cluster {cluster_name!r} does not exist.') + raise exceptions.ClusterDoesNotExist( + f'Cluster {cluster_name!r} does not exist.') backend = backend_utils.get_backend_from_handle(handle) @@ -467,14 +471,16 @@ def down(cluster_name: str, purge: bool = False) -> None: resources. Raises: - ValueError: the specified cluster does not exist. + sky.exceptions.ClusterDoesNotExist: the specified cluster does not + exist. RuntimeError: failed to tear down the cluster. sky.exceptions.NotSupportedError: the specified cluster is the managed jobs controller. """ handle = global_user_state.get_handle_from_cluster_name(cluster_name) if handle is None: - raise ValueError(f'Cluster {cluster_name!r} does not exist.') + raise exceptions.ClusterDoesNotExist( + f'Cluster {cluster_name!r} does not exist.') usage_lib.record_cluster_name_for_current_operation(cluster_name) backend = backend_utils.get_backend_from_handle(handle) @@ -521,7 +527,7 @@ def autostop( rather than autostop (restartable). Raises: - ValueError: if the cluster does not exist. + sky.exceptions.ClusterDoesNotExist: if the cluster does not exist. sky.exceptions.ClusterNotUpError: if the cluster is not UP. sky.exceptions.NotSupportedError: if the cluster is not based on CloudVmRayBackend or the cluster is TPU VM Pod. @@ -615,7 +621,7 @@ def queue(cluster_name: str, } ] raises: - ValueError: if the cluster does not exist. + sky.exceptions.ClusterDoesNotExist: if the cluster does not exist. sky.exceptions.ClusterNotUpError: if the cluster is not UP. sky.exceptions.NotSupportedError: if the cluster is not based on CloudVmRayBackend. @@ -674,7 +680,8 @@ def cancel( worker node is preempted in the spot cluster. Raises: - ValueError: if arguments are invalid, or the cluster does not exist. + ValueError: if arguments are invalid. + sky.exceptions.ClusterDoesNotExist: if the cluster does not exist. sky.exceptions.ClusterNotUpError: if the cluster is not UP. sky.exceptions.NotSupportedError: if the specified cluster is a controller that does not support this operation. @@ -749,8 +756,8 @@ def tail_logs(cluster_name: str, Please refer to the sky.cli.tail_logs for the document. Raises: - ValueError: arguments are invalid or the cluster is not supported or - the cluster does not exist. + ValueError: if arguments are invalid or the cluster is not supported. + sky.exceptions.ClusterDoesNotExist: if the cluster does not exist. sky.exceptions.ClusterNotUpError: if the cluster is not UP. sky.exceptions.NotSupportedError: if the cluster is not based on CloudVmRayBackend. @@ -792,7 +799,7 @@ def download_logs( Returns: Dict[str, str]: a mapping of job_id to local log path. Raises: - ValueError: if the cluster does not exist. + sky.exceptions.ClusterDoesNotExist: if the cluster does not exist. 
sky.exceptions.ClusterNotUpError: if the cluster is not UP. sky.exceptions.NotSupportedError: if the cluster is not based on CloudVmRayBackend. @@ -837,7 +844,7 @@ def job_status(cluster_name: str, If job_ids is None and there is no job on the cluster, it will return {None: None}. Raises: - ValueError: if the cluster does not exist. + sky.exceptions.ClusterDoesNotExist: if the cluster does not exist. sky.exceptions.ClusterNotUpError: if the cluster is not UP. sky.exceptions.NotSupportedError: if the cluster is not based on CloudVmRayBackend. diff --git a/sky/exceptions.py b/sky/exceptions.py index f78c6605261..0eef19a5add 100644 --- a/sky/exceptions.py +++ b/sky/exceptions.py @@ -129,6 +129,13 @@ class ClusterSetUpError(Exception): pass +class ClusterDoesNotExist(ValueError): + """Raise when trying to operate on a cluster that does not exist.""" + # This extends ValueError for compatibility reasons - we used to throw + # ValueError instead of this. + pass + + class NotSupportedError(Exception): """Raised when a feature is not supported.""" pass diff --git a/sky/execution.py b/sky/execution.py index 42a39a90380..6a464d7fae8 100644 --- a/sky/execution.py +++ b/sky/execution.py @@ -294,7 +294,8 @@ def _execute( do_workdir = (Stage.SYNC_WORKDIR in stages and not dryrun and task.workdir is not None) do_file_mounts = (Stage.SYNC_FILE_MOUNTS in stages and not dryrun and - task.file_mounts is not None) + (task.file_mounts is not None or + task.storage_mounts is not None)) if do_workdir or do_file_mounts: logger.info(ux_utils.starting_message('Mounting files.')) @@ -523,8 +524,9 @@ def exec( # pylint: disable=redefined-builtin submitted. Raises: - ValueError: if the specified cluster does not exist or is not in UP - status. + ValueError: if the specified cluster is not in UP status. + sky.exceptions.ClusterDoesNotExist: if the specified cluster does not + exist. sky.exceptions.NotSupportedError: if the specified cluster is a controller that does not support this operation. diff --git a/sky/jobs/recovery_strategy.py b/sky/jobs/recovery_strategy.py index e5c607696d8..369b4e62dc8 100644 --- a/sky/jobs/recovery_strategy.py +++ b/sky/jobs/recovery_strategy.py @@ -45,8 +45,9 @@ def terminate_cluster(cluster_name: str, max_retry: int = 3) -> None: usage_lib.messages.usage.set_internal() sky.down(cluster_name) return - except ValueError: + except exceptions.ClusterDoesNotExist: # The cluster is already down. 
+ logger.debug(f'The cluster {cluster_name} is already down.') return except Exception as e: # pylint: disable=broad-except retry_cnt += 1 diff --git a/sky/provision/azure/instance.py b/sky/provision/azure/instance.py index 60159232787..229d7361e22 100644 --- a/sky/provision/azure/instance.py +++ b/sky/provision/azure/instance.py @@ -101,8 +101,8 @@ def cluster_status_map( ) -> Dict['AzureInstanceStatus', Optional[status_lib.ClusterStatus]]: return { cls.PENDING: status_lib.ClusterStatus.INIT, - cls.STOPPING: status_lib.ClusterStatus.INIT, cls.RUNNING: status_lib.ClusterStatus.UP, + cls.STOPPING: status_lib.ClusterStatus.STOPPED, cls.STOPPED: status_lib.ClusterStatus.STOPPED, cls.DELETING: None, } @@ -305,7 +305,8 @@ def _create_vm( network_profile=network_profile, identity=compute.VirtualMachineIdentity( type='UserAssigned', - user_assigned_identities={provider_config['msi']: {}})) + user_assigned_identities={provider_config['msi']: {}}), + priority=node_config['azure_arm_parameters'].get('priority', None)) vm_poller = compute_client.virtual_machines.begin_create_or_update( resource_group_name=provider_config['resource_group'], vm_name=vm_name, diff --git a/sky/provision/gcp/instance.py b/sky/provision/gcp/instance.py index 9872ad73dc7..6c09dbc6816 100644 --- a/sky/provision/gcp/instance.py +++ b/sky/provision/gcp/instance.py @@ -52,6 +52,8 @@ def _filter_instances( # non_terminated_only=True? # Will there be callers who would want this to be False? # stop() and terminate() for example already implicitly assume non-terminated. +# Currently, even with non_terminated_only=False, we may not have a dict entry +# for terminated instances, if they have already been fully deleted. @common_utils.retry def query_instances( cluster_name_on_cloud: str, diff --git a/sky/provision/lambda_cloud/instance.py b/sky/provision/lambda_cloud/instance.py index d10c36496ab..d33c97df95c 100644 --- a/sky/provision/lambda_cloud/instance.py +++ b/sky/provision/lambda_cloud/instance.py @@ -233,7 +233,7 @@ def query_instances( 'booting': status_lib.ClusterStatus.INIT, 'active': status_lib.ClusterStatus.UP, 'unhealthy': status_lib.ClusterStatus.INIT, - 'terminating': status_lib.ClusterStatus.INIT, + 'terminating': None, } statuses: Dict[str, Optional[status_lib.ClusterStatus]] = {} for instance_id, instance in instances.items(): diff --git a/sky/provision/paperspace/instance.py b/sky/provision/paperspace/instance.py index 5804362d102..6775304c8f7 100644 --- a/sky/provision/paperspace/instance.py +++ b/sky/provision/paperspace/instance.py @@ -286,12 +286,13 @@ def query_instances( assert provider_config is not None, (cluster_name_on_cloud, provider_config) instances = _filter_instances(cluster_name_on_cloud, None) + # https://docs.digitalocean.com/reference/paperspace/core/commands/machines/#show status_map = { 'starting': status_lib.ClusterStatus.INIT, 'restarting': status_lib.ClusterStatus.INIT, 'upgrading': status_lib.ClusterStatus.INIT, 'provisioning': status_lib.ClusterStatus.INIT, - 'stopping': status_lib.ClusterStatus.INIT, + 'stopping': status_lib.ClusterStatus.STOPPED, 'serviceready': status_lib.ClusterStatus.INIT, 'ready': status_lib.ClusterStatus.UP, 'off': status_lib.ClusterStatus.STOPPED, diff --git a/sky/skylet/job_lib.py b/sky/skylet/job_lib.py index 12d42d8c79c..6e5ad5667da 100644 --- a/sky/skylet/job_lib.py +++ b/sky/skylet/job_lib.py @@ -11,8 +11,7 @@ import sqlite3 import subprocess import time -import typing -from typing import Any, Dict, List, Optional, Tuple +from typing import Any, Dict, List, 
Optional import colorama import filelock @@ -24,9 +23,6 @@ from sky.utils import db_utils from sky.utils import log_utils -if typing.TYPE_CHECKING: - from ray.dashboard.modules.job import pydantic_models as ray_pydantic - logger = sky_logging.init_logger(__name__) _JOB_STATUS_LOCK = '~/.sky/locks/.job_{}.lock' @@ -181,14 +177,27 @@ def _run_job(self, job_id: int, run_cmd: str): subprocess.Popen(run_cmd, shell=True, stdout=subprocess.DEVNULL) def schedule_step(self, force_update_jobs: bool = False) -> None: - jobs = self._get_jobs() - if len(jobs) > 0 or force_update_jobs: + if force_update_jobs: update_status() + pending_job_ids = self._get_pending_job_ids() # TODO(zhwu, mraheja): One optimization can be allowing more than one # job staying in the pending state after ray job submit, so that to be # faster to schedule a large amount of jobs. - for job_id, run_cmd, submit, created_time in jobs: + for job_id in pending_job_ids: with filelock.FileLock(_get_lock_path(job_id)): + pending_job = _get_pending_job(job_id) + if pending_job is None: + # Pending job can be removed by another thread, due to the + # job being scheduled already. + continue + run_cmd = pending_job['run_cmd'] + submit = pending_job['submit'] + created_time = pending_job['created_time'] + # We don't have to refresh the job status before checking, as + # the job status will only be stale in rare cases where ray job + # crashes; or the job stays in INIT state for a long time. + # In those cases, the periodic JobSchedulerEvent event will + # update the job status every 300 seconds. status = get_status_no_lock(job_id) if (status not in _PRE_RESOURCE_STATUSES or created_time < psutil.boot_time()): @@ -202,8 +211,8 @@ def schedule_step(self, force_update_jobs: bool = False) -> None: self._run_job(job_id, run_cmd) return - def _get_jobs(self) -> List[Tuple[int, str, int, int]]: - """Returns the metadata for jobs in the pending jobs table + def _get_pending_job_ids(self) -> List[int]: + """Returns the job ids in the pending jobs table The information contains job_id, run command, submit time, creation time. @@ -214,9 +223,10 @@ def _get_jobs(self) -> List[Tuple[int, str, int, int]]: class FIFOScheduler(JobScheduler): """First in first out job scheduler""" - def _get_jobs(self) -> List[Tuple[int, str, int, int]]: - return list( - _CURSOR.execute('SELECT * FROM pending_jobs ORDER BY job_id')) + def _get_pending_job_ids(self) -> List[int]: + rows = _CURSOR.execute( + 'SELECT job_id FROM pending_jobs ORDER BY job_id').fetchall() + return [row[0] for row in rows] scheduler = FIFOScheduler() @@ -513,11 +523,16 @@ def _get_jobs_by_ids(job_ids: List[int]) -> List[Dict[str, Any]]: def _get_pending_job(job_id: int) -> Optional[Dict[str, Any]]: - rows = _CURSOR.execute('SELECT created_time, submit FROM pending_jobs ' - f'WHERE job_id={job_id!r}') + rows = _CURSOR.execute( + 'SELECT created_time, submit, run_cmd FROM pending_jobs ' + f'WHERE job_id={job_id!r}') for row in rows: - created_time, submit = row - return {'created_time': created_time, 'submit': submit} + created_time, submit, run_cmd = row + return { + 'created_time': created_time, + 'submit': submit, + 'run_cmd': run_cmd + } return None @@ -534,25 +549,13 @@ def update_job_status(job_ids: List[int], This function should only be run on the remote instance with ray>=2.4.0. """ + echo = logger.info if not silent else logger.debug if len(job_ids) == 0: return [] - # TODO: if too slow, directly query against redis. 
ray_job_ids = [make_ray_job_id(job_id) for job_id in job_ids] - job_client = _create_ray_job_submission_client() - # In ray 2.4.0, job_client.list_jobs returns a list of JobDetails, - # which contains the job status (str) and submission_id (str). - ray_job_query_time = time.time() - job_detail_lists: List['ray_pydantic.JobDetails'] = job_client.list_jobs() - - job_details = {} - ray_job_ids_set = set(ray_job_ids) - for job_detail in job_detail_lists: - if job_detail.submission_id in ray_job_ids_set: - job_details[job_detail.submission_id] = job_detail - statuses = [] for job_id, ray_job_id in zip(job_ids, ray_job_ids): # Per-job status lock is required because between the job status @@ -560,15 +563,48 @@ def update_job_status(job_ids: List[int], # can be modified by the generated ray program. with filelock.FileLock(_get_lock_path(job_id)): status = None - if ray_job_id in job_details: - ray_status = job_details[ray_job_id].status - status = _RAY_TO_JOB_STATUS_MAP[ray_status] + job_record = _get_jobs_by_ids([job_id])[0] + original_status = job_record['status'] + job_submitted_at = job_record['submitted_at'] + + ray_job_query_time = time.time() + if original_status == JobStatus.INIT: + if (job_submitted_at >= psutil.boot_time() and job_submitted_at + >= ray_job_query_time - _PENDING_SUBMIT_GRACE_PERIOD): + # The job id is reserved, but the job is not submitted yet. + # We should keep it in INIT. + status = JobStatus.INIT + else: + # We always immediately submit job after the job id is + # allocated, i.e. INIT -> PENDING, if a job stays in INIT + # for too long, it is likely the job submission process + # was killed before the job is submitted. We should set it + # to FAILED then. Note, if ray job indicates the job is + # running, we will change status to PENDING below. + echo(f'INIT job {job_id} is stale, setting to FAILED') + status = JobStatus.FAILED + + try: + # Querying status within the lock is safer than querying + # outside, as it avoids the race condition when job table is + # updated after the ray job status query. + # Also, getting per-job status is faster than querying all jobs, + # when there are significant number of finished jobs. + # Reference: getting 124 finished jobs takes 0.038s, while + # querying a single job takes 0.006s, 10 jobs takes 0.066s. + # TODO: if too slow, directly query against redis. + ray_job_status = job_client.get_job_status(ray_job_id) + status = _RAY_TO_JOB_STATUS_MAP[ray_job_status.value] + except RuntimeError: + # Job not found. + pass + pending_job = _get_pending_job(job_id) if pending_job is not None: if pending_job['created_time'] < psutil.boot_time(): - logger.info(f'Job {job_id} is stale, setting to FAILED: ' - f'created_time={pending_job["created_time"]}, ' - f'boot_time={psutil.boot_time()}') + echo(f'Job {job_id} is stale, setting to FAILED: ' + f'created_time={pending_job["created_time"]}, ' + f'boot_time={psutil.boot_time()}') # The job is stale as it is created before the instance # is booted, e.g. the instance is rebooted. status = JobStatus.FAILED @@ -583,22 +619,20 @@ def update_job_status(job_ids: List[int], # as stale. 
status = JobStatus.PENDING - original_status = get_status_no_lock(job_id) assert original_status is not None, (job_id, status) if status is None: status = original_status if (original_status is not None and not original_status.is_terminal()): - logger.info(f'Ray job status for job {job_id} is None, ' - 'setting it to FAILED.') + echo(f'Ray job status for job {job_id} is None, ' + 'setting it to FAILED.') # The job may be stale, when the instance is restarted # (the ray redis is volatile). We need to reset the # status of the task to FAILED if its original status # is RUNNING or PENDING. status = JobStatus.FAILED _set_status_no_lock(job_id, status) - if not silent: - logger.info(f'Updated job {job_id} status to {status}') + echo(f'Updated job {job_id} status to {status}') else: # Taking max of the status is necessary because: # 1. It avoids race condition, where the original status has @@ -611,10 +645,10 @@ def update_job_status(job_ids: List[int], # DB) would already have that value. So we take the max here to # keep it at later status. status = max(status, original_status) + assert status is not None, (job_id, status, original_status) if status != original_status: # Prevents redundant update. _set_status_no_lock(job_id, status) - if not silent: - logger.info(f'Updated job {job_id} status to {status}') + echo(f'Updated job {job_id} status to {status}') statuses.append(status) return statuses @@ -768,7 +802,9 @@ def cancel_jobs_encoded_results(jobs: Optional[List[int]], logger.warning(str(e)) continue - if job['status'] in [ + # Get the job status again to avoid race condition. + job_status = get_status_no_lock(job['job_id']) + if job_status in [ JobStatus.PENDING, JobStatus.SETTING_UP, JobStatus.RUNNING ]: _set_status_no_lock(job['job_id'], JobStatus.CANCELLED) diff --git a/sky/templates/azure-ray.yml.j2 b/sky/templates/azure-ray.yml.j2 index 7b9737748d3..1140704a708 100644 --- a/sky/templates/azure-ray.yml.j2 +++ b/sky/templates/azure-ray.yml.j2 @@ -75,9 +75,6 @@ available_node_types: {%- if use_spot %} # optionally set priority to use Spot instances priority: Spot - # set a maximum price for spot instances if desired - # billingProfile: - # maxPrice: -1 {%- endif %} cloudInitSetupCommands: |- {%- for cmd in cloud_init_setup_commands %} diff --git a/tests/test_smoke.py b/tests/test_smoke.py index 3049d1e840c..c4ed8fd0472 100644 --- a/tests/test_smoke.py +++ b/tests/test_smoke.py @@ -25,6 +25,7 @@ # Change cloud for generic tests to aws # > pytest tests/test_smoke.py --generic-cloud aws +import enum import inspect import json import os @@ -36,7 +37,7 @@ import tempfile import textwrap import time -from typing import Dict, List, NamedTuple, Optional, Tuple +from typing import Dict, List, NamedTuple, Optional, TextIO, Tuple import urllib.parse import uuid @@ -58,8 +59,11 @@ from sky.data import data_utils from sky.data import storage as storage_lib from sky.data.data_utils import Rclone +from sky.jobs.state import ManagedJobStatus from sky.skylet import constants from sky.skylet import events +from sky.skylet.job_lib import JobStatus +from sky.status_lib import ClusterStatus from sky.utils import common_utils from sky.utils import resources_utils from sky.utils import subprocess_utils @@ -95,6 +99,165 @@ 'sleep 10; s=$(sky jobs queue);' 'echo "Waiting for job to stop RUNNING"; echo "$s"; done') +# Cluster functions +_ALL_JOB_STATUSES = "|".join([status.value for status in JobStatus]) +_ALL_CLUSTER_STATUSES = "|".join([status.value for status in ClusterStatus]) +_ALL_MANAGED_JOB_STATUSES = 
"|".join( + [status.value for status in ManagedJobStatus]) + + +def _statuses_to_str(statuses: List[enum.Enum]): + """Convert a list of enums to a string with all the values separated by |.""" + assert len(statuses) > 0, 'statuses must not be empty' + if len(statuses) > 1: + return '(' + '|'.join([status.value for status in statuses]) + ')' + else: + return statuses[0].value + + +_WAIT_UNTIL_CLUSTER_STATUS_CONTAINS = ( + # A while loop to wait until the cluster status + # becomes certain status, with timeout. + 'start_time=$SECONDS; ' + 'while true; do ' + 'if (( $SECONDS - $start_time > {timeout} )); then ' + ' echo "Timeout after {timeout} seconds waiting for cluster status \'{cluster_status}\'"; exit 1; ' + 'fi; ' + 'current_status=$(sky status {cluster_name} --refresh | ' + 'awk "/^{cluster_name}/ ' + '{{for (i=1; i<=NF; i++) if (\$i ~ /^(' + _ALL_CLUSTER_STATUSES + + ')$/) print \$i}}"); ' + 'if [[ "$current_status" =~ {cluster_status} ]]; ' + 'then echo "Target cluster status {cluster_status} reached."; break; fi; ' + 'echo "Waiting for cluster status to become {cluster_status}, current status: $current_status"; ' + 'sleep 10; ' + 'done') + + +def _get_cmd_wait_until_cluster_status_contains( + cluster_name: str, cluster_status: List[ClusterStatus], timeout: int): + return _WAIT_UNTIL_CLUSTER_STATUS_CONTAINS.format( + cluster_name=cluster_name, + cluster_status=_statuses_to_str(cluster_status), + timeout=timeout) + + +def _get_cmd_wait_until_cluster_status_contains_wildcard( + cluster_name_wildcard: str, cluster_status: List[ClusterStatus], + timeout: int): + wait_cmd = _WAIT_UNTIL_CLUSTER_STATUS_CONTAINS.replace( + 'sky status {cluster_name}', + 'sky status "{cluster_name}"').replace('awk "/^{cluster_name}/', + 'awk "/^{cluster_name_awk}/') + return wait_cmd.format(cluster_name=cluster_name_wildcard, + cluster_name_awk=cluster_name_wildcard.replace( + '*', '.*'), + cluster_status=_statuses_to_str(cluster_status), + timeout=timeout) + + +_WAIT_UNTIL_CLUSTER_IS_NOT_FOUND = ( + # A while loop to wait until the cluster is not found or timeout + 'start_time=$SECONDS; ' + 'while true; do ' + 'if (( $SECONDS - $start_time > {timeout} )); then ' + ' echo "Timeout after {timeout} seconds waiting for cluster to be removed"; exit 1; ' + 'fi; ' + 'if sky status -r {cluster_name}; sky status {cluster_name} | grep "{cluster_name} not found"; then ' + ' echo "Cluster {cluster_name} successfully removed."; break; ' + 'fi; ' + 'echo "Waiting for cluster {cluster_name} to be removed..."; ' + 'sleep 10; ' + 'done') + + +def _get_cmd_wait_until_cluster_is_not_found(cluster_name: str, timeout: int): + return _WAIT_UNTIL_CLUSTER_IS_NOT_FOUND.format(cluster_name=cluster_name, + timeout=timeout) + + +_WAIT_UNTIL_JOB_STATUS_CONTAINS_MATCHING_JOB_ID = ( + # A while loop to wait until the job status + # contains certain status, with timeout. 
+ 'start_time=$SECONDS; ' + 'while true; do ' + 'if (( $SECONDS - $start_time > {timeout} )); then ' + ' echo "Timeout after {timeout} seconds waiting for job status \'{job_status}\'"; exit 1; ' + 'fi; ' + 'current_status=$(sky queue {cluster_name} | ' + 'awk "\\$1 == \\"{job_id}\\" ' + '{{for (i=1; i<=NF; i++) if (\$i ~ /^(' + _ALL_JOB_STATUSES + + ')$/) print \$i}}"); ' + 'found=0; ' # Initialize found variable outside the loop + 'while read -r line; do ' # Read line by line + ' if [[ "$line" =~ {job_status} ]]; then ' # Check each line + ' echo "Target job status {job_status} reached."; ' + ' found=1; ' + ' break; ' # Break inner loop + ' fi; ' + 'done <<< "$current_status"; ' + 'if [ "$found" -eq 1 ]; then break; fi; ' # Break outer loop if match found + 'echo "Waiting for job status to contains {job_status}, current status: $current_status"; ' + 'sleep 10; ' + 'done') + +_WAIT_UNTIL_JOB_STATUS_CONTAINS_WITHOUT_MATCHING_JOB = _WAIT_UNTIL_JOB_STATUS_CONTAINS_MATCHING_JOB_ID.replace( + 'awk "\\$1 == \\"{job_id}\\"', 'awk "') + +_WAIT_UNTIL_JOB_STATUS_CONTAINS_MATCHING_JOB_NAME = _WAIT_UNTIL_JOB_STATUS_CONTAINS_MATCHING_JOB_ID.replace( + 'awk "\\$1 == \\"{job_id}\\"', 'awk "\\$2 == \\"{job_name}\\"') + + +def _get_cmd_wait_until_job_status_contains_matching_job_id( + cluster_name: str, job_id: str, job_status: List[JobStatus], + timeout: int): + return _WAIT_UNTIL_JOB_STATUS_CONTAINS_MATCHING_JOB_ID.format( + cluster_name=cluster_name, + job_id=job_id, + job_status=_statuses_to_str(job_status), + timeout=timeout) + + +def _get_cmd_wait_until_job_status_contains_without_matching_job( + cluster_name: str, job_status: List[JobStatus], timeout: int): + return _WAIT_UNTIL_JOB_STATUS_CONTAINS_WITHOUT_MATCHING_JOB.format( + cluster_name=cluster_name, + job_status=_statuses_to_str(job_status), + timeout=timeout) + + +def _get_cmd_wait_until_job_status_contains_matching_job_name( + cluster_name: str, job_name: str, job_status: List[JobStatus], + timeout: int): + return _WAIT_UNTIL_JOB_STATUS_CONTAINS_MATCHING_JOB_NAME.format( + cluster_name=cluster_name, + job_name=job_name, + job_status=_statuses_to_str(job_status), + timeout=timeout) + + +# Managed job functions + +_WAIT_UNTIL_MANAGED_JOB_STATUS_CONTAINS_MATCHING_JOB_NAME = _WAIT_UNTIL_JOB_STATUS_CONTAINS_MATCHING_JOB_NAME.replace( + 'sky queue {cluster_name}', 'sky jobs queue').replace( + 'awk "\\$2 == \\"{job_name}\\"', + 'awk "\\$2 == \\"{job_name}\\" || \\$3 == \\"{job_name}\\"').replace( + _ALL_JOB_STATUSES, _ALL_MANAGED_JOB_STATUSES) + + +def _get_cmd_wait_until_managed_job_status_contains_matching_job_name( + job_name: str, job_status: List[JobStatus], timeout: int): + return _WAIT_UNTIL_MANAGED_JOB_STATUS_CONTAINS_MATCHING_JOB_NAME.format( + job_name=job_name, + job_status=_statuses_to_str(job_status), + timeout=timeout) + + +# After the timeout, the cluster will stop if autostop is set, and our check +# should be more than the timeout. To address this, we extend the timeout by +# _BUMP_UP_SECONDS before exiting. +_BUMP_UP_SECONDS = 35 + DEFAULT_CMD_TIMEOUT = 15 * 60 @@ -745,6 +908,12 @@ def test_clone_disk_aws(): f'sky launch -y -c {name} --cloud aws --region us-east-2 --retry-until-up "echo hello > ~/user_file.txt"', f'sky launch --clone-disk-from {name} -y -c {name}-clone && exit 1 || true', f'sky stop {name} -y', + _get_cmd_wait_until_cluster_status_contains( + cluster_name=name, + cluster_status=[ClusterStatus.STOPPED], + timeout=60), + # Wait for EC2 instance to be in stopped state. + # TODO: event based wait. 
'sleep 60', f'sky launch --clone-disk-from {name} -y -c {name}-clone --cloud aws -d --region us-east-2 "cat ~/user_file.txt | grep hello"', f'sky launch --clone-disk-from {name} -y -c {name}-clone-2 --cloud aws -d --region us-east-2 "cat ~/user_file.txt | grep hello"', @@ -791,8 +960,8 @@ def test_gcp_mig(): # Check MIG exists. f'gcloud compute instance-groups managed list --format="value(name)" | grep "^sky-mig-{name}"', f'sky autostop -i 0 --down -y {name}', - 'sleep 120', - f'sky status -r {name}; sky status {name} | grep "{name} not found"', + _get_cmd_wait_until_cluster_is_not_found(cluster_name=name, + timeout=120), f'gcloud compute instance-templates list | grep "sky-it-{name}"', # Launch again with the same region. The original instance template # should be removed. @@ -859,8 +1028,10 @@ def test_custom_default_conda_env(generic_cloud: str): f'sky exec {name} tests/test_yamls/test_custom_default_conda_env.yaml', f'sky logs {name} 2 --status', f'sky autostop -y -i 0 {name}', - 'sleep 60', - f'sky status -r {name} | grep "STOPPED"', + _get_cmd_wait_until_cluster_status_contains( + cluster_name=name, + cluster_status=[ClusterStatus.STOPPED], + timeout=80), f'sky start -y {name}', f'sky logs {name} 2 --no-follow | grep -E "myenv\\s+\\*"', f'sky exec {name} tests/test_yamls/test_custom_default_conda_env.yaml', @@ -881,7 +1052,10 @@ def test_stale_job(generic_cloud: str): f'sky launch -y -c {name} --cloud {generic_cloud} "echo hi"', f'sky exec {name} -d "echo start; sleep 10000"', f'sky stop {name} -y', - 'sleep 100', # Ensure this is large enough, else GCP leaks. + _get_cmd_wait_until_cluster_status_contains( + cluster_name=name, + cluster_status=[ClusterStatus.STOPPED], + timeout=100), f'sky start {name} -y', f'sky logs {name} 1 --status', f's=$(sky queue {name}); echo "$s"; echo; echo; echo "$s" | grep FAILED', @@ -909,13 +1083,18 @@ def test_aws_stale_job_manual_restart(): '--output text`; ' f'aws ec2 stop-instances --region {region} ' '--instance-ids $id', - 'sleep 40', + _get_cmd_wait_until_cluster_status_contains( + cluster_name=name, + cluster_status=[ClusterStatus.STOPPED], + timeout=40), f'sky launch -c {name} -y "echo hi"', f'sky logs {name} 1 --status', f'sky logs {name} 3 --status', # Ensure the skylet updated the stale job status. - f'sleep {events.JobSchedulerEvent.EVENT_INTERVAL_SECONDS}', - f's=$(sky queue {name}); echo "$s"; echo; echo; echo "$s" | grep FAILED', + _get_cmd_wait_until_job_status_contains_without_matching_job( + cluster_name=name, + job_status=[JobStatus.FAILED], + timeout=events.JobSchedulerEvent.EVENT_INTERVAL_SECONDS), ], f'sky down -y {name}', ) @@ -945,8 +1124,10 @@ def test_gcp_stale_job_manual_restart(): f'sky logs {name} 1 --status', f'sky logs {name} 3 --status', # Ensure the skylet updated the stale job status. 
- f'sleep {events.JobSchedulerEvent.EVENT_INTERVAL_SECONDS}', - f's=$(sky queue {name}); echo "$s"; echo; echo; echo "$s" | grep FAILED', + _get_cmd_wait_until_job_status_contains_without_matching_job( + cluster_name=name, + job_status=[JobStatus.FAILED.value], + timeout=events.JobSchedulerEvent.EVENT_INTERVAL_SECONDS) ], f'sky down -y {name}', ) @@ -1040,34 +1221,68 @@ def test_using_file_mounts_with_env_vars(generic_cloud: str): # ---------- storage ---------- + + +def _storage_mounts_commands_generator(f: TextIO, cluster_name: str, + storage_name: str, ls_hello_command: str, + cloud: str, only_mount: bool): + template_str = pathlib.Path( + 'tests/test_yamls/test_storage_mounting.yaml.j2').read_text() + template = jinja2.Template(template_str) + content = template.render( + storage_name=storage_name, + cloud=cloud, + only_mount=only_mount, + ) + f.write(content) + f.flush() + file_path = f.name + test_commands = [ + *STORAGE_SETUP_COMMANDS, + f'sky launch -y -c {cluster_name} --cloud {cloud} {file_path}', + f'sky logs {cluster_name} 1 --status', # Ensure job succeeded. + ls_hello_command, + f'sky stop -y {cluster_name}', + f'sky start -y {cluster_name}', + # Check if hello.txt from mounting bucket exists after restart in + # the mounted directory + f'sky exec {cluster_name} -- "set -ex; ls /mount_private_mount/hello.txt"', + ] + clean_command = f'sky down -y {cluster_name}; sky storage delete -y {storage_name}' + return test_commands, clean_command + + @pytest.mark.aws def test_aws_storage_mounts_with_stop(): name = _get_cluster_name() cloud = 'aws' storage_name = f'sky-test-{int(time.time())}' - template_str = pathlib.Path( - 'tests/test_yamls/test_storage_mounting.yaml.j2').read_text() - template = jinja2.Template(template_str) - content = template.render(storage_name=storage_name, cloud=cloud) + ls_hello_command = f'aws s3 ls {storage_name}/hello.txt' with tempfile.NamedTemporaryFile(suffix='.yaml', mode='w') as f: - f.write(content) - f.flush() - file_path = f.name - test_commands = [ - *STORAGE_SETUP_COMMANDS, - f'sky launch -y -c {name} --cloud {cloud} {file_path}', - f'sky logs {name} 1 --status', # Ensure job succeeded. 
- f'aws s3 ls {storage_name}/hello.txt', - f'sky stop -y {name}', - f'sky start -y {name}', - # Check if hello.txt from mounting bucket exists after restart in - # the mounted directory - f'sky exec {name} -- "set -ex; ls /mount_private_mount/hello.txt"' - ] + test_commands, clean_command = _storage_mounts_commands_generator( + f, name, storage_name, ls_hello_command, cloud, False) test = Test( 'aws_storage_mounts', test_commands, - f'sky down -y {name}; sky storage delete -y {storage_name}', + clean_command, + timeout=20 * 60, # 20 mins + ) + run_one_test(test) + + +@pytest.mark.aws +def test_aws_storage_mounts_with_stop_only_mount(): + name = _get_cluster_name() + cloud = 'aws' + storage_name = f'sky-test-{int(time.time())}' + ls_hello_command = f'aws s3 ls {storage_name}/hello.txt' + with tempfile.NamedTemporaryFile(suffix='.yaml', mode='w') as f: + test_commands, clean_command = _storage_mounts_commands_generator( + f, name, storage_name, ls_hello_command, cloud, True) + test = Test( + 'aws_storage_mounts_only_mount', + test_commands, + clean_command, timeout=20 * 60, # 20 mins ) run_one_test(test) @@ -1078,29 +1293,14 @@ def test_gcp_storage_mounts_with_stop(): name = _get_cluster_name() cloud = 'gcp' storage_name = f'sky-test-{int(time.time())}' - template_str = pathlib.Path( - 'tests/test_yamls/test_storage_mounting.yaml.j2').read_text() - template = jinja2.Template(template_str) - content = template.render(storage_name=storage_name, cloud=cloud) + ls_hello_command = f'gsutil ls gs://{storage_name}/hello.txt' with tempfile.NamedTemporaryFile(suffix='.yaml', mode='w') as f: - f.write(content) - f.flush() - file_path = f.name - test_commands = [ - *STORAGE_SETUP_COMMANDS, - f'sky launch -y -c {name} --cloud {cloud} {file_path}', - f'sky logs {name} 1 --status', # Ensure job succeeded. - f'gsutil ls gs://{storage_name}/hello.txt', - f'sky stop -y {name}', - f'sky start -y {name}', - # Check if hello.txt from mounting bucket exists after restart in - # the mounted directory - f'sky exec {name} -- "set -ex; ls /mount_private_mount/hello.txt"' - ] + test_commands, clean_command = _storage_mounts_commands_generator( + f, name, storage_name, ls_hello_command, cloud, False) test = Test( 'gcp_storage_mounts', test_commands, - f'sky down -y {name}; sky storage delete -y {storage_name}', + clean_command, timeout=20 * 60, # 20 mins ) run_one_test(test) @@ -1116,31 +1316,19 @@ def test_azure_storage_mounts_with_stop(): get_default_storage_account_name(default_region)) storage_account_key = data_utils.get_az_storage_account_key( storage_account_name) - template_str = pathlib.Path( - 'tests/test_yamls/test_storage_mounting.yaml.j2').read_text() - template = jinja2.Template(template_str) - content = template.render(storage_name=storage_name, cloud=cloud) + # if the file does not exist, az storage blob list returns '[]' + ls_hello_command = (f'output=$(az storage blob list -c {storage_name} ' + f'--account-name {storage_account_name} ' + f'--account-key {storage_account_key} ' + f'--prefix hello.txt) ' + f'[ "$output" = "[]" ] && exit 1 || exit 0') with tempfile.NamedTemporaryFile(suffix='.yaml', mode='w') as f: - f.write(content) - f.flush() - file_path = f.name - test_commands = [ - *STORAGE_SETUP_COMMANDS, - f'sky launch -y -c {name} --cloud {cloud} {file_path}', - f'sky logs {name} 1 --status', # Ensure job succeeded. 
- f'output=$(az storage blob list -c {storage_name} --account-name {storage_account_name} --account-key {storage_account_key} --prefix hello.txt)' - # if the file does not exist, az storage blob list returns '[]' - f'[ "$output" = "[]" ] && exit 1;' - f'sky stop -y {name}', - f'sky start -y {name}', - # Check if hello.txt from mounting bucket exists after restart in - # the mounted directory - f'sky exec {name} -- "set -ex; ls /mount_private_mount/hello.txt"' - ] + test_commands, clean_command = _storage_mounts_commands_generator( + f, name, storage_name, ls_hello_command, cloud, False) test = Test( 'azure_storage_mounts', test_commands, - f'sky down -y {name}; sky storage delete -y {storage_name}', + clean_command, timeout=20 * 60, # 20 mins ) run_one_test(test) @@ -1153,25 +1341,15 @@ def test_kubernetes_storage_mounts(): # built for x86_64 only. name = _get_cluster_name() storage_name = f'sky-test-{int(time.time())}' - template_str = pathlib.Path( - 'tests/test_yamls/test_storage_mounting.yaml.j2').read_text() - template = jinja2.Template(template_str) - content = template.render(storage_name=storage_name) + ls_hello_command = (f'aws s3 ls {storage_name}/hello.txt || ' + f'gsutil ls gs://{storage_name}/hello.txt') with tempfile.NamedTemporaryFile(suffix='.yaml', mode='w') as f: - f.write(content) - f.flush() - file_path = f.name - test_commands = [ - *STORAGE_SETUP_COMMANDS, - f'sky launch -y -c {name} --cloud kubernetes {file_path}', - f'sky logs {name} 1 --status', # Ensure job succeeded. - f'aws s3 ls {storage_name}/hello.txt || ' - f'gsutil ls gs://{storage_name}/hello.txt', - ] + test_commands, clean_command = _storage_mounts_commands_generator( + f, name, storage_name, ls_hello_command, 'kubernetes', False) test = Test( 'kubernetes_storage_mounts', test_commands, - f'sky down -y {name}; sky storage delete -y {storage_name}', + clean_command, timeout=20 * 60, # 20 mins ) run_one_test(test) @@ -1623,6 +1801,7 @@ def test_large_job_queue(generic_cloud: str): f'for i in `seq 1 75`; do sky exec {name} -n {name}-$i -d "echo $i; sleep 100000000"; done', f'sky cancel -y {name} 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16', 'sleep 90', + # Each job takes 0.5 CPU and the default VM has 8 CPUs, so there should be 8 / 0.5 = 16 jobs running. # The first 16 jobs are canceled, so there should be 75 - 32 = 43 jobs PENDING. f's=$(sky queue {name}); echo "$s"; echo; echo; echo "$s" | grep -v grep | grep PENDING | wc -l | grep 43', @@ -1750,10 +1929,27 @@ def test_multi_echo(generic_cloud: str): 'multi_echo', [ f'python examples/multi_echo.py {name} {generic_cloud}', - 'sleep 120', + f's=$(sky queue {name}); echo "$s"; echo; echo; echo "$s" | grep "FAILED" && exit 1 || true', + 'sleep 10', + f's=$(sky queue {name}); echo "$s"; echo; echo; echo "$s" | grep "FAILED" && exit 1 || true', + 'sleep 30', + f's=$(sky queue {name}); echo "$s"; echo; echo; echo "$s" | grep "FAILED" && exit 1 || true', + 'sleep 30', + # Make sure that our job scheduler is fast enough to have at least + # 10 RUNNING jobs in parallel. + f's=$(sky queue {name}); echo "$s"; echo; echo; echo "$s" | grep "RUNNING" | wc -l | awk \'{{if ($1 < 10) exit 1}}\'', + 'sleep 30', + f's=$(sky queue {name}); echo "$s"; echo; echo; echo "$s" | grep "FAILED" && exit 1 || true', + f'until sky logs {name} 32 --status; do echo "Waiting for job 32 to finish..."; sleep 1; done', ] + # Ensure jobs succeeded. 
- [f'sky logs {name} {i + 1} --status' for i in range(32)] + + [ + _get_cmd_wait_until_job_status_contains_matching_job_id( + cluster_name=name, + job_id=i + 1, + job_status=[JobStatus.SUCCEEDED], + timeout=120) for i in range(32) + ] + # Ensure monitor/autoscaler didn't crash on the 'assert not # unfulfilled' error. If process not found, grep->ssh returns 1. [f'ssh {name} \'ps aux | grep "[/]"monitor.py\''], @@ -2306,12 +2502,17 @@ def test_gcp_start_stop(): f'sky exec {name} "prlimit -n --pid=\$(pgrep -f \'raylet/raylet --raylet_socket_name\') | grep \'"\'1048576 1048576\'"\'"', # Ensure the raylet process has the correct file descriptor limit. f'sky logs {name} 3 --status', # Ensure the job succeeded. f'sky stop -y {name}', - f'sleep 20', + _get_cmd_wait_until_cluster_status_contains( + cluster_name=name, + cluster_status=[ClusterStatus.STOPPED], + timeout=40), f'sky start -y {name} -i 1', f'sky exec {name} examples/gcp_start_stop.yaml', f'sky logs {name} 4 --status', # Ensure the job succeeded. - 'sleep 180', - f'sky status -r {name} | grep "INIT\|STOPPED"', + _get_cmd_wait_until_cluster_status_contains( + cluster_name=name, + cluster_status=[ClusterStatus.STOPPED, ClusterStatus.INIT], + timeout=200), ], f'sky down -y {name}', ) @@ -2334,9 +2535,11 @@ def test_azure_start_stop(): f'sky start -y {name} -i 1', f'sky exec {name} examples/azure_start_stop.yaml', f'sky logs {name} 3 --status', # Ensure the job succeeded. - 'sleep 260', - f's=$(sky status -r {name}) && echo "$s" && echo "$s" | grep "INIT\|STOPPED"' - f'|| {{ ssh {name} "cat ~/.sky/skylet.log"; exit 1; }}' + _get_cmd_wait_until_cluster_status_contains( + cluster_name=name, + cluster_status=[ClusterStatus.STOPPED, ClusterStatus.INIT], + timeout=280) + + f'|| {{ ssh {name} "cat ~/.sky/skylet.log"; exit 1; }}', ], f'sky down -y {name}', timeout=30 * 60, # 30 mins @@ -2372,8 +2575,10 @@ def test_autostop(generic_cloud: str): f's=$(sky status {name} --refresh); echo "$s"; echo; echo; echo "$s" | grep {name} | grep UP', # Ensure the cluster is STOPPED. - f'sleep {autostop_timeout}', - f's=$(sky status {name} --refresh); echo "$s"; echo; echo; echo "$s" | grep {name} | grep STOPPED', + _get_cmd_wait_until_cluster_status_contains( + cluster_name=name, + cluster_status=[ClusterStatus.STOPPED], + timeout=autostop_timeout), # Ensure the cluster is UP and the autostop setting is reset ('-'). f'sky start -y {name}', @@ -2389,8 +2594,10 @@ def test_autostop(generic_cloud: str): f'sky autostop -y {name} -i 1', # Should restart the timer. 'sleep 40', f's=$(sky status {name} --refresh); echo "$s"; echo; echo; echo "$s" | grep {name} | grep UP', - f'sleep {autostop_timeout}', - f's=$(sky status {name} --refresh); echo "$s"; echo; echo; echo "$s" | grep {name} | grep STOPPED', + _get_cmd_wait_until_cluster_status_contains( + cluster_name=name, + cluster_status=[ClusterStatus.STOPPED], + timeout=autostop_timeout), # Test restarting the idleness timer via exec: f'sky start -y {name}', @@ -2399,9 +2606,10 @@ def test_autostop(generic_cloud: str): 'sleep 45', # Almost reached the threshold. f'sky exec {name} echo hi', # Should restart the timer. 
'sleep 45', - f's=$(sky status {name} --refresh); echo "$s"; echo; echo; echo "$s" | grep {name} | grep UP', - f'sleep {autostop_timeout}', - f's=$(sky status {name} --refresh); echo "$s"; echo; echo; echo "$s" | grep {name} | grep STOPPED', + _get_cmd_wait_until_cluster_status_contains( + cluster_name=name, + cluster_status=[ClusterStatus.STOPPED], + timeout=autostop_timeout + _BUMP_UP_SECONDS), ], f'sky down -y {name}', timeout=total_timeout_minutes * 60, @@ -2604,6 +2812,34 @@ def test_use_spot(generic_cloud: str): run_one_test(test) +@pytest.mark.azure +def test_azure_spot_instance_verification(): + """Test Azure spot instance provisioning with explicit verification. + This test verifies that when --use-spot is specified for Azure: + 1. The cluster launches successfully + 2. The instances are actually provisioned as spot instances + """ + name = _get_cluster_name() + test = Test( + 'azure-spot-verification', + [ + f'sky launch -c {name} --cloud azure tests/test_yamls/minimal.yaml --use-spot -y', + f'sky logs {name} 1 --status', f'TARGET_VM_NAME="{name}"; ' + 'VM_INFO=$(az vm list --query "[?contains(name, \'$TARGET_VM_NAME\')].{Name:name, ResourceGroup:resourceGroup}" -o tsv); ' + '[[ -z "$VM_INFO" ]] && exit 1; ' + 'FULL_VM_NAME=$(echo "$VM_INFO" | awk \'{print $1}\'); ' + 'RESOURCE_GROUP=$(echo "$VM_INFO" | awk \'{print $2}\'); ' + 'VM_DETAILS=$(az vm list --resource-group "$RESOURCE_GROUP" ' + '--query "[?name==\'$FULL_VM_NAME\'].{Name:name, Location:location, Priority:priority}" -o table); ' + '[[ -z "$VM_DETAILS" ]] && exit 1; ' + 'echo "VM Details:"; echo "$VM_DETAILS"; ' + 'echo "$VM_DETAILS" | grep -qw "Spot" && exit 0 || exit 1' + ], + f'sky down -y {name}', + ) + run_one_test(test) + + @pytest.mark.gcp def test_stop_gcp_spot(): """Test GCP spot can be stopped, autostopped, restarted.""" @@ -2618,15 +2854,19 @@ def test_stop_gcp_spot(): f'sky exec {name} -- ls myfile', f'sky logs {name} 2 --status', f'sky autostop {name} -i0 -y', - 'sleep 90', - f's=$(sky status {name} --refresh); echo "$s"; echo; echo; echo "$s" | grep {name} | grep STOPPED', + _get_cmd_wait_until_cluster_status_contains( + cluster_name=name, + cluster_status=[ClusterStatus.STOPPED], + timeout=90), f'sky start {name} -y', f'sky exec {name} -- ls myfile', f'sky logs {name} 3 --status', # -i option at launch should go through: f'sky launch -c {name} -i0 -y', - 'sleep 120', - f's=$(sky status {name} --refresh); echo "$s"; echo; echo; echo "$s" | grep {name} | grep STOPPED', + _get_cmd_wait_until_cluster_status_contains( + cluster_name=name, + cluster_status=[ClusterStatus.STOPPED], + timeout=120), ], f'sky down -y {name}', ) @@ -2646,14 +2886,25 @@ def test_managed_jobs(generic_cloud: str): [ f'sky jobs launch -n {name}-1 --cloud {generic_cloud} examples/managed_job.yaml -y -d', f'sky jobs launch -n {name}-2 --cloud {generic_cloud} examples/managed_job.yaml -y -d', - 'sleep 5', - f'{_GET_JOB_QUEUE} | grep {name}-1 | head -n1 | grep "PENDING\|SUBMITTED\|STARTING\|RUNNING"', - f'{_GET_JOB_QUEUE} | grep {name}-2 | head -n1 | grep "PENDING\|SUBMITTED\|STARTING\|RUNNING"', + _get_cmd_wait_until_managed_job_status_contains_matching_job_name( + job_name=f'{name}-1', + job_status=[ + ManagedJobStatus.PENDING, ManagedJobStatus.SUBMITTED, + ManagedJobStatus.STARTING, ManagedJobStatus.RUNNING + ], + timeout=60), + _get_cmd_wait_until_managed_job_status_contains_matching_job_name( + job_name=f'{name}-2', + job_status=[ + ManagedJobStatus.PENDING, ManagedJobStatus.SUBMITTED, + ManagedJobStatus.STARTING, 
ManagedJobStatus.RUNNING + ], + timeout=60), f'sky jobs cancel -y -n {name}-1', - 'sleep 5', - f'{_GET_JOB_QUEUE} | grep {name}-1 | head -n1 | grep "CANCELLING\|CANCELLED"', - 'sleep 200', - f'{_GET_JOB_QUEUE} | grep {name}-1 | head -n1 | grep CANCELLED', + _get_cmd_wait_until_managed_job_status_contains_matching_job_name( + job_name=f'{name}-1', + job_status=[ManagedJobStatus.CANCELLED], + timeout=230), # Test the functionality for logging. f's=$(sky jobs logs -n {name}-2 --no-follow); echo "$s"; echo "$s" | grep "start counting"', f's=$(sky jobs logs --controller -n {name}-2 --no-follow); echo "$s"; echo "$s" | grep "Cluster launched:"', @@ -2723,9 +2974,11 @@ def test_managed_jobs_failed_setup(generic_cloud: str): 'managed_jobs_failed_setup', [ f'sky jobs launch -n {name} --cloud {generic_cloud} -y -d tests/test_yamls/failed_setup.yaml', - 'sleep 330', # Make sure the job failed quickly. - f'{_GET_JOB_QUEUE} | grep {name} | head -n1 | grep "FAILED_SETUP"', + _get_cmd_wait_until_managed_job_status_contains_matching_job_name( + job_name=name, + job_status=[ManagedJobStatus.FAILED_SETUP], + timeout=330 + _BUMP_UP_SECONDS), ], f'sky jobs cancel -y -n {name}', # Increase timeout since sky jobs queue -r can be blocked by other spot tests. @@ -2748,7 +3001,10 @@ def test_managed_jobs_pipeline_failed_setup(generic_cloud: str): 'managed_jobs_pipeline_failed_setup', [ f'sky jobs launch -n {name} -y -d tests/test_yamls/failed_setup_pipeline.yaml', - 'sleep 600', + _get_cmd_wait_until_managed_job_status_contains_matching_job_name( + job_name=name, + job_status=[ManagedJobStatus.FAILED_SETUP], + timeout=600), # Make sure the job failed quickly. f'{_GET_JOB_QUEUE} | grep {name} | head -n1 | grep "FAILED_SETUP"', # Task 0 should be SUCCEEDED. @@ -2782,8 +3038,10 @@ def test_managed_jobs_recovery_aws(aws_config_region): 'managed_jobs_recovery_aws', [ f'sky jobs launch --cloud aws --region {region} --use-spot -n {name} "echo SKYPILOT_TASK_ID: \$SKYPILOT_TASK_ID; sleep 1800" -y -d', - 'sleep 360', - f'{_GET_JOB_QUEUE} | grep {name} | head -n1 | grep "RUNNING"', + _get_cmd_wait_until_managed_job_status_contains_matching_job_name( + job_name=name, + job_status=[ManagedJobStatus.RUNNING], + timeout=600), f'RUN_ID=$(sky jobs logs -n {name} --no-follow | grep SKYPILOT_TASK_ID | cut -d: -f2); echo "$RUN_ID" | tee /tmp/{name}-run-id', # Terminate the cluster manually. 
(f'aws ec2 terminate-instances --region {region} --instance-ids $(' @@ -2793,8 +3051,10 @@ def test_managed_jobs_recovery_aws(aws_config_region): '--output text)'), _JOB_WAIT_NOT_RUNNING.format(job_name=name), f'{_GET_JOB_QUEUE} | grep {name} | head -n1 | grep "RECOVERING"', - 'sleep 200', - f'{_GET_JOB_QUEUE} | grep {name} | head -n1 | grep "RUNNING"', + _get_cmd_wait_until_managed_job_status_contains_matching_job_name( + job_name=name, + job_status=[ManagedJobStatus.RUNNING], + timeout=200), f'RUN_ID=$(cat /tmp/{name}-run-id); echo "$RUN_ID"; sky jobs logs -n {name} --no-follow | grep SKYPILOT_TASK_ID | grep "$RUN_ID"', ], f'sky jobs cancel -y -n {name}', @@ -2822,15 +3082,19 @@ def test_managed_jobs_recovery_gcp(): 'managed_jobs_recovery_gcp', [ f'sky jobs launch --cloud gcp --zone {zone} -n {name} --use-spot --cpus 2 "echo SKYPILOT_TASK_ID: \$SKYPILOT_TASK_ID; sleep 1800" -y -d', - 'sleep 360', - f'{_GET_JOB_QUEUE} | grep {name} | head -n1 | grep "RUNNING"', + _get_cmd_wait_until_managed_job_status_contains_matching_job_name( + job_name=name, + job_status=[ManagedJobStatus.RUNNING], + timeout=300), f'RUN_ID=$(sky jobs logs -n {name} --no-follow | grep SKYPILOT_TASK_ID | cut -d: -f2); echo "$RUN_ID" | tee /tmp/{name}-run-id', # Terminate the cluster manually. terminate_cmd, _JOB_WAIT_NOT_RUNNING.format(job_name=name), f'{_GET_JOB_QUEUE} | grep {name} | head -n1 | grep "RECOVERING"', - 'sleep 200', - f'{_GET_JOB_QUEUE} | grep {name} | head -n1 | grep "RUNNING"', + _get_cmd_wait_until_managed_job_status_contains_matching_job_name( + job_name=name, + job_status=[ManagedJobStatus.RUNNING], + timeout=200), f'RUN_ID=$(cat /tmp/{name}-run-id); echo "$RUN_ID"; sky jobs logs -n {name} --no-follow | grep SKYPILOT_TASK_ID: | grep "$RUN_ID"', ], f'sky jobs cancel -y -n {name}', @@ -2853,8 +3117,10 @@ def test_managed_jobs_pipeline_recovery_aws(aws_config_region): 'managed_jobs_pipeline_recovery_aws', [ f'sky jobs launch -n {name} tests/test_yamls/pipeline_aws.yaml -y -d', - 'sleep 400', - f'{_GET_JOB_QUEUE} | grep {name} | head -n1 | grep "RUNNING"', + _get_cmd_wait_until_managed_job_status_contains_matching_job_name( + job_name=name, + job_status=[ManagedJobStatus.RUNNING], + timeout=400), f'RUN_ID=$(sky jobs logs -n {name} --no-follow | grep SKYPILOT_TASK_ID: | cut -d: -f2); echo "$RUN_ID" | tee /tmp/{name}-run-id', f'RUN_IDS=$(sky jobs logs -n {name} --no-follow | grep -A 4 SKYPILOT_TASK_IDS | cut -d")" -f2); echo "$RUN_IDS" | tee /tmp/{name}-run-ids', # Terminate the cluster manually. 
@@ -2873,8 +3139,10 @@ def test_managed_jobs_pipeline_recovery_aws(aws_config_region): '--output text)'), _JOB_WAIT_NOT_RUNNING.format(job_name=name), f'{_GET_JOB_QUEUE} | grep {name} | head -n1 | grep "RECOVERING"', - 'sleep 200', - f'{_GET_JOB_QUEUE} | grep {name} | head -n1 | grep "RUNNING"', + _get_cmd_wait_until_managed_job_status_contains_matching_job_name( + job_name=name, + job_status=[ManagedJobStatus.RUNNING], + timeout=200), f'RUN_ID=$(cat /tmp/{name}-run-id); echo $RUN_ID; sky jobs logs -n {name} --no-follow | grep SKYPILOT_TASK_ID: | grep "$RUN_ID"', f'RUN_IDS=$(sky jobs logs -n {name} --no-follow | grep -A 4 SKYPILOT_TASK_IDS | cut -d")" -f2); echo "$RUN_IDS" | tee /tmp/{name}-run-ids-new', f'diff /tmp/{name}-run-ids /tmp/{name}-run-ids-new', @@ -2904,8 +3172,10 @@ def test_managed_jobs_pipeline_recovery_gcp(): 'managed_jobs_pipeline_recovery_gcp', [ f'sky jobs launch -n {name} tests/test_yamls/pipeline_gcp.yaml -y -d', - 'sleep 400', - f'{_GET_JOB_QUEUE} | grep {name} | head -n1 | grep "RUNNING"', + _get_cmd_wait_until_managed_job_status_contains_matching_job_name( + job_name=name, + job_status=[ManagedJobStatus.RUNNING], + timeout=400), f'RUN_ID=$(sky jobs logs -n {name} --no-follow | grep SKYPILOT_TASK_ID: | cut -d: -f2); echo "$RUN_ID" | tee /tmp/{name}-run-id', f'RUN_IDS=$(sky jobs logs -n {name} --no-follow | grep -A 4 SKYPILOT_TASK_IDS | cut -d")" -f2); echo "$RUN_IDS" | tee /tmp/{name}-run-ids', # Terminate the cluster manually. @@ -2916,8 +3186,10 @@ def test_managed_jobs_pipeline_recovery_gcp(): f'cut -d\'_\' -f1 | rev | cut -d\'-\' -f1`; {terminate_cmd}'), _JOB_WAIT_NOT_RUNNING.format(job_name=name), f'{_GET_JOB_QUEUE} | grep {name} | head -n1 | grep "RECOVERING"', - 'sleep 200', - f'{_GET_JOB_QUEUE} | grep {name} | head -n1 | grep "RUNNING"', + _get_cmd_wait_until_managed_job_status_contains_matching_job_name( + job_name=name, + job_status=[ManagedJobStatus.RUNNING], + timeout=200), f'RUN_ID=$(cat /tmp/{name}-run-id); echo $RUN_ID; sky jobs logs -n {name} --no-follow | grep SKYPILOT_TASK_ID: | grep "$RUN_ID"', f'RUN_IDS=$(sky jobs logs -n {name} --no-follow | grep -A 4 SKYPILOT_TASK_IDS | cut -d")" -f2); echo "$RUN_IDS" | tee /tmp/{name}-run-ids-new', f'diff /tmp/{name}-run-ids /tmp/{name}-run-ids-new', @@ -2943,8 +3215,12 @@ def test_managed_jobs_recovery_default_resources(generic_cloud: str): 'managed-spot-recovery-default-resources', [ f'sky jobs launch -n {name} --cloud {generic_cloud} --use-spot "sleep 30 && sudo shutdown now && sleep 1000" -y -d', - 'sleep 360', - f'{_GET_JOB_QUEUE} | grep {name} | head -n1 | grep "RUNNING\|RECOVERING"', + _get_cmd_wait_until_managed_job_status_contains_matching_job_name( + job_name=name, + job_status=[ + ManagedJobStatus.RUNNING, ManagedJobStatus.RECOVERING + ], + timeout=360), ], f'sky jobs cancel -y -n {name}', timeout=25 * 60, @@ -2964,8 +3240,10 @@ def test_managed_jobs_recovery_multi_node_aws(aws_config_region): 'managed_jobs_recovery_multi_node_aws', [ f'sky jobs launch --cloud aws --region {region} -n {name} --use-spot --num-nodes 2 "echo SKYPILOT_TASK_ID: \$SKYPILOT_TASK_ID; sleep 1800" -y -d', - 'sleep 450', - f'{_GET_JOB_QUEUE} | grep {name} | head -n1 | grep "RUNNING"', + _get_cmd_wait_until_managed_job_status_contains_matching_job_name( + job_name=name, + job_status=[ManagedJobStatus.RUNNING], + timeout=450), f'RUN_ID=$(sky jobs logs -n {name} --no-follow | grep SKYPILOT_TASK_ID | cut -d: -f2); echo "$RUN_ID" | tee /tmp/{name}-run-id', # Terminate the worker manually. 
(f'aws ec2 terminate-instances --region {region} --instance-ids $(' @@ -2976,8 +3254,10 @@ def test_managed_jobs_recovery_multi_node_aws(aws_config_region): '--output text)'), _JOB_WAIT_NOT_RUNNING.format(job_name=name), f'{_GET_JOB_QUEUE} | grep {name} | head -n1 | grep "RECOVERING"', - 'sleep 560', - f'{_GET_JOB_QUEUE} | grep {name} | head -n1 | grep "RUNNING"', + _get_cmd_wait_until_managed_job_status_contains_matching_job_name( + job_name=name, + job_status=[ManagedJobStatus.RUNNING], + timeout=560), f'RUN_ID=$(cat /tmp/{name}-run-id); echo $RUN_ID; sky jobs logs -n {name} --no-follow | grep SKYPILOT_TASK_ID | cut -d: -f2 | grep "$RUN_ID"', ], f'sky jobs cancel -y -n {name}', @@ -3005,15 +3285,19 @@ def test_managed_jobs_recovery_multi_node_gcp(): 'managed_jobs_recovery_multi_node_gcp', [ f'sky jobs launch --cloud gcp --zone {zone} -n {name} --use-spot --num-nodes 2 "echo SKYPILOT_TASK_ID: \$SKYPILOT_TASK_ID; sleep 1800" -y -d', - 'sleep 400', - f'{_GET_JOB_QUEUE} | grep {name} | head -n1 | grep "RUNNING"', + _get_cmd_wait_until_managed_job_status_contains_matching_job_name( + job_name=name, + job_status=[ManagedJobStatus.RUNNING], + timeout=400), f'RUN_ID=$(sky jobs logs -n {name} --no-follow | grep SKYPILOT_TASK_ID | cut -d: -f2); echo "$RUN_ID" | tee /tmp/{name}-run-id', # Terminate the worker manually. terminate_cmd, _JOB_WAIT_NOT_RUNNING.format(job_name=name), f'{_GET_JOB_QUEUE} | grep {name} | head -n1 | grep "RECOVERING"', - 'sleep 420', - f'{_GET_JOB_QUEUE} | grep {name} | head -n1 | grep "RUNNING"', + _get_cmd_wait_until_managed_job_status_contains_matching_job_name( + job_name=name, + job_status=[ManagedJobStatus.RUNNING], + timeout=560), f'RUN_ID=$(cat /tmp/{name}-run-id); echo $RUN_ID; sky jobs logs -n {name} --no-follow | grep SKYPILOT_TASK_ID | cut -d: -f2 | grep "$RUN_ID"', ], f'sky jobs cancel -y -n {name}', @@ -3038,13 +3322,17 @@ def test_managed_jobs_cancellation_aws(aws_config_region): [ # Test cancellation during spot cluster being launched. f'sky jobs launch --cloud aws --region {region} -n {name} --use-spot "sleep 1000" -y -d', - 'sleep 60', - f'{_GET_JOB_QUEUE} | grep {name} | head -n1 | grep "STARTING\|RUNNING"', + _get_cmd_wait_until_managed_job_status_contains_matching_job_name( + job_name=name, + job_status=[ + ManagedJobStatus.STARTING, ManagedJobStatus.RUNNING + ], + timeout=60 + _BUMP_UP_SECONDS), f'sky jobs cancel -y -n {name}', - 'sleep 5', - f'{_GET_JOB_QUEUE} | grep {name} | head -n1 | grep "CANCELLING\|CANCELLED"', - 'sleep 120', - f'{_GET_JOB_QUEUE} | grep {name} | head -n1 | grep "CANCELLED"', + _get_cmd_wait_until_managed_job_status_contains_matching_job_name( + job_name=name, + job_status=[ManagedJobStatus.CANCELLED], + timeout=120 + _BUMP_UP_SECONDS), (f's=$(aws ec2 describe-instances --region {region} ' f'--filters Name=tag:ray-cluster-name,Values={name_on_cloud}-* ' f'--query Reservations[].Instances[].State[].Name ' @@ -3052,12 +3340,16 @@ def test_managed_jobs_cancellation_aws(aws_config_region): ), # Test cancelling the spot cluster during spot job being setup. f'sky jobs launch --cloud aws --region {region} -n {name}-2 --use-spot tests/test_yamls/test_long_setup.yaml -y -d', - 'sleep 300', + # The job is set up in the cluster, will be shown as RUNNING.
+ _get_cmd_wait_until_managed_job_status_contains_matching_job_name( + job_name=f'{name}-2', + job_status=[ManagedJobStatus.RUNNING], + timeout=300 + _BUMP_UP_SECONDS), f'sky jobs cancel -y -n {name}-2', - 'sleep 5', - f'{_GET_JOB_QUEUE} | grep {name}-2 | head -n1 | grep "CANCELLING\|CANCELLED"', - 'sleep 120', - f'{_GET_JOB_QUEUE} | grep {name}-2 | head -n1 | grep "CANCELLED"', + _get_cmd_wait_until_managed_job_status_contains_matching_job_name( + job_name=f'{name}-2', + job_status=[ManagedJobStatus.CANCELLED], + timeout=120 + _BUMP_UP_SECONDS), (f's=$(aws ec2 describe-instances --region {region} ' f'--filters Name=tag:ray-cluster-name,Values={name_2_on_cloud}-* ' f'--query Reservations[].Instances[].State[].Name ' @@ -3065,8 +3357,11 @@ def test_managed_jobs_cancellation_aws(aws_config_region): ), # Test cancellation during spot job is recovering. f'sky jobs launch --cloud aws --region {region} -n {name}-3 --use-spot "sleep 1000" -y -d', - 'sleep 300', - f'{_GET_JOB_QUEUE} | grep {name}-3 | head -n1 | grep "RUNNING"', + # The job is running in the cluster, will be shown as RUNNING. + _get_cmd_wait_until_managed_job_status_contains_matching_job_name( + job_name=f'{name}-3', + job_status=[ManagedJobStatus.RUNNING], + timeout=300 + _BUMP_UP_SECONDS), # Terminate the cluster manually. (f'aws ec2 terminate-instances --region {region} --instance-ids $(' f'aws ec2 describe-instances --region {region} ' f'--filters Name=tag:ray-cluster-name,Values={name_3_on_cloud}-* ' f'--query Reservations[].Instances[].InstanceId ' '--output text)'), _JOB_WAIT_NOT_RUNNING.format(job_name=f'{name}-3'), f'{_GET_JOB_QUEUE} | grep {name}-3 | head -n1 | grep "RECOVERING"', f'sky jobs cancel -y -n {name}-3', - 'sleep 5', - f'{_GET_JOB_QUEUE} | grep {name}-3 | head -n1 | grep "CANCELLING\|CANCELLED"', - 'sleep 120', - f'{_GET_JOB_QUEUE} | grep {name}-3 | head -n1 | grep "CANCELLED"', + _get_cmd_wait_until_managed_job_status_contains_matching_job_name( + job_name=f'{name}-3', + job_status=[ManagedJobStatus.CANCELLED], + timeout=120 + _BUMP_UP_SECONDS), # The cluster should be terminated (shutting-down) after cancellation. We don't use the `=` operator here because # there can be multiple VM with the same name due to the recovery. (f's=$(aws ec2 describe-instances --region {region} ' f'--filters Name=tag:ray-cluster-name,Values={name_3_on_cloud}-* ' f'--query Reservations[].Instances[].State[].Name ' @@ -3114,34 +3409,42 @@ def test_managed_jobs_cancellation_gcp(): [ # Test cancellation during spot cluster being launched. f'sky jobs launch --cloud gcp --zone {zone} -n {name} --use-spot "sleep 1000" -y -d', - 'sleep 60', - f'{_GET_JOB_QUEUE} | grep {name} | head -n1 | grep "STARTING"', + _get_cmd_wait_until_managed_job_status_contains_matching_job_name( + job_name=name, + job_status=[ManagedJobStatus.STARTING], + timeout=60 + _BUMP_UP_SECONDS), f'sky jobs cancel -y -n {name}', - 'sleep 5', - f'{_GET_JOB_QUEUE} | grep {name} | head -n1 | grep "CANCELLING\|CANCELLED"', - 'sleep 120', - f'{_GET_JOB_QUEUE} | grep {name} | head -n1 | grep "CANCELLED"', + _get_cmd_wait_until_managed_job_status_contains_matching_job_name( + job_name=name, + job_status=[ManagedJobStatus.CANCELLED], + timeout=120 + _BUMP_UP_SECONDS), # Test cancelling the spot cluster during spot job being setup. f'sky jobs launch --cloud gcp --zone {zone} -n {name}-2 --use-spot tests/test_yamls/test_long_setup.yaml -y -d', - 'sleep 300', + # The job is set up in the cluster, will be shown as RUNNING.
+ _get_cmd_wait_until_managed_job_status_contains_matching_job_name( + job_name=f'{name}-2', + job_status=[ManagedJobStatus.RUNNING], + timeout=300 + _BUMP_UP_SECONDS), f'sky jobs cancel -y -n {name}-2', - 'sleep 5', - f'{_GET_JOB_QUEUE} | grep {name}-2 | head -n1 | grep "CANCELLING\|CANCELLED"', - 'sleep 120', - f'{_GET_JOB_QUEUE} | grep {name}-2 | head -n1 | grep "CANCELLED"', + _get_cmd_wait_until_managed_job_status_contains_matching_job_name( + job_name=f'{name}-2', + job_status=[ManagedJobStatus.CANCELLED], + timeout=120 + _BUMP_UP_SECONDS), # Test cancellation during spot job is recovering. f'sky jobs launch --cloud gcp --zone {zone} -n {name}-3 --use-spot "sleep 1000" -y -d', - 'sleep 300', - f'{_GET_JOB_QUEUE} | grep {name}-3 | head -n1 | grep "RUNNING"', + _get_cmd_wait_until_managed_job_status_contains_matching_job_name( + job_name=f'{name}-3', + job_status=[ManagedJobStatus.RUNNING], + timeout=300 + _BUMP_UP_SECONDS), # Terminate the cluster manually. terminate_cmd, _JOB_WAIT_NOT_RUNNING.format(job_name=f'{name}-3'), f'{_GET_JOB_QUEUE} | grep {name}-3 | head -n1 | grep "RECOVERING"', f'sky jobs cancel -y -n {name}-3', - 'sleep 5', - f'{_GET_JOB_QUEUE} | grep {name}-3 | head -n1 | grep "CANCELLING\|CANCELLED"', - 'sleep 120', - f'{_GET_JOB_QUEUE} | grep {name}-3 | head -n1 | grep "CANCELLED"', + _get_cmd_wait_until_managed_job_status_contains_matching_job_name( + job_name=f'{name}-3', + job_status=[ManagedJobStatus.CANCELLED], + timeout=120 + _BUMP_UP_SECONDS), # The cluster should be terminated (STOPPING) after cancellation. We don't use the `=` operator here because # there can be multiple VM with the same name due to the recovery. (f's=$({query_state_cmd}) && echo "$s" && echo; [[ -z "$s" ]] || echo "$s" | grep -v -E "PROVISIONING|STAGING|RUNNING|REPAIRING|TERMINATED|SUSPENDING|SUSPENDED|SUSPENDED"' @@ -3231,8 +3534,10 @@ def test_managed_jobs_storage(generic_cloud: str): *STORAGE_SETUP_COMMANDS, f'sky jobs launch -n {name}{use_spot} --cloud {generic_cloud}{region_flag} {file_path} -y', region_validation_cmd, # Check if the bucket is created in the correct region - 'sleep 60', # Wait the spot queue to be updated - f'{_GET_JOB_QUEUE} | grep {name} | grep SUCCEEDED', + _get_cmd_wait_until_managed_job_status_contains_matching_job_name( + job_name=name, + job_status=[ManagedJobStatus.SUCCEEDED], + timeout=60 + _BUMP_UP_SECONDS), f'[ $(aws s3api list-buckets --query "Buckets[?contains(Name, \'{storage_name}\')].Name" --output text | wc -l) -eq 0 ]', # Check if file was written to the mounted output bucket output_check_cmd @@ -3256,10 +3561,17 @@ def test_managed_jobs_tpu(): 'test-spot-tpu', [ f'sky jobs launch -n {name} --use-spot examples/tpu/tpuvm_mnist.yaml -y -d', - 'sleep 5', - f'{_GET_JOB_QUEUE} | grep {name} | head -n1 | grep STARTING', - 'sleep 900', # TPU takes a while to launch - f'{_GET_JOB_QUEUE} | grep {name} | head -n1 | grep "RUNNING\|SUCCEEDED"', + _get_cmd_wait_until_managed_job_status_contains_matching_job_name( + job_name=name, + job_status=[ManagedJobStatus.STARTING], + timeout=60 + _BUMP_UP_SECONDS), + # TPU takes a while to launch + _get_cmd_wait_until_managed_job_status_contains_matching_job_name( + job_name=name, + job_status=[ + ManagedJobStatus.RUNNING, ManagedJobStatus.SUCCEEDED + ], + timeout=900 + _BUMP_UP_SECONDS), ], f'sky jobs cancel -y -n {name}', # Increase timeout since sky jobs queue -r can be blocked by other spot tests. 
@@ -3277,8 +3589,10 @@ def test_managed_jobs_inline_env(generic_cloud: str): 'test-managed-jobs-inline-env', [ f'sky jobs launch -n {name} -y --cloud {generic_cloud} --env TEST_ENV="hello world" -- "([[ ! -z \\"\$TEST_ENV\\" ]] && [[ ! -z \\"\${constants.SKYPILOT_NODE_IPS}\\" ]] && [[ ! -z \\"\${constants.SKYPILOT_NODE_RANK}\\" ]] && [[ ! -z \\"\${constants.SKYPILOT_NUM_NODES}\\" ]]) || exit 1"', - 'sleep 20', - f'{_GET_JOB_QUEUE} | grep {name} | grep SUCCEEDED', + _get_cmd_wait_until_managed_job_status_contains_matching_job_name( + job_name=name, + job_status=[ManagedJobStatus.SUCCEEDED], + timeout=20 + _BUMP_UP_SECONDS), ], f'sky jobs cancel -y -n {name}', # Increase timeout since sky jobs queue -r can be blocked by other spot tests. @@ -3385,8 +3699,10 @@ def test_azure_start_stop_two_nodes(): f'sky start -y {name} -i 1', f'sky exec --num-nodes=2 {name} examples/azure_start_stop.yaml', f'sky logs {name} 2 --status', # Ensure the job succeeded. - 'sleep 200', - f's=$(sky status -r {name}) && echo "$s" && echo "$s" | grep "INIT\|STOPPED"' + _get_cmd_wait_until_cluster_status_contains( + cluster_name=name, + cluster_status=[ClusterStatus.INIT, ClusterStatus.STOPPED], + timeout=200 + _BUMP_UP_SECONDS) + f'|| {{ ssh {name} "cat ~/.sky/skylet.log"; exit 1; }}' ], f'sky down -y {name}', diff --git a/tests/test_yamls/pipeline_aws.yaml b/tests/test_yamls/pipeline_aws.yaml index 9971f30bc66..962183bd992 100644 --- a/tests/test_yamls/pipeline_aws.yaml +++ b/tests/test_yamls/pipeline_aws.yaml @@ -5,7 +5,7 @@ name: a resources: cloud: aws - region: us-west-2 + region: us-east-2 cpus: 2+ run: | diff --git a/tests/test_yamls/test_storage_mounting.yaml.j2 b/tests/test_yamls/test_storage_mounting.yaml.j2 index 4241c63409e..651fb190012 100644 --- a/tests/test_yamls/test_storage_mounting.yaml.j2 +++ b/tests/test_yamls/test_storage_mounting.yaml.j2 @@ -20,13 +20,13 @@ file_mounts: /mount_private_copy: name: {{storage_name}} source: ~/tmp-workdir - mode: COPY + mode: {% if only_mount | default(false) %}MOUNT{% else %}COPY{% endif %} # Mounting private buckets in COPY mode with a list of files as source /mount_private_copy_lof: name: {{storage_name}} source: ['~/tmp-workdir/tmp file', '~/tmp-workdir/tmp file2'] - mode: COPY + mode: {% if only_mount | default(false) %}MOUNT{% else %}COPY{% endif %} {% if include_private_mount | default(True) %} # Mounting private buckets in MOUNT mode @@ -38,7 +38,7 @@ file_mounts: run: | set -ex - + # Check public bucket contents ls -ltr /mount_public_s3/corpora ls -ltr /mount_public_gcp/tiles @@ -55,12 +55,12 @@ run: | ls -ltr /mount_private_mount/foo ls -ltr /mount_private_mount/tmp\ file {% endif %} - + # Symlinks are not copied to buckets ! ls /mount_private_copy/circle-link {% if include_private_mount | default(True) %} ! 
ls /mount_private_mount/circle-link - + # Write to private bucket in MOUNT mode should pass echo "hello" > /mount_private_mount/hello.txt {% endif %} diff --git a/tests/unit_tests/test_recovery_strategy.py b/tests/unit_tests/test_recovery_strategy.py new file mode 100644 index 00000000000..da8e8142da0 --- /dev/null +++ b/tests/unit_tests/test_recovery_strategy.py @@ -0,0 +1,48 @@ +from unittest import mock + +from sky.exceptions import ClusterDoesNotExist +from sky.jobs import recovery_strategy + + +@mock.patch('sky.down') +@mock.patch('sky.usage.usage_lib.messages.usage.set_internal') +def test_terminate_cluster_retry_on_value_error(mock_set_internal, + mock_sky_down) -> None: + # Set up mock to fail twice with ValueError, then succeed + mock_sky_down.side_effect = [ + ValueError('Mock error 1'), + ValueError('Mock error 2'), + None, + ] + + # Call should succeed after retries + recovery_strategy.terminate_cluster('test-cluster') + + # Verify sky.down was called 3 times + assert mock_sky_down.call_count == 3 + mock_sky_down.assert_has_calls([ + mock.call('test-cluster'), + mock.call('test-cluster'), + mock.call('test-cluster'), + ]) + + # Verify usage.set_internal was called before each sky.down + assert mock_set_internal.call_count == 3 + + +@mock.patch('sky.down') +@mock.patch('sky.usage.usage_lib.messages.usage.set_internal') +def test_terminate_cluster_handles_nonexistent_cluster(mock_set_internal, + mock_sky_down) -> None: + # Set up mock to raise ClusterDoesNotExist + mock_sky_down.side_effect = ClusterDoesNotExist('test-cluster') + + # Call should succeed silently + recovery_strategy.terminate_cluster('test-cluster') + + # Verify sky.down was called once + assert mock_sky_down.call_count == 1 + mock_sky_down.assert_called_once_with('test-cluster') + + # Verify usage.set_internal was called once + assert mock_set_internal.call_count == 1
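Note: the new unit tests in tests/unit_tests/test_recovery_strategy.py pin down the expected retry semantics of recovery_strategy.terminate_cluster without showing its implementation. A minimal sketch consistent with what these tests assert (the real function in sky/jobs/recovery_strategy.py may differ in retry count, backoff, and logging):

import time

import sky
from sky.exceptions import ClusterDoesNotExist
from sky.usage import usage_lib


def terminate_cluster(cluster_name: str, max_retry: int = 3) -> None:
    """Sketch only: mirrors the behavior asserted by the unit tests above."""
    for attempt in range(max_retry):
        try:
            # set_internal() is invoked before every sky.down attempt, as the
            # tests expect.
            usage_lib.messages.usage.set_internal()
            sky.down(cluster_name)
            return
        except ClusterDoesNotExist:
            # The cluster is already gone; treat termination as a no-op.
            return
        except ValueError:
            # Transient failure: retry until the retry budget is exhausted.
            if attempt == max_retry - 1:
                raise
            time.sleep(1)

With two ValueErrors followed by a success, this makes exactly three sky.down calls and then returns, matching test_terminate_cluster_retry_on_value_error; a ClusterDoesNotExist on the first call returns silently, matching test_terminate_cluster_handles_nonexistent_cluster.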
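Note: most hunks in tests/test_smoke.py above replace fixed 'sleep N' + one-shot grep checks with _get_cmd_wait_until_cluster_status_contains(...) and _get_cmd_wait_until_managed_job_status_contains_matching_job_name(...). Their definitions are introduced elsewhere in this diff and are not shown in this excerpt; the following is only a rough sketch of the polling pattern such a helper presumably generates (the function name, poll interval, and exact command text are assumptions, not the real helpers):

from typing import List

_WAIT_POLL_INTERVAL_SECONDS = 10  # assumed polling interval


def _sketch_wait_until_cluster_status_contains(cluster_name: str,
                                               cluster_status: List[str],
                                               timeout: int) -> str:
    # Return a bash snippet that re-runs `sky status -r` until the cluster row
    # shows one of the wanted statuses, failing after `timeout` seconds,
    # instead of sleeping a fixed amount of time and checking once.
    status_pattern = '\\|'.join(cluster_status)  # e.g. "STOPPED\|INIT"
    return ('start=$SECONDS; while true; do '
            f'if (( SECONDS - start > {timeout} )); then '
            f'echo "Timed out after {timeout}s waiting for {status_pattern}"; '
            'exit 1; fi; '
            f's=$(sky status -r {cluster_name}); echo "$s"; '
            f'echo "$s" | grep "{cluster_name}" | grep "{status_pattern}" '
            '&& break; '
            f'sleep {_WAIT_POLL_INTERVAL_SECONDS}; done')

In the smoke tests, a string like this is dropped into the command list in place of a sleep/grep pair, e.g. _sketch_wait_until_cluster_status_contains(name, ['STOPPED'], timeout=autostop_timeout). The upside over a fixed sleep is that the test both finishes sooner when the transition is fast and tolerates slower transitions up to the timeout.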