Skip to content

Commit

Permalink
Add the RayJob Multikueue integration tests
Browse files Browse the repository at this point in the history
Fix path for Kuberay CRDs
  • Loading branch information
mszadkow committed Dec 30, 2024
1 parent ac2d7f8 commit fefb95a
Show file tree
Hide file tree
Showing 7 changed files with 184 additions and 28 deletions.
138 changes: 113 additions & 25 deletions pkg/util/testingjobs/rayjob/wrappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,40 +40,59 @@ func MakeJob(name, ns string) *JobWrapper {
Spec: rayv1.RayJobSpec{
ShutdownAfterJobFinishes: true,
RayClusterSpec: &rayv1.RayClusterSpec{
HeadGroupSpec: rayv1.HeadGroupSpec{
RayStartParams: map[string]string{"p1": "v1"},
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "head-container",
},
},
HeadGroupSpec: rayv1.HeadGroupSpec{},
WorkerGroupSpecs: make([]rayv1.WorkerGroupSpec, 1),
},
Suspend: true,
},
}}
}

func (j *JobWrapper) RayJobSpecsDefault() *JobWrapper {
j.Spec.RayClusterSpec.HeadGroupSpec = rayv1.HeadGroupSpec{
RayStartParams: map[string]string{},
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
RestartPolicy: "Never",
Containers: []corev1.Container{
{
Name: "head-container",
Command: []string{},
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{},
Limits: corev1.ResourceList{},
},
},
},
WorkerGroupSpecs: []rayv1.WorkerGroupSpec{
NodeSelector: map[string]string{},
},
},
}

j.Spec.RayClusterSpec.WorkerGroupSpecs[0] = rayv1.WorkerGroupSpec{
GroupName: "workers-group-0",
Replicas: ptr.To[int32](1),
MinReplicas: ptr.To[int32](0),
MaxReplicas: ptr.To[int32](10),
RayStartParams: map[string]string{},
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
RestartPolicy: "Never",
Containers: []corev1.Container{
{
GroupName: "workers-group-0",
Replicas: ptr.To[int32](1),
MinReplicas: ptr.To[int32](0),
MaxReplicas: ptr.To[int32](10),
RayStartParams: map[string]string{"p1": "v1"},
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "worker-container",
},
},
},
Name: "worker-container",
Command: []string{},
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{},
Limits: corev1.ResourceList{},
},
},
},
NodeSelector: map[string]string{},
},
Suspend: true,
},
}}
}
return j
}

