Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: Deferred queue for no-op TGB #3861

Merged
merged 4 commits into from
Oct 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ all: controller

# Run tests
test: generate fmt vet manifests helm-lint
go test -race ./pkg/... ./webhooks/... -coverprofile cover.out
go test -race ./pkg/... ./webhooks/... ./controllers/... -coverprofile cover.out

# Build controller binary
controller: generate fmt vet
Expand Down
80 changes: 44 additions & 36 deletions controllers/elbv2/targetgroupbinding_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@ package controllers
import (
"context"
"fmt"
discv1 "k8s.io/api/discovery/v1"
"sigs.k8s.io/controller-runtime/pkg/handler"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
discv1 "k8s.io/api/discovery/v1"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/aws-load-balancer-controller/controllers/elbv2/eventhandlers"
Expand All @@ -48,15 +49,16 @@ const (

// NewTargetGroupBindingReconciler constructs new targetGroupBindingReconciler
func NewTargetGroupBindingReconciler(k8sClient client.Client, eventRecorder record.EventRecorder, finalizerManager k8s.FinalizerManager,
tgbResourceManager targetgroupbinding.ResourceManager, config config.ControllerConfig,
tgbResourceManager targetgroupbinding.ResourceManager, config config.ControllerConfig, deferredTargetGroupBindingReconciler DeferredTargetGroupBindingReconciler,
logger logr.Logger) *targetGroupBindingReconciler {

return &targetGroupBindingReconciler{
k8sClient: k8sClient,
eventRecorder: eventRecorder,
finalizerManager: finalizerManager,
tgbResourceManager: tgbResourceManager,
logger: logger,
k8sClient: k8sClient,
eventRecorder: eventRecorder,
finalizerManager: finalizerManager,
tgbResourceManager: tgbResourceManager,
deferredTargetGroupBindingReconciler: deferredTargetGroupBindingReconciler,
logger: logger,

maxConcurrentReconciles: config.TargetGroupBindingMaxConcurrentReconciles,
maxExponentialBackoffDelay: config.TargetGroupBindingMaxExponentialBackoffDelay,
Expand All @@ -66,11 +68,12 @@ func NewTargetGroupBindingReconciler(k8sClient client.Client, eventRecorder reco

// targetGroupBindingReconciler reconciles a TargetGroupBinding object
type targetGroupBindingReconciler struct {
k8sClient client.Client
eventRecorder record.EventRecorder
finalizerManager k8s.FinalizerManager
tgbResourceManager targetgroupbinding.ResourceManager
logger logr.Logger
k8sClient client.Client
eventRecorder record.EventRecorder
finalizerManager k8s.FinalizerManager
tgbResourceManager targetgroupbinding.ResourceManager
deferredTargetGroupBindingReconciler DeferredTargetGroupBindingReconciler
logger logr.Logger

maxConcurrentReconciles int
maxExponentialBackoffDelay time.Duration
Expand Down Expand Up @@ -110,10 +113,17 @@ func (r *targetGroupBindingReconciler) reconcileTargetGroupBinding(ctx context.C
return err
}

if err := r.tgbResourceManager.Reconcile(ctx, tgb); err != nil {
deferred, err := r.tgbResourceManager.Reconcile(ctx, tgb)

if err != nil {
return err
}

if deferred {
r.deferredTargetGroupBindingReconciler.Enqueue(tgb)
return nil
}

if err := r.updateTargetGroupBindingStatus(ctx, tgb); err != nil {
r.eventRecorder.Event(tgb, corev1.EventTypeWarning, k8s.TargetGroupBindingEventReasonFailedUpdateStatus, fmt.Sprintf("Failed update status due to %v", err))
return err
Expand Down Expand Up @@ -141,11 +151,14 @@ func (r *targetGroupBindingReconciler) updateTargetGroupBindingStatus(ctx contex
if aws.ToInt64(tgb.Status.ObservedGeneration) == tgb.Generation {
return nil
}

tgbOld := tgb.DeepCopy()

tgb.Status.ObservedGeneration = aws.Int64(tgb.Generation)
if err := r.k8sClient.Status().Patch(ctx, tgb, client.MergeFrom(tgbOld)); err != nil {
return errors.Wrapf(err, "failed to update targetGroupBinding status: %v", k8s.NamespacedName(tgb))
}

return nil
}

Expand All @@ -159,34 +172,29 @@ func (r *targetGroupBindingReconciler) SetupWithManager(ctx context.Context, mgr
nodeEventsHandler := eventhandlers.NewEnqueueRequestsForNodeEvent(r.k8sClient,
r.logger.WithName("eventHandlers").WithName("node"))

// Use the config flag to decide whether to use and watch an Endpoints event handler or an EndpointSlices event handler
var eventHandler handler.EventHandler
var clientObj client.Object

if r.enableEndpointSlices {
epSliceEventsHandler := eventhandlers.NewEnqueueRequestsForEndpointSlicesEvent(r.k8sClient,
clientObj = &discv1.EndpointSlice{}
eventHandler = eventhandlers.NewEnqueueRequestsForEndpointSlicesEvent(r.k8sClient,
r.logger.WithName("eventHandlers").WithName("endpointslices"))
return ctrl.NewControllerManagedBy(mgr).
For(&elbv2api.TargetGroupBinding{}).
Named(controllerName).
Watches(&corev1.Service{}, svcEventHandler).
Watches(&discv1.EndpointSlice{}, epSliceEventsHandler).
Watches(&corev1.Node{}, nodeEventsHandler).
WithOptions(controller.Options{
MaxConcurrentReconciles: r.maxConcurrentReconciles,
RateLimiter: workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, r.maxExponentialBackoffDelay)}).
Complete(r)
} else {
epsEventsHandler := eventhandlers.NewEnqueueRequestsForEndpointsEvent(r.k8sClient,
clientObj = &corev1.Endpoints{}
eventHandler = eventhandlers.NewEnqueueRequestsForEndpointsEvent(r.k8sClient,
r.logger.WithName("eventHandlers").WithName("endpoints"))
return ctrl.NewControllerManagedBy(mgr).
For(&elbv2api.TargetGroupBinding{}).
Named(controllerName).
Watches(&corev1.Service{}, svcEventHandler).
Watches(&corev1.Endpoints{}, epsEventsHandler).
Watches(&corev1.Node{}, nodeEventsHandler).
WithOptions(controller.Options{
MaxConcurrentReconciles: r.maxConcurrentReconciles,
RateLimiter: workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, r.maxExponentialBackoffDelay)}).
Complete(r)
}

return ctrl.NewControllerManagedBy(mgr).
For(&elbv2api.TargetGroupBinding{}).
Named(controllerName).
Watches(&corev1.Service{}, svcEventHandler).
Watches(clientObj, eventHandler).
Watches(&corev1.Node{}, nodeEventsHandler).
WithOptions(controller.Options{
MaxConcurrentReconciles: r.maxConcurrentReconciles,
RateLimiter: workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, r.maxExponentialBackoffDelay)}).
Complete(r)
}

func (r *targetGroupBindingReconciler) setupIndexes(ctx context.Context, fieldIndexer client.FieldIndexer) error {
Expand Down
130 changes: 130 additions & 0 deletions controllers/elbv2/targetgroupbinding_deferred_reconciler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package controllers

import (
"context"
"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
"math/rand"
elbv2api "sigs.k8s.io/aws-load-balancer-controller/apis/elbv2/v1beta1"
"sigs.k8s.io/aws-load-balancer-controller/pkg/k8s"
"sigs.k8s.io/aws-load-balancer-controller/pkg/targetgroupbinding"
"sigs.k8s.io/controller-runtime/pkg/client"
"time"
)

const (
// defaultDelayedReconcileTime is the base delay applied before a deferred TGB is processed.
// Generally, this number should be large enough so we can perform all reconciles
// that have changes before processing reconciles that have no detected changes.
defaultDelayedReconcileTime = 30 * time.Minute
// defaultMaxJitter is the max amount of jitter to add to the delayed reconcile time.
// This is used to ensure that all deferred TGBs are not reconciled together.
defaultMaxJitter = 15 * time.Minute

// resetHash is the checkpoint hash written when a deferred TGB is processed. It is
// guaranteed to trigger a new reconcile loop, because a calculated hash always
// contains a '/' and therefore can never equal the empty string.
resetHash = ""
)

// DeferredTargetGroupBindingReconciler postpones processing of TargetGroupBindings (TGBs)
// by routing them through a delayed queue.
type DeferredTargetGroupBindingReconciler interface {
// Enqueue offers a TGB for deferred processing. The implementation in this file only
// queues the TGB when its reconcile checkpoint is older than the configured sync period.
Enqueue(tgb *elbv2api.TargetGroupBinding)
// Run processes queued items. The implementation in this file loops until the
// underlying queue reports that it has been shut down.
Run()
}

// deferredTargetGroupBindingReconcilerImpl is the DeferredTargetGroupBindingReconciler
// implementation backed by a delaying work queue.
type deferredTargetGroupBindingReconcilerImpl struct {
// delayQueue holds the namespaced names of deferred TGBs until their delay elapses.
delayQueue workqueue.DelayingInterface
// syncPeriod is how old a TGB's reconcile checkpoint must be before the TGB is
// considered eligible for deferral.
syncPeriod time.Duration
// k8sClient is used to fetch and patch TGBs when their queued item is processed.
k8sClient client.Client
logger logr.Logger

// delayedReconcileTime is the base delay applied to every enqueued item.
delayedReconcileTime time.Duration
// maxJitter is the exclusive upper bound of the random jitter added on top of
// delayedReconcileTime; zero disables jitter.
maxJitter time.Duration
}

// NewDeferredTargetGroupBindingReconciler builds a DeferredTargetGroupBindingReconciler
// that stores deferred TGBs in delayQueue, treats a TGB as eligible for deferral once its
// reconcile checkpoint is older than syncPeriod, and uses k8sClient to read and patch
// TGBs. The delay and jitter are initialised from the package defaults.
func NewDeferredTargetGroupBindingReconciler(delayQueue workqueue.DelayingInterface, syncPeriod time.Duration, k8sClient client.Client, logger logr.Logger) DeferredTargetGroupBindingReconciler {
	reconciler := &deferredTargetGroupBindingReconcilerImpl{
		delayQueue: delayQueue,
		syncPeriod: syncPeriod,
		k8sClient:  k8sClient,
		logger:     logger,

		delayedReconcileTime: defaultDelayedReconcileTime,
		maxJitter:            defaultMaxJitter,
	}
	return reconciler
}

// Enqueue schedules tgb for deferred processing when its reconcile checkpoint is older
// than the sync period. TGBs that are not eligible are ignored.
func (d *deferredTargetGroupBindingReconcilerImpl) Enqueue(tgb *elbv2api.TargetGroupBinding) {
	nsn := k8s.NamespacedName(tgb)
	if !d.isEligibleForDefer(tgb) {
		return
	}

	d.enqueue(nsn)
	// Log the full namespaced name (not just the name) so that same-named TGBs in
	// different namespaces can be told apart, matching the other log lines in this file.
	d.logger.Info("enqueued new deferred TGB", "TGB", nsn)
}

// Run pulls items off the delay queue and processes them one at a time until the queue
// reports that it has been shut down. An item returned together with the shutdown
// signal is still processed before the loop exits.
func (d *deferredTargetGroupBindingReconcilerImpl) Run() {
	var item interface{}
	shutDown := false
	for !shutDown {
		item, shutDown = d.delayQueue.Get()
		if item == nil {
			continue
		}

		// The queue is injected by the caller, so guard the type assertion: a foreign
		// item must not panic the worker. It is reported, marked done and skipped.
		deferredNamespacedName, ok := item.(types.NamespacedName)
		if !ok {
			d.logger.Error(nil, "Dropping unexpected item from deferred TGB queue", "item", item)
			d.delayQueue.Done(item)
			continue
		}

		d.logger.Info("Processing deferred TGB", "item", deferredNamespacedName)
		d.handleDeferredItem(deferredNamespacedName)
		d.delayQueue.Done(deferredNamespacedName)
	}

	d.logger.Info("Shutting down deferred TGB queue")
}

// handleDeferredItem processes one dequeued TGB: it fetches the current object, confirms
// it is still eligible for deferral, and then patches its reconcile checkpoint with
// resetHash. Failures are passed to handleDeferredItemError.
func (d *deferredTargetGroupBindingReconcilerImpl) handleDeferredItem(nsn types.NamespacedName) {
	current := new(elbv2api.TargetGroupBinding)
	if getErr := d.k8sClient.Get(context.Background(), nsn, current); getErr != nil {
		d.handleDeferredItemError(nsn, getErr, "Failed to get TGB in deferred queue")
		return
	}

	// The TGB may have been updated while it sat in the queue, so check again.
	if stillEligible := d.isEligibleForDefer(current); !stillEligible {
		d.logger.Info("TGB not eligible for deferral", "TGB", nsn)
		return
	}

	// Keep a copy of the object as fetched; the patch is built against this copy.
	snapshot := current.DeepCopy()
	targetgroupbinding.SaveTGBReconcileCheckpoint(current, resetHash)

	patchErr := d.k8sClient.Patch(context.Background(), current, client.MergeFrom(snapshot))
	if patchErr != nil {
		d.handleDeferredItemError(nsn, patchErr, "Failed to reset TGB checkpoint")
		return
	}

	d.logger.Info("TGB checkpoint reset", "TGB", nsn)
}

// handleDeferredItemError handles a failure while processing a deferred TGB. The error is
// first filtered through client.IgnoreNotFound; if anything remains, it is logged with
// msg and the TGB is put back on the delay queue for another attempt.
func (d *deferredTargetGroupBindingReconcilerImpl) handleDeferredItemError(nsn types.NamespacedName, err error, msg string) {
	remaining := client.IgnoreNotFound(err)
	if remaining == nil {
		return
	}

	d.logger.Error(remaining, msg, "TGB", nsn)
	d.enqueue(nsn)
}

// isEligibleForDefer reports whether more than syncPeriod has elapsed since the TGB's
// reconcile checkpoint timestamp, which is interpreted as Unix seconds.
func (d *deferredTargetGroupBindingReconcilerImpl) isEligibleForDefer(tgb *elbv2api.TargetGroupBinding) bool {
	checkpointSeconds := targetgroupbinding.GetTGBReconcileCheckpointTimestamp(tgb)
	lastCheckpoint := time.Unix(checkpointSeconds, 0)
	return time.Since(lastCheckpoint) > d.syncPeriod
}

// enqueue adds nsn to the delay queue, to be released after the jittered reconcile delay.
func (d *deferredTargetGroupBindingReconcilerImpl) enqueue(nsn types.NamespacedName) {
	d.delayQueue.AddAfter(nsn, d.jitterReconcileTime())
}

// jitterReconcileTime returns the delay to apply to a deferred TGB: delayedReconcileTime
// plus a random jitter in [0, maxJitter). When maxJitter is zero or negative no jitter is
// added and delayedReconcileTime is returned unchanged.
func (d *deferredTargetGroupBindingReconcilerImpl) jitterReconcileTime() time.Duration {
	// rand.Int63n panics when its argument is <= 0, so treat any non-positive
	// maxJitter (not only zero) as "jitter disabled".
	if d.maxJitter <= 0 {
		return d.delayedReconcileTime
	}

	return d.delayedReconcileTime + time.Duration(rand.Int63n(int64(d.maxJitter)))
}
Loading
Loading