diff --git a/docs/source/getting-started/installation.rst b/docs/source/getting-started/installation.rst index deb2307b67b..4881541206e 100644 --- a/docs/source/getting-started/installation.rst +++ b/docs/source/getting-started/installation.rst @@ -296,6 +296,16 @@ Paperspace mkdir -p ~/.paperspace echo "{'api_key' : }" > ~/.paperspace/config.json +Vast +~~~~~~~~~~ + +`Vast `__ is a cloud provider that offers low-cost GPUs. To configure Vast access, go to the `Account `_ page on your Vast console to get your **API key**. Then, run: + +.. code-block:: shell + + pip install "vastai-sdk>=0.1.3" + echo "" > ~/.vast_api_key + RunPod ~~~~~~~~~~ diff --git a/sky/__init__.py b/sky/__init__.py index 4e720d63ce0..9e052ca62f2 100644 --- a/sky/__init__.py +++ b/sky/__init__.py @@ -133,6 +133,7 @@ def set_proxy_env_var(proxy_var: str, urllib_var: Optional[str]): OCI = clouds.OCI Paperspace = clouds.Paperspace RunPod = clouds.RunPod +Vast = clouds.Vast Vsphere = clouds.Vsphere Fluidstack = clouds.Fluidstack optimize = Optimizer.optimize @@ -150,6 +151,7 @@ def set_proxy_env_var(proxy_var: str, urllib_var: Optional[str]): 'OCI', 'Paperspace', 'RunPod', + 'Vast', 'SCP', 'Vsphere', 'Fluidstack', diff --git a/sky/adaptors/vast.py b/sky/adaptors/vast.py new file mode 100644 index 00000000000..20ba72035f1 --- /dev/null +++ b/sky/adaptors/vast.py @@ -0,0 +1,29 @@ +"""Vast cloud adaptor.""" + +import functools + +_vast_sdk = None + + +def import_package(func): + + @functools.wraps(func) + def wrapper(*args, **kwargs): + global _vast_sdk + + if _vast_sdk is None: + try: + import vastai_sdk as _vast # pylint: disable=import-outside-toplevel + _vast_sdk = _vast.VastAI() + except ImportError: + raise ImportError('Fail to import dependencies for vast.' 
+                                  ' Try pip install "skypilot[vast]"') from None
+        return func(*args, **kwargs)
+
+    return wrapper
+
+
+@import_package
+def vast():
+    """Return the vast package."""
+    return _vast_sdk
diff --git a/sky/authentication.py b/sky/authentication.py
index 2eb65bd9f6f..d72d991141e 100644
--- a/sky/authentication.py
+++ b/sky/authentication.py
@@ -43,6 +43,7 @@
 from sky.adaptors import ibm
 from sky.adaptors import kubernetes
 from sky.adaptors import runpod
+from sky.adaptors import vast
 from sky.provision.fluidstack import fluidstack_utils
 from sky.provision.kubernetes import utils as kubernetes_utils
 from sky.provision.lambda_cloud import lambda_utils
@@ -473,6 +474,19 @@ def setup_runpod_authentication(config: Dict[str, Any]) -> Dict[str, Any]:
     return configure_ssh_info(config)
 
 
