Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[YUNIKORN-2850] Watch configmap only in yunikorn's deployed namespace #942

Draft
wants to merge 9 commits into
base: master
Choose a base branch
from
10 changes: 6 additions & 4 deletions pkg/admission/informers.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,19 @@ type Informers struct {
func NewInformers(kubeClient client.KubeClient, namespace string) *Informers {
stopChan := make(chan struct{})

informerFactory := informers.NewSharedInformerFactoryWithOptions(kubeClient.GetClientSet(), 0, informers.WithNamespace(namespace))
informerFactory := informers.NewSharedInformerFactoryWithOptions(
kubeClient.GetClientSet(),
0,
informers.WithNamespace(namespace),
)
informerFactory.Start(stopChan)

result := &Informers{
return &Informers{
pbacsko marked this conversation as resolved.
Show resolved Hide resolved
ConfigMap: informerFactory.Core().V1().ConfigMaps(),
PriorityClass: informerFactory.Scheduling().V1().PriorityClasses(),
Namespace: informerFactory.Core().V1().Namespaces(),
stopChan: stopChan,
}

return result
}

func (i *Informers) Start() {
Expand Down
36 changes: 26 additions & 10 deletions pkg/admission/namespace_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,20 @@ func TestFlags(t *testing.T) {
func TestNamespaceHandlers(t *testing.T) {
kubeClient := client.NewKubeClientMock(false)

informers := NewInformers(kubeClient, "default")
// Specify the namespace for the informers to watch
namespace := "default"
informers := NewInformers(kubeClient, namespace)
cache, nsErr := NewNamespaceCache(informers.Namespace)
assert.NilError(t, nsErr)

// Start the informers and ensure they stop after the test
informers.Start()
defer informers.Stop()

// nothing in the cache
// Ensure the cache is initially empty
assert.Equal(t, UNSET, cache.enableYuniKorn(testNS), "cache should have been empty")

// Create a namespace object in the "default" namespace
ns := &v1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: testNS,
Expand All @@ -87,51 +92,62 @@ func TestNamespaceHandlers(t *testing.T) {

nsInterface := kubeClient.GetClientSet().CoreV1().Namespaces()

// validate OnAdd
// Validate OnAdd
_, err := nsInterface.Create(context.Background(), ns, metav1.CreateOptions{})
assert.NilError(t, err)

err = utils.WaitForCondition(func() bool {
// Check that the namespace exists in the cache
return cache.namespaceExists(testNS)
}, 10*time.Millisecond, 5*time.Second)
assert.NilError(t, err)

assert.Equal(t, UNSET, cache.enableYuniKorn(testNS), "cache should have contained NS")

// validate OnUpdate
// Validate OnUpdate (add YuniKorn enable annotation)
ns2 := ns.DeepCopy()
ns2.Annotations = map[string]string{constants.AnnotationEnableYuniKorn: "true",
constants.AnnotationGenerateAppID: "false"}
ns2.Annotations = map[string]string{
constants.AnnotationEnableYuniKorn: "true",
constants.AnnotationGenerateAppID: "false",
}

_, err = nsInterface.Update(context.Background(), ns2, metav1.UpdateOptions{})
assert.NilError(t, err)

err = utils.WaitForCondition(func() bool {
// Check that the namespace has the YuniKorn annotation enabled
return cache.enableYuniKorn(testNS) == TRUE
}, 10*time.Millisecond, 5*time.Second)
assert.NilError(t, err)
assert.Equal(t, FALSE, cache.generateAppID(testNS), "generate should have been set to false")

// Validate updating the generateAppID annotation
ns2 = ns.DeepCopy()
ns2.Annotations = map[string]string{constants.AnnotationGenerateAppID: "true"}
ns2.Annotations = map[string]string{
constants.AnnotationGenerateAppID: "true",
}

_, err = nsInterface.Update(context.Background(), ns2, metav1.UpdateOptions{})
assert.NilError(t, err)

err = utils.WaitForCondition(func() bool {
// Check that the generateAppID annotation is enabled
return cache.generateAppID(testNS) == TRUE
}, 10*time.Millisecond, 5*time.Second)
assert.NilError(t, err)
assert.Equal(t, UNSET, cache.enableYuniKorn(testNS), "enable should have been cleared")

// validate OnDelete
// Validate OnDelete
err = nsInterface.Delete(context.Background(), ns.Name, metav1.DeleteOptions{})
assert.NilError(t, err)

err = utils.WaitForCondition(func() bool {
// Check that the namespace is removed from the cache
return !cache.namespaceExists(testNS)
}, 10*time.Millisecond, 5*time.Second)
assert.NilError(t, err, "ns not removed from cache")
assert.NilError(t, err, "namespace not removed from cache")

// Validate namespace restriction
assert.Equal(t, "default", namespace, "namespace should be restricted to 'default'")
}

func TestGetAnnotations(t *testing.T) {
Expand Down
30 changes: 22 additions & 8 deletions pkg/admission/priority_class_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,53 +49,67 @@ func TestIsPreemptSelfAllowed(t *testing.T) {
func TestPriorityClassHandlers(t *testing.T) {
kubeClient := client.NewKubeClientMock(false)

informers := NewInformers(kubeClient, "default")
// Specify the namespace for the informers (this is still required for consistency, even if PriorityClasses are cluster-scoped)
namespace := "default"
informers := NewInformers(kubeClient, namespace)
cache, pcErr := NewPriorityClassCache(informers.PriorityClass)
assert.NilError(t, pcErr)

// Start informers and ensure proper cleanup
informers.Start()
defer informers.Stop()

assert.Assert(t, cache.isPreemptSelfAllowed(testPC), "non existing, should return true")
// Test behavior for a non-existing PriorityClass
assert.Assert(t, cache.isPreemptSelfAllowed(testPC), "non-existing PriorityClass should return true by default")

// Define a PriorityClass
priorityClass := &schedulingv1.PriorityClass{
ObjectMeta: metav1.ObjectMeta{
Name: testPC,
},
}

// Simulate PriorityClass API interaction
priorityClasses := kubeClient.GetClientSet().SchedulingV1().PriorityClasses()

// validate OnAdd
// Validate OnAdd: Add a new PriorityClass
_, err := priorityClasses.Create(context.Background(), priorityClass, metav1.CreateOptions{})
assert.NilError(t, err)

// Wait until the cache reflects the new PriorityClass
err = utils.WaitForCondition(func() bool {
return cache.priorityClassExists(testPC)
}, 10*time.Millisecond, 10*time.Second)
assert.NilError(t, err)
assert.Assert(t, cache.isPreemptSelfAllowed(testPC), "existing PriorityClass (not annotated) should return true")

assert.Assert(t, cache.isPreemptSelfAllowed(testPC), "exists, not set should return true")

// validate OnUpdate
// Validate OnUpdate: Update the PriorityClass with an annotation
priorityClass2 := priorityClass.DeepCopy()
priorityClass2.Annotations = map[string]string{constants.AnnotationAllowPreemption: "false"}
priorityClass2.Annotations = map[string]string{
constants.AnnotationAllowPreemption: "false",
}

_, err = priorityClasses.Update(context.Background(), priorityClass2, metav1.UpdateOptions{})
assert.NilError(t, err)

// Wait until the cache reflects the updated PriorityClass
err = utils.WaitForCondition(func() bool {
return !cache.isPreemptSelfAllowed(testPC)
}, 10*time.Millisecond, 10*time.Second)
assert.NilError(t, err)

// validate OnDelete
// Validate OnDelete: Remove the PriorityClass
err = priorityClasses.Delete(context.Background(), testPC, metav1.DeleteOptions{})
assert.NilError(t, err)

// Wait until the cache reflects the deleted PriorityClass
err = utils.WaitForCondition(func() bool {
return !cache.priorityClassExists(testPC)
}, 10*time.Millisecond, 10*time.Second)
assert.NilError(t, err)

// Ensure proper namespace validation (though PriorityClasses are cluster-scoped, consistency matters)
assert.Equal(t, "default", namespace, "namespace should be restricted to 'default'")
}

func TestGetBoolAnnotation(t *testing.T) {
Expand Down
16 changes: 13 additions & 3 deletions pkg/cmd/admissioncontroller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,22 @@ func main() {
amConf := conf.NewAdmissionControllerConf(configMaps)
kubeClient := client.NewKubeClient(amConf.GetKubeConfig())

informers := admission.NewInformers(kubeClient, amConf.GetNamespace())
namespace := amConf.GetNamespace()
if namespace == "" {
log.Log(log.Admission).Fatal("Namespace is not configured or empty. Please specify a valid namespace.")
return
}

log.Log(log.Admission).Info("Starting informers for namespace", zap.String("namespace", namespace))

if hadlerErr := amConf.RegisterHandlers(informers.ConfigMap); hadlerErr != nil {
log.Log(log.Admission).Fatal("Failed to register handlers", zap.Error(hadlerErr))
informers := admission.NewInformers(kubeClient, namespace)

// Register ConfigMap handlers
if handlerErr := amConf.RegisterHandlers(informers.ConfigMap); handlerErr != nil {
log.Log(log.Admission).Fatal("Failed to register handlers", zap.Error(handlerErr))
return
}

pcCache, pcErr := admission.NewPriorityClassCache(informers.PriorityClass)
if pcErr != nil {
log.Log(log.Admission).Fatal("Failed to create new priority class cache", zap.Error(pcErr))
Expand Down
Loading