Skip to content

Commit

Permalink
Add the RayJob Multikueue integration tests
Browse files Browse the repository at this point in the history
Fix path for Kuberay CRDs
  • Loading branch information
mszadkow committed Jan 2, 2025
1 parent 1f76919 commit b0dbe2d
Show file tree
Hide file tree
Showing 10 changed files with 204 additions and 34 deletions.
138 changes: 113 additions & 25 deletions pkg/util/testingjobs/rayjob/wrappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,40 +40,59 @@ func MakeJob(name, ns string) *JobWrapper {
Spec: rayv1.RayJobSpec{
ShutdownAfterJobFinishes: true,
RayClusterSpec: &rayv1.RayClusterSpec{
HeadGroupSpec: rayv1.HeadGroupSpec{
RayStartParams: map[string]string{"p1": "v1"},
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "head-container",
},
},
HeadGroupSpec: rayv1.HeadGroupSpec{},
WorkerGroupSpecs: make([]rayv1.WorkerGroupSpec, 0, 1),
},
Suspend: true,
},
}}
}

func (j *JobWrapper) RayJobSpecsDefault() *JobWrapper {
j.Spec.RayClusterSpec.HeadGroupSpec = rayv1.HeadGroupSpec{
RayStartParams: map[string]string{"p1": "v1"},
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
RestartPolicy: "Never",
Containers: []corev1.Container{
{
Name: "head-container",
Command: []string{},
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{},
Limits: corev1.ResourceList{},
},
},
},
WorkerGroupSpecs: []rayv1.WorkerGroupSpec{
NodeSelector: map[string]string{},
},
},
}

j.Spec.RayClusterSpec.WorkerGroupSpecs = append(j.Spec.RayClusterSpec.WorkerGroupSpecs, rayv1.WorkerGroupSpec{
GroupName: "workers-group-0",
Replicas: ptr.To[int32](1),
MinReplicas: ptr.To[int32](0),
MaxReplicas: ptr.To[int32](10),
RayStartParams: map[string]string{"p1": "v1"},
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
RestartPolicy: "Never",
Containers: []corev1.Container{
{
GroupName: "workers-group-0",
Replicas: ptr.To[int32](1),
MinReplicas: ptr.To[int32](0),
MaxReplicas: ptr.To[int32](10),
RayStartParams: map[string]string{"p1": "v1"},
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "worker-container",
},
},
},
Name: "worker-container",
Command: []string{},
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{},
Limits: corev1.ResourceList{},
},
},
},
NodeSelector: map[string]string{},
},
Suspend: true,
},
}}
})
return j
}

