[Feature][RayCluster]: introduce RayClusterSuspending and RayClusterSuspended conditions
rueian committed Sep 28, 2024
1 parent 9c37889 commit 8efab0c
Showing 4 changed files with 116 additions and 8 deletions.
4 changes: 4 additions & 0 deletions ray-operator/apis/ray/v1/raycluster_types.go
@@ -172,6 +172,8 @@ type RayClusterConditionType string
const (
AllPodRunningAndReadyFirstTime = "AllPodRunningAndReadyFirstTime"
RayClusterPodsProvisioning = "RayClusterPodsProvisioning"
RayClusterSuspended = "RayClusterSuspended"
RayClusterSuspending = "RayClusterSuspending"
HeadPodNotFound = "HeadPodNotFound"
HeadPodRunningAndReady = "HeadPodRunningAndReady"
// UnknownReason says that the reason for the condition is unknown.
@@ -186,6 +188,8 @@ const (
HeadPodReady RayClusterConditionType = "HeadPodReady"
// RayClusterReplicaFailure is added in a RayCluster when one of its pods fails to be created or deleted.
RayClusterReplicaFailure RayClusterConditionType = "ReplicaFailure"
// RayClusterSuspend is added in a RayCluster when a user changes its .Spec.Suspend to true
RayClusterSuspend RayClusterConditionType = "RayClusterSuspend"
)

// HeadInfo gives info about head
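The API change encodes suspension as a single condition type, RayClusterSuspend, and uses the Reason field to distinguish the in-progress phase (RayClusterSuspending) from the settled one (RayClusterSuspended). A minimal sketch of the two shapes the condition takes while its Status is True; the field values are copied from the diff, but the standalone program around them is illustrative only:

```go
package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	// While Pods are still being torn down, the Reason is RayClusterSuspending.
	suspending := metav1.Condition{
		Type:   "RayClusterSuspend",
		Status: metav1.ConditionTrue,
		Reason: "RayClusterSuspending",
	}
	// Once every Pod is gone, the same condition flips its Reason to RayClusterSuspended.
	suspended := metav1.Condition{
		Type:   "RayClusterSuspend",
		Status: metav1.ConditionTrue,
		Reason: "RayClusterSuspended",
	}
	fmt.Println(suspending.Reason, "->", suspended.Reason)
}
```

Because both phases live on one condition, consumers of .status.conditions can never observe "suspending" and "suspended" at the same time.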
55 changes: 48 additions & 7 deletions ray-operator/controllers/ray/raycluster_controller.go
Expand Up @@ -55,6 +55,11 @@ var (

// Definition of an index field for pod UID
podUIDIndexField = "metadata.uid"

// errSuspendCondition, errSuspending and errResuming are internal errors for short-circuiting reconcilePods.
errSuspendCondition = errstd.New("suspend condition changed")
errSuspending = fmt.Errorf("suspending: %w", errSuspendCondition)
errResuming = fmt.Errorf("resuming: %w", errSuspendCondition)
)

// getDiscoveryClient returns a discovery client for the current reconciler
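Both errSuspending and errResuming wrap the errSuspendCondition sentinel via fmt.Errorf's %w verb, so a single errors.Is check catches either transition while the two remain distinguishable. A self-contained sketch of the pattern, with the sentinel names and messages copied from the diff:

```go
package main

import (
	"errors"
	"fmt"
)

var (
	errSuspendCondition = errors.New("suspend condition changed")
	errSuspending       = fmt.Errorf("suspending: %w", errSuspendCondition)
	errResuming         = fmt.Errorf("resuming: %w", errSuspendCondition)
)

func main() {
	// A single check matches both wrapped sentinels...
	fmt.Println(errors.Is(errSuspending, errSuspendCondition)) // true
	fmt.Println(errors.Is(errResuming, errSuspendCondition))   // true
	// ...while the specific transitions can still be told apart.
	fmt.Println(errors.Is(errSuspending, errResuming)) // false
}
```

This is what lets rayClusterReconcile below skip error logging for these two values while calculateStatus still branches on which one occurred.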
@@ -330,7 +335,7 @@ func (r *RayClusterReconciler) rayClusterReconcile(ctx context.Context, request
}

for _, fn := range reconcileFuncs {
if reconcileErr = fn(ctx, instance); reconcileErr != nil {
if reconcileErr = fn(ctx, instance); reconcileErr != nil && !errstd.Is(reconcileErr, errSuspendCondition) {
funcName := runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name()
logger.Error(reconcileErr, "Error reconcile resources", "function name", funcName)
break
@@ -614,8 +619,10 @@ func (r *RayClusterReconciler) reconcileHeadlessService(ctx context.Context, ins
func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv1.RayCluster) error {
logger := ctrl.LoggerFrom(ctx)

// if RayCluster is suspended, delete all pods and skip reconcile
if instance.Spec.Suspend != nil && *instance.Spec.Suspend {
// if RayCluster is suspending, delete all pods and skip reconcile
suspendStatus := utils.FindRayClusterSuspendStatus(instance)
if suspendStatus == rayv1.RayClusterSuspending ||
(!features.Enabled(features.RayClusterStatusConditions) && instance.Spec.Suspend != nil && *instance.Spec.Suspend) {
if _, err := r.deleteAllPods(ctx, common.RayClusterAllPodsAssociationOptions(instance)); err != nil {
r.Recorder.Eventf(instance, corev1.EventTypeWarning, string(utils.FailedToDeletePod),
"Failed deleting Pods due to suspension for RayCluster %s/%s, %v",
@@ -629,6 +636,19 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
return nil
}

if features.Enabled(features.RayClusterStatusConditions) {
if suspendStatus == rayv1.RayClusterSuspended {
if instance.Spec.Suspend != nil && !*instance.Spec.Suspend {
return errResuming
}
return nil // stop reconcilePods because the cluster is suspended.
}
// (suspendStatus != rayv1.RayClusterSuspending) is always true here because it has been checked above.
if instance.Spec.Suspend != nil && *instance.Spec.Suspend {
return errSuspending
}
}

// check if all the pods exist
headPods := corev1.PodList{}
if err := r.List(ctx, &headPods, common.RayClusterHeadPodsAssociationOptions(instance).ToListOptions()...); err != nil {
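Taken together, the new branch gives reconcilePods four possible outcomes, keyed on the current suspend condition and the Spec.Suspend flag. The following standalone program models that decision table; it is a paraphrase of the control flow above, not the literal implementation (conditionsOn stands for the RayClusterStatusConditions feature gate):

```go
package main

import "fmt"

// suspendAction models the suspend branch of reconcilePods.
func suspendAction(conditionsOn bool, suspendStatus string, wantSuspend bool) string {
	switch {
	case suspendStatus == "RayClusterSuspending",
		!conditionsOn && wantSuspend:
		return "delete all Pods and stop this pass"
	case conditionsOn && suspendStatus == "RayClusterSuspended" && !wantSuspend:
		return "return errResuming"
	case conditionsOn && suspendStatus == "RayClusterSuspended":
		return "return nil (stay suspended)"
	case conditionsOn && wantSuspend:
		return "return errSuspending"
	}
	return "continue normal Pod reconciliation"
}

func main() {
	fmt.Println(suspendAction(true, "", true))                     // return errSuspending
	fmt.Println(suspendAction(true, "RayClusterSuspending", true)) // delete all Pods and stop this pass
	fmt.Println(suspendAction(true, "RayClusterSuspended", false)) // return errResuming
	fmt.Println(suspendAction(false, "", true))                    // delete all Pods and stop this pass
}
```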
@@ -1170,15 +1190,27 @@ func (r *RayClusterReconciler) SetupWithManager(mgr ctrl.Manager, reconcileConcu
func (r *RayClusterReconciler) calculateStatus(ctx context.Context, instance *rayv1.RayCluster, reconcileErr error) (*rayv1.RayCluster, error) {
// TODO: Replace this log and use reconcileErr to set the condition field.
logger := ctrl.LoggerFrom(ctx)
if reconcileErr != nil {
if reconcileErr != nil && !errstd.Is(reconcileErr, errSuspendCondition) {
logger.Info("Reconciliation error", "error", reconcileErr)
}

// Deep copy the instance, so we don't mutate the original object.
newInstance := instance.DeepCopy()

if features.Enabled(features.RayClusterStatusConditions) {
if reconcileErr != nil {
if errstd.Is(reconcileErr, errSuspending) {
meta.SetStatusCondition(&newInstance.Status.Conditions, metav1.Condition{
Type: string(rayv1.RayClusterSuspend),
Reason: rayv1.RayClusterSuspending,
Status: metav1.ConditionTrue,
})
} else if errstd.Is(reconcileErr, errResuming) {
meta.SetStatusCondition(&newInstance.Status.Conditions, metav1.Condition{
Type: string(rayv1.RayClusterSuspend),
Reason: rayv1.RayClusterSuspended,
Status: metav1.ConditionFalse,
})
} else if reconcileErr != nil {
if reason := utils.RayClusterReplicaFailureReason(reconcileErr); reason != "" {
meta.SetStatusCondition(&newInstance.Status.Conditions, metav1.Condition{
Type: string(rayv1.RayClusterReplicaFailure),
@@ -1218,7 +1250,7 @@ func (r *RayClusterReconciler) calculateStatus(ctx context.Context, instance *ra
newInstance.Status.State = rayv1.Ready //nolint:staticcheck // https://github.com/ray-project/kuberay/pull/2288
}

// Check if the head node is running and ready by checking the head pod's status.
// Check if the head node is running and ready by checking the head pod's status or if the cluster has been suspended.
if features.Enabled(features.RayClusterStatusConditions) {
headPod, err := common.GetRayClusterHeadPod(ctx, r, newInstance)
if err != nil {
Expand All @@ -1239,7 +1271,7 @@ func (r *RayClusterReconciler) calculateStatus(ctx context.Context, instance *ra

if !meta.IsStatusConditionTrue(newInstance.Status.Conditions, string(rayv1.RayClusterProvisioned)) {
// RayClusterProvisioned indicates whether all Ray Pods are ready when the RayCluster is first created.
// Note RayClusterProvisioned StatusCondition will not be updated after all Ray Pods are ready for the first time.
// Note RayClusterProvisioned StatusCondition will not be updated after all Ray Pods are ready for the first time, unless the cluster has been suspended.
if utils.CheckAllPodsRunning(ctx, runtimePods) {
meta.SetStatusCondition(&newInstance.Status.Conditions, metav1.Condition{
Type: string(rayv1.RayClusterProvisioned),
Expand All @@ -1257,6 +1289,15 @@ func (r *RayClusterReconciler) calculateStatus(ctx context.Context, instance *ra
}
}

suspendStatus := utils.FindRayClusterSuspendStatus(newInstance)
if suspendStatus == rayv1.RayClusterSuspending && len(runtimePods.Items) == 0 {
meta.RemoveStatusCondition(&newInstance.Status.Conditions, string(rayv1.RayClusterProvisioned))
meta.SetStatusCondition(&newInstance.Status.Conditions, metav1.Condition{
Type: string(rayv1.RayClusterSuspend),
Reason: rayv1.RayClusterSuspended,
Status: metav1.ConditionTrue,
})
}
}

if newInstance.Spec.Suspend != nil && *newInstance.Spec.Suspend && len(runtimePods.Items) == 0 {
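Across successive reconcile passes, calculateStatus drives the RayClusterSuspend condition through three snapshots: suspending (True/Suspending), suspended (True/Suspended, set once no Pods remain, alongside removal of RayClusterProvisioned), and resumed (False/Suspended). The following standalone walk-through replays those transitions with apimachinery's condition helpers; the string values come from the diff, and the helper at the bottom mirrors utils.FindRayClusterSuspendStatus purely for illustration:

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	var conds []metav1.Condition

	// 1. User sets .Spec.Suspend=true; reconcilePods returns errSuspending.
	meta.SetStatusCondition(&conds, metav1.Condition{
		Type: "RayClusterSuspend", Status: metav1.ConditionTrue, Reason: "RayClusterSuspending",
	})
	fmt.Println(suspendStatus(conds)) // RayClusterSuspending

	// 2. All Pods are gone; calculateStatus marks the cluster fully suspended
	//    (and removes RayClusterProvisioned at the same time).
	meta.SetStatusCondition(&conds, metav1.Condition{
		Type: "RayClusterSuspend", Status: metav1.ConditionTrue, Reason: "RayClusterSuspended",
	})
	fmt.Println(suspendStatus(conds)) // RayClusterSuspended

	// 3. User sets .Spec.Suspend=false; reconcilePods returns errResuming.
	meta.SetStatusCondition(&conds, metav1.Condition{
		Type: "RayClusterSuspend", Status: metav1.ConditionFalse, Reason: "RayClusterSuspended",
	})
	fmt.Println(suspendStatus(conds)) // "" — neither suspending nor suspended
}

// suspendStatus reimplements utils.FindRayClusterSuspendStatus on a bare
// condition slice for this example.
func suspendStatus(conds []metav1.Condition) string {
	for _, c := range conds {
		if c.Type == "RayClusterSuspend" {
			if c.Status == metav1.ConditionTrue {
				return c.Reason
			}
			break
		}
	}
	return ""
}
```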
53 changes: 52 additions & 1 deletion ray-operator/controllers/ray/raycluster_controller_test.go
@@ -414,7 +414,7 @@ var _ = Context("Inside the default namespace", func() {
})
})

Describe("Suspend RayCluster", Ordered, func() {
testSuspendRayCluster := func(withConditionEnabled bool) {
ctx := context.Background()
namespace := "default"
rayCluster := rayClusterTemplate("raycluster-suspend", namespace)
@@ -425,6 +425,13 @@
headFilters := common.RayClusterHeadPodsAssociationOptions(rayCluster).ToListOptions()
allFilters := common.RayClusterAllPodsAssociationOptions(rayCluster).ToListOptions()

BeforeAll(func() {
if withConditionEnabled {
cleanUpFunc := features.SetFeatureGateDuringTest(GinkgoTB(), features.RayClusterStatusConditions, true)
DeferCleanup(cleanUpFunc)
}
})

It("Verify RayCluster spec", func() {
// These tests are designed based on the following assumptions:
// (1) Ray Autoscaler is disabled.
@@ -479,6 +486,16 @@
Eventually(
getClusterState(ctx, namespace, rayCluster.Name),
time.Second*3, time.Millisecond*500).Should(Equal(rayv1.Suspended))
if withConditionEnabled {
Eventually(
func() string {
if err := getResourceFunc(ctx, client.ObjectKey{Name: rayCluster.Name, Namespace: namespace}, rayCluster)(); err != nil {
return ""
}
return utils.FindRayClusterSuspendStatus(rayCluster)
},
time.Second*10, time.Millisecond*1000).Should(Equal(rayv1.RayClusterSuspended))
}
})

It("Set suspend to false and then revert it to true before all Pods are running", func() {
@@ -534,6 +551,17 @@
Eventually(
getClusterState(ctx, namespace, rayCluster.Name),
time.Second*3, time.Millisecond*500).Should(Equal(rayv1.Suspended))

if withConditionEnabled {
Eventually(
func() string {
if err := getResourceFunc(ctx, client.ObjectKey{Name: rayCluster.Name, Namespace: namespace}, rayCluster)(); err != nil {
return ""
}
return utils.FindRayClusterSuspendStatus(rayCluster)
},
time.Second*10, time.Millisecond*1000).Should(Equal(rayv1.RayClusterSuspended))
}
})

It("Should run all head and worker pods if un-suspended", func() {
@@ -573,7 +601,30 @@
Eventually(
getClusterState(ctx, namespace, rayCluster.Name),
time.Second*3, time.Millisecond*500).Should(Equal(rayv1.Ready))
if withConditionEnabled {
Eventually(
func() string {
if err := getResourceFunc(ctx, client.ObjectKey{Name: rayCluster.Name, Namespace: namespace}, rayCluster)(); err != nil {
return ""
}
return utils.FindRayClusterSuspendStatus(rayCluster)
},
time.Second*10, time.Millisecond*1000).Should(BeEmpty())
}
})

It("Delete the cluster", func() {
err := k8sClient.Delete(ctx, rayCluster)
Expect(err).NotTo(HaveOccurred())
})
}

Describe("Suspend RayCluster", Ordered, func() {
testSuspendRayCluster(false)
})

Describe("Suspend RayCluster with Condition", Ordered, func() {
testSuspendRayCluster(true)
})

Describe("RayCluster with a multi-host worker group", Ordered, func() {
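The three Eventually blocks added above poll an identical closure. A hypothetical helper (not part of the commit, and assuming the same imports and variables as the surrounding test file) could fold that duplication:

```go
// suspendStatusOf returns a poll function for Eventually that re-fetches the
// cluster and reads its suspend phase.
func suspendStatusOf(ctx context.Context, namespace string, rayCluster *rayv1.RayCluster) func() string {
	return func() string {
		key := client.ObjectKey{Name: rayCluster.Name, Namespace: namespace}
		if err := getResourceFunc(ctx, key, rayCluster)(); err != nil {
			return ""
		}
		return utils.FindRayClusterSuspendStatus(rayCluster)
	}
}
```

Usage inside a spec would then read: Eventually(suspendStatusOf(ctx, namespace, rayCluster), time.Second*10, time.Second).Should(Equal(rayv1.RayClusterSuspended)).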
12 changes: 12 additions & 0 deletions ray-operator/controllers/ray/utils/util.go
@@ -102,6 +102,18 @@ func FindHeadPodReadyCondition(headPod *corev1.Pod) metav1.Condition {
return headPodReadyCondition
}

func FindRayClusterSuspendStatus(instance *rayv1.RayCluster) string {
for _, cond := range instance.Status.Conditions {
if cond.Type == string(rayv1.RayClusterSuspend) {
if cond.Status == metav1.ConditionTrue {
return cond.Reason
}
break
}
}
return ""
}

// IsRunningAndReady returns true if the pod is in the PodRunning phase and has a PodReady condition.
func IsRunningAndReady(pod *corev1.Pod) bool {
if pod.Status.Phase != corev1.PodRunning {
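A table-style unit test one might write for the new helper (not part of the commit; it assumes the repo's rayv1 and utils import paths shown in the diff):

```go
package utils_test

import (
	"testing"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
	"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
)

func TestFindRayClusterSuspendStatus(t *testing.T) {
	cluster := &rayv1.RayCluster{}

	// No condition at all: the helper reports neither phase.
	if got := utils.FindRayClusterSuspendStatus(cluster); got != "" {
		t.Fatalf("want empty, got %q", got)
	}

	// Status=True surfaces the Reason as the suspend phase.
	cluster.Status.Conditions = []metav1.Condition{{
		Type:   string(rayv1.RayClusterSuspend),
		Status: metav1.ConditionTrue,
		Reason: rayv1.RayClusterSuspending,
	}}
	if got := utils.FindRayClusterSuspendStatus(cluster); got != rayv1.RayClusterSuspending {
		t.Fatalf("want %q, got %q", rayv1.RayClusterSuspending, got)
	}

	// Status=False (the post-resume shape) reads as "not suspended".
	cluster.Status.Conditions[0].Status = metav1.ConditionFalse
	cluster.Status.Conditions[0].Reason = rayv1.RayClusterSuspended
	if got := utils.FindRayClusterSuspendStatus(cluster); got != "" {
		t.Fatalf("want empty, got %q", got)
	}
}
```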
