[Feature][RayCluster]: introduce RayClusterSuspending and RayClusterSuspended conditions #2403

Merged
4 changes: 4 additions & 0 deletions ray-operator/apis/ray/v1/raycluster_types.go
@@ -186,6 +186,10 @@ const (
HeadPodReady RayClusterConditionType = "HeadPodReady"
// RayClusterReplicaFailure is added in a RayCluster when one of its pods fails to be created or deleted.
RayClusterReplicaFailure RayClusterConditionType = "ReplicaFailure"
// RayClusterSuspending is set to true when a user sets .Spec.Suspend to true, ensuring the atomicity of the suspend operation.
RayClusterSuspending RayClusterConditionType = "RayClusterSuspending"
// RayClusterSuspended is set to true when all Pods belonging to a suspending RayCluster are deleted. Note that RayClusterSuspending and RayClusterSuspended cannot both be true at the same time.
RayClusterSuspended RayClusterConditionType = "RayClusterSuspended"
)

// HeadInfo gives info about head
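The two conditions are meant to be read as a two-phase signal: RayClusterSuspending is true while Pods are being torn down, and RayClusterSuspended becomes true only once teardown has completed. A minimal client-side sketch of waiting on that signal (hypothetical; `waitUntilSuspended` and its polling intervals are not part of this PR):

```go
package raycheck

import (
	"context"
	"time"

	"k8s.io/apimachinery/pkg/api/meta"
	"k8s.io/apimachinery/pkg/util/wait"
	"sigs.k8s.io/controller-runtime/pkg/client"

	rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
)

// waitUntilSuspended polls a RayCluster until the Suspended condition is true.
// While Suspending is still true, Pod deletion is in progress.
func waitUntilSuspended(ctx context.Context, c client.Client, key client.ObjectKey) error {
	return wait.PollUntilContextTimeout(ctx, time.Second, 2*time.Minute, true,
		func(ctx context.Context) (bool, error) {
			var rc rayv1.RayCluster
			if err := c.Get(ctx, key, &rc); err != nil {
				return false, err
			}
			return meta.IsStatusConditionTrue(rc.Status.Conditions, string(rayv1.RayClusterSuspended)), nil
		})
}
```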
109 changes: 97 additions & 12 deletions ray-operator/controllers/ray/raycluster_controller.go
@@ -210,10 +210,24 @@ func (r *RayClusterReconciler) deleteAllPods(ctx context.Context, filters common
return pods, nil
}

func (r *RayClusterReconciler) validateRayClusterStatus(instance *rayv1.RayCluster) error {
suspending := meta.IsStatusConditionTrue(instance.Status.Conditions, string(rayv1.RayClusterSuspending))
suspended := meta.IsStatusConditionTrue(instance.Status.Conditions, string(rayv1.RayClusterSuspended))
if suspending && suspended {
return errstd.New("invalid RayCluster State: rayv1.RayClusterSuspending and rayv1.RayClusterSuspended conditions should not be both true")
}
return nil
}

func (r *RayClusterReconciler) rayClusterReconcile(ctx context.Context, request ctrl.Request, instance *rayv1.RayCluster) (ctrl.Result, error) {
var reconcileErr error
logger := ctrl.LoggerFrom(ctx)

if err := r.validateRayClusterStatus(instance); err != nil {
logger.Error(err, "The RayCluster status is invalid")
return ctrl.Result{RequeueAfter: DefaultRequeueDuration}, err
}

// Please do NOT modify `originalRayClusterInstance` in the following code.
originalRayClusterInstance := instance.DeepCopy()

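reconcilePods and calculateStatus below rely on utils.FindRayClusterSuspendStatus to collapse the two conditions into a single value, and the condition-based code paths are active only behind the RayClusterStatusConditions feature gate. The helper itself is outside this excerpt; a plausible sketch, assuming it simply reports whichever condition is true (the two are mutually exclusive, as validateRayClusterStatus enforces):

```go
// Hypothetical sketch of the utils helper used below; the real implementation
// lives in the ray-operator utils package and may differ in detail.
func FindRayClusterSuspendStatus(instance *rayv1.RayCluster) rayv1.RayClusterConditionType {
	for _, cond := range instance.Status.Conditions {
		if cond.Type == string(rayv1.RayClusterSuspending) || cond.Type == string(rayv1.RayClusterSuspended) {
			if cond.Status == metav1.ConditionTrue {
				return rayv1.RayClusterConditionType(cond.Type)
			}
		}
	}
	return "" // neither suspending nor suspended
}
```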
@@ -340,10 +354,11 @@ func (r *RayClusterReconciler) rayClusterReconcile(ctx context.Context, request
// Calculate the new status for the RayCluster. Note that the function will deep copy `instance` instead of mutating it.
newInstance, calculateErr := r.calculateStatus(ctx, instance, reconcileErr)
var updateErr error
var inconsistent bool
if calculateErr != nil {
logger.Info("Got error when calculating new status", "cluster name", request.Name, "error", calculateErr)
} else {
-updateErr = r.updateRayClusterStatus(ctx, originalRayClusterInstance, newInstance)
+inconsistent, updateErr = r.updateRayClusterStatus(ctx, originalRayClusterInstance, newInstance)
}

// Return error based on order.
Expand All @@ -355,7 +370,9 @@ func (r *RayClusterReconciler) rayClusterReconcile(ctx context.Context, request
} else {
err = updateErr
}
-if err != nil {
+// Besides an error, we also requeue the reconciliation if status changed.
+// This behavior is required by atomic operations that depend on status changes, such as suspending a RayCluster.
+if err != nil || inconsistent {
return ctrl.Result{RequeueAfter: DefaultRequeueDuration}, err
}

@@ -614,8 +631,11 @@ func (r *RayClusterReconciler) reconcileHeadlessService(ctx context.Context, ins
func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv1.RayCluster) error {
logger := ctrl.LoggerFrom(ctx)

-// if RayCluster is suspended, delete all pods and skip reconcile
-if instance.Spec.Suspend != nil && *instance.Spec.Suspend {
+// if RayCluster is suspending, delete all pods and skip reconcile
+suspendStatus := utils.FindRayClusterSuspendStatus(instance)
+statusConditionGateEnabled := features.Enabled(features.RayClusterStatusConditions)
+if suspendStatus == rayv1.RayClusterSuspending ||
+	(!statusConditionGateEnabled && instance.Spec.Suspend != nil && *instance.Spec.Suspend) {
if _, err := r.deleteAllPods(ctx, common.RayClusterAllPodsAssociationOptions(instance)); err != nil {
**Member:** Do we need to send a deleteAllPods request to the K8s API server for each reconciliation during Suspending?

**rueian (author):** Yes, we do that until all pods are deleted, and then we switch to suspended.

**Member:** Is it complex to call deleteAllPods only when there are running Pods in the cluster? If not, we can update it in this PR. If it is not trivial, can you open an issue to track the progress?

**rueian (author):** After revisiting the source, I found that deleteAllPods already includes this improvement: it only sends API requests if there are Pods still running.

```go
func (r *RayClusterReconciler) deleteAllPods(ctx context.Context, filters common.AssociationOptions) (pods corev1.PodList, err error) {
	logger := ctrl.LoggerFrom(ctx)
	if err = r.List(ctx, &pods, filters.ToListOptions()...); err != nil {
		return pods, err
	}
	active := 0
	for _, pod := range pods.Items {
		if pod.DeletionTimestamp.IsZero() {
			active++
		}
	}
	if active > 0 {
		logger.Info("Deleting all Pods with labels", "filters", filters, "Number of active Pods", active)
		return pods, r.DeleteAllOf(ctx, &corev1.Pod{}, filters.ToDeleteOptions()...)
	}
	return pods, nil
}
```

I guess there is no need for an issue to track the progress?

r.Recorder.Eventf(instance, corev1.EventTypeWarning, string(utils.FailedToDeletePod),
"Failed deleting Pods due to suspension for RayCluster %s/%s, %v",
@@ -629,6 +649,16 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
return nil
}

if statusConditionGateEnabled {
if suspendStatus == rayv1.RayClusterSuspended {
return nil // stop reconcilePods because the cluster is suspended.
}
// (suspendStatus != rayv1.RayClusterSuspending) is always true here because it has been checked above.
if instance.Spec.Suspend != nil && *instance.Spec.Suspend {
return nil // stop reconcilePods because the cluster is going to suspend.
}
}

// check if all the pods exist
headPods := corev1.PodList{}
if err := r.List(ctx, &headPods, common.RayClusterHeadPodsAssociationOptions(instance).ToListOptions()...); err != nil {
@@ -1177,7 +1207,8 @@ func (r *RayClusterReconciler) calculateStatus(ctx context.Context, instance *ra
// Deep copy the instance, so we don't mutate the original object.
newInstance := instance.DeepCopy()

-if features.Enabled(features.RayClusterStatusConditions) {
+statusConditionGateEnabled := features.Enabled(features.RayClusterStatusConditions)
+if statusConditionGateEnabled {
if reconcileErr != nil {
if reason := utils.RayClusterReplicaFailureReason(reconcileErr); reason != "" {
meta.SetStatusCondition(&newInstance.Status.Conditions, metav1.Condition{
@@ -1218,8 +1249,8 @@ func (r *RayClusterReconciler) calculateStatus(ctx context.Context, instance *ra
newInstance.Status.State = rayv1.Ready //nolint:staticcheck // https://github.com/ray-project/kuberay/pull/2288
}

-// Check if the head node is running and ready by checking the head pod's status.
-if features.Enabled(features.RayClusterStatusConditions) {
+// Check if the head node is running and ready by checking the head pod's status or if the cluster has been suspended.
+if statusConditionGateEnabled {
headPod, err := common.GetRayClusterHeadPod(ctx, r, newInstance)
if err != nil {
return nil, err
@@ -1239,7 +1270,7 @@ func (r *RayClusterReconciler) calculateStatus(ctx context.Context, instance *ra

if !meta.IsStatusConditionTrue(newInstance.Status.Conditions, string(rayv1.RayClusterProvisioned)) {
// RayClusterProvisioned indicates whether all Ray Pods are ready when the RayCluster is first created.
-// Note RayClusterProvisioned StatusCondition will not be updated after all Ray Pods are ready for the first time.
+// Note RayClusterProvisioned StatusCondition will not be updated after all Ray Pods are ready for the first time, unless the cluster has been suspended.
if utils.CheckAllPodsRunning(ctx, runtimePods) {
meta.SetStatusCondition(&newInstance.Status.Conditions, metav1.Condition{
Type: string(rayv1.RayClusterProvisioned),
@@ -1257,6 +1288,54 @@ func (r *RayClusterReconciler) calculateStatus(ctx context.Context, instance *ra
}
}

suspendStatus := utils.FindRayClusterSuspendStatus(newInstance)
if suspendStatus == rayv1.RayClusterSuspending {
if len(runtimePods.Items) == 0 {
meta.SetStatusCondition(&newInstance.Status.Conditions, metav1.Condition{
Type: string(rayv1.RayClusterProvisioned),
Status: metav1.ConditionFalse,
Reason: rayv1.RayClusterPodsProvisioning,
Message: "RayCluster has been suspended",
})
meta.SetStatusCondition(&newInstance.Status.Conditions, metav1.Condition{
Type: string(rayv1.RayClusterSuspending),
Reason: string(rayv1.RayClusterSuspending),
Status: metav1.ConditionFalse,
})
meta.SetStatusCondition(&newInstance.Status.Conditions, metav1.Condition{
Type: string(rayv1.RayClusterSuspended),
Reason: string(rayv1.RayClusterSuspended),
Status: metav1.ConditionTrue,
})
}
} else if suspendStatus == rayv1.RayClusterSuspended {
if instance.Spec.Suspend != nil && !*instance.Spec.Suspend {
meta.SetStatusCondition(&newInstance.Status.Conditions, metav1.Condition{
Type: string(rayv1.RayClusterSuspended),
Reason: string(rayv1.RayClusterSuspended),
Status: metav1.ConditionFalse,
})
}
} else {
meta.SetStatusCondition(&newInstance.Status.Conditions, metav1.Condition{
Type: string(rayv1.RayClusterSuspended),
Reason: string(rayv1.RayClusterSuspended),
Status: metav1.ConditionFalse,
})
if instance.Spec.Suspend != nil && *instance.Spec.Suspend {
meta.SetStatusCondition(&newInstance.Status.Conditions, metav1.Condition{
Type: string(rayv1.RayClusterSuspending),
Reason: string(rayv1.RayClusterSuspending),
Status: metav1.ConditionTrue,
})
} else {
meta.SetStatusCondition(&newInstance.Status.Conditions, metav1.Condition{
Type: string(rayv1.RayClusterSuspending),
Reason: string(rayv1.RayClusterSuspending),
Status: metav1.ConditionFalse,
})
}
}
}

if newInstance.Spec.Suspend != nil && *newInstance.Spec.Suspend && len(runtimePods.Items) == 0 {
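Taken together, the block above is a small state machine: Suspending flips to Suspended once all Pods are gone; Suspended clears when .Spec.Suspend is set back to false; otherwise Suspending simply tracks .Spec.Suspend. A stand-alone illustration of the resulting condition flips using the same apimachinery helpers (hypothetical driver code, not part of KubeRay):

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	var conds []metav1.Condition

	// Reconcile 1: .Spec.Suspend flips to true -> Suspending becomes true.
	meta.SetStatusCondition(&conds, metav1.Condition{
		Type: "RayClusterSuspending", Status: metav1.ConditionTrue, Reason: "RayClusterSuspending",
	})

	// Reconcile N: all Pods are deleted -> Suspending cleared, Suspended set.
	// The two conditions are never true at the same time.
	meta.SetStatusCondition(&conds, metav1.Condition{
		Type: "RayClusterSuspending", Status: metav1.ConditionFalse, Reason: "RayClusterSuspending",
	})
	meta.SetStatusCondition(&conds, metav1.Condition{
		Type: "RayClusterSuspended", Status: metav1.ConditionTrue, Reason: "RayClusterSuspended",
	})

	fmt.Println(meta.IsStatusConditionTrue(conds, "RayClusterSuspending")) // false
	fmt.Println(meta.IsStatusConditionTrue(conds, "RayClusterSuspended"))  // true
}
```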
@@ -1359,6 +1438,9 @@ func (r *RayClusterReconciler) updateHeadInfo(ctx context.Context, instance *ray
if headPod != nil {
instance.Status.Head.PodIP = headPod.Status.PodIP
instance.Status.Head.PodName = headPod.Name
} else {
instance.Status.Head.PodIP = ""
instance.Status.Head.PodName = ""
}

ip, name, err := r.getHeadServiceIPAndName(ctx, instance)
@@ -1514,17 +1596,20 @@ func (r *RayClusterReconciler) reconcileAutoscalerRoleBinding(ctx context.Contex
return nil
}

-func (r *RayClusterReconciler) updateRayClusterStatus(ctx context.Context, originalRayClusterInstance, newInstance *rayv1.RayCluster) error {
+// updateRayClusterStatus updates the RayCluster status if it is inconsistent with the old status and returns a bool to indicate the inconsistency.
+// We rely on the returned bool to requeue the reconciliation for atomic operations, such as suspending a RayCluster.
+func (r *RayClusterReconciler) updateRayClusterStatus(ctx context.Context, originalRayClusterInstance, newInstance *rayv1.RayCluster) (bool, error) {
logger := ctrl.LoggerFrom(ctx)
-if !r.inconsistentRayClusterStatus(ctx, originalRayClusterInstance.Status, newInstance.Status) {
-	return nil
+inconsistent := r.inconsistentRayClusterStatus(ctx, originalRayClusterInstance.Status, newInstance.Status)
+if !inconsistent {
+	return inconsistent, nil
}
logger.Info("updateRayClusterStatus", "name", originalRayClusterInstance.Name, "old status", originalRayClusterInstance.Status, "new status", newInstance.Status)
err := r.Status().Update(ctx, newInstance)
if err != nil {
logger.Info("Error updating status", "name", originalRayClusterInstance.Name, "error", err, "RayCluster", newInstance)
}
-return err
+return inconsistent, err
}
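For reference, the caller shown earlier in rayClusterReconcile consumes the returned pair like this (condensed from the hunk above; no new names introduced):

```go
inconsistent, updateErr := r.updateRayClusterStatus(ctx, originalRayClusterInstance, newInstance)
// Requeue when the status changed so that multi-step transitions such as
// Suspending -> Suspended are driven to completion promptly.
if updateErr != nil || inconsistent {
	return ctrl.Result{RequeueAfter: DefaultRequeueDuration}, updateErr
}
```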

// sumGPUs sums the GPUs in the given resource list.
110 changes: 109 additions & 1 deletion ray-operator/controllers/ray/raycluster_controller_test.go
@@ -17,6 +17,7 @@ package ray

import (
"context"
"errors"
"fmt"
"time"

@@ -36,6 +37,7 @@ import (
rbacv1 "k8s.io/api/rbac/v1"
"k8s.io/utils/ptr"

k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
@@ -414,7 +416,7 @@ var _ = Context("Inside the default namespace", func() {
})
})

Describe("Suspend RayCluster", Ordered, func() {
testSuspendRayCluster := func(withConditionEnabled bool) {
ctx := context.Background()
namespace := "default"
rayCluster := rayClusterTemplate("raycluster-suspend", namespace)
@@ -425,6 +427,29 @@
headFilters := common.RayClusterHeadPodsAssociationOptions(rayCluster).ToListOptions()
allFilters := common.RayClusterAllPodsAssociationOptions(rayCluster).ToListOptions()

findRayClusterSuspendStatus := func() (rayv1.RayClusterConditionType, error) {
if err := getResourceFunc(ctx, client.ObjectKey{Name: rayCluster.Name, Namespace: namespace}, rayCluster)(); err != nil {
return "", err
}
suspending := meta.IsStatusConditionTrue(rayCluster.Status.Conditions, string(rayv1.RayClusterSuspending))
suspended := meta.IsStatusConditionTrue(rayCluster.Status.Conditions, string(rayv1.RayClusterSuspended))
if suspending && suspended {
return "", errors.New("invalid: rayv1.RayClusterSuspending and rayv1.RayClusterSuspended should not be both true")
} else if suspending {
return rayv1.RayClusterSuspending, nil
} else if suspended {
return rayv1.RayClusterSuspended, nil
}
return "", nil
}

BeforeAll(func() {
if withConditionEnabled {
cleanUpFunc := features.SetFeatureGateDuringTest(GinkgoTB(), features.RayClusterStatusConditions, true)
DeferCleanup(cleanUpFunc)
}
})

It("Verify RayCluster spec", func() {
// These test are designed based on the following assumptions:
// (1) Ray Autoscaler is disabled.
@@ -449,6 +474,16 @@
Eventually(
listResourceFunc(ctx, &workerPods, workerFilters...),
time.Second*3, time.Millisecond*500).Should(Equal(numWorkerPods), fmt.Sprintf("workerGroup %v", workerPods.Items))

if withConditionEnabled { // Add finalizers to worker Pods to prevent them from being deleted so that we can test whether the status conditions make the suspending process atomic.
for _, pod := range workerPods.Items {
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
pod.Finalizers = append(pod.Finalizers, "ray.io/deletion-blocker")
return k8sClient.Update(ctx, &pod)
})
Expect(err).NotTo(HaveOccurred(), "Failed to update worker Pods")
}
}
})

It("Should delete all head and worker Pods if suspended", func() {
Expand All @@ -463,6 +498,46 @@ var _ = Context("Inside the default namespace", func() {
})
Expect(err).NotTo(HaveOccurred(), "Failed to update RayCluster")

// Make sure the suspending process is atomic by removing finalizers one by one.
if withConditionEnabled {
// Make sure the cluster is now marked with RayClusterSuspending.
Eventually(findRayClusterSuspendStatus, time.Second*3, time.Millisecond*500).Should(Equal(rayv1.RayClusterSuspending))
// Then turn the Spec.Suspend to false.
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
Eventually(
getResourceFunc(ctx, client.ObjectKey{Name: rayCluster.Name, Namespace: namespace}, rayCluster),
time.Second*3, time.Millisecond*500).Should(BeNil(), "rayCluster: %v", rayCluster)
suspend := false
rayCluster.Spec.Suspend = &suspend
return k8sClient.Update(ctx, rayCluster)
})
Expect(err).NotTo(HaveOccurred(), "Failed to update RayCluster")

// Remove the finalizers one by one to make sure the suspending process is still ongoing.
**Member:** Can we separate the atomicity test into its own test? I want to avoid mixing both code paths in a single test. In the atomicity test, can you verify the following:

1. Add finalizers to the Pods.
2. Set suspend to true, and check that the suspending condition is true.
3. Set suspend back to false, and ensure the suspending condition remains true for 2 to 3 seconds (use Consistently).
4. Remove the finalizers, and verify that both suspending and suspended conditions become false, all old Pods are deleted, and new Pods are created.
5. Set suspend to true, and all Pods should be deleted.

**rueian (author):** OK, a new test is added.

Eventually(listResourceFunc(ctx, &workerPods, workerFilters...), time.Second*3, time.Millisecond*500).Should(Equal(3))
for i, pod := range workerPods.Items {
if i == len(workerPods.Items)-1 { // Turn the Spec.Suspend back to true before we remove the last finalizer.
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
Eventually(
getResourceFunc(ctx, client.ObjectKey{Name: rayCluster.Name, Namespace: namespace}, rayCluster),
time.Second*3, time.Millisecond*500).Should(BeNil(), "rayCluster: %v", rayCluster)
suspend := true
rayCluster.Spec.Suspend = &suspend
return k8sClient.Update(ctx, rayCluster)
})
Expect(err).NotTo(HaveOccurred(), "Failed to update RayCluster")
}
Eventually(findRayClusterSuspendStatus, time.Second*3, time.Millisecond*500).Should(Equal(rayv1.RayClusterSuspending))
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
Eventually(getResourceFunc(ctx, client.ObjectKey{Name: pod.Name, Namespace: namespace}, &pod), time.Second*3, time.Millisecond*500).Should(BeNil())
pod.Finalizers = nil
return k8sClient.Update(ctx, &pod)
})
Expect(err).NotTo(HaveOccurred(), "Failed to update the worker Pod")
Eventually(getResourceFunc(ctx, client.ObjectKey{Name: pod.Name, Namespace: namespace}, &pod), time.Second*3, time.Millisecond*500).Should(MatchError(k8serrors.NewNotFound(corev1.Resource("pods"), pod.Name)))
}
}

// Check that all Pods are deleted
Eventually(
listResourceFunc(ctx, &workerPods, workerFilters...),
Expand All @@ -479,6 +554,13 @@ var _ = Context("Inside the default namespace", func() {
Eventually(
getClusterState(ctx, namespace, rayCluster.Name),
time.Second*3, time.Millisecond*500).Should(Equal(rayv1.Suspended))
if withConditionEnabled {
Eventually(
findRayClusterSuspendStatus,
time.Second*3, time.Millisecond*500).Should(Equal(rayv1.RayClusterSuspended))
Expect(meta.IsStatusConditionTrue(rayCluster.Status.Conditions, string(rayv1.RayClusterProvisioned))).To(BeFalse())
Expect(rayCluster.Status.Head.PodName).To(BeEmpty())
}
})

It("Set suspend to false and then revert it to true before all Pods are running", func() {
@@ -534,6 +616,12 @@
Eventually(
getClusterState(ctx, namespace, rayCluster.Name),
time.Second*3, time.Millisecond*500).Should(Equal(rayv1.Suspended))

if withConditionEnabled {
Eventually(
findRayClusterSuspendStatus,
time.Second*3, time.Millisecond*500).Should(Equal(rayv1.RayClusterSuspended))
}
})

It("Should run all head and worker pods if un-suspended", func() {
@@ -573,7 +661,27 @@
Eventually(
getClusterState(ctx, namespace, rayCluster.Name),
time.Second*3, time.Millisecond*500).Should(Equal(rayv1.Ready))
if withConditionEnabled {
**Member:** Can you explicitly add comments to describe which fields should be updated when the status transitions?

Eventually(
findRayClusterSuspendStatus,
time.Second*3, time.Millisecond*500).Should(BeEmpty())
Expect(meta.IsStatusConditionTrue(rayCluster.Status.Conditions, string(rayv1.RayClusterProvisioned))).To(BeTrue())
Expect(rayCluster.Status.Head.PodName).NotTo(BeEmpty())
}
})

It("Delete the cluster", func() {
err := k8sClient.Delete(ctx, rayCluster)
Expect(err).NotTo(HaveOccurred())
})
}

Describe("Suspend RayCluster", Ordered, func() {
testSuspendRayCluster(false)
})

Describe("Suspend RayCluster with Condition", Ordered, func() {
testSuspendRayCluster(true)
})

Describe("RayCluster with a multi-host worker group", Ordered, func() {