From f0d3dfcdf297e215d0d4b7a8f021e20d34d17233 Mon Sep 17 00:00:00 2001 From: Romil Bhardwaj Date: Sat, 16 Sep 2023 10:48:56 -0700 Subject: [PATCH] [K8s] Zero config networking for Kubernetes (#2500) * Working Ray K8s node provider based on SSH * wip * working provisioning with SkyPilot and ssh config * working provisioning with SkyPilot and ssh config * Updates to master * ray2.3 * Clean up docs * multiarch build * hacking around ray start * more port fixes * fix up default instance selection * fix resource selection * Add provisioning timeout by checking if pods are ready * Working mounting * Remove catalog * fixes * fixes * Fix ssh-key auth to create unique secrets * Fix for ContainerCreating timeout * Fix head node ssh port caching * mypy * lint * fix ports * typo * cleanup * cleanup * wip * Update setup * readme updates * lint * Fix failover * Fix failover * optimize setup * Fix sync down logs for k8s * test wip * instance name parsing wip * Fix instance name parsing * Merge fixes for query_status * [k8s_cloud] Delete k8s service resources. (#2105) Delete k8s service resources. - 'sky down' for Kubernetes cloud to remove cluster service resources. * Status refresh WIP * refactor to kubernetes adaptor * tests wip * clean up auth * wip tests * cli * cli * sky local up/down cli * cli * lint * lint * lint * Speed up kind cluster creation * tests * lint * tests * handling for non-reachable clusters * Invalid kubeconfig handling * Timeout for sky check * code cleanup * lint * Do not raise error if GPUs requested, return empty list * Address comments * comments * lint * Remove public key upload * GPU support init * wip * add shebang * comments * change permissions * remove chmod * merge 2241 * add todo * Handle kube config management for sky local commands (#2253) * Set current-context (if availablee) after sky local down and remove incorrect prompt in sky local up * Warn user of kubeconfig context switch during sky local up * Use Optional instead of Union * Switch context in create_cluster if cluster already exists. * fix typo * update sky check error msg after sky local down * lint * update timeout check * fix import error * Fix kube API access from within cluster (load_incluster_auth) * lint * lint * working autodown and sky status -r * lint * add test_kubernetes_autodown * lint * address comments * address comments * lint * deletion timeouts wip * [k8s_cloud] Ray pod not created under current context namespace. (#2302) 'namespace' exists under 'context' key. * head ssh port namespace fix * [k8s-cloud] Typo in sky local --help. (#2308) Typo. * [k8s-cloud] Set build_image.sh to be executable. (#2307) * Set build_image.sh to be executable. * Use TAG to easily switch between registries. * remove ingress * remove debug statements * UX and readme updates * lint * fix logging for 409 retry * lint * lint * Debug dockerfile * wip * Fix GPU image * Query cloud specific env vars in task setup (#2347) * Query cloud specific env vars in task setup * Make query_env_vars specific to Kubernetes cloud * Address PR comments * working GPU type selection for GKE and EKS. GFD needs work. 
* TODO for auto-detection * Add image toggling for CPU/GPU * Add image toggling for CPU/GPU * Fix none acce_type * remove memory from j2 * Make resnet examples run again * lint * v100 readme * dockerfile and smoketest * fractional cpu and mem * nits * refactor utils * lint and cleanup * lint and cleanup * lint and cleanup * lint and cleanup * lint and cleanup * lint and cleanup * lint * lint * manual lint * manual isort * test readme update * Remove EKS * lint * add gpu labeler * updates * lint * update script * ux * fix formatter * test update * test update * fix test_optimizer_dryruns * docs * cleanup * test readme update * lint * lint * [k8s_cloud_beta1] Add sshjump host support. (#2369) * Update build image * fix image path * fix merge * cleanup * lint * fix utils ref * typo * refactor pod creation * lint * merge fixes * portfix * merge fixes * [k8s_cloud_beta1] Sky down for a cluster deployed in Kubernetes to possibly remove sshjump pod. (#2425) * Sky down for a kubernetes cluster to possibly terminate sshjump pod. - If the related sshjump pod is being reported as its main container not have been started, then remove its pod and service. This is to minimize the chances for remaining with dangling sshjump pod. * Remove sshjump service in case of an failure to analyze sshjump. - remove _request_timeout as it might not be needed due to terminationGracePeriodSeconds being set in sshjump template. * Move sshjump analysis to kubernetes_utils. * Apply changes per ./format.sh. * Minor comment rephrase. * Use sshjump_name from ray pod label. - rather than from clouds.Kubernetes * cleanup * Add networking benchmarks * comment * comment * lint * autodown fixes * lint * fix label * [k8s_cloud_beta1] Adding support for ssh using kubectl port-forward to access k8s instance (#2412) * Add sshjump support. * Update lcm script. - add comments - rename variables - typo * Set imagePullPolicy to IfNotPresent. 
* add support for port-forward * remove unused * comments * Disable ControlMaster for ssh_options_list * nit * update to disable rest of the ControlMaster * command runner rsync update * relocating run_on_k8s * relocate run_on_k8s * Make Kubernetes specific env variables available when joining a cluster via SSH * merge k8s_cloud_beta1 * format * remove redundant utils.py * format and comments * update with proxy_to_k8s * Update sky/authentication.py Co-authored-by: Romil Bhardwaj * resolving comments on structures * Update sky/utils/command_runner.py Co-authored-by: Romil Bhardwaj * document on nodeport/port-forward proxycommand * error handling when socat is not installed * removing KUBECONFIG from port-forward shell script * nit * nit * Add suport for nodeport * Update sky/utils/kubernetes_utils.py Co-authored-by: Romil Bhardwaj * update * switch svc when conflicting jump pod svc exist * format * Update sky/utils/kubernetes_utils.py Co-authored-by: Romil Bhardwaj * refactoring check for socat * resolve comments * add ServiceType enum and port-forward proxy script * update k8s env var access * add check for container status remove unused func * nit * update get_external_ip for portforward mode * conditionally use sudo and quote values of env var --------- Co-authored-by: Avi Weit Co-authored-by: hemildesai Co-authored-by: Romil Bhardwaj * refactor * fix * updates * lint * Update sky/skylet/providers/kubernetes/node_provider.py * fix test * [k8s] Showing reasons for provisioning failure in K8s (#2422) * surface provision failure message * nit * nit * format * nit * CPU message fix * update Insufficient memory handling * nit * nit * Update sky/skylet/providers/kubernetes/node_provider.py Co-authored-by: Romil Bhardwaj * Update sky/skylet/providers/kubernetes/node_provider.py Co-authored-by: Romil Bhardwaj * Update sky/skylet/providers/kubernetes/node_provider.py Co-authored-by: Romil Bhardwaj * Update sky/skylet/providers/kubernetes/node_provider.py Co-authored-by: Romil Bhardwaj * format * update gpu failure message and condition * fix GPU handling cases * fix * comment * nit * add try except block with general error handling --------- Co-authored-by: Romil Bhardwaj * cleanup * lint * fix for ssh jump image_id * comments * ssh jump refactor * lint * image build fixes --------- Co-authored-by: Avi Weit Co-authored-by: Hemil Desai Co-authored-by: Doyoung Kim <34902420+landscapepainter@users.noreply.github.com> --- Dockerfile_k8s | 3 + Dockerfile_k8s_gpu | 3 + sky/authentication.py | 43 ++ sky/backends/backend_utils.py | 6 +- sky/backends/cloud_vm_ray_backend.py | 49 +-- sky/clouds/kubernetes.py | 51 +-- sky/skylet/providers/kubernetes/config.py | 36 ++ .../providers/kubernetes/node_provider.py | 160 ++++++-- ...ubernetes-port-forward-proxy-command.sh.j2 | 43 ++ sky/templates/kubernetes-ray.yml.j2 | 11 +- sky/templates/kubernetes-ssh-jump.yml.j2 | 90 +++++ sky/utils/command_runner.py | 42 +- sky/utils/command_runner.pyi | 33 +- .../kubernetes/ssh_jump_lifecycle_manager.py | 111 +++++ sky/utils/kubernetes_utils.py | 381 +++++++++++++++++- tests/kubernetes/README.md | 21 +- tests/kubernetes/build_image.sh | 24 +- .../k8s_network_benchmarks.md | 55 +++ .../networking_benchmarks/rsync_bench.sh | 76 ++++ .../networking_benchmarks/skylaunch_bench.sh | 65 +++ tests/test_config.py | 13 +- 21 files changed, 1156 insertions(+), 160 deletions(-) create mode 100644 sky/templates/kubernetes-port-forward-proxy-command.sh.j2 create mode 100644 sky/templates/kubernetes-ssh-jump.yml.j2 create mode 100644 
sky/utils/kubernetes/ssh_jump_lifecycle_manager.py create mode 100644 tests/kubernetes/networking_benchmarks/k8s_network_benchmarks.md create mode 100644 tests/kubernetes/networking_benchmarks/rsync_bench.sh create mode 100644 tests/kubernetes/networking_benchmarks/skylaunch_bench.sh diff --git a/Dockerfile_k8s b/Dockerfile_k8s index 12dbaa9006c..cf6ff86cbed 100644 --- a/Dockerfile_k8s +++ b/Dockerfile_k8s @@ -45,6 +45,9 @@ RUN cd /skypilot/ && \ sudo mv -v sky/setup_files/* . && \ pip install ".[aws]" +# Set PYTHONUNBUFFERED=1 to have Python print to stdout/stderr immediately +ENV PYTHONUNBUFFERED=1 + # Set WORKDIR and initialize conda for sky user WORKDIR /home/sky RUN conda init diff --git a/Dockerfile_k8s_gpu b/Dockerfile_k8s_gpu index 413253d08b8..6f242b6f5ab 100644 --- a/Dockerfile_k8s_gpu +++ b/Dockerfile_k8s_gpu @@ -53,6 +53,9 @@ RUN cd /skypilot/ && \ sudo mv -v sky/setup_files/* . && \ pip install ".[aws]" +# Set PYTHONUNBUFFERED=1 to have Python print to stdout/stderr immediately +ENV PYTHONUNBUFFERED=1 + # Set WORKDIR and initialize conda for sky user WORKDIR /home/sky RUN conda init diff --git a/sky/authentication.py b/sky/authentication.py index 27029b982de..022dba9264c 100644 --- a/sky/authentication.py +++ b/sky/authentication.py @@ -37,10 +37,12 @@ from sky import clouds from sky import sky_logging +from sky import skypilot_config from sky.adaptors import gcp from sky.adaptors import ibm from sky.skylet.providers.lambda_cloud import lambda_utils from sky.utils import common_utils +from sky.utils import kubernetes_utils from sky.utils import subprocess_utils from sky.utils import ux_utils @@ -377,6 +379,21 @@ def setup_scp_authentication(config: Dict[str, Any]) -> Dict[str, Any]: def setup_kubernetes_authentication(config: Dict[str, Any]) -> Dict[str, Any]: + # Default ssh session is established with kubectl port-forwarding with + # ClusterIP service. + nodeport_mode = kubernetes_utils.KubernetesNetworkingMode.NODEPORT + port_forward_mode = kubernetes_utils.KubernetesNetworkingMode.PORTFORWARD + network_mode_str = skypilot_config.get_nested(('kubernetes', 'networking'), + port_forward_mode.value) + try: + network_mode = kubernetes_utils.KubernetesNetworkingMode.from_str( + network_mode_str) + except ValueError as e: + # Add message saying "Please check: ~/.sky/config.yaml" to the error + # message. + with ux_utils.print_exception_no_traceback(): + raise ValueError(str(e) + ' Please check: ~/.sky/config.yaml.') \ + from None get_or_generate_keys() # Run kubectl command to add the public key to the cluster. @@ -403,4 +420,30 @@ def setup_kubernetes_authentication(config: Dict[str, Any]) -> Dict[str, Any]: logger.error(suffix) raise + ssh_jump_name = clouds.Kubernetes.SKY_SSH_JUMP_NAME + if network_mode == nodeport_mode: + service_type = kubernetes_utils.KubernetesServiceType.NODEPORT + elif network_mode == port_forward_mode: + kubernetes_utils.check_port_forward_mode_dependencies() + # Using `kubectl port-forward` creates a direct tunnel to jump pod and + # does not require opening any ports on Kubernetes nodes. As a result, + # the service can be a simple ClusterIP service which we access with + # `kubectl port-forward`. + service_type = kubernetes_utils.KubernetesServiceType.CLUSTERIP + else: + # This should never happen because we check for this in from_str above. + raise ValueError(f'Unsupported networking mode: {network_mode_str}') + # Setup service for SSH jump pod. 
We create the SSH jump service here + # because we need to know the service IP address and port to set the + # ssh_proxy_command in the autoscaler config. + namespace = kubernetes_utils.get_current_kube_config_context_namespace() + kubernetes_utils.setup_ssh_jump_svc(ssh_jump_name, namespace, service_type) + + ssh_proxy_cmd = kubernetes_utils.get_ssh_proxy_command( + PRIVATE_SSH_KEY_PATH, ssh_jump_name, network_mode, namespace, + clouds.Kubernetes.PORT_FORWARD_PROXY_CMD_PATH, + clouds.Kubernetes.PORT_FORWARD_PROXY_CMD_TEMPLATE) + + config['auth']['ssh_proxy_command'] = ssh_proxy_cmd + return config diff --git a/sky/backends/backend_utils.py b/sky/backends/backend_utils.py index b3162aa0226..d5f391bffca 100644 --- a/sky/backends/backend_utils.py +++ b/sky/backends/backend_utils.py @@ -1353,7 +1353,7 @@ def wait_until_ray_cluster_ready( def ssh_credential_from_yaml(cluster_yaml: str, docker_user: Optional[str] = None - ) -> Dict[str, str]: + ) -> Dict[str, Any]: """Returns ssh_user, ssh_private_key and ssh_control name.""" config = common_utils.read_yaml(cluster_yaml) auth_section = config['auth'] @@ -1369,6 +1369,10 @@ def ssh_credential_from_yaml(cluster_yaml: str, } if docker_user is not None: credentials['docker_user'] = docker_user + ssh_provider_module = config['provider']['module'] + # If we are running ssh command on kubernetes node. + if 'kubernetes' in ssh_provider_module: + credentials['disable_control_master'] = True return credentials diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py index f222a98241e..e72abf774ad 100644 --- a/sky/backends/cloud_vm_ray_backend.py +++ b/sky/backends/cloud_vm_ray_backend.py @@ -2319,23 +2319,12 @@ def _update_cluster_region(self): self.launched_resources = self.launched_resources.copy(region=region) def update_ssh_ports(self, max_attempts: int = 1) -> None: - """Updates the cluster SSH ports cached in the handle.""" - # TODO(romilb): Replace this with a call to the cloud class to get ports - # Use port 22 for everything except Kubernetes - if not isinstance(self.launched_resources.cloud, clouds.Kubernetes): - head_ssh_port = 22 - else: - svc_name = f'{self.cluster_name_on_cloud}-ray-head-ssh' - retry_cnt = 0 - while True: - try: - head_ssh_port = clouds.Kubernetes.get_port(svc_name) - break - except Exception: # pylint: disable=broad-except - retry_cnt += 1 - if retry_cnt >= max_attempts: - raise - # TODO(romilb): Multinode doesn't work with Kubernetes yet. + """Fetches and sets the SSH ports for the cluster nodes. + + Use this method to use any cloud-specific port fetching logic. + """ + del max_attempts # Unused. + head_ssh_port = 22 self.stable_ssh_ports = ([head_ssh_port] + [22] * (self.num_node_ips - 1)) @@ -3011,37 +3000,12 @@ def _sync_file_mounts( self._execute_file_mounts(handle, all_file_mounts) self._execute_storage_mounts(handle, storage_mounts) - def _update_envs_for_k8s(self, handle: CloudVmRayResourceHandle, - task: task_lib.Task) -> None: - """Update envs with env vars from Kubernetes if cloud is Kubernetes. - - Kubernetes automatically populates containers with critical environment - variables, such as those for discovering services running in the - cluster and CUDA/nvidia environment variables. We need to update task - environment variables with these env vars. This is needed for GPU - support and service discovery. - - See https://github.com/skypilot-org/skypilot/issues/2287 for - more details. 
- """ - if isinstance(handle.launched_resources.cloud, clouds.Kubernetes): - temp_envs = copy.deepcopy(task.envs) - cloud_env_vars = handle.launched_resources.cloud.query_env_vars( - handle.cluster_name_on_cloud) - task.update_envs(cloud_env_vars) - - # Re update the envs with the original envs to give priority to - # the original envs. - task.update_envs(temp_envs) - def _setup(self, handle: CloudVmRayResourceHandle, task: task_lib.Task, detach_setup: bool) -> None: start = time.time() style = colorama.Style fore = colorama.Fore - self._update_envs_for_k8s(handle, task) - if task.setup is None: return @@ -3350,7 +3314,6 @@ def _execute( # Check the task resources vs the cluster resources. Since `sky exec` # will not run the provision and _check_existing_cluster self.check_resources_fit_cluster(handle, task) - self._update_envs_for_k8s(handle, task) resources_str = backend_utils.get_task_resources_str(task) diff --git a/sky/clouds/kubernetes.py b/sky/clouds/kubernetes.py index ae15fe1a8f5..50b358755ed 100644 --- a/sky/clouds/kubernetes.py +++ b/sky/clouds/kubernetes.py @@ -20,7 +20,7 @@ logger = sky_logging.init_logger(__name__) -_CREDENTIAL_PATH = '~/.kube/config' +CREDENTIAL_PATH = '~/.kube/config' @clouds.CLOUD_REGISTRY.register @@ -28,7 +28,10 @@ class Kubernetes(clouds.Cloud): """Kubernetes.""" SKY_SSH_KEY_SECRET_NAME = f'sky-ssh-{common_utils.get_user_hash()}' - + SKY_SSH_JUMP_NAME = f'sky-ssh-jump-{common_utils.get_user_hash()}' + PORT_FORWARD_PROXY_CMD_TEMPLATE = \ + 'kubernetes-port-forward-proxy-command.sh.j2' + PORT_FORWARD_PROXY_CMD_PATH = '~/.sky/port-forward-proxy-cmd.sh' # Timeout for resource provisioning. This timeout determines how long to # wait for pod to be in pending status before giving up. # Larger timeout may be required for autoscaling clusters, since autoscaler @@ -209,6 +212,9 @@ def make_deploy_resources_variables( assert image_id.startswith('skypilot:') image_id = service_catalog.get_image_id_from_tag(image_id, clouds='kubernetes') + # TODO(romilb): Create a lightweight image for SSH jump host + ssh_jump_image = service_catalog.get_image_id_from_tag( + self.IMAGE_CPU, clouds='kubernetes') k8s_acc_label_key = None k8s_acc_label_value = None @@ -229,6 +235,8 @@ def make_deploy_resources_variables( 'k8s_ssh_key_secret_name': self.SKY_SSH_KEY_SECRET_NAME, 'k8s_acc_label_key': k8s_acc_label_key, 'k8s_acc_label_value': k8s_acc_label_value, + 'k8s_ssh_jump_name': self.SKY_SSH_JUMP_NAME, + 'k8s_ssh_jump_image': ssh_jump_image, # TODO(romilb): Allow user to specify custom images 'image_id': image_id, } @@ -298,7 +306,7 @@ def _make(instance_list): @classmethod def check_credentials(cls) -> Tuple[bool, Optional[str]]: - if os.path.exists(os.path.expanduser(_CREDENTIAL_PATH)): + if os.path.exists(os.path.expanduser(CREDENTIAL_PATH)): # Test using python API try: return kubernetes_utils.check_credentials() @@ -307,10 +315,10 @@ def check_credentials(cls) -> Tuple[bool, Optional[str]]: f'{common_utils.format_exception(e)}') else: return (False, 'Credentials not found - ' - f'check if {_CREDENTIAL_PATH} exists.') + f'check if {CREDENTIAL_PATH} exists.') def get_credential_file_mounts(self) -> Dict[str, str]: - return {_CREDENTIAL_PATH: _CREDENTIAL_PATH} + return {CREDENTIAL_PATH: CREDENTIAL_PATH} def instance_type_exists(self, instance_type: str) -> bool: return kubernetes_utils.KubernetesInstanceType.is_valid_instance_type( @@ -368,39 +376,6 @@ def query_status(cls, name: str, tag_filters: Dict[str, str], # If pods are not found, we don't add them to the return list return 
cluster_status - @classmethod - def query_env_vars(cls, name: str) -> Dict[str, str]: - namespace = kubernetes_utils.get_current_kube_config_context_namespace() - pod = kubernetes.core_api().list_namespaced_pod( - namespace, - label_selector=f'skypilot-cluster={name},ray-node-type=head' - ).items[0] - response = kubernetes.stream()( - kubernetes.core_api().connect_get_namespaced_pod_exec, - pod.metadata.name, - namespace, - command=['env'], - stderr=True, - stdin=False, - stdout=True, - tty=False, - _request_timeout=kubernetes.API_TIMEOUT) - # Split response by newline and filter lines containing '=' - raw_lines = response.split('\n') - filtered_lines = [line for line in raw_lines if '=' in line] - - # Split each line at the first '=' occurrence - lines = [line.split('=', 1) for line in filtered_lines] - - # Construct the dictionary using only valid environment variable names - env_vars = {} - for line in lines: - key = line[0] - if common_utils.is_valid_env_var(key): - env_vars[key] = line[1] - - return env_vars - @classmethod def get_current_user_identity(cls) -> Optional[List[str]]: k8s = kubernetes.get_kubernetes() diff --git a/sky/skylet/providers/kubernetes/config.py b/sky/skylet/providers/kubernetes/config.py index 4132af1b89c..2ab466f03ed 100644 --- a/sky/skylet/providers/kubernetes/config.py +++ b/sky/skylet/providers/kubernetes/config.py @@ -55,6 +55,8 @@ def bootstrap_kubernetes(config: Dict[str, Any]) -> Dict[str, Any]: _configure_services(namespace, config['provider']) + config = _configure_ssh_jump(namespace, config) + if not config['provider'].get('_operator'): # These steps are unnecessary when using the Operator. _configure_autoscaler_service_account(namespace, config['provider']) @@ -257,6 +259,40 @@ def _configure_autoscaler_role_binding(namespace: str, logger.info(log_prefix + created_msg(binding_field, name)) +def _configure_ssh_jump(namespace, config): + """Creates an SSH jump pod to connect to the cluster. + + Also updates config['auth']['ssh_proxy_command'] to use the newly created + jump pod. + """ + pod_cfg = config['available_node_types']['ray_head_default']['node_config'] + + ssh_jump_name = pod_cfg['metadata']['labels']['skypilot-ssh-jump'] + ssh_jump_image = config['provider']['ssh_jump_image'] + + volumes = pod_cfg['spec']['volumes'] + # Find the 'secret-volume' volume and get the secret name. + secret_volume = next(filter(lambda x: x['name'] == 'secret-volume', + volumes)) + ssh_key_secret_name = secret_volume['secret']['secretName'] + + # TODO(romilb): We currently split SSH jump pod and svc creation. Service + # is first created in authentication.py::setup_kubernetes_authentication + # and then SSH jump pod creation happens here. This is because we need to + # set the ssh_proxy_command in the ray YAML before we pass it to the + # autoscaler. If in the future we can write the ssh_proxy_command to the + # cluster yaml through this method, then we should move the service + # creation here. + + # TODO(romilb): We should add a check here to make sure the service is up + # and available before we create the SSH jump pod. If for any reason the + # service is missing, we should raise an error.
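As noted in the TODO above, service and pod creation are split between authentication.py and this provider config step. The networking mode chosen in `~/.sky/config.yaml` (`kubernetes.networking: nodeport` or `portforward`, introduced by this PR) decides which kind of service fronts the jump pod. A condensed sketch of that mapping, using the enums this PR adds (the helper name is ours, for illustration only):

```python
from sky.utils import kubernetes_utils

def resolve_jump_service_type(
        mode_str: str) -> kubernetes_utils.KubernetesServiceType:
    """Map the configured networking mode to the jump pod's service type."""
    mode = kubernetes_utils.KubernetesNetworkingMode.from_str(mode_str)
    if mode == kubernetes_utils.KubernetesNetworkingMode.NODEPORT:
        # NodePort exposes the jump pod's port 22 on the nodes, so no
        # tunnel is needed, but the node port must be reachable.
        return kubernetes_utils.KubernetesServiceType.NODEPORT
    # Port-forward mode tunnels through `kubectl port-forward`, so a
    # cluster-internal ClusterIP service is sufficient.
    return kubernetes_utils.KubernetesServiceType.CLUSTERIP
```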
+ + kubernetes_utils.setup_ssh_jump_pod(ssh_jump_name, ssh_jump_image, + ssh_key_secret_name, namespace) + return config + + def _configure_services(namespace: str, provider_config: Dict[str, Any]) -> None: service_field = 'services' diff --git a/sky/skylet/providers/kubernetes/node_provider.py b/sky/skylet/providers/kubernetes/node_provider.py index 77222e72ab5..8963225cc3f 100644 --- a/sky/skylet/providers/kubernetes/node_provider.py +++ b/sky/skylet/providers/kubernetes/node_provider.py @@ -2,7 +2,6 @@ import logging import time from typing import Dict -from urllib.parse import urlparse from uuid import uuid4 from ray.autoscaler._private.command_runner import SSHCommandRunner @@ -13,6 +12,7 @@ from sky.adaptors import kubernetes from sky.skylet.providers.kubernetes import config +from sky.utils import common_utils from sky.utils import kubernetes_utils logger = logging.getLogger(__name__) @@ -100,17 +100,7 @@ def node_tags(self, node_id): return pod.metadata.labels def external_ip(self, node_id): - # Return the IP address of the first node with an external IP - nodes = kubernetes.core_api().list_node().items - for node in nodes: - if node.status.addresses: - for address in node.status.addresses: - if address.type == 'ExternalIP': - return address.address - # If no external IP is found, use the API server IP - api_host = kubernetes.core_api().api_client.configuration.host - parsed_url = urlparse(api_host) - return parsed_url.hostname + return kubernetes_utils.get_external_ip() def external_port(self, node_id): # Extract the NodePort of the head node's SSH service @@ -172,6 +162,61 @@ def _set_node_tags(self, node_id, tags): pod.metadata.labels.update(tags) kubernetes.core_api().patch_namespaced_pod(node_id, self.namespace, pod) + def _raise_pod_scheduling_errors(self, new_nodes): + for new_node in new_nodes: + pod_status = new_node.status.phase + pod_name = new_node._metadata._name + events = kubernetes.core_api().list_namespaced_event( + self.namespace, + field_selector=(f'involvedObject.name={pod_name},' + 'involvedObject.kind=Pod')) + # Events from the past few hours are kept by the Kubernetes + # Python client; we want to surface the latest event message. + events_desc_by_time = \ + sorted(events.items, + key=lambda e: e.metadata.creation_timestamp, + reverse=True) + event_message = None + for event in events_desc_by_time: + if event.reason == 'FailedScheduling': + event_message = event.message + break + timeout_err_msg = ('Timed out while waiting for nodes to start. ' 'Cluster may be out of resources or ' 'may be too slow to autoscale.') + lack_resource_msg = ( 'Insufficient {resource} capacity on the cluster. ' 'Other SkyPilot tasks or pods may be using resources. ' 'Check resource usage by running `kubectl describe nodes`.') + if event_message is not None: + if pod_status == 'Pending': + if 'Insufficient cpu' in event_message: + raise config.KubernetesError( lack_resource_msg.format(resource='CPU')) + if 'Insufficient memory' in event_message: + raise config.KubernetesError( lack_resource_msg.format(resource='memory')) + gpu_lf_keys = [ lf.get_label_key() for lf in kubernetes_utils.LABEL_FORMATTER_REGISTRY ] + if new_node.spec.node_selector: + for label_key in new_node.spec.node_selector.keys(): + if label_key in gpu_lf_keys: + # TODO(romilb): We may have additional node + # affinity selectors in the future - in that + # case we will need to update this logic.
+ if 'Insufficient nvidia.com/gpu' in event_message or \ + 'didn\'t match Pod\'s node affinity/selector' in event_message: + raise config.KubernetesError( + f'{lack_resource_msg.format(resource="GPU")} ' + f'Verify if {new_node.spec.node_selector[label_key]}' + ' is available in the cluster.') + raise config.KubernetesError(f'{timeout_err_msg} ' + f'Pod status: {pod_status} ' + f'Details: \'{event_message}\' ') + raise config.KubernetesError(f'{timeout_err_msg}') + def create_node(self, node_config, tags, count): conf = copy.deepcopy(node_config) pod_spec = conf.get('pod', conf) @@ -204,7 +249,6 @@ def create_node(self, node_config, tags, count): '(count={}).'.format(count)) for new_node in new_nodes: - metadata = service_spec.get('metadata', {}) metadata['name'] = new_node.metadata.name service_spec['metadata'] = metadata @@ -216,16 +260,20 @@ # Wait for all pods to be ready, and if it exceeds the timeout, raise an # exception. If pod's container is ContainerCreating, then we can assume # that resources have been allocated and we can exit. - start = time.time() while True: if time.time() - start > self.timeout: - raise config.KubernetesError( - 'Timed out while waiting for nodes to start. ' - 'Cluster may be out of resources or ' - 'may be too slow to autoscale.') - all_ready = True + try: + self._raise_pod_scheduling_errors(new_nodes) + except config.KubernetesError: + raise + except Exception as e: + raise config.KubernetesError( + 'An error occurred while trying to fetch the reason ' + 'for pod scheduling failure. ' + f'Error: {common_utils.format_exception(e)}') from None + all_ready = True for node in new_nodes: pod = kubernetes.core_api().read_namespaced_pod( node.metadata.name, self.namespace) @@ -252,20 +300,57 @@ break time.sleep(1) + # Wait for pod containers to be ready - they may be pulling images or + # may be in the process of container creation. + while True: + pods = [] + for node in new_nodes: + pod = kubernetes.core_api().read_namespaced_pod( + node.metadata.name, self.namespace) + pods.append(pod) + if all([pod.status.phase == "Running" for pod in pods]) \ + and all( + [container.state.running for pod in pods for container in + pod.status.container_statuses]): + break + time.sleep(1) + + # Once all containers are ready, we can exec into them and set env vars. + # Kubernetes automatically populates containers with critical + # environment variables, such as those for discovering services running + # in the cluster and CUDA/nvidia environment variables. We need to + # make sure these env vars are available in every task and ssh session. + # This is needed for GPU support and service discovery. + # See https://github.com/skypilot-org/skypilot/issues/2287 for + # more details. + # To do so, we capture env vars from the pod's runtime and write them to + # /etc/profile.d/, making them available for all users in future + # shell sessions.
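The env-var capture that follows uses the Kubernetes exec API, the same mechanism the removed `query_env_vars` used. For reference, a minimal standalone sketch of that pattern with the official `kubernetes` Python client (pod name and namespace are placeholders):

```python
from kubernetes import client, config
from kubernetes.stream import stream

def pod_env(pod_name: str, namespace: str = 'default') -> str:
    """Exec `env` inside a pod and return its stdout as one string."""
    config.load_kube_config()  # Use load_incluster_config() inside a pod.
    api = client.CoreV1Api()
    return stream(api.connect_get_namespaced_pod_exec,
                  pod_name,
                  namespace,
                  command=['env'],
                  stderr=True,
                  stdin=False,
                  stdout=True,
                  tty=False)

# Example (hypothetical pod name):
# print(pod_env('mycluster-ray-head'))
```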
+ set_k8s_env_var_cmd = [ + '/bin/sh', '-c', + ('printenv | awk -F "=" \'{print "export " $1 "=\\047" $2 "\\047"}\' > ~/k8s_env_var.sh && ' + 'mv ~/k8s_env_var.sh /etc/profile.d/k8s_env_var.sh || ' + 'sudo mv ~/k8s_env_var.sh /etc/profile.d/k8s_env_var.sh') + ] + for new_node in new_nodes: + kubernetes.stream()( + kubernetes.core_api().connect_get_namespaced_pod_exec, + new_node.metadata.name, + self.namespace, + command=set_k8s_env_var_cmd, + stderr=True, + stdin=False, + stdout=True, + tty=False, + _request_timeout=kubernetes.API_TIMEOUT) + def terminate_node(self, node_id): logger.info(config.log_prefix + 'calling delete_namespaced_pod') try: - kubernetes.core_api().delete_namespaced_pod( - node_id, - self.namespace, - _request_timeout=config.DELETION_TIMEOUT) - except kubernetes.api_exception() as e: - if e.status == 404: - logger.warning(config.log_prefix + - f'Tried to delete pod {node_id},' - ' but the pod was not found (404).') - else: - raise + kubernetes_utils.clean_zombie_ssh_jump_pod(self.namespace, node_id) + except Exception as e: + logger.warning(config.log_prefix + + f'Error occurred when analyzing SSH Jump pod: {e}') try: kubernetes.core_api().delete_namespaced_service( node_id, @@ -277,6 +362,21 @@ def terminate_node(self, node_id): _request_timeout=config.DELETION_TIMEOUT) except kubernetes.api_exception(): pass + # Note - delete pod after all other resources are deleted. + # This is to ensure there are no leftover resources if this down is run + # from within the pod, e.g., for autodown. + try: + kubernetes.core_api().delete_namespaced_pod( + node_id, + self.namespace, + _request_timeout=config.DELETION_TIMEOUT) + except kubernetes.api_exception() as e: + if e.status == 404: + logger.warning(config.log_prefix + + f'Tried to delete pod {node_id},' + ' but the pod was not found (404).') + else: + raise def terminate_nodes(self, node_ids): # TODO(romilb): terminate_nodes should be include optimizations for diff --git a/sky/templates/kubernetes-port-forward-proxy-command.sh.j2 b/sky/templates/kubernetes-port-forward-proxy-command.sh.j2 new file mode 100644 index 00000000000..fa71df3a0ec --- /dev/null +++ b/sky/templates/kubernetes-port-forward-proxy-command.sh.j2 @@ -0,0 +1,43 @@ +#!/usr/bin/env bash +set -uo pipefail + +# Checks if socat is installed +if ! command -v socat > /dev/null; then + echo "Using 'port-forward' mode to run ssh session on Kubernetes instances requires 'socat' to be installed. Please install 'socat'" >&2 + exit +fi + +# Checks if lsof is installed +if ! command -v lsof > /dev/null; then + echo "Checking port availability for 'port-forward' mode requires 'lsof' to be installed. Please install 'lsof'" >&2 + exit 1 +fi + +# Function to check if port is in use +is_port_in_use() { + local port="$1" + lsof -i :${port} > /dev/null 2>&1 +} + +# Start from a fixed local port and increment if in use +local_port={{ local_port }} +while is_port_in_use "${local_port}"; do + local_port=$((local_port + 1)) +done + +# Establishes connection between local port and the ssh jump pod +kubectl port-forward svc/{{ ssh_jump_name }} "${local_port}":22 & + +# Terminate the port-forward process when this script exits. +K8S_PORT_FWD_PID=$! +trap "kill $K8S_PORT_FWD_PID" EXIT + +# checks if a connection to local_port of 127.0.0.1:[local_port] is established +while ! nc -z 127.0.0.1 "${local_port}"; do + sleep 0.1 +done + +# Establishes two directional byte streams to handle stdin/stdout between +# terminal and the jump pod. 
+# socat process terminates when port-forward terminates. +socat - tcp:127.0.0.1:"${local_port}" \ No newline at end of file diff --git a/sky/templates/kubernetes-ray.yml.j2 b/sky/templates/kubernetes-ray.yml.j2 index d8e14adcf4e..da8e4253290 100644 --- a/sky/templates/kubernetes-ray.yml.j2 +++ b/sky/templates/kubernetes-ray.yml.j2 @@ -16,11 +16,15 @@ provider: type: external module: sky.skylet.providers.kubernetes.KubernetesNodeProvider - # Use False if running from outside of k8s cluster - use_internal_ips: false + # We use internal IPs since we set up a port-forward between the kubernetes + # cluster and the local machine, or directly use NodePort to reach the + # head node. + use_internal_ips: true timeout: {{timeout}} + ssh_jump_image: {{k8s_ssh_jump_image}} + # ServiceAccount created by the autoscaler for the head node pod that it # runs in. If this field isn't provided, the head pod config below must # contain a user-created service account with the proper permissions. @@ -78,7 +82,6 @@ provider: skypilot-cluster: {{cluster_name_on_cloud}} name: {{cluster_name_on_cloud}}-ray-head-ssh spec: - type: NodePort selector: component: {{cluster_name_on_cloud}}-ray-head ports: @@ -126,6 +129,8 @@ available_node_types: parent: skypilot component: {{cluster_name_on_cloud}}-ray-head skypilot-cluster: {{cluster_name_on_cloud}} + # Identifies the SSH jump pod used by this pod. Used in life cycle management of the ssh jump pod. + skypilot-ssh-jump: {{k8s_ssh_jump_name}} spec: # Change this if you altered the autoscaler_service_account above # or want to provide your own. diff --git a/sky/templates/kubernetes-ssh-jump.yml.j2 b/sky/templates/kubernetes-ssh-jump.yml.j2 new file mode 100644 index 00000000000..a4c9929fe1e --- /dev/null +++ b/sky/templates/kubernetes-ssh-jump.yml.j2 @@ -0,0 +1,90 @@ +pod_spec: + apiVersion: v1 + kind: Pod + metadata: + name: {{ name }} + labels: + component: {{ name }} + parent: skypilot + spec: + serviceAccountName: sky-ssh-jump-sa + volumes: + - name: secret-volume + secret: + secretName: {{ secret }} + containers: + - name: {{ name }} + imagePullPolicy: Always + image: {{ image }} + command: ["python3", "-u", "/skypilot/sky/utils/kubernetes/ssh_jump_lifecycle_manager.py"] + ports: + - containerPort: 22 + volumeMounts: + - name: secret-volume + readOnly: true + mountPath: /etc/secret-volume + lifecycle: + postStart: + exec: + command: ["/bin/bash", "-c", "mkdir -p ~/.ssh && cp /etc/secret-volume/ssh-publickey ~/.ssh/authorized_keys && sudo service ssh restart"] + env: + - name: MY_POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: MY_POD_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + - name: ALERT_THRESHOLD + # seconds + value: "600" + - name: RETRY_INTERVAL + # seconds + value: "60" + terminationGracePeriodSeconds: 0 +service_spec: + apiVersion: v1 + kind: Service + metadata: + name: {{ name }} + labels: + parent: skypilot + spec: + type: {{ service_type }} + selector: + component: {{ name }} + ports: + - protocol: TCP + port: 22 + targetPort: 22 +# The following ServiceAccount/Role/RoleBinding sets up an RBAC for life cycle +# management of the jump pod/service +service_account: + apiVersion: v1 + kind: ServiceAccount + metadata: + name: sky-ssh-jump-sa + parent: skypilot +role: + kind: Role + apiVersion: rbac.authorization.k8s.io/v1 + metadata: + name: sky-ssh-jump-role + rules: + - apiGroups: [""] + resources: ["pods", "pods/status", "pods/exec", "services"] + verbs: ["get", "list", "create", "delete"] +role_binding: + 
apiVersion: rbac.authorization.k8s.io/v1 + kind: RoleBinding + metadata: + name: sky-ssh-jump-rb + parent: skypilot + subjects: + - kind: ServiceAccount + name: sky-ssh-jump-sa + roleRef: + kind: Role + name: sky-ssh-jump-role + apiGroup: rbac.authorization.k8s.io diff --git a/sky/utils/command_runner.py b/sky/utils/command_runner.py index 08fde49354d..c3dd73eb345 100644 --- a/sky/utils/command_runner.py +++ b/sky/utils/command_runner.py @@ -42,13 +42,16 @@ def _ssh_control_path(ssh_control_filename: Optional[str]) -> Optional[str]: return path -def ssh_options_list(ssh_private_key: Optional[str], - ssh_control_name: Optional[str], - *, - ssh_proxy_command: Optional[str] = None, - docker_ssh_proxy_command: Optional[str] = None, - timeout: int = 30, - port: int = 22) -> List[str]: +def ssh_options_list( + ssh_private_key: Optional[str], + ssh_control_name: Optional[str], + *, + ssh_proxy_command: Optional[str] = None, + docker_ssh_proxy_command: Optional[str] = None, + timeout: int = 30, + port: int = 22, + disable_control_master: Optional[bool] = False, +) -> List[str]: """Returns a list of sane options for 'ssh'.""" # Forked from Ray SSHOptions: # https://github.com/ray-project/ray/blob/master/python/ray/autoscaler/_private/command_runner.py @@ -79,7 +82,13 @@ def ssh_options_list(ssh_private_key: Optional[str], } # SSH Control will have a severe delay when using docker_ssh_proxy_command. # TODO(tian): Investigate why. - if ssh_control_name is not None and docker_ssh_proxy_command is None: + # We also do not use ControlMaster when we use `kubectl port-forward` + # to access Kubernetes pods over SSH+Proxycommand. This is because the + # process running ProxyCommand is kept running as long as the ssh session + # is running and the ControlMaster keeps the session, which results in a + # 'ControlPersist'-seconds delay per ssh command run. + if (ssh_control_name is not None and docker_ssh_proxy_command is None and + not disable_control_master): arg_dict.update({ # Control path: important optimization as we do multiple ssh in one # sky.launch(). @@ -136,6 +145,7 @@ def __init__( ssh_proxy_command: Optional[str] = None, port: int = 22, docker_user: Optional[str] = None, + disable_control_master: Optional[bool] = False, ): """Initialize SSHCommandRunner. @@ -158,13 +168,17 @@ port: The port to use for ssh. docker_user: The docker user to use for ssh. If specified, the command will be run inside a docker container which has an ssh - server running at port sky.skylet.constants.DEFAULT_DOCKER_PORT. + server running at port sky.skylet.constants.DEFAULT_DOCKER_PORT + disable_control_master: bool; specifies whether or not the ssh + command will utilize ControlMaster. We currently disable + it for Kubernetes instances.
""" self.ssh_private_key = ssh_private_key self.ssh_control_name = ( None if ssh_control_name is None else hashlib.md5( ssh_control_name.encode()).hexdigest()[:_HASH_MAX_LENGTH]) self._ssh_proxy_command = ssh_proxy_command + self.disable_control_master = disable_control_master if docker_user is not None: assert port is None or port == 22, ( f'port must be None or 22 for docker_user, got {port}.') @@ -190,6 +204,7 @@ def make_runner_list( ssh_private_key: str, ssh_control_name: Optional[str] = None, ssh_proxy_command: Optional[str] = None, + disable_control_master: Optional[bool] = False, port_list: Optional[List[int]] = None, docker_user: Optional[str] = None, ) -> List['SSHCommandRunner']: @@ -198,7 +213,8 @@ def make_runner_list( port_list = [22] * len(ip_list) return [ SSHCommandRunner(ip, ssh_user, ssh_private_key, ssh_control_name, - ssh_proxy_command, port, docker_user) + ssh_proxy_command, port, docker_user, + disable_control_master) for ip, port in zip(ip_list, port_list) ] @@ -228,7 +244,9 @@ def _ssh_base_command(self, *, ssh_mode: SshMode, ssh_proxy_command=self._ssh_proxy_command, docker_ssh_proxy_command=docker_ssh_proxy_command, port=self.port, - ) + [f'{self.ssh_user}@{self.ip}'] + disable_control_master=self.disable_control_master) + [ + f'{self.ssh_user}@{self.ip}' + ] def run( self, @@ -388,7 +406,7 @@ def rsync( ssh_proxy_command=self._ssh_proxy_command, docker_ssh_proxy_command=docker_ssh_proxy_command, port=self.port, - )) + disable_control_master=self.disable_control_master)) rsync_command.append(f'-e "ssh {ssh_options}"') # To support spaces in the path, we need to quote source and target. # rsync doesn't support '~' in a quoted local path, but it is ok to diff --git a/sky/utils/command_runner.pyi b/sky/utils/command_runner.pyi index e5feb5fb8db..893acb1e57b 100644 --- a/sky/utils/command_runner.pyi +++ b/sky/utils/command_runner.pyi @@ -20,10 +20,16 @@ RSYNC_FILTER_OPTION: str RSYNC_EXCLUDE_OPTION: str -def ssh_options_list(ssh_private_key: Optional[str], - ssh_control_name: Optional[str], - *, - timeout: int = ...) -> List[str]: +def ssh_options_list( + ssh_private_key: Optional[str], + ssh_control_name: Optional[str], + *, + ssh_proxy_command: Optional[str] = ..., + docker_ssh_proxy_command: Optional[str] = ..., + timeout: int = ..., + port: int = ..., + disable_control_master: Optional[bool] = ..., +) -> List[str]: ... @@ -40,14 +46,18 @@ class SSHCommandRunner: ssh_control_name: Optional[str] docker_user: str port: int + disable_control_master: Optional[bool] - def __init__(self, - ip: str, - ssh_user: str, - ssh_private_key: str, - ssh_control_name: Optional[str] = ..., - port: int = ..., - docker_user: Optional[str] = ...) -> None: + def __init__( + self, + ip: str, + ssh_user: str, + ssh_private_key: str, + ssh_control_name: Optional[str] = ..., + port: int = ..., + docker_user: Optional[str] = ..., + disable_control_master: Optional[bool] = ..., + ) -> None: ... @staticmethod @@ -59,6 +69,7 @@ class SSHCommandRunner: ssh_proxy_command: Optional[str] = ..., port_list: Optional[List[int]] = ..., docker_user: Optional[str] = ..., + disable_control_master: Optional[bool] = ..., ) -> List['SSHCommandRunner']: ... diff --git a/sky/utils/kubernetes/ssh_jump_lifecycle_manager.py b/sky/utils/kubernetes/ssh_jump_lifecycle_manager.py new file mode 100644 index 00000000000..05f6a8d7a42 --- /dev/null +++ b/sky/utils/kubernetes/ssh_jump_lifecycle_manager.py @@ -0,0 +1,111 @@ +"""Manages lifecycle of ssh jump pod. 
+ +This script runs inside the ssh jump pod as the main process (PID 1). + +It terminates itself (by removing the ssh jump service and pod via a call to +the Kubernetes API) if it does not see any Ray pods for 10 minutes. If the +user re-launches a task before that window elapses, the ssh jump pod is +reused, and it will only terminate itself once it again sees no Ray clusters +for the full duration. +""" +import datetime +import os +import sys +import time + +from kubernetes import client +from kubernetes import config + +# Load kube config +config.load_incluster_config() + +v1 = client.CoreV1Api() + +current_name = os.getenv('MY_POD_NAME') +current_namespace = os.getenv('MY_POD_NAMESPACE') + +# The number of seconds without any Ray pods after which the ssh jump pod +# terminates itself +alert_threshold = int(os.getenv('ALERT_THRESHOLD', '600')) +# The number of seconds to wait between checks for Ray pods +retry_interval = int(os.getenv('RETRY_INTERVAL', '60')) + +# Ray pods are labeled with the ssh jump pod's name, which is unique per +# user (based on the user hash) +label_selector = f'skypilot-ssh-jump={current_name}' + + +def poll(): + sys.stdout.write('Starting polling.\n') + + alert_delta = datetime.timedelta(seconds=alert_threshold) + + # Set delay for each retry + retry_interval_delta = datetime.timedelta(seconds=retry_interval) + + # Accumulated time during which no SkyPilot cluster exists; compared + # against alert_threshold + nocluster_delta = datetime.timedelta() + + while True: + sys.stdout.write(f'Sleeping {retry_interval} seconds..\n') + time.sleep(retry_interval) + + # List the pods in the current namespace + try: + ret = v1.list_namespaced_pod(current_namespace, + label_selector=label_selector) + except Exception as e: + sys.stdout.write(f'Error: listing pods failed with error: {e}\n') + raise + + if len(ret.items) == 0: + sys.stdout.write(f'Did not find pods with label "{label_selector}" ' + f'in namespace {current_namespace}\n') + nocluster_delta = nocluster_delta + retry_interval_delta + sys.stdout.write( + f'Time since no pods found: {nocluster_delta}, alert ' + f'threshold: {alert_delta}\n') + else: + sys.stdout.write( + f'Found pods with label "{label_selector}" in namespace ' + f'{current_namespace}\n') + # Reset the counter + nocluster_delta = datetime.timedelta() + sys.stdout.write(f'nocluster_delta is reset: {nocluster_delta}\n') + + if nocluster_delta >= alert_delta: + sys.stdout.write( + f'nocluster_delta: {nocluster_delta} crossed alert threshold: ' + f'{alert_delta}. Time to terminate myself and my service.\n') + try: + # The ssh jump service and pod are created under the same name + v1.delete_namespaced_service(current_name, current_namespace) + v1.delete_namespaced_pod(current_name, current_namespace) + except Exception as e: + sys.stdout.write('[ERROR] Deletion failed.
Exiting ' + f'poll() with error: {e}\n') + raise + + break + + sys.stdout.write('Done polling.\n') + + +def main(): + sys.stdout.write('SkyPilot SSH Jump Pod Lifecycle Manager\n') + sys.stdout.write(f'current_name: {current_name}\n') + sys.stdout.write(f'current_namespace: {current_namespace}\n') + sys.stdout.write(f'alert_threshold time: {alert_threshold}\n') + sys.stdout.write(f'retry_interval time: {retry_interval}\n') + sys.stdout.write(f'label_selector: {label_selector}\n') + + if not current_name or not current_namespace: + # Raise Exception with message to terminate pod + raise Exception('Missing environment variables MY_POD_NAME or ' + 'MY_POD_NAMESPACE') + poll() + + +if __name__ == '__main__': + main() diff --git a/sky/utils/kubernetes_utils.py b/sky/utils/kubernetes_utils.py index b0d175b7e4f..cadd395bcd0 100644 --- a/sky/utils/kubernetes_utils.py +++ b/sky/utils/kubernetes_utils.py @@ -1,15 +1,26 @@ """Kubernetes utilities for SkyPilot.""" +import enum import math +import os import re -from typing import Any, List, Optional, Set, Tuple, Union +import subprocess +from typing import Any, Dict, List, Optional, Set, Tuple, Union +from urllib.parse import urlparse +import jinja2 +import yaml + +import sky from sky import exceptions +from sky import sky_logging from sky.adaptors import kubernetes +from sky.backends import backend_utils from sky.utils import common_utils from sky.utils import env_options from sky.utils import ux_utils DEFAULT_NAMESPACE = 'default' +LOCAL_PORT_FOR_PORT_FORWARD = 23100 MEMORY_SIZE_UNITS = { 'B': 1, @@ -20,6 +31,35 @@ 'P': 2**50, } +logger = sky_logging.init_logger(__name__) + + +class KubernetesNetworkingMode(enum.Enum): + """Enum for the different types of networking modes for accessing + jump pods. + """ + NODEPORT = 'nodeport' + PORTFORWARD = 'portforward' + + @classmethod + def from_str(cls, mode: str) -> 'KubernetesNetworkingMode': + """Returns the enum value for the given string.""" + if mode.lower() == cls.NODEPORT.value: + return cls.NODEPORT + elif mode.lower() == cls.PORTFORWARD.value: + return cls.PORTFORWARD + else: + raise ValueError(f'Unsupported kubernetes networking mode: ' + f'{mode}. The mode must be either ' + f'\'{cls.PORTFORWARD.value}\' or ' + f'\'{cls.NODEPORT.value}\'. 
') + + +class KubernetesServiceType(enum.Enum): + """Enum for the different types of services.""" + NODEPORT = 'NodePort' + CLUSTERIP = 'ClusterIP' + class GPULabelFormatter: """Base class to define a GPU label formatter for a Kubernetes cluster @@ -346,6 +386,22 @@ def get_port(svc_name: str, namespace: str) -> int: return head_service.spec.ports[0].node_port +def get_external_ip(network_mode: Optional[KubernetesNetworkingMode]): + if network_mode == KubernetesNetworkingMode.PORTFORWARD: + return '127.0.0.1' + # Return the IP address of the first node with an external IP + nodes = kubernetes.core_api().list_node().items + for node in nodes: + if node.status.addresses: + for address in node.status.addresses: + if address.type == 'ExternalIP': + return address.address + # If no external IP is found, use the API server IP + api_host = kubernetes.core_api().api_client.configuration.host + parsed_url = urlparse(api_host) + return parsed_url.hostname + + def check_credentials(timeout: int = kubernetes.API_TIMEOUT) -> \ Tuple[bool, Optional[str]]: """Check if the credentials in kubeconfig file are valid @@ -578,3 +634,326 @@ def from_resources(cls, def __str__(self): return self.name + + +def construct_ssh_jump_command(private_key_path: str, + ssh_jump_port: int, + ssh_jump_ip: str, + proxy_cmd_path: Optional[str] = None) -> str: + ssh_jump_proxy_command = (f'ssh -tt -i {private_key_path} ' + '-o StrictHostKeyChecking=no ' + '-o UserKnownHostsFile=/dev/null ' + f'-o IdentitiesOnly=yes -p {ssh_jump_port} ' + f'-W %h:%p sky@{ssh_jump_ip}') + if proxy_cmd_path is not None: + proxy_cmd_path = os.path.expanduser(proxy_cmd_path) + # adding execution permission to the proxy command script + os.chmod(proxy_cmd_path, os.stat(proxy_cmd_path).st_mode | 0o111) + ssh_jump_proxy_command += f' -o ProxyCommand=\'{proxy_cmd_path}\' ' + return ssh_jump_proxy_command + + +def get_ssh_proxy_command(private_key_path: str, ssh_jump_name: str, + network_mode: KubernetesNetworkingMode, + namespace: str, port_fwd_proxy_cmd_path: str, + port_fwd_proxy_cmd_template: str) -> str: + """Generates the SSH proxy command to connect through the SSH jump pod. + + By default, establishing an SSH connection creates a communication + channel to a remote node by setting up a TCP connection. When a + ProxyCommand is specified, this default behavior is overridden. The command + specified in ProxyCommand is executed, and its standard input and output + become the communication channel for the SSH session. + + Pods within a Kubernetes cluster have internal IP addresses that are + typically not accessible from outside the cluster. Since the default TCP + connection of SSH won't allow access to these pods, we employ a + ProxyCommand to establish the required communication channel. We offer this + in two different networking options: NodePort/port-forward. + + With the NodePort networking mode, a NodePort service is launched. This + service opens an external port on the node which redirects to the desired + port within the pod. When establishing an SSH session in this mode, the + ProxyCommand makes use of this external port to create a communication + channel directly to port 22, which is the default port ssh server listens + on, of the jump pod. + + With Port-forward mode, instead of directly exposing an external port, + 'kubectl port-forward' sets up a tunnel between a local port + (127.0.0.1:23100) and port 22 of the jump pod. Then we establish a TCP + connection to the local end of this tunnel, 127.0.0.1:23100, using 'socat'. 
+ This is set up in the inner ProxyCommand of the nested ProxyCommand, and the + rest is the same as the NodePort approach, where the outer ProxyCommand + establishes a communication channel between 127.0.0.1:23100 and port 22 on + the jump pod. Consequently, any stdin provided on the local machine is + forwarded through this tunnel to the application (SSH server) listening in + the pod. Similarly, any output from the application in the pod is tunneled + back and displayed in the terminal on the local machine. + + Args: + private_key_path: str; Path to the private key to use for SSH. + This key must be authorized to access the SSH jump pod. + ssh_jump_name: str; Name of the SSH jump service to use + network_mode: KubernetesNetworkingMode; networking mode for ssh + session. It is either 'NODEPORT' or 'PORTFORWARD' + namespace: Kubernetes namespace to use + port_fwd_proxy_cmd_path: str; path to the script used as Proxycommand + with 'kubectl port-forward' + port_fwd_proxy_cmd_template: str; template used to create + 'kubectl port-forward' Proxycommand + """ + # Fetch IP to connect to for the jump svc + ssh_jump_ip = get_external_ip(network_mode) + if network_mode == KubernetesNetworkingMode.NODEPORT: + ssh_jump_port = get_port(ssh_jump_name, namespace) + ssh_jump_proxy_command = construct_ssh_jump_command( + private_key_path, ssh_jump_port, ssh_jump_ip) + # Set up kubectl port-forward + socat to establish the ssh session via a + # ClusterIP service, so that no ports need to be opened on the nodes + else: + ssh_jump_port = LOCAL_PORT_FOR_PORT_FORWARD + vars_to_fill = { 'ssh_jump_name': ssh_jump_name, 'local_port': ssh_jump_port, } + backend_utils.fill_template(port_fwd_proxy_cmd_template, vars_to_fill, output_path=port_fwd_proxy_cmd_path) + ssh_jump_proxy_command = construct_ssh_jump_command( private_key_path, ssh_jump_port, ssh_jump_ip, port_fwd_proxy_cmd_path) + return ssh_jump_proxy_command + + +def setup_ssh_jump_svc(ssh_jump_name: str, namespace: str, + service_type: KubernetesServiceType): + """Sets up a Kubernetes service resource for accessing the SSH jump pod. + + This method is a necessary complement to setup_ssh_jump_pod(...): the + service it creates is what makes the jump pod accessible. + + Args: + ssh_jump_name: Name to use for the SSH jump service + namespace: Namespace to create the SSH jump service in + service_type: Whether to use a NodePort or ClusterIP service for ssh + """ + # Fill in template - ssh_key_secret and ssh_jump_image are not required for + # the service spec, so we pass in empty strs. + content = fill_ssh_jump_template('', '', ssh_jump_name, service_type.value) + # Create service + try: + kubernetes.core_api().create_namespaced_service(namespace, content['service_spec']) + except kubernetes.api_exception() as e: + # SSH Jump Pod service already exists.
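To make the nested ProxyCommand concrete: in port-forward mode, the outer ssh command produced by `construct_ssh_jump_command` above targets the local end of the tunnel. A hedged sketch with illustrative values (the key path and script path are placeholders; the helper also chmods the script, so it must already exist on disk):

```python
# Illustrative values only; the real ones come from the cluster YAML.
proxy_command = construct_ssh_jump_command(
    private_key_path='~/.ssh/sky-key',
    ssh_jump_port=23100,       # LOCAL_PORT_FOR_PORT_FORWARD
    ssh_jump_ip='127.0.0.1',   # port-forward mode tunnels via localhost
    proxy_cmd_path='~/.sky/port-forward-proxy-cmd.sh')
# Roughly expands to:
#   ssh -tt -i ~/.ssh/sky-key -o StrictHostKeyChecking=no
#       -o UserKnownHostsFile=/dev/null -o IdentitiesOnly=yes
#       -p 23100 -W %h:%p sky@127.0.0.1
#       -o ProxyCommand='<expanded proxy-cmd script path>'
```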
+ if e.status == 409: + ssh_jump_service = kubernetes.core_api().read_namespaced_service( + name=ssh_jump_name, namespace=namespace) + curr_svc_type = ssh_jump_service.spec.type + if service_type.value == curr_svc_type: + # The existing SSH Jump service's type matches the user's + # configured networking mode; reuse it. + logger.debug( + f'SSH Jump Service {ssh_jump_name} already exists in the ' + 'cluster, using it.') + else: + # The existing SSH Jump service's type differs from the user's + # configured networking mode; delete it and recreate it + # following the user's config. + kubernetes.core_api().delete_namespaced_service( + name=ssh_jump_name, namespace=namespace) + kubernetes.core_api().create_namespaced_service( + namespace, content['service_spec']) + port_forward_mode = KubernetesNetworkingMode.PORTFORWARD.value + nodeport_mode = KubernetesNetworkingMode.NODEPORT.value + clusterip_svc = KubernetesServiceType.CLUSTERIP.value + nodeport_svc = KubernetesServiceType.NODEPORT.value + curr_network_mode = port_forward_mode \ if curr_svc_type == clusterip_svc else nodeport_mode + new_network_mode = nodeport_mode \ if curr_svc_type == clusterip_svc else port_forward_mode + new_svc_type = nodeport_svc \ if curr_svc_type == clusterip_svc else clusterip_svc + logger.info( + f'Switching the networking mode from ' + f'\'{curr_network_mode}\' to \'{new_network_mode}\' ' + f'following the networking configuration. Deleting existing ' + f'\'{curr_svc_type}\' service and recreating as ' + f'\'{new_svc_type}\' service.') + else: + raise + else: + logger.info(f'Created SSH Jump Service {ssh_jump_name}.') + + +def setup_ssh_jump_pod(ssh_jump_name: str, ssh_jump_image: str, + ssh_key_secret: str, namespace: str): + """Sets up Kubernetes RBAC and pod for SSH jump host. + + Our Kubernetes implementation uses an SSH jump pod to reach SkyPilot + clusters running inside a cluster. This function sets up the resources + needed for the SSH jump pod. This includes a service account which grants + the jump pod permission to watch for other SkyPilot pods and terminate + itself if there are no SkyPilot pods running. + + setup_ssh_jump_svc must also be run to ensure that the SSH jump pod is + reachable. + + Args: + ssh_jump_image: Container image to use for the SSH jump pod + ssh_jump_name: Name to use for the SSH jump pod + ssh_key_secret: Secret name for the SSH key stored in the cluster + namespace: Namespace to create the SSH jump pod in + """ + # Fill in template - service is created separately so service_type is not + # required, so we pass in empty str.
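Each resource created in `setup_ssh_jump_pod` below follows the same 'create, tolerate 409 Conflict' pattern used for the service above. A hedged sketch of that shared pattern (the helper name is ours, not part of this PR):

```python
from sky import sky_logging
from sky.adaptors import kubernetes

logger = sky_logging.init_logger(__name__)

def create_or_reuse(create_fn, resource_desc: str, *args) -> None:
    """Create a namespaced resource, reusing it if it already exists."""
    try:
        create_fn(*args)
    except kubernetes.api_exception() as e:
        if e.status != 409:
            raise
        # 409 Conflict: the resource already exists; reuse it.
        logger.info(f'{resource_desc} already exists in the cluster, '
                    'using it.')
    else:
        logger.info(f'Created {resource_desc}.')
```

For instance, the ServiceAccount step below would read `create_or_reuse(kubernetes.core_api().create_namespaced_service_account, 'SSH Jump ServiceAccount', namespace, content['service_account'])`.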
+ content = fill_ssh_jump_template(ssh_key_secret, ssh_jump_image, + ssh_jump_name, '') + # ServiceAccount + try: + kubernetes.core_api().create_namespaced_service_account( + namespace, content['service_account']) + except kubernetes.api_exception() as e: + if e.status == 409: + logger.info( + 'SSH Jump ServiceAccount already exists in the cluster, using ' + 'it.') + else: + raise + else: + logger.info('Created SSH Jump ServiceAccount.') + # Role + try: + kubernetes.auth_api().create_namespaced_role(namespace, content['role']) + except kubernetes.api_exception() as e: + if e.status == 409: + logger.info( + 'SSH Jump Role already exists in the cluster, using it.') + else: + raise + else: + logger.info('Created SSH Jump Role.') + # RoleBinding + try: + kubernetes.auth_api().create_namespaced_role_binding( + namespace, content['role_binding']) + except kubernetes.api_exception() as e: + if e.status == 409: + logger.info( + 'SSH Jump RoleBinding already exists in the cluster, using ' + 'it.') + else: + raise + else: + logger.info('Created SSH Jump RoleBinding.') + # Pod + try: + kubernetes.core_api().create_namespaced_pod(namespace, + content['pod_spec']) + except kubernetes.api_exception() as e: + if e.status == 409: + logger.info( + f'SSH Jump Host {ssh_jump_name} already exists in the cluster, ' + 'using it.') + else: + raise + else: + logger.info(f'Created SSH Jump Host {ssh_jump_name}.') + + +def clean_zombie_ssh_jump_pod(namespace: str, node_id: str): + """Analyzes the SSH jump pod and removes it if it is in a bad state. + + Prevents the existence of a dangling SSH jump pod. This could happen + if the pod's main container did not start properly (or failed). In that + case, the jump pod's lifecycle manager cannot remove the pod and service + automatically, so the cleanup must be done here. + + Args: + namespace: Namespace to remove the SSH jump pod and service from + node_id: Name of head pod + """ + + def find(l, predicate): + """Utility function to find element in given list""" + results = [x for x in l if predicate(x)] + return results[0] if len(results) > 0 else None + + # Get the SSH jump pod name from the head pod + try: + pod = kubernetes.core_api().read_namespaced_pod(node_id, namespace) + except kubernetes.api_exception() as e: + if e.status == 404: + logger.warning(f'Tried to get pod {node_id},' + ' but the pod was not found (404).') + raise + else: + ssh_jump_name = pod.metadata.labels.get('skypilot-ssh-jump') + try: + ssh_jump_pod = kubernetes.core_api().read_namespaced_pod( + ssh_jump_name, namespace) + cont_ready_cond = find(ssh_jump_pod.status.conditions, + lambda c: c.type == 'ContainersReady') + if cont_ready_cond and \ + cont_ready_cond.status == 'False': + # The main container is not ready. To be on the safe side + # and prevent a dangling ssh jump pod, let's remove it and + # the service. Otherwise, the main container is ready and its + # lifecycle management script takes care of the cleanup. + kubernetes.core_api().delete_namespaced_pod(ssh_jump_name, + namespace) + kubernetes.core_api().delete_namespaced_service( + ssh_jump_name, namespace) + # only warn and proceed as usual + except kubernetes.api_exception() as e: + logger.warning( + f'Tried to check ssh jump pod {ssh_jump_name},' + f' but got error {e}\n. Consider running `kubectl ' + f'delete pod {ssh_jump_name} -n {namespace}` to manually ' + 'remove the pod if it has crashed.') + # We encountered an issue while checking ssh jump pod.
+def fill_ssh_jump_template(ssh_key_secret: str, ssh_jump_image: str,
+                           ssh_jump_name: str, service_type: str) -> Dict:
+    template_path = os.path.join(sky.__root_dir__, 'templates',
+                                 'kubernetes-ssh-jump.yml.j2')
+    if not os.path.exists(template_path):
+        raise FileNotFoundError(
+            'Template "kubernetes-ssh-jump.yml.j2" does not exist.')
+    with open(template_path) as fin:
+        template = fin.read()
+    j2_template = jinja2.Template(template)
+    cont = j2_template.render(name=ssh_jump_name,
+                              image=ssh_jump_image,
+                              secret=ssh_key_secret,
+                              service_type=service_type)
+    content = yaml.safe_load(cont)
+    return content
+
+
+def check_port_forward_mode_dependencies() -> None:
+    """Checks that 'socat' and 'lsof' are installed."""
+    for name, option in [('socat', '-V'), ('lsof', '-v')]:
+        try:
+            subprocess.run([name, option],
+                           stdout=subprocess.DEVNULL,
+                           stderr=subprocess.DEVNULL,
+                           check=True)
+        except FileNotFoundError:
+            with ux_utils.print_exception_no_traceback():
+                raise RuntimeError(
+                    f'`{name}` is required to set up the Kubernetes cloud '
+                    'with the '
+                    f'`{KubernetesNetworkingMode.PORTFORWARD.value}` default '
+                    'networking mode, but it is not installed. '
+                    'On Debian/Ubuntu, install it with:\n'
+                    f'  $ sudo apt install {name}\n'
+                    'On macOS, install it with:\n'
+                    f'  $ brew install {name}') from None
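The check above runs each binary with a version flag, which confirms it both exists and executes. A lighter-weight alternative (illustrative only, not SkyPilot's code) would probe `PATH` with `shutil.which`:

```python
# Sketch: check required binaries are on PATH. Unlike running `socat -V`,
# this does not confirm the binary actually executes.
import shutil


def check_binaries(*names: str) -> None:
    missing = [name for name in names if shutil.which(name) is None]
    if missing:
        raise RuntimeError(
            f'Missing required binaries: {", ".join(missing)}. '
            'On Debian/Ubuntu: sudo apt install ' + ' '.join(missing))


check_binaries('socat', 'lsof')
```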
"https://www.googleapis.com/auth/devstorage.read_only","https://www.googleapis.com/auth/logging.write","https://www.googleapis.com/auth/monitoring","https://www.googleapis.com/auth/servicecontrol","https://www.googleapis.com/auth/service.management.readonly","https://www.googleapis.com/auth/trace.append" --num-nodes "1" --enable-autoupgrade --enable-autorepair --max-surge-upgrade 1 --max-unavailable-upgrade 0 --node-locations "us-central1-c" && gcloud beta container --project "${PROJECT_ID}" node-pools create "largecpu" --cluster "${CLUSTER_NAME}" --zone "us-central1-c" --machine-type "n1-standard-16" --image-type "COS_CONTAINERD" --disk-type "pd-balanced" --disk-size "100" --metadata disable-legacy-endpoints=true --scopes "https://www.googleapis.com/auth/devstorage.read_only","https://www.googleapis.com/auth/logging.write","https://www.googleapis.com/auth/monitoring","https://www.googleapis.com/auth/servicecontrol","https://www.googleapis.com/auth/service.management.readonly","https://www.googleapis.com/auth/trace.append" --num-nodes "1" --enable-autoupgrade --enable-autorepair --max-surge-upgrade 1 --max-unavailable-upgrade 0 --node-locations "us-central1-c" ``` -2. Make sure ports 30000-32767 are open in your node pool VPC's firewall. -3. Get the kubeconfig for your cluster and place it in `~/.kube/config`: +2. Get the kubeconfig for your cluster and place it in `~/.kube/config`: ```bash gcloud container clusters get-credentials --region # Example: # gcloud container clusters get-credentials testcluster --region us-central1-c ``` -4. Verify by running `kubectl get nodes`. You should see your nodes. -5. **If you want GPU support**, make sure you install GPU drivers by running: +3. Verify by running `kubectl get nodes`. You should see your nodes. +4. **If you want GPU support**, make sure you install GPU drivers by running: ```bash # If using COS based nodes (e.g., in the example above): kubectl apply -f https://raw.githubusercontent.com/GoogleCloudPlatform/container-engine-accelerators/master/nvidia-driver-installer/cos/daemonset-preloaded.yaml @@ -54,17 +53,18 @@ sky local up ```bash kubectl describe nodes ``` -6. Run `sky check`. +5. Run `sky check`. ```bash sky check ``` -7. You can run SkyPilot tasks now. After you're done, delete the cluster by running: +6. You can run SkyPilot tasks now. After you're done, delete the cluster by running: ```bash gcloud container clusters delete --region # Example: # gcloud container clusters delete testcluster --region us-central1-c ``` +NOTE - If are using nodeport networking, make sure port 32100 is open in your node pool VPC's firewall. ## Running a EKS cluster 1. Create a EKS cluster with at least 1 node. We recommend creating nodes with at least 4 vCPUs. @@ -72,9 +72,8 @@ sky local up ```bash eksctl create -f tests/kubernetes/eks_test_cluster.yaml ``` -2. Make sure ports 30000-32767 are open in your EKS cluster's default security group. -3. Verify by running `kubectl get nodes`. You should see your nodes. -4. **If you want GPU support**, EKS clusters already come with GPU drivers setup. However, you'll need to label the nodes with the GPU type. Use the SkyPilot node labelling tool to do so: +2. Verify by running `kubectl get nodes`. You should see your nodes. +3. **If you want GPU support**, EKS clusters already come with GPU drivers setup. However, you'll need to label the nodes with the GPU type. 
@@ -94,11 +93,13 @@ sky local up
    ```bash
    sky check
    ```
-6. After you are done, delete the cluster by running:
+5. After you are done, delete the cluster by running:
    ```bash
    eksctl delete cluster -f tests/kubernetes/eks_test_cluster.yaml
    ```
+
+NOTE - If you are using nodeport networking, make sure port 32100 is open in your EKS cluster's default security group.
+
 ## Other useful scripts
 `scripts` directory contains other useful scripts for development, including Kubernetes dashboard, ray yaml for testing the SkyPilot Kubernetes node provider
diff --git a/tests/kubernetes/build_image.sh b/tests/kubernetes/build_image.sh
index 40dbad5be3f..675a15b12bc 100755
--- a/tests/kubernetes/build_image.sh
+++ b/tests/kubernetes/build_image.sh
@@ -10,6 +10,9 @@
 TAG=us-central1-docker.pkg.dev/skypilot-375900/skypilotk8s/skypilot
 
+push=false
+gpu=false
+
 # Parse command line arguments
 while getopts ":pg" opt; do
   case ${opt} in
@@ -29,34 +32,37 @@ while getopts ":pg" opt; do
 done
 
 # Add -gpu to the tag if the GPU image is being built
-if [[ $gpu ]]; then
+if [[ $gpu == "true" ]]; then
   TAG=$TAG-gpu:latest
 else
   TAG=$TAG:latest
 fi
 
+# Shift off the options
+shift $((OPTIND-1))
+
+
 # Navigate to the root of the project (inferred from git)
 cd "$(git rev-parse --show-toplevel)"
 
 # If push is used, build the image for both amd64 and arm64
-if [[ $push ]]; then
+if [[ $push == "true" ]]; then
   # If gpu is used, build the GPU image
-  if [[ $gpu ]]; then
-    echo "Building and pushing GPU image for amd64"
+  if [[ $gpu == "true" ]]; then
+    echo "Building and pushing GPU image for amd64: $TAG"
     docker buildx build --push --platform linux/amd64 -t $TAG -f Dockerfile_k8s_gpu ./sky
-  fi
-  # Else, build the CPU image
   else
-    echo "Building and pushing CPU image for amd64 and arm64"
+    echo "Building and pushing CPU image for amd64 and arm64: $TAG"
     docker buildx build --push --platform linux/arm64,linux/amd64 -t $TAG -f Dockerfile_k8s ./sky
+  fi
 fi
 
 # Load the right image depending on the architecture of the host machine (Apple Silicon or Intel)
 if [[ $(uname -m) == "arm64" ]]; then
-  echo "Loading image for arm64 (Apple Silicon etc.)"
+  echo "Loading image for arm64 (Apple Silicon etc.): $TAG"
   docker buildx build --load --platform linux/arm64 -t $TAG -f Dockerfile_k8s ./sky
 elif [[ $(uname -m) == "x86_64" ]]; then
-  echo "Building for amd64 (Intel CPUs)"
+  echo "Loading image for amd64 (Intel CPUs): $TAG"
   docker buildx build --load --platform linux/amd64 -t $TAG -f Dockerfile_k8s ./sky
 else
   echo "Unsupported architecture: $(uname -m)"
diff --git a/tests/kubernetes/networking_benchmarks/k8s_network_benchmarks.md b/tests/kubernetes/networking_benchmarks/k8s_network_benchmarks.md
new file mode 100644
index 00000000000..050444ac94d
--- /dev/null
+++ b/tests/kubernetes/networking_benchmarks/k8s_network_benchmarks.md
@@ -0,0 +1,55 @@
+# Kubernetes Networking Benchmarking
+
+A SkyPilot pod in Kubernetes can be accessed via three methods:
+1. `direct`: NodePort service directly exposing the pod's SSH port
+2. `sshjump`: NodePort service exposing an SSH jump pod that connects to the SkyPilot pod
+3. `port-forward`: uses `kubectl port-forward` to connect to a ClusterIP service pointing to an SSH jump pod that connects to the SkyPilot pod
+
+`direct` requires opening a large range of ports on the cluster's firewall.
+`sshjump` requires opening only one port on the cluster's firewall, but requires an additional SSH connection to the jump pod.
+`port-forward` does not require opening any ports on the cluster's firewall, but routes all traffic over the Kubernetes control plane.
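For intuition, the `port-forward` path is roughly equivalent to the sketch below: forward a local port to the jump pod's ClusterIP service, then SSH through localhost. This is illustrative only (service name, namespace, and user are hypothetical; SkyPilot wires this up via an SSH ProxyCommand rather than a standalone script):

```python
# Sketch: approximate what port-forward mode does under the hood.
import subprocess
import time

pf = subprocess.Popen(
    ['kubectl', 'port-forward', 'svc/sky-ssh-jump', '2222:22', '-n', 'default'])
time.sleep(2)  # Crude wait for the tunnel; real code should poll readiness.
subprocess.run(['ssh', '-p', '2222', 'sky@127.0.0.1', 'echo', 'hello'])
pf.terminate()
```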
+
+This document benchmarks the three approaches on a kind cluster and a GKE cluster.
+
+We run two kinds of benchmarks:
+1. `sky launch` benchmarks: how long does it take to launch a SkyPilot pod
+2. Rsync benchmarks: how long does it take to copy a directory containing 1000 1MB files to the SkyPilot pod
+
+In summary, we find that `direct` is only marginally faster (~10%) than `sshjump` and `port-forward` on both the `sky launch` and rsync benchmarks.
+
+Given these results, this document recommends using `port-forward` for all SkyPilot deployments because of its significant ease-of-use and security benefits.
+
+## Benchmark environment
+These benchmarks were run on a 2023 M2 Max MacBook Pro with 32GB of RAM, over a 100 Mbps home connection. Each benchmark was run on a GKE cluster and a local kind cluster (`sky local up`). Kubernetes v1.27 was used.
+
+Note that the GKE benchmarks, particularly rsync, are sensitive to the network connection between the benchmarking machine and the GKE cluster.
+
+## `sky launch` benchmarks
+
+Runs `sky launch` five times and reports the average of the last four runs.
+
+Usage:
+```
+./skylaunch_bench.sh <suffix>
+# e.g., `./skylaunch_bench.sh gkedirect` will create a file called skylaunch_results_gkedirect.txt
+```
+
+|          | Direct | SSHJump | port-forward |
+|----------|--------|---------|--------------|
+| **GKE**  | 64.51s | 62.51s  | 69.75s       |
+| **Kind** | 26.65s | 28.37s  | 28.75s       |
+
+## Rsync benchmarks
+
+Creates a directory with 1000 1MB files and copies it to the SkyPilot pod. Runs rsync five times and reports the average of the last four runs.
+
+Usage:
+```
+./rsync_bench.sh <suffix>
+# e.g., `./rsync_bench.sh gkedirect` will create a file called rsync_results_gkedirect.txt
+```
+
+|          | Direct  | SSHJump | port-forward |
+|----------|---------|---------|--------------|
+| **GKE**  | 337.49s | 347.49s | 361.49s      |
+| **Kind** | 31.49s  | 31.71s  | 33.21s       |
diff --git a/tests/kubernetes/networking_benchmarks/rsync_bench.sh b/tests/kubernetes/networking_benchmarks/rsync_bench.sh
new file mode 100644
index 00000000000..de0b4bf0845
--- /dev/null
+++ b/tests/kubernetes/networking_benchmarks/rsync_bench.sh
@@ -0,0 +1,76 @@
+#!/bin/bash
+
+average=0
+# Check that the command line argument (suffix) is provided
+if [ "$#" -ne 1 ]; then
+  echo "Usage: $0 <suffix>"
+  exit 1
+fi
+
+suffix="$1"
+
+# File to store the output of the time command, named with the given suffix
+output_file="rsync_results_${suffix}.txt"
+
+runexpt=1
+
+# Check if the output file exists and ask if it should be overwritten
+if [ -f "$output_file" ]; then
+  read -p "The output file $output_file already exists. Do you want to overwrite it? (y/n) " -n 1 -r
+  echo
+  if [[ ! $REPLY =~ ^[Yy]$ ]]; then
+    echo "Analyzing existing results..."
+    runexpt=0
+  fi
+fi
+
+if [ "$runexpt" -eq 1 ]; then
+  # Delete existing results
+  rm -f "$output_file"
+
+  # Set up 1000 1MB (total 1GB) files to copy
+  mkdir -p $HOME/tmp/uploadtest
+  for i in {1..1000}; do dd if=/dev/urandom of=$HOME/tmp/uploadtest/$i bs=1M count=1; done
+
+  # Create the cluster
+  sky launch -y -c test
+
+  for i in {1..5}; do
+    (
+      # Use the `time` command in a subshell to capture its output
+      time rsync -avz ~/tmp/uploadtest/ test:~/sky_workdir
+    ) 2>> "$output_file"
+    ssh test 'rm -rf ~/sky_workdir/'
+  done
+
+  # Delete the cluster after we are done
+  sky down -y test
+fi
+
+# Process the results from the 2nd to the 5th run
+count=0
+while read -r line; do
+  # Check for the real time output from the time command
+  if [[ $line == real* ]]; then
+    if [ "$count" -eq 0 ]; then
+      # Skip the first result
+      count=$((count+1))
+      continue
+    fi
+    count=$((count+1))
+    # Extract the minutes and seconds and convert to seconds
+    minutes=$(echo $line | cut -d'm' -f1 | sed 's/real //')
+    seconds=$(echo $line | cut -d'm' -f2 | cut -d's' -f1)
+    total_seconds=$(echo "$minutes*60 + $seconds" | bc)
+    # Accumulate the total time
+    average=$(echo "$average + $total_seconds" | bc)
+  fi
+done < "$output_file"  # the first run is skipped inside the loop
+
+# Subtract one from the count to account for the skipped first result
+count=$((count-1))
+# Compute the average time
+average=$(echo "scale=2; $average/$count" | bc)
+
+# Print the average time
+echo "Average total time (from 2nd to 5th run): $average seconds"
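The `real XmY.YYs` parsing above is easy to get wrong in shell. An equivalent sketch of the averaging logic in Python (illustrative only, not part of this PR; the file name is hypothetical):

```python
# Sketch: average the `real` times from a results file, skipping the first
# (warm-up) run, mirroring the bc pipeline in the scripts above.
import re


def average_real_times(path: str) -> float:
    times = []
    with open(path) as f:
        for line in f:
            m = re.match(r'real\s+(\d+)m([\d.]+)s', line)
            if m:
                times.append(int(m.group(1)) * 60 + float(m.group(2)))
    runs = times[1:]  # Skip the first result.
    return sum(runs) / len(runs)


print(f'{average_real_times("rsync_results_gkedirect.txt"):.2f} seconds')
```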
diff --git a/tests/kubernetes/networking_benchmarks/skylaunch_bench.sh b/tests/kubernetes/networking_benchmarks/skylaunch_bench.sh
new file mode 100644
index 00000000000..ee67bc7a233
--- /dev/null
+++ b/tests/kubernetes/networking_benchmarks/skylaunch_bench.sh
@@ -0,0 +1,65 @@
+#!/bin/bash
+
+average=0
+# Check that the command line argument (suffix) is provided
+if [ "$#" -ne 1 ]; then
+  echo "Usage: $0 <suffix>"
+  exit 1
+fi
+
+suffix="$1"
+
+# File to store the output of the time command, named with the given suffix
+output_file="skylaunch_results_${suffix}.txt"
+
+runexpt=1
+
+# Check if the output file exists and ask if it should be overwritten
+if [ -f "$output_file" ]; then
+  read -p "The output file $output_file already exists. Do you want to overwrite it? (y/n) " -n 1 -r
+  echo
+  if [[ ! $REPLY =~ ^[Yy]$ ]]; then
+    echo "Analyzing existing results..."
+    runexpt=0
+  fi
+fi
+
+if [ "$runexpt" -eq 1 ]; then
+  # Delete existing results
+  rm -f "$output_file"
+  for i in {1..5}; do
+    (
+      # Use the `time` command in a subshell to capture its output
+      time sky launch -y -c test
+    ) 2>> "$output_file"
+    sky down -y test
+  done
+fi
+
+# Process the results from the 2nd to the 5th run
+count=0
+while read -r line; do
+  # Check for the real time output from the time command
+  if [[ $line == real* ]]; then
+    if [ "$count" -eq 0 ]; then
+      # Skip the first result
+      count=$((count+1))
+      continue
+    fi
+    count=$((count+1))
+    # Extract the minutes and seconds and convert to seconds
+    minutes=$(echo $line | cut -d'm' -f1 | sed 's/real //')
+    seconds=$(echo $line | cut -d'm' -f2 | cut -d's' -f1)
+    total_seconds=$(echo "$minutes*60 + $seconds" | bc)
+    # Accumulate the total time
+    average=$(echo "$average + $total_seconds" | bc)
+  fi
+done < "$output_file"  # the first run is skipped inside the loop
+
+# Subtract one from the count to account for the skipped first result
+count=$((count-1))
+# Compute the average time
+average=$(echo "scale=2; $average/$count" | bc)
+
+# Print the average time
+echo "Average total time (from 2nd to 5th run): $average seconds"
diff --git a/tests/test_config.py b/tests/test_config.py
index dffd4843ffd..72d16ac362a 100644
--- a/tests/test_config.py
+++ b/tests/test_config.py
@@ -6,9 +6,12 @@
 from sky import skypilot_config
 from sky.utils import common_utils
+from sky.utils import kubernetes_utils
 
 VPC_NAME = 'vpc-12345678'
 PROXY_COMMAND = 'ssh -W %h:%p -i ~/.ssh/id_rsa -o StrictHostKeyChecking=no'
+NODEPORT_MODE_NAME = kubernetes_utils.KubernetesNetworkingMode.NODEPORT.value
+PORT_FORWARD_MODE_NAME = kubernetes_utils.KubernetesNetworkingMode.PORTFORWARD.value
 
 
 def _reload_config() -> None:
@@ -34,6 +37,8 @@ def _create_config_file(config_file_path: pathlib.Path) -> None:
             vpc_name: {VPC_NAME}
             use_internal_ips: true
             ssh_proxy_command: {PROXY_COMMAND}
+        kubernetes:
+            networking: {NODEPORT_MODE_NAME}
         """))
 
 
@@ -67,14 +72,18 @@ def test_config_get_set_nested(monkeypatch, tmp_path) -> None:
     assert skypilot_config.get_nested(('aws', 'use_internal_ips'), None)
     assert skypilot_config.get_nested(('aws', 'ssh_proxy_command'),
                                       None) == PROXY_COMMAND
-
+    assert skypilot_config.get_nested(('kubernetes', 'networking'),
+                                      None) == NODEPORT_MODE_NAME
     # Check set_nested() will copy the config dict and return a new dict
     new_config = skypilot_config.set_nested(('aws', 'ssh_proxy_command'),
                                             'new_value')
     assert new_config['aws']['ssh_proxy_command'] == 'new_value'
     assert skypilot_config.get_nested(('aws', 'ssh_proxy_command'),
                                       None) == PROXY_COMMAND
-
+    new_config = skypilot_config.set_nested(('kubernetes', 'networking'),
+                                            PORT_FORWARD_MODE_NAME)
+    assert skypilot_config.get_nested(('kubernetes', 'networking'),
+                                      None) == NODEPORT_MODE_NAME
     # Check that dumping the config to a file with the new None can be reloaded
     new_config2 = skypilot_config.set_nested(('aws', 'ssh_proxy_command'), None)
     new_config_path = tmp_path / 'new_config.yaml'
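The new test assertions rely on `set_nested` returning a modified copy rather than mutating the loaded config. A toy model of that contract (illustrative only, not SkyPilot's implementation):

```python
# Toy model of the copy semantics the test asserts: set_nested returns a new
# dict, and the original config is untouched.
import copy
from typing import Any, Dict, Tuple


def set_nested(config: Dict[str, Any], keys: Tuple[str, ...],
               value: Any) -> Dict[str, Any]:
    new_config = copy.deepcopy(config)
    node = new_config
    for key in keys[:-1]:
        node = node.setdefault(key, {})
    node[keys[-1]] = value
    return new_config


config = {'kubernetes': {'networking': 'nodeport'}}
updated = set_nested(config, ('kubernetes', 'networking'), 'portforward')
assert config['kubernetes']['networking'] == 'nodeport'  # Unchanged.
assert updated['kubernetes']['networking'] == 'portforward'
```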