Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 63 additions & 46 deletions cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,18 @@ limitations under the License.
package aws

import (
"context"
"fmt"
"reflect"
"strings"
"sync"
"time"

"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/aws/aws-sdk-go/aws"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/aws/aws-sdk-go/service/autoscaling"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/autoscaling"
autoscalingtypes "github.com/aws/aws-sdk-go-v2/service/autoscaling/types"
"github.com/aws/aws-sdk-go-v2/service/ec2"
ec2types "github.com/aws/aws-sdk-go-v2/service/ec2/types"
"k8s.io/autoscaler/cluster-autoscaler/config/dynamic"
klog "k8s.io/klog/v2"
)
Expand All @@ -36,12 +39,20 @@ const (
placeholderUnfulfillableStatus = "placeholder-cannot-be-fulfilled"
)

// getStringValue returns the value of a string pointer or empty string if nil
func getStringValue(s *string) string {
if s == nil {
return ""
}
return *s
}

type asgCache struct {
registeredAsgs map[AwsRef]*asg
asgToInstances map[AwsRef][]AwsInstanceRef
instanceToAsg map[AwsInstanceRef]*asg
instanceStatus map[AwsInstanceRef]*string
instanceLifecycle map[AwsInstanceRef]*string
instanceLifecycle map[AwsInstanceRef]autoscalingtypes.LifecycleState
asgInstanceTypeCache *instanceTypeExpirationStore
mutex sync.Mutex
awsService *awsWrapper
Expand All @@ -60,8 +71,8 @@ type launchTemplate struct {
type mixedInstancesPolicy struct {
launchTemplate *launchTemplate
instanceTypesOverrides []string
instanceRequirementsOverrides *autoscaling.InstanceRequirements
instanceRequirements *ec2.InstanceRequirements
instanceRequirementsOverrides *autoscalingtypes.InstanceRequirements
instanceRequirements *ec2types.InstanceRequirements
}

type asg struct {
Expand All @@ -76,7 +87,7 @@ type asg struct {
LaunchConfigurationName string
LaunchTemplate *launchTemplate
MixedInstancesPolicy *mixedInstancesPolicy
Tags []*autoscaling.TagDescription
Tags []autoscalingtypes.TagDescription
}

func newASGCache(awsService *awsWrapper, explicitSpecs []string, autoDiscoverySpecs []asgAutoDiscoveryConfig) (*asgCache, error) {
Expand All @@ -86,7 +97,7 @@ func newASGCache(awsService *awsWrapper, explicitSpecs []string, autoDiscoverySp
asgToInstances: make(map[AwsRef][]AwsInstanceRef),
instanceToAsg: make(map[AwsInstanceRef]*asg),
instanceStatus: make(map[AwsInstanceRef]*string),
instanceLifecycle: make(map[AwsInstanceRef]*string),
instanceLifecycle: make(map[AwsInstanceRef]autoscalingtypes.LifecycleState),
asgInstanceTypeCache: newAsgInstanceTypeCache(awsService),
interrupt: make(chan struct{}),
asgAutoDiscoverySpecs: autoDiscoverySpecs,
Expand Down Expand Up @@ -243,12 +254,12 @@ func (m *asgCache) InstanceStatus(ref AwsInstanceRef) (*string, error) {
return nil, fmt.Errorf("could not find instance %v", ref)
}

func (m *asgCache) findInstanceLifecycle(ref AwsInstanceRef) (*string, error) {
func (m *asgCache) findInstanceLifecycle(ref AwsInstanceRef) (autoscalingtypes.LifecycleState, error) {
if lifecycle, found := m.instanceLifecycle[ref]; found {
return lifecycle, nil
}

return nil, fmt.Errorf("could not find instance %v", ref)
return "", fmt.Errorf("could not find instance %v", ref)
}

func (m *asgCache) SetAsgSize(asg *asg, size int) error {
Expand All @@ -259,14 +270,15 @@ func (m *asgCache) SetAsgSize(asg *asg, size int) error {
}

func (m *asgCache) setAsgSizeNoLock(asg *asg, size int) error {
ctx := context.Background()
params := &autoscaling.SetDesiredCapacityInput{
AutoScalingGroupName: aws.String(asg.Name),
DesiredCapacity: aws.Int64(int64(size)),
DesiredCapacity: aws.Int32(int32(size)),
HonorCooldown: aws.Bool(false),
}
klog.V(0).Infof("Setting asg %s size to %d", asg.Name, size)
start := time.Now()
_, err := m.awsService.SetDesiredCapacity(params)
_, err := m.awsService.SetDesiredCapacity(ctx, params)
observeAWSRequest("SetDesiredCapacity", err, start)
if err != nil {
return err
Expand Down Expand Up @@ -356,21 +368,21 @@ func (m *asgCache) DeleteInstances(instances []*AwsInstanceRef) error {
return err
}

if lifecycle != nil &&
*lifecycle == autoscaling.LifecycleStateTerminated ||
*lifecycle == autoscaling.LifecycleStateTerminating ||
*lifecycle == autoscaling.LifecycleStateTerminatingWait ||
*lifecycle == autoscaling.LifecycleStateTerminatingProceed {
klog.V(2).Infof("instance %s is already terminating in state %s, will skip instead", instance.Name, *lifecycle)
if lifecycle == autoscalingtypes.LifecycleStateTerminated ||
lifecycle == autoscalingtypes.LifecycleStateTerminating ||
lifecycle == autoscalingtypes.LifecycleStateTerminatingWait ||
lifecycle == autoscalingtypes.LifecycleStateTerminatingProceed {
klog.V(2).Infof("instance %s is already terminating in state %s, will skip instead", instance.Name, lifecycle)
continue
}

ctx := context.Background()
params := &autoscaling.TerminateInstanceInAutoScalingGroupInput{
InstanceId: aws.String(instance.Name),
ShouldDecrementDesiredCapacity: aws.Bool(true),
}
start := time.Now()
resp, err := m.awsService.TerminateInstanceInAutoScalingGroup(params)
resp, err := m.awsService.TerminateInstanceInAutoScalingGroup(ctx, params)
observeAWSRequest("TerminateInstanceInAutoScalingGroup", err, start)
if err != nil {
return err
Expand Down Expand Up @@ -421,7 +433,7 @@ func (m *asgCache) regenerate() error {
newInstanceToAsgCache := make(map[AwsInstanceRef]*asg)
newAsgToInstancesCache := make(map[AwsRef][]AwsInstanceRef)
newInstanceStatusMap := make(map[AwsInstanceRef]*string)
newInstanceLifecycleMap := make(map[AwsInstanceRef]*string)
newInstanceLifecycleMap := make(map[AwsInstanceRef]autoscalingtypes.LifecycleState)

// Fetch details of all ASGs
refreshNames := m.buildAsgNames()
Expand All @@ -447,7 +459,8 @@ func (m *asgCache) regenerate() error {

// Register or update ASGs
exists := make(map[AwsRef]bool)
for _, group := range groups {
for i := range groups {
group := &groups[i]
asg, err := m.buildAsgFromAWS(group)
if err != nil {
return err
Expand All @@ -458,10 +471,11 @@ func (m *asgCache) regenerate() error {

newAsgToInstancesCache[asg.AwsRef] = make([]AwsInstanceRef, len(group.Instances))

for i, instance := range group.Instances {
for j := range group.Instances {
instance := &group.Instances[j]
ref := m.buildInstanceRefFromAWS(instance)
newInstanceToAsgCache[ref] = asg
newAsgToInstancesCache[asg.AwsRef][i] = ref
newAsgToInstancesCache[asg.AwsRef][j] = ref
newInstanceStatusMap[ref] = instance.HealthStatus
newInstanceLifecycleMap[ref] = instance.LifecycleState
}
Expand Down Expand Up @@ -497,9 +511,10 @@ func (m *asgCache) regenerate() error {
return nil
}

func (m *asgCache) createPlaceholdersForDesiredNonStartedInstances(groups []*autoscaling.Group) []*autoscaling.Group {
for _, g := range groups {
desired := *g.DesiredCapacity
func (m *asgCache) createPlaceholdersForDesiredNonStartedInstances(groups []autoscalingtypes.AutoScalingGroup) []autoscalingtypes.AutoScalingGroup {
for idx := range groups {
g := &groups[idx]
desired := int64(*g.DesiredCapacity)
realInstances := int64(len(g.Instances))
if desired <= realInstances {
continue
Expand All @@ -519,23 +534,25 @@ func (m *asgCache) createPlaceholdersForDesiredNonStartedInstances(groups []*aut

for i := realInstances; i < desired; i++ {
id := fmt.Sprintf("%s-%s-%d", placeholderInstanceNamePrefix, *g.AutoScalingGroupName, i)
g.Instances = append(g.Instances, &autoscaling.Instance{
az := g.AvailabilityZones[0]
g.Instances = append(g.Instances, autoscalingtypes.Instance{
InstanceId: &id,
AvailabilityZone: g.AvailabilityZones[0],
AvailabilityZone: &az,
HealthStatus: &healthStatus,
})
}
}
return groups
}

func (m *asgCache) isNodeGroupAvailable(group *autoscaling.Group) (bool, error) {
func (m *asgCache) isNodeGroupAvailable(group *autoscalingtypes.AutoScalingGroup) (bool, error) {
ctx := context.Background()
input := &autoscaling.DescribeScalingActivitiesInput{
AutoScalingGroupName: group.AutoScalingGroupName,
}

start := time.Now()
response, err := m.awsService.DescribeScalingActivities(input)
response, err := m.awsService.DescribeScalingActivities(ctx, input)
observeAWSRequest("DescribeScalingActivities", err, start)
if err != nil {
return true, err // If we can't describe the scaling activities we assume the node group is available
Expand All @@ -547,8 +564,8 @@ func (m *asgCache) isNodeGroupAvailable(group *autoscaling.Group) (bool, error)
lut := a.lastUpdateTime
if activity.StartTime.Before(lut) {
break
} else if *activity.StatusCode == "Failed" {
klog.Warningf("ASG %s scaling failed with %s", asgRef.Name, *activity)
} else if activity.StatusCode == autoscalingtypes.ScalingActivityStatusCodeFailed {
klog.Warningf("ASG %s scaling failed with %v", asgRef.Name, activity)
return false, nil
}
} else {
Expand All @@ -558,11 +575,11 @@ func (m *asgCache) isNodeGroupAvailable(group *autoscaling.Group) (bool, error)
return true, nil
}

func (m *asgCache) buildAsgFromAWS(g *autoscaling.Group) (*asg, error) {
func (m *asgCache) buildAsgFromAWS(g *autoscalingtypes.AutoScalingGroup) (*asg, error) {
spec := dynamic.NodeGroupSpec{
Name: aws.StringValue(g.AutoScalingGroupName),
MinSize: int(aws.Int64Value(g.MinSize)),
MaxSize: int(aws.Int64Value(g.MaxSize)),
Name: *g.AutoScalingGroupName,
MinSize: int(*g.MinSize),
MaxSize: int(*g.MaxSize),
SupportScaleToZero: scaleToZeroSupported,
}

Expand All @@ -575,9 +592,9 @@ func (m *asgCache) buildAsgFromAWS(g *autoscaling.Group) (*asg, error) {
minSize: spec.MinSize,
maxSize: spec.MaxSize,

curSize: int(aws.Int64Value(g.DesiredCapacity)),
AvailabilityZones: aws.StringValueSlice(g.AvailabilityZones),
LaunchConfigurationName: aws.StringValue(g.LaunchConfigurationName),
curSize: int(*g.DesiredCapacity),
AvailabilityZones: g.AvailabilityZones,
LaunchConfigurationName: getStringValue(g.LaunchConfigurationName),
Tags: g.Tags,
}

Expand All @@ -586,7 +603,7 @@ func (m *asgCache) buildAsgFromAWS(g *autoscaling.Group) (*asg, error) {
}

if g.MixedInstancesPolicy != nil {
getInstanceTypes := func(overrides []*autoscaling.LaunchTemplateOverrides) []string {
getInstanceTypes := func(overrides []autoscalingtypes.LaunchTemplateOverrides) []string {
res := []string{}
for _, override := range overrides {
if override.InstanceType != nil {
Expand All @@ -596,7 +613,7 @@ func (m *asgCache) buildAsgFromAWS(g *autoscaling.Group) (*asg, error) {
return res
}

getInstanceTypeRequirements := func(overrides []*autoscaling.LaunchTemplateOverrides) *autoscaling.InstanceRequirements {
getInstanceTypeRequirements := func(overrides []autoscalingtypes.LaunchTemplateOverrides) *autoscalingtypes.InstanceRequirements {
if len(overrides) == 1 && overrides[0].InstanceRequirements != nil {
return overrides[0].InstanceRequirements
}
Expand Down Expand Up @@ -625,8 +642,8 @@ func (m *asgCache) buildAsgFromAWS(g *autoscaling.Group) (*asg, error) {
return asg, nil
}

func (m *asgCache) getInstanceRequirementsFromMixedInstancesPolicy(policy *mixedInstancesPolicy) (*ec2.InstanceRequirements, error) {
instanceRequirements := &ec2.InstanceRequirements{}
func (m *asgCache) getInstanceRequirementsFromMixedInstancesPolicy(policy *mixedInstancesPolicy) (*ec2types.InstanceRequirements, error) {
instanceRequirements := &ec2types.InstanceRequirements{}
if policy.instanceRequirementsOverrides != nil {
var err error
instanceRequirements, err = m.awsService.getEC2RequirementsFromAutoscaling(policy.instanceRequirementsOverrides)
Expand All @@ -646,11 +663,11 @@ func (m *asgCache) getInstanceRequirementsFromMixedInstancesPolicy(policy *mixed
return instanceRequirements, nil
}

func (m *asgCache) buildInstanceRefFromAWS(instance *autoscaling.Instance) AwsInstanceRef {
providerID := fmt.Sprintf("aws:///%s/%s", aws.StringValue(instance.AvailabilityZone), aws.StringValue(instance.InstanceId))
func (m *asgCache) buildInstanceRefFromAWS(instance *autoscalingtypes.Instance) AwsInstanceRef {
providerID := fmt.Sprintf("aws:///%s/%s", getStringValue(instance.AvailabilityZone), getStringValue(instance.InstanceId))
return AwsInstanceRef{
ProviderID: providerID,
Name: aws.StringValue(instance.InstanceId),
Name: getStringValue(instance.InstanceId),
}
}

Expand Down
2 changes: 1 addition & 1 deletion cluster-autoscaler/cloudprovider/aws/aws_cloud_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ func BuildAWS(opts config.AutoscalingOptions, do cloudprovider.NodeGroupDiscover
if opts.AWSUseStaticInstanceList {
klog.Warningf("Using static EC2 Instance Types, this list could be outdated. Last update time: %s", lastUpdateTime)
} else {
generatedInstanceTypes, err := GenerateEC2InstanceTypes(sdkProvider.session)
generatedInstanceTypes, err := GenerateEC2InstanceTypes(sdkProvider.cfg)
if err != nil {
klog.Errorf("Failed to generate AWS EC2 Instance Types: %v, falling back to static list with last update time: %s", err, lastUpdateTime)
}
Expand Down
Loading