diff --git a/apiserver/pkg/util/job.go b/apiserver/pkg/util/job.go index a2e5e282374..cdf377eb7fb 100644 --- a/apiserver/pkg/util/job.go +++ b/apiserver/pkg/util/job.go @@ -100,6 +100,10 @@ func NewRayJob(apiJob *api.RayJob, computeTemplateMap map[string]*api.ComputeTem rayJob.Spec.ActiveDeadlineSeconds = &apiJob.ActiveDeadlineSeconds } + if apiJob.WaitingTtlSeconds > 0 { + rayJob.Spec.WaitingTtlSeconds = &apiJob.WaitingTtlSeconds + } + return &RayJob{rayJob}, nil } diff --git a/apiserver/pkg/util/job_test.go b/apiserver/pkg/util/job_test.go index 8db4d3985c0..0584fc7e812 100644 --- a/apiserver/pkg/util/job_test.go +++ b/apiserver/pkg/util/job_test.go @@ -21,6 +21,7 @@ var apiJobNewCluster = &api.RayJob{ ShutdownAfterJobFinishes: true, ClusterSpec: rayCluster.ClusterSpec, ActiveDeadlineSeconds: 120, + WaitingTtlSeconds: 120, } var apiJobExistingCluster = &api.RayJob{ diff --git a/dashboard/src/types/v2/api/rayjob.ts b/dashboard/src/types/v2/api/rayjob.ts index 4a42957ca4a..c0e93540b7d 100644 --- a/dashboard/src/types/v2/api/rayjob.ts +++ b/dashboard/src/types/v2/api/rayjob.ts @@ -21,6 +21,7 @@ type JobSubmissionMode = interface RayJobAPISpec extends RayJobSpec { activeDeadlineSeconds: number; + waitingTtlSeconds: number; backoffLimit: number; submitterConfig: SubmitterConfig; managedBy: string; diff --git a/docs/reference/api.md b/docs/reference/api.md index 4b495fef69e..40822a23aba 100644 --- a/docs/reference/api.md +++ b/docs/reference/api.md @@ -235,6 +235,7 @@ _Appears in:_ | Field | Description | Default | Validation | | --- | --- | --- | --- | | `activeDeadlineSeconds` _integer_ | ActiveDeadlineSeconds is the duration in seconds that the RayJob may be active before
KubeRay actively tries to terminate the RayJob; value must be positive integer. | | | +| `waitingTtlSeconds` _integer_ | WaitingTtlSeconds is the duration in seconds that the RayJob may wait to reach a running state before KubeRay transitions it to a failed state; value must be a positive integer. | | | +| `backoffLimit` _integer_ | Specifies the number of retries before marking this job failed.
Each retry creates a new RayCluster. | 0 | | | `rayClusterSpec` _[RayClusterSpec](#rayclusterspec)_ | RayClusterSpec is the cluster template to run the job | | | | `submitterPodTemplate` _[PodTemplateSpec](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.28/#podtemplatespec-v1-core)_ | SubmitterPodTemplate is the template for the pod that will run `ray job submit`. | | | diff --git a/helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml b/helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml index 8f8679ca607..22758e82841 100644 --- a/helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml +++ b/helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml @@ -47,6 +47,9 @@ spec: type: object spec: properties: + waitingTtlSeconds: + format: int32 + type: integer activeDeadlineSeconds: format: int32 type: integer @@ -710,6 +713,9 @@ spec: type: object spec: properties: + waitingTtlSeconds: + format: int32 + type: integer activeDeadlineSeconds: format: int64 type: integer @@ -4434,6 +4440,9 @@ spec: type: object spec: properties: + waitingTtlSeconds: + format: int32 + type: integer activeDeadlineSeconds: format: int64 type: integer @@ -8125,6 +8134,9 @@ spec: type: object spec: properties: + waitingTtlSeconds: + format: int32 + type: integer activeDeadlineSeconds: format: int64 type: integer @@ -12433,6 +12445,9 @@ spec: type: object spec: properties: + waitingTtlSeconds: + format: int32 + type: integer activeDeadlineSeconds: format: int64 type: integer @@ -16141,6 +16156,9 @@ spec: type: object spec: properties: + waitingTtlSeconds: + format: int32 + type: integer activeDeadlineSeconds: format: int64 type: integer @@ -19824,6 +19842,9 @@ spec: type: object spec: properties: + waitingTtlSeconds: + format: int32 + type: integer activeDeadlineSeconds: format: int64 type: integer diff --git a/proto/go_client/job.pb.go b/proto/go_client/job.pb.go index b714abc17f1..c0be77aaf29 100644 --- a/proto/go_client/job.pb.go +++ b/proto/go_client/job.pb.go @@ -544,6 +544,8 @@ type RayJob struct { // KubeRay actively tries to terminate the RayJob; value must be positive integer. // If not set, the job may run indefinitely until it completes, fails, or is manually stopped. ActiveDeadlineSeconds int32 `protobuf:"varint,25,opt,name=activeDeadlineSeconds,proto3" json:"activeDeadlineSeconds,omitempty"` + // Optional waitingTtlSeconds is the duration in seconds before RayJob must come into active state + WaitingTtlSeconds int32 `protobuf:"varint,60,opt,name=waiting_ttl_seconds,json=waitingTtlSeconds,proto3" json:"waiting_ttl_seconds,omitempty"` // Output. The time that the job created. CreatedAt *timestamppb.Timestamp `protobuf:"bytes,12,opt,name=created_at,json=createdAt,proto3" json:"created_at,omitempty"` // Output. The time that the job deleted. @@ -713,6 +715,15 @@ func (x *RayJob) GetActiveDeadlineSeconds() int32 { return 0 } +// GetWaitingTtlSeconds returns the waitingTtlSeconds of the Ray job. +// If the ray job is nil, it returns 0. +func (x *RayJob) GetWaitingTtlSeconds() int32 { + if x != nil { + return x.WaitingTtlSeconds + } + return 0 +} + func (x *RayJob) GetCreatedAt() *timestamppb.Timestamp { if x != nil { return x.CreatedAt diff --git a/proto/job.proto b/proto/job.proto index d86736209f2..a2d62ae4709 100644 --- a/proto/job.proto +++ b/proto/job.proto @@ -166,6 +166,10 @@ message RayJob { // KubeRay actively tries to terminate the RayJob; value must be positive integer. // If not set, the job may run indefinitely until it completes, fails, or is manually stopped. 
int32 activeDeadlineSeconds = 25; + // Optional waitingTtlSeconds is the duration in seconds that the RayJob may wait to reach a running state. + // KubeRay tries to terminate the RayJob if it has not reached a running state within waitingTtlSeconds. + // If not set, the job may wait indefinitely to reach a running state. + int32 waitingTtlSeconds = 60; // Output. The time that the job created. google.protobuf.Timestamp created_at = 12 [(google.api.field_behavior) = OUTPUT_ONLY]; // Output. The time that the job deleted. diff --git a/proto/kuberay_api.swagger.json b/proto/kuberay_api.swagger.json index 8c3ee488cd4..f82652d5c16 100644 --- a/proto/kuberay_api.swagger.json +++ b/proto/kuberay_api.swagger.json @@ -1949,6 +1949,11 @@ "format": "int32", "description": "Optional activeDeadlineSeconds is the duration in seconds that the RayJob may be active before\nKubeRay actively tries to terminate the RayJob; value must be positive integer.\nIf not set, the job may run indefinitely until it completes, fails, or is manually stopped." }, + "waitingTtlSeconds": { + "type": "integer", + "format": "int32", + "description": "Optional waitingTtlSeconds is the duration in seconds that the RayJob may wait to reach a running state.\nKubeRay tries to terminate the RayJob if it has not reached a running state within waitingTtlSeconds.\nIf not set, the job may wait indefinitely to reach a running state." + }, "createdAt": { "type": "string", "format": "date-time", diff --git a/ray-operator/apis/ray/v1/rayjob_types.go b/ray-operator/apis/ray/v1/rayjob_types.go index 8f78eceed07..03932714618 100644 --- a/ray-operator/apis/ray/v1/rayjob_types.go +++ b/ray-operator/apis/ray/v1/rayjob_types.go @@ -73,6 +73,7 @@ type JobFailedReason string const ( SubmissionFailed JobFailedReason = "SubmissionFailed" DeadlineExceeded JobFailedReason = "DeadlineExceeded" + JobDeploymentFailed JobFailedReason = "JobDeploymentFailed" AppFailed JobFailedReason = "AppFailed" JobDeploymentStatusTransitionGracePeriodExceeded JobFailedReason = "JobDeploymentStatusTransitionGracePeriodExceeded" ValidationFailed JobFailedReason = "ValidationFailed" @@ -129,6 +130,8 @@ type RayJobSpec struct { // KubeRay actively tries to terminate the RayJob; value must be positive integer. // +optional ActiveDeadlineSeconds *int32 `json:"activeDeadlineSeconds,omitempty"` + // WaitingTtlSeconds is the duration in seconds that the RayJob may wait to reach a running state before KubeRay transitions it to a failed state; value must be a positive integer. + WaitingTtlSeconds *int32 `json:"waitingTtlSeconds,omitempty"` // Specifies the number of retries before marking this job failed. // Each retry creates a new RayCluster.
// +kubebuilder:default:=0 diff --git a/ray-operator/apis/ray/v1/zz_generated.deepcopy.go b/ray-operator/apis/ray/v1/zz_generated.deepcopy.go index b4cb5decf12..846c97ea031 100644 --- a/ray-operator/apis/ray/v1/zz_generated.deepcopy.go +++ b/ray-operator/apis/ray/v1/zz_generated.deepcopy.go @@ -439,6 +439,11 @@ func (in *RayJobSpec) DeepCopyInto(out *RayJobSpec) { *out = new(int32) **out = **in } + if in.WaitingTtlSeconds != nil { + in, out := &in.WaitingTtlSeconds, &out.WaitingTtlSeconds + *out = new(int32) + **out = **in + } if in.BackoffLimit != nil { in, out := &in.BackoffLimit, &out.BackoffLimit *out = new(int32) diff --git a/ray-operator/controllers/ray/rayjob_controller.go b/ray-operator/controllers/ray/rayjob_controller.go index 1741507a586..54b508a62d4 100644 --- a/ray-operator/controllers/ray/rayjob_controller.go +++ b/ray-operator/controllers/ray/rayjob_controller.go @@ -184,6 +184,10 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request) break } + if shouldUpdate := checkWaitingTtlSecondsAndUpdateStatusIfNeeded(ctx, rayJobInstance); shouldUpdate { + break + } + if r.options.BatchSchedulerManager != nil { if scheduler, err := r.options.BatchSchedulerManager.GetScheduler(); err == nil { if err := scheduler.DoBatchSchedulingOnSubmission(ctx, rayJobInstance); err != nil { @@ -242,6 +246,10 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request) break } + if shouldUpdate := checkWaitingTtlSecondsAndUpdateStatusIfNeeded(ctx, rayJobInstance); shouldUpdate { + break + } + if shouldUpdate := checkTransitionGracePeriodAndUpdateStatusIfNeeded(ctx, rayJobInstance); shouldUpdate { break } @@ -1160,6 +1168,19 @@ func checkActiveDeadlineAndUpdateStatusIfNeeded(ctx context.Context, rayJob *ray return true } +func checkWaitingTtlSecondsAndUpdateStatusIfNeeded(ctx context.Context, rayJob *rayv1.RayJob) bool { + logger := ctrl.LoggerFrom(ctx) + if rayJob.Spec.WaitingTtlSeconds == nil || time.Now().Before(rayJob.Status.StartTime.Add(time.Duration(*rayJob.Spec.WaitingTtlSeconds)*time.Second)) { + return false + } + + logger.Info("The RayJob has passed the waitingTTLSeconds. Transition the status to `Failed`.", "StartTime", rayJob.Status.StartTime, "WaitingTtlSeconds", *rayJob.Spec.WaitingTtlSeconds) + rayJob.Status.JobDeploymentStatus = rayv1.JobDeploymentStatusFailed + rayJob.Status.Reason = rayv1.JobDeploymentFailed + rayJob.Status.Message = fmt.Sprintf("The RayJob has passed the waitingTTLSeconds. StartTime: %v. 
WaitingTtlSeconds: %d", rayJob.Status.StartTime, *rayJob.Spec.WaitingTtlSeconds) + return true +} + func checkTransitionGracePeriodAndUpdateStatusIfNeeded(ctx context.Context, rayJob *rayv1.RayJob) bool { logger := ctrl.LoggerFrom(ctx) if rayv1.IsJobTerminal(rayJob.Status.JobStatus) && rayJob.Status.JobDeploymentStatus == rayv1.JobDeploymentStatusRunning { diff --git a/ray-operator/controllers/ray/utils/validation.go b/ray-operator/controllers/ray/utils/validation.go index 74d2b4fe0e6..29416279905 100644 --- a/ray-operator/controllers/ray/utils/validation.go +++ b/ray-operator/controllers/ray/utils/validation.go @@ -221,6 +221,9 @@ func ValidateRayJobSpec(rayJob *rayv1.RayJob) error { if rayJob.Spec.ActiveDeadlineSeconds != nil && *rayJob.Spec.ActiveDeadlineSeconds <= 0 { return fmt.Errorf("The RayJob spec is invalid: activeDeadlineSeconds must be a positive integer") } + if rayJob.Spec.WaitingTtlSeconds != nil && *rayJob.Spec.WaitingTtlSeconds <= 0 { + return fmt.Errorf("The RayJob spec is invalid: waitingTtlSeconds must be a positive integer") + } if rayJob.Spec.BackoffLimit != nil && *rayJob.Spec.BackoffLimit < 0 { return fmt.Errorf("The RayJob spec is invalid: backoffLimit must be a positive integer") } diff --git a/ray-operator/pkg/client/applyconfiguration/ray/v1/rayjobspec.go b/ray-operator/pkg/client/applyconfiguration/ray/v1/rayjobspec.go index 14083877a44..f89dc61e366 100644 --- a/ray-operator/pkg/client/applyconfiguration/ray/v1/rayjobspec.go +++ b/ray-operator/pkg/client/applyconfiguration/ray/v1/rayjobspec.go @@ -11,6 +11,7 @@ import ( // with apply. type RayJobSpecApplyConfiguration struct { ActiveDeadlineSeconds *int32 `json:"activeDeadlineSeconds,omitempty"` + WaitingTtlSeconds *int32 `json:"waitingTtlSeconds,omitempty"` BackoffLimit *int32 `json:"backoffLimit,omitempty"` RayClusterSpec *RayClusterSpecApplyConfiguration `json:"rayClusterSpec,omitempty"` SubmitterPodTemplate *corev1.PodTemplateSpecApplyConfiguration `json:"submitterPodTemplate,omitempty"` @@ -45,6 +46,14 @@ func (b *RayJobSpecApplyConfiguration) WithActiveDeadlineSeconds(value int32) *R return b } +// WithWaitingTtlSeconds sets the WithWaitingTtlSeconds field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the WithWaitingTtlSeconds field is set to the value of the last call. +func (b *RayJobSpecApplyConfiguration) WithWaitingTtlSeconds(value int32) *RayJobSpecApplyConfiguration { + b.WaitingTtlSeconds = &value + return b +} + // WithBackoffLimit sets the BackoffLimit field in the declarative configuration to the given value // and returns the receiver, so that objects can be built by chaining "With" function invocations. // If called multiple times, the BackoffLimit field is set to the value of the last call. diff --git a/ray-operator/test/e2erayjob/rayjob_retry_test.go b/ray-operator/test/e2erayjob/rayjob_retry_test.go index 3cbb70ab48a..e1bc59e00e8 100644 --- a/ray-operator/test/e2erayjob/rayjob_retry_test.go +++ b/ray-operator/test/e2erayjob/rayjob_retry_test.go @@ -173,6 +173,32 @@ func TestRayJobRetry(t *testing.T) { Should(WithTransform(RayJobSucceeded, Equal(int32(0)))) }) + //test.T().Run("WaitingTtlSeconds has passed", func(_ *testing.T) { + // rayJobAC := rayv1ac.RayJob("waiting-stuck", namespace.Name). + // WithSpec(rayv1ac.RayJobSpec(). + // WithBackoffLimit(2). + // WithSubmitterConfig(rayv1ac.SubmitterConfig(). + // WithBackoffLimit(0)). 
+ // WithRayClusterSpec(NewRayClusterSpec(MountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs"))). + // WithEntrypoint("python /home/ray/jobs/long_running.py"). + // WithShutdownAfterJobFinishes(true). + // WithTTLSecondsAfterFinished(600). + // WithWaitingTtlSeconds(1). + // WithActiveDeadlineSeconds(500). + // WithSubmitterPodTemplate(JobSubmitterPodTemplateApplyConfiguration())) + // + // rayJob, err := test.Client().Ray().RayV1().RayJobs(namespace.Name).Apply(test.Ctx(), rayJobAC, TestApplyOptions) + // g.Expect(err).NotTo(HaveOccurred()) + // LogWithTimestamp(test.T(), "Created RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name) + // LogWithTimestamp(test.T(), "Waiting for RayJob %s/%s to be 'Failed'", rayJob.Namespace, rayJob.Name) + // g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutShort). + // Should(WithTransform(RayJobDeploymentStatus, Equal(rayv1.JobDeploymentStatusFailed))) + // g.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)). + // To(WithTransform(RayJobReason, Equal(rayv1.JobDeploymentFailed))) + // g.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)). + // Should(WithTransform(RayJobSucceeded, Equal(int32(0)))) + //}) + test.T().Run("Failing RayJob with HttpMode submission mode", func(_ *testing.T) { // Set up the RayJob with HTTP mode and a BackoffLimit rayJobAC := rayv1ac.RayJob("failing-rayjob-in-httpmode", namespace.Name). diff --git a/ray-operator/test/e2erayjob/rayjob_test.go b/ray-operator/test/e2erayjob/rayjob_test.go index 0667bfe1410..188ae16fc39 100644 --- a/ray-operator/test/e2erayjob/rayjob_test.go +++ b/ray-operator/test/e2erayjob/rayjob_test.go @@ -274,6 +274,29 @@ env_vars: To(WithTransform(RayJobReason, Equal(rayv1.DeadlineExceeded))) }) + test.T().Run("RayJob has passed WaitingTtlSeconds", func(_ *testing.T) { + rayJobAC := rayv1ac.RayJob("long-running", namespace.Name). + WithSpec(rayv1ac.RayJobSpec(). + WithRayClusterSpec(NewRayClusterSpec(MountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs"))). + WithEntrypoint("python /home/ray/jobs/long_running.py"). + WithShutdownAfterJobFinishes(true). + WithTTLSecondsAfterFinished(600). + WithActiveDeadlineSeconds(5). + WithWaitingTtlSeconds(2). + WithSubmitterPodTemplate(JobSubmitterPodTemplateApplyConfiguration())) + + rayJob, err := test.Client().Ray().RayV1().RayJobs(namespace.Name).Apply(test.Ctx(), rayJobAC, TestApplyOptions) + g.Expect(err).NotTo(HaveOccurred()) + LogWithTimestamp(test.T(), "Created RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name) + + // The RayJob will transition to `Failed` because it has passed `WaitingTtlSeconds` without reaching a running state. + LogWithTimestamp(test.T(), "Waiting for RayJob %s/%s to be 'Failed'", rayJob.Namespace, rayJob.Name) + g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutShort). + Should(WithTransform(RayJobDeploymentStatus, Equal(rayv1.JobDeploymentStatusFailed))) + g.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)). + To(WithTransform(RayJobReason, Equal(rayv1.JobDeploymentFailed))) + }) + test.T().Run("RayJob fails when head Pod is deleted when job is running", func(_ *testing.T) { rayJobAC := rayv1ac.RayJob("delete-head-after-submit", namespace.Name). WithSpec(rayv1ac.RayJobSpec().