diff --git a/pkg/controller/cache.go b/pkg/controller/cache.go new file mode 100644 index 000000000..4004b0030 --- /dev/null +++ b/pkg/controller/cache.go @@ -0,0 +1,265 @@ +/* +Copyright 2023 The Crossplane Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "context" + "fmt" + "sync" + + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + controllerruntime "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/cache" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/apiutil" +) + +// GVKRoutedCache is a cache that routes requests by GVK to other caches. +type GVKRoutedCache struct { + scheme *runtime.Scheme + + fallback cache.Cache + + lock sync.RWMutex + delegates map[schema.GroupVersionKind]cache.Cache +} + +// NewGVKRoutedCache returns a new routed cache. +func NewGVKRoutedCache(scheme *runtime.Scheme, fallback cache.Cache) *GVKRoutedCache { + return &GVKRoutedCache{ + scheme: scheme, + fallback: fallback, + delegates: make(map[schema.GroupVersionKind]cache.Cache), + } +} + +var _ cache.Cache = &GVKRoutedCache{} + +// AddDelegate adds a delegated cache for a given GVK. +func (c *GVKRoutedCache) AddDelegate(gvk schema.GroupVersionKind, delegate cache.Cache) { + c.lock.Lock() + defer c.lock.Unlock() + + c.delegates[gvk] = delegate +} + +// RemoveDelegate removes a delegated cache for a given GVK. +func (c *GVKRoutedCache) RemoveDelegate(gvk schema.GroupVersionKind) { + c.lock.Lock() + defer c.lock.Unlock() + + delete(c.delegates, gvk) +} + +// Get retrieves an object for a given ObjectKey backed by a cache. +func (c *GVKRoutedCache) Get(ctx context.Context, key client.ObjectKey, obj client.Object, opts ...client.GetOption) error { + gvk, err := apiutil.GVKForObject(obj, c.scheme) + if err != nil { + return fmt.Errorf("failed to get GVK for type %T: %w", obj, err) + } + + c.lock.RLock() + delegate, ok := c.delegates[gvk] + c.lock.RUnlock() + + if ok { + return delegate.Get(ctx, key, obj, opts...) + } + + return c.fallback.Get(ctx, key, obj, opts...) +} + +// List lists objects for a given ObjectList backed by a cache. +func (c *GVKRoutedCache) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error { + gvk, err := apiutil.GVKForObject(list, c.scheme) + if err != nil { + return fmt.Errorf("failed to get GVK for type %T: %w", list, err) + } + + c.lock.RLock() + delegate, ok := c.delegates[gvk] + c.lock.RUnlock() + + if ok { + return delegate.List(ctx, list, opts...) + } + + return c.fallback.List(ctx, list, opts...) +} + +// GetInformer returns an informer for the given object. +func (c *GVKRoutedCache) GetInformer(ctx context.Context, obj client.Object, opts ...cache.InformerGetOption) (cache.Informer, error) { + gvk, err := apiutil.GVKForObject(obj, c.scheme) + if err != nil { + return nil, fmt.Errorf("failed to get GVK for type %T: %w", obj, err) + } + + c.lock.RLock() + delegate, ok := c.delegates[gvk] + c.lock.RUnlock() + + if ok { + return delegate.GetInformer(ctx, obj, opts...) + } + + return c.fallback.GetInformer(ctx, obj, opts...) +} + +// GetInformerForKind returns an informer for the given GVK. +func (c *GVKRoutedCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind, opts ...cache.InformerGetOption) (cache.Informer, error) { + c.lock.RLock() + delegate, ok := c.delegates[gvk] + c.lock.RUnlock() + + if ok { + return delegate.GetInformerForKind(ctx, gvk, opts...) + } + + return c.fallback.GetInformerForKind(ctx, gvk, opts...) +} + +// Start for a GVKRoutedCache is a no-op. Start must be called for each delegate. +func (c *GVKRoutedCache) Start(_ context.Context) error { + return nil +} + +// WaitForCacheSync for a GVKRoutedCache waits for all delegates to sync, and +// returns false if any of them fails to sync. +func (c *GVKRoutedCache) WaitForCacheSync(ctx context.Context) bool { + var wg sync.WaitGroup + synced := make(chan bool, len(c.delegates)+1) + + ctx, cacnelFn := context.WithCancel(ctx) + + c.lock.RLock() + wg.Add(len(c.delegates) + 1) + for _, delegate := range c.delegates { + go func(delegate cache.Cache) { + defer wg.Done() + synced := delegate.WaitForCacheSync(ctx) + if !synced { + // first unsynced cache breaks the whole wait + cacnelFn() + } + }(delegate) + } + c.lock.RUnlock() + + wg.Wait() + close(synced) + cacnelFn() + + // any not synced? + for synced := range synced { + if !synced { + return false + } + } + + return true +} + +// IndexField adds an index with the given field name on the given object type +// by using the given function to extract the value for that field. +func (c *GVKRoutedCache) IndexField(ctx context.Context, obj client.Object, field string, extractValue client.IndexerFunc) error { + gvk, err := apiutil.GVKForObject(obj, c.scheme) + if err != nil { + return fmt.Errorf("failed to get GVK for type %T: %w", obj, err) + } + + c.lock.RLock() + delegate, ok := c.delegates[gvk] + c.lock.RUnlock() + + if ok { + return delegate.IndexField(ctx, obj, field, extractValue) + } + + return c.fallback.IndexField(ctx, obj, field, extractValue) +} + +// cachedRoutedClient wraps a client and routes read requests by GVK to a cache. +type cachedRoutedClient struct { + client.Client + + scheme *runtime.Scheme + cache *GVKRoutedCache +} + +func (c *cachedRoutedClient) Get(ctx context.Context, key client.ObjectKey, obj client.Object, opts ...client.GetOption) error { + gvk, err := apiutil.GVKForObject(obj, c.scheme) + if err != nil { + return fmt.Errorf("failed to get GVK for type %T: %w", obj, err) + } + + c.cache.lock.RLock() + delegate, ok := c.cache.delegates[gvk] + c.cache.lock.RUnlock() + + if ok { + return delegate.Get(ctx, key, obj, opts...) + } + + return c.Client.Get(ctx, key, obj, opts...) +} + +func (c *cachedRoutedClient) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error { + gvk, err := apiutil.GVKForObject(list, c.scheme) + if err != nil { + return fmt.Errorf("failed to get GVK for type %T: %w", list, err) + } + + c.cache.lock.RLock() + delegate, ok := c.cache.delegates[gvk] + c.cache.lock.RUnlock() + + if ok { + return delegate.List(ctx, list, opts...) + } + + return c.Client.List(ctx, list, opts...) +} + +// WithGVKRoutedCache returns a manager backed by a GVKRoutedCache. The client +// returned by the manager will route read requests to cached GVKs. +func WithGVKRoutedCache(c *GVKRoutedCache, mgr controllerruntime.Manager) controllerruntime.Manager { + return &routedManager{ + Manager: mgr, + client: &cachedRoutedClient{ + Client: mgr.GetClient(), + scheme: mgr.GetScheme(), + cache: c, + }, + cache: c, + } +} + +type routedManager struct { + controllerruntime.Manager + + client client.Client + cache cache.Cache +} + +func (m *routedManager) GetClient() client.Client { + return m.client +} + +func (m *routedManager) GetCache() cache.Cache { + return m.cache +} diff --git a/pkg/controller/engine.go b/pkg/controller/engine.go index 107fc2571..e7ab051f1 100644 --- a/pkg/controller/engine.go +++ b/pkg/controller/engine.go @@ -24,6 +24,7 @@ import ( "k8s.io/client-go/rest" "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/apiutil" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/manager" @@ -176,16 +177,26 @@ func TriggeredBy(source source.Source, h handler.EventHandler, p ...predicate.Pr // the supplied options, and configured with the supplied watches. Start does // not block. func (e *Engine) Start(name string, o controller.Options, w ...Watch) error { - if e.IsRunning(name) { - return nil + c, err := e.Create(name, o, w...) + if err != nil { + return err } + return c.Start(context.Background()) +} - ctx, stop := context.WithCancel(context.Background()) - e.mx.Lock() - e.started[name] = stop - e.errors[name] = nil - e.mx.Unlock() +// NamedController is a named controller that is not started yet. +type NamedController struct { + name string + e *Engine + ca cache.Cache + ctrl controller.Controller +} +// Create the named controller. Each controller gets its own cache +// whose lifecycle is coupled to the controller. The controller is created with +// the supplied options, and configured with the supplied watches. It is not +// started yet. +func (e *Engine) Create(name string, o controller.Options, w ...Watch) (NamedController, error) { // Each controller gets its own cache because there's currently no way to // stop an informer. In practice a controller-runtime cache is a map of // kinds to informers. If we delete the CRD for a kind we need to stop the @@ -193,34 +204,74 @@ func (e *Engine) Start(name string, o controller.Options, w ...Watch) error { // work around this by stopping the entire cache. ca, err := e.newCache(e.mgr.GetConfig(), cache.Options{Scheme: e.mgr.GetScheme(), Mapper: e.mgr.GetRESTMapper()}) if err != nil { - return errors.Wrap(err, errCreateCache) + return NamedController{}, errors.Wrap(err, errCreateCache) } - ctrl, err := e.newCtrl(name, e.mgr, o) + // Wrap the existing manager to use our cache for the GVKs of this controller. + rc := NewGVKRoutedCache(e.mgr.GetScheme(), e.mgr.GetCache()) + rm := &routedManager{ + Manager: e.mgr, + client: &cachedRoutedClient{ + Client: e.mgr.GetClient(), + scheme: e.mgr.GetScheme(), + cache: rc, + }, + cache: rc, + } + + ctrl, err := e.newCtrl(name, rm, o) if err != nil { - return errors.Wrap(err, errCreateController) + return NamedController{}, errors.Wrap(err, errCreateController) } for _, wt := range w { if wt.customSource != nil { if err := ctrl.Watch(wt.customSource, wt.handler, wt.predicates...); err != nil { - return errors.Wrap(err, errWatch) + return NamedController{}, errors.Wrap(err, errWatch) } continue } + + // route cache and client (read) requests to our cache for this GVK. + gvk, err := apiutil.GVKForObject(wt.kind, e.mgr.GetScheme()) + if err != nil { + return NamedController{}, errors.Wrapf(err, "failed to get GVK for type %T", wt.kind) + } + rc.AddDelegate(gvk, ca) + if err := ctrl.Watch(source.Kind(ca, wt.kind), wt.handler, wt.predicates...); err != nil { - return errors.Wrap(err, errWatch) + return NamedController{}, errors.Wrap(err, errWatch) } } + return NamedController{name: name, e: e, ca: ca, ctrl: ctrl}, nil +} + +// Start the named controller. Start does not block. +func (c NamedController) Start(ctx context.Context) error { + if c.e.IsRunning(c.name) { + return nil + } + + ctx, stop := context.WithCancel(ctx) + c.e.mx.Lock() + c.e.started[c.name] = stop + c.e.errors[c.name] = nil + c.e.mx.Unlock() + go func() { - <-e.mgr.Elected() - e.done(name, errors.Wrap(ca.Start(ctx), errCrashCache)) + <-c.e.mgr.Elected() + c.e.done(c.name, errors.Wrap(c.ca.Start(ctx), errCrashCache)) }() go func() { - <-e.mgr.Elected() - e.done(name, errors.Wrap(ctrl.Start(ctx), errCrashController)) + <-c.e.mgr.Elected() + c.e.done(c.name, errors.Wrap(c.ctrl.Start(ctx), errCrashController)) }() return nil } + +// GetCache returns the cache used by the named controller. +func (c NamedController) GetCache() cache.Cache { + return c.ca +} diff --git a/pkg/controller/engine_test.go b/pkg/controller/engine_test.go index 1abca9f36..9098bec14 100644 --- a/pkg/controller/engine_test.go +++ b/pkg/controller/engine_test.go @@ -22,6 +22,8 @@ import ( "time" "github.com/google/go-cmp/cmp" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/rest" "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/controller" @@ -92,7 +94,11 @@ func TestEngine(t *testing.T) { }, "NewControllerError": { reason: "Errors creating a new controller should be returned", - e: NewEngine(&fake.Manager{}, + e: NewEngine( + &fake.Manager{ + Scheme: runtime.NewScheme(), + Cache: &MockCache{}, + }, WithNewCacheFn(func(*rest.Config, cache.Options) (cache.Cache, error) { return nil, nil }), WithNewControllerFn(func(string, manager.Manager, controller.Options) (controller.Controller, error) { return nil, errBoom }), ), @@ -105,7 +111,11 @@ func TestEngine(t *testing.T) { }, "WatchError": { reason: "Errors adding a watch should be returned", - e: NewEngine(&fake.Manager{}, + e: NewEngine( + &fake.Manager{ + Scheme: runtime.NewScheme(), + Cache: &MockCache{}, + }, WithNewCacheFn(func(*rest.Config, cache.Options) (cache.Cache, error) { return nil, nil }), WithNewControllerFn(func(string, manager.Manager, controller.Options) (controller.Controller, error) { c := &MockController{MockWatch: func(source.Source, handler.EventHandler, ...predicate.Predicate) error { return errBoom }} @@ -114,12 +124,35 @@ func TestEngine(t *testing.T) { ), args: args{ name: "coolcontroller", - w: []Watch{For(&fake.Managed{}, nil)}, + w: []Watch{For(&unstructured.Unstructured{ + Object: map[string]interface{}{"apiVersion": "example.org/v1", "kind": "Thing"}, + }, nil)}, }, want: want{ err: errors.Wrap(errBoom, errWatch), }, }, + "SchemeError": { + reason: "Passing an object of unknown GVK", + e: NewEngine( + &fake.Manager{ + Scheme: runtime.NewScheme(), + Cache: &MockCache{}, + }, + WithNewCacheFn(func(*rest.Config, cache.Options) (cache.Cache, error) { return nil, nil }), + WithNewControllerFn(func(string, manager.Manager, controller.Options) (controller.Controller, error) { + c := &MockController{MockWatch: func(source.Source, handler.EventHandler, ...predicate.Predicate) error { return errBoom }} + return c, nil + }), + ), + args: args{ + name: "coolcontroller", + w: []Watch{For(&unstructured.Unstructured{}, nil)}, + }, + want: want{ + err: errors.Wrap(runtime.NewMissingKindErr("unstructured object has no kind"), "failed to get GVK for type *unstructured.Unstructured"), + }, + }, "CacheCrashError": { reason: "Errors starting or running a cache should be returned", e: NewEngine(&fake.Manager{}, diff --git a/pkg/resource/fake/mocks.go b/pkg/resource/fake/mocks.go index 29cc9d5e6..e240d7171 100644 --- a/pkg/resource/fake/mocks.go +++ b/pkg/resource/fake/mocks.go @@ -28,6 +28,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/rest" + "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/manager" @@ -441,6 +442,7 @@ func (m *CompositeClaim) DeepCopyObject() runtime.Object { type Manager struct { manager.Manager + Cache cache.Cache Client client.Client Scheme *runtime.Scheme Config *rest.Config @@ -455,6 +457,9 @@ func (m *Manager) Elected() <-chan struct{} { return e } +// GetCache returns the cache. +func (m *Manager) GetCache() cache.Cache { return m.Cache } + // GetClient returns the client. func (m *Manager) GetClient() client.Client { return m.Client }