[Feature] Add a flag to make zero downtime upgrades optional (#1564)
kevin85421 authored Oct 25, 2023
1 parent 528abc3 commit 99abccf
Showing 4 changed files with 168 additions and 1 deletion.
4 changes: 4 additions & 0 deletions helm-chart/kuberay-operator/values.yaml
@@ -101,3 +101,7 @@ env:
 # If not set or set to "true", KubeRay will clean up the Redis storage namespace when a GCS FT-enabled RayCluster is deleted.
 # - name: ENABLE_GCS_FT_REDIS_CLEANUP
 #   value: "true"
+# For LLM serving, some users might not have sufficient GPU resources to run two RayClusters simultaneously.
+# Therefore, KubeRay offers ENABLE_ZERO_DOWNTIME as a feature flag for zero-downtime upgrades.
+# - name: ENABLE_ZERO_DOWNTIME
+#   value: "true"
14 changes: 13 additions & 1 deletion ray-operator/controllers/ray/rayservice_controller.go
@@ -3,6 +3,7 @@ package ray
 import (
     "context"
     "fmt"
+    "os"
     "reflect"
     "strings"
     "time"
@@ -45,6 +46,7 @@ const (
     ServiceRestartRequeueDuration = 10 * time.Second
     RayClusterDeletionDelayDuration = 60 * time.Second
     DeploymentUnhealthySecondThreshold = 300.0 // Dashboard agent related health check.
+    ENABLE_ZERO_DOWNTIME = "ENABLE_ZERO_DOWNTIME"
 )
 
 // RayServiceReconciler reconciles a RayService object
@@ -367,7 +369,17 @@ func (r *RayServiceReconciler) reconcileRayCluster(ctx context.Context, rayServi
     }
 
     if r.shouldPrepareNewRayCluster(rayServiceInstance, activeRayCluster) {
-        r.markRestart(rayServiceInstance)
+        // For LLM serving, some users might not have sufficient GPU resources to run two RayClusters simultaneously.
+        // Therefore, KubeRay offers ENABLE_ZERO_DOWNTIME as a feature flag for zero-downtime upgrades.
+        enableZeroDowntime := true
+        if s := os.Getenv(ENABLE_ZERO_DOWNTIME); strings.ToLower(s) == "false" {
+            enableZeroDowntime = false
+        }
+        if enableZeroDowntime || !enableZeroDowntime && activeRayCluster == nil {
+            r.markRestart(rayServiceInstance)
+        } else {
+            r.Log.Info("Zero-downtime upgrade is disabled (ENABLE_ZERO_DOWNTIME: false). Skip preparing a new RayCluster.")
+        }
         return activeRayCluster, nil, nil
     }
 
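Taken together, the change keeps zero-downtime upgrades enabled by default: the controller only skips preparing a pending RayCluster when the operator's ENABLE_ZERO_DOWNTIME environment variable (exposed, for example, through the env section of helm-chart/kuberay-operator/values.yaml shown above) is the string "false", compared case-insensitively, and even then a new cluster is still created when no active RayCluster exists. The standalone sketch below is illustrative only — shouldMarkRestart is a hypothetical helper name, not part of this commit — and simply mirrors the committed condition:

package main

import (
    "fmt"
    "os"
    "strings"
)

// shouldMarkRestart is a hypothetical helper (not in the commit) that reproduces the
// controller's decision: prepare a new RayCluster unless zero-downtime upgrades are
// disabled AND an active RayCluster already exists.
func shouldMarkRestart(hasActiveCluster bool) bool {
    enableZeroDowntime := true
    if s := os.Getenv("ENABLE_ZERO_DOWNTIME"); strings.ToLower(s) == "false" {
        enableZeroDowntime = false
    }
    // Same shape as the committed condition; && binds tighter than ||.
    return enableZeroDowntime || !enableZeroDowntime && !hasActiveCluster
}

func main() {
    // Unset, or any value other than "false", keeps zero-downtime upgrades enabled.
    os.Unsetenv("ENABLE_ZERO_DOWNTIME")
    fmt.Println(shouldMarkRestart(true)) // true: a new cluster is prepared for the upgrade

    // Only the literal "false" (case-insensitive) disables the feature.
    os.Setenv("ENABLE_ZERO_DOWNTIME", "false")
    fmt.Println(shouldMarkRestart(true))  // false: keep serving from the existing cluster
    fmt.Println(shouldMarkRestart(false)) // true: no active cluster yet, so one is still created
}

Because && binds tighter than ||, the committed expression is equivalent to enableZeroDowntime || activeRayCluster == nil, which is exactly the behavior the table-driven unit test below asserts (see Test 5: flag disabled, no active cluster, a new cluster is still prepared).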
42 changes: 42 additions & 0 deletions ray-operator/controllers/ray/rayservice_controller_test.go
@@ -18,6 +18,7 @@ package ray
 import (
     "context"
     "fmt"
+    "os"
     "time"
 
     "github.com/ray-project/kuberay/ray-operator/controllers/ray/common"
@@ -377,6 +378,47 @@ applications:
             time.Second*3, time.Millisecond*500).Should(BeNil(), "My myRayCluster = %v", myRayCluster.Name)
     })
 
+    It("Disable zero-downtime upgrade", func() {
+        // Disable zero-downtime upgrade.
+        os.Setenv("ENABLE_ZERO_DOWNTIME", "false")
+
+        // Try to trigger a zero-downtime upgrade.
+        oldRayVersion := myRayService.Spec.RayClusterSpec.RayVersion
+        newRayVersion := "2.198.0"
+        Expect(oldRayVersion).ShouldNot(Equal(newRayVersion))
+        err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
+            Eventually(
+                getResourceFunc(ctx, client.ObjectKey{Name: myRayService.Name, Namespace: "default"}, myRayService),
+                time.Second*3, time.Millisecond*500).Should(BeNil(), "My myRayService = %v", myRayService.Name)
+            myRayService.Spec.RayClusterSpec.RayVersion = newRayVersion
+            return k8sClient.Update(ctx, myRayService)
+        })
+        Expect(err).NotTo(HaveOccurred(), "failed to update test RayService resource")
+
+        // Because the zero-downtime upgrade is disabled, the RayService controller will not prepare a new RayCluster.
+        Consistently(
+            getPreparingRayClusterNameFunc(ctx, myRayService),
+            time.Second*5, time.Millisecond*500).Should(BeEmpty(), "Pending RayCluster name = %v", myRayService.Status.PendingServiceStatus.RayClusterName)
+
+        // Set the RayVersion back to the old value to avoid triggering the zero-downtime upgrade.
+        err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
+            Eventually(
+                getResourceFunc(ctx, client.ObjectKey{Name: myRayService.Name, Namespace: "default"}, myRayService),
+                time.Second*3, time.Millisecond*500).Should(BeNil(), "My myRayService = %v", myRayService.Name)
+            myRayService.Spec.RayClusterSpec.RayVersion = oldRayVersion
+            return k8sClient.Update(ctx, myRayService)
+        })
+        Expect(err).NotTo(HaveOccurred(), "failed to update test RayService resource")
+
+        // Enable zero-downtime upgrade again.
+        os.Unsetenv("ENABLE_ZERO_DOWNTIME")
+
+        // Zero-downtime upgrade should not be triggered.
+        Consistently(
+            getPreparingRayClusterNameFunc(ctx, myRayService),
+            time.Second*5, time.Millisecond*500).Should(BeEmpty(), "Pending RayCluster name = %v", myRayService.Status.PendingServiceStatus.RayClusterName)
+    })
+
     It("Autoscaler updates the active RayCluster and should not switch to a new RayCluster", func() {
         // Simulate autoscaler by updating the active RayCluster directly. Note that the autoscaler
         // will not update the RayService directly.
109 changes: 109 additions & 0 deletions ray-operator/controllers/ray/rayservice_controller_unit_test.go
@@ -3,6 +3,7 @@ package ray
 import (
     "context"
     "fmt"
+    "os"
     "reflect"
     "testing"
     "time"
@@ -682,6 +683,114 @@ applications:
     assert.True(t, shouldCreate)
 }
 
+func TestReconcileRayCluster(t *testing.T) {
+    defer os.Unsetenv(ENABLE_ZERO_DOWNTIME)
+    // Create a new scheme with CRDs schemes.
+    newScheme := runtime.NewScheme()
+    _ = rayv1.AddToScheme(newScheme)
+
+    ctx := context.TODO()
+    namespace := "ray"
+    rayService := rayv1.RayService{
+        ObjectMeta: metav1.ObjectMeta{
+            Name: "test-service",
+            Namespace: namespace,
+        },
+        Status: rayv1.RayServiceStatuses{},
+    }
+
+    hash, err := generateRayClusterJsonHash(rayService.Spec.RayClusterSpec)
+    assert.Nil(t, err)
+    activeCluster := rayv1.RayCluster{
+        ObjectMeta: metav1.ObjectMeta{
+            Name: "active-cluster",
+            Namespace: namespace,
+            Annotations: map[string]string{
+                common.RayServiceClusterHashKey: hash,
+            },
+        },
+    }
+
+    tests := map[string]struct {
+        activeCluster *rayv1.RayCluster
+        updateRayClusterSpec bool
+        enableZeroDowntime bool
+        shouldPrepareNewCluster bool
+    }{
+        // Test 1: Neither active nor pending clusters exist. The `markRestart` function will be called, so the `PendingServiceStatus.RayClusterName` should be set.
+        "Zero-downtime upgrade is enabled. Neither active nor pending clusters exist.": {
+            activeCluster: nil,
+            updateRayClusterSpec: false,
+            enableZeroDowntime: true,
+            shouldPrepareNewCluster: true,
+        },
+        // Test 2: The active cluster exists, but the pending cluster does not exist.
+        "Zero-downtime upgrade is enabled. The active cluster exists, but the pending cluster does not exist.": {
+            activeCluster: activeCluster.DeepCopy(),
+            updateRayClusterSpec: false,
+            enableZeroDowntime: true,
+            shouldPrepareNewCluster: false,
+        },
+        // Test 3: The active cluster exists. Trigger the zero-downtime upgrade.
+        "Zero-downtime upgrade is enabled. The active cluster exists. Trigger the zero-downtime upgrade.": {
+            activeCluster: activeCluster.DeepCopy(),
+            updateRayClusterSpec: true,
+            enableZeroDowntime: true,
+            shouldPrepareNewCluster: true,
+        },
+        // Test 4: The active cluster exists. Trigger the zero-downtime upgrade.
+        "Zero-downtime upgrade is disabled. The active cluster exists. Trigger the zero-downtime upgrade.": {
+            activeCluster: activeCluster.DeepCopy(),
+            updateRayClusterSpec: true,
+            enableZeroDowntime: false,
+            shouldPrepareNewCluster: false,
+        },
+        // Test 5: Neither active nor pending clusters exist. The `markRestart` function will be called, so the `PendingServiceStatus.RayClusterName` should be set.
+        "Zero-downtime upgrade is disabled. Neither active nor pending clusters exist.": {
+            activeCluster: nil,
+            updateRayClusterSpec: false,
+            enableZeroDowntime: false,
+            shouldPrepareNewCluster: true,
+        },
+    }
+
+    for name, tc := range tests {
+        t.Run(name, func(t *testing.T) {
+            // Enable or disable zero-downtime upgrade.
+            defer os.Unsetenv(ENABLE_ZERO_DOWNTIME)
+            if !tc.enableZeroDowntime {
+                os.Setenv(ENABLE_ZERO_DOWNTIME, "false")
+            }
+            runtimeObjects := []runtime.Object{}
+            if tc.activeCluster != nil {
+                runtimeObjects = append(runtimeObjects, tc.activeCluster.DeepCopy())
+            }
+            fakeClient := clientFake.NewClientBuilder().WithScheme(newScheme).WithRuntimeObjects(runtimeObjects...).Build()
+            r := RayServiceReconciler{
+                Client: fakeClient,
+                Log: ctrl.Log.WithName("controllers").WithName("RayService"),
+            }
+            service := rayService.DeepCopy()
+            if tc.updateRayClusterSpec {
+                service.Spec.RayClusterSpec.RayVersion = "new-version"
+            }
+            if tc.activeCluster != nil {
+                service.Status.ActiveServiceStatus.RayClusterName = tc.activeCluster.Name
+            }
+            assert.Equal(t, "", service.Status.PendingServiceStatus.RayClusterName)
+            _, _, err = r.reconcileRayCluster(ctx, service)
+            assert.Nil(t, err)
+
+            // If KubeRay operator is preparing a new cluster, the `PendingServiceStatus.RayClusterName` should be set by calling the function `markRestart`.
+            if tc.shouldPrepareNewCluster {
+                assert.NotEqual(t, "", service.Status.PendingServiceStatus.RayClusterName)
+            } else {
+                assert.Equal(t, "", service.Status.PendingServiceStatus.RayClusterName)
+            }
+        })
+    }
+}
+
 func initFakeDashboardClient(appName string, deploymentStatus string, appStatus string) utils.RayDashboardClientInterface {
     fakeDashboardClient := utils.FakeRayDashboardClient{}
     status := generateServeStatus(deploymentStatus, appStatus)