
[Feature][RayCluster]: introduce RayClusterSuspending and RayClusterSuspended conditions

Signed-off-by: Rueian <[email protected]>
rueian committed Oct 5, 2024
1 parent 8efab0c commit ee39d7e
Showing 5 changed files with 71 additions and 52 deletions.
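In brief: the single RayClusterSuspend condition type, which distinguished suspending from suspended via its Reason field, is split into two condition types, RayClusterSuspending and RayClusterSuspended. The internal sentinel errors (errSuspendCondition, errSuspending, errResuming) that short-circuited reconcilePods are removed; the suspend state machine now lives entirely in calculateStatus, and updateRayClusterStatus additionally reports whether the status changed so the reconciler can requeue until the transition settles.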
8 changes: 4 additions & 4 deletions ray-operator/apis/ray/v1/raycluster_types.go
@@ -172,8 +172,6 @@ type RayClusterConditionType string
 const (
 	AllPodRunningAndReadyFirstTime = "AllPodRunningAndReadyFirstTime"
 	RayClusterPodsProvisioning     = "RayClusterPodsProvisioning"
-	RayClusterSuspended            = "RayClusterSuspended"
-	RayClusterSuspending           = "RayClusterSuspending"
 	HeadPodNotFound                = "HeadPodNotFound"
 	HeadPodRunningAndReady         = "HeadPodRunningAndReady"
 	// UnknownReason says that the reason for the condition is unknown.
@@ -188,8 +186,10 @@ const (
 	HeadPodReady RayClusterConditionType = "HeadPodReady"
 	// RayClusterReplicaFailure is added in a RayCluster when one of its pods fails to be created or deleted.
 	RayClusterReplicaFailure RayClusterConditionType = "ReplicaFailure"
-	// RayClusterSuspend is added in a RayCluster when a user change its .Spec.Suspend to true
-	RayClusterSuspend RayClusterConditionType = "RayClusterSuspend"
+	// RayClusterSuspending is added in a RayCluster when a user changes .Spec.Suspend to true.
+	RayClusterSuspending RayClusterConditionType = "RayClusterSuspending"
+	// RayClusterSuspended is added in a RayCluster when it has finished suspending, i.e. transitioned from the RayClusterSuspending condition.
+	RayClusterSuspended RayClusterConditionType = "RayClusterSuspended"
 )
 
 // HeadInfo gives info about head
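For orientation, a minimal sketch (not part of this commit) of how a client could read the two new conditions using the k8s.io/apimachinery condition helpers; describeSuspendState is a hypothetical name:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/meta"

	rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
)

// describeSuspendState reads the two conditions together: Suspending=True
// means pod deletion is still in progress; Suspended=True means all pods
// are gone. The controller never sets both to True at once.
func describeSuspendState(cluster *rayv1.RayCluster) string {
	if meta.IsStatusConditionTrue(cluster.Status.Conditions, string(rayv1.RayClusterSuspending)) {
		return "suspending: pods are being deleted"
	}
	if meta.IsStatusConditionTrue(cluster.Status.Conditions, string(rayv1.RayClusterSuspended)) {
		return "suspended: all pods are deleted"
	}
	return "not suspended"
}

func main() {
	fmt.Println(describeSuspendState(&rayv1.RayCluster{})) // not suspended
}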
96 changes: 57 additions & 39 deletions ray-operator/controllers/ray/raycluster_controller.go
@@ -55,11 +55,6 @@ var (

 	// Definition of a index field for pod name
 	podUIDIndexField = "metadata.uid"
-
-	// errSuspendCondition, errSuspending and errResuming are internal errors for shortcutting the reconcilePods.
-	errSuspendCondition = errstd.New("suspend condition changed")
-	errSuspending       = fmt.Errorf("suspending: %w", errSuspendCondition)
-	errResuming         = fmt.Errorf("resuming: %w", errSuspendCondition)
 )
 
 // getDiscoveryClient returns a discovery client for the current reconciler
@@ -335,7 +330,7 @@ func (r *RayClusterReconciler) rayClusterReconcile(ctx context.Context, request
 	}
 
 	for _, fn := range reconcileFuncs {
-		if reconcileErr = fn(ctx, instance); reconcileErr != nil && !errstd.Is(reconcileErr, errSuspendCondition) {
+		if reconcileErr = fn(ctx, instance); reconcileErr != nil {
 			funcName := runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name()
 			logger.Error(reconcileErr, "Error reconcile resources", "function name", funcName)
 			break
@@ -345,10 +340,11 @@
 	// Calculate the new status for the RayCluster. Note that the function will deep copy `instance` instead of mutating it.
 	newInstance, calculateErr := r.calculateStatus(ctx, instance, reconcileErr)
 	var updateErr error
+	var inconsistent bool
 	if calculateErr != nil {
 		logger.Info("Got error when calculating new status", "cluster name", request.Name, "error", calculateErr)
 	} else {
-		updateErr = r.updateRayClusterStatus(ctx, originalRayClusterInstance, newInstance)
+		inconsistent, updateErr = r.updateRayClusterStatus(ctx, originalRayClusterInstance, newInstance)
 	}
 
 	// Return error based on order.
@@ -360,7 +356,7 @@ func (r *RayClusterReconciler) rayClusterReconcile(ctx context.Context, request
 	} else {
 		err = updateErr
 	}
-	if err != nil {
+	if err != nil || inconsistent {
 		return ctrl.Result{RequeueAfter: DefaultRequeueDuration}, err
 	}
 
@@ -621,8 +617,9 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv

 	// if RayCluster is suspending, delete all pods and skip reconcile
 	suspendStatus := utils.FindRayClusterSuspendStatus(instance)
+	statusConditionGateEnabled := features.Enabled(features.RayClusterStatusConditions)
 	if suspendStatus == rayv1.RayClusterSuspending ||
-		(!features.Enabled(features.RayClusterStatusConditions) && instance.Spec.Suspend != nil && *instance.Spec.Suspend) {
+		(!statusConditionGateEnabled && instance.Spec.Suspend != nil && *instance.Spec.Suspend) {
 		if _, err := r.deleteAllPods(ctx, common.RayClusterAllPodsAssociationOptions(instance)); err != nil {
 			r.Recorder.Eventf(instance, corev1.EventTypeWarning, string(utils.FailedToDeletePod),
 				"Failed deleting Pods due to suspension for RayCluster %s/%s, %v",
@@ -636,16 +633,13 @@
 		return nil
 	}
 
-	if features.Enabled(features.RayClusterStatusConditions) {
+	if statusConditionGateEnabled {
 		if suspendStatus == rayv1.RayClusterSuspended {
-			if instance.Spec.Suspend != nil && !*instance.Spec.Suspend {
-				return errResuming
-			}
 			return nil // stop reconcilePods because the cluster is suspended.
 		}
 		// (suspendStatus != rayv1.RayClusterSuspending) is always true here because it has been checked above.
 		if instance.Spec.Suspend != nil && *instance.Spec.Suspend {
-			return errSuspending
+			return nil // stop reconcilePods because the cluster is going to suspend.
 		}
 	}
 
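Note: both suspend-related early exits now return nil instead of the removed sentinel errors; the condition transitions are applied later in calculateStatus. The gating logic above, paraphrased as a runnable sketch (suspendAction is a hypothetical helper; assumes the RayClusterStatusConditions feature gate is enabled):

package main

import "fmt"

// suspendAction paraphrases the suspend-related control flow of
// reconcilePods. Illustrative only, not a KubeRay API.
func suspendAction(suspendStatus string, specSuspend bool) string {
	switch {
	case suspendStatus == "RayClusterSuspending":
		return "delete all pods, then stop reconciling"
	case suspendStatus == "RayClusterSuspended":
		return "stop: the cluster is suspended"
	case specSuspend:
		return "stop: the cluster is going to suspend"
	default:
		return "continue reconciling pods"
	}
}

func main() {
	fmt.Println(suspendAction("", true)) // stop: the cluster is going to suspend
}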
@@ -1190,27 +1184,16 @@ func (r *RayClusterReconciler) SetupWithManager(mgr ctrl.Manager, reconcileConcu
 func (r *RayClusterReconciler) calculateStatus(ctx context.Context, instance *rayv1.RayCluster, reconcileErr error) (*rayv1.RayCluster, error) {
 	// TODO: Replace this log and use reconcileErr to set the condition field.
 	logger := ctrl.LoggerFrom(ctx)
-	if reconcileErr != nil && !errstd.Is(reconcileErr, errSuspendCondition) {
+	if reconcileErr != nil {
 		logger.Info("Reconciliation error", "error", reconcileErr)
 	}
 
 	// Deep copy the instance, so we don't mutate the original object.
 	newInstance := instance.DeepCopy()
 
-	if features.Enabled(features.RayClusterStatusConditions) {
-		if errstd.Is(reconcileErr, errSuspending) {
-			meta.SetStatusCondition(&newInstance.Status.Conditions, metav1.Condition{
-				Type:   string(rayv1.RayClusterSuspend),
-				Reason: rayv1.RayClusterSuspending,
-				Status: metav1.ConditionTrue,
-			})
-		} else if errstd.Is(reconcileErr, errResuming) {
-			meta.SetStatusCondition(&newInstance.Status.Conditions, metav1.Condition{
-				Type:   string(rayv1.RayClusterSuspend),
-				Reason: rayv1.RayClusterSuspended,
-				Status: metav1.ConditionFalse,
-			})
-		} else if reconcileErr != nil {
+	statusConditionGateEnabled := features.Enabled(features.RayClusterStatusConditions)
+	if statusConditionGateEnabled {
+		if reconcileErr != nil {
 			if reason := utils.RayClusterReplicaFailureReason(reconcileErr); reason != "" {
 				meta.SetStatusCondition(&newInstance.Status.Conditions, metav1.Condition{
 					Type: string(rayv1.RayClusterReplicaFailure),
@@ -1251,7 +1234,7 @@ func (r *RayClusterReconciler) calculateStatus(ctx context.Context, instance *ra
 	}
 
 	// Check if the head node is running and ready by checking the head pod's status or if the cluster has been suspended.
-	if features.Enabled(features.RayClusterStatusConditions) {
+	if statusConditionGateEnabled {
 		headPod, err := common.GetRayClusterHeadPod(ctx, r, newInstance)
 		if err != nil {
 			return nil, err
@@ -1290,13 +1273,47 @@ func (r *RayClusterReconciler) calculateStatus(ctx context.Context, instance *ra
 		}
 
 		suspendStatus := utils.FindRayClusterSuspendStatus(newInstance)
-		if suspendStatus == rayv1.RayClusterSuspending && len(runtimePods.Items) == 0 {
-			meta.RemoveStatusCondition(&newInstance.Status.Conditions, string(rayv1.RayClusterProvisioned))
+		if suspendStatus == rayv1.RayClusterSuspending {
+			if len(runtimePods.Items) == 0 {
+				meta.RemoveStatusCondition(&newInstance.Status.Conditions, string(rayv1.RayClusterProvisioned))
+				meta.SetStatusCondition(&newInstance.Status.Conditions, metav1.Condition{
+					Type:   string(rayv1.RayClusterSuspending),
+					Reason: string(rayv1.RayClusterSuspending),
+					Status: metav1.ConditionFalse,
+				})
+				meta.SetStatusCondition(&newInstance.Status.Conditions, metav1.Condition{
+					Type:   string(rayv1.RayClusterSuspended),
+					Reason: string(rayv1.RayClusterSuspended),
+					Status: metav1.ConditionTrue,
+				})
+			}
+		} else if suspendStatus == rayv1.RayClusterSuspended {
+			if instance.Spec.Suspend != nil && !*instance.Spec.Suspend {
+				meta.SetStatusCondition(&newInstance.Status.Conditions, metav1.Condition{
+					Type:   string(rayv1.RayClusterSuspended),
+					Reason: string(rayv1.RayClusterSuspended),
+					Status: metav1.ConditionFalse,
+				})
+			}
+		} else {
 			meta.SetStatusCondition(&newInstance.Status.Conditions, metav1.Condition{
-				Type:   string(rayv1.RayClusterSuspend),
-				Reason: rayv1.RayClusterSuspended,
-				Status: metav1.ConditionTrue,
+				Type:   string(rayv1.RayClusterSuspended),
+				Reason: string(rayv1.RayClusterSuspended),
+				Status: metav1.ConditionFalse,
 			})
+			if instance.Spec.Suspend != nil && *instance.Spec.Suspend {
+				meta.SetStatusCondition(&newInstance.Status.Conditions, metav1.Condition{
+					Type:   string(rayv1.RayClusterSuspending),
+					Reason: string(rayv1.RayClusterSuspending),
+					Status: metav1.ConditionTrue,
+				})
+			} else {
+				meta.SetStatusCondition(&newInstance.Status.Conditions, metav1.Condition{
+					Type:   string(rayv1.RayClusterSuspending),
+					Reason: string(rayv1.RayClusterSuspending),
+					Status: metav1.ConditionFalse,
+				})
+			}
 		}
 	}
 
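The block above is the whole suspend state machine. Modeled as a compact, runnable sketch (nextConditions is hypothetical; booleans stand in for the two condition statuses, and the feature gate is assumed on):

package main

import "fmt"

// nextConditions mirrors the transitions in calculateStatus above.
// suspending/suspended are the current condition statuses, specSuspend is
// .Spec.Suspend, and podsRemaining is len(runtimePods.Items).
func nextConditions(suspending, suspended, specSuspend bool, podsRemaining int) (bool, bool) {
	switch {
	case suspending:
		if podsRemaining == 0 {
			return false, true // all pods gone: Suspending -> Suspended
		}
		return true, false // still draining pods
	case suspended:
		if !specSuspend {
			return false, false // user resumed the cluster
		}
		return false, true // stay suspended
	default:
		return specSuspend, false // Suspend flipped to true: start suspending
	}
}

func main() {
	suspending, suspended := nextConditions(false, false, true, 3) // user sets Suspend=true
	fmt.Println(suspending, suspended)                             // true false
	suspending, suspended = nextConditions(true, false, true, 0)   // pod deletion finished
	fmt.Println(suspending, suspended)                             // false true
}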
@@ -1555,17 +1572,18 @@ func (r *RayClusterReconciler) reconcileAutoscalerRoleBinding(ctx context.Contex
 	return nil
 }
 
-func (r *RayClusterReconciler) updateRayClusterStatus(ctx context.Context, originalRayClusterInstance, newInstance *rayv1.RayCluster) error {
+func (r *RayClusterReconciler) updateRayClusterStatus(ctx context.Context, originalRayClusterInstance, newInstance *rayv1.RayCluster) (bool, error) {
 	logger := ctrl.LoggerFrom(ctx)
-	if !r.inconsistentRayClusterStatus(ctx, originalRayClusterInstance.Status, newInstance.Status) {
-		return nil
+	inconsistent := r.inconsistentRayClusterStatus(ctx, originalRayClusterInstance.Status, newInstance.Status)
+	if !inconsistent {
+		return inconsistent, nil
 	}
 	logger.Info("updateRayClusterStatus", "name", originalRayClusterInstance.Name, "old status", originalRayClusterInstance.Status, "new status", newInstance.Status)
 	err := r.Status().Update(ctx, newInstance)
 	if err != nil {
 		logger.Info("Error updating status", "name", originalRayClusterInstance.Name, "error", err, "RayCluster", newInstance)
 	}
-	return err
+	return inconsistent, err
 }
 
 // sumGPUs sums the GPUs in the given resource list.
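Note the new contract: updateRayClusterStatus now returns whether the calculated status differed from the observed one (and hence whether an update was attempted), and rayClusterReconcile requeues after DefaultRequeueDuration whenever it did. That requeue is what drives multi-step transitions such as Suspending -> Suspended to completion without depending on further watch events.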
6 changes: 3 additions & 3 deletions ray-operator/controllers/ray/raycluster_controller_test.go
@@ -488,7 +488,7 @@ var _ = Context("Inside the default namespace", func() {
 				time.Second*3, time.Millisecond*500).Should(Equal(rayv1.Suspended))
 			if withConditionEnabled {
 				Eventually(
-					func() string {
+					func() rayv1.RayClusterConditionType {
 						if err := getResourceFunc(ctx, client.ObjectKey{Name: rayCluster.Name, Namespace: namespace}, rayCluster)(); err != nil {
 							return ""
 						}
@@ -554,7 +554,7 @@ var _ = Context("Inside the default namespace", func() {

 			if withConditionEnabled {
 				Eventually(
-					func() string {
+					func() rayv1.RayClusterConditionType {
 						if err := getResourceFunc(ctx, client.ObjectKey{Name: rayCluster.Name, Namespace: namespace}, rayCluster)(); err != nil {
 							return ""
 						}
@@ -603,7 +603,7 @@ var _ = Context("Inside the default namespace", func() {
 				time.Second*3, time.Millisecond*500).Should(Equal(rayv1.Ready))
 			if withConditionEnabled {
 				Eventually(
-					func() string {
+					func() rayv1.RayClusterConditionType {
 						if err := getResourceFunc(ctx, client.ObjectKey{Name: rayCluster.Name, Namespace: namespace}, rayCluster)(); err != nil {
 							return ""
 						}
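The Eventually closures now return the typed rayv1.RayClusterConditionType to match the new signature of utils.FindRayClusterSuspendStatus. The collapsed bodies presumably end by returning that helper's result; the assertion shape, as a sketch (abridged, assuming the surrounding test scaffolding):

Eventually(
	func() rayv1.RayClusterConditionType {
		if err := getResourceFunc(ctx, client.ObjectKey{Name: rayCluster.Name, Namespace: namespace}, rayCluster)(); err != nil {
			return "" // keep polling until the cluster can be fetched
		}
		return utils.FindRayClusterSuspendStatus(rayCluster) // assumed; body is collapsed in this view
	},
	time.Second*3, time.Millisecond*500).Should(Equal(rayv1.RayClusterSuspended))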
6 changes: 4 additions & 2 deletions ray-operator/controllers/ray/raycluster_controller_unit_test.go
@@ -1283,8 +1283,9 @@ func TestReconcile_UpdateClusterReason(t *testing.T) {

 	newTestRayCluster := testRayCluster.DeepCopy()
 	newTestRayCluster.Status.Reason = reason
-	err = testRayClusterReconciler.updateRayClusterStatus(ctx, testRayCluster, newTestRayCluster)
+	inconsistent, err := testRayClusterReconciler.updateRayClusterStatus(ctx, testRayCluster, newTestRayCluster)
 	assert.Nil(t, err, "Fail to update cluster reason")
+	assert.True(t, inconsistent)
 
 	err = fakeClient.Get(ctx, namespacedName, &cluster)
 	assert.Nil(t, err, "Fail to get RayCluster after updating reason")
@@ -1563,8 +1564,9 @@ func TestReconcile_UpdateClusterState(t *testing.T) {
 	state := rayv1.Ready
 	newTestRayCluster := testRayCluster.DeepCopy()
 	newTestRayCluster.Status.State = state //nolint:staticcheck // https://github.com/ray-project/kuberay/pull/2288
-	err = testRayClusterReconciler.updateRayClusterStatus(ctx, testRayCluster, newTestRayCluster)
+	inconsistent, err := testRayClusterReconciler.updateRayClusterStatus(ctx, testRayCluster, newTestRayCluster)
 	assert.Nil(t, err, "Fail to update cluster state")
+	assert.True(t, inconsistent)
 
 	err = fakeClient.Get(ctx, namespacedName, &cluster)
 	assert.Nil(t, err, "Fail to get RayCluster after updating state")
7 changes: 3 additions & 4 deletions ray-operator/controllers/ray/utils/util.go
@@ -102,13 +102,12 @@ func FindHeadPodReadyCondition(headPod *corev1.Pod) metav1.Condition {
 	return headPodReadyCondition
 }
 
-func FindRayClusterSuspendStatus(instance *rayv1.RayCluster) string {
+func FindRayClusterSuspendStatus(instance *rayv1.RayCluster) rayv1.RayClusterConditionType {
 	for _, cond := range instance.Status.Conditions {
-		if cond.Type == string(rayv1.RayClusterSuspend) {
+		if cond.Type == string(rayv1.RayClusterSuspending) || cond.Type == string(rayv1.RayClusterSuspended) {
 			if cond.Status == metav1.ConditionTrue {
-				return cond.Reason
+				return rayv1.RayClusterConditionType(cond.Type)
 			}
-			break
 		}
 	}
 	return ""
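A minimal, self-contained usage sketch of the updated helper (module paths as in this repository; the cluster literal is fabricated for illustration):

package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
	"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
)

func main() {
	// A cluster mid-suspension: Suspending=True, Suspended not yet set.
	cluster := &rayv1.RayCluster{
		Status: rayv1.RayClusterStatus{
			Conditions: []metav1.Condition{{
				Type:   string(rayv1.RayClusterSuspending),
				Reason: string(rayv1.RayClusterSuspending),
				Status: metav1.ConditionTrue,
			}},
		},
	}
	// FindRayClusterSuspendStatus returns whichever of the two condition
	// types is currently True, or "" if neither is.
	fmt.Println(utils.FindRayClusterSuspendStatus(cluster)) // RayClusterSuspending
}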
