Skip to content

Commit

Permalink
Feature: Defered queue for no-op TGB
Browse files Browse the repository at this point in the history
  • Loading branch information
zac-nixon committed Sep 23, 2024
1 parent 644600a commit c8f5815
Show file tree
Hide file tree
Showing 15 changed files with 773 additions and 75 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ all: controller

# Run tests
test: generate fmt vet manifests helm-lint
go test -race ./pkg/... ./webhooks/... -coverprofile cover.out
go test -race ./pkg/... ./webhooks/... ./controllers/... -coverprofile cover.out

# Build controller binary
controller: generate fmt vet
Expand Down
92 changes: 53 additions & 39 deletions controllers/elbv2/targetgroupbinding_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@ package controllers
import (
"context"
"fmt"
discv1 "k8s.io/api/discovery/v1"
"sigs.k8s.io/controller-runtime/pkg/handler"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
discv1 "k8s.io/api/discovery/v1"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/aws-load-balancer-controller/controllers/elbv2/eventhandlers"
Expand All @@ -48,15 +49,16 @@ const (

// NewTargetGroupBindingReconciler constructs new targetGroupBindingReconciler
func NewTargetGroupBindingReconciler(k8sClient client.Client, eventRecorder record.EventRecorder, finalizerManager k8s.FinalizerManager,
tgbResourceManager targetgroupbinding.ResourceManager, config config.ControllerConfig,
tgbResourceManager targetgroupbinding.ResourceManager, config config.ControllerConfig, deferredTargetGroupBindingReconciler DeferredTargetGroupBindingReconciler,
logger logr.Logger) *targetGroupBindingReconciler {

return &targetGroupBindingReconciler{
k8sClient: k8sClient,
eventRecorder: eventRecorder,
finalizerManager: finalizerManager,
tgbResourceManager: tgbResourceManager,
logger: logger,
k8sClient: k8sClient,
eventRecorder: eventRecorder,
finalizerManager: finalizerManager,
tgbResourceManager: tgbResourceManager,
deferredTargetGroupBindingReconciler: deferredTargetGroupBindingReconciler,
logger: logger,

maxConcurrentReconciles: config.TargetGroupBindingMaxConcurrentReconciles,
maxExponentialBackoffDelay: config.TargetGroupBindingMaxExponentialBackoffDelay,
Expand All @@ -66,11 +68,12 @@ func NewTargetGroupBindingReconciler(k8sClient client.Client, eventRecorder reco

// targetGroupBindingReconciler reconciles a TargetGroupBinding object
type targetGroupBindingReconciler struct {
k8sClient client.Client
eventRecorder record.EventRecorder
finalizerManager k8s.FinalizerManager
tgbResourceManager targetgroupbinding.ResourceManager
logger logr.Logger
k8sClient client.Client
eventRecorder record.EventRecorder
finalizerManager k8s.FinalizerManager
tgbResourceManager targetgroupbinding.ResourceManager
deferredTargetGroupBindingReconciler DeferredTargetGroupBindingReconciler
logger logr.Logger

maxConcurrentReconciles int
maxExponentialBackoffDelay time.Duration
Expand Down Expand Up @@ -110,11 +113,18 @@ func (r *targetGroupBindingReconciler) reconcileTargetGroupBinding(ctx context.C
return err
}

if err := r.tgbResourceManager.Reconcile(ctx, tgb); err != nil {
checkPoint, deferred, err := r.tgbResourceManager.Reconcile(ctx, tgb)

if err != nil {
return err
}

if err := r.updateTargetGroupBindingStatus(ctx, tgb); err != nil {
if deferred {
r.deferredTargetGroupBindingReconciler.Enqueue(tgb)
return nil
}

if err := r.updateTargetGroupBindingStatus(ctx, tgb, checkPoint); err != nil {
r.eventRecorder.Event(tgb, corev1.EventTypeWarning, k8s.TargetGroupBindingEventReasonFailedUpdateStatus, fmt.Sprintf("Failed update status due to %v", err))
return err
}
Expand All @@ -137,15 +147,24 @@ func (r *targetGroupBindingReconciler) cleanupTargetGroupBinding(ctx context.Con
return nil
}

func (r *targetGroupBindingReconciler) updateTargetGroupBindingStatus(ctx context.Context, tgb *elbv2api.TargetGroupBinding) error {
if aws.Int64Value(tgb.Status.ObservedGeneration) == tgb.Generation {
func (r *targetGroupBindingReconciler) updateTargetGroupBindingStatus(ctx context.Context, tgb *elbv2api.TargetGroupBinding, newCheckPoint string) error {
savedCheckPoint := targetgroupbinding.GetTGBReconcileCheckpoint(tgb)
if aws.Int64Value(tgb.Status.ObservedGeneration) == tgb.Generation && savedCheckPoint == newCheckPoint {
return nil
}

tgbOld := tgb.DeepCopy()
targetgroupbinding.SaveTGBReconcileCheckpoint(tgb, newCheckPoint)

if err := r.k8sClient.Patch(ctx, tgb, client.MergeFrom(tgbOld)); err != nil {
return errors.Wrapf(err, "failed to update targetGroupBinding checkpoint: %v", k8s.NamespacedName(tgb))
}

tgb.Status.ObservedGeneration = aws.Int64(tgb.Generation)
if err := r.k8sClient.Status().Patch(ctx, tgb, client.MergeFrom(tgbOld)); err != nil {
return errors.Wrapf(err, "failed to update targetGroupBinding status: %v", k8s.NamespacedName(tgb))
}

return nil
}

Expand All @@ -159,34 +178,29 @@ func (r *targetGroupBindingReconciler) SetupWithManager(ctx context.Context, mgr
nodeEventsHandler := eventhandlers.NewEnqueueRequestsForNodeEvent(r.k8sClient,
r.logger.WithName("eventHandlers").WithName("node"))

// Use the config flag to decide whether to use and watch an Endpoints event handler or an EndpointSlices event handler
var eventHandler handler.EventHandler
var clientObj client.Object

if r.enableEndpointSlices {
epSliceEventsHandler := eventhandlers.NewEnqueueRequestsForEndpointSlicesEvent(r.k8sClient,
clientObj = &discv1.EndpointSlice{}
eventHandler = eventhandlers.NewEnqueueRequestsForEndpointSlicesEvent(r.k8sClient,
r.logger.WithName("eventHandlers").WithName("endpointslices"))
return ctrl.NewControllerManagedBy(mgr).
For(&elbv2api.TargetGroupBinding{}).
Named(controllerName).
Watches(&corev1.Service{}, svcEventHandler).
Watches(&discv1.EndpointSlice{}, epSliceEventsHandler).
Watches(&corev1.Node{}, nodeEventsHandler).
WithOptions(controller.Options{
MaxConcurrentReconciles: r.maxConcurrentReconciles,
RateLimiter: workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, r.maxExponentialBackoffDelay)}).
Complete(r)
} else {
epsEventsHandler := eventhandlers.NewEnqueueRequestsForEndpointsEvent(r.k8sClient,
clientObj = &corev1.Endpoints{}
eventHandler = eventhandlers.NewEnqueueRequestsForEndpointsEvent(r.k8sClient,
r.logger.WithName("eventHandlers").WithName("endpoints"))
return ctrl.NewControllerManagedBy(mgr).
For(&elbv2api.TargetGroupBinding{}).
Named(controllerName).
Watches(&corev1.Service{}, svcEventHandler).
Watches(&corev1.Endpoints{}, epsEventsHandler).
Watches(&corev1.Node{}, nodeEventsHandler).
WithOptions(controller.Options{
MaxConcurrentReconciles: r.maxConcurrentReconciles,
RateLimiter: workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, r.maxExponentialBackoffDelay)}).
Complete(r)
}

return ctrl.NewControllerManagedBy(mgr).
For(&elbv2api.TargetGroupBinding{}).
Named(controllerName).
Watches(&corev1.Service{}, svcEventHandler).
Watches(clientObj, eventHandler).
Watches(&corev1.Node{}, nodeEventsHandler).
WithOptions(controller.Options{
MaxConcurrentReconciles: r.maxConcurrentReconciles,
RateLimiter: workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, r.maxExponentialBackoffDelay)}).
Complete(r)
}

func (r *targetGroupBindingReconciler) setupIndexes(ctx context.Context, fieldIndexer client.FieldIndexer) error {
Expand Down
130 changes: 130 additions & 0 deletions controllers/elbv2/targetgroupbinding_deferred_reconciler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package controllers

import (
"context"
"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
"math/rand"
elbv2api "sigs.k8s.io/aws-load-balancer-controller/apis/elbv2/v1beta1"
"sigs.k8s.io/aws-load-balancer-controller/pkg/k8s"
"sigs.k8s.io/aws-load-balancer-controller/pkg/targetgroupbinding"
"sigs.k8s.io/controller-runtime/pkg/client"
"time"
)

const (
// The time to delay the reconcile. Generally, this number should be large enough so we can perform all reconciles
// that have changes before processing reconciles that have no detected changes.
defaultDelayedReconcileTime = 30 * time.Minute
// The max amount of jitter to add to delayedReconcileTime. This is used to ensure that all deferred TGBs are not
// reconciled together.
defaultMaxJitter = 15 * time.Minute

// The hash to set that is guaranteed to trigger a new reconcile loop (the hash calculation always has an '/')
resetHash = ""
)

type DeferredTargetGroupBindingReconciler interface {
Enqueue(tgb *elbv2api.TargetGroupBinding)
Run()
}

type deferredTargetGroupBindingReconcilerImpl struct {
delayQueue workqueue.DelayingInterface
syncPeriod time.Duration
k8sClient client.Client
logger logr.Logger

delayedReconcileTime time.Duration
maxJitter time.Duration
}

func NewDeferredTargetGroupBindingReconciler(delayQueue workqueue.DelayingInterface, syncPeriod time.Duration, k8sClient client.Client, logger logr.Logger) DeferredTargetGroupBindingReconciler {
return &deferredTargetGroupBindingReconcilerImpl{
syncPeriod: syncPeriod,
logger: logger,
delayQueue: delayQueue,
k8sClient: k8sClient,

delayedReconcileTime: defaultDelayedReconcileTime,
maxJitter: defaultMaxJitter,
}
}

func (d *deferredTargetGroupBindingReconcilerImpl) Enqueue(tgb *elbv2api.TargetGroupBinding) {
nsn := k8s.NamespacedName(tgb)
if d.isEligibleForDefer(tgb) {
d.enqueue(nsn)
d.logger.Info("enqueued new deferred TGB", "TGB", nsn.Name)
}
}

func (d *deferredTargetGroupBindingReconcilerImpl) Run() {
var item interface{}
shutDown := false
for !shutDown {
item, shutDown = d.delayQueue.Get()
if item != nil {
deferredNamespacedName := item.(types.NamespacedName)
d.logger.Info("Processing deferred TGB", "item", deferredNamespacedName)
d.handleDeferredItem(deferredNamespacedName)
d.delayQueue.Done(deferredNamespacedName)
}
}

d.logger.Info("Shutting down deferred TGB queue")
}

func (d *deferredTargetGroupBindingReconcilerImpl) handleDeferredItem(nsn types.NamespacedName) {
tgb := &elbv2api.TargetGroupBinding{}

err := d.k8sClient.Get(context.Background(), nsn, tgb)

if err != nil {
d.handleDeferredItemError(nsn, err, "Failed to get TGB in deferred queue")
return
}

// Re-check that this tgb hasn't been updated since it was enqueued
if !d.isEligibleForDefer(tgb) {
d.logger.Info("TGB not eligible for deferral", "TGB", nsn)
return
}

tgbOld := tgb.DeepCopy()
targetgroupbinding.SaveTGBReconcileCheckpoint(tgb, resetHash)

if err := d.k8sClient.Patch(context.Background(), tgb, client.MergeFrom(tgbOld)); err != nil {
d.handleDeferredItemError(nsn, err, "Failed to reset TGB checkpoint")
return
}
d.logger.Info("TGB checkpoint reset", "TGB", nsn)
}

func (d *deferredTargetGroupBindingReconcilerImpl) handleDeferredItemError(nsn types.NamespacedName, err error, msg string) {
err = client.IgnoreNotFound(err)
if err != nil {
d.logger.Error(err, msg, "TGB", nsn)
d.enqueue(nsn)
}
}

func (d *deferredTargetGroupBindingReconcilerImpl) isEligibleForDefer(tgb *elbv2api.TargetGroupBinding) bool {
then := time.Unix(targetgroupbinding.GetTGBReconcileCheckpointTimestamp(tgb), 0)
return time.Now().Sub(then) > d.syncPeriod
}

func (d *deferredTargetGroupBindingReconcilerImpl) enqueue(nsn types.NamespacedName) {
delayedTime := d.jitterReconcileTime()
d.delayQueue.AddAfter(nsn, delayedTime)
}

func (d *deferredTargetGroupBindingReconcilerImpl) jitterReconcileTime() time.Duration {

if d.maxJitter == 0 {
return d.delayedReconcileTime
}

return d.delayedReconcileTime + time.Duration(rand.Int63n(int64(d.maxJitter)))
}
Loading

0 comments on commit c8f5815

Please sign in to comment.