Skip to content

Commit

Permalink
[k8s] handle network error from pulling docker image (#2551)
Browse files Browse the repository at this point in the history
* handle network error from pulling docker image

* update timeout

* nit

* nit

* separate scheduling and post-scheduling

* nit

* nit

* nit

* refactor pod scheduling check

* refactor create_node

* nit

* nit

* nit

* testing images

* back

* format

* nit

* nit

* nit

* Update sky/skylet/providers/kubernetes/node_provider.py

Co-authored-by: Romil Bhardwaj <[email protected]>

* nit

* update with more waiting.reason list

* update waiting check location

* nit

---------

Co-authored-by: Romil Bhardwaj <[email protected]>
  • Loading branch information
landscapepainter and romilbhardwaj authored Nov 17, 2023
1 parent 4464aee commit 7b1bf0b
Showing 1 changed file with 139 additions and 90 deletions.
229 changes: 139 additions & 90 deletions sky/skylet/providers/kubernetes/node_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,14 +191,20 @@ def _set_node_tags(self, node_id, tags):
kubernetes.core_api().patch_namespaced_pod(node_id, self.namespace, pod)

def _raise_pod_scheduling_errors(self, new_nodes):
"""Raise pod scheduling failure reason.
When a pod fails to schedule in Kubernetes, the reasons for the failure
are recorded as events. This function retrieves those events and raises
descriptive errors for better debugging and user feedback.
"""
for new_node in new_nodes:
pod = kubernetes.core_api().read_namespaced_pod(
new_node.metadata.name, self.namespace)
pod_status = pod.status.phase
# When there are multiple pods involved while launching instance,
# there may be a single pod causing issue while others are
# scheduled. In this case, we make sure to not surface the error
# message from the pod that is already scheduled.
# successfully scheduled. In this case, we make sure to not surface
# the error message from the pod that is already scheduled.
if pod_status != 'Pending':
continue
pod_name = pod._metadata._name
Expand All @@ -213,6 +219,8 @@ def _raise_pod_scheduling_errors(self, new_nodes):
events.items,
key=lambda e: e.metadata.creation_timestamp,
reverse=True)

event_message = None
for event in events_desc_by_time:
if event.reason == 'FailedScheduling':
event_message = event.message
Expand Down Expand Up @@ -242,8 +250,10 @@ def _raise_pod_scheduling_errors(self, new_nodes):
# TODO(romilb): We may have additional node
# affinity selectors in the future - in that
# case we will need to update this logic.
if 'Insufficient nvidia.com/gpu' in event_message or \
'didn\'t match Pod\'s node affinity/selector' in event_message:
if ('Insufficient nvidia.com/gpu'
in event_message or
'didn\'t match Pod\'s node affinity/selector'
in event_message):
raise config.KubernetesError(
f'{lack_resource_msg.format(resource="GPU")} '
f'Verify if {pod.spec.node_selector[label_key]}'
Expand All @@ -253,6 +263,122 @@ def _raise_pod_scheduling_errors(self, new_nodes):
f'Details: \'{event_message}\' ')
raise config.KubernetesError(f'{timeout_err_msg}')

def _wait_for_pods_to_schedule(self, new_nodes):
    """Block until every pod (including the jump pod) is scheduled.

    A pod counts as scheduled once the API server reports container
    statuses for it (e.g. the container is ContainerCreating): at that
    point resources have been allocated and we can move on. If
    scheduling does not complete within ``self.timeout`` seconds, the
    underlying scheduling failure is surfaced to the user via
    `_raise_pod_scheduling_errors`.
    """
    deadline = time.time() + self.timeout
    while time.time() < deadline:

        def _is_scheduled(node):
            pod = kubernetes.core_api().read_namespaced_pod(
                node.metadata.name, self.namespace)
            # A Pending pod with no container statuses has not been
            # assigned to a node yet.
            return not (pod.status.phase == 'Pending' and
                        pod.status.container_statuses is None)

        # all() short-circuits on the first unscheduled pod, mirroring
        # the early exit of a manual loop.
        if all(_is_scheduled(node) for node in new_nodes):
            return
        time.sleep(1)

    # Timed out - fetch the scheduling events and raise a descriptive
    # error for the user.
    try:
        self._raise_pod_scheduling_errors(new_nodes)
    except config.KubernetesError:
        raise
    except Exception as e:
        raise config.KubernetesError(
            'An error occurred while trying to fetch the reason '
            'for pod scheduling failure. '
            f'Error: {common_utils.format_exception(e)}') from None