+def setup_vast_authentication(config: Dict[str, Any]) -> Dict[str, Any]:
+    """Sets up SSH authentication for Vast.
+    - Generates a new SSH key pair if one does not exist.
+    - Adds the public SSH key to the user's Vast account.
+ """ + _, public_key_path = get_or_generate_keys() + with open(public_key_path, 'r', encoding='UTF-8') as pub_key_file: + public_key = pub_key_file.read().strip() + vast.vast().create_ssh_key(ssh_key=public_key) + config['auth']['ssh_public_key'] = PUBLIC_SSH_KEY_PATH + return configure_ssh_info(config) + + def setup_fluidstack_authentication(config: Dict[str, Any]) -> Dict[str, Any]: get_or_generate_keys() diff --git a/sky/backends/backend_utils.py b/sky/backends/backend_utils.py index a3651bdba9a..cbf1e562160 100644 --- a/sky/backends/backend_utils.py +++ b/sky/backends/backend_utils.py @@ -1002,6 +1002,8 @@ def _add_auth_to_cluster_config(cloud: clouds.Cloud, cluster_config_file: str): config = auth.setup_ibm_authentication(config) elif isinstance(cloud, clouds.RunPod): config = auth.setup_runpod_authentication(config) + elif isinstance(cloud, clouds.Vast): + config = auth.setup_vast_authentication(config) elif isinstance(cloud, clouds.Fluidstack): config = auth.setup_fluidstack_authentication(config) else: diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py index 0c67ec6b328..75f6bc026ae 100644 --- a/sky/backends/cloud_vm_ray_backend.py +++ b/sky/backends/cloud_vm_ray_backend.py @@ -181,6 +181,7 @@ def _get_cluster_config_template(cloud): clouds.RunPod: 'runpod-ray.yml.j2', clouds.Kubernetes: 'kubernetes-ray.yml.j2', clouds.Vsphere: 'vsphere-ray.yml.j2', + clouds.Vast: 'vast-ray.yml.j2', clouds.Fluidstack: 'fluidstack-ray.yml.j2' } return cloud_to_template[type(cloud)] diff --git a/sky/clouds/__init__.py b/sky/clouds/__init__.py index c4d46e93adf..2201f41ac23 100644 --- a/sky/clouds/__init__.py +++ b/sky/clouds/__init__.py @@ -24,6 +24,7 @@ from sky.clouds.paperspace import Paperspace from sky.clouds.runpod import RunPod from sky.clouds.scp import SCP +from sky.clouds.vast import Vast from sky.clouds.vsphere import Vsphere __all__ = [ @@ -37,6 +38,7 @@ 'Paperspace', 'SCP', 'RunPod', + 'Vast', 'OCI', 'Vsphere', 'Kubernetes', 
diff --git a/sky/clouds/service_catalog/constants.py b/sky/clouds/service_catalog/constants.py index 1373fd86a03..f5f1f89b390 100644 --- a/sky/clouds/service_catalog/constants.py +++ b/sky/clouds/service_catalog/constants.py @@ -3,5 +3,5 @@ CATALOG_SCHEMA_VERSION = 'v5' CATALOG_DIR = '~/.sky/catalogs' ALL_CLOUDS = ('aws', 'azure', 'gcp', 'ibm', 'lambda', 'scp', 'oci', - 'kubernetes', 'runpod', 'vsphere', 'cudo', 'fluidstack', + 'kubernetes', 'runpod', 'vast', 'vsphere', 'cudo', 'fluidstack', 'paperspace') diff --git a/sky/clouds/service_catalog/data_fetchers/fetch_vast.py b/sky/clouds/service_catalog/data_fetchers/fetch_vast.py new file mode 100755 index 00000000000..3050434ea0c --- /dev/null +++ b/sky/clouds/service_catalog/data_fetchers/fetch_vast.py @@ -0,0 +1,91 @@ +"""A script that generates the Vast Cloud catalog. """ + +# +# Due to the design of the sdk, pylint has a false +# positive for the fnctions. +# +# pylint: disable=assignment-from-no-return +import csv +import json +import re +import sys +from typing import Any, Dict + +from sky.adaptors import vast + +_map = { + 'TeslaV100': 'V100', + 'TeslaT4': 'T4', + 'TeslaP100': 'P100', + 'QRTX6000': 'RTX6000', + 'QRTX8000': 'RTX8000' +} + + +def create_instance_type(obj: Dict[str, Any]) -> str: + stubify = lambda x: re.sub(r'\s', '_', x) + return '{}x-{}-{}'.format(obj['num_gpus'], stubify(obj['gpu_name']), + obj['cpu_cores']) + + +def dot_get(d: dict, key: str) -> Any: + for k in key.split('.'): + d = d[k] + return d + + +if __name__ == '__main__': + # InstanceType and gpuInfo are basically just stubs + # so that the dictwriter is happy without weird + # code. 
+ mapped_keys = (('gpu_name', 'InstanceType'), ('gpu_name', + 'AcceleratorName'), + ('num_gpus', 'AcceleratorCount'), ('cpu_cores', 'vCPUs'), + ('cpu_ram', 'MemoryGiB'), ('gpu_name', 'GpuInfo'), + ('search.totalHour', 'Price'), + ('search.totalHour', 'SpotPrice'), ('geolocation', 'Region')) + writer = csv.DictWriter(sys.stdout, fieldnames=[x[1] for x in mapped_keys]) + writer.writeheader() + + offerList = vast.vast().search_offers(limit=10000) + for offer in offerList: + entry = {} + for ours, theirs in mapped_keys: + field = dot_get(offer, ours) + if 'Price' in theirs: + field = '{:.2f}'.format(field) + entry[theirs] = field + + entry['InstanceType'] = create_instance_type(offer) + + # the documentation says + # "{'gpus': [{ + # 'name': 'v100', + # 'manufacturer': 'nvidia', + # 'count': 8.0, + # 'memoryinfo': {'sizeinmib': 16384} + # }], + # 'totalgpumemoryinmib': 16384}", + # we can do that. + entry['MemoryGiB'] /= 1024 + + gpu = re.sub('Ada', '-Ada', re.sub(r'\s', '', offer['gpu_name'])) + gpu = re.sub(r'(Ti|PCIE|SXM4|SXM|NVL)$', '', gpu) + gpu = re.sub(r'(RTX\d0\d0)(S|D)$', r'\1', gpu) + + if gpu in _map: + gpu = _map[gpu] + + entry['AcceleratorName'] = gpu + entry['GpuInfo'] = json.dumps({ + 'Gpus': [{ + 'Name': gpu, + 'Count': offer['num_gpus'], + 'MemoryInfo': { + 'SizeInMiB': offer['gpu_total_ram'] + } + }], + 'TotalGpuMemoryInMiB': offer['gpu_total_ram'] + }).replace('"', '\'') + + writer.writerow(entry) diff --git a/sky/clouds/service_catalog/vast_catalog.py b/sky/clouds/service_catalog/vast_catalog.py new file mode 100644 index 00000000000..14f4b2fedf2 --- /dev/null +++ b/sky/clouds/service_catalog/vast_catalog.py @@ -0,0 +1,104 @@ +""" Vast | Catalog + +This module loads the service catalog file and can be used to +query instance types and pricing information for Vast.ai. 
+""" + +import typing +from typing import Dict, List, Optional, Tuple, Union + +from sky.clouds.service_catalog import common +from sky.utils import ux_utils + +if typing.TYPE_CHECKING: + from sky.clouds import cloud + +_df = common.read_catalog('vast/vms.csv') + + +def instance_type_exists(instance_type: str) -> bool: + return common.instance_type_exists_impl(_df, instance_type) + + +def validate_region_zone( + region: Optional[str], + zone: Optional[str]) -> Tuple[Optional[str], Optional[str]]: + if zone is not None: + with ux_utils.print_exception_no_traceback(): + raise ValueError('Vast does not support zones.') + return common.validate_region_zone_impl('vast', _df, region, zone) + + +def get_hourly_cost(instance_type: str, + use_spot: bool = False, + region: Optional[str] = None, + zone: Optional[str] = None) -> float: + """Returns the cost, or the cheapest cost among all zones for spot.""" + if zone is not None: + with ux_utils.print_exception_no_traceback(): + raise ValueError('Vast does not support zones.') + return common.get_hourly_cost_impl(_df, instance_type, use_spot, region, + zone) + + +def get_vcpus_mem_from_instance_type( + instance_type: str) -> Tuple[Optional[float], Optional[float]]: + return common.get_vcpus_mem_from_instance_type_impl(_df, instance_type) + + +def get_default_instance_type(cpus: Optional[str] = None, + memory: Optional[str] = None, + disk_tier: Optional[str] = None) -> Optional[str]: + del disk_tier + # NOTE: After expanding catalog to multiple entries, you may + # want to specify a default instance type or family. 
+ return common.get_instance_type_for_cpus_mem_impl(_df, cpus, memory) + + +def get_accelerators_from_instance_type( + instance_type: str) -> Optional[Dict[str, Union[int, float]]]: + return common.get_accelerators_from_instance_type_impl(_df, instance_type) + + +def get_instance_type_for_accelerator( + acc_name: str, + acc_count: int, + cpus: Optional[str] = None, + memory: Optional[str] = None, + use_spot: bool = False, + region: Optional[str] = None, + zone: Optional[str] = None) -> Tuple[Optional[List[str]], List[str]]: + """Returns a list of instance types that have the given accelerator.""" + if zone is not None: + with ux_utils.print_exception_no_traceback(): + raise ValueError('Vast does not support zones.') + return common.get_instance_type_for_accelerator_impl(df=_df, + acc_name=acc_name, + acc_count=acc_count, + cpus=cpus, + memory=memory, + use_spot=use_spot, + region=region, + zone=zone) + + +def get_region_zones_for_instance_type(instance_type: str, + use_spot: bool) -> List['cloud.Region']: + df = _df[_df['InstanceType'] == instance_type] + return common.get_region_zones(df, use_spot) + + +# TODO: this differs from the fluffy catalog version +def list_accelerators( + gpus_only: bool, + name_filter: Optional[str], + region_filter: Optional[str], + quantity_filter: Optional[int], + case_sensitive: bool = True, + all_regions: bool = False, + require_price: bool = True) -> Dict[str, List[common.InstanceTypeInfo]]: + """Returns all instance types in Vast offering GPUs.""" + del require_price # Unused. + return common.list_accelerators_impl('Vast', _df, gpus_only, name_filter, + region_filter, quantity_filter, + case_sensitive, all_regions) diff --git a/sky/clouds/vast.py b/sky/clouds/vast.py new file mode 100644 index 00000000000..125c022db37 --- /dev/null +++ b/sky/clouds/vast.py @@ -0,0 +1,287 @@ +""" Vast Cloud. 
""" + +import typing +from typing import Dict, Iterator, List, Optional, Tuple, Union + +from sky import clouds +from sky.clouds import service_catalog +from sky.utils import resources_utils + +if typing.TYPE_CHECKING: + from sky import resources as resources_lib + + +@clouds.CLOUD_REGISTRY.register +class Vast(clouds.Cloud): + """ Vast GPU Cloud + + _REPR | The string representation for the Vast GPU cloud object. + """ + _REPR = 'Vast' + _CLOUD_UNSUPPORTED_FEATURES = { + clouds.CloudImplementationFeatures.SPOT_INSTANCE: + ('Spot is not supported, as vast API does not implement spot.'), + clouds.CloudImplementationFeatures.MULTI_NODE: + ('Multi-node not supported yet, as the interconnection among nodes ' + 'are non-trivial on Vast.'), + clouds.CloudImplementationFeatures.CUSTOM_DISK_TIER: + ('Customizing disk tier is not supported yet on Vast.'), + clouds.CloudImplementationFeatures.OPEN_PORTS: + ('Opening ports is currently not supported on Vast.'), + clouds.CloudImplementationFeatures.STORAGE_MOUNTING: + ('Mounting object stores is not supported on Vast.'), + } + # + # Vast doesn't have a max cluster name limit. This number + # is reasonably large and exists to play nicely with the + # other providers + # + _MAX_CLUSTER_NAME_LEN_LIMIT = 120 + _regions: List[clouds.Region] = [] + + PROVISIONER_VERSION = clouds.ProvisionerVersion.SKYPILOT + STATUS_VERSION = clouds.StatusVersion.SKYPILOT + OPEN_PORTS_VERSION = clouds.OpenPortsVersion.LAUNCH_ONLY + + @classmethod + def _unsupported_features_for_resources( + cls, resources: 'resources_lib.Resources' + ) -> Dict[clouds.CloudImplementationFeatures, str]: + """The features not supported based on the resources provided. + + This method is used by check_features_are_supported() to check if the + cloud implementation supports all the requested features. + + Returns: + A dict of {feature: reason} for the features not supported by the + cloud implementation. 
+ """ + del resources # unused + return cls._CLOUD_UNSUPPORTED_FEATURES + + @classmethod + def _max_cluster_name_length(cls) -> Optional[int]: + return cls._MAX_CLUSTER_NAME_LEN_LIMIT + + @classmethod + def regions_with_offering(cls, instance_type: str, + accelerators: Optional[Dict[str, int]], + use_spot: bool, region: Optional[str], + zone: Optional[str]) -> List[clouds.Region]: + assert zone is None, 'Vast does not support zones.' + del accelerators, zone # unused + if use_spot: + return [] + regions = service_catalog.get_region_zones_for_instance_type( + instance_type, use_spot, 'vast') + + if region is not None: + regions = [r for r in regions if r.name == region] + return regions + + @classmethod + def get_vcpus_mem_from_instance_type( + cls, + instance_type: str, + ) -> Tuple[Optional[float], Optional[float]]: + return service_catalog.get_vcpus_mem_from_instance_type(instance_type, + clouds='vast') + + @classmethod + def zones_provision_loop( + cls, + *, + region: str, + num_nodes: int, + instance_type: str, + accelerators: Optional[Dict[str, int]] = None, + use_spot: bool = False, + ) -> Iterator[None]: + del num_nodes # unused + regions = cls.regions_with_offering(instance_type, + accelerators, + use_spot, + region=region, + zone=None) + for r in regions: + assert r.zones is None, r + yield r.zones + + def instance_type_to_hourly_cost(self, + instance_type: str, + use_spot: bool, + region: Optional[str] = None, + zone: Optional[str] = None) -> float: + return service_catalog.get_hourly_cost(instance_type, + use_spot=use_spot, + region=region, + zone=zone, + clouds='vast') + + def accelerators_to_hourly_cost(self, + accelerators: Dict[str, int], + use_spot: bool, + region: Optional[str] = None, + zone: Optional[str] = None) -> float: + """Returns the hourly cost of the accelerators, in dollars/hour.""" + del accelerators, use_spot, region, zone # unused + return 0.0 # Vast includes accelerators in the hourly cost. 
+ + def get_egress_cost(self, num_gigabytes: float) -> float: + return 0.0 + + @classmethod + def get_default_instance_type( + cls, + cpus: Optional[str] = None, + memory: Optional[str] = None, + disk_tier: Optional[resources_utils.DiskTier] = None + ) -> Optional[str]: + """Returns the default instance type for Vast.""" + return service_catalog.get_default_instance_type(cpus=cpus, + memory=memory, + disk_tier=disk_tier, + clouds='vast') + + @classmethod + def get_accelerators_from_instance_type( + cls, instance_type: str) -> Optional[Dict[str, Union[int, float]]]: + return service_catalog.get_accelerators_from_instance_type( + instance_type, clouds='vast') + + @classmethod + def get_zone_shell_cmd(cls) -> Optional[str]: + return None + + def make_deploy_resources_variables( + self, + resources: 'resources_lib.Resources', + cluster_name: resources_utils.ClusterName, + region: 'clouds.Region', + zones: Optional[List['clouds.Zone']], + num_nodes: int, + dryrun: bool = False) -> Dict[str, Optional[str]]: + del zones, dryrun, cluster_name, num_nodes # unused + + r = resources + acc_dict = self.get_accelerators_from_instance_type(r.instance_type) + custom_resources = resources_utils.make_ray_custom_resources_str( + acc_dict) + + if r.image_id is None: + image_id = 'vastai/pytorch:latest' + elif r.extract_docker_image() is not None: + image_id = r.extract_docker_image() + else: + image_id = r.image_id[r.region] + + return { + 'instance_type': resources.instance_type, + 'custom_resources': custom_resources, + 'region': region.name, + 'image_id': image_id, + } + + def _get_feasible_launchable_resources( + self, resources: 'resources_lib.Resources' + ) -> 'resources_utils.FeasibleResources': + """Returns a list of feasible resources for the given resources.""" + if resources.use_spot: + # TODO(kristopolous/cjm): Add hints to all return values in this method to help + # users understand why the resources are not launchable. 
+ return resources_utils.FeasibleResources([], [], None) + if resources.instance_type is not None: + assert resources.is_launchable(), resources + resources = resources.copy(accelerators=None) + return resources_utils.FeasibleResources([resources], [], None) + + def _make(instance_list): + resource_list = [] + for instance_type in instance_list: + r = resources.copy( + cloud=Vast(), + instance_type=instance_type, + accelerators=None, + cpus=None, + ) + resource_list.append(r) + return resource_list + + # Currently, handle a filter on accelerators only. + accelerators = resources.accelerators + if accelerators is None: + # Return a default instance type + default_instance_type = Vast.get_default_instance_type( + cpus=resources.cpus, + memory=resources.memory, + disk_tier=resources.disk_tier) + if default_instance_type is None: + # TODO: Add hints to all return values in this method to help + # users understand why the resources are not launchable. + return resources_utils.FeasibleResources([], [], None) + else: + return resources_utils.FeasibleResources( + _make([default_instance_type]), [], None) + + assert len(accelerators) == 1, resources + acc, acc_count = list(accelerators.items())[0] + (instance_list, fuzzy_candidate_list + ) = service_catalog.get_instance_type_for_accelerator( + acc, + acc_count, + use_spot=resources.use_spot, + cpus=resources.cpus, + region=resources.region, + zone=resources.zone, + clouds='vast') + if instance_list is None: + return resources_utils.FeasibleResources([], fuzzy_candidate_list, + None) + return resources_utils.FeasibleResources(_make(instance_list), + fuzzy_candidate_list, None) + + @classmethod + def check_credentials(cls) -> Tuple[bool, Optional[str]]: + """ Verify that the user has valid credentials for Vast. 
""" + try: + import vastai_sdk as _vast # pylint: disable=import-outside-toplevel + vast = _vast.VastAI() + + # We only support file pased credential passing + if vast.creds_source != 'FILE': + return False, ( + 'error \n' # First line is indented by 4 spaces + ' Credentials can be set up by running: \n' + ' $ pip install vastai\n' + ' $ echo [key] > ~/.vast_api_key\n' + ' For more information, see https://skypilot.readthedocs.io/en/latest/getting-started/installation.html#vast' # pylint: disable=line-too-long + ) + + return True, None + + except ImportError: + return False, ('Failed to import vast. ' + 'To install, run: pip install skypilot[vast]') + + def get_credential_file_mounts(self) -> Dict[str, str]: + return { + '~/.config/vastai/vast_api_key': '~/.config/vastai/vast_api_key' + } + + @classmethod + def get_user_identities(cls) -> Optional[List[List[str]]]: + # NOTE: used for very advanced SkyPilot functionality + # Can implement later if desired + return None + + def instance_type_exists(self, instance_type: str) -> bool: + return service_catalog.instance_type_exists(instance_type, 'vast') + + def validate_region_zone(self, region: Optional[str], zone: Optional[str]): + return service_catalog.validate_region_zone(region, zone, clouds='vast') + + @classmethod + def get_image_size(cls, image_id: str, region: Optional[str]) -> float: + # TODO: use 0.0 for now to allow all images. We should change this to + # return the docker image size. 
+ return 0.0 diff --git a/sky/provision/__init__.py b/sky/provision/__init__.py index 02a627b08a3..83452a0077b 100644 --- a/sky/provision/__init__.py +++ b/sky/provision/__init__.py @@ -22,6 +22,7 @@ from sky.provision import lambda_cloud from sky.provision import oci from sky.provision import runpod +from sky.provision import vast from sky.provision import vsphere from sky.utils import command_runner from sky.utils import timeline diff --git a/sky/provision/vast/__init__.py b/sky/provision/vast/__init__.py new file mode 100644 index 00000000000..1b19246496a --- /dev/null +++ b/sky/provision/vast/__init__.py @@ -0,0 +1,11 @@ +"""Vast provisioner for SkyPilot.""" + +from sky.provision.vast.config import bootstrap_instances +from sky.provision.vast.instance import cleanup_ports +from sky.provision.vast.instance import get_cluster_info +from sky.provision.vast.instance import query_instances +from sky.provision.vast.instance import query_ports +from sky.provision.vast.instance import run_instances +from sky.provision.vast.instance import stop_instances +from sky.provision.vast.instance import terminate_instances +from sky.provision.vast.instance import wait_instances diff --git a/sky/provision/vast/config.py b/sky/provision/vast/config.py new file mode 100644 index 00000000000..9cb337eb91d --- /dev/null +++ b/sky/provision/vast/config.py @@ -0,0 +1,11 @@ +"""Vast configuration bootstrapping.""" + +from sky.provision import common + + +def bootstrap_instances( + region: str, cluster_name: str, + config: common.ProvisionConfig) -> common.ProvisionConfig: + """Bootstraps instances for the given cluster.""" + del region, cluster_name # unused + return config diff --git a/sky/provision/vast/instance.py b/sky/provision/vast/instance.py new file mode 100644 index 00000000000..0819bbf1a2a --- /dev/null +++ b/sky/provision/vast/instance.py @@ -0,0 +1,278 @@ +"""Vast instance provisioning.""" +import time +from typing import Any, Dict, List, Optional + +from sky import 
sky_logging
+from sky import status_lib
+from sky.provision import common
+from sky.provision.vast import utils
+from sky.utils import common_utils
+from sky.utils import resources_utils
+from sky.utils import ux_utils
+
+POLL_INTERVAL = 5
+QUERY_PORTS_TIMEOUT_SECONDS = 30
+
+logger = sky_logging.init_logger(__name__)
+# a much more convenient method
+status_filter = lambda machine_dict, stat_list: {
+    k: v for k, v in machine_dict.items() if v['status'] in stat_list
+}
+
+
+def _filter_instances(cluster_name_on_cloud: str,
+                      status_filters: Optional[List[str]],
+                      head_only: bool = False) -> Dict[str, Any]:
+
+    instances = utils.list_instances()
+    possible_names = [f'{cluster_name_on_cloud}-head']
+    if not head_only:
+        possible_names.append(f'{cluster_name_on_cloud}-worker')
+
+    filtered_instances = {}
+    for instance_id, instance in instances.items():
+        if (status_filters is not None and
+                instance['status'] not in status_filters):
+            continue
+        if instance.get('name') in possible_names:
+            filtered_instances[instance_id] = instance
+    return filtered_instances
+
+
+def _get_head_instance_id(instances: Dict[str, Any]) -> Optional[str]:
+    for inst_id, inst in instances.items():
+        if inst['name'].endswith('-head'):
+            return inst_id
+    return None
+
+
+def run_instances(region: str, cluster_name_on_cloud: str,
+                  config: common.ProvisionConfig) -> common.ProvisionRecord:
+    """Runs instances for the given cluster."""
+    pending_status = ['CREATED', 'RESTARTING']
+
+    created_instance_ids = []
+    instances: Dict[str, Any] = {}
+
+    while True:
+        instances = _filter_instances(cluster_name_on_cloud, None)
+        if not status_filter(instances, pending_status):
+            break
+        logger.info(f'Waiting for {len(instances)} instances to be ready.')
+        time.sleep(POLL_INTERVAL)
+
+    running_instances = status_filter(instances, ['RUNNING'])
+    head_instance_id = _get_head_instance_id(running_instances)
+    stopped_instances = status_filter(instances, ['EXITED', 'STOPPED'])
+
+    if config.resume_stopped_nodes 
and stopped_instances: + for instance in stopped_instances.values(): + utils.start(instance['id']) + else: + to_start_count = config.count - (len(running_instances) + + len(stopped_instances)) + if to_start_count < 0: + raise RuntimeError(f'Cluster {cluster_name_on_cloud} already has ' + f'{len(running_instances)} nodes,' + f'but {config.count} are required.') + if to_start_count == 0: + if head_instance_id is None: + raise RuntimeError( + f'Cluster {cluster_name_on_cloud} has no head node.') + logger.info( + f'Cluster {cluster_name_on_cloud} already has ' + f'{len(running_instances)} nodes, no need to start more.') + return common.ProvisionRecord(provider_name='vast', + cluster_name=cluster_name_on_cloud, + region=region, + zone=None, + head_instance_id=head_instance_id, + resumed_instance_ids=[], + created_instance_ids=[]) + + for _ in range(to_start_count): + node_type = 'head' if head_instance_id is None else 'worker' + try: + instance_id = utils.launch( + name=f'{cluster_name_on_cloud}-{node_type}', + instance_type=config.node_config['InstanceType'], + region=region, + disk_size=config.node_config['DiskSize'], + image_name=config.node_config['ImageId']) + except Exception as e: # pylint: disable=broad-except + logger.warning(f'run_instances error: {e}') + raise + logger.info(f'Launched instance {instance_id}.') + created_instance_ids.append(instance_id) + if head_instance_id is None: + head_instance_id = instance_id + + # Wait for instances to be ready. 
+ while True: + instances = _filter_instances(cluster_name_on_cloud, ['RUNNING']) + ready_instance_cnt = 0 + for instance_id, instance in instances.items(): + if instance.get('ssh_port') is not None: + ready_instance_cnt += 1 + logger.info('Waiting for instances to be ready: ' + f'({ready_instance_cnt}/{config.count}).') + if ready_instance_cnt == config.count: + break + + time.sleep(POLL_INTERVAL) + + head_instance_id = _get_head_instance_id(utils.list_instances()) + assert head_instance_id is not None, 'head_instance_id should not be None' + return common.ProvisionRecord(provider_name='vast', + cluster_name=cluster_name_on_cloud, + region=region, + zone=None, + head_instance_id=head_instance_id, + resumed_instance_ids=[], + created_instance_ids=created_instance_ids) + + +def wait_instances(region: str, cluster_name_on_cloud: str, + state: Optional[status_lib.ClusterStatus]) -> None: + del region, cluster_name_on_cloud, state + + +def stop_instances( + cluster_name_on_cloud: str, + provider_config: Optional[Dict[str, Any]] = None, + worker_only: bool = False, +) -> None: + return action_instances('stop', cluster_name_on_cloud, provider_config, + worker_only) + + +def terminate_instances( + cluster_name_on_cloud: str, + provider_config: Optional[Dict[str, Any]] = None, + worker_only: bool = False, +) -> None: + return action_instances('remove', cluster_name_on_cloud, provider_config, + worker_only) + + +def action_instances( + fn: str, + cluster_name_on_cloud: str, + provider_config: Optional[Dict[str, Any]] = None, + worker_only: bool = False, +) -> None: + """See sky/provision/__init__.py""" + del provider_config # unused + instances = _filter_instances(cluster_name_on_cloud, None) + for inst_id, inst in instances.items(): + logger.debug(f'Instance {fn} {inst_id}: {inst}') + if worker_only and inst['name'].endswith('-head'): + continue + try: + getattr(utils, fn)(inst_id) + except Exception as e: # pylint: disable=broad-except + with 
ux_utils.print_exception_no_traceback(): + raise RuntimeError( + f'Failed to {fn} instance {inst_id}: ' + f'{common_utils.format_exception(e, use_bracket=False)}' + ) from e + + +def get_cluster_info( + region: str, + cluster_name_on_cloud: str, + provider_config: Optional[Dict[str, Any]] = None) -> common.ClusterInfo: + del region # unused + running_instances = _filter_instances(cluster_name_on_cloud, ['RUNNING']) + instances: Dict[str, List[common.InstanceInfo]] = {} + head_instance_id = None + for instance_id, instance_info in running_instances.items(): + instances[instance_id] = [ + common.InstanceInfo( + instance_id=instance_id, + internal_ip=instance_info['local_ipaddrs'].strip(), + external_ip=instance_info['public_ipaddr'], + ssh_port=instance_info['ports']['22/tcp'][0]['HostPort'], + tags={}, + ) + ] + if instance_info['name'].endswith('-head'): + head_instance_id = instance_id + + return common.ClusterInfo( + instances=instances, + head_instance_id=head_instance_id, + provider_name='vast', + provider_config=provider_config, + ) + +def open_ports( + cluster_name_on_cloud: str, + ports: List[str], + provider_config: Optional[Dict[str, Any]] = None, +) -> None: + raise NotImplementedError('open_ports is not supported for Vast') + +def query_instances( + cluster_name_on_cloud: str, + provider_config: Optional[Dict[str, Any]] = None, + non_terminated_only: bool = True, +) -> Dict[str, Optional[status_lib.ClusterStatus]]: + """See sky/provision/__init__.py""" + + assert provider_config is not None, (cluster_name_on_cloud, provider_config) + instances = _filter_instances(cluster_name_on_cloud, None) + # "running", "frozen", "stopped", "unknown", "loading" + status_map = { + 'LOADING': status_lib.ClusterStatus.INIT, + 'EXITED': status_lib.ClusterStatus.STOPPED, + 'STOPPED': status_lib.ClusterStatus.STOPPED, + 'RUNNING': status_lib.ClusterStatus.UP, + } + statuses: Dict[str, Optional[status_lib.ClusterStatus]] = {} + for inst_id, inst in instances.items(): + 
status = status_map.get(inst['status'])
+        if non_terminated_only and status is None:
+            continue
+        statuses[inst_id] = status
+    return statuses
+
+
+def cleanup_ports(
+    cluster_name_on_cloud: str,
+    ports: List[str],
+    provider_config: Optional[Dict[str, Any]] = None,
+) -> None:
+    del cluster_name_on_cloud, ports, provider_config  # Unused.
+
+
+def query_ports(
+    cluster_name_on_cloud: str,
+    ports: List[str],
+    head_ip: Optional[str] = None,
+    provider_config: Optional[Dict[str, Any]] = None,
+) -> Dict[int, List[common.Endpoint]]:
+    """See sky/provision/__init__.py"""
+    del head_ip, provider_config  # Unused.
+
+    start_time = time.time()
+    ports_to_query = resources_utils.port_ranges_to_set(ports)
+    while True:
+        instances = _filter_instances(cluster_name_on_cloud,
+                                      None,
+                                      head_only=True)
+        assert len(instances) == 1
+        head_inst = list(instances.values())[0]
+        ready_ports: Dict[int, List[common.Endpoint]] = {
+            port: [common.SocketEndpoint(**endpoint)]
+            for port, endpoint in head_inst['port2endpoint'].items()
+            if port in ports_to_query
+        }
+        not_ready_ports = ports_to_query - set(ready_ports.keys())
+        if not not_ready_ports:
+            return ready_ports
+        if time.time() - start_time > QUERY_PORTS_TIMEOUT_SECONDS:
+            logger.warning(f'Querying ports {ports} timed out. Ports '
+                           f'{not_ready_ports} are not ready.')
+            return ready_ports
+        time.sleep(1)
diff --git a/sky/provision/vast/utils.py b/sky/provision/vast/utils.py
new file mode 100644
index 00000000000..77ff790872c
--- /dev/null
+++ b/sky/provision/vast/utils.py
@@ -0,0 +1,144 @@
+# pylint: disable=assignment-from-no-return
+#
+# The pylint exception above is an accommodation for
+# false positives generated by pylint for the Vast
+# python sdk.
+# +"""Vast library wrapper for SkyPilot.""" +from typing import Any, Dict, List + +from sky import sky_logging +from sky.adaptors import vast + +logger = sky_logging.init_logger(__name__) + + +def list_instances() -> Dict[str, Dict[str, Any]]: + """Lists instances associated with API key.""" + instances = vast.vast().show_instances() + + instance_dict: Dict[str, Dict[str, Any]] = {} + for instance in instances: + instance['id'] = str(instance['id']) + info = instance + + if isinstance(instance['actual_status'], str): + info['status'] = instance['actual_status'].upper() + else: + info['status'] = 'UNKNOWN' + info['name'] = instance['label'] + + instance_dict[instance['id']] = info + + return instance_dict + + +def launch(name: str, instance_type: str, region: str, disk_size: int, + image_name: str) -> str: + """Launches an instance with the given parameters. + + Converts the instance_type to the Vast GPU name, finds the specs for the + GPU, and launches the instance. + + Notes: + + * `disk_size`: we look for instances that are of the requested + size or greater than it. For instance, `disk_size=100` might + return something with `disk_size` at 102 or even 1000. + + The disk size {xx} GB is not exactly matched the requested + size {yy} GB. It is possible to charge extra cost on disk. + + * `geolocation`: Geolocation on Vast can be as specific as the + host chooses to be. They can say, for instance, "YutakachŨ, + Shinagawa District, Tokyo, JP." Such a specific geolocation + as ours would fail to return this host in a simple string + comparison if a user searched for "JP". + + Since regardless of specificity, all our geolocations end + in two-letter country codes we just snip that to conform + to how many providers state their geolocation. + + * Since the catalog is cached, we can't gaurantee availability + of any machine at the point of inquiry. As a consequence we + search for the machine again and potentially return a failure + if there is no availability. 
Vast instance types are an invention for SkyPilot.
+ ssh_ports = list( + x[1] + for x in sorted(ssh_ports, + key=lambda x: -1 + if x[0].endswith('head') else int(x[0].split('-')[-1]))) + return ssh_ports diff --git a/sky/templates/vast-ray.yml.j2 b/sky/templates/vast-ray.yml.j2 new file mode 100644 index 00000000000..9c856d0f210 --- /dev/null +++ b/sky/templates/vast-ray.yml.j2 @@ -0,0 +1,78 @@ +cluster_name: {{cluster_name_on_cloud}} + +# The maximum number of workers nodes to launch in addition to the head node. +max_workers: {{num_nodes - 1}} +upscaling_speed: {{num_nodes - 1}} +idle_timeout_minutes: 60 + +provider: + type: external + module: sky.provision.vast + region: "{{region}}" + disable_launch_config_check: true + +auth: + ssh_user: root + ssh_private_key: {{ssh_private_key}} + +available_node_types: + ray_head_default: + resources: {} + node_config: + InstanceType: {{instance_type}} + DiskSize: {{disk_size}} + ImageId: {{image_id}} + PublicKey: |- + skypilot:ssh_public_key_content + +head_node_type: ray_head_default + +# Format: `REMOTE_PATH : LOCAL_PATH` +file_mounts: { + "{{sky_ray_yaml_remote_path}}": "{{sky_ray_yaml_local_path}}", + "{{sky_remote_path}}/{{sky_wheel_hash}}": "{{sky_local_path}}", +{%- for remote_path, local_path in credentials.items() %} + "{{remote_path}}": "{{local_path}}", +{%- endfor %} +} + +rsync_exclude: [] + +initialization_commands: [] + +# List of shell commands to run to set up nodes. +# NOTE: these are very performance-sensitive. Each new item opens/closes an SSH +# connection, which is expensive. Try your best to co-locate commands into fewer +# items! +# +# Increment the following for catching performance bugs easier: +# current num items (num SSH connections): 1 +setup_commands: + # Disable `unattended-upgrades` to prevent apt-get from hanging. It should be called at the beginning before the process started to avoid being blocked. (This is a temporary fix.) + # Create ~/.ssh/config file in case the file does not exist in the image. 
+ # Line 'rm ..': there is another installation of pip. + # Line 'sudo bash ..': set the ulimit as suggested by ray docs for performance. https://docs.ray.io/en/latest/cluster/vms/user-guides/large-cluster-best-practices.html#system-configuration + # Line 'sudo grep ..': set the number of threads per process to unlimited to avoid ray job submit stucking issue when the number of running ray jobs increase. + # Line 'mkdir -p ..': disable host key check + # Line 'python3 -c ..': patch the buggy ray files and enable `-o allow_other` option for `goofys` + - {%- for initial_setup_command in initial_setup_commands %} + {{ initial_setup_command }} + {%- endfor %} + sudo systemctl stop unattended-upgrades || true; + sudo systemctl disable unattended-upgrades || true; + sudo sed -i 's/Unattended-Upgrade "1"/Unattended-Upgrade "0"/g' /etc/apt/apt.conf.d/20auto-upgrades || true; + sudo kill -9 `sudo lsof /var/lib/dpkg/lock-frontend | awk '{print $2}' | tail -n 1` || true; + sudo pkill -9 apt-get; + sudo pkill -9 dpkg; + sudo dpkg --configure -a; + mkdir -p ~/.ssh; touch ~/.ssh/config; + {{ conda_installation_commands }} + {{ ray_skypilot_installation_commands }} + touch ~/.sudo_as_admin_successful; + sudo bash -c 'rm -rf /etc/security/limits.d; echo "* soft nofile 1048576" >> /etc/security/limits.conf; echo "* hard nofile 1048576" >> /etc/security/limits.conf'; + sudo grep -e '^DefaultTasksMax' /etc/systemd/system.conf || (sudo bash -c 'echo "DefaultTasksMax=infinity" >> /etc/systemd/system.conf'); sudo systemctl set-property user-$(id -u $(whoami)).slice TasksMax=infinity; sudo systemctl daemon-reload; + mkdir -p ~/.ssh; (grep -Pzo -q "Host \*\n StrictHostKeyChecking no" ~/.ssh/config) || printf "Host *\n StrictHostKeyChecking no\n" >> ~/.ssh/config; + [ -f /etc/fuse.conf ] && sudo sed -i 's/#user_allow_other/user_allow_other/g' /etc/fuse.conf || (sudo sh -c 'echo "user_allow_other" > /etc/fuse.conf'); + +# Command to start ray clusters are now placed in 
`sky.provision.instance_setup`. +# We do not need to list it here anymore. diff --git a/sky/utils/controller_utils.py b/sky/utils/controller_utils.py index 0166a16ff16..4b193d9ddd7 100644 --- a/sky/utils/controller_utils.py +++ b/sky/utils/controller_utils.py @@ -25,9 +25,7 @@ from sky.jobs import utils as managed_job_utils from sky.serve import constants as serve_constants from sky.serve import serve_utils -from sky.setup_files import dependencies from sky.skylet import constants -from sky.skylet import log_lib from sky.utils import common_utils from sky.utils import env_options from sky.utils import rich_utils @@ -189,49 +187,79 @@ def from_type(cls, controller_type: str) -> Optional['Controllers']: # Install cli dependencies. Not using SkyPilot wheels because the wheel # can be cleaned up by another process. +# TODO(zhwu): Keep the dependencies align with the ones in setup.py def _get_cloud_dependencies_installation_commands( controller: Controllers) -> List[str]: + # TODO(tian): Make dependency installation command a method of cloud + # class and get all installation command for enabled clouds. + commands = [] # We use / instead of strong formatting, as we need to update # the at the end of the for loop, and python does not support # partial string formatting. prefix_str = ('[/] Check & install cloud dependencies ' 'on controller: ') - commands: List[str] = [] # This is to make sure the shorter checking message does not have junk # characters from the previous message. - empty_str = ' ' * 20 - - # All python dependencies will be accumulated and then installed in one - # command at the end. This is very fast if the packages are already - # installed, so we don't check that. 
- python_packages: Set[str] = set() - - step_prefix = prefix_str.replace('', str(len(commands) + 1)) - commands.append(f'echo -en "\\r{step_prefix}uv{empty_str}" &&' - f'{constants.SKY_UV_INSTALL_CMD} >/dev/null 2>&1') - + empty_str = ' ' * 10 + aws_dependencies_installation = ( + 'pip list | grep boto3 > /dev/null 2>&1 || pip install ' + 'botocore>=1.29.10 boto3>=1.26.1; ' + # Need to separate the installation of awscli from above because some + # other clouds will install boto3 but not awscli. + 'pip list | grep awscli> /dev/null 2>&1 || pip install "urllib3<2" ' + 'awscli>=1.27.10 "colorama<0.4.5" > /dev/null 2>&1') + setup_clouds: List[str] = [] for cloud in sky_check.get_cached_enabled_clouds_or_refresh(): - cloud_python_dependencies: List[str] = copy.deepcopy( - dependencies.extras_require[cloud.canonical_name()]) - - if isinstance(cloud, clouds.Azure): - # azure-cli cannot be normally installed by uv. - # See comments in sky/skylet/constants.py. - cloud_python_dependencies.remove(dependencies.AZURE_CLI) - - step_prefix = prefix_str.replace('', str(len(commands) + 1)) + if isinstance( + clouds, + (clouds.Lambda, clouds.SCP, clouds.Fluidstack, clouds.Paperspace)): + # no need to install any cloud dependencies for lambda, scp, + # fluidstack and paperspace + continue + if isinstance(cloud, clouds.AWS): + step_prefix = prefix_str.replace('', + str(len(setup_clouds) + 1)) + commands.append(f'echo -en "\\r{step_prefix}AWS{empty_str}" && ' + + aws_dependencies_installation) + setup_clouds.append(str(cloud)) + elif isinstance(cloud, clouds.Azure): + step_prefix = prefix_str.replace('', + str(len(setup_clouds) + 1)) + commands.append( + f'echo -en "\\r{step_prefix}Azure{empty_str}" && ' + 'pip list | grep azure-cli > /dev/null 2>&1 || ' + 'pip install "azure-cli>=2.31.0" azure-core ' + '"azure-identity>=1.13.0" azure-mgmt-network > /dev/null 2>&1') + # Have to separate this installation of az blob storage from above + # because this is newly-introduced and not part 
of azure-cli. We + # need a separate installed check for this. commands.append( - f'echo -en "\\r{step_prefix}azure-cli{empty_str}" &&' - f'{constants.SKY_UV_PIP_CMD} install --prerelease=allow ' - f'"{dependencies.AZURE_CLI}" > /dev/null 2>&1') + 'pip list | grep azure-storage-blob > /dev/null 2>&1 || ' + 'pip install azure-storage-blob msgraph-sdk > /dev/null 2>&1') + setup_clouds.append(str(cloud)) elif isinstance(cloud, clouds.GCP): - step_prefix = prefix_str.replace('', str(len(commands) + 1)) - commands.append(f'echo -en "\\r{step_prefix}GCP SDK{empty_str}" &&' - f'{gcp.GOOGLE_SDK_INSTALLATION_COMMAND}') + step_prefix = prefix_str.replace('', + str(len(setup_clouds) + 1)) + commands.append( + f'echo -en "\\r{step_prefix}GCP{empty_str}" && ' + 'pip list | grep google-api-python-client > /dev/null 2>&1 || ' + 'pip install "google-api-python-client>=2.69.0" ' + '> /dev/null 2>&1') + # Have to separate the installation of google-cloud-storage from + # above because for a VM launched on GCP, the VM may have + # google-api-python-client installed alone. + commands.append( + 'pip list | grep google-cloud-storage > /dev/null 2>&1 || ' + 'pip install google-cloud-storage > /dev/null 2>&1') + commands.append(f'{gcp.GOOGLE_SDK_INSTALLATION_COMMAND}') + setup_clouds.append(str(cloud)) elif isinstance(cloud, clouds.Kubernetes): - step_prefix = prefix_str.replace('', str(len(commands) + 1)) + step_prefix = prefix_str.replace('', + str(len(setup_clouds) + 1)) commands.append( f'echo -en "\\r{step_prefix}Kubernetes{empty_str}" && ' + 'pip list | grep kubernetes > /dev/null 2>&1 || ' + 'pip install "kubernetes>=20.0.0" > /dev/null 2>&1 &&' # Install k8s + skypilot dependencies 'sudo bash -c "if ' '! 
command -v curl &> /dev/null || ' @@ -247,36 +275,61 @@ def _get_cloud_dependencies_installation_commands( '/bin/linux/amd64/kubectl" && ' 'sudo install -o root -g root -m 0755 ' 'kubectl /usr/local/bin/kubectl))') + setup_clouds.append(str(cloud)) elif isinstance(cloud, clouds.Cudo): - step_prefix = prefix_str.replace('', str(len(commands) + 1)) + step_prefix = prefix_str.replace('', + str(len(setup_clouds) + 1)) commands.append( - f'echo -en "\\r{step_prefix}cudoctl{empty_str}" && ' + f'echo -en "\\r{step_prefix}Cudo{empty_str}" && ' + 'pip list | grep cudo-compute > /dev/null 2>&1 || ' + 'pip install "cudo-compute>=0.1.10" > /dev/null 2>&1 && ' 'wget https://download.cudo.org/compute/cudoctl-0.3.2-amd64.deb -O ~/cudoctl.deb > /dev/null 2>&1 && ' # pylint: disable=line-too-long 'sudo dpkg -i ~/cudoctl.deb > /dev/null 2>&1') - elif isinstance(cloud, clouds.IBM): - if controller != Controllers.JOBS_CONTROLLER: - # We only need IBM deps on the jobs controller. - cloud_python_dependencies = [] - - python_packages.update(cloud_python_dependencies) - + setup_clouds.append(str(cloud)) + elif isinstance(cloud, clouds.RunPod): + step_prefix = prefix_str.replace('', + str(len(setup_clouds) + 1)) + commands.append(f'echo -en "\\r{step_prefix}RunPod{empty_str}" && ' + 'pip list | grep runpod > /dev/null 2>&1 || ' + 'pip install "runpod>=1.5.1" > /dev/null 2>&1') + setup_clouds.append(str(cloud)) + elif isinstance(cloud, clouds.OCI): + step_prefix = prefix_str.replace('', + str(len(setup_clouds) + 1)) + commands.append(f'echo -en "\\r{prefix_str}OCI{empty_str}" && ' + 'pip list | grep oci > /dev/null 2>&1 || ' + 'pip install oci > /dev/null 2>&1') + setup_clouds.append(str(cloud)) + elif isinstance(cloud, clouds.Vast): + step_prefix = prefix_str.replace('', + str(len(setup_clouds) + 1)) + commands.append(f'echo -en "\\r{step_prefix}Vast{empty_str}" && ' + 'pip list | grep vastai_sdk > /dev/null 2>&1 || ' + 'pip install "vastai_sdk>=0.1.2" > /dev/null 2>&1') + 
setup_clouds.append(str(cloud)) + if controller == Controllers.JOBS_CONTROLLER: + if isinstance(cloud, clouds.IBM): + step_prefix = prefix_str.replace('', + str(len(setup_clouds) + 1)) + commands.append( + f'echo -en "\\r{step_prefix}IBM{empty_str}" ' + '&& pip list | grep ibm-cloud-sdk-core > /dev/null 2>&1 || ' + 'pip install ibm-cloud-sdk-core ibm-vpc ' + 'ibm-platform-services ibm-cos-sdk > /dev/null 2>&1') + setup_clouds.append(str(cloud)) if (cloudflare.NAME in storage_lib.get_cached_enabled_storage_clouds_or_refresh()): - python_packages.update(dependencies.extras_require['cloudflare']) + step_prefix = prefix_str.replace('', str(len(setup_clouds) + 1)) + commands.append( + f'echo -en "\\r{step_prefix}Cloudflare{empty_str}" && ' + + aws_dependencies_installation) + setup_clouds.append(cloudflare.NAME) - packages_string = ' '.join([f'"{package}"' for package in python_packages]) - step_prefix = prefix_str.replace('', str(len(commands) + 1)) - commands.append( - f'echo -en "\\r{step_prefix}cloud python packages{empty_str}" && ' - f'{constants.SKY_UV_PIP_CMD} install {packages_string} > /dev/null 2>&1' - ) - - total_commands = len(commands) finish_prefix = prefix_str.replace('[/] ', ' ') commands.append(f'echo -e "\\r{finish_prefix}done.{empty_str}"') - commands = [ - command.replace('', str(total_commands)) for command in commands + command.replace('', str(len(setup_clouds))) + for command in commands ] return commands @@ -334,19 +387,11 @@ def download_and_stream_latest_job_log( else: log_dir = list(log_dirs.values())[0] log_file = os.path.join(log_dir, 'run.log') + # Print the logs to the console. - # TODO(zhwu): refactor this into log_utils, along with the - # refactoring for the log_lib.tail_logs. try: with open(log_file, 'r', encoding='utf-8') as f: - # Stream the logs to the console without reading the whole - # file into memory. 
- start_streaming = False - for line in f: - if log_lib.LOG_FILE_START_STREAMING_AT in line: - start_streaming = True - if start_streaming: - print(line, end='', flush=True) + print(f.read()) except FileNotFoundError: logger.error('Failed to find the logs for the user ' f'program at {log_file}.') diff --git a/tests/test_smoke.py b/tests/test_smoke.py index 79920147adb..9d05742e9fb 100644 --- a/tests/test_smoke.py +++ b/tests/test_smoke.py @@ -2352,6 +2352,23 @@ def test_paperspace_http_server_with_custom_ports(): run_one_test(test) +# ---------- Web apps with custom ports on Vast. ---------- +@pytest.mark.vast +def test_vast_http_server_with_custom_ports(): + name = _get_cluster_name() + test = Test( + 'vast_http_server_with_custom_ports', + [ + f'sky launch -y -d -c {name} --cloud vast examples/http_server_with_custom_ports/task.yaml', + f'until SKYPILOT_DEBUG=0 sky status --endpoint 33828 {name}; do sleep 10; done', + # Retry a few times to avoid flakiness in ports being open. + f'ip=$(SKYPILOT_DEBUG=0 sky status --endpoint 33828 {name}); success=false; for i in $(seq 1 5); do if curl $ip | grep "

This is a demo HTML page.

"; then success=true; break; fi; sleep 10; done; if [ "$success" = false ]; then exit 1; fi', + ], + f'sky down -y {name}', + ) + run_one_test(test) + + # ---------- Web apps with custom ports on RunPod. ---------- @pytest.mark.runpod def test_runpod_http_server_with_custom_ports():