// Obj returns the inner Job.
Expand Down Expand Up @@ -180,3 +199,72 @@ func (j *JobWrapper) Generation(num int64) *JobWrapper {
// Clone returns a deep copy of the wrapper, so the copy can be mutated
// freely without affecting the original RayJob.
func (j *JobWrapper) Clone() *JobWrapper {
	return &JobWrapper{*j.DeepCopy()}
}

// Label sets the label key to value, allocating the label map on first use,
// and returns the wrapper for chaining.
func (j *JobWrapper) Label(key, value string) *JobWrapper {
	if j.Labels == nil {
		// Writing to a nil map would panic; start the map with this entry.
		j.Labels = map[string]string{key: value}
		return j
	}
	j.Labels[key] = value
	return j
}

// JobDeploymentStatus sets the JobDeploymentStatus field of the RayJob's
// status and returns the wrapper for chaining.
func (j *JobWrapper) JobDeploymentStatus(ds rayv1.JobDeploymentStatus) *JobWrapper {
	j.Status.JobDeploymentStatus = ds
	return j
}

// JobStatus sets the JobStatus field of the RayJob's status and returns the
// wrapper for chaining.
func (j *JobWrapper) JobStatus(s rayv1.JobStatus) *JobWrapper {
	j.Status.JobStatus = s
	return j
}

// Request adds a resource request to the first container of the given node
// group (head or worker) and returns the wrapper for chaining.
//
// The Requests map is allocated lazily: MakeJob builds an empty HeadGroupSpec,
// so without this guard the assignment would panic on a nil ResourceList when
// the wrapper is used without RayJobSpecsDefault.
func (j *JobWrapper) Request(rayType rayv1.RayNodeType, r corev1.ResourceName, v string) *JobWrapper {
	var c *corev1.Container
	switch rayType {
	case rayv1.HeadNode:
		c = &j.Spec.RayClusterSpec.HeadGroupSpec.Template.Spec.Containers[0]
	case rayv1.WorkerNode:
		c = &j.Spec.RayClusterSpec.WorkerGroupSpecs[0].Template.Spec.Containers[0]
	default:
		// Unknown node type: keep the original no-op behavior.
		return j
	}
	if c.Resources.Requests == nil {
		c.Resources.Requests = corev1.ResourceList{}
	}
	c.Resources.Requests[r] = resource.MustParse(v)
	return j
}

// Image sets the image, args, and an IfNotPresent pull policy on the first
// container of the given node group (head or worker) and returns the wrapper
// for chaining.
func (j *JobWrapper) Image(rayType rayv1.RayNodeType, image string, args []string) *JobWrapper {
	var c *corev1.Container
	switch rayType {
	case rayv1.HeadNode:
		c = &j.Spec.RayClusterSpec.HeadGroupSpec.Template.Spec.Containers[0]
	case rayv1.WorkerNode:
		c = &j.Spec.RayClusterSpec.WorkerGroupSpecs[0].Template.Spec.Containers[0]
	default:
		// Any other node type is a no-op, matching the if/else original.
		return j
	}
	c.Image = image
	c.Args = args
	c.ImagePullPolicy = corev1.PullIfNotPresent
	return j
}

// Entrypoint sets the RayJob's entrypoint command and returns the wrapper
// for chaining.
func (j *JobWrapper) Entrypoint(e string) *JobWrapper {
	j.Spec.Entrypoint = e
	return j
}

// RayVersion sets the Ray version on the RayCluster spec and returns the
// wrapper for chaining.
func (j *JobWrapper) RayVersion(rv string) *JobWrapper {
	j.Spec.RayClusterSpec.RayVersion = rv
	return j
}

// Env appends an environment variable to the first container of the given
// node group (head or worker) and returns the wrapper for chaining.
func (j *JobWrapper) Env(rayType rayv1.RayNodeType, name, value string) *JobWrapper {
	// append handles a nil slice, so the original "if Env == nil { make }"
	// initialization was redundant and has been dropped.
	switch rayType {
	case rayv1.HeadNode:
		c := &j.Spec.RayClusterSpec.HeadGroupSpec.Template.Spec.Containers[0]
		c.Env = append(c.Env, corev1.EnvVar{Name: name, Value: value})
	case rayv1.WorkerNode:
		c := &j.Spec.RayClusterSpec.WorkerGroupSpecs[0].Template.Spec.Containers[0]
		c.Env = append(c.Env, corev1.EnvVar{Name: name, Value: value})
	}
	return j
}
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ var _ = ginkgo.Describe("Job controller RayCluster for workloads when only jobs
ginkgo.It("Should suspend a cluster if the parent's workload does not exist or is not admitted", func() {
ginkgo.By("Creating the parent job which has a queue name")
parentJob := testingrayjob.MakeJob("parent-job", ns.Name).
RayJobSpecsDefault().
Queue("test").
Suspend(false).
Obj()
Expand Down
2 changes: 1 addition & 1 deletion test/integration/controller/jobs/raycluster/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ var (
ctx context.Context
fwk *framework.Framework
crdPath = filepath.Join("..", "..", "..", "..", "..", "config", "components", "crd", "bases")
rayCrdPath = filepath.Join("..", "..", "..", "..", "..", "dep-crds", "ray-operator")
rayCrdPath = filepath.Join("..", "..", "..", "..", "..", "dep-crds", "ray-operator", "crd", "bases")
)

func TestAPIs(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion test/integration/controller/jobs/rayjob/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ var (
ctx context.Context
fwk *framework.Framework
crdPath = filepath.Join("..", "..", "..", "..", "..", "config", "components", "crd", "bases")
rayCrdPath = filepath.Join("..", "..", "..", "..", "..", "dep-crds", "ray-operator")
rayCrdPath = filepath.Join("..", "..", "..", "..", "..", "dep-crds", "ray-operator", "crd", "bases")
)

func TestAPIs(t *testing.T) {
Expand Down
53 changes: 53 additions & 0 deletions test/integration/multikueue/multikueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
apimeta "k8s.io/apimachinery/pkg/api/meta"
Expand All @@ -49,13 +50,15 @@ import (
workloadtfjob "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/jobs/tfjob"
workloadxgboostjob "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/jobs/xgboostjob"
workloadmpijob "sigs.k8s.io/kueue/pkg/controller/jobs/mpijob"
workloadrayjob "sigs.k8s.io/kueue/pkg/controller/jobs/rayjob"
"sigs.k8s.io/kueue/pkg/features"
utiltesting "sigs.k8s.io/kueue/pkg/util/testing"
testingjob "sigs.k8s.io/kueue/pkg/util/testingjobs/job"
testingjobset "sigs.k8s.io/kueue/pkg/util/testingjobs/jobset"
testingmpijob "sigs.k8s.io/kueue/pkg/util/testingjobs/mpijob"
testingpaddlejob "sigs.k8s.io/kueue/pkg/util/testingjobs/paddlejob"
testingpytorchjob "sigs.k8s.io/kueue/pkg/util/testingjobs/pytorchjob"
testingrayjob "sigs.k8s.io/kueue/pkg/util/testingjobs/rayjob"
testingtfjob "sigs.k8s.io/kueue/pkg/util/testingjobs/tfjob"
testingxgboostjob "sigs.k8s.io/kueue/pkg/util/testingjobs/xgboostjob"
"sigs.k8s.io/kueue/pkg/workload"
Expand Down Expand Up @@ -1442,6 +1445,56 @@ var _ = ginkgo.Describe("Multikueue", ginkgo.Ordered, ginkgo.ContinueOnFailure,
}, util.LongTimeout, util.Interval).Should(gomega.Succeed())
})
})

// End-to-end MultiKueue flow for a RayJob: reserve quota on the manager,
// verify the job is copied to and admitted on a worker cluster, mirror the
// worker's status back to the manager, and finally mark the job finished so
// the remote workload is garbage-collected.
ginkgo.It("Should run a RayJob on worker if admitted", func() {
	// Pod set names must match the RayJob's head/worker group pod sets.
	admission := utiltesting.MakeAdmission(managerCq.Name).PodSets(
		kueue.PodSetAssignment{
			Name: "head",
		}, kueue.PodSetAssignment{
			Name: "workers-group-0",
		},
	)
	rayjob := testingrayjob.MakeJob("rayjob1", managerNs.Name).
		Queue(managerLq.Name).
		RayJobSpecsDefault().
		Obj()
	gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, rayjob)).Should(gomega.Succeed())
	wlLookupKey := types.NamespacedName{Name: workloadrayjob.GetWorkloadNameForRayJob(rayjob.Name, rayjob.UID), Namespace: managerNs.Name}
	// Reserve quota on the manager's workload so MultiKueue can dispatch it.
	gomega.Eventually(func(g gomega.Gomega) {
		createdWorkload := &kueue.Workload{}
		g.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed())
		g.Expect(util.SetQuotaReservation(managerTestCluster.ctx, managerTestCluster.client, createdWorkload, admission.Obj())).To(gomega.Succeed())
	}, util.LongTimeout, util.Interval).Should(gomega.Succeed())

	admitWorkloadAndCheckWorkerCopies(multikueueAC.Name, wlLookupKey, admission)

	ginkgo.By("changing the status of the RayJob in the worker, updates the manager's RayJob status", func() {
		// Simulate the worker's controller marking the job running.
		gomega.Eventually(func(g gomega.Gomega) {
			createdRayJob := rayv1.RayJob{}
			g.Expect(worker2TestCluster.client.Get(worker2TestCluster.ctx, client.ObjectKeyFromObject(rayjob), &createdRayJob)).To(gomega.Succeed())
			createdRayJob.Status.JobDeploymentStatus = rayv1.JobDeploymentStatusRunning
			g.Expect(worker2TestCluster.client.Status().Update(worker2TestCluster.ctx, &createdRayJob)).To(gomega.Succeed())
		}, util.Timeout, util.Interval).Should(gomega.Succeed())
		// The MultiKueue controller should sync the status to the manager.
		gomega.Eventually(func(g gomega.Gomega) {
			createdRayJob := rayv1.RayJob{}
			g.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, client.ObjectKeyFromObject(rayjob), &createdRayJob)).To(gomega.Succeed())
			g.Expect(createdRayJob.Status.JobDeploymentStatus).To(gomega.Equal(rayv1.JobDeploymentStatusRunning))
		}, util.Timeout, util.Interval).Should(gomega.Succeed())
	})

	ginkgo.By("finishing the worker RayJob, the manager's wl is marked as finished and the worker2 wl removed", func() {
		finishJobReason := ""
		// Simulate successful completion on the worker.
		gomega.Eventually(func(g gomega.Gomega) {
			createdRayJob := rayv1.RayJob{}
			g.Expect(worker2TestCluster.client.Get(worker2TestCluster.ctx, client.ObjectKeyFromObject(rayjob), &createdRayJob)).To(gomega.Succeed())
			createdRayJob.Status.JobStatus = rayv1.JobStatusSucceeded
			createdRayJob.Status.JobDeploymentStatus = rayv1.JobDeploymentStatusComplete
			g.Expect(worker2TestCluster.client.Status().Update(worker2TestCluster.ctx, &createdRayJob)).To(gomega.Succeed())
		}, util.Timeout, util.Interval).Should(gomega.Succeed())

		waitForWorkloadToFinishAndRemoteWorkloadToBeDeleted(wlLookupKey, finishJobReason)
	})
})
})

