Skip to content

Commit

Permalink
[RayJob]: Add RayJob with RayCluster spec e2e test (ray-project#1636)
Browse files Browse the repository at this point in the history
This PR adds e2e test following up ray-project#1539 and leverages ray-project#1575.

It also factorises configuration across the RayJob e2e tests.
  • Loading branch information
astefanutti authored and blublinsky committed Nov 16, 2023
1 parent d4fc840 commit afda7a3
Show file tree
Hide file tree
Showing 6 changed files with 387 additions and 166 deletions.
166 changes: 4 additions & 162 deletions ray-operator/test/e2e/rayjob_cluster_selector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (

. "github.com/onsi/gomega"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
Expand All @@ -21,21 +19,7 @@ func TestRayJobWithClusterSelector(t *testing.T) {
namespace := test.NewTestNamespace()

// Job scripts
jobs := &corev1.ConfigMap{
TypeMeta: metav1.TypeMeta{
APIVersion: corev1.SchemeGroupVersion.String(),
Kind: "ConfigMap",
},
ObjectMeta: metav1.ObjectMeta{
Name: "jobs",
Namespace: namespace.Name,
},
BinaryData: map[string][]byte{
"counter.py": ReadFile(test, "counter.py"),
"fail.py": ReadFile(test, "fail.py"),
},
Immutable: Ptr(true),
}
jobs := newConfigMap(namespace.Name, "jobs", files(test, "counter.py", "fail.py"))
jobs, err := test.Client().Core().CoreV1().ConfigMaps(namespace.Name).Create(test.Ctx(), jobs, metav1.CreateOptions{})
test.Expect(err).NotTo(HaveOccurred())
test.T().Logf("Created ConfigMap %s/%s successfully", jobs.Namespace, jobs.Name)
Expand All @@ -50,109 +34,7 @@ func TestRayJobWithClusterSelector(t *testing.T) {
Name: "raycluster",
Namespace: namespace.Name,
},
Spec: rayv1.RayClusterSpec{
RayVersion: GetRayVersion(),
HeadGroupSpec: rayv1.HeadGroupSpec{
RayStartParams: map[string]string{
"dashboard-host": "0.0.0.0",
},
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "ray-head",
Image: GetRayImage(),
Ports: []corev1.ContainerPort{
{
ContainerPort: 6379,
Name: "gcs",
},
{
ContainerPort: 8265,
Name: "dashboard",
},
{
ContainerPort: 10001,
Name: "client",
},
},
Lifecycle: &corev1.Lifecycle{
PreStop: &corev1.LifecycleHandler{
Exec: &corev1.ExecAction{
Command: []string{"/bin/sh", "-c", "ray stop"},
},
},
},
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("300m"),
corev1.ResourceMemory: resource.MustParse("1G"),
},
Limits: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("500m"),
corev1.ResourceMemory: resource.MustParse("2G"),
},
},
VolumeMounts: []corev1.VolumeMount{
{
Name: "jobs",
MountPath: "/home/ray/jobs",
},
},
},
},
Volumes: []corev1.Volume{
{
Name: "jobs",
VolumeSource: corev1.VolumeSource{
ConfigMap: &corev1.ConfigMapVolumeSource{
LocalObjectReference: corev1.LocalObjectReference{
Name: jobs.Name,
},
},
},
},
},
},
},
},
WorkerGroupSpecs: []rayv1.WorkerGroupSpec{
{
Replicas: Ptr(int32(1)),
MinReplicas: Ptr(int32(1)),
MaxReplicas: Ptr(int32(1)),
GroupName: "small-group",
RayStartParams: map[string]string{},
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "ray-worker",
Image: GetRayImage(),
Lifecycle: &corev1.Lifecycle{
PreStop: &corev1.LifecycleHandler{
Exec: &corev1.ExecAction{
Command: []string{"/bin/sh", "-c", "ray stop"},
},
},
},
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("300m"),
corev1.ResourceMemory: resource.MustParse("1G"),
},
Limits: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("500m"),
corev1.ResourceMemory: resource.MustParse("1G"),
},
},
},
},
},
},
},
},
},
Spec: *newRayClusterSpec(mountConfigMap(jobs, "/home/ray/jobs")),
}
rayCluster, err = test.Client().Ray().RayV1().RayClusters(namespace.Name).Create(test.Ctx(), rayCluster, metav1.CreateOptions{})
test.Expect(err).NotTo(HaveOccurred())
Expand Down Expand Up @@ -185,27 +67,7 @@ env_vars:
counter_name: test_counter
`,
ShutdownAfterJobFinishes: false,
SubmitterPodTemplate: &corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "ray-job-submitter",
Image: GetRayImage(),
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("200m"),
corev1.ResourceMemory: resource.MustParse("200Mi"),
},
Limits: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("500m"),
corev1.ResourceMemory: resource.MustParse("500Mi"),
},
},
},
},
RestartPolicy: corev1.RestartPolicyNever,
},
},
SubmitterPodTemplate: jobSubmitterPodTemplate(),
},
}
rayJob, err = test.Client().Ray().RayV1().RayJobs(namespace.Name).Create(test.Ctx(), rayJob, metav1.CreateOptions{})
Expand Down Expand Up @@ -240,27 +102,7 @@ env_vars:
},
Entrypoint: "python /home/ray/jobs/fail.py",
ShutdownAfterJobFinishes: false,
SubmitterPodTemplate: &corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "ray-job-submitter",
Image: GetRayImage(),
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("200m"),
corev1.ResourceMemory: resource.MustParse("200Mi"),
},
Limits: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("500m"),
corev1.ResourceMemory: resource.MustParse("500Mi"),
},
},
},
},
RestartPolicy: corev1.RestartPolicyNever,
},
},
SubmitterPodTemplate: jobSubmitterPodTemplate(),
},
}
rayJob, err = test.Client().Ray().RayV1().RayJobs(namespace.Name).Create(test.Ctx(), rayJob, metav1.CreateOptions{})
Expand Down
131 changes: 131 additions & 0 deletions ray-operator/test/e2e/rayjob_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package e2e

import (
"testing"
"time"

. "github.com/onsi/gomega"

k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
. "github.com/ray-project/kuberay/ray-operator/test/support"
)

func TestRayJob(t *testing.T) {
test := With(t)

// Create a namespace
namespace := test.NewTestNamespace()

// Job scripts
jobs := newConfigMap(namespace.Name, "jobs", files(test, "counter.py", "fail.py"))
jobs, err := test.Client().Core().CoreV1().ConfigMaps(namespace.Name).Create(test.Ctx(), jobs, metav1.CreateOptions{})
test.Expect(err).NotTo(HaveOccurred())
test.T().Logf("Created ConfigMap %s/%s successfully", jobs.Namespace, jobs.Name)

test.T().Run("Successful RayJob", func(t *testing.T) {
// RayJob
rayJob := &rayv1.RayJob{
TypeMeta: metav1.TypeMeta{
APIVersion: rayv1.GroupVersion.String(),
Kind: "RayJob",
},
ObjectMeta: metav1.ObjectMeta{
Name: "counter",
Namespace: namespace.Name,
},
Spec: rayv1.RayJobSpec{
RayClusterSpec: newRayClusterSpec(mountConfigMap(jobs, "/home/ray/jobs")),
Entrypoint: "python /home/ray/jobs/counter.py",
RuntimeEnvYAML: `
env_vars:
counter_name: test_counter
`,
ShutdownAfterJobFinishes: true,
SubmitterPodTemplate: jobSubmitterPodTemplate(),
},
}
rayJob, err = test.Client().Ray().RayV1().RayJobs(namespace.Name).Create(test.Ctx(), rayJob, metav1.CreateOptions{})
test.Expect(err).NotTo(HaveOccurred())
test.T().Logf("Created RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name)

test.T().Logf("Waiting for RayJob %s/%s to complete", rayJob.Namespace, rayJob.Name)
test.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium).
Should(WithTransform(RayJobStatus, Satisfy(rayv1.IsJobTerminal)))

// Assert the RayJob has completed successfully
test.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)).
To(WithTransform(RayJobStatus, Equal(rayv1.JobStatusSucceeded)))

// And the RayJob deployment status is updated accordingly
test.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name)).
Should(WithTransform(RayJobDeploymentStatus, Equal(rayv1.JobDeploymentStatusComplete)))

// Refresh the RayJob status
rayJob = GetRayJob(test, rayJob.Namespace, rayJob.Name)

// Assert the RayCluster has been torn down
_, err = test.Client().Ray().RayV1().RayClusters(namespace.Name).Get(test.Ctx(), rayJob.Status.RayClusterName, metav1.GetOptions{})
test.Expect(err).To(MatchError(k8serrors.NewNotFound(rayv1.Resource("rayclusters"), rayJob.Status.RayClusterName)))

// Delete the RayJob
err = test.Client().Ray().RayV1().RayJobs(namespace.Name).Delete(test.Ctx(), rayJob.Name, metav1.DeleteOptions{})
test.Expect(err).NotTo(HaveOccurred())
test.T().Logf("Deleted RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name)

// Assert the submitter Job has been cascade deleted
test.Eventually(Jobs(test, namespace.Name)).Should(BeEmpty())
})

test.T().Run("Failing RayJob without cluster shutdown after finished", func(t *testing.T) {
// RayJob
rayJob := &rayv1.RayJob{
TypeMeta: metav1.TypeMeta{
APIVersion: rayv1.GroupVersion.String(),
Kind: "RayJob",
},
ObjectMeta: metav1.ObjectMeta{
Name: "fail",
Namespace: namespace.Name,
},
Spec: rayv1.RayJobSpec{
RayClusterSpec: newRayClusterSpec(mountConfigMap(jobs, "/home/ray/jobs")),
Entrypoint: "python /home/ray/jobs/fail.py",
ShutdownAfterJobFinishes: false,
SubmitterPodTemplate: jobSubmitterPodTemplate(),
},
}
rayJob, err = test.Client().Ray().RayV1().RayJobs(namespace.Name).Create(test.Ctx(), rayJob, metav1.CreateOptions{})
test.Expect(err).NotTo(HaveOccurred())
test.T().Logf("Created RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name)

test.T().Logf("Waiting for RayJob %s/%s to complete", rayJob.Namespace, rayJob.Name)
test.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium).
Should(WithTransform(RayJobStatus, Satisfy(rayv1.IsJobTerminal)))

// Assert the Ray job has failed
test.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)).
To(WithTransform(RayJobStatus, Equal(rayv1.JobStatusFailed)))

// And the RayJob deployment status is updated accordingly
test.Consistently(RayJob(test, rayJob.Namespace, rayJob.Name), 10*time.Second).
Should(WithTransform(RayJobDeploymentStatus, Equal(rayv1.JobDeploymentStatusRunning)))

// Refresh the RayJob status
rayJob = GetRayJob(test, rayJob.Namespace, rayJob.Name)

// Delete the RayJob
err = test.Client().Ray().RayV1().RayJobs(namespace.Name).Delete(test.Ctx(), rayJob.Name, metav1.DeleteOptions{})
test.Expect(err).NotTo(HaveOccurred())
test.T().Logf("Deleted RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name)

// Assert the RayCluster has been cascade deleted
test.Eventually(NotFound(RayClusterOrError(test, namespace.Name, rayJob.Status.RayClusterName))).
Should(BeTrue())

// Assert the submitter Job has been cascade deleted
test.Eventually(Jobs(test, namespace.Name)).Should(BeEmpty())
})
}
Loading

0 comments on commit afda7a3

Please sign in to comment.