Add e2e tests for manageJobsWithoutQueueName
kaisoz committed Jan 31, 2025
1 parent 198d276 commit 7b42d2f
Showing 5 changed files with 272 additions and 0 deletions.
12 changes: 12 additions & 0 deletions Makefile-test.mk
@@ -112,6 +112,9 @@ test-multikueue-e2e: kustomize ginkgo yq gomod-download dep-crds ginkgo-top run-
.PHONY: test-tas-e2e
test-tas-e2e: kustomize ginkgo yq gomod-download dep-crds kueuectl ginkgo-top run-test-tas-e2e-$(E2E_KIND_VERSION:kindest/node:v%=%)

.PHONY: test-queuename-e2e
test-queuename-e2e: kustomize ginkgo yq gomod-download dep-crds kueuectl ginkgo-top run-test-queuename-e2e-$(E2E_KIND_VERSION:kindest/node:v%=%)

E2E_TARGETS := $(addprefix run-test-e2e-,${E2E_K8S_VERSIONS})
MULTIKUEUE-E2E_TARGETS := $(addprefix run-test-multikueue-e2e-,${E2E_K8S_VERSIONS})
.PHONY: test-e2e-all
@@ -151,6 +154,15 @@ run-test-tas-e2e-%: FORCE
	./hack/e2e-test.sh
	$(PROJECT_DIR)/bin/ginkgo-top -i $(ARTIFACTS)/$@/e2e.json > $(ARTIFACTS)/$@/e2e-top.yaml

run-test-queuename-e2e-%: K8S_VERSION = $(@:run-test-queuename-e2e-%=%)
run-test-queuename-e2e-%: FORCE
	@echo Running e2e for k8s ${K8S_VERSION}
	E2E_KIND_VERSION="kindest/node:v$(K8S_VERSION)" KIND_CLUSTER_NAME=$(KIND_CLUSTER_NAME) CREATE_KIND_CLUSTER=$(CREATE_KIND_CLUSTER) \
	ARTIFACTS="$(ARTIFACTS)/$@" IMAGE_TAG=$(IMAGE_TAG) GINKGO_ARGS="$(GINKGO_ARGS)" \
	JOBSET_VERSION=$(JOBSET_VERSION) \
	KIND_CLUSTER_FILE="kind-cluster.yaml" E2E_TARGET_FOLDER="queuename" \
	./hack/e2e-test.sh
	$(PROJECT_DIR)/bin/ginkgo-top -i $(ARTIFACTS)/$@/e2e.json > $(ARTIFACTS)/$@/e2e-top.yaml
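
For a local run, the new target can be invoked like the existing e2e targets. A sketch, assuming docker and kind are available; the exact kindest/node tag below is illustrative and must match one of E2E_K8S_VERSIONS:

    make test-queuename-e2e E2E_KIND_VERSION=kindest/node:v1.32.0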

SCALABILITY_RUNNER := $(PROJECT_DIR)/bin/performance-scheduler-runner
.PHONY: performance-scheduler-runner
4 changes: 4 additions & 0 deletions test/e2e/config/default/manager_e2e_patch.yaml
@@ -4,3 +4,7 @@
- op: add
  path: /spec/template/spec/containers/0/args/-
  value: --feature-gates=MultiKueueBatchJobWithManagedBy=true,TopologyAwareScheduling=true,LocalQueueMetrics=true
- op: add
  path: /spec/strategy
  value:
    type: Recreate
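
The strategy patch presumably matters because these tests restart kueue-controller-manager after changing its configuration: with Recreate, the old pod is terminated before its replacement starts, so two controllers never run with different configurations at once. A quick sketch for verifying the patch on a test cluster (deployment name and namespace as used elsewhere in this commit):

    kubectl -n kueue-system get deployment kueue-controller-manager -o jsonpath='{.spec.strategy.type}'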
145 changes: 145 additions & 0 deletions test/e2e/queuename/jobwithoutqueue_test.go
@@ -0,0 +1,145 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package queuename

import (
	"time"

	"github.com/onsi/ginkgo/v2"
	"github.com/onsi/gomega"
	batchv1 "k8s.io/api/batch/v1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/utils/ptr"

	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
	workloadjob "sigs.k8s.io/kueue/pkg/controller/jobs/job"
	"sigs.k8s.io/kueue/pkg/util/testing"
	testingjob "sigs.k8s.io/kueue/pkg/util/testingjobs/job"
	"sigs.k8s.io/kueue/test/util"
)

var _ = ginkgo.Describe("ManageJobsWithoutQueueName", ginkgo.Ordered, func() {
	var ns *corev1.Namespace

	ginkgo.BeforeAll(func() {
		configurationUpdate := time.Now()
		config := util.GetKueueConfiguration(ctx, k8sClient)
		config.ManageJobsWithoutQueueName = true
		util.ApplyKueueConfiguration(ctx, k8sClient, config)
		util.RestartKueueController(ctx, k8sClient)
		ginkgo.GinkgoLogr.Info("Kueue configuration updated", "took", time.Since(configurationUpdate))
	})

	ginkgo.AfterAll(func() {
		config := util.GetKueueConfiguration(ctx, k8sClient)
		config.ManageJobsWithoutQueueName = false
		util.ApplyKueueConfiguration(ctx, k8sClient, config)
		util.RestartKueueController(ctx, k8sClient)
	})

	ginkgo.BeforeEach(func() {
		ns = &corev1.Namespace{
			ObjectMeta: metav1.ObjectMeta{
				GenerateName: "e2e-",
			},
		}
		gomega.Expect(k8sClient.Create(ctx, ns)).To(gomega.Succeed())
	})

	ginkgo.AfterEach(func() {
		gomega.Expect(util.DeleteNamespace(ctx, k8sClient, ns)).To(gomega.Succeed())
	})

	ginkgo.When("creating a Job when manageJobsWithoutQueueName=true", func() {
		var (
			defaultRf    *kueue.ResourceFlavor
			localQueue   *kueue.LocalQueue
			clusterQueue *kueue.ClusterQueue
		)

		ginkgo.BeforeEach(func() {
			defaultRf = testing.MakeResourceFlavor("default").Obj()
			gomega.Expect(k8sClient.Create(ctx, defaultRf)).Should(gomega.Succeed())
			clusterQueue = testing.MakeClusterQueue("cluster-queue").
				ResourceGroup(
					*testing.MakeFlavorQuotas(defaultRf.Name).
						Resource(corev1.ResourceCPU, "2").
						Resource(corev1.ResourceMemory, "2G").Obj()).Obj()
			gomega.Expect(k8sClient.Create(ctx, clusterQueue)).Should(gomega.Succeed())
			localQueue = testing.MakeLocalQueue("main", ns.Name).ClusterQueue("cluster-queue").Obj()
			gomega.Expect(k8sClient.Create(ctx, localQueue)).Should(gomega.Succeed())
		})

		ginkgo.AfterEach(func() {
			gomega.Expect(util.DeleteAllJobsInNamespace(ctx, k8sClient, ns)).Should(gomega.Succeed())
			// Force remove workloads to be sure that cluster queue can be removed.
			gomega.Expect(util.DeleteWorkloadsInNamespace(ctx, k8sClient, ns)).Should(gomega.Succeed())
			gomega.Expect(util.DeleteObject(ctx, k8sClient, localQueue)).Should(gomega.Succeed())
			util.ExpectObjectToBeDeleted(ctx, k8sClient, clusterQueue, true)
			util.ExpectObjectToBeDeleted(ctx, k8sClient, defaultRf, true)
		})

		ginkgo.It("should suspend it", func() {
			var testJob *batchv1.Job
			ginkgo.By("creating a job without queue name", func() {
				testJob = testingjob.MakeJob("test-job", ns.Name).Suspend(false).Obj()
				gomega.Expect(k8sClient.Create(ctx, testJob)).Should(gomega.Succeed())
			})

			ginkgo.By("verifying that the job is suspended", func() {
				jobLookupKey := types.NamespacedName{Name: testJob.Name, Namespace: ns.Name}
				createdJob := &batchv1.Job{}
				gomega.Eventually(func(g gomega.Gomega) {
					g.Expect(k8sClient.Get(ctx, jobLookupKey, createdJob)).Should(gomega.Succeed())
					g.Expect(ptr.Deref(createdJob.Spec.Suspend, false)).To(gomega.BeTrue())
				}, util.LongTimeout, util.Interval).Should(gomega.Succeed())
			})
		})

		ginkgo.It("should unsuspend it", func() {
			var testJob *batchv1.Job
			var jobLookupKey types.NamespacedName
			createdJob := &batchv1.Job{}
			ginkgo.By("creating a job without queue name", func() {
				testJob = testingjob.MakeJob("test-job", ns.Name).Suspend(false).Obj()
				gomega.Expect(k8sClient.Create(ctx, testJob)).Should(gomega.Succeed())
			})

			ginkgo.By("setting the queue-name label", func() {
				jobLookupKey = types.NamespacedName{Name: testJob.Name, Namespace: ns.Name}
				gomega.Eventually(func() error {
					if err := k8sClient.Get(ctx, jobLookupKey, createdJob); err != nil {
						return err
					}
					createdJob.Labels["kueue.x-k8s.io/queue-name"] = "main"
					return k8sClient.Update(ctx, createdJob)
				}, util.LongTimeout, util.Interval).Should(gomega.Succeed())
			})

			ginkgo.By("verifying that the job is unsuspended", func() {
				gomega.Eventually(func(g gomega.Gomega) {
					g.Expect(k8sClient.Get(ctx, jobLookupKey, createdJob)).Should(gomega.Succeed())
					g.Expect(ptr.Deref(createdJob.Spec.Suspend, false)).To(gomega.BeFalse())
				}, util.LongTimeout, util.Interval).Should(gomega.Succeed())
			})

			ginkgo.By("verifying that the job has been admitted", func() {
				wlLookupKey := types.NamespacedName{Name: workloadjob.GetWorkloadNameForJob(testJob.Name, testJob.UID), Namespace: ns.Name}
				createdWorkload := &kueue.Workload{}
				gomega.Eventually(func(g gomega.Gomega) {
					g.Expect(k8sClient.Get(ctx, wlLookupKey, createdWorkload)).Should(gomega.Succeed())
					g.Expect(createdWorkload.Status.Admission).ShouldNot(gomega.BeNil())
				}, util.LongTimeout, util.Interval).Should(gomega.Succeed())
			})
		})
	})
})
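
What the first spec asserts can also be reproduced by hand on a cluster where manageJobsWithoutQueueName is enabled; a rough sketch (job name and image are arbitrary):

    # Create a job with no kueue.x-k8s.io/queue-name label...
    kubectl create job manual-test --image=busybox -- sleep 30
    # ...and observe that Kueue has suspended it.
    kubectl get job manual-test -o jsonpath='{.spec.suspend}'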
65 changes: 65 additions & 0 deletions test/e2e/queuename/suite_test.go
@@ -0,0 +1,65 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package queuename

import (
	"context"
	"fmt"
	"os"
	"testing"
	"time"

	"github.com/onsi/ginkgo/v2"
	"github.com/onsi/gomega"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"

	visibilityv1beta1 "sigs.k8s.io/kueue/client-go/clientset/versioned/typed/visibility/v1beta1"
	"sigs.k8s.io/kueue/test/util"
)

var (
	k8sClient                    client.WithWatch
	ctx                          context.Context
	visibilityClient             visibilityv1beta1.VisibilityV1beta1Interface
	impersonatedVisibilityClient visibilityv1beta1.VisibilityV1beta1Interface
)

func TestAPIs(t *testing.T) {
	suiteName := "End To End Queue-name Handling Suite"
	if ver, found := os.LookupEnv("E2E_KIND_VERSION"); found {
		suiteName = fmt.Sprintf("%s: %s", suiteName, ver)
	}
	gomega.RegisterFailHandler(ginkgo.Fail)
	ginkgo.RunSpecs(t,
		suiteName,
	)
}

var _ = ginkgo.BeforeSuite(func() {
	ctrl.SetLogger(util.NewTestingLogger(ginkgo.GinkgoWriter, -3))

	k8sClient, _ = util.CreateClientUsingCluster("")
	visibilityClient = util.CreateVisibilityClient("")
	impersonatedVisibilityClient = util.CreateVisibilityClient("system:serviceaccount:kueue-system:default")
	ctx = context.Background()

	waitForAvailableStart := time.Now()
	util.WaitForKueueAvailability(ctx, k8sClient)
	util.WaitForJobSetAvailability(ctx, k8sClient)
	ginkgo.GinkgoLogr.Info("Kueue and JobSet operators are available in the cluster", "waitingTime", time.Since(waitForAvailableStart))
})
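
Since the suite follows the standard Ginkgo layout, it can in principle be run directly against an existing cluster that already has Kueue and JobSet installed (hack/e2e-test.sh normally handles that setup); a sketch assuming KUBECONFIG points at such a cluster:

    ginkgo -v ./test/e2e/queuename/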
46 changes: 46 additions & 0 deletions test/util/e2e.go
@@ -4,6 +4,7 @@ import (
	"context"
	"fmt"
	"os"
	"time"

	"github.com/google/go-cmp/cmp/cmpopts"
	kfmpi "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1"
@@ -21,7 +22,9 @@ import (
	"sigs.k8s.io/controller-runtime/pkg/client/config"
	jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2"
	leaderworkersetv1 "sigs.k8s.io/lws/api/leaderworkerset/v1"
	"sigs.k8s.io/yaml"

	configapi "sigs.k8s.io/kueue/apis/config/v1beta1"
	kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1"
	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
	visibility "sigs.k8s.io/kueue/apis/visibility/v1beta1"
@@ -112,6 +115,21 @@ func CreateVisibilityClient(user string) visibilityv1beta1.VisibilityV1beta1Inte
	return visibilityClient
}

func rolloutOperatorDeployment(ctx context.Context, k8sClient client.Client, key types.NamespacedName) {
	deployment := &appsv1.Deployment{}
	gomega.EventuallyWithOffset(2, func(g gomega.Gomega) {
		g.Expect(k8sClient.Get(ctx, key, deployment)).To(gomega.Succeed())
		// Guard against a nil annotations map before stamping the restart annotation.
		if deployment.Spec.Template.ObjectMeta.Annotations == nil {
			deployment.Spec.Template.ObjectMeta.Annotations = map[string]string{}
		}
		deployment.Spec.Template.ObjectMeta.Annotations["kubectl.kubernetes.io/restartedAt"] = time.Now().Format(time.RFC3339)
		g.Expect(k8sClient.Update(ctx, deployment)).To(gomega.Succeed())

		// Ensure that the rollout has started by waiting for the deployment to be unavailable
		g.Expect(deployment.Status.Conditions).To(gomega.ContainElement(gomega.BeComparableTo(
			appsv1.DeploymentCondition{Type: appsv1.DeploymentAvailable, Status: corev1.ConditionFalse},
			cmpopts.IgnoreFields(appsv1.DeploymentCondition{}, "Reason", "Message", "LastUpdateTime", "LastTransitionTime")),
		))
	}, StartUpTimeout, Interval).Should(gomega.Succeed())
}

func waitForOperatorAvailability(ctx context.Context, k8sClient client.Client, key types.NamespacedName) {
	deployment := &appsv1.Deployment{}
	pods := &corev1.PodList{}
@@ -171,3 +189,31 @@ func WaitForKubeRayOperatorAvailability(ctx context.Context, k8sClient client.Cl
	kroKey := types.NamespacedName{Namespace: "ray-system", Name: "kuberay-operator"}
	waitForOperatorAvailability(ctx, k8sClient, kroKey)
}

func GetKueueConfiguration(ctx context.Context, k8sClient client.Client) *configapi.Configuration {
	var kueueCfg configapi.Configuration
	kcmKey := types.NamespacedName{Namespace: "kueue-system", Name: "kueue-manager-config"}
	configMap := &corev1.ConfigMap{}

	gomega.Expect(k8sClient.Get(ctx, kcmKey, configMap)).To(gomega.Succeed())
	gomega.Expect(yaml.Unmarshal([]byte(configMap.Data["controller_manager_config.yaml"]), &kueueCfg)).To(gomega.Succeed())
	return &kueueCfg
}

func ApplyKueueConfiguration(ctx context.Context, k8sClient client.Client, kueueCfg *configapi.Configuration) {
	configMap := &corev1.ConfigMap{}
	kcmKey := types.NamespacedName{Namespace: "kueue-system", Name: "kueue-manager-config"}
	config, err := yaml.Marshal(kueueCfg)
	gomega.Expect(err).NotTo(gomega.HaveOccurred())

	gomega.Eventually(func(g gomega.Gomega) {
		g.Expect(k8sClient.Get(ctx, kcmKey, configMap)).To(gomega.Succeed())
		configMap.Data["controller_manager_config.yaml"] = string(config)
		g.Expect(k8sClient.Update(ctx, configMap)).To(gomega.Succeed())
	}, Timeout, Interval).Should(gomega.Succeed())
}

func RestartKueueController(ctx context.Context, k8sClient client.Client) {
	kcmKey := types.NamespacedName{Namespace: "kueue-system", Name: "kueue-controller-manager"}
	rolloutOperatorDeployment(ctx, k8sClient, kcmKey)
	waitForOperatorAvailability(ctx, k8sClient, kcmKey)
}
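
rolloutOperatorDeployment uses the same restartedAt-annotation mechanism that kubectl uses, so RestartKueueController is roughly equivalent to this sketch:

    kubectl -n kueue-system rollout restart deployment/kueue-controller-manager
    kubectl -n kueue-system rollout status deployment/kueue-controller-manager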
