
[kubectl-plugin] Support node selectors for kubectl ray job submit #3562


Open

CheyuWu wants to merge 11 commits into master from feat/rayjob/nodeSelectors

Conversation

@CheyuWu (Contributor) commented May 7, 2025

Why are these changes needed?

Support setting node selectors for kubectl ray job submit

Manual Testing

Here, I demonstrate the use of --worker-node-selectors and --head-node-selectors on GKE.

  • The worker pod is Pending because my_script.py doesn’t require GPU execution.
  • The head pod is also Pending because I mistakenly assigned the GPU node selector to the head pod.

Use both --head-node-selectors and --worker-node-selectors

$ kubectl ray job submit --name rayjob-sample --working-dir . --head-node-selectors cloud.google.com/gke-accelerator=nvidia-l4 --worker-node-selectors cloud.google.com/gke-accelerator=nvidia-l4 -- python my_script.py


Use only --head-node-selectors

$ kubectl ray job submit --name rayjob-sample --working-dir .  --head-node-selectors cloud.google.com/gke-accelerator=nvidia-l4 -- python my_script.py


Use only --worker-node-selectors

$ kubectl ray job submit --name rayjob-sample --working-dir .  --worker-node-selectors cloud.google.com/gke-accelerator=nvidia-l4 -- python my_script.py


Related issue number

Closes #3550

Checks

  • I've made sure the tests are passing.
  • Testing Strategy
    • Unit tests
    • Manual tests
    • This PR is not tested :(

@CheyuWu force-pushed the feat/rayjob/nodeSelectors branch 2 times, most recently from a95d6ca to ac87a4d on May 7, 2025 18:26
@CheyuWu changed the title from "[kubectl-plugin] Support node selectors for kubectl ray job submit" to "[need to test][kubectl-plugin] Support node selectors for kubectl ray job submit" on May 7, 2025
@CheyuWu (Contributor Author) commented May 8, 2025

Hi @kevin85421 PTAL

@CheyuWu changed the title from "[need to test][kubectl-plugin] Support node selectors for kubectl ray job submit" to "[kubectl-plugin] Support node selectors for kubectl ray job submit" on May 8, 2025
@MortalHappiness previously approved these changes May 8, 2025
@kevin85421 (Member) left a comment:

should we add any tests?

@kevin85421 (Member):

also cc @andrewsykim for review.

@CheyuWu (Contributor Author) commented May 11, 2025

Hi @kevin85421
I have added test cases for the RayJob submit. If any additional test cases are needed, please let me know.

@@ -96,6 +98,9 @@ var (
# Submit generated Ray job with default values and with runtime Env file and working directory
kubectl ray job submit --name rayjob-sample --working-dir /path/to/working-dir/ --runtime-env /runtimeEnv.yaml -- python my_script.py

# Submit Ray job on GCP with GPU node
Member:

This feature isn't just for GCP, right?

Contributor Author:

You're right — this feature isn't limited to GCP. My mistake on that part.

@@ -162,6 +167,8 @@ func NewJobSubmitCommand(cmdFactory cmdutil.Factory, streams genericclioptions.I
cmd.Flags().StringVar(&options.workerGPU, "worker-gpu", "0", "number of GPUs in each worker group replica")
cmd.Flags().BoolVar(&options.dryRun, "dry-run", false, "print the generated YAML instead of creating the cluster. Only works when filename is not provided")
cmd.Flags().BoolVarP(&options.verbose, "verbose", "v", false, "Passing the '--verbose' flag to the 'ray job submit' command")
cmd.Flags().StringToStringVar(&options.headNodeSelectors, "head-node-selectors", nil, "Node selectors to apply to all head pods in the cluster (e.g. --head-node-selectors cloud.google.com/gke-accelerator=nvidia-l4,cloud.google.com/gke-nodepool=my-node-pool)")
Member:

Suggested change
cmd.Flags().StringToStringVar(&options.headNodeSelectors, "head-node-selectors", nil, "Node selectors to apply to all head pods in the cluster (e.g. --head-node-selectors cloud.google.com/gke-accelerator=nvidia-l4,cloud.google.com/gke-nodepool=my-node-pool)")
cmd.Flags().StringToStringVar(&options.headNodeSelectors, "head-node-selectors", nil, "Node selectors to apply to the head pod in the cluster (e.g. --head-node-selectors cloud.google.com/gke-accelerator=nvidia-l4,cloud.google.com/gke-nodepool=my-node-pool)")
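For background (not part of this PR's diff), here is a minimal sketch of how pflag's StringToStringVar turns a comma-separated list of key=value pairs into a map[string]string, which is the behavior both selector flags rely on; the flag set name "demo" is illustrative:

package main

import (
	"fmt"

	"github.com/spf13/pflag"
)

func main() {
	var headNodeSelectors map[string]string
	fs := pflag.NewFlagSet("demo", pflag.ContinueOnError)
	fs.StringToStringVar(&headNodeSelectors, "head-node-selectors", nil,
		"Node selectors to apply to the head pod (key=value pairs)")

	// Comma-separated key=value pairs become individual map entries.
	_ = fs.Parse([]string{
		"--head-node-selectors",
		"cloud.google.com/gke-accelerator=nvidia-l4,cloud.google.com/gke-nodepool=my-node-pool",
	})

	// Prints: map[cloud.google.com/gke-accelerator:nvidia-l4 cloud.google.com/gke-nodepool:my-node-pool]
	fmt.Println(headNodeSelectors)
}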

@@ -557,6 +566,22 @@ func (options *SubmitJobOptions) raySubmitCmd() ([]string, error) {
raySubmitCmd = append(raySubmitCmd, "--verbose")
}

if len(options.headNodeSelectors) > 0 {
Member:

Hmm, does ray job submit support these two options?

Member:

cc @MortalHappiness Would you mind shepherding this PR and making sure it's well tested? Thanks!

Member:

I think it is already well tested because this PR reuses the generation.WorkerGroup object, which already has tests for NodeSelectors.

Member:

@MortalHappiness L569 - L583 adds --head-node-selectors and --worker-node-selectors to ray job submit command if I understand correctly. Does ray job submit support these two options?

Member:

No, ray job submit does not have those two flags. But I think it's worth adding them, because it's common for data scientists to select GPU nodes. We don't need to make all the flags identical to ray job submit.

Member:

Hmm, if it doesn't exist, the ray job submit command will fail, right?

@MortalHappiness dismissed their stale review May 12, 2025 13:25

Requires adding more tests.

noWait bool
dryRun bool
verbose bool
cmdFactory cmdutil.Factory
Member:

@chiayi @MortalHappiness @davidxia we should probably figure out a way to share the same struct we use for kubectl ray create cluster so we don't have to duplicate the same arguments each time

Contributor:

sgtm, can either be done in this PR or follow-up

Member:

+1

Member:

we can do it in a follow up PR.

Contributor Author:

I have created a follow up issue #3582

Member:

SG

@CheyuWu (Contributor Author) commented May 13, 2025

Hi @kevin85421, I have addressed the issues regarding this #3562 (comment). Need to wait for @MortalHappiness to respond.

@MortalHappiness (Member):

> Hi @kevin85421, I have addressed the issues regarding this #3562 (comment). Need to wait for @MortalHappiness to respond.

What did you change? The comment you mentioned looks like a question and not a request for change.

@CheyuWu (Contributor Author) commented May 13, 2025

> > Hi @kevin85421, I have addressed the issues regarding this #3562 (comment). Need to wait for @MortalHappiness to respond.
>
> What did you change? The comment you mentioned looks like a question and not a request for change.

Sorry, this is my fault. I typed the wrong sentence.

@MortalHappiness (Member):

@CheyuWu After offline discussion with @kevin85421, please add an e2e test to kubectl-plugin/test/e2e/kubectl_ray_job_submit_test.go to cover this case. Thanks.

@CheyuWu force-pushed the feat/rayjob/nodeSelectors branch 2 times, most recently from 571590a to ce12965 on May 15, 2025 16:49
@CheyuWu (Contributor Author) commented May 15, 2025

Hi @kevin85421, @MortalHappiness, I have addressed the issue. PTAL

  • Added an e2e test for node selectors
  • Reverted the incorrect change that forwarded the node selectors to the ray job submit command (see the sketch after this comment)
  • Manually tested the node selector flags with:
$ cd kubectl-plugin/test/e2e
$ kubectl ray job submit --name rayjob-sample --runtime-env testdata/rayjob-submit-working-dir/runtime-env-sample.yaml --head-cpu 1 --head-memory 2Gi --worker-cpu 1 --worker-memory 2Gi --head-node-selectors kubernetes.io/os=linux --worker-node-selectors kubernetes.io/os=linux -- python entrypoint-python-sample.py

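For reference, a minimal sketch (not the PR's actual code) of how the parsed selector maps can be applied to the generated RayJob's pod templates instead of being forwarded to ray job submit; the helper name applyNodeSelectors, the package name, and the surrounding wiring are assumptions:

package generation

import (
	rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
)

// applyNodeSelectors is a hypothetical helper: it copies the parsed
// --head-node-selectors and --worker-node-selectors maps onto the pod
// templates of the generated RayJob, so node placement is handled by the
// Kubernetes scheduler rather than by the ray job submit CLI.
func applyNodeSelectors(rayJob *rayv1.RayJob, headSelectors, workerSelectors map[string]string) {
	clusterSpec := rayJob.Spec.RayClusterSpec
	if clusterSpec == nil {
		return
	}
	if len(headSelectors) > 0 {
		clusterSpec.HeadGroupSpec.Template.Spec.NodeSelector = headSelectors
	}
	if len(workerSelectors) > 0 {
		for i := range clusterSpec.WorkerGroupSpecs {
			clusterSpec.WorkerGroupSpecs[i].Template.Spec.NodeSelector = workerSelectors
		}
	}
}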

@kevin85421 (Member):

cc @MortalHappiness

@@ -105,6 +107,9 @@ var (
# Submit generated Ray job with default values and with runtime Env file and working directory
kubectl ray job submit --name rayjob-sample --working-dir /path/to/working-dir/ --runtime-env /runtimeEnv.yaml -- python my_script.py

# Submit a Ray job with specific head-node-selectors and worker-node-selectors using kubectl ray job submit
kubectl ray job submit --name rayjob-sample --working-dir /path/to/working-dir/ --head-node-selectors kubernetes.io/os=linux --worker-node-selectors kubernetes.io/os=linux -- python my_script.py
Member:

I think for this example, we can still show the user how to select GPU nodes instead of selecting Linux nodes.

output, err = cmd.CombinedOutput()
Expect(err).ToNot(HaveOccurred())

Expect(string(output)).To(Equal("Complete"))
})

It("succeed in submitting RayJob with headNodeSelectors and workerNodeSelectors", func() {
killKubectlCmd := exec.Command("pkill", "-9", "kubectl")
Member:

What’s this for? This seems like a risky anti-pattern to me, as it might kill processes not created by these tests. Do we really need this?

cc @MortalHappiness

@CheyuWu (Contributor Author) May 18, 2025:

I think this is necessary, because if we remove the code and run the test, the following error occurs:

Waiting for portforwarding...Port Forwarding service rayjob-sample-sm8j9-head-svc
Unable to listen on port 8265: Listeners failed to create with the following errors: [unable to create listener: Error listen tcp4 127.0.0.1:8265: bind: address already in use unable to create listener: Error listen tcp6 [::1]:8265: bind: address already in use]
error: unable to listen on any of the requested ports: [{8265 8265}]

@CheyuWu (Contributor Author) May 18, 2025:

Perhaps we only need to kill the port-forwarding process.

Member:

Related to #3124. We want to make sure no port-forwarding processes are left alive.

"python",
entrypointSampleFileName,
)
output, err := cmd.CombinedOutput()
Member:

is this a blocking operation?

@CheyuWu (Contributor Author) May 18, 2025:

Yes, it is.

// Retrieve the Job ID from the output
regexExp := regexp.MustCompile(`'([^']*raysubmit[^']*)'`)
matches := regexExp.FindStringSubmatch(string(output))
Expect(len(matches)).To(BeNumerically(">=", 2))
Member:

Why >= 2? Can you add a utility function for it?

@CheyuWu (Contributor Author) May 18, 2025:

  1. This will retrieve the Job ID from the output and check whether it exists. Perhaps I can use >= 1 instead.
  2. OK, no problem.

Contributor Author:

I don't think it's necessary to use >=2, I will change it to >=1

matches := regexExp.FindStringSubmatch(string(output))
Expect(len(matches)).To(BeNumerically(">=", 2))
cmdOutputJobID := matches[1]
// Use kubectl to check status of the rayjob
Member:

  1. Can you only execute the kubectl command once and get the whole CR instead of executing it multiple times?

  2. I also opened a follow-up issue to avoid using kubectl: [kubectl-plugin] Use a more Golang-native approach to retrieve the CR status for testing #3626

Contributor Author:

OK

@CheyuWu force-pushed the feat/rayjob/nodeSelectors branch from 2f989d2 to cb923e4 on May 18, 2025 18:41
@CheyuWu (Contributor Author) commented May 18, 2025

Hi @kevin85421, I have addressed this comment. PTAL

  1. Changed killKubectlCmd := exec.Command("pkill", "-9", "kubectl") to KillPortForwardOn8265() (a sketch of such a helper follows this list).
  2. Handled the blocking operation.
  3. Used >= 1 instead when getting the RayJob ID, and added a utility function for it (#3562 (comment)).
  4. Executed the kubectl command once and got the whole CR instead of executing it multiple times.
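The KillPortForwardOn8265 helper itself isn't shown in this thread; a minimal sketch of what such a helper could look like, assuming a pkill -f pattern that matches only kubectl port-forward processes on 8265 (the real implementation may differ):

package e2e

import "os/exec"

// KillPortForwardOn8265 kills only kubectl port-forward processes bound to
// port 8265, instead of every kubectl process on the machine.
func KillPortForwardOn8265() {
	// -f matches the full command line, so this only targets processes
	// such as "kubectl port-forward ... 8265:8265".
	cmd := exec.Command("pkill", "-9", "-f", "kubectl port-forward.*8265")
	_ = cmd.Run() // pkill exits non-zero when nothing matches; ignore it
}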

@kevin85421 (Member):

@CheyuWu can you fix the CI error? It seems to be related to your PR.

killKubectlCmd := exec.Command("pkill", "-9", "kubectl")
_ = killKubectlCmd.Run()
cmd := exec.Command("kubectl", "ray", "job", "submit", "--namespace", namespace, "-f", rayJobFilePath, "--working-dir", kubectlRayJobWorkingDir, "--", "python", entrypointSampleFileName)
// To avoid port conflict, kill any existing port-forward process on 8265
Member:

Can you help me understand why a port conflict is possible? Could it be a side effect of other tests, or are other users running other workloads? If it's the latter, maybe we shouldn't kill the process.

@CheyuWu (Contributor Author) May 19, 2025:

> Can you help me understand why a port conflict is possible? Could it be a side effect of other tests, or are other users running other workloads? If it's the latter, maybe we shouldn't kill the process.

The reason for the port conflict is that the test case "succeed in forwarding RayCluster and should be able to cancel" in kubectl-plugin/test/e2e/kubectl_ray_session_test.go does not fully clean up its child processes with os.Interrupt. It only terminates the main process.

As a result, when I run lsof -i :8265 after the test case finishes, there are still child processes remaining:

COMMAND    PID USER   FD   TYPE  DEVICE SIZE/OFF NODE NAME
kubectl 322356 user    8u  IPv4 2180543      0t0  TCP localhost:8265 (LISTEN)
kubectl 322356 user    9u  IPv6 2180544      0t0  TCP ip6-localhost:8265 (LISTEN)

Member:

OK, in that case, we need to fix the tests which leak the processes. Could you open an issue to track the progress?

Contributor Author:

I have already fixed this in this PR. Do I need to revert it?

Member:

Is it required to make this PR's CI pass? If yes, we don't need to split them into different PRs.

@CheyuWu (Contributor Author) May 19, 2025:

If we decide not to address the port-forwarding issue in this PR, using killKubectlCmd := exec.Command("pkill", "-9", "kubectl") will be necessary to ensure the CI passes.

If we want to remove killKubectlCmd := exec.Command("pkill", "-9", "kubectl"), we cannot split them into different PRs.

Member:

OK, I am fine with separating them into different PRs or keeping them in a single PR. It's up to you.

Contributor Author:

I think we can handle them in this PR

})
})

// extractRayJobID extracts the Ray Job ID from the kubectl ray job submit output.
Member:

Suggested change
// extractRayJobID extracts the Ray Job ID from the kubectl ray job submit output.
// `extractRayJobID` extracts the Ray Job ID from the output of `kubectl ray job submit`.
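A sketch of such a helper, reusing the regular expression shown earlier in this review (the signature in the actual PR may differ slightly):

package e2e

import "regexp"

// extractRayJobID extracts the Ray Job ID from the output of
// `kubectl ray job submit`, which prints the ID inside single quotes
// (e.g. 'raysubmit_...'). It returns "" when no ID is found.
func extractRayJobID(output string) string {
	regexExp := regexp.MustCompile(`'([^']*raysubmit[^']*)'`)
	// FindStringSubmatch returns the full match at index 0 and the
	// capture group at index 1, hence the length check of 2.
	matches := regexExp.FindStringSubmatch(output)
	if len(matches) >= 2 {
		return matches[1]
	}
	return ""
}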

return ""
}

func assertRayJobSucceeded(ctx context.Context, namespace, jobName, cmdOutputJobID string) (rayjob rayv1.RayJob) {
Member:

We should not limit the function to a specific status only.

Suggested change
func assertRayJobSucceeded(ctx context.Context, namespace, jobName, cmdOutputJobID string) (rayjob rayv1.RayJob) {
func getAndCheckRayJob(ctx context.Context, namespace, name, expectedJobID, expectedJobStatus, expectedJobDeploymentStatus string) (rayjob rayv1.RayJob) {

cmd := exec.Command("kubectl", "ray", "job", "submit", "--namespace", namespace, "-f", rayJobFilePath, "--working-dir", kubectlRayJobWorkingDir, "--", "python", entrypointSampleFileName)
// To avoid port conflict, kill any existing port-forward process on 8265
KillPortForwardOn8265()
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
Member:

what's the motivation to add this?

Contributor Author:

I want to ensure the operations are not stuck in some unexpected state.

Member:

I don’t think it’s necessary to add a timeout in these tests. The execution time of kubectl ray job submit can vary significantly across different environments. For example, if some developers don’t have high-speed internet, it might take more than 3 minutes to execute the command.

In addition, our CI already has an out-of-the-box timeout via go test -timeout.

Member:

+1. Don't add timeout.

… change the comment, and remove KillPortForwardOn8265 which is not necessary anymore
@CheyuWu (Contributor Author) commented May 19, 2025

> @CheyuWu can you fix the CI error? It seems to be related to your PR.

Sorry about this. It won't happen again.

}
// Check if the port-forwarding process is still running
_, err = exec.CommandContext(ctx, "lsof", "-i", ":8265").CombinedOutput()
Expect(err).To(HaveOccurred())
Contributor Author:

If an error occurs, it means the port-forwarding does not exist.

Member:

Should these lines be put in the defer above?

Contributor Author:

Yes, you're right

@kevin85421 (Member) left a comment:

@MortalHappiness would you mind reviewing this PR again especially the ray session part? Thanks!

err = json.Unmarshal(output, &rayJob)
Expect(err).ToNot(HaveOccurred())

// Retrieve Job ID
Member:

These comments don't provide any additional information.

Suggested change
// Retrieve Job ID


// Retrieve Job ID
Expect(rayJob.Status.JobId).To(Equal(expectedJobID))
// Retrieve Job Status
Member:

Suggested change
// Retrieve Job Status

Expect(rayJob.Status.JobId).To(Equal(expectedJobID))
// Retrieve Job Status
Expect(string(rayJob.Status.JobStatus)).To(Equal(expectedJobStatus))
// Retrieve Job Deployment Status
Member:

Suggested change
// Retrieve Job Deployment Status




// Use kubectl to check status of the rayjob
// Retrieve Job ID
Member:

Nice! The code becomes much cleaner!

}
case <-time.After(3 * time.Second):
// force kill the process if it doesn't exit within the timeout
_ = syscall.Kill(-pgid, syscall.SIGKILL)
Contributor Author:

-pgid targets the whole process group.

Member:

Is it possible to delete this line and rely on the defer above?

Contributor Author:

Yes, you are right. This is not necessary.

@CheyuWu (Contributor Author) commented May 19, 2025

Hi @kevin85421, PTAL

  • I have addressed the side effect in kubectl-plugin/test/e2e/kubectl_ray_session_test.go, which could cause a port-forwarding conflict (comment):
  1. Use process groups to ensure that both the main process and child processes receive the interrupt signal (a sketch of this approach follows this list).
  2. Add lsof -i :8265 to verify whether port-forwarding is active or not.
  • Replaced some exec.Command calls with exec.CommandContext to prevent commands from getting stuck in unexpected states.
  • Moved func getAndCheckRayJob to support.go
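A minimal sketch of the process-group approach described in the first item above; the helper names and the grace period are illustrative, and the exact code in the PR may differ:

package e2e

import (
	"os/exec"
	"syscall"
	"time"
)

// startInProcessGroup starts a long-running command (e.g. kubectl ray session)
// in its own process group, so an interrupt can later be delivered to the whole
// group, including the port-forward child processes.
func startInProcessGroup(name string, args ...string) (*exec.Cmd, int, error) {
	cmd := exec.Command(name, args...)
	cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
	if err := cmd.Start(); err != nil {
		return nil, 0, err
	}
	pgid, err := syscall.Getpgid(cmd.Process.Pid)
	if err != nil {
		return nil, 0, err
	}
	return cmd, pgid, nil
}

// interruptGroup sends SIGINT to the whole process group and falls back to
// SIGKILL if the group does not exit within the grace period.
func interruptGroup(cmd *exec.Cmd, pgid int, grace time.Duration) {
	_ = syscall.Kill(-pgid, syscall.SIGINT) // a negative pgid targets the group

	done := make(chan struct{})
	go func() {
		_ = cmd.Wait()
		close(done)
	}()

	select {
	case <-done:
	case <-time.After(grace):
		_ = syscall.Kill(-pgid, syscall.SIGKILL)
	}
}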

@@ -39,58 +43,73 @@ var _ = Describe("Calling ray plugin `session` command", Ordered, func() {

// Send a request to localhost:8265, it should succeed
Eventually(func() error {
_, err := exec.Command("curl", "http://localhost:8265").CombinedOutput()
_, err := exec.CommandContext(ctx, "curl", "http://localhost:8265").CombinedOutput()
return err
}, 3*time.Second, 500*time.Millisecond).ShouldNot(HaveOccurred())

// Send a signal to cancel the command
Member:

Suggested change
// Send a signal to cancel the command



err := sessionCmd.Start()
Expect(err).NotTo(HaveOccurred())

// Send a request to localhost:8265, it should succeed
Eventually(func() error {
_, err := exec.Command("curl", "http://localhost:8265").CombinedOutput()
_, err := exec.CommandContext(ctx, "curl", "http://localhost:8265").CombinedOutput()
@MortalHappiness (Member) May 20, 2025:

This is incorrect. If you pass ctx to multiple commands, they'll share the same context and have a total 1 minute timeout. So if you pass ctx to 7 commands and each command takes 9 seconds, then the last command will timeout.

Contributor Author:

Ok, I will fix this.
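To illustrate the fix, a small sketch of giving each command its own context instead of sharing one; the helper name and the 10-second timeout in the usage note are illustrative:

package e2e

import (
	"context"
	"os/exec"
	"time"
)

// runWithTimeout creates a fresh context per command, so a slow command only
// consumes its own timeout budget rather than the remaining budget of a single
// shared ctx that later commands would also draw from.
func runWithTimeout(timeout time.Duration, name string, args ...string) ([]byte, error) {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	return exec.CommandContext(ctx, name, args...).CombinedOutput()
}

// Example usage: each curl probe gets its own 10-second limit.
// _, err := runWithTimeout(10*time.Second, "curl", "http://localhost:8265")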

Comment on lines 58 to 65
func getAndCheckRayJob(
ctx context.Context,
namespace,
name,
expectedJobID,
expectedJobStatus,
expectedJobDeploymentStatus string,
) (rayjob rayv1.RayJob) {
Member:

Since this is a helper function, call GinkgoHelper() inside this function.
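Putting the suggestions together, a sketch of what the helper might look like with GinkgoHelper(); the field names follow the fragments quoted in this review, and details in the actual PR may differ:

package e2e

import (
	"context"
	"encoding/json"
	"os/exec"

	. "github.com/onsi/ginkgo/v2"
	. "github.com/onsi/gomega"

	rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
)

// getAndCheckRayJob fetches the RayJob CR once via kubectl and asserts the
// job ID, job status, and job deployment status against the expected values.
func getAndCheckRayJob(ctx context.Context, namespace, name, expectedJobID, expectedJobStatus, expectedJobDeploymentStatus string) rayv1.RayJob {
	GinkgoHelper()

	output, err := exec.CommandContext(ctx, "kubectl", "get", "rayjob", name, "--namespace", namespace, "-o", "json").CombinedOutput()
	Expect(err).ToNot(HaveOccurred())

	var rayJob rayv1.RayJob
	err = json.Unmarshal(output, &rayJob)
	Expect(err).ToNot(HaveOccurred())

	Expect(rayJob.Status.JobId).To(Equal(expectedJobID))
	Expect(string(rayJob.Status.JobStatus)).To(Equal(expectedJobStatus))
	Expect(string(rayJob.Status.JobDeploymentStatus)).To(Equal(expectedJobDeploymentStatus))
	return rayJob
}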



Successfully merging this pull request may close these issues.

[Feature] [kubectl-plugin] Support node selectors for kubectl ray job submit
5 participants