diff --git a/Makefile b/Makefile index 36202439a1..0b6d128e0a 100644 --- a/Makefile +++ b/Makefile @@ -32,7 +32,7 @@ all: controller # Run tests test: generate fmt vet manifests helm-lint - go test -race ./pkg/... ./webhooks/... -coverprofile cover.out + go test -race ./pkg/... ./webhooks/... ./controllers/... -coverprofile cover.out # Build controller binary controller: generate fmt vet diff --git a/controllers/elbv2/targetgroupbinding_controller.go b/controllers/elbv2/targetgroupbinding_controller.go index 88c1561eee..e3317bb010 100644 --- a/controllers/elbv2/targetgroupbinding_controller.go +++ b/controllers/elbv2/targetgroupbinding_controller.go @@ -19,12 +19,13 @@ package controllers import ( "context" "fmt" + discv1 "k8s.io/api/discovery/v1" + "sigs.k8s.io/controller-runtime/pkg/handler" "time" "github.com/aws/aws-sdk-go/aws" "github.com/pkg/errors" corev1 "k8s.io/api/core/v1" - discv1 "k8s.io/api/discovery/v1" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" "sigs.k8s.io/aws-load-balancer-controller/controllers/elbv2/eventhandlers" @@ -48,15 +49,16 @@ const ( // NewTargetGroupBindingReconciler constructs new targetGroupBindingReconciler func NewTargetGroupBindingReconciler(k8sClient client.Client, eventRecorder record.EventRecorder, finalizerManager k8s.FinalizerManager, - tgbResourceManager targetgroupbinding.ResourceManager, config config.ControllerConfig, + tgbResourceManager targetgroupbinding.ResourceManager, config config.ControllerConfig, deferredTargetGroupBindingReconciler DeferredTargetGroupBindingReconciler, logger logr.Logger) *targetGroupBindingReconciler { return &targetGroupBindingReconciler{ - k8sClient: k8sClient, - eventRecorder: eventRecorder, - finalizerManager: finalizerManager, - tgbResourceManager: tgbResourceManager, - logger: logger, + k8sClient: k8sClient, + eventRecorder: eventRecorder, + finalizerManager: finalizerManager, + tgbResourceManager: tgbResourceManager, + deferredTargetGroupBindingReconciler: deferredTargetGroupBindingReconciler, + logger: logger, maxConcurrentReconciles: config.TargetGroupBindingMaxConcurrentReconciles, maxExponentialBackoffDelay: config.TargetGroupBindingMaxExponentialBackoffDelay, @@ -66,11 +68,12 @@ func NewTargetGroupBindingReconciler(k8sClient client.Client, eventRecorder reco // targetGroupBindingReconciler reconciles a TargetGroupBinding object type targetGroupBindingReconciler struct { - k8sClient client.Client - eventRecorder record.EventRecorder - finalizerManager k8s.FinalizerManager - tgbResourceManager targetgroupbinding.ResourceManager - logger logr.Logger + k8sClient client.Client + eventRecorder record.EventRecorder + finalizerManager k8s.FinalizerManager + tgbResourceManager targetgroupbinding.ResourceManager + deferredTargetGroupBindingReconciler DeferredTargetGroupBindingReconciler + logger logr.Logger maxConcurrentReconciles int maxExponentialBackoffDelay time.Duration @@ -110,11 +113,18 @@ func (r *targetGroupBindingReconciler) reconcileTargetGroupBinding(ctx context.C return err } - if err := r.tgbResourceManager.Reconcile(ctx, tgb); err != nil { + checkPoint, deferred, err := r.tgbResourceManager.Reconcile(ctx, tgb) + + if err != nil { return err } - if err := r.updateTargetGroupBindingStatus(ctx, tgb); err != nil { + if deferred { + r.deferredTargetGroupBindingReconciler.Enqueue(tgb) + return nil + } + + if err := r.updateTargetGroupBindingStatus(ctx, tgb, checkPoint); err != nil { r.eventRecorder.Event(tgb, corev1.EventTypeWarning, k8s.TargetGroupBindingEventReasonFailedUpdateStatus, fmt.Sprintf("Failed update status due to %v", err)) return err } @@ -137,15 +147,24 @@ func (r *targetGroupBindingReconciler) cleanupTargetGroupBinding(ctx context.Con return nil } -func (r *targetGroupBindingReconciler) updateTargetGroupBindingStatus(ctx context.Context, tgb *elbv2api.TargetGroupBinding) error { - if aws.Int64Value(tgb.Status.ObservedGeneration) == tgb.Generation { +func (r *targetGroupBindingReconciler) updateTargetGroupBindingStatus(ctx context.Context, tgb *elbv2api.TargetGroupBinding, newCheckPoint string) error { + savedCheckPoint := targetgroupbinding.GetTGBReconcileCheckpoint(tgb) + if aws.Int64Value(tgb.Status.ObservedGeneration) == tgb.Generation && savedCheckPoint == newCheckPoint { return nil } + tgbOld := tgb.DeepCopy() + targetgroupbinding.SaveTGBReconcileCheckpoint(tgb, newCheckPoint) + + if err := r.k8sClient.Patch(ctx, tgb, client.MergeFrom(tgbOld)); err != nil { + return errors.Wrapf(err, "failed to update targetGroupBinding checkpoint: %v", k8s.NamespacedName(tgb)) + } + tgb.Status.ObservedGeneration = aws.Int64(tgb.Generation) if err := r.k8sClient.Status().Patch(ctx, tgb, client.MergeFrom(tgbOld)); err != nil { return errors.Wrapf(err, "failed to update targetGroupBinding status: %v", k8s.NamespacedName(tgb)) } + return nil } @@ -159,34 +178,29 @@ func (r *targetGroupBindingReconciler) SetupWithManager(ctx context.Context, mgr nodeEventsHandler := eventhandlers.NewEnqueueRequestsForNodeEvent(r.k8sClient, r.logger.WithName("eventHandlers").WithName("node")) - // Use the config flag to decide whether to use and watch an Endpoints event handler or an EndpointSlices event handler + var eventHandler handler.EventHandler + var clientObj client.Object + if r.enableEndpointSlices { - epSliceEventsHandler := eventhandlers.NewEnqueueRequestsForEndpointSlicesEvent(r.k8sClient, + clientObj = &discv1.EndpointSlice{} + eventHandler = eventhandlers.NewEnqueueRequestsForEndpointSlicesEvent(r.k8sClient, r.logger.WithName("eventHandlers").WithName("endpointslices")) - return ctrl.NewControllerManagedBy(mgr). - For(&elbv2api.TargetGroupBinding{}). - Named(controllerName). - Watches(&corev1.Service{}, svcEventHandler). - Watches(&discv1.EndpointSlice{}, epSliceEventsHandler). - Watches(&corev1.Node{}, nodeEventsHandler). - WithOptions(controller.Options{ - MaxConcurrentReconciles: r.maxConcurrentReconciles, - RateLimiter: workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, r.maxExponentialBackoffDelay)}). - Complete(r) } else { - epsEventsHandler := eventhandlers.NewEnqueueRequestsForEndpointsEvent(r.k8sClient, + clientObj = &corev1.Endpoints{} + eventHandler = eventhandlers.NewEnqueueRequestsForEndpointsEvent(r.k8sClient, r.logger.WithName("eventHandlers").WithName("endpoints")) - return ctrl.NewControllerManagedBy(mgr). - For(&elbv2api.TargetGroupBinding{}). - Named(controllerName). - Watches(&corev1.Service{}, svcEventHandler). - Watches(&corev1.Endpoints{}, epsEventsHandler). - Watches(&corev1.Node{}, nodeEventsHandler). - WithOptions(controller.Options{ - MaxConcurrentReconciles: r.maxConcurrentReconciles, - RateLimiter: workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, r.maxExponentialBackoffDelay)}). - Complete(r) } + + return ctrl.NewControllerManagedBy(mgr). + For(&elbv2api.TargetGroupBinding{}). + Named(controllerName). + Watches(&corev1.Service{}, svcEventHandler). + Watches(clientObj, eventHandler). + Watches(&corev1.Node{}, nodeEventsHandler). + WithOptions(controller.Options{ + MaxConcurrentReconciles: r.maxConcurrentReconciles, + RateLimiter: workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, r.maxExponentialBackoffDelay)}). + Complete(r) } func (r *targetGroupBindingReconciler) setupIndexes(ctx context.Context, fieldIndexer client.FieldIndexer) error { diff --git a/controllers/elbv2/targetgroupbinding_deferred_reconciler.go b/controllers/elbv2/targetgroupbinding_deferred_reconciler.go new file mode 100644 index 0000000000..ab7aac4356 --- /dev/null +++ b/controllers/elbv2/targetgroupbinding_deferred_reconciler.go @@ -0,0 +1,130 @@ +package controllers + +import ( + "context" + "github.com/go-logr/logr" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/util/workqueue" + "math/rand" + elbv2api "sigs.k8s.io/aws-load-balancer-controller/apis/elbv2/v1beta1" + "sigs.k8s.io/aws-load-balancer-controller/pkg/k8s" + "sigs.k8s.io/aws-load-balancer-controller/pkg/targetgroupbinding" + "sigs.k8s.io/controller-runtime/pkg/client" + "time" +) + +const ( + // The time to delay the reconcile. Generally, this number should be large enough so we can perform all reconciles + // that have changes before processing reconciles that have no detected changes. + defaultDelayedReconcileTime = 30 * time.Minute + // The max amount of jitter to add to delayedReconcileTime. This is used to ensure that all deferred TGBs are not + // reconciled together. + defaultMaxJitter = 15 * time.Minute + + // The hash to set that is guaranteed to trigger a new reconcile loop (the hash calculation always has an '/') + resetHash = "" +) + +type DeferredTargetGroupBindingReconciler interface { + Enqueue(tgb *elbv2api.TargetGroupBinding) + Run() +} + +type deferredTargetGroupBindingReconcilerImpl struct { + delayQueue workqueue.DelayingInterface + syncPeriod time.Duration + k8sClient client.Client + logger logr.Logger + + delayedReconcileTime time.Duration + maxJitter time.Duration +} + +func NewDeferredTargetGroupBindingReconciler(delayQueue workqueue.DelayingInterface, syncPeriod time.Duration, k8sClient client.Client, logger logr.Logger) DeferredTargetGroupBindingReconciler { + return &deferredTargetGroupBindingReconcilerImpl{ + syncPeriod: syncPeriod, + logger: logger, + delayQueue: delayQueue, + k8sClient: k8sClient, + + delayedReconcileTime: defaultDelayedReconcileTime, + maxJitter: defaultMaxJitter, + } +} + +func (d *deferredTargetGroupBindingReconcilerImpl) Enqueue(tgb *elbv2api.TargetGroupBinding) { + nsn := k8s.NamespacedName(tgb) + if d.isEligibleForDefer(tgb) { + d.enqueue(nsn) + d.logger.Info("enqueued new deferred TGB", "TGB", nsn.Name) + } +} + +func (d *deferredTargetGroupBindingReconcilerImpl) Run() { + var item interface{} + shutDown := false + for !shutDown { + item, shutDown = d.delayQueue.Get() + if item != nil { + deferredNamespacedName := item.(types.NamespacedName) + d.logger.Info("Processing deferred TGB", "item", deferredNamespacedName) + d.handleDeferredItem(deferredNamespacedName) + d.delayQueue.Done(deferredNamespacedName) + } + } + + d.logger.Info("Shutting down deferred TGB queue") +} + +func (d *deferredTargetGroupBindingReconcilerImpl) handleDeferredItem(nsn types.NamespacedName) { + tgb := &elbv2api.TargetGroupBinding{} + + err := d.k8sClient.Get(context.Background(), nsn, tgb) + + if err != nil { + d.handleDeferredItemError(nsn, err, "Failed to get TGB in deferred queue") + return + } + + // Re-check that this tgb hasn't been updated since it was enqueued + if !d.isEligibleForDefer(tgb) { + d.logger.Info("TGB not eligible for deferral", "TGB", nsn) + return + } + + tgbOld := tgb.DeepCopy() + targetgroupbinding.SaveTGBReconcileCheckpoint(tgb, resetHash) + + if err := d.k8sClient.Patch(context.Background(), tgb, client.MergeFrom(tgbOld)); err != nil { + d.handleDeferredItemError(nsn, err, "Failed to reset TGB checkpoint") + return + } + d.logger.Info("TGB checkpoint reset", "TGB", nsn) +} + +func (d *deferredTargetGroupBindingReconcilerImpl) handleDeferredItemError(nsn types.NamespacedName, err error, msg string) { + err = client.IgnoreNotFound(err) + if err != nil { + d.logger.Error(err, msg, "TGB", nsn) + d.enqueue(nsn) + } +} + +func (d *deferredTargetGroupBindingReconcilerImpl) isEligibleForDefer(tgb *elbv2api.TargetGroupBinding) bool { + then := time.Unix(targetgroupbinding.GetTGBReconcileCheckpointTimestamp(tgb), 0) + return time.Now().Sub(then) > d.syncPeriod +} + +func (d *deferredTargetGroupBindingReconcilerImpl) enqueue(nsn types.NamespacedName) { + delayedTime := d.jitterReconcileTime() + d.delayQueue.AddAfter(nsn, delayedTime) +} + +func (d *deferredTargetGroupBindingReconcilerImpl) jitterReconcileTime() time.Duration { + + if d.maxJitter == 0 { + return d.delayedReconcileTime + } + + return d.delayedReconcileTime + time.Duration(rand.Int63n(int64(d.maxJitter))) +} diff --git a/controllers/elbv2/targetgroupbinding_deferred_reconciler_test.go b/controllers/elbv2/targetgroupbinding_deferred_reconciler_test.go new file mode 100644 index 0000000000..07acdd3f48 --- /dev/null +++ b/controllers/elbv2/targetgroupbinding_deferred_reconciler_test.go @@ -0,0 +1,386 @@ +package controllers + +import ( + "context" + "github.com/aws/aws-sdk-go/aws" + "github.com/go-logr/logr" + "github.com/stretchr/testify/assert" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" + clientgoscheme "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/util/workqueue" + elbv2api "sigs.k8s.io/aws-load-balancer-controller/apis/elbv2/v1beta1" + "sigs.k8s.io/aws-load-balancer-controller/pkg/annotations" + testclient "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/log" + "strconv" + "testing" + "time" +) + +func TestDeferredReconcilerConstructor(t *testing.T) { + dq := workqueue.NewDelayingQueue() + defer dq.ShutDown() + syncPeriod := 5 * time.Minute + k8sClient := testclient.NewClientBuilder().Build() + logger := logr.New(&log.NullLogSink{}) + + d := NewDeferredTargetGroupBindingReconciler(dq, syncPeriod, k8sClient, logger) + + deferredReconciler := d.(*deferredTargetGroupBindingReconcilerImpl) + assert.Equal(t, dq, deferredReconciler.delayQueue) + assert.Equal(t, syncPeriod, deferredReconciler.syncPeriod) + assert.Equal(t, k8sClient, deferredReconciler.k8sClient) + assert.Equal(t, logger, deferredReconciler.logger) +} + +func TestDeferredReconcilerEnqueue(t *testing.T) { + syncPeriod := 5 * time.Minute + testCases := []struct { + name string + tgbToEnqueue []*elbv2api.TargetGroupBinding + expectedQueueEntries sets.Set[types.NamespacedName] + }{ + { + name: "one tgb to enqueue", + tgbToEnqueue: []*elbv2api.TargetGroupBinding{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "tgb1", + Namespace: "ns", + }, + }, + }, + expectedQueueEntries: sets.New(types.NamespacedName{ + Name: "tgb1", + Namespace: "ns", + }), + }, + { + name: "sync period too short, do not enqueue", + tgbToEnqueue: []*elbv2api.TargetGroupBinding{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "tgb1", + Namespace: "ns", + Annotations: map[string]string{ + annotations.AnnotationCheckPointTimestamp: strconv.FormatInt(time.Now().Unix(), 10), + }, + }, + }, + }, + expectedQueueEntries: make(sets.Set[types.NamespacedName]), + }, + { + name: "sync period too long, do enqueue", + tgbToEnqueue: []*elbv2api.TargetGroupBinding{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "tgb1", + Namespace: "ns", + Annotations: map[string]string{ + annotations.AnnotationCheckPointTimestamp: strconv.FormatInt(time.Now().Add(-2*syncPeriod).Unix(), 10), + }, + }, + }, + }, + expectedQueueEntries: sets.New(types.NamespacedName{ + Name: "tgb1", + Namespace: "ns", + }), + }, + { + name: "multiple tgb", + tgbToEnqueue: []*elbv2api.TargetGroupBinding{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "tgb1", + Namespace: "ns", + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "tgb1", + Namespace: "ns1", + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "tgb2", + Namespace: "ns", + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "tgb2", + Namespace: "ns1", + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "tgb3", + Namespace: "ns3", + }, + }, + }, + expectedQueueEntries: sets.New(types.NamespacedName{ + Name: "tgb1", + Namespace: "ns", + }, types.NamespacedName{ + Name: "tgb1", + Namespace: "ns1", + }, types.NamespacedName{ + Name: "tgb2", + Namespace: "ns", + }, types.NamespacedName{ + Name: "tgb2", + Namespace: "ns1", + }, types.NamespacedName{ + Name: "tgb3", + Namespace: "ns3", + }), + }, + { + name: "de-dupe same tgb", + tgbToEnqueue: []*elbv2api.TargetGroupBinding{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "tgb1", + Namespace: "ns", + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "tgb1", + Namespace: "ns", + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "tgb1", + Namespace: "ns", + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "tgb1", + Namespace: "ns", + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "tgb1", + Namespace: "ns", + }, + }, + }, + expectedQueueEntries: sets.New(types.NamespacedName{ + Name: "tgb1", + Namespace: "ns", + }), + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + dq := workqueue.NewDelayingQueue() + defer dq.ShutDown() + + k8sSchema := runtime.NewScheme() + clientgoscheme.AddToScheme(k8sSchema) + elbv2api.AddToScheme(k8sSchema) + k8sClient := testclient.NewClientBuilder(). + WithScheme(k8sSchema). + Build() + + impl := deferredTargetGroupBindingReconcilerImpl{ + delayQueue: dq, + syncPeriod: syncPeriod, + k8sClient: k8sClient, + logger: logr.New(&log.NullLogSink{}), + + delayedReconcileTime: 0 * time.Millisecond, + maxJitter: 0 * time.Millisecond, + } + + for _, tgb := range tc.tgbToEnqueue { + impl.Enqueue(tgb) + } + + assert.Equal(t, tc.expectedQueueEntries.Len(), dq.Len()) + + for dq.Len() > 0 { + v, _ := dq.Get() + assert.True(t, tc.expectedQueueEntries.Has(v.(types.NamespacedName)), "Expected queue entry not found %+v", v) + } + + }) + } +} + +func TestDeferredReconcilerRun(t *testing.T) { + testCases := []struct { + name string + nsns []types.NamespacedName + }{ + { + name: "nothing enqueued", + }, + { + name: "something enqueued", + nsns: []types.NamespacedName{ + { + Name: "name", + Namespace: "ns", + }, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + dq := workqueue.NewDelayingQueue() + go func() { + time.Sleep(2 * time.Second) + assert.Equal(t, 0, dq.Len()) + dq.ShutDown() + }() + + for _, nsn := range tc.nsns { + dq.Add(nsn) + } + + k8sSchema := runtime.NewScheme() + clientgoscheme.AddToScheme(k8sSchema) + elbv2api.AddToScheme(k8sSchema) + k8sClient := testclient.NewClientBuilder(). + WithScheme(k8sSchema). + Build() + + impl := deferredTargetGroupBindingReconcilerImpl{ + delayQueue: dq, + syncPeriod: 5 * time.Minute, + k8sClient: k8sClient, + logger: logr.New(&log.NullLogSink{}), + + delayedReconcileTime: 0 * time.Millisecond, + maxJitter: 0 * time.Millisecond, + } + + impl.Run() + + time.Sleep(5 * time.Second) + }) + } +} + +func TestHandleDeferredItem(t *testing.T) { + syncPeriod := 5 * time.Minute + testCases := []struct { + name string + nsn types.NamespacedName + storedTGB *elbv2api.TargetGroupBinding + requeue bool + expectedCheckPoint *string + }{ + { + name: "not found", + nsn: types.NamespacedName{ + Name: "name", + Namespace: "ns", + }, + }, + { + name: "not eligible", + nsn: types.NamespacedName{ + Name: "name", + Namespace: "ns", + }, + storedTGB: &elbv2api.TargetGroupBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: "name", + Namespace: "ns", + Annotations: map[string]string{ + annotations.AnnotationCheckPoint: "foo", + annotations.AnnotationCheckPointTimestamp: strconv.FormatInt(time.Now().Unix(), 10), + }, + }, + }, + expectedCheckPoint: aws.String("foo"), + }, + { + name: "eligible", + nsn: types.NamespacedName{ + Name: "name", + Namespace: "ns", + }, + storedTGB: &elbv2api.TargetGroupBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: "name", + Namespace: "ns", + Annotations: map[string]string{ + annotations.AnnotationCheckPoint: "foo", + annotations.AnnotationCheckPointTimestamp: strconv.FormatInt(time.Now().Add(-2*syncPeriod).Unix(), 10), + }, + }, + }, + expectedCheckPoint: aws.String(""), + }, + { + name: "failure causes requeue", + nsn: types.NamespacedName{ + Name: "name", + Namespace: "ns", + }, + requeue: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + dq := workqueue.NewDelayingQueue() + defer dq.ShutDown() + + k8sSchema := runtime.NewScheme() + // quick hack to inject a fault into the k8s client. + if !tc.requeue { + clientgoscheme.AddToScheme(k8sSchema) + elbv2api.AddToScheme(k8sSchema) + } + k8sClient := testclient.NewClientBuilder(). + WithScheme(k8sSchema). + Build() + + impl := deferredTargetGroupBindingReconcilerImpl{ + delayQueue: dq, + syncPeriod: syncPeriod, + k8sClient: k8sClient, + logger: logr.New(&log.NullLogSink{}), + + delayedReconcileTime: 0 * time.Millisecond, + maxJitter: 0 * time.Millisecond, + } + + if tc.storedTGB != nil { + k8sClient.Create(context.Background(), tc.storedTGB) + } + + impl.handleDeferredItem(tc.nsn) + + if tc.requeue { + assert.Equal(t, 1, dq.Len()) + } else { + assert.Equal(t, 0, dq.Len()) + } + + if tc.expectedCheckPoint != nil { + storedTGB := &elbv2api.TargetGroupBinding{} + k8sClient.Get(context.Background(), tc.nsn, storedTGB) + assert.Equal(t, *tc.expectedCheckPoint, storedTGB.Annotations[annotations.AnnotationCheckPoint]) + } + + }) + } +} diff --git a/main.go b/main.go index d9435e1755..8dcc0e2148 100644 --- a/main.go +++ b/main.go @@ -17,6 +17,7 @@ limitations under the License. package main import ( + "k8s.io/client-go/util/workqueue" "os" elbv2deploy "sigs.k8s.io/aws-load-balancer-controller/pkg/deploy/elbv2" @@ -121,9 +122,14 @@ func main() { svcReconciler := service.NewServiceReconciler(cloud, mgr.GetClient(), mgr.GetEventRecorderFor("service"), finalizerManager, sgManager, sgReconciler, subnetResolver, vpcInfoProvider, elbv2TaggingManager, controllerCFG, backendSGProvider, sgResolver, ctrl.Log.WithName("controllers").WithName("service")) + + delayingQueue := workqueue.NewDelayingQueueWithConfig(workqueue.DelayingQueueConfig{ + Name: "delayed-target-group-binding", + }) + + deferredTGBQueue := elbv2controller.NewDeferredTargetGroupBindingReconciler(delayingQueue, controllerCFG.RuntimeConfig.SyncPeriod, mgr.GetClient(), ctrl.Log.WithName("deferredTGBQueue")) tgbReconciler := elbv2controller.NewTargetGroupBindingReconciler(mgr.GetClient(), mgr.GetEventRecorderFor("targetGroupBinding"), - finalizerManager, tgbResManager, - controllerCFG, ctrl.Log.WithName("controllers").WithName("targetGroupBinding")) + finalizerManager, tgbResManager, controllerCFG, deferredTGBQueue, ctrl.Log.WithName("controllers").WithName("targetGroupBinding")) ctx := ctrl.SetupSignalHandler() if err = ingGroupReconciler.SetupWithManager(ctx, mgr, clientSet); err != nil { @@ -177,6 +183,12 @@ func main() { os.Exit(1) } }() + + go func() { + setupLog.Info("starting deferred tgb reconciler") + deferredTGBQueue.Run() + }() + if err := podInfoRepo.WaitForCacheSync(ctx); err != nil { setupLog.Error(err, "problem wait for podInfo repo sync") os.Exit(1) diff --git a/pkg/algorithm/hash.go b/pkg/algorithm/hash.go new file mode 100644 index 0000000000..d26e6feeec --- /dev/null +++ b/pkg/algorithm/hash.go @@ -0,0 +1,12 @@ +package algorithm + +import ( + "crypto/sha256" + "encoding/base64" +) + +func ComputeSha256(s string) string { + checkpointHash := sha256.New() + _, _ = checkpointHash.Write([]byte(s)) + return base64.RawURLEncoding.EncodeToString(checkpointHash.Sum(nil)) +} diff --git a/pkg/annotations/constants.go b/pkg/annotations/constants.go index 493e0427ef..4f50173c2f 100644 --- a/pkg/annotations/constants.go +++ b/pkg/annotations/constants.go @@ -1,6 +1,13 @@ package annotations const ( + // AnnotationCheckPoint is the annotation used to store a checkpoint for resources. + // It contains an opaque value that represents the last known reconciled state. + AnnotationCheckPoint = "elbv2.k8s.aws/checkpoint" + + // AnnotationCheckPointTimestamp is the annotation used to store the last checkpointed time. The value is stored in seconds. + AnnotationCheckPointTimestamp = AnnotationCheckPoint + "-timestamp" + // IngressClass IngressClass = "kubernetes.io/ingress.class" diff --git a/pkg/aws/cloud.go b/pkg/aws/cloud.go index faba5cec47..06734a08b5 100644 --- a/pkg/aws/cloud.go +++ b/pkg/aws/cloud.go @@ -3,9 +3,11 @@ package aws import ( "context" "fmt" + "github.com/aws/aws-sdk-go/aws/client" "net" "os" "strings" + "time" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/endpoints" @@ -91,7 +93,18 @@ func NewCloud(cfg CloudConfig, metricsRegisterer prometheus.Registerer, logger l } cfg.Region = region } - awsCFG := aws.NewConfig().WithRegion(cfg.Region).WithSTSRegionalEndpoint(endpoints.RegionalSTSEndpoint).WithMaxRetries(cfg.MaxRetries).WithEndpointResolver(endpointsResolver) + + clientRetryer := client.DefaultRetryer{ + NumMaxRetries: cfg.MaxRetries, + MinRetryDelay: time.Millisecond * 100, + MinThrottleDelay: time.Millisecond * 100, + MaxRetryDelay: time.Second * 10, + MaxThrottleDelay: time.Second * 10, + } + + awsCFG := aws.NewConfig().WithRegion(cfg.Region).WithSTSRegionalEndpoint(endpoints.RegionalSTSEndpoint).WithEndpointResolver(endpointsResolver) + awsCFG.Retryer = clientRetryer + opts = session.Options{} opts.Config.MergeIn(awsCFG) if !hasIPv4 { diff --git a/pkg/backend/endpoint_types.go b/pkg/backend/endpoint_types.go index 76d017bd5f..0b4ccc0194 100644 --- a/pkg/backend/endpoint_types.go +++ b/pkg/backend/endpoint_types.go @@ -1,12 +1,17 @@ package backend import ( + "fmt" corev1 "k8s.io/api/core/v1" discv1 "k8s.io/api/discovery/v1" "k8s.io/apimachinery/pkg/labels" "sigs.k8s.io/aws-load-balancer-controller/pkg/k8s" ) +type Endpoint interface { + GetIdentifier() string +} + // An endpoint provided by pod directly. type PodEndpoint struct { // Pod's IP. @@ -17,6 +22,10 @@ type PodEndpoint struct { Pod k8s.PodInfo } +func (e PodEndpoint) GetIdentifier() string { + return fmt.Sprintf("%s:%d:%d", e.IP, e.Port, e.Pod.CreationTime.UnixMilli()) +} + // An endpoint provided by nodePort as traffic proxy. type NodePortEndpoint struct { // Node's instanceID. @@ -27,6 +36,10 @@ type NodePortEndpoint struct { Node *corev1.Node } +func (e NodePortEndpoint) GetIdentifier() string { + return fmt.Sprintf("%s:%d:%d", e.InstanceID, e.Port, e.Node.CreationTimestamp.UnixMilli()) +} + type EndpointsData struct { Ports []discv1.EndpointPort Endpoints []discv1.Endpoint diff --git a/pkg/k8s/pod_info.go b/pkg/k8s/pod_info.go index 2b81598f88..b7fea6ce0e 100644 --- a/pkg/k8s/pod_info.go +++ b/pkg/k8s/pod_info.go @@ -4,6 +4,7 @@ import ( "encoding/json" "github.com/pkg/errors" corev1 "k8s.io/api/core/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" ) @@ -23,6 +24,7 @@ type PodInfo struct { Conditions []corev1.PodCondition NodeName string PodIP string + CreationTime v1.Time ENIInfos []PodENIInfo } @@ -104,6 +106,7 @@ func buildPodInfo(pod *corev1.Pod) PodInfo { Conditions: pod.Status.Conditions, NodeName: pod.Spec.NodeName, PodIP: pod.Status.PodIP, + CreationTime: pod.CreationTimestamp, ENIInfos: podENIInfos, } diff --git a/pkg/k8s/pod_info_repo_test.go b/pkg/k8s/pod_info_repo_test.go index a0e62884b9..306c10dd21 100644 --- a/pkg/k8s/pod_info_repo_test.go +++ b/pkg/k8s/pod_info_repo_test.go @@ -7,6 +7,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "testing" + "time" ) func Test_podInfoKeyFunc(t *testing.T) { @@ -55,6 +56,8 @@ func Test_podInfoConversionFunc(t *testing.T) { type args struct { obj interface{} } + + timeNow := time.Now() tests := []struct { name string args args @@ -69,12 +72,16 @@ func Test_podInfoConversionFunc(t *testing.T) { Namespace: "ns-1", Name: "pod-a", UID: "pod-uuid", + CreationTimestamp: metav1.Time{ + Time: timeNow, + }, }, }, }, want: &PodInfo{ - Key: types.NamespacedName{Namespace: "ns-1", Name: "pod-a"}, - UID: "pod-uuid", + Key: types.NamespacedName{Namespace: "ns-1", Name: "pod-a"}, + UID: "pod-uuid", + CreationTime: metav1.Time{Time: timeNow}, }, }, { diff --git a/pkg/k8s/pod_info_test.go b/pkg/k8s/pod_info_test.go index c5eac214be..61c8b23058 100644 --- a/pkg/k8s/pod_info_test.go +++ b/pkg/k8s/pod_info_test.go @@ -8,6 +8,7 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" "testing" + "time" ) func TestPodInfo_HasAnyOfReadinessGates(t *testing.T) { @@ -149,6 +150,7 @@ func TestPodInfo_GetPodCondition(t *testing.T) { type args struct { conditionType corev1.PodConditionType } + tests := []struct { name string pod PodInfo @@ -312,6 +314,9 @@ func Test_buildPodInfo(t *testing.T) { type args struct { pod *corev1.Pod } + + timeNow := time.Now() + tests := []struct { name string args args @@ -325,6 +330,9 @@ func Test_buildPodInfo(t *testing.T) { Namespace: "my-ns", Name: "pod-1", UID: "pod-uuid", + CreationTimestamp: metav1.Time{ + Time: timeNow, + }, }, Spec: corev1.PodSpec{ NodeName: "ip-192-168-13-198.us-west-2.compute.internal", @@ -409,8 +417,9 @@ func Test_buildPodInfo(t *testing.T) { Status: corev1.ConditionTrue, }, }, - NodeName: "ip-192-168-13-198.us-west-2.compute.internal", - PodIP: "192.168.1.1", + NodeName: "ip-192-168-13-198.us-west-2.compute.internal", + PodIP: "192.168.1.1", + CreationTime: metav1.Time{Time: timeNow}, }, }, { @@ -424,6 +433,9 @@ func Test_buildPodInfo(t *testing.T) { Annotations: map[string]string{ "vpc.amazonaws.com/pod-eni": `[{"eniId":"eni-06a712e1622fda4a0","ifAddress":"02:34:a5:25:0b:63","privateIp":"192.168.219.103","vlanId":3,"subnetCidr":"192.168.192.0/19"}]`, }, + CreationTimestamp: metav1.Time{ + Time: timeNow, + }, }, Spec: corev1.PodSpec{ NodeName: "ip-192-168-13-198.us-west-2.compute.internal", @@ -508,8 +520,9 @@ func Test_buildPodInfo(t *testing.T) { Status: corev1.ConditionTrue, }, }, - NodeName: "ip-192-168-13-198.us-west-2.compute.internal", - PodIP: "192.168.1.1", + NodeName: "ip-192-168-13-198.us-west-2.compute.internal", + PodIP: "192.168.1.1", + CreationTime: metav1.Time{Time: timeNow}, ENIInfos: []PodENIInfo{ { ENIID: "eni-06a712e1622fda4a0", diff --git a/pkg/k8s/secrets_manager.go b/pkg/k8s/secrets_manager.go index 83394a6b29..ec4c8c13cd 100644 --- a/pkg/k8s/secrets_manager.go +++ b/pkg/k8s/secrets_manager.go @@ -7,7 +7,6 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/client-go/util/workqueue" "sigs.k8s.io/controller-runtime/pkg/event" "github.com/go-logr/logr" @@ -44,7 +43,6 @@ type defaultSecretsManager struct { secretMap map[types.NamespacedName]*secretItem secretsEventChan chan<- event.TypedGenericEvent[*corev1.Secret] clientSet kubernetes.Interface - queue workqueue.RateLimitingInterface logger logr.Logger } diff --git a/pkg/targetgroupbinding/resource_manager.go b/pkg/targetgroupbinding/resource_manager.go index 7f92166254..a6aeedb882 100644 --- a/pkg/targetgroupbinding/resource_manager.go +++ b/pkg/targetgroupbinding/resource_manager.go @@ -30,7 +30,7 @@ const defaultTargetHealthRequeueDuration = 15 * time.Second // ResourceManager manages the TargetGroupBinding resource. type ResourceManager interface { - Reconcile(ctx context.Context, tgb *elbv2api.TargetGroupBinding) error + Reconcile(ctx context.Context, tgb *elbv2api.TargetGroupBinding) (string, bool, error) Cleanup(ctx context.Context, tgb *elbv2api.TargetGroupBinding) error } @@ -81,9 +81,9 @@ type defaultResourceManager struct { targetHealthRequeueDuration time.Duration } -func (m *defaultResourceManager) Reconcile(ctx context.Context, tgb *elbv2api.TargetGroupBinding) error { +func (m *defaultResourceManager) Reconcile(ctx context.Context, tgb *elbv2api.TargetGroupBinding) (string, bool, error) { if tgb.Spec.TargetType == nil { - return errors.Errorf("targetType is not specified: %v", k8s.NamespacedName(tgb).String()) + return "", false, errors.Errorf("targetType is not specified: %v", k8s.NamespacedName(tgb).String()) } if *tgb.Spec.TargetType == elbv2api.TargetTypeIP { return m.reconcileWithIPTargetType(ctx, tgb) @@ -104,7 +104,7 @@ func (m *defaultResourceManager) Cleanup(ctx context.Context, tgb *elbv2api.Targ return nil } -func (m *defaultResourceManager) reconcileWithIPTargetType(ctx context.Context, tgb *elbv2api.TargetGroupBinding) error { +func (m *defaultResourceManager) reconcileWithIPTargetType(ctx context.Context, tgb *elbv2api.TargetGroupBinding) (string, bool, error) { svcKey := buildServiceReferenceKey(tgb, tgb.Spec.ServiceRef) targetHealthCondType := BuildTargetHealthPodConditionType(tgb) @@ -121,16 +121,29 @@ func (m *defaultResourceManager) reconcileWithIPTargetType(ctx context.Context, if err != nil { if errors.Is(err, backend.ErrNotFound) { m.eventRecorder.Event(tgb, corev1.EventTypeWarning, k8s.TargetGroupBindingEventReasonBackendNotFound, err.Error()) - return m.Cleanup(ctx, tgb) + return "", false, m.Cleanup(ctx, tgb) } - return err + return "", false, err + } + + currentCheckPoint, err := calculateTGBReconcileCheckpoint(endpoints, tgb) + + if err != nil { + return "", false, err + } + + savedCheckPoint := GetTGBReconcileCheckpoint(tgb) + + if currentCheckPoint == savedCheckPoint { + m.logger.Info("Skipping targetgroupbinding reconcile", "TGB", k8s.NamespacedName(tgb), "calculated hash", currentCheckPoint) + return currentCheckPoint, true, nil } tgARN := tgb.Spec.TargetGroupARN vpcID := tgb.Spec.VpcID targets, err := m.targetsManager.ListTargets(ctx, tgARN) if err != nil { - return err + return "", false, err } notDrainingTargets, drainingTargets := partitionTargetsByDrainingStatus(targets) matchedEndpointAndTargets, unmatchedEndpoints, unmatchedTargets := matchPodEndpointWithTargets(endpoints, notDrainingTargets) @@ -142,44 +155,45 @@ func (m *defaultResourceManager) reconcileWithIPTargetType(ctx context.Context, } if len(unmatchedTargets) > 0 { if err := m.deregisterTargets(ctx, tgARN, unmatchedTargets); err != nil { - return err + return "", false, err } } if len(unmatchedEndpoints) > 0 { if err := m.registerPodEndpoints(ctx, tgARN, vpcID, unmatchedEndpoints); err != nil { - return err + return "", false, err } } anyPodNeedFurtherProbe, err := m.updateTargetHealthPodCondition(ctx, targetHealthCondType, matchedEndpointAndTargets, unmatchedEndpoints) if err != nil { - return err + return "", false, err } if anyPodNeedFurtherProbe { if containsTargetsInInitialState(matchedEndpointAndTargets) || len(unmatchedEndpoints) != 0 { - return runtime.NewRequeueNeededAfter("monitor targetHealth", m.targetHealthRequeueDuration) + return "", false, runtime.NewRequeueNeededAfter("monitor targetHealth", m.targetHealthRequeueDuration) } - return runtime.NewRequeueNeeded("monitor targetHealth") + return "", false, runtime.NewRequeueNeeded("monitor targetHealth") } if containsPotentialReadyEndpoints { - return runtime.NewRequeueNeeded("monitor potential ready endpoints") + return "", false, runtime.NewRequeueNeeded("monitor potential ready endpoints") } _ = drainingTargets if needNetworkingRequeue { - return runtime.NewRequeueNeeded("networking reconciliation") + return "", false, runtime.NewRequeueNeeded("networking reconciliation") } - return nil + + return currentCheckPoint, false, nil } -func (m *defaultResourceManager) reconcileWithInstanceTargetType(ctx context.Context, tgb *elbv2api.TargetGroupBinding) error { +func (m *defaultResourceManager) reconcileWithInstanceTargetType(ctx context.Context, tgb *elbv2api.TargetGroupBinding) (string, bool, error) { svcKey := buildServiceReferenceKey(tgb, tgb.Spec.ServiceRef) nodeSelector, err := backend.GetTrafficProxyNodeSelector(tgb) if err != nil { - return err + return "", false, err } resolveOpts := []backend.EndpointResolveOption{backend.WithNodeSelector(nodeSelector)} @@ -187,33 +201,48 @@ func (m *defaultResourceManager) reconcileWithInstanceTargetType(ctx context.Con if err != nil { if errors.Is(err, backend.ErrNotFound) { m.eventRecorder.Event(tgb, corev1.EventTypeWarning, k8s.TargetGroupBindingEventReasonBackendNotFound, err.Error()) - return m.Cleanup(ctx, tgb) + return "", false, m.Cleanup(ctx, tgb) } - return err + return "", false, err + } + + currentCheckPoint, err := calculateTGBReconcileCheckpoint(endpoints, tgb) + + if err != nil { + return "", false, err + } + + savedCheckPoint := GetTGBReconcileCheckpoint(tgb) + + if currentCheckPoint == savedCheckPoint { + m.logger.Info("Skipping targetgroupbinding reconcile", "TGB", k8s.NamespacedName(tgb), "calculated hash", currentCheckPoint) + return currentCheckPoint, true, nil } + tgARN := tgb.Spec.TargetGroupARN targets, err := m.targetsManager.ListTargets(ctx, tgARN) if err != nil { - return err + return "", false, err } notDrainingTargets, drainingTargets := partitionTargetsByDrainingStatus(targets) _, unmatchedEndpoints, unmatchedTargets := matchNodePortEndpointWithTargets(endpoints, notDrainingTargets) if err := m.networkingManager.ReconcileForNodePortEndpoints(ctx, tgb, endpoints); err != nil { - return err + return "", false, err } + if len(unmatchedTargets) > 0 { if err := m.deregisterTargets(ctx, tgARN, unmatchedTargets); err != nil { - return err + return "", false, err } } if len(unmatchedEndpoints) > 0 { if err := m.registerNodePortEndpoints(ctx, tgARN, unmatchedEndpoints); err != nil { - return err + return "", false, err } } _ = drainingTargets - return nil + return currentCheckPoint, false, nil } func (m *defaultResourceManager) cleanupTargets(ctx context.Context, tgb *elbv2api.TargetGroupBinding) error { diff --git a/pkg/targetgroupbinding/utils.go b/pkg/targetgroupbinding/utils.go index 65f3860964..05bd3746c0 100644 --- a/pkg/targetgroupbinding/utils.go +++ b/pkg/targetgroupbinding/utils.go @@ -1,11 +1,19 @@ package targetgroupbinding import ( + "encoding/json" "fmt" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" elbv2api "sigs.k8s.io/aws-load-balancer-controller/apis/elbv2/v1beta1" + "sigs.k8s.io/aws-load-balancer-controller/pkg/algorithm" + "sigs.k8s.io/aws-load-balancer-controller/pkg/annotations" + "sigs.k8s.io/aws-load-balancer-controller/pkg/backend" "sigs.k8s.io/controller-runtime/pkg/client" + "slices" + "strconv" + "strings" + "time" ) const ( @@ -29,6 +37,59 @@ func IndexFuncServiceRefName(obj client.Object) []string { return []string{tgb.Spec.ServiceRef.Name} } +// calculateTGBReconcileCheckpoint calculates the checkpoint for a tgb using the endpoints and tgb spec +func calculateTGBReconcileCheckpoint[V backend.Endpoint](endpoints []V, tgb *elbv2api.TargetGroupBinding) (string, error) { + + endpointStrings := make([]string, 0, len(endpoints)) + + for _, ep := range endpoints { + endpointStrings = append(endpointStrings, ep.GetIdentifier()) + } + + slices.Sort(endpointStrings) + csv := strings.Join(endpointStrings, ",") + + specJSON, err := json.Marshal(tgb.Spec) + if err != nil { + return "", err + } + + endpointSha := algorithm.ComputeSha256(csv) + specSha := algorithm.ComputeSha256(string(specJSON)) + + return fmt.Sprintf("%s/%s", endpointSha, specSha), nil +} + +// GetTGBReconcileCheckpoint gets the sha256 hash saved in the annotations +func GetTGBReconcileCheckpoint(tgb *elbv2api.TargetGroupBinding) string { + if checkPoint, ok := tgb.Annotations[annotations.AnnotationCheckPoint]; ok { + return checkPoint + } + return "" +} + +// GetTGBReconcileCheckpointTimestamp gets the latest updated timestamp (in seconds) for the TGB checkpoint +func GetTGBReconcileCheckpointTimestamp(tgb *elbv2api.TargetGroupBinding) int64 { + if ts, ok := tgb.Annotations[annotations.AnnotationCheckPointTimestamp]; ok { + ts64, err := strconv.ParseInt(ts, 10, 64) + if err != nil { + return 0 + } + return ts64 + } + return 0 +} + +// SaveTGBReconcileCheckpoint updates the TGB object with a new checkpoint string. +func SaveTGBReconcileCheckpoint(tgb *elbv2api.TargetGroupBinding, checkpoint string) { + if tgb.Annotations == nil { + tgb.Annotations = map[string]string{} + } + + tgb.Annotations[annotations.AnnotationCheckPoint] = checkpoint + tgb.Annotations[annotations.AnnotationCheckPointTimestamp] = strconv.FormatInt(time.Now().Unix(), 10) +} + func buildServiceReferenceKey(tgb *elbv2api.TargetGroupBinding, svcRef elbv2api.ServiceReference) types.NamespacedName { return types.NamespacedName{ Namespace: tgb.Namespace,