From 6fa2d3a59464207f36093a3409d00286aef37471 Mon Sep 17 00:00:00 2001 From: Kai-Hsun Chen Date: Wed, 1 Nov 2023 16:19:01 -0700 Subject: [PATCH] [GCS FT] Improve GCS FT cleanup UX (#1592) --- .../controllers/ray/raycluster_controller.go | 38 ++++++++++++++++++- 1 file changed, 36 insertions(+), 2 deletions(-) diff --git a/ray-operator/controllers/ray/raycluster_controller.go b/ray-operator/controllers/ray/raycluster_controller.go index 12efc3cd02..6c64add484 100644 --- a/ray-operator/controllers/ray/raycluster_controller.go +++ b/ray-operator/controllers/ray/raycluster_controller.go @@ -10,6 +10,7 @@ import ( "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/utils/pointer" "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler" "github.com/ray-project/kuberay/ray-operator/controllers/ray/common" @@ -28,8 +29,10 @@ import ( "k8s.io/client-go/rest" corev1 "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" networkingv1 "k8s.io/api/networking/v1" "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" @@ -282,8 +285,12 @@ func (r *RayClusterReconciler) rayClusterReconcile(ctx context.Context, request return ctrl.Result{}, nil } if redisCleanupJob.Status.Failed > 0 { - r.Log.Info("If the Redis cleanup Job has failed, we will requeue the RayCluster CR after 1 minute.") - return ctrl.Result{RequeueAfter: 1 * time.Minute}, nil + r.Log.Info(fmt.Sprintf( + "The Redis cleanup Job %s has failed, requeue the RayCluster CR after 5 minute. "+ + "You should manually delete the storage namespace %s in Redis and remove the RayCluster's finalizer. 
"+ + "Please check https://docs.ray.io/en/master/cluster/kubernetes/user-guides/kuberay-gcs-ft.html for more details.", + redisCleanupJob.Name, redisCleanupJob.Annotations[common.RayExternalStorageNSAnnotationKey])) + return ctrl.Result{RequeueAfter: 5 * time.Minute}, nil } } else { redisCleanupJob := r.buildRedisCleanupJob(*instance) @@ -1034,6 +1041,7 @@ func (r *RayClusterReconciler) buildWorkerPod(instance rayv1.RayCluster, worker func (r *RayClusterReconciler) buildRedisCleanupJob(instance rayv1.RayCluster) batchv1.Job { pod := r.buildHeadPod(instance) pod.Labels[common.RayNodeTypeLabelKey] = string(rayv1.RedisCleanupNode) + // Only keep the Ray container in the Redis cleanup Job. pod.Spec.Containers = []corev1.Container{pod.Spec.Containers[common.RayContainerIndex]} pod.Spec.Containers[common.RayContainerIndex].Command = []string{"/bin/bash", "-lc", "--"} @@ -1048,9 +1056,34 @@ func (r *RayClusterReconciler) buildRedisCleanupJob(instance rayv1.RayCluster) b "parsed = urlparse(redis_address); " + "sys.exit(1) if not cleanup_redis_storage(host=parsed.hostname, port=parsed.port, password=os.getenv('REDIS_PASSWORD', parsed.password), use_ssl=parsed.scheme=='rediss', storage_namespace=os.getenv('RAY_external_storage_namespace')) else None\"", } + // Disable liveness and readiness probes because the Job will not launch processes like Raylet and GCS. pod.Spec.Containers[common.RayContainerIndex].LivenessProbe = nil pod.Spec.Containers[common.RayContainerIndex].ReadinessProbe = nil + + // Set the environment variables to ensure that the cleanup Job has at least 60s. 
+ pod.Spec.Containers[common.RayContainerIndex].Env = append(pod.Spec.Containers[common.RayContainerIndex].Env, corev1.EnvVar{ + Name: "RAY_redis_db_connect_retries", + Value: "120", + }) + pod.Spec.Containers[common.RayContainerIndex].Env = append(pod.Spec.Containers[common.RayContainerIndex].Env, corev1.EnvVar{ + Name: "RAY_redis_db_connect_wait_milliseconds", + Value: "500", + }) + + // The container's resource consumption remains constant, so hard-coding the resources is acceptable. + // In addition, avoid using the GPU for the Redis cleanup Job. + pod.Spec.Containers[common.RayContainerIndex].Resources = v1.ResourceRequirements{ + Limits: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("200m"), + v1.ResourceMemory: resource.MustParse("256Mi"), + }, + Requests: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("200m"), + v1.ResourceMemory: resource.MustParse("256Mi"), + }, + } + // For Kubernetes Job, the valid values for Pod's `RestartPolicy` are `Never` and `OnFailure`. pod.Spec.RestartPolicy = corev1.RestartPolicyNever @@ -1062,6 +1095,7 @@ func (r *RayClusterReconciler) buildRedisCleanupJob(instance rayv1.RayCluster) b Annotations: pod.Annotations, }, Spec: batchv1.JobSpec{ + BackoffLimit: pointer.Int32(0), Template: corev1.PodTemplateSpec{ ObjectMeta: pod.ObjectMeta, Spec: pod.Spec,