From a788963f36bee9e0c5b7b58a20be0c9ae10f3b04 Mon Sep 17 00:00:00 2001 From: Andrew Sy Kim Date: Fri, 27 Dec 2024 17:15:17 -0500 Subject: [PATCH] [RayCluster] support suspending worker groups (#2663) Signed-off-by: Andrew Sy Kim --- docs/reference/api.md | 1 + .../crds/ray.io_rayclusters.yaml | 2 + .../kuberay-operator/crds/ray.io_rayjobs.yaml | 2 + .../crds/ray.io_rayservices.yaml | 2 + ray-operator/apis/ray/v1/raycluster_types.go | 3 ++ .../apis/ray/v1/zz_generated.deepcopy.go | 5 ++ .../config/crd/bases/ray.io_rayclusters.yaml | 2 + .../config/crd/bases/ray.io_rayjobs.yaml | 2 + .../config/crd/bases/ray.io_rayservices.yaml | 2 + .../controllers/ray/raycluster_controller.go | 12 +++++ .../ray/raycluster_controller_test.go | 52 +++++++++++++++++++ .../ray/v1/workergroupspec.go | 9 ++++ 12 files changed, 94 insertions(+) diff --git a/docs/reference/api.md b/docs/reference/api.md index 5d0a2ed062..69280008d8 100644 --- a/docs/reference/api.md +++ b/docs/reference/api.md @@ -289,6 +289,7 @@ _Appears in:_ | Field | Description | Default | Validation | | --- | --- | --- | --- | +| `suspend` _boolean_ | Suspend indicates whether a worker group should be suspended.
A suspended worker group will have all pods deleted. | | | | `groupName` _string_ | we can have multiple worker groups, we distinguish them by name | | | | `replicas` _integer_ | Replicas is the number of desired Pods for this worker group. See https://github.com/ray-project/kuberay/pull/1443 for more details about the reason for making this field optional. | 0 | | | `minReplicas` _integer_ | MinReplicas denotes the minimum number of desired Pods for this worker group. | 0 | | diff --git a/helm-chart/kuberay-operator/crds/ray.io_rayclusters.yaml b/helm-chart/kuberay-operator/crds/ray.io_rayclusters.yaml index 2d4ef992ea..ff40fae94b 100644 --- a/helm-chart/kuberay-operator/crds/ray.io_rayclusters.yaml +++ b/helm-chart/kuberay-operator/crds/ray.io_rayclusters.yaml @@ -4154,6 +4154,8 @@ spec: type: string type: array type: object + suspend: + type: boolean template: properties: metadata: diff --git a/helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml b/helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml index ffbe90665d..e15a51be5b 100644 --- a/helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml +++ b/helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml @@ -4174,6 +4174,8 @@ spec: type: string type: array type: object + suspend: + type: boolean template: properties: metadata: diff --git a/helm-chart/kuberay-operator/crds/ray.io_rayservices.yaml b/helm-chart/kuberay-operator/crds/ray.io_rayservices.yaml index 1c11cc038e..734a9e7e1e 100644 --- a/helm-chart/kuberay-operator/crds/ray.io_rayservices.yaml +++ b/helm-chart/kuberay-operator/crds/ray.io_rayservices.yaml @@ -4134,6 +4134,8 @@ spec: type: string type: array type: object + suspend: + type: boolean template: properties: metadata: diff --git a/ray-operator/apis/ray/v1/raycluster_types.go b/ray-operator/apis/ray/v1/raycluster_types.go index c6c8979a21..6d5750b75a 100644 --- a/ray-operator/apis/ray/v1/raycluster_types.go +++ b/ray-operator/apis/ray/v1/raycluster_types.go @@ -54,6 +54,9 @@ type HeadGroupSpec struct { // WorkerGroupSpec are the specs for the worker pods type WorkerGroupSpec struct { + // Suspend indicates whether a worker group should be suspended. + // A suspended worker group will have all pods deleted. + Suspend *bool `json:"suspend,omitempty"` // we can have multiple worker groups, we distinguish them by name GroupName string `json:"groupName"` // Replicas is the number of desired Pods for this worker group. See https://github.com/ray-project/kuberay/pull/1443 for more details about the reason for making this field optional. diff --git a/ray-operator/apis/ray/v1/zz_generated.deepcopy.go b/ray-operator/apis/ray/v1/zz_generated.deepcopy.go index 586c39ac5a..8f4f6c83e8 100644 --- a/ray-operator/apis/ray/v1/zz_generated.deepcopy.go +++ b/ray-operator/apis/ray/v1/zz_generated.deepcopy.go @@ -657,6 +657,11 @@ func (in *SubmitterConfig) DeepCopy() *SubmitterConfig { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *WorkerGroupSpec) DeepCopyInto(out *WorkerGroupSpec) { *out = *in + if in.Suspend != nil { + in, out := &in.Suspend, &out.Suspend + *out = new(bool) + **out = **in + } if in.Replicas != nil { in, out := &in.Replicas, &out.Replicas *out = new(int32) diff --git a/ray-operator/config/crd/bases/ray.io_rayclusters.yaml b/ray-operator/config/crd/bases/ray.io_rayclusters.yaml index 2d4ef992ea..ff40fae94b 100644 --- a/ray-operator/config/crd/bases/ray.io_rayclusters.yaml +++ b/ray-operator/config/crd/bases/ray.io_rayclusters.yaml @@ -4154,6 +4154,8 @@ spec: type: string type: array type: object + suspend: + type: boolean template: properties: metadata: diff --git a/ray-operator/config/crd/bases/ray.io_rayjobs.yaml b/ray-operator/config/crd/bases/ray.io_rayjobs.yaml index ffbe90665d..e15a51be5b 100644 --- a/ray-operator/config/crd/bases/ray.io_rayjobs.yaml +++ b/ray-operator/config/crd/bases/ray.io_rayjobs.yaml @@ -4174,6 +4174,8 @@ spec: type: string type: array type: object + suspend: + type: boolean template: properties: metadata: diff --git a/ray-operator/config/crd/bases/ray.io_rayservices.yaml b/ray-operator/config/crd/bases/ray.io_rayservices.yaml index 1c11cc038e..734a9e7e1e 100644 --- a/ray-operator/config/crd/bases/ray.io_rayservices.yaml +++ b/ray-operator/config/crd/bases/ray.io_rayservices.yaml @@ -4134,6 +4134,8 @@ spec: type: string type: array type: object + suspend: + type: boolean template: properties: metadata: diff --git a/ray-operator/controllers/ray/raycluster_controller.go b/ray-operator/controllers/ray/raycluster_controller.go index 8965c29bfe..1bfa402537 100644 --- a/ray-operator/controllers/ray/raycluster_controller.go +++ b/ray-operator/controllers/ray/raycluster_controller.go @@ -776,6 +776,18 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv return err } + // Delete all workers if worker group is suspended and skip reconcile + if worker.Suspend != nil && *worker.Suspend { + if _, err := r.deleteAllPods(ctx, common.RayClusterGroupPodsAssociationOptions(instance, worker.GroupName)); err != nil { + r.Recorder.Eventf(instance, corev1.EventTypeWarning, string(utils.FailedToDeleteWorkerPod), + "Failed deleting worker Pods for suspended group %s in RayCluster %s/%s, %v", worker.GroupName, instance.Namespace, instance.Name, err) + return errstd.Join(utils.ErrFailedDeleteWorkerPod, err) + } + r.Recorder.Eventf(instance, corev1.EventTypeNormal, string(utils.DeletedWorkerPod), + "Deleted all pods for suspended worker group %s in RayCluster %s/%s", worker.GroupName, instance.Namespace, instance.Name) + continue + } + // Delete unhealthy worker Pods. 
deletedWorkers := make(map[string]struct{}) deleted := struct{}{} diff --git a/ray-operator/controllers/ray/raycluster_controller_test.go b/ray-operator/controllers/ray/raycluster_controller_test.go index 1a1bbb1547..03d92d5785 100644 --- a/ray-operator/controllers/ray/raycluster_controller_test.go +++ b/ray-operator/controllers/ray/raycluster_controller_test.go @@ -520,6 +520,16 @@ var _ = Context("Inside the default namespace", func() { }) } + updateRayClusterWorkerGroupSuspendField := func(ctx context.Context, rayCluster *rayv1.RayCluster, suspend bool) error { + return retry.RetryOnConflict(retry.DefaultRetry, func() error { + Eventually( + getResourceFunc(ctx, client.ObjectKey{Name: rayCluster.Name, Namespace: rayCluster.Namespace}, rayCluster), + time.Second*3, time.Millisecond*500).Should(BeNil(), "rayCluster = %v", rayCluster) + rayCluster.Spec.WorkerGroupSpecs[0].Suspend = &suspend + return k8sClient.Update(ctx, rayCluster) + }) + } + findRayClusterSuspendStatus := func(ctx context.Context, rayCluster *rayv1.RayCluster) (rayv1.RayClusterConditionType, error) { if err := getResourceFunc(ctx, client.ObjectKey{Name: rayCluster.Name, Namespace: rayCluster.Namespace}, rayCluster)(); err != nil { return "", err @@ -809,6 +819,48 @@ var _ = Context("Inside the default namespace", func() { }) }) + Describe("Suspend RayCluster worker group", Ordered, func() { + ctx := context.Background() + namespace := "default" + rayCluster := rayClusterTemplate("raycluster-suspend-workergroup", namespace) + allPods := corev1.PodList{} + allFilters := common.RayClusterAllPodsAssociationOptions(rayCluster).ToListOptions() + workerFilters := common.RayClusterGroupPodsAssociationOptions(rayCluster, rayCluster.Spec.WorkerGroupSpecs[0].GroupName).ToListOptions() + headFilters := common.RayClusterHeadPodsAssociationOptions(rayCluster).ToListOptions() + + It("Create a RayCluster custom resource", func() { + err := k8sClient.Create(ctx, rayCluster) + Expect(err).NotTo(HaveOccurred(), "Failed to create RayCluster") + Eventually(getResourceFunc(ctx, client.ObjectKey{Name: rayCluster.Name, Namespace: namespace}, rayCluster), + time.Second*3, time.Millisecond*500).Should(BeNil(), "Should be able to see RayCluster: %v", rayCluster.Name) + }) + + It("Check the number of Pods and add finalizers", func() { + Eventually(listResourceFunc(ctx, &allPods, allFilters...), time.Second*3, time.Millisecond*500). + Should(Equal(4), fmt.Sprintf("all pods %v", allPods.Items)) + }) + + It("Setting suspend=true in first worker group should not fail", func() { + // suspend the Raycluster worker group + err := updateRayClusterWorkerGroupSuspendField(ctx, rayCluster, true) + Expect(err).NotTo(HaveOccurred(), "Failed to update RayCluster") + }) + + It("Worker pods should be deleted but head pod should still be running", func() { + Eventually(listResourceFunc(ctx, &allPods, workerFilters...), time.Second*5, time.Millisecond*500). + Should(Equal(0), fmt.Sprintf("all pods %v", allPods.Items)) + Eventually(listResourceFunc(ctx, &allPods, headFilters...), time.Second*5, time.Millisecond*500). + Should(Equal(1), fmt.Sprintf("all pods %v", allPods.Items)) + Eventually(listResourceFunc(ctx, &allPods, allFilters...), time.Second*5, time.Millisecond*500). 
+ Should(Equal(1), fmt.Sprintf("all pods %v", allPods.Items)) + }) + + It("Delete the cluster", func() { + err := k8sClient.Delete(ctx, rayCluster) + Expect(err).NotTo(HaveOccurred()) + }) + }) + Describe("RayCluster with a multi-host worker group", Ordered, func() { ctx := context.Background() namespace := "default" diff --git a/ray-operator/pkg/client/applyconfiguration/ray/v1/workergroupspec.go b/ray-operator/pkg/client/applyconfiguration/ray/v1/workergroupspec.go index 7d35b86587..fa95cc1749 100644 --- a/ray-operator/pkg/client/applyconfiguration/ray/v1/workergroupspec.go +++ b/ray-operator/pkg/client/applyconfiguration/ray/v1/workergroupspec.go @@ -9,6 +9,7 @@ import ( // WorkerGroupSpecApplyConfiguration represents an declarative configuration of the WorkerGroupSpec type for use // with apply. type WorkerGroupSpecApplyConfiguration struct { + Suspend *bool `json:"suspend,omitempty"` GroupName *string `json:"groupName,omitempty"` Replicas *int32 `json:"replicas,omitempty"` MinReplicas *int32 `json:"minReplicas,omitempty"` @@ -26,6 +27,14 @@ func WorkerGroupSpec() *WorkerGroupSpecApplyConfiguration { return &WorkerGroupSpecApplyConfiguration{} } +// WithSuspend sets the Suspend field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the Suspend field is set to the value of the last call. +func (b *WorkerGroupSpecApplyConfiguration) WithSuspend(value bool) *WorkerGroupSpecApplyConfiguration { + b.Suspend = &value + return b +} + // WithGroupName sets the GroupName field in the declarative configuration to the given value // and returns the receiver, so that objects can be built by chaining "With" function invocations. // If called multiple times, the GroupName field is set to the value of the last call.
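
Usage sketch: the manifest below shows how the new per-worker-group `suspend` field introduced by this patch can be set on a RayCluster. The cluster name, group name, image tag, and replica counts are illustrative placeholders and are not taken from this change; only the `suspend` field itself is new here.

apiVersion: ray.io/v1
kind: RayCluster
metadata:
  name: raycluster-suspend-example      # placeholder name
spec:
  headGroupSpec:
    rayStartParams: {}
    template:
      spec:
        containers:
          - name: ray-head
            image: rayproject/ray:2.9.0 # placeholder image tag
  workerGroupSpecs:
    - groupName: small-group            # placeholder group name
      replicas: 2
      minReplicas: 0
      maxReplicas: 5
      # When suspend is true, the operator deletes all Pods in this worker
      # group and skips further reconciliation for the group; the head Pod
      # and any other worker groups are unaffected.
      suspend: true
      rayStartParams: {}
      template:
        spec:
          containers:
            - name: ray-worker
              image: rayproject/ray:2.9.0 # placeholder image tag

Setting `suspend` back to false (or removing it) should let the regular reconcile path for the group recreate its worker Pods, mirroring what the updated controller test exercises with the head Pod left running throughout.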