From 237be80dea5eb506a9e156f94bfb12ec17c2c707 Mon Sep 17 00:00:00 2001 From: Chris McKenzie Date: Fri, 22 Nov 2024 13:32:34 -0800 Subject: [PATCH 01/19] Preliminary Vast AI support --- sky/__init__.py | 2 + sky/adaptors/vast.py | 29 ++ sky/authentication.py | 14 + sky/backends/backend_utils.py | 2 + sky/backends/cloud_vm_ray_backend.py | 1 + sky/clouds/__init__.py | 2 + sky/clouds/service_catalog/constants.py | 2 +- sky/clouds/service_catalog/vast_catalog.py | 104 ++++++++ sky/clouds/vast.py | 296 +++++++++++++++++++++ sky/optimizer.py | 1 + sky/provision/__init__.py | 1 + sky/provision/vast/__init__.py | 11 + sky/provision/vast/config.py | 11 + sky/provision/vast/instance.py | 276 +++++++++++++++++++ sky/provision/vast/utils.py | 103 +++++++ sky/templates/vast-ray.yml.j2 | 78 ++++++ sky/utils/controller_utils.py | 9 +- tests/test_smoke.py | 17 ++ 18 files changed, 957 insertions(+), 2 deletions(-) create mode 100644 sky/adaptors/vast.py create mode 100644 sky/clouds/service_catalog/vast_catalog.py create mode 100644 sky/clouds/vast.py create mode 100644 sky/provision/vast/__init__.py create mode 100644 sky/provision/vast/config.py create mode 100644 sky/provision/vast/instance.py create mode 100644 sky/provision/vast/utils.py create mode 100644 sky/templates/vast-ray.yml.j2 diff --git a/sky/__init__.py b/sky/__init__.py index b851775dabf..1317dd788c4 100644 --- a/sky/__init__.py +++ b/sky/__init__.py @@ -132,6 +132,7 @@ def set_proxy_env_var(proxy_var: str, urllib_var: Optional[str]): OCI = clouds.OCI Paperspace = clouds.Paperspace RunPod = clouds.RunPod +Vast = clouds.Vast Vsphere = clouds.Vsphere Fluidstack = clouds.Fluidstack optimize = Optimizer.optimize @@ -149,6 +150,7 @@ def set_proxy_env_var(proxy_var: str, urllib_var: Optional[str]): 'OCI', 'Paperspace', 'RunPod', + 'Vast', 'SCP', 'Vsphere', 'Fluidstack', diff --git a/sky/adaptors/vast.py b/sky/adaptors/vast.py new file mode 100644 index 00000000000..20ba72035f1 --- /dev/null +++ b/sky/adaptors/vast.py @@ -0,0 +1,29 @@ +"""Vast cloud adaptor.""" + +import functools + +_vast_sdk = None + + +def import_package(func): + + @functools.wraps(func) + def wrapper(*args, **kwargs): + global _vast_sdk + + if _vast_sdk is None: + try: + import vastai_sdk as _vast # pylint: disable=import-outside-toplevel + _vast_sdk = _vast.VastAI() + except ImportError: + raise ImportError('Fail to import dependencies for vast.' + 'Try pip install "skypilot[vast]"') from None + return func(*args, **kwargs) + + return wrapper + + +@import_package +def vast(): + """Return the vast package.""" + return _vast_sdk diff --git a/sky/authentication.py b/sky/authentication.py index 41a7d02dfb7..4e7f7e89188 100644 --- a/sky/authentication.py +++ b/sky/authentication.py @@ -43,6 +43,7 @@ from sky.adaptors import ibm from sky.adaptors import kubernetes from sky.adaptors import runpod +from sky.adaptors import vast from sky.provision.fluidstack import fluidstack_utils from sky.provision.kubernetes import utils as kubernetes_utils from sky.provision.lambda_cloud import lambda_utils @@ -473,6 +474,19 @@ def setup_runpod_authentication(config: Dict[str, Any]) -> Dict[str, Any]: return configure_ssh_info(config) +def setup_vast_authentication(config: Dict[str, Any]) -> Dict[str, Any]: + """Sets up SSH authentication for Vast. + - Generates a new SSH key pair if one does not exist. + - Adds the public SSH key to the user's Vast account. + """ + _, public_key_path = get_or_generate_keys() + with open(public_key_path, 'r', encoding='UTF-8') as pub_key_file: + public_key = pub_key_file.read().strip() + vast.vast().create_ssh_key(ssh_key=public_key) + config['auth']['ssh_public_key'] = PUBLIC_SSH_KEY_PATH + return configure_ssh_info(config) + + def setup_fluidstack_authentication(config: Dict[str, Any]) -> Dict[str, Any]: get_or_generate_keys() diff --git a/sky/backends/backend_utils.py b/sky/backends/backend_utils.py index 5902510b626..023bbe9efb4 100644 --- a/sky/backends/backend_utils.py +++ b/sky/backends/backend_utils.py @@ -963,6 +963,8 @@ def _add_auth_to_cluster_config(cloud: clouds.Cloud, cluster_config_file: str): config = auth.setup_ibm_authentication(config) elif isinstance(cloud, clouds.RunPod): config = auth.setup_runpod_authentication(config) + elif isinstance(cloud, clouds.Vast): + config = auth.setup_vast_authentication(config) elif isinstance(cloud, clouds.Fluidstack): config = auth.setup_fluidstack_authentication(config) else: diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py index 5682cf24586..a8fcc1fdaa4 100644 --- a/sky/backends/cloud_vm_ray_backend.py +++ b/sky/backends/cloud_vm_ray_backend.py @@ -176,6 +176,7 @@ def _get_cluster_config_template(cloud): clouds.RunPod: 'runpod-ray.yml.j2', clouds.Kubernetes: 'kubernetes-ray.yml.j2', clouds.Vsphere: 'vsphere-ray.yml.j2', + clouds.Vast: 'vast-ray.yml.j2', clouds.Fluidstack: 'fluidstack-ray.yml.j2' } return cloud_to_template[type(cloud)] diff --git a/sky/clouds/__init__.py b/sky/clouds/__init__.py index c4d46e93adf..2201f41ac23 100644 --- a/sky/clouds/__init__.py +++ b/sky/clouds/__init__.py @@ -24,6 +24,7 @@ from sky.clouds.paperspace import Paperspace from sky.clouds.runpod import RunPod from sky.clouds.scp import SCP +from sky.clouds.vast import Vast from sky.clouds.vsphere import Vsphere __all__ = [ @@ -37,6 +38,7 @@ 'Paperspace', 'SCP', 'RunPod', + 'Vast', 'OCI', 'Vsphere', 'Kubernetes', diff --git a/sky/clouds/service_catalog/constants.py b/sky/clouds/service_catalog/constants.py index 1373fd86a03..f5f1f89b390 100644 --- a/sky/clouds/service_catalog/constants.py +++ b/sky/clouds/service_catalog/constants.py @@ -3,5 +3,5 @@ CATALOG_SCHEMA_VERSION = 'v5' CATALOG_DIR = '~/.sky/catalogs' ALL_CLOUDS = ('aws', 'azure', 'gcp', 'ibm', 'lambda', 'scp', 'oci', - 'kubernetes', 'runpod', 'vsphere', 'cudo', 'fluidstack', + 'kubernetes', 'runpod', 'vast', 'vsphere', 'cudo', 'fluidstack', 'paperspace') diff --git a/sky/clouds/service_catalog/vast_catalog.py b/sky/clouds/service_catalog/vast_catalog.py new file mode 100644 index 00000000000..14f4b2fedf2 --- /dev/null +++ b/sky/clouds/service_catalog/vast_catalog.py @@ -0,0 +1,104 @@ +""" Vast | Catalog + +This module loads the service catalog file and can be used to +query instance types and pricing information for Vast.ai. +""" + +import typing +from typing import Dict, List, Optional, Tuple, Union + +from sky.clouds.service_catalog import common +from sky.utils import ux_utils + +if typing.TYPE_CHECKING: + from sky.clouds import cloud + +_df = common.read_catalog('vast/vms.csv') + + +def instance_type_exists(instance_type: str) -> bool: + return common.instance_type_exists_impl(_df, instance_type) + + +def validate_region_zone( + region: Optional[str], + zone: Optional[str]) -> Tuple[Optional[str], Optional[str]]: + if zone is not None: + with ux_utils.print_exception_no_traceback(): + raise ValueError('Vast does not support zones.') + return common.validate_region_zone_impl('vast', _df, region, zone) + + +def get_hourly_cost(instance_type: str, + use_spot: bool = False, + region: Optional[str] = None, + zone: Optional[str] = None) -> float: + """Returns the cost, or the cheapest cost among all zones for spot.""" + if zone is not None: + with ux_utils.print_exception_no_traceback(): + raise ValueError('Vast does not support zones.') + return common.get_hourly_cost_impl(_df, instance_type, use_spot, region, + zone) + + +def get_vcpus_mem_from_instance_type( + instance_type: str) -> Tuple[Optional[float], Optional[float]]: + return common.get_vcpus_mem_from_instance_type_impl(_df, instance_type) + + +def get_default_instance_type(cpus: Optional[str] = None, + memory: Optional[str] = None, + disk_tier: Optional[str] = None) -> Optional[str]: + del disk_tier + # NOTE: After expanding catalog to multiple entries, you may + # want to specify a default instance type or family. + return common.get_instance_type_for_cpus_mem_impl(_df, cpus, memory) + + +def get_accelerators_from_instance_type( + instance_type: str) -> Optional[Dict[str, Union[int, float]]]: + return common.get_accelerators_from_instance_type_impl(_df, instance_type) + + +def get_instance_type_for_accelerator( + acc_name: str, + acc_count: int, + cpus: Optional[str] = None, + memory: Optional[str] = None, + use_spot: bool = False, + region: Optional[str] = None, + zone: Optional[str] = None) -> Tuple[Optional[List[str]], List[str]]: + """Returns a list of instance types that have the given accelerator.""" + if zone is not None: + with ux_utils.print_exception_no_traceback(): + raise ValueError('Vast does not support zones.') + return common.get_instance_type_for_accelerator_impl(df=_df, + acc_name=acc_name, + acc_count=acc_count, + cpus=cpus, + memory=memory, + use_spot=use_spot, + region=region, + zone=zone) + + +def get_region_zones_for_instance_type(instance_type: str, + use_spot: bool) -> List['cloud.Region']: + df = _df[_df['InstanceType'] == instance_type] + return common.get_region_zones(df, use_spot) + + +# TODO: this differs from the fluffy catalog version +def list_accelerators( + gpus_only: bool, + name_filter: Optional[str], + region_filter: Optional[str], + quantity_filter: Optional[int], + case_sensitive: bool = True, + all_regions: bool = False, + require_price: bool = True) -> Dict[str, List[common.InstanceTypeInfo]]: + """Returns all instance types in Vast offering GPUs.""" + del require_price # Unused. + return common.list_accelerators_impl('Vast', _df, gpus_only, name_filter, + region_filter, quantity_filter, + case_sensitive, all_regions) diff --git a/sky/clouds/vast.py b/sky/clouds/vast.py new file mode 100644 index 00000000000..2f7877372d2 --- /dev/null +++ b/sky/clouds/vast.py @@ -0,0 +1,296 @@ +""" Vast Cloud. """ + +import typing +from typing import Dict, Iterator, List, Optional, Tuple, Union + +from sky import clouds +from sky.clouds import service_catalog +from sky.utils import resources_utils + +if typing.TYPE_CHECKING: + from sky import resources as resources_lib + +_CREDENTIAL_FILES = [ + 'config.toml', +] + + +@clouds.CLOUD_REGISTRY.register +class Vast(clouds.Cloud): + """ Vast GPU Cloud + + _REPR | The string representation for the Vast GPU cloud object. + """ + _REPR = 'Vast' + _CLOUD_UNSUPPORTED_FEATURES = { + clouds.CloudImplementationFeatures.SPOT_INSTANCE: + ('Spot is not supported, as vast API does not implement spot.'), + clouds.CloudImplementationFeatures.MULTI_NODE: + ('Multi-node not supported yet, as the interconnection among nodes ' + 'are non-trivial on Vast.'), + ## TODO: These are different from the fluffy cloud + clouds.CloudImplementationFeatures.CUSTOM_DISK_TIER: + ('Customizing disk tier is not supported yet on Vast.'), + clouds.CloudImplementationFeatures.STORAGE_MOUNTING: + ('Mounting object stores is not supported on Vast. To read data ' + 'from object stores on Vast, use `mode: COPY` to copy the data ' + 'to local disk.'), + } + _MAX_CLUSTER_NAME_LEN_LIMIT = 120 + _regions: List[clouds.Region] = [] + + PROVISIONER_VERSION = clouds.ProvisionerVersion.SKYPILOT + STATUS_VERSION = clouds.StatusVersion.SKYPILOT + OPEN_PORTS_VERSION = clouds.OpenPortsVersion.LAUNCH_ONLY + + @classmethod + def _unsupported_features_for_resources( + cls, resources: 'resources_lib.Resources' + ) -> Dict[clouds.CloudImplementationFeatures, str]: + """The features not supported based on the resources provided. + + This method is used by check_features_are_supported() to check if the + cloud implementation supports all the requested features. + + Returns: + A dict of {feature: reason} for the features not supported by the + cloud implementation. + """ + del resources # unused + return cls._CLOUD_UNSUPPORTED_FEATURES + + @classmethod + def _max_cluster_name_length(cls) -> Optional[int]: + return cls._MAX_CLUSTER_NAME_LEN_LIMIT + + @classmethod + def regions_with_offering(cls, instance_type: str, + accelerators: Optional[Dict[str, int]], + use_spot: bool, region: Optional[str], + zone: Optional[str]) -> List[clouds.Region]: + assert zone is None, 'Vast does not support zones.' + del accelerators, zone # unused + if use_spot: + return [] + else: + regions = service_catalog.get_region_zones_for_instance_type( + instance_type, use_spot, 'vast') + + if region is not None: + regions = [r for r in regions if r.name == region] + return regions + + @classmethod + def get_vcpus_mem_from_instance_type( + cls, + instance_type: str, + ) -> Tuple[Optional[float], Optional[float]]: + return service_catalog.get_vcpus_mem_from_instance_type(instance_type, + clouds='vast') + + @classmethod + def zones_provision_loop( + cls, + *, + region: str, + num_nodes: int, + instance_type: str, + accelerators: Optional[Dict[str, int]] = None, + use_spot: bool = False, + ) -> Iterator[None]: + del num_nodes # unused + regions = cls.regions_with_offering(instance_type, + accelerators, + use_spot, + region=region, + zone=None) + for r in regions: + assert r.zones is None, r + yield r.zones + + def instance_type_to_hourly_cost(self, + instance_type: str, + use_spot: bool, + region: Optional[str] = None, + zone: Optional[str] = None) -> float: + return service_catalog.get_hourly_cost(instance_type, + use_spot=use_spot, + region=region, + zone=zone, + clouds='vast') + + def accelerators_to_hourly_cost(self, + accelerators: Dict[str, int], + use_spot: bool, + region: Optional[str] = None, + zone: Optional[str] = None) -> float: + """Returns the hourly cost of the accelerators, in dollars/hour.""" + del accelerators, use_spot, region, zone # unused + return 0.0 # Vast includes accelerators in the hourly cost. + + def get_egress_cost(self, num_gigabytes: float) -> float: + return 0.0 + + def __repr__(self): + return 'Vast' + + def is_same_cloud(self, other: clouds.Cloud) -> bool: + # Returns true if the two clouds are the same cloud type. + return isinstance(other, Vast) + + @classmethod + def get_default_instance_type( + cls, + cpus: Optional[str] = None, + memory: Optional[str] = None, + disk_tier: Optional[resources_utils.DiskTier] = None + ) -> Optional[str]: + """Returns the default instance type for Vast.""" + return service_catalog.get_default_instance_type(cpus=cpus, + memory=memory, + disk_tier=disk_tier, + clouds='vast') + + @classmethod + def get_accelerators_from_instance_type( + cls, instance_type: str) -> Optional[Dict[str, Union[int, float]]]: + return service_catalog.get_accelerators_from_instance_type( + instance_type, clouds='vast') + + @classmethod + def get_zone_shell_cmd(cls) -> Optional[str]: + return None + + # TODO: Function signature is different from the fluffy cloud + def make_deploy_resources_variables( + self, + resources: 'resources_lib.Resources', + cluster_name: resources_utils.ClusterName, + region: 'clouds.Region', + zones: Optional[List['clouds.Zone']], + dryrun: bool = False) -> Dict[str, Optional[str]]: + del zones, dryrun, cluster_name # unused + + r = resources + acc_dict = self.get_accelerators_from_instance_type(r.instance_type) + custom_resources = resources_utils.make_ray_custom_resources_str( + acc_dict) + + if r.image_id is None: + image_id = 'runpod/base:0.0.2' + elif r.extract_docker_image() is not None: + image_id = r.extract_docker_image() + else: + image_id = r.image_id[r.region] + + return { + 'instance_type': resources.instance_type, + 'custom_resources': custom_resources, + 'region': region.name, + 'image_id': image_id, + } + + def _get_feasible_launchable_resources( + self, resources: 'resources_lib.Resources' + ) -> 'resources_utils.FeasibleResources': + """Returns a list of feasible resources for the given resources.""" + if resources.use_spot: + # TODO: Add hints to all return values in this method to help + # users understand why the resources are not launchable. + return resources_utils.FeasibleResources([], [], None) + if resources.instance_type is not None: + assert resources.is_launchable(), resources + resources = resources.copy(accelerators=None) + return resources_utils.FeasibleResources([resources], [], None) + + def _make(instance_list): + resource_list = [] + for instance_type in instance_list: + r = resources.copy( + cloud=Vast(), + instance_type=instance_type, + accelerators=None, + cpus=None, + ) + resource_list.append(r) + return resource_list + + # Currently, handle a filter on accelerators only. + accelerators = resources.accelerators + if accelerators is None: + # Return a default instance type + default_instance_type = Vast.get_default_instance_type( + cpus=resources.cpus, + memory=resources.memory, + disk_tier=resources.disk_tier) + if default_instance_type is None: + # TODO: Add hints to all return values in this method to help + # users understand why the resources are not launchable. + return resources_utils.FeasibleResources([], [], None) + else: + return resources_utils.FeasibleResources( + _make([default_instance_type]), [], None) + + assert len(accelerators) == 1, resources + acc, acc_count = list(accelerators.items())[0] + (instance_list, fuzzy_candidate_list + ) = service_catalog.get_instance_type_for_accelerator( + acc, + acc_count, + use_spot=resources.use_spot, + cpus=resources.cpus, + region=resources.region, + zone=resources.zone, + clouds='vast') + if instance_list is None: + return resources_utils.FeasibleResources([], fuzzy_candidate_list, + None) + return resources_utils.FeasibleResources(_make(instance_list), + fuzzy_candidate_list, None) + + @classmethod + def check_credentials(cls) -> Tuple[bool, Optional[str]]: + """ Verify that the user has valid credentials for Vast. """ + try: + import vastai_sdk as _vast # pylint: disable=import-outside-toplevel + vast = _vast.VastAI() + + # We only support file pased credential passing + if vast.creds_source != 'FILE': + return False, ( + 'error \n' # First line is indented by 4 spaces + ' Credentials can be set up by running: \n' + ' $ pip install vastai\n' + ' $ echo [key] > ~/.vast_api_key\n' + ' For more information, see https://skypilot.readthedocs.io/en/latest/getting-started/installation.html#vast' # pylint: disable=line-too-long + ) + + return True, None + + except ImportError: + return False, ('Failed to import vast. ' + 'To install, run: pip install skypilot[vast]') + + def get_credential_file_mounts(self) -> Dict[str, str]: + return { + f'~/.config/vastai/{filename}': f'~/.config/vastai/{filename}' + for filename in _CREDENTIAL_FILES + } + + @classmethod + def get_current_user_identity(cls) -> Optional[List[str]]: + # NOTE: used for very advanced SkyPilot functionality + # Can implement later if desired + return None + + def instance_type_exists(self, instance_type: str) -> bool: + return service_catalog.instance_type_exists(instance_type, 'vast') + + def validate_region_zone(self, region: Optional[str], zone: Optional[str]): + return service_catalog.validate_region_zone(region, zone, clouds='vast') + + @classmethod + def get_image_size(cls, image_id: str, region: Optional[str]) -> float: + # TODO: use 0.0 for now to allow all images. We should change this to + # return the docker image size. + return 0.0 diff --git a/sky/optimizer.py b/sky/optimizer.py index 2f70dd39429..dbeb13d31bb 100644 --- a/sky/optimizer.py +++ b/sky/optimizer.py @@ -1294,6 +1294,7 @@ def _fill_in_launchable_resources( # If clouds provide hints, store them for later printing. hints: Dict[clouds.Cloud, str] = {} for cloud in clouds_list: + # import ipdb; ipdb.set_trace() feasible_resources = cloud.get_feasible_launchable_resources( resources, num_nodes=task.num_nodes) if feasible_resources.hint is not None: diff --git a/sky/provision/__init__.py b/sky/provision/__init__.py index 02a627b08a3..83452a0077b 100644 --- a/sky/provision/__init__.py +++ b/sky/provision/__init__.py @@ -22,6 +22,7 @@ from sky.provision import lambda_cloud from sky.provision import oci from sky.provision import runpod +from sky.provision import vast from sky.provision import vsphere from sky.utils import command_runner from sky.utils import timeline diff --git a/sky/provision/vast/__init__.py b/sky/provision/vast/__init__.py new file mode 100644 index 00000000000..1b19246496a --- /dev/null +++ b/sky/provision/vast/__init__.py @@ -0,0 +1,11 @@ +"""Vast provisioner for SkyPilot.""" + +from sky.provision.vast.config import bootstrap_instances +from sky.provision.vast.instance import cleanup_ports +from sky.provision.vast.instance import get_cluster_info +from sky.provision.vast.instance import query_instances +from sky.provision.vast.instance import query_ports +from sky.provision.vast.instance import run_instances +from sky.provision.vast.instance import stop_instances +from sky.provision.vast.instance import terminate_instances +from sky.provision.vast.instance import wait_instances diff --git a/sky/provision/vast/config.py b/sky/provision/vast/config.py new file mode 100644 index 00000000000..9cb337eb91d --- /dev/null +++ b/sky/provision/vast/config.py @@ -0,0 +1,11 @@ +"""Vast configuration bootstrapping.""" + +from sky.provision import common + + +def bootstrap_instances( + region: str, cluster_name: str, + config: common.ProvisionConfig) -> common.ProvisionConfig: + """Bootstraps instances for the given cluster.""" + del region, cluster_name # unused + return config diff --git a/sky/provision/vast/instance.py b/sky/provision/vast/instance.py new file mode 100644 index 00000000000..1b8a1caedf6 --- /dev/null +++ b/sky/provision/vast/instance.py @@ -0,0 +1,276 @@ +"""Vast instance provisioning.""" +import time +from typing import Any, Dict, List, Optional + +from sky import sky_logging +from sky import status_lib +from sky.provision import common +from sky.provision.vast import utils +from sky.utils import common_utils +from sky.utils import resources_utils +from sky.utils import ux_utils + +POLL_INTERVAL = 5 +QUERY_PORTS_TIMEOUT_SECONDS = 30 + +logger = sky_logging.init_logger(__name__) +# a much more convenient method +status_filter = lambda machine_dict, stat_list: { + k: v for k, v in machine_dict.items() if v['status'] in stat_list +} + + +def _filter_instances(cluster_name_on_cloud: str, + status_filters: Optional[List[str]], + head_only: bool = False) -> Dict[str, Any]: + + instances = utils.list_instances() + possible_names = [f'{cluster_name_on_cloud}-head'] + if not head_only: + possible_names.append(f'{cluster_name_on_cloud}-worker') + + filtered_instances = {} + for instance_id, instance in instances.items(): + if (status_filters is not None and + instance['status'] not in status_filters): + continue + if instance.get('name') in possible_names: + filtered_instances[instance_id] = instance + return filtered_instances + + +def _get_head_instance_id(instances: Dict[str, Any]) -> Optional[str]: + head_instance_id = None + for inst_id, inst in instances.items(): + if inst['name'].endswith('-head'): + head_instance_id = inst_id + break + return head_instance_id + + +def run_instances(region: str, cluster_name_on_cloud: str, + config: common.ProvisionConfig) -> common.ProvisionRecord: + """Runs instances for the given cluster.""" + pending_status = ['CREATED', 'RESTARTING'] + + created_instance_ids = [] + instances: Dict[str, Any] = {} + + while True: + instances = utils.list_instances() + if not status_filter(instances, pending_status): + break + logger.info(f'Waiting for {len(instances)} instances to be ready.') + time.sleep(POLL_INTERVAL) + + running_instances = status_filter(instances, ['RUNNING']) + head_instance_id = _get_head_instance_id(running_instances) + stopped_instances = status_filter(instances, ['EXITED', 'STOPPED']) + + if config.resume_stopped_nodes and len(stopped_instances): + for instance in stopped_instances.values(): + utils.start(instance['id']) + else: + to_start_count = config.count - (len(running_instances) + + len(stopped_instances)) + if to_start_count < 0: + raise RuntimeError(f'Cluster {cluster_name_on_cloud} already has ' + f'{len(running_instances)} nodes,' + f'but {config.count} are required.') + if to_start_count == 0: + if head_instance_id is None: + raise RuntimeError( + f'Cluster {cluster_name_on_cloud} has no head node.') + logger.info( + f'Cluster {cluster_name_on_cloud} already has ' + f'{len(running_instances)} nodes, no need to start more.') + return common.ProvisionRecord(provider_name='vast', + cluster_name=cluster_name_on_cloud, + region=region, + zone=None, + head_instance_id=head_instance_id, + resumed_instance_ids=[], + created_instance_ids=[]) + + for _ in range(to_start_count): + node_type = 'head' if head_instance_id is None else 'worker' + try: + instance_id = utils.launch( + name=f'{cluster_name_on_cloud}-{node_type}', + instance_type=config.node_config['InstanceType'], + region=region, + disk_size=config.node_config['DiskSize'], + image_name=config.node_config['ImageId'], + ports=config.ports_to_open_on_launch, + public_key=config.node_config['PublicKey']) + except Exception as e: # pylint: disable=broad-except + logger.warning(f'run_instances error: {e}') + raise + logger.info(f'Launched instance {instance_id}.') + created_instance_ids.append(instance_id) + if head_instance_id is None: + head_instance_id = instance_id + + # Wait for instances to be ready. + while True: + instances = _filter_instances(cluster_name_on_cloud, ['RUNNING']) + ready_instance_cnt = 0 + for instance_id, instance in instances.items(): + if instance.get('ssh_port') is not None: + ready_instance_cnt += 1 + logger.info('Waiting for instances to be ready: ' + f'({ready_instance_cnt}/{config.count}).') + if ready_instance_cnt == config.count: + break + + time.sleep(POLL_INTERVAL) + + head_instance_id = _get_head_instance_id(utils.list_instances()) + assert head_instance_id is not None, 'head_instance_id should not be None' + return common.ProvisionRecord(provider_name='vast', + cluster_name=cluster_name_on_cloud, + region=region, + zone=None, + head_instance_id=head_instance_id, + resumed_instance_ids=[], + created_instance_ids=created_instance_ids) + + +def wait_instances(region: str, cluster_name_on_cloud: str, + state: Optional[status_lib.ClusterStatus]) -> None: + del region, cluster_name_on_cloud, state + + +def stop_instances( + cluster_name_on_cloud: str, + provider_config: Optional[Dict[str, Any]] = None, + worker_only: bool = False, +) -> None: + return action_instances('stop', cluster_name_on_cloud, provider_config, + worker_only) + + +def terminate_instances( + cluster_name_on_cloud: str, + provider_config: Optional[Dict[str, Any]] = None, + worker_only: bool = False, +) -> None: + return action_instances('remove', cluster_name_on_cloud, provider_config, + worker_only) + + +def action_instances( + fn: str, + cluster_name_on_cloud: str, + provider_config: Optional[Dict[str, Any]] = None, + worker_only: bool = False, +) -> None: + """See sky/provision/__init__.py""" + del provider_config # unused + instances = _filter_instances(cluster_name_on_cloud, None) + for inst_id, inst in instances.items(): + logger.debug(f'Instance {fn} {inst_id}: {inst}') + if worker_only and inst['name'].endswith('-head'): + continue + try: + getattr(utils, fn)(inst_id) + except Exception as e: # pylint: disable=broad-except + with ux_utils.print_exception_no_traceback(): + raise RuntimeError( + f'Failed to {fn} instance {inst_id}: ' + f'{common_utils.format_exception(e, use_bracket=False)}' + ) from e + + +def get_cluster_info( + region: str, + cluster_name_on_cloud: str, + provider_config: Optional[Dict[str, Any]] = None) -> common.ClusterInfo: + del region # unused + running_instances = _filter_instances(cluster_name_on_cloud, ['RUNNING']) + instances: Dict[str, List[common.InstanceInfo]] = {} + head_instance_id = None + for instance_id, instance_info in running_instances.items(): + instances[instance_id] = [ + common.InstanceInfo( + instance_id=instance_id, + internal_ip=instance_info['local_ipaddrs'].strip(), + external_ip=instance_info['public_ipaddr'], + ssh_port=instance_info['ports']['22/tcp'][0]['HostPort'], + tags={}, + ) + ] + if instance_info['name'].endswith('-head'): + head_instance_id = instance_id + + return common.ClusterInfo( + instances=instances, + head_instance_id=head_instance_id, + provider_name='vast', + provider_config=provider_config, + ) + + +def query_instances( + cluster_name_on_cloud: str, + provider_config: Optional[Dict[str, Any]] = None, + non_terminated_only: bool = True, +) -> Dict[str, Optional[status_lib.ClusterStatus]]: + """See sky/provision/__init__.py""" + + assert provider_config is not None, (cluster_name_on_cloud, provider_config) + instances = _filter_instances(cluster_name_on_cloud, None) + # "running", "frozen", "stopped", "unknown", "loading" + status_map = { + 'LOADING': status_lib.ClusterStatus.INIT, + 'EXITED': status_lib.ClusterStatus.STOPPED, + 'STOPPED': status_lib.ClusterStatus.STOPPED, + 'RUNNING': status_lib.ClusterStatus.UP, + } + statuses: Dict[str, Optional[status_lib.ClusterStatus]] = {} + for inst_id, inst in instances.items(): + status = status_map[inst['status']] + if non_terminated_only and status is None: + continue + statuses[inst_id] = status + return statuses + + +def cleanup_ports( + cluster_name_on_cloud: str, + ports: List[str], + provider_config: Optional[Dict[str, Any]] = None, +) -> None: + del cluster_name_on_cloud, ports, provider_config # Unused. + + +def query_ports( + cluster_name_on_cloud: str, + ports: List[str], + head_ip: Optional[str] = None, + provider_config: Optional[Dict[str, Any]] = None, +) -> Dict[int, List[common.Endpoint]]: + """See sky/provision/__init__.py""" + del head_ip, provider_config # Unused. + + start_time = time.time() + ports_to_query = resources_utils.port_ranges_to_set(ports) + while True: + instances = _filter_instances(cluster_name_on_cloud, + None, + head_only=True) + assert len(instances) == 1 + head_inst = list(instances.values())[0] + ready_ports: Dict[int, List[common.Endpoint]] = { + port: [common.SocketEndpoint(**endpoint)] + for port, endpoint in head_inst['port2endpoint'].items() + if port in ports_to_query + } + not_ready_ports = ports_to_query - set(ready_ports.keys()) + if not not_ready_ports: + return ready_ports + if time.time() - start_time > QUERY_PORTS_TIMEOUT_SECONDS: + logger.warning(f'Querying ports {ports} timed out. Ports ' + f'{not_ready_ports} are not ready.') + return ready_ports + time.sleep(1) diff --git a/sky/provision/vast/utils.py b/sky/provision/vast/utils.py new file mode 100644 index 00000000000..57b3e165a5f --- /dev/null +++ b/sky/provision/vast/utils.py @@ -0,0 +1,103 @@ +# pylint: disable=assignment-from-no-return +"""Vast library wrapper for SkyPilot.""" +from typing import Any, Dict, List, Optional + +from sky import sky_logging +from sky.adaptors import vast + +logger = sky_logging.init_logger(__name__) + + +def list_instances() -> Dict[str, Dict[str, Any]]: + """Lists instances associated with API key.""" + instances = vast.vast().show_instances() + + instance_dict: Dict[str, Dict[str, Any]] = {} + for instance in instances: + instance['id'] = str(instance['id']) + info = instance.copy() + + if isinstance(instance['actual_status'], str): + info['status'] = instance['actual_status'].upper() + else: + info['status'] = 'UNKNOWN' + info['name'] = instance['label'] + + instance_dict[instance['id']] = info + + return instance_dict + + +def launch(name: str, instance_type: str, region: str, disk_size: int, + image_name: str, ports: Optional[List[int]], public_key: str) -> str: + """Launches an instance with the given parameters. + + Converts the instance_type to the Vast GPU name, finds the specs for the + GPU, and launches the instance. + """ + del ports + del public_key + + gpu_name = instance_type.split('-')[1].replace('_', ' ') + num_gpus = int(instance_type.split('-')[0].replace('x', '')) + + query = ' '.join([ + f'geolocation="{region[-2:]}"', + f'disk_space>={disk_size}', + f'num_gpus={num_gpus}', + f'gpu_name="{gpu_name}"', + ]) + + instance_list = vast.vast().search_offers(query=query) + + if isinstance(instance_list, int) or len(instance_list) == 0: + return '' + + instance_touse = instance_list[0] + + new_instance_contract = vast.vast().create_instance( + id=instance_touse['id'], + direct=True, + ssh=True, + env='-e __SOURCE=skypilot', + onstart_cmd='touch ~/.no_auto_tmux;apt install lsof', + label=name, + image=image_name) + + new_instance = vast.vast().show_instance( + id=new_instance_contract['new_contract']) + + return new_instance['id'] + + +def start(instance_id: str) -> None: + """Stops the given instance.""" + vast.vast().start_instance(id=instance_id) + + +def stop(instance_id: str) -> None: + """Stops the given instance.""" + vast.vast().stop_instance(id=instance_id) + + +def remove(instance_id: str) -> None: + """Terminates the given instance.""" + vast.vast().destroy_instance(id=instance_id) + + +def get_ssh_ports(cluster_name) -> List[int]: + """Gets the SSH ports for the given cluster.""" + logger.debug(f'Getting SSH ports for cluster {cluster_name}.') + + instances = list_instances() + possible_names = [f'{cluster_name}-head', f'{cluster_name}-worker'] + + ssh_ports = [] + + for instance in instances.values(): + if instance['name'] in possible_names: + ssh_ports.append(instance['ssh_port']) + assert ssh_ports, ( + f'Could not find any instances for cluster {cluster_name}.') + + return ssh_ports diff --git a/sky/templates/vast-ray.yml.j2 b/sky/templates/vast-ray.yml.j2 new file mode 100644 index 00000000000..9c856d0f210 --- /dev/null +++ b/sky/templates/vast-ray.yml.j2 @@ -0,0 +1,78 @@ +cluster_name: {{cluster_name_on_cloud}} + +# The maximum number of workers nodes to launch in addition to the head node. +max_workers: {{num_nodes - 1}} +upscaling_speed: {{num_nodes - 1}} +idle_timeout_minutes: 60 + +provider: + type: external + module: sky.provision.vast + region: "{{region}}" + disable_launch_config_check: true + +auth: + ssh_user: root + ssh_private_key: {{ssh_private_key}} + +available_node_types: + ray_head_default: + resources: {} + node_config: + InstanceType: {{instance_type}} + DiskSize: {{disk_size}} + ImageId: {{image_id}} + PublicKey: |- + skypilot:ssh_public_key_content + +head_node_type: ray_head_default + +# Format: `REMOTE_PATH : LOCAL_PATH` +file_mounts: { + "{{sky_ray_yaml_remote_path}}": "{{sky_ray_yaml_local_path}}", + "{{sky_remote_path}}/{{sky_wheel_hash}}": "{{sky_local_path}}", +{%- for remote_path, local_path in credentials.items() %} + "{{remote_path}}": "{{local_path}}", +{%- endfor %} +} + +rsync_exclude: [] + +initialization_commands: [] + +# List of shell commands to run to set up nodes. +# NOTE: these are very performance-sensitive. Each new item opens/closes an SSH +# connection, which is expensive. Try your best to co-locate commands into fewer +# items! +# +# Increment the following for catching performance bugs easier: +# current num items (num SSH connections): 1 +setup_commands: + # Disable `unattended-upgrades` to prevent apt-get from hanging. It should be called at the beginning before the process started to avoid being blocked. (This is a temporary fix.) + # Create ~/.ssh/config file in case the file does not exist in the image. + # Line 'rm ..': there is another installation of pip. + # Line 'sudo bash ..': set the ulimit as suggested by ray docs for performance. https://docs.ray.io/en/latest/cluster/vms/user-guides/large-cluster-best-practices.html#system-configuration + # Line 'sudo grep ..': set the number of threads per process to unlimited to avoid ray job submit stucking issue when the number of running ray jobs increase. + # Line 'mkdir -p ..': disable host key check + # Line 'python3 -c ..': patch the buggy ray files and enable `-o allow_other` option for `goofys` + - {%- for initial_setup_command in initial_setup_commands %} + {{ initial_setup_command }} + {%- endfor %} + sudo systemctl stop unattended-upgrades || true; + sudo systemctl disable unattended-upgrades || true; + sudo sed -i 's/Unattended-Upgrade "1"/Unattended-Upgrade "0"/g' /etc/apt/apt.conf.d/20auto-upgrades || true; + sudo kill -9 `sudo lsof /var/lib/dpkg/lock-frontend | awk '{print $2}' | tail -n 1` || true; + sudo pkill -9 apt-get; + sudo pkill -9 dpkg; + sudo dpkg --configure -a; + mkdir -p ~/.ssh; touch ~/.ssh/config; + {{ conda_installation_commands }} + {{ ray_skypilot_installation_commands }} + touch ~/.sudo_as_admin_successful; + sudo bash -c 'rm -rf /etc/security/limits.d; echo "* soft nofile 1048576" >> /etc/security/limits.conf; echo "* hard nofile 1048576" >> /etc/security/limits.conf'; + sudo grep -e '^DefaultTasksMax' /etc/systemd/system.conf || (sudo bash -c 'echo "DefaultTasksMax=infinity" >> /etc/systemd/system.conf'); sudo systemctl set-property user-$(id -u $(whoami)).slice TasksMax=infinity; sudo systemctl daemon-reload; + mkdir -p ~/.ssh; (grep -Pzo -q "Host \*\n StrictHostKeyChecking no" ~/.ssh/config) || printf "Host *\n StrictHostKeyChecking no\n" >> ~/.ssh/config; + [ -f /etc/fuse.conf ] && sudo sed -i 's/#user_allow_other/user_allow_other/g' /etc/fuse.conf || (sudo sh -c 'echo "user_allow_other" > /etc/fuse.conf'); + +# Command to start ray clusters are now placed in `sky.provision.instance_setup`. +# We do not need to list it here anymore. diff --git a/sky/utils/controller_utils.py b/sky/utils/controller_utils.py index a6657df960d..459ce3cec2b 100644 --- a/sky/utils/controller_utils.py +++ b/sky/utils/controller_utils.py @@ -98,7 +98,7 @@ class Controllers(enum.Enum): controller_type='jobs', name='managed jobs controller', candidate_cluster_names=[ - managed_job_utils.JOB_CONTROLLER_NAME, + managed_job_utils.JOB_CONTROLLER_NAME, managed_job_utils.LEGACY_JOB_CONTROLLER_NAME ], in_progress_hint=( @@ -300,6 +300,13 @@ def _get_cloud_dependencies_installation_commands( 'pip list | grep oci > /dev/null 2>&1 || ' 'pip install oci > /dev/null 2>&1') setup_clouds.append(str(cloud)) + elif isinstance(cloud, clouds.Vast): + step_prefix = prefix_str.replace('', + str(len(setup_clouds) + 1)) + commands.append(f'echo -en "\\r{step_prefix}Vast{empty_str}" && ' + 'pip list | grep vastai_sdk > /dev/null 2>&1 || ' + 'pip install "vastai_sdk>=0.1.2" > /dev/null 2>&1') + setup_clouds.append(str(cloud)) if controller == Controllers.JOBS_CONTROLLER: if isinstance(cloud, clouds.IBM): step_prefix = prefix_str.replace('', diff --git a/tests/test_smoke.py b/tests/test_smoke.py index 6ba81ce68f0..2187bb2606a 100644 --- a/tests/test_smoke.py +++ b/tests/test_smoke.py @@ -2258,6 +2258,23 @@ def test_paperspace_http_server_with_custom_ports(): run_one_test(test) +# ---------- Web apps with custom ports on Vast. ---------- +@pytest.mark.vast +def test_vast_http_server_with_custom_ports(): + name = _get_cluster_name() + test = Test( + 'vast_http_server_with_custom_ports', + [ + f'sky launch -y -d -c {name} --cloud vast examples/http_server_with_custom_ports/task.yaml', + f'until SKYPILOT_DEBUG=0 sky status --endpoint 33828 {name}; do sleep 10; done', + # Retry a few times to avoid flakiness in ports being open. + f'ip=$(SKYPILOT_DEBUG=0 sky status --endpoint 33828 {name}); success=false; for i in $(seq 1 5); do if curl $ip | grep "

