Skip to content

Commit

Permalink
add tags cache for elb (kubernetes-sigs#3550)
Browse files Browse the repository at this point in the history
  • Loading branch information
M00nF1sh authored Feb 6, 2024
1 parent 004e2ab commit c77b6eb
Show file tree
Hide file tree
Showing 6 changed files with 293 additions and 21 deletions.
5 changes: 2 additions & 3 deletions controllers/ingress/group_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,14 @@ const (
func NewGroupReconciler(cloud aws.Cloud, k8sClient client.Client, eventRecorder record.EventRecorder,
finalizerManager k8s.FinalizerManager, networkingSGManager networkingpkg.SecurityGroupManager,
networkingSGReconciler networkingpkg.SecurityGroupReconciler, subnetsResolver networkingpkg.SubnetsResolver,
controllerConfig config.ControllerConfig, backendSGProvider networkingpkg.BackendSGProvider,
elbv2TaggingManager elbv2deploy.TaggingManager, controllerConfig config.ControllerConfig, backendSGProvider networkingpkg.BackendSGProvider,
sgResolver networkingpkg.SecurityGroupResolver, logger logr.Logger) *groupReconciler {

annotationParser := annotations.NewSuffixAnnotationParser(annotations.AnnotationPrefixIngress)
authConfigBuilder := ingress.NewDefaultAuthConfigBuilder(annotationParser)
enhancedBackendBuilder := ingress.NewDefaultEnhancedBackendBuilder(k8sClient, annotationParser, authConfigBuilder, controllerConfig.IngressConfig.TolerateNonExistentBackendService, controllerConfig.IngressConfig.TolerateNonExistentBackendAction)
referenceIndexer := ingress.NewDefaultReferenceIndexer(enhancedBackendBuilder, authConfigBuilder, logger)
trackingProvider := tracking.NewDefaultProvider(ingressTagPrefix, controllerConfig.ClusterName)
elbv2TaggingManager := elbv2deploy.NewDefaultTaggingManager(cloud.ELBV2(), cloud.VpcID(), controllerConfig.FeatureGates, cloud.RGT(), logger)
modelBuilder := ingress.NewDefaultModelBuilder(k8sClient, eventRecorder,
cloud.EC2(), cloud.ELBV2(), cloud.ACM(),
annotationParser, subnetsResolver,
Expand All @@ -63,7 +62,7 @@ func NewGroupReconciler(cloud aws.Cloud, k8sClient client.Client, eventRecorder
controllerConfig.DefaultSSLPolicy, controllerConfig.DefaultTargetType, backendSGProvider, sgResolver,
controllerConfig.EnableBackendSecurityGroup, controllerConfig.DisableRestrictedSGRules, controllerConfig.FeatureGates.Enabled(config.EnableIPTargetType), logger)
stackMarshaller := deploy.NewDefaultStackMarshaller()
stackDeployer := deploy.NewDefaultStackDeployer(cloud, k8sClient, networkingSGManager, networkingSGReconciler,
stackDeployer := deploy.NewDefaultStackDeployer(cloud, k8sClient, networkingSGManager, networkingSGReconciler, elbv2TaggingManager,
controllerConfig, ingressTagPrefix, logger)
classLoader := ingress.NewDefaultClassLoader(k8sClient, true)
classAnnotationMatcher := ingress.NewDefaultClassAnnotationMatcher(controllerConfig.IngressConfig.IngressClass)
Expand Down
7 changes: 3 additions & 4 deletions controllers/service/service_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"sigs.k8s.io/aws-load-balancer-controller/pkg/aws"
"sigs.k8s.io/aws-load-balancer-controller/pkg/config"
"sigs.k8s.io/aws-load-balancer-controller/pkg/deploy"
"sigs.k8s.io/aws-load-balancer-controller/pkg/deploy/elbv2"
elbv2deploy "sigs.k8s.io/aws-load-balancer-controller/pkg/deploy/elbv2"
"sigs.k8s.io/aws-load-balancer-controller/pkg/deploy/tracking"
"sigs.k8s.io/aws-load-balancer-controller/pkg/k8s"
"sigs.k8s.io/aws-load-balancer-controller/pkg/model/core"
Expand All @@ -38,19 +38,18 @@ const (
func NewServiceReconciler(cloud aws.Cloud, k8sClient client.Client, eventRecorder record.EventRecorder,
finalizerManager k8s.FinalizerManager, networkingSGManager networking.SecurityGroupManager,
networkingSGReconciler networking.SecurityGroupReconciler, subnetsResolver networking.SubnetsResolver,
vpcInfoProvider networking.VPCInfoProvider, controllerConfig config.ControllerConfig,
vpcInfoProvider networking.VPCInfoProvider, elbv2TaggingManager elbv2deploy.TaggingManager, controllerConfig config.ControllerConfig,
backendSGProvider networking.BackendSGProvider, sgResolver networking.SecurityGroupResolver, logger logr.Logger) *serviceReconciler {

annotationParser := annotations.NewSuffixAnnotationParser(serviceAnnotationPrefix)
trackingProvider := tracking.NewDefaultProvider(serviceTagPrefix, controllerConfig.ClusterName)
elbv2TaggingManager := elbv2.NewDefaultTaggingManager(cloud.ELBV2(), cloud.VpcID(), controllerConfig.FeatureGates, cloud.RGT(), logger)
serviceUtils := service.NewServiceUtils(annotationParser, serviceFinalizer, controllerConfig.ServiceConfig.LoadBalancerClass, controllerConfig.FeatureGates)
modelBuilder := service.NewDefaultModelBuilder(annotationParser, subnetsResolver, vpcInfoProvider, cloud.VpcID(), trackingProvider,
elbv2TaggingManager, cloud.EC2(), controllerConfig.FeatureGates, controllerConfig.ClusterName, controllerConfig.DefaultTags, controllerConfig.ExternalManagedTags,
controllerConfig.DefaultSSLPolicy, controllerConfig.DefaultTargetType, controllerConfig.FeatureGates.Enabled(config.EnableIPTargetType), serviceUtils,
backendSGProvider, sgResolver, controllerConfig.EnableBackendSecurityGroup, controllerConfig.DisableRestrictedSGRules)
stackMarshaller := deploy.NewDefaultStackMarshaller()
stackDeployer := deploy.NewDefaultStackDeployer(cloud, k8sClient, networkingSGManager, networkingSGReconciler, controllerConfig, serviceTagPrefix, logger)
stackDeployer := deploy.NewDefaultStackDeployer(cloud, k8sClient, networkingSGManager, networkingSGReconciler, elbv2TaggingManager, controllerConfig, serviceTagPrefix, logger)
return &serviceReconciler{
k8sClient: k8sClient,
eventRecorder: eventRecorder,
Expand Down
6 changes: 4 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package main

import (
"os"
elbv2deploy "sigs.k8s.io/aws-load-balancer-controller/pkg/deploy/elbv2"

"github.com/go-logr/logr"
"github.com/spf13/pflag"
Expand Down Expand Up @@ -112,11 +113,12 @@ func main() {
backendSGProvider := networking.NewBackendSGProvider(controllerCFG.ClusterName, controllerCFG.BackendSecurityGroup,
cloud.VpcID(), cloud.EC2(), mgr.GetClient(), controllerCFG.DefaultTags, ctrl.Log.WithName("backend-sg-provider"))
sgResolver := networking.NewDefaultSecurityGroupResolver(cloud.EC2(), cloud.VpcID())
elbv2TaggingManager := elbv2deploy.NewDefaultTaggingManager(cloud.ELBV2(), cloud.VpcID(), controllerCFG.FeatureGates, cloud.RGT(), ctrl.Log)
ingGroupReconciler := ingress.NewGroupReconciler(cloud, mgr.GetClient(), mgr.GetEventRecorderFor("ingress"),
finalizerManager, sgManager, sgReconciler, subnetResolver,
finalizerManager, sgManager, sgReconciler, subnetResolver, elbv2TaggingManager,
controllerCFG, backendSGProvider, sgResolver, ctrl.Log.WithName("controllers").WithName("ingress"))
svcReconciler := service.NewServiceReconciler(cloud, mgr.GetClient(), mgr.GetEventRecorderFor("service"),
finalizerManager, sgManager, sgReconciler, subnetResolver, vpcInfoProvider,
finalizerManager, sgManager, sgReconciler, subnetResolver, vpcInfoProvider, elbv2TaggingManager,
controllerCFG, backendSGProvider, sgResolver, ctrl.Log.WithName("controllers").WithName("service"))
tgbReconciler := elbv2controller.NewTargetGroupBindingReconciler(mgr.GetClient(), mgr.GetEventRecorderFor("targetGroupBinding"),
finalizerManager, tgbResManager,
Expand Down
64 changes: 55 additions & 9 deletions pkg/deploy/elbv2/tagging_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ package elbv2

import (
"context"
"sync"
"time"

"k8s.io/apimachinery/pkg/util/cache"

awssdk "github.com/aws/aws-sdk-go/aws"
elbv2sdk "github.com/aws/aws-sdk-go/service/elbv2"
Expand All @@ -18,6 +22,8 @@ import (
const (
// ELBV2 API supports up to 20 resource per DescribeTags API call.
defaultDescribeTagsChunkSize = 20
// cache ttl for tags on ELB resources.
defaultResourceTagsCacheTTL = 20 * time.Minute
)

// LoadBalancer with it's tags.
Expand Down Expand Up @@ -103,6 +109,8 @@ func NewDefaultTaggingManager(elbv2Client services.ELBV2, vpcID string, featureG
featureGates: featureGates,
logger: logger,
describeTagsChunkSize: defaultDescribeTagsChunkSize,
resourceTagsCache: cache.NewExpiring(),
resourceTagsCacheTTL: defaultResourceTagsCacheTTL,
rgt: rgt,
}
}
Expand All @@ -117,7 +125,11 @@ type defaultTaggingManager struct {
featureGates config.FeatureGates
logger logr.Logger
describeTagsChunkSize int
rgt services.RGT
// cache for tags on ELB resources.
resourceTagsCache *cache.Expiring
resourceTagsCacheTTL time.Duration
resourceTagsCacheMutex sync.RWMutex
rgt services.RGT
}

func (m *defaultTaggingManager) ReconcileTags(ctx context.Context, arn string, desiredTags map[string]string, opts ...ReconcileTagsOption) error {
Expand All @@ -128,7 +140,7 @@ func (m *defaultTaggingManager) ReconcileTags(ctx context.Context, arn string, d
reconcileOpts.ApplyOptions(opts)
currentTags := reconcileOpts.CurrentTags
if currentTags == nil {
tagsByARN, err := m.describeResourceTagsNative(ctx, []string{arn})
tagsByARN, err := m.describeResourceTags(ctx, []string{arn})
if err != nil {
return err
}
Expand All @@ -153,6 +165,7 @@ func (m *defaultTaggingManager) ReconcileTags(ctx context.Context, arn string, d
if _, err := m.elbv2Client.AddTagsWithContext(ctx, req); err != nil {
return err
}
m.invalidateResourceTagsCache(arn)
m.logger.Info("added resource tags",
"arn", arn)
}
Expand All @@ -170,6 +183,7 @@ func (m *defaultTaggingManager) ReconcileTags(ctx context.Context, arn string, d
if _, err := m.elbv2Client.RemoveTagsWithContext(ctx, req); err != nil {
return err
}
m.invalidateResourceTagsCache(arn)
m.logger.Info("removed resource tags",
"arn", arn)
}
Expand All @@ -193,7 +207,7 @@ func (m *defaultTaggingManager) ListListeners(ctx context.Context, lbARN string)
}
var tagsByARN map[string]map[string]string
if m.featureGates.Enabled(config.ListenerRulesTagging) {
tagsByARN, err = m.describeResourceTagsNative(ctx, lsARNs)
tagsByARN, err = m.describeResourceTags(ctx, lsARNs)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -226,7 +240,7 @@ func (m *defaultTaggingManager) ListListenerRules(ctx context.Context, lsARN str
}
var tagsByARN map[string]map[string]string
if m.featureGates.Enabled(config.ListenerRulesTagging) {
tagsByARN, err = m.describeResourceTagsNative(ctx, lrARNs)
tagsByARN, err = m.describeResourceTags(ctx, lrARNs)
if err != nil {
return nil, err
}
Expand All @@ -242,6 +256,7 @@ func (m *defaultTaggingManager) ListListenerRules(ctx context.Context, lsARN str
return sdkLRs, err
}

// TODO: we can refactor this by store provisioned LB's ARN as annotations on Ingress/Service, thus avoid this heavy lookup calls when RGT is not available.
func (m *defaultTaggingManager) ListLoadBalancers(ctx context.Context, tagFilters ...tracking.TagFilter) ([]LoadBalancerWithTags, error) {
if m.featureGates.Enabled(config.EnableRGTAPI) {
return m.listLoadBalancersRGT(ctx, tagFilters)
Expand All @@ -254,7 +269,6 @@ func (m *defaultTaggingManager) ListTargetGroups(ctx context.Context, tagFilters
return m.listTargetGroupsRGT(ctx, tagFilters)
}
return m.listTargetGroupsNative(ctx, tagFilters)

}

func (m *defaultTaggingManager) listLoadBalancersRGT(ctx context.Context, tagFilters []tracking.TagFilter) ([]LoadBalancerWithTags, error) {
Expand Down Expand Up @@ -311,7 +325,7 @@ func (m *defaultTaggingManager) listLoadBalancersNative(ctx context.Context, tag
lbARNsWithinVPC = append(lbARNsWithinVPC, lbARN)
lbByARNWithinVPC[lbARN] = lb
}
tagsByARN, err := m.describeResourceTagsNative(ctx, lbARNsWithinVPC)
tagsByARN, err := m.describeResourceTags(ctx, lbARNsWithinVPC)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -391,7 +405,7 @@ func (m *defaultTaggingManager) listTargetGroupsNative(ctx context.Context, tagF
tgARNsWithinVPC = append(tgARNsWithinVPC, tgARN)
tgByARNWithinVPC[tgARN] = tg
}
tagsByARN, err := m.describeResourceTagsNative(ctx, tgARNsWithinVPC)
tagsByARN, err := m.describeResourceTags(ctx, tgARNsWithinVPC)
if err != nil {
return nil, err
}
Expand All @@ -416,9 +430,34 @@ func (m *defaultTaggingManager) listTargetGroupsNative(ctx context.Context, tagF
return matchedTGs, nil
}

// describeResourceTagsNative describes tags for elbv2 resources.
func (m *defaultTaggingManager) describeResourceTags(ctx context.Context, arns []string) (map[string]map[string]string, error) {
m.resourceTagsCacheMutex.Lock()
defer m.resourceTagsCacheMutex.Unlock()

tagsByARN := make(map[string]map[string]string, len(arns))
var arnsWithoutTagsCache []string
for _, arn := range arns {
if rawTagsCacheItem, exists := m.resourceTagsCache.Get(arn); exists {
tagsCacheItem := rawTagsCacheItem.(map[string]string)
tagsByARN[arn] = tagsCacheItem
} else {
arnsWithoutTagsCache = append(arnsWithoutTagsCache, arn)
}
}
tagsByARNFromAWS, err := m.describeResourceTagsFromAWS(ctx, arnsWithoutTagsCache)
if err != nil {
return nil, err
}
for arn, tags := range tagsByARNFromAWS {
m.resourceTagsCache.Set(arn, tags, m.resourceTagsCacheTTL)
tagsByARN[arn] = tags
}
return tagsByARN, nil
}

// describeResourceTagsFromAWS describes tags for elbv2 resources.
// returns tags indexed by resource ARN.
func (m *defaultTaggingManager) describeResourceTagsNative(ctx context.Context, arns []string) (map[string]map[string]string, error) {
func (m *defaultTaggingManager) describeResourceTagsFromAWS(ctx context.Context, arns []string) (map[string]map[string]string, error) {
tagsByARN := make(map[string]map[string]string, len(arns))
arnsChunks := algorithm.ChunkStrings(arns, m.describeTagsChunkSize)
for _, arnsChunk := range arnsChunks {
Expand All @@ -436,6 +475,13 @@ func (m *defaultTaggingManager) describeResourceTagsNative(ctx context.Context,
return tagsByARN, nil
}

func (m *defaultTaggingManager) invalidateResourceTagsCache(arn string) {
m.resourceTagsCacheMutex.Lock()
defer m.resourceTagsCacheMutex.Unlock()

m.resourceTagsCache.Delete(arn)
}

// convert tags into AWS SDK tag presentation.
func convertTagsToSDKTags(tags map[string]string) []*elbv2sdk.Tag {
if len(tags) == 0 {
Expand Down
Loading

0 comments on commit c77b6eb

Please sign in to comment.