WIP on integration and e2e test for multikueue appwrapper
dgrove-oss committed Jan 17, 2025
1 parent 799fea8 commit 9fc46ca
Showing 4 changed files with 104 additions and 3 deletions.
80 changes: 80 additions & 0 deletions test/e2e/multikueue/e2e_test.go
@@ -27,6 +27,7 @@ import (
kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
awv1beta2 "github.com/project-codeflare/appwrapper/api/v1beta2"
rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
@@ -45,6 +46,8 @@ import (
workloadappwrapper "sigs.k8s.io/kueue/pkg/controller/jobs/appwrapper"
workloadmpijob "sigs.k8s.io/kueue/pkg/controller/jobs/mpijob"
workloadrayjob "sigs.k8s.io/kueue/pkg/controller/jobs/rayjob"
utiltesting "sigs.k8s.io/kueue/pkg/util/testing"
testingaw "sigs.k8s.io/kueue/pkg/util/testingjobs/appwrapper"
testingjob "sigs.k8s.io/kueue/pkg/util/testingjobs/job"
testingjobset "sigs.k8s.io/kueue/pkg/util/testingjobs/jobset"
testingmpijob "sigs.k8s.io/kueue/pkg/util/testingjobs/mpijob"
@@ -373,6 +376,83 @@ var _ = ginkgo.Describe("MultiKueue", func() {
})
})

ginkgo.It("Should run an appwrapper containing a job on worker if admitted", func() {
// Since it requires 2 CPUs in total, this appwrapper can only be admitted in worker 1.
aw := testingaw.MakeAppWrapper("aw", managerNs.Name).
Component(testingjob.MakeJob("job-1", managerNs.Name).
SetTypeMeta().
Suspend(false).
Image(util.E2eTestSleepImage, []string{"5s"}). // Give it enough time to be observed as Active in the live status update step.
Parallelism(2).
Request(corev1.ResourceCPU, "1").
Request(corev1.ResourceMemory, "200M").
Obj()).
Suspend(false).
Obj()

ginkgo.By("Creating the appwrapper", func() {
gomega.Expect(k8sManagerClient.Create(ctx, aw)).Should(gomega.Succeed())
})

createdWorkload := &kueue.Workload{}
wlLookupKey := types.NamespacedName{Name: workloadappwrapper.GetWorkloadNameForAppWrapper(aw.Name, aw.UID), Namespace: managerNs.Name}

// The execution should be handed off to the worker.
ginkgo.By("Waiting to be admitted in worker1 and manager", func() {
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(k8sManagerClient.Get(ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed())
g.Expect(workload.FindAdmissionCheck(createdWorkload.Status.AdmissionChecks, multiKueueAc.Name)).To(gomega.BeComparableTo(&kueue.AdmissionCheckState{
Name: multiKueueAc.Name,
State: kueue.CheckStateReady,
Message: `The workload got reservation on "worker1"`,
}, cmpopts.IgnoreFields(kueue.AdmissionCheckState{}, "LastTransitionTime")))
g.Expect(apimeta.FindStatusCondition(createdWorkload.Status.Conditions, kueue.WorkloadAdmitted)).To(gomega.BeComparableTo(&metav1.Condition{
Type: kueue.WorkloadAdmitted,
Status: metav1.ConditionTrue,
Reason: "Admitted",
Message: "The workload is admitted",
}, util.IgnoreConditionTimestampsAndObservedGeneration))
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})

ginkgo.By("Waiting for the appwrapper to get status updates", func() {
gomega.Eventually(func(g gomega.Gomega) {
createdAppWrapper := &awv1beta2.AppWrapper{}
g.Expect(k8sManagerClient.Get(ctx, client.ObjectKeyFromObject(aw), createdAppWrapper)).To(gomega.Succeed())
g.Expect(createdAppWrapper.Status.Phase).To(gomega.Equal(awv1beta2.AppWrapperRunning))
}, util.LongTimeout, util.Interval).Should(gomega.Succeed())
})

ginkgo.By("Waiting for the appwrapper to finish", func() {
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(k8sManagerClient.Get(ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed())

g.Expect(apimeta.FindStatusCondition(createdWorkload.Status.Conditions, kueue.WorkloadFinished)).To(gomega.BeComparableTo(&metav1.Condition{
Type: kueue.WorkloadFinished,
Status: metav1.ConditionTrue,
Reason: kueue.WorkloadFinishedReasonSucceeded,
Message: "AppWrapper completed successfully",
}, util.IgnoreConditionTimestampsAndObservedGeneration))
}, util.LongTimeout, util.Interval).Should(gomega.Succeed())
})

ginkgo.By("Checking no objects are left in the worker clusters and the appwrapper is completed", func() {
gomega.Eventually(func(g gomega.Gomega) {
workerWl := &kueue.Workload{}
g.Expect(k8sWorker1Client.Get(ctx, wlLookupKey, workerWl)).To(utiltesting.BeNotFoundError())
g.Expect(k8sWorker2Client.Get(ctx, wlLookupKey, workerWl)).To(utiltesting.BeNotFoundError())
workerAppWrapper := &awv1beta2.AppWrapper{}
g.Expect(k8sWorker1Client.Get(ctx, client.ObjectKeyFromObject(aw), workerAppWrapper)).To(utiltesting.BeNotFoundError())
g.Expect(k8sWorker2Client.Get(ctx, client.ObjectKeyFromObject(aw), workerAppWrapper)).To(utiltesting.BeNotFoundError())
}, util.Timeout, util.Interval).Should(gomega.Succeed())

createdAppWrapper := &awv1beta2.AppWrapper{}
gomega.Expect(k8sManagerClient.Get(ctx, client.ObjectKeyFromObject(aw), createdAppWrapper)).To(gomega.Succeed())
gomega.Expect(createdAppWrapper.Spec.Suspend).To(gomega.BeFalse())
gomega.Expect(createdAppWrapper.Status.Phase).To(gomega.Equal(awv1beta2.AppWrapperSucceeded))
})
})

ginkgo.It("Should run a kubeflow PyTorchJob on worker if admitted", func() {
// Since it requires 1600M of memory, this job can only be admitted in worker 2.
pyTorchJob := testingpytorchjob.MakePyTorchJob("pytorchjob1", managerNs.Name).
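For readers unfamiliar with the testingaw builder: the chain above wraps a batch/v1 Job as the single component of an AppWrapper. A minimal sketch of the object it plausibly assembles, using only field names from the awv1beta2 API imported above (the builder's exact output, the container name, and the image handling are assumptions, not confirmed by this commit):

```go
package sketch

import (
	"encoding/json"

	awv1beta2 "github.com/project-codeflare/appwrapper/api/v1beta2"
	batchv1 "k8s.io/api/batch/v1"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/utils/ptr"
)

// buildAppWrapper approximates what the builder chain in the test creates.
// The wrapped Job is stored as raw JSON inside the component template, which
// is why the builder calls SetTypeMeta(): without apiVersion and kind the
// serialized object could not be decoded by the AppWrapper controller.
func buildAppWrapper(ns, sleepImage string) (*awv1beta2.AppWrapper, error) {
	job := &batchv1.Job{
		TypeMeta:   metav1.TypeMeta{APIVersion: "batch/v1", Kind: "Job"},
		ObjectMeta: metav1.ObjectMeta{Name: "job-1", Namespace: ns},
		Spec: batchv1.JobSpec{
			Parallelism: ptr.To[int32](2),
			Suspend:     ptr.To(false),
			Template: corev1.PodTemplateSpec{
				Spec: corev1.PodSpec{
					RestartPolicy: corev1.RestartPolicyNever,
					Containers: []corev1.Container{{
						Name:  "c", // container name is an assumption
						Image: sleepImage,
						Args:  []string{"5s"},
						Resources: corev1.ResourceRequirements{
							Requests: corev1.ResourceList{
								corev1.ResourceCPU:    resource.MustParse("1"),
								corev1.ResourceMemory: resource.MustParse("200M"),
							},
						},
					}},
				},
			},
		},
	}
	raw, err := json.Marshal(job)
	if err != nil {
		return nil, err
	}
	return &awv1beta2.AppWrapper{
		ObjectMeta: metav1.ObjectMeta{Name: "aw", Namespace: ns},
		Spec: awv1beta2.AppWrapperSpec{
			Suspend: false, // matches Suspend(false) in the builder chain
			Components: []awv1beta2.AppWrapperComponent{{
				Template: runtime.RawExtension{Raw: raw},
			}},
		},
	}, nil
}
```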
8 changes: 7 additions & 1 deletion test/e2e/multikueue/suite_test.go
@@ -27,6 +27,7 @@ import (
kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
awv1beta2 "github.com/project-codeflare/appwrapper/api/v1beta2"
rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
authenticationv1 "k8s.io/api/authentication/v1"
batchv1 "k8s.io/api/batch/v1"
@@ -97,6 +98,8 @@ func kubeconfigForMultiKueueSA(ctx context.Context, c client.Client, restConfig
policyRule(kftraining.SchemeGroupVersion.Group, "pytorchjobs/status", "get"),
policyRule(kftraining.SchemeGroupVersion.Group, "xgboostjobs", resourceVerbs...),
policyRule(kftraining.SchemeGroupVersion.Group, "xgboostjobs/status", "get"),
policyRule(awv1beta2.GroupVersion.Group, "appwrappers", resourceVerbs...),
policyRule(awv1beta2.GroupVersion.Group, "appwrappers/status", "get"),
policyRule(kfmpi.SchemeGroupVersion.Group, "mpijobs", resourceVerbs...),
policyRule(kfmpi.SchemeGroupVersion.Group, "mpijobs/status", "get"),
policyRule(rayv1.SchemeGroupVersion.Group, "rayjobs", resourceVerbs...),
@@ -276,11 +279,14 @@ var _ = ginkgo.BeforeSuite(func() {
util.WaitForKubeFlowMPIOperatorAvailability(ctx, k8sWorker1Client)
util.WaitForKubeFlowMPIOperatorAvailability(ctx, k8sWorker2Client)

util.WaitForAppWrapperAvailability(ctx, k8sManagerClient)
util.WaitForAppWrapperAvailability(ctx, k8sWorker1Client)
util.WaitForAppWrapperAvailability(ctx, k8sWorker2Client)

util.WaitForKubeRayOperatorAvailability(ctx, k8sWorker1Client)
util.WaitForKubeRayOperatorAvailability(ctx, k8sWorker2Client)

ginkgo.GinkgoLogr.Info("Kueue and all integration operators are available in all the clusters", "waitingTime", time.Since(waitForAvailableStart))

discoveryClient, err := discovery.NewDiscoveryClientForConfig(managerCfg)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
managerK8SVersion, err = kubeversion.FetchServerVersion(discoveryClient)
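util.WaitForAppWrapperAvailability itself is not part of this diff. A minimal sketch of what it can look like, assuming it mirrors the neighbouring WaitFor*OperatorAvailability helpers by polling the operator's Deployment until it reports Available (the deployment name and namespace below are assumptions):

```go
package util // sketch placed alongside kueue's test/util helpers

import (
	"context"

	"github.com/onsi/gomega"
	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// WaitForAppWrapperAvailability polls the AppWrapper operator's Deployment
// until it reports an Available condition. The deployment key is a guess,
// not confirmed by this commit.
func WaitForAppWrapperAvailability(ctx context.Context, k8sClient client.Client) {
	key := types.NamespacedName{Namespace: "appwrapper-system", Name: "appwrapper-controller-manager"}
	gomega.EventuallyWithOffset(1, func(g gomega.Gomega) {
		deployment := &appsv1.Deployment{}
		g.Expect(k8sClient.Get(ctx, key, deployment)).To(gomega.Succeed())
		available := false
		for _, cond := range deployment.Status.Conditions {
			if cond.Type == appsv1.DeploymentAvailable && cond.Status == corev1.ConditionTrue {
				available = true
			}
		}
		g.Expect(available).To(gomega.BeTrue(), "%s should be Available", key.String())
	}, LongTimeout, Interval).Should(gomega.Succeed())
}
```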
5 changes: 3 additions & 2 deletions test/integration/multikueue/multikueue_test.go
@@ -44,6 +44,7 @@ import (

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/controller/admissionchecks/multikueue"
workloadappwrapper "sigs.k8s.io/kueue/pkg/controller/jobs/appwrapper"
workloadjob "sigs.k8s.io/kueue/pkg/controller/jobs/job"
workloadjobset "sigs.k8s.io/kueue/pkg/controller/jobs/jobset"
workloadpaddlejob "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/jobs/paddlejob"
@@ -1059,14 +1060,14 @@ var _ = ginkgo.Describe("Multikueue", ginkgo.Ordered, ginkgo.ContinueOnFailure,
})
})

ginkgo.It("Should run an appwrapper on worker if admitted", func() {
ginkgo.It("Should run an appwrapper on worker if admitted", ginkgo.Focus, func() {
aw := testingaw.MakeAppWrapper("aw", managerNs.Name).
Component(testingjob.MakeJob("job-1", managerNs.Name).SetTypeMeta().Parallelism(1).Obj()).
Suspend(false).
Obj()

gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, aw)).Should(gomega.Succeed())
wlLookupKey := types.NamespacedName{Name: workloadjobset.GetWorkloadNameForJobSet(aw.Name, aw.UID), Namespace: managerNs.Name}
wlLookupKey := types.NamespacedName{Name: workloadappwrapper.GetWorkloadNameForAppWrapper(aw.Name, aw.UID), Namespace: managerNs.Name}

admission := utiltesting.MakeAdmission(managerCq.Name).PodSets(
kueue.PodSetAssignment{
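The wlLookupKey fix above is more than cosmetic: each job framework derives its Workload name from the owning object's GVK, so the JobSet helper computes a different name for the same AppWrapper. A sketch of the new helper's likely shape, mirroring siblings such as GetWorkloadNameForJobSet (the exact jobframework call is an assumption):

```go
package appwrapper

import (
	awv1beta2 "github.com/project-codeflare/appwrapper/api/v1beta2"
	"k8s.io/apimachinery/pkg/types"

	"sigs.k8s.io/kueue/pkg/controller/jobframework"
)

var gvk = awv1beta2.GroupVersion.WithKind("AppWrapper")

// GetWorkloadNameForAppWrapper mirrors the other per-framework helpers: the
// generated Workload name depends on the owner's GVK and UID, so the JobSet
// variant yields a key that never matches the Workload actually created for
// an AppWrapper.
func GetWorkloadNameForAppWrapper(appWrapperName string, appWrapperUID types.UID) string {
	return jobframework.GetWorkloadNameForOwnerWithGVK(appWrapperName, appWrapperUID, gvk)
}
```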
14 changes: 14 additions & 0 deletions test/integration/multikueue/suite_test.go
@@ -42,6 +42,7 @@ import (
"sigs.k8s.io/kueue/pkg/controller/core"
"sigs.k8s.io/kueue/pkg/controller/core/indexer"
"sigs.k8s.io/kueue/pkg/controller/jobframework"
workloadaw "sigs.k8s.io/kueue/pkg/controller/jobs/appwrapper"
workloadjob "sigs.k8s.io/kueue/pkg/controller/jobs/job"
workloadjobset "sigs.k8s.io/kueue/pkg/controller/jobs/jobset"
workloadpaddlejob "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/jobs/paddlejob"
@@ -95,6 +96,7 @@ func createCluster(setupFnc framework.ManagerSetup, apiFeatureGates ...string) c
DepCRDPaths: []string{filepath.Join("..", "..", "..", "dep-crds", "jobset-operator"),
filepath.Join("..", "..", "..", "dep-crds", "training-operator-crds"),
filepath.Join("..", "..", "..", "dep-crds", "mpi-operator"),
filepath.Join("..", "..", "..", "dep-crds", "appwrapper-crds"),
},
APIServerFeatureGates: apiFeatureGates,
}
@@ -206,6 +208,18 @@ func managerSetup(ctx context.Context, mgr manager.Manager) {

err = workloadmpijob.SetupMPIJobWebhook(mgr, jobframework.WithCache(cCache), jobframework.WithQueues(queues))
gomega.Expect(err).NotTo(gomega.HaveOccurred())

err = workloadaw.SetupIndexes(ctx, mgr.GetFieldIndexer())
gomega.Expect(err).NotTo(gomega.HaveOccurred())

appwrapperReconciler := workloadaw.NewReconciler(
mgr.GetClient(),
mgr.GetEventRecorderFor(constants.JobControllerName))
err = appwrapperReconciler.SetupWithManager(mgr)
gomega.Expect(err).NotTo(gomega.HaveOccurred())

err = workloadaw.SetupAppWrapperWebhook(mgr, jobframework.WithCache(cCache), jobframework.WithQueues(queues))
gomega.Expect(err).NotTo(gomega.HaveOccurred())
}

func managerAndMultiKueueSetup(ctx context.Context, mgr manager.Manager, gcInterval time.Duration, enabledIntegrations sets.Set[string]) {
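Registering the indexes, reconciler, and webhook above makes the test manager AppWrapper-aware, but MultiKueue dispatch is additionally gated by the enabledIntegrations set passed to managerAndMultiKueueSetup. A hedged sketch of how a spec would opt the framework in (the integration name string is an assumption based on kueue's <group>/<kind> convention):

```go
package multikueue // sketch: same package as the suite above

import (
	"context"
	"time"

	"k8s.io/apimachinery/pkg/util/sets"
	"sigs.k8s.io/controller-runtime/pkg/manager"
)

// managerAndAppWrapperMultiKueueSetup shows how a spec would enable MultiKueue
// for AppWrappers. "workload.codeflare.dev/appwrapper" follows kueue's
// <group>/<kind> naming convention (compare "jobset.x-k8s.io/jobset") and is
// an assumption here, not confirmed by this diff.
func managerAndAppWrapperMultiKueueSetup(ctx context.Context, mgr manager.Manager) {
	managerAndMultiKueueSetup(ctx, mgr, 2*time.Second,
		sets.New("batch/job", "workload.codeflare.dev/appwrapper"))
}
```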