// Obj returns the inner Job.
Expand Down Expand Up @@ -180,3 +199,72 @@ func (j *JobWrapper) Generation(num int64) *JobWrapper {
func (j *JobWrapper) Clone() *JobWrapper {
return &JobWrapper{*j.DeepCopy()}
}

// Label sets the label key and value
func (j *JobWrapper) Label(key, value string) *JobWrapper {
if j.Labels == nil {
j.Labels = make(map[string]string)
}
j.Labels[key] = value
return j
}

// JobDeploymentStatus sets a deployment status of the job
func (j *JobWrapper) JobDeploymentStatus(ds rayv1.JobDeploymentStatus) *JobWrapper {
j.Status.JobDeploymentStatus = ds
return j
}

// JobStatus sets a status of the job
func (j *JobWrapper) JobStatus(s rayv1.JobStatus) *JobWrapper {
j.Status.JobStatus = s
return j
}

// Request adds a resource request to the default container.
func (j *JobWrapper) Request(rayType rayv1.RayNodeType, r corev1.ResourceName, v string) *JobWrapper {
if rayType == rayv1.HeadNode {
j.Spec.RayClusterSpec.HeadGroupSpec.Template.Spec.Containers[0].Resources.Requests[r] = resource.MustParse(v)
} else if rayType == rayv1.WorkerNode {
j.Spec.RayClusterSpec.WorkerGroupSpecs[0].Template.Spec.Containers[0].Resources.Requests[r] = resource.MustParse(v)
}
return j
}

func (j *JobWrapper) Image(rayType rayv1.RayNodeType, image string, args []string) *JobWrapper {
if rayType == rayv1.HeadNode {
j.Spec.RayClusterSpec.HeadGroupSpec.Template.Spec.Containers[0].Image = image
j.Spec.RayClusterSpec.HeadGroupSpec.Template.Spec.Containers[0].Args = args
j.Spec.RayClusterSpec.HeadGroupSpec.Template.Spec.Containers[0].ImagePullPolicy = corev1.PullIfNotPresent
} else if rayType == rayv1.WorkerNode {
j.Spec.RayClusterSpec.WorkerGroupSpecs[0].Template.Spec.Containers[0].Image = image
j.Spec.RayClusterSpec.WorkerGroupSpecs[0].Template.Spec.Containers[0].Args = args
j.Spec.RayClusterSpec.WorkerGroupSpecs[0].Template.Spec.Containers[0].ImagePullPolicy = corev1.PullIfNotPresent
}
return j
}

func (j *JobWrapper) Entrypoint(e string) *JobWrapper {
j.Spec.Entrypoint = e
return j
}

func (j *JobWrapper) RayVersion(rv string) *JobWrapper {
j.Spec.RayClusterSpec.RayVersion = rv
return j
}

func (j *JobWrapper) Env(rayType rayv1.RayNodeType, name, value string) *JobWrapper {
if rayType == rayv1.HeadNode {
if j.Spec.RayClusterSpec.HeadGroupSpec.Template.Spec.Containers[0].Env == nil {
j.Spec.RayClusterSpec.HeadGroupSpec.Template.Spec.Containers[0].Env = make([]corev1.EnvVar, 0)
}
j.Spec.RayClusterSpec.HeadGroupSpec.Template.Spec.Containers[0].Env = append(j.Spec.RayClusterSpec.HeadGroupSpec.Template.Spec.Containers[0].Env, corev1.EnvVar{Name: name, Value: value})
} else if rayType == rayv1.WorkerNode {
if j.Spec.RayClusterSpec.WorkerGroupSpecs[0].Template.Spec.Containers[0].Env == nil {
j.Spec.RayClusterSpec.WorkerGroupSpecs[0].Template.Spec.Containers[0].Env = make([]corev1.EnvVar, 0)
}
j.Spec.RayClusterSpec.WorkerGroupSpecs[0].Template.Spec.Containers[0].Env = append(j.Spec.RayClusterSpec.WorkerGroupSpecs[0].Template.Spec.Containers[0].Env, corev1.EnvVar{Name: name, Value: value})
}
return j
}
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ var _ = ginkgo.Describe("Job controller RayCluster for workloads when only jobs
ginkgo.It("Should suspend a cluster if the parent's workload does not exist or is not admitted", func() {
ginkgo.By("Creating the parent job which has a queue name")
parentJob := testingrayjob.MakeJob("parent-job", ns.Name).
RayJobSpecsDefault().
Queue("test").
Suspend(false).
Obj()
Expand Down
2 changes: 1 addition & 1 deletion test/integration/controller/jobs/raycluster/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ var (
ctx context.Context
fwk *framework.Framework
crdPath = filepath.Join("..", "..", "..", "..", "..", "config", "components", "crd", "bases")
rayCrdPath = filepath.Join("..", "..", "..", "..", "..", "dep-crds", "ray-operator")
rayCrdPath = filepath.Join("..", "..", "..", "..", "..", "dep-crds", "ray-operator-crds")
)

func TestAPIs(t *testing.T) {
Expand Down
21 changes: 16 additions & 5 deletions test/integration/controller/jobs/rayjob/rayjob_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ var _ = ginkgo.Describe("Job controller", ginkgo.Ordered, ginkgo.ContinueOnFailu
}()

job := testingrayjob.MakeJob(jobName, ns.Name).
RayJobSpecsDefault().
Suspend(false).
WithPriorityClassName(priorityClassName).
Obj()
Expand Down Expand Up @@ -261,6 +262,7 @@ var _ = ginkgo.Describe("Job controller", ginkgo.Ordered, ginkgo.ContinueOnFailu
ginkgo.It("A RayJob created in an unmanaged namespace is not suspended and a workload is not created", func() {
ginkgo.By("Creating an unsuspended job without a queue-name in unmanaged-ns")
job := testingrayjob.MakeJob(jobName, "unmanaged-ns").
RayJobSpecsDefault().
Suspend(false).
Obj()
err := k8sClient.Create(ctx, job)
Expand Down Expand Up @@ -303,7 +305,7 @@ var _ = ginkgo.Describe("Job controller for workloads when only jobs with queue

ginkgo.It("Should reconcile jobs only when queue is set", func() {
ginkgo.By("checking the workload is not created when queue name is not set")
job := testingrayjob.MakeJob(jobName, ns.Name).Obj()
job := testingrayjob.MakeJob(jobName, ns.Name).RayJobSpecsDefault().Obj()
gomega.Expect(k8sClient.Create(ctx, job)).Should(gomega.Succeed())
lookupKey := types.NamespacedName{Name: jobName, Namespace: ns.Name}
createdJob := &rayv1.RayJob{}
Expand Down Expand Up @@ -371,7 +373,10 @@ var _ = ginkgo.Describe("Job controller when waitForPodsReady enabled", ginkgo.O
ginkgo.DescribeTable("Single job at different stages of progress towards completion",
func(podsReadyTestSpec podsReadyTestSpec) {
ginkgo.By("Create a job")
job := testingrayjob.MakeJob(jobName, ns.Name).WithSubmissionMode(rayv1.K8sJobMode).Obj()
job := testingrayjob.MakeJob(jobName, ns.Name).
RayJobSpecsDefault().
WithSubmissionMode(rayv1.K8sJobMode).
Obj()
jobQueueName := "test-queue"
job.Annotations = map[string]string{constants.QueueAnnotation: jobQueueName}
gomega.Expect(k8sClient.Create(ctx, job)).Should(gomega.Succeed())
Expand Down Expand Up @@ -576,7 +581,9 @@ var _ = ginkgo.Describe("Job controller interacting with scheduler", ginkgo.Orde
gomega.Expect(k8sClient.Create(ctx, localQueue)).Should(gomega.Succeed())

ginkgo.By("checking a dev job starts")
job := testingrayjob.MakeJob("dev-job", ns.Name).Queue(localQueue.Name).
job := testingrayjob.MakeJob("dev-job", ns.Name).
RayJobSpecsDefault().
Queue(localQueue.Name).
RequestHead(corev1.ResourceCPU, "3").
RequestWorkerGroup(corev1.ResourceCPU, "4").
Obj()
Expand Down Expand Up @@ -650,7 +657,9 @@ var _ = ginkgo.Describe("Job controller with preemption enabled", ginkgo.Ordered

ginkgo.It("Should preempt lower priority rayJobs when resource insufficient", func() {
ginkgo.By("Create a low priority rayJob")
lowPriorityJob := testingrayjob.MakeJob("rayjob-with-low-priority", ns.Name).Queue(localQueue.Name).
lowPriorityJob := testingrayjob.MakeJob("rayjob-with-low-priority", ns.Name).
RayJobSpecsDefault().
Queue(localQueue.Name).
RequestHead(corev1.ResourceCPU, "1").
RequestWorkerGroup(corev1.ResourceCPU, "2").
Obj()
Expand All @@ -665,7 +674,9 @@ var _ = ginkgo.Describe("Job controller with preemption enabled", ginkgo.Ordered
}, util.Timeout, util.Interval).Should(gomega.Succeed())

ginkgo.By("Create a high priority rayJob which will preempt the lower one")
highPriorityJob := testingrayjob.MakeJob("rayjob-with-high-priority", ns.Name).Queue(localQueue.Name).
highPriorityJob := testingrayjob.MakeJob("rayjob-with-high-priority", ns.Name).
RayJobSpecsDefault().
Queue(localQueue.Name).
RequestHead(corev1.ResourceCPU, "2").
WithPriorityClassName(priorityClassName).
RequestWorkerGroup(corev1.ResourceCPU, "2").
Expand Down
2 changes: 1 addition & 1 deletion test/integration/controller/jobs/rayjob/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ var (
ctx context.Context
fwk *framework.Framework
crdPath = filepath.Join("..", "..", "..", "..", "..", "config", "components", "crd", "bases")
rayCrdPath = filepath.Join("..", "..", "..", "..", "..", "dep-crds", "ray-operator")
rayCrdPath = filepath.Join("..", "..", "..", "..", "..", "dep-crds", "ray-operator-crds")
)

func TestAPIs(t *testing.T) {
Expand Down
54 changes: 54 additions & 0 deletions test/integration/multikueue/multikueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
apimeta "k8s.io/apimachinery/pkg/api/meta"
Expand All @@ -49,13 +50,15 @@ import (
workloadtfjob "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/jobs/tfjob"
workloadxgboostjob "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/jobs/xgboostjob"
workloadmpijob "sigs.k8s.io/kueue/pkg/controller/jobs/mpijob"
workloadrayjob "sigs.k8s.io/kueue/pkg/controller/jobs/rayjob"
"sigs.k8s.io/kueue/pkg/features"
utiltesting "sigs.k8s.io/kueue/pkg/util/testing"
testingjob "sigs.k8s.io/kueue/pkg/util/testingjobs/job"
testingjobset "sigs.k8s.io/kueue/pkg/util/testingjobs/jobset"
testingmpijob "sigs.k8s.io/kueue/pkg/util/testingjobs/mpijob"
testingpaddlejob "sigs.k8s.io/kueue/pkg/util/testingjobs/paddlejob"
testingpytorchjob "sigs.k8s.io/kueue/pkg/util/testingjobs/pytorchjob"
testingrayjob "sigs.k8s.io/kueue/pkg/util/testingjobs/rayjob"
testingtfjob "sigs.k8s.io/kueue/pkg/util/testingjobs/tfjob"
testingxgboostjob "sigs.k8s.io/kueue/pkg/util/testingjobs/xgboostjob"
"sigs.k8s.io/kueue/pkg/workload"
Expand Down Expand Up @@ -1442,6 +1445,57 @@ var _ = ginkgo.Describe("Multikueue", ginkgo.Ordered, ginkgo.ContinueOnFailure,
}, util.LongTimeout, util.Interval).Should(gomega.Succeed())
})
})

ginkgo.It("Should run a RayJob on worker if admitted", func() {
admission := utiltesting.MakeAdmission(managerCq.Name).PodSets(
kueue.PodSetAssignment{
Name: "head",
}, kueue.PodSetAssignment{
Name: "workers-group-0",
},
)
rayjob := testingrayjob.MakeJob("rayjob1", managerNs.Name).
RayJobSpecsDefault().
WithSubmissionMode(rayv1.UserMode).
Queue(managerLq.Name).
Obj()
gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, rayjob)).Should(gomega.Succeed())
wlLookupKey := types.NamespacedName{Name: workloadrayjob.GetWorkloadNameForRayJob(rayjob.Name, rayjob.UID), Namespace: managerNs.Name}
gomega.Eventually(func(g gomega.Gomega) {
createdWorkload := &kueue.Workload{}
g.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed())
g.Expect(util.SetQuotaReservation(managerTestCluster.ctx, managerTestCluster.client, createdWorkload, admission.Obj())).To(gomega.Succeed())
}, util.LongTimeout, util.Interval).Should(gomega.Succeed())

admitWorkloadAndCheckWorkerCopies(multikueueAC.Name, wlLookupKey, admission)

ginkgo.By("changing the status of the RayJob in the worker, updates the manager's RayJob status", func() {
gomega.Eventually(func(g gomega.Gomega) {
createdRayJob := rayv1.RayJob{}
g.Expect(worker2TestCluster.client.Get(worker2TestCluster.ctx, client.ObjectKeyFromObject(rayjob), &createdRayJob)).To(gomega.Succeed())
createdRayJob.Status.JobDeploymentStatus = rayv1.JobDeploymentStatusRunning
g.Expect(worker2TestCluster.client.Status().Update(worker2TestCluster.ctx, &createdRayJob)).To(gomega.Succeed())
}, util.Timeout, util.Interval).Should(gomega.Succeed())
gomega.Eventually(func(g gomega.Gomega) {
createdRayJob := rayv1.RayJob{}
g.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, client.ObjectKeyFromObject(rayjob), &createdRayJob)).To(gomega.Succeed())
g.Expect(createdRayJob.Status.JobDeploymentStatus).To(gomega.Equal(rayv1.JobDeploymentStatusRunning))
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})

ginkgo.By("finishing the worker RayJob, the manager's wl is marked as finished and the worker2 wl removed", func() {
finishJobReason := ""
gomega.Eventually(func(g gomega.Gomega) {
createdRayJob := rayv1.RayJob{}
g.Expect(worker2TestCluster.client.Get(worker2TestCluster.ctx, client.ObjectKeyFromObject(rayjob), &createdRayJob)).To(gomega.Succeed())
createdRayJob.Status.JobStatus = rayv1.JobStatusSucceeded
createdRayJob.Status.JobDeploymentStatus = rayv1.JobDeploymentStatusComplete
g.Expect(worker2TestCluster.client.Status().Update(worker2TestCluster.ctx, &createdRayJob)).To(gomega.Succeed())
}, util.Timeout, util.Interval).Should(gomega.Succeed())

waitForWorkloadToFinishAndRemoteWorkloadToBeDeleted(wlLookupKey, finishJobReason)
})
})
})

var _ = ginkgo.Describe("Multikueue no GC", ginkgo.Ordered, ginkgo.ContinueOnFailure, func() {
Expand Down
14 changes: 14 additions & 0 deletions test/integration/multikueue/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
workloadtfjob "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/jobs/tfjob"
workloadxgboostjob "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/jobs/xgboostjob"
workloadmpijob "sigs.k8s.io/kueue/pkg/controller/jobs/mpijob"
workloadrayjob "sigs.k8s.io/kueue/pkg/controller/jobs/rayjob"
"sigs.k8s.io/kueue/pkg/queue"
"sigs.k8s.io/kueue/pkg/util/kubeversion"
utiltesting "sigs.k8s.io/kueue/pkg/util/testing"
Expand Down Expand Up @@ -95,6 +96,7 @@ func createCluster(setupFnc framework.ManagerSetup, apiFeatureGates ...string) c
DepCRDPaths: []string{filepath.Join("..", "..", "..", "dep-crds", "jobset-operator"),
filepath.Join("..", "..", "..", "dep-crds", "training-operator-crds"),
filepath.Join("..", "..", "..", "dep-crds", "mpi-operator"),
filepath.Join("..", "..", "..", "dep-crds", "ray-operator-crds"),
},
APIServerFeatureGates: apiFeatureGates,
}
Expand Down Expand Up @@ -206,6 +208,18 @@ func managerSetup(ctx context.Context, mgr manager.Manager) {

err = workloadmpijob.SetupMPIJobWebhook(mgr, jobframework.WithCache(cCache), jobframework.WithQueues(queues))
gomega.Expect(err).NotTo(gomega.HaveOccurred())

err = workloadrayjob.SetupIndexes(ctx, mgr.GetFieldIndexer())
gomega.Expect(err).NotTo(gomega.HaveOccurred())

rayJobReconciler := workloadrayjob.NewReconciler(
mgr.GetClient(),
mgr.GetEventRecorderFor(constants.JobControllerName))
err = rayJobReconciler.SetupWithManager(mgr)
gomega.Expect(err).NotTo(gomega.HaveOccurred())

err = workloadrayjob.SetupRayJobWebhook(mgr, jobframework.WithCache(cCache), jobframework.WithQueues(queues))
gomega.Expect(err).NotTo(gomega.HaveOccurred())
}

func managerAndMultiKueueSetup(ctx context.Context, mgr manager.Manager, gcInterval time.Duration, enabledIntegrations sets.Set[string]) {
Expand Down
1 change: 1 addition & 0 deletions test/integration/webhook/jobs/raycluster_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ var _ = ginkgo.Describe("RayCluster Webhook", func() {
ginkgo.It("Should not suspend a cluster if the parent's workload exist and is admitted", func() {
ginkgo.By("Creating the parent job which has a queue name")
parentJob := testingrayjob.MakeJob("parent-job", ns.Name).
RayJobSpecsDefault().
Queue("test").
Suspend(false).
Obj()
Expand Down
3 changes: 2 additions & 1 deletion test/integration/webhook/jobs/rayjob_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,15 @@ var _ = ginkgo.Describe("RayJob Webhook", func() {
})

ginkgo.It("the creation doesn't succeed if the queue name is invalid", func() {
job := testingjob.MakeJob("rayjob", ns.Name).Queue("indexed_job").Obj()
job := testingjob.MakeJob("rayjob", ns.Name).Queue("indexed_job").RayJobSpecsDefault().Obj()
err := k8sClient.Create(ctx, job)
gomega.Expect(err).Should(gomega.HaveOccurred())
gomega.Expect(err).Should(testing.BeForbiddenError())
})

ginkgo.It("invalid configuration shutdown after job finishes", func() {
job := testingjob.MakeJob("rayjob", ns.Name).
RayJobSpecsDefault().
Queue("queue-name").
ShutdownAfterJobFinishes(false).
Obj()
Expand Down
Loading

0 comments on commit b0dbe2d

Please sign in to comment.