Skip to content

Commit

Permalink
Merge pull request #65 from InditexTech/add-missing-controller-options
Browse files Browse the repository at this point in the history
Add support for poll interval and max-reconcile-rate flags
  • Loading branch information
malodie authored May 8, 2024
2 parents e4356a4 + 783ed65 commit 6bfe34e
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 49 deletions.
38 changes: 31 additions & 7 deletions cmd/provider/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,32 @@ package main
import (
"os"
"path/filepath"
"time"

"gopkg.in/alecthomas/kingpin.v2"
"k8s.io/client-go/tools/leaderelection/resourcelock"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/log/zap"

"github.com/crossplane/crossplane-runtime/pkg/controller"
"github.com/crossplane/crossplane-runtime/pkg/feature"
"github.com/crossplane/crossplane-runtime/pkg/logging"
"github.com/crossplane/crossplane-runtime/pkg/ratelimiter"

"github.com/crossplane-contrib/provider-kafka/apis"
"github.com/crossplane-contrib/provider-kafka/internal/controller"
kafkacontroller "github.com/crossplane-contrib/provider-kafka/internal/controller"
)

func main() {
var (
app = kingpin.New(filepath.Base(os.Args[0]), "Kafka support for Crossplane.").DefaultEnvars()
debug = app.Flag("debug", "Run with debug logging.").Short('d').Bool()
syncPeriod = app.Flag("sync", "Controller manager sync period such as 300ms, 1.5h, or 2h45m").Short('s').Default("1h").Duration()
leaderElection = app.Flag("leader-election", "Use leader election for the controller manager.").Short('l').Default("false").OverrideDefaultFromEnvar("LEADER_ELECTION").Bool()

syncPeriod = app.Flag("sync", "Controller manager sync period such as 300ms, 1.5h, or 2h45m").Short('s').Default("1h").Duration()
pollInterval = app.Flag("poll", "How often individual resources will be checked for drift from the desired state").Default("1m").Duration()
maxReconcileRate = app.Flag("max-reconcile-rate", "The global maximum rate per second at which resources may checked for drift from the desired state.").Default("10").Int()
)
kingpin.MustParse(app.Parse(os.Args[1:]))

Expand All @@ -49,21 +57,37 @@ func main() {
ctrl.SetLogger(zl)
}

log.Debug("Starting", "sync-period", syncPeriod.String())
log.Debug(
"Starting",
"sync-period", syncPeriod.String(),
"poll-interval", pollInterval.String(),
"max-reconcile-rate", maxReconcileRate,
)

cfg, err := ctrl.GetConfig()
kingpin.FatalIfError(err, "Cannot get API server rest config")

mgr, err := ctrl.NewManager(cfg, ctrl.Options{
LeaderElection: *leaderElection,
LeaderElectionID: "crossplane-leader-election-provider-kafka",
LeaderElection: *leaderElection,
LeaderElectionID: "crossplane-leader-election-provider-kafka",
LeaderElectionResourceLock: resourcelock.LeasesResourceLock,
LeaseDuration: func() *time.Duration { d := 60 * time.Second; return &d }(),
RenewDeadline: func() *time.Duration { d := 50 * time.Second; return &d }(),
Cache: cache.Options{
SyncPeriod: syncPeriod,
},
})
kingpin.FatalIfError(err, "Cannot create controller manager")

kingpin.FatalIfError(apis.AddToScheme(mgr.GetScheme()), "Cannot add Kafka APIs to scheme")
kingpin.FatalIfError(controller.Setup(mgr, log), "Cannot setup Kafka controllers")

o := controller.Options{
Logger: log,
MaxConcurrentReconciles: *maxReconcileRate,
PollInterval: *pollInterval,
GlobalRateLimiter: ratelimiter.NewGlobal(*maxReconcileRate),
Features: &feature.Flags{},
}

kingpin.FatalIfError(kafkacontroller.Setup(mgr, o), "Cannot setup Kafka controllers")
kingpin.FatalIfError(mgr.Start(ctrl.SetupSignalHandler()), "Cannot start controller manager")
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ require (
gopkg.in/alecthomas/kingpin.v2 v2.2.6
k8s.io/api v0.28.3
k8s.io/apimachinery v0.28.3
k8s.io/client-go v0.28.3
sigs.k8s.io/controller-runtime v0.16.3
sigs.k8s.io/controller-tools v0.13.0
)
Expand Down Expand Up @@ -83,7 +84,6 @@ require (
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/apiextensions-apiserver v0.28.3 // indirect
k8s.io/client-go v0.28.3 // indirect
k8s.io/component-base v0.28.3 // indirect
k8s.io/klog/v2 v2.100.1 // indirect
k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9 // indirect
Expand Down
22 changes: 8 additions & 14 deletions internal/controller/acl/acl.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,21 @@ import (
"strings"

"github.com/crossplane-contrib/provider-kafka/internal/clients/kafka"

v1 "github.com/crossplane/crossplane-runtime/apis/common/v1"

"github.com/crossplane-contrib/provider-kafka/internal/clients/kafka/acl"

v1 "github.com/crossplane/crossplane-runtime/apis/common/v1"
"github.com/crossplane/crossplane-runtime/pkg/controller"
"github.com/crossplane/crossplane-runtime/pkg/meta"
"github.com/crossplane/crossplane-runtime/pkg/ratelimiter"
"github.com/twmb/franz-go/pkg/kadm"

"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"

"github.com/crossplane/crossplane-runtime/pkg/event"
"github.com/crossplane/crossplane-runtime/pkg/logging"
"github.com/crossplane/crossplane-runtime/pkg/ratelimiter"
"github.com/crossplane/crossplane-runtime/pkg/reconciler/managed"
"github.com/crossplane/crossplane-runtime/pkg/resource"

Expand All @@ -56,29 +54,25 @@ const (
)

// Setup adds a controller that reconciles AccessControlList managed resources.
func Setup(mgr ctrl.Manager, l logging.Logger) error {
func Setup(mgr ctrl.Manager, o controller.Options) error {
name := managed.ControllerName(v1alpha1.AccessControlListGroupKind)

o := controller.Options{
RateLimiter: ratelimiter.NewController(),
}

r := managed.NewReconciler(mgr,
resource.ManagedKind(v1alpha1.AccessControlListGroupVersionKind),
managed.WithExternalConnectDisconnecter(&connectDisconnector{
kube: mgr.GetClient(),
usage: resource.NewProviderConfigUsageTracker(mgr.GetClient(), &apisv1alpha1.ProviderConfigUsage{}),
log: l,
newServiceFn: kafka.NewAdminClient}),
managed.WithLogger(l.WithValues("controller", name)),
managed.WithLogger(o.Logger.WithValues("controller", name)),
managed.WithPollInterval(o.PollInterval),
managed.WithRecorder(event.NewAPIRecorder(mgr.GetEventRecorderFor(name))),
managed.WithInitializers())

return ctrl.NewControllerManagedBy(mgr).
Named(name).
WithOptions(o).
WithOptions(o.ForControllerRuntime()).
For(&v1alpha1.AccessControlList{}).
Complete(r)
Complete(ratelimiter.NewReconciler(name, r, o.GlobalRateLimiter))
}

// A connectDisconnector is expected to produce an ExternalClient when its Connect method
Expand Down
20 changes: 8 additions & 12 deletions internal/controller/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,9 @@ package config

import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/controller"

"github.com/crossplane/crossplane-runtime/pkg/controller"
"github.com/crossplane/crossplane-runtime/pkg/event"
"github.com/crossplane/crossplane-runtime/pkg/logging"
"github.com/crossplane/crossplane-runtime/pkg/ratelimiter"
"github.com/crossplane/crossplane-runtime/pkg/reconciler/providerconfig"
"github.com/crossplane/crossplane-runtime/pkg/resource"

Expand All @@ -31,24 +29,22 @@ import (

// Setup adds a controller that reconciles ProviderConfigs by accounting for
// their current usage.
func Setup(mgr ctrl.Manager, l logging.Logger) error {
func Setup(mgr ctrl.Manager, o controller.Options) error {
name := providerconfig.ControllerName(v1alpha1.ProviderConfigGroupKind)

o := controller.Options{
RateLimiter: ratelimiter.NewController(),
}

of := resource.ProviderConfigKinds{
Config: v1alpha1.ProviderConfigGroupVersionKind,
UsageList: v1alpha1.ProviderConfigUsageListGroupVersionKind,
}

r := providerconfig.NewReconciler(mgr, of,
providerconfig.WithLogger(o.Logger.WithValues("controller", name)),
providerconfig.WithRecorder(event.NewAPIRecorder(mgr.GetEventRecorderFor(name))))

return ctrl.NewControllerManagedBy(mgr).
Named(name).
WithOptions(o).
WithOptions(o.ForControllerRuntime()).
For(&v1alpha1.ProviderConfig{}).
Watches(&v1alpha1.ProviderConfigUsage{}, &resource.EnqueueRequestForProviderConfig{}).
Complete(providerconfig.NewReconciler(mgr, of,
providerconfig.WithLogger(l.WithValues("controller", name)),
providerconfig.WithRecorder(event.NewAPIRecorder(mgr.GetEventRecorderFor(name)))))
Complete(r)
}
9 changes: 4 additions & 5 deletions internal/controller/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,23 @@ limitations under the License.
package controller

import (
"github.com/crossplane/crossplane-runtime/pkg/controller"
ctrl "sigs.k8s.io/controller-runtime"

"github.com/crossplane/crossplane-runtime/pkg/logging"

"github.com/crossplane-contrib/provider-kafka/internal/controller/acl"
"github.com/crossplane-contrib/provider-kafka/internal/controller/config"
"github.com/crossplane-contrib/provider-kafka/internal/controller/topic"
)

// Setup creates all Template controllers with the supplied logger and adds them to
// the supplied manager.
func Setup(mgr ctrl.Manager, l logging.Logger) error {
for _, setup := range []func(ctrl.Manager, logging.Logger) error{
func Setup(mgr ctrl.Manager, o controller.Options) error {
for _, setup := range []func(ctrl.Manager, controller.Options) error{
config.Setup,
topic.Setup,
acl.Setup,
} {
if err := setup(mgr, l); err != nil {
if err := setup(mgr, o); err != nil {
return err
}
}
Expand Down
16 changes: 6 additions & 10 deletions internal/controller/topic/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"strings"

v1 "github.com/crossplane/crossplane-runtime/apis/common/v1"
"github.com/crossplane/crossplane-runtime/pkg/controller"
"github.com/crossplane/crossplane-runtime/pkg/event"
"github.com/crossplane/crossplane-runtime/pkg/logging"
"github.com/crossplane/crossplane-runtime/pkg/meta"
Expand All @@ -32,7 +33,6 @@ import (
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"

"github.com/crossplane-contrib/provider-kafka/apis/topic/v1alpha1"
apisv1alpha1 "github.com/crossplane-contrib/provider-kafka/apis/v1alpha1"
Expand All @@ -51,28 +51,24 @@ const (
)

// Setup adds a controller that reconciles Topic managed resources.
func Setup(mgr ctrl.Manager, l logging.Logger) error {
func Setup(mgr ctrl.Manager, o controller.Options) error {
name := managed.ControllerName(v1alpha1.TopicGroupKind)

o := controller.Options{
RateLimiter: ratelimiter.NewController(),
}

r := managed.NewReconciler(mgr,
resource.ManagedKind(v1alpha1.TopicGroupVersionKind),
managed.WithExternalConnectDisconnecter(&connectDisconnector{
kube: mgr.GetClient(),
usage: resource.NewProviderConfigUsageTracker(mgr.GetClient(), &apisv1alpha1.ProviderConfigUsage{}),
log: l,
newServiceFn: kafka.NewAdminClient}),
managed.WithLogger(l.WithValues("controller", name)),
managed.WithLogger(o.Logger.WithValues("controller", name)),
managed.WithPollInterval(o.PollInterval),
managed.WithRecorder(event.NewAPIRecorder(mgr.GetEventRecorderFor(name))))

return ctrl.NewControllerManagedBy(mgr).
Named(name).
WithOptions(o).
WithOptions(o.ForControllerRuntime()).
For(&v1alpha1.Topic{}).
Complete(r)
Complete(ratelimiter.NewReconciler(name, r, o.GlobalRateLimiter))
}

// A connectDisconnector is expected to produce an ExternalClient when its Connect method
Expand Down

0 comments on commit 6bfe34e

Please sign in to comment.