diff --git a/pkg/providers/v1/aws.go b/pkg/providers/v1/aws.go index 703ec53830..091cf26841 100644 --- a/pkg/providers/v1/aws.go +++ b/pkg/providers/v1/aws.go @@ -389,9 +389,9 @@ type Cloud struct { // Note that we cache some state in awsInstance (mountpoints), so we must preserve the instance selfAWSInstance *awsInstance - instanceCache instanceCache - zoneCache zoneCache - topologyCache topologyCache + instanceCache instanceCache + zoneCache zoneCache + instanceTopologyManager *instanceTopologyManager clientBuilder cloudprovider.ControllerClientBuilder kubeClient clientset.Interface @@ -619,7 +619,7 @@ func newAWSCloud2(cfg config.CloudConfig, awsServices Services, provider config. } awsCloud.instanceCache.cloud = awsCloud awsCloud.zoneCache.cloud = awsCloud - awsCloud.topologyCache.cloud = awsCloud + awsCloud.instanceTopologyManager = newInstanceTopologyManager(awsCloud.ec2) tagged := cfg.Global.KubernetesClusterTag != "" || cfg.Global.KubernetesClusterID != "" if cfg.Global.VPC != "" && (cfg.Global.SubnetID != "" || cfg.Global.RoleARN != "") && tagged { diff --git a/pkg/providers/v1/aws_ec2.go b/pkg/providers/v1/aws_ec2.go index 37c558345a..d3c66d435a 100644 --- a/pkg/providers/v1/aws_ec2.go +++ b/pkg/providers/v1/aws_ec2.go @@ -31,13 +31,16 @@ type awsSdkEC2 struct { } func (s *awsSdkEC2) DescribeInstanceTopology(request *ec2.DescribeInstanceTopologyInput) ([]*ec2.InstanceTopology, error) { - resp, err := s.ec2.DescribeInstanceTopology(request) - if err != nil { - return nil, fmt.Errorf("error describe AWS Instance Topology: %q", err) - } else if len(resp.Instances) == 0 { - return []*ec2.InstanceTopology{}, nil - } - return resp.Instances, err + topologies := []*ec2.InstanceTopology{} + + err := s.ec2.DescribeInstanceTopologyPages(request, + func(page *ec2.DescribeInstanceTopologyOutput, lastPage bool) bool { + topologies = append(topologies, page.Instances...) + // Don't short-circuit. 
Just go through all of the pages + return false + }) + + return topologies, err } // Implementation of EC2.Instances diff --git a/pkg/providers/v1/instances_v2.go b/pkg/providers/v1/instances_v2.go index a99f6340f2..42e7e546c7 100644 --- a/pkg/providers/v1/instances_v2.go +++ b/pkg/providers/v1/instances_v2.go @@ -22,10 +22,11 @@ package aws import ( "context" + "strconv" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" cloudprovider "k8s.io/cloud-provider" - "strconv" ) func (c *Cloud) getProviderID(ctx context.Context, node *v1.Node) (string, error) { @@ -63,21 +64,28 @@ func (c *Cloud) InstanceShutdown(ctx context.Context, node *v1.Node) (bool, erro return c.InstanceShutdownByProviderID(ctx, providerID) } -func (c *Cloud) getAdditionalLabels(zoneName string, instanceID string, instanceType string, region string) (map[string]string, error) { +func (c *Cloud) getAdditionalLabels(zoneName string, instanceID string, instanceType string, + region string, existingLabels map[string]string) (map[string]string, error) { additionalLabels := map[string]string{} - // Add the zone ID to the additional labels - zoneID, err := c.zoneCache.getZoneIDByZoneName(zoneName) - if err != nil { - return nil, err + // If zone ID label is already set, skip. + if _, ok := existingLabels[LabelZoneID]; !ok { + // Add the zone ID to the additional labels + zoneID, err := c.zoneCache.getZoneIDByZoneName(zoneName) + if err != nil { + return nil, err + } + + additionalLabels[LabelZoneID] = zoneID } - additionalLabels[LabelZoneID] = zoneID + // If topology labels are already set, skip. 
+ if _, ok := existingLabels[LabelNetworkNode+"1"]; !ok { + nodeTopology, err := c.instanceTopologyManager.getNodeTopology(instanceType, region, instanceID) + if err != nil { + return nil, err + } - nodeTopology, err := c.topologyCache.getNodeTopology(instanceType, region, instanceID) - if err != nil { - return nil, err - } else if nodeTopology != nil { for index, networkNode := range nodeTopology.NetworkNodes { layer := index + 1 label := LabelNetworkNode + strconv.Itoa(layer) @@ -119,7 +127,7 @@ func (c *Cloud) InstanceMetadata(ctx context.Context, node *v1.Node) (*cloudprov return nil, err } - additionalLabels, err := c.getAdditionalLabels(zone.FailureDomain, string(instanceID), instanceType, zone.Region) + additionalLabels, err := c.getAdditionalLabels(zone.FailureDomain, string(instanceID), instanceType, zone.Region, node.Labels) if err != nil { return nil, err } diff --git a/pkg/providers/v1/topology.go b/pkg/providers/v1/topology.go index b3aa695f65..8a75386d93 100644 --- a/pkg/providers/v1/topology.go +++ b/pkg/providers/v1/topology.go @@ -17,58 +17,98 @@ limitations under the License. package aws import ( - "slices" - "strings" - "sync" + "time" + "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/service/ec2" + "k8s.io/client-go/tools/cache" + "k8s.io/cloud-provider-aws/pkg/providers/v1/iface" "k8s.io/klog/v2" ) -type topologyCache struct { - cloud *Cloud - mutex sync.RWMutex - unsupportedInstance []string - unsupportedRegion []string +const instanceTopologyManagerCacheTimeout = 24 * time.Hour + +// topStringKeyFunc is a string as cache key function +func topStringKeyFunc(obj interface{}) (string, error) { + // Type should already be a string, so just return as is. 
+ return obj.(string), nil +} + +type instanceTopologyManager struct { + ec2 iface.EC2 + unsupportedInstanceType cache.Store + unsupportedRegion cache.Store } -func (t *topologyCache) getNodeTopology(instanceType string, region string, instanceID string) (*ec2.InstanceTopology, error) { - t.mutex.RLock() - defer t.mutex.RUnlock() +func newInstanceTopologyManager(ec2 iface.EC2) *instanceTopologyManager { + return &instanceTopologyManager{ + ec2: ec2, + // These should change very infrequently, if ever, so checking once a day sounds fair. + unsupportedRegion: cache.NewTTLStore(topStringKeyFunc, instanceTopologyManagerCacheTimeout), + unsupportedInstanceType: cache.NewTTLStore(topStringKeyFunc, instanceTopologyManagerCacheTimeout), + } +} + +func (t *instanceTopologyManager) getNodeTopology(instanceType string, region string, instanceID string) (*ec2.InstanceTopology, error) { if t.mightSupportTopology(instanceType, region) { - topologyRequest := &ec2.DescribeInstanceTopologyInput{InstanceIds: []*string{&instanceID}} - topology, err := t.cloud.ec2.DescribeInstanceTopology(topologyRequest) + request := &ec2.DescribeInstanceTopologyInput{InstanceIds: []*string{&instanceID}} + topologies, err := t.ec2.DescribeInstanceTopology(request) if err != nil { - klog.Errorf("Error describing instance topology: %q", err) - if strings.Contains(err.Error(), "The functionality you requested is not available in this region") { - t.unsupportedRegion = append(t.unsupportedRegion, region) - return nil, nil - } else if strings.Contains(err.Error(), "You are not authorized to perform this operation") { - // gracefully handle the DecribeInstanceTopology access missing error - klog.Infof("Not authorized to perform: ec2:DescribeInstanceTopology, permission missing") - return nil, nil + if aerr, ok := err.(awserr.Error); ok { + switch aerr.Code() { + case "UnsupportedOperation": + klog.Infof("ec2:DescribeInstanceTopology is not available in %s.", region) + // If region is unsupported, track it 
to avoid making the call in the future. + t.addUnsupportedRegion(region) + return &ec2.InstanceTopology{}, nil + case "UnauthorizedOperation": + // gracefully handle the DescribeInstanceTopology access missing error + klog.Warningf("Not authorized to perform: ec2:DescribeInstanceTopology, permission missing") + return &ec2.InstanceTopology{}, nil + } } + + // Unhandled error + klog.Errorf("Error describing instance topology: %q", err) return nil, err - } else if len(topology) == 0 { - // instanceType is not support topology info and the result is empty - t.unsupportedInstance = append(t.unsupportedInstance, instanceType) + } else if len(topologies) == 0 { + // If no topology is returned, track the instance type as unsupported + klog.Infof("Instance type %s unsupported for getting instance topology", instanceType) + t.addUnsupportedInstanceType(instanceType) + return &ec2.InstanceTopology{}, nil } - return topology[0], nil + + return topologies[0], nil } - return nil, nil + return &ec2.InstanceTopology{}, nil } -func (t *topologyCache) mightSupportTopology(instanceType string, region string) bool { - // Initialize the map if it's unset - if t.unsupportedInstance == nil { - t.unsupportedInstance = []string{} +func (t *instanceTopologyManager) addUnsupportedRegion(region string) { + err := t.unsupportedRegion.Add(region) + if err != nil { + klog.Errorf("Failed to cache unsupported region: %q", err) } - if t.unsupportedRegion == nil { - t.unsupportedRegion = []string{} +} + +func (t *instanceTopologyManager) addUnsupportedInstanceType(instanceType string) { + err := t.unsupportedInstanceType.Add(instanceType) + if err != nil { + klog.Errorf("Failed to cache unsupported instance type: %q", err) } - // if both instanceType and region are not in unsupported cache, the instance type and region might be supported - // or we haven't check the supportness and cache it yet. 
If they are unsupported and not cached, we will run - // describeTopology api once for them - // Initialize the map if it's unset - return !slices.Contains(t.unsupportedInstance, instanceType) && !slices.Contains(t.unsupportedRegion, region) +} + +func (t *instanceTopologyManager) mightSupportTopology(instanceType string, region string) bool { + if _, exists, err := t.unsupportedRegion.GetByKey(region); exists { + return false + } else if err != nil { + klog.Errorf("Failed to get cached unsupported region: %q:", err) + } + + if _, exists, err := t.unsupportedInstanceType.GetByKey(instanceType); exists { + return false + } else if err != nil { + klog.Errorf("Failed to get cached unsupported instance type: %q:", err) + } + + return true }