var _ = ginkgo.Describe("Multikueue no GC", ginkgo.Ordered, ginkgo.ContinueOnFailure, func() {
Expand Down
14 changes: 14 additions & 0 deletions test/integration/multikueue/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
workloadtfjob "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/jobs/tfjob"
workloadxgboostjob "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/jobs/xgboostjob"
workloadmpijob "sigs.k8s.io/kueue/pkg/controller/jobs/mpijob"
workloadrayjob "sigs.k8s.io/kueue/pkg/controller/jobs/rayjob"
"sigs.k8s.io/kueue/pkg/queue"
"sigs.k8s.io/kueue/pkg/util/kubeversion"
utiltesting "sigs.k8s.io/kueue/pkg/util/testing"
Expand Down Expand Up @@ -95,6 +96,7 @@ func createCluster(setupFnc framework.ManagerSetup, apiFeatureGates ...string) c
DepCRDPaths: []string{filepath.Join("..", "..", "..", "dep-crds", "jobset-operator"),
filepath.Join("..", "..", "..", "dep-crds", "training-operator-crds"),
filepath.Join("..", "..", "..", "dep-crds", "mpi-operator"),
filepath.Join("..", "..", "..", "dep-crds", "ray-operator", "crd", "bases"),
},
APIServerFeatureGates: apiFeatureGates,
}
Expand Down Expand Up @@ -206,6 +208,18 @@ func managerSetup(ctx context.Context, mgr manager.Manager) {

err = workloadmpijob.SetupMPIJobWebhook(mgr, jobframework.WithCache(cCache), jobframework.WithQueues(queues))
gomega.Expect(err).NotTo(gomega.HaveOccurred())

err = workloadrayjob.SetupIndexes(ctx, mgr.GetFieldIndexer())
gomega.Expect(err).NotTo(gomega.HaveOccurred())

rayJobReconciler := workloadrayjob.NewReconciler(
mgr.GetClient(),
mgr.GetEventRecorderFor(constants.JobControllerName))
err = rayJobReconciler.SetupWithManager(mgr)
gomega.Expect(err).NotTo(gomega.HaveOccurred())

err = workloadrayjob.SetupRayJobWebhook(mgr, jobframework.WithCache(cCache), jobframework.WithQueues(queues))
gomega.Expect(err).NotTo(gomega.HaveOccurred())
}

func managerAndMultiKueueSetup(ctx context.Context, mgr manager.Manager, gcInterval time.Duration, enabledIntegrations sets.Set[string]) {
Expand Down
2 changes: 1 addition & 1 deletion test/integration/webhook/jobs/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ var (
webhookPath = filepath.Join("..", "..", "..", "..", "config", "components", "webhook")
mpiCrdPath = filepath.Join("..", "..", "..", "..", "dep-crds", "mpi-operator")
jobsetCrdPath = filepath.Join("..", "..", "..", "..", "dep-crds", "jobset-operator")
rayCrdPath = filepath.Join("..", "..", "..", "..", "dep-crds", "ray-operator")
rayCrdPath = filepath.Join("..", "..", "..", "..", "dep-crds", "ray-operator", "crd", "bases")
kubeflowCrdPath = filepath.Join("..", "..", "..", "..", "dep-crds", "training-operator-crds")
)

Expand Down

0 comments on commit fefb95a

Please sign in to comment.