[RayCluster] support suspending worker groups (#2663)
Signed-off-by: Andrew Sy Kim <[email protected]>
andrewsykim authored Dec 27, 2024
1 parent efbd35e commit a788963
Showing 12 changed files with 94 additions and 0 deletions.
1 change: 1 addition & 0 deletions docs/reference/api.md
@@ -289,6 +289,7 @@ _Appears in:_

| Field | Description | Default | Validation |
| --- | --- | --- | --- |
| `suspend` _boolean_ | Suspend indicates whether a worker group should be suspended.<br />A suspended worker group will have all pods deleted. | | |
| `groupName` _string_ | We can have multiple worker groups; we distinguish them by name. | | |
| `replicas` _integer_ | Replicas is the number of desired Pods for this worker group. See https://github.com/ray-project/kuberay/pull/1443 for more details about the reason for making this field optional. | 0 | |
| `minReplicas` _integer_ | MinReplicas denotes the minimum number of desired Pods for this worker group. | 0 | |
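For illustration, a minimal Go sketch of a worker group that opts into suspension, assuming the `rayv1` API package from this repository (`github.com/ray-project/kuberay/ray-operator/apis/ray/v1`) and an illustrative group name:

package main

import (
	"fmt"

	rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
)

func main() {
	suspend := true
	worker := rayv1.WorkerGroupSpec{
		// Illustrative name; any unique group name works.
		GroupName: "small-group",
		// When true, the controller deletes all of this group's pods
		// and skips reconciling the group until the field is cleared.
		Suspend: &suspend,
	}
	fmt.Println(*worker.Suspend) // true
}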
2 changes: 2 additions & 0 deletions helm-chart/kuberay-operator/crds/ray.io_rayclusters.yaml

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions helm-chart/kuberay-operator/crds/ray.io_rayservices.yaml

Some generated files are not rendered by default.

3 changes: 3 additions & 0 deletions ray-operator/apis/ray/v1/raycluster_types.go
@@ -54,6 +54,9 @@ type HeadGroupSpec struct {

// WorkerGroupSpec are the specs for the worker pods
type WorkerGroupSpec struct {
// Suspend indicates whether a worker group should be suspended.
// A suspended worker group will have all pods deleted.
Suspend *bool `json:"suspend,omitempty"`
// We can have multiple worker groups; we distinguish them by name.
GroupName string `json:"groupName"`
// Replicas is the number of desired Pods for this worker group. See https://github.com/ray-project/kuberay/pull/1443 for more details about the reason for making this field optional.
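Note that `Suspend` is declared as `*bool` with `omitempty` rather than a plain `bool`, so an unset field can be told apart from an explicit `false`; the controller treats a nil pointer as not suspended. A minimal sketch of a nil-safe accessor (a hypothetical helper for illustration; this commit inlines the check in `reconcilePods`):

// isSuspended reports whether a worker group is suspended.
// A nil pointer means the field was never set, which behaves like false.
func isSuspended(worker *rayv1.WorkerGroupSpec) bool {
	return worker.Suspend != nil && *worker.Suspend
}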
5 changes: 5 additions & 0 deletions ray-operator/apis/ray/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions ray-operator/config/crd/bases/ray.io_rayclusters.yaml

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions ray-operator/config/crd/bases/ray.io_rayjobs.yaml

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions ray-operator/config/crd/bases/ray.io_rayservices.yaml

Some generated files are not rendered by default.

12 changes: 12 additions & 0 deletions ray-operator/controllers/ray/raycluster_controller.go
@@ -776,6 +776,18 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
return err
}

// Delete all workers if the worker group is suspended, and skip reconciling the group
if worker.Suspend != nil && *worker.Suspend {
if _, err := r.deleteAllPods(ctx, common.RayClusterGroupPodsAssociationOptions(instance, worker.GroupName)); err != nil {
r.Recorder.Eventf(instance, corev1.EventTypeWarning, string(utils.FailedToDeleteWorkerPod),
"Failed deleting worker Pods for suspended group %s in RayCluster %s/%s, %v", worker.GroupName, instance.Namespace, instance.Name, err)
return errstd.Join(utils.ErrFailedDeleteWorkerPod, err)
}
r.Recorder.Eventf(instance, corev1.EventTypeNormal, string(utils.DeletedWorkerPod),
"Deleted all pods for suspended worker group %s in RayCluster %s/%s", worker.GroupName, instance.Namespace, instance.Name)
continue
}

// Delete unhealthy worker Pods.
deletedWorkers := make(map[string]struct{})
deleted := struct{}{}
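On the failure path above, the controller joins a sentinel error with the underlying API error (`errstd` appears to be the standard `errors` package under an alias). A minimal sketch of why that matters, using illustrative names rather than the repo's: `errors.Is` unwraps joined errors, so callers can still branch on the failure class while the original cause stays in the message.

package main

import (
	"errors"
	"fmt"
)

var errFailedDeleteWorkerPod = errors.New("failed to delete worker pod")

func main() {
	apiErr := errors.New("pods \"raycluster-sample-worker-0\" is forbidden")
	joined := errors.Join(errFailedDeleteWorkerPod, apiErr)
	// errors.Is traverses the joined tree, so the sentinel still matches.
	fmt.Println(errors.Is(joined, errFailedDeleteWorkerPod)) // true
	fmt.Println(joined)                                      // both messages, newline-separated
}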
52 changes: 52 additions & 0 deletions ray-operator/controllers/ray/raycluster_controller_test.go
@@ -520,6 +520,16 @@ var _ = Context("Inside the default namespace", func() {
})
}

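// Note: the helper below re-fetches the RayCluster before mutating Suspend so
// each attempt updates the latest resourceVersion; retry.RetryOnConflict (from
// k8s.io/client-go/util/retry) re-runs the closure only on update conflicts.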
updateRayClusterWorkerGroupSuspendField := func(ctx context.Context, rayCluster *rayv1.RayCluster, suspend bool) error {
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
Eventually(
getResourceFunc(ctx, client.ObjectKey{Name: rayCluster.Name, Namespace: rayCluster.Namespace}, rayCluster),
time.Second*3, time.Millisecond*500).Should(BeNil(), "rayCluster = %v", rayCluster)
rayCluster.Spec.WorkerGroupSpecs[0].Suspend = &suspend
return k8sClient.Update(ctx, rayCluster)
})
}

findRayClusterSuspendStatus := func(ctx context.Context, rayCluster *rayv1.RayCluster) (rayv1.RayClusterConditionType, error) {
if err := getResourceFunc(ctx, client.ObjectKey{Name: rayCluster.Name, Namespace: rayCluster.Namespace}, rayCluster)(); err != nil {
return "", err
@@ -809,6 +819,48 @@ var _ = Context("Inside the default namespace", func() {
})
})

Describe("Suspend RayCluster worker group", Ordered, func() {
ctx := context.Background()
namespace := "default"
rayCluster := rayClusterTemplate("raycluster-suspend-workergroup", namespace)
allPods := corev1.PodList{}
allFilters := common.RayClusterAllPodsAssociationOptions(rayCluster).ToListOptions()
workerFilters := common.RayClusterGroupPodsAssociationOptions(rayCluster, rayCluster.Spec.WorkerGroupSpecs[0].GroupName).ToListOptions()
headFilters := common.RayClusterHeadPodsAssociationOptions(rayCluster).ToListOptions()
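// The association options above build label-based list filters (cluster,
// group name, head vs. worker), so each assertion below counts only the
// pods belonging to that slice of the cluster.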

It("Create a RayCluster custom resource", func() {
err := k8sClient.Create(ctx, rayCluster)
Expect(err).NotTo(HaveOccurred(), "Failed to create RayCluster")
Eventually(getResourceFunc(ctx, client.ObjectKey{Name: rayCluster.Name, Namespace: namespace}, rayCluster),
time.Second*3, time.Millisecond*500).Should(BeNil(), "Should be able to see RayCluster: %v", rayCluster.Name)
})

It("Check the number of Pods and add finalizers", func() {
Eventually(listResourceFunc(ctx, &allPods, allFilters...), time.Second*3, time.Millisecond*500).
Should(Equal(4), fmt.Sprintf("all pods %v", allPods.Items))
})

It("Setting suspend=true in first worker group should not fail", func() {
// Suspend the RayCluster worker group
err := updateRayClusterWorkerGroupSuspendField(ctx, rayCluster, true)
Expect(err).NotTo(HaveOccurred(), "Failed to update RayCluster")
})

It("Worker pods should be deleted but head pod should still be running", func() {
Eventually(listResourceFunc(ctx, &allPods, workerFilters...), time.Second*5, time.Millisecond*500).
Should(Equal(0), fmt.Sprintf("all pods %v", allPods.Items))
Eventually(listResourceFunc(ctx, &allPods, headFilters...), time.Second*5, time.Millisecond*500).
Should(Equal(1), fmt.Sprintf("all pods %v", allPods.Items))
Eventually(listResourceFunc(ctx, &allPods, allFilters...), time.Second*5, time.Millisecond*500).
Should(Equal(1), fmt.Sprintf("all pods %v", allPods.Items))
})

It("Delete the cluster", func() {
err := k8sClient.Delete(ctx, rayCluster)
Expect(err).NotTo(HaveOccurred())
})
})
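Resuming is not exercised by this suite. A hedged sketch of a possible follow-up spec, reusing the helper above and assuming the controller recreates the group's pods once `suspend` is cleared (the template here runs one head pod and three worker pods):

It("Clearing suspend should recreate the worker pods", func() {
	// Hypothetical follow-up spec, not part of this commit.
	err := updateRayClusterWorkerGroupSuspendField(ctx, rayCluster, false)
	Expect(err).NotTo(HaveOccurred(), "Failed to update RayCluster")
	Eventually(listResourceFunc(ctx, &allPods, workerFilters...), time.Second*5, time.Millisecond*500).
		Should(Equal(3), fmt.Sprintf("all pods %v", allPods.Items))
})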

Describe("RayCluster with a multi-host worker group", Ordered, func() {
ctx := context.Background()
namespace := "default"

Some generated files are not rendered by default.
