Skip to content

Commit

Permalink
[Provisioner] Robustify the termination for provision failure to avoi…
Browse files Browse the repository at this point in the history
…d leakage (#2990)

* Robustify the termination for provision failure to avoid leakage

* Update sky/provision/provisioner.py

Co-authored-by: Zongheng Yang <[email protected]>

* Update sky/provision/provisioner.py

Co-authored-by: Zongheng Yang <[email protected]>

* address comments

* format

* fix

* format

* address comments

---------

Co-authored-by: Zongheng Yang <[email protected]>
  • Loading branch information
Michaelvll and concretevitamin authored Jan 21, 2024
1 parent 351916f commit cbd8fc8
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 7 deletions.
3 changes: 3 additions & 0 deletions sky/backends/cloud_vm_ray_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -1536,6 +1536,9 @@ def _retry_zones(
config_dict['resources_vars'] = resources_vars
config_dict['handle'] = handle
return config_dict
except provision_common.StopFailoverError:
with ux_utils.print_exception_no_traceback():
raise
except Exception as e: # pylint: disable=broad-except
# NOTE: We try to cleanup the cluster even if the previous
# cluster does not exist. Also we are fast at
Expand Down
8 changes: 8 additions & 0 deletions sky/provision/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@ class ProvisionerError(RuntimeError):
errors: List[Dict[str, str]]


class StopFailoverError(Exception):
"""Exception for stopping failover.
Raised when cleaning up resources fails after a failed provision, so the
caller should stop the failover process and re-raise this error.
"""


@dataclasses.dataclass
class ProvisionConfig:
"""Configuration for provisioning."""
Expand Down
57 changes: 50 additions & 7 deletions sky/provision/provisioner.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
# which will be customized in sky.provision.logging.
logger = sky_logging.init_logger('sky.provisioner')

# The maximum number of retries for waiting for instances to be ready and
# for tearing down instances when provisioning fails.
_MAX_RETRY = 3
_TITLE = '\n\n' + '=' * 20 + ' {} ' + '=' * 20 + '\n'

Expand Down Expand Up @@ -138,7 +140,14 @@ def bulk_provision(
prev_cluster_ever_up: bool,
log_dir: str,
) -> provision_common.ProvisionRecord:
"""Provisions a cluster and wait until fully provisioned."""
"""Provisions a cluster and wait until fully provisioned.
Raises:
StopFailoverError: Raised when during failover cleanup, tearing
down any potentially live cluster failed despite retries
Cloud specific exceptions: If the provisioning process failed, cloud-
specific exceptions will be raised by the cloud APIs.
"""
original_config = common_utils.read_yaml(cluster_yaml)
head_node_type = original_config['head_node_type']
bootstrap_config = provision_common.ProvisionConfig(
Expand Down Expand Up @@ -173,16 +182,43 @@ def bulk_provision(
terminate = not prev_cluster_ever_up
terminate_str = ('Terminating' if terminate else 'Stopping')
logger.debug(f'{terminate_str} the failed cluster.')
teardown_cluster(repr(cloud),
cluster_name,
terminate=terminate,
provider_config=original_config['provider'])
retry_cnt = 1
while True:
try:
teardown_cluster(
repr(cloud),
cluster_name,
terminate=terminate,
provider_config=original_config['provider'])
break
except Exception as e: # pylint: disable=broad-except
logger.debug(f'{terminate_str} {cluster_name!r} failed.')
logger.debug(f'Stacktrace:\n{traceback.format_exc()}')
retry_cnt += 1
if retry_cnt <= _MAX_RETRY:
logger.debug(f'Retrying {retry_cnt}/{_MAX_RETRY}...')
time.sleep(5)
continue
formatted_exception = common_utils.format_exception(
e, use_bracket=True)
raise provision_common.StopFailoverError(
'During provisioner\'s failover, '
f'{terminate_str.lower()} {cluster_name!r} failed. '
'This can cause resource leakage. Please check the '
'failure and the cluster status on the cloud, and '
'manually terminate the cluster. '
f'Details: {formatted_exception}') from e
raise


def teardown_cluster(cloud_name: str, cluster_name: ClusterName,
terminate: bool, provider_config: Dict) -> None:
"""Deleting or stopping a cluster."""
"""Deleting or stopping a cluster.
Raises:
Cloud specific exceptions: If the teardown process failed, cloud-
specific exceptions will be raised by the cloud APIs.
"""
if terminate:
provision.terminate_instances(cloud_name, cluster_name.name_on_cloud,
provider_config)
Expand Down Expand Up @@ -291,7 +327,11 @@ def _wait_ssh_connection_indirect(

def wait_for_ssh(cluster_info: provision_common.ClusterInfo,
ssh_credentials: Dict[str, str]):
"""Wait until SSH is ready."""
"""Wait until SSH is ready.
Raises:
RuntimeError: If the SSH connection is not ready after timeout.
"""
if (cluster_info.has_external_ips() and
ssh_credentials.get('ssh_proxy_command') is None):
# If we can access public IPs, then it is more efficient to test SSH
Expand Down Expand Up @@ -487,6 +527,9 @@ def post_provision_runtime_setup(
and other necessary files to the VM.
3. Run setup commands to install dependencies.
4. Start ray cluster and skylet.
Raises:
RuntimeError: If the setup process encounters any error.
"""
with provision_logging.setup_provision_logging(log_dir):
try:
Expand Down

0 comments on commit cbd8fc8

Please sign in to comment.