This is a demo HTML page.

"; then success=true; break; fi; sleep 10; done; if [ "$success" = false ]; then exit 1; fi', + ], + f'sky down -y {name}', + ) + run_one_test(test) + + # ---------- Web apps with custom ports on RunPod. ---------- @pytest.mark.runpod def test_runpod_http_server_with_custom_ports(): From 35f0bb094816a22e8d0628d3a1e829f77dd3cdfe Mon Sep 17 00:00:00 2001 From: chris mckenzie Date: Tue, 19 Nov 2024 11:29:10 -0800 Subject: [PATCH 02/19] Update sky/provision/vast/instance.py Co-authored-by: Tian Xia --- sky/provision/vast/instance.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/sky/provision/vast/instance.py b/sky/provision/vast/instance.py index 1b8a1caedf6..edf854c187d 100644 --- a/sky/provision/vast/instance.py +++ b/sky/provision/vast/instance.py @@ -40,12 +40,10 @@ def _filter_instances(cluster_name_on_cloud: str, def _get_head_instance_id(instances: Dict[str, Any]) -> Optional[str]: - head_instance_id = None for inst_id, inst in instances.items(): if inst['name'].endswith('-head'): - head_instance_id = inst_id - break - return head_instance_id + return inst_id + return None def run_instances(region: str, cluster_name_on_cloud: str, From 1572222ce04a496c0de52fe6b5c2ed085982645d Mon Sep 17 00:00:00 2001 From: chris mckenzie Date: Tue, 19 Nov 2024 11:29:32 -0800 Subject: [PATCH 03/19] Update sky/clouds/vast.py Co-authored-by: Tian Xia --- sky/clouds/vast.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/sky/clouds/vast.py b/sky/clouds/vast.py index 2f7877372d2..c83d1977e0b 100644 --- a/sky/clouds/vast.py +++ b/sky/clouds/vast.py @@ -131,8 +131,6 @@ def accelerators_to_hourly_cost(self, def get_egress_cost(self, num_gigabytes: float) -> float: return 0.0 - def __repr__(self): - return 'Vast' def is_same_cloud(self, other: clouds.Cloud) -> bool: # Returns true if the two clouds are the same cloud type. From e0f8e22d124a81e323e7ef4d3858d6ae4c43732f Mon Sep 17 00:00:00 2001 From: chris mckenzie Date: Tue, 19 Nov 2024 12:08:04 -0800 Subject: [PATCH 04/19] Update sky/clouds/vast.py Co-authored-by: Tian Xia --- sky/clouds/vast.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/sky/clouds/vast.py b/sky/clouds/vast.py index c83d1977e0b..c57b9d24fa7 100644 --- a/sky/clouds/vast.py +++ b/sky/clouds/vast.py @@ -132,9 +132,6 @@ def get_egress_cost(self, num_gigabytes: float) -> float: return 0.0 - def is_same_cloud(self, other: clouds.Cloud) -> bool: - # Returns true if the two clouds are the same cloud type. - return isinstance(other, Vast) @classmethod def get_default_instance_type( From f1fb7006b99d4d27cd07cd0f9663a95a19446d79 Mon Sep 17 00:00:00 2001 From: chris mckenzie Date: Tue, 19 Nov 2024 12:43:40 -0800 Subject: [PATCH 05/19] Update sky/clouds/vast.py Co-authored-by: Tian Xia --- sky/clouds/vast.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sky/clouds/vast.py b/sky/clouds/vast.py index c57b9d24fa7..23546b79f1c 100644 --- a/sky/clouds/vast.py +++ b/sky/clouds/vast.py @@ -273,7 +273,7 @@ def get_credential_file_mounts(self) -> Dict[str, str]: } @classmethod - def get_current_user_identity(cls) -> Optional[List[str]]: + def get_user_identities(cls) -> Optional[List[str]]: # NOTE: used for very advanced SkyPilot functionality # Can implement later if desired return None From 7ac64c5603ef09c70b1461cf8cb962a51da70730 Mon Sep 17 00:00:00 2001 From: Chris McKenzie Date: Wed, 20 Nov 2024 09:49:11 -0800 Subject: [PATCH 06/19] Updating the vast dependencies in the setup.py --- sky/setup_files/setup.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sky/setup_files/setup.py b/sky/setup_files/setup.py index 0fd6978ec03..d1584677fa7 100644 --- a/sky/setup_files/setup.py +++ b/sky/setup_files/setup.py @@ -238,6 +238,7 @@ def parse_readme(readme: str) -> str: 'fluidstack': [], # No dependencies needed for fluidstack 'cudo': ['cudo-compute>=0.1.10'], 'paperspace': [], # No dependencies needed for paperspace + 'vast': ['vastai_sdk'], # As of now, any version will do 'vsphere': [ 'pyvmomi==8.0.1.0.2', # vsphere-automation-sdk is also required, but it does not have From 69594a32bc3c456183eb499182f6e3c90065a0eb Mon Sep 17 00:00:00 2001 From: Chris McKenzie Date: Wed, 20 Nov 2024 09:51:03 -0800 Subject: [PATCH 07/19] Vast: Copy update on object stores --- sky/clouds/vast.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/sky/clouds/vast.py b/sky/clouds/vast.py index 23546b79f1c..8807324b253 100644 --- a/sky/clouds/vast.py +++ b/sky/clouds/vast.py @@ -32,9 +32,7 @@ class Vast(clouds.Cloud): clouds.CloudImplementationFeatures.CUSTOM_DISK_TIER: ('Customizing disk tier is not supported yet on Vast.'), clouds.CloudImplementationFeatures.STORAGE_MOUNTING: - ('Mounting object stores is not supported on Vast. To read data ' - 'from object stores on Vast, use `mode: COPY` to copy the data ' - 'to local disk.'), + ('Mounting object stores is not supported on Vast.'), } _MAX_CLUSTER_NAME_LEN_LIMIT = 120 _regions: List[clouds.Region] = [] From a1442e72f1f3920a9a4e02f89e2c4386614e28a7 Mon Sep 17 00:00:00 2001 From: Chris McKenzie Date: Wed, 20 Nov 2024 10:04:03 -0800 Subject: [PATCH 08/19] Vast: update base image dockerhub link --- sky/clouds/vast.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sky/clouds/vast.py b/sky/clouds/vast.py index 8807324b253..8c8983d9094 100644 --- a/sky/clouds/vast.py +++ b/sky/clouds/vast.py @@ -170,7 +170,7 @@ def make_deploy_resources_variables( acc_dict) if r.image_id is None: - image_id = 'runpod/base:0.0.2' + image_id = 'vastai/pytorch:latest' elif r.extract_docker_image() is not None: image_id = r.extract_docker_image() else: From 56bd241a3a7182741cbccba9621d4d6472bc34f4 Mon Sep 17 00:00:00 2001 From: Chris McKenzie Date: Wed, 20 Nov 2024 10:07:59 -0800 Subject: [PATCH 09/19] Vast: removing errant comment --- sky/optimizer.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sky/optimizer.py b/sky/optimizer.py index dbeb13d31bb..2f70dd39429 100644 --- a/sky/optimizer.py +++ b/sky/optimizer.py @@ -1294,7 +1294,6 @@ def _fill_in_launchable_resources( # If clouds provide hints, store them for later printing. hints: Dict[clouds.Cloud, str] = {} for cloud in clouds_list: - # import ipdb; ipdb.set_trace() feasible_resources = cloud.get_feasible_launchable_resources( resources, num_nodes=task.num_nodes) if feasible_resources.hint is not None: From 79c87c584f3b3fce5a974727a2b03c5dea860fe0 Mon Sep 17 00:00:00 2001 From: Chris McKenzie Date: Wed, 20 Nov 2024 10:13:42 -0800 Subject: [PATCH 10/19] Vast: provision/utils cleanup of a shallow copy --- sky/provision/vast/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sky/provision/vast/utils.py b/sky/provision/vast/utils.py index 57b3e165a5f..c846e6a0bc1 100644 --- a/sky/provision/vast/utils.py +++ b/sky/provision/vast/utils.py @@ -15,7 +15,7 @@ def list_instances() -> Dict[str, Dict[str, Any]]: instance_dict: Dict[str, Dict[str, Any]] = {} for instance in instances: instance['id'] = str(instance['id']) - info = instance.copy() + info = instance if isinstance(instance['actual_status'], str): info['status'] = instance['actual_status'].upper() From 2a80ed83f9b42ff27cfcb2c362d66b9791878a61 Mon Sep 17 00:00:00 2001 From: Chris McKenzie Date: Wed, 20 Nov 2024 14:58:06 -0800 Subject: [PATCH 11/19] Vast: Simplifying the credential files mount --- sky/clouds/vast.py | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/sky/clouds/vast.py b/sky/clouds/vast.py index 8c8983d9094..5d66468bcd0 100644 --- a/sky/clouds/vast.py +++ b/sky/clouds/vast.py @@ -10,11 +10,6 @@ if typing.TYPE_CHECKING: from sky import resources as resources_lib -_CREDENTIAL_FILES = [ - 'config.toml', -] - - @clouds.CLOUD_REGISTRY.register class Vast(clouds.Cloud): """ Vast GPU Cloud @@ -266,8 +261,7 @@ def check_credentials(cls) -> Tuple[bool, Optional[str]]: def get_credential_file_mounts(self) -> Dict[str, str]: return { - f'~/.config/vastai/{filename}': f'~/.config/vastai/{filename}' - for filename in _CREDENTIAL_FILES + '~/.config/vastai/vast_api_key': '~/.config/vastai/vast_api_key' } @classmethod From 848f718d6287b555d60f86725f96b0261e2c6531 Mon Sep 17 00:00:00 2001 From: Chris McKenzie Date: Wed, 20 Nov 2024 16:49:56 -0800 Subject: [PATCH 12/19] Vast: Linter cleanup --- sky/clouds/vast.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/sky/clouds/vast.py b/sky/clouds/vast.py index 5d66468bcd0..cd8ffa64c21 100644 --- a/sky/clouds/vast.py +++ b/sky/clouds/vast.py @@ -10,6 +10,7 @@ if typing.TYPE_CHECKING: from sky import resources as resources_lib + @clouds.CLOUD_REGISTRY.register class Vast(clouds.Cloud): """ Vast GPU Cloud @@ -124,8 +125,6 @@ def accelerators_to_hourly_cost(self, def get_egress_cost(self, num_gigabytes: float) -> float: return 0.0 - - @classmethod def get_default_instance_type( cls, @@ -265,7 +264,7 @@ def get_credential_file_mounts(self) -> Dict[str, str]: } @classmethod - def get_user_identities(cls) -> Optional[List[str]]: + def get_user_identities(cls) -> Optional[List[List[str]]]: # NOTE: used for very advanced SkyPilot functionality # Can implement later if desired return None From f7f0bfd592a1474f3969a950f80a89ad81b442fc Mon Sep 17 00:00:00 2001 From: Chris McKenzie Date: Wed, 20 Nov 2024 16:54:00 -0800 Subject: [PATCH 13/19] Vast: Internal api cleanup --- sky/provision/vast/instance.py | 4 +--- sky/provision/vast/utils.py | 5 +---- 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/sky/provision/vast/instance.py b/sky/provision/vast/instance.py index edf854c187d..c60c0ac0413 100644 --- a/sky/provision/vast/instance.py +++ b/sky/provision/vast/instance.py @@ -98,9 +98,7 @@ def run_instances(region: str, cluster_name_on_cloud: str, instance_type=config.node_config['InstanceType'], region=region, disk_size=config.node_config['DiskSize'], - image_name=config.node_config['ImageId'], - ports=config.ports_to_open_on_launch, - public_key=config.node_config['PublicKey']) + image_name=config.node_config['ImageId']) except Exception as e: # pylint: disable=broad-except logger.warning(f'run_instances error: {e}') raise diff --git a/sky/provision/vast/utils.py b/sky/provision/vast/utils.py index c846e6a0bc1..525eab1e91f 100644 --- a/sky/provision/vast/utils.py +++ b/sky/provision/vast/utils.py @@ -29,15 +29,12 @@ def list_instances() -> Dict[str, Dict[str, Any]]: def launch(name: str, instance_type: str, region: str, disk_size: int, - image_name: str, ports: Optional[List[int]], public_key: str) -> str: + image_name: str) -> str: """Launches an instance with the given parameters. Converts the instance_type to the Vast GPU name, finds the specs for the GPU, and launches the instance. """ - del ports - del public_key - gpu_name = instance_type.split('-')[1].replace('_', ' ') num_gpus = int(instance_type.split('-')[0].replace('x', '')) From 577a025541fdc67c4bf4c2943e8b142d92cbb05c Mon Sep 17 00:00:00 2001 From: Chris McKenzie Date: Wed, 20 Nov 2024 17:25:00 -0800 Subject: [PATCH 14/19] Vast: Adding the catalog_fetcher --- .../data_fetchers/fetch_vast.py | 50 +++++++++++++++++++ 1 file changed, 50 insertions(+) create mode 100755 sky/clouds/service_catalog/data_fetchers/fetch_vast.py diff --git a/sky/clouds/service_catalog/data_fetchers/fetch_vast.py b/sky/clouds/service_catalog/data_fetchers/fetch_vast.py new file mode 100755 index 00000000000..2d8a199bdc9 --- /dev/null +++ b/sky/clouds/service_catalog/data_fetchers/fetch_vast.py @@ -0,0 +1,50 @@ +#!/usr/bin/env python +import json, re +from vastai_sdk import VastAI +import csv, sys + +def create_instance_type(obj): + stubify = lambda x: re.sub(r'\s', '_', x) + return "{}x-{}-{}".format(obj['num_gpus'], stubify(obj['gpu_name']), obj['cpu_cores']) + +def dot_get(d, key): + for k in key.split("."): + d = d[k] + return d + +# InstanceType and gpuInfo are basically just stubs +# so that the dictwriter is happy without weird +# code. +mapped_keys = ( + ('gpu_name', 'InstanceType'), + ('gpu_name', 'AcceleratorName'), + ('num_gpus', 'AcceleratorCount'), + ('cpu_cores', 'vCPUs'), + ('gpu_total_ram', 'MemoryGiB'), + ('search.totalHour', 'Price'), + ('geolocation', 'Region'), + ('gpu_name', 'GpuInfo'), + ('search.totalHour', 'SpotPrice') +) +writer = csv.DictWriter(sys.stdout, fieldnames=[x[1] for x in mapped_keys]) +writer.writeheader() + +offerList = VastAI().search_offers(limit=10000) +for offer in offerList: + entry = {} + for ours, theirs in mapped_keys: + field = dot_get(offer, ours) + if 'Price' in theirs: + field = "{:.2f}".format(field) + entry[theirs] = field + + entry['InstanceType'] = create_instance_type(offer) + + # the documentation says + # "{'gpus': [{'name': 'v100', 'manufacturer': 'nvidia', 'count': 8.0, 'memoryinfo': {'sizeinmib': 16384}}], 'totalgpumemoryinmib': 16384}", + # we can do that. + entry['MemoryGiB'] /= 1024 + entry['GpuInfo'] = json.dumps({'Gpus': [{'Name': offer['gpu_name'], 'Count': offer['num_gpus'], 'MemoryInfo': {'SizeInMiB': offer['gpu_total_ram']}}], 'TotalGpuMemoryInMiB': offer['gpu_total_ram']}).replace('"', "'") + + writer.writerow(entry) + From 0cf70be3eeefe7a2a4cee4fba5d6fe94c81e8a47 Mon Sep 17 00:00:00 2001 From: Chris McKenzie Date: Thu, 21 Nov 2024 14:00:35 -0800 Subject: [PATCH 15/19] Vast: Linting fixes --- .../data_fetchers/fetch_vast.py | 59 ++++++++++++------- sky/provision/vast/utils.py | 2 +- sky/setup_files/setup.py | 2 +- 3 files changed, 40 insertions(+), 23 deletions(-) diff --git a/sky/clouds/service_catalog/data_fetchers/fetch_vast.py b/sky/clouds/service_catalog/data_fetchers/fetch_vast.py index 2d8a199bdc9..de74dbd5fe8 100755 --- a/sky/clouds/service_catalog/data_fetchers/fetch_vast.py +++ b/sky/clouds/service_catalog/data_fetchers/fetch_vast.py @@ -1,31 +1,34 @@ -#!/usr/bin/env python -import json, re +"""A script that generates the Vast Cloud catalog. """ + +# pylint: disable=assignment-from-no-return +import csv +import json +import re +import sys + from vastai_sdk import VastAI -import csv, sys + def create_instance_type(obj): stubify = lambda x: re.sub(r'\s', '_', x) - return "{}x-{}-{}".format(obj['num_gpus'], stubify(obj['gpu_name']), obj['cpu_cores']) + return '{}x-{}-{}'.format(obj['num_gpus'], stubify(obj['gpu_name']), + obj['cpu_cores']) + def dot_get(d, key): - for k in key.split("."): - d = d[k] + for k in key.split('.'): + d = d[k] return d + # InstanceType and gpuInfo are basically just stubs # so that the dictwriter is happy without weird # code. -mapped_keys = ( - ('gpu_name', 'InstanceType'), - ('gpu_name', 'AcceleratorName'), - ('num_gpus', 'AcceleratorCount'), - ('cpu_cores', 'vCPUs'), - ('gpu_total_ram', 'MemoryGiB'), - ('search.totalHour', 'Price'), - ('geolocation', 'Region'), - ('gpu_name', 'GpuInfo'), - ('search.totalHour', 'SpotPrice') -) +mapped_keys = (('gpu_name', 'InstanceType'), ('gpu_name', 'AcceleratorName'), + ('num_gpus', 'AcceleratorCount'), ('cpu_cores', 'vCPUs'), + ('gpu_total_ram', 'MemoryGiB'), ('search.totalHour', 'Price'), + ('geolocation', 'Region'), ('gpu_name', 'GpuInfo'), + ('search.totalHour', 'SpotPrice')) writer = csv.DictWriter(sys.stdout, fieldnames=[x[1] for x in mapped_keys]) writer.writeheader() @@ -35,16 +38,30 @@ def dot_get(d, key): for ours, theirs in mapped_keys: field = dot_get(offer, ours) if 'Price' in theirs: - field = "{:.2f}".format(field) + field = '{:.2f}'.format(field) entry[theirs] = field entry['InstanceType'] = create_instance_type(offer) # the documentation says - # "{'gpus': [{'name': 'v100', 'manufacturer': 'nvidia', 'count': 8.0, 'memoryinfo': {'sizeinmib': 16384}}], 'totalgpumemoryinmib': 16384}", + # "{'gpus': [{ + # 'name': 'v100', + # 'manufacturer': 'nvidia', + # 'count': 8.0, + # 'memoryinfo': {'sizeinmib': 16384} + # }], + # 'totalgpumemoryinmib': 16384}", # we can do that. entry['MemoryGiB'] /= 1024 - entry['GpuInfo'] = json.dumps({'Gpus': [{'Name': offer['gpu_name'], 'Count': offer['num_gpus'], 'MemoryInfo': {'SizeInMiB': offer['gpu_total_ram']}}], 'TotalGpuMemoryInMiB': offer['gpu_total_ram']}).replace('"', "'") + entry['GpuInfo'] = json.dumps({ + 'Gpus': [{ + 'Name': offer['gpu_name'], + 'Count': offer['num_gpus'], + 'MemoryInfo': { + 'SizeInMiB': offer['gpu_total_ram'] + } + }], + 'TotalGpuMemoryInMiB': offer['gpu_total_ram'] + }).replace('"', "'") # pylint: disable=invalid-string-quote writer.writerow(entry) - diff --git a/sky/provision/vast/utils.py b/sky/provision/vast/utils.py index 525eab1e91f..5c627b10803 100644 --- a/sky/provision/vast/utils.py +++ b/sky/provision/vast/utils.py @@ -1,6 +1,6 @@ # pylint: disable=assignment-from-no-return """Vast library wrapper for SkyPilot.""" -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List from sky import sky_logging from sky.adaptors import vast diff --git a/sky/setup_files/setup.py b/sky/setup_files/setup.py index d1584677fa7..a484f3c8375 100644 --- a/sky/setup_files/setup.py +++ b/sky/setup_files/setup.py @@ -238,7 +238,7 @@ def parse_readme(readme: str) -> str: 'fluidstack': [], # No dependencies needed for fluidstack 'cudo': ['cudo-compute>=0.1.10'], 'paperspace': [], # No dependencies needed for paperspace - 'vast': ['vastai_sdk'], # As of now, any version will do + 'vast': ['vastai_sdk'], # As of now, any version will do 'vsphere': [ 'pyvmomi==8.0.1.0.2', # vsphere-automation-sdk is also required, but it does not have From 3e7a9635f6cb698a17f93154280d3768fb69c50b Mon Sep 17 00:00:00 2001 From: Chris McKenzie Date: Thu, 21 Nov 2024 14:09:28 -0800 Subject: [PATCH 16/19] Vast: Linting fixes --- sky/setup_files/setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sky/setup_files/setup.py b/sky/setup_files/setup.py index a484f3c8375..32fc069768b 100644 --- a/sky/setup_files/setup.py +++ b/sky/setup_files/setup.py @@ -238,7 +238,7 @@ def parse_readme(readme: str) -> str: 'fluidstack': [], # No dependencies needed for fluidstack 'cudo': ['cudo-compute>=0.1.10'], 'paperspace': [], # No dependencies needed for paperspace - 'vast': ['vastai_sdk'], # As of now, any version will do + 'vast': ['vastai_sdk>=0.1.2'], 'vsphere': [ 'pyvmomi==8.0.1.0.2', # vsphere-automation-sdk is also required, but it does not have From 00c14326393f6fe92c9c7643e370f54289a25246 Mon Sep 17 00:00:00 2001 From: Chris McKenzie Date: Fri, 22 Nov 2024 12:28:45 -0800 Subject: [PATCH 17/19] Vast: ordering the ports --- sky/provision/vast/utils.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/sky/provision/vast/utils.py b/sky/provision/vast/utils.py index 5c627b10803..b0b1423da0c 100644 --- a/sky/provision/vast/utils.py +++ b/sky/provision/vast/utils.py @@ -93,8 +93,18 @@ def get_ssh_ports(cluster_name) -> List[int]: for instance in instances.values(): if instance['name'] in possible_names: - ssh_ports.append(instance['ssh_port']) + ssh_ports.append((instance['name'], instance['ssh_port'])) assert ssh_ports, ( f'Could not find any instances for cluster {cluster_name}.') + # So now we have + # [(name, port) ... ] + # + # We want to put head first and otherwise sort numerically + # and then extract the ports. + ssh_ports = list( + x[1] + for x in sorted(ssh_ports, + key=lambda x: -1 + if x[0].endswith('head') else int(x[0].split('-')[-1]))) return ssh_ports From 25b99f958b5f4cdbf3bd1ce88f34fa4a9c3876cf Mon Sep 17 00:00:00 2001 From: Chris McKenzie Date: Fri, 22 Nov 2024 13:34:02 -0800 Subject: [PATCH 18/19] Vast: Updating a function signature --- sky/clouds/vast.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sky/clouds/vast.py b/sky/clouds/vast.py index cd8ffa64c21..c4d547ea5f9 100644 --- a/sky/clouds/vast.py +++ b/sky/clouds/vast.py @@ -148,15 +148,15 @@ def get_accelerators_from_instance_type( def get_zone_shell_cmd(cls) -> Optional[str]: return None - # TODO: Function signature is different from the fluffy cloud def make_deploy_resources_variables( self, resources: 'resources_lib.Resources', cluster_name: resources_utils.ClusterName, region: 'clouds.Region', zones: Optional[List['clouds.Zone']], + num_nodes: int, dryrun: bool = False) -> Dict[str, Optional[str]]: - del zones, dryrun, cluster_name # unused + del zones, dryrun, cluster_name, num_nodes # unused r = resources acc_dict = self.get_accelerators_from_instance_type(r.instance_type) From d9bfbf3539a6e75a8f0d1c4f3d69e1c03ebfd8c7 Mon Sep 17 00:00:00 2001 From: Chris McKenzie Date: Fri, 22 Nov 2024 13:36:38 -0800 Subject: [PATCH 19/19] Vast: Updating unrelated code from a merge that is needed to pass a test --- sky/utils/controller_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sky/utils/controller_utils.py b/sky/utils/controller_utils.py index 459ce3cec2b..4b193d9ddd7 100644 --- a/sky/utils/controller_utils.py +++ b/sky/utils/controller_utils.py @@ -98,7 +98,7 @@ class Controllers(enum.Enum): controller_type='jobs', name='managed jobs controller', candidate_cluster_names=[ - managed_job_utils.JOB_CONTROLLER_NAME, + managed_job_utils.JOB_CONTROLLER_NAME, managed_job_utils.LEGACY_JOB_CONTROLLER_NAME ], in_progress_hint=(