diff --git a/Dockerfile_k8s b/Dockerfile_k8s
index 12dbaa9006c..cf6ff86cbed 100644
--- a/Dockerfile_k8s
+++ b/Dockerfile_k8s
@@ -45,6 +45,9 @@ RUN cd /skypilot/ && \
     sudo mv -v sky/setup_files/* . && \
     pip install ".[aws]"
 
+# Set PYTHONUNBUFFERED=1 to have Python print to stdout/stderr immediately
+ENV PYTHONUNBUFFERED=1
+
 # Set WORKDIR and initialize conda for sky user
 WORKDIR /home/sky
 RUN conda init
diff --git a/Dockerfile_k8s_gpu b/Dockerfile_k8s_gpu
index 413253d08b8..6f242b6f5ab 100644
--- a/Dockerfile_k8s_gpu
+++ b/Dockerfile_k8s_gpu
@@ -53,6 +53,9 @@ RUN cd /skypilot/ && \
     sudo mv -v sky/setup_files/* . && \
     pip install ".[aws]"
 
+# Set PYTHONUNBUFFERED=1 to have Python print to stdout/stderr immediately
+ENV PYTHONUNBUFFERED=1
+
 # Set WORKDIR and initialize conda for sky user
 WORKDIR /home/sky
 RUN conda init
diff --git a/sky/authentication.py b/sky/authentication.py
index 27029b982de..022dba9264c 100644
--- a/sky/authentication.py
+++ b/sky/authentication.py
@@ -37,10 +37,12 @@
 from sky import clouds
 from sky import sky_logging
+from sky import skypilot_config
 from sky.adaptors import gcp
 from sky.adaptors import ibm
 from sky.skylet.providers.lambda_cloud import lambda_utils
 from sky.utils import common_utils
+from sky.utils import kubernetes_utils
 from sky.utils import subprocess_utils
 from sky.utils import ux_utils
 
@@ -377,6 +379,21 @@ def setup_scp_authentication(config: Dict[str, Any]) -> Dict[str, Any]:
 
 
 def setup_kubernetes_authentication(config: Dict[str, Any]) -> Dict[str, Any]:
+    # By default, the SSH session is established with kubectl port-forwarding
+    # through a ClusterIP service.
+    nodeport_mode = kubernetes_utils.KubernetesNetworkingMode.NODEPORT
+    port_forward_mode = kubernetes_utils.KubernetesNetworkingMode.PORTFORWARD
+    network_mode_str = skypilot_config.get_nested(('kubernetes', 'networking'),
+                                                  port_forward_mode.value)
+    try:
+        network_mode = kubernetes_utils.KubernetesNetworkingMode.from_str(
+            network_mode_str)
+    except ValueError as e:
+        # Add message saying "Please check: ~/.sky/config.yaml" to the error
+        # message.
+        with ux_utils.print_exception_no_traceback():
+            raise ValueError(str(e) + ' Please check: ~/.sky/config.yaml.') \
+                from None
     get_or_generate_keys()
 
     # Run kubectl command to add the public key to the cluster.
@@ -403,4 +420,30 @@ def setup_kubernetes_authentication(config: Dict[str, Any]) -> Dict[str, Any]:
             logger.error(suffix)
             raise
 
+    ssh_jump_name = clouds.Kubernetes.SKY_SSH_JUMP_NAME
+    if network_mode == nodeport_mode:
+        service_type = kubernetes_utils.KubernetesServiceType.NODEPORT
+    elif network_mode == port_forward_mode:
+        kubernetes_utils.check_port_forward_mode_dependencies()
+        # Using `kubectl port-forward` creates a direct tunnel to the jump pod
+        # and does not require opening any ports on Kubernetes nodes. As a
+        # result, the service can be a simple ClusterIP service which we
+        # access with `kubectl port-forward`.
+        service_type = kubernetes_utils.KubernetesServiceType.CLUSTERIP
+    else:
+        # This should never happen because we check for this in from_str above.
+        raise ValueError(f'Unsupported networking mode: {network_mode_str}')
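+    # For illustration, a hypothetical ~/.sky/config.yaml stanza selecting
+    # the networking mode read above would look like:
+    #   kubernetes:
+    #     networking: portforward   # or: nodeport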
+    # Set up the service for the SSH jump pod. We create the SSH jump service
+    # here because we need to know the service IP address and port to set the
+    # ssh_proxy_command in the autoscaler config.
+    namespace = kubernetes_utils.get_current_kube_config_context_namespace()
+    kubernetes_utils.setup_ssh_jump_svc(ssh_jump_name, namespace, service_type)
+
+    ssh_proxy_cmd = kubernetes_utils.get_ssh_proxy_command(
+        PRIVATE_SSH_KEY_PATH, ssh_jump_name, network_mode, namespace,
+        clouds.Kubernetes.PORT_FORWARD_PROXY_CMD_PATH,
+        clouds.Kubernetes.PORT_FORWARD_PROXY_CMD_TEMPLATE)
+
+    config['auth']['ssh_proxy_command'] = ssh_proxy_cmd
+
     return config
diff --git a/sky/backends/backend_utils.py b/sky/backends/backend_utils.py
index b3162aa0226..d5f391bffca 100644
--- a/sky/backends/backend_utils.py
+++ b/sky/backends/backend_utils.py
@@ -1353,7 +1353,7 @@ def wait_until_ray_cluster_ready(
 
 def ssh_credential_from_yaml(cluster_yaml: str,
                              docker_user: Optional[str] = None
-                             ) -> Dict[str, str]:
+                             ) -> Dict[str, Any]:
     """Returns ssh_user, ssh_private_key and ssh_control name."""
     config = common_utils.read_yaml(cluster_yaml)
     auth_section = config['auth']
@@ -1369,6 +1369,10 @@ def ssh_credential_from_yaml(cluster_yaml: str,
     }
     if docker_user is not None:
         credentials['docker_user'] = docker_user
+    ssh_provider_module = config['provider']['module']
+    # Disable ControlMaster if we are running the ssh command on a Kubernetes pod.
+    if 'kubernetes' in ssh_provider_module:
+        credentials['disable_control_master'] = True
     return credentials
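For context, the extra `disable_control_master` key returned above is consumed by `SSHCommandRunner` (see the `sky/utils/command_runner.py` changes later in this diff). A minimal sketch of that flow, using a hypothetical cluster YAML path and IP:

```python
from sky.backends import backend_utils
from sky.utils import command_runner

# Hypothetical path to a launched Kubernetes cluster's generated YAML.
credentials = backend_utils.ssh_credential_from_yaml(
    '~/.sky/generated/mycluster.yml')
# For Kubernetes clusters, credentials now also contains
# {'disable_control_master': True}, so the runner below skips ControlMaster.
runner = command_runner.SSHCommandRunner('127.0.0.1', **credentials)
```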
- """ - if isinstance(handle.launched_resources.cloud, clouds.Kubernetes): - temp_envs = copy.deepcopy(task.envs) - cloud_env_vars = handle.launched_resources.cloud.query_env_vars( - handle.cluster_name_on_cloud) - task.update_envs(cloud_env_vars) - - # Re update the envs with the original envs to give priority to - # the original envs. - task.update_envs(temp_envs) - def _setup(self, handle: CloudVmRayResourceHandle, task: task_lib.Task, detach_setup: bool) -> None: start = time.time() style = colorama.Style fore = colorama.Fore - self._update_envs_for_k8s(handle, task) - if task.setup is None: return @@ -3350,7 +3314,6 @@ def _execute( # Check the task resources vs the cluster resources. Since `sky exec` # will not run the provision and _check_existing_cluster self.check_resources_fit_cluster(handle, task) - self._update_envs_for_k8s(handle, task) resources_str = backend_utils.get_task_resources_str(task) diff --git a/sky/clouds/kubernetes.py b/sky/clouds/kubernetes.py index ae15fe1a8f5..50b358755ed 100644 --- a/sky/clouds/kubernetes.py +++ b/sky/clouds/kubernetes.py @@ -20,7 +20,7 @@ logger = sky_logging.init_logger(__name__) -_CREDENTIAL_PATH = '~/.kube/config' +CREDENTIAL_PATH = '~/.kube/config' @clouds.CLOUD_REGISTRY.register @@ -28,7 +28,10 @@ class Kubernetes(clouds.Cloud): """Kubernetes.""" SKY_SSH_KEY_SECRET_NAME = f'sky-ssh-{common_utils.get_user_hash()}' - + SKY_SSH_JUMP_NAME = f'sky-ssh-jump-{common_utils.get_user_hash()}' + PORT_FORWARD_PROXY_CMD_TEMPLATE = \ + 'kubernetes-port-forward-proxy-command.sh.j2' + PORT_FORWARD_PROXY_CMD_PATH = '~/.sky/port-forward-proxy-cmd.sh' # Timeout for resource provisioning. This timeout determines how long to # wait for pod to be in pending status before giving up. # Larger timeout may be required for autoscaling clusters, since autoscaler @@ -209,6 +212,9 @@ def make_deploy_resources_variables( assert image_id.startswith('skypilot:') image_id = service_catalog.get_image_id_from_tag(image_id, clouds='kubernetes') + # TODO(romilb): Create a lightweight image for SSH jump host + ssh_jump_image = service_catalog.get_image_id_from_tag( + self.IMAGE_CPU, clouds='kubernetes') k8s_acc_label_key = None k8s_acc_label_value = None @@ -229,6 +235,8 @@ def make_deploy_resources_variables( 'k8s_ssh_key_secret_name': self.SKY_SSH_KEY_SECRET_NAME, 'k8s_acc_label_key': k8s_acc_label_key, 'k8s_acc_label_value': k8s_acc_label_value, + 'k8s_ssh_jump_name': self.SKY_SSH_JUMP_NAME, + 'k8s_ssh_jump_image': ssh_jump_image, # TODO(romilb): Allow user to specify custom images 'image_id': image_id, } @@ -298,7 +306,7 @@ def _make(instance_list): @classmethod def check_credentials(cls) -> Tuple[bool, Optional[str]]: - if os.path.exists(os.path.expanduser(_CREDENTIAL_PATH)): + if os.path.exists(os.path.expanduser(CREDENTIAL_PATH)): # Test using python API try: return kubernetes_utils.check_credentials() @@ -307,10 +315,10 @@ def check_credentials(cls) -> Tuple[bool, Optional[str]]: f'{common_utils.format_exception(e)}') else: return (False, 'Credentials not found - ' - f'check if {_CREDENTIAL_PATH} exists.') + f'check if {CREDENTIAL_PATH} exists.') def get_credential_file_mounts(self) -> Dict[str, str]: - return {_CREDENTIAL_PATH: _CREDENTIAL_PATH} + return {CREDENTIAL_PATH: CREDENTIAL_PATH} def instance_type_exists(self, instance_type: str) -> bool: return kubernetes_utils.KubernetesInstanceType.is_valid_instance_type( @@ -368,39 +376,6 @@ def query_status(cls, name: str, tag_filters: Dict[str, str], # If pods are not found, we don't add them to the return list return 
 
-    @classmethod
-    def query_env_vars(cls, name: str) -> Dict[str, str]:
-        namespace = kubernetes_utils.get_current_kube_config_context_namespace()
-        pod = kubernetes.core_api().list_namespaced_pod(
-            namespace,
-            label_selector=f'skypilot-cluster={name},ray-node-type=head'
-        ).items[0]
-        response = kubernetes.stream()(
-            kubernetes.core_api().connect_get_namespaced_pod_exec,
-            pod.metadata.name,
-            namespace,
-            command=['env'],
-            stderr=True,
-            stdin=False,
-            stdout=True,
-            tty=False,
-            _request_timeout=kubernetes.API_TIMEOUT)
-        # Split response by newline and filter lines containing '='
-        raw_lines = response.split('\n')
-        filtered_lines = [line for line in raw_lines if '=' in line]
-
-        # Split each line at the first '=' occurrence
-        lines = [line.split('=', 1) for line in filtered_lines]
-
-        # Construct the dictionary using only valid environment variable names
-        env_vars = {}
-        for line in lines:
-            key = line[0]
-            if common_utils.is_valid_env_var(key):
-                env_vars[key] = line[1]
-
-        return env_vars
-
     @classmethod
     def get_current_user_identity(cls) -> Optional[List[str]]:
         k8s = kubernetes.get_kubernetes()
diff --git a/sky/skylet/providers/kubernetes/config.py b/sky/skylet/providers/kubernetes/config.py
index 4132af1b89c..2ab466f03ed 100644
--- a/sky/skylet/providers/kubernetes/config.py
+++ b/sky/skylet/providers/kubernetes/config.py
@@ -55,6 +55,8 @@ def bootstrap_kubernetes(config: Dict[str, Any]) -> Dict[str, Any]:
 
     _configure_services(namespace, config['provider'])
 
+    config = _configure_ssh_jump(namespace, config)
+
     if not config['provider'].get('_operator'):
         # These steps are unnecessary when using the Operator.
         _configure_autoscaler_service_account(namespace, config['provider'])
@@ -257,6 +259,40 @@ def _configure_autoscaler_role_binding(namespace: str,
         logger.info(log_prefix + created_msg(binding_field, name))
 
 
+def _configure_ssh_jump(namespace, config):
+    """Creates an SSH jump pod to connect to the cluster.
+
+    Also updates config['auth']['ssh_proxy_command'] to use the newly created
+    jump pod.
+    """
+    pod_cfg = config['available_node_types']['ray_head_default']['node_config']
+
+    ssh_jump_name = pod_cfg['metadata']['labels']['skypilot-ssh-jump']
+    ssh_jump_image = config['provider']['ssh_jump_image']
+
+    volumes = pod_cfg['spec']['volumes']
+    # Find 'secret-volume' and get the secret name
+    secret_volume = next(filter(lambda x: x['name'] == 'secret-volume',
+                                volumes))
+    ssh_key_secret_name = secret_volume['secret']['secretName']
+
+    # TODO(romilb): We currently split SSH jump pod and svc creation. The
+    # service is created first in
+    # authentication.py::setup_kubernetes_authentication, and then the SSH
+    # jump pod creation happens here. This is because we need to set the
+    # ssh_proxy_command in the ray YAML before we pass it to the autoscaler.
+    # If in the future we can write the ssh_proxy_command to the cluster yaml
+    # through this method, then we should move the service creation here.
+
+    # TODO(romilb): We should add a check here to make sure the service is up
+    # and available before we create the SSH jump pod. If for any reason the
+    # service is missing, we should raise an error.
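+    # A sketch of such a check (hypothetical, not implemented here):
+    #   try:
+    #       kubernetes.core_api().read_namespaced_service(
+    #           ssh_jump_name, namespace)
+    #   except kubernetes.api_exception() as e:
+    #       if e.status == 404:
+    #           raise config.KubernetesError('SSH jump service missing.')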
+
+    kubernetes_utils.setup_ssh_jump_pod(ssh_jump_name, ssh_jump_image,
+                                        ssh_key_secret_name, namespace)
+    return config
+
+
 def _configure_services(namespace: str,
                         provider_config: Dict[str, Any]) -> None:
     service_field = 'services'
diff --git a/sky/skylet/providers/kubernetes/node_provider.py b/sky/skylet/providers/kubernetes/node_provider.py
index 77222e72ab5..8963225cc3f 100644
--- a/sky/skylet/providers/kubernetes/node_provider.py
+++ b/sky/skylet/providers/kubernetes/node_provider.py
@@ -2,7 +2,6 @@
 import logging
 import time
 from typing import Dict
-from urllib.parse import urlparse
 from uuid import uuid4
 
 from ray.autoscaler._private.command_runner import SSHCommandRunner
@@ -13,6 +12,7 @@
 
 from sky.adaptors import kubernetes
 from sky.skylet.providers.kubernetes import config
+from sky.utils import common_utils
 from sky.utils import kubernetes_utils
 
 logger = logging.getLogger(__name__)
@@ -100,17 +100,7 @@ def node_tags(self, node_id):
         return pod.metadata.labels
 
     def external_ip(self, node_id):
-        # Return the IP address of the first node with an external IP
-        nodes = kubernetes.core_api().list_node().items
-        for node in nodes:
-            if node.status.addresses:
-                for address in node.status.addresses:
-                    if address.type == 'ExternalIP':
-                        return address.address
-        # If no external IP is found, use the API server IP
-        api_host = kubernetes.core_api().api_client.configuration.host
-        parsed_url = urlparse(api_host)
-        return parsed_url.hostname
+        return kubernetes_utils.get_external_ip()
 
     def external_port(self, node_id):
         # Extract the NodePort of the head node's SSH service
@@ -172,6 +162,61 @@ def _set_node_tags(self, node_id, tags):
         pod.metadata.labels.update(tags)
         kubernetes.core_api().patch_namespaced_pod(node_id, self.namespace, pod)
 
+    def _raise_pod_scheduling_errors(self, new_nodes):
+        for new_node in new_nodes:
+            pod_status = new_node.status.phase
+            pod_name = new_node._metadata._name
+            events = kubernetes.core_api().list_namespaced_event(
+                self.namespace,
+                field_selector=(f'involvedObject.name={pod_name},'
+                                'involvedObject.kind=Pod'))
+            # Events created in the past hours are kept by the Kubernetes
+            # python client, and we want to surface the latest event message
+            events_desc_by_time = \
+                sorted(events.items,
+                       key=lambda e: e.metadata.creation_timestamp,
+                       reverse=True)
+            event_message = None
+            for event in events_desc_by_time:
+                if event.reason == 'FailedScheduling':
+                    event_message = event.message
+                    break
+            timeout_err_msg = ('Timed out while waiting for nodes to start. '
+                               'Cluster may be out of resources or '
+                               'may be too slow to autoscale.')
+            lack_resource_msg = (
+                'Insufficient {resource} capacity on the cluster. '
+                'Other SkyPilot tasks or pods may be using resources. '
+                'Check resource usage by running `kubectl describe nodes`.')
+            if event_message is not None:
+                if pod_status == 'Pending':
+                    if 'Insufficient cpu' in event_message:
+                        raise config.KubernetesError(
+                            lack_resource_msg.format(resource='CPU'))
+                    if 'Insufficient memory' in event_message:
+                        raise config.KubernetesError(
+                            lack_resource_msg.format(resource='memory'))
+                    gpu_lf_keys = [
+                        lf.get_label_key()
+                        for lf in kubernetes_utils.LABEL_FORMATTER_REGISTRY
+                    ]
+                    if new_node.spec.node_selector:
+                        for label_key in new_node.spec.node_selector.keys():
+                            if label_key in gpu_lf_keys:
+                                # TODO(romilb): We may have additional node
+                                #  affinity selectors in the future - in that
+                                #  case we will need to update this logic.
+                                if 'Insufficient nvidia.com/gpu' in event_message or \
+                                    'didn\'t match Pod\'s node affinity/selector' in event_message:
+                                    raise config.KubernetesError(
+                                        f'{lack_resource_msg.format(resource="GPU")} '
+                                        f'Verify if {new_node.spec.node_selector[label_key]}'
+                                        ' is available in the cluster.')
+                raise config.KubernetesError(f'{timeout_err_msg} '
+                                             f'Pod status: {pod_status}'
+                                             f'Details: \'{event_message}\' ')
+        raise config.KubernetesError(f'{timeout_err_msg}')
+
     def create_node(self, node_config, tags, count):
         conf = copy.deepcopy(node_config)
         pod_spec = conf.get('pod', conf)
@@ -204,7 +249,6 @@ def create_node(self, node_config, tags, count):
                          '(count={}).'.format(count))
 
         for new_node in new_nodes:
-
             metadata = service_spec.get('metadata', {})
             metadata['name'] = new_node.metadata.name
             service_spec['metadata'] = metadata
@@ -216,16 +260,20 @@ def create_node(self, node_config, tags, count):
         # Wait for all pods to be ready, and if it exceeds the timeout, raise an
         # exception. If pod's container is ContainerCreating, then we can assume
         # that resources have been allocated and we can exit.
-
         start = time.time()
         while True:
             if time.time() - start > self.timeout:
-                raise config.KubernetesError(
-                    'Timed out while waiting for nodes to start. '
-                    'Cluster may be out of resources or '
-                    'may be too slow to autoscale.')
-            all_ready = True
+                try:
+                    self._raise_pod_scheduling_errors(new_nodes)
+                except config.KubernetesError:
+                    raise
+                except Exception as e:
+                    raise config.KubernetesError(
+                        'An error occurred while trying to fetch the reason '
+                        'for pod scheduling failure. '
+                        f'Error: {common_utils.format_exception(e)}') from None
 
+            all_ready = True
             for node in new_nodes:
                 pod = kubernetes.core_api().read_namespaced_pod(
                     node.metadata.name, self.namespace)
@@ -252,20 +300,57 @@ def create_node(self, node_config, tags, count):
                 break
             time.sleep(1)
 
+        # Wait for pod containers to be ready - they may be pulling images or
+        # may be in the process of container creation.
+        while True:
+            pods = []
+            for node in new_nodes:
+                pod = kubernetes.core_api().read_namespaced_pod(
+                    node.metadata.name, self.namespace)
+                pods.append(pod)
+            if all([pod.status.phase == "Running" for pod in pods]) \
+                    and all(
+                        [container.state.running for pod in pods for container in
+                         pod.status.container_statuses]):
+                break
+            time.sleep(1)
+
+        # Once all containers are ready, we can exec into them and set env vars.
+        # Kubernetes automatically populates containers with critical
+        # environment variables, such as those for discovering services running
+        # in the cluster and CUDA/nvidia environment variables. We need to
+        # make sure these env vars are available in every task and ssh session.
+        # This is needed for GPU support and service discovery.
+        # See https://github.com/skypilot-org/skypilot/issues/2287 for
+        # more details.
+        # To do so, we capture env vars from the pod's runtime and write them to
+        # /etc/profile.d/, making them available for all users in future
+        # shell sessions.
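+        # For example, after this step /etc/profile.d/k8s_env_var.sh may
+        # contain lines like the following (values are illustrative):
+        #   export KUBERNETES_SERVICE_HOST='10.96.0.1'
+        #   export NVIDIA_VISIBLE_DEVICES='all'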
+        set_k8s_env_var_cmd = [
+            '/bin/sh', '-c',
+            ('printenv | awk -F "=" \'{print "export " $1 "=\\047" $2 "\\047"}\' > ~/k8s_env_var.sh && '
+             'mv ~/k8s_env_var.sh /etc/profile.d/k8s_env_var.sh || '
+             'sudo mv ~/k8s_env_var.sh /etc/profile.d/k8s_env_var.sh')
+        ]
+        for new_node in new_nodes:
+            kubernetes.stream()(
+                kubernetes.core_api().connect_get_namespaced_pod_exec,
+                new_node.metadata.name,
+                self.namespace,
+                command=set_k8s_env_var_cmd,
+                stderr=True,
+                stdin=False,
+                stdout=True,
+                tty=False,
+                _request_timeout=kubernetes.API_TIMEOUT)
+
     def terminate_node(self, node_id):
         logger.info(config.log_prefix + 'calling delete_namespaced_pod')
         try:
-            kubernetes.core_api().delete_namespaced_pod(
-                node_id,
-                self.namespace,
-                _request_timeout=config.DELETION_TIMEOUT)
-        except kubernetes.api_exception() as e:
-            if e.status == 404:
-                logger.warning(config.log_prefix +
-                               f'Tried to delete pod {node_id},'
-                               ' but the pod was not found (404).')
-            else:
-                raise
+            kubernetes_utils.clean_zombie_ssh_jump_pod(self.namespace, node_id)
+        except Exception as e:
+            logger.warning(config.log_prefix +
+                           f'Error occurred when analyzing SSH Jump pod: {e}')
         try:
             kubernetes.core_api().delete_namespaced_service(
                 node_id,
@@ -277,6 +362,21 @@ def terminate_node(self, node_id):
                 _request_timeout=config.DELETION_TIMEOUT)
         except kubernetes.api_exception():
             pass
+        # Note - delete pod after all other resources are deleted.
+        # This is to ensure there are no leftover resources if this down is run
+        # from within the pod, e.g., for autodown.
+        try:
+            kubernetes.core_api().delete_namespaced_pod(
+                node_id,
+                self.namespace,
+                _request_timeout=config.DELETION_TIMEOUT)
+        except kubernetes.api_exception() as e:
+            if e.status == 404:
+                logger.warning(config.log_prefix +
+                               f'Tried to delete pod {node_id},'
+                               ' but the pod was not found (404).')
+            else:
+                raise
 
     def terminate_nodes(self, node_ids):
         # TODO(romilb): terminate_nodes should include optimizations for
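To make the nested ProxyCommand introduced by this change concrete, the resulting ssh invocation in port-forward mode looks roughly like the following hand-expanded sketch (hypothetical key path; the exact command is produced by `construct_ssh_jump_command` in `sky/utils/kubernetes_utils.py` below):

```bash
# Outer hop: ssh to the jump pod through the generated proxy script; -W then
# forwards stdin/stdout onward to the SkyPilot pod's SSH port.
ssh -tt -i ~/.ssh/sky-key \
    -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null \
    -o IdentitiesOnly=yes -p 23100 -W %h:%p sky@127.0.0.1 \
    -o ProxyCommand='~/.sky/port-forward-proxy-cmd.sh'
```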
diff --git a/sky/templates/kubernetes-port-forward-proxy-command.sh.j2 b/sky/templates/kubernetes-port-forward-proxy-command.sh.j2
new file mode 100644
index 00000000000..fa71df3a0ec
--- /dev/null
+++ b/sky/templates/kubernetes-port-forward-proxy-command.sh.j2
@@ -0,0 +1,43 @@
+#!/usr/bin/env bash
+set -uo pipefail
+
+# Checks if socat is installed
+if ! command -v socat > /dev/null; then
+  echo "Using 'port-forward' mode to run ssh session on Kubernetes instances requires 'socat' to be installed. Please install 'socat'" >&2
+  exit 1
+fi
+
+# Checks if lsof is installed
+if ! command -v lsof > /dev/null; then
+  echo "Checking port availability for 'port-forward' mode requires 'lsof' to be installed. Please install 'lsof'" >&2
+  exit 1
+fi
+
+# Function to check if a port is in use
+is_port_in_use() {
+  local port="$1"
+  lsof -i :${port} > /dev/null 2>&1
+}
+
+# Start from a fixed local port and increment if it is in use
+local_port={{ local_port }}
+while is_port_in_use "${local_port}"; do
+  local_port=$((local_port + 1))
+done
+
+# Establishes a connection between the local port and the ssh jump pod
+kubectl port-forward svc/{{ ssh_jump_name }} "${local_port}":22 &
+
+# Terminate the port-forward process when this script exits.
+K8S_PORT_FWD_PID=$!
+trap "kill $K8S_PORT_FWD_PID" EXIT
+
+# Wait until the connection to 127.0.0.1:${local_port} is established
+while ! nc -z 127.0.0.1 "${local_port}"; do
+  sleep 0.1
+done
+
+# Establishes two directional byte streams to handle stdin/stdout between the
+# terminal and the jump pod.
+# The socat process terminates when the port-forward terminates.
+socat - tcp:127.0.0.1:"${local_port}"
\ No newline at end of file
diff --git a/sky/templates/kubernetes-ray.yml.j2 b/sky/templates/kubernetes-ray.yml.j2
index d8e14adcf4e..da8e4253290 100644
--- a/sky/templates/kubernetes-ray.yml.j2
+++ b/sky/templates/kubernetes-ray.yml.j2
@@ -16,11 +16,15 @@ provider:
     type: external
     module: sky.skylet.providers.kubernetes.KubernetesNodeProvider
 
-    # Use False if running from outside of k8s cluster
-    use_internal_ips: false
+    # We use internal IPs since we set up a port-forward between the kubernetes
+    # cluster and the local machine, or directly use NodePort to reach the
+    # head node.
+    use_internal_ips: true
 
     timeout: {{timeout}}
 
+    ssh_jump_image: {{k8s_ssh_jump_image}}
+
     # ServiceAccount created by the autoscaler for the head node pod that it
     # runs in. If this field isn't provided, the head pod config below must
     # contain a user-created service account with the proper permissions.
@@ -78,7 +82,6 @@ provider:
         skypilot-cluster: {{cluster_name_on_cloud}}
       name: {{cluster_name_on_cloud}}-ray-head-ssh
     spec:
-      type: NodePort
       selector:
         component: {{cluster_name_on_cloud}}-ray-head
       ports:
@@ -126,6 +129,8 @@ available_node_types:
         parent: skypilot
         component: {{cluster_name_on_cloud}}-ray-head
         skypilot-cluster: {{cluster_name_on_cloud}}
+        # Identifies the SSH jump pod used by this pod. Used in life cycle management of the ssh jump pod.
+        skypilot-ssh-jump: {{k8s_ssh_jump_name}}
     spec:
       # Change this if you altered the autoscaler_service_account above
       # or want to provide your own.
diff --git a/sky/templates/kubernetes-ssh-jump.yml.j2 b/sky/templates/kubernetes-ssh-jump.yml.j2
new file mode 100644
index 00000000000..a4c9929fe1e
--- /dev/null
+++ b/sky/templates/kubernetes-ssh-jump.yml.j2
@@ -0,0 +1,90 @@
+pod_spec:
+  apiVersion: v1
+  kind: Pod
+  metadata:
+    name: {{ name }}
+    labels:
+      component: {{ name }}
+      parent: skypilot
+  spec:
+    serviceAccountName: sky-ssh-jump-sa
+    volumes:
+    - name: secret-volume
+      secret:
+        secretName: {{ secret }}
+    containers:
+    - name: {{ name }}
+      imagePullPolicy: Always
+      image: {{ image }}
+      command: ["python3", "-u", "/skypilot/sky/utils/kubernetes/ssh_jump_lifecycle_manager.py"]
+      ports:
+      - containerPort: 22
+      volumeMounts:
+      - name: secret-volume
+        readOnly: true
+        mountPath: /etc/secret-volume
+      lifecycle:
+        postStart:
+          exec:
+            command: ["/bin/bash", "-c", "mkdir -p ~/.ssh && cp /etc/secret-volume/ssh-publickey ~/.ssh/authorized_keys && sudo service ssh restart"]
+      env:
+      - name: MY_POD_NAME
+        valueFrom:
+          fieldRef:
+            fieldPath: metadata.name
+      - name: MY_POD_NAMESPACE
+        valueFrom:
+          fieldRef:
+            fieldPath: metadata.namespace
+      - name: ALERT_THRESHOLD
+        # seconds
+        value: "600"
+      - name: RETRY_INTERVAL
+        # seconds
+        value: "60"
+    terminationGracePeriodSeconds: 0
+service_spec:
+  apiVersion: v1
+  kind: Service
+  metadata:
+    name: {{ name }}
+    labels:
+      parent: skypilot
+  spec:
+    type: {{ service_type }}
+    selector:
+      component: {{ name }}
+    ports:
+    - protocol: TCP
+      port: 22
+      targetPort: 22
+# The following ServiceAccount/Role/RoleBinding sets up an RBAC for life cycle
+# management of the jump pod/service
+service_account:
+  apiVersion: v1
+  kind: ServiceAccount
+  metadata:
+    name: sky-ssh-jump-sa
+    parent: skypilot
+role:
+  kind: Role
+  apiVersion: rbac.authorization.k8s.io/v1
+  metadata:
+    name: sky-ssh-jump-role
+  rules:
+  - apiGroups: [""]
+    resources: ["pods", "pods/status", "pods/exec", "services"]
+    verbs: ["get", "list", "create", "delete"]
+role_binding:
+  apiVersion: rbac.authorization.k8s.io/v1
+  kind: RoleBinding
+  metadata:
+    name: sky-ssh-jump-rb
+    parent: skypilot
+  subjects:
+  - kind: ServiceAccount
+    name: sky-ssh-jump-sa
+  roleRef:
+    kind: Role
+    name: sky-ssh-jump-role
+    apiGroup: rbac.authorization.k8s.io
diff --git a/sky/utils/command_runner.py b/sky/utils/command_runner.py
index 08fde49354d..c3dd73eb345 100644
--- a/sky/utils/command_runner.py
+++ b/sky/utils/command_runner.py
@@ -42,13 +42,16 @@ def _ssh_control_path(ssh_control_filename: Optional[str]) -> Optional[str]:
     return path
 
 
-def ssh_options_list(ssh_private_key: Optional[str],
-                     ssh_control_name: Optional[str],
-                     *,
-                     ssh_proxy_command: Optional[str] = None,
-                     docker_ssh_proxy_command: Optional[str] = None,
-                     timeout: int = 30,
-                     port: int = 22) -> List[str]:
+def ssh_options_list(
+    ssh_private_key: Optional[str],
+    ssh_control_name: Optional[str],
+    *,
+    ssh_proxy_command: Optional[str] = None,
+    docker_ssh_proxy_command: Optional[str] = None,
+    timeout: int = 30,
+    port: int = 22,
+    disable_control_master: Optional[bool] = False,
+) -> List[str]:
     """Returns a list of sane options for 'ssh'."""
     # Forked from Ray SSHOptions:
     # https://github.com/ray-project/ray/blob/master/python/ray/autoscaler/_private/command_runner.py
@@ -79,7 +82,13 @@ def ssh_options_list(ssh_private_key: Optional[str],
     }
     # SSH Control will have a severe delay when using docker_ssh_proxy_command.
     # TODO(tian): Investigate why.
-    if ssh_control_name is not None and docker_ssh_proxy_command is None:
+    # We also do not use ControlMaster when we use `kubectl port-forward`
+    # to access Kubernetes pods over SSH+ProxyCommand. This is because the
+    # process running the ProxyCommand is kept alive as long as the ssh
+    # session is running and ControlMaster keeps the session alive, which
+    # results in a 'ControlPersist'-seconds delay per ssh command run.
+    if (ssh_control_name is not None and docker_ssh_proxy_command is None and
+            not disable_control_master):
         arg_dict.update({
             # Control path: important optimization as we do multiple ssh in one
             # sky.launch().
@@ -136,6 +145,7 @@ def __init__(
         ssh_proxy_command: Optional[str] = None,
         port: int = 22,
         docker_user: Optional[str] = None,
+        disable_control_master: Optional[bool] = False,
     ):
         """Initialize SSHCommandRunner.
 
@@ -158,13 +168,17 @@ def __init__(
             port: The port to use for ssh.
             docker_user: The docker user to use for ssh. If specified, the
                 command will be run inside a docker container which have a ssh
-                server running at port sky.skylet.constants.DEFAULT_DOCKER_PORT.
+                server running at port sky.skylet.constants.DEFAULT_DOCKER_PORT
+            disable_control_master: bool; whether or not the ssh command will
+                use ControlMaster. We currently disable it for Kubernetes
+                instances.
""" self.ssh_private_key = ssh_private_key self.ssh_control_name = ( None if ssh_control_name is None else hashlib.md5( ssh_control_name.encode()).hexdigest()[:_HASH_MAX_LENGTH]) self._ssh_proxy_command = ssh_proxy_command + self.disable_control_master = disable_control_master if docker_user is not None: assert port is None or port == 22, ( f'port must be None or 22 for docker_user, got {port}.') @@ -190,6 +204,7 @@ def make_runner_list( ssh_private_key: str, ssh_control_name: Optional[str] = None, ssh_proxy_command: Optional[str] = None, + disable_control_master: Optional[bool] = False, port_list: Optional[List[int]] = None, docker_user: Optional[str] = None, ) -> List['SSHCommandRunner']: @@ -198,7 +213,8 @@ def make_runner_list( port_list = [22] * len(ip_list) return [ SSHCommandRunner(ip, ssh_user, ssh_private_key, ssh_control_name, - ssh_proxy_command, port, docker_user) + ssh_proxy_command, port, docker_user, + disable_control_master) for ip, port in zip(ip_list, port_list) ] @@ -228,7 +244,9 @@ def _ssh_base_command(self, *, ssh_mode: SshMode, ssh_proxy_command=self._ssh_proxy_command, docker_ssh_proxy_command=docker_ssh_proxy_command, port=self.port, - ) + [f'{self.ssh_user}@{self.ip}'] + disable_control_master=self.disable_control_master) + [ + f'{self.ssh_user}@{self.ip}' + ] def run( self, @@ -388,7 +406,7 @@ def rsync( ssh_proxy_command=self._ssh_proxy_command, docker_ssh_proxy_command=docker_ssh_proxy_command, port=self.port, - )) + disable_control_master=self.disable_control_master)) rsync_command.append(f'-e "ssh {ssh_options}"') # To support spaces in the path, we need to quote source and target. # rsync doesn't support '~' in a quoted local path, but it is ok to diff --git a/sky/utils/command_runner.pyi b/sky/utils/command_runner.pyi index e5feb5fb8db..893acb1e57b 100644 --- a/sky/utils/command_runner.pyi +++ b/sky/utils/command_runner.pyi @@ -20,10 +20,16 @@ RSYNC_FILTER_OPTION: str RSYNC_EXCLUDE_OPTION: str -def ssh_options_list(ssh_private_key: Optional[str], - ssh_control_name: Optional[str], - *, - timeout: int = ...) -> List[str]: +def ssh_options_list( + ssh_private_key: Optional[str], + ssh_control_name: Optional[str], + *, + ssh_proxy_command: Optional[str] = ..., + docker_ssh_proxy_command: Optional[str] = ..., + timeout: int = ..., + port: int = ..., + disable_control_master: Optional[bool] = ..., +) -> List[str]: ... @@ -40,14 +46,18 @@ class SSHCommandRunner: ssh_control_name: Optional[str] docker_user: str port: int + disable_control_master: Optional[bool] - def __init__(self, - ip: str, - ssh_user: str, - ssh_private_key: str, - ssh_control_name: Optional[str] = ..., - port: int = ..., - docker_user: Optional[str] = ...) -> None: + def __init__( + self, + ip: str, + ssh_user: str, + ssh_private_key: str, + ssh_control_name: Optional[str] = ..., + port: int = ..., + docker_user: Optional[str] = ..., + disable_control_master: Optional[bool] = ..., + ) -> None: ... @staticmethod @@ -59,6 +69,7 @@ class SSHCommandRunner: ssh_proxy_command: Optional[str] = ..., port_list: Optional[List[int]] = ..., docker_user: Optional[str] = ..., + disable_control_master: Optional[bool] = ..., ) -> List['SSHCommandRunner']: ... diff --git a/sky/utils/kubernetes/ssh_jump_lifecycle_manager.py b/sky/utils/kubernetes/ssh_jump_lifecycle_manager.py new file mode 100644 index 00000000000..05f6a8d7a42 --- /dev/null +++ b/sky/utils/kubernetes/ssh_jump_lifecycle_manager.py @@ -0,0 +1,111 @@ +"""Manages lifecycle of ssh jump pod. 
diff --git a/sky/utils/kubernetes/ssh_jump_lifecycle_manager.py b/sky/utils/kubernetes/ssh_jump_lifecycle_manager.py
new file mode 100644
index 00000000000..05f6a8d7a42
--- /dev/null
+++ b/sky/utils/kubernetes/ssh_jump_lifecycle_manager.py
@@ -0,0 +1,111 @@
+"""Manages the lifecycle of the ssh jump pod.
+
+This script runs inside the ssh jump pod as the main process (PID 1).
+
+It terminates itself (by removing the ssh jump service and pod via a call to
+the Kubernetes API) if it does not see any ray pods for a duration of 10
+minutes. If the user re-launches a task before that duration is over, the ssh
+jump pod is reused, and it will again terminate itself once no ray cluster has
+existed for that duration.
+"""
+import datetime
+import os
+import sys
+import time
+
+from kubernetes import client
+from kubernetes import config
+
+# Load kube config
+config.load_incluster_config()
+
+v1 = client.CoreV1Api()
+
+current_name = os.getenv('MY_POD_NAME')
+current_namespace = os.getenv('MY_POD_NAMESPACE')
+
+# The period of time in seconds with no Ray pods after which the ssh jump pod
+# terminates itself
+alert_threshold = int(os.getenv('ALERT_THRESHOLD', '600'))
+# The amount of time in seconds to wait between Ray pod existence checks
+retry_interval = int(os.getenv('RETRY_INTERVAL', '60'))
+
+# Ray pods are labeled with this value, i.e., the ssh jump name, which is
+# unique per user (based on the user hash)
+label_selector = f'skypilot-ssh-jump={current_name}'
+
+
+def poll():
+    sys.stdout.write('Starting polling.\n')
+
+    alert_delta = datetime.timedelta(seconds=alert_threshold)
+
+    # Set delay for each retry
+    retry_interval_delta = datetime.timedelta(seconds=retry_interval)
+
+    # Accumulated time in which no SkyPilot cluster exists. Compared against
+    # alert_threshold
+    nocluster_delta = datetime.timedelta()
+
+    while True:
+        sys.stdout.write(f'Sleeping {retry_interval} seconds..\n')
+        time.sleep(retry_interval)
+
+        # List the pods in the current namespace
+        try:
+            ret = v1.list_namespaced_pod(current_namespace,
+                                         label_selector=label_selector)
+        except Exception as e:
+            sys.stdout.write(f'Error: listing pods failed with error: {e}\n')
+            raise
+
+        if len(ret.items) == 0:
+            sys.stdout.write(f'Did not find pods with label "{label_selector}" '
+                             f'in namespace {current_namespace}\n')
+            nocluster_delta = nocluster_delta + retry_interval_delta
+            sys.stdout.write(
+                f'Time since no pods found: {nocluster_delta}, alert '
+                f'threshold: {alert_delta}\n')
+        else:
+            sys.stdout.write(
+                f'Found pods with label "{label_selector}" in namespace '
+                f'{current_namespace}\n')
+            # Reset the counter
+            nocluster_delta = datetime.timedelta()
+            sys.stdout.write(f'nocluster_delta is reset: {nocluster_delta}\n')
+
+        if nocluster_delta >= alert_delta:
+            sys.stdout.write(
+                f'nocluster_delta: {nocluster_delta} crossed alert threshold: '
+                f'{alert_delta}. Time to terminate myself and my service.\n')
+            try:
+                # ssh jump resources are created under the same name
+                v1.delete_namespaced_service(current_name, current_namespace)
+                v1.delete_namespaced_pod(current_name, current_namespace)
+            except Exception as e:
+                sys.stdout.write('[ERROR] Deletion failed. Exiting '
+                                 f'poll() with error: {e}\n')
+                raise
+
+            break
+
+    sys.stdout.write('Done polling.\n')
+
+
+def main():
+    sys.stdout.write('SkyPilot SSH Jump Pod Lifecycle Manager\n')
+    sys.stdout.write(f'current_name: {current_name}\n')
+    sys.stdout.write(f'current_namespace: {current_namespace}\n')
+    sys.stdout.write(f'alert_threshold time: {alert_threshold}\n')
+    sys.stdout.write(f'retry_interval time: {retry_interval}\n')
+    sys.stdout.write(f'label_selector: {label_selector}\n')
+
+    if not current_name or not current_namespace:
+        # Raise Exception with message to terminate pod
+        raise Exception('Missing environment variables MY_POD_NAME or '
+                        'MY_POD_NAMESPACE')
+    poll()
+
+
+if __name__ == '__main__':
+    main()
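The polling loop above keys off the `skypilot-ssh-jump` label that `kubernetes-ray.yml.j2` attaches to every SkyPilot head pod. The equivalent manual check, with a hypothetical user-hash suffix in the jump pod name:

```bash
kubectl get pods -n default -l skypilot-ssh-jump=sky-ssh-jump-deadbeef
```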
diff --git a/sky/utils/kubernetes_utils.py b/sky/utils/kubernetes_utils.py
index b0d175b7e4f..cadd395bcd0 100644
--- a/sky/utils/kubernetes_utils.py
+++ b/sky/utils/kubernetes_utils.py
@@ -1,15 +1,26 @@
 """Kubernetes utilities for SkyPilot."""
+import enum
 import math
+import os
 import re
-from typing import Any, List, Optional, Set, Tuple, Union
+import subprocess
+from typing import Any, Dict, List, Optional, Set, Tuple, Union
+from urllib.parse import urlparse
 
+import jinja2
+import yaml
+
+import sky
 from sky import exceptions
+from sky import sky_logging
 from sky.adaptors import kubernetes
+from sky.backends import backend_utils
 from sky.utils import common_utils
 from sky.utils import env_options
 from sky.utils import ux_utils
 
 DEFAULT_NAMESPACE = 'default'
+LOCAL_PORT_FOR_PORT_FORWARD = 23100
 
 MEMORY_SIZE_UNITS = {
     'B': 1,
@@ -20,6 +31,35 @@
     'P': 2**50,
 }
 
+logger = sky_logging.init_logger(__name__)
+
+
+class KubernetesNetworkingMode(enum.Enum):
+    """Enum for the different types of networking modes for accessing
+    jump pods.
+    """
+    NODEPORT = 'nodeport'
+    PORTFORWARD = 'portforward'
+
+    @classmethod
+    def from_str(cls, mode: str) -> 'KubernetesNetworkingMode':
+        """Returns the enum value for the given string."""
+        if mode.lower() == cls.NODEPORT.value:
+            return cls.NODEPORT
+        elif mode.lower() == cls.PORTFORWARD.value:
+            return cls.PORTFORWARD
+        else:
+            raise ValueError(f'Unsupported kubernetes networking mode: '
+                             f'{mode}. The mode must be either '
+                             f'\'{cls.PORTFORWARD.value}\' or '
+                             f'\'{cls.NODEPORT.value}\'.')
+
+
+class KubernetesServiceType(enum.Enum):
+    """Enum for the different types of services."""
+    NODEPORT = 'NodePort'
+    CLUSTERIP = 'ClusterIP'
+
 
 class GPULabelFormatter:
     """Base class to define a GPU label formatter for a Kubernetes cluster
@@ -346,6 +386,22 @@ def get_port(svc_name: str, namespace: str) -> int:
     return head_service.spec.ports[0].node_port
 
 
+def get_external_ip(
+        network_mode: Optional[KubernetesNetworkingMode] = None) -> str:
+    if network_mode == KubernetesNetworkingMode.PORTFORWARD:
+        return '127.0.0.1'
+    # Return the IP address of the first node with an external IP
+    nodes = kubernetes.core_api().list_node().items
+    for node in nodes:
+        if node.status.addresses:
+            for address in node.status.addresses:
+                if address.type == 'ExternalIP':
+                    return address.address
+    # If no external IP is found, use the API server IP
+    api_host = kubernetes.core_api().api_client.configuration.host
+    parsed_url = urlparse(api_host)
+    return parsed_url.hostname
+
+
 def check_credentials(timeout: int = kubernetes.API_TIMEOUT) -> \
     Tuple[bool, Optional[str]]:
     """Check if the credentials in kubeconfig file are valid
@@ -578,3 +634,326 @@ def from_resources(cls,
 
     def __str__(self):
         return self.name
+
+
+def construct_ssh_jump_command(private_key_path: str,
+                               ssh_jump_port: int,
+                               ssh_jump_ip: str,
+                               proxy_cmd_path: Optional[str] = None) -> str:
+    ssh_jump_proxy_command = (f'ssh -tt -i {private_key_path} '
+                              '-o StrictHostKeyChecking=no '
+                              '-o UserKnownHostsFile=/dev/null '
+                              f'-o IdentitiesOnly=yes -p {ssh_jump_port} '
+                              f'-W %h:%p sky@{ssh_jump_ip}')
+    if proxy_cmd_path is not None:
+        proxy_cmd_path = os.path.expanduser(proxy_cmd_path)
+        # Add execution permissions to the proxy command script
+        os.chmod(proxy_cmd_path, os.stat(proxy_cmd_path).st_mode | 0o111)
+        ssh_jump_proxy_command += f' -o ProxyCommand=\'{proxy_cmd_path}\' '
+    return ssh_jump_proxy_command
+
+
+def get_ssh_proxy_command(private_key_path: str, ssh_jump_name: str,
+                          network_mode: KubernetesNetworkingMode,
+                          namespace: str, port_fwd_proxy_cmd_path: str,
+                          port_fwd_proxy_cmd_template: str) -> str:
+    """Generates the SSH proxy command to connect through the SSH jump pod.
+
+    By default, establishing an SSH connection creates a communication
+    channel to a remote node by setting up a TCP connection. When a
+    ProxyCommand is specified, this default behavior is overridden. The command
+    specified in ProxyCommand is executed, and its standard input and output
+    become the communication channel for the SSH session.
+
+    Pods within a Kubernetes cluster have internal IP addresses that are
+    typically not accessible from outside the cluster. Since the default TCP
+    connection of SSH won't allow access to these pods, we employ a
+    ProxyCommand to establish the required communication channel. We offer
+    two different networking options: NodePort and port-forward.
+
+    With the NodePort networking mode, a NodePort service is launched. This
+    service opens an external port on the node which redirects to the desired
+    port within the pod. When establishing an SSH session in this mode, the
+    ProxyCommand makes use of this external port to create a communication
+    channel directly to port 22 of the jump pod, the default port the ssh
+    server listens on.
+    With port-forward mode, instead of directly exposing an external port,
+    'kubectl port-forward' sets up a tunnel between a local port
+    (127.0.0.1:23100) and port 22 of the jump pod. Then we establish a TCP
+    connection to the local end of this tunnel, 127.0.0.1:23100, using
+    'socat'. This is set up in the inner ProxyCommand of the nested
+    ProxyCommand, and the rest is the same as the NodePort approach: the
+    outer ProxyCommand establishes a communication channel between
+    127.0.0.1:23100 and port 22 on the jump pod. Consequently, any stdin
+    provided on the local machine is forwarded through this tunnel to the
+    application (SSH server) listening in the pod. Similarly, any output from
+    the application in the pod is tunneled back and displayed in the terminal
+    on the local machine.
+
+    Args:
+        private_key_path: str; Path to the private key to use for SSH.
+            This key must be authorized to access the SSH jump pod.
+        ssh_jump_name: str; Name of the SSH jump service to use
+        network_mode: KubernetesNetworkingMode; networking mode for the ssh
+            session. It is either 'NODEPORT' or 'PORTFORWARD'
+        namespace: Kubernetes namespace to use
+        port_fwd_proxy_cmd_path: str; path to the script used as ProxyCommand
+            with 'kubectl port-forward'
+        port_fwd_proxy_cmd_template: str; template used to create the
+            'kubectl port-forward' ProxyCommand
+    """
+    # Fetch the IP to connect to for the jump svc
+    ssh_jump_ip = get_external_ip(network_mode)
+    if network_mode == KubernetesNetworkingMode.NODEPORT:
+        ssh_jump_port = get_port(ssh_jump_name, namespace)
+        ssh_jump_proxy_command = construct_ssh_jump_command(
+            private_key_path, ssh_jump_port, ssh_jump_ip)
+    # Use kubectl port-forward/socat to establish the ssh session through a
+    # ClusterIP service, so that no ports need to be opened on the nodes
+    else:
+        ssh_jump_port = LOCAL_PORT_FOR_PORT_FORWARD
+        vars_to_fill = {
+            'ssh_jump_name': ssh_jump_name,
+            'local_port': ssh_jump_port,
+        }
+        backend_utils.fill_template(port_fwd_proxy_cmd_template,
+                                    vars_to_fill,
+                                    output_path=port_fwd_proxy_cmd_path)
+        ssh_jump_proxy_command = construct_ssh_jump_command(
+            private_key_path, ssh_jump_port, ssh_jump_ip,
+            port_fwd_proxy_cmd_path)
+    return ssh_jump_proxy_command
+
+
+def setup_ssh_jump_svc(ssh_jump_name: str, namespace: str,
+                       service_type: KubernetesServiceType):
+    """Sets up the Kubernetes service resource to access the SSH jump pod.
+
+    This method acts as a necessary complement to be run along with
+    setup_ssh_jump_pod(...). This service ensures the pod is accessible.
+
+    Args:
+        ssh_jump_name: Name to use for the SSH jump service
+        namespace: Namespace to create the SSH jump service in
+        service_type: Networking configuration on whether to use a NodePort
+            or ClusterIP service to ssh in
+    """
+    # Fill in template - ssh_key_secret and ssh_jump_image are not required for
+    # the service spec, so we pass in empty strs.
+    content = fill_ssh_jump_template('', '', ssh_jump_name, service_type.value)
+    # Create service
+    try:
+        kubernetes.core_api().create_namespaced_service(
+            namespace, content['service_spec'])
+    except kubernetes.api_exception() as e:
+        # SSH Jump Pod service already exists.
+        if e.status == 409:
+            ssh_jump_service = kubernetes.core_api().read_namespaced_service(
+                name=ssh_jump_name, namespace=namespace)
+            curr_svc_type = ssh_jump_service.spec.type
+            if service_type.value == curr_svc_type:
+                # The existing SSH Jump service's type is identical to the
+                # user's configured networking mode, so we can reuse it.
+                logger.debug(
+                    f'SSH Jump Service {ssh_jump_name} already exists in the '
+                    'cluster, using it.')
+            else:
+                # The existing SSH Jump service's type differs from the user's
+                # configured networking mode, so we remove the existing
+                # service and create a new one following the user's config
+                kubernetes.core_api().delete_namespaced_service(
+                    name=ssh_jump_name, namespace=namespace)
+                kubernetes.core_api().create_namespaced_service(
+                    namespace, content['service_spec'])
+                port_forward_mode = KubernetesNetworkingMode.PORTFORWARD.value
+                nodeport_mode = KubernetesNetworkingMode.NODEPORT.value
+                clusterip_svc = KubernetesServiceType.CLUSTERIP.value
+                nodeport_svc = KubernetesServiceType.NODEPORT.value
+                curr_network_mode = port_forward_mode \
+                    if curr_svc_type == clusterip_svc else nodeport_mode
+                new_network_mode = nodeport_mode \
+                    if curr_svc_type == clusterip_svc else port_forward_mode
+                new_svc_type = nodeport_svc \
+                    if curr_svc_type == clusterip_svc else clusterip_svc
+                logger.info(
+                    f'Switching the networking mode from '
+                    f'\'{curr_network_mode}\' to \'{new_network_mode}\' '
+                    f'following networking configuration. Deleting existing '
+                    f'\'{curr_svc_type}\' service and recreating as '
+                    f'\'{new_svc_type}\' service.')
+        else:
+            raise
+    else:
+        logger.info(f'Created SSH Jump Service {ssh_jump_name}.')
+
+
+def setup_ssh_jump_pod(ssh_jump_name: str, ssh_jump_image: str,
+                       ssh_key_secret: str, namespace: str):
+    """Sets up Kubernetes RBAC and the pod for the SSH jump host.
+
+    Our Kubernetes implementation uses an SSH jump pod to reach SkyPilot
+    clusters running inside a cluster. This function sets up the resources
+    needed for the SSH jump pod. This includes a service account which grants
+    the jump pod permission to watch for other SkyPilot pods and terminate
+    itself if there are no SkyPilot pods running.
+
+    setup_ssh_jump_svc must also be run to ensure that the SSH jump pod is
+    reachable.
+
+    Args:
+        ssh_jump_image: Container image to use for the SSH jump pod
+        ssh_jump_name: Name to use for the SSH jump pod
+        ssh_key_secret: Secret name for the SSH key stored in the cluster
+        namespace: Namespace to create the SSH jump pod in
+    """
+    # Fill in template - the service is created separately, so service_type
+    # is not required and we pass in an empty str.
+    content = fill_ssh_jump_template(ssh_key_secret, ssh_jump_image,
+                                     ssh_jump_name, '')
+    # ServiceAccount
+    try:
+        kubernetes.core_api().create_namespaced_service_account(
+            namespace, content['service_account'])
+    except kubernetes.api_exception() as e:
+        if e.status == 409:
+            logger.info(
+                'SSH Jump ServiceAccount already exists in the cluster, using '
+                'it.')
+        else:
+            raise
+    else:
+        logger.info('Created SSH Jump ServiceAccount.')
+    # Role
+    try:
+        kubernetes.auth_api().create_namespaced_role(namespace,
+                                                     content['role'])
+    except kubernetes.api_exception() as e:
+        if e.status == 409:
+            logger.info(
+                'SSH Jump Role already exists in the cluster, using it.')
+        else:
+            raise
+    else:
+        logger.info('Created SSH Jump Role.')
+    # RoleBinding
+    try:
+        kubernetes.auth_api().create_namespaced_role_binding(
+            namespace, content['role_binding'])
+    except kubernetes.api_exception() as e:
+        if e.status == 409:
+            logger.info(
+                'SSH Jump RoleBinding already exists in the cluster, using '
+                'it.')
+        else:
+            raise
+    else:
+        logger.info('Created SSH Jump RoleBinding.')
+    # Pod
+    try:
+        kubernetes.core_api().create_namespaced_pod(namespace,
+                                                    content['pod_spec'])
+    except kubernetes.api_exception() as e:
+        if e.status == 409:
+            logger.info(
+                f'SSH Jump Host {ssh_jump_name} already exists in the '
+                'cluster, using it.')
+        else:
+            raise
+    else:
+        logger.info(f'Created SSH Jump Host {ssh_jump_name}.')
+
+
+def clean_zombie_ssh_jump_pod(namespace: str, node_id: str):
+    """Analyzes the SSH jump pod and removes it if it is in a bad state.
+
+    Prevents the existence of a dangling SSH jump pod. This could happen
+    in case the pod's main container did not start properly (or failed). In
+    that case, the jump pod lifecycle manager will not function properly to
+    remove the pod and service automatically, and it must be done manually.
+
+    Args:
+        namespace: Namespace to remove the SSH jump pod and service from
+        node_id: Name of the head pod
+    """
+
+    def find(l, predicate):
+        """Utility function to find an element in a given list"""
+        results = [x for x in l if predicate(x)]
+        return results[0] if len(results) > 0 else None
+
+    # Get the SSH jump pod name from the head pod
+    try:
+        pod = kubernetes.core_api().read_namespaced_pod(node_id, namespace)
+    except kubernetes.api_exception() as e:
+        if e.status == 404:
+            logger.warning(f'Failed to get pod {node_id},'
+                           ' but the pod was not found (404).')
+        raise
+    else:
+        ssh_jump_name = pod.metadata.labels.get('skypilot-ssh-jump')
+    try:
+        ssh_jump_pod = kubernetes.core_api().read_namespaced_pod(
+            ssh_jump_name, namespace)
+        cont_ready_cond = find(ssh_jump_pod.status.conditions,
+                               lambda c: c.type == 'ContainersReady')
+        if cont_ready_cond and \
+                cont_ready_cond.status == 'False':
+            # The main container is not ready. To be on the safe side
+            # and prevent a dangling ssh jump pod, let's remove it and
+            # the service. Otherwise, the main container is ready and its
+            # lifecycle management script takes care of the cleaning.
+            kubernetes.core_api().delete_namespaced_pod(ssh_jump_name,
+                                                        namespace)
+            kubernetes.core_api().delete_namespaced_service(
+                ssh_jump_name, namespace)
+    # Only warn and proceed as usual
+    except kubernetes.api_exception() as e:
+        logger.warning(
+            f'Tried to check ssh jump pod {ssh_jump_name},'
+            f' but got error {e}\n. Consider running `kubectl '
+            f'delete pod {ssh_jump_name} -n {namespace}` to manually '
+            'remove the pod if it has crashed.')
+        # We encountered an issue while checking the ssh jump pod. To be on
+        # the safe side, let's remove its service so the port is freed
+        try:
+            kubernetes.core_api().delete_namespaced_service(
+                ssh_jump_name, namespace)
+        except kubernetes.api_exception():
+            pass
+
+
+def fill_ssh_jump_template(ssh_key_secret: str, ssh_jump_image: str,
+                           ssh_jump_name: str, service_type: str) -> Dict:
+    template_path = os.path.join(sky.__root_dir__, 'templates',
+                                 'kubernetes-ssh-jump.yml.j2')
+    if not os.path.exists(template_path):
+        raise FileNotFoundError(
+            'Template "kubernetes-ssh-jump.yml.j2" does not exist.')
+    with open(template_path) as fin:
+        template = fin.read()
+    j2_template = jinja2.Template(template)
+    cont = j2_template.render(name=ssh_jump_name,
+                              image=ssh_jump_image,
+                              secret=ssh_key_secret,
+                              service_type=service_type)
+    content = yaml.safe_load(cont)
+    return content
+
+
+def check_port_forward_mode_dependencies() -> None:
+    """Checks if 'socat' and 'lsof' are installed"""
+    for name, option in [('socat', '-V'), ('lsof', '-v')]:
+        try:
+            subprocess.run([name, option],
+                           stdout=subprocess.DEVNULL,
+                           stderr=subprocess.DEVNULL,
+                           check=True)
+        except FileNotFoundError:
+            with ux_utils.print_exception_no_traceback():
+                raise RuntimeError(
+                    f'`{name}` is required to set up Kubernetes cloud with '
+                    f'`{KubernetesNetworkingMode.PORTFORWARD.value}` default '
+                    'networking mode and it is not installed. '
+                    'On Debian/Ubuntu, install it with:\n'
+                    f'  $ sudo apt install {name}\n'
+                    f'On macOS, install it with:\n'
+                    f'  $ brew install {name}') from None
"https://www.googleapis.com/auth/devstorage.read_only","https://www.googleapis.com/auth/logging.write","https://www.googleapis.com/auth/monitoring","https://www.googleapis.com/auth/servicecontrol","https://www.googleapis.com/auth/service.management.readonly","https://www.googleapis.com/auth/trace.append" --num-nodes "1" --enable-autoupgrade --enable-autorepair --max-surge-upgrade 1 --max-unavailable-upgrade 0 --node-locations "us-central1-c" && gcloud beta container --project "${PROJECT_ID}" node-pools create "largecpu" --cluster "${CLUSTER_NAME}" --zone "us-central1-c" --machine-type "n1-standard-16" --image-type "COS_CONTAINERD" --disk-type "pd-balanced" --disk-size "100" --metadata disable-legacy-endpoints=true --scopes "https://www.googleapis.com/auth/devstorage.read_only","https://www.googleapis.com/auth/logging.write","https://www.googleapis.com/auth/monitoring","https://www.googleapis.com/auth/servicecontrol","https://www.googleapis.com/auth/service.management.readonly","https://www.googleapis.com/auth/trace.append" --num-nodes "1" --enable-autoupgrade --enable-autorepair --max-surge-upgrade 1 --max-unavailable-upgrade 0 --node-locations "us-central1-c" ``` -2. Make sure ports 30000-32767 are open in your node pool VPC's firewall. -3. Get the kubeconfig for your cluster and place it in `~/.kube/config`: +2. Get the kubeconfig for your cluster and place it in `~/.kube/config`: ```bash gcloud container clusters get-credentials --region # Example: # gcloud container clusters get-credentials testcluster --region us-central1-c ``` -4. Verify by running `kubectl get nodes`. You should see your nodes. -5. **If you want GPU support**, make sure you install GPU drivers by running: +3. Verify by running `kubectl get nodes`. You should see your nodes. +4. **If you want GPU support**, make sure you install GPU drivers by running: ```bash # If using COS based nodes (e.g., in the example above): kubectl apply -f https://raw.githubusercontent.com/GoogleCloudPlatform/container-engine-accelerators/master/nvidia-driver-installer/cos/daemonset-preloaded.yaml @@ -54,17 +53,18 @@ sky local up ```bash kubectl describe nodes ``` -6. Run `sky check`. +5. Run `sky check`. ```bash sky check ``` -7. You can run SkyPilot tasks now. After you're done, delete the cluster by running: +6. You can run SkyPilot tasks now. After you're done, delete the cluster by running: ```bash gcloud container clusters delete --region # Example: # gcloud container clusters delete testcluster --region us-central1-c ``` +NOTE - If are using nodeport networking, make sure port 32100 is open in your node pool VPC's firewall. ## Running a EKS cluster 1. Create a EKS cluster with at least 1 node. We recommend creating nodes with at least 4 vCPUs. @@ -72,9 +72,8 @@ sky local up ```bash eksctl create -f tests/kubernetes/eks_test_cluster.yaml ``` -2. Make sure ports 30000-32767 are open in your EKS cluster's default security group. -3. Verify by running `kubectl get nodes`. You should see your nodes. -4. **If you want GPU support**, EKS clusters already come with GPU drivers setup. However, you'll need to label the nodes with the GPU type. Use the SkyPilot node labelling tool to do so: +2. Verify by running `kubectl get nodes`. You should see your nodes. +3. **If you want GPU support**, EKS clusters already come with GPU drivers setup. However, you'll need to label the nodes with the GPU type. 
Use the SkyPilot node labelling tool to do so: ```bash python -m sky.utils.kubernetes.gpu_labeler ``` @@ -94,11 +93,13 @@ sky local up ```bash sky check ``` -6. After you are done, delete the cluster by running: +5. After you are done, delete the cluster by running: ```bash eksctl delete cluster -f tests/kubernetes/eks_test_cluster.yaml ``` +NOTE - If are using nodeport networking, make sure port 32100 is open in your EKS cluster's default security group. + ## Other useful scripts `scripts` directory contains other useful scripts for development, including Kubernetes dashboard, ray yaml for testing the SkyPilot Kubernetes node provider diff --git a/tests/kubernetes/build_image.sh b/tests/kubernetes/build_image.sh index 40dbad5be3f..675a15b12bc 100755 --- a/tests/kubernetes/build_image.sh +++ b/tests/kubernetes/build_image.sh @@ -10,6 +10,9 @@ TAG=us-central1-docker.pkg.dev/skypilot-375900/skypilotk8s/skypilot +push=false +gpu=false + # Parse command line arguments while getopts ":pg" opt; do case ${opt} in @@ -29,34 +32,37 @@ while getopts ":pg" opt; do done # Add -gpu to the tag if the GPU image is being built -if [[ $gpu ]]; then +if [[ $gpu == "true" ]]; then TAG=$TAG-gpu:latest else TAG=$TAG:latest fi +# Shift off the options +shift $((OPTIND-1)) + + # Navigate to the root of the project (inferred from git) cd "$(git rev-parse --show-toplevel)" # If push is used, build the image for both amd64 and arm64 -if [[ $push ]]; then +if [[ $push == "true" ]]; then # If gpu is used, build the GPU image - if [[ $gpu ]]; then - echo "Building and pushing GPU image for amd64" + if [[ $gpu == "true" ]]; then + echo "Building and pushing GPU image for amd64: $TAG" docker buildx build --push --platform linux/amd64 -t $TAG -f Dockerfile_k8s_gpu ./sky - fi - # Else, build the CPU image else - echo "Building and pushing CPU image for amd64 and arm64" + echo "Building and pushing CPU image for amd64 and arm64: $TAG" docker buildx build --push --platform linux/arm64,linux/amd64 -t $TAG -f Dockerfile_k8s ./sky + fi fi # Load the right image depending on the architecture of the host machine (Apple Silicon or Intel) if [[ $(uname -m) == "arm64" ]]; then - echo "Loading image for arm64 (Apple Silicon etc.)" + echo "Loading image for arm64 (Apple Silicon etc.): $TAG" docker buildx build --load --platform linux/arm64 -t $TAG -f Dockerfile_k8s ./sky elif [[ $(uname -m) == "x86_64" ]]; then - echo "Building for amd64 (Intel CPUs)" + echo "Building for amd64 (Intel CPUs): $TAG" docker buildx build --load --platform linux/amd64 -t $TAG -f Dockerfile_k8s ./sky else echo "Unsupported architecture: $(uname -m)" diff --git a/tests/kubernetes/networking_benchmarks/k8s_network_benchmarks.md b/tests/kubernetes/networking_benchmarks/k8s_network_benchmarks.md new file mode 100644 index 00000000000..050444ac94d --- /dev/null +++ b/tests/kubernetes/networking_benchmarks/k8s_network_benchmarks.md @@ -0,0 +1,55 @@ +# Kubernetes Networking Benchmarking + +A SkyPilot pod in Kubernetes can be accessed via three methods: +1. `direct`: NodePort service directly exposing the pod's SSH port +2. `sshjump`: NodePort service exposing a SSH jump pod that connects to the SkyPilot pod +3. `port-forward`: Uses `kubectl port-forward` to connect to ClusterIP service pointing to a SSH jump pod that connects to the SkyPilot pod + +`direct` requires opening a large range of ports on the cluster's firewall. +`sshjump` requires opening only one port on the cluster's firewall, but requires an additional SSH connection to the jump pod. 
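+
+For context, the mode SkyPilot uses is set in `~/.sky/config.yaml`. A minimal sketch, assuming the config accepts the values `nodeport` (the `sshjump`-style path) and `portforward` (the default):
+
+```bash
+# Append a kubernetes networking section to the SkyPilot config.
+cat >> ~/.sky/config.yaml <<'EOF'
+kubernetes:
+  networking: nodeport
+EOF
+```
+
+As a rough illustration of what the `port-forward` path does (the service name, local port, and SSH user below are placeholders, not SkyPilot's actual values):
+
+```bash
+# Forward a local port to port 22 of the jump pod's ClusterIP service, then
+# SSH through the tunnel. Traffic flows over the Kubernetes API server, so
+# no ports need to be opened on the cluster's nodes.
+kubectl port-forward svc/sky-ssh-jump 2222:22 &
+ssh -p 2222 sky@127.0.0.1
+```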
+
+This document benchmarks the three approaches on a Kind cluster and a GKE cluster.
+
+We run two kinds of benchmarks:
+1. `sky launch` benchmarks: how long does it take to launch a SkyPilot pod
+2. Rsync benchmarks: how long does it take to copy a directory containing 1000 1MB files to the SkyPilot pod
+
+In summary, we find that `direct` is only marginally faster (~10%) than `sshjump` and `port-forward` for both the `sky launch` and rsync benchmarks.
+
+Given these results, this document recommends using `port-forward` for all SkyPilot deployments because of its significant ease-of-use and security benefits.
+
+## Benchmark environment
+These benchmarks were run on a 2023 M2 Max MacBook Pro with 32GB of RAM, on a 100 Mbit home connection. Each benchmark was run on a GKE cluster and a local kind cluster (`sky local up`). Kubernetes v1.27 was used.
+
+Note that the GKE benchmarks, particularly rsync, are sensitive to the network connection between the benchmarking machine and the GKE cluster.
+
+## `sky launch` benchmarks
+
+Runs `sky launch` 5 times and reports the average of the last four runs.
+
+Usage:
+```
+./skylaunch_bench.sh <suffix>
+# e.g., `./skylaunch_bench.sh gkedirect` will create a file called skylaunch_results_gkedirect.txt
+```
+
+|          | Direct | SSHJump | port-forward |
+|----------|--------|---------|--------------|
+| **GKE**  | 64.51s | 62.51s  | 69.75s       |
+| **Kind** | 26.65s | 28.37s  | 28.75s       |
+
+## Rsync benchmarks
+
+Creates a directory with 1000 1MB files and copies it to the SkyPilot pod. Runs rsync 5 times and reports the average of the last four runs.
+
+Usage:
+```
+./rsync_bench.sh <suffix>
+# e.g., `./rsync_bench.sh gkedirect` will create a file called rsync_results_gkedirect.txt
+```
+
+|          | Direct  | SSHJump | port-forward |
+|----------|---------|---------|--------------|
+| **GKE**  | 337.49s | 347.49s | 361.49s      |
+| **Kind** | 31.49s  | 31.71s  | 33.21s       |
diff --git a/tests/kubernetes/networking_benchmarks/rsync_bench.sh b/tests/kubernetes/networking_benchmarks/rsync_bench.sh
new file mode 100644
index 00000000000..de0b4bf0845
--- /dev/null
+++ b/tests/kubernetes/networking_benchmarks/rsync_bench.sh
@@ -0,0 +1,78 @@
+#!/bin/bash
+
+average=0
+# Check if the command line argument (suffix) is provided
+if [ "$#" -ne 1 ]; then
+    echo "Usage: $0 <suffix>"
+    exit 1
+fi
+
+suffix="$1"
+
+# Declare a file to store the output of the time command with the given suffix
+output_file="rsync_results_${suffix}.txt"
+
+runexpt=1
+
+# Check if the output file exists and ask if it should be overwritten
+if [ -f "$output_file" ]; then
+    read -p "The output file $output_file already exists. Do you want to overwrite it? (y/n) " -n 1 -r
+    echo
+    if [[ ! $REPLY =~ ^[Yy]$ ]]; then
+        echo "Analyzing existing results..."
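+        # Keep the existing results file: skip re-running the experiment and
+        # let the parsing loop below reuse the timings it already contains.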
+        runexpt=0
+    fi
+fi
+
+if [ "$runexpt" -eq 1 ]; then
+    # Delete existing results
+    rm -f "$output_file"
+
+    # Set up 1000 1MB files (1GB total) to copy
+    mkdir -p $HOME/tmp/uploadtest
+    for i in {1..1000}; do dd if=/dev/urandom of=$HOME/tmp/uploadtest/$i bs=1M count=1; done
+
+    # Create the cluster
+    sky launch -y -c test
+
+    for i in {1..5}; do
+        (
+            # Use the `time` command in a subshell to capture its output
+            time rsync -avz ~/tmp/uploadtest/ test:~/sky_workdir
+        ) 2>> "$output_file"
+        ssh test 'rm -rf ~/sky_workdir/'
+    done
+
+    # Delete cluster after done
+    sky down -y test
+fi
+
+# Process the results from the 2nd to 5th run
+count=0
+while read -r line; do
+    # Check for the real time output from the time command
+    if [[ $line == real* ]]; then
+        if [ "$count" -eq 0 ]; then
+            # Skip first result
+            count=$((count+1))
+            continue
+        fi
+        count=$((count+1))
+        # Extract the minutes and seconds and convert to seconds
+        minutes=$(echo $line | cut -d'm' -f1 | sed 's/real //')
+        seconds=$(echo $line | cut -d'm' -f2 | cut -d's' -f1)
+        total_seconds=$(echo "$minutes*60 + $seconds" | bc)
+        # Accumulate the total time
+        average=$(echo "$average + $total_seconds" | bc)
+    fi
+done < "$output_file"  # The first run is skipped inside the loop above.
+
+# Subtract one from the count to account for the skipped first result
+count=$((count-1))
+# Compute the average time
+average=$(echo "scale=2; $average/$count" | bc)
+
+# Print the average time
+echo "Average total time (from 2nd to 5th run): $average seconds"
diff --git a/tests/kubernetes/networking_benchmarks/skylaunch_bench.sh b/tests/kubernetes/networking_benchmarks/skylaunch_bench.sh
new file mode 100644
index 00000000000..ee67bc7a233
--- /dev/null
+++ b/tests/kubernetes/networking_benchmarks/skylaunch_bench.sh
@@ -0,0 +1,67 @@
+#!/bin/bash
+
+average=0
+# Check if the command line argument (suffix) is provided
+if [ "$#" -ne 1 ]; then
+    echo "Usage: $0 <suffix>"
+    exit 1
+fi
+
+suffix="$1"
+
+# Declare a file to store the output of the time command with the given suffix
+output_file="skylaunch_results_${suffix}.txt"
+
+runexpt=1
+
+# Check if the output file exists and ask if it should be overwritten
+if [ -f "$output_file" ]; then
+    read -p "The output file $output_file already exists. Do you want to overwrite it? (y/n) " -n 1 -r
+    echo
+    if [[ ! $REPLY =~ ^[Yy]$ ]]; then
+        echo "Analyzing existing results..."
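+        # As above, skip the benchmark runs and let the parsing loop below
+        # analyze the timings already stored in the results file.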
+        runexpt=0
+    fi
+fi
+
+if [ "$runexpt" -eq 1 ]; then
+    # Delete existing results
+    rm -f "$output_file"
+    for i in {1..5}; do
+        (
+            # Use the `time` command in a subshell to capture its output
+            time sky launch -y -c test
+        ) 2>> "$output_file"
+        sky down -y test
+    done
+fi
+
+# Process the results from the 2nd to 5th run
+count=0
+while read -r line; do
+    # Check for the real time output from the time command
+    if [[ $line == real* ]]; then
+        if [ "$count" -eq 0 ]; then
+            # Skip first result
+            count=$((count+1))
+            continue
+        fi
+        count=$((count+1))
+        # Extract the minutes and seconds and convert to seconds
+        minutes=$(echo $line | cut -d'm' -f1 | sed 's/real //')
+        seconds=$(echo $line | cut -d'm' -f2 | cut -d's' -f1)
+        total_seconds=$(echo "$minutes*60 + $seconds" | bc)
+        # Accumulate the total time
+        average=$(echo "$average + $total_seconds" | bc)
+    fi
+done < "$output_file"  # The first run is skipped inside the loop above.
+
+# Subtract one from the count to account for the skipped first result
+count=$((count-1))
+# Compute the average time
+average=$(echo "scale=2; $average/$count" | bc)
+
+# Print the average time
+echo "Average total time (from 2nd to 5th run): $average seconds"
diff --git a/tests/test_config.py b/tests/test_config.py
index dffd4843ffd..72d16ac362a 100644
--- a/tests/test_config.py
+++ b/tests/test_config.py
@@ -6,9 +6,12 @@
 from sky import skypilot_config
 from sky.utils import common_utils
+from sky.utils import kubernetes_utils
 
 VPC_NAME = 'vpc-12345678'
 PROXY_COMMAND = 'ssh -W %h:%p -i ~/.ssh/id_rsa -o StrictHostKeyChecking=no'
+NODEPORT_MODE_NAME = kubernetes_utils.KubernetesNetworkingMode.NODEPORT.value
+PORT_FORWARD_MODE_NAME = kubernetes_utils.KubernetesNetworkingMode.PORTFORWARD.value
 
 
 def _reload_config() -> None:
@@ -34,6 +37,8 @@ def _create_config_file(config_file_path: pathlib.Path) -> None:
                 vpc_name: {VPC_NAME}
                 use_internal_ips: true
                 ssh_proxy_command: {PROXY_COMMAND}
+            kubernetes:
+                networking: {NODEPORT_MODE_NAME}
             """))
 
 
@@ -67,14 +72,19 @@ def test_config_get_set_nested(monkeypatch, tmp_path) -> None:
     assert skypilot_config.get_nested(('aws', 'use_internal_ips'), None)
     assert skypilot_config.get_nested(('aws', 'ssh_proxy_command'),
                                       None) == PROXY_COMMAND
-
+    assert skypilot_config.get_nested(('kubernetes', 'networking'),
+                                      None) == NODEPORT_MODE_NAME
     # Check set_nested() will copy the config dict and return a new dict
     new_config = skypilot_config.set_nested(('aws', 'ssh_proxy_command'),
                                             'new_value')
     assert new_config['aws']['ssh_proxy_command'] == 'new_value'
     assert skypilot_config.get_nested(('aws', 'ssh_proxy_command'),
                                       None) == PROXY_COMMAND
-
+    new_config = skypilot_config.set_nested(('kubernetes', 'networking'),
+                                            PORT_FORWARD_MODE_NAME)
+    assert new_config['kubernetes']['networking'] == PORT_FORWARD_MODE_NAME
+    assert skypilot_config.get_nested(('kubernetes', 'networking'),
+                                      None) == NODEPORT_MODE_NAME
     # Check that dumping the config to a file with the new None can be reloaded
     new_config2 = skypilot_config.set_nested(('aws', 'ssh_proxy_command'), None)
     new_config_path = tmp_path / 'new_config.yaml'