From 99abccf035fe95e949cc1670e6a4ce2270a14fe5 Mon Sep 17 00:00:00 2001 From: Kai-Hsun Chen Date: Wed, 25 Oct 2023 11:10:59 -0700 Subject: [PATCH] [Feature] Add a flag to make zero downtime upgrades optional (#1564) --- helm-chart/kuberay-operator/values.yaml | 4 + .../controllers/ray/rayservice_controller.go | 14 ++- .../ray/rayservice_controller_test.go | 42 +++++++ .../ray/rayservice_controller_unit_test.go | 109 ++++++++++++++++++ 4 files changed, 168 insertions(+), 1 deletion(-) diff --git a/helm-chart/kuberay-operator/values.yaml b/helm-chart/kuberay-operator/values.yaml index e61c653ec4..87255fb3c7 100644 --- a/helm-chart/kuberay-operator/values.yaml +++ b/helm-chart/kuberay-operator/values.yaml @@ -101,3 +101,7 @@ env: # If not set or set to "true", KubeRay will clean up the Redis storage namespace when a GCS FT-enabled RayCluster is deleted. # - name: ENABLE_GCS_FT_REDIS_CLEANUP # value: "true" +# For LLM serving, some users might not have sufficient GPU resources to run two RayClusters simultaneously. +# Therefore, KubeRay offers ENABLE_ZERO_DOWNTIME as a feature flag for zero-downtime upgrades. +# - name: ENABLE_ZERO_DOWNTIME +# value: "true" diff --git a/ray-operator/controllers/ray/rayservice_controller.go b/ray-operator/controllers/ray/rayservice_controller.go index ea5ed35cd4..c2648a5a94 100644 --- a/ray-operator/controllers/ray/rayservice_controller.go +++ b/ray-operator/controllers/ray/rayservice_controller.go @@ -3,6 +3,7 @@ package ray import ( "context" "fmt" + "os" "reflect" "strings" "time" @@ -45,6 +46,7 @@ const ( ServiceRestartRequeueDuration = 10 * time.Second RayClusterDeletionDelayDuration = 60 * time.Second DeploymentUnhealthySecondThreshold = 300.0 // Dashboard agent related health check. + ENABLE_ZERO_DOWNTIME = "ENABLE_ZERO_DOWNTIME" ) // RayServiceReconciler reconciles a RayService object @@ -367,7 +369,17 @@ func (r *RayServiceReconciler) reconcileRayCluster(ctx context.Context, rayServi } if r.shouldPrepareNewRayCluster(rayServiceInstance, activeRayCluster) { - r.markRestart(rayServiceInstance) + // For LLM serving, some users might not have sufficient GPU resources to run two RayClusters simultaneously. + // Therefore, KubeRay offers ENABLE_ZERO_DOWNTIME as a feature flag for zero-downtime upgrades. + enableZeroDowntime := true + if s := os.Getenv(ENABLE_ZERO_DOWNTIME); strings.ToLower(s) == "false" { + enableZeroDowntime = false + } + if enableZeroDowntime || !enableZeroDowntime && activeRayCluster == nil { + r.markRestart(rayServiceInstance) + } else { + r.Log.Info("Zero-downtime upgrade is disabled (ENABLE_ZERO_DOWNTIME: false). Skip preparing a new RayCluster.") + } return activeRayCluster, nil, nil } diff --git a/ray-operator/controllers/ray/rayservice_controller_test.go b/ray-operator/controllers/ray/rayservice_controller_test.go index e558df73d6..02fde22999 100644 --- a/ray-operator/controllers/ray/rayservice_controller_test.go +++ b/ray-operator/controllers/ray/rayservice_controller_test.go @@ -18,6 +18,7 @@ package ray import ( "context" "fmt" + "os" "time" "github.com/ray-project/kuberay/ray-operator/controllers/ray/common" @@ -377,6 +378,47 @@ applications: time.Second*3, time.Millisecond*500).Should(BeNil(), "My myRayCluster = %v", myRayCluster.Name) }) + It("Disable zero-downtime upgrade", func() { + // Disable zero-downtime upgrade. + os.Setenv("ENABLE_ZERO_DOWNTIME", "false") + + // Try to trigger a zero-downtime upgrade. 
+ oldRayVersion := myRayService.Spec.RayClusterSpec.RayVersion + newRayVersion := "2.198.0" + Expect(oldRayVersion).ShouldNot(Equal(newRayVersion)) + err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + Eventually( + getResourceFunc(ctx, client.ObjectKey{Name: myRayService.Name, Namespace: "default"}, myRayService), + time.Second*3, time.Millisecond*500).Should(BeNil(), "My myRayService = %v", myRayService.Name) + myRayService.Spec.RayClusterSpec.RayVersion = newRayVersion + return k8sClient.Update(ctx, myRayService) + }) + Expect(err).NotTo(HaveOccurred(), "failed to update test RayService resource") + + // Because the zero-downtime upgrade is disabled, the RayService controller will not prepare a new RayCluster. + Consistently( + getPreparingRayClusterNameFunc(ctx, myRayService), + time.Second*5, time.Millisecond*500).Should(BeEmpty(), "Pending RayCluster name = %v", myRayService.Status.PendingServiceStatus.RayClusterName) + + // Set the RayVersion back to the old value to avoid triggering the zero-downtime upgrade. + err = retry.RetryOnConflict(retry.DefaultRetry, func() error { + Eventually( + getResourceFunc(ctx, client.ObjectKey{Name: myRayService.Name, Namespace: "default"}, myRayService), + time.Second*3, time.Millisecond*500).Should(BeNil(), "My myRayService = %v", myRayService.Name) + myRayService.Spec.RayClusterSpec.RayVersion = oldRayVersion + return k8sClient.Update(ctx, myRayService) + }) + Expect(err).NotTo(HaveOccurred(), "failed to update test RayService resource") + + // Enable zero-downtime upgrade again. + os.Unsetenv("ENABLE_ZERO_DOWNTIME") + + // Zero-downtime upgrade should not be triggered. + Consistently( + getPreparingRayClusterNameFunc(ctx, myRayService), + time.Second*5, time.Millisecond*500).Should(BeEmpty(), "Pending RayCluster name = %v", myRayService.Status.PendingServiceStatus.RayClusterName) + }) + It("Autoscaler updates the active RayCluster and should not switch to a new RayCluster", func() { // Simulate autoscaler by updating the active RayCluster directly. Note that the autoscaler // will not update the RayService directly. diff --git a/ray-operator/controllers/ray/rayservice_controller_unit_test.go b/ray-operator/controllers/ray/rayservice_controller_unit_test.go index 52792b4e7a..ef965b000f 100644 --- a/ray-operator/controllers/ray/rayservice_controller_unit_test.go +++ b/ray-operator/controllers/ray/rayservice_controller_unit_test.go @@ -3,6 +3,7 @@ package ray import ( "context" "fmt" + "os" "reflect" "testing" "time" @@ -682,6 +683,114 @@ applications: assert.True(t, shouldCreate) } +func TestReconcileRayCluster(t *testing.T) { + defer os.Unsetenv(ENABLE_ZERO_DOWNTIME) + // Create a new scheme with CRDs schemes. + newScheme := runtime.NewScheme() + _ = rayv1.AddToScheme(newScheme) + + ctx := context.TODO() + namespace := "ray" + rayService := rayv1.RayService{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-service", + Namespace: namespace, + }, + Status: rayv1.RayServiceStatuses{}, + } + + hash, err := generateRayClusterJsonHash(rayService.Spec.RayClusterSpec) + assert.Nil(t, err) + activeCluster := rayv1.RayCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "active-cluster", + Namespace: namespace, + Annotations: map[string]string{ + common.RayServiceClusterHashKey: hash, + }, + }, + } + + tests := map[string]struct { + activeCluster *rayv1.RayCluster + updateRayClusterSpec bool + enableZeroDowntime bool + shouldPrepareNewCluster bool + }{ + // Test 1: Neither active nor pending clusters exist. 
The `markRestart` function will be called, so the `PendingServiceStatus.RayClusterName` should be set. + "Zero-downtime upgrade is enabled. Neither active nor pending clusters exist.": { + activeCluster: nil, + updateRayClusterSpec: false, + enableZeroDowntime: true, + shouldPrepareNewCluster: true, + }, + // Test 2: The active cluster exists, but the pending cluster does not exist. + "Zero-downtime upgrade is enabled. The active cluster exists, but the pending cluster does not exist.": { + activeCluster: activeCluster.DeepCopy(), + updateRayClusterSpec: false, + enableZeroDowntime: true, + shouldPrepareNewCluster: false, + }, + // Test 3: The active cluster exists. Trigger the zero-downtime upgrade. + "Zero-downtime upgrade is enabled. The active cluster exists. Trigger the zero-downtime upgrade.": { + activeCluster: activeCluster.DeepCopy(), + updateRayClusterSpec: true, + enableZeroDowntime: true, + shouldPrepareNewCluster: true, + }, + // Test 4: The active cluster exists. Trigger the zero-downtime upgrade. + "Zero-downtime upgrade is disabled. The active cluster exists. Trigger the zero-downtime upgrade.": { + activeCluster: activeCluster.DeepCopy(), + updateRayClusterSpec: true, + enableZeroDowntime: false, + shouldPrepareNewCluster: false, + }, + // Test 5: Neither active nor pending clusters exist. The `markRestart` function will be called, so the `PendingServiceStatus.RayClusterName` should be set. + "Zero-downtime upgrade is disabled. Neither active nor pending clusters exist.": { + activeCluster: nil, + updateRayClusterSpec: false, + enableZeroDowntime: false, + shouldPrepareNewCluster: true, + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + // Enable or disable zero-downtime upgrade. + defer os.Unsetenv(ENABLE_ZERO_DOWNTIME) + if !tc.enableZeroDowntime { + os.Setenv(ENABLE_ZERO_DOWNTIME, "false") + } + runtimeObjects := []runtime.Object{} + if tc.activeCluster != nil { + runtimeObjects = append(runtimeObjects, tc.activeCluster.DeepCopy()) + } + fakeClient := clientFake.NewClientBuilder().WithScheme(newScheme).WithRuntimeObjects(runtimeObjects...).Build() + r := RayServiceReconciler{ + Client: fakeClient, + Log: ctrl.Log.WithName("controllers").WithName("RayService"), + } + service := rayService.DeepCopy() + if tc.updateRayClusterSpec { + service.Spec.RayClusterSpec.RayVersion = "new-version" + } + if tc.activeCluster != nil { + service.Status.ActiveServiceStatus.RayClusterName = tc.activeCluster.Name + } + assert.Equal(t, "", service.Status.PendingServiceStatus.RayClusterName) + _, _, err = r.reconcileRayCluster(ctx, service) + assert.Nil(t, err) + + // If KubeRay operator is preparing a new cluster, the `PendingServiceStatus.RayClusterName` should be set by calling the function `markRestart`. + if tc.shouldPrepareNewCluster { + assert.NotEqual(t, "", service.Status.PendingServiceStatus.RayClusterName) + } else { + assert.Equal(t, "", service.Status.PendingServiceStatus.RayClusterName) + } + }) + } +} + func initFakeDashboardClient(appName string, deploymentStatus string, appStatus string) utils.RayDashboardClientInterface { fakeDashboardClient := utils.FakeRayDashboardClient{} status := generateServeStatus(deploymentStatus, appStatus)
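
Note: the snippet below is an illustrative sketch, not part of the patch. It restates the flag semantics the patch introduces in reconcileRayCluster: zero-downtime upgrades stay enabled unless ENABLE_ZERO_DOWNTIME is explicitly set to "false" (case-insensitive); leaving the variable unset, or setting it to any other value, keeps the default behavior. The helper name isZeroDowntimeEnabled and the standalone main are hypothetical and exist only to demonstrate the behavior.

package main

import (
	"fmt"
	"os"
	"strings"
)

// isZeroDowntimeEnabled mirrors the gating check added to reconcileRayCluster:
// only an explicit "false" (case-insensitive) disables zero-downtime upgrades.
func isZeroDowntimeEnabled() bool {
	return strings.ToLower(os.Getenv("ENABLE_ZERO_DOWNTIME")) != "false"
}

func main() {
	// Exercise a few representative values, including unset ("").
	for _, v := range []string{"", "true", "false", "FALSE", "0"} {
		if v == "" {
			os.Unsetenv("ENABLE_ZERO_DOWNTIME")
		} else {
			os.Setenv("ENABLE_ZERO_DOWNTIME", v)
		}
		fmt.Printf("ENABLE_ZERO_DOWNTIME=%q -> zero-downtime enabled: %v\n", v, isZeroDowntimeEnabled())
	}
}

When the flag is "false" and an active RayCluster already exists, the controller skips markRestart and logs that the upgrade was skipped; when no active cluster exists yet, a new RayCluster is still prepared (see Test 5 in the unit tests above), which matches the condition `enableZeroDowntime || !enableZeroDowntime && activeRayCluster == nil` in the patch.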