Skip to content

Commit

Permalink
Make max concurrent reconciles configurable and increase the default …
Browse files Browse the repository at this point in the history
…value (#19)

* Make max concurrent reconciles configurable and increase the default value

* Drop unit tests for AddToManager

* Address review comments

* Address review comments (2)
  • Loading branch information
ialidzhikov authored Apr 5, 2024
1 parent a59ec55 commit 67b46de
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 142 deletions.
29 changes: 29 additions & 0 deletions pkg/input/cli_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ type CLIOptions struct {
ScrapePeriod time.Duration
ScrapeFlowControlPeriod time.Duration
MinSampleGap time.Duration

// PodController contains Pod controller options.
PodController *ControllerOptions
// SecretController contains Secret controller options.
SecretController *ControllerOptions
}

// NewCLIOptions creates a CLIOptions object with default values
Expand All @@ -33,6 +38,12 @@ func NewCLIOptions() *CLIOptions {
ScrapePeriod: 60 * time.Second,
ScrapeFlowControlPeriod: 200 * time.Millisecond,
MinSampleGap: 10 * time.Second,
PodController: &ControllerOptions{
MaxConcurrentReconciles: 10,
},
SecretController: &ControllerOptions{
MaxConcurrentReconciles: 10,
},
}
}

Expand All @@ -57,15 +68,28 @@ func (options *CLIOptions) AddFlags(flags *pflag.FlagSet) {
fmt.Sprintf(
"If the last two metrics samples are closer in time than this, don't use them to calculate rate. Default: %d",
options.MinSampleGap))

options.PodController.AddFlags(flags, "pod-")
options.SecretController.AddFlags(flags, "secret-")
}

// Complete implements [github.com/gardener/gardener/extensions/pkg/controller/cmd.Completer.Complete].
func (options *CLIOptions) Complete() error {
if err := options.PodController.Complete(); err != nil {
return fmt.Errorf("failed to complete pod controller options: %w", err)
}
if err := options.SecretController.Complete(); err != nil {
return fmt.Errorf("failed to complete secret controller options: %w", err)
}

options.config = &CLIConfig{
ScrapePeriod: options.ScrapePeriod,
ScrapeFlowControlPeriod: options.ScrapeFlowControlPeriod,
MinSampleGap: options.MinSampleGap,
PodController: options.PodController.Completed(),
SecretController: options.SecretController.Completed(),
}

return nil
}

Expand All @@ -84,4 +108,9 @@ type CLIConfig struct {
// differential (rate) calculation accuracy, and are not used as a pair (each may still be used, paired with other
// samples).
MinSampleGap time.Duration

// PodController contains Pod controller configuration.
PodController *ControllerConfig
// SecretController contains Secret controller configuration.
SecretController *ControllerConfig
}
28 changes: 4 additions & 24 deletions pkg/input/controller/pod/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,8 @@
package pod