def _wait_for_pods_to_run(self, new_nodes):
    """Wait for pods and their containers to be ready.

    Pods may be pulling images or may be in the process of container
    creation. Polls every second until every pod is in phase 'Running'
    with all of its containers running.

    Raises:
        config.KubernetesError: if any container is waiting for a
            reason other than 'ContainerCreating' (e.g. an image pull
            error), since such pods will not come up on their own.
    """
    while True:
        all_pods_running = True
        # Iterate over each pod to check their status
        for node in new_nodes:
            pod = kubernetes.core_api().read_namespaced_pod(
                node.metadata.name, self.namespace)

            # container_statuses can be None right after scheduling,
            # before the kubelet reports anything; treat that as "not
            # ready yet" instead of crashing on iteration.
            statuses = pod.status.container_statuses

            # Continue if pod and all the containers within the
            # pod are successfully created and running.
            if (pod.status.phase == 'Running' and statuses is not None
                    and all(container.state.running
                            for container in statuses)):
                continue

            all_pods_running = False
            if pod.status.phase == 'Pending' and statuses is not None:
                # Iterate over each container in pod to check their status
                for container_status in statuses:
                    # If the container wasn't in 'ContainerCreating'
                    # state, then we know pod wasn't scheduled or
                    # had some other error, such as image pull error.
                    # See list of possible reasons for waiting here:
                    # https://stackoverflow.com/a/57886025
                    waiting = container_status.state.waiting
                    if (waiting is not None and
                            waiting.reason != 'ContainerCreating'):
                        raise config.KubernetesError(
                            'Failed to create container while launching '
                            'the node. Error details: '
                            f'{waiting.message}.')
            # Reaching this point means that one of the pods had an issue,
            # so break out of the loop
            break

        if all_pods_running:
            break
        time.sleep(1)

def _set_env_vars_in_pods(self, new_nodes):
    """Persist each pod's runtime environment for future shell sessions.

    Kubernetes automatically injects critical environment variables
    into containers at startup - e.g. service-discovery variables and
    CUDA/nvidia variables. Tasks and ssh sessions spawn fresh shells
    that do not inherit them, so once all containers are ready we exec
    into each pod, snapshot its environment with `printenv`, and write
    the result to /etc/profile.d/ where every user's future shell will
    source it. This is needed for GPU support and service discovery.
    See https://github.com/skypilot-org/skypilot/issues/2287 for
    more details.
    """
    capture_env_cmd = [
        '/bin/sh', '-c',
        ('printenv | awk -F "=" \'{print "export " $1 "=\\047" $2 "\\047"}\' > ~/k8s_env_var.sh && '
         'mv ~/k8s_env_var.sh /etc/profile.d/k8s_env_var.sh || '
         'sudo mv ~/k8s_env_var.sh /etc/profile.d/k8s_env_var.sh')
    ]

    for node in new_nodes:
        kubernetes.stream()(
            kubernetes.core_api().connect_get_namespaced_pod_exec,
            node.metadata.name,
            self.namespace,
            command=capture_env_cmd,
            stderr=True,
            stdin=False,
            stdout=True,
            tty=False,
            _request_timeout=kubernetes.API_TIMEOUT)

def create_node(self, node_config, tags, count):
conf = copy.deepcopy(node_config)
pod_spec = conf.get('pod', conf)
Expand Down Expand Up @@ -293,97 +419,20 @@ def create_node(self, node_config, tags, count):
self.namespace, service_spec)
new_svcs.append(svc)

