From ee39d7e3e208fdfbe642895f3fd0d966374c4d88 Mon Sep 17 00:00:00 2001
From: Rueian
Date: Fri, 4 Oct 2024 17:10:14 -0700
Subject: [PATCH] [Feature][RayCluster]: introduce RayClusterSuspending and
 RayClusterSuspended conditions

Signed-off-by: Rueian
---
 ray-operator/apis/ray/v1/raycluster_types.go |  8 +-
 .../controllers/ray/raycluster_controller.go | 96 +++++++++++--------
 .../ray/raycluster_controller_test.go        |  6 +-
 .../ray/raycluster_controller_unit_test.go   |  6 +-
 ray-operator/controllers/ray/utils/util.go   |  7 +-
 5 files changed, 71 insertions(+), 52 deletions(-)

diff --git a/ray-operator/apis/ray/v1/raycluster_types.go b/ray-operator/apis/ray/v1/raycluster_types.go
index 1eb3bccaec..0dc00c38e1 100644
--- a/ray-operator/apis/ray/v1/raycluster_types.go
+++ b/ray-operator/apis/ray/v1/raycluster_types.go
@@ -172,8 +172,6 @@ type RayClusterConditionType string
 const (
 	AllPodRunningAndReadyFirstTime = "AllPodRunningAndReadyFirstTime"
 	RayClusterPodsProvisioning = "RayClusterPodsProvisioning"
-	RayClusterSuspended = "RayClusterSuspended"
-	RayClusterSuspending = "RayClusterSuspending"
 	HeadPodNotFound = "HeadPodNotFound"
 	HeadPodRunningAndReady = "HeadPodRunningAndReady"
 	// UnknownReason says that the reason for the condition is unknown.
@@ -188,8 +186,10 @@ const (
 	HeadPodReady RayClusterConditionType = "HeadPodReady"
 	// RayClusterReplicaFailure is added in a RayCluster when one of its pods fails to be created or deleted.
 	RayClusterReplicaFailure RayClusterConditionType = "ReplicaFailure"
-	// RayClusterSuspend is added in a RayCluster when a user change its .Spec.Suspend to true
-	RayClusterSuspend RayClusterConditionType = "RayClusterSuspend"
+	// RayClusterSuspending is added in a RayCluster when a user changes its .Spec.Suspend to true.
+	RayClusterSuspending RayClusterConditionType = "RayClusterSuspending"
+	// RayClusterSuspended is added in a RayCluster when the suspending process has completed, i.e. all of its Pods have been deleted.
+	RayClusterSuspended RayClusterConditionType = "RayClusterSuspended"
 )
 
 // HeadInfo gives info about head
diff --git a/ray-operator/controllers/ray/raycluster_controller.go b/ray-operator/controllers/ray/raycluster_controller.go
index 29744f3cb5..93e7cc262f 100644
--- a/ray-operator/controllers/ray/raycluster_controller.go
+++ b/ray-operator/controllers/ray/raycluster_controller.go
@@ -55,11 +55,6 @@ var (
 
 	// Definition of a index field for pod name
 	podUIDIndexField = "metadata.uid"
-
-	// errSuspendCondition, errSuspending and errResuming are internal errors for shortcutting the reconcilePods.
-	errSuspendCondition = errstd.New("suspend condition changed")
-	errSuspending = fmt.Errorf("suspending: %w", errSuspendCondition)
-	errResuming = fmt.Errorf("resuming: %w", errSuspendCondition)
 )
 
 // getDiscoveryClient returns a discovery client for the current reconciler
@@ -335,7 +330,7 @@ func (r *RayClusterReconciler) rayClusterReconcile(ctx context.Context, request
 	}
 
 	for _, fn := range reconcileFuncs {
-		if reconcileErr = fn(ctx, instance); reconcileErr != nil && !errstd.Is(reconcileErr, errSuspendCondition) {
+		if reconcileErr = fn(ctx, instance); reconcileErr != nil {
 			funcName := runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name()
 			logger.Error(reconcileErr, "Error reconcile resources", "function name", funcName)
 			break
@@ -345,10 +340,11 @@ func (r *RayClusterReconciler) rayClusterReconcile(ctx context.Context, request
 	// Calculate the new status for the RayCluster. Note that the function will deep copy `instance` instead of mutating it.
 	newInstance, calculateErr := r.calculateStatus(ctx, instance, reconcileErr)
 	var updateErr error
+	var inconsistent bool
 	if calculateErr != nil {
 		logger.Info("Got error when calculating new status", "cluster name", request.Name, "error", calculateErr)
 	} else {
-		updateErr = r.updateRayClusterStatus(ctx, originalRayClusterInstance, newInstance)
+		inconsistent, updateErr = r.updateRayClusterStatus(ctx, originalRayClusterInstance, newInstance)
 	}
 
 	// Return error based on order.
@@ -360,7 +356,7 @@ func (r *RayClusterReconciler) rayClusterReconcile(ctx context.Context, request
 	} else {
 		err = updateErr
 	}
-	if err != nil {
+	if err != nil || inconsistent {
 		return ctrl.Result{RequeueAfter: DefaultRequeueDuration}, err
 	}
 
@@ -621,8 +617,9 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
 
 	// if RayCluster is suspending, delete all pods and skip reconcile
 	suspendStatus := utils.FindRayClusterSuspendStatus(instance)
+	statusConditionGateEnabled := features.Enabled(features.RayClusterStatusConditions)
 	if suspendStatus == rayv1.RayClusterSuspending ||
-		(!features.Enabled(features.RayClusterStatusConditions) && instance.Spec.Suspend != nil && *instance.Spec.Suspend) {
+		(!statusConditionGateEnabled && instance.Spec.Suspend != nil && *instance.Spec.Suspend) {
 		if _, err := r.deleteAllPods(ctx, common.RayClusterAllPodsAssociationOptions(instance)); err != nil {
 			r.Recorder.Eventf(instance, corev1.EventTypeWarning, string(utils.FailedToDeletePod),
 				"Failed deleting Pods due to suspension for RayCluster %s/%s, %v",
@@ -636,16 +633,13 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
 		return nil
 	}
 
-	if features.Enabled(features.RayClusterStatusConditions) {
+	if statusConditionGateEnabled {
 		if suspendStatus == rayv1.RayClusterSuspended {
-			if instance.Spec.Suspend != nil && !*instance.Spec.Suspend {
-				return errResuming
-			}
 			return nil // stop reconcilePods because the cluster is suspended.
 		}
 		// (suspendStatus != rayv1.RayClusterSuspending) is always true here because it has been checked above.
 		if instance.Spec.Suspend != nil && *instance.Spec.Suspend {
-			return errSuspending
+			return nil // stop reconcilePods because the cluster is being suspended.
 		}
 	}
 
@@ -1190,27 +1184,16 @@ func (r *RayClusterReconciler) SetupWithManager(mgr ctrl.Manager, reconcileConcu
 func (r *RayClusterReconciler) calculateStatus(ctx context.Context, instance *rayv1.RayCluster, reconcileErr error) (*rayv1.RayCluster, error) {
 	// TODO: Replace this log and use reconcileErr to set the condition field.
 	logger := ctrl.LoggerFrom(ctx)
-	if reconcileErr != nil && !errstd.Is(reconcileErr, errSuspendCondition) {
+	if reconcileErr != nil {
 		logger.Info("Reconciliation error", "error", reconcileErr)
 	}
 
 	// Deep copy the instance, so we don't mutate the original object.
 	newInstance := instance.DeepCopy()
 
-	if features.Enabled(features.RayClusterStatusConditions) {
-		if errstd.Is(reconcileErr, errSuspending) {
-			meta.SetStatusCondition(&newInstance.Status.Conditions, metav1.Condition{
-				Type: string(rayv1.RayClusterSuspend),
-				Reason: rayv1.RayClusterSuspending,
-				Status: metav1.ConditionTrue,
-			})
-		} else if errstd.Is(reconcileErr, errResuming) {
-			meta.SetStatusCondition(&newInstance.Status.Conditions, metav1.Condition{
-				Type: string(rayv1.RayClusterSuspend),
-				Reason: rayv1.RayClusterSuspended,
-				Status: metav1.ConditionFalse,
-			})
-		} else if reconcileErr != nil {
+	statusConditionGateEnabled := features.Enabled(features.RayClusterStatusConditions)
+	if statusConditionGateEnabled {
+		if reconcileErr != nil {
 			if reason := utils.RayClusterReplicaFailureReason(reconcileErr); reason != "" {
 				meta.SetStatusCondition(&newInstance.Status.Conditions, metav1.Condition{
 					Type: string(rayv1.RayClusterReplicaFailure),
@@ -1251,7 +1234,7 @@ func (r *RayClusterReconciler) calculateStatus(ctx context.Context, instance *ra
 	}
 
 	// Check if the head node is running and ready by checking the head pod's status or if the cluster has been suspended.
-	if features.Enabled(features.RayClusterStatusConditions) {
+	if statusConditionGateEnabled {
 		headPod, err := common.GetRayClusterHeadPod(ctx, r, newInstance)
 		if err != nil {
 			return nil, err
@@ -1290,13 +1273,47 @@ func (r *RayClusterReconciler) calculateStatus(ctx context.Context, instance *ra
 		}
 
 		suspendStatus := utils.FindRayClusterSuspendStatus(newInstance)
-		if suspendStatus == rayv1.RayClusterSuspending && len(runtimePods.Items) == 0 {
-			meta.RemoveStatusCondition(&newInstance.Status.Conditions, string(rayv1.RayClusterProvisioned))
+		if suspendStatus == rayv1.RayClusterSuspending {
+			if len(runtimePods.Items) == 0 {
+				meta.RemoveStatusCondition(&newInstance.Status.Conditions, string(rayv1.RayClusterProvisioned))
+				meta.SetStatusCondition(&newInstance.Status.Conditions, metav1.Condition{
+					Type: string(rayv1.RayClusterSuspending),
+					Reason: string(rayv1.RayClusterSuspending),
+					Status: metav1.ConditionFalse,
+				})
+				meta.SetStatusCondition(&newInstance.Status.Conditions, metav1.Condition{
+					Type: string(rayv1.RayClusterSuspended),
+					Reason: string(rayv1.RayClusterSuspended),
+					Status: metav1.ConditionTrue,
+				})
+			}
+		} else if suspendStatus == rayv1.RayClusterSuspended {
+			if instance.Spec.Suspend != nil && !*instance.Spec.Suspend {
+				meta.SetStatusCondition(&newInstance.Status.Conditions, metav1.Condition{
+					Type: string(rayv1.RayClusterSuspended),
+					Reason: string(rayv1.RayClusterSuspended),
+					Status: metav1.ConditionFalse,
+				})
+			}
+		} else {
 			meta.SetStatusCondition(&newInstance.Status.Conditions, metav1.Condition{
-				Type: string(rayv1.RayClusterSuspend),
-				Reason: rayv1.RayClusterSuspended,
-				Status: metav1.ConditionTrue,
+				Type: string(rayv1.RayClusterSuspended),
+				Reason: string(rayv1.RayClusterSuspended),
+				Status: metav1.ConditionFalse,
 			})
+			if instance.Spec.Suspend != nil && *instance.Spec.Suspend {
+				meta.SetStatusCondition(&newInstance.Status.Conditions, metav1.Condition{
+					Type: string(rayv1.RayClusterSuspending),
+					Reason: string(rayv1.RayClusterSuspending),
+					Status: metav1.ConditionTrue,
+				})
+			} else {
+				meta.SetStatusCondition(&newInstance.Status.Conditions, metav1.Condition{
+					Type: string(rayv1.RayClusterSuspending),
+					Reason: string(rayv1.RayClusterSuspending),
+					Status: metav1.ConditionFalse,
+				})
+			}
 		}
 	}
 
@@ -1555,17 +1572,18 @@ func (r *RayClusterReconciler) reconcileAutoscalerRoleBinding(ctx context.Contex
 	return nil
 }
 
-func (r *RayClusterReconciler) updateRayClusterStatus(ctx context.Context, originalRayClusterInstance, newInstance *rayv1.RayCluster) error {
+func (r *RayClusterReconciler) updateRayClusterStatus(ctx context.Context, originalRayClusterInstance, newInstance *rayv1.RayCluster) (bool, error) {
 	logger := ctrl.LoggerFrom(ctx)
-	if !r.inconsistentRayClusterStatus(ctx, originalRayClusterInstance.Status, newInstance.Status) {
-		return nil
+	inconsistent := r.inconsistentRayClusterStatus(ctx, originalRayClusterInstance.Status, newInstance.Status)
+	if !inconsistent {
+		return inconsistent, nil
 	}
 	logger.Info("updateRayClusterStatus", "name", originalRayClusterInstance.Name, "old status", originalRayClusterInstance.Status, "new status", newInstance.Status)
 	err := r.Status().Update(ctx, newInstance)
 	if err != nil {
 		logger.Info("Error updating status", "name", originalRayClusterInstance.Name, "error", err, "RayCluster", newInstance)
 	}
-	return err
+	return inconsistent, err
 }
 
 // sumGPUs sums the GPUs in the given resource list.
diff --git a/ray-operator/controllers/ray/raycluster_controller_test.go b/ray-operator/controllers/ray/raycluster_controller_test.go
index 7bae0b1099..1e3311d5f3 100644
--- a/ray-operator/controllers/ray/raycluster_controller_test.go
+++ b/ray-operator/controllers/ray/raycluster_controller_test.go
@@ -488,7 +488,7 @@ var _ = Context("Inside the default namespace", func() {
 				time.Second*3, time.Millisecond*500).Should(Equal(rayv1.Suspended))
 			if withConditionEnabled {
 				Eventually(
-					func() string {
+					func() rayv1.RayClusterConditionType {
 						if err := getResourceFunc(ctx, client.ObjectKey{Name: rayCluster.Name, Namespace: namespace}, rayCluster)(); err != nil {
 							return ""
 						}
@@ -554,7 +554,7 @@ var _ = Context("Inside the default namespace", func() {
 
 			if withConditionEnabled {
 				Eventually(
-					func() string {
+					func() rayv1.RayClusterConditionType {
 						if err := getResourceFunc(ctx, client.ObjectKey{Name: rayCluster.Name, Namespace: namespace}, rayCluster)(); err != nil {
 							return ""
 						}
@@ -603,7 +603,7 @@ var _ = Context("Inside the default namespace", func() {
 				time.Second*3, time.Millisecond*500).Should(Equal(rayv1.Ready))
 			if withConditionEnabled {
 				Eventually(
-					func() string {
+					func() rayv1.RayClusterConditionType {
 						if err := getResourceFunc(ctx, client.ObjectKey{Name: rayCluster.Name, Namespace: namespace}, rayCluster)(); err != nil {
 							return ""
 						}
diff --git a/ray-operator/controllers/ray/raycluster_controller_unit_test.go b/ray-operator/controllers/ray/raycluster_controller_unit_test.go
index ee05b3051a..def623f071 100644
--- a/ray-operator/controllers/ray/raycluster_controller_unit_test.go
+++ b/ray-operator/controllers/ray/raycluster_controller_unit_test.go
@@ -1283,8 +1283,9 @@ func TestReconcile_UpdateClusterReason(t *testing.T) {
 
 	newTestRayCluster := testRayCluster.DeepCopy()
 	newTestRayCluster.Status.Reason = reason
-	err = testRayClusterReconciler.updateRayClusterStatus(ctx, testRayCluster, newTestRayCluster)
+	inconsistent, err := testRayClusterReconciler.updateRayClusterStatus(ctx, testRayCluster, newTestRayCluster)
 	assert.Nil(t, err, "Fail to update cluster reason")
+	assert.True(t, inconsistent)
 
 	err = fakeClient.Get(ctx, namespacedName, &cluster)
 	assert.Nil(t, err, "Fail to get RayCluster after updating reason")
@@ -1563,8 +1564,9 @@ func TestReconcile_UpdateClusterState(t *testing.T) {
 	state := rayv1.Ready
 	newTestRayCluster := testRayCluster.DeepCopy()
 	newTestRayCluster.Status.State = state //nolint:staticcheck // https://github.com/ray-project/kuberay/pull/2288
-	err = testRayClusterReconciler.updateRayClusterStatus(ctx, testRayCluster, newTestRayCluster)
+	inconsistent, err := testRayClusterReconciler.updateRayClusterStatus(ctx, testRayCluster, newTestRayCluster)
 	assert.Nil(t, err, "Fail to update cluster state")
+	assert.True(t, inconsistent)
 
 	err = fakeClient.Get(ctx, namespacedName, &cluster)
 	assert.Nil(t, err, "Fail to get RayCluster after updating state")
diff --git a/ray-operator/controllers/ray/utils/util.go b/ray-operator/controllers/ray/utils/util.go
index aa540f26b9..a9bb4553f5 100644
--- a/ray-operator/controllers/ray/utils/util.go
+++ b/ray-operator/controllers/ray/utils/util.go
@@ -102,13 +102,12 @@ func FindHeadPodReadyCondition(headPod *corev1.Pod) metav1.Condition {
 	return headPodReadyCondition
 }
 
-func FindRayClusterSuspendStatus(instance *rayv1.RayCluster) string {
+func FindRayClusterSuspendStatus(instance *rayv1.RayCluster) rayv1.RayClusterConditionType {
 	for _, cond := range instance.Status.Conditions {
-		if cond.Type == string(rayv1.RayClusterSuspend) {
+		if cond.Type == string(rayv1.RayClusterSuspending) || cond.Type == string(rayv1.RayClusterSuspended) {
 			if cond.Status == metav1.ConditionTrue {
-				return cond.Reason
+				return rayv1.RayClusterConditionType(cond.Type)
 			}
-			break
 		}
 	}
 	return ""
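
Reviewer note, not part of the patch: the sketch below shows one way downstream Go code could read the two new conditions once this change is in, using the same k8s.io/apimachinery condition helpers the patch itself uses. The suspendPhase helper and the returned phase strings are made up for illustration; only the rayv1 condition types and the Status.Conditions field come from the patch.

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/meta"

	rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
)

// suspendPhase (hypothetical helper) reports which suspend-related condition is
// currently True on a RayCluster, mirroring what utils.FindRayClusterSuspendStatus
// returns inside the operator after this patch.
func suspendPhase(cluster *rayv1.RayCluster) string {
	switch {
	case meta.IsStatusConditionTrue(cluster.Status.Conditions, string(rayv1.RayClusterSuspending)):
		// Pods are still being deleted; RayClusterSuspended has not been set to True yet.
		return "suspending"
	case meta.IsStatusConditionTrue(cluster.Status.Conditions, string(rayv1.RayClusterSuspended)):
		// All Pods are gone; the cluster stays idle until .Spec.Suspend is flipped back to false.
		return "suspended"
	default:
		// Neither condition is True: the cluster is not suspended (running or resuming).
		return "active"
	}
}

func main() {
	// In practice the RayCluster would be fetched with a controller-runtime client;
	// an empty object is used here only to keep the sketch self-contained.
	cluster := &rayv1.RayCluster{}
	fmt.Println(suspendPhase(cluster))
}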