import (
"time"

"github.com/go-logr/logr"
"golang.org/x/time/rate"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
kmgr "sigs.k8s.io/controller-runtime/pkg/manager"
Expand All @@ -21,37 +17,21 @@ import (
scrape_target_registry "github.com/gardener/gardener-custom-metrics/pkg/input/input_data_registry"
)

// AddToManagerWithOptions adds a new pod controller to the specified manager.
// AddToManager adds a new pod controller to the specified manager.
// dataRegistry is a concurrency-safe data repository where the controller finds data it needs, and stores
// the data it produces.
func AddToManagerWithOptions(
func AddToManager(
manager kmgr.Manager,
dataRegistry scrape_target_registry.InputDataRegistry,
controllerOptions *controller.Options,
controllerOptions controller.Options,
client client.Client,
log logr.Logger) error {

return gcmctl.NewControllerFactory().AddNewControllerToManager(manager, gcmctl.AddArgs{
Actuator: NewActuator(client, dataRegistry, log.WithName("pod-controller")),
ControllerName: app.Name + "-pod-controller",
ControllerOptions: *controllerOptions,
ControllerOptions: controllerOptions,
ControlledObjectType: &corev1.Pod{},
Predicates: []predicate.Predicate{NewPredicate(log)},
})
}

// AddToManager adds a new pod controller to the specified manager, using default option values.
func AddToManager(manager kmgr.Manager, dataRegistry scrape_target_registry.InputDataRegistry, log logr.Logger) error {
return AddToManagerWithOptions(
manager,
dataRegistry,
&controller.Options{
RateLimiter: workqueue.NewMaxOfRateLimiter(
// Sacrifice some of the responsiveness provided by the default 5ms initial retry rate, to reduce waste
workqueue.NewItemExponentialFailureRateLimiter(1*time.Second, 10*time.Minute),
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
),
},
nil,
log)
}
28 changes: 4 additions & 24 deletions pkg/input/controller/secret/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,8 @@
package secret

import (
"time"

"github.com/go-logr/logr"
"golang.org/x/time/rate"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
kmgr "sigs.k8s.io/controller-runtime/pkg/manager"
Expand All @@ -21,37 +17,21 @@ import (
scrape_target_registry "github.com/gardener/gardener-custom-metrics/pkg/input/input_data_registry"
)

// AddToManagerWithOptions adds a new secret controller to the specified manager.
// AddToManager adds a new secret controller to the specified manager.
// dataRegistry is a concurrency-safe data repository where the controller finds data it needs, and stores
// the data it produces.
func AddToManagerWithOptions(
func AddToManager(
manager kmgr.Manager,
dataRegistry scrape_target_registry.InputDataRegistry,
controllerOptions *controller.Options,
controllerOptions controller.Options,
client client.Client,
log logr.Logger) error {

return gcmctl.NewControllerFactory().AddNewControllerToManager(manager, gcmctl.AddArgs{
Actuator: NewActuator(client, dataRegistry, log.WithName("secret-controller")),
ControllerName: app.Name + "-secret-controller",
ControllerOptions: *controllerOptions,
ControllerOptions: controllerOptions,
ControlledObjectType: &corev1.Secret{},
Predicates: []predicate.Predicate{NewPredicate(log)},
})
}

// AddToManager adds a new secret controller to the specified manager, using default option values.
func AddToManager(manager kmgr.Manager, dataRegistry scrape_target_registry.InputDataRegistry, log logr.Logger) error {
return AddToManagerWithOptions(
manager,
dataRegistry,
&controller.Options{
RateLimiter: workqueue.NewMaxOfRateLimiter(
// Sacrifice some of the responsiveness provided by the default 5ms initial retry rate, to reduce waste
workqueue.NewItemExponentialFailureRateLimiter(5*time.Second, 10*time.Minute),
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
),
},
nil,
log)
}
47 changes: 47 additions & 0 deletions pkg/input/controller_options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// SPDX-FileCopyrightText: 2024 SAP SE or an SAP affiliate company and Gardener contributors
//
// SPDX-License-Identifier: Apache-2.0

package input

import (
"github.com/spf13/pflag"
"sigs.k8s.io/controller-runtime/pkg/controller"
)

// ControllerOptions are command line options that can be set for controller.Options.
type ControllerOptions struct {
// MaxConcurrentReconciles are the maximum concurrent reconciles.
MaxConcurrentReconciles int

config *ControllerConfig
}

// AddFlags implements Flagger.AddFlags.
func (c *ControllerOptions) AddFlags(fs *pflag.FlagSet, prefix string) {
fs.IntVar(&c.MaxConcurrentReconciles, prefix+"max-concurrent-reconciles", c.MaxConcurrentReconciles, "The maximum number of concurrent reconciliations.")
}

// Complete implements Completer.Complete.
func (c *ControllerOptions) Complete() error {
c.config = &ControllerConfig{
MaxConcurrentReconciles: c.MaxConcurrentReconciles,
}
return nil
}

// Completed returns the completed ControllerConfig. Only call this if `Complete` was successful.
func (c *ControllerOptions) Completed() *ControllerConfig {
return c.config
}

// ControllerConfig is a completed controller configuration.
type ControllerConfig struct {
// MaxConcurrentReconciles is the maximum number of concurrent reconciles.
MaxConcurrentReconciles int
}

// Apply sets the values of this ControllerConfig in the given AddOptions.
func (c *ControllerConfig) Apply(opts *controller.Options) {
opts.MaxConcurrentReconciles = c.MaxConcurrentReconciles
}
24 changes: 22 additions & 2 deletions pkg/input/input_data_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,11 @@ import (
"time"

"github.com/go-logr/logr"
"golang.org/x/time/rate"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/controller"
kmgr "sigs.k8s.io/controller-runtime/pkg/manager"

"github.com/gardener/gardener-custom-metrics/pkg/app"
Expand Down Expand Up @@ -95,10 +98,27 @@ func (ids *inputDataService) AddToManager(manager kmgr.Manager) error {
}

ids.log.V(app.VerbosityVerbose).Info("Adding controllers to manager")
if err := podctl.AddToManager(manager, ids.inputDataRegistry, ids.log.V(1)); err != nil {
podControllerOptions := controller.Options{
RateLimiter: workqueue.NewMaxOfRateLimiter(
// Sacrifice some of the responsiveness provided by the default 5ms initial retry rate, to reduce waste
workqueue.NewItemExponentialFailureRateLimiter(1*time.Second, 10*time.Minute),
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
),
}
ids.config.PodController.Apply(&podControllerOptions)
if err := podctl.AddToManager(manager, ids.inputDataRegistry, podControllerOptions, nil, ids.log.V(1)); err != nil {
return fmt.Errorf("add pod controller to manager: %w", err)
}
if err := secretctl.AddToManager(manager, ids.inputDataRegistry, ids.log.V(1)); err != nil {

secretControllerOptions := controller.Options{
RateLimiter: workqueue.NewMaxOfRateLimiter(
// Sacrifice some of the responsiveness provided by the default 5ms initial retry rate, to reduce waste
workqueue.NewItemExponentialFailureRateLimiter(5*time.Second, 10*time.Minute),
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
),
}
ids.config.SecretController.Apply(&secretControllerOptions)
if err := secretctl.AddToManager(manager, ids.inputDataRegistry, secretControllerOptions, nil, ids.log.V(1)); err != nil {
return fmt.Errorf("add secret controller to manager: %w", err)
}

Expand Down
92 changes: 0 additions & 92 deletions pkg/input/input_data_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,8 @@ import (
"github.com/go-logr/logr"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/scheme"
"sigs.k8s.io/controller-runtime/pkg/controller"

"github.com/gardener/gardener-custom-metrics/pkg/input/input_data_registry"
"github.com/gardener/gardener-custom-metrics/pkg/input/metrics_scraper"
"github.com/gardener/gardener-custom-metrics/pkg/util/testutil"
)

var _ = Describe("input.inputDataService", func() {
Expand Down Expand Up @@ -74,91 +69,4 @@ var _ = Describe("input.inputDataService", func() {
Expect(kapis[0].PodName()).To(Equal("pod"))
})
})

Describe("AddToManager", func() {
It("should add the scraper, pod controller, and secret controller to the manager", func() {
// Arrange
ids, _ := newInputDataService()
fm := testutil.NewFakeManager()

// Act
result := ids.AddToManager(fm)

// Assert
Expect(result).Should(Succeed())
Expect(testutil.GetRunnables[*metrics_scraper.Scraper](fm)).To(HaveLen(1))
Expect(testutil.GetRunnables[controller.Controller](fm)).To(HaveLen(2))
})

It("should add apimachinery runtime types scheme to the manager", func() {
// Arrange
ids, _ := newInputDataService()
fm := testutil.NewFakeManager()

// Act
result := ids.AddToManager(fm)

// Assert
Expect(result).To(Succeed())
runtimeScheme := runtime.NewScheme()
builder := runtime.NewSchemeBuilder(scheme.AddToScheme)
builder.AddToScheme(runtimeScheme)
for gvk := range runtimeScheme.AllKnownTypes() {
Expect(fm.Scheme.Recognizes(gvk)).To(BeTrue())
}
})

It("should create a new data registry and pass it to the scraper", func() {
// Arrange
ids, _ := newInputDataService()
fm := testutil.NewFakeManager()
var registryPassedToScraperConstructor input_data_registry.InputDataRegistry
ids.testIsolation.NewScraper = func(
dataRegistry input_data_registry.InputDataRegistry,
_ time.Duration,
_ time.Duration,
_ logr.Logger) *metrics_scraper.Scraper {

registryPassedToScraperConstructor = dataRegistry

return nil
}

// Act
result := ids.AddToManager(fm)

// Assert
Expect(result).To(Succeed())
Expect(registryPassedToScraperConstructor).NotTo(BeNil())
Expect(registryPassedToScraperConstructor == ids.inputDataRegistry).To(BeTrue())
})

It("should configure the scraper with the specified scrape period and flow control period", func() {
// Arrange
ids, _ := newInputDataService()
fm := testutil.NewFakeManager()
var scrapePeriodPassedToScraperConstructor time.Duration
var scrapeFlowControlPeriodPassedToScraperConstructor time.Duration

ids.testIsolation.NewScraper = func(
_ input_data_registry.InputDataRegistry,
scrapePeriod time.Duration,
scrapeFlowControlPeriod time.Duration,
_ logr.Logger) *metrics_scraper.Scraper {

scrapePeriodPassedToScraperConstructor = scrapePeriod
scrapeFlowControlPeriodPassedToScraperConstructor = scrapeFlowControlPeriod

return nil
}

// Act
result := ids.AddToManager(fm)

// Assert
Expect(result).To(Succeed())
Expect(scrapePeriodPassedToScraperConstructor).To(Equal(testScrapePeriod))
Expect(scrapeFlowControlPeriodPassedToScraperConstructor).To(Equal(testScrapeFlowControlPeriod))
})
})
})

0 comments on commit 67b46de

Please sign in to comment.