From 47c209556689dec7436edf92bac07d8301d35f7c Mon Sep 17 00:00:00 2001 From: "Mohamed S. Mahmoud" <mmahmoud@redhat.com> Date: Tue, 7 Nov 2023 11:07:35 -0500 Subject: [PATCH] NETOBSERV-1247: using Meta informer for replicaSets (#474) * update go packages to include metadata informer Signed-off-by: msherif1234 <mmahmoud@redhat.com> * use metadatainformer for replicaset to save storage cache Signed-off-by: msherif1234 <mmahmoud@redhat.com> * Fix Transform func converting from PartialObjectMetadata (cherry picked from commit 0da2955e9738d2b5a53d716e7d1cbe5fddffc5f3) --------- Signed-off-by: msherif1234 <mmahmoud@redhat.com> Co-authored-by: Joel Takvorian <jtakvori@redhat.com> --- .../transform/kubernetes/kubernetes.go | 44 +++-- .../metadata/metadatainformer/informer.go | 157 ++++++++++++++++++ .../metadata/metadatainformer/interface.go | 34 ++++ .../metadata/metadatalister/interface.go | 40 +++++ .../metadata/metadatalister/lister.go | 91 ++++++++++ .../client-go/metadata/metadatalister/shim.go | 87 ++++++++++ vendor/modules.txt | 2 + 7 files changed, 445 insertions(+), 10 deletions(-) create mode 100644 vendor/k8s.io/client-go/metadata/metadatainformer/informer.go create mode 100644 vendor/k8s.io/client-go/metadata/metadatainformer/interface.go create mode 100644 vendor/k8s.io/client-go/metadata/metadatalister/interface.go create mode 100644 vendor/k8s.io/client-go/metadata/metadatalister/lister.go create mode 100644 vendor/k8s.io/client-go/metadata/metadatalister/shim.go diff --git a/pkg/pipeline/transform/kubernetes/kubernetes.go b/pkg/pipeline/transform/kubernetes/kubernetes.go index 4916555c9..87bc83f8f 100644 --- a/pkg/pipeline/transform/kubernetes/kubernetes.go +++ b/pkg/pipeline/transform/kubernetes/kubernetes.go @@ -25,12 +25,15 @@ import ( "time" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes/cni" + log "github.com/sirupsen/logrus" - appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/metadata" + "k8s.io/client-go/metadata/metadatainformer" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/clientcmd" @@ -60,6 +63,7 @@ type KubeData struct { // replicaSets caches the ReplicaSets as partially-filled *ObjectMeta pointers replicaSets cache.SharedIndexInformer stopChan chan struct{} + mdStopChan chan struct{} } type Owner struct { @@ -267,7 +271,7 @@ func (k *KubeData) initServiceInformer(informerFactory informers.SharedInformerF ips := make([]string, 0, len(svc.Spec.ClusterIPs)) for _, ip := range svc.Spec.ClusterIPs { // ignoring None IPs - if ip != v1.ClusterIPNone { + if isServiceIPSet(ip) { ips = append(ips, ip) } } @@ -291,12 +295,17 @@ func (k *KubeData) initServiceInformer(informerFactory informers.SharedInformerF return nil } -func (k *KubeData) initReplicaSetInformer(informerFactory informers.SharedInformerFactory) error { - k.replicaSets = informerFactory.Apps().V1().ReplicaSets().Informer() - // To save space, instead of storing a complete *appvs1.Replicaset instance, the - // informer's cache will store a *metav1.ObjectMeta with the minimal required fields +func (k *KubeData) initReplicaSetInformer(informerFactory metadatainformer.SharedInformerFactory) error { + k.replicaSets = informerFactory.ForResource( + schema.GroupVersionResource{ + Group: "apps", + Version: "v1", + Resource: "replicasets", + }).Informer() + // To save space, instead of storing a complete *metav1.ObjectMeta instance, the + // informer's cache will store only the minimal required fields if err := k.replicaSets.SetTransform(func(i interface{}) (interface{}, error) { - rs, ok := i.(*appsv1.ReplicaSet) + rs, ok := i.(*metav1.PartialObjectMetadata) if !ok { return nil, fmt.Errorf("was expecting a ReplicaSet. Got: %T", i) } @@ -314,6 +323,7 @@ func (k *KubeData) initReplicaSetInformer(informerFactory informers.SharedInform func (k *KubeData) InitFromConfig(kubeConfigPath string) error { // Initialization variables k.stopChan = make(chan struct{}) + k.mdStopChan = make(chan struct{}) config, err := LoadConfig(kubeConfigPath) if err != nil { @@ -325,7 +335,12 @@ func (k *KubeData) InitFromConfig(kubeConfigPath string) error { return err } - err = k.initInformers(kubeClient) + metaKubeClient, err := metadata.NewForConfig(config) + if err != nil { + return err + } + + err = k.initInformers(kubeClient, metaKubeClient) if err != nil { return err } @@ -360,8 +375,9 @@ func LoadConfig(kubeConfigPath string) (*rest.Config, error) { return config, nil } -func (k *KubeData) initInformers(client kubernetes.Interface) error { +func (k *KubeData) initInformers(client kubernetes.Interface, metaClient metadata.Interface) error { informerFactory := informers.NewSharedInformerFactory(client, syncTime) + metadataInformerFactory := metadatainformer.NewSharedInformerFactory(metaClient, syncTime) err := k.initNodeInformer(informerFactory) if err != nil { return err @@ -374,7 +390,7 @@ func (k *KubeData) initInformers(client kubernetes.Interface) error { if err != nil { return err } - err = k.initReplicaSetInformer(informerFactory) + err = k.initReplicaSetInformer(metadataInformerFactory) if err != nil { return err } @@ -384,5 +400,13 @@ func (k *KubeData) initInformers(client kubernetes.Interface) error { informerFactory.WaitForCacheSync(k.stopChan) log.Debugf("kubernetes informers started") + log.Debugf("starting kubernetes metadata informers, waiting for synchronization") + metadataInformerFactory.Start(k.mdStopChan) + metadataInformerFactory.WaitForCacheSync(k.mdStopChan) + log.Debugf("kubernetes metadata informers started") return nil } + +func isServiceIPSet(ip string) bool { + return ip != v1.ClusterIPNone && ip != "" +} diff --git a/vendor/k8s.io/client-go/metadata/metadatainformer/informer.go b/vendor/k8s.io/client-go/metadata/metadatainformer/informer.go new file mode 100644 index 000000000..e4ebd61f8 --- /dev/null +++ b/vendor/k8s.io/client-go/metadata/metadatainformer/informer.go @@ -0,0 +1,157 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metadatainformer + +import ( + "context" + "sync" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/informers" + "k8s.io/client-go/metadata" + "k8s.io/client-go/metadata/metadatalister" + "k8s.io/client-go/tools/cache" +) + +// NewSharedInformerFactory constructs a new instance of metadataSharedInformerFactory for all namespaces. +func NewSharedInformerFactory(client metadata.Interface, defaultResync time.Duration) SharedInformerFactory { + return NewFilteredSharedInformerFactory(client, defaultResync, metav1.NamespaceAll, nil) +} + +// NewFilteredSharedInformerFactory constructs a new instance of metadataSharedInformerFactory. +// Listers obtained via this factory will be subject to the same filters as specified here. +func NewFilteredSharedInformerFactory(client metadata.Interface, defaultResync time.Duration, namespace string, tweakListOptions TweakListOptionsFunc) SharedInformerFactory { + return &metadataSharedInformerFactory{ + client: client, + defaultResync: defaultResync, + namespace: namespace, + informers: map[schema.GroupVersionResource]informers.GenericInformer{}, + startedInformers: make(map[schema.GroupVersionResource]bool), + tweakListOptions: tweakListOptions, + } +} + +type metadataSharedInformerFactory struct { + client metadata.Interface + defaultResync time.Duration + namespace string + + lock sync.Mutex + informers map[schema.GroupVersionResource]informers.GenericInformer + // startedInformers is used for tracking which informers have been started. + // This allows Start() to be called multiple times safely. + startedInformers map[schema.GroupVersionResource]bool + tweakListOptions TweakListOptionsFunc +} + +var _ SharedInformerFactory = &metadataSharedInformerFactory{} + +func (f *metadataSharedInformerFactory) ForResource(gvr schema.GroupVersionResource) informers.GenericInformer { + f.lock.Lock() + defer f.lock.Unlock() + + key := gvr + informer, exists := f.informers[key] + if exists { + return informer + } + + informer = NewFilteredMetadataInformer(f.client, gvr, f.namespace, f.defaultResync, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions) + f.informers[key] = informer + + return informer +} + +// Start initializes all requested informers. +func (f *metadataSharedInformerFactory) Start(stopCh <-chan struct{}) { + f.lock.Lock() + defer f.lock.Unlock() + + for informerType, informer := range f.informers { + if !f.startedInformers[informerType] { + go informer.Informer().Run(stopCh) + f.startedInformers[informerType] = true + } + } +} + +// WaitForCacheSync waits for all started informers' cache were synced. +func (f *metadataSharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[schema.GroupVersionResource]bool { + informers := func() map[schema.GroupVersionResource]cache.SharedIndexInformer { + f.lock.Lock() + defer f.lock.Unlock() + + informers := map[schema.GroupVersionResource]cache.SharedIndexInformer{} + for informerType, informer := range f.informers { + if f.startedInformers[informerType] { + informers[informerType] = informer.Informer() + } + } + return informers + }() + + res := map[schema.GroupVersionResource]bool{} + for informType, informer := range informers { + res[informType] = cache.WaitForCacheSync(stopCh, informer.HasSynced) + } + return res +} + +// NewFilteredMetadataInformer constructs a new informer for a metadata type. +func NewFilteredMetadataInformer(client metadata.Interface, gvr schema.GroupVersionResource, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions TweakListOptionsFunc) informers.GenericInformer { + return &metadataInformer{ + gvr: gvr, + informer: cache.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.Resource(gvr).Namespace(namespace).List(context.TODO(), options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.Resource(gvr).Namespace(namespace).Watch(context.TODO(), options) + }, + }, + &metav1.PartialObjectMetadata{}, + resyncPeriod, + indexers, + ), + } +} + +type metadataInformer struct { + informer cache.SharedIndexInformer + gvr schema.GroupVersionResource +} + +var _ informers.GenericInformer = &metadataInformer{} + +func (d *metadataInformer) Informer() cache.SharedIndexInformer { + return d.informer +} + +func (d *metadataInformer) Lister() cache.GenericLister { + return metadatalister.NewRuntimeObjectShim(metadatalister.New(d.informer.GetIndexer(), d.gvr)) +} diff --git a/vendor/k8s.io/client-go/metadata/metadatainformer/interface.go b/vendor/k8s.io/client-go/metadata/metadatainformer/interface.go new file mode 100644 index 000000000..732e565c7 --- /dev/null +++ b/vendor/k8s.io/client-go/metadata/metadatainformer/interface.go @@ -0,0 +1,34 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metadatainformer + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/informers" +) + +// SharedInformerFactory provides access to a shared informer and lister for dynamic client +type SharedInformerFactory interface { + Start(stopCh <-chan struct{}) + ForResource(gvr schema.GroupVersionResource) informers.GenericInformer + WaitForCacheSync(stopCh <-chan struct{}) map[schema.GroupVersionResource]bool +} + +// TweakListOptionsFunc defines the signature of a helper function +// that wants to provide more listing options to API +type TweakListOptionsFunc func(*metav1.ListOptions) diff --git a/vendor/k8s.io/client-go/metadata/metadatalister/interface.go b/vendor/k8s.io/client-go/metadata/metadatalister/interface.go new file mode 100644 index 000000000..bb3548589 --- /dev/null +++ b/vendor/k8s.io/client-go/metadata/metadatalister/interface.go @@ -0,0 +1,40 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metadatalister + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" +) + +// Lister helps list resources. +type Lister interface { + // List lists all resources in the indexer. + List(selector labels.Selector) (ret []*metav1.PartialObjectMetadata, err error) + // Get retrieves a resource from the indexer with the given name + Get(name string) (*metav1.PartialObjectMetadata, error) + // Namespace returns an object that can list and get resources in a given namespace. + Namespace(namespace string) NamespaceLister +} + +// NamespaceLister helps list and get resources. +type NamespaceLister interface { + // List lists all resources in the indexer for a given namespace. + List(selector labels.Selector) (ret []*metav1.PartialObjectMetadata, err error) + // Get retrieves a resource from the indexer for a given namespace and name. + Get(name string) (*metav1.PartialObjectMetadata, error) +} diff --git a/vendor/k8s.io/client-go/metadata/metadatalister/lister.go b/vendor/k8s.io/client-go/metadata/metadatalister/lister.go new file mode 100644 index 000000000..faeccc0fc --- /dev/null +++ b/vendor/k8s.io/client-go/metadata/metadatalister/lister.go @@ -0,0 +1,91 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metadatalister + +import ( + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/tools/cache" +) + +var _ Lister = &metadataLister{} +var _ NamespaceLister = &metadataNamespaceLister{} + +// metadataLister implements the Lister interface. +type metadataLister struct { + indexer cache.Indexer + gvr schema.GroupVersionResource +} + +// New returns a new Lister. +func New(indexer cache.Indexer, gvr schema.GroupVersionResource) Lister { + return &metadataLister{indexer: indexer, gvr: gvr} +} + +// List lists all resources in the indexer. +func (l *metadataLister) List(selector labels.Selector) (ret []*metav1.PartialObjectMetadata, err error) { + err = cache.ListAll(l.indexer, selector, func(m interface{}) { + ret = append(ret, m.(*metav1.PartialObjectMetadata)) + }) + return ret, err +} + +// Get retrieves a resource from the indexer with the given name +func (l *metadataLister) Get(name string) (*metav1.PartialObjectMetadata, error) { + obj, exists, err := l.indexer.GetByKey(name) + if err != nil { + return nil, err + } + if !exists { + return nil, errors.NewNotFound(l.gvr.GroupResource(), name) + } + return obj.(*metav1.PartialObjectMetadata), nil +} + +// Namespace returns an object that can list and get resources from a given namespace. +func (l *metadataLister) Namespace(namespace string) NamespaceLister { + return &metadataNamespaceLister{indexer: l.indexer, namespace: namespace, gvr: l.gvr} +} + +// metadataNamespaceLister implements the NamespaceLister interface. +type metadataNamespaceLister struct { + indexer cache.Indexer + namespace string + gvr schema.GroupVersionResource +} + +// List lists all resources in the indexer for a given namespace. +func (l *metadataNamespaceLister) List(selector labels.Selector) (ret []*metav1.PartialObjectMetadata, err error) { + err = cache.ListAllByNamespace(l.indexer, l.namespace, selector, func(m interface{}) { + ret = append(ret, m.(*metav1.PartialObjectMetadata)) + }) + return ret, err +} + +// Get retrieves a resource from the indexer for a given namespace and name. +func (l *metadataNamespaceLister) Get(name string) (*metav1.PartialObjectMetadata, error) { + obj, exists, err := l.indexer.GetByKey(l.namespace + "/" + name) + if err != nil { + return nil, err + } + if !exists { + return nil, errors.NewNotFound(l.gvr.GroupResource(), name) + } + return obj.(*metav1.PartialObjectMetadata), nil +} diff --git a/vendor/k8s.io/client-go/metadata/metadatalister/shim.go b/vendor/k8s.io/client-go/metadata/metadatalister/shim.go new file mode 100644 index 000000000..f31c60725 --- /dev/null +++ b/vendor/k8s.io/client-go/metadata/metadatalister/shim.go @@ -0,0 +1,87 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metadatalister + +import ( + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/tools/cache" +) + +var _ cache.GenericLister = &metadataListerShim{} +var _ cache.GenericNamespaceLister = &metadataNamespaceListerShim{} + +// metadataListerShim implements the cache.GenericLister interface. +type metadataListerShim struct { + lister Lister +} + +// NewRuntimeObjectShim returns a new shim for Lister. +// It wraps Lister so that it implements cache.GenericLister interface +func NewRuntimeObjectShim(lister Lister) cache.GenericLister { + return &metadataListerShim{lister: lister} +} + +// List will return all objects across namespaces +func (s *metadataListerShim) List(selector labels.Selector) (ret []runtime.Object, err error) { + objs, err := s.lister.List(selector) + if err != nil { + return nil, err + } + + ret = make([]runtime.Object, len(objs)) + for index, obj := range objs { + ret[index] = obj + } + return ret, err +} + +// Get will attempt to retrieve assuming that name==key +func (s *metadataListerShim) Get(name string) (runtime.Object, error) { + return s.lister.Get(name) +} + +func (s *metadataListerShim) ByNamespace(namespace string) cache.GenericNamespaceLister { + return &metadataNamespaceListerShim{ + namespaceLister: s.lister.Namespace(namespace), + } +} + +// metadataNamespaceListerShim implements the NamespaceLister interface. +// It wraps NamespaceLister so that it implements cache.GenericNamespaceLister interface +type metadataNamespaceListerShim struct { + namespaceLister NamespaceLister +} + +// List will return all objects in this namespace +func (ns *metadataNamespaceListerShim) List(selector labels.Selector) (ret []runtime.Object, err error) { + objs, err := ns.namespaceLister.List(selector) + if err != nil { + return nil, err + } + + ret = make([]runtime.Object, len(objs)) + for index, obj := range objs { + ret[index] = obj + } + return ret, err +} + +// Get will attempt to retrieve by namespace and name +func (ns *metadataNamespaceListerShim) Get(name string) (runtime.Object, error) { + return ns.namespaceLister.Get(name) +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 5eceedc82..8d79ed5af 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -958,6 +958,8 @@ k8s.io/client-go/listers/storage/v1 k8s.io/client-go/listers/storage/v1alpha1 k8s.io/client-go/listers/storage/v1beta1 k8s.io/client-go/metadata +k8s.io/client-go/metadata/metadatainformer +k8s.io/client-go/metadata/metadatalister k8s.io/client-go/openapi k8s.io/client-go/pkg/apis/clientauthentication k8s.io/client-go/pkg/apis/clientauthentication/install