Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AWS] Support capacity block reservation #3853

Merged
merged 14 commits into from
Aug 22, 2024
22 changes: 15 additions & 7 deletions sky/clouds/utils/aws_utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Utilities for AWS."""
import dataclasses
import enum
import time
from typing import List

Expand All @@ -9,6 +10,11 @@
from sky.adaptors import aws


class ReservationType(str, enum.Enum):
    """Kind of an AWS capacity reservation.

    Members are also ``str`` instances, so each one compares equal to the
    raw string value it wraps and can be built from that value, e.g.
    ``ReservationType('capacity-block')``.
    """
    # Regular capacity reservation.
    DEFAULT = 'default'
    # Capacity block reservation.
    BLOCK = 'capacity-block'


@dataclasses.dataclass
class AWSReservation:
name: str
Expand All @@ -18,6 +24,7 @@ class AWSReservation:
# Whether the reservation is targeted, i.e. can only be consumed when
# the reservation name is specified.
targeted: bool
type: ReservationType


def use_reservations() -> bool:
Expand Down Expand Up @@ -47,11 +54,12 @@ def list_reservations_for_instance_type(
}])
reservations = response['CapacityReservations']
return [
AWSReservation(
name=r['CapacityReservationId'],
instance_type=r['InstanceType'],
zone=r['AvailabilityZone'],
available_resources=r['AvailableInstanceCount'],
targeted=r['InstanceMatchCriteria'] == 'targeted',
) for r in reservations
AWSReservation(name=r['CapacityReservationId'],
instance_type=r['InstanceType'],
zone=r['AvailabilityZone'],
available_resources=r['AvailableInstanceCount'],
targeted=r['InstanceMatchCriteria'] == 'targeted',
type=ReservationType(r.get('ReservationType',
'default')))
for r in reservations
]
45 changes: 29 additions & 16 deletions sky/provision/aws/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,8 @@ def _create_instances(ec2_fail_fast, cluster_name: str,
assert 'NetworkInterfaces' not in conf, conf
assert security_group_ids is not None, conf

logger.debug(f'Creating {count} instances with config: \n{conf}')

# NOTE: This ensures that we try ALL availability zones before
# throwing an error.
num_subnets = len(subnet_ids)
Expand Down Expand Up @@ -322,9 +324,14 @@ def _create_node_tag(target_instance, is_head: bool = True) -> str:
'Key': 'Name',
'Value': f'sky-{cluster_name_on_cloud}-worker'
})
# Remove AWS internal tags, as they are not allowed to be set by users.
target_instance_tags = [
tag for tag in target_instance.tags
if not tag['Key'].startswith('aws:')
]
ec2.meta.client.create_tags(
Resources=[target_instance.id],
Tags=target_instance.tags + node_tag,
Tags=target_instance_tags + node_tag,
)
return target_instance.id

Expand Down Expand Up @@ -430,12 +437,12 @@ def _create_node_tag(target_instance, is_head: bool = True) -> str:
head_instance_id = _create_node_tag(resumed_instances[0])

if to_start_count > 0:
target_reservations = (config.node_config.get(
target_reservation_names = (config.node_config.get(
'CapacityReservationSpecification',
{}).get('CapacityReservationTarget',
{}).get('CapacityReservationId', []))
created_instances = []
if target_reservations:
if target_reservation_names:
node_config = copy.deepcopy(config.node_config)
# Clear the capacity reservation specification settings in the
# original node config, as we will create instances with
Expand All @@ -449,29 +456,35 @@ def _create_node_tag(target_instance, is_head: bool = True) -> str:
# Filter the reservations by the user-specified ones, because
# reservations contain 'open' reservations as well, which do not
# need to be explicitly specified in the config for creating instances.
target_reservations_to_count = {}
for reservation in reservations:
if (reservation.targeted and
reservation.name in target_reservations):
target_reservations_to_count[
reservation.name] = reservation.available_resources
target_reservations = []
for r in reservations:
if (r.targeted and r.name in target_reservation_names):
target_reservations.append(r)
logger.debug(f'Reservations: {reservations}')
logger.debug(f'Target reservations: {target_reservations}')

target_reservations_list = sorted(
target_reservations_to_count.items(),
key=lambda x: x[1],
target_reservations,
key=lambda x: x.available_resources,
reverse=True)
for reservation, reservation_count in target_reservations_list:
if reservation_count <= 0:
for r in target_reservations_list:
if r.available_resources <= 0:
# We have sorted the reservations by the available
# resources, so if the reservation is not available, the
# following reservations are not available either.
break
reservation_count = min(reservation_count, to_start_count)
reservation_count = min(r.available_resources, to_start_count)
logger.debug(f'Creating {reservation_count} instances '
f'with reservation {reservation}')
f'with reservation {r.name}')
node_config['CapacityReservationSpecification'][
'CapacityReservationTarget'] = {
'CapacityReservationId': reservation
'CapacityReservationId': r.name
}
if r.type == aws_utils.ReservationType.BLOCK:
# Capacity block reservations require the market type to be
# specified during instance creation.
node_config['InstanceMarketOptions'] = {
'MarketType': aws_utils.ReservationType.BLOCK.value
}
created_reserved_instances = _create_instances(
ec2_fail_fast,
Expand Down
Loading