Skip to content

Commit

Permalink
controller/engine: use local cache for client read requests
Browse files Browse the repository at this point in the history
Signed-off-by: Dr. Stefan Schimanski <[email protected]>
  • Loading branch information
sttts committed Oct 5, 2023
1 parent b537456 commit 9746110
Show file tree
Hide file tree
Showing 4 changed files with 378 additions and 19 deletions.
265 changes: 265 additions & 0 deletions pkg/controller/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,265 @@
/*
Copyright 2023 The Crossplane Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controller

import (
"context"
"fmt"
"sync"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
)

// GVKRoutedCache is a cache that routes requests by GVK to other caches.
type GVKRoutedCache struct {
scheme *runtime.Scheme

fallback cache.Cache

lock sync.RWMutex
delegates map[schema.GroupVersionKind]cache.Cache
}

// NewGVKRoutedCache returns a new routed cache.
func NewGVKRoutedCache(scheme *runtime.Scheme, fallback cache.Cache) *GVKRoutedCache {
return &GVKRoutedCache{
scheme: scheme,
fallback: fallback,
delegates: make(map[schema.GroupVersionKind]cache.Cache),
}
}

var _ cache.Cache = &GVKRoutedCache{}

// AddDelegate adds a delegated cache for a given GVK.
func (c *GVKRoutedCache) AddDelegate(gvk schema.GroupVersionKind, delegate cache.Cache) {
c.lock.Lock()
defer c.lock.Unlock()

c.delegates[gvk] = delegate
}

// RemoveDelegate removes a delegated cache for a given GVK.
func (c *GVKRoutedCache) RemoveDelegate(gvk schema.GroupVersionKind) {
c.lock.Lock()
defer c.lock.Unlock()

delete(c.delegates, gvk)
}

// Get retrieves an object for a given ObjectKey backed by a cache.
func (c *GVKRoutedCache) Get(ctx context.Context, key client.ObjectKey, obj client.Object, opts ...client.GetOption) error {
gvk, err := apiutil.GVKForObject(obj, c.scheme)
if err != nil {
return fmt.Errorf("failed to get GVK for type %T: %w", obj, err)
}

c.lock.RLock()
delegate, ok := c.delegates[gvk]
c.lock.RUnlock()

if ok {
return delegate.Get(ctx, key, obj, opts...)
}

return c.fallback.Get(ctx, key, obj, opts...)
}

// List lists objects for a given ObjectList backed by a cache.
func (c *GVKRoutedCache) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error {
gvk, err := apiutil.GVKForObject(list, c.scheme)
if err != nil {
return fmt.Errorf("failed to get GVK for type %T: %w", list, err)
}

c.lock.RLock()
delegate, ok := c.delegates[gvk]
c.lock.RUnlock()

if ok {
return delegate.List(ctx, list, opts...)
}

return c.fallback.List(ctx, list, opts...)
}

// GetInformer returns an informer for the given object.
func (c *GVKRoutedCache) GetInformer(ctx context.Context, obj client.Object, opts ...cache.InformerGetOption) (cache.Informer, error) {
gvk, err := apiutil.GVKForObject(obj, c.scheme)
if err != nil {
return nil, fmt.Errorf("failed to get GVK for type %T: %w", obj, err)
}

c.lock.RLock()
delegate, ok := c.delegates[gvk]
c.lock.RUnlock()

if ok {
return delegate.GetInformer(ctx, obj, opts...)
}

return c.fallback.GetInformer(ctx, obj, opts...)
}

// GetInformerForKind returns an informer for the given GVK.
func (c *GVKRoutedCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind, opts ...cache.InformerGetOption) (cache.Informer, error) {
c.lock.RLock()
delegate, ok := c.delegates[gvk]
c.lock.RUnlock()

if ok {
return delegate.GetInformerForKind(ctx, gvk, opts...)
}

return c.fallback.GetInformerForKind(ctx, gvk, opts...)
}

// Start for a GVKRoutedCache is a no-op. Start must be called for each delegate.
func (c *GVKRoutedCache) Start(_ context.Context) error {
return nil
}

// WaitForCacheSync for a GVKRoutedCache waits for all delegates to sync, and
// returns false if any of them fails to sync.
func (c *GVKRoutedCache) WaitForCacheSync(ctx context.Context) bool {
var wg sync.WaitGroup
synced := make(chan bool, len(c.delegates)+1)

ctx, cacnelFn := context.WithCancel(ctx)

c.lock.RLock()
wg.Add(len(c.delegates) + 1)
for _, delegate := range c.delegates {
go func(delegate cache.Cache) {
defer wg.Done()
synced := delegate.WaitForCacheSync(ctx)
if !synced {
// first unsynced cache breaks the whole wait
cacnelFn()
}
}(delegate)
}
c.lock.RUnlock()

wg.Wait()
close(synced)
cacnelFn()

// any not synced?
for synced := range synced {
if !synced {
return false
}
}

return true
}

// IndexField adds an index with the given field name on the given object type
// by using the given function to extract the value for that field.
func (c *GVKRoutedCache) IndexField(ctx context.Context, obj client.Object, field string, extractValue client.IndexerFunc) error {
gvk, err := apiutil.GVKForObject(obj, c.scheme)
if err != nil {
return fmt.Errorf("failed to get GVK for type %T: %w", obj, err)
}

c.lock.RLock()
delegate, ok := c.delegates[gvk]
c.lock.RUnlock()

if ok {
return delegate.IndexField(ctx, obj, field, extractValue)
}

return c.fallback.IndexField(ctx, obj, field, extractValue)
}

// cachedRoutedClient wraps a client and routes read requests by GVK to a cache.
type cachedRoutedClient struct {
client.Client

scheme *runtime.Scheme
cache *GVKRoutedCache
}

func (c *cachedRoutedClient) Get(ctx context.Context, key client.ObjectKey, obj client.Object, opts ...client.GetOption) error {
gvk, err := apiutil.GVKForObject(obj, c.scheme)
if err != nil {
return fmt.Errorf("failed to get GVK for type %T: %w", obj, err)
}

c.cache.lock.RLock()
delegate, ok := c.cache.delegates[gvk]
c.cache.lock.RUnlock()

if ok {
return delegate.Get(ctx, key, obj, opts...)
}

return c.Client.Get(ctx, key, obj, opts...)
}

func (c *cachedRoutedClient) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error {
gvk, err := apiutil.GVKForObject(list, c.scheme)
if err != nil {
return fmt.Errorf("failed to get GVK for type %T: %w", list, err)
}

c.cache.lock.RLock()
delegate, ok := c.cache.delegates[gvk]
c.cache.lock.RUnlock()

if ok {
return delegate.List(ctx, list, opts...)
}

return c.Client.List(ctx, list, opts...)
}

// WithGVKRoutedCache returns a manager backed by a GVKRoutedCache. The client
// returned by the manager will route read requests to cached GVKs.
func WithGVKRoutedCache(c *GVKRoutedCache, mgr controllerruntime.Manager) controllerruntime.Manager {
return &routedManager{
Manager: mgr,
client: &cachedRoutedClient{
Client: mgr.GetClient(),
scheme: mgr.GetScheme(),
cache: c,
},
cache: c,
}
}

type routedManager struct {
controllerruntime.Manager

client client.Client
cache cache.Cache
}

func (m *routedManager) GetClient() client.Client {
return m.client
}

func (m *routedManager) GetCache() cache.Cache {
return m.cache
}
88 changes: 72 additions & 16 deletions pkg/controller/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
Expand Down Expand Up @@ -176,51 +177,106 @@ func TriggeredBy(source source.Source, h handler.EventHandler, p ...predicate.Pr
// the supplied options, and configured with the supplied watches. Start does
// not block.
func (e *Engine) Start(name string, o controller.Options, w ...Watch) error {
if e.IsRunning(name) {
return nil
c, err := e.Create(name, o, w...)
if err != nil {
return err
}
return c.Start(context.Background())
}

ctx, stop := context.WithCancel(context.Background())
e.mx.Lock()
e.started[name] = stop
e.errors[name] = nil
e.mx.Unlock()
// NamedController is a named controller that is not started yet.
type NamedController interface {
Start(ctx context.Context) error
GetCache() cache.Cache
}

type namedController struct {
name string
e *Engine
ca cache.Cache
ctrl controller.Controller
}

// Create the named controller. Each controller gets its own cache
// whose lifecycle is coupled to the controller. The controller is created with
// the supplied options, and configured with the supplied watches. It is not
// started yet.
func (e *Engine) Create(name string, o controller.Options, w ...Watch) (NamedController, error) {
// Each controller gets its own cache because there's currently no way to
// stop an informer. In practice a controller-runtime cache is a map of
// kinds to informers. If we delete the CRD for a kind we need to stop the
// relevant informer, or it will spew errors about the kind not existing. We
// work around this by stopping the entire cache.
ca, err := e.newCache(e.mgr.GetConfig(), cache.Options{Scheme: e.mgr.GetScheme(), Mapper: e.mgr.GetRESTMapper()})
if err != nil {
return errors.Wrap(err, errCreateCache)
return nil, errors.Wrap(err, errCreateCache)
}

ctrl, err := e.newCtrl(name, e.mgr, o)
// Wrap the existing manager to use our cache for the GVKs of this controller.
rc := NewGVKRoutedCache(e.mgr.GetScheme(), e.mgr.GetCache())
rm := &routedManager{
Manager: e.mgr,
client: &cachedRoutedClient{
Client: e.mgr.GetClient(),
scheme: e.mgr.GetScheme(),
cache: rc,
},
cache: rc,
}

ctrl, err := e.newCtrl(name, rm, o)
if err != nil {
return errors.Wrap(err, errCreateController)
return nil, errors.Wrap(err, errCreateController)
}

for _, wt := range w {
if wt.customSource != nil {
if err := ctrl.Watch(wt.customSource, wt.handler, wt.predicates...); err != nil {
return errors.Wrap(err, errWatch)
return nil, errors.Wrap(err, errWatch)
}
continue
}

// route cache and client (read) requests to our cache for this GVK.
gvk, err := apiutil.GVKForObject(wt.kind, e.mgr.GetScheme())
if err != nil {
return nil, errors.Wrapf(err, "failed to get GVK for type %T", wt.kind)
}
rc.AddDelegate(gvk, ca)

if err := ctrl.Watch(source.Kind(ca, wt.kind), wt.handler, wt.predicates...); err != nil {
return errors.Wrap(err, errWatch)
return nil, errors.Wrap(err, errWatch)
}
}

return namedController{name: name, e: e, ca: ca, ctrl: ctrl}, nil
}

// Start the named controller. Start does not block.
func (c namedController) Start(ctx context.Context) error {
if c.e.IsRunning(c.name) {
return nil
}

ctx, stop := context.WithCancel(ctx)
c.e.mx.Lock()
c.e.started[c.name] = stop
c.e.errors[c.name] = nil
c.e.mx.Unlock()

go func() {
<-e.mgr.Elected()
e.done(name, errors.Wrap(ca.Start(ctx), errCrashCache))
<-c.e.mgr.Elected()
c.e.done(c.name, errors.Wrap(c.ca.Start(ctx), errCrashCache))
}()
go func() {
<-e.mgr.Elected()
e.done(name, errors.Wrap(ctrl.Start(ctx), errCrashController))
<-c.e.mgr.Elected()
c.e.done(c.name, errors.Wrap(c.ctrl.Start(ctx), errCrashController))
}()

return nil
}

// GetCache returns the cache used by the named controller.
func (c namedController) GetCache() cache.Cache {
return c.ca
}
Loading

0 comments on commit 9746110

Please sign in to comment.