Skip to content

Commit

Permalink
New provisioner for RunPod (#2829)
Browse files Browse the repository at this point in the history
* init

* remove ray

* update config

* update

* update

* update

* complete bootstrapping

* add start instance

* fix

* fix

* fix

* update

* wait stopping instances

* support normal gcp tpus first

* fix gcp

* support get cluster info

* fix

* update

* wait for instance starting

* rename

* hide gcp package import

* fix

* fix

* update constants

* fix comments

* remove unused methods

* fix comments

* sync 'config' & 'constants' with upstream, Nov 16

* sync 'instace_utils' with the upstream, Nov 16

* fix typing

* parallelize provisioning

* Fix TPU node

* Fix TPU NAME env for tpu node

* implement bulk provision

* refactor selflink

* format

* reduce the sleep time for autostop

* provisioner version refactoring

* refactor

* Add logging

* avoid saving the provisioner version

* format

* format

* Fix scheduling field in config

* format

* fix public key content

* Fix provisioner version for azure

* Use ray port from head node for workers

* format

* fix ray_port

* fix smoke tests

* shorter sleep time

* refactor status refresh version

* Use new provisioner to launch runpod to avoid issue with ray autoscaler on head
Co-authored-by: Justin Merrell <[email protected]>

* Add wait for the instances to be ready

* fix setup

* Retry and give for getting internal IP

* comment

* Remove internal IP

* use external IP
TODO: use external ray port

* fix ssh port

* Unsupported feature

* typo

* fix ssh ports

* rename var

* format

* Fix cloud unsupported resources

* Runpod update name mapping (#2945)

* Avoid using GpuInfo

* fix all_regions

* Fix runpod list accelerators

* format

* revert to GpuInfo

* Fix get_feasible_launchable_resources

* Add error

* Fix optimizer random_dag for feature check

* address comments

* remove test code

* format

* Add type hints

* format

* format

* fix keyerror

* Address comments

* Fix ports

---------

Co-authored-by: Justin Merrell <[email protected]>
Co-authored-by: Siyuan <[email protected]>
Co-authored-by: Doyoung Kim <[email protected]>
  • Loading branch information
4 people committed Jan 13, 2024
1 parent 71525cd commit 9743aa0
Show file tree
Hide file tree
Showing 21 changed files with 966 additions and 23 deletions.
2 changes: 2 additions & 0 deletions sky/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ def get_git_commit():
Local = clouds.Local
Kubernetes = clouds.Kubernetes
OCI = clouds.OCI
RunPod = clouds.RunPod
optimize = Optimizer.optimize

__all__ = [
Expand All @@ -94,6 +95,7 @@ def get_git_commit():
'Lambda',
'Local',
'OCI',
'RunPod',
'SCP',
'Optimizer',
'OptimizeTarget',
Expand Down
29 changes: 29 additions & 0 deletions sky/adaptors/runpod.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
"""RunPod cloud adaptor."""

import functools

_runpod_sdk = None


def import_package(func):

@functools.wraps(func)
def wrapper(*args, **kwargs):
global _runpod_sdk
if _runpod_sdk is None:
try:
import runpod as _runpod # pylint: disable=import-outside-toplevel
_runpod_sdk = _runpod
except ImportError:
raise ImportError(
'Fail to import dependencies for runpod.'
'Try pip install "skypilot[runpod]"') from None
return func(*args, **kwargs)

return wrapper


@import_package
def runpod():
"""Return the runpod package."""
return _runpod_sdk
15 changes: 15 additions & 0 deletions sky/authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
from sky import skypilot_config
from sky.adaptors import gcp
from sky.adaptors import ibm
from sky.adaptors import runpod
from sky.clouds.utils import lambda_utils
from sky.utils import common_utils
from sky.utils import kubernetes_enums
Expand Down Expand Up @@ -449,3 +450,17 @@ def setup_kubernetes_authentication(config: Dict[str, Any]) -> Dict[str, Any]:
config['auth']['ssh_proxy_command'] = ssh_proxy_cmd

return config


# ---------------------------------- RunPod ---------------------------------- #
def setup_runpod_authentication(config: Dict[str, Any]) -> Dict[str, Any]:
"""Sets up SSH authentication for RunPod.
- Generates a new SSH key pair if one does not exist.
- Adds the public SSH key to the user's RunPod account.
"""
_, public_key_path = get_or_generate_keys()
with open(public_key_path, 'r', encoding='UTF-8') as pub_key_file:
public_key = pub_key_file.read().strip()
runpod.runpod().cli.groups.ssh.functions.add_ssh_key(public_key)

return configure_ssh_info(config)
2 changes: 2 additions & 0 deletions sky/backends/backend_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1006,6 +1006,8 @@ def _add_auth_to_cluster_config(cloud: clouds.Cloud, cluster_config_file: str):
config = auth.setup_kubernetes_authentication(config)
elif isinstance(cloud, clouds.IBM):
config = auth.setup_ibm_authentication(config)
elif isinstance(cloud, clouds.RunPod):
config = auth.setup_runpod_authentication(config)
else:
assert isinstance(cloud, clouds.Local), cloud
# Local cluster case, authentication is already filled by the user
Expand Down
10 changes: 10 additions & 0 deletions sky/backends/cloud_vm_ray_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ def _get_cluster_config_template(cloud):
clouds.Local: 'local-ray.yml.j2',
clouds.SCP: 'scp-ray.yml.j2',
clouds.OCI: 'oci-ray.yml.j2',
clouds.RunPod: 'runpod-ray.yml.j2',
clouds.Kubernetes: 'kubernetes-ray.yml.j2',
}
return cloud_to_template[type(cloud)]
Expand Down Expand Up @@ -2291,6 +2292,15 @@ def update_ssh_ports(self, max_attempts: int = 1) -> None:
Use this method to use any cloud-specific port fetching logic.
"""
del max_attempts # Unused.
if isinstance(self.launched_resources.cloud, clouds.RunPod):
cluster_info = provision_lib.get_cluster_info(
str(self.launched_resources.cloud).lower(),
region=self.launched_resources.region,
cluster_name_on_cloud=self.cluster_name_on_cloud,
provider_config=None)
self.stable_ssh_ports = cluster_info.get_ssh_ports()
return

head_ssh_port = 22
self.stable_ssh_ports = (
[head_ssh_port] + [22] *
Expand Down
2 changes: 2 additions & 0 deletions sky/clouds/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from sky.clouds.lambda_cloud import Lambda
from sky.clouds.local import Local
from sky.clouds.oci import OCI
from sky.clouds.runpod import RunPod
from sky.clouds.scp import SCP

__all__ = [
Expand All @@ -28,6 +29,7 @@
'Lambda',
'Local',
'SCP',
'RunPod',
'OCI',
'Kubernetes',
'CloudImplementationFeatures',
Expand Down
274 changes: 274 additions & 0 deletions sky/clouds/runpod.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,274 @@
""" RunPod Cloud. """

import json
import typing
from typing import Dict, Iterator, List, Optional, Tuple

from sky import clouds
from sky.clouds import service_catalog

if typing.TYPE_CHECKING:
from sky import resources as resources_lib

_CREDENTIAL_FILES = [
'config.toml',
]


@clouds.CLOUD_REGISTRY.register
class RunPod(clouds.Cloud):
""" RunPod GPU Cloud
_REPR | The string representation for the RunPod GPU cloud object.
"""
_REPR = 'RunPod'
_CLOUD_UNSUPPORTED_FEATURES = {
clouds.CloudImplementationFeatures.STOP: 'Stopping not supported.',
clouds.CloudImplementationFeatures.SPOT_INSTANCE:
('Spot is not supported, as runpod API does not implement spot.'),
clouds.CloudImplementationFeatures.MULTI_NODE:
('Multi-node not supported yet, as the interconnection among nodes '
'are non-trivial on RunPod.'),
clouds.CloudImplementationFeatures.OPEN_PORTS:
('Opening ports is not '
'supported yet on RunPod.'),
clouds.CloudImplementationFeatures.CUSTOM_DISK_TIER:
('Customizing disk tier is not supported yet on RunPod.')
}
_MAX_CLUSTER_NAME_LEN_LIMIT = 120
_regions: List[clouds.Region] = []

PROVISIONER_VERSION = clouds.ProvisionerVersion.SKYPILOT
STATUS_VERSION = clouds.StatusVersion.SKYPILOT

@classmethod
def _unsupported_features_for_resources(
cls, resources: 'resources_lib.Resources'
) -> Dict[clouds.CloudImplementationFeatures, str]:
"""The features not supported based on the resources provided.
This method is used by check_features_are_supported() to check if the
cloud implementation supports all the requested features.
Returns:
A dict of {feature: reason} for the features not supported by the
cloud implementation.
"""
del resources # unused
return cls._CLOUD_UNSUPPORTED_FEATURES

@classmethod
def _max_cluster_name_length(cls) -> Optional[int]:
return cls._MAX_CLUSTER_NAME_LEN_LIMIT

@classmethod
def regions_with_offering(cls, instance_type: str,
accelerators: Optional[Dict[str, int]],
use_spot: bool, region: Optional[str],
zone: Optional[str]) -> List[clouds.Region]:
assert zone is None, 'RunPod does not support zones.'
del accelerators, zone # unused
if use_spot:
return []
else:
regions = service_catalog.get_region_zones_for_instance_type(
instance_type, use_spot, 'runpod')

if region is not None:
regions = [r for r in regions if r.name == region]
return regions

@classmethod
def get_vcpus_mem_from_instance_type(
cls,
instance_type: str,
) -> Tuple[Optional[float], Optional[float]]:
return service_catalog.get_vcpus_mem_from_instance_type(instance_type,
clouds='runpod')

@classmethod
def zones_provision_loop(
cls,
*,
region: str,
num_nodes: int,
instance_type: str,
accelerators: Optional[Dict[str, int]] = None,
use_spot: bool = False,
) -> Iterator[None]:
del num_nodes # unused
regions = cls.regions_with_offering(instance_type,
accelerators,
use_spot,
region=region,
zone=None)
for r in regions:
assert r.zones is None, r
yield r.zones

def instance_type_to_hourly_cost(self,
instance_type: str,
use_spot: bool,
region: Optional[str] = None,
zone: Optional[str] = None) -> float:
return service_catalog.get_hourly_cost(instance_type,
use_spot=use_spot,
region=region,
zone=zone,
clouds='runpod')

def accelerators_to_hourly_cost(self,
accelerators: Dict[str, int],
use_spot: bool,
region: Optional[str] = None,
zone: Optional[str] = None) -> float:
"""Returns the hourly cost of the accelerators, in dollars/hour."""
del accelerators, use_spot, region, zone # unused
return 0.0 # RunPod includes accelerators in the hourly cost.

def get_egress_cost(self, num_gigabytes: float) -> float:
return 0.0

def is_same_cloud(self, other: clouds.Cloud) -> bool:
# Returns true if the two clouds are the same cloud type.
return isinstance(other, RunPod)

@classmethod
def get_default_instance_type(
cls,
cpus: Optional[str] = None,
memory: Optional[str] = None,
disk_tier: Optional[str] = None) -> Optional[str]:
"""Returns the default instance type for RunPod."""
return service_catalog.get_default_instance_type(cpus=cpus,
memory=memory,
disk_tier=disk_tier,
clouds='runpod')

@classmethod
def get_accelerators_from_instance_type(
cls, instance_type: str) -> Optional[Dict[str, int]]:
return service_catalog.get_accelerators_from_instance_type(
instance_type, clouds='runpod')

@classmethod
def get_zone_shell_cmd(cls) -> Optional[str]:
return None

def make_deploy_resources_variables(
self, resources: 'resources_lib.Resources',
cluster_name_on_cloud: str, region: 'clouds.Region',
zones: Optional[List['clouds.Zone']]) -> Dict[str, Optional[str]]:
del zones # unused

r = resources
acc_dict = self.get_accelerators_from_instance_type(r.instance_type)
if acc_dict is not None:
custom_resources = json.dumps(acc_dict, separators=(',', ':'))
else:
custom_resources = None

return {
'instance_type': resources.instance_type,
'custom_resources': custom_resources,
'region': region.name,
}

def _get_feasible_launchable_resources(
self, resources: 'resources_lib.Resources'
) -> Tuple[List['resources_lib.Resources'], List[str]]:
"""Returns a list of feasible resources for the given resources."""
if resources.instance_type is not None:
assert resources.is_launchable(), resources
resources = resources.copy(accelerators=None)
return ([resources], [])

def _make(instance_list):
resource_list = []
for instance_type in instance_list:
r = resources.copy(
cloud=RunPod(),
instance_type=instance_type,
accelerators=None,
cpus=None,
)
resource_list.append(r)
return resource_list

# Currently, handle a filter on accelerators only.
accelerators = resources.accelerators
if accelerators is None:
# Return a default instance type
default_instance_type = RunPod.get_default_instance_type(
cpus=resources.cpus,
memory=resources.memory,
disk_tier=resources.disk_tier)
if default_instance_type is None:
return ([], [])
else:
return (_make([default_instance_type]), [])

assert len(accelerators) == 1, resources
acc, acc_count = list(accelerators.items())[0]
(instance_list, fuzzy_candidate_list
) = service_catalog.get_instance_type_for_accelerator(
acc,
acc_count,
use_spot=resources.use_spot,
cpus=resources.cpus,
region=resources.region,
zone=resources.zone,
clouds='runpod')
if instance_list is None:
return ([], fuzzy_candidate_list)
return (_make(instance_list), fuzzy_candidate_list)

@classmethod
def check_credentials(cls) -> Tuple[bool, Optional[str]]:
""" Verify that the user has valid credentials for RunPod. """
try:
import runpod # pylint: disable=import-outside-toplevel
valid, error = runpod.check_credentials()

if not valid:
return False, (
f'{error} \n' # First line is indented by 4 spaces
' Credentials can be set up by running: \n'
f' $ pip install runpod \n'
f' $ runpod store_api_key <YOUR_RUNPOD_API_KEY> \n'
' For more information, see https://docs.runpod.io/docs/skypilot' # pylint: disable=line-too-long
)

return True, None

except ImportError:
return False, ('Failed to import runpod. '
'To install, run: pip install skypilot[runpod]')

def get_credential_file_mounts(self) -> Dict[str, str]:
return {
f'~/.runpod/{filename}': f'~/.runpod/{filename}'
for filename in _CREDENTIAL_FILES
}

@classmethod
def get_current_user_identity(cls) -> Optional[List[str]]:
# NOTE: used for very advanced SkyPilot functionality
# Can implement later if desired
return None

def instance_type_exists(self, instance_type: str) -> bool:
return service_catalog.instance_type_exists(instance_type, 'runpod')

def validate_region_zone(self, region: Optional[str], zone: Optional[str]):
return service_catalog.validate_region_zone(region,
zone,
clouds='runpod')

def accelerator_in_region_or_zone(self,
accelerator: str,
acc_count: int,
region: Optional[str] = None,
zone: Optional[str] = None) -> bool:
return service_catalog.accelerator_in_region_or_zone(
accelerator, acc_count, region, zone, 'runpod')
Loading

0 comments on commit 9743aa0

Please sign in to comment.