From 86730ae568b78a8921a44d59b94d4c8009856a8d Mon Sep 17 00:00:00 2001 From: msherif1234 Date: Wed, 23 Aug 2023 09:52:18 -0400 Subject: [PATCH] use metadatainformer for replicaset to save storage cache Signed-off-by: msherif1234 --- .../transform/kubernetes/kubernetes.go | 39 +++++++++---------- 1 file changed, 18 insertions(+), 21 deletions(-) diff --git a/pkg/pipeline/transform/kubernetes/kubernetes.go b/pkg/pipeline/transform/kubernetes/kubernetes.go index 4916555c9..5ce4cf25b 100644 --- a/pkg/pipeline/transform/kubernetes/kubernetes.go +++ b/pkg/pipeline/transform/kubernetes/kubernetes.go @@ -31,6 +31,8 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/metadata" + "k8s.io/client-go/metadata/metadatainformer" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/clientcmd" @@ -267,7 +269,7 @@ func (k *KubeData) initServiceInformer(informerFactory informers.SharedInformerF ips := make([]string, 0, len(svc.Spec.ClusterIPs)) for _, ip := range svc.Spec.ClusterIPs { // ignoring None IPs - if ip != v1.ClusterIPNone { + if isServiceIPSet(ip) { ips = append(ips, ip) } } @@ -291,23 +293,8 @@ func (k *KubeData) initServiceInformer(informerFactory informers.SharedInformerF return nil } -func (k *KubeData) initReplicaSetInformer(informerFactory informers.SharedInformerFactory) error { - k.replicaSets = informerFactory.Apps().V1().ReplicaSets().Informer() - // To save space, instead of storing a complete *appvs1.Replicaset instance, the - // informer's cache will store a *metav1.ObjectMeta with the minimal required fields - if err := k.replicaSets.SetTransform(func(i interface{}) (interface{}, error) { - rs, ok := i.(*appsv1.ReplicaSet) - if !ok { - return nil, fmt.Errorf("was expecting a ReplicaSet. Got: %T", i) - } - return &metav1.ObjectMeta{ - Name: rs.Name, - Namespace: rs.Namespace, - OwnerReferences: rs.OwnerReferences, - }, nil - }); err != nil { - return fmt.Errorf("can't set ReplicaSets transform: %w", err) - } +func (k *KubeData) initReplicaSetInformer(informerFactory metadatainformer.SharedInformerFactory) error { + k.replicaSets = informerFactory.ForResource(appsv1.SchemeGroupVersion.WithResource("deployment")).Informer() return nil } @@ -325,7 +312,12 @@ func (k *KubeData) InitFromConfig(kubeConfigPath string) error { return err } - err = k.initInformers(kubeClient) + metaKubeClient, err := metadata.NewForConfig(config) + if err != nil { + return err + } + + err = k.initInformers(kubeClient, metaKubeClient) if err != nil { return err } @@ -360,8 +352,9 @@ func LoadConfig(kubeConfigPath string) (*rest.Config, error) { return config, nil } -func (k *KubeData) initInformers(client kubernetes.Interface) error { +func (k *KubeData) initInformers(client kubernetes.Interface, metaClient metadata.Interface) error { informerFactory := informers.NewSharedInformerFactory(client, syncTime) + metadataInformerFactory := metadatainformer.NewSharedInformerFactory(metaClient, syncTime) err := k.initNodeInformer(informerFactory) if err != nil { return err @@ -374,7 +367,7 @@ func (k *KubeData) initInformers(client kubernetes.Interface) error { if err != nil { return err } - err = k.initReplicaSetInformer(informerFactory) + err = k.initReplicaSetInformer(metadataInformerFactory) if err != nil { return err } @@ -386,3 +379,7 @@ func (k *KubeData) initInformers(client kubernetes.Interface) error { return nil } + +func isServiceIPSet(ip string) bool { + return ip != v1.ClusterIPNone && ip != "" +}