Skip to content

Commit

Permalink
use metadatainformer for replicaset to save storage cache
Browse files Browse the repository at this point in the history
Signed-off-by: msherif1234 <[email protected]>
  • Loading branch information
msherif1234 committed Nov 1, 2023
1 parent 926ccc6 commit 86730ae
Showing 1 changed file with 18 additions and 21 deletions.
39 changes: 18 additions & 21 deletions pkg/pipeline/transform/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/metadata"
"k8s.io/client-go/metadata/metadatainformer"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
Expand Down Expand Up @@ -267,7 +269,7 @@ func (k *KubeData) initServiceInformer(informerFactory informers.SharedInformerF
ips := make([]string, 0, len(svc.Spec.ClusterIPs))
for _, ip := range svc.Spec.ClusterIPs {
// ignoring None IPs
if ip != v1.ClusterIPNone {
if isServiceIPSet(ip) {
ips = append(ips, ip)
}
}
Expand All @@ -291,23 +293,8 @@ func (k *KubeData) initServiceInformer(informerFactory informers.SharedInformerF
return nil
}

func (k *KubeData) initReplicaSetInformer(informerFactory informers.SharedInformerFactory) error {
k.replicaSets = informerFactory.Apps().V1().ReplicaSets().Informer()
// To save space, instead of storing a complete *appvs1.Replicaset instance, the
// informer's cache will store a *metav1.ObjectMeta with the minimal required fields
if err := k.replicaSets.SetTransform(func(i interface{}) (interface{}, error) {
rs, ok := i.(*appsv1.ReplicaSet)
if !ok {
return nil, fmt.Errorf("was expecting a ReplicaSet. Got: %T", i)
}
return &metav1.ObjectMeta{
Name: rs.Name,
Namespace: rs.Namespace,
OwnerReferences: rs.OwnerReferences,
}, nil
}); err != nil {
return fmt.Errorf("can't set ReplicaSets transform: %w", err)
}
func (k *KubeData) initReplicaSetInformer(informerFactory metadatainformer.SharedInformerFactory) error {
k.replicaSets = informerFactory.ForResource(appsv1.SchemeGroupVersion.WithResource("deployment")).Informer()
return nil
}

Expand All @@ -325,7 +312,12 @@ func (k *KubeData) InitFromConfig(kubeConfigPath string) error {
return err
}

err = k.initInformers(kubeClient)
metaKubeClient, err := metadata.NewForConfig(config)
if err != nil {
return err
}

err = k.initInformers(kubeClient, metaKubeClient)
if err != nil {
return err
}
Expand Down Expand Up @@ -360,8 +352,9 @@ func LoadConfig(kubeConfigPath string) (*rest.Config, error) {
return config, nil
}

func (k *KubeData) initInformers(client kubernetes.Interface) error {
func (k *KubeData) initInformers(client kubernetes.Interface, metaClient metadata.Interface) error {
informerFactory := informers.NewSharedInformerFactory(client, syncTime)
metadataInformerFactory := metadatainformer.NewSharedInformerFactory(metaClient, syncTime)
err := k.initNodeInformer(informerFactory)
if err != nil {
return err
Expand All @@ -374,7 +367,7 @@ func (k *KubeData) initInformers(client kubernetes.Interface) error {
if err != nil {
return err
}
err = k.initReplicaSetInformer(informerFactory)
err = k.initReplicaSetInformer(metadataInformerFactory)
if err != nil {
return err
}
Expand All @@ -386,3 +379,7 @@ func (k *KubeData) initInformers(client kubernetes.Interface) error {

return nil
}

func isServiceIPSet(ip string) bool {
return ip != v1.ClusterIPNone && ip != ""
}

0 comments on commit 86730ae

Please sign in to comment.