Skip to content

Commit

Permalink
Update integration tests
Browse files Browse the repository at this point in the history
  • Loading branch information
mszadkow committed Jan 31, 2025
1 parent 1826b61 commit c69b9fc
Showing 1 changed file with 63 additions and 17 deletions.
80 changes: 63 additions & 17 deletions test/integration/multikueue/multikueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
gomegatypes "github.com/onsi/gomega/types"
rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -909,6 +910,7 @@ var _ = ginkgo.Describe("Multikueue", ginkgo.Ordered, ginkgo.ContinueOnFailure,

ginkgo.It("Should run a PyTorchJob on worker if admitted", func() {
pyTorchJob := testingpytorchjob.MakePyTorchJob("pytorchjob1", managerNs.Name).
ManagedBy(kueue.MultiKueueControllerName).
Queue(managerLq.Name).
PyTorchReplicaSpecs(
testingpytorchjob.PyTorchReplicaSpecRequirement{
Expand Down Expand Up @@ -987,6 +989,41 @@ var _ = ginkgo.Describe("Multikueue", ginkgo.Ordered, ginkgo.ContinueOnFailure,
})
})

ginkgo.It("Should not run a PyTorchJob on worker if set to be managed by wrong external controller", func() {
admission := utiltesting.MakeAdmission(managerCq.Name).PodSets(
kueue.PodSetAssignment{
Name: "launcher",
}, kueue.PodSetAssignment{
Name: "worker",
},
)
pyTorchJob := testingpytorchjob.MakePyTorchJob("pytorchjob-not-managed", managerNs.Name).
Queue(managerLq.Name).
ManagedBy("example.com/other-controller-not-training-operator").
PyTorchReplicaSpecs(
testingpytorchjob.PyTorchReplicaSpecRequirement{
ReplicaType: kftraining.PyTorchJobReplicaTypeMaster,
ReplicaCount: 1,
Name: "pytorchjob-master",
RestartPolicy: "OnFailure",
},
testingpytorchjob.PyTorchReplicaSpecRequirement{
ReplicaType: kftraining.PyTorchJobReplicaTypeWorker,
ReplicaCount: 1,
Name: "pytorchjob-worker",
RestartPolicy: "Never",
},
).
Obj()
ginkgo.By("create a pytorchjob with external managedBy", func() {
gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, pyTorchJob)).Should(gomega.Succeed())
})

wlLookupKeyNoManagedBy := types.NamespacedName{Name: workloadpytorchjob.GetWorkloadNameForPyTorchJob(pyTorchJob.Name, pyTorchJob.UID), Namespace: managerNs.Name}
setQuotaReservationInCluster(wlLookupKeyNoManagedBy, admission)
checkingTheWorkloadCreation(wlLookupKeyNoManagedBy, gomega.Not(gomega.Succeed()))
})

ginkgo.It("Should run a XGBoostJob on worker if admitted", framework.RedundantSpec, func() {
xgBoostJob := testingxgboostjob.MakeXGBoostJob("xgboostjob1", managerNs.Name).
Queue(managerLq.Name).
Expand Down Expand Up @@ -1096,23 +1133,8 @@ var _ = ginkgo.Describe("Multikueue", ginkgo.Ordered, ginkgo.ContinueOnFailure,
})

wlLookupKeyNoManagedBy := types.NamespacedName{Name: workloadmpijob.GetWorkloadNameForMPIJob(mpijobNoManagedBy.Name, mpijobNoManagedBy.UID), Namespace: managerNs.Name}
ginkgo.By("setting workload reservation in the management cluster", func() {
createdWorkload := &kueue.Workload{}
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, wlLookupKeyNoManagedBy, createdWorkload)).To(gomega.Succeed())
g.Expect(util.SetQuotaReservation(managerTestCluster.ctx, managerTestCluster.client, createdWorkload, admission.Obj())).To(gomega.Succeed())
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})

ginkgo.By("workload was not created in the worker clusters", func() {
managerWl := &kueue.Workload{}
createdWorkload := &kueue.Workload{}
gomega.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, wlLookupKeyNoManagedBy, managerWl)).To(gomega.Succeed())
gomega.Consistently(func(g gomega.Gomega) {
g.Expect(worker2TestCluster.client.Get(worker2TestCluster.ctx, wlLookupKeyNoManagedBy, createdWorkload)).ToNot(gomega.Succeed())
g.Expect(worker1TestCluster.client.Get(worker1TestCluster.ctx, wlLookupKeyNoManagedBy, createdWorkload)).ToNot(gomega.Succeed())
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})
setQuotaReservationInCluster(wlLookupKeyNoManagedBy, admission)
checkingTheWorkloadCreation(wlLookupKeyNoManagedBy, gomega.Not(gomega.Succeed()))
})

ginkgo.It("Should run a MPIJob on worker if admitted", func() {
Expand Down Expand Up @@ -2223,3 +2245,27 @@ func waitForWorkloadToFinishAndRemoteWorkloadToBeDeleted(wlLookupKey types.Names
g.Expect(worker2TestCluster.client.Get(worker2TestCluster.ctx, wlLookupKey, createdWorkload)).To(utiltesting.BeNotFoundError())
}, util.LongTimeout, util.Interval).Should(gomega.Succeed())
}

func setQuotaReservationInCluster(wlLookupKey types.NamespacedName, admission *utiltesting.AdmissionWrapper) {
ginkgo.By("setting workload reservation in the management cluster", func() {
createdWorkload := &kueue.Workload{}
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed())
g.Expect(util.SetQuotaReservation(managerTestCluster.ctx, managerTestCluster.client, createdWorkload, admission.Obj())).To(gomega.Succeed())
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})
}

func checkingTheWorkloadCreation(wlLookupKey types.NamespacedName, matcher gomegatypes.GomegaMatcher) {
ginkgo.By("checking the workload creation in the worker clusters", func() {
managerWl := &kueue.Workload{}
createdWorkload := &kueue.Workload{}
gomega.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, wlLookupKey, managerWl)).To(gomega.Succeed())
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(worker2TestCluster.client.Get(worker2TestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed())
g.Expect(createdWorkload.Spec).To(gomega.BeComparableTo(managerWl.Spec))
g.Expect(worker1TestCluster.client.Get(worker1TestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed())
g.Expect(createdWorkload.Spec).To(gomega.BeComparableTo(managerWl.Spec))
}, util.Timeout, util.Interval).Should(matcher)
})
}

0 comments on commit c69b9fc

Please sign in to comment.