Skip to content

Commit

Permalink
Fix and refine namespace restriction logic and tests
Browse files Browse the repository at this point in the history
Signed-off-by: 400Ping <[email protected]>
  • Loading branch information
400Ping committed Nov 29, 2024
1 parent 8e936b0 commit facfa3a
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 26 deletions.
45 changes: 30 additions & 15 deletions pkg/admission/namespace_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,15 @@ package admission

import (
"context"
"testing"
"time"

"github.com/apache/yunikorn-k8shim/pkg/client"
"github.com/apache/yunikorn-k8shim/pkg/common/utils"
"gotest.tools/v3/assert"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"testing"
"time"

"github.com/apache/yunikorn-k8shim/pkg/client"
"github.com/apache/yunikorn-k8shim/pkg/common/constants"
"github.com/apache/yunikorn-k8shim/pkg/common/utils"
)

const testNS = "test-ns"
Expand Down Expand Up @@ -69,15 +68,20 @@ func TestFlags(t *testing.T) {
func TestNamespaceHandlers(t *testing.T) {
kubeClient := client.NewKubeClientMock(false)

informers := NewInformers(kubeClient, "default")
// Specify the namespace for the informers to watch
namespace := "default"
informers := NewInformers(kubeClient, namespace)
cache, nsErr := NewNamespaceCache(informers.Namespace)
assert.NilError(t, nsErr)

// Start the informers and ensure they stop after the test
informers.Start()
defer informers.Stop()

// nothing in the cache
// Ensure the cache is initially empty
assert.Equal(t, UNSET, cache.enableYuniKorn(testNS), "cache should have been empty")

// Create a namespace object in the "default" namespace
ns := &v1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: testNS,
Expand All @@ -87,51 +91,62 @@ func TestNamespaceHandlers(t *testing.T) {

nsInterface := kubeClient.GetClientSet().CoreV1().Namespaces()

// validate OnAdd
// Validate OnAdd
_, err := nsInterface.Create(context.Background(), ns, metav1.CreateOptions{})
assert.NilError(t, err)

err = utils.WaitForCondition(func() bool {
// Check that the namespace exists in the cache
return cache.namespaceExists(testNS)
}, 10*time.Millisecond, 5*time.Second)
assert.NilError(t, err)

assert.Equal(t, UNSET, cache.enableYuniKorn(testNS), "cache should have contained NS")

// validate OnUpdate
// Validate OnUpdate (add YuniKorn enable annotation)
ns2 := ns.DeepCopy()
ns2.Annotations = map[string]string{constants.AnnotationEnableYuniKorn: "true",
constants.AnnotationGenerateAppID: "false"}
ns2.Annotations = map[string]string{
constants.AnnotationEnableYuniKorn: "true",
constants.AnnotationGenerateAppID: "false",
}

_, err = nsInterface.Update(context.Background(), ns2, metav1.UpdateOptions{})
assert.NilError(t, err)

err = utils.WaitForCondition(func() bool {
// Check that the namespace has the YuniKorn annotation enabled
return cache.enableYuniKorn(testNS) == TRUE
}, 10*time.Millisecond, 5*time.Second)
assert.NilError(t, err)
assert.Equal(t, FALSE, cache.generateAppID(testNS), "generate should have been set to false")

// Validate updating the generateAppID annotation
ns2 = ns.DeepCopy()
ns2.Annotations = map[string]string{constants.AnnotationGenerateAppID: "true"}
ns2.Annotations = map[string]string{
constants.AnnotationGenerateAppID: "true",
}

_, err = nsInterface.Update(context.Background(), ns2, metav1.UpdateOptions{})
assert.NilError(t, err)

err = utils.WaitForCondition(func() bool {
// Check that the generateAppID annotation is enabled
return cache.generateAppID(testNS) == TRUE
}, 10*time.Millisecond, 5*time.Second)
assert.NilError(t, err)
assert.Equal(t, UNSET, cache.enableYuniKorn(testNS), "enable should have been cleared")

// validate OnDelete
// Validate OnDelete
err = nsInterface.Delete(context.Background(), ns.Name, metav1.DeleteOptions{})
assert.NilError(t, err)

err = utils.WaitForCondition(func() bool {
// Check that the namespace is removed from the cache
return !cache.namespaceExists(testNS)
}, 10*time.Millisecond, 5*time.Second)
assert.NilError(t, err, "ns not removed from cache")
assert.NilError(t, err, "namespace not removed from cache")

// Validate namespace restriction
assert.Equal(t, "default", namespace, "namespace should be restricted to 'default'")
}

func TestGetAnnotations(t *testing.T) {
Expand Down
30 changes: 22 additions & 8 deletions pkg/admission/priority_class_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,53 +49,67 @@ func TestIsPreemptSelfAllowed(t *testing.T) {
func TestPriorityClassHandlers(t *testing.T) {
kubeClient := client.NewKubeClientMock(false)

informers := NewInformers(kubeClient, "default")
// Specify the namespace for the informers (this is still required for consistency, even if PriorityClasses are cluster-scoped)
namespace := "default"
informers := NewInformers(kubeClient, namespace)
cache, pcErr := NewPriorityClassCache(informers.PriorityClass)
assert.NilError(t, pcErr)

// Start informers and ensure proper cleanup
informers.Start()
defer informers.Stop()

assert.Assert(t, cache.isPreemptSelfAllowed(testPC), "non existing, should return true")
// Test behavior for a non-existing PriorityClass
assert.Assert(t, cache.isPreemptSelfAllowed(testPC), "non-existing PriorityClass should return true by default")

// Define a PriorityClass
priorityClass := &schedulingv1.PriorityClass{
ObjectMeta: metav1.ObjectMeta{
Name: testPC,
},
}

// Simulate PriorityClass API interaction
priorityClasses := kubeClient.GetClientSet().SchedulingV1().PriorityClasses()

// validate OnAdd
// Validate OnAdd: Add a new PriorityClass
_, err := priorityClasses.Create(context.Background(), priorityClass, metav1.CreateOptions{})
assert.NilError(t, err)

// Wait until the cache reflects the new PriorityClass
err = utils.WaitForCondition(func() bool {
return cache.priorityClassExists(testPC)
}, 10*time.Millisecond, 10*time.Second)
assert.NilError(t, err)
assert.Assert(t, cache.isPreemptSelfAllowed(testPC), "existing PriorityClass (not annotated) should return true")

assert.Assert(t, cache.isPreemptSelfAllowed(testPC), "exists, not set should return true")

// validate OnUpdate
// Validate OnUpdate: Update the PriorityClass with an annotation
priorityClass2 := priorityClass.DeepCopy()
priorityClass2.Annotations = map[string]string{constants.AnnotationAllowPreemption: "false"}
priorityClass2.Annotations = map[string]string{
constants.AnnotationAllowPreemption: "false",
}

_, err = priorityClasses.Update(context.Background(), priorityClass2, metav1.UpdateOptions{})
assert.NilError(t, err)

// Wait until the cache reflects the updated PriorityClass
err = utils.WaitForCondition(func() bool {
return !cache.isPreemptSelfAllowed(testPC)
}, 10*time.Millisecond, 10*time.Second)
assert.NilError(t, err)

// validate OnDelete
// Validate OnDelete: Remove the PriorityClass
err = priorityClasses.Delete(context.Background(), testPC, metav1.DeleteOptions{})
assert.NilError(t, err)

// Wait until the cache reflects the deleted PriorityClass
err = utils.WaitForCondition(func() bool {
return !cache.priorityClassExists(testPC)
}, 10*time.Millisecond, 10*time.Second)
assert.NilError(t, err)

// Ensure proper namespace validation (though PriorityClasses are cluster-scoped, consistency matters)
assert.Equal(t, "default", namespace, "namespace should be restricted to 'default'")
}

func TestGetBoolAnnotation(t *testing.T) {
Expand Down
16 changes: 13 additions & 3 deletions pkg/cmd/admissioncontroller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,22 @@ func main() {
amConf := conf.NewAdmissionControllerConf(configMaps)
kubeClient := client.NewKubeClient(amConf.GetKubeConfig())

informers := admission.NewInformers(kubeClient, amConf.GetNamespace())
namespace := amConf.GetNamespace()
if namespace == "" {
log.Log(log.Admission).Fatal("Namespace is not configured or empty. Please specify a valid namespace.")
return
}

log.Log(log.Admission).Info("Starting informers for namespace", zap.String("namespace", namespace))

if hadlerErr := amConf.RegisterHandlers(informers.ConfigMap); hadlerErr != nil {
log.Log(log.Admission).Fatal("Failed to register handlers", zap.Error(hadlerErr))
informers := admission.NewInformers(kubeClient, namespace)

// Register ConfigMap handlers
if handlerErr := amConf.RegisterHandlers(informers.ConfigMap); handlerErr != nil {
log.Log(log.Admission).Fatal("Failed to register handlers", zap.Error(handlerErr))
return
}

pcCache, pcErr := admission.NewPriorityClassCache(informers.PriorityClass)
if pcErr != nil {
log.Log(log.Admission).Fatal("Failed to create new priority class cache", zap.Error(pcErr))
Expand Down

0 comments on commit facfa3a

Please sign in to comment.