
[Feature][RayCluster]: introduce RayClusterSuspending and RayClusterSuspended conditions #2403

Merged

Conversation

@rueian (Contributor) commented Sep 27, 2024

Why are these changes needed?

Resolves #2277

This PR introduces two new status conditions: RayClusterSuspending and RayClusterSuspended.

Both conditions are populated with metav1.ConditionFalse by default when the features.RayClusterStatusConditions feature gate is enabled.

When the gate is enabled, we rely on the new RayClusterSuspending condition to suspend a RayCluster atomically. When a user turns on .Spec.Suspend of a RayCluster, we set RayClusterSuspending to true and persist it before actually stopping any Pods. We then stop all Pods in the next reconciliation and do not consider .Spec.Suspend again until we switch to RayClusterSuspended.

If the RayClusterSuspended condition is set and the user turns .Spec.Suspend back off, we disable the condition by setting it to metav1.ConditionFalse instead of removing it, so that the transition timestamp is preserved for observability.
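
For illustration, here is a minimal sketch of the transitions described above, using the standard k8s.io/apimachinery condition helpers. This is not the operator's actual code: the function name and the reason strings are placeholders, and only the condition handling is shown.

package ray

import (
	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
)

// reconcileSuspendConditions is a placeholder name for this sketch.
func reconcileSuspendConditions(instance *rayv1.RayCluster, allPodsDeleted bool) {
	suspend := instance.Spec.Suspend != nil && *instance.Spec.Suspend
	suspending := meta.IsStatusConditionTrue(instance.Status.Conditions, string(rayv1.RayClusterSuspending))
	suspended := meta.IsStatusConditionTrue(instance.Status.Conditions, string(rayv1.RayClusterSuspended))

	switch {
	case suspend && !suspending && !suspended:
		// Persist Suspending=True before any Pods are deleted, so the suspend
		// operation is atomic from the user's point of view.
		meta.SetStatusCondition(&instance.Status.Conditions, metav1.Condition{
			Type: string(rayv1.RayClusterSuspending), Status: metav1.ConditionTrue, Reason: "UserRequestedSuspend",
		})
	case suspending && allPodsDeleted:
		// All Pods are gone: flip Suspending off and Suspended on.
		meta.SetStatusCondition(&instance.Status.Conditions, metav1.Condition{
			Type: string(rayv1.RayClusterSuspending), Status: metav1.ConditionFalse, Reason: "SuspendCompleted",
		})
		meta.SetStatusCondition(&instance.Status.Conditions, metav1.Condition{
			Type: string(rayv1.RayClusterSuspended), Status: metav1.ConditionTrue, Reason: "SuspendCompleted",
		})
	case suspended && !suspend:
		// Resume: set Suspended to False instead of removing it, so the
		// LastTransitionTime is preserved.
		meta.SetStatusCondition(&instance.Status.Conditions, metav1.Condition{
			Type: string(rayv1.RayClusterSuspended), Status: metav1.ConditionFalse, Reason: "UserRequestedResume",
		})
	}
}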

In summary, this is the status BEFORE suspending a RayCluster:

[status screenshot]

This is the status DURING suspending a RayCluster:

[status screenshot]

This is the status AFTER it is suspended:

[status screenshot]

This is the status AFTER the cluster is resumed:

[status screenshot]

Related issue number

#2277

Checks

  • I've made sure the tests are passing.
  • Testing Strategy
    • Unit tests
    • Manual tests

@rueian force-pushed the feat-raycluster-suspending-condition branch from 5f82269 to fc13eab on September 27, 2024 19:51
@rueian force-pushed the feat-raycluster-suspending-condition branch from a2a618e to 8efab0c on September 28, 2024 00:33
@rueian marked this pull request as ready for review on September 28, 2024 00:55
@rueian (Contributor Author) commented Sep 28, 2024

Hi @kevin85421, please take a look when you are available.

@kevin85421 self-assigned this Oct 4, 2024
@kevin85421 (Member):
I will review the PR tomorrow.

In ray-operator/controllers/ray/raycluster_controller.go:
// if RayCluster is suspending, delete all pods and skip reconcile
suspendStatus := utils.FindRayClusterSuspendStatus(instance)
if suspendStatus == rayv1.RayClusterSuspending ||
	(!features.Enabled(features.RayClusterStatusConditions) && instance.Spec.Suspend != nil && *instance.Spec.Suspend) {
	if _, err := r.deleteAllPods(ctx, common.RayClusterAllPodsAssociationOptions(instance)); err != nil {
kevin85421 (Member):

Do we need to send a deleteAllPods request to the K8s API server for each reconciliation during Suspending?

rueian (Contributor Author):

Yes, we do that until all pods are deleted and then we switch to suspended.

kevin85421 (Member):

Is it complex to call deleteAllPods only when there are running Pods in the cluster? If not, we can update in this PR. If it is not trivial, can you open an issue to track the progress?

rueian (Contributor Author):

After reviewing the source, I found that deleteAllPods already includes this improvement: it only sends the delete request if there are still active Pods.

func (r *RayClusterReconciler) deleteAllPods(ctx context.Context, filters common.AssociationOptions) (pods corev1.PodList, err error) {
	logger := ctrl.LoggerFrom(ctx)
	if err = r.List(ctx, &pods, filters.ToListOptions()...); err != nil {
		return pods, err
	}
	active := 0
	for _, pod := range pods.Items {
		if pod.DeletionTimestamp.IsZero() {
			active++
		}
	}
	if active > 0 {
		logger.Info("Deleting all Pods with labels", "filters", filters, "Number of active Pods", active)
		return pods, r.DeleteAllOf(ctx, &corev1.Pod{}, filters.ToDeleteOptions()...)
	}
	return pods, nil
}

I guess there is no need for an issue to track the progress?

@kevin85421 (Member):

Btw, we may also want to ping some Kueue folks to review this PR before merging.

@kevin85421 (Member) left a comment:

Discussed offline:

  1. Use two conditions: suspending and suspended.
  2. Return a boolean to determine whether to requeue the CR immediately (a rough sketch follows this list).
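
A minimal sketch of point 2, assuming a hypothetical helper named reconcileSuspend inside the controller; only functions already quoted in this conversation (utils.FindRayClusterSuspendStatus, deleteAllPods, common.RayClusterAllPodsAssociationOptions) are assumed to exist with these shapes, and the name and exact signature are illustrative:

// Sketch only: the helper name and signature are placeholders.
func (r *RayClusterReconciler) reconcileSuspend(ctx context.Context, instance *rayv1.RayCluster) (requeueNow bool, err error) {
	if utils.FindRayClusterSuspendStatus(instance) != rayv1.RayClusterSuspending {
		return false, nil
	}
	pods, err := r.deleteAllPods(ctx, common.RayClusterAllPodsAssociationOptions(instance))
	if err != nil {
		return false, err
	}
	// While Pods are still terminating, ask the caller to requeue immediately
	// (e.g. return ctrl.Result{Requeue: true}) so the Suspending -> Suspended
	// transition is not delayed until the next resync.
	return len(pods.Items) > 0, nil
}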

@rueian force-pushed the feat-raycluster-suspending-condition branch from 2d9fc54 to 4fd2ec9 on October 5, 2024 00:24
@rueian force-pushed the feat-raycluster-suspending-condition branch from 4fd2ec9 to ee39d7e on October 5, 2024 00:32
@kevin85421 (Member) left a comment:

The main motivation for adding the suspending condition is to make the suspend operation atomic. Can you add a test for the atomicity of the suspend operation? You can refer to

Describe("RayJob suspend operation shoud be atomic", Ordered, func() {

@rueian force-pushed the feat-raycluster-suspending-condition branch from 55d435a to c8e8924 on October 6, 2024 05:25
@rueian force-pushed the feat-raycluster-suspending-condition branch from c8e8924 to f7fe429 on October 6, 2024 05:40
@rueian (Contributor Author) commented Oct 6, 2024

> The main motivation for adding the suspending condition is to make the suspend operation atomic. Can you add a test for the atomicity of the suspend operation? You can refer to
>
> Describe("RayJob suspend operation shoud be atomic", Ordered, func() {

OK, I added it to the existing test using the same finalizer technique, and all the above suggestions are resolved too.

In ray-operator/controllers/ray/raycluster_controller_test.go:
})
Expect(err).NotTo(HaveOccurred(), "Failed to update RayCluster")

// Remove the finalizers one by one to make sure the suspending process is still ongoing.
kevin85421 (Member):

Can we separate the atomicity test into its own test? I want to avoid mixing both code paths in a single test. In the atomicity test, can you verify the following:

  1. Add finalizers to the Pods.
  2. Set suspend to true, and check that the suspending condition is true.
  3. Set suspend back to false, and ensure the suspending condition remains true for 2 to 3 seconds (use Consistently).
  4. Remove the finalizers, and verify that both suspending and suspended conditions become false, all old Pods are deleted, and new Pods are created.
  5. Set suspend to true, and all Pods should be deleted.

rueian (Contributor Author):

ok, a new test is added.
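
For reference, a rough Ginkgo/Gomega sketch of such an atomicity check. It assumes the fixtures and helpers that appear elsewhere in this conversation (ctx, rayCluster, findRayClusterSuspendStatus); the finalizer-handling and suspend-toggling helpers named below are hypothetical stand-ins, and the timings are illustrative only:

It("should keep RayClusterSuspending true until all Pods are deleted", func() {
	// Hypothetical helper: adds a test finalizer to every worker Pod so that
	// Pod deletion cannot complete yet.
	addTestFinalizerToAllWorkerPods(ctx)

	// Turn on suspend: the Suspending condition should become true.
	updateRayClusterSuspendField(ctx, rayCluster, true) // hypothetical helper
	Eventually(findRayClusterSuspendStatus, time.Second*3, time.Millisecond*500).
		Should(Equal(rayv1.RayClusterSuspending))

	// Turn suspend back off: because the operation is atomic, the Suspending
	// condition must remain true while the old Pods still exist.
	updateRayClusterSuspendField(ctx, rayCluster, false)
	Consistently(findRayClusterSuspendStatus, time.Second*3, time.Millisecond*500).
		Should(Equal(rayv1.RayClusterSuspending))

	// Remove the finalizers: the blocked deletions can now finish, and since
	// Spec.Suspend is false again, the Suspending condition should clear.
	removeTestFinalizerFromAllWorkerPods(ctx) // hypothetical helper
	Eventually(findRayClusterSuspendStatus, time.Second*3, time.Millisecond*500).
		ShouldNot(Equal(rayv1.RayClusterSuspending))
})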

@@ -33,7 +33,7 @@ func listResourceFunc(ctx context.Context, workerPods *corev1.PodList, opt ...cl

 	count := 0
 	for _, aPod := range workerPods.Items {
-		if (reflect.DeepEqual(aPod.Status.Phase, corev1.PodRunning) || reflect.DeepEqual(aPod.Status.Phase, corev1.PodPending)) && aPod.DeletionTimestamp == nil {
+		if (reflect.DeepEqual(aPod.Status.Phase, corev1.PodRunning) || reflect.DeepEqual(aPod.Status.Phase, corev1.PodPending)) && (aPod.DeletionTimestamp == nil || len(aPod.Finalizers) != 0) {
kevin85421 (Member):

why do we need to update this?

rueian (Contributor Author):

Without this change, listResourceFunc always returns 0 even when Pods have not actually been deleted yet because of their finalizers, which makes the following assertion fail.

Eventually(listResourceFunc(ctx, &allPods, allFilters...), time.Second*3, time.Millisecond*500).
	Should(Equal(numPods), fmt.Sprintf("all pods %v", allPods.Items))

I found that failure very misleading, and, more importantly, without the change we cannot verify that those Pods still exist.

kevin85421 (Member):

I tried the following example. However, Pod.Status.Phase is still Running. Does k8sClient.List do anything special for Pods with finalizers?

kubectl run curl --image=radial/busyboxplus:curl -i --tty

# Add a finalizer
kubectl patch pod curl --type='json' -p='[{"op": "add", "path": "/metadata/finalizers", "value":["ray.io/test"]}]'  

# Delete the Pod
kubectl delete pod curl

kubectl edit pod curl
# metadata.deletionTimestamp: "2024-10-08T17:49:04Z"
# status.phase: Running 

kevin85421 (Member):

If it does something special, would you mind adding some comments to describe it?

@rueian (Contributor Author) commented Oct 8, 2024:

I don't think k8sClient.List does anything special. The behavior you observed is expected, but since the deletionTimestamp is not zero, such a Pod is not counted in the return value of listResourceFunc.

That made listResourceFunc always return 0 even though those Pods still existed. Therefore, I want it to also count Pods that have a non-zero deletionTimestamp but still carry finalizers; otherwise the returned 0 is too misleading.

@@ -479,18 +506,17 @@ var _ = Context("Inside the default namespace", func() {
	Eventually(
		getClusterState(ctx, namespace, rayCluster.Name),
		time.Second*3, time.Millisecond*500).Should(Equal(rayv1.Suspended))
	if withConditionEnabled {
		Eventually(findRayClusterSuspendStatus, time.Second*3, time.Millisecond*500).
kevin85421 (Member):

Can you explicitly add comments to describe which fields should be reset when the status transitions to Suspended?

@@ -573,6 +590,124 @@ var _ = Context("Inside the default namespace", func() {
	Eventually(
		getClusterState(ctx, namespace, rayCluster.Name),
		time.Second*3, time.Millisecond*500).Should(Equal(rayv1.Ready))
	if withConditionEnabled {
kevin85421 (Member):

Can you explicitly add comments to describe which fields should be updated when the status transitions?

@kevin85421 (Member):

What's the YAML you use? I am wondering why "Desired CPU ..." is 0.


@rueian (Contributor Author) commented Oct 8, 2024

> What's the YAML you use? I am wondering why "Desired CPU ..." is 0.

I used a RayCluster CR with no Kubernetes resource requests. That is why its status showed 0s.


Successfully merging this pull request may close these issues.

[Feature] REP 54: Replace rayv1.Suspended with RayClusterSuspended condition