Skip to content

Commit

Permalink
[RayCluster][Fix] evicted head-pod can be recreated or restarted (ray…
Browse files Browse the repository at this point in the history
…-project#2217)

Signed-off-by: Goober <[email protected]>
  • Loading branch information
JasonChen86899 authored Aug 9, 2024
1 parent e71c70c commit 6c1c16e
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 29 deletions.
37 changes: 14 additions & 23 deletions ray-operator/controllers/ray/raycluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,7 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
if len(headPods.Items) == 1 {
headPod := headPods.Items[0]
logger.Info("reconcilePods", "Found 1 head Pod", headPod.Name, "Pod status", headPod.Status.Phase,
"Pod status reason", headPod.Status.Reason,
"Pod restart policy", headPod.Spec.RestartPolicy,
"Ray container terminated status", getRayContainerStateTerminated(headPod))

Expand Down Expand Up @@ -812,43 +813,33 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
// (1) shouldDelete: Whether the Pod should be deleted.
// (2) reason: The reason why the Pod should or should not be deleted.
func shouldDeletePod(pod corev1.Pod, nodeType rayv1.RayNodeType) (bool, string) {
// If a Pod's restart policy is set to `Always`, KubeRay will not delete
// the Pod and rely on the Pod's restart policy to restart the Pod.
isRestartPolicyAlways := pod.Spec.RestartPolicy == corev1.RestartPolicyAlways
// Based on the logic of the change of the status of the K8S pod, the following judgment is made.
// https://github.com/kubernetes/kubernetes/blob/3361895612dac57044d5dacc029d2ace1865479c/pkg/kubelet/kubelet_pods.go#L1556

// If the Pod's status is `Failed` or `Succeeded`, the Pod will not restart and we can safely delete it.
if pod.Status.Phase == corev1.PodFailed || pod.Status.Phase == corev1.PodSucceeded {
if isRestartPolicyAlways {
// Based on my observation, a Pod with `RestartPolicy: Always` will never be in the terminated states (i.e., `Failed` or `Succeeded`).
// However, I couldn't find any well-defined behavior in the Kubernetes documentation, so I can't guarantee that the status transition
// from `Running` to `Failed / Succeeded` and back to `Running` won't occur when we kill the main process (i.e., `ray start` in KubeRay)
// in the head Pod. Therefore, I've added this check as a safeguard.
reason := fmt.Sprintf(
"The status of the %s Pod %s is %s. However, KubeRay will not delete the Pod because its restartPolicy is set to 'Always' "+
"and it should be able to restart automatically.", nodeType, pod.Name, pod.Status.Phase)
return false, reason
}

reason := fmt.Sprintf(
"The %s Pod %s status is %s which is a terminal state and it will not restart. "+
"KubeRay will delete the Pod and create new Pods in the next reconciliation if necessary.", nodeType, pod.Name, pod.Status.Phase)
"The %s Pod %s status is %s which is a terminal state. "+
"KubeRay will delete the Pod and create new Pods in the next reconciliation if necessary.",
nodeType, pod.Name, pod.Status.Phase)
return true, reason
}

rayContainerTerminated := getRayContainerStateTerminated(pod)
if pod.Status.Phase == corev1.PodRunning && rayContainerTerminated != nil {
if isRestartPolicyAlways {
// If restart policy is set to `Always`, KubeRay will not delete the Pod.
if pod.Spec.RestartPolicy == corev1.RestartPolicyNever {
reason := fmt.Sprintf(
"The Pod status of the %s Pod %s is %s, and the Ray container terminated status is %v. However, KubeRay will not delete the Pod because its restartPolicy is set to 'Always' "+
"and it should be able to restart automatically.", nodeType, pod.Name, pod.Status.Phase, rayContainerTerminated)
return false, reason
"The Pod status of the %s Pod %s is %s, and the Ray container terminated status is %v. "+
"The container is unable to restart due to its restart policy %s, so KubeRay will delete it.",
nodeType, pod.Name, pod.Status.Phase, rayContainerTerminated, pod.Spec.RestartPolicy)
return true, reason
}
// If restart policy is set to `Always` or `OnFailure`, KubeRay will not delete the Pod.
reason := fmt.Sprintf(
"The Pod status of the %s Pod %s is %s, and the Ray container terminated status is %v. "+
"The container is unable to restart due to its restart policy %s, so KubeRay will delete it.",
"However, KubeRay will not delete the Pod because its restartPolicy is set to %s and it should be able to restart automatically.",
nodeType, pod.Name, pod.Status.Phase, rayContainerTerminated, pod.Spec.RestartPolicy)
return true, reason
return false, reason
}

// TODO (kevin85421): Consider deleting a Pod if its Ray container restarts excessively, as this might
Expand Down
28 changes: 22 additions & 6 deletions ray-operator/controllers/ray/raycluster_controller_unit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -936,8 +936,10 @@ func TestReconcile_PodEvicted_DiffLess0_OK(t *testing.T) {
assert.Equal(t, len(testPods), len(podList.Items), "Init pod list len is wrong")

// Simulate head pod get evicted.
podList.Items[0].Spec.RestartPolicy = corev1.RestartPolicyAlways
err = fakeClient.Update(ctx, &podList.Items[0])
assert.Nil(t, err, "Fail to update head Pod restart policy")
podList.Items[0].Status.Phase = corev1.PodFailed
podList.Items[0].Status.Reason = "Evicted"
err = fakeClient.Status().Update(ctx, &podList.Items[0])
assert.Nil(t, err, "Fail to update head Pod status")

Expand Down Expand Up @@ -2062,18 +2064,32 @@ func Test_TerminatedHead_RestartPolicy(t *testing.T) {
Scheme: newScheme,
}

// The head Pod will not be deleted because the restart policy is `Always`.
// The head Pod will be deleted regardless restart policy.
err = testRayClusterReconciler.reconcilePods(ctx, cluster)
assert.NotNil(t, err)
err = fakeClient.List(ctx, &podList, client.InNamespace(namespaceStr))
assert.Nil(t, err, "Fail to get pod list")
assert.Equal(t, 0, len(podList.Items))

// The new head Pod will be created in this reconcile loop.
err = testRayClusterReconciler.reconcilePods(ctx, cluster)
assert.Nil(t, err)
err = fakeClient.List(ctx, &podList, client.InNamespace(namespaceStr))
assert.Nil(t, err, "Fail to get pod list")
assert.Equal(t, 1, len(podList.Items))

// Make sure the head Pod's restart policy is `Never` and status is `Failed`.
// Make sure the head Pod's restart policy is `Never` and status is `Running`.
podList.Items[0].Spec.RestartPolicy = corev1.RestartPolicyNever
err = fakeClient.Update(ctx, &podList.Items[0])
assert.Nil(t, err)
podList.Items[0].Status.Phase = corev1.PodFailed
podList.Items[0].Status.Phase = corev1.PodRunning
podList.Items[0].Status.ContainerStatuses = append(podList.Items[0].Status.ContainerStatuses,
corev1.ContainerStatus{
Name: podList.Items[0].Spec.Containers[utils.RayContainerIndex].Name,
State: corev1.ContainerState{
Terminated: &corev1.ContainerStateTerminated{},
},
})
err = fakeClient.Status().Update(ctx, &podList.Items[0])
assert.Nil(t, err)

Expand Down Expand Up @@ -2182,10 +2198,10 @@ func Test_ShouldDeletePod(t *testing.T) {
// The restart policy is `Always` and the Pod is in a terminate state.
// The expected behavior is that the controller will not delete the Pod because
// the restart policy is `Always`.
name: "restartPolicy=Always, phase=PodFailed, shouldDelete=false",
name: "restartPolicy=Always, phase=PodFailed, shouldDelete=true",
restartPolicy: corev1.RestartPolicyAlways,
phase: corev1.PodFailed,
shouldDelete: false,
shouldDelete: true,
},
{
// The restart policy is `Always`, the Pod is not in a terminate state,
Expand Down

0 comments on commit 6c1c16e

Please sign in to comment.