[k8s] handle network error from pulling docker image #2551

Merged
Changes from all commits
28 commits
3d5e97b
handle network error from pulling docker image
landscapepainter Sep 13, 2023
f6f93be
update timeout
landscapepainter Sep 13, 2023
7e84a1b
nit
landscapepainter Sep 14, 2023
2bf6968
nit
landscapepainter Sep 14, 2023
88fe005
separate scheduling and post-scheduling
landscapepainter Sep 16, 2023
3f8b923
Merge branch 'master' into handle_image_pull_failure
landscapepainter Sep 19, 2023
565765f
nit
landscapepainter Sep 19, 2023
5a7e843
nit
landscapepainter Sep 19, 2023
980446f
nit
landscapepainter Sep 21, 2023
e808b86
refactor pod scheduling check
landscapepainter Sep 26, 2023
30d1220
refactor create_node
landscapepainter Sep 27, 2023
fc08d50
nit
landscapepainter Sep 27, 2023
4039a51
nit
landscapepainter Sep 27, 2023
0d7fce5
nit
landscapepainter Sep 27, 2023
079caff
testing images
landscapepainter Oct 8, 2023
a98a6b4
back
landscapepainter Oct 18, 2023
704b7b8
Merge branch 'master' of https://github.com/landscapepainter/skypilot
landscapepainter Nov 7, 2023
554fd37
Merge branch 'master' into handle_image_pull_failure
landscapepainter Nov 7, 2023
324c6e1
format
landscapepainter Nov 7, 2023
7cd299d
nit
landscapepainter Nov 7, 2023
67e02eb
nit
landscapepainter Nov 7, 2023
5e92f8a
nit
landscapepainter Nov 9, 2023
5ec4a64
Update sky/skylet/providers/kubernetes/node_provider.py
landscapepainter Nov 9, 2023
f7bcb97
Merge branch 'handle_image_pull_failure' of https://github.com/landsc…
landscapepainter Nov 9, 2023
a364581
nit
landscapepainter Nov 9, 2023
f590ace
update with more waiting.reason list
landscapepainter Nov 12, 2023
87d41cf
update waiting check location
landscapepainter Nov 12, 2023
c131990
nit
landscapepainter Nov 16, 2023
229 changes: 139 additions & 90 deletions sky/skylet/providers/kubernetes/node_provider.py
@@ -163,14 +163,20 @@ def _set_node_tags(self, node_id, tags):
kubernetes.core_api().patch_namespaced_pod(node_id, self.namespace, pod)

def _raise_pod_scheduling_errors(self, new_nodes):
"""Raise pod scheduling failure reason.

When a pod fails to schedule in Kubernetes, the reasons for the failure
are recorded as events. This function retrieves those events and raises
descriptive errors for better debugging and user feedback.
"""
for new_node in new_nodes:
pod = kubernetes.core_api().read_namespaced_pod(
new_node.metadata.name, self.namespace)
pod_status = pod.status.phase
# When there are multiple pods involved while launching instance,
# there may be a single pod causing issue while others are
# scheduled. In this case, we make sure to not surface the error
# message from the pod that is already scheduled.
# successfully scheduled. In this case, we make sure to not surface
# the error message from the pod that is already scheduled.
if pod_status != 'Pending':
continue
pod_name = pod._metadata._name
@@ -185,6 +191,8 @@ def _raise_pod_scheduling_errors(self, new_nodes):
events.items,
key=lambda e: e.metadata.creation_timestamp,
reverse=True)

event_message = None
for event in events_desc_by_time:
if event.reason == 'FailedScheduling':
event_message = event.message
@@ -214,8 +222,10 @@ def _raise_pod_scheduling_errors(self, new_nodes):
# TODO(romilb): We may have additional node
# affinity selectors in the future - in that
# case we will need to update this logic.
if 'Insufficient nvidia.com/gpu' in event_message or \
'didn\'t match Pod\'s node affinity/selector' in event_message:
if ('Insufficient nvidia.com/gpu'
in event_message or
'didn\'t match Pod\'s node affinity/selector'
in event_message):
raise config.KubernetesError(
f'{lack_resource_msg.format(resource="GPU")} '
f'Verify if {pod.spec.node_selector[label_key]}'
@@ -225,6 +235,122 @@ def _raise_pod_scheduling_errors(self, new_nodes):
f'Details: \'{event_message}\' ')
raise config.KubernetesError(f'{timeout_err_msg}')

def _wait_for_pods_to_schedule(self, new_nodes):
"""Wait for all pods to be scheduled.

Wait for all pods including jump pod to be scheduled, and if it
exceeds the timeout, raise an exception. If pod's container
is ContainerCreating, then we can assume that resources have been
allocated and we can exit.
"""
start_time = time.time()
while time.time() - start_time < self.timeout:
all_pods_scheduled = True
for node in new_nodes:
# Iterate over each pod to check their status
pod = kubernetes.core_api().read_namespaced_pod(
node.metadata.name, self.namespace)
if pod.status.phase == 'Pending':
# If container_statuses is None, then the pod hasn't
# been scheduled yet.
if pod.status.container_statuses is None:
all_pods_scheduled = False
break

if all_pods_scheduled:
return
time.sleep(1)

# Handle pod scheduling errors
try:
self._raise_pod_scheduling_errors(new_nodes)
except config.KubernetesError:
raise
except Exception as e:
raise config.KubernetesError(
'An error occurred while trying to fetch the reason '
'for pod scheduling failure. '
f'Error: {common_utils.format_exception(e)}') from None

def _wait_for_pods_to_run(self, new_nodes):
"""Wait for pods and their containers to be ready.

Pods may be pulling images or may be in the process of container
creation.
"""
while True:
all_pods_running = True
# Iterate over each pod to check their status
for node in new_nodes:
pod = kubernetes.core_api().read_namespaced_pod(
node.metadata.name, self.namespace)

# Continue if pod and all the containers within the
# pod are successfully created and running.
if pod.status.phase == 'Running' and all([
container.state.running
for container in pod.status.container_statuses
]):
continue

all_pods_running = False
if pod.status.phase == 'Pending':
# Iterate over each container in pod to check their status
for container_status in pod.status.container_statuses:
# If the container wasn't in 'ContainerCreating'
# state, then we know pod wasn't scheduled or
# had some other error, such as image pull error.
# See list of possible reasons for waiting here:
# https://stackoverflow.com/a/57886025
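# (Illustrative note, beyond what this diff shows: waiting reasons
# such as 'ErrImagePull' and 'ImagePullBackOff' typically indicate
# that the image could not be pulled, e.g. due to a network error,
# which is exactly the case this check surfaces early.)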
waiting = container_status.state.waiting
if waiting is not None and waiting.reason != 'ContainerCreating':
raise config.KubernetesError(
'Failed to create container while launching '
'the node. Error details: '
f'{container_status.state.waiting.message}.')
# Reaching this point means that one of the pods had an issue,
# so break out of the loop
break

if all_pods_running:
break
time.sleep(1)

def _set_env_vars_in_pods(self, new_nodes):
"""Setting environment variables in pods.

Once all containers are ready, we can exec into them and set env vars.
Kubernetes automatically populates containers with critical
environment variables, such as those for discovering services running
in the cluster and CUDA/nvidia environment variables. We need to
make sure these env vars are available in every task and ssh session.
This is needed for GPU support and service discovery.
See https://github.com/skypilot-org/skypilot/issues/2287 for
more details.

To do so, we capture env vars from the pod's runtime and write them to
/etc/profile.d/, making them available for all users in future
shell sessions.
"""
set_k8s_env_var_cmd = [
'/bin/sh', '-c',
('printenv | awk -F "=" \'{print "export " $1 "=\\047" $2 "\\047"}\' > ~/k8s_env_var.sh && '
'mv ~/k8s_env_var.sh /etc/profile.d/k8s_env_var.sh || '
'sudo mv ~/k8s_env_var.sh /etc/profile.d/k8s_env_var.sh')
]
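# Illustrative example (not part of this change): for an env var
# LANG=en_US.UTF-8, the awk command above emits the line
#   export LANG='en_US.UTF-8'
# (\047 is the octal escape for a single quote), so future shell
# sessions started in the pod re-export the runtime environment.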

for new_node in new_nodes:
kubernetes.stream()(
kubernetes.core_api().connect_get_namespaced_pod_exec,
new_node.metadata.name,
self.namespace,
command=set_k8s_env_var_cmd,
stderr=True,
stdin=False,
stdout=True,
tty=False,
_request_timeout=kubernetes.API_TIMEOUT)

def create_node(self, node_config, tags, count):
conf = copy.deepcopy(node_config)
pod_spec = conf.get('pod', conf)
@@ -265,97 +391,20 @@ def create_node(self, node_config, tags, count):
self.namespace, service_spec)
new_svcs.append(svc)

# Wait for all pods including jump pod to be ready, and if it
# exceeds the timeout, raise an exception. If pod's container
# is ContainerCreating, then we can assume that resources have been
# allocated and we can exit.
# Adding the jump pod to the new_nodes list as well so it can be
# checked if it's scheduled and running along with other pod instances.
ssh_jump_pod_name = conf['metadata']['labels']['skypilot-ssh-jump']
jump_pod = kubernetes.core_api().read_namespaced_pod(
ssh_jump_pod_name, self.namespace)
new_nodes.append(jump_pod)
start = time.time()
while True:
if time.time() - start > self.timeout:
try:
self._raise_pod_scheduling_errors(new_nodes)
except config.KubernetesError:
raise
except Exception as e:
raise config.KubernetesError(
'An error occurred while trying to fetch the reason '
'for pod scheduling failure. '
f'Error: {common_utils.format_exception(e)}') from None

all_ready = True
for node in new_nodes:
pod = kubernetes.core_api().read_namespaced_pod(
node.metadata.name, self.namespace)
if pod.status.phase == 'Pending':
# Iterate over each pod to check their status
if pod.status.container_statuses is not None:
for container_status in pod.status.container_statuses:
# Continue if container status is ContainerCreating
# This indicates this pod has been scheduled.
if container_status.state.waiting is not None and container_status.state.waiting.reason == 'ContainerCreating':
continue
else:
# If the container wasn't in creating state,
# then we know pod wasn't scheduled or had some
# other error, such as image pull error.
# See list of possible reasons for waiting here:
# https://stackoverflow.com/a/57886025
all_ready = False
else:
# If container_statuses is None, then the pod hasn't
# been scheduled yet.
all_ready = False
if all_ready:
break
time.sleep(1)

# Wait for pod containers to be ready - they may be pulling images or
# may be in the process of container creation.
while True:
pods = []
for node in new_nodes:
pod = kubernetes.core_api().read_namespaced_pod(
node.metadata.name, self.namespace)
pods.append(pod)
if all([pod.status.phase == "Running" for pod in pods]) \
and all(
[container.state.running for pod in pods for container in
pod.status.container_statuses]):
break
time.sleep(1)

# Once all containers are ready, we can exec into them and set env vars.
# Kubernetes automatically populates containers with critical
# environment variables, such as those for discovering services running
# in the cluster and CUDA/nvidia environment variables. We need to
# make sure these env vars are available in every task and ssh session.
# This is needed for GPU support and service discovery.
# See https://github.com/skypilot-org/skypilot/issues/2287 for
# more details.
# To do so, we capture env vars from the pod's runtime and write them to
# /etc/profile.d/, making them available for all users in future
# shell sessions.
set_k8s_env_var_cmd = [
'/bin/sh', '-c',
('printenv | awk -F "=" \'{print "export " $1 "=\\047" $2 "\\047"}\' > ~/k8s_env_var.sh && '
'mv ~/k8s_env_var.sh /etc/profile.d/k8s_env_var.sh || '
'sudo mv ~/k8s_env_var.sh /etc/profile.d/k8s_env_var.sh')
]
for new_node in new_nodes:
kubernetes.stream()(
kubernetes.core_api().connect_get_namespaced_pod_exec,
new_node.metadata.name,
self.namespace,
command=set_k8s_env_var_cmd,
stderr=True,
stdin=False,
stdout=True,
tty=False,
_request_timeout=kubernetes.API_TIMEOUT)
# Wait until the pods are scheduled and surface cause for error
# if there is one
self._wait_for_pods_to_schedule(new_nodes)
# Wait until the pods and their containers are up and running, and
# fail early if there is an error
self._wait_for_pods_to_run(new_nodes)
self._set_env_vars_in_pods(new_nodes)

def terminate_node(self, node_id):
logger.info(config.log_prefix + 'calling delete_namespaced_pod')