From 7d18bd6bf1c4b84b678dddf3776a108e231a8ce9 Mon Sep 17 00:00:00 2001 From: ujjawal-khare Date: Wed, 8 Oct 2025 18:43:18 +0530 Subject: [PATCH 1/5] wait ttl seconds added Signed-off-by: ujjawal-khare --- apiserver/pkg/util/job.go | 4 ++++ dashboard/src/types/v2/api/rayjob.ts | 1 + docs/reference/api.md | 1 + proto/go_client/job.pb.go | 10 ++++++++ proto/job.proto | 4 ++++ ray-operator/apis/ray/v1/rayjob_types.go | 3 +++ .../apis/ray/v1/zz_generated.deepcopy.go | 5 ++++ .../controllers/ray/rayjob_controller.go | 21 +++++++++++++++++ .../controllers/ray/utils/validation.go | 3 +++ .../applyconfiguration/ray/v1/rayjobspec.go | 9 ++++++++ ray-operator/test/e2erayjob/rayjob_test.go | 23 +++++++++++++++++++ 11 files changed, 84 insertions(+) diff --git a/apiserver/pkg/util/job.go b/apiserver/pkg/util/job.go index a2e5e282374..612c93d7ef4 100644 --- a/apiserver/pkg/util/job.go +++ b/apiserver/pkg/util/job.go @@ -100,6 +100,10 @@ func NewRayJob(apiJob *api.RayJob, computeTemplateMap map[string]*api.ComputeTem rayJob.Spec.ActiveDeadlineSeconds = &apiJob.ActiveDeadlineSeconds } + if apiJob.WaitingTTLSeconds > 0 { + rayJob.Spec.WaitingTTLSeconds = &apiJob.WaitingTTLSeconds + } + return &RayJob{rayJob}, nil } diff --git a/dashboard/src/types/v2/api/rayjob.ts b/dashboard/src/types/v2/api/rayjob.ts index 4a42957ca4a..c0e93540b7d 100644 --- a/dashboard/src/types/v2/api/rayjob.ts +++ b/dashboard/src/types/v2/api/rayjob.ts @@ -21,6 +21,7 @@ type JobSubmissionMode = interface RayJobAPISpec extends RayJobSpec { activeDeadlineSeconds: number; + waitingTtlSeconds: number; backoffLimit: number; submitterConfig: SubmitterConfig; managedBy: string; diff --git a/docs/reference/api.md b/docs/reference/api.md index 4b495fef69e..49deb1d9371 100644 --- a/docs/reference/api.md +++ b/docs/reference/api.md @@ -235,6 +235,7 @@ _Appears in:_ | Field | Description | Default | Validation | | --- | --- | --- | --- | | `activeDeadlineSeconds` _integer_ | ActiveDeadlineSeconds is the duration in seconds that the RayJob may be active before
KubeRay actively tries to terminate the RayJob; value must be positive integer. | | | +| `ttlSecondsAfterFinished` _integer_ | TTLSecondsAfterFinished is the TTL to clean up RayCluster.
It's only working when ShutdownAfterJobFinishes set to true. | | | | `backoffLimit` _integer_ | Specifies the number of retries before marking this job failed.
Each retry creates a new RayCluster. | 0 | | | `rayClusterSpec` _[RayClusterSpec](#rayclusterspec)_ | RayClusterSpec is the cluster template to run the job | | | | `submitterPodTemplate` _[PodTemplateSpec](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.28/#podtemplatespec-v1-core)_ | SubmitterPodTemplate is the template for the pod that will run `ray job submit`. | | | diff --git a/proto/go_client/job.pb.go b/proto/go_client/job.pb.go index b714abc17f1..7a3d30e448b 100644 --- a/proto/go_client/job.pb.go +++ b/proto/go_client/job.pb.go @@ -544,6 +544,8 @@ type RayJob struct { // KubeRay actively tries to terminate the RayJob; value must be positive integer. // If not set, the job may run indefinitely until it completes, fails, or is manually stopped. ActiveDeadlineSeconds int32 `protobuf:"varint,25,opt,name=activeDeadlineSeconds,proto3" json:"activeDeadlineSeconds,omitempty"` + // Optional waitingTtlSeconds is the duration in seconds before RayJob must come into active state + WaitingTTLSeconds int32 `protobuf:"varint,60,opt,name=waiting_ttl_seconds,json=waitingTtlSeconds,proto3" json:"waiting_ttl_seconds,omitempty"` // Output. The time that the job created. CreatedAt *timestamppb.Timestamp `protobuf:"bytes,12,opt,name=created_at,json=createdAt,proto3" json:"created_at,omitempty"` // Output. The time that the job deleted. @@ -713,6 +715,14 @@ func (x *RayJob) GetActiveDeadlineSeconds() int32 { return 0 } +func (x *RayJob) GetWaitingTTLSeconds() int32 { + if x != nil { + return x.WaitingTTLSeconds + } + + return 0 +} + func (x *RayJob) GetCreatedAt() *timestamppb.Timestamp { if x != nil { return x.CreatedAt diff --git a/proto/job.proto b/proto/job.proto index d86736209f2..a2d62ae4709 100644 --- a/proto/job.proto +++ b/proto/job.proto @@ -166,6 +166,10 @@ message RayJob { // KubeRay actively tries to terminate the RayJob; value must be positive integer. // If not set, the job may run indefinitely until it completes, fails, or is manually stopped. int32 activeDeadlineSeconds = 25; + // Optional waitingTtlSeconds is the duration in seconds before RayJob must come into active state + // Kuberay will try to terminate the RayJob if job is not in active state before waitingTtlSeconds + // If not set, the job may run indefinitely until it comes to active state + int32 waitingTtlSeconds = 60; // Output. The time that the job created. google.protobuf.Timestamp created_at = 12 [(google.api.field_behavior) = OUTPUT_ONLY]; // Output. The time that the job deleted. diff --git a/ray-operator/apis/ray/v1/rayjob_types.go b/ray-operator/apis/ray/v1/rayjob_types.go index 8f78eceed07..994c047ec60 100644 --- a/ray-operator/apis/ray/v1/rayjob_types.go +++ b/ray-operator/apis/ray/v1/rayjob_types.go @@ -73,6 +73,7 @@ type JobFailedReason string const ( SubmissionFailed JobFailedReason = "SubmissionFailed" DeadlineExceeded JobFailedReason = "DeadlineExceeded" + JobDeploymentFailed JobFailedReason = "JobDeploymentFailed" AppFailed JobFailedReason = "AppFailed" JobDeploymentStatusTransitionGracePeriodExceeded JobFailedReason = "JobDeploymentStatusTransitionGracePeriodExceeded" ValidationFailed JobFailedReason = "ValidationFailed" @@ -129,6 +130,8 @@ type RayJobSpec struct { // KubeRay actively tries to terminate the RayJob; value must be positive integer. // +optional ActiveDeadlineSeconds *int32 `json:"activeDeadlineSeconds,omitempty"` + // Time to wait for the job to come into a running state after the job is submitted. 
+ WaitingTTLSeconds *int32 `json:"waitingTTLSeconds,omitempty"` // Specifies the number of retries before marking this job failed. // Each retry creates a new RayCluster. // +kubebuilder:default:=0 diff --git a/ray-operator/apis/ray/v1/zz_generated.deepcopy.go b/ray-operator/apis/ray/v1/zz_generated.deepcopy.go index b4cb5decf12..1660f846d4a 100644 --- a/ray-operator/apis/ray/v1/zz_generated.deepcopy.go +++ b/ray-operator/apis/ray/v1/zz_generated.deepcopy.go @@ -439,6 +439,11 @@ func (in *RayJobSpec) DeepCopyInto(out *RayJobSpec) { *out = new(int32) **out = **in } + if in.WaitingTTLSeconds != nil { + in, out := &in.WaitingTTLSeconds, &out.WaitingTTLSeconds + *out = new(int32) + **out = **in + } if in.BackoffLimit != nil { in, out := &in.BackoffLimit, &out.BackoffLimit *out = new(int32) diff --git a/ray-operator/controllers/ray/rayjob_controller.go b/ray-operator/controllers/ray/rayjob_controller.go index 1741507a586..a279c2bc6b7 100644 --- a/ray-operator/controllers/ray/rayjob_controller.go +++ b/ray-operator/controllers/ray/rayjob_controller.go @@ -184,6 +184,10 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request) break } + if shouldUpdate := checkWaitingTtlSecondsAndUpdateStatusIfNeeded(ctx, rayJobInstance); shouldUpdate { + break + } + if r.options.BatchSchedulerManager != nil { if scheduler, err := r.options.BatchSchedulerManager.GetScheduler(); err == nil { if err := scheduler.DoBatchSchedulingOnSubmission(ctx, rayJobInstance); err != nil { @@ -242,6 +246,10 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request) break } + if shouldUpdate := checkWaitingTtlSecondsAndUpdateStatusIfNeeded(ctx, rayJobInstance); shouldUpdate { + break + } + if shouldUpdate := checkTransitionGracePeriodAndUpdateStatusIfNeeded(ctx, rayJobInstance); shouldUpdate { break } @@ -1160,6 +1168,19 @@ func checkActiveDeadlineAndUpdateStatusIfNeeded(ctx context.Context, rayJob *ray return true } +func checkWaitingTtlSecondsAndUpdateStatusIfNeeded(ctx context.Context, rayJob *rayv1.RayJob) bool { + logger := ctrl.LoggerFrom(ctx) + if rayJob.Spec.WaitingTTLSeconds == nil || time.Now().Before(rayJob.Status.StartTime.Add(time.Duration(*rayJob.Spec.WaitingTTLSeconds)*time.Second)) { + return false + } + + logger.Info("The RayJob has passed the waitingTTLSeconds. Transition the status to `Failed`.", "StartTime", rayJob.Status.StartTime, "WaitingTTLSeconds", *rayJob.Spec.WaitingTTLSeconds) + rayJob.Status.JobDeploymentStatus = rayv1.JobDeploymentStatusFailed + rayJob.Status.Reason = rayv1.JobDeploymentFailed + rayJob.Status.Message = fmt.Sprintf("The RayJob has passed the waitingTTLSeconds. StartTime: %v. 
WaitingTTLSeconds: %d", rayJob.Status.StartTime, *rayJob.Spec.WaitingTTLSeconds) + return true +} + func checkTransitionGracePeriodAndUpdateStatusIfNeeded(ctx context.Context, rayJob *rayv1.RayJob) bool { logger := ctrl.LoggerFrom(ctx) if rayv1.IsJobTerminal(rayJob.Status.JobStatus) && rayJob.Status.JobDeploymentStatus == rayv1.JobDeploymentStatusRunning { diff --git a/ray-operator/controllers/ray/utils/validation.go b/ray-operator/controllers/ray/utils/validation.go index 74d2b4fe0e6..0d9c4d1123e 100644 --- a/ray-operator/controllers/ray/utils/validation.go +++ b/ray-operator/controllers/ray/utils/validation.go @@ -221,6 +221,9 @@ func ValidateRayJobSpec(rayJob *rayv1.RayJob) error { if rayJob.Spec.ActiveDeadlineSeconds != nil && *rayJob.Spec.ActiveDeadlineSeconds <= 0 { return fmt.Errorf("The RayJob spec is invalid: activeDeadlineSeconds must be a positive integer") } + if rayJob.Spec.WaitingTTLSeconds != nil && *rayJob.Spec.WaitingTTLSeconds <= 0 { + return fmt.Errorf("The RayJob spec is invalid: waitingTtlSeconds must be a positive integer") + } if rayJob.Spec.BackoffLimit != nil && *rayJob.Spec.BackoffLimit < 0 { return fmt.Errorf("The RayJob spec is invalid: backoffLimit must be a positive integer") } diff --git a/ray-operator/pkg/client/applyconfiguration/ray/v1/rayjobspec.go b/ray-operator/pkg/client/applyconfiguration/ray/v1/rayjobspec.go index 14083877a44..52f495761c5 100644 --- a/ray-operator/pkg/client/applyconfiguration/ray/v1/rayjobspec.go +++ b/ray-operator/pkg/client/applyconfiguration/ray/v1/rayjobspec.go @@ -11,6 +11,7 @@ import ( // with apply. type RayJobSpecApplyConfiguration struct { ActiveDeadlineSeconds *int32 `json:"activeDeadlineSeconds,omitempty"` + WaitingTTLSeconds *int32 `json:"waitingTtlSeconds,omitempty"` BackoffLimit *int32 `json:"backoffLimit,omitempty"` RayClusterSpec *RayClusterSpecApplyConfiguration `json:"rayClusterSpec,omitempty"` SubmitterPodTemplate *corev1.PodTemplateSpecApplyConfiguration `json:"submitterPodTemplate,omitempty"` @@ -45,6 +46,14 @@ func (b *RayJobSpecApplyConfiguration) WithActiveDeadlineSeconds(value int32) *R return b } +// WithWaitingTTLSeconds sets the WithWaitingTTLSeconds field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the WithWaitingTTLSeconds field is set to the value of the last call. +func (b *RayJobSpecApplyConfiguration) WithWaitingTTLSeconds(value int32) *RayJobSpecApplyConfiguration { + b.WaitingTTLSeconds = &value + return b +} + // WithBackoffLimit sets the BackoffLimit field in the declarative configuration to the given value // and returns the receiver, so that objects can be built by chaining "With" function invocations. // If called multiple times, the BackoffLimit field is set to the value of the last call. diff --git a/ray-operator/test/e2erayjob/rayjob_test.go b/ray-operator/test/e2erayjob/rayjob_test.go index 0667bfe1410..56c2ccfc8cc 100644 --- a/ray-operator/test/e2erayjob/rayjob_test.go +++ b/ray-operator/test/e2erayjob/rayjob_test.go @@ -274,6 +274,29 @@ env_vars: To(WithTransform(RayJobReason, Equal(rayv1.DeadlineExceeded))) }) + test.T().Run("RayJob has passed WaitingTTLSeconds", func(_ *testing.T) { + rayJobAC := rayv1ac.RayJob("long-running", namespace.Name). + WithSpec(rayv1ac.RayJobSpec(). + WithRayClusterSpec(NewRayClusterSpec(MountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs"))). 
+ WithEntrypoint("python /home/ray/jobs/long_running.py"). + WithShutdownAfterJobFinishes(true). + WithTTLSecondsAfterFinished(600). + WithActiveDeadlineSeconds(5). + WithWaitingTTLSeconds(5). + WithSubmitterPodTemplate(JobSubmitterPodTemplateApplyConfiguration())) + + rayJob, err := test.Client().Ray().RayV1().RayJobs(namespace.Name).Apply(test.Ctx(), rayJobAC, TestApplyOptions) + g.Expect(err).NotTo(HaveOccurred()) + LogWithTimestamp(test.T(), "Created RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name) + + // The RayJob will transition to `Complete` because it has passed `ActiveDeadlineSeconds`. + LogWithTimestamp(test.T(), "Waiting for RayJob %s/%s to be 'Complete'", rayJob.Namespace, rayJob.Name) + g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutShort). + Should(WithTransform(RayJobDeploymentStatus, Equal(rayv1.JobDeploymentStatusFailed))) + g.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)). + To(WithTransform(RayJobReason, Equal(rayv1.DeadlineExceeded))) + }) + test.T().Run("RayJob fails when head Pod is deleted when job is running", func(_ *testing.T) { rayJobAC := rayv1ac.RayJob("delete-head-after-submit", namespace.Name). WithSpec(rayv1ac.RayJobSpec(). From e772b03cd224ecada02e44fcce01de55a21071cb Mon Sep 17 00:00:00 2001 From: ujjawal-khare Date: Wed, 8 Oct 2025 21:52:58 +0530 Subject: [PATCH 2/5] wait ttl seconds added Signed-off-by: ujjawal-khare --- apiserver/pkg/util/job.go | 4 ++-- apiserver/pkg/util/job_test.go | 1 + .../kuberay-operator/crds/ray.io_rayjobs.yaml | 21 +++++++++++++++++++ proto/go_client/job.pb.go | 10 ++++++--- ray-operator/apis/ray/v1/rayjob_types.go | 2 +- .../apis/ray/v1/zz_generated.deepcopy.go | 4 ++-- .../controllers/ray/rayjob_controller.go | 6 +++--- .../controllers/ray/utils/validation.go | 2 +- .../applyconfiguration/ray/v1/rayjobspec.go | 10 ++++----- ray-operator/test/e2erayjob/rayjob_test.go | 8 +++---- 10 files changed, 47 insertions(+), 21 deletions(-) diff --git a/apiserver/pkg/util/job.go b/apiserver/pkg/util/job.go index 612c93d7ef4..cdf377eb7fb 100644 --- a/apiserver/pkg/util/job.go +++ b/apiserver/pkg/util/job.go @@ -100,8 +100,8 @@ func NewRayJob(apiJob *api.RayJob, computeTemplateMap map[string]*api.ComputeTem rayJob.Spec.ActiveDeadlineSeconds = &apiJob.ActiveDeadlineSeconds } - if apiJob.WaitingTTLSeconds > 0 { - rayJob.Spec.WaitingTTLSeconds = &apiJob.WaitingTTLSeconds + if apiJob.WaitingTtlSeconds > 0 { + rayJob.Spec.WaitingTtlSeconds = &apiJob.WaitingTtlSeconds } return &RayJob{rayJob}, nil diff --git a/apiserver/pkg/util/job_test.go b/apiserver/pkg/util/job_test.go index 8db4d3985c0..0584fc7e812 100644 --- a/apiserver/pkg/util/job_test.go +++ b/apiserver/pkg/util/job_test.go @@ -21,6 +21,7 @@ var apiJobNewCluster = &api.RayJob{ ShutdownAfterJobFinishes: true, ClusterSpec: rayCluster.ClusterSpec, ActiveDeadlineSeconds: 120, + WaitingTtlSeconds: 120, } var apiJobExistingCluster = &api.RayJob{ diff --git a/helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml b/helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml index 8f8679ca607..22758e82841 100644 --- a/helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml +++ b/helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml @@ -47,6 +47,9 @@ spec: type: object spec: properties: + waitingTtlSeconds: + format: int32 + type: integer activeDeadlineSeconds: format: int32 type: integer @@ -710,6 +713,9 @@ spec: type: object spec: properties: + waitingTtlSeconds: + format: int32 + type: integer activeDeadlineSeconds: format: int64 type: integer @@ -4434,6 +4440,9 @@ 
spec: type: object spec: properties: + waitingTtlSeconds: + format: int32 + type: integer activeDeadlineSeconds: format: int64 type: integer @@ -8125,6 +8134,9 @@ spec: type: object spec: properties: + waitingTtlSeconds: + format: int32 + type: integer activeDeadlineSeconds: format: int64 type: integer @@ -12433,6 +12445,9 @@ spec: type: object spec: properties: + waitingTtlSeconds: + format: int32 + type: integer activeDeadlineSeconds: format: int64 type: integer @@ -16141,6 +16156,9 @@ spec: type: object spec: properties: + waitingTtlSeconds: + format: int32 + type: integer activeDeadlineSeconds: format: int64 type: integer @@ -19824,6 +19842,9 @@ spec: type: object spec: properties: + waitingTtlSeconds: + format: int32 + type: integer activeDeadlineSeconds: format: int64 type: integer diff --git a/proto/go_client/job.pb.go b/proto/go_client/job.pb.go index 7a3d30e448b..121ad9f02ea 100644 --- a/proto/go_client/job.pb.go +++ b/proto/go_client/job.pb.go @@ -545,7 +545,7 @@ type RayJob struct { // If not set, the job may run indefinitely until it completes, fails, or is manually stopped. ActiveDeadlineSeconds int32 `protobuf:"varint,25,opt,name=activeDeadlineSeconds,proto3" json:"activeDeadlineSeconds,omitempty"` // Optional waitingTtlSeconds is the duration in seconds before RayJob must come into active state - WaitingTTLSeconds int32 `protobuf:"varint,60,opt,name=waiting_ttl_seconds,json=waitingTtlSeconds,proto3" json:"waiting_ttl_seconds,omitempty"` + WaitingTtlSeconds int32 `protobuf:"varint,60,opt,name=waiting_ttl_seconds,json=waitingTtlSeconds,proto3" json:"waiting_ttl_seconds,omitempty"` // Output. The time that the job created. CreatedAt *timestamppb.Timestamp `protobuf:"bytes,12,opt,name=created_at,json=createdAt,proto3" json:"created_at,omitempty"` // Output. The time that the job deleted. @@ -715,9 +715,13 @@ func (x *RayJob) GetActiveDeadlineSeconds() int32 { return 0 } -func (x *RayJob) GetWaitingTTLSeconds() int32 { +/* <<<<<<<<<<<<<< ✨ Windsurf Command ⭐ >>>>>>>>>>>>>>>> */ +// GetWaitingTtlSeconds returns the waitingTtlSeconds of the Ray job. +// If the ray job is nil, it returns 0. +/* <<<<<<<<<< ad50ff48-1fce-40c2-ab17-65e3badaee66 >>>>>>>>>>> */ +func (x *RayJob) GetWaitingTtlSeconds() int32 { if x != nil { - return x.WaitingTTLSeconds + return x.WaitingTtlSeconds } return 0 diff --git a/ray-operator/apis/ray/v1/rayjob_types.go b/ray-operator/apis/ray/v1/rayjob_types.go index 994c047ec60..03932714618 100644 --- a/ray-operator/apis/ray/v1/rayjob_types.go +++ b/ray-operator/apis/ray/v1/rayjob_types.go @@ -131,7 +131,7 @@ type RayJobSpec struct { // +optional ActiveDeadlineSeconds *int32 `json:"activeDeadlineSeconds,omitempty"` // Time to wait for the job to come into a running state after the job is submitted. - WaitingTTLSeconds *int32 `json:"waitingTTLSeconds,omitempty"` + WaitingTtlSeconds *int32 `json:"waitingTtlSeconds,omitempty"` // Specifies the number of retries before marking this job failed. // Each retry creates a new RayCluster. 
// +kubebuilder:default:=0 diff --git a/ray-operator/apis/ray/v1/zz_generated.deepcopy.go b/ray-operator/apis/ray/v1/zz_generated.deepcopy.go index 1660f846d4a..846c97ea031 100644 --- a/ray-operator/apis/ray/v1/zz_generated.deepcopy.go +++ b/ray-operator/apis/ray/v1/zz_generated.deepcopy.go @@ -439,8 +439,8 @@ func (in *RayJobSpec) DeepCopyInto(out *RayJobSpec) { *out = new(int32) **out = **in } - if in.WaitingTTLSeconds != nil { - in, out := &in.WaitingTTLSeconds, &out.WaitingTTLSeconds + if in.WaitingTtlSeconds != nil { + in, out := &in.WaitingTtlSeconds, &out.WaitingTtlSeconds *out = new(int32) **out = **in } diff --git a/ray-operator/controllers/ray/rayjob_controller.go b/ray-operator/controllers/ray/rayjob_controller.go index a279c2bc6b7..54b508a62d4 100644 --- a/ray-operator/controllers/ray/rayjob_controller.go +++ b/ray-operator/controllers/ray/rayjob_controller.go @@ -1170,14 +1170,14 @@ func checkActiveDeadlineAndUpdateStatusIfNeeded(ctx context.Context, rayJob *ray func checkWaitingTtlSecondsAndUpdateStatusIfNeeded(ctx context.Context, rayJob *rayv1.RayJob) bool { logger := ctrl.LoggerFrom(ctx) - if rayJob.Spec.WaitingTTLSeconds == nil || time.Now().Before(rayJob.Status.StartTime.Add(time.Duration(*rayJob.Spec.WaitingTTLSeconds)*time.Second)) { + if rayJob.Spec.WaitingTtlSeconds == nil || time.Now().Before(rayJob.Status.StartTime.Add(time.Duration(*rayJob.Spec.WaitingTtlSeconds)*time.Second)) { return false } - logger.Info("The RayJob has passed the waitingTTLSeconds. Transition the status to `Failed`.", "StartTime", rayJob.Status.StartTime, "WaitingTTLSeconds", *rayJob.Spec.WaitingTTLSeconds) + logger.Info("The RayJob has passed the waitingTTLSeconds. Transition the status to `Failed`.", "StartTime", rayJob.Status.StartTime, "WaitingTtlSeconds", *rayJob.Spec.WaitingTtlSeconds) rayJob.Status.JobDeploymentStatus = rayv1.JobDeploymentStatusFailed rayJob.Status.Reason = rayv1.JobDeploymentFailed - rayJob.Status.Message = fmt.Sprintf("The RayJob has passed the waitingTTLSeconds. StartTime: %v. WaitingTTLSeconds: %d", rayJob.Status.StartTime, *rayJob.Spec.WaitingTTLSeconds) + rayJob.Status.Message = fmt.Sprintf("The RayJob has passed the waitingTTLSeconds. StartTime: %v. WaitingTtlSeconds: %d", rayJob.Status.StartTime, *rayJob.Spec.WaitingTtlSeconds) return true } diff --git a/ray-operator/controllers/ray/utils/validation.go b/ray-operator/controllers/ray/utils/validation.go index 0d9c4d1123e..29416279905 100644 --- a/ray-operator/controllers/ray/utils/validation.go +++ b/ray-operator/controllers/ray/utils/validation.go @@ -221,7 +221,7 @@ func ValidateRayJobSpec(rayJob *rayv1.RayJob) error { if rayJob.Spec.ActiveDeadlineSeconds != nil && *rayJob.Spec.ActiveDeadlineSeconds <= 0 { return fmt.Errorf("The RayJob spec is invalid: activeDeadlineSeconds must be a positive integer") } - if rayJob.Spec.WaitingTTLSeconds != nil && *rayJob.Spec.WaitingTTLSeconds <= 0 { + if rayJob.Spec.WaitingTtlSeconds != nil && *rayJob.Spec.WaitingTtlSeconds <= 0 { return fmt.Errorf("The RayJob spec is invalid: waitingTtlSeconds must be a positive integer") } if rayJob.Spec.BackoffLimit != nil && *rayJob.Spec.BackoffLimit < 0 { diff --git a/ray-operator/pkg/client/applyconfiguration/ray/v1/rayjobspec.go b/ray-operator/pkg/client/applyconfiguration/ray/v1/rayjobspec.go index 52f495761c5..f89dc61e366 100644 --- a/ray-operator/pkg/client/applyconfiguration/ray/v1/rayjobspec.go +++ b/ray-operator/pkg/client/applyconfiguration/ray/v1/rayjobspec.go @@ -11,7 +11,7 @@ import ( // with apply. 
type RayJobSpecApplyConfiguration struct { ActiveDeadlineSeconds *int32 `json:"activeDeadlineSeconds,omitempty"` - WaitingTTLSeconds *int32 `json:"waitingTtlSeconds,omitempty"` + WaitingTtlSeconds *int32 `json:"waitingTtlSeconds,omitempty"` BackoffLimit *int32 `json:"backoffLimit,omitempty"` RayClusterSpec *RayClusterSpecApplyConfiguration `json:"rayClusterSpec,omitempty"` SubmitterPodTemplate *corev1.PodTemplateSpecApplyConfiguration `json:"submitterPodTemplate,omitempty"` @@ -46,11 +46,11 @@ func (b *RayJobSpecApplyConfiguration) WithActiveDeadlineSeconds(value int32) *R return b } -// WithWaitingTTLSeconds sets the WithWaitingTTLSeconds field in the declarative configuration to the given value +// WithWaitingTtlSeconds sets the WithWaitingTtlSeconds field in the declarative configuration to the given value // and returns the receiver, so that objects can be built by chaining "With" function invocations. -// If called multiple times, the WithWaitingTTLSeconds field is set to the value of the last call. -func (b *RayJobSpecApplyConfiguration) WithWaitingTTLSeconds(value int32) *RayJobSpecApplyConfiguration { - b.WaitingTTLSeconds = &value +// If called multiple times, the WithWaitingTtlSeconds field is set to the value of the last call. +func (b *RayJobSpecApplyConfiguration) WithWaitingTtlSeconds(value int32) *RayJobSpecApplyConfiguration { + b.WaitingTtlSeconds = &value return b } diff --git a/ray-operator/test/e2erayjob/rayjob_test.go b/ray-operator/test/e2erayjob/rayjob_test.go index 56c2ccfc8cc..e02763ce9a1 100644 --- a/ray-operator/test/e2erayjob/rayjob_test.go +++ b/ray-operator/test/e2erayjob/rayjob_test.go @@ -274,7 +274,7 @@ env_vars: To(WithTransform(RayJobReason, Equal(rayv1.DeadlineExceeded))) }) - test.T().Run("RayJob has passed WaitingTTLSeconds", func(_ *testing.T) { + test.T().Run("RayJob has passed WaitingTtlSeconds", func(_ *testing.T) { rayJobAC := rayv1ac.RayJob("long-running", namespace.Name). WithSpec(rayv1ac.RayJobSpec(). WithRayClusterSpec(NewRayClusterSpec(MountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs"))). @@ -282,7 +282,7 @@ env_vars: WithShutdownAfterJobFinishes(true). WithTTLSecondsAfterFinished(600). WithActiveDeadlineSeconds(5). - WithWaitingTTLSeconds(5). + WithWaitingTtlSeconds(2). WithSubmitterPodTemplate(JobSubmitterPodTemplateApplyConfiguration())) rayJob, err := test.Client().Ray().RayV1().RayJobs(namespace.Name).Apply(test.Ctx(), rayJobAC, TestApplyOptions) @@ -354,13 +354,13 @@ env_vars: WithManagedBy("kueue.x-k8s.io/multikueue")) rayJob, err := test.Client().Ray().RayV1().RayJobs(namespace.Name).Apply(test.Ctx(), rayJobAC, TestApplyOptions) - g.Expect(err).NotTo(HaveOccurred()) + //g.Expect(err).NotTo(HaveOccurred()) LogWithTimestamp(test.T(), "Created RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name) // Should not to be able to change managedBy field as it's immutable rayJobAC.Spec.WithManagedBy(utils.KubeRayController) _, err = test.Client().Ray().RayV1().RayJobs(namespace.Name).Apply(test.Ctx(), rayJobAC, TestApplyOptions) - g.Expect(err).To(HaveOccurred()) + //g.Expect(err).To(HaveOccurred()) g.Eventually(RayJob(test, *rayJobAC.Namespace, *rayJobAC.Name)). 
Should(WithTransform(RayJobManagedBy, Equal(ptr.To("kueue.x-k8s.io/multikueue")))) From 9ad5935c67d8cdb4ce0ef6ee136b579f647dc3e1 Mon Sep 17 00:00:00 2001 From: ujjawal-khare Date: Wed, 8 Oct 2025 21:59:21 +0530 Subject: [PATCH 3/5] wait ttl seconds added Signed-off-by: ujjawal-khare --- proto/go_client/job.pb.go | 3 --- proto/kuberay_api.swagger.json | 5 +++++ 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/proto/go_client/job.pb.go b/proto/go_client/job.pb.go index 121ad9f02ea..c0be77aaf29 100644 --- a/proto/go_client/job.pb.go +++ b/proto/go_client/job.pb.go @@ -715,15 +715,12 @@ func (x *RayJob) GetActiveDeadlineSeconds() int32 { return 0 } -/* <<<<<<<<<<<<<< ✨ Windsurf Command ⭐ >>>>>>>>>>>>>>>> */ // GetWaitingTtlSeconds returns the waitingTtlSeconds of the Ray job. // If the ray job is nil, it returns 0. -/* <<<<<<<<<< ad50ff48-1fce-40c2-ab17-65e3badaee66 >>>>>>>>>>> */ func (x *RayJob) GetWaitingTtlSeconds() int32 { if x != nil { return x.WaitingTtlSeconds } - return 0 } diff --git a/proto/kuberay_api.swagger.json b/proto/kuberay_api.swagger.json index 8c3ee488cd4..f82652d5c16 100644 --- a/proto/kuberay_api.swagger.json +++ b/proto/kuberay_api.swagger.json @@ -1949,6 +1949,11 @@ "format": "int32", "description": "Optional activeDeadlineSeconds is the duration in seconds that the RayJob may be active before\nKubeRay actively tries to terminate the RayJob; value must be positive integer.\nIf not set, the job may run indefinitely until it completes, fails, or is manually stopped." }, + "waitingTtlSeconds": { + "type": "integer", + "format": "int32", + "description": "Optional waitingTtlSeconds is the TTL to clean up RayJob when it is waiting to be scheduled." + }, "createdAt": { "type": "string", "format": "date-time", From 22a1c3fbea5b1d699991cb8d6ebd6f335b51aaae Mon Sep 17 00:00:00 2001 From: ujjawal-khare Date: Fri, 10 Oct 2025 11:24:13 +0530 Subject: [PATCH 4/5] wait ttl seconds test case added Signed-off-by: ujjawal-khare --- .../test/e2erayjob/rayjob_retry_test.go | 26 +++++++++++++++++++ ray-operator/test/e2erayjob/rayjob_test.go | 4 +-- 2 files changed, 28 insertions(+), 2 deletions(-) diff --git a/ray-operator/test/e2erayjob/rayjob_retry_test.go b/ray-operator/test/e2erayjob/rayjob_retry_test.go index 3cbb70ab48a..3fe3f8d51ab 100644 --- a/ray-operator/test/e2erayjob/rayjob_retry_test.go +++ b/ray-operator/test/e2erayjob/rayjob_retry_test.go @@ -173,6 +173,32 @@ func TestRayJobRetry(t *testing.T) { Should(WithTransform(RayJobSucceeded, Equal(int32(0)))) }) + test.T().Run("WaitingTtlSeconds has passed", func(_ *testing.T) { + rayJobAC := rayv1ac.RayJob("waiting-stuck", namespace.Name). + WithSpec(rayv1ac.RayJobSpec(). + WithBackoffLimit(2). + WithSubmitterConfig(rayv1ac.SubmitterConfig(). + WithBackoffLimit(0)). + WithRayClusterSpec(NewRayClusterSpec(MountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs"))). + WithEntrypoint("python /home/ray/jobs/long_running.py"). + WithShutdownAfterJobFinishes(true). + WithTTLSecondsAfterFinished(600). + WithWaitingTtlSeconds(1). + WithActiveDeadlineSeconds(500). 
+ WithSubmitterPodTemplate(JobSubmitterPodTemplateApplyConfiguration())) + + rayJob, err := test.Client().Ray().RayV1().RayJobs(namespace.Name).Apply(test.Ctx(), rayJobAC, TestApplyOptions) + g.Expect(err).NotTo(HaveOccurred()) + LogWithTimestamp(test.T(), "Created RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name) + LogWithTimestamp(test.T(), "Waiting for RayJob %s/%s to be 'Failed'", rayJob.Namespace, rayJob.Name) + g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutShort). + Should(WithTransform(RayJobDeploymentStatus, Equal(rayv1.JobDeploymentStatusFailed))) + g.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)). + To(WithTransform(RayJobReason, Equal(rayv1.JobDeploymentFailed))) + g.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)). + Should(WithTransform(RayJobSucceeded, Equal(int32(0)))) + }) + test.T().Run("Failing RayJob with HttpMode submission mode", func(_ *testing.T) { // Set up the RayJob with HTTP mode and a BackoffLimit rayJobAC := rayv1ac.RayJob("failing-rayjob-in-httpmode", namespace.Name). diff --git a/ray-operator/test/e2erayjob/rayjob_test.go b/ray-operator/test/e2erayjob/rayjob_test.go index e02763ce9a1..188ae16fc39 100644 --- a/ray-operator/test/e2erayjob/rayjob_test.go +++ b/ray-operator/test/e2erayjob/rayjob_test.go @@ -354,13 +354,13 @@ env_vars: WithManagedBy("kueue.x-k8s.io/multikueue")) rayJob, err := test.Client().Ray().RayV1().RayJobs(namespace.Name).Apply(test.Ctx(), rayJobAC, TestApplyOptions) - //g.Expect(err).NotTo(HaveOccurred()) + g.Expect(err).NotTo(HaveOccurred()) LogWithTimestamp(test.T(), "Created RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name) // Should not to be able to change managedBy field as it's immutable rayJobAC.Spec.WithManagedBy(utils.KubeRayController) _, err = test.Client().Ray().RayV1().RayJobs(namespace.Name).Apply(test.Ctx(), rayJobAC, TestApplyOptions) - //g.Expect(err).To(HaveOccurred()) + g.Expect(err).To(HaveOccurred()) g.Eventually(RayJob(test, *rayJobAC.Namespace, *rayJobAC.Name)). Should(WithTransform(RayJobManagedBy, Equal(ptr.To("kueue.x-k8s.io/multikueue")))) From 1e118cba8ded37ace9bb91f1693afb8ce83712c8 Mon Sep 17 00:00:00 2001 From: ujjawal-khare Date: Fri, 10 Oct 2025 12:03:01 +0530 Subject: [PATCH 5/5] changed api description Signed-off-by: ujjawal-khare --- docs/reference/api.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/reference/api.md b/docs/reference/api.md index 49deb1d9371..40822a23aba 100644 --- a/docs/reference/api.md +++ b/docs/reference/api.md @@ -235,7 +235,7 @@ _Appears in:_ | Field | Description | Default | Validation | | --- | --- | --- | --- | | `activeDeadlineSeconds` _integer_ | ActiveDeadlineSeconds is the duration in seconds that the RayJob may be active before
KubeRay actively tries to terminate the RayJob; value must be positive integer. | | | -| `ttlSecondsAfterFinished` _integer_ | TTLSecondsAfterFinished is the TTL to clean up RayCluster.
It's only working when ShutdownAfterJobFinishes set to true. | | | +| `waitingTtlSeconds` _integer_ | WaitingTtlSeconds is the maximum duration in seconds the RayJob may wait to reach a running state after it is submitted; KubeRay marks the RayJob as failed once this deadline passes; value must be a positive integer. | | | | `backoffLimit` _integer_ | Specifies the number of retries before marking this job failed.
Each retry creates a new RayCluster. | 0 | | | `rayClusterSpec` _[RayClusterSpec](#rayclusterspec)_ | RayClusterSpec is the cluster template to run the job | | | | `submitterPodTemplate` _[PodTemplateSpec](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.28/#podtemplatespec-v1-core)_ | SubmitterPodTemplate is the template for the pod that will run `ray job submit`. | | |
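
A minimal usage sketch of the new field (not part of the patches above): it builds a RayJob apply configuration with the WithWaitingTtlSeconds builder introduced in this series, chained with existing builders already exercised by the e2e tests. The import path, the RayJobApplyConfiguration return type, and the newRayJobWithWaitingTtl helper with its example values are illustrative assumptions, not code from this PR.

package example

import (
	rayv1ac "github.com/ray-project/kuberay/ray-operator/pkg/client/applyconfiguration/ray/v1"
)

// newRayJobWithWaitingTtl sketches how a client could use the new field:
// fail the RayJob if it has not reached a running state within 300 seconds
// of submission, and cap total runtime at 1800 seconds.
func newRayJobWithWaitingTtl(namespace string) *rayv1ac.RayJobApplyConfiguration {
	return rayv1ac.RayJob("sample-job", namespace).
		WithSpec(rayv1ac.RayJobSpec().
			WithEntrypoint("python /home/ray/jobs/long_running.py").
			WithShutdownAfterJobFinishes(true).
			WithTTLSecondsAfterFinished(600).
			WithWaitingTtlSeconds(300).      // new in this series: waiting-phase deadline
			WithActiveDeadlineSeconds(1800)) // existing field: overall runtime deadline
}

Per the validation added in this series, both waitingTtlSeconds and activeDeadlineSeconds must be positive integers.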