Skip to content

Commit

Permalink
use metadatainformer for replicaset to save storage cache
Browse files Browse the repository at this point in the history
Signed-off-by: msherif1234 <[email protected]>
  • Loading branch information
msherif1234 committed Nov 3, 2023
1 parent 926ccc6 commit 4f7d7a7
Showing 1 changed file with 34 additions and 9 deletions.
43 changes: 34 additions & 9 deletions pkg/pipeline/transform/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,22 @@ package kubernetes

import (
"fmt"
appsv1 "k8s.io/api/apps/v1"
"net"
"os"
"path"
"time"

"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes/cni"

log "github.com/sirupsen/logrus"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/metadata"
"k8s.io/client-go/metadata/metadatainformer"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
Expand Down Expand Up @@ -60,6 +64,7 @@ type KubeData struct {
// replicaSets caches the ReplicaSets as partially-filled *ObjectMeta pointers
replicaSets cache.SharedIndexInformer
stopChan chan struct{}
mdStopChan chan struct{}
}

type Owner struct {
Expand Down Expand Up @@ -267,7 +272,7 @@ func (k *KubeData) initServiceInformer(informerFactory informers.SharedInformerF
ips := make([]string, 0, len(svc.Spec.ClusterIPs))
for _, ip := range svc.Spec.ClusterIPs {
// ignoring None IPs
if ip != v1.ClusterIPNone {
if isServiceIPSet(ip) {
ips = append(ips, ip)
}
}
Expand All @@ -291,10 +296,15 @@ func (k *KubeData) initServiceInformer(informerFactory informers.SharedInformerF
return nil
}

func (k *KubeData) initReplicaSetInformer(informerFactory informers.SharedInformerFactory) error {
k.replicaSets = informerFactory.Apps().V1().ReplicaSets().Informer()
// To save space, instead of storing a complete *appvs1.Replicaset instance, the
// informer's cache will store a *metav1.ObjectMeta with the minimal required fields
func (k *KubeData) initReplicaSetInformer(informerFactory metadatainformer.SharedInformerFactory) error {
k.replicaSets = informerFactory.ForResource(
schema.GroupVersionResource{
Group: "apps",
Version: "v1",
Resource: "replicasets",
}).Informer()
// To save space, instead of storing a complete *metav1.ObjectMeta instance, the
// informer's cache will store only the minimal required fields
if err := k.replicaSets.SetTransform(func(i interface{}) (interface{}, error) {
rs, ok := i.(*appsv1.ReplicaSet)
if !ok {
Expand All @@ -314,6 +324,7 @@ func (k *KubeData) initReplicaSetInformer(informerFactory informers.SharedInform
func (k *KubeData) InitFromConfig(kubeConfigPath string) error {
// Initialization variables
k.stopChan = make(chan struct{})
k.mdStopChan = make(chan struct{})

config, err := LoadConfig(kubeConfigPath)
if err != nil {
Expand All @@ -325,7 +336,12 @@ func (k *KubeData) InitFromConfig(kubeConfigPath string) error {
return err
}

err = k.initInformers(kubeClient)
metaKubeClient, err := metadata.NewForConfig(config)
if err != nil {
return err
}

err = k.initInformers(kubeClient, metaKubeClient)
if err != nil {
return err
}
Expand Down Expand Up @@ -360,8 +376,9 @@ func LoadConfig(kubeConfigPath string) (*rest.Config, error) {
return config, nil
}

func (k *KubeData) initInformers(client kubernetes.Interface) error {
func (k *KubeData) initInformers(client kubernetes.Interface, metaClient metadata.Interface) error {
informerFactory := informers.NewSharedInformerFactory(client, syncTime)
metadataInformerFactory := metadatainformer.NewSharedInformerFactory(metaClient, syncTime)
err := k.initNodeInformer(informerFactory)
if err != nil {
return err
Expand All @@ -374,7 +391,7 @@ func (k *KubeData) initInformers(client kubernetes.Interface) error {
if err != nil {
return err
}
err = k.initReplicaSetInformer(informerFactory)
err = k.initReplicaSetInformer(metadataInformerFactory)
if err != nil {
return err
}
Expand All @@ -384,5 +401,13 @@ func (k *KubeData) initInformers(client kubernetes.Interface) error {
informerFactory.WaitForCacheSync(k.stopChan)
log.Debugf("kubernetes informers started")

log.Debugf("starting kubernetes metadata informers, waiting for synchronization")
metadataInformerFactory.Start(k.mdStopChan)
metadataInformerFactory.WaitForCacheSync(k.mdStopChan)
log.Debugf("kubernetes metadata informers started")
return nil
}

func isServiceIPSet(ip string) bool {
return ip != v1.ClusterIPNone && ip != ""
}

0 comments on commit 4f7d7a7

Please sign in to comment.