diff --git a/cmd/provider/main.go b/cmd/provider/main.go index 3995858..e81eb5f 100644 --- a/cmd/provider/main.go +++ b/cmd/provider/main.go @@ -19,24 +19,32 @@ package main import ( "os" "path/filepath" + "time" "gopkg.in/alecthomas/kingpin.v2" + "k8s.io/client-go/tools/leaderelection/resourcelock" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/log/zap" + "github.com/crossplane/crossplane-runtime/pkg/controller" + "github.com/crossplane/crossplane-runtime/pkg/feature" "github.com/crossplane/crossplane-runtime/pkg/logging" + "github.com/crossplane/crossplane-runtime/pkg/ratelimiter" "github.com/crossplane-contrib/provider-kafka/apis" - "github.com/crossplane-contrib/provider-kafka/internal/controller" + kafkacontroller "github.com/crossplane-contrib/provider-kafka/internal/controller" ) func main() { var ( app = kingpin.New(filepath.Base(os.Args[0]), "Kafka support for Crossplane.").DefaultEnvars() debug = app.Flag("debug", "Run with debug logging.").Short('d').Bool() - syncPeriod = app.Flag("sync", "Controller manager sync period such as 300ms, 1.5h, or 2h45m").Short('s').Default("1h").Duration() leaderElection = app.Flag("leader-election", "Use leader election for the controller manager.").Short('l').Default("false").OverrideDefaultFromEnvar("LEADER_ELECTION").Bool() + + syncPeriod = app.Flag("sync", "Controller manager sync period such as 300ms, 1.5h, or 2h45m").Short('s').Default("1h").Duration() + pollInterval = app.Flag("poll", "How often individual resources will be checked for drift from the desired state").Default("1m").Duration() + maxReconcileRate = app.Flag("max-reconcile-rate", "The global maximum rate per second at which resources may checked for drift from the desired state.").Default("10").Int() ) kingpin.MustParse(app.Parse(os.Args[1:])) @@ -49,21 +57,37 @@ func main() { ctrl.SetLogger(zl) } - log.Debug("Starting", "sync-period", syncPeriod.String()) + log.Debug( + "Starting", + "sync-period", syncPeriod.String(), + "poll-interval", pollInterval.String(), + "max-reconcile-rate", maxReconcileRate, + ) cfg, err := ctrl.GetConfig() kingpin.FatalIfError(err, "Cannot get API server rest config") mgr, err := ctrl.NewManager(cfg, ctrl.Options{ - LeaderElection: *leaderElection, - LeaderElectionID: "crossplane-leader-election-provider-kafka", + LeaderElection: *leaderElection, + LeaderElectionID: "crossplane-leader-election-provider-kafka", + LeaderElectionResourceLock: resourcelock.LeasesResourceLock, + LeaseDuration: func() *time.Duration { d := 60 * time.Second; return &d }(), + RenewDeadline: func() *time.Duration { d := 50 * time.Second; return &d }(), Cache: cache.Options{ SyncPeriod: syncPeriod, }, }) kingpin.FatalIfError(err, "Cannot create controller manager") - kingpin.FatalIfError(apis.AddToScheme(mgr.GetScheme()), "Cannot add Kafka APIs to scheme") - kingpin.FatalIfError(controller.Setup(mgr, log), "Cannot setup Kafka controllers") + + o := controller.Options{ + Logger: log, + MaxConcurrentReconciles: *maxReconcileRate, + PollInterval: *pollInterval, + GlobalRateLimiter: ratelimiter.NewGlobal(*maxReconcileRate), + Features: &feature.Flags{}, + } + + kingpin.FatalIfError(kafkacontroller.Setup(mgr, o), "Cannot setup Kafka controllers") kingpin.FatalIfError(mgr.Start(ctrl.SetupSignalHandler()), "Cannot start controller manager") } diff --git a/go.mod b/go.mod index 3e54574..4f67037 100644 --- a/go.mod +++ b/go.mod @@ -14,6 +14,7 @@ require ( gopkg.in/alecthomas/kingpin.v2 v2.2.6 k8s.io/api v0.28.3 k8s.io/apimachinery v0.28.3 + k8s.io/client-go v0.28.3 sigs.k8s.io/controller-runtime v0.16.3 sigs.k8s.io/controller-tools v0.13.0 ) @@ -83,7 +84,6 @@ require ( gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect k8s.io/apiextensions-apiserver v0.28.3 // indirect - k8s.io/client-go v0.28.3 // indirect k8s.io/component-base v0.28.3 // indirect k8s.io/klog/v2 v2.100.1 // indirect k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9 // indirect diff --git a/internal/controller/acl/acl.go b/internal/controller/acl/acl.go index baa7fdb..6dec455 100644 --- a/internal/controller/acl/acl.go +++ b/internal/controller/acl/acl.go @@ -21,23 +21,21 @@ import ( "strings" "github.com/crossplane-contrib/provider-kafka/internal/clients/kafka" - - v1 "github.com/crossplane/crossplane-runtime/apis/common/v1" - "github.com/crossplane-contrib/provider-kafka/internal/clients/kafka/acl" + v1 "github.com/crossplane/crossplane-runtime/apis/common/v1" + "github.com/crossplane/crossplane-runtime/pkg/controller" "github.com/crossplane/crossplane-runtime/pkg/meta" + "github.com/crossplane/crossplane-runtime/pkg/ratelimiter" "github.com/twmb/franz-go/pkg/kadm" "github.com/pkg/errors" "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/controller" "github.com/crossplane/crossplane-runtime/pkg/event" "github.com/crossplane/crossplane-runtime/pkg/logging" - "github.com/crossplane/crossplane-runtime/pkg/ratelimiter" "github.com/crossplane/crossplane-runtime/pkg/reconciler/managed" "github.com/crossplane/crossplane-runtime/pkg/resource" @@ -56,29 +54,25 @@ const ( ) // Setup adds a controller that reconciles AccessControlList managed resources. -func Setup(mgr ctrl.Manager, l logging.Logger) error { +func Setup(mgr ctrl.Manager, o controller.Options) error { name := managed.ControllerName(v1alpha1.AccessControlListGroupKind) - o := controller.Options{ - RateLimiter: ratelimiter.NewController(), - } - r := managed.NewReconciler(mgr, resource.ManagedKind(v1alpha1.AccessControlListGroupVersionKind), managed.WithExternalConnectDisconnecter(&connectDisconnector{ kube: mgr.GetClient(), usage: resource.NewProviderConfigUsageTracker(mgr.GetClient(), &apisv1alpha1.ProviderConfigUsage{}), - log: l, newServiceFn: kafka.NewAdminClient}), - managed.WithLogger(l.WithValues("controller", name)), + managed.WithLogger(o.Logger.WithValues("controller", name)), + managed.WithPollInterval(o.PollInterval), managed.WithRecorder(event.NewAPIRecorder(mgr.GetEventRecorderFor(name))), managed.WithInitializers()) return ctrl.NewControllerManagedBy(mgr). Named(name). - WithOptions(o). + WithOptions(o.ForControllerRuntime()). For(&v1alpha1.AccessControlList{}). - Complete(r) + Complete(ratelimiter.NewReconciler(name, r, o.GlobalRateLimiter)) } // A connectDisconnector is expected to produce an ExternalClient when its Connect method diff --git a/internal/controller/config/config.go b/internal/controller/config/config.go index 0128b90..5366c1f 100644 --- a/internal/controller/config/config.go +++ b/internal/controller/config/config.go @@ -18,11 +18,9 @@ package config import ( ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/controller" + "github.com/crossplane/crossplane-runtime/pkg/controller" "github.com/crossplane/crossplane-runtime/pkg/event" - "github.com/crossplane/crossplane-runtime/pkg/logging" - "github.com/crossplane/crossplane-runtime/pkg/ratelimiter" "github.com/crossplane/crossplane-runtime/pkg/reconciler/providerconfig" "github.com/crossplane/crossplane-runtime/pkg/resource" @@ -31,24 +29,22 @@ import ( // Setup adds a controller that reconciles ProviderConfigs by accounting for // their current usage. -func Setup(mgr ctrl.Manager, l logging.Logger) error { +func Setup(mgr ctrl.Manager, o controller.Options) error { name := providerconfig.ControllerName(v1alpha1.ProviderConfigGroupKind) - o := controller.Options{ - RateLimiter: ratelimiter.NewController(), - } - of := resource.ProviderConfigKinds{ Config: v1alpha1.ProviderConfigGroupVersionKind, UsageList: v1alpha1.ProviderConfigUsageListGroupVersionKind, } + r := providerconfig.NewReconciler(mgr, of, + providerconfig.WithLogger(o.Logger.WithValues("controller", name)), + providerconfig.WithRecorder(event.NewAPIRecorder(mgr.GetEventRecorderFor(name)))) + return ctrl.NewControllerManagedBy(mgr). Named(name). - WithOptions(o). + WithOptions(o.ForControllerRuntime()). For(&v1alpha1.ProviderConfig{}). Watches(&v1alpha1.ProviderConfigUsage{}, &resource.EnqueueRequestForProviderConfig{}). - Complete(providerconfig.NewReconciler(mgr, of, - providerconfig.WithLogger(l.WithValues("controller", name)), - providerconfig.WithRecorder(event.NewAPIRecorder(mgr.GetEventRecorderFor(name))))) + Complete(r) } diff --git a/internal/controller/kafka.go b/internal/controller/kafka.go index 70e3610..45db401 100644 --- a/internal/controller/kafka.go +++ b/internal/controller/kafka.go @@ -17,10 +17,9 @@ limitations under the License. package controller import ( + "github.com/crossplane/crossplane-runtime/pkg/controller" ctrl "sigs.k8s.io/controller-runtime" - "github.com/crossplane/crossplane-runtime/pkg/logging" - "github.com/crossplane-contrib/provider-kafka/internal/controller/acl" "github.com/crossplane-contrib/provider-kafka/internal/controller/config" "github.com/crossplane-contrib/provider-kafka/internal/controller/topic" @@ -28,13 +27,13 @@ import ( // Setup creates all Template controllers with the supplied logger and adds them to // the supplied manager. -func Setup(mgr ctrl.Manager, l logging.Logger) error { - for _, setup := range []func(ctrl.Manager, logging.Logger) error{ +func Setup(mgr ctrl.Manager, o controller.Options) error { + for _, setup := range []func(ctrl.Manager, controller.Options) error{ config.Setup, topic.Setup, acl.Setup, } { - if err := setup(mgr, l); err != nil { + if err := setup(mgr, o); err != nil { return err } } diff --git a/internal/controller/topic/topic.go b/internal/controller/topic/topic.go index eaa4a10..d2aa7ac 100644 --- a/internal/controller/topic/topic.go +++ b/internal/controller/topic/topic.go @@ -21,6 +21,7 @@ import ( "strings" v1 "github.com/crossplane/crossplane-runtime/apis/common/v1" + "github.com/crossplane/crossplane-runtime/pkg/controller" "github.com/crossplane/crossplane-runtime/pkg/event" "github.com/crossplane/crossplane-runtime/pkg/logging" "github.com/crossplane/crossplane-runtime/pkg/meta" @@ -32,7 +33,6 @@ import ( "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/controller" "github.com/crossplane-contrib/provider-kafka/apis/topic/v1alpha1" apisv1alpha1 "github.com/crossplane-contrib/provider-kafka/apis/v1alpha1" @@ -51,28 +51,24 @@ const ( ) // Setup adds a controller that reconciles Topic managed resources. -func Setup(mgr ctrl.Manager, l logging.Logger) error { +func Setup(mgr ctrl.Manager, o controller.Options) error { name := managed.ControllerName(v1alpha1.TopicGroupKind) - o := controller.Options{ - RateLimiter: ratelimiter.NewController(), - } - r := managed.NewReconciler(mgr, resource.ManagedKind(v1alpha1.TopicGroupVersionKind), managed.WithExternalConnectDisconnecter(&connectDisconnector{ kube: mgr.GetClient(), usage: resource.NewProviderConfigUsageTracker(mgr.GetClient(), &apisv1alpha1.ProviderConfigUsage{}), - log: l, newServiceFn: kafka.NewAdminClient}), - managed.WithLogger(l.WithValues("controller", name)), + managed.WithLogger(o.Logger.WithValues("controller", name)), + managed.WithPollInterval(o.PollInterval), managed.WithRecorder(event.NewAPIRecorder(mgr.GetEventRecorderFor(name)))) return ctrl.NewControllerManagedBy(mgr). Named(name). - WithOptions(o). + WithOptions(o.ForControllerRuntime()). For(&v1alpha1.Topic{}). - Complete(r) + Complete(ratelimiter.NewReconciler(name, r, o.GlobalRateLimiter)) } // A connectDisconnector is expected to produce an ExternalClient when its Connect method