Skip to content

[Feature] [kubectl-plugin] Expose setting shutdownAfterJobFinishes and ttlSecondsAfterFinished in ray job submit #3627

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 17 additions & 13 deletions apiserver/pkg/model/converter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,9 +303,10 @@ var JobNewClusterTest = rayv1api.RayJob{
Metadata: map[string]string{
"job_submission_id": "123",
},
RuntimeEnvYAML: "mytest yaml",
TTLSecondsAfterFinished: secondsValue,
RayClusterSpec: &ClusterSpecTest.Spec,
RuntimeEnvYAML: "mytest yaml",
ShutdownAfterJobFinishes: true,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why change apiserver tests?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because I have modified the ray operator that ShutdownAfterJobFinishes should be true; otherwise, ttlSecondsAfterFinishedcannot cannot be set.
reference

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems unrelated to this PR? Could you do apiserver-related changes in a separate PR?

TTLSecondsAfterFinished: secondsValue,
RayClusterSpec: &ClusterSpecTest.Spec,
},
}

Expand All @@ -318,9 +319,10 @@ var JobExistingClusterTest = rayv1api.RayJob{
},
},
Spec: rayv1api.RayJobSpec{
Entrypoint: "python /home/ray/samples/sample_code.py",
RuntimeEnvYAML: "mytest yaml",
TTLSecondsAfterFinished: secondsValue,
Entrypoint: "python /home/ray/samples/sample_code.py",
RuntimeEnvYAML: "mytest yaml",
ShutdownAfterJobFinishes: true,
TTLSecondsAfterFinished: secondsValue,
ClusterSelector: map[string]string{
util.RayClusterUserLabelKey: "test",
},
Expand All @@ -336,9 +338,10 @@ var JobExistingClusterSubmitterTest = rayv1api.RayJob{
},
},
Spec: rayv1api.RayJobSpec{
Entrypoint: "python /home/ray/samples/sample_code.py",
RuntimeEnvYAML: "mytest yaml",
TTLSecondsAfterFinished: secondsValue,
Entrypoint: "python /home/ray/samples/sample_code.py",
RuntimeEnvYAML: "mytest yaml",
ShutdownAfterJobFinishes: true,
TTLSecondsAfterFinished: secondsValue,
ClusterSelector: map[string]string{
util.RayClusterUserLabelKey: "test",
},
Expand Down Expand Up @@ -375,10 +378,11 @@ var JobWithOutputTest = rayv1api.RayJob{
},
},
Spec: rayv1api.RayJobSpec{
Entrypoint: "python /home/ray/samples/sample_code.py",
RuntimeEnvYAML: "mytest yaml",
TTLSecondsAfterFinished: secondsValue,
RayClusterSpec: &ClusterSpecTest.Spec,
Entrypoint: "python /home/ray/samples/sample_code.py",
RuntimeEnvYAML: "mytest yaml",
ShutdownAfterJobFinishes: true,
TTLSecondsAfterFinished: secondsValue,
RayClusterSpec: &ClusterSpecTest.Spec,
},
Status: rayv1api.RayJobStatus{
JobStatus: "RUNNING",
Expand Down
19 changes: 10 additions & 9 deletions apiserver/test/e2e/job_server_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -593,15 +593,16 @@ func TestCreateJobWithClusterSelector(t *testing.T) {
Name: "Submit a correct job on an already running cluster",
Input: &api.CreateRayJobRequest{
Job: &api.RayJob{
Name: tCtx.GetNextName(),
Namespace: tCtx.GetNamespaceName(),
User: "r2d2",
Version: tCtx.GetRayVersion(),
Entrypoint: "python /home/ray/samples/counter_sample.py",
Metadata: map[string]string{},
RuntimeEnv: "pip:\n - requests==2.26.0\n - pendulum==2.1.2\nenv_vars:\n counter_name: test_counter\n",
ClusterSelector: map[string]string{"ray.io/cluster": cluster.Name},
TtlSecondsAfterFinished: 60,
Name: tCtx.GetNextName(),
Namespace: tCtx.GetNamespaceName(),
User: "r2d2",
Version: tCtx.GetRayVersion(),
Entrypoint: "python /home/ray/samples/counter_sample.py",
Metadata: map[string]string{},
RuntimeEnv: "pip:\n - requests==2.26.0\n - pendulum==2.1.2\nenv_vars:\n counter_name: test_counter\n",
ClusterSelector: map[string]string{"ray.io/cluster": cluster.Name},
ShutdownAfterJobFinishes: true,
TtlSecondsAfterFinished: 60,
JobSubmitter: &api.RayJobSubmitter{
Image: cluster.ClusterSpec.HeadGroupSpec.Image,
},
Expand Down
89 changes: 52 additions & 37 deletions kubectl-plugin/pkg/cmd/job/job_submit.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,40 +40,42 @@ const (
)

type SubmitJobOptions struct {
cmdFactory cmdutil.Factory
dashboardClient utils.RayDashboardClientInterface
ioStreams *genericiooptions.IOStreams
RayJob *rayv1.RayJob
logColor string
image string
fileName string
workingDir string
runtimeEnv string
headers string
verify string
cluster string
runtimeEnvJson string
entryPointResource string
metadataJson string
logStyle string
submissionID string
rayjobName string
rayVersion string
entryPoint string
headCPU string
headMemory string
headGPU string
workerCPU string
workerMemory string
workerGPU string
namespace string
entryPointMemory int
entryPointGPU float32
workerReplicas int32
entryPointCPU float32
noWait bool
dryRun bool
verbose bool
cmdFactory cmdutil.Factory
dashboardClient utils.RayDashboardClientInterface
ioStreams *genericiooptions.IOStreams
RayJob *rayv1.RayJob
logColor string
image string
fileName string
workingDir string
runtimeEnv string
headers string
verify string
cluster string
runtimeEnvJson string
entryPointResource string
metadataJson string
logStyle string
submissionID string
rayjobName string
rayVersion string
entryPoint string
headCPU string
headMemory string
headGPU string
workerCPU string
workerMemory string
workerGPU string
namespace string
entryPointMemory int
entryPointGPU float32
workerReplicas int32
entryPointCPU float32
noWait bool
dryRun bool
verbose bool
shutdownAfterJobFinishes bool
ttlSecondsAfterFinished int32
Comment on lines +77 to +78
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add shutdownAfterJobFinishes and ttlSecondsAfterFinished

}

type JobInfo struct {
Expand Down Expand Up @@ -171,6 +173,8 @@ func NewJobSubmitCommand(cmdFactory cmdutil.Factory, streams genericclioptions.I
cmd.Flags().StringVar(&options.workerGPU, "worker-gpu", "0", "number of GPUs in each worker group replica")
cmd.Flags().BoolVar(&options.dryRun, "dry-run", false, "print the generated YAML instead of creating the cluster. Only works when filename is not provided")
cmd.Flags().BoolVarP(&options.verbose, "verbose", "v", false, "Passing the '--verbose' flag to the 'ray job submit' command")
cmd.Flags().BoolVar(&options.shutdownAfterJobFinishes, "shutdown-after-job-finishes", false, "Shutdown the cluster after the job finishes")
cmd.Flags().Int32Var(&options.ttlSecondsAfterFinished, "ttl-seconds-after-finished", 0, "TTL seconds after finished. Only works when filename is not provided")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It still works when filename is provided right?

Copy link
Contributor Author

@CheyuWu CheyuWu May 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently no, however I will make sure it works in the next commit.


return cmd
}
Expand Down Expand Up @@ -254,8 +258,17 @@ func (options *SubmitJobOptions) Validate() error {
}
options.runtimeEnvJson = string(runtimeJson)
}

if !options.RayJob.Spec.ShutdownAfterJobFinishes && options.RayJob.Spec.TTLSecondsAfterFinished != 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unrelated to this PR, but when I was testing this change, I ran kubectl ray job submit -f ray-job.sample.yaml --working-dir . --shutdown-after-job-finishes --ttl-seconds-after-finished 600 -- python /home/ray/samples/sample_code.py and wondered why the new flags weren't used. I had to look at this source to understand -f/--filename makes the command ignore many of these other flags.

Copy link
Member

@MortalHappiness MortalHappiness May 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1. I think we should take the filename as a base, and flags should override the values in the file.

I created an issue to track this #3639

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The behavior of kubectl ray create cluster is to error with an actionable error message if declarative file and CLI flags are used together. We can consider doing that here too if we think it's not worth it to have override logic.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is worth overriding the values in the file. I will change this.

return fmt.Errorf("ttl-seconds-after-finished is only supported when shutdown-after-job-finishes is set to true")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

might be better to use the YAML field names ttlSecondsAfterFinished and shutdownAfterJobFinishes here

Suggested change
return fmt.Errorf("ttl-seconds-after-finished is only supported when shutdown-after-job-finishes is set to true")
return fmt.Errorf("ttlSecondsAfterFinished is only supported when shutdownAfterJobFinishes is 'true'")

}

Comment on lines +262 to +265
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implement validation logic when --filename is provided

if shutdownAfterJobFinishes is False, then ttlSecondsAfterFinished must be 0 or unset.

} else if strings.TrimSpace(options.rayjobName) == "" {
return fmt.Errorf("Must set either yaml file (--filename) or set Ray job name (--name)")
} else if options.fileName == "" {
if !options.shutdownAfterJobFinishes && options.ttlSecondsAfterFinished != 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also check ttlSecondsAfterFinished ≥ 0 in both cases? Right now this command will create a RayJob with ttlSecondsAfterFinished: -10

kubectl ray --context gke_kubeflow-platform_europe-west4-b_ml-compute-1 -n hyperkube job submit --name dxia-test --shutdown-after-job-finishes --ttl-seconds-after-finished -10 --working-dir . --dry-run  -- python /home/ray/samples/sample_code.py | grep -B 1 ttlSeconds
  submissionMode: InteractiveMode
  ttlSecondsAfterFinished: -10

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will fix this, thx

return fmt.Errorf("ttl-seconds-after-finished is only supported when shutdown-after-job-finishes is set to true")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return fmt.Errorf("ttl-seconds-after-finished is only supported when shutdown-after-job-finishes is set to true")
return fmt.Errorf("--ttl-seconds-after-finished is only supported when --shutdown-after-job-finishes is set to true")

}
Comment on lines +268 to +271
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implement validation logic when --filename is not provided

if shutdownAfterJobFinishes is False, then ttlSecondsAfterFinished must be 0 or unset.

}

if options.workingDir == "" {
Expand Down Expand Up @@ -289,9 +302,11 @@ func (options *SubmitJobOptions) Run(ctx context.Context, factory cmdutil.Factor
if options.fileName == "" {
// Genarate the Ray job.
rayJobObject := generation.RayJobYamlObject{
RayJobName: options.rayjobName,
Namespace: options.namespace,
SubmissionMode: "InteractiveMode",
RayJobName: options.rayjobName,
Namespace: options.namespace,
ShutdownAfterJobFinishes: options.shutdownAfterJobFinishes,
TTLSecondsAfterFinished: options.ttlSecondsAfterFinished,
SubmissionMode: "InteractiveMode",
// Prior to kuberay 1.2.2, the entry point is required. To maintain
// backwards compatibility with 1.2.x, we submit the entry point
// here, even though it will be ignored.
Expand Down
71 changes: 70 additions & 1 deletion kubectl-plugin/pkg/cmd/job/job_submit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestRayJobSubmitComplete(t *testing.T) {
assert.Equal(t, "fake/path/to/env/yaml", fakeSubmitJobOptions.runtimeEnv)
}

func TestRayJobSubmitValidate(t *testing.T) {
func TestRayJobSubmitWithYamlValidate(t *testing.T) {
Copy link
Contributor Author

@CheyuWu CheyuWu May 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Split TestRayJobSubmitValidate into two test cases: one for YAML-based submission and one for non-YAML-based submission.

testStreams, _, _, _ := genericclioptions.NewTestIOStreams()
cmdFactory := cmdutil.NewFactory(genericclioptions.NewConfigFlags(true))

Expand Down Expand Up @@ -71,6 +71,29 @@ spec:
submissionMode: 'InteractiveMode'
backoffLimit: 0`,
},
{
name: "shutdownAfterJobFinishes is false and ttlSecondsAfterFinished is not zero",
yamlContent: `apiVersion: ray.io/v1
kind: RayJob
metadata:
name: rayjob-sample
spec:
shutdownAfterJobFinishes: false
ttlSecondsAfterFinished: 10
submissionMode: 'InteractiveMode'`,
expectError: "ttl-seconds-after-finished is only supported when shutdown-after-job-finishes is set to true",
},
{
name: "shutdownAfterJobFinishes is false and ttlSecondsAfterFinished is not zero",
yamlContent: `apiVersion: ray.io/v1
kind: RayJob
metadata:
name: rayjob-sample
spec:
shutdownAfterJobFinishes: true
ttlSecondsAfterFinished: 10
submissionMode: 'InteractiveMode'`,
},
}

for _, tc := range tests {
Expand Down Expand Up @@ -99,6 +122,52 @@ spec:
}
}

func TestRayJobSubmitWithoutYamlValidate(t *testing.T) {
Copy link
Contributor Author

@CheyuWu CheyuWu May 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Split TestRayJobSubmitValidate into two test cases: one for YAML-based submission and one for non-YAML-based submission.

testStreams, _, _, _ := genericclioptions.NewTestIOStreams()
cmdFactory := cmdutil.NewFactory(genericclioptions.NewConfigFlags(true))

test := []struct {
name string
rayjobName string
expectError string
ttlSecondsAfterFinished int32
shutdownAfterJobFinishes bool
}{
{
name: "shutdownAfterJobFinishes is false and ttlSecondsAfterFinished is not zero",
rayjobName: "rayjob-sample",
shutdownAfterJobFinishes: false,
ttlSecondsAfterFinished: 10,
expectError: "ttl-seconds-after-finished is only supported when shutdown-after-job-finishes is set to true",
},
{
name: "shutdownAfterJobFinishes is true and ttlSecondsAfterFinished is not zero",
rayjobName: "rayjob-sample",
shutdownAfterJobFinishes: true,
ttlSecondsAfterFinished: 10,
},
}

for _, tc := range test {
t.Run(tc.name, func(t *testing.T) {
opts := &SubmitJobOptions{
cmdFactory: cmdFactory,
ioStreams: &testStreams,
rayjobName: tc.rayjobName,
workingDir: "Fake/File/Path",
shutdownAfterJobFinishes: tc.shutdownAfterJobFinishes,
ttlSecondsAfterFinished: tc.ttlSecondsAfterFinished,
}
err := opts.Validate()
if tc.expectError != "" {
require.EqualError(t, err, tc.expectError)
} else {
require.NoError(t, err)
}
})
}
}

func TestDecodeRayJobYaml(t *testing.T) {
rayjobtmpfile, err := os.CreateTemp("./", "rayjob-temp-*.yaml")
require.NoError(t, err)
Expand Down
4 changes: 4 additions & 0 deletions kubectl-plugin/pkg/util/generation/generation.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ type RayJobYamlObject struct {
SubmissionMode string
Entrypoint string
RayClusterConfig
TTLSecondsAfterFinished int32
ShutdownAfterJobFinishes bool
}

func (rayClusterConfig *RayClusterConfig) GenerateRayClusterApplyConfig() *rayv1ac.RayClusterApplyConfiguration {
Expand All @@ -138,6 +140,8 @@ func (rayJobObject *RayJobYamlObject) GenerateRayJobApplyConfig() *rayv1ac.RayJo
WithSpec(rayv1ac.RayJobSpec().
WithSubmissionMode(rayv1.JobSubmissionMode(rayJobObject.SubmissionMode)).
WithEntrypoint(rayJobObject.Entrypoint).
WithTTLSecondsAfterFinished(rayJobObject.TTLSecondsAfterFinished).
WithShutdownAfterJobFinishes(rayJobObject.ShutdownAfterJobFinishes).
WithRayClusterSpec(rayJobObject.generateRayClusterSpec()))

return rayJobApplyConfig
Expand Down
14 changes: 9 additions & 5 deletions kubectl-plugin/pkg/util/generation/generation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,9 +164,11 @@ func TestGenerateRayClusterApplyConfig(t *testing.T) {

func TestGenerateRayJobApplyConfig(t *testing.T) {
testRayJobYamlObject := RayJobYamlObject{
RayJobName: "test-ray-job",
Namespace: "default",
SubmissionMode: "InteractiveMode",
RayJobName: "test-ray-job",
Namespace: "default",
SubmissionMode: "InteractiveMode",
TTLSecondsAfterFinished: 100,
ShutdownAfterJobFinishes: true,
RayClusterConfig: RayClusterConfig{
RayVersion: ptr.To(util.RayVersion),
Image: ptr.To(util.RayImage),
Expand Down Expand Up @@ -204,8 +206,10 @@ func TestGenerateRayJobApplyConfig(t *testing.T) {
Namespace: ptr.To("default"),
},
Spec: &rayv1ac.RayJobSpecApplyConfiguration{
SubmissionMode: ptr.To(rayv1.JobSubmissionMode(testRayJobYamlObject.SubmissionMode)),
Entrypoint: ptr.To(""),
SubmissionMode: ptr.To(rayv1.JobSubmissionMode(testRayJobYamlObject.SubmissionMode)),
Entrypoint: ptr.To(""),
TTLSecondsAfterFinished: ptr.To(int32(100)),
ShutdownAfterJobFinishes: ptr.To(true),
RayClusterSpec: &rayv1ac.RayClusterSpecApplyConfiguration{
RayVersion: ptr.To(util.RayVersion),
HeadGroupSpec: &rayv1ac.HeadGroupSpecApplyConfiguration{
Expand Down
4 changes: 4 additions & 0 deletions ray-operator/controllers/ray/utils/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,10 @@ func ValidateRayJobSpec(rayJob *rayv1.RayJob) error {
return fmt.Errorf("a RayJob with shutdownAfterJobFinishes set to false is not allowed to be suspended")
}

if !rayJob.Spec.ShutdownAfterJobFinishes && rayJob.Spec.TTLSecondsAfterFinished != 0 {
return fmt.Errorf("a RayJob with shutdownAfterJobFinishes set to false cannot have TTLSecondsAfterFinished is non-zero")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return fmt.Errorf("a RayJob with shutdownAfterJobFinishes set to false cannot have TTLSecondsAfterFinished is non-zero")
return fmt.Errorf("a RayJob with shutdownAfterJobFinishes set to false cannot have TTLSecondsAfterFinished")

}

Comment on lines +147 to +150
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implement validation logic in ray operator

if shutdownAfterJobFinishes is False, then ttlSecondsAfterFinished must be 0 or unset.

isClusterSelectorMode := len(rayJob.Spec.ClusterSelector) != 0
if rayJob.Spec.Suspend && isClusterSelectorMode {
return fmt.Errorf("the ClusterSelector mode doesn't support the suspend operation")
Expand Down
18 changes: 18 additions & 0 deletions ray-operator/controllers/ray/utils/validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,24 @@ func TestValidateRayJobSpec(t *testing.T) {
},
expectError: false,
},
{
name: "ShutdownAfterJobFinishes is false and TTLSecondsAfterFinished is not zero",
spec: rayv1.RayJobSpec{
ShutdownAfterJobFinishes: false,
TTLSecondsAfterFinished: 5,
RayClusterSpec: createBasicRayClusterSpec(),
},
expectError: true,
},
{
name: "ShutdownAfterJobFinishes is true and TTLSecondsAfterFinished is not zero",
spec: rayv1.RayJobSpec{
ShutdownAfterJobFinishes: true,
TTLSecondsAfterFinished: 5,
RayClusterSpec: createBasicRayClusterSpec(),
},
expectError: false,
},
}

for _, tt := range tests {
Expand Down
Loading