Skip to content

Commit

Permalink
[Feature][RayCluster]: Implement the HeadReady condition (ray-project…
Browse files Browse the repository at this point in the history
  • Loading branch information
cchen777 authored Jul 28, 2024
1 parent 28c729f commit 5062a8c
Show file tree
Hide file tree
Showing 9 changed files with 205 additions and 11 deletions.
16 changes: 16 additions & 0 deletions helm-chart/kuberay-operator/templates/_helpers.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,22 @@ Create the name of the service account to use
{{- end -}}
{{- end -}}


{{/*
FeatureGates
*/}}
{{- define "kuberay.featureGates" -}}
{{- $features := "" }}
{{- range .Values.featureGates }}
{{- $str := printf "%s=%t," .name .enabled }}
{{- $features = print $features $str }}
{{- end }}
{{- with .Values.featureGates }}
--feature-gates={{ $features | trimSuffix "," }}
{{- end }}
{{- end }}


{{/*
Create a template to ensure consistency for Role and ClusterRole.
*/}}
Expand Down
1 change: 1 addition & 0 deletions helm-chart/kuberay-operator/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ spec:
- /manager
args:
{{- $argList := list -}}
{{- $argList = append $argList (include "kuberay.featureGates" . | trim) -}}
{{- if .Values.batchScheduler.enabled -}}
{{- $argList = append $argList "--enable-batch-scheduler" -}}
{{- end -}}
Expand Down
5 changes: 5 additions & 0 deletions helm-chart/kuberay-operator/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ readinessProbe:
batchScheduler:
enabled: false

featureGates:
- name: RayClusterStatusConditions
enabled: false


# Set up `securityContext` to improve Pod security.
# See https://github.com/ray-project/kuberay/blob/master/docs/guidance/pod-security.md for further guidance.
podSecurityContext: {}
Expand Down
12 changes: 10 additions & 2 deletions ray-operator/apis/ray/v1/raycluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,17 @@ type RayClusterStatus struct {

type RayClusterConditionType string

// Custom Reason for RayClusterCondition
const (
// HeadReady is added in a RayCluster when its Head Pod is ready for requests.
HeadReady RayClusterConditionType = "HeadReady"
// PodRunningAndReady says that the pod is running and ready.
PodRunningAndReady = "PodRunningAndReady"
// UnknownReason says that the reason for the condition is unknown.
UnknownReason = "Unknown"
)

const (
// HeadPodReady is added in a RayCluster when its Head Pod is ready for requests.
HeadPodReady RayClusterConditionType = "HeadPodReady"
// RayClusterReplicaFailure is added in a RayCluster when one of its pods fails to be created or deleted.
RayClusterReplicaFailure RayClusterConditionType = "ReplicaFailure"
)
Expand Down
20 changes: 20 additions & 0 deletions ray-operator/controllers/ray/raycluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1201,6 +1201,26 @@ func (r *RayClusterReconciler) calculateStatus(ctx context.Context, instance *ra
newInstance.Status.State = rayv1.Ready
}

// Check if the head node is running and ready by checking the head pod's status.
if features.Enabled(features.RayClusterStatusConditions) {
headPod, err := common.GetRayClusterHeadPod(ctx, r, newInstance)
if err != nil {
return nil, err
}
// GetRayClusterHeadPod can return nil, nil when pod is not found, we handle it separately.
if headPod == nil {
meta.SetStatusCondition(&newInstance.Status.Conditions, metav1.Condition{
Type: string(rayv1.HeadPodReady),
Status: metav1.ConditionFalse,
Reason: "HeadPodNotFound",
Message: "Head Pod not found",
})
} else {
replicaHeadPodReadyCondition := utils.FindPodReadyCondition(headPod, rayv1.HeadPodReady)
meta.SetStatusCondition(&newInstance.Status.Conditions, replicaHeadPodReadyCondition)
}
}

if newInstance.Spec.Suspend != nil && *newInstance.Spec.Suspend && len(runtimePods.Items) == 0 {
newInstance.Status.State = rayv1.Suspended
}
Expand Down
41 changes: 40 additions & 1 deletion ray-operator/controllers/ray/raycluster_controller_unit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1676,6 +1676,12 @@ func TestCalculateStatus(t *testing.T) {
Status: corev1.PodStatus{
PodIP: headNodeIP,
Phase: corev1.PodRunning,
Conditions: []corev1.PodCondition{
{
Type: corev1.PodReady,
Status: corev1.ConditionTrue,
},
},
},
}
runtimeObjects := []runtime.Object{headPod, headService}
Expand Down Expand Up @@ -1705,8 +1711,41 @@ func TestCalculateStatus(t *testing.T) {
assert.Nil(t, err)
assert.Empty(t, newInstance.Status.Conditions)

// Test reconcilePodsErr with the feature gate enabled
// enable feature gate for the following tests
defer features.SetFeatureGateDuringTest(t, features.RayClusterStatusConditions, true)()

// Test CheckRayHeadRunningAndReady with head pod running and ready
newInstance, _ = r.calculateStatus(ctx, testRayCluster, nil)
assert.True(t, meta.IsStatusConditionPresentAndEqual(newInstance.Status.Conditions, string(rayv1.HeadPodReady), metav1.ConditionTrue))
condition := meta.FindStatusCondition(newInstance.Status.Conditions, string(rayv1.HeadPodReady))
assert.Equal(t, metav1.ConditionTrue, condition.Status)

// Test CheckRayHeadRunningAndReady with head pod not ready
headPod.Status.Conditions = []corev1.PodCondition{
{
Type: corev1.PodReady,
Status: corev1.ConditionFalse,
},
}
runtimeObjects = []runtime.Object{headPod, headService}
fakeClient = clientFake.NewClientBuilder().WithScheme(newScheme).WithRuntimeObjects(runtimeObjects...).Build()
r.Client = fakeClient
newInstance, _ = r.calculateStatus(ctx, testRayCluster, nil)
assert.True(t, meta.IsStatusConditionPresentAndEqual(newInstance.Status.Conditions, string(rayv1.HeadPodReady), metav1.ConditionFalse))
condition = meta.FindStatusCondition(newInstance.Status.Conditions, string(rayv1.HeadPodReady))
assert.Equal(t, metav1.ConditionFalse, condition.Status)

// Test CheckRayHeadRunningAndReady with head pod not running
headPod.Status.Phase = corev1.PodFailed
runtimeObjects = []runtime.Object{headPod, headService}
fakeClient = clientFake.NewClientBuilder().WithScheme(newScheme).WithRuntimeObjects(runtimeObjects...).Build()
r.Client = fakeClient
newInstance, _ = r.calculateStatus(ctx, testRayCluster, nil)
assert.True(t, meta.IsStatusConditionPresentAndEqual(newInstance.Status.Conditions, string(rayv1.HeadPodReady), metav1.ConditionFalse))
condition = meta.FindStatusCondition(newInstance.Status.Conditions, string(rayv1.HeadPodReady))
assert.Equal(t, metav1.ConditionFalse, condition.Status)

// Test reconcilePodsErr with the feature gate enabled
newInstance, err = r.calculateStatus(ctx, testRayCluster, utils.ErrFailedCreateHeadPod)
assert.Nil(t, err)
assert.True(t, meta.IsStatusConditionPresentAndEqual(newInstance.Status.Conditions, string(rayv1.RayClusterReplicaFailure), metav1.ConditionTrue))
Expand Down
28 changes: 20 additions & 8 deletions ray-operator/controllers/ray/rayservice_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,26 @@ import (
networkingv1 "k8s.io/api/networking/v1"

"github.com/ray-project/kuberay/ray-operator/controllers/ray/common"
"github.com/ray-project/kuberay/ray-operator/pkg/features"

cmap "github.com/orcaman/concurrent-map/v2"

"github.com/go-logr/logr"
fmtErrors "github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"

"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"

"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -1055,13 +1058,22 @@ func (r *RayServiceReconciler) reconcileServe(ctx context.Context, rayServiceIns
// after the head pod is running and ready. Hence, some requests to the Dashboard (e.g. `UpdateDeployments`) may fail.
// This is not an issue since `UpdateDeployments` is an idempotent operation.
logger.Info("Check the head Pod status of the pending RayCluster", "RayCluster name", rayClusterInstance.Name)
if isRunningAndReady, err := r.isHeadPodRunningAndReady(ctx, rayClusterInstance); err != nil || !isRunningAndReady {
if err != nil {
logger.Error(err, "Failed to check if head Pod is running and ready!")
} else {
logger.Info("Skipping the update of Serve deployments because the Ray head Pod is not ready.")

// check the latest condition of the head Pod to see if it is ready.
if features.Enabled(features.RayClusterStatusConditions) {
if !meta.IsStatusConditionTrue(rayClusterInstance.Status.Conditions, string(rayv1.HeadPodReady)) {
logger.Info("The head Pod is not ready, requeue the resource event to avoid redundant custom resource status updates.")
return ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, false, nil
}
} else {
if isRunningAndReady, err := r.isHeadPodRunningAndReady(ctx, rayClusterInstance); err != nil || !isRunningAndReady {
if err != nil {
logger.Error(err, "Failed to check if head Pod is running and ready!")
} else {
logger.Info("Skipping the update of Serve deployments because the Ray head Pod is not ready.")
}
return ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, false, err
}
return ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, false, err
}

// TODO(architkulkarni): Check the RayVersion. If < 2.8.0, error.
Expand Down
32 changes: 32 additions & 0 deletions ray-operator/controllers/ray/utils/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,38 @@ func IsCreated(pod *corev1.Pod) bool {
return pod.Status.Phase != ""
}

func FindPodReadyCondition(pod *corev1.Pod, condType rayv1.RayClusterConditionType) metav1.Condition {
replicaPodReadyCondition := metav1.Condition{
Type: string(condType),
Status: metav1.ConditionFalse,
Reason: rayv1.UnknownReason,
}

for _, cond := range pod.Status.Conditions {
if cond.Type != corev1.PodReady {
continue
}
// Set the status based on the PodReady condition
replicaPodReadyCondition.Status = metav1.ConditionStatus(cond.Status)
replicaPodReadyCondition.Message = cond.Message

// Determine the reason; default to PodRunningAndReady if the pod is ready but no specific reason is provided
reason := cond.Reason
if cond.Status == corev1.ConditionTrue && reason == "" {
reason = rayv1.PodRunningAndReady
}

// Update the reason if it's not empty
if reason != "" {
replicaPodReadyCondition.Reason = reason
}

// Since we're only interested in the PodReady condition, break after processing it
break
}
return replicaPodReadyCondition
}

// IsRunningAndReady returns true if pod is in the PodRunning Phase, if it has a condition of PodReady.
func IsRunningAndReady(pod *corev1.Pod) bool {
if pod.Status.Phase != corev1.PodRunning {
Expand Down
61 changes: 61 additions & 0 deletions ray-operator/controllers/ray/utils/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,31 @@ func createSomePodWithCondition(typ corev1.PodConditionType, status corev1.Condi
}
}

func createRayHeadPodWithPhaseAndCondition(phase corev1.PodPhase, typ corev1.PodConditionType, status corev1.ConditionStatus) (pod *corev1.Pod) {
return &corev1.Pod{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "Pod",
},
ObjectMeta: metav1.ObjectMeta{
Name: "raycluster-sample-head",
Namespace: "default",
Labels: map[string]string{
"ray.io/node-type": string(rayv1.HeadNode),
},
},
Status: corev1.PodStatus{
Phase: phase,
Conditions: []corev1.PodCondition{
{
Type: typ,
Status: status,
},
},
},
}
}

func TestGetHeadGroupServiceAccountName(t *testing.T) {
tests := map[string]struct {
input *rayv1.RayCluster
Expand Down Expand Up @@ -588,6 +613,42 @@ env_vars:
}
}

func TestFindHeadPodReadyCondition(t *testing.T) {
tests := map[string]struct {
pod *corev1.Pod
expected metav1.Condition
}{
"condition true if Ray head pod is running and ready": {
pod: createRayHeadPodWithPhaseAndCondition(corev1.PodRunning, corev1.PodReady, corev1.ConditionTrue),
expected: metav1.Condition{
Type: string(rayv1.HeadPodReady),
Status: metav1.ConditionTrue,
},
},
"condition false if Ray head pod is not running": {
pod: createRayHeadPodWithPhaseAndCondition(corev1.PodPending, corev1.PodReady, corev1.ConditionFalse),
expected: metav1.Condition{
Type: string(rayv1.HeadPodReady),
Status: metav1.ConditionFalse,
},
},
"condition false if Ray head pod is not ready": {
pod: createRayHeadPodWithPhaseAndCondition(corev1.PodRunning, corev1.PodReady, corev1.ConditionFalse),
expected: metav1.Condition{
Type: string(rayv1.HeadPodReady),
Status: metav1.ConditionFalse,
},
},
}

for name, tc := range tests {
t.Run(name, func(t *testing.T) {
replicaHeadPodReadyCondition := FindPodReadyCondition(tc.pod, rayv1.HeadPodReady)
assert.Equal(t, tc.expected.Status, replicaHeadPodReadyCondition.Status)
})
}
}

func TestErrRayClusterReplicaFailureReason(t *testing.T) {
assert.Equal(t, RayClusterReplicaFailureReason(ErrFailedDeleteAllPods), "FailedDeleteAllPods")
assert.Equal(t, RayClusterReplicaFailureReason(ErrFailedDeleteHeadPod), "FailedDeleteHeadPod")
Expand Down

0 comments on commit 5062a8c

Please sign in to comment.