# Wait for all pods including jump pod to be ready, and if it
# exceeds the timeout, raise an exception. If pod's container
# is ContainerCreating, then we can assume that resources have been
# allocated and we can exit.
# Adding the jump pod to the new_nodes list as well so it can be
# checked if it's scheduled and running along with other pod instances.
ssh_jump_pod_name = conf['metadata']['labels']['skypilot-ssh-jump']
jump_pod = kubernetes.core_api().read_namespaced_pod(
ssh_jump_pod_name, self.namespace)
new_nodes.append(jump_pod)
start = time.time()
while True:
if time.time() - start > self.timeout:
try:
self._raise_pod_scheduling_errors(new_nodes)
except config.KubernetesError:
raise
except Exception as e:
raise config.KubernetesError(
'An error occurred while trying to fetch the reason '
'for pod scheduling failure. '
f'Error: {common_utils.format_exception(e)}') from None

all_ready = True
for node in new_nodes:
pod = kubernetes.core_api().read_namespaced_pod(
node.metadata.name, self.namespace)
if pod.status.phase == 'Pending':
# Iterate over each pod to check their status
if pod.status.container_statuses is not None:
for container_status in pod.status.container_statuses:
# Continue if container status is ContainerCreating
# This indicates this pod has been scheduled.
if container_status.state.waiting is not None and container_status.state.waiting.reason == 'ContainerCreating':
continue
else:
# If the container wasn't in creating state,
# then we know pod wasn't scheduled or had some
# other error, such as image pull error.
# See list of possible reasons for waiting here:
# https://stackoverflow.com/a/57886025
all_ready = False
else:
# If container_statuses is None, then the pod hasn't
# been scheduled yet.
all_ready = False
if all_ready:
break
time.sleep(1)

# Wait for pod containers to be ready - they may be pulling images or
# may be in the process of container creation.
while True:
pods = []
for node in new_nodes:
pod = kubernetes.core_api().read_namespaced_pod(
node.metadata.name, self.namespace)
pods.append(pod)
if all([pod.status.phase == "Running" for pod in pods]) \
and all(
[container.state.running for pod in pods for container in
pod.status.container_statuses]):
break
time.sleep(1)

# Once all containers are ready, we can exec into them and set env vars.
# Kubernetes automatically populates containers with critical
# environment variables, such as those for discovering services running
# in the cluster and CUDA/nvidia environment variables. We need to
# make sure these env vars are available in every task and ssh session.
# This is needed for GPU support and service discovery.
# See https://github.com/skypilot-org/skypilot/issues/2287 for
# more details.
# To do so, we capture env vars from the pod's runtime and write them to
# /etc/profile.d/, making them available for all users in future
# shell sessions.
set_k8s_env_var_cmd = [
'/bin/sh', '-c',
('printenv | awk -F "=" \'{print "export " $1 "=\\047" $2 "\\047"}\' > ~/k8s_env_var.sh && '
'mv ~/k8s_env_var.sh /etc/profile.d/k8s_env_var.sh || '
'sudo mv ~/k8s_env_var.sh /etc/profile.d/k8s_env_var.sh')
]
for new_node in new_nodes:
kubernetes.stream()(
kubernetes.core_api().connect_get_namespaced_pod_exec,
new_node.metadata.name,
self.namespace,
command=set_k8s_env_var_cmd,
stderr=True,
stdin=False,
stdout=True,
tty=False,
_request_timeout=kubernetes.API_TIMEOUT)
# Wait until the pods are scheduled and surface cause for error
# if there is one
self._wait_for_pods_to_schedule(new_nodes)
# Wait until the pods and their containers are up and running, and
# fail early if there is an error
self._wait_for_pods_to_run(new_nodes)
self._set_env_vars_in_pods(new_nodes)

def terminate_node(self, node_id):
logger.info(config.log_prefix + 'calling delete_namespaced_pod')
Expand Down

0 comments on commit 7b1bf0b

Please sign in to comment.