diff --git a/sky/skylet/providers/kubernetes/node_provider.py b/sky/skylet/providers/kubernetes/node_provider.py
index 81230ff4ef8..e2174100041 100644
--- a/sky/skylet/providers/kubernetes/node_provider.py
+++ b/sky/skylet/providers/kubernetes/node_provider.py
@@ -163,14 +163,20 @@ def _set_node_tags(self, node_id, tags):
         kubernetes.core_api().patch_namespaced_pod(node_id, self.namespace, pod)
 
     def _raise_pod_scheduling_errors(self, new_nodes):
+        """Raise an exception describing why pods failed to schedule.
+
+        When a pod fails to schedule in Kubernetes, the reasons for the
+        failure are recorded as events. This function retrieves those events
+        and raises descriptive errors for better debugging and user feedback.
+        """
         for new_node in new_nodes:
             pod = kubernetes.core_api().read_namespaced_pod(
                 new_node.metadata.name, self.namespace)
             pod_status = pod.status.phase
             # When there are multiple pods involved while launching instance,
             # there may be a single pod causing issue while others are
-            # scheduled. In this case, we make sure to not surface the error
-            # message from the pod that is already scheduled.
+            # successfully scheduled. In this case, we make sure to not surface
+            # the error message from the pod that is already scheduled.
             if pod_status != 'Pending':
                 continue
             pod_name = pod._metadata._name
@@ -185,6 +191,8 @@ def _raise_pod_scheduling_errors(self, new_nodes):
                 events.items,
                 key=lambda e: e.metadata.creation_timestamp,
                 reverse=True)
+
+            event_message = None
             for event in events_desc_by_time:
                 if event.reason == 'FailedScheduling':
                     event_message = event.message
@@ -214,8 +222,10 @@ def _raise_pod_scheduling_errors(self, new_nodes):
                         # TODO(romilb): We may have additional node
                         # affinity selectors in the future - in that
                         # case we will need to update this logic.
-                        if 'Insufficient nvidia.com/gpu' in event_message or \
-                            'didn\'t match Pod\'s node affinity/selector' in event_message:
+                        if ('Insufficient nvidia.com/gpu'
+                                in event_message or
+                                'didn\'t match Pod\'s node affinity/selector'
+                                in event_message):
                             raise config.KubernetesError(
                                 f'{lack_resource_msg.format(resource="GPU")} '
                                 f'Verify if {pod.spec.node_selector[label_key]}'
@@ -225,6 +235,122 @@ def _raise_pod_scheduling_errors(self, new_nodes):
                                 f'Details: \'{event_message}\' ')
         raise config.KubernetesError(f'{timeout_err_msg}')
 
+    def _wait_for_pods_to_schedule(self, new_nodes):
+        """Wait for all pods to be scheduled.
+
+        Wait for all pods, including the jump pod, to be scheduled. If the
+        timeout is exceeded, raise an exception. If a pod's container is in
+        the ContainerCreating state, we can assume that resources have been
+        allocated and we can exit.
+        """
+        start_time = time.time()
+        while time.time() - start_time < self.timeout:
+            all_pods_scheduled = True
+            for node in new_nodes:
+                # Iterate over each pod to check its status
+                pod = kubernetes.core_api().read_namespaced_pod(
+                    node.metadata.name, self.namespace)
+                if pod.status.phase == 'Pending':
+                    # If container_statuses is None, then the pod hasn't
+                    # been scheduled yet.
+                    if pod.status.container_statuses is None:
+                        all_pods_scheduled = False
+                        break
+
+            if all_pods_scheduled:
+                return
+            time.sleep(1)
+
+        # Handle pod scheduling errors
+        try:
+            self._raise_pod_scheduling_errors(new_nodes)
+        except config.KubernetesError:
+            raise
+        except Exception as e:
+            raise config.KubernetesError(
+                'An error occurred while trying to fetch the reason '
+                'for pod scheduling failure. '
+                f'Error: {common_utils.format_exception(e)}') from None
+
+    def _wait_for_pods_to_run(self, new_nodes):
+        """Wait for pods and their containers to be ready.
+
+        Pods may be pulling images or may be in the process of container
+        creation.
+        """
+        while True:
+            all_pods_running = True
+            # Iterate over each pod to check its status
+            for node in new_nodes:
+                pod = kubernetes.core_api().read_namespaced_pod(
+                    node.metadata.name, self.namespace)
+
+                # Continue if the pod and all the containers within the
+                # pod are successfully created and running.
+                if pod.status.phase == 'Running' and all([
+                        container.state.running
+                        for container in pod.status.container_statuses
+                ]):
+                    continue
+
+                all_pods_running = False
+                if pod.status.phase == 'Pending':
+                    # Iterate over each container in the pod to check its status
+                    for container_status in pod.status.container_statuses:
+                        # If the container wasn't in 'ContainerCreating'
+                        # state, then we know the pod wasn't scheduled or
+                        # had some other error, such as an image pull error.
+                        # See list of possible reasons for waiting here:
+                        # https://stackoverflow.com/a/57886025
+                        waiting = container_status.state.waiting
+                        if waiting is not None and waiting.reason != 'ContainerCreating':
+                            raise config.KubernetesError(
+                                'Failed to create container while launching '
+                                'the node. Error details: '
+                                f'{container_status.state.waiting.message}.')
+                # Reaching this point means that one of the pods had an issue,
+                # so break out of the loop.
+                break
+
+            if all_pods_running:
+                break
+            time.sleep(1)
+
+    def _set_env_vars_in_pods(self, new_nodes):
+        """Set environment variables in pods.
+
+        Once all containers are ready, we can exec into them and set env vars.
+        Kubernetes automatically populates containers with critical
+        environment variables, such as those for discovering services running
+        in the cluster and CUDA/nvidia environment variables. We need to
+        make sure these env vars are available in every task and ssh session.
+        This is needed for GPU support and service discovery.
+        See https://github.com/skypilot-org/skypilot/issues/2287 for
+        more details.
+
+        To do so, we capture env vars from the pod's runtime and write them to
+        /etc/profile.d/, making them available for all users in future
+        shell sessions.
+        """
+        set_k8s_env_var_cmd = [
+            '/bin/sh', '-c',
+            ('printenv | awk -F "=" \'{print "export " $1 "=\\047" $2 "\\047"}\' > ~/k8s_env_var.sh && '
+             'mv ~/k8s_env_var.sh /etc/profile.d/k8s_env_var.sh || '
+             'sudo mv ~/k8s_env_var.sh /etc/profile.d/k8s_env_var.sh')
+        ]
+
+        for new_node in new_nodes:
+            kubernetes.stream()(
+                kubernetes.core_api().connect_get_namespaced_pod_exec,
+                new_node.metadata.name,
+                self.namespace,
+                command=set_k8s_env_var_cmd,
+                stderr=True,
+                stdin=False,
+                stdout=True,
+                tty=False,
+                _request_timeout=kubernetes.API_TIMEOUT)
+
     def create_node(self, node_config, tags, count):
         conf = copy.deepcopy(node_config)
         pod_spec = conf.get('pod', conf)
@@ -265,97 +391,20 @@ def create_node(self, node_config, tags, count):
                 self.namespace, service_spec)
             new_svcs.append(svc)
 
-        # Wait for all pods including jump pod to be ready, and if it
-        # exceeds the timeout, raise an exception. If pod's container
-        # is ContainerCreating, then we can assume that resources have been
-        # allocated and we can exit.
+        # Add the jump pod to the new_nodes list as well, so we check that it
+        # is scheduled and running along with the other pods.
         ssh_jump_pod_name = conf['metadata']['labels']['skypilot-ssh-jump']
         jump_pod = kubernetes.core_api().read_namespaced_pod(
             ssh_jump_pod_name, self.namespace)
         new_nodes.append(jump_pod)
 
-        start = time.time()
-        while True:
-            if time.time() - start > self.timeout:
-                try:
-                    self._raise_pod_scheduling_errors(new_nodes)
-                except config.KubernetesError:
-                    raise
-                except Exception as e:
-                    raise config.KubernetesError(
-                        'An error occurred while trying to fetch the reason '
-                        'for pod scheduling failure. '
-                        f'Error: {common_utils.format_exception(e)}') from None
-
-            all_ready = True
-            for node in new_nodes:
-                pod = kubernetes.core_api().read_namespaced_pod(
-                    node.metadata.name, self.namespace)
-                if pod.status.phase == 'Pending':
-                    # Iterate over each pod to check their status
-                    if pod.status.container_statuses is not None:
-                        for container_status in pod.status.container_statuses:
-                            # Continue if container status is ContainerCreating
-                            # This indicates this pod has been scheduled.
-                            if container_status.state.waiting is not None and container_status.state.waiting.reason == 'ContainerCreating':
-                                continue
-                            else:
-                                # If the container wasn't in creating state,
-                                # then we know pod wasn't scheduled or had some
-                                # other error, such as image pull error.
-                                # See list of possible reasons for waiting here:
-                                # https://stackoverflow.com/a/57886025
-                                all_ready = False
-                    else:
-                        # If container_statuses is None, then the pod hasn't
-                        # been scheduled yet.
-                        all_ready = False
-            if all_ready:
-                break
-            time.sleep(1)
-
-        # Wait for pod containers to be ready - they may be pulling images or
-        # may be in the process of container creation.
-        while True:
-            pods = []
-            for node in new_nodes:
-                pod = kubernetes.core_api().read_namespaced_pod(
-                    node.metadata.name, self.namespace)
-                pods.append(pod)
-            if all([pod.status.phase == "Running" for pod in pods]) \
-                    and all(
-                        [container.state.running for pod in pods for container in
-                         pod.status.container_statuses]):
-                break
-            time.sleep(1)
-        # Once all containers are ready, we can exec into them and set env vars.
-        # Kubernetes automatically populates containers with critical
-        # environment variables, such as those for discovering services running
-        # in the cluster and CUDA/nvidia environment variables. We need to
-        # make sure these env vars are available in every task and ssh session.
-        # This is needed for GPU support and service discovery.
-        # See https://github.com/skypilot-org/skypilot/issues/2287 for
-        # more details.
-        # To do so, we capture env vars from the pod's runtime and write them to
-        # /etc/profile.d/, making them available for all users in future
-        # shell sessions.
-        set_k8s_env_var_cmd = [
-            '/bin/sh', '-c',
-            ('printenv | awk -F "=" \'{print "export " $1 "=\\047" $2 "\\047"}\' > ~/k8s_env_var.sh && '
-             'mv ~/k8s_env_var.sh /etc/profile.d/k8s_env_var.sh || '
-             'sudo mv ~/k8s_env_var.sh /etc/profile.d/k8s_env_var.sh')
-        ]
-        for new_node in new_nodes:
-            kubernetes.stream()(
-                kubernetes.core_api().connect_get_namespaced_pod_exec,
-                new_node.metadata.name,
-                self.namespace,
-                command=set_k8s_env_var_cmd,
-                stderr=True,
-                stdin=False,
-                stdout=True,
-                tty=False,
-                _request_timeout=kubernetes.API_TIMEOUT)
+        # Wait until the pods are scheduled; surface the cause of the error
+        # if scheduling fails.
+        self._wait_for_pods_to_schedule(new_nodes)
+        # Wait until the pods and their containers are up and running, and
+        # fail early if there is an error.
+        self._wait_for_pods_to_run(new_nodes)
+        self._set_env_vars_in_pods(new_nodes)
 
     def terminate_node(self, node_id):
         logger.info(config.log_prefix + 'calling delete_namespaced_pod')
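
For reference, the scheduling-wait heuristic above (a Pending pod whose
container_statuses is still None has not yet been placed on a node) can be
exercised standalone with the official kubernetes Python client. This is a
minimal sketch, not SkyPilot code; the pod name, namespace, and timeout are
hypothetical.

    import time

    from kubernetes import client, config

    config.load_kube_config()  # use load_incluster_config() inside a cluster
    v1 = client.CoreV1Api()

    def wait_until_scheduled(pod_name, namespace, timeout=300):
        # Once the scheduler places the pod, kubelet starts reporting
        # container statuses (e.g. ContainerCreating), which means
        # resources have been allocated and the wait can end.
        start = time.time()
        while time.time() - start < timeout:
            pod = v1.read_namespaced_pod(pod_name, namespace)
            if (pod.status.phase != 'Pending' or
                    pod.status.container_statuses is not None):
                return pod
            time.sleep(1)
        raise TimeoutError(f'{pod_name} not scheduled within {timeout}s')

    wait_until_scheduled('my-pod', 'default')  # hypothetical names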
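The event lookup behind _raise_pod_scheduling_errors amounts to filtering
namespace events by the pod they involve and taking the newest
FailedScheduling message. A sketch under the same assumptions (hypothetical
pod name and namespace; list_namespaced_event is part of CoreV1Api):

    from kubernetes import client, config

    config.load_kube_config()
    v1 = client.CoreV1Api()

    events = v1.list_namespaced_event(
        'default',  # hypothetical namespace
        field_selector='involvedObject.name=my-pod,involvedObject.kind=Pod')
    # Newest first, mirroring the sort in _raise_pod_scheduling_errors.
    for event in sorted(events.items,
                        key=lambda e: e.metadata.creation_timestamp,
                        reverse=True):
        if event.reason == 'FailedScheduling':
            print(event.message)  # e.g. 'Insufficient nvidia.com/gpu ...'
            break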
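Finally, the exec in _set_env_vars_in_pods follows the calling convention of
kubernetes.stream.stream from the official client. The \047 escapes in the
awk program print literal single quotes, so each captured value in the
generated k8s_env_var.sh is safely quoted. A sketch with hypothetical pod
name and namespace:

    from kubernetes import client, config
    from kubernetes.stream import stream

    config.load_kube_config()
    v1 = client.CoreV1Api()

    # Wrap each env var value in single quotes (awk's \047) and install the
    # export lines into /etc/profile.d/, falling back to sudo if needed.
    set_k8s_env_var_cmd = [
        '/bin/sh', '-c',
        ('printenv | awk -F "=" \'{print "export " $1 "=\\047" $2 "\\047"}\' '
         '> ~/k8s_env_var.sh && '
         'mv ~/k8s_env_var.sh /etc/profile.d/k8s_env_var.sh || '
         'sudo mv ~/k8s_env_var.sh /etc/profile.d/k8s_env_var.sh')
    ]

    stream(v1.connect_get_namespaced_pod_exec,
           'my-pod',   # hypothetical pod name
           'default',  # hypothetical namespace
           command=set_k8s_env_var_cmd,
           stderr=True, stdin=False, stdout=True, tty=False)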