diff --git a/docs/source/reference/config.rst b/docs/source/reference/config.rst
index fc5eddd6a47..2669aef0a1b 100644
--- a/docs/source/reference/config.rst
+++ b/docs/source/reference/config.rst
@@ -193,6 +193,35 @@ Available fields and semantics:
   # Default: false.
   disk_encrypted: false

+  # Reserved capacity (optional).
+  #
+  # Whether to prioritize capacity reservations (considered as 0 cost) in the
+  # optimizer.
+  #
+  # If you have capacity reservations in your AWS account:
+  # Setting this to true guarantees that the optimizer will pick any matching
+  # reservation across all regions; AWS automatically consumes reservations
+  # whose instance match criteria is "open". Setting this to false means the
+  # optimizer uses regular, non-zero pricing in optimization (if a matching
+  # reservation happens to exist, AWS will still consume it).
+  #
+  # Note: this setting defaults to false for performance reasons, as it can
+  # take half a minute to retrieve the reservations from AWS when set to true.
+  #
+  # Default: false.
+  prioritize_reservations: false
+  #
+  # The targeted capacity reservations (CapacityReservationId) to be
+  # considered when provisioning clusters on AWS. SkyPilot will automatically
+  # prioritize this reserved capacity (considered as zero cost) if the
+  # requested resources match the reservation.
+  #
+  # Ref: https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/capacity-reservations-launch.html
+  specific_reservations:
+    - cr-a1234567
+    - cr-b2345678
+
+
   # Identity to use for AWS instances (optional).
   #
   # LOCAL_CREDENTIALS: The user's local credential files will be uploaded to
@@ -307,13 +336,16 @@ Available fields and semantics:
   # Setting this to true guarantees the optimizer will pick any matching
   # reservation and GCP will auto consume your reservation, and setting to
   # false means optimizer uses regular, non-zero pricing in optimization (if
-  # by chance any matching reservation is selected, GCP still auto consumes
-  # the reservation).
+  # by chance any matching reservation exists, GCP still auto consumes the
+  # reservation).
   #
   # If you have "specifically targeted" reservations (set by the
   # `specific_reservations` field below): This field will automatically be set
   # to true.
   #
+  # Note: this setting defaults to false for performance reasons, as it can
+  # take half a minute to retrieve the reservations from GCP when set to true.
+  #
   # Default: false.
   prioritize_reservations: false
   #
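For reference, the two new `aws` fields compose the same way the GCP pair does: naming `specific_reservations` implicitly turns reservation handling on. A minimal sketch of that interaction, using a plain dict as a stand-in for the parsed `~/.sky/config.yaml` (the helper below is illustrative, not a SkyPilot API):

```python
from typing import Any, Dict, List


def reservations_enabled(aws_config: Dict[str, Any]) -> bool:
    """Reservations are considered if either new field is set."""
    prioritize: bool = aws_config.get('prioritize_reservations', False)
    specific: List[str] = aws_config.get('specific_reservations', [])
    return prioritize or len(specific) > 0


if __name__ == '__main__':
    cfg = {'specific_reservations': ['cr-a1234567', 'cr-b2345678']}
    # Even with prioritize_reservations unset, naming specific reservations
    # enables the reservation-aware code paths.
    print(reservations_enabled(cfg))  # True
```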
diff --git a/sky/clouds/aws.py b/sky/clouds/aws.py
index 021f243da70..3a05223574d 100644
--- a/sky/clouds/aws.py
+++ b/sky/clouds/aws.py
@@ -8,7 +8,7 @@
 import subprocess
 import time
 import typing
-from typing import Any, Dict, Iterator, List, Optional, Tuple
+from typing import Any, Dict, Iterator, List, Optional, Set, Tuple

 from sky import clouds
 from sky import exceptions
@@ -17,6 +17,7 @@
 from sky import skypilot_config
 from sky.adaptors import aws
 from sky.clouds import service_catalog
+from sky.clouds.utils import aws_utils
 from sky.skylet import constants
 from sky.utils import common_utils
 from sky.utils import resources_utils
@@ -173,6 +174,10 @@ def regions_with_offering(cls, instance_type: str,
             regions = [r for r in regions if r.zones]
         return regions

+    @classmethod
+    def optimize_by_zone(cls) -> bool:
+        return aws_utils.use_reservations()
+
     @classmethod
     def zones_provision_loop(
         cls,
@@ -197,11 +202,13 @@ def zones_provision_loop(
                                 zone=None)
         for r in regions:
             assert r.zones is not None, r
-            if num_nodes > 1:
+            if num_nodes > 1 or aws_utils.use_reservations():
                 # When num_nodes > 1, we shouldn't pass a list of zones to the
                 # AWS NodeProvider to try, because it may then place the nodes of
                 # the same cluster in different zones. This is an artifact of the
                 # current AWS NodeProvider implementation.
+                # Also, when using reservations, they are zone-specific, so we
+                # should return one zone at a time.
                 for z in r.zones:
                     yield [z]
             else:
@@ -856,6 +863,37 @@ def check_quota_available(cls,
         # Quota found to be greater than zero, try provisioning
         return True

+    def get_reservations_available_resources(
+        self,
+        instance_type: str,
+        region: str,
+        zone: Optional[str],
+        specific_reservations: Set[str],
+    ) -> Dict[str, int]:
+        if zone is None:
+            # For backward compatibility, the cluster in INIT state launched
+            # before #2352 may not have zone information. In this case, we
+            # return 0 for all reservations.
+            return {reservation: 0 for reservation in specific_reservations}
+        reservations = aws_utils.list_reservations_for_instance_type(
+            instance_type, region)
+
+        filtered_reservations = []
+        for r in reservations:
+            if zone != r.zone:
+                continue
+            if r.targeted:
+                if r.name in specific_reservations:
+                    filtered_reservations.append(r)
+            else:
+                filtered_reservations.append(r)
+        reservation_available_resources = {
+            r.name: r.available_resources for r in filtered_reservations
+        }
+        logger.debug('AWS reservations available resources: '
+                     f'{region}-{zone}: {reservation_available_resources}')
+        return reservation_available_resources
+
     @classmethod
     def query_status(cls, name: str, tag_filters: Dict[str, str],
                      region: Optional[str], zone: Optional[str],
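The new `get_reservations_available_resources()` keeps only reservations in the requested zone, and counts targeted ones only when the user listed them. A self-contained sketch of that filter, runnable without AWS credentials (the `Reservation` dataclass and sample data below are made up, mirroring `aws_utils.AWSReservation`):

```python
import dataclasses
from typing import Dict, List, Optional, Set


@dataclasses.dataclass
class Reservation:
    name: str
    zone: str
    available_resources: int
    targeted: bool


def available_by_reservation(reservations: List[Reservation],
                             zone: Optional[str],
                             specific: Set[str]) -> Dict[str, int]:
    if zone is None:
        # Mirrors the backward-compatibility branch above: without a zone we
        # cannot match zone-scoped reservations, so report zero availability.
        return {name: 0 for name in specific}
    out: Dict[str, int] = {}
    for r in reservations:
        if r.zone != zone:
            continue
        # Targeted reservations only count when explicitly listed.
        if r.targeted and r.name not in specific:
            continue
        out[r.name] = r.available_resources
    return out


print(available_by_reservation(
    [Reservation('cr-a1234567', 'us-east-1a', 3, targeted=True),
     Reservation('cr-open', 'us-east-1a', 1, targeted=False),
     Reservation('cr-b2345678', 'us-east-1b', 5, targeted=True)],
    zone='us-east-1a', specific={'cr-a1234567'}))
# {'cr-a1234567': 3, 'cr-open': 1}
```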
diff --git a/sky/clouds/cloud.py b/sky/clouds/cloud.py
index 854cb467c5f..9775109ac80 100644
--- a/sky/clouds/cloud.py
+++ b/sky/clouds/cloud.py
@@ -177,6 +177,11 @@ def regions_with_offering(cls, instance_type: str,
         """
         raise NotImplementedError

+    @classmethod
+    def optimize_by_zone(cls) -> bool:
+        """Returns whether to optimize this cloud by zone (default: region)."""
+        return False
+
     @classmethod
     def zones_provision_loop(
         cls,
diff --git a/sky/clouds/gcp.py b/sky/clouds/gcp.py
index e24e67b2486..643d55d7037 100644
--- a/sky/clouds/gcp.py
+++ b/sky/clouds/gcp.py
@@ -260,6 +260,10 @@ def regions_with_offering(cls, instance_type: str,
             regions = [r for r in regions if r.zones]
         return regions

+    @classmethod
+    def optimize_by_zone(cls) -> bool:
+        return True
+
     @classmethod
     def zones_provision_loop(
         cls,
diff --git a/sky/clouds/utils/aws_utils.py b/sky/clouds/utils/aws_utils.py
new file mode 100644
index 00000000000..a6c21a15d5c
--- /dev/null
+++ b/sky/clouds/utils/aws_utils.py
@@ -0,0 +1,57 @@
+"""Utilities for AWS."""
+import dataclasses
+import time
+from typing import List
+
+import cachetools
+
+from sky import skypilot_config
+from sky.adaptors import aws
+
+
+@dataclasses.dataclass
+class AWSReservation:
+    name: str
+    instance_type: str
+    zone: str
+    available_resources: int
+    # Whether the reservation is targeted, i.e. can only be consumed when
+    # the reservation name is specified.
+    targeted: bool
+
+
+def use_reservations() -> bool:
+    prioritize_reservations = skypilot_config.get_nested(
+        ('aws', 'prioritize_reservations'), False)
+    specific_reservations = skypilot_config.get_nested(
+        ('aws', 'specific_reservations'), set())
+    return prioritize_reservations or specific_reservations
+
+
+@cachetools.cached(cache=cachetools.TTLCache(maxsize=100,
+                                             ttl=300,
+                                             timer=time.time))
+def list_reservations_for_instance_type(
+    instance_type: str,
+    region: str,
+) -> List[AWSReservation]:
+    if not use_reservations():
+        return []
+    ec2 = aws.client('ec2', region_name=region)
+    response = ec2.describe_capacity_reservations(Filters=[{
+        'Name': 'instance-type',
+        'Values': [instance_type]
+    }, {
+        'Name': 'state',
+        'Values': ['active']
+    }])
+    reservations = response['CapacityReservations']
+    return [
+        AWSReservation(
+            name=r['CapacityReservationId'],
+            instance_type=r['InstanceType'],
+            zone=r['AvailabilityZone'],
+            available_resources=r['AvailableInstanceCount'],
+            targeted=r['InstanceMatchCriteria'] == 'targeted',
+        ) for r in reservations
+    ]
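`list_reservations_for_instance_type()` relies on a `cachetools` TTL cache so repeated optimizer queries within five minutes reuse a single `DescribeCapacityReservations` call. A toy sketch of the same caching pattern (the `expensive_lookup` body is a stand-in for the EC2 request):

```python
import time

import cachetools


@cachetools.cached(cache=cachetools.TTLCache(maxsize=100, ttl=300,
                                             timer=time.time))
def expensive_lookup(instance_type: str, region: str) -> float:
    # Pretend this is the slow DescribeCapacityReservations round trip.
    print(f'fetching {instance_type} in {region}...')
    return time.time()


first = expensive_lookup('p4d.24xlarge', 'us-east-1')
second = expensive_lookup('p4d.24xlarge', 'us-east-1')  # served from cache
assert first == second
```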
diff --git a/sky/optimizer.py b/sky/optimizer.py
index 7b4b29e3bce..10aa697258b 100644
--- a/sky/optimizer.py
+++ b/sky/optimizer.py
@@ -19,6 +19,8 @@
 from sky.adaptors import common as adaptors_common
 from sky.utils import env_options
 from sky.utils import log_utils
+from sky.utils import rich_utils
+from sky.utils import subprocess_utils
 from sky.utils import ux_utils

 if typing.TYPE_CHECKING:
@@ -252,6 +254,26 @@ def _estimate_nodes_cost_or_time(
         # node -> cloud -> list of resources that satisfy user's requirements.
         node_to_candidate_map: _TaskToPerCloudCandidates = {}

+        def get_available_reservations(
+            launchable_resources: Dict[resources_lib.Resources,
+                                       List[resources_lib.Resources]]
+        ) -> Dict[resources_lib.Resources, int]:
+            num_available_reserved_nodes_per_resource = {}
+
+            def get_reservations_available_resources(
+                    resources: resources_lib.Resources):
+                num_available_reserved_nodes_per_resource[resources] = sum(
+                    resources.get_reservations_available_resources().values())
+
+            launchable_resources_list: List[resources_lib.Resources] = sum(
+                launchable_resources.values(), [])
+            with rich_utils.safe_status(
+                    '[cyan]Checking reserved resources...[/]'):
+                subprocess_utils.run_in_parallel(
+                    get_reservations_available_resources,
+                    launchable_resources_list)
+            return num_available_reserved_nodes_per_resource
+
         # Compute the estimated cost/time for each node.
         for node_i, node in enumerate(topo_order):
             if node_i == 0:
@@ -279,7 +301,11 @@ def _estimate_nodes_cost_or_time(
                     list(node.resources)[0]: list(node.resources)
                 }

+            # Fetch the reservation availability for all candidates in advance
+            # and in parallel; the pricing loop below then only does lookups.
             num_resources = len(list(node.resources))
+            num_available_reserved_nodes_per_resource = (
+                get_available_reservations(launchable_resources))

             for orig_resources, launchable_list in launchable_resources.items():
                 if num_resources == 1 and node.time_estimator_func is None:
@@ -302,15 +328,16 @@ def _estimate_nodes_cost_or_time(
                 else:
                     estimated_runtime = node.estimate_runtime(
                         orig_resources)
+
                 for resources in launchable_list:
                     if do_print:
                         logger.debug(f'resources: {resources}')

                     if minimize_cost:
                         cost_per_node = resources.get_cost(estimated_runtime)
-                        num_available_reserved_nodes = sum(
-                            resources.get_reservations_available_resources(
-                            ).values())
+                        num_available_reserved_nodes = (
+                            num_available_reserved_nodes_per_resource[resources]
+                        )

                         # We consider the cost of the unused reservation
                         # resources to be 0 since we are already paying for
@@ -1116,7 +1143,7 @@ def _make_launchables_for_valid_region_zones(
     regions = launchable_resources.get_valid_regions_for_launchable()
     for region in regions:
         if (launchable_resources.use_spot and region.zones is not None or
-                isinstance(launchable_resources.cloud, clouds.GCP)):
+                launchable_resources.cloud.optimize_by_zone()):
             # Spot instances.
             # Do not batch the per-zone requests.
             for zone in region.zones:
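The optimizer change prefetches reservation availability for all candidate resources before the pricing loop, so the loop itself only does dictionary lookups. A hedged sketch of that fan-out pattern; `concurrent.futures` stands in for `sky.utils.subprocess_utils.run_in_parallel`, and `fetch_available()` is a made-up stand-in for `Resources.get_reservations_available_resources()`:

```python
import concurrent.futures
from typing import Dict, List


def fetch_available(resource_name: str) -> int:
    # Pretend this calls out to the cloud; return a fake reserved-node count.
    return {'p4d.24xlarge': 2}.get(resource_name, 0)


def prefetch(resource_names: List[str]) -> Dict[str, int]:
    with concurrent.futures.ThreadPoolExecutor() as pool:
        counts = list(pool.map(fetch_available, resource_names))
    return dict(zip(resource_names, counts))


available = prefetch(['p4d.24xlarge', 'p3.2xlarge'])
# Later, inside the pricing loop, reserved nodes can be discounted to zero
# cost, e.g. billed_nodes = max(num_nodes - available[name], 0).
print(available)  # {'p4d.24xlarge': 2, 'p3.2xlarge': 0}
```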
diff --git a/sky/provision/aws/instance.py b/sky/provision/aws/instance.py
index 0161992bffc..24173482f34 100644
--- a/sky/provision/aws/instance.py
+++ b/sky/provision/aws/instance.py
@@ -15,6 +15,7 @@
 from sky import status_lib
 from sky.adaptors import aws
 from sky.clouds import aws as aws_cloud
+from sky.clouds.utils import aws_utils
 from sky.provision import common
 from sky.provision import constants
 from sky.provision.aws import utils
@@ -429,19 +430,81 @@ def _create_node_tag(target_instance, is_head: bool = True) -> str:
             head_instance_id = _create_node_tag(resumed_instances[0])

     if to_start_count > 0:
+        target_reservations = (config.node_config.get(
+            'CapacityReservationSpecification',
+            {}).get('CapacityReservationTarget',
+                    {}).get('CapacityReservationId', []))
+        created_instances = []
+        if target_reservations:
+            node_config = copy.deepcopy(config.node_config)
+            # Clear the capacity reservation specification in the copied node
+            # config; each reservation below gets its own specific
+            # CapacityReservationTarget when its instances are created.
+            node_config['CapacityReservationSpecification'] = {
+                'CapacityReservationTarget': {}
+            }
+
+            reservations = aws_utils.list_reservations_for_instance_type(
+                node_config['InstanceType'], region=region)
+            # Filter the reservations down to the user-specified ones, because
+            # the listing also contains 'open' reservations, which do not need
+            # to be explicitly specified in the config to be consumed.
+            target_reservations_to_count = {}
+            for reservation in reservations:
+                if (reservation.targeted and
+                        reservation.name in target_reservations):
+                    target_reservations_to_count[
+                        reservation.name] = reservation.available_resources
+
+            target_reservations_list = sorted(
+                target_reservations_to_count.items(),
+                key=lambda x: x[1],
+                reverse=True)
+            for reservation, reservation_count in target_reservations_list:
+                if reservation_count <= 0:
+                    # The reservations are sorted by available resources, so
+                    # if this one has none available, the following ones are
+                    # not available either.
+                    break
+                reservation_count = min(reservation_count, to_start_count)
+                logger.debug(f'Creating {reservation_count} instances '
+                             f'with reservation {reservation}')
+                node_config['CapacityReservationSpecification'][
+                    'CapacityReservationTarget'] = {
+                        'CapacityReservationId': reservation
+                    }
+                created_reserved_instances = _create_instances(
+                    ec2_fail_fast,
+                    cluster_name_on_cloud,
+                    node_config,
+                    tags,
+                    reservation_count,
+                    associate_public_ip_address=(
+                        not config.provider_config['use_internal_ips']))
+                created_instances.extend(created_reserved_instances)
+                to_start_count -= reservation_count
+                if to_start_count <= 0:
+                    break
+
         # TODO(suquark): If there are existing instances (already running or
         # resumed), then we cannot guarantee that they will be in the same
         # availability zone (when there are multiple zones specified).
         # This is a known issue before.
-        created_instances = _create_instances(
-            ec2_fail_fast,
-            cluster_name_on_cloud,
-            config.node_config,
-            tags,
-            to_start_count,
-            associate_public_ip_address=(
-                not config.provider_config['use_internal_ips']))
+        if to_start_count > 0:
+            # Remove the capacity reservation specification from the node config,
+            # as the reservation-backed instances have already been created above.
+            config.node_config.get('CapacityReservationSpecification',
+                                   {}).pop('CapacityReservationTarget', None)
+            created_remaining_instances = _create_instances(
+                ec2_fail_fast,
+                cluster_name_on_cloud,
+                config.node_config,
+                tags,
+                to_start_count,
+                associate_public_ip_address=(
+                    not config.provider_config['use_internal_ips']))
+            created_instances.extend(created_remaining_instances)

     created_instances.sort(key=lambda x: x.id)
     created_instance_ids = [n.id for n in created_instances]
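The provisioner change allocates nodes greedily: targeted reservations are consumed in descending order of availability, and whatever remains falls back to a regular request. A small sketch of that allocation plan with toy counts (in the real code each chunk becomes one `_create_instances()` call with its own `CapacityReservationTarget`):

```python
from typing import Dict, List, Tuple


def plan_launch(to_start: int,
                reservation_counts: Dict[str, int]) -> List[Tuple[str, int]]:
    plan: List[Tuple[str, int]] = []
    # Largest reservations first, mirroring the sorted() call above.
    for name, available in sorted(reservation_counts.items(),
                                  key=lambda kv: kv[1], reverse=True):
        if to_start <= 0 or available <= 0:
            break
        take = min(available, to_start)
        plan.append((name, take))
        to_start -= take
    if to_start > 0:
        # Remaining nodes are launched without a reservation target.
        plan.append(('on-demand', to_start))
    return plan


print(plan_launch(5, {'cr-a1234567': 3, 'cr-b2345678': 1}))
# [('cr-a1234567', 3), ('cr-b2345678', 1), ('on-demand', 1)]
```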
diff --git a/sky/templates/aws-ray.yml.j2 b/sky/templates/aws-ray.yml.j2
index 4cfecfe2b12..7e9dfccdaf1 100644
--- a/sky/templates/aws-ray.yml.j2
+++ b/sky/templates/aws-ray.yml.j2
@@ -84,6 +84,12 @@ available_node_types:
       # SpotOptions:
       #   MaxPrice: MAX_HOURLY_PRICE
       {% endif %}
+      CapacityReservationSpecification:
+        CapacityReservationPreference: open
+        {% if specific_reservations %}
+        CapacityReservationTarget:
+          CapacityReservationId: {{specific_reservations}}
+        {% endif %}
       # Use cloud init in UserData to set up the authorized_keys to get
       # around the number of keys limit and permission issues with
       # ec2.describe_key_pairs.
diff --git a/sky/utils/schemas.py b/sky/utils/schemas.py
index 0fa1e8d34ce..01dc14f617c 100644
--- a/sky/utils/schemas.py
+++ b/sky/utils/schemas.py
@@ -706,6 +706,15 @@ def get_config_schema():
             'required': [],
             'additionalProperties': False,
             'properties': {
+                'prioritize_reservations': {
+                    'type': 'boolean',
+                },
+                'specific_reservations': {
+                    'type': 'array',
+                    'items': {
+                        'type': 'string',
+                    },
+                },
                 'disk_encrypted': {
                     'type': 'boolean',
                 },
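The `schemas.py` addition only needs a boolean and a list of strings. A hedged sketch of what that fragment accepts, validated with the standalone `jsonschema` package (the fragment below omits the rest of the SkyPilot schema, including `additionalProperties: False`):

```python
import jsonschema

aws_fragment = {
    'type': 'object',
    'properties': {
        'prioritize_reservations': {'type': 'boolean'},
        'specific_reservations': {
            'type': 'array',
            'items': {'type': 'string'},
        },
    },
}

# Valid: a boolean plus a list of reservation IDs.
jsonschema.validate(
    {'prioritize_reservations': True,
     'specific_reservations': ['cr-a1234567']}, aws_fragment)

# Invalid: a bare string is rejected; a list of strings is required.
try:
    jsonschema.validate({'specific_reservations': 'cr-a1234567'}, aws_fragment)
except jsonschema.ValidationError as e:
    print(e.message)
```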