Skip to content

Commit

Permalink
[GCS FT] Improve GCS FT cleanup UX (#1592)
Browse files Browse the repository at this point in the history
  • Loading branch information
kevin85421 authored Nov 1, 2023
1 parent 1f728c5 commit 6fa2d3a
Showing 1 changed file with 36 additions and 2 deletions.
38 changes: 36 additions & 2 deletions ray-operator/controllers/ray/raycluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/pointer"

"github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/common"
Expand All @@ -28,8 +29,10 @@ import (
"k8s.io/client-go/rest"

corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -282,8 +285,12 @@ func (r *RayClusterReconciler) rayClusterReconcile(ctx context.Context, request
return ctrl.Result{}, nil
}
if redisCleanupJob.Status.Failed > 0 {
r.Log.Info("If the Redis cleanup Job has failed, we will requeue the RayCluster CR after 1 minute.")
return ctrl.Result{RequeueAfter: 1 * time.Minute}, nil
r.Log.Info(fmt.Sprintf(
"The Redis cleanup Job %s has failed, requeue the RayCluster CR after 5 minute. "+
"You should manually delete the storage namespace %s in Redis and remove the RayCluster's finalizer. "+
"Please check https://docs.ray.io/en/master/cluster/kubernetes/user-guides/kuberay-gcs-ft.html for more details.",
redisCleanupJob.Name, redisCleanupJob.Annotations[common.RayExternalStorageNSAnnotationKey]))
return ctrl.Result{RequeueAfter: 5 * time.Minute}, nil
}
} else {
redisCleanupJob := r.buildRedisCleanupJob(*instance)
Expand Down Expand Up @@ -1034,6 +1041,7 @@ func (r *RayClusterReconciler) buildWorkerPod(instance rayv1.RayCluster, worker
func (r *RayClusterReconciler) buildRedisCleanupJob(instance rayv1.RayCluster) batchv1.Job {
pod := r.buildHeadPod(instance)
pod.Labels[common.RayNodeTypeLabelKey] = string(rayv1.RedisCleanupNode)

// Only keep the Ray container in the Redis cleanup Job.
pod.Spec.Containers = []corev1.Container{pod.Spec.Containers[common.RayContainerIndex]}
pod.Spec.Containers[common.RayContainerIndex].Command = []string{"/bin/bash", "-lc", "--"}
Expand All @@ -1048,9 +1056,34 @@ func (r *RayClusterReconciler) buildRedisCleanupJob(instance rayv1.RayCluster) b
"parsed = urlparse(redis_address); " +
"sys.exit(1) if not cleanup_redis_storage(host=parsed.hostname, port=parsed.port, password=os.getenv('REDIS_PASSWORD', parsed.password), use_ssl=parsed.scheme=='rediss', storage_namespace=os.getenv('RAY_external_storage_namespace')) else None\"",
}

// Disable liveness and readiness probes because the Job will not launch processes like Raylet and GCS.
pod.Spec.Containers[common.RayContainerIndex].LivenessProbe = nil
pod.Spec.Containers[common.RayContainerIndex].ReadinessProbe = nil

// Set the environment variables to ensure that the cleanup Job has at least 60s.
pod.Spec.Containers[common.RayContainerIndex].Env = append(pod.Spec.Containers[common.RayContainerIndex].Env, corev1.EnvVar{
Name: "RAY_redis_db_connect_retries",
Value: "120",
})
pod.Spec.Containers[common.RayContainerIndex].Env = append(pod.Spec.Containers[common.RayContainerIndex].Env, corev1.EnvVar{
Name: "RAY_redis_db_connect_wait_milliseconds",
Value: "500",
})

// The container's resource consumption remains constant. so hard-coding the resources is acceptable.
// In addition, avoid using the GPU for the Redis cleanup Job.
pod.Spec.Containers[common.RayContainerIndex].Resources = v1.ResourceRequirements{
Limits: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("200m"),
v1.ResourceMemory: resource.MustParse("256Mi"),
},
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("200m"),
v1.ResourceMemory: resource.MustParse("256Mi"),
},
}

// For Kubernetes Job, the valid values for Pod's `RestartPolicy` are `Never` and `OnFailure`.
pod.Spec.RestartPolicy = corev1.RestartPolicyNever

Expand All @@ -1062,6 +1095,7 @@ func (r *RayClusterReconciler) buildRedisCleanupJob(instance rayv1.RayCluster) b
Annotations: pod.Annotations,
},
Spec: batchv1.JobSpec{
BackoffLimit: pointer.Int32(0),
Template: corev1.PodTemplateSpec{
ObjectMeta: pod.ObjectMeta,
Spec: pod.Spec,
Expand Down

0 comments on commit 6fa2d3a

Please sign in to comment.