Skip to content

Commit

Permalink
use metadatainformer for replicaset to save storage cache
Browse files Browse the repository at this point in the history
Signed-off-by: msherif1234 <[email protected]>
  • Loading branch information
msherif1234 committed Nov 3, 2023
1 parent 926ccc6 commit 9dfd200
Showing 1 changed file with 25 additions and 22 deletions.
47 changes: 25 additions & 22 deletions pkg/pipeline/transform/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,15 @@ import (
"time"

"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes/cni"

log "github.com/sirupsen/logrus"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/metadata"
"k8s.io/client-go/metadata/metadatainformer"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
Expand Down Expand Up @@ -267,7 +270,7 @@ func (k *KubeData) initServiceInformer(informerFactory informers.SharedInformerF
ips := make([]string, 0, len(svc.Spec.ClusterIPs))
for _, ip := range svc.Spec.ClusterIPs {
// ignoring None IPs
if ip != v1.ClusterIPNone {
if isServiceIPSet(ip) {
ips = append(ips, ip)
}
}
Expand All @@ -291,23 +294,13 @@ func (k *KubeData) initServiceInformer(informerFactory informers.SharedInformerF
return nil
}

func (k *KubeData) initReplicaSetInformer(informerFactory informers.SharedInformerFactory) error {
k.replicaSets = informerFactory.Apps().V1().ReplicaSets().Informer()
// To save space, instead of storing a complete *appvs1.Replicaset instance, the
// informer's cache will store a *metav1.ObjectMeta with the minimal required fields
if err := k.replicaSets.SetTransform(func(i interface{}) (interface{}, error) {
rs, ok := i.(*appsv1.ReplicaSet)
if !ok {
return nil, fmt.Errorf("was expecting a ReplicaSet. Got: %T", i)
}
return &metav1.ObjectMeta{
Name: rs.Name,
Namespace: rs.Namespace,
OwnerReferences: rs.OwnerReferences,
}, nil
}); err != nil {
return fmt.Errorf("can't set ReplicaSets transform: %w", err)
}
func (k *KubeData) initReplicaSetInformer(informerFactory metadatainformer.SharedInformerFactory) error {
k.replicaSets = informerFactory.ForResource(
schema.GroupVersionResource{
Group: "apps",
Version: "v1",
Resource: "replicasets",
}).Informer()
return nil
}

Expand All @@ -325,7 +318,12 @@ func (k *KubeData) InitFromConfig(kubeConfigPath string) error {
return err
}

err = k.initInformers(kubeClient)
metaKubeClient, err := metadata.NewForConfig(config)
if err != nil {
return err
}

err = k.initInformers(kubeClient, metaKubeClient)
if err != nil {
return err
}
Expand Down Expand Up @@ -360,8 +358,9 @@ func LoadConfig(kubeConfigPath string) (*rest.Config, error) {
return config, nil
}

func (k *KubeData) initInformers(client kubernetes.Interface) error {
func (k *KubeData) initInformers(client kubernetes.Interface, metaClient metadata.Interface) error {
informerFactory := informers.NewSharedInformerFactory(client, syncTime)
metadataInformerFactory := metadatainformer.NewSharedInformerFactory(metaClient, syncTime)
err := k.initNodeInformer(informerFactory)
if err != nil {
return err
Expand All @@ -374,7 +373,7 @@ func (k *KubeData) initInformers(client kubernetes.Interface) error {
if err != nil {
return err
}
err = k.initReplicaSetInformer(informerFactory)
err = k.initReplicaSetInformer(metadataInformerFactory)
if err != nil {
return err
}
Expand All @@ -386,3 +385,7 @@ func (k *KubeData) initInformers(client kubernetes.Interface) error {

return nil
}

func isServiceIPSet(ip string) bool {
return ip != v1.ClusterIPNone && ip != ""
}

0 comments on commit 9dfd200

Please sign in to comment.