From c0324c282a20d84f456fc4e7086cb1d253c46e52 Mon Sep 17 00:00:00 2001 From: mariomalinditex Date: Fri, 26 Jan 2024 08:28:06 +0100 Subject: [PATCH 1/3] feat: add support for poll interval and max-reconcile-rate flags Signed-off-by: mariomalinditex --- cmd/provider/main.go | 50 +++++++++++++++++++++++----- internal/controller/acl/acl.go | 24 ++++++------- internal/controller/config/config.go | 19 +++++------ internal/controller/kafka.go | 9 +++-- internal/controller/topic/topic.go | 21 ++++++------ 5 files changed, 73 insertions(+), 50 deletions(-) diff --git a/cmd/provider/main.go b/cmd/provider/main.go index c6b5895..bb8e2ad 100644 --- a/cmd/provider/main.go +++ b/cmd/provider/main.go @@ -19,23 +19,31 @@ package main import ( "os" "path/filepath" + "time" "gopkg.in/alecthomas/kingpin.v2" + "k8s.io/client-go/tools/leaderelection/resourcelock" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/log/zap" + "github.com/crossplane/crossplane-runtime/pkg/controller" + "github.com/crossplane/crossplane-runtime/pkg/feature" "github.com/crossplane/crossplane-runtime/pkg/logging" + "github.com/crossplane/crossplane-runtime/pkg/ratelimiter" "github.com/crossplane-contrib/provider-kafka/apis" - "github.com/crossplane-contrib/provider-kafka/internal/controller" + kafkacontroller "github.com/crossplane-contrib/provider-kafka/internal/controller" ) func main() { var ( app = kingpin.New(filepath.Base(os.Args[0]), "Kafka support for Crossplane.").DefaultEnvars() debug = app.Flag("debug", "Run with debug logging.").Short('d').Bool() - syncPeriod = app.Flag("sync", "Controller manager sync period such as 300ms, 1.5h, or 2h45m").Short('s').Default("1h").Duration() leaderElection = app.Flag("leader-election", "Use leader election for the controller manager.").Short('l').Default("false").OverrideDefaultFromEnvar("LEADER_ELECTION").Bool() + + syncPeriod = app.Flag("sync", "Controller manager sync period such as 300ms, 1.5h, or 2h45m").Short('s').Default("1h").Duration() + pollInterval = app.Flag("poll", "How often individual resources will be checked for drift from the desired state").Default("1m").Duration() + maxReconcileRate = app.Flag("max-reconcile-rate", "The global maximum rate per second at which resources may checked for drift from the desired state.").Default("10").Int() ) kingpin.MustParse(app.Parse(os.Args[1:])) @@ -48,19 +56,43 @@ func main() { ctrl.SetLogger(zl) } - log.Debug("Starting", "sync-period", syncPeriod.String()) + log.Debug( + "Starting", + "sync-period", syncPeriod.String(), + "poll-interval", pollInterval.String(), + "max-reconcile-rate", maxReconcileRate, + ) cfg, err := ctrl.GetConfig() kingpin.FatalIfError(err, "Cannot get API server rest config") - mgr, err := ctrl.NewManager(cfg, ctrl.Options{ - LeaderElection: *leaderElection, - LeaderElectionID: "crossplane-leader-election-provider-kafka", - SyncPeriod: syncPeriod, + mgr, err := ctrl.NewManager(ratelimiter.LimitRESTConfig(cfg, *maxReconcileRate), ctrl.Options{ + SyncPeriod: syncPeriod, + + // controller-runtime uses both ConfigMaps and Leases for leader + // election by default. Leases expire after 15 seconds, with a + // 10 second renewal deadline. We've observed leader loss due to + // renewal deadlines being exceeded when under high load - i.e. + // hundreds of reconciles per second and ~200rps to the API + // server. Switching to Leases only and longer leases appears to + // alleviate this. + LeaderElection: *leaderElection, + LeaderElectionID: "crossplane-leader-election-provider-kafka", + LeaderElectionResourceLock: resourcelock.LeasesResourceLock, + LeaseDuration: func() *time.Duration { d := 60 * time.Second; return &d }(), + RenewDeadline: func() *time.Duration { d := 50 * time.Second; return &d }(), }) kingpin.FatalIfError(err, "Cannot create controller manager") - kingpin.FatalIfError(apis.AddToScheme(mgr.GetScheme()), "Cannot add Kafka APIs to scheme") - kingpin.FatalIfError(controller.Setup(mgr, log), "Cannot setup Kafka controllers") + + o := controller.Options{ + Logger: log, + MaxConcurrentReconciles: *maxReconcileRate, + PollInterval: *pollInterval, + GlobalRateLimiter: ratelimiter.NewGlobal(*maxReconcileRate), + Features: &feature.Flags{}, + } + + kingpin.FatalIfError(kafkacontroller.Setup(mgr, o), "Cannot setup Kafka controllers") kingpin.FatalIfError(mgr.Start(ctrl.SetupSignalHandler()), "Cannot start controller manager") } diff --git a/internal/controller/acl/acl.go b/internal/controller/acl/acl.go index 59382ee..0432778 100644 --- a/internal/controller/acl/acl.go +++ b/internal/controller/acl/acl.go @@ -21,23 +21,21 @@ import ( "strings" "github.com/crossplane-contrib/provider-kafka/internal/clients/kafka" - - v1 "github.com/crossplane/crossplane-runtime/apis/common/v1" - "github.com/crossplane-contrib/provider-kafka/internal/clients/kafka/acl" + v1 "github.com/crossplane/crossplane-runtime/apis/common/v1" + "github.com/crossplane/crossplane-runtime/pkg/controller" "github.com/crossplane/crossplane-runtime/pkg/meta" + "github.com/crossplane/crossplane-runtime/pkg/ratelimiter" "github.com/twmb/franz-go/pkg/kadm" "github.com/pkg/errors" "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/controller" "github.com/crossplane/crossplane-runtime/pkg/event" "github.com/crossplane/crossplane-runtime/pkg/logging" - "github.com/crossplane/crossplane-runtime/pkg/ratelimiter" "github.com/crossplane/crossplane-runtime/pkg/reconciler/managed" "github.com/crossplane/crossplane-runtime/pkg/resource" @@ -56,29 +54,27 @@ const ( ) // Setup adds a controller that reconciles AccessControlList managed resources. -func Setup(mgr ctrl.Manager, l logging.Logger) error { +func Setup(mgr ctrl.Manager, o controller.Options) error { name := managed.ControllerName(v1alpha1.AccessControlListGroupKind) - o := controller.Options{ - RateLimiter: ratelimiter.NewController(), - } + cps := []managed.ConnectionPublisher{managed.NewAPISecretPublisher(mgr.GetClient(), mgr.GetScheme())} r := managed.NewReconciler(mgr, resource.ManagedKind(v1alpha1.AccessControlListGroupVersionKind), managed.WithExternalConnectDisconnecter(&connectDisconnector{ kube: mgr.GetClient(), usage: resource.NewProviderConfigUsageTracker(mgr.GetClient(), &apisv1alpha1.ProviderConfigUsage{}), - log: l, newServiceFn: kafka.NewAdminClient}), - managed.WithLogger(l.WithValues("controller", name)), + managed.WithLogger(o.Logger.WithValues("controller", name)), + managed.WithPollInterval(o.PollInterval), managed.WithRecorder(event.NewAPIRecorder(mgr.GetEventRecorderFor(name))), - managed.WithInitializers()) + managed.WithConnectionPublishers(cps...)) return ctrl.NewControllerManagedBy(mgr). Named(name). - WithOptions(o). + WithOptions(o.ForControllerRuntime()). For(&v1alpha1.AccessControlList{}). - Complete(r) + Complete(ratelimiter.NewReconciler(name, r, o.GlobalRateLimiter)) } // A connectDisconnector is expected to produce an ExternalClient when its Connect method diff --git a/internal/controller/config/config.go b/internal/controller/config/config.go index 45d2264..d8d2084 100644 --- a/internal/controller/config/config.go +++ b/internal/controller/config/config.go @@ -18,11 +18,10 @@ package config import ( ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/source" + "github.com/crossplane/crossplane-runtime/pkg/controller" "github.com/crossplane/crossplane-runtime/pkg/event" - "github.com/crossplane/crossplane-runtime/pkg/logging" "github.com/crossplane/crossplane-runtime/pkg/ratelimiter" "github.com/crossplane/crossplane-runtime/pkg/reconciler/providerconfig" "github.com/crossplane/crossplane-runtime/pkg/resource" @@ -32,24 +31,22 @@ import ( // Setup adds a controller that reconciles ProviderConfigs by accounting for // their current usage. -func Setup(mgr ctrl.Manager, l logging.Logger) error { +func Setup(mgr ctrl.Manager, o controller.Options) error { name := providerconfig.ControllerName(v1alpha1.ProviderConfigGroupKind) - o := controller.Options{ - RateLimiter: ratelimiter.NewController(), - } - of := resource.ProviderConfigKinds{ Config: v1alpha1.ProviderConfigGroupVersionKind, UsageList: v1alpha1.ProviderConfigUsageListGroupVersionKind, } + r := providerconfig.NewReconciler(mgr, of, + providerconfig.WithLogger(o.Logger.WithValues("controller", name)), + providerconfig.WithRecorder(event.NewAPIRecorder(mgr.GetEventRecorderFor(name)))) + return ctrl.NewControllerManagedBy(mgr). Named(name). - WithOptions(o). + WithOptions(o.ForControllerRuntime()). For(&v1alpha1.ProviderConfig{}). Watches(&source.Kind{Type: &v1alpha1.ProviderConfigUsage{}}, &resource.EnqueueRequestForProviderConfig{}). - Complete(providerconfig.NewReconciler(mgr, of, - providerconfig.WithLogger(l.WithValues("controller", name)), - providerconfig.WithRecorder(event.NewAPIRecorder(mgr.GetEventRecorderFor(name))))) + Complete(ratelimiter.NewReconciler(name, r, o.GlobalRateLimiter)) } diff --git a/internal/controller/kafka.go b/internal/controller/kafka.go index 70e3610..45db401 100644 --- a/internal/controller/kafka.go +++ b/internal/controller/kafka.go @@ -17,10 +17,9 @@ limitations under the License. package controller import ( + "github.com/crossplane/crossplane-runtime/pkg/controller" ctrl "sigs.k8s.io/controller-runtime" - "github.com/crossplane/crossplane-runtime/pkg/logging" - "github.com/crossplane-contrib/provider-kafka/internal/controller/acl" "github.com/crossplane-contrib/provider-kafka/internal/controller/config" "github.com/crossplane-contrib/provider-kafka/internal/controller/topic" @@ -28,13 +27,13 @@ import ( // Setup creates all Template controllers with the supplied logger and adds them to // the supplied manager. -func Setup(mgr ctrl.Manager, l logging.Logger) error { - for _, setup := range []func(ctrl.Manager, logging.Logger) error{ +func Setup(mgr ctrl.Manager, o controller.Options) error { + for _, setup := range []func(ctrl.Manager, controller.Options) error{ config.Setup, topic.Setup, acl.Setup, } { - if err := setup(mgr, l); err != nil { + if err := setup(mgr, o); err != nil { return err } } diff --git a/internal/controller/topic/topic.go b/internal/controller/topic/topic.go index eaa4a10..6f40212 100644 --- a/internal/controller/topic/topic.go +++ b/internal/controller/topic/topic.go @@ -21,6 +21,7 @@ import ( "strings" v1 "github.com/crossplane/crossplane-runtime/apis/common/v1" + "github.com/crossplane/crossplane-runtime/pkg/controller" "github.com/crossplane/crossplane-runtime/pkg/event" "github.com/crossplane/crossplane-runtime/pkg/logging" "github.com/crossplane/crossplane-runtime/pkg/meta" @@ -32,7 +33,6 @@ import ( "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/controller" "github.com/crossplane-contrib/provider-kafka/apis/topic/v1alpha1" apisv1alpha1 "github.com/crossplane-contrib/provider-kafka/apis/v1alpha1" @@ -51,28 +51,27 @@ const ( ) // Setup adds a controller that reconciles Topic managed resources. -func Setup(mgr ctrl.Manager, l logging.Logger) error { +func Setup(mgr ctrl.Manager, o controller.Options) error { name := managed.ControllerName(v1alpha1.TopicGroupKind) - o := controller.Options{ - RateLimiter: ratelimiter.NewController(), - } + cps := []managed.ConnectionPublisher{managed.NewAPISecretPublisher(mgr.GetClient(), mgr.GetScheme())} r := managed.NewReconciler(mgr, resource.ManagedKind(v1alpha1.TopicGroupVersionKind), - managed.WithExternalConnectDisconnecter(&connectDisconnector{ + managed.WithExternalConnecter(&connectDisconnector{ kube: mgr.GetClient(), usage: resource.NewProviderConfigUsageTracker(mgr.GetClient(), &apisv1alpha1.ProviderConfigUsage{}), - log: l, newServiceFn: kafka.NewAdminClient}), - managed.WithLogger(l.WithValues("controller", name)), - managed.WithRecorder(event.NewAPIRecorder(mgr.GetEventRecorderFor(name)))) + managed.WithLogger(o.Logger.WithValues("controller", name)), + managed.WithPollInterval(o.PollInterval), + managed.WithRecorder(event.NewAPIRecorder(mgr.GetEventRecorderFor(name))), + managed.WithConnectionPublishers(cps...)) return ctrl.NewControllerManagedBy(mgr). Named(name). - WithOptions(o). + WithOptions(o.ForControllerRuntime()). For(&v1alpha1.Topic{}). - Complete(r) + Complete(ratelimiter.NewReconciler(name, r, o.GlobalRateLimiter)) } // A connectDisconnector is expected to produce an ExternalClient when its Connect method From d9d8f2f510bd81c1a25c34d7664a072e19974a64 Mon Sep 17 00:00:00 2001 From: mariomalinditex Date: Fri, 26 Jan 2024 09:14:08 +0100 Subject: [PATCH 2/3] fix: keep setup of controllers with same logic as before migrate to crossplane-runtime Signed-off-by: mariomalinditex --- internal/controller/acl/acl.go | 4 +--- internal/controller/topic/topic.go | 7 ++----- 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/internal/controller/acl/acl.go b/internal/controller/acl/acl.go index 0432778..0048593 100644 --- a/internal/controller/acl/acl.go +++ b/internal/controller/acl/acl.go @@ -57,8 +57,6 @@ const ( func Setup(mgr ctrl.Manager, o controller.Options) error { name := managed.ControllerName(v1alpha1.AccessControlListGroupKind) - cps := []managed.ConnectionPublisher{managed.NewAPISecretPublisher(mgr.GetClient(), mgr.GetScheme())} - r := managed.NewReconciler(mgr, resource.ManagedKind(v1alpha1.AccessControlListGroupVersionKind), managed.WithExternalConnectDisconnecter(&connectDisconnector{ @@ -68,7 +66,7 @@ func Setup(mgr ctrl.Manager, o controller.Options) error { managed.WithLogger(o.Logger.WithValues("controller", name)), managed.WithPollInterval(o.PollInterval), managed.WithRecorder(event.NewAPIRecorder(mgr.GetEventRecorderFor(name))), - managed.WithConnectionPublishers(cps...)) + managed.WithInitializers()) return ctrl.NewControllerManagedBy(mgr). Named(name). diff --git a/internal/controller/topic/topic.go b/internal/controller/topic/topic.go index 6f40212..d2aa7ac 100644 --- a/internal/controller/topic/topic.go +++ b/internal/controller/topic/topic.go @@ -54,18 +54,15 @@ const ( func Setup(mgr ctrl.Manager, o controller.Options) error { name := managed.ControllerName(v1alpha1.TopicGroupKind) - cps := []managed.ConnectionPublisher{managed.NewAPISecretPublisher(mgr.GetClient(), mgr.GetScheme())} - r := managed.NewReconciler(mgr, resource.ManagedKind(v1alpha1.TopicGroupVersionKind), - managed.WithExternalConnecter(&connectDisconnector{ + managed.WithExternalConnectDisconnecter(&connectDisconnector{ kube: mgr.GetClient(), usage: resource.NewProviderConfigUsageTracker(mgr.GetClient(), &apisv1alpha1.ProviderConfigUsage{}), newServiceFn: kafka.NewAdminClient}), managed.WithLogger(o.Logger.WithValues("controller", name)), managed.WithPollInterval(o.PollInterval), - managed.WithRecorder(event.NewAPIRecorder(mgr.GetEventRecorderFor(name))), - managed.WithConnectionPublishers(cps...)) + managed.WithRecorder(event.NewAPIRecorder(mgr.GetEventRecorderFor(name)))) return ctrl.NewControllerManagedBy(mgr). Named(name). From 0f469b319f5d63d65a593155f0204099d196cb12 Mon Sep 17 00:00:00 2001 From: mariomalinditex Date: Fri, 26 Jan 2024 09:16:24 +0100 Subject: [PATCH 3/3] build: update required project dependencies Signed-off-by: mariomalinditex --- go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 39201fe..b73dd9c 100644 --- a/go.mod +++ b/go.mod @@ -14,6 +14,7 @@ require ( gopkg.in/alecthomas/kingpin.v2 v2.2.6 k8s.io/api v0.26.1 k8s.io/apimachinery v0.26.1 + k8s.io/client-go v0.26.1 sigs.k8s.io/controller-runtime v0.14.1 sigs.k8s.io/controller-tools v0.11.1 ) @@ -82,7 +83,6 @@ require ( gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect k8s.io/apiextensions-apiserver v0.26.1 // indirect - k8s.io/client-go v0.26.1 // indirect k8s.io/component-base v0.26.1 // indirect k8s.io/klog/v2 v2.80.1 // indirect k8s.io/kube-openapi v0.0.0-20221012153701-172d655c2280 // indirect