diff --git a/Makefile b/Makefile index fa7f48e83f..66ff4ebea2 100644 --- a/Makefile +++ b/Makefile @@ -2,7 +2,7 @@ MAKEFILE_PATH = $(dir $(realpath -s $(firstword $(MAKEFILE_LIST)))) # Image URL to use all building/pushing image targets -IMG ?= public.ecr.aws/eks/aws-load-balancer-controller:v2.9.0 +IMG ?= public.ecr.aws/eks/aws-load-balancer-controller:v2.9.1 # Image URL to use for builder stage in Docker build GOLANG_VERSION ?= $(shell cat .go-version) BUILD_IMAGE ?= public.ecr.aws/docker/library/golang:$(GOLANG_VERSION) @@ -32,7 +32,7 @@ all: controller # Run tests test: generate fmt vet manifests helm-lint - go test -race ./pkg/... ./webhooks/... -coverprofile cover.out + go test -race ./pkg/... ./webhooks/... ./controllers/... -coverprofile cover.out # Build controller binary controller: generate fmt vet diff --git a/config/controller/kustomization.yaml b/config/controller/kustomization.yaml index 24b7714259..32050f9d60 100644 --- a/config/controller/kustomization.yaml +++ b/config/controller/kustomization.yaml @@ -9,4 +9,4 @@ kind: Kustomization images: - name: controller newName: public.ecr.aws/eks/aws-load-balancer-controller - newTag: v2.9.0 + newTag: v2.9.1 diff --git a/controllers/elbv2/targetgroupbinding_controller.go b/controllers/elbv2/targetgroupbinding_controller.go index 36f898b587..0f48d18cb2 100644 --- a/controllers/elbv2/targetgroupbinding_controller.go +++ b/controllers/elbv2/targetgroupbinding_controller.go @@ -19,12 +19,13 @@ package controllers import ( "context" "fmt" + discv1 "k8s.io/api/discovery/v1" + "sigs.k8s.io/controller-runtime/pkg/handler" "time" "github.com/aws/aws-sdk-go-v2/aws" "github.com/pkg/errors" corev1 "k8s.io/api/core/v1" - discv1 "k8s.io/api/discovery/v1" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" "sigs.k8s.io/aws-load-balancer-controller/controllers/elbv2/eventhandlers" @@ -48,15 +49,16 @@ const ( // NewTargetGroupBindingReconciler constructs new targetGroupBindingReconciler func NewTargetGroupBindingReconciler(k8sClient client.Client, eventRecorder record.EventRecorder, finalizerManager k8s.FinalizerManager, - tgbResourceManager targetgroupbinding.ResourceManager, config config.ControllerConfig, + tgbResourceManager targetgroupbinding.ResourceManager, config config.ControllerConfig, deferredTargetGroupBindingReconciler DeferredTargetGroupBindingReconciler, logger logr.Logger) *targetGroupBindingReconciler { return &targetGroupBindingReconciler{ - k8sClient: k8sClient, - eventRecorder: eventRecorder, - finalizerManager: finalizerManager, - tgbResourceManager: tgbResourceManager, - logger: logger, + k8sClient: k8sClient, + eventRecorder: eventRecorder, + finalizerManager: finalizerManager, + tgbResourceManager: tgbResourceManager, + deferredTargetGroupBindingReconciler: deferredTargetGroupBindingReconciler, + logger: logger, maxConcurrentReconciles: config.TargetGroupBindingMaxConcurrentReconciles, maxExponentialBackoffDelay: config.TargetGroupBindingMaxExponentialBackoffDelay, @@ -66,11 +68,12 @@ func NewTargetGroupBindingReconciler(k8sClient client.Client, eventRecorder reco // targetGroupBindingReconciler reconciles a TargetGroupBinding object type targetGroupBindingReconciler struct { - k8sClient client.Client - eventRecorder record.EventRecorder - finalizerManager k8s.FinalizerManager - tgbResourceManager targetgroupbinding.ResourceManager - logger logr.Logger + k8sClient client.Client + eventRecorder record.EventRecorder + finalizerManager k8s.FinalizerManager + tgbResourceManager targetgroupbinding.ResourceManager + 
deferredTargetGroupBindingReconciler DeferredTargetGroupBindingReconciler + logger logr.Logger maxConcurrentReconciles int maxExponentialBackoffDelay time.Duration @@ -110,10 +113,17 @@ func (r *targetGroupBindingReconciler) reconcileTargetGroupBinding(ctx context.C return err } - if err := r.tgbResourceManager.Reconcile(ctx, tgb); err != nil { + deferred, err := r.tgbResourceManager.Reconcile(ctx, tgb) + + if err != nil { return err } + if deferred { + r.deferredTargetGroupBindingReconciler.Enqueue(tgb) + return nil + } + if err := r.updateTargetGroupBindingStatus(ctx, tgb); err != nil { r.eventRecorder.Event(tgb, corev1.EventTypeWarning, k8s.TargetGroupBindingEventReasonFailedUpdateStatus, fmt.Sprintf("Failed update status due to %v", err)) return err @@ -141,11 +151,14 @@ func (r *targetGroupBindingReconciler) updateTargetGroupBindingStatus(ctx contex if aws.ToInt64(tgb.Status.ObservedGeneration) == tgb.Generation { return nil } + tgbOld := tgb.DeepCopy() + tgb.Status.ObservedGeneration = aws.Int64(tgb.Generation) if err := r.k8sClient.Status().Patch(ctx, tgb, client.MergeFrom(tgbOld)); err != nil { return errors.Wrapf(err, "failed to update targetGroupBinding status: %v", k8s.NamespacedName(tgb)) } + return nil } @@ -159,34 +172,29 @@ func (r *targetGroupBindingReconciler) SetupWithManager(ctx context.Context, mgr nodeEventsHandler := eventhandlers.NewEnqueueRequestsForNodeEvent(r.k8sClient, r.logger.WithName("eventHandlers").WithName("node")) - // Use the config flag to decide whether to use and watch an Endpoints event handler or an EndpointSlices event handler + var eventHandler handler.EventHandler + var clientObj client.Object + if r.enableEndpointSlices { - epSliceEventsHandler := eventhandlers.NewEnqueueRequestsForEndpointSlicesEvent(r.k8sClient, + clientObj = &discv1.EndpointSlice{} + eventHandler = eventhandlers.NewEnqueueRequestsForEndpointSlicesEvent(r.k8sClient, r.logger.WithName("eventHandlers").WithName("endpointslices")) - return ctrl.NewControllerManagedBy(mgr). - For(&elbv2api.TargetGroupBinding{}). - Named(controllerName). - Watches(&corev1.Service{}, svcEventHandler). - Watches(&discv1.EndpointSlice{}, epSliceEventsHandler). - Watches(&corev1.Node{}, nodeEventsHandler). - WithOptions(controller.Options{ - MaxConcurrentReconciles: r.maxConcurrentReconciles, - RateLimiter: workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, r.maxExponentialBackoffDelay)}). - Complete(r) } else { - epsEventsHandler := eventhandlers.NewEnqueueRequestsForEndpointsEvent(r.k8sClient, + clientObj = &corev1.Endpoints{} + eventHandler = eventhandlers.NewEnqueueRequestsForEndpointsEvent(r.k8sClient, r.logger.WithName("eventHandlers").WithName("endpoints")) - return ctrl.NewControllerManagedBy(mgr). - For(&elbv2api.TargetGroupBinding{}). - Named(controllerName). - Watches(&corev1.Service{}, svcEventHandler). - Watches(&corev1.Endpoints{}, epsEventsHandler). - Watches(&corev1.Node{}, nodeEventsHandler). - WithOptions(controller.Options{ - MaxConcurrentReconciles: r.maxConcurrentReconciles, - RateLimiter: workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, r.maxExponentialBackoffDelay)}). - Complete(r) } + + return ctrl.NewControllerManagedBy(mgr). + For(&elbv2api.TargetGroupBinding{}). + Named(controllerName). + Watches(&corev1.Service{}, svcEventHandler). + Watches(clientObj, eventHandler). + Watches(&corev1.Node{}, nodeEventsHandler). 
+ WithOptions(controller.Options{ + MaxConcurrentReconciles: r.maxConcurrentReconciles, + RateLimiter: workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, r.maxExponentialBackoffDelay)}). + Complete(r) } func (r *targetGroupBindingReconciler) setupIndexes(ctx context.Context, fieldIndexer client.FieldIndexer) error { diff --git a/controllers/elbv2/targetgroupbinding_deferred_reconciler.go b/controllers/elbv2/targetgroupbinding_deferred_reconciler.go new file mode 100644 index 0000000000..ab7aac4356 --- /dev/null +++ b/controllers/elbv2/targetgroupbinding_deferred_reconciler.go @@ -0,0 +1,130 @@ +package controllers + +import ( + "context" + "github.com/go-logr/logr" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/util/workqueue" + "math/rand" + elbv2api "sigs.k8s.io/aws-load-balancer-controller/apis/elbv2/v1beta1" + "sigs.k8s.io/aws-load-balancer-controller/pkg/k8s" + "sigs.k8s.io/aws-load-balancer-controller/pkg/targetgroupbinding" + "sigs.k8s.io/controller-runtime/pkg/client" + "time" +) + +const ( + // The time to delay the reconcile. Generally, this number should be large enough so we can perform all reconciles + // that have changes before processing reconciles that have no detected changes. + defaultDelayedReconcileTime = 30 * time.Minute + // The max amount of jitter to add to delayedReconcileTime. This is used to ensure that all deferred TGBs are not + // reconciled together. + defaultMaxJitter = 15 * time.Minute + + // The hash to set that is guaranteed to trigger a new reconcile loop (the hash calculation always has an '/') + resetHash = "" +) + +type DeferredTargetGroupBindingReconciler interface { + Enqueue(tgb *elbv2api.TargetGroupBinding) + Run() +} + +type deferredTargetGroupBindingReconcilerImpl struct { + delayQueue workqueue.DelayingInterface + syncPeriod time.Duration + k8sClient client.Client + logger logr.Logger + + delayedReconcileTime time.Duration + maxJitter time.Duration +} + +func NewDeferredTargetGroupBindingReconciler(delayQueue workqueue.DelayingInterface, syncPeriod time.Duration, k8sClient client.Client, logger logr.Logger) DeferredTargetGroupBindingReconciler { + return &deferredTargetGroupBindingReconcilerImpl{ + syncPeriod: syncPeriod, + logger: logger, + delayQueue: delayQueue, + k8sClient: k8sClient, + + delayedReconcileTime: defaultDelayedReconcileTime, + maxJitter: defaultMaxJitter, + } +} + +func (d *deferredTargetGroupBindingReconcilerImpl) Enqueue(tgb *elbv2api.TargetGroupBinding) { + nsn := k8s.NamespacedName(tgb) + if d.isEligibleForDefer(tgb) { + d.enqueue(nsn) + d.logger.Info("enqueued new deferred TGB", "TGB", nsn.Name) + } +} + +func (d *deferredTargetGroupBindingReconcilerImpl) Run() { + var item interface{} + shutDown := false + for !shutDown { + item, shutDown = d.delayQueue.Get() + if item != nil { + deferredNamespacedName := item.(types.NamespacedName) + d.logger.Info("Processing deferred TGB", "item", deferredNamespacedName) + d.handleDeferredItem(deferredNamespacedName) + d.delayQueue.Done(deferredNamespacedName) + } + } + + d.logger.Info("Shutting down deferred TGB queue") +} + +func (d *deferredTargetGroupBindingReconcilerImpl) handleDeferredItem(nsn types.NamespacedName) { + tgb := &elbv2api.TargetGroupBinding{} + + err := d.k8sClient.Get(context.Background(), nsn, tgb) + + if err != nil { + d.handleDeferredItemError(nsn, err, "Failed to get TGB in deferred queue") + return + } + + // Re-check that this tgb hasn't been updated since it was enqueued + if !d.isEligibleForDefer(tgb) { + d.logger.Info("TGB not 
eligible for deferral", "TGB", nsn) + return + } + + tgbOld := tgb.DeepCopy() + targetgroupbinding.SaveTGBReconcileCheckpoint(tgb, resetHash) + + if err := d.k8sClient.Patch(context.Background(), tgb, client.MergeFrom(tgbOld)); err != nil { + d.handleDeferredItemError(nsn, err, "Failed to reset TGB checkpoint") + return + } + d.logger.Info("TGB checkpoint reset", "TGB", nsn) +} + +func (d *deferredTargetGroupBindingReconcilerImpl) handleDeferredItemError(nsn types.NamespacedName, err error, msg string) { + err = client.IgnoreNotFound(err) + if err != nil { + d.logger.Error(err, msg, "TGB", nsn) + d.enqueue(nsn) + } +} + +func (d *deferredTargetGroupBindingReconcilerImpl) isEligibleForDefer(tgb *elbv2api.TargetGroupBinding) bool { + then := time.Unix(targetgroupbinding.GetTGBReconcileCheckpointTimestamp(tgb), 0) + return time.Now().Sub(then) > d.syncPeriod +} + +func (d *deferredTargetGroupBindingReconcilerImpl) enqueue(nsn types.NamespacedName) { + delayedTime := d.jitterReconcileTime() + d.delayQueue.AddAfter(nsn, delayedTime) +} + +func (d *deferredTargetGroupBindingReconcilerImpl) jitterReconcileTime() time.Duration { + + if d.maxJitter == 0 { + return d.delayedReconcileTime + } + + return d.delayedReconcileTime + time.Duration(rand.Int63n(int64(d.maxJitter))) +} diff --git a/controllers/elbv2/targetgroupbinding_deferred_reconciler_test.go b/controllers/elbv2/targetgroupbinding_deferred_reconciler_test.go new file mode 100644 index 0000000000..9e5628730a --- /dev/null +++ b/controllers/elbv2/targetgroupbinding_deferred_reconciler_test.go @@ -0,0 +1,386 @@ +package controllers + +import ( + "context" + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/go-logr/logr" + "github.com/stretchr/testify/assert" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" + clientgoscheme "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/util/workqueue" + elbv2api "sigs.k8s.io/aws-load-balancer-controller/apis/elbv2/v1beta1" + "sigs.k8s.io/aws-load-balancer-controller/pkg/annotations" + testclient "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/log" + "strconv" + "testing" + "time" +) + +func TestDeferredReconcilerConstructor(t *testing.T) { + dq := workqueue.NewDelayingQueue() + defer dq.ShutDown() + syncPeriod := 5 * time.Minute + k8sClient := testclient.NewClientBuilder().Build() + logger := logr.New(&log.NullLogSink{}) + + d := NewDeferredTargetGroupBindingReconciler(dq, syncPeriod, k8sClient, logger) + + deferredReconciler := d.(*deferredTargetGroupBindingReconcilerImpl) + assert.Equal(t, dq, deferredReconciler.delayQueue) + assert.Equal(t, syncPeriod, deferredReconciler.syncPeriod) + assert.Equal(t, k8sClient, deferredReconciler.k8sClient) + assert.Equal(t, logger, deferredReconciler.logger) +} + +func TestDeferredReconcilerEnqueue(t *testing.T) { + syncPeriod := 5 * time.Minute + testCases := []struct { + name string + tgbToEnqueue []*elbv2api.TargetGroupBinding + expectedQueueEntries sets.Set[types.NamespacedName] + }{ + { + name: "one tgb to enqueue", + tgbToEnqueue: []*elbv2api.TargetGroupBinding{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "tgb1", + Namespace: "ns", + }, + }, + }, + expectedQueueEntries: sets.New(types.NamespacedName{ + Name: "tgb1", + Namespace: "ns", + }), + }, + { + name: "sync period too short, do not enqueue", + tgbToEnqueue: []*elbv2api.TargetGroupBinding{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "tgb1", + Namespace: 
"ns", + Annotations: map[string]string{ + annotations.AnnotationCheckPointTimestamp: strconv.FormatInt(time.Now().Unix(), 10), + }, + }, + }, + }, + expectedQueueEntries: make(sets.Set[types.NamespacedName]), + }, + { + name: "sync period too long, do enqueue", + tgbToEnqueue: []*elbv2api.TargetGroupBinding{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "tgb1", + Namespace: "ns", + Annotations: map[string]string{ + annotations.AnnotationCheckPointTimestamp: strconv.FormatInt(time.Now().Add(-2*syncPeriod).Unix(), 10), + }, + }, + }, + }, + expectedQueueEntries: sets.New(types.NamespacedName{ + Name: "tgb1", + Namespace: "ns", + }), + }, + { + name: "multiple tgb", + tgbToEnqueue: []*elbv2api.TargetGroupBinding{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "tgb1", + Namespace: "ns", + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "tgb1", + Namespace: "ns1", + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "tgb2", + Namespace: "ns", + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "tgb2", + Namespace: "ns1", + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "tgb3", + Namespace: "ns3", + }, + }, + }, + expectedQueueEntries: sets.New(types.NamespacedName{ + Name: "tgb1", + Namespace: "ns", + }, types.NamespacedName{ + Name: "tgb1", + Namespace: "ns1", + }, types.NamespacedName{ + Name: "tgb2", + Namespace: "ns", + }, types.NamespacedName{ + Name: "tgb2", + Namespace: "ns1", + }, types.NamespacedName{ + Name: "tgb3", + Namespace: "ns3", + }), + }, + { + name: "de-dupe same tgb", + tgbToEnqueue: []*elbv2api.TargetGroupBinding{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "tgb1", + Namespace: "ns", + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "tgb1", + Namespace: "ns", + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "tgb1", + Namespace: "ns", + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "tgb1", + Namespace: "ns", + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "tgb1", + Namespace: "ns", + }, + }, + }, + expectedQueueEntries: sets.New(types.NamespacedName{ + Name: "tgb1", + Namespace: "ns", + }), + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + dq := workqueue.NewDelayingQueue() + defer dq.ShutDown() + + k8sSchema := runtime.NewScheme() + clientgoscheme.AddToScheme(k8sSchema) + elbv2api.AddToScheme(k8sSchema) + k8sClient := testclient.NewClientBuilder(). + WithScheme(k8sSchema). 
+ Build() + + impl := deferredTargetGroupBindingReconcilerImpl{ + delayQueue: dq, + syncPeriod: syncPeriod, + k8sClient: k8sClient, + logger: logr.New(&log.NullLogSink{}), + + delayedReconcileTime: 0 * time.Millisecond, + maxJitter: 0 * time.Millisecond, + } + + for _, tgb := range tc.tgbToEnqueue { + impl.Enqueue(tgb) + } + + assert.Equal(t, tc.expectedQueueEntries.Len(), dq.Len()) + + for dq.Len() > 0 { + v, _ := dq.Get() + assert.True(t, tc.expectedQueueEntries.Has(v.(types.NamespacedName)), "Expected queue entry not found %+v", v) + } + + }) + } +} + +func TestDeferredReconcilerRun(t *testing.T) { + testCases := []struct { + name string + nsns []types.NamespacedName + }{ + { + name: "nothing enqueued", + }, + { + name: "something enqueued", + nsns: []types.NamespacedName{ + { + Name: "name", + Namespace: "ns", + }, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + dq := workqueue.NewDelayingQueue() + go func() { + time.Sleep(2 * time.Second) + assert.Equal(t, 0, dq.Len()) + dq.ShutDown() + }() + + for _, nsn := range tc.nsns { + dq.Add(nsn) + } + + k8sSchema := runtime.NewScheme() + clientgoscheme.AddToScheme(k8sSchema) + elbv2api.AddToScheme(k8sSchema) + k8sClient := testclient.NewClientBuilder(). + WithScheme(k8sSchema). + Build() + + impl := deferredTargetGroupBindingReconcilerImpl{ + delayQueue: dq, + syncPeriod: 5 * time.Minute, + k8sClient: k8sClient, + logger: logr.New(&log.NullLogSink{}), + + delayedReconcileTime: 0 * time.Millisecond, + maxJitter: 0 * time.Millisecond, + } + + impl.Run() + + time.Sleep(5 * time.Second) + }) + } +} + +func TestHandleDeferredItem(t *testing.T) { + syncPeriod := 5 * time.Minute + testCases := []struct { + name string + nsn types.NamespacedName + storedTGB *elbv2api.TargetGroupBinding + requeue bool + expectedCheckPoint *string + }{ + { + name: "not found", + nsn: types.NamespacedName{ + Name: "name", + Namespace: "ns", + }, + }, + { + name: "not eligible", + nsn: types.NamespacedName{ + Name: "name", + Namespace: "ns", + }, + storedTGB: &elbv2api.TargetGroupBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: "name", + Namespace: "ns", + Annotations: map[string]string{ + annotations.AnnotationCheckPoint: "foo", + annotations.AnnotationCheckPointTimestamp: strconv.FormatInt(time.Now().Unix(), 10), + }, + }, + }, + expectedCheckPoint: aws.String("foo"), + }, + { + name: "eligible", + nsn: types.NamespacedName{ + Name: "name", + Namespace: "ns", + }, + storedTGB: &elbv2api.TargetGroupBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: "name", + Namespace: "ns", + Annotations: map[string]string{ + annotations.AnnotationCheckPoint: "foo", + annotations.AnnotationCheckPointTimestamp: strconv.FormatInt(time.Now().Add(-2*syncPeriod).Unix(), 10), + }, + }, + }, + expectedCheckPoint: aws.String(""), + }, + { + name: "failure causes requeue", + nsn: types.NamespacedName{ + Name: "name", + Namespace: "ns", + }, + requeue: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + dq := workqueue.NewDelayingQueue() + defer dq.ShutDown() + + k8sSchema := runtime.NewScheme() + // quick hack to inject a fault into the k8s client. + if !tc.requeue { + clientgoscheme.AddToScheme(k8sSchema) + elbv2api.AddToScheme(k8sSchema) + } + k8sClient := testclient.NewClientBuilder(). + WithScheme(k8sSchema). 
+ Build() + + impl := deferredTargetGroupBindingReconcilerImpl{ + delayQueue: dq, + syncPeriod: syncPeriod, + k8sClient: k8sClient, + logger: logr.New(&log.NullLogSink{}), + + delayedReconcileTime: 0 * time.Millisecond, + maxJitter: 0 * time.Millisecond, + } + + if tc.storedTGB != nil { + k8sClient.Create(context.Background(), tc.storedTGB) + } + + impl.handleDeferredItem(tc.nsn) + + if tc.requeue { + assert.Equal(t, 1, dq.Len()) + } else { + assert.Equal(t, 0, dq.Len()) + } + + if tc.expectedCheckPoint != nil { + storedTGB := &elbv2api.TargetGroupBinding{} + k8sClient.Get(context.Background(), tc.nsn, storedTGB) + assert.Equal(t, *tc.expectedCheckPoint, storedTGB.Annotations[annotations.AnnotationCheckPoint]) + } + + }) + } +} diff --git a/docs/deploy/installation.md b/docs/deploy/installation.md index dda9fd4a87..4e653f8aa6 100644 --- a/docs/deploy/installation.md +++ b/docs/deploy/installation.md @@ -90,15 +90,15 @@ Example condition for cluster name resource tag: 2. Download an IAM policy for the LBC using one of the following commands:

If your cluster is in a US Gov Cloud region: ``` - curl -o iam-policy.json https://raw.githubusercontent.com/kubernetes-sigs/aws-load-balancer-controller/v2.9.0/docs/install/iam_policy_us-gov.json + curl -o iam-policy.json https://raw.githubusercontent.com/kubernetes-sigs/aws-load-balancer-controller/v2.9.1/docs/install/iam_policy_us-gov.json ``` If your cluster is in a China region: ``` - curl -o iam-policy.json https://raw.githubusercontent.com/kubernetes-sigs/aws-load-balancer-controller/v2.9.0/docs/install/iam_policy_cn.json + curl -o iam-policy.json https://raw.githubusercontent.com/kubernetes-sigs/aws-load-balancer-controller/v2.9.1/docs/install/iam_policy_cn.json ``` If your cluster is in any other region: ``` - curl -o iam-policy.json https://raw.githubusercontent.com/kubernetes-sigs/aws-load-balancer-controller/v2.9.0/docs/install/iam_policy.json + curl -o iam-policy.json https://raw.githubusercontent.com/kubernetes-sigs/aws-load-balancer-controller/v2.9.1/docs/install/iam_policy.json ``` 3. Create an IAM policy named `AWSLoadBalancerControllerIAMPolicy`. If you downloaded a different policy, replace `iam-policy` with the name of the policy that you downloaded. @@ -124,7 +124,7 @@ Example condition for cluster name resource tag: ### Option B: Attach IAM policies to nodes If you're not setting up IAM roles for service accounts, apply the IAM policies from the following URL at a minimum. Please be aware of the possibility that the controller permissions may be assumed by other users in a pod after retrieving the node role credentials, so the best practice would be using IRSA instead of attaching IAM policy directly. ``` -curl -o iam-policy.json https://raw.githubusercontent.com/kubernetes-sigs/aws-load-balancer-controller/v2.9.0/docs/install/iam_policy.json +curl -o iam-policy.json https://raw.githubusercontent.com/kubernetes-sigs/aws-load-balancer-controller/v2.9.1/docs/install/iam_policy.json ``` The following IAM permissions subset is for those using `TargetGroupBinding` only and don't plan to use the LBC to manage security group rules: @@ -209,7 +209,7 @@ We recommend using the Helm chart to install the controller. The chart supports ### Apply YAML 1. Download the spec for the LBC. ``` - wget https://github.com/kubernetes-sigs/aws-load-balancer-controller/releases/download/v2.9.0/v2_9_0_full.yaml + wget https://github.com/kubernetes-sigs/aws-load-balancer-controller/releases/download/v2.9.1/v2_9_1_full.yaml ``` 2. Edit the saved yaml file, go to the Deployment spec, and set the controller `--cluster-name` arg value to your EKS cluster name ``` @@ -233,15 +233,15 @@ We recommend using the Helm chart to install the controller. The chart supports ``` 4. Apply the yaml file ``` - kubectl apply -f v2_9_0_full.yaml + kubectl apply -f v2_9_1_full.yaml ``` 5. Optionally download the default ingressclass and ingressclass params ``` - wget https://github.com/kubernetes-sigs/aws-load-balancer-controller/releases/download/v2.9.0/v2_9_0_ingclass.yaml + wget https://github.com/kubernetes-sigs/aws-load-balancer-controller/releases/download/v2.9.1/v2_9_1_ingclass.yaml ``` 6. Apply the ingressclass and params ``` - kubectl apply -f v2_9_0_ingclass.yaml + kubectl apply -f v2_9_1_ingclass.yaml ``` ## Create Update Strategy diff --git a/docs/examples/echo_server.md b/docs/examples/echo_server.md index 7b4d2a0abf..edb04b117f 100644 --- a/docs/examples/echo_server.md +++ b/docs/examples/echo_server.md @@ -87,9 +87,9 @@ In this walkthrough, you'll 1. 
Deploy all the echoserver resources (namespace, service, deployment) ```bash - kubectl apply -f https://raw.githubusercontent.com/kubernetes-sigs/aws-load-balancer-controller/v2.9.0/docs/examples/echoservice/echoserver-namespace.yaml &&\ - kubectl apply -f https://raw.githubusercontent.com/kubernetes-sigs/aws-load-balancer-controller/v2.9.0/docs/examples/echoservice/echoserver-service.yaml &&\ - kubectl apply -f https://raw.githubusercontent.com/kubernetes-sigs/aws-load-balancer-controller/v2.9.0/docs/examples/echoservice/echoserver-deployment.yaml + kubectl apply -f https://raw.githubusercontent.com/kubernetes-sigs/aws-load-balancer-controller/v2.9.1/docs/examples/echoservice/echoserver-namespace.yaml &&\ + kubectl apply -f https://raw.githubusercontent.com/kubernetes-sigs/aws-load-balancer-controller/v2.9.1/docs/examples/echoservice/echoserver-service.yaml &&\ + kubectl apply -f https://raw.githubusercontent.com/kubernetes-sigs/aws-load-balancer-controller/v2.9.1/docs/examples/echoservice/echoserver-deployment.yaml ``` 1. List all the resources to ensure they were created. @@ -113,7 +113,7 @@ In this walkthrough, you'll 1. Download the echoserver ingress manifest locally. ```bash - wget https://raw.githubusercontent.com/kubernetes-sigs/aws-load-balancer-controller/v2.9.0/docs/examples/echoservice/echoserver-ingress.yaml + wget https://raw.githubusercontent.com/kubernetes-sigs/aws-load-balancer-controller/v2.9.1/docs/examples/echoservice/echoserver-ingress.yaml ``` 1. Configure the subnets, either by add annotation to the ingress or add tags to subnets. This step is optional in lieu of auto-discovery. @@ -300,7 +300,7 @@ You should get back a valid response. follow below steps if you want to use kube2iam to provide the AWS credentials 1. configure the proper policy - The policy to be used can be fetched from https://raw.githubusercontent.com/kubernetes-sigs/aws-load-balancer-controller/v2.9.0/docs/install/iam_policy.json + The policy to be used can be fetched from https://raw.githubusercontent.com/kubernetes-sigs/aws-load-balancer-controller/v2.9.1/docs/install/iam_policy.json 1. configure the proper role and create the trust relationship You have to find which role is associated with your K8S nodes. 
Once you found take note of the full arn: diff --git a/helm/aws-load-balancer-controller/Chart.yaml b/helm/aws-load-balancer-controller/Chart.yaml index 93d892dd63..df8ad92f1c 100644 --- a/helm/aws-load-balancer-controller/Chart.yaml +++ b/helm/aws-load-balancer-controller/Chart.yaml @@ -1,8 +1,8 @@ apiVersion: v2 name: aws-load-balancer-controller description: AWS Load Balancer Controller Helm chart for Kubernetes -version: 1.9.0 -appVersion: v2.9.0 +version: 1.9.1 +appVersion: v2.9.1 home: https://github.com/aws/eks-charts icon: https://raw.githubusercontent.com/aws/eks-charts/master/docs/logo/aws.png sources: diff --git a/helm/aws-load-balancer-controller/test.yaml b/helm/aws-load-balancer-controller/test.yaml index e0dc7ef987..2163430d60 100644 --- a/helm/aws-load-balancer-controller/test.yaml +++ b/helm/aws-load-balancer-controller/test.yaml @@ -6,7 +6,7 @@ replicaCount: 2 image: repository: public.ecr.aws/eks/aws-load-balancer-controller - tag: v2.9.0 + tag: v2.9.1 pullPolicy: IfNotPresent imagePullSecrets: [] diff --git a/helm/aws-load-balancer-controller/values.yaml b/helm/aws-load-balancer-controller/values.yaml index 1be4b62d28..dcf09fd3cf 100644 --- a/helm/aws-load-balancer-controller/values.yaml +++ b/helm/aws-load-balancer-controller/values.yaml @@ -8,7 +8,7 @@ revisionHistoryLimit: 10 image: repository: public.ecr.aws/eks/aws-load-balancer-controller - tag: v2.9.0 + tag: v2.9.1 pullPolicy: IfNotPresent runtimeClassName: "" diff --git a/main.go b/main.go index 8bd9f29a41..c58ebe64ef 100644 --- a/main.go +++ b/main.go @@ -17,6 +17,7 @@ limitations under the License. package main import ( + "k8s.io/client-go/util/workqueue" "os" elbv2deploy "sigs.k8s.io/aws-load-balancer-controller/pkg/deploy/elbv2" @@ -124,9 +125,14 @@ func main() { svcReconciler := service.NewServiceReconciler(cloud, mgr.GetClient(), mgr.GetEventRecorderFor("service"), finalizerManager, sgManager, sgReconciler, subnetResolver, vpcInfoProvider, elbv2TaggingManager, controllerCFG, backendSGProvider, sgResolver, ctrl.Log.WithName("controllers").WithName("service")) + + delayingQueue := workqueue.NewDelayingQueueWithConfig(workqueue.DelayingQueueConfig{ + Name: "delayed-target-group-binding", + }) + + deferredTGBQueue := elbv2controller.NewDeferredTargetGroupBindingReconciler(delayingQueue, controllerCFG.RuntimeConfig.SyncPeriod, mgr.GetClient(), ctrl.Log.WithName("deferredTGBQueue")) tgbReconciler := elbv2controller.NewTargetGroupBindingReconciler(mgr.GetClient(), mgr.GetEventRecorderFor("targetGroupBinding"), - finalizerManager, tgbResManager, - controllerCFG, ctrl.Log.WithName("controllers").WithName("targetGroupBinding")) + finalizerManager, tgbResManager, controllerCFG, deferredTGBQueue, ctrl.Log.WithName("controllers").WithName("targetGroupBinding")) ctx := ctrl.SetupSignalHandler() if err = ingGroupReconciler.SetupWithManager(ctx, mgr, clientSet); err != nil { @@ -180,6 +186,12 @@ func main() { os.Exit(1) } }() + + go func() { + setupLog.Info("starting deferred tgb reconciler") + deferredTGBQueue.Run() + }() + if err := podInfoRepo.WaitForCacheSync(ctx); err != nil { setupLog.Error(err, "problem wait for podInfo repo sync") os.Exit(1) diff --git a/pkg/algorithm/hash.go b/pkg/algorithm/hash.go new file mode 100644 index 0000000000..d26e6feeec --- /dev/null +++ b/pkg/algorithm/hash.go @@ -0,0 +1,12 @@ +package algorithm + +import ( + "crypto/sha256" + "encoding/base64" +) + +func ComputeSha256(s string) string { + checkpointHash := sha256.New() + _, _ = checkpointHash.Write([]byte(s)) + return 
base64.RawURLEncoding.EncodeToString(checkpointHash.Sum(nil)) +} diff --git a/pkg/annotations/constants.go b/pkg/annotations/constants.go index 2c738a2e64..c008f859ce 100644 --- a/pkg/annotations/constants.go +++ b/pkg/annotations/constants.go @@ -1,6 +1,13 @@ package annotations const ( + // AnnotationCheckPoint is the annotation used to store a checkpoint for resources. + // It contains an opaque value that represents the last known reconciled state. + AnnotationCheckPoint = "elbv2.k8s.aws/checkpoint" + + // AnnotationCheckPointTimestamp is the annotation used to store the last checkpointed time. The value is stored in seconds. + AnnotationCheckPointTimestamp = AnnotationCheckPoint + "-timestamp" + // IngressClass IngressClass = "kubernetes.io/ingress.class" diff --git a/pkg/backend/endpoint_resolver.go b/pkg/backend/endpoint_resolver.go index 254b4ae9f9..507ac16ca2 100644 --- a/pkg/backend/endpoint_resolver.go +++ b/pkg/backend/endpoint_resolver.go @@ -170,8 +170,10 @@ func (r *defaultEndpointResolver) resolvePodEndpointsWithEndpointsData(ctx conte containsPotentialReadyEndpoints = true continue } + podEndpoint := buildPodEndpoint(pod, epAddr, epPort) - if ep.Conditions.Ready != nil && *ep.Conditions.Ready { + // Recommendation from Kubernetes is to consider unknown ready status as ready (ready == nil) + if ep.Conditions.Ready == nil || *ep.Conditions.Ready { readyPodEndpoints = append(readyPodEndpoints, podEndpoint) continue } diff --git a/pkg/backend/endpoint_types.go b/pkg/backend/endpoint_types.go index 648db9cec8..4fb677fe94 100644 --- a/pkg/backend/endpoint_types.go +++ b/pkg/backend/endpoint_types.go @@ -1,12 +1,17 @@ package backend import ( + "fmt" corev1 "k8s.io/api/core/v1" discv1 "k8s.io/api/discovery/v1" "k8s.io/apimachinery/pkg/labels" "sigs.k8s.io/aws-load-balancer-controller/pkg/k8s" ) +type Endpoint interface { + GetIdentifier() string +} + // An endpoint provided by pod directly. type PodEndpoint struct { // Pod's IP. @@ -17,6 +22,10 @@ type PodEndpoint struct { Pod k8s.PodInfo } +func (e PodEndpoint) GetIdentifier() string { + return fmt.Sprintf("%s:%d:%d", e.IP, e.Port, e.Pod.CreationTime.UnixMilli()) +} + // An endpoint provided by nodePort as traffic proxy. type NodePortEndpoint struct { // Node's instanceID. 
@@ -27,6 +36,10 @@ type NodePortEndpoint struct { Node *corev1.Node } +func (e NodePortEndpoint) GetIdentifier() string { + return fmt.Sprintf("%s:%d:%d", e.InstanceID, e.Port, e.Node.CreationTimestamp.UnixMilli()) +} + type EndpointsData struct { Ports []discv1.EndpointPort Endpoints []discv1.Endpoint diff --git a/pkg/k8s/pod_info.go b/pkg/k8s/pod_info.go index 2b81598f88..b7fea6ce0e 100644 --- a/pkg/k8s/pod_info.go +++ b/pkg/k8s/pod_info.go @@ -4,6 +4,7 @@ import ( "encoding/json" "github.com/pkg/errors" corev1 "k8s.io/api/core/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" ) @@ -23,6 +24,7 @@ type PodInfo struct { Conditions []corev1.PodCondition NodeName string PodIP string + CreationTime v1.Time ENIInfos []PodENIInfo } @@ -104,6 +106,7 @@ func buildPodInfo(pod *corev1.Pod) PodInfo { Conditions: pod.Status.Conditions, NodeName: pod.Spec.NodeName, PodIP: pod.Status.PodIP, + CreationTime: pod.CreationTimestamp, ENIInfos: podENIInfos, } diff --git a/pkg/k8s/pod_info_repo_test.go b/pkg/k8s/pod_info_repo_test.go index a0e62884b9..306c10dd21 100644 --- a/pkg/k8s/pod_info_repo_test.go +++ b/pkg/k8s/pod_info_repo_test.go @@ -7,6 +7,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "testing" + "time" ) func Test_podInfoKeyFunc(t *testing.T) { @@ -55,6 +56,8 @@ func Test_podInfoConversionFunc(t *testing.T) { type args struct { obj interface{} } + + timeNow := time.Now() tests := []struct { name string args args @@ -69,12 +72,16 @@ func Test_podInfoConversionFunc(t *testing.T) { Namespace: "ns-1", Name: "pod-a", UID: "pod-uuid", + CreationTimestamp: metav1.Time{ + Time: timeNow, + }, }, }, }, want: &PodInfo{ - Key: types.NamespacedName{Namespace: "ns-1", Name: "pod-a"}, - UID: "pod-uuid", + Key: types.NamespacedName{Namespace: "ns-1", Name: "pod-a"}, + UID: "pod-uuid", + CreationTime: metav1.Time{Time: timeNow}, }, }, { diff --git a/pkg/k8s/pod_info_test.go b/pkg/k8s/pod_info_test.go index c5eac214be..61c8b23058 100644 --- a/pkg/k8s/pod_info_test.go +++ b/pkg/k8s/pod_info_test.go @@ -8,6 +8,7 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" "testing" + "time" ) func TestPodInfo_HasAnyOfReadinessGates(t *testing.T) { @@ -149,6 +150,7 @@ func TestPodInfo_GetPodCondition(t *testing.T) { type args struct { conditionType corev1.PodConditionType } + tests := []struct { name string pod PodInfo @@ -312,6 +314,9 @@ func Test_buildPodInfo(t *testing.T) { type args struct { pod *corev1.Pod } + + timeNow := time.Now() + tests := []struct { name string args args @@ -325,6 +330,9 @@ func Test_buildPodInfo(t *testing.T) { Namespace: "my-ns", Name: "pod-1", UID: "pod-uuid", + CreationTimestamp: metav1.Time{ + Time: timeNow, + }, }, Spec: corev1.PodSpec{ NodeName: "ip-192-168-13-198.us-west-2.compute.internal", @@ -409,8 +417,9 @@ func Test_buildPodInfo(t *testing.T) { Status: corev1.ConditionTrue, }, }, - NodeName: "ip-192-168-13-198.us-west-2.compute.internal", - PodIP: "192.168.1.1", + NodeName: "ip-192-168-13-198.us-west-2.compute.internal", + PodIP: "192.168.1.1", + CreationTime: metav1.Time{Time: timeNow}, }, }, { @@ -424,6 +433,9 @@ func Test_buildPodInfo(t *testing.T) { Annotations: map[string]string{ "vpc.amazonaws.com/pod-eni": `[{"eniId":"eni-06a712e1622fda4a0","ifAddress":"02:34:a5:25:0b:63","privateIp":"192.168.219.103","vlanId":3,"subnetCidr":"192.168.192.0/19"}]`, }, + CreationTimestamp: metav1.Time{ + Time: timeNow, + }, }, Spec: corev1.PodSpec{ 
NodeName: "ip-192-168-13-198.us-west-2.compute.internal", @@ -508,8 +520,9 @@ func Test_buildPodInfo(t *testing.T) { Status: corev1.ConditionTrue, }, }, - NodeName: "ip-192-168-13-198.us-west-2.compute.internal", - PodIP: "192.168.1.1", + NodeName: "ip-192-168-13-198.us-west-2.compute.internal", + PodIP: "192.168.1.1", + CreationTime: metav1.Time{Time: timeNow}, ENIInfos: []PodENIInfo{ { ENIID: "eni-06a712e1622fda4a0", diff --git a/pkg/k8s/secrets_manager.go b/pkg/k8s/secrets_manager.go index 83394a6b29..ec4c8c13cd 100644 --- a/pkg/k8s/secrets_manager.go +++ b/pkg/k8s/secrets_manager.go @@ -7,7 +7,6 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/client-go/util/workqueue" "sigs.k8s.io/controller-runtime/pkg/event" "github.com/go-logr/logr" @@ -44,7 +43,6 @@ type defaultSecretsManager struct { secretMap map[types.NamespacedName]*secretItem secretsEventChan chan<- event.TypedGenericEvent[*corev1.Secret] clientSet kubernetes.Interface - queue workqueue.RateLimitingInterface logger logr.Logger } diff --git a/pkg/targetgroupbinding/resource_manager.go b/pkg/targetgroupbinding/resource_manager.go index d9b263ea28..9bef6a70f3 100644 --- a/pkg/targetgroupbinding/resource_manager.go +++ b/pkg/targetgroupbinding/resource_manager.go @@ -26,11 +26,11 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" ) -const defaultTargetHealthRequeueDuration = 15 * time.Second +const defaultRequeueDuration = 15 * time.Second // ResourceManager manages the TargetGroupBinding resource. type ResourceManager interface { - Reconcile(ctx context.Context, tgb *elbv2api.TargetGroupBinding) error + Reconcile(ctx context.Context, tgb *elbv2api.TargetGroupBinding) (bool, error) Cleanup(ctx context.Context, tgb *elbv2api.TargetGroupBinding) error } @@ -60,7 +60,7 @@ func NewDefaultResourceManager(k8sClient client.Client, elbv2Client services.ELB vpcInfoProvider: vpcInfoProvider, podInfoRepo: podInfoRepo, - targetHealthRequeueDuration: defaultTargetHealthRequeueDuration, + requeueDuration: defaultRequeueDuration, } } @@ -78,17 +78,34 @@ type defaultResourceManager struct { podInfoRepo k8s.PodInfoRepo vpcID string - targetHealthRequeueDuration time.Duration + requeueDuration time.Duration } -func (m *defaultResourceManager) Reconcile(ctx context.Context, tgb *elbv2api.TargetGroupBinding) error { +func (m *defaultResourceManager) Reconcile(ctx context.Context, tgb *elbv2api.TargetGroupBinding) (bool, error) { if tgb.Spec.TargetType == nil { - return errors.Errorf("targetType is not specified: %v", k8s.NamespacedName(tgb).String()) + return false, errors.Errorf("targetType is not specified: %v", k8s.NamespacedName(tgb).String()) } + + var newCheckPoint string + var oldCheckPoint string + var isDeferred bool + var err error + if *tgb.Spec.TargetType == elbv2api.TargetTypeIP { - return m.reconcileWithIPTargetType(ctx, tgb) + newCheckPoint, oldCheckPoint, isDeferred, err = m.reconcileWithIPTargetType(ctx, tgb) + } else { + newCheckPoint, oldCheckPoint, isDeferred, err = m.reconcileWithInstanceTargetType(ctx, tgb) } - return m.reconcileWithInstanceTargetType(ctx, tgb) + + if err != nil { + return false, err + } + + if isDeferred { + return true, nil + } + + return false, m.updateTGBCheckPoint(ctx, tgb, newCheckPoint, oldCheckPoint) } func (m *defaultResourceManager) Cleanup(ctx context.Context, tgb *elbv2api.TargetGroupBinding) error { @@ -104,7 +121,8 @@ func (m *defaultResourceManager) Cleanup(ctx context.Context, tgb *elbv2api.Targ return nil } -func (m *defaultResourceManager) 
reconcileWithIPTargetType(ctx context.Context, tgb *elbv2api.TargetGroupBinding) error { +func (m *defaultResourceManager) reconcileWithIPTargetType(ctx context.Context, tgb *elbv2api.TargetGroupBinding) (string, string, bool, error) { + tgbScopedLogger := m.logger.WithValues("tgb", k8s.NamespacedName(tgb)) svcKey := buildServiceReferenceKey(tgb, tgb.Spec.ServiceRef) targetHealthCondType := BuildTargetHealthPodConditionType(tgb) @@ -121,65 +139,105 @@ func (m *defaultResourceManager) reconcileWithIPTargetType(ctx context.Context, if err != nil { if errors.Is(err, backend.ErrNotFound) { m.eventRecorder.Event(tgb, corev1.EventTypeWarning, k8s.TargetGroupBindingEventReasonBackendNotFound, err.Error()) - return m.Cleanup(ctx, tgb) + return "", "", false, m.Cleanup(ctx, tgb) } - return err + return "", "", false, err + } + + newCheckPoint, err := calculateTGBReconcileCheckpoint(endpoints, tgb) + + if err != nil { + return "", "", false, err + } + + oldCheckPoint := GetTGBReconcileCheckpoint(tgb) + + if !containsPotentialReadyEndpoints && oldCheckPoint == newCheckPoint { + tgbScopedLogger.Info("Skipping targetgroupbinding reconcile", "calculated hash", newCheckPoint) + return newCheckPoint, oldCheckPoint, true, nil } tgARN := tgb.Spec.TargetGroupARN vpcID := tgb.Spec.VpcID targets, err := m.targetsManager.ListTargets(ctx, tgARN) if err != nil { - return err + return "", "", false, err } - notDrainingTargets, drainingTargets := partitionTargetsByDrainingStatus(targets) + notDrainingTargets, _ := partitionTargetsByDrainingStatus(targets) matchedEndpointAndTargets, unmatchedEndpoints, unmatchedTargets := matchPodEndpointWithTargets(endpoints, notDrainingTargets) needNetworkingRequeue := false if err := m.networkingManager.ReconcileForPodEndpoints(ctx, tgb, endpoints); err != nil { + tgbScopedLogger.Error(err, "Requesting network requeue due to error from ReconcileForPodEndpoints") m.eventRecorder.Event(tgb, corev1.EventTypeWarning, k8s.TargetGroupBindingEventReasonFailedNetworkReconcile, err.Error()) needNetworkingRequeue = true } + + preflightNeedFurtherProbe := false + for _, endpointAndTarget := range matchedEndpointAndTargets { + _, localPreflight := m.calculateReadinessGateTransition(endpointAndTarget.endpoint.Pod, targetHealthCondType, endpointAndTarget.target.TargetHealth) + if localPreflight { + preflightNeedFurtherProbe = true + break + } + } + + // Any change that we perform should reset the checkpoint. + // TODO - How to make this cleaner? + if len(unmatchedEndpoints) > 0 || len(unmatchedTargets) > 0 || needNetworkingRequeue || containsPotentialReadyEndpoints || preflightNeedFurtherProbe { + // Set an empty checkpoint to ensure that, no matter what, we reconcile at least one more time. + // Consider this ordering of events (without this checkpoint override): + // 1. Register some pod IP, don't update the TGB checkpoint. + // 2. Before the next reconcile happens, the pod is removed. + // 3. The next reconcile loop has no knowledge that it needs to deregister the pod IP, so it skips deregistering the removed pod IP.
+ err = m.updateTGBCheckPoint(ctx, tgb, "", oldCheckPoint) + if err != nil { + tgbScopedLogger.Error(err, "Unable to update checkpoint before mutating change") + return "", "", false, err + } + } + if len(unmatchedTargets) > 0 { if err := m.deregisterTargets(ctx, tgARN, unmatchedTargets); err != nil { - return err + return "", "", false, err } } if len(unmatchedEndpoints) > 0 { if err := m.registerPodEndpoints(ctx, tgARN, vpcID, unmatchedEndpoints); err != nil { - return err + return "", "", false, err } } anyPodNeedFurtherProbe, err := m.updateTargetHealthPodCondition(ctx, targetHealthCondType, matchedEndpointAndTargets, unmatchedEndpoints) if err != nil { - return err + return "", "", false, err } if anyPodNeedFurtherProbe { - if containsTargetsInInitialState(matchedEndpointAndTargets) || len(unmatchedEndpoints) != 0 { - return runtime.NewRequeueNeededAfter("monitor targetHealth", m.targetHealthRequeueDuration) - } - return runtime.NewRequeueNeeded("monitor targetHealth") + tgbScopedLogger.Info("Requeue to monitor target health") + return "", "", false, runtime.NewRequeueNeededAfter("monitor targetHealth", m.requeueDuration) } if containsPotentialReadyEndpoints { - return runtime.NewRequeueNeeded("monitor potential ready endpoints") + tgbScopedLogger.Info("Requeue for potentially ready endpoints") + return "", "", false, runtime.NewRequeueNeededAfter("monitor potential ready endpoints", m.requeueDuration) } - _ = drainingTargets - if needNetworkingRequeue { - return runtime.NewRequeueNeeded("networking reconciliation") + tgbScopedLogger.Info("Requeue for networking reconciliation") + return "", "", false, runtime.NewRequeueNeededAfter("networking reconciliation", m.requeueDuration) } - return nil + + tgbScopedLogger.Info("Successful reconcile", "checkpoint", newCheckPoint) + return newCheckPoint, oldCheckPoint, false, nil } -func (m *defaultResourceManager) reconcileWithInstanceTargetType(ctx context.Context, tgb *elbv2api.TargetGroupBinding) error { +func (m *defaultResourceManager) reconcileWithInstanceTargetType(ctx context.Context, tgb *elbv2api.TargetGroupBinding) (string, string, bool, error) { + tgbScopedLogger := m.logger.WithValues("tgb", k8s.NamespacedName(tgb)) svcKey := buildServiceReferenceKey(tgb, tgb.Spec.ServiceRef) nodeSelector, err := backend.GetTrafficProxyNodeSelector(tgb) if err != nil { - return err + return "", "", false, err } resolveOpts := []backend.EndpointResolveOption{backend.WithNodeSelector(nodeSelector)} @@ -187,33 +245,58 @@ func (m *defaultResourceManager) reconcileWithInstanceTargetType(ctx context.Con if err != nil { if errors.Is(err, backend.ErrNotFound) { m.eventRecorder.Event(tgb, corev1.EventTypeWarning, k8s.TargetGroupBindingEventReasonBackendNotFound, err.Error()) - return m.Cleanup(ctx, tgb) + return "", "", false, m.Cleanup(ctx, tgb) } - return err + return "", "", false, err + } + + newCheckPoint, err := calculateTGBReconcileCheckpoint(endpoints, tgb) + + if err != nil { + return "", "", false, err + } + + oldCheckPoint := GetTGBReconcileCheckpoint(tgb) + + if newCheckPoint == oldCheckPoint { + tgbScopedLogger.Info("Skipping targetgroupbinding reconcile", "calculated hash", newCheckPoint) + return newCheckPoint, oldCheckPoint, true, nil + } + tgARN := tgb.Spec.TargetGroupARN targets, err := m.targetsManager.ListTargets(ctx, tgARN) if err != nil { - return err + return "", "", false, err } - notDrainingTargets, drainingTargets := partitionTargetsByDrainingStatus(targets) + notDrainingTargets, _ := partitionTargetsByDrainingStatus(targets) _, 
unmatchedEndpoints, unmatchedTargets := matchNodePortEndpointWithTargets(endpoints, notDrainingTargets) if err := m.networkingManager.ReconcileForNodePortEndpoints(ctx, tgb, endpoints); err != nil { - return err + tgbScopedLogger.Error(err, "Requesting network requeue due to error from ReconcileForNodePortEndpoints") + return "", "", false, err } + + if len(unmatchedEndpoints) > 0 || len(unmatchedTargets) > 0 { + // Same reasoning as the IP target path; see that code for why we clear out the checkpoint. + err = m.updateTGBCheckPoint(ctx, tgb, "", oldCheckPoint) + if err != nil { + tgbScopedLogger.Error(err, "Unable to update checkpoint before mutating change") + return "", "", false, err + } + } + if len(unmatchedTargets) > 0 { if err := m.deregisterTargets(ctx, tgARN, unmatchedTargets); err != nil { - return err + return "", "", false, err } } if len(unmatchedEndpoints) > 0 { if err := m.registerNodePortEndpoints(ctx, tgARN, unmatchedEndpoints); err != nil { - return err + return "", "", false, err } } - _ = drainingTargets - return nil + tgbScopedLogger.Info("Successful reconcile", "checkpoint", newCheckPoint) + return newCheckPoint, oldCheckPoint, false, nil } func (m *defaultResourceManager) cleanupTargets(ctx context.Context, tgb *elbv2api.TargetGroupBinding) error { @@ -281,19 +364,13 @@ func (m *defaultResourceManager) updateTargetHealthPodConditionForPod(ctx contex return false, nil } - targetHealthCondStatus := corev1.ConditionUnknown var reason, message string if targetHealth != nil { - if string(targetHealth.State) == string(elbv2types.TargetHealthStateEnumHealthy) { - targetHealthCondStatus = corev1.ConditionTrue - } else { - targetHealthCondStatus = corev1.ConditionFalse - } - reason = string(targetHealth.Reason) message = awssdk.ToString(targetHealth.Description) } - needFurtherProbe := targetHealthCondStatus != corev1.ConditionTrue + + targetHealthCondStatus, needFurtherProbe := m.calculateReadinessGateTransition(pod, targetHealthCondType, targetHealth) existingTargetHealthCond, hasExistingTargetHealthCond := pod.GetPodCondition(targetHealthCondType) // we skip patch pod if it matches current computed status/reason/message. @@ -343,6 +420,21 @@ func (m *defaultResourceManager) updateTargetHealthPodConditionForPod(ctx contex return needFurtherProbe, nil } +func (m *defaultResourceManager) calculateReadinessGateTransition(pod k8s.PodInfo, targetHealthCondType corev1.PodConditionType, targetHealth *elbv2types.TargetHealth) (corev1.ConditionStatus, bool) { + if !pod.HasAnyOfReadinessGates([]corev1.PodConditionType{targetHealthCondType}) { + return corev1.ConditionTrue, false + } + targetHealthCondStatus := corev1.ConditionUnknown + if targetHealth != nil { + if string(targetHealth.State) == string(elbv2types.TargetHealthStateEnumHealthy) { + targetHealthCondStatus = corev1.ConditionTrue + } else { + targetHealthCondStatus = corev1.ConditionFalse + } + } + return targetHealthCondStatus, targetHealthCondStatus != corev1.ConditionTrue +} + // updatePodAsHealthyForDeletedTGB updates pod's targetHealth condition as healthy when deleting a TGB // if the pod has readiness Gate. 
func (m *defaultResourceManager) updatePodAsHealthyForDeletedTGB(ctx context.Context, tgb *elbv2api.TargetGroupBinding) error { @@ -432,6 +524,20 @@ func (m *defaultResourceManager) registerNodePortEndpoints(ctx context.Context, return m.targetsManager.RegisterTargets(ctx, tgARN, sdkTargets) } +func (m *defaultResourceManager) updateTGBCheckPoint(ctx context.Context, tgb *elbv2api.TargetGroupBinding, newCheckPoint, previousCheckPoint string) error { + if newCheckPoint == previousCheckPoint { + return nil + } + + tgbOld := tgb.DeepCopy() + SaveTGBReconcileCheckpoint(tgb, newCheckPoint) + + if err := m.k8sClient.Patch(ctx, tgb, client.MergeFrom(tgbOld)); err != nil { + return errors.Wrapf(err, "failed to update targetGroupBinding checkpoint: %v", k8s.NamespacedName(tgb)) + } + return nil +} + type podEndpointAndTargetPair struct { endpoint backend.PodEndpoint target TargetInfo diff --git a/pkg/targetgroupbinding/utils.go b/pkg/targetgroupbinding/utils.go index 65f3860964..05bd3746c0 100644 --- a/pkg/targetgroupbinding/utils.go +++ b/pkg/targetgroupbinding/utils.go @@ -1,11 +1,19 @@ package targetgroupbinding import ( + "encoding/json" "fmt" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" elbv2api "sigs.k8s.io/aws-load-balancer-controller/apis/elbv2/v1beta1" + "sigs.k8s.io/aws-load-balancer-controller/pkg/algorithm" + "sigs.k8s.io/aws-load-balancer-controller/pkg/annotations" + "sigs.k8s.io/aws-load-balancer-controller/pkg/backend" "sigs.k8s.io/controller-runtime/pkg/client" + "slices" + "strconv" + "strings" + "time" ) const ( @@ -29,6 +37,59 @@ func IndexFuncServiceRefName(obj client.Object) []string { return []string{tgb.Spec.ServiceRef.Name} } +// calculateTGBReconcileCheckpoint calculates the checkpoint for a tgb using the endpoints and tgb spec +func calculateTGBReconcileCheckpoint[V backend.Endpoint](endpoints []V, tgb *elbv2api.TargetGroupBinding) (string, error) { + + endpointStrings := make([]string, 0, len(endpoints)) + + for _, ep := range endpoints { + endpointStrings = append(endpointStrings, ep.GetIdentifier()) + } + + slices.Sort(endpointStrings) + csv := strings.Join(endpointStrings, ",") + + specJSON, err := json.Marshal(tgb.Spec) + if err != nil { + return "", err + } + + endpointSha := algorithm.ComputeSha256(csv) + specSha := algorithm.ComputeSha256(string(specJSON)) + + return fmt.Sprintf("%s/%s", endpointSha, specSha), nil +} + +// GetTGBReconcileCheckpoint gets the sha256 hash saved in the annotations +func GetTGBReconcileCheckpoint(tgb *elbv2api.TargetGroupBinding) string { + if checkPoint, ok := tgb.Annotations[annotations.AnnotationCheckPoint]; ok { + return checkPoint + } + return "" +} + +// GetTGBReconcileCheckpointTimestamp gets the latest updated timestamp (in seconds) for the TGB checkpoint +func GetTGBReconcileCheckpointTimestamp(tgb *elbv2api.TargetGroupBinding) int64 { + if ts, ok := tgb.Annotations[annotations.AnnotationCheckPointTimestamp]; ok { + ts64, err := strconv.ParseInt(ts, 10, 64) + if err != nil { + return 0 + } + return ts64 + } + return 0 +} + +// SaveTGBReconcileCheckpoint updates the TGB object with a new checkpoint string. 
+func SaveTGBReconcileCheckpoint(tgb *elbv2api.TargetGroupBinding, checkpoint string) { + if tgb.Annotations == nil { + tgb.Annotations = map[string]string{} + } + + tgb.Annotations[annotations.AnnotationCheckPoint] = checkpoint + tgb.Annotations[annotations.AnnotationCheckPointTimestamp] = strconv.FormatInt(time.Now().Unix(), 10) +} + func buildServiceReferenceKey(tgb *elbv2api.TargetGroupBinding, svcRef elbv2api.ServiceReference) types.NamespacedName { return types.NamespacedName{ Namespace: tgb.Namespace, diff --git a/version-stable.txt b/version-stable.txt index c8e38b6140..dedcc7d433 100644 --- a/version-stable.txt +++ b/version-stable.txt @@ -1 +1 @@ -2.9.0 +2.9.1 diff --git a/version.txt b/version.txt index c8e38b6140..dedcc7d433 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -2.9.0 +2.9.1
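A note on the checkpoint scheme this patch introduces: `calculateTGBReconcileCheckpoint` sorts the endpoint identifiers, joins them into one comma-separated string, hashes that alongside the marshaled `tgb.Spec`, and joins the two digests with a `/`. Because `ComputeSha256` emits raw-URL base64 (an alphabet with no `/`), the empty `resetHash` sentinel can never equal a computed checkpoint. Below is a minimal standalone sketch of that shape; `checkpointOf`, `sha256Of`, and the sample endpoint identifiers are illustrative stand-ins, not names from this repository.

```go
package main

import (
	"crypto/sha256"
	"encoding/base64"
	"fmt"
	"slices"
	"strings"
)

// sha256Of mirrors pkg/algorithm.ComputeSha256: raw-URL base64 of the
// SHA-256 digest, whose alphabet never contains '/'.
func sha256Of(s string) string {
	sum := sha256.Sum256([]byte(s))
	return base64.RawURLEncoding.EncodeToString(sum[:])
}

// checkpointOf is an illustrative stand-in for calculateTGBReconcileCheckpoint:
// sorted endpoint identifiers and the marshaled spec are hashed separately and
// joined with '/', so any endpoint or spec change yields a new checkpoint.
func checkpointOf(endpointIDs []string, specJSON string) string {
	slices.Sort(endpointIDs)
	csv := strings.Join(endpointIDs, ",")
	return fmt.Sprintf("%s/%s", sha256Of(csv), sha256Of(specJSON))
}

func main() {
	spec := `{"targetType":"ip"}` // hypothetical marshaled TargetGroupBinding spec

	// Identifiers follow PodEndpoint.GetIdentifier's "IP:port:creationMillis" shape.
	a := checkpointOf([]string{"10.0.0.1:80:1700000000000", "10.0.0.2:80:1700000000001"}, spec)
	b := checkpointOf([]string{"10.0.0.2:80:1700000000001", "10.0.0.1:80:1700000000000"}, spec)

	// Ordering does not matter, so an unchanged endpoint set reconciled twice
	// produces the same checkpoint and the reconcile can be skipped.
	fmt.Println(a == b) // true

	// The resetHash sentinel ("") can never match a computed checkpoint,
	// because every computed checkpoint contains a '/'.
	fmt.Println(a == "") // false
}
```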