[Feature][RayCluster]: introduce RayClusterSuspending and RayClusterSuspended conditions

Signed-off-by: Rueian <[email protected]>
rueian committed Sep 27, 2024
1 parent b9d44f5 commit fc13eab
Showing 3 changed files with 54 additions and 33 deletions.
8 changes: 4 additions & 4 deletions ray-operator/apis/ray/v1/raycluster_types.go
@@ -172,6 +172,8 @@ type RayClusterConditionType string
 const (
     AllPodRunningAndReadyFirstTime = "AllPodRunningAndReadyFirstTime"
     RayClusterPodsProvisioning     = "RayClusterPodsProvisioning"
+    RayClusterSuspended            = "RayClusterSuspended"
+    RayClusterSuspending           = "RayClusterSuspending"
     HeadPodNotFound                = "HeadPodNotFound"
     HeadPodRunningAndReady         = "HeadPodRunningAndReady"
     // UnknownReason says that the reason for the condition is unknown.
@@ -186,10 +188,8 @@ const (
     HeadPodReady RayClusterConditionType = "HeadPodReady"
     // RayClusterReplicaFailure is added in a RayCluster when one of its pods fails to be created or deleted.
     RayClusterReplicaFailure RayClusterConditionType = "ReplicaFailure"
-    // RayClusterSuspending is added in a RayCluster when a user change its .Spec.Suspend to true
-    RayClusterSuspending RayClusterConditionType = "Suspending"
-    // RayClusterSuspended is added in a RayCluster when all of its pods have been stopped.
-    RayClusterSuspended RayClusterConditionType = "Suspended"
+    // RayClusterSuspend is added in a RayCluster when a user change its .Spec.Suspend to true
+    RayClusterSuspend RayClusterConditionType = "RayClusterSuspend"
 )

 // HeadInfo gives info about head
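
Note on the API shape: the previous Suspending and Suspended condition types are folded into a single RayClusterSuspend condition, and the phase now lives in the condition's Reason. The snippet below is an illustrative sketch, not part of the commit; the variable names are made up, the constants come from this file, and the import paths are the usual apimachinery and kuberay ray-operator modules.

package main

import (
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

    rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
)

// Illustrative only: the three RayClusterSuspend condition shapes implied by this commit.
var (
    // User set .Spec.Suspend=true; pods are being torn down.
    suspendingCond = metav1.Condition{
        Type:   string(rayv1.RayClusterSuspend), // "RayClusterSuspend"
        Status: metav1.ConditionTrue,
        Reason: rayv1.RayClusterSuspending, // "RayClusterSuspending"
    }
    // All pods are gone; the cluster is fully suspended.
    suspendedCond = metav1.Condition{
        Type:   string(rayv1.RayClusterSuspend),
        Status: metav1.ConditionTrue,
        Reason: rayv1.RayClusterSuspended, // "RayClusterSuspended"
    }
    // User cleared .Spec.Suspend on a suspended cluster; the condition flips to False.
    resumedCond = metav1.Condition{
        Type:   string(rayv1.RayClusterSuspend),
        Status: metav1.ConditionFalse,
        Reason: rayv1.RayClusterSuspended,
    }
)

func main() {}

Status=False with Reason=RayClusterSuspended is what calculateStatus writes when it sees errResuming, that is, when a fully suspended cluster has .Spec.Suspend cleared (see the controller diff below).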
67 changes: 38 additions & 29 deletions ray-operator/controllers/ray/raycluster_controller.go
@@ -56,7 +56,9 @@ var (
     // Definition of a index field for pod name
     podUIDIndexField = "metadata.uid"

-    requeueMark = errstd.New("requeue")
+    errSuspendCondition = errstd.New("suspend condition changed")
+    errSuspending       = fmt.Errorf("suspending: %w", errSuspendCondition)
+    errResuming         = fmt.Errorf("resuming: %w", errSuspendCondition)
 )

 // getDiscoveryClient returns a discovery client for the current reconciler
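
Here requeueMark is replaced by a small error hierarchy: errSuspending and errResuming wrap errSuspendCondition with %w (errstd is the controller's alias for the standard errors package), so one errstd.Is(err, errSuspendCondition) check is enough for the reconcile loop to avoid treating them as failures, while calculateStatus can still distinguish the two transitions. A self-contained sketch of that behavior using only the standard library (not operator code):

package main

import (
    "errors"
    "fmt"
)

var (
    errSuspendCondition = errors.New("suspend condition changed")
    errSuspending       = fmt.Errorf("suspending: %w", errSuspendCondition)
    errResuming         = fmt.Errorf("resuming: %w", errSuspendCondition)
)

func main() {
    for _, err := range []error{errSuspending, errResuming} {
        // Both wrapped errors match the shared sentinel, so a single errors.Is
        // check can skip error logging for them in the reconcile loop.
        fmt.Println(errors.Is(err, errSuspendCondition)) // true for both
        // The two transitions remain distinguishable for status calculation.
        fmt.Println(errors.Is(err, errSuspending), errors.Is(err, errResuming))
    }
}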
@@ -332,7 +334,7 @@ func (r *RayClusterReconciler) rayClusterReconcile(ctx context.Context, request
     }

     for _, fn := range reconcileFuncs {
-        if reconcileErr = fn(ctx, instance); reconcileErr != nil && !errstd.Is(reconcileErr, requeueMark) {
+        if reconcileErr = fn(ctx, instance); reconcileErr != nil && !errstd.Is(reconcileErr, errSuspendCondition) {
             funcName := runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name()
             logger.Error(reconcileErr, "Error reconcile resources", "function name", funcName)
             break
@@ -357,7 +359,7 @@
     } else {
         err = updateErr
     }
-    if err != nil { // including the requeueMark
+    if err != nil {
         return ctrl.Result{RequeueAfter: DefaultRequeueDuration}, err
     }

@@ -616,26 +618,9 @@ func (r *RayClusterReconciler) reconcileHeadlessService(ctx context.Context, ins
 func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv1.RayCluster) error {
     logger := ctrl.LoggerFrom(ctx)

-    if features.Enabled(features.RayClusterStatusConditions) {
-        if meta.IsStatusConditionTrue(instance.Status.Conditions, string(rayv1.RayClusterSuspended)) {
-            if instance.Spec.Suspend != nil && !*instance.Spec.Suspend {
-                meta.RemoveStatusCondition(&instance.Status.Conditions, string(rayv1.RayClusterSuspended))
-                return requeueMark
-            }
-        } else if !meta.IsStatusConditionTrue(instance.Status.Conditions, string(rayv1.RayClusterSuspending)) {
-            if instance.Spec.Suspend != nil && *instance.Spec.Suspend {
-                meta.SetStatusCondition(&instance.Status.Conditions, metav1.Condition{
-                    Type:   string(rayv1.RayClusterSuspending),
-                    Reason: string(rayv1.RayClusterSuspending),
-                    Status: metav1.ConditionTrue,
-                })
-                return requeueMark
-            }
-        }
-    }
-
-    // if RayCluster is suspended, delete all pods and skip reconcile
-    if meta.IsStatusConditionTrue(instance.Status.Conditions, string(rayv1.RayClusterSuspending)) ||
+    // if RayCluster is suspending, delete all pods and skip reconcile
+    suspendStatus := utils.FindRayClusterSuspendStatus(instance)
+    if suspendStatus == rayv1.RayClusterSuspending ||
         (!features.Enabled(features.RayClusterStatusConditions) && instance.Spec.Suspend != nil && *instance.Spec.Suspend) {
         if _, err := r.deleteAllPods(ctx, common.RayClusterAllPodsAssociationOptions(instance)); err != nil {
             r.Recorder.Eventf(instance, corev1.EventTypeWarning, string(utils.FailedToDeletePod),
@@ -650,6 +635,18 @@
         return nil
     }

+    if features.Enabled(features.RayClusterStatusConditions) {
+        if suspendStatus == rayv1.RayClusterSuspended {
+            if instance.Spec.Suspend != nil && !*instance.Spec.Suspend {
+                return errResuming
+            }
+        } else {
+            if instance.Spec.Suspend != nil && *instance.Spec.Suspend {
+                return errSuspending
+            }
+        }
+    }
+
     // check if all the pods exist
     headPods := corev1.PodList{}
     if err := r.List(ctx, &headPods, common.RayClusterHeadPodsAssociationOptions(instance).ToListOptions()...); err != nil {
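
In the new flow, reconcilePods no longer mutates conditions itself: while suspending it deletes all pods and skips the rest of the reconcile, and otherwise (with the RayClusterStatusConditions feature gate enabled) it only signals a requested transition by returning errSuspending or errResuming, which calculateStatus later turns into the RayClusterSuspend condition. The following is a hypothetical, self-contained model of that state machine; only the reason strings mirror the diff, everything else is illustrative:

package main

import "fmt"

// suspendState mirrors what FindRayClusterSuspendStatus reports: the Reason of a
// RayClusterSuspend condition whose Status is True, or "" otherwise.
type suspendState string

const (
    notSuspended suspendState = ""
    suspending   suspendState = "RayClusterSuspending"
    suspended    suspendState = "RayClusterSuspended"
)

// next sketches the transitions driven by .Spec.Suspend and the remaining pod count.
func next(cur suspendState, specSuspend bool, podsRemaining int) suspendState {
    switch cur {
    case notSuspended:
        if specSuspend {
            return suspending // reconcilePods returns errSuspending; condition becomes True/Suspending
        }
    case suspending:
        if podsRemaining == 0 {
            return suspended // calculateStatus flips the Reason to Suspended once all pods are gone
        }
    case suspended:
        if !specSuspend {
            return notSuspended // reconcilePods returns errResuming; condition becomes False/Suspended
        }
    }
    return cur
}

func main() {
    s := notSuspended
    s = next(s, true, 2)  // user sets .Spec.Suspend=true -> RayClusterSuspending
    s = next(s, true, 0)  // all cluster pods deleted     -> RayClusterSuspended
    s = next(s, false, 0) // user clears .Spec.Suspend    -> resumed, no True condition
    fmt.Printf("%q\n", s) // prints ""
}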
@@ -1191,15 +1188,27 @@ func (r *RayClusterReconciler) SetupWithManager(mgr ctrl.Manager, reconcileConcu
 func (r *RayClusterReconciler) calculateStatus(ctx context.Context, instance *rayv1.RayCluster, reconcileErr error) (*rayv1.RayCluster, error) {
     // TODO: Replace this log and use reconcileErr to set the condition field.
     logger := ctrl.LoggerFrom(ctx)
-    if reconcileErr != nil && !errstd.Is(reconcileErr, requeueMark) {
+    if reconcileErr != nil && !errstd.Is(reconcileErr, errSuspendCondition) {
         logger.Info("Reconciliation error", "error", reconcileErr)
     }

     // Deep copy the instance, so we don't mutate the original object.
     newInstance := instance.DeepCopy()

     if features.Enabled(features.RayClusterStatusConditions) {
-        if reconcileErr != nil {
+        if errstd.Is(reconcileErr, errSuspending) {
+            meta.SetStatusCondition(&newInstance.Status.Conditions, metav1.Condition{
+                Type:   string(rayv1.RayClusterSuspend),
+                Reason: rayv1.RayClusterSuspending,
+                Status: metav1.ConditionTrue,
+            })
+        } else if errstd.Is(reconcileErr, errResuming) {
+            meta.SetStatusCondition(&newInstance.Status.Conditions, metav1.Condition{
+                Type:   string(rayv1.RayClusterSuspend),
+                Reason: rayv1.RayClusterSuspended,
+                Status: metav1.ConditionFalse,
+            })
+        } else if reconcileErr != nil {
             if reason := utils.RayClusterReplicaFailureReason(reconcileErr); reason != "" {
                 meta.SetStatusCondition(&newInstance.Status.Conditions, metav1.Condition{
                     Type: string(rayv1.RayClusterReplicaFailure),
Expand Down Expand Up @@ -1278,12 +1287,12 @@ func (r *RayClusterReconciler) calculateStatus(ctx context.Context, instance *ra
}
}

if meta.IsStatusConditionTrue(instance.Status.Conditions, string(rayv1.RayClusterSuspending)) && len(runtimePods.Items) == 0 {
suspendStatus := utils.FindRayClusterSuspendStatus(newInstance)
if suspendStatus == rayv1.RayClusterSuspending && len(runtimePods.Items) == 0 {
meta.RemoveStatusCondition(&newInstance.Status.Conditions, string(rayv1.RayClusterProvisioned))
meta.RemoveStatusCondition(&instance.Status.Conditions, string(rayv1.RayClusterSuspending))
meta.SetStatusCondition(&newInstance.Status.Conditions, metav1.Condition{
Type: string(rayv1.RayClusterSuspended),
Reason: string(rayv1.RayClusterSuspended),
Type: string(rayv1.RayClusterSuspend),
Reason: rayv1.RayClusterSuspended,
Status: metav1.ConditionTrue,
})
}
12 changes: 12 additions & 0 deletions ray-operator/controllers/ray/utils/util.go
@@ -102,6 +102,18 @@ func FindHeadPodReadyCondition(headPod *corev1.Pod) metav1.Condition {
     return headPodReadyCondition
 }

+func FindRayClusterSuspendStatus(instance *rayv1.RayCluster) string {
+    for _, cond := range instance.Status.Conditions {
+        if cond.Type == string(rayv1.RayClusterSuspend) {
+            if cond.Status == metav1.ConditionTrue {
+                return cond.Reason
+            }
+            break
+        }
+    }
+    return ""
+}
+
 // IsRunningAndReady returns true if pod is in the PodRunning Phase, if it has a condition of PodReady.
 func IsRunningAndReady(pod *corev1.Pod) bool {
     if pod.Status.Phase != corev1.PodRunning {
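
The new helper returns the RayClusterSuspend condition's Reason only while its Status is True, and an empty string otherwise, which is the single value both reconcilePods and calculateStatus branch on. A hedged usage sketch follows; it assumes the usual kuberay ray-operator module paths and is not part of the commit:

package main

import (
    "fmt"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

    rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
    "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
)

func main() {
    // A cluster whose RayClusterSuspend condition says it is currently suspending.
    cluster := &rayv1.RayCluster{
        Status: rayv1.RayClusterStatus{
            Conditions: []metav1.Condition{{
                Type:   string(rayv1.RayClusterSuspend),
                Status: metav1.ConditionTrue,
                Reason: rayv1.RayClusterSuspending,
            }},
        },
    }

    switch utils.FindRayClusterSuspendStatus(cluster) {
    case rayv1.RayClusterSuspending:
        fmt.Println("pods are being deleted") // this branch runs for the condition above
    case rayv1.RayClusterSuspended:
        fmt.Println("cluster is fully stopped")
    default:
        fmt.Println("not suspended") // condition absent or Status != True
    }
}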
