Skip to content
This repository has been archived by the owner on Nov 3, 2024. It is now read-only.

Commit

Permalink
Merge pull request #104 from thedadams/start-controllers-while-starti…
Browse files Browse the repository at this point in the history
…ng-caches

Start controllers while the caches are starting
  • Loading branch information
thedadams authored Oct 9, 2023
2 parents 91fb95b + 9459069 commit af2b683
Showing 1 changed file with 18 additions and 7 deletions.
25 changes: 18 additions & 7 deletions pkg/runtime/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
"fmt"
"strings"
"sync/atomic"
"sync"
"time"

"github.com/acorn-io/baaah/pkg/backend"
Expand All @@ -24,21 +24,25 @@ type Backend struct {

cacheFactory SharedControllerFactory
cache cache.Cache
started atomic.Bool
startedLock *sync.RWMutex
started bool
}

func newBackend(cacheFactory SharedControllerFactory, client *cacheClient, cache cache.Cache) *Backend {
return &Backend{
cacheClient: client,
cacheFactory: cacheFactory,
cache: cache,
startedLock: new(sync.RWMutex),
}
}

func (b *Backend) Start(ctx context.Context) (err error) {
b.startedLock.Lock()
defer b.startedLock.Unlock()
defer func() {
if err == nil {
b.started.Store(true)
b.started = true
}
}()
if err := b.cacheFactory.Start(ctx, 5); err != nil {
Expand All @@ -47,9 +51,10 @@ func (b *Backend) Start(ctx context.Context) (err error) {
if !b.cache.WaitForCacheSync(ctx) {
return fmt.Errorf("failed to wait for caches to sync")
}
if !b.started.Load() {
if !b.started {
b.cacheClient.startPurge(ctx)
}

return nil
}

Expand Down Expand Up @@ -89,7 +94,7 @@ func (b *Backend) addIndexer(ctx context.Context, gvk schema.GroupVersionKind) e
indexers := map[string]kcache.IndexFunc{}
for _, field := range f.FieldNames() {
field := field
indexers["field:"+field] = kcache.IndexFunc(func(obj interface{}) ([]string, error) {
indexers["field:"+field] = func(obj interface{}) ([]string, error) {
f, ok := obj.(fields.Fields)
if !ok {
return nil, nil
Expand All @@ -103,7 +108,7 @@ func (b *Backend) addIndexer(ctx context.Context, gvk schema.GroupVersionKind) e
vals = append(vals, keyFunc(ko.GetNamespace(), v))
}
return vals, nil
})
}
}
return cache.AddIndexers(indexers)
}
Expand All @@ -121,7 +126,7 @@ func (b *Backend) Watch(ctx context.Context, gvk schema.GroupVersionKind, name s
})
c.RegisterHandler(ctx, fmt.Sprintf("%s %v", name, gvk), handler)

if b.started.Load() {
if b.hasStarted() {
return c.Start(ctx, 5)
}
return nil
Expand All @@ -146,3 +151,9 @@ func (b *Backend) GetInformerForKind(ctx context.Context, gvk schema.GroupVersio
}
return i.(kcache.SharedIndexInformer), nil
}

func (b *Backend) hasStarted() bool {
b.startedLock.RLock()
defer b.startedLock.RUnlock()
return b.started
}

0 comments on commit af2b683

Please sign in to comment.