Skip to content

Commit

Permalink
add debug
Browse files Browse the repository at this point in the history
Signed-off-by: Mikhail Scherba <[email protected]>
  • Loading branch information
miklezzzz committed Sep 19, 2024
1 parent 22aa2b0 commit 08f1130
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 27 deletions.
65 changes: 40 additions & 25 deletions pkg/kube_events_manager/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"sync"
"time"

log "github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
Expand All @@ -30,10 +31,10 @@ type FactoryIndex struct {
}

type Factory struct {
shared dynamicinformer.DynamicSharedInformerFactory
score uint64
ctx context.Context
cancel context.CancelFunc
shared dynamicinformer.DynamicSharedInformerFactory
handlerRegistrations map[string]cache.ResourceEventHandlerRegistration
ctx context.Context
cancel context.CancelFunc
}

type FactoryStore struct {
Expand All @@ -47,21 +48,21 @@ func NewFactoryStore() *FactoryStore {
}
}

func (c *FactoryStore) add(ctx context.Context, index FactoryIndex, f dynamicinformer.DynamicSharedInformerFactory) {
cctx, cancel := context.WithCancel(ctx)
func (c *FactoryStore) add(index FactoryIndex, f dynamicinformer.DynamicSharedInformerFactory) {
ctx, cancel := context.WithCancel(context.Background())
c.data[index] = Factory{
shared: f,
score: uint64(1),
ctx: cctx,
cancel: cancel,
shared: f,
handlerRegistrations: make(map[string]cache.ResourceEventHandlerRegistration, 0),
ctx: ctx,
cancel: cancel,
}
log.Debugf("Factory store: added a new factory for %v index", index)
}

func (c *FactoryStore) get(ctx context.Context, client dynamic.Interface, index FactoryIndex) Factory {
func (c *FactoryStore) get(client dynamic.Interface, index FactoryIndex) Factory {
f, ok := c.data[index]
if ok {
f.score++
c.data[index] = f
log.Debugf("Factory store: the factory with %v index found", index)
return f
}

Expand All @@ -81,21 +82,29 @@ func (c *FactoryStore) get(ctx context.Context, client dynamic.Interface, index
client, resyncPeriod, index.Namespace, tweakListOptions)
factory.ForResource(index.GVR)

c.add(ctx, index, factory)
c.add(index, factory)
return c.data[index]
}

func (c *FactoryStore) Start(ctx context.Context, client dynamic.Interface, index FactoryIndex, handler cache.ResourceEventHandler, errorHandler *WatchErrorHandler) error {
func (c *FactoryStore) Start(informerId string, ctx context.Context, client dynamic.Interface, index FactoryIndex, handler cache.ResourceEventHandler, errorHandler *WatchErrorHandler) error {
c.mu.Lock()
defer c.mu.Unlock()

factory := c.get(ctx, client, index)
factory := c.get(client, index)

informer := factory.shared.ForResource(index.GVR).Informer()
// Add error handler, ignore "already started" error.
_ = informer.SetWatchErrorHandler(errorHandler.handler)
// TODO(nabokihms): think about what will happen if we stop and then start the monitor
informer.AddEventHandler(handler)
registration, err := informer.AddEventHandler(handler)
if err != nil {
log.Warnf("Factory store: couldn't add event handler to the %v factory's informer: %v", index, err)
}
if _, found := factory.handlerRegistrations[informerId]; found {
log.Errorf("Factory store: resource informer %s already has a handler", informerId)
}
factory.handlerRegistrations[informerId] = registration
log.Debugf("Factory store: increased usage counter to %d of the factory with %v index", len(factory.handlerRegistrations), index)

if !informer.HasSynced() {
go informer.Run(factory.ctx.Done())
Expand All @@ -106,10 +115,11 @@ func (c *FactoryStore) Start(ctx context.Context, client dynamic.Interface, inde
return err
}
}
log.Debugf("Factory store: started informer for %v index", index)
return nil
}

func (c *FactoryStore) Stop(index FactoryIndex) {
func (c *FactoryStore) Stop(informerId string, index FactoryIndex) {
c.mu.Lock()
defer c.mu.Unlock()

Expand All @@ -119,12 +129,17 @@ func (c *FactoryStore) Stop(index FactoryIndex) {
return
}

f.score--
if f.score == 0 {
f.cancel()
delete(c.data, index)
return
if handlerRegistration, found := f.handlerRegistrations[informerId]; found {
err := f.shared.ForResource(index.GVR).Informer().RemoveEventHandler(handlerRegistration)
if err != nil {
log.Warnf("Factory store: couldn't remove event handler from the %v factory's informer: %v", index, err)
}
delete(f.handlerRegistrations, informerId)
log.Debugf("Factory store: decreased usage counter to %d of the factory with %v index", len(f.handlerRegistrations), index)
if len(f.handlerRegistrations) == 0 {
f.cancel()
delete(c.data, index)
log.Debugf("Factory store: deleted factory for %v index", index)
}
}

c.data[index] = f
}
12 changes: 10 additions & 2 deletions pkg/kube_events_manager/resource_informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sync"
"time"

"github.com/gofrs/uuid/v5"
log "github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
Expand All @@ -20,6 +21,7 @@ import (
)

type resourceInformer struct {
id string
KubeClient *klient.Client
Monitor *MonitorConfig
// Filter by namespace
Expand Down Expand Up @@ -70,6 +72,7 @@ type resourceInformerConfig struct {

func newResourceInformer(ns, name string, cfg *resourceInformerConfig) *resourceInformer {
informer := &resourceInformer{
id: uuid.Must(uuid.NewV4()).String(),
KubeClient: cfg.client,
metricStorage: cfg.mstor,
Namespace: ns,
Expand Down Expand Up @@ -256,6 +259,11 @@ func (ei *resourceInformer) OnDelete(obj interface{}) {
func (ei *resourceInformer) handleWatchEvent(object interface{}, eventType WatchEventType) {
// check if stop
if ei.stopped {
log.Debugf("%s: received WATCH for a stopped %s/%s informer %s",
ei.Monitor.Metadata.DebugName,
ei.Namespace,
ei.Name,
eventType)
return
}

Expand Down Expand Up @@ -427,13 +435,13 @@ func (ei *resourceInformer) start() {
go func() {
if ei.ctx != nil {
<-ei.ctx.Done()
DefaultFactoryStore.Stop(ei.FactoryIndex)
DefaultFactoryStore.Stop(ei.id, ei.FactoryIndex)
}
}()

// TODO: separate handler and informer
errorHandler := newWatchErrorHandler(ei.Monitor.Metadata.DebugName, ei.Monitor.Kind, ei.Monitor.Metadata.LogLabels, ei.metricStorage)
err := DefaultFactoryStore.Start(ei.ctx, ei.KubeClient.Dynamic(), ei.FactoryIndex, ei, errorHandler)
err := DefaultFactoryStore.Start(ei.id, ei.ctx, ei.KubeClient.Dynamic(), ei.FactoryIndex, ei, errorHandler)
if err != nil {
ei.Monitor.LogEntry.Errorf("%s: cache is not synced for informer", ei.Monitor.Metadata.DebugName)
return
Expand Down

0 comments on commit 08f1130

Please sign in to comment.