From 4f7d7a7abfc88ecb6cebb83a787fe7c7b026c976 Mon Sep 17 00:00:00 2001 From: msherif1234 Date: Wed, 23 Aug 2023 09:52:18 -0400 Subject: [PATCH] use metadatainformer for replicaset to save storage cache Signed-off-by: msherif1234 --- .../transform/kubernetes/kubernetes.go | 43 +++++++++++++++---- 1 file changed, 34 insertions(+), 9 deletions(-) diff --git a/pkg/pipeline/transform/kubernetes/kubernetes.go b/pkg/pipeline/transform/kubernetes/kubernetes.go index 4916555c9..eb0b46064 100644 --- a/pkg/pipeline/transform/kubernetes/kubernetes.go +++ b/pkg/pipeline/transform/kubernetes/kubernetes.go @@ -19,18 +19,22 @@ package kubernetes import ( "fmt" + appsv1 "k8s.io/api/apps/v1" "net" "os" "path" "time" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes/cni" + log "github.com/sirupsen/logrus" - appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/metadata" + "k8s.io/client-go/metadata/metadatainformer" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/clientcmd" @@ -60,6 +64,7 @@ type KubeData struct { // replicaSets caches the ReplicaSets as partially-filled *ObjectMeta pointers replicaSets cache.SharedIndexInformer stopChan chan struct{} + mdStopChan chan struct{} } type Owner struct { @@ -267,7 +272,7 @@ func (k *KubeData) initServiceInformer(informerFactory informers.SharedInformerF ips := make([]string, 0, len(svc.Spec.ClusterIPs)) for _, ip := range svc.Spec.ClusterIPs { // ignoring None IPs - if ip != v1.ClusterIPNone { + if isServiceIPSet(ip) { ips = append(ips, ip) } } @@ -291,10 +296,15 @@ func (k *KubeData) initServiceInformer(informerFactory informers.SharedInformerF return nil } -func (k *KubeData) initReplicaSetInformer(informerFactory informers.SharedInformerFactory) error { - k.replicaSets = informerFactory.Apps().V1().ReplicaSets().Informer() - // To save space, instead of storing a complete *appvs1.Replicaset instance, the - // informer's cache will store a *metav1.ObjectMeta with the minimal required fields +func (k *KubeData) initReplicaSetInformer(informerFactory metadatainformer.SharedInformerFactory) error { + k.replicaSets = informerFactory.ForResource( + schema.GroupVersionResource{ + Group: "apps", + Version: "v1", + Resource: "replicasets", + }).Informer() + // To save space, instead of storing a complete *metav1.ObjectMeta instance, the + // informer's cache will store only the minimal required fields if err := k.replicaSets.SetTransform(func(i interface{}) (interface{}, error) { rs, ok := i.(*appsv1.ReplicaSet) if !ok { @@ -314,6 +324,7 @@ func (k *KubeData) initReplicaSetInformer(informerFactory informers.SharedInform func (k *KubeData) InitFromConfig(kubeConfigPath string) error { // Initialization variables k.stopChan = make(chan struct{}) + k.mdStopChan = make(chan struct{}) config, err := LoadConfig(kubeConfigPath) if err != nil { @@ -325,7 +336,12 @@ func (k *KubeData) InitFromConfig(kubeConfigPath string) error { return err } - err = k.initInformers(kubeClient) + metaKubeClient, err := metadata.NewForConfig(config) + if err != nil { + return err + } + + err = k.initInformers(kubeClient, metaKubeClient) if err != nil { return err } @@ -360,8 +376,9 @@ func LoadConfig(kubeConfigPath string) (*rest.Config, error) { return config, nil } -func (k *KubeData) initInformers(client kubernetes.Interface) error { +func (k *KubeData) initInformers(client kubernetes.Interface, metaClient metadata.Interface) error { informerFactory := informers.NewSharedInformerFactory(client, syncTime) + metadataInformerFactory := metadatainformer.NewSharedInformerFactory(metaClient, syncTime) err := k.initNodeInformer(informerFactory) if err != nil { return err @@ -374,7 +391,7 @@ func (k *KubeData) initInformers(client kubernetes.Interface) error { if err != nil { return err } - err = k.initReplicaSetInformer(informerFactory) + err = k.initReplicaSetInformer(metadataInformerFactory) if err != nil { return err } @@ -384,5 +401,13 @@ func (k *KubeData) initInformers(client kubernetes.Interface) error { informerFactory.WaitForCacheSync(k.stopChan) log.Debugf("kubernetes informers started") + log.Debugf("starting kubernetes metadata informers, waiting for synchronization") + metadataInformerFactory.Start(k.mdStopChan) + metadataInformerFactory.WaitForCacheSync(k.mdStopChan) + log.Debugf("kubernetes metadata informers started") return nil } + +func isServiceIPSet(ip string) bool { + return ip != v1.ClusterIPNone && ip != "" +}