Skip to content

Commit

Permalink
Refactors setting node network topology labels
Browse files Browse the repository at this point in the history
  • Loading branch information
mmerkes committed Oct 24, 2024
1 parent 6baf290 commit 9f46545
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 60 deletions.
8 changes: 4 additions & 4 deletions pkg/providers/v1/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,9 +389,9 @@ type Cloud struct {
// Note that we cache some state in awsInstance (mountpoints), so we must preserve the instance
selfAWSInstance *awsInstance

instanceCache instanceCache
zoneCache zoneCache
topologyCache topologyCache
instanceCache instanceCache
zoneCache zoneCache
instanceTopologyManager *instanceTopologyManager

clientBuilder cloudprovider.ControllerClientBuilder
kubeClient clientset.Interface
Expand Down Expand Up @@ -619,7 +619,7 @@ func newAWSCloud2(cfg config.CloudConfig, awsServices Services, provider config.
}
awsCloud.instanceCache.cloud = awsCloud
awsCloud.zoneCache.cloud = awsCloud
awsCloud.topologyCache.cloud = awsCloud
awsCloud.instanceTopologyManager = newInstanceTopologyManager(awsCloud.ec2)

tagged := cfg.Global.KubernetesClusterTag != "" || cfg.Global.KubernetesClusterID != ""
if cfg.Global.VPC != "" && (cfg.Global.SubnetID != "" || cfg.Global.RoleARN != "") && tagged {
Expand Down
17 changes: 10 additions & 7 deletions pkg/providers/v1/aws_ec2.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,16 @@ type awsSdkEC2 struct {
}

func (s *awsSdkEC2) DescribeInstanceTopology(request *ec2.DescribeInstanceTopologyInput) ([]*ec2.InstanceTopology, error) {
resp, err := s.ec2.DescribeInstanceTopology(request)
if err != nil {
return nil, fmt.Errorf("error describe AWS Instance Topology: %q", err)
} else if len(resp.Instances) == 0 {
return []*ec2.InstanceTopology{}, nil
}
return resp.Instances, err
topologies := []*ec2.InstanceTopology{}

err := s.ec2.DescribeInstanceTopologyPages(request,
func(page *ec2.DescribeInstanceTopologyOutput, lastPage bool) bool {
topologies = append(topologies, page.Instances...)
// Don't short-circuit. Just go through all of the pages
return false
})

return topologies, err
}

// Implementation of EC2.Instances
Expand Down
32 changes: 20 additions & 12 deletions pkg/providers/v1/instances_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@ package aws

import (
"context"
"strconv"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
cloudprovider "k8s.io/cloud-provider"
"strconv"
)

func (c *Cloud) getProviderID(ctx context.Context, node *v1.Node) (string, error) {
Expand Down Expand Up @@ -63,21 +64,28 @@ func (c *Cloud) InstanceShutdown(ctx context.Context, node *v1.Node) (bool, erro
return c.InstanceShutdownByProviderID(ctx, providerID)
}

func (c *Cloud) getAdditionalLabels(zoneName string, instanceID string, instanceType string, region string) (map[string]string, error) {
func (c *Cloud) getAdditionalLabels(zoneName string, instanceID string, instanceType string,
region string, existingLabels map[string]string) (map[string]string, error) {
additionalLabels := map[string]string{}

// Add the zone ID to the additional labels
zoneID, err := c.zoneCache.getZoneIDByZoneName(zoneName)
if err != nil {
return nil, err
// If zone ID label is already set, skip.
if _, ok := existingLabels[LabelZoneID]; !ok {
// Add the zone ID to the additional labels
zoneID, err := c.zoneCache.getZoneIDByZoneName(zoneName)
if err != nil {
return nil, err
}

additionalLabels[LabelZoneID] = zoneID
}

additionalLabels[LabelZoneID] = zoneID
// If topology labels are already set, skip.
if _, ok := existingLabels[LabelNetworkNode+"1"]; !ok {
nodeTopology, err := c.instanceTopologyManager.getNodeTopology(instanceType, region, instanceID)
if err != nil {
return nil, err
}

nodeTopology, err := c.topologyCache.getNodeTopology(instanceType, region, instanceID)
if err != nil {
return nil, err
} else if nodeTopology != nil {
for index, networkNode := range nodeTopology.NetworkNodes {
layer := index + 1
label := LabelNetworkNode + strconv.Itoa(layer)
Expand Down Expand Up @@ -119,7 +127,7 @@ func (c *Cloud) InstanceMetadata(ctx context.Context, node *v1.Node) (*cloudprov
return nil, err
}

additionalLabels, err := c.getAdditionalLabels(zone.FailureDomain, string(instanceID), instanceType, zone.Region)
additionalLabels, err := c.getAdditionalLabels(zone.FailureDomain, string(instanceID), instanceType, zone.Region, node.Labels)
if err != nil {
return nil, err
}
Expand Down
114 changes: 77 additions & 37 deletions pkg/providers/v1/topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,58 +17,98 @@ limitations under the License.
package aws

import (
"slices"
"strings"
"sync"
"time"

"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/ec2"
"k8s.io/client-go/tools/cache"
"k8s.io/cloud-provider-aws/pkg/providers/v1/iface"
"k8s.io/klog/v2"
)

type topologyCache struct {
cloud *Cloud
mutex sync.RWMutex
unsupportedInstance []string
unsupportedRegion []string
const instanceTopologyManagerCacheTimeout = 24 * time.Hour

// stringKeyFunc is a string as cache key function
func topStringKeyFunc(obj interface{}) (string, error) {
// Type should already be a string, so just return as is.
return obj.(string), nil
}

type instanceTopologyManager struct {
ec2 iface.EC2
unsupportedInstanceType cache.Store
unsupportedRegion cache.Store
}

func (t *topologyCache) getNodeTopology(instanceType string, region string, instanceID string) (*ec2.InstanceTopology, error) {
t.mutex.RLock()
defer t.mutex.RUnlock()
func newInstanceTopologyManager(ec2 iface.EC2) *instanceTopologyManager {
return &instanceTopologyManager{
ec2: ec2,
// These should change very infrequently, if ever, so checking once a day sounds fair.
unsupportedRegion: cache.NewTTLStore(topStringKeyFunc, instanceTopologyManagerCacheTimeout),
unsupportedInstanceType: cache.NewTTLStore(topStringKeyFunc, instanceTopologyManagerCacheTimeout),
}
}

func (t *instanceTopologyManager) getNodeTopology(instanceType string, region string, instanceID string) (*ec2.InstanceTopology, error) {
if t.mightSupportTopology(instanceType, region) {
topologyRequest := &ec2.DescribeInstanceTopologyInput{InstanceIds: []*string{&instanceID}}
topology, err := t.cloud.ec2.DescribeInstanceTopology(topologyRequest)
request := &ec2.DescribeInstanceTopologyInput{InstanceIds: []*string{&instanceID}}
topologies, err := t.ec2.DescribeInstanceTopology(request)
if err != nil {
klog.Errorf("Error describing instance topology: %q", err)
if strings.Contains(err.Error(), "The functionality you requested is not available in this region") {
t.unsupportedRegion = append(t.unsupportedRegion, region)
return nil, nil
} else if strings.Contains(err.Error(), "You are not authorized to perform this operation") {
// gracefully handle the DecribeInstanceTopology access missing error
klog.Infof("Not authorized to perform: ec2:DescribeInstanceTopology, permission missing")
return nil, nil
if aerr, ok := err.(awserr.Error); ok {
switch aerr.Code() {
case "UnsupportedOperation":
klog.Infof("ec2:DescribeInstanceTopology is not available in %s.", region)
// If region is unsupported, track it to avoid making the call in the future.
t.addUnsupportedRegion(region)
return &ec2.InstanceTopology{}, nil
case "UnauthorizedOperation":
// gracefully handle the DecribeInstanceTopology access missing error
klog.Warningf("Not authorized to perform: ec2:DescribeInstanceTopology, permission missing")
return &ec2.InstanceTopology{}, nil
}
}

// Unhandled error
klog.Errorf("Error describing instance topology: %q", err)
return nil, err
} else if len(topology) == 0 {
// instanceType is not support topology info and the result is empty
t.unsupportedInstance = append(t.unsupportedInstance, instanceType)
} else if len(topologies) == 0 {
// If no topology is returned, track the instance type as unsupported
klog.Infof("Instance type %s unsupported for getting instance topology", instanceType)
t.addUnsupportedInstanceType(instanceType)
return &ec2.InstanceTopology{}, nil
}
return topology[0], nil

return topologies[0], nil
}
return nil, nil
return &ec2.InstanceTopology{}, nil
}

func (t *topologyCache) mightSupportTopology(instanceType string, region string) bool {
// Initialize the map if it's unset
if t.unsupportedInstance == nil {
t.unsupportedInstance = []string{}
func (t *instanceTopologyManager) addUnsupportedRegion(region string) {
err := t.unsupportedRegion.Add(region)
if err != nil {
klog.Errorf("Failed to cache unsupported region: %q", err)
}
if t.unsupportedRegion == nil {
t.unsupportedRegion = []string{}
}

func (t *instanceTopologyManager) addUnsupportedInstanceType(instanceType string) {
err := t.unsupportedInstanceType.Add(instanceType)
if err != nil {
klog.Errorf("Failed to cache unsupported instance type: %q", err)
}
// if both instanceType and region are not in unsupported cache, the instance type and region might be supported
// or we haven't check the supportness and cache it yet. If they are unsupported and not cached, we will run
// describeTopology api once for them
// Initialize the map if it's unset
return !slices.Contains(t.unsupportedInstance, instanceType) && !slices.Contains(t.unsupportedRegion, region)
}

func (t *instanceTopologyManager) mightSupportTopology(instanceType string, region string) bool {
if _, exists, err := t.unsupportedRegion.GetByKey(region); exists {
return false
} else if err != nil {
klog.Errorf("Failed to get cached unsupported region: %q:", err)
}

if _, exists, err := t.unsupportedInstanceType.GetByKey(instanceType); exists {
return false
} else if err != nil {
klog.Errorf("Failed to get cached unsupported instance type: %q:", err)
}

return true
}

0 comments on commit 9f46545

Please sign in to comment.