diff --git a/ray-operator/apis/ray/v1/raycluster_types.go b/ray-operator/apis/ray/v1/raycluster_types.go
index 79b8db3b88e..20e69c399a1 100644
--- a/ray-operator/apis/ray/v1/raycluster_types.go
+++ b/ray-operator/apis/ray/v1/raycluster_types.go
@@ -186,6 +186,10 @@ const (
   HeadPodReady RayClusterConditionType = "HeadPodReady"
   // RayClusterReplicaFailure is added in a RayCluster when one of its pods fails to be created or deleted.
   RayClusterReplicaFailure RayClusterConditionType = "ReplicaFailure"
+  // RayClusterSuspending is set to true when a user sets .Spec.Suspend to true, ensuring the atomicity of the suspend operation.
+  RayClusterSuspending RayClusterConditionType = "RayClusterSuspending"
+  // RayClusterSuspended is set to true when all Pods belonging to a suspending RayCluster are deleted. Note that RayClusterSuspending and RayClusterSuspended cannot both be true at the same time.
+  RayClusterSuspended RayClusterConditionType = "RayClusterSuspended"
 )

 // HeadInfo gives info about head
diff --git a/ray-operator/controllers/ray/raycluster_controller.go b/ray-operator/controllers/ray/raycluster_controller.go
index 9c5ad47c724..64c755534f5 100644
--- a/ray-operator/controllers/ray/raycluster_controller.go
+++ b/ray-operator/controllers/ray/raycluster_controller.go
@@ -210,10 +210,24 @@ func (r *RayClusterReconciler) deleteAllPods(ctx context.Context, filters common
   return pods, nil
 }

+func (r *RayClusterReconciler) validateRayClusterStatus(instance *rayv1.RayCluster) error {
+  suspending := meta.IsStatusConditionTrue(instance.Status.Conditions, string(rayv1.RayClusterSuspending))
+  suspended := meta.IsStatusConditionTrue(instance.Status.Conditions, string(rayv1.RayClusterSuspended))
+  if suspending && suspended {
+    return errstd.New("invalid RayCluster State: rayv1.RayClusterSuspending and rayv1.RayClusterSuspended conditions should not be both true")
+  }
+  return nil
+}
+
 func (r *RayClusterReconciler) rayClusterReconcile(ctx context.Context, request ctrl.Request, instance *rayv1.RayCluster) (ctrl.Result, error) {
   var reconcileErr error
   logger := ctrl.LoggerFrom(ctx)

+  if err := r.validateRayClusterStatus(instance); err != nil {
+    logger.Error(err, "The RayCluster status is invalid")
+    return ctrl.Result{RequeueAfter: DefaultRequeueDuration}, err
+  }
+
   // Please do NOT modify `originalRayClusterInstance` in the following code.
   originalRayClusterInstance := instance.DeepCopy()
@@ -340,10 +354,11 @@ func (r *RayClusterReconciler) rayClusterReconcile(ctx context.Context, request
   // Calculate the new status for the RayCluster. Note that the function will deep copy `instance` instead of mutating it.
   newInstance, calculateErr := r.calculateStatus(ctx, instance, reconcileErr)
   var updateErr error
+  var inconsistent bool
   if calculateErr != nil {
     logger.Info("Got error when calculating new status", "cluster name", request.Name, "error", calculateErr)
   } else {
-    updateErr = r.updateRayClusterStatus(ctx, originalRayClusterInstance, newInstance)
+    inconsistent, updateErr = r.updateRayClusterStatus(ctx, originalRayClusterInstance, newInstance)
   }

   // Return error based on order.
@@ -355,7 +370,10 @@ func (r *RayClusterReconciler) rayClusterReconcile(ctx context.Context, request
   } else {
     err = updateErr
   }
-  if err != nil {
+  // If the custom resource's status is updated, requeue the reconcile key.
+  // Without this behavior, atomic operations such as the suspend operation would need to wait for `RAYCLUSTER_DEFAULT_REQUEUE_SECONDS` to delete Pods
+  // after the condition rayv1.RayClusterSuspending is set to true.
+  if err != nil || inconsistent {
     return ctrl.Result{RequeueAfter: DefaultRequeueDuration}, err
   }
@@ -614,8 +632,11 @@ func (r *RayClusterReconciler) reconcileHeadlessService(ctx context.Context, ins
 func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv1.RayCluster) error {
   logger := ctrl.LoggerFrom(ctx)

-  // if RayCluster is suspended, delete all pods and skip reconcile
-  if instance.Spec.Suspend != nil && *instance.Spec.Suspend {
+  // if RayCluster is suspending, delete all pods and skip reconcile
+  suspendStatus := utils.FindRayClusterSuspendStatus(instance)
+  statusConditionGateEnabled := features.Enabled(features.RayClusterStatusConditions)
+  if suspendStatus == rayv1.RayClusterSuspending ||
+    (!statusConditionGateEnabled && instance.Spec.Suspend != nil && *instance.Spec.Suspend) {
     if _, err := r.deleteAllPods(ctx, common.RayClusterAllPodsAssociationOptions(instance)); err != nil {
       r.Recorder.Eventf(instance, corev1.EventTypeWarning, string(utils.FailedToDeletePod),
         "Failed deleting Pods due to suspension for RayCluster %s/%s, %v",
@@ -629,6 +650,16 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
     return nil
   }

+  if statusConditionGateEnabled {
+    if suspendStatus == rayv1.RayClusterSuspended {
+      return nil // stop reconcilePods because the cluster is suspended.
+    }
+    // (suspendStatus != rayv1.RayClusterSuspending) is always true here because it has been checked above.
+    if instance.Spec.Suspend != nil && *instance.Spec.Suspend {
+      return nil // stop reconcilePods because the cluster is going to suspend.
+    }
+  }
+
   // check if all the pods exist
   headPods := corev1.PodList{}
   if err := r.List(ctx, &headPods, common.RayClusterHeadPodsAssociationOptions(instance).ToListOptions()...); err != nil {
@@ -1177,7 +1208,8 @@ func (r *RayClusterReconciler) calculateStatus(ctx context.Context, instance *ra
   // Deep copy the instance, so we don't mutate the original object.
   newInstance := instance.DeepCopy()

-  if features.Enabled(features.RayClusterStatusConditions) {
+  statusConditionGateEnabled := features.Enabled(features.RayClusterStatusConditions)
+  if statusConditionGateEnabled {
     if reconcileErr != nil {
       if reason := utils.RayClusterReplicaFailureReason(reconcileErr); reason != "" {
         meta.SetStatusCondition(&newInstance.Status.Conditions, metav1.Condition{
@@ -1218,8 +1250,8 @@ func (r *RayClusterReconciler) calculateStatus(ctx context.Context, instance *ra
     newInstance.Status.State = rayv1.Ready //nolint:staticcheck // https://github.com/ray-project/kuberay/pull/2288
   }

-  // Check if the head node is running and ready by checking the head pod's status.
-  if features.Enabled(features.RayClusterStatusConditions) {
+  // Check if the head node is running and ready by checking the head pod's status or if the cluster has been suspended.
+  if statusConditionGateEnabled {
     headPod, err := common.GetRayClusterHeadPod(ctx, r, newInstance)
     if err != nil {
       return nil, err
@@ -1237,9 +1269,10 @@ func (r *RayClusterReconciler) calculateStatus(ctx context.Context, instance *ra
       meta.SetStatusCondition(&newInstance.Status.Conditions, headPodReadyCondition)
     }

-    if !meta.IsStatusConditionTrue(newInstance.Status.Conditions, string(rayv1.RayClusterProvisioned)) {
+    suspendStatus := utils.FindRayClusterSuspendStatus(newInstance)
+    if !meta.IsStatusConditionTrue(newInstance.Status.Conditions, string(rayv1.RayClusterProvisioned)) && suspendStatus != rayv1.RayClusterSuspended {
       // RayClusterProvisioned indicates whether all Ray Pods are ready when the RayCluster is first created.
-      // Note RayClusterProvisioned StatusCondition will not be updated after all Ray Pods are ready for the first time.
+      // Note RayClusterProvisioned StatusCondition will not be updated after all Ray Pods are ready for the first time, unless the cluster has been suspended.
       if utils.CheckAllPodsRunning(ctx, runtimePods) {
         meta.SetStatusCondition(&newInstance.Status.Conditions, metav1.Condition{
           Type: string(rayv1.RayClusterProvisioned),
@@ -1257,6 +1290,53 @@ func (r *RayClusterReconciler) calculateStatus(ctx context.Context, instance *ra
       }
     }

+    if suspendStatus == rayv1.RayClusterSuspending {
+      if len(runtimePods.Items) == 0 {
+        meta.SetStatusCondition(&newInstance.Status.Conditions, metav1.Condition{
+          Type: string(rayv1.RayClusterProvisioned),
+          Status: metav1.ConditionFalse,
+          Reason: rayv1.RayClusterPodsProvisioning,
+          Message: "RayCluster has been suspended",
+        })
+        meta.SetStatusCondition(&newInstance.Status.Conditions, metav1.Condition{
+          Type: string(rayv1.RayClusterSuspending),
+          Reason: string(rayv1.RayClusterSuspending),
+          Status: metav1.ConditionFalse,
+        })
+        meta.SetStatusCondition(&newInstance.Status.Conditions, metav1.Condition{
+          Type: string(rayv1.RayClusterSuspended),
+          Reason: string(rayv1.RayClusterSuspended),
+          Status: metav1.ConditionTrue,
+        })
+      }
+    } else if suspendStatus == rayv1.RayClusterSuspended {
+      if instance.Spec.Suspend != nil && !*instance.Spec.Suspend {
+        meta.SetStatusCondition(&newInstance.Status.Conditions, metav1.Condition{
+          Type: string(rayv1.RayClusterSuspended),
+          Reason: string(rayv1.RayClusterSuspended),
+          Status: metav1.ConditionFalse,
+        })
+      }
+    } else {
+      meta.SetStatusCondition(&newInstance.Status.Conditions, metav1.Condition{
+        Type: string(rayv1.RayClusterSuspended),
+        Reason: string(rayv1.RayClusterSuspended),
+        Status: metav1.ConditionFalse,
+      })
+      if instance.Spec.Suspend != nil && *instance.Spec.Suspend {
+        meta.SetStatusCondition(&newInstance.Status.Conditions, metav1.Condition{
+          Type: string(rayv1.RayClusterSuspending),
+          Reason: string(rayv1.RayClusterSuspending),
+          Status: metav1.ConditionTrue,
+        })
+      } else {
+        meta.SetStatusCondition(&newInstance.Status.Conditions, metav1.Condition{
+          Type: string(rayv1.RayClusterSuspending),
+          Reason: string(rayv1.RayClusterSuspending),
+          Status: metav1.ConditionFalse,
+        })
+      }
+    }
   }

   if newInstance.Spec.Suspend != nil && *newInstance.Spec.Suspend && len(runtimePods.Items) == 0 {
@@ -1359,6 +1439,9 @@ func (r *RayClusterReconciler) updateHeadInfo(ctx context.Context, instance *ray
   if headPod != nil {
     instance.Status.Head.PodIP = headPod.Status.PodIP
     instance.Status.Head.PodName = headPod.Name
+  } else {
+    instance.Status.Head.PodIP = ""
+    instance.Status.Head.PodName = ""
   }

   ip, name, err := r.getHeadServiceIPAndName(ctx, instance)
@@ -1514,17 +1597,20 @@ func (r *RayClusterReconciler) reconcileAutoscalerRoleBinding(ctx context.Contex
   return nil
 }

-func (r *RayClusterReconciler) updateRayClusterStatus(ctx context.Context, originalRayClusterInstance, newInstance *rayv1.RayCluster) error {
+// updateRayClusterStatus updates the RayCluster status if it is inconsistent with the old status and returns a bool to indicate the inconsistency.
+// We rely on the returned bool to requeue the reconciliation for atomic operations, such as suspending a RayCluster.
+func (r *RayClusterReconciler) updateRayClusterStatus(ctx context.Context, originalRayClusterInstance, newInstance *rayv1.RayCluster) (bool, error) {
   logger := ctrl.LoggerFrom(ctx)
-  if !r.inconsistentRayClusterStatus(ctx, originalRayClusterInstance.Status, newInstance.Status) {
-    return nil
+  inconsistent := r.inconsistentRayClusterStatus(ctx, originalRayClusterInstance.Status, newInstance.Status)
+  if !inconsistent {
+    return inconsistent, nil
   }
   logger.Info("updateRayClusterStatus", "name", originalRayClusterInstance.Name, "old status", originalRayClusterInstance.Status, "new status", newInstance.Status)
   err := r.Status().Update(ctx, newInstance)
   if err != nil {
     logger.Info("Error updating status", "name", originalRayClusterInstance.Name, "error", err, "RayCluster", newInstance)
   }
-  return err
+  return inconsistent, err
 }

 // sumGPUs sums the GPUs in the given resource list.
diff --git a/ray-operator/controllers/ray/raycluster_controller_test.go b/ray-operator/controllers/ray/raycluster_controller_test.go
index bdeca65562a..eeb0560d731 100644
--- a/ray-operator/controllers/ray/raycluster_controller_test.go
+++ b/ray-operator/controllers/ray/raycluster_controller_test.go
@@ -17,6 +17,7 @@ package ray

 import (
   "context"
+  "errors"
   "fmt"
   "time"

@@ -414,7 +415,33 @@ var _ = Context("Inside the default namespace", func() {
     })
   })

-  Describe("Suspend RayCluster", Ordered, func() {
+  updateRayClusterSuspendField := func(ctx context.Context, rayCluster *rayv1.RayCluster, suspend bool) error {
+    return retry.RetryOnConflict(retry.DefaultRetry, func() error {
+      Eventually(
+        getResourceFunc(ctx, client.ObjectKey{Name: rayCluster.Name, Namespace: rayCluster.Namespace}, rayCluster),
+        time.Second*3, time.Millisecond*500).Should(BeNil(), "rayCluster = %v", rayCluster)
+      rayCluster.Spec.Suspend = &suspend
+      return k8sClient.Update(ctx, rayCluster)
+    })
+  }
+
+  findRayClusterSuspendStatus := func(ctx context.Context, rayCluster *rayv1.RayCluster) (rayv1.RayClusterConditionType, error) {
+    if err := getResourceFunc(ctx, client.ObjectKey{Name: rayCluster.Name, Namespace: rayCluster.Namespace}, rayCluster)(); err != nil {
+      return "", err
+    }
+    suspending := meta.IsStatusConditionTrue(rayCluster.Status.Conditions, string(rayv1.RayClusterSuspending))
+    suspended := meta.IsStatusConditionTrue(rayCluster.Status.Conditions, string(rayv1.RayClusterSuspended))
+    if suspending && suspended {
+      return "invalid", errors.New("invalid: rayv1.RayClusterSuspending and rayv1.RayClusterSuspended should not be both true")
+    } else if suspending {
+      return rayv1.RayClusterSuspending, nil
+    } else if suspended {
+      return rayv1.RayClusterSuspended, nil
+    }
+    return "", nil
+  }
+
+  testSuspendRayCluster := func(withConditionEnabled bool) {
     ctx := context.Background()
     namespace := "default"
     rayCluster := rayClusterTemplate("raycluster-suspend", namespace)
@@ -425,6 +452,13 @@ var _ = Context("Inside the default namespace", func() {
     headFilters := common.RayClusterHeadPodsAssociationOptions(rayCluster).ToListOptions()
     allFilters := common.RayClusterAllPodsAssociationOptions(rayCluster).ToListOptions()

+    BeforeAll(func() {
+      if withConditionEnabled {
+        cleanUpFunc := features.SetFeatureGateDuringTest(GinkgoTB(), features.RayClusterStatusConditions, true)
+        DeferCleanup(cleanUpFunc)
+      }
+    })
+
     It("Verify RayCluster spec", func() {
       // These test are designed based on the following assumptions:
       // (1) Ray Autoscaler is disabled.
@@ -453,14 +487,7 @@ var _ = Context("Inside the default namespace", func() {

     It("Should delete all head and worker Pods if suspended", func() {
       // suspend a Raycluster and check that all Pods are deleted.
-      err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
-        Eventually(
-          getResourceFunc(ctx, client.ObjectKey{Name: rayCluster.Name, Namespace: namespace}, rayCluster),
-          time.Second*3, time.Millisecond*500).Should(BeNil(), "rayCluster: %v", rayCluster)
-        suspend := true
-        rayCluster.Spec.Suspend = &suspend
-        return k8sClient.Update(ctx, rayCluster)
-      })
+      err := updateRayClusterSuspendField(ctx, rayCluster, true)
       Expect(err).NotTo(HaveOccurred(), "Failed to update RayCluster")

       // Check that all Pods are deleted
@@ -479,18 +506,19 @@ var _ = Context("Inside the default namespace", func() {
       Eventually(
         getClusterState(ctx, namespace, rayCluster.Name),
         time.Second*3, time.Millisecond*500).Should(Equal(rayv1.Suspended))
+      if withConditionEnabled {
+        Eventually(findRayClusterSuspendStatus, time.Second*3, time.Millisecond*500).
+          WithArguments(ctx, rayCluster).Should(Equal(rayv1.RayClusterSuspended))
+        Expect(meta.IsStatusConditionTrue(rayCluster.Status.Conditions, string(rayv1.RayClusterProvisioned))).To(BeFalse())
+        // rayCluster.Status.Head.PodName will be cleared.
+        // rayCluster.Status.Head.PodIP will also be cleared, but we don't test it here since we don't have IPs in tests.
+        Expect(rayCluster.Status.Head.PodName).To(BeEmpty())
+      }
     })

     It("Set suspend to false and then revert it to true before all Pods are running", func() {
       // set suspend to false
-      err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
-        Eventually(
-          getResourceFunc(ctx, client.ObjectKey{Name: rayCluster.Name, Namespace: namespace}, rayCluster),
-          time.Second*3, time.Millisecond*500).Should(BeNil(), "rayCluster = %v", rayCluster)
-        suspend := false
-        rayCluster.Spec.Suspend = &suspend
-        return k8sClient.Update(ctx, rayCluster)
-      })
+      err := updateRayClusterSuspendField(ctx, rayCluster, false)
       Expect(err).NotTo(HaveOccurred(), "Failed to update RayCluster")

       // check that all Pods are created
@@ -509,14 +537,7 @@ var _ = Context("Inside the default namespace", func() {
       }

       // change suspend to true before all Pods are Running.
-      err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
-        Eventually(
-          getResourceFunc(ctx, client.ObjectKey{Name: rayCluster.Name, Namespace: namespace}, rayCluster),
-          time.Second*3, time.Millisecond*500).Should(BeNil(), "rayCluster = %v", rayCluster)
-        suspend := true
-        rayCluster.Spec.Suspend = &suspend
-        return k8sClient.Update(ctx, rayCluster)
-      })
+      err = updateRayClusterSuspendField(ctx, rayCluster, true)
       Expect(err).NotTo(HaveOccurred(), "Failed to update test RayCluster resource")

       // check that all Pods are deleted
@@ -534,18 +555,16 @@ var _ = Context("Inside the default namespace", func() {
       Eventually(
         getClusterState(ctx, namespace, rayCluster.Name),
         time.Second*3, time.Millisecond*500).Should(Equal(rayv1.Suspended))
+
+      if withConditionEnabled {
+        Eventually(findRayClusterSuspendStatus, time.Second*3, time.Millisecond*500).
+          WithArguments(ctx, rayCluster).Should(Equal(rayv1.RayClusterSuspended))
+      }
     })

     It("Should run all head and worker pods if un-suspended", func() {
       // Resume the suspended RayCluster
-      err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
-        Eventually(
-          getResourceFunc(ctx, client.ObjectKey{Name: rayCluster.Name, Namespace: namespace}, rayCluster),
-          time.Second*3, time.Millisecond*500).Should(BeNil(), "rayCluster = %v", rayCluster)
-        suspend := false
-        rayCluster.Spec.Suspend = &suspend
-        return k8sClient.Update(ctx, rayCluster)
-      })
+      err := updateRayClusterSuspendField(ctx, rayCluster, false)
       Expect(err).NotTo(HaveOccurred(), "Failed to update RayCluster")

       // check that all pods are created
@@ -573,6 +592,126 @@ var _ = Context("Inside the default namespace", func() {
       Eventually(
         getClusterState(ctx, namespace, rayCluster.Name),
         time.Second*3, time.Millisecond*500).Should(Equal(rayv1.Ready))
+      if withConditionEnabled {
+        Eventually(findRayClusterSuspendStatus, time.Second*3, time.Millisecond*500).
+          WithArguments(ctx, rayCluster).Should(BeEmpty())
+        Expect(meta.IsStatusConditionTrue(rayCluster.Status.Conditions, string(rayv1.RayClusterProvisioned))).To(BeTrue())
+        // rayCluster.Status.Head.PodName should have a value now.
+        // rayCluster.Status.Head.PodIP should also have a value now, but we don't test it here since we don't have IPs in tests.
+        Expect(rayCluster.Status.Head.PodName).NotTo(BeEmpty())
+      }
+    })
+
+    It("Delete the cluster", func() {
+      err := k8sClient.Delete(ctx, rayCluster)
+      Expect(err).NotTo(HaveOccurred())
+    })
+  }
+
+  Describe("Suspend RayCluster", Ordered, func() {
+    testSuspendRayCluster(false)
+  })
+
+  Describe("Suspend RayCluster with Condition", Ordered, func() {
+    testSuspendRayCluster(true)
+  })
+
+  Describe("Suspend RayCluster atomically with Condition", Ordered, func() {
+    ctx := context.Background()
+    namespace := "default"
+    rayCluster := rayClusterTemplate("raycluster-suspend-atomically", namespace)
+    allPods := corev1.PodList{}
+    allFilters := common.RayClusterAllPodsAssociationOptions(rayCluster).ToListOptions()
+    numPods := 4 // 1 Head + 3 Workers
+
+    BeforeAll(func() {
+      cleanUpFunc := features.SetFeatureGateDuringTest(GinkgoTB(), features.RayClusterStatusConditions, true)
+      DeferCleanup(cleanUpFunc)
+    })
+
+    It("Create a RayCluster custom resource", func() {
+      err := k8sClient.Create(ctx, rayCluster)
+      Expect(err).NotTo(HaveOccurred(), "Failed to create RayCluster")
+      Eventually(getResourceFunc(ctx, client.ObjectKey{Name: rayCluster.Name, Namespace: namespace}, rayCluster),
+        time.Second*3, time.Millisecond*500).Should(BeNil(), "Should be able to see RayCluster: %v", rayCluster.Name)
+    })
+
+    It("Check the number of Pods and add finalizers", func() {
+      Eventually(listResourceFunc(ctx, &allPods, allFilters...), time.Second*3, time.Millisecond*500).
+        Should(Equal(numPods), fmt.Sprintf("all pods %v", allPods.Items))
+      // Add finalizers to the Pods to prevent them from being deleted so that we can test if the status condition makes the suspending process atomic.
+      for _, pod := range allPods.Items {
+        err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
+          pod.Finalizers = append(pod.Finalizers, "ray.io/deletion-blocker")
+          return k8sClient.Update(ctx, &pod)
+        })
+        Expect(err).NotTo(HaveOccurred(), "Failed to update Pods")
+      }
+    })
+
+    It("Should turn on the RayClusterSuspending if we set `.Spec.Suspend` back to true", func() {
+      // suspend a RayCluster.
+      err := updateRayClusterSuspendField(ctx, rayCluster, true)
+      Expect(err).NotTo(HaveOccurred(), "Failed to update RayCluster")
+
+      Eventually(findRayClusterSuspendStatus, time.Second*3, time.Millisecond*500).
+        WithArguments(ctx, rayCluster).Should(Equal(rayv1.RayClusterSuspending))
+    })
+
+    It("Should keep RayClusterSuspending consistently if we set `.Spec.Suspend` back to false", func() {
+      err := updateRayClusterSuspendField(ctx, rayCluster, false)
+      Expect(err).NotTo(HaveOccurred(), "Failed to update RayCluster")
+
+      Consistently(findRayClusterSuspendStatus, time.Second*3, time.Millisecond*500).
+        WithArguments(ctx, rayCluster).Should(Equal(rayv1.RayClusterSuspending))
+    })
+
+    It("Pods should be deleted and new Pods should be created again after we remove those finalizers", func() {
+      Eventually(listResourceFunc(ctx, &allPods, allFilters...), time.Second*3, time.Millisecond*500).
+        Should(Equal(numPods), fmt.Sprintf("all pods %v", allPods.Items))
+
+      var oldNames []string
+      for _, pod := range allPods.Items {
+        oldNames = append(oldNames, pod.Name)
+        err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
+          pod.Finalizers = nil
+          return k8sClient.Update(ctx, &pod)
+        })
+        Expect(err).NotTo(HaveOccurred(), "Failed to update Pods")
+      }
+      // RayClusterSuspending and RayClusterSuspended should both be false.
+      Eventually(findRayClusterSuspendStatus, time.Second*3, time.Millisecond*500).
+        WithArguments(ctx, rayCluster).Should(BeEmpty())
+      Consistently(findRayClusterSuspendStatus, time.Second*3, time.Millisecond*500).
+        WithArguments(ctx, rayCluster).Should(BeEmpty())
+
+      // New Pods should be created.
+      Eventually(listResourceFunc(ctx, &allPods, allFilters...), time.Second*3, time.Millisecond*500).
+        Should(Equal(numPods), fmt.Sprintf("all pods %v", allPods.Items))
+
+      var newNames []string
+      for _, pod := range allPods.Items {
+        newNames = append(newNames, pod.Name)
+      }
+      Expect(newNames).NotTo(ConsistOf(oldNames))
+    })
+
+    It("Set suspend to true and all Pods should be deleted again", func() {
+      err := updateRayClusterSuspendField(ctx, rayCluster, true)
+      Expect(err).NotTo(HaveOccurred(), "Failed to update RayCluster")
+
+      Eventually(listResourceFunc(ctx, &allPods, allFilters...), time.Second*3, time.Millisecond*500).
+        Should(Equal(0), fmt.Sprintf("all pods %v", allPods.Items))
+
+      Eventually(findRayClusterSuspendStatus, time.Second*3, time.Millisecond*500).
+        WithArguments(ctx, rayCluster).Should(Equal(rayv1.RayClusterSuspended))
+      Consistently(findRayClusterSuspendStatus, time.Second*3, time.Millisecond*500).
+        WithArguments(ctx, rayCluster).Should(Equal(rayv1.RayClusterSuspended))
+    })
+
+    It("Delete the cluster", func() {
+      err := k8sClient.Delete(ctx, rayCluster)
+      Expect(err).NotTo(HaveOccurred())
     })
   })
diff --git a/ray-operator/controllers/ray/raycluster_controller_unit_test.go b/ray-operator/controllers/ray/raycluster_controller_unit_test.go
index ee05b3051ae..def623f0716 100644
--- a/ray-operator/controllers/ray/raycluster_controller_unit_test.go
+++ b/ray-operator/controllers/ray/raycluster_controller_unit_test.go
@@ -1283,8 +1283,9 @@ func TestReconcile_UpdateClusterReason(t *testing.T) {
   newTestRayCluster := testRayCluster.DeepCopy()
   newTestRayCluster.Status.Reason = reason

-  err = testRayClusterReconciler.updateRayClusterStatus(ctx, testRayCluster, newTestRayCluster)
+  inconsistent, err := testRayClusterReconciler.updateRayClusterStatus(ctx, testRayCluster, newTestRayCluster)
   assert.Nil(t, err, "Fail to update cluster reason")
+  assert.True(t, inconsistent)

   err = fakeClient.Get(ctx, namespacedName, &cluster)
   assert.Nil(t, err, "Fail to get RayCluster after updating reason")
@@ -1563,8 +1564,9 @@ func TestReconcile_UpdateClusterState(t *testing.T) {
   state := rayv1.Ready
   newTestRayCluster := testRayCluster.DeepCopy()
   newTestRayCluster.Status.State = state //nolint:staticcheck // https://github.com/ray-project/kuberay/pull/2288
-  err = testRayClusterReconciler.updateRayClusterStatus(ctx, testRayCluster, newTestRayCluster)
+  inconsistent, err := testRayClusterReconciler.updateRayClusterStatus(ctx, testRayCluster, newTestRayCluster)
   assert.Nil(t, err, "Fail to update cluster state")
+  assert.True(t, inconsistent)

   err = fakeClient.Get(ctx, namespacedName, &cluster)
   assert.Nil(t, err, "Fail to get RayCluster after updating state")
diff --git a/ray-operator/controllers/ray/suite_helpers_test.go b/ray-operator/controllers/ray/suite_helpers_test.go
index 494d651619f..2b5f02639ac 100644
--- a/ray-operator/controllers/ray/suite_helpers_test.go
+++ b/ray-operator/controllers/ray/suite_helpers_test.go
@@ -33,7 +33,7 @@ func listResourceFunc(ctx context.Context, workerPods *corev1.PodList, opt ...cl

     count := 0
     for _, aPod := range workerPods.Items {
-      if (reflect.DeepEqual(aPod.Status.Phase, corev1.PodRunning) || reflect.DeepEqual(aPod.Status.Phase, corev1.PodPending)) && aPod.DeletionTimestamp == nil {
+      if (reflect.DeepEqual(aPod.Status.Phase, corev1.PodRunning) || reflect.DeepEqual(aPod.Status.Phase, corev1.PodPending)) && (aPod.DeletionTimestamp == nil || len(aPod.Finalizers) != 0) {
         count++
       }
     }
diff --git a/ray-operator/controllers/ray/utils/util.go b/ray-operator/controllers/ray/utils/util.go
index 4375bea3385..13459d3ab17 100644
--- a/ray-operator/controllers/ray/utils/util.go
+++ b/ray-operator/controllers/ray/utils/util.go
@@ -102,6 +102,33 @@ func FindHeadPodReadyCondition(headPod *corev1.Pod) metav1.Condition {
   return headPodReadyCondition
 }

+// FindRayClusterSuspendStatus returns the current suspend status from two conditions:
+//  1. rayv1.RayClusterSuspending
+//  2. rayv1.RayClusterSuspended
+//
+// The two conditions should not be both True at the same time. The transition logic should be the following:
+//
+//   rayv1.RayClusterSuspending:
+//     False by default
+//     False -> True: when `spec.Suspend` is true.
+//     True -> False: when all Pods are deleted, set rayv1.RayClusterSuspended from False to True.
+//   rayv1.RayClusterSuspended
+//     False by default
+//     False -> True: when suspending transitions from True to False
+//     True -> False: when `spec.Suspend` is false.
+//
+// If both rayv1.RayClusterSuspending and rayv1.RayClusterSuspended are False, FindRayClusterSuspendStatus returns "".
+func FindRayClusterSuspendStatus(instance *rayv1.RayCluster) rayv1.RayClusterConditionType {
+  for _, cond := range instance.Status.Conditions {
+    if cond.Type == string(rayv1.RayClusterSuspending) || cond.Type == string(rayv1.RayClusterSuspended) {
+      if cond.Status == metav1.ConditionTrue {
+        return rayv1.RayClusterConditionType(cond.Type)
+      }
+    }
+  }
+  return ""
+}
+
 // IsRunningAndReady returns true if pod is in the PodRunning Phase, if it has a condition of PodReady.
 func IsRunningAndReady(pod *corev1.Pod) bool {
   if pod.Status.Phase != corev1.PodRunning {
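For readers skimming the patch, here is a minimal standalone sketch of the suspend state machine that the controller drives through the two conditions. The `suspendPhase` type, the `nextSuspendPhase` function, and the numbers in `main` are illustrative only and are not part of this patch; the transitions paraphrase the rules documented on `FindRayClusterSuspendStatus` and implemented in `calculateStatus`.

```go
package main

import "fmt"

// suspendPhase is a simplified stand-in for which of the two conditions is currently true.
type suspendPhase string

const (
	phaseNone       suspendPhase = ""           // neither RayClusterSuspending nor RayClusterSuspended is true
	phaseSuspending suspendPhase = "Suspending" // RayClusterSuspending=True
	phaseSuspended  suspendPhase = "Suspended"  // RayClusterSuspended=True
)

// nextSuspendPhase models the transitions performed on each reconcile:
//   - none       -> Suspending when spec.suspend is true
//   - Suspending -> Suspended once every Pod is deleted; spec.suspend is ignored until then
//   - Suspended  -> none when spec.suspend is set back to false
func nextSuspendPhase(current suspendPhase, specSuspend bool, podsRemaining int) suspendPhase {
	switch current {
	case phaseSuspending:
		if podsRemaining == 0 {
			return phaseSuspended
		}
		// Reverting spec.suspend while Pods are still terminating does not cancel the suspend.
		return phaseSuspending
	case phaseSuspended:
		if !specSuspend {
			return phaseNone
		}
		return phaseSuspended
	default:
		if specSuspend {
			return phaseSuspending
		}
		return phaseNone
	}
}

func main() {
	phase := phaseNone
	phase = nextSuspendPhase(phase, true, 4)  // user sets spec.suspend=true       -> Suspending
	phase = nextSuspendPhase(phase, false, 2) // user reverts it while Pods remain -> still Suspending
	phase = nextSuspendPhase(phase, false, 0) // all Pods are gone                 -> Suspended
	fmt.Printf("%q\n", phase)                 // "Suspended"
	phase = nextSuspendPhase(phase, false, 0) // spec.suspend is false again       -> conditions cleared
	fmt.Printf("%q\n", phase)                 // ""
}
```

Because the Suspending branch ignores `spec.suspend` until every Pod is gone, flipping the field back to false mid-way cannot cancel an in-flight suspend, which is exactly what the "Suspend RayCluster atomically with Condition" test exercises with its finalizer trick.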
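A caller outside the controller (for example, a higher-level workload controller) could use the same two conditions to decide whether a suspend request has fully taken effect. The sketch below is hypothetical: `isFullySuspended` is not an API added by this patch, and it only relies on `meta.IsStatusConditionTrue` from apimachinery plus the condition types introduced above.

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/meta"

	rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
)

// isFullySuspended reports whether a suspend request has fully taken effect:
// RayClusterSuspended must be true and the transient RayClusterSuspending
// phase must be over. The two conditions should never be true at the same
// time (see validateRayClusterStatus), so the second check is only a safeguard.
func isFullySuspended(rc *rayv1.RayCluster) bool {
	suspending := meta.IsStatusConditionTrue(rc.Status.Conditions, string(rayv1.RayClusterSuspending))
	suspended := meta.IsStatusConditionTrue(rc.Status.Conditions, string(rayv1.RayClusterSuspended))
	return suspended && !suspending
}

func main() {
	rc := &rayv1.RayCluster{} // in a real consumer this would come from a client.Get call
	fmt.Println(isFullySuspended(rc))
}
```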