Skip to content

Commit

Permalink
first cut at multikueue aw integration test
Browse files Browse the repository at this point in the history
  • Loading branch information
dgrove-oss committed Jan 17, 2025
1 parent ae1c9a6 commit 799fea8
Showing 1 changed file with 48 additions and 1 deletion.
49 changes: 48 additions & 1 deletion test/integration/multikueue/multikueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
awv1beta2 "github.com/project-codeflare/appwrapper/api/v1beta2"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
apimeta "k8s.io/apimachinery/pkg/api/meta"
Expand All @@ -52,6 +53,7 @@ import (
workloadmpijob "sigs.k8s.io/kueue/pkg/controller/jobs/mpijob"
"sigs.k8s.io/kueue/pkg/features"
utiltesting "sigs.k8s.io/kueue/pkg/util/testing"
testingaw "sigs.k8s.io/kueue/pkg/util/testingjobs/appwrapper"
testingjob "sigs.k8s.io/kueue/pkg/util/testingjobs/job"
testingjobset "sigs.k8s.io/kueue/pkg/util/testingjobs/jobset"
testingmpijob "sigs.k8s.io/kueue/pkg/util/testingjobs/mpijob"
Expand All @@ -67,7 +69,8 @@ import (
var defaultEnabledIntegrations sets.Set[string] = sets.New(
"batch/job", "kubeflow.org/mpijob", "ray.io/rayjob", "ray.io/raycluster",
"jobset.x-k8s.io/jobset", "kubeflow.org/mxjob", "kubeflow.org/paddlejob",
"kubeflow.org/pytorchjob", "kubeflow.org/tfjob", "kubeflow.org/xgboostjob")
"kubeflow.org/pytorchjob", "kubeflow.org/tfjob", "kubeflow.org/xgboostjob",
"codeflare.dev/appwrapper")

var _ = ginkgo.Describe("Multikueue", ginkgo.Ordered, ginkgo.ContinueOnFailure, func() {
var (
Expand Down Expand Up @@ -1056,6 +1059,50 @@ var _ = ginkgo.Describe("Multikueue", ginkgo.Ordered, ginkgo.ContinueOnFailure,
})
})

ginkgo.It("Should run an appwrapper on worker if admitted", func() {
aw := testingaw.MakeAppWrapper("aw", managerNs.Name).
Component(testingjob.MakeJob("job-1", managerNs.Name).SetTypeMeta().Parallelism(1).Obj()).
Suspend(false).
Obj()

gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, aw)).Should(gomega.Succeed())
wlLookupKey := types.NamespacedName{Name: workloadjobset.GetWorkloadNameForJobSet(aw.Name, aw.UID), Namespace: managerNs.Name}

admission := utiltesting.MakeAdmission(managerCq.Name).PodSets(
kueue.PodSetAssignment{
Name: "aw-0-0",
},
)

admitWorkloadAndCheckWorkerCopies(multikueueAC.Name, wlLookupKey, admission)

ginkgo.By("changing the status of the appwrapper in the worker, updates the manager's appwrappers status", func() {
gomega.Eventually(func(g gomega.Gomega) {
createdAppWrapper := awv1beta2.AppWrapper{}
g.Expect(worker2TestCluster.client.Get(worker2TestCluster.ctx, client.ObjectKeyFromObject(aw), &createdAppWrapper)).To(gomega.Succeed())
createdAppWrapper.Status.Phase = awv1beta2.AppWrapperRunning
g.Expect(worker2TestCluster.client.Status().Update(worker2TestCluster.ctx, &createdAppWrapper)).To(gomega.Succeed())
}, util.Timeout, util.Interval).Should(gomega.Succeed())
gomega.Eventually(func(g gomega.Gomega) {
createdAppWrapper := awv1beta2.AppWrapper{}
g.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, client.ObjectKeyFromObject(aw), &createdAppWrapper)).To(gomega.Succeed())
g.Expect(createdAppWrapper.Status.Phase).To(gomega.Equal(awv1beta2.AppWrapperRunning))
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})

ginkgo.By("finishing the worker appwrapper, the manager's wl is marked as finished and the worker2 wl removed", func() {
finishJobReason := "AppWrapper finished successfully"
gomega.Eventually(func(g gomega.Gomega) {
createdAppWrapper := awv1beta2.AppWrapper{}
g.Expect(worker2TestCluster.client.Get(worker2TestCluster.ctx, client.ObjectKeyFromObject(aw), &createdAppWrapper)).To(gomega.Succeed())
createdAppWrapper.Status.Phase = awv1beta2.AppWrapperSucceeded
g.Expect(worker2TestCluster.client.Status().Update(worker2TestCluster.ctx, &createdAppWrapper)).To(gomega.Succeed())
}, util.Timeout, util.Interval).Should(gomega.Succeed())

waitForWorkloadToFinishAndRemoteWorkloadToBeDeleted(wlLookupKey, finishJobReason)
})
})

ginkgo.It("Should not run a MPIJob on worker if set to be managed by external controller", func() {
admission := utiltesting.MakeAdmission(managerCq.Name).PodSets(
kueue.PodSetAssignment{
Expand Down

0 comments on commit 799fea8

Please sign in to comment.