Skip to content

Commit

Permalink
NETOBSERV-1247: using Meta informer for replicaSets (netobserv#474)
Browse files Browse the repository at this point in the history
* update go packages to include metadata informer

Signed-off-by: msherif1234 <[email protected]>

* use metadatainformer for replicaset to save storage cache

Signed-off-by: msherif1234 <[email protected]>

* Fix Transform func converting from PartialObjectMetadata

(cherry picked from commit 0da2955e9738d2b5a53d716e7d1cbe5fddffc5f3)

---------

Signed-off-by: msherif1234 <[email protected]>
Co-authored-by: Joel Takvorian <[email protected]>
  • Loading branch information
msherif1234 and jotak authored Nov 7, 2023
1 parent e3a97f1 commit 47c2095
Show file tree
Hide file tree
Showing 7 changed files with 445 additions and 10 deletions.
44 changes: 34 additions & 10 deletions pkg/pipeline/transform/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,15 @@ import (
"time"

"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes/cni"

log "github.com/sirupsen/logrus"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/metadata"
"k8s.io/client-go/metadata/metadatainformer"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
Expand Down Expand Up @@ -60,6 +63,7 @@ type KubeData struct {
// replicaSets caches the ReplicaSets as partially-filled *ObjectMeta pointers
replicaSets cache.SharedIndexInformer
stopChan chan struct{}
mdStopChan chan struct{}
}

type Owner struct {
Expand Down Expand Up @@ -267,7 +271,7 @@ func (k *KubeData) initServiceInformer(informerFactory informers.SharedInformerF
ips := make([]string, 0, len(svc.Spec.ClusterIPs))
for _, ip := range svc.Spec.ClusterIPs {
// ignoring None IPs
if ip != v1.ClusterIPNone {
if isServiceIPSet(ip) {
ips = append(ips, ip)
}
}
Expand All @@ -291,12 +295,17 @@ func (k *KubeData) initServiceInformer(informerFactory informers.SharedInformerF
return nil
}

func (k *KubeData) initReplicaSetInformer(informerFactory informers.SharedInformerFactory) error {
k.replicaSets = informerFactory.Apps().V1().ReplicaSets().Informer()
// To save space, instead of storing a complete *appvs1.Replicaset instance, the
// informer's cache will store a *metav1.ObjectMeta with the minimal required fields
func (k *KubeData) initReplicaSetInformer(informerFactory metadatainformer.SharedInformerFactory) error {
k.replicaSets = informerFactory.ForResource(
schema.GroupVersionResource{
Group: "apps",
Version: "v1",
Resource: "replicasets",
}).Informer()
// To save space, instead of storing a complete *metav1.ObjectMeta instance, the
// informer's cache will store only the minimal required fields
if err := k.replicaSets.SetTransform(func(i interface{}) (interface{}, error) {
rs, ok := i.(*appsv1.ReplicaSet)
rs, ok := i.(*metav1.PartialObjectMetadata)
if !ok {
return nil, fmt.Errorf("was expecting a ReplicaSet. Got: %T", i)
}
Expand All @@ -314,6 +323,7 @@ func (k *KubeData) initReplicaSetInformer(informerFactory informers.SharedInform
func (k *KubeData) InitFromConfig(kubeConfigPath string) error {
// Initialization variables
k.stopChan = make(chan struct{})
k.mdStopChan = make(chan struct{})

config, err := LoadConfig(kubeConfigPath)
if err != nil {
Expand All @@ -325,7 +335,12 @@ func (k *KubeData) InitFromConfig(kubeConfigPath string) error {
return err
}

err = k.initInformers(kubeClient)
metaKubeClient, err := metadata.NewForConfig(config)
if err != nil {
return err
}

err = k.initInformers(kubeClient, metaKubeClient)
if err != nil {
return err
}
Expand Down Expand Up @@ -360,8 +375,9 @@ func LoadConfig(kubeConfigPath string) (*rest.Config, error) {
return config, nil
}

func (k *KubeData) initInformers(client kubernetes.Interface) error {
func (k *KubeData) initInformers(client kubernetes.Interface, metaClient metadata.Interface) error {
informerFactory := informers.NewSharedInformerFactory(client, syncTime)
metadataInformerFactory := metadatainformer.NewSharedInformerFactory(metaClient, syncTime)
err := k.initNodeInformer(informerFactory)
if err != nil {
return err
Expand All @@ -374,7 +390,7 @@ func (k *KubeData) initInformers(client kubernetes.Interface) error {
if err != nil {
return err
}
err = k.initReplicaSetInformer(informerFactory)
err = k.initReplicaSetInformer(metadataInformerFactory)
if err != nil {
return err
}
Expand All @@ -384,5 +400,13 @@ func (k *KubeData) initInformers(client kubernetes.Interface) error {
informerFactory.WaitForCacheSync(k.stopChan)
log.Debugf("kubernetes informers started")

log.Debugf("starting kubernetes metadata informers, waiting for synchronization")
metadataInformerFactory.Start(k.mdStopChan)
metadataInformerFactory.WaitForCacheSync(k.mdStopChan)
log.Debugf("kubernetes metadata informers started")
return nil
}

func isServiceIPSet(ip string) bool {
return ip != v1.ClusterIPNone && ip != ""
}
157 changes: 157 additions & 0 deletions vendor/k8s.io/client-go/metadata/metadatainformer/informer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

34 changes: 34 additions & 0 deletions vendor/k8s.io/client-go/metadata/metadatainformer/interface.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

40 changes: 40 additions & 0 deletions vendor/k8s.io/client-go/metadata/metadatalister/interface.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 47c2095

Please sign in to comment.