Skip to content

Commit

Permalink
[K8s] Zero config networking for Kubernetes (#2500)
Browse files Browse the repository at this point in the history
* Working Ray K8s node provider based on SSH

* wip

* working provisioning with SkyPilot and ssh config

* working provisioning with SkyPilot and ssh config

* Updates to master

* ray2.3

* Clean up docs

* multiarch build

* hacking around ray start

* more port fixes

* fix up default instance selection

* fix resource selection

* Add provisioning timeout by checking if pods are ready

* Working mounting

* Remove catalog

* fixes

* fixes

* Fix ssh-key auth to create unique secrets

* Fix for ContainerCreating timeout

* Fix head node ssh port caching

* mypy

* lint

* fix ports

* typo

* cleanup

* cleanup

* wip

* Update setup

* readme updates

* lint

* Fix failover

* Fix failover

* optimize setup

* Fix sync down logs for k8s

* test wip

* instance name parsing wip

* Fix instance name parsing

* Merge fixes for query_status

* [k8s_cloud] Delete k8s service resources. (#2105)

Delete k8s service resources.

- 'sky down' for Kubernetes cloud to remove cluster service resources.

* Status refresh WIP

* refactor to kubernetes adaptor

* tests wip

* clean up auth

* wip tests

* cli

* cli

* sky local up/down cli

* cli

* lint

* lint

* lint

* Speed up kind cluster creation

* tests

* lint

* tests

* handling for non-reachable clusters

* Invalid kubeconfig handling

* Timeout for sky check

* code cleanup

* lint

* Do not raise error if GPUs requested, return empty list

* Address comments

* comments

* lint

* Remove public key upload

* GPU support init

* wip

* add shebang

* comments

* change permissions

* remove chmod

* merge 2241

* add todo

* Handle kube config management for sky local commands (#2253)

* Set current-context (if availablee) after sky local down and remove incorrect prompt in sky local up

* Warn user of kubeconfig context switch during sky local up

* Use Optional instead of Union

* Switch context in create_cluster if cluster already exists.

* fix typo

* update sky check error msg after sky local down

* lint

* update timeout check

* fix import error

* Fix kube API access from within cluster (load_incluster_auth)

* lint

* lint

* working autodown and sky status -r

* lint

* add test_kubernetes_autodown

* lint

* address comments

* address comments

* lint

* deletion timeouts wip

* [k8s_cloud] Ray pod not created under current context namespace. (#2302)

'namespace' exists under 'context' key.

* head ssh port namespace fix

* [k8s-cloud] Typo in sky local --help. (#2308)

Typo.

* [k8s-cloud] Set build_image.sh to be executable. (#2307)

* Set build_image.sh to be executable.

* Use TAG to easily switch between registries.

* remove ingress

* remove debug statements

* UX and readme updates

* lint

* fix logging for 409 retry

* lint

* lint

* Debug dockerfile

* wip

* Fix GPU image

* Query cloud specific env vars in task setup (#2347)

* Query cloud specific env vars in task setup

* Make query_env_vars specific to Kubernetes cloud

* Address PR comments

* working GPU type selection for GKE and EKS. GFD needs work.

* TODO for auto-detection

* Add image toggling for CPU/GPU

* Add image toggling for CPU/GPU

* Fix none acce_type

* remove memory from j2

* Make resnet examples run again

* lint

* v100 readme

* dockerfile and smoketest

* fractional cpu and mem

* nits

* refactor utils

* lint and cleanup

* lint and cleanup

* lint and cleanup

* lint and cleanup

* lint and cleanup

* lint and cleanup

* lint

* lint

* manual lint

* manual isort

* test readme update

* Remove EKS

* lint

* add gpu labeler

* updates

* lint

* update script

* ux

* fix formatter

* test update

* test update

* fix test_optimizer_dryruns

* docs

* cleanup

* test readme update

* lint

* lint

* [k8s_cloud_beta1] Add sshjump host support. (#2369)

* Update build image

* fix image path

* fix merge

* cleanup

* lint

* fix utils ref

* typo

* refactor pod creation

* lint

* merge fixes

* portfix

* merge fixes

* [k8s_cloud_beta1] Sky down for a cluster deployed in Kubernetes to possibly remove sshjump pod. (#2425)

* Sky down for a kubernetes cluster to possibly terminate sshjump pod.

- If the related sshjump pod is being reported as its main container
  not have been started, then remove its pod and service. This is to
  minimize the chances for remaining with dangling sshjump pod.

* Remove sshjump service in case of an failure to analyze sshjump.

- remove _request_timeout as it might not be needed due to
  terminationGracePeriodSeconds being set in sshjump template.

* Move sshjump analysis to kubernetes_utils.

* Apply changes per ./format.sh.

* Minor comment rephrase.

* Use sshjump_name from ray pod label.

- rather than from clouds.Kubernetes

* cleanup

* Add networking benchmarks

* comment

* comment

* lint

* autodown fixes

* lint

* fix label

* [k8s_cloud_beta1] Adding support for ssh using kubectl port-forward to access k8s instance (#2412)

* Add sshjump support.

* Update lcm script.

- add comments
- rename variables
- typo

* Set imagePullPolicy to IfNotPresent.

* add support for port-forward

* remove unused

* comments

* Disable ControlMaster for ssh_options_list

* nit

* update to disable rest of the ControlMaster

* command runner rsync update

* relocating run_on_k8s

* relocate run_on_k8s

* Make Kubernetes specific env variables available when joining a cluster via SSH

* merge k8s_cloud_beta1

* format

* remove redundant utils.py

* format and comments

* update with proxy_to_k8s

* Update sky/authentication.py

Co-authored-by: Romil Bhardwaj <[email protected]>

* resolving comments on structures

* Update sky/utils/command_runner.py

Co-authored-by: Romil Bhardwaj <[email protected]>

* document on nodeport/port-forward proxycommand

* error handling when socat is not installed

* removing KUBECONFIG from port-forward shell script

* nit

* nit

* Add suport for nodeport

* Update sky/utils/kubernetes_utils.py

Co-authored-by: Romil Bhardwaj <[email protected]>

* update

* switch svc when conflicting jump pod svc exist

* format

* Update sky/utils/kubernetes_utils.py

Co-authored-by: Romil Bhardwaj <[email protected]>

* refactoring check for socat

* resolve comments

* add ServiceType enum and port-forward proxy script

* update k8s env var access

* add check for container status remove unused func

* nit

* update get_external_ip for portforward mode

* conditionally use sudo and quote values of env var

---------

Co-authored-by: Avi Weit <[email protected]>
Co-authored-by: hemildesai <[email protected]>
Co-authored-by: Romil Bhardwaj <[email protected]>

* refactor

* fix

* updates

* lint

* Update sky/skylet/providers/kubernetes/node_provider.py

* fix test

* [k8s] Showing reasons for provisioning failure in K8s (#2422)

* surface provision failure message

* nit

* nit

* format

* nit

* CPU message fix

* update Insufficient memory handling

* nit

* nit

* Update sky/skylet/providers/kubernetes/node_provider.py

Co-authored-by: Romil Bhardwaj <[email protected]>

* Update sky/skylet/providers/kubernetes/node_provider.py

Co-authored-by: Romil Bhardwaj <[email protected]>

* Update sky/skylet/providers/kubernetes/node_provider.py

Co-authored-by: Romil Bhardwaj <[email protected]>

* Update sky/skylet/providers/kubernetes/node_provider.py

Co-authored-by: Romil Bhardwaj <[email protected]>

* format

* update gpu failure message and condition

* fix GPU handling cases

* fix

* comment

* nit

* add try except block with general error handling

---------

Co-authored-by: Romil Bhardwaj <[email protected]>

* cleanup

* lint

* fix for ssh jump image_id

* comments

* ssh jump refactor

* lint

* image build fixes

---------

Co-authored-by: Avi Weit <[email protected]>
Co-authored-by: Hemil Desai <[email protected]>
Co-authored-by: Doyoung Kim <[email protected]>
  • Loading branch information
4 people authored Sep 16, 2023
1 parent 578b36d commit f0d3dfc
Show file tree
Hide file tree
Showing 21 changed files with 1,156 additions and 160 deletions.
3 changes: 3 additions & 0 deletions Dockerfile_k8s
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ RUN cd /skypilot/ && \
sudo mv -v sky/setup_files/* . && \
pip install ".[aws]"

# Set PYTHONUNBUFFERED=1 to have Python print to stdout/stderr immediately
ENV PYTHONUNBUFFERED=1

# Set WORKDIR and initialize conda for sky user
WORKDIR /home/sky
RUN conda init
3 changes: 3 additions & 0 deletions Dockerfile_k8s_gpu
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ RUN cd /skypilot/ && \
sudo mv -v sky/setup_files/* . && \
pip install ".[aws]"

# Set PYTHONUNBUFFERED=1 to have Python print to stdout/stderr immediately
ENV PYTHONUNBUFFERED=1

# Set WORKDIR and initialize conda for sky user
WORKDIR /home/sky
RUN conda init
43 changes: 43 additions & 0 deletions sky/authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,12 @@

from sky import clouds
from sky import sky_logging
from sky import skypilot_config
from sky.adaptors import gcp
from sky.adaptors import ibm
from sky.skylet.providers.lambda_cloud import lambda_utils
from sky.utils import common_utils
from sky.utils import kubernetes_utils
from sky.utils import subprocess_utils
from sky.utils import ux_utils

Expand Down Expand Up @@ -377,6 +379,21 @@ def setup_scp_authentication(config: Dict[str, Any]) -> Dict[str, Any]:


def setup_kubernetes_authentication(config: Dict[str, Any]) -> Dict[str, Any]:
# Default ssh session is established with kubectl port-forwarding with
# ClusterIP service.
nodeport_mode = kubernetes_utils.KubernetesNetworkingMode.NODEPORT
port_forward_mode = kubernetes_utils.KubernetesNetworkingMode.PORTFORWARD
network_mode_str = skypilot_config.get_nested(('kubernetes', 'networking'),
port_forward_mode.value)
try:
network_mode = kubernetes_utils.KubernetesNetworkingMode.from_str(
network_mode_str)
except ValueError as e:
# Add message saying "Please check: ~/.sky/config.yaml" to the error
# message.
with ux_utils.print_exception_no_traceback():
raise ValueError(str(e) + ' Please check: ~/.sky/config.yaml.') \
from None
get_or_generate_keys()

# Run kubectl command to add the public key to the cluster.
Expand All @@ -403,4 +420,30 @@ def setup_kubernetes_authentication(config: Dict[str, Any]) -> Dict[str, Any]:
logger.error(suffix)
raise

ssh_jump_name = clouds.Kubernetes.SKY_SSH_JUMP_NAME
if network_mode == nodeport_mode:
service_type = kubernetes_utils.KubernetesServiceType.NODEPORT
elif network_mode == port_forward_mode:
kubernetes_utils.check_port_forward_mode_dependencies()
# Using `kubectl port-forward` creates a direct tunnel to jump pod and
# does not require opening any ports on Kubernetes nodes. As a result,
# the service can be a simple ClusterIP service which we access with
# `kubectl port-forward`.
service_type = kubernetes_utils.KubernetesServiceType.CLUSTERIP
else:
# This should never happen because we check for this in from_str above.
raise ValueError(f'Unsupported networking mode: {network_mode_str}')
# Setup service for SSH jump pod. We create the SSH jump service here
# because we need to know the service IP address and port to set the
# ssh_proxy_command in the autoscaler config.
namespace = kubernetes_utils.get_current_kube_config_context_namespace()
kubernetes_utils.setup_ssh_jump_svc(ssh_jump_name, namespace, service_type)

ssh_proxy_cmd = kubernetes_utils.get_ssh_proxy_command(
PRIVATE_SSH_KEY_PATH, ssh_jump_name, network_mode, namespace,
clouds.Kubernetes.PORT_FORWARD_PROXY_CMD_PATH,
clouds.Kubernetes.PORT_FORWARD_PROXY_CMD_TEMPLATE)

config['auth']['ssh_proxy_command'] = ssh_proxy_cmd

return config
6 changes: 5 additions & 1 deletion sky/backends/backend_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1353,7 +1353,7 @@ def wait_until_ray_cluster_ready(

def ssh_credential_from_yaml(cluster_yaml: str,
docker_user: Optional[str] = None
) -> Dict[str, str]:
) -> Dict[str, Any]:
"""Returns ssh_user, ssh_private_key and ssh_control name."""
config = common_utils.read_yaml(cluster_yaml)
auth_section = config['auth']
Expand All @@ -1369,6 +1369,10 @@ def ssh_credential_from_yaml(cluster_yaml: str,
}
if docker_user is not None:
credentials['docker_user'] = docker_user
ssh_provider_module = config['provider']['module']
# If we are running ssh command on kubernetes node.
if 'kubernetes' in ssh_provider_module:
credentials['disable_control_master'] = True
return credentials


Expand Down
49 changes: 6 additions & 43 deletions sky/backends/cloud_vm_ray_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -2319,23 +2319,12 @@ def _update_cluster_region(self):
self.launched_resources = self.launched_resources.copy(region=region)

def update_ssh_ports(self, max_attempts: int = 1) -> None:
"""Updates the cluster SSH ports cached in the handle."""
# TODO(romilb): Replace this with a call to the cloud class to get ports
# Use port 22 for everything except Kubernetes
if not isinstance(self.launched_resources.cloud, clouds.Kubernetes):
head_ssh_port = 22
else:
svc_name = f'{self.cluster_name_on_cloud}-ray-head-ssh'
retry_cnt = 0
while True:
try:
head_ssh_port = clouds.Kubernetes.get_port(svc_name)
break
except Exception: # pylint: disable=broad-except
retry_cnt += 1
if retry_cnt >= max_attempts:
raise
# TODO(romilb): Multinode doesn't work with Kubernetes yet.
"""Fetches and sets the SSH ports for the cluster nodes.
Use this method to use any cloud-specific port fetching logic.
"""
del max_attempts # Unused.
head_ssh_port = 22
self.stable_ssh_ports = ([head_ssh_port] + [22] *
(self.num_node_ips - 1))

Expand Down Expand Up @@ -3011,37 +3000,12 @@ def _sync_file_mounts(
self._execute_file_mounts(handle, all_file_mounts)
self._execute_storage_mounts(handle, storage_mounts)

def _update_envs_for_k8s(self, handle: CloudVmRayResourceHandle,
task: task_lib.Task) -> None:
"""Update envs with env vars from Kubernetes if cloud is Kubernetes.
Kubernetes automatically populates containers with critical environment
variables, such as those for discovering services running in the
cluster and CUDA/nvidia environment variables. We need to update task
environment variables with these env vars. This is needed for GPU
support and service discovery.
See https://github.com/skypilot-org/skypilot/issues/2287 for
more details.
"""
if isinstance(handle.launched_resources.cloud, clouds.Kubernetes):
temp_envs = copy.deepcopy(task.envs)
cloud_env_vars = handle.launched_resources.cloud.query_env_vars(
handle.cluster_name_on_cloud)
task.update_envs(cloud_env_vars)

# Re update the envs with the original envs to give priority to
# the original envs.
task.update_envs(temp_envs)

def _setup(self, handle: CloudVmRayResourceHandle, task: task_lib.Task,
detach_setup: bool) -> None:
start = time.time()
style = colorama.Style
fore = colorama.Fore

self._update_envs_for_k8s(handle, task)

if task.setup is None:
return

Expand Down Expand Up @@ -3350,7 +3314,6 @@ def _execute(
# Check the task resources vs the cluster resources. Since `sky exec`
# will not run the provision and _check_existing_cluster
self.check_resources_fit_cluster(handle, task)
self._update_envs_for_k8s(handle, task)

resources_str = backend_utils.get_task_resources_str(task)

Expand Down
51 changes: 13 additions & 38 deletions sky/clouds/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,18 @@

logger = sky_logging.init_logger(__name__)

_CREDENTIAL_PATH = '~/.kube/config'
CREDENTIAL_PATH = '~/.kube/config'


@clouds.CLOUD_REGISTRY.register
class Kubernetes(clouds.Cloud):
"""Kubernetes."""

SKY_SSH_KEY_SECRET_NAME = f'sky-ssh-{common_utils.get_user_hash()}'

SKY_SSH_JUMP_NAME = f'sky-ssh-jump-{common_utils.get_user_hash()}'
PORT_FORWARD_PROXY_CMD_TEMPLATE = \
'kubernetes-port-forward-proxy-command.sh.j2'
PORT_FORWARD_PROXY_CMD_PATH = '~/.sky/port-forward-proxy-cmd.sh'
# Timeout for resource provisioning. This timeout determines how long to
# wait for pod to be in pending status before giving up.
# Larger timeout may be required for autoscaling clusters, since autoscaler
Expand Down Expand Up @@ -209,6 +212,9 @@ def make_deploy_resources_variables(
assert image_id.startswith('skypilot:')
image_id = service_catalog.get_image_id_from_tag(image_id,
clouds='kubernetes')
# TODO(romilb): Create a lightweight image for SSH jump host
ssh_jump_image = service_catalog.get_image_id_from_tag(
self.IMAGE_CPU, clouds='kubernetes')

k8s_acc_label_key = None
k8s_acc_label_value = None
Expand All @@ -229,6 +235,8 @@ def make_deploy_resources_variables(
'k8s_ssh_key_secret_name': self.SKY_SSH_KEY_SECRET_NAME,
'k8s_acc_label_key': k8s_acc_label_key,
'k8s_acc_label_value': k8s_acc_label_value,
'k8s_ssh_jump_name': self.SKY_SSH_JUMP_NAME,
'k8s_ssh_jump_image': ssh_jump_image,
# TODO(romilb): Allow user to specify custom images
'image_id': image_id,
}
Expand Down Expand Up @@ -298,7 +306,7 @@ def _make(instance_list):

@classmethod
def check_credentials(cls) -> Tuple[bool, Optional[str]]:
if os.path.exists(os.path.expanduser(_CREDENTIAL_PATH)):
if os.path.exists(os.path.expanduser(CREDENTIAL_PATH)):
# Test using python API
try:
return kubernetes_utils.check_credentials()
Expand All @@ -307,10 +315,10 @@ def check_credentials(cls) -> Tuple[bool, Optional[str]]:
f'{common_utils.format_exception(e)}')
else:
return (False, 'Credentials not found - '
f'check if {_CREDENTIAL_PATH} exists.')
f'check if {CREDENTIAL_PATH} exists.')

def get_credential_file_mounts(self) -> Dict[str, str]:
return {_CREDENTIAL_PATH: _CREDENTIAL_PATH}
return {CREDENTIAL_PATH: CREDENTIAL_PATH}

def instance_type_exists(self, instance_type: str) -> bool:
return kubernetes_utils.KubernetesInstanceType.is_valid_instance_type(
Expand Down Expand Up @@ -368,39 +376,6 @@ def query_status(cls, name: str, tag_filters: Dict[str, str],
# If pods are not found, we don't add them to the return list
return cluster_status

@classmethod
def query_env_vars(cls, name: str) -> Dict[str, str]:
namespace = kubernetes_utils.get_current_kube_config_context_namespace()
pod = kubernetes.core_api().list_namespaced_pod(
namespace,
label_selector=f'skypilot-cluster={name},ray-node-type=head'
).items[0]
response = kubernetes.stream()(
kubernetes.core_api().connect_get_namespaced_pod_exec,
pod.metadata.name,
namespace,
command=['env'],
stderr=True,
stdin=False,
stdout=True,
tty=False,
_request_timeout=kubernetes.API_TIMEOUT)
# Split response by newline and filter lines containing '='
raw_lines = response.split('\n')
filtered_lines = [line for line in raw_lines if '=' in line]

# Split each line at the first '=' occurrence
lines = [line.split('=', 1) for line in filtered_lines]

# Construct the dictionary using only valid environment variable names
env_vars = {}
for line in lines:
key = line[0]
if common_utils.is_valid_env_var(key):
env_vars[key] = line[1]

return env_vars

@classmethod
def get_current_user_identity(cls) -> Optional[List[str]]:
k8s = kubernetes.get_kubernetes()
Expand Down
36 changes: 36 additions & 0 deletions sky/skylet/providers/kubernetes/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ def bootstrap_kubernetes(config: Dict[str, Any]) -> Dict[str, Any]:

_configure_services(namespace, config['provider'])

config = _configure_ssh_jump(namespace, config)

if not config['provider'].get('_operator'):
# These steps are unecessary when using the Operator.
_configure_autoscaler_service_account(namespace, config['provider'])
Expand Down Expand Up @@ -257,6 +259,40 @@ def _configure_autoscaler_role_binding(namespace: str,
logger.info(log_prefix + created_msg(binding_field, name))


def _configure_ssh_jump(namespace, config):
"""Creates a SSH jump pod to connect to the cluster.
Also updates config['auth']['ssh_proxy_command'] to use the newly created
jump pod.
"""
pod_cfg = config['available_node_types']['ray_head_default']['node_config']

ssh_jump_name = pod_cfg['metadata']['labels']['skypilot-ssh-jump']
ssh_jump_image = config['provider']['ssh_jump_image']

volumes = pod_cfg['spec']['volumes']
# find 'secret-volume' and get the secret name
secret_volume = next(filter(lambda x: x['name'] == 'secret-volume',
volumes))
ssh_key_secret_name = secret_volume['secret']['secretName']

# TODO(romilb): We currently split SSH jump pod and svc creation. Service
# is first created in authentication.py::setup_kubernetes_authentication
# and then SSH jump pod creation happens here. This is because we need to
# set the ssh_proxy_command in the ray YAML before we pass it to the
# autoscaler. If in the future if we can write the ssh_proxy_command to the
# cluster yaml through this method, then we should move the service
# creation here.

# TODO(romilb): We should add a check here to make sure the service is up
# and available before we create the SSH jump pod. If for any reason the
# service is missing, we should raise an error.

kubernetes_utils.setup_ssh_jump_pod(ssh_jump_name, ssh_jump_image,
ssh_key_secret_name, namespace)
return config


def _configure_services(namespace: str, provider_config: Dict[str,
Any]) -> None:
service_field = 'services'
Expand Down
Loading

0 comments on commit f0d3dfc

Please sign in to comment.