Skip to content

Commit

Permalink
[Azure] Use SkyPilot provisioner to handle stop and termination for A…
Browse files Browse the repository at this point in the history
…zure (#3700)

* Use SkyPilot for status query

* format

* Avoid reconfig

* Add todo

* Add termination and stopping

* add stop and termination into __init__

* get rid of azure special handling in backend

* format

* Fix filtering for autodown clusters

* More detailed error message

* typing
  • Loading branch information
Michaelvll authored Jul 1, 2024
1 parent 3d9c6ca commit 24faf70
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 27 deletions.
26 changes: 3 additions & 23 deletions sky/backends/cloud_vm_ray_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -3888,22 +3888,8 @@ def teardown_no_lock(self,
self.post_teardown_cleanup(handle, terminate, purge)
return

if terminate and isinstance(cloud, clouds.Azure):
# Here we handle termination of Azure by ourselves instead of Ray
# autoscaler.
resource_group = config['provider']['resource_group']
terminate_cmd = f'az group delete -y --name {resource_group}'
with rich_utils.safe_status(f'[bold cyan]Terminating '
f'[green]{cluster_name}'):
returncode, stdout, stderr = log_lib.run_with_log(
terminate_cmd,
log_abs_path,
shell=True,
stream_logs=False,
require_outputs=True)

elif (isinstance(cloud, clouds.IBM) and terminate and
prev_cluster_status == status_lib.ClusterStatus.STOPPED):
if (isinstance(cloud, clouds.IBM) and terminate and
prev_cluster_status == status_lib.ClusterStatus.STOPPED):
# pylint: disable= W0622 W0703 C0415
from sky.adaptors import ibm
from sky.skylet.providers.ibm.vpc_provider import IBMVPCProvider
Expand Down Expand Up @@ -4021,14 +4007,8 @@ def teardown_no_lock(self,
# never launched and the errors are related to pre-launch
# configurations (such as VPC not found). So it's safe & good UX
# to not print a failure message.
#
# '(ResourceGroupNotFound)': this indicates the resource group on
# Azure is not found. That means the cluster is already deleted
# on the cloud. So it's safe & good UX to not print a failure
# message.
elif ('TPU must be specified.' not in stderr and
'SKYPILOT_ERROR_NO_NODES_LAUNCHED: ' not in stderr and
'(ResourceGroupNotFound)' not in stderr):
'SKYPILOT_ERROR_NO_NODES_LAUNCHED: ' not in stderr):
raise RuntimeError(
_TEARDOWN_FAILURE_MESSAGE.format(
extra_reason='',
Expand Down
2 changes: 1 addition & 1 deletion sky/clouds/azure.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class Azure(clouds.Cloud):

_INDENT_PREFIX = ' ' * 4

PROVISIONER_VERSION = clouds.ProvisionerVersion.RAY_AUTOSCALER
PROVISIONER_VERSION = clouds.ProvisionerVersion.RAY_PROVISIONER_SKYPILOT_TERMINATOR
STATUS_VERSION = clouds.StatusVersion.SKYPILOT

@classmethod
Expand Down
2 changes: 2 additions & 0 deletions sky/provision/azure/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,5 @@
from sky.provision.azure.instance import cleanup_ports
from sky.provision.azure.instance import open_ports
from sky.provision.azure.instance import query_instances
from sky.provision.azure.instance import stop_instances
from sky.provision.azure.instance import terminate_instances
64 changes: 61 additions & 3 deletions sky/provision/azure/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,63 @@ def cleanup_ports(
del cluster_name_on_cloud, ports, provider_config # Unused.


def stop_instances(
cluster_name_on_cloud: str,
provider_config: Optional[Dict[str, Any]] = None,
worker_only: bool = False,
) -> None:
"""See sky/provision/__init__.py"""
assert provider_config is not None, (cluster_name_on_cloud, provider_config)

subscription_id = provider_config['subscription_id']
resource_group = provider_config['resource_group']
compute_client = azure.get_client('compute', subscription_id)
tag_filters = {TAG_RAY_CLUSTER_NAME: cluster_name_on_cloud}
if worker_only:
tag_filters[TAG_RAY_NODE_KIND] = 'worker'

nodes = _filter_instances(compute_client, tag_filters, resource_group)
stop_virtual_machine = get_azure_sdk_function(
client=compute_client.virtual_machines, function_name='deallocate')
with pool.ThreadPool() as p:
p.starmap(stop_virtual_machine,
[(resource_group, node.name) for node in nodes])


def terminate_instances(
cluster_name_on_cloud: str,
provider_config: Optional[Dict[str, Any]] = None,
worker_only: bool = False,
) -> None:
"""See sky/provision/__init__.py"""
assert provider_config is not None, (cluster_name_on_cloud, provider_config)
# TODO(zhwu): check the following. Also, seems we can directly force
# delete a resource group.
subscription_id = provider_config['subscription_id']
resource_group = provider_config['resource_group']
if worker_only:
compute_client = azure.get_client('compute', subscription_id)
delete_virtual_machine = get_azure_sdk_function(
client=compute_client.virtual_machines, function_name='delete')
filters = {
TAG_RAY_CLUSTER_NAME: cluster_name_on_cloud,
TAG_RAY_NODE_KIND: 'worker'
}
nodes = _filter_instances(compute_client, filters, resource_group)
with pool.ThreadPool() as p:
p.starmap(delete_virtual_machine,
[(resource_group, node.name) for node in nodes])
return

assert provider_config is not None, cluster_name_on_cloud

resource_group_client = azure.get_client('resource', subscription_id)
delete_resource_group = get_azure_sdk_function(
client=resource_group_client.resource_groups, function_name='delete')

delete_resource_group(resource_group, force_deletion_types=None)


def _get_vm_status(compute_client: 'azure_compute.ComputeManagementClient',
vm_name: str, resource_group: str) -> str:
instance = compute_client.virtual_machines.instance_view(
Expand All @@ -119,7 +176,7 @@ def _get_vm_status(compute_client: 'azure_compute.ComputeManagementClient',
# skip provisioning status
if code == 'PowerState':
return state
raise ValueError(f'Failed to get status for VM {vm_name}')
raise ValueError(f'Failed to get power state for VM {vm_name}: {instance}')


def _filter_instances(
Expand Down Expand Up @@ -185,8 +242,9 @@ def query_instances(
statuses = {}

def _fetch_and_map_status(
compute_client: 'azure_compute.ComputeManagementClient', node,
resource_group: str):
compute_client: 'azure_compute.ComputeManagementClient',
node: 'azure_compute.models.VirtualMachine',
resource_group: str) -> None:
if node.provisioning_state in provisioning_state_map:
status = provisioning_state_map[node.provisioning_state]
else:
Expand Down

0 comments on commit 24faf70

Please sign in to comment.