4 changes: 4 additions & 0 deletions apiserver/pkg/util/job.go
@@ -100,6 +100,10 @@ func NewRayJob(apiJob *api.RayJob, computeTemplateMap map[string]*api.ComputeTem
rayJob.Spec.ActiveDeadlineSeconds = &apiJob.ActiveDeadlineSeconds
}

if apiJob.WaitingTtlSeconds > 0 {
rayJob.Spec.WaitingTtlSeconds = &apiJob.WaitingTtlSeconds
}

return &RayJob{rayJob}, nil
}

1 change: 1 addition & 0 deletions apiserver/pkg/util/job_test.go
@@ -21,6 +21,7 @@ var apiJobNewCluster = &api.RayJob{
ShutdownAfterJobFinishes: true,
ClusterSpec: rayCluster.ClusterSpec,
ActiveDeadlineSeconds: 120,
WaitingTtlSeconds: 120,
}

var apiJobExistingCluster = &api.RayJob{
1 change: 1 addition & 0 deletions dashboard/src/types/v2/api/rayjob.ts
@@ -21,6 +21,7 @@ type JobSubmissionMode =

interface RayJobAPISpec extends RayJobSpec {
activeDeadlineSeconds: number;
waitingTtlSeconds: number;
backoffLimit: number;
submitterConfig: SubmitterConfig;
managedBy: string;
1 change: 1 addition & 0 deletions docs/reference/api.md
@@ -235,6 +235,7 @@ _Appears in:_
| Field | Description | Default | Validation |
| --- | --- | --- | --- |
| `activeDeadlineSeconds` _integer_ | ActiveDeadlineSeconds is the duration in seconds that the RayJob may be active before<br />KubeRay actively tries to terminate the RayJob; value must be positive integer. | | |
| `waitingTtlSeconds` _integer_ | WaitingTtlSeconds is the maximum duration in seconds that the RayJob may wait to reach a running state<br />after it is created; if exceeded, KubeRay transitions the RayJob to Failed. Value must be a positive integer. | | |
| `backoffLimit` _integer_ | Specifies the number of retries before marking this job failed.<br />Each retry creates a new RayCluster. | 0 | |
| `rayClusterSpec` _[RayClusterSpec](#rayclusterspec)_ | RayClusterSpec is the cluster template to run the job | | |
| `submitterPodTemplate` _[PodTemplateSpec](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.28/#podtemplatespec-v1-core)_ | SubmitterPodTemplate is the template for the pod that will run `ray job submit`. | | |
21 changes: 21 additions & 0 deletions helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml

Some generated files are not rendered by default.

11 changes: 11 additions & 0 deletions proto/go_client/job.pb.go

Some generated files are not rendered by default.

4 changes: 4 additions & 0 deletions proto/job.proto
@@ -166,6 +166,10 @@ message RayJob {
// KubeRay actively tries to terminate the RayJob; value must be positive integer.
// If not set, the job may run indefinitely until it completes, fails, or is manually stopped.
int32 activeDeadlineSeconds = 25;
// Optional waitingTtlSeconds is the maximum duration in seconds that the RayJob may wait to reach a running state.
// KubeRay transitions the RayJob to Failed if it is not running within waitingTtlSeconds; value must be positive integer.
// If not set, the job may wait indefinitely to start running.
int32 waitingTtlSeconds = 60;
// Output. The time that the job created.
google.protobuf.Timestamp created_at = 12 [(google.api.field_behavior) = OUTPUT_ONLY];
// Output. The time that the job deleted.
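To make the new field concrete, here is a minimal sketch of an API-server client setting `waitingTtlSeconds` through the generated Go bindings. The import path and the surrounding fields are assumptions taken from the `apiJobNewCluster` fixture in `apiserver/pkg/util/job_test.go` above; only `WaitingTtlSeconds` is the new addition.

```go
package main

import (
	"fmt"

	// Assumed import path for the generated bindings in proto/go_client.
	api "github.com/ray-project/kuberay/proto/go_client"
)

func main() {
	job := &api.RayJob{
		ShutdownAfterJobFinishes: true,
		ActiveDeadlineSeconds:    120, // hard cap on total runtime, as before
		WaitingTtlSeconds:        120, // new: fail the job if it is not running within 2 minutes
		// Entrypoint, cluster spec, and other required fields omitted for brevity.
	}
	// util.NewRayJob copies WaitingTtlSeconds into the CR spec only when it is > 0
	// (see the apiserver/pkg/util/job.go hunk above).
	fmt.Println(job.WaitingTtlSeconds)
}
```

Because the proto field is a plain `int32`, leaving it at zero means "unset", which is why the converter guards with `> 0` rather than a nil check.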
5 changes: 5 additions & 0 deletions proto/kuberay_api.swagger.json
@@ -1949,6 +1949,11 @@
"format": "int32",
"description": "Optional activeDeadlineSeconds is the duration in seconds that the RayJob may be active before\nKubeRay actively tries to terminate the RayJob; value must be positive integer.\nIf not set, the job may run indefinitely until it completes, fails, or is manually stopped."
},
"waitingTtlSeconds": {
"type": "integer",
"format": "int32",
"description": "Optional waitingTtlSeconds is the TTL to clean up RayJob when it is waiting to be scheduled."
},
"createdAt": {
"type": "string",
"format": "date-time",
3 changes: 3 additions & 0 deletions ray-operator/apis/ray/v1/rayjob_types.go
@@ -73,6 +73,7 @@ type JobFailedReason string
const (
SubmissionFailed JobFailedReason = "SubmissionFailed"
DeadlineExceeded JobFailedReason = "DeadlineExceeded"
JobDeploymentFailed JobFailedReason = "JobDeploymentFailed"
AppFailed JobFailedReason = "AppFailed"
JobDeploymentStatusTransitionGracePeriodExceeded JobFailedReason = "JobDeploymentStatusTransitionGracePeriodExceeded"
ValidationFailed JobFailedReason = "ValidationFailed"
@@ -129,6 +130,8 @@ type RayJobSpec struct {
// KubeRay actively tries to terminate the RayJob; value must be positive integer.
// +optional
ActiveDeadlineSeconds *int32 `json:"activeDeadlineSeconds,omitempty"`
// WaitingTtlSeconds is the maximum duration in seconds that the RayJob may wait to reach a running state
// after it is created; if exceeded, KubeRay transitions the RayJob to Failed. Value must be a positive integer.
// +optional
WaitingTtlSeconds *int32 `json:"waitingTtlSeconds,omitempty"`
// Specifies the number of retries before marking this job failed.
// Each retry creates a new RayCluster.
// +kubebuilder:default:=0
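On the CRD side the field is a `*int32`, so a nil pointer means "no waiting TTL". Below is a minimal sketch of setting it on a `RayJobSpec`, assuming the usual KubeRay v1 import path and `k8s.io/utils/ptr` for pointer helpers.

```go
package main

import (
	"fmt"

	"k8s.io/utils/ptr"

	rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
)

func main() {
	spec := rayv1.RayJobSpec{
		// Fail the whole job if it runs longer than 5 minutes.
		ActiveDeadlineSeconds: ptr.To(int32(300)),
		// New: fail the job if it has not reached a running state within 2 minutes.
		WaitingTtlSeconds: ptr.To(int32(120)),
	}
	fmt.Println(*spec.WaitingTtlSeconds)
}
```

Validation (see the `ray-operator/controllers/ray/utils/validation.go` hunk below) rejects zero and negative values, so the pointer is either nil or positive.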
5 changes: 5 additions & 0 deletions ray-operator/apis/ray/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default.

21 changes: 21 additions & 0 deletions ray-operator/controllers/ray/rayjob_controller.go
@@ -184,6 +184,10 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
break
}

if shouldUpdate := checkWaitingTtlSecondsAndUpdateStatusIfNeeded(ctx, rayJobInstance); shouldUpdate {
break
}

if r.options.BatchSchedulerManager != nil {
if scheduler, err := r.options.BatchSchedulerManager.GetScheduler(); err == nil {
if err := scheduler.DoBatchSchedulingOnSubmission(ctx, rayJobInstance); err != nil {
@@ -242,6 +246,10 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
break
}

if shouldUpdate := checkWaitingTtlSecondsAndUpdateStatusIfNeeded(ctx, rayJobInstance); shouldUpdate {
break
}

if shouldUpdate := checkTransitionGracePeriodAndUpdateStatusIfNeeded(ctx, rayJobInstance); shouldUpdate {
break
}
@@ -1160,6 +1168,19 @@ func checkActiveDeadlineAndUpdateStatusIfNeeded(ctx context.Context, rayJob *ray
return true
}

func checkWaitingTtlSecondsAndUpdateStatusIfNeeded(ctx context.Context, rayJob *rayv1.RayJob) bool {
logger := ctrl.LoggerFrom(ctx)
if rayJob.Spec.WaitingTtlSeconds == nil || time.Now().Before(rayJob.Status.StartTime.Add(time.Duration(*rayJob.Spec.WaitingTtlSeconds)*time.Second)) {
return false
}

logger.Info("The RayJob has passed the waitingTTLSeconds. Transition the status to `Failed`.", "StartTime", rayJob.Status.StartTime, "WaitingTtlSeconds", *rayJob.Spec.WaitingTtlSeconds)
rayJob.Status.JobDeploymentStatus = rayv1.JobDeploymentStatusFailed
rayJob.Status.Reason = rayv1.JobDeploymentFailed
rayJob.Status.Message = fmt.Sprintf("The RayJob has passed the waitingTtlSeconds. StartTime: %v. WaitingTtlSeconds: %d", rayJob.Status.StartTime, *rayJob.Spec.WaitingTtlSeconds)
return true
}

func checkTransitionGracePeriodAndUpdateStatusIfNeeded(ctx context.Context, rayJob *rayv1.RayJob) bool {
logger := ctrl.LoggerFrom(ctx)
if rayv1.IsJobTerminal(rayJob.Status.JobStatus) && rayJob.Status.JobDeploymentStatus == rayv1.JobDeploymentStatusRunning {
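The new helper mirrors the existing `checkActiveDeadlineAndUpdateStatusIfNeeded`: once `StartTime + waitingTtlSeconds` has elapsed, it flips the deployment status to `Failed` with reason `JobDeploymentFailed`. Here is a sketch of a package-local unit test for it, assuming `RayJobStatus.StartTime` is a `*metav1.Time` and the controllers package is named `ray`.

```go
package ray

import (
	"context"
	"testing"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/utils/ptr"

	rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
)

func TestCheckWaitingTtlSecondsAndUpdateStatusIfNeeded(t *testing.T) {
	started := metav1.NewTime(time.Now().Add(-10 * time.Second))
	rayJob := &rayv1.RayJob{
		Spec:   rayv1.RayJobSpec{WaitingTtlSeconds: ptr.To(int32(5))},
		Status: rayv1.RayJobStatus{StartTime: &started},
	}

	// 10s have elapsed but the TTL is 5s, so the helper should report an update
	// and mark the RayJob as failed.
	if !checkWaitingTtlSecondsAndUpdateStatusIfNeeded(context.Background(), rayJob) {
		t.Fatal("expected the waiting TTL to be exceeded")
	}
	if rayJob.Status.JobDeploymentStatus != rayv1.JobDeploymentStatusFailed || rayJob.Status.Reason != rayv1.JobDeploymentFailed {
		t.Fatalf("unexpected status: %v / %v", rayJob.Status.JobDeploymentStatus, rayJob.Status.Reason)
	}

	// A nil waitingTtlSeconds disables the check entirely.
	rayJob.Spec.WaitingTtlSeconds = nil
	if checkWaitingTtlSecondsAndUpdateStatusIfNeeded(context.Background(), rayJob) {
		t.Fatal("expected no update when waitingTtlSeconds is unset")
	}
}
```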
3 changes: 3 additions & 0 deletions ray-operator/controllers/ray/utils/validation.go
@@ -221,6 +221,9 @@ func ValidateRayJobSpec(rayJob *rayv1.RayJob) error {
if rayJob.Spec.ActiveDeadlineSeconds != nil && *rayJob.Spec.ActiveDeadlineSeconds <= 0 {
return fmt.Errorf("The RayJob spec is invalid: activeDeadlineSeconds must be a positive integer")
}
if rayJob.Spec.WaitingTtlSeconds != nil && *rayJob.Spec.WaitingTtlSeconds <= 0 {
return fmt.Errorf("The RayJob spec is invalid: waitingTtlSeconds must be a positive integer")
}
if rayJob.Spec.BackoffLimit != nil && *rayJob.Spec.BackoffLimit < 0 {
return fmt.Errorf("The RayJob spec is invalid: backoffLimit must be a positive integer")
}

Some generated files are not rendered by default.

26 changes: 26 additions & 0 deletions ray-operator/test/e2erayjob/rayjob_retry_test.go
@@ -173,6 +173,32 @@ func TestRayJobRetry(t *testing.T) {
Should(WithTransform(RayJobSucceeded, Equal(int32(0))))
})

//test.T().Run("WaitingTtlSeconds has passed", func(_ *testing.T) {
// rayJobAC := rayv1ac.RayJob("waiting-stuck", namespace.Name).
// WithSpec(rayv1ac.RayJobSpec().
// WithBackoffLimit(2).
// WithSubmitterConfig(rayv1ac.SubmitterConfig().
// WithBackoffLimit(0)).
// WithRayClusterSpec(NewRayClusterSpec(MountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs"))).
// WithEntrypoint("python /home/ray/jobs/long_running.py").
// WithShutdownAfterJobFinishes(true).
// WithTTLSecondsAfterFinished(600).
// WithWaitingTtlSeconds(1).
// WithActiveDeadlineSeconds(500).
// WithSubmitterPodTemplate(JobSubmitterPodTemplateApplyConfiguration()))
//
// rayJob, err := test.Client().Ray().RayV1().RayJobs(namespace.Name).Apply(test.Ctx(), rayJobAC, TestApplyOptions)
// g.Expect(err).NotTo(HaveOccurred())
// LogWithTimestamp(test.T(), "Created RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name)
// LogWithTimestamp(test.T(), "Waiting for RayJob %s/%s to be 'Failed'", rayJob.Namespace, rayJob.Name)
// g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutShort).
// Should(WithTransform(RayJobDeploymentStatus, Equal(rayv1.JobDeploymentStatusFailed)))
// g.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)).
// To(WithTransform(RayJobReason, Equal(rayv1.JobDeploymentFailed)))
// g.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)).
// Should(WithTransform(RayJobSucceeded, Equal(int32(0))))
//})

test.T().Run("Failing RayJob with HttpMode submission mode", func(_ *testing.T) {
// Set up the RayJob with HTTP mode and a BackoffLimit
rayJobAC := rayv1ac.RayJob("failing-rayjob-in-httpmode", namespace.Name).
23 changes: 23 additions & 0 deletions ray-operator/test/e2erayjob/rayjob_test.go
@@ -274,6 +274,29 @@ env_vars:
To(WithTransform(RayJobReason, Equal(rayv1.DeadlineExceeded)))
})

test.T().Run("RayJob has passed WaitingTtlSeconds", func(_ *testing.T) {
rayJobAC := rayv1ac.RayJob("long-running", namespace.Name).
WithSpec(rayv1ac.RayJobSpec().
WithRayClusterSpec(NewRayClusterSpec(MountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs"))).
WithEntrypoint("python /home/ray/jobs/long_running.py").
WithShutdownAfterJobFinishes(true).
WithTTLSecondsAfterFinished(600).
WithActiveDeadlineSeconds(5).
WithWaitingTtlSeconds(2).
WithSubmitterPodTemplate(JobSubmitterPodTemplateApplyConfiguration()))

rayJob, err := test.Client().Ray().RayV1().RayJobs(namespace.Name).Apply(test.Ctx(), rayJobAC, TestApplyOptions)
g.Expect(err).NotTo(HaveOccurred())
LogWithTimestamp(test.T(), "Created RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name)

// The RayJob will transition to `Failed` because it has passed `WaitingTtlSeconds` without reaching a running state.
LogWithTimestamp(test.T(), "Waiting for RayJob %s/%s to be 'Failed'", rayJob.Namespace, rayJob.Name)
g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutShort).
Should(WithTransform(RayJobDeploymentStatus, Equal(rayv1.JobDeploymentStatusFailed)))
g.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)).
To(WithTransform(RayJobReason, Equal(rayv1.JobDeploymentFailed)))
})

test.T().Run("RayJob fails when head Pod is deleted when job is running", func(_ *testing.T) {
rayJobAC := rayv1ac.RayJob("delete-head-after-submit", namespace.Name).
WithSpec(rayv1ac.RayJobSpec().