Skip to content

Commit

Permalink
Merge master
Browse files Browse the repository at this point in the history
  • Loading branch information
adrianmoisey committed Jul 10, 2024
2 parents edc8091 + 7dfb90a commit 74e7c5f
Show file tree
Hide file tree
Showing 81 changed files with 4,658 additions and 413 deletions.
2 changes: 1 addition & 1 deletion cluster-autoscaler/FAQ.md
Original file line number Diff line number Diff line change
Expand Up @@ -836,7 +836,7 @@ The following startup parameters are supported for cluster autoscaler:
| `ok-total-unready-count` | Number of allowed unready nodes, irrespective of max-total-unready-percentage | 3
| `max-node-provision-time` | Maximum time CA waits for node to be provisioned | 15 minutes
| `nodes` | sets min,max size and other configuration data for a node group in a format accepted by cloud provider. Can be used multiple times. Format: \<min>:\<max>:<other...> | ""
| `node-group-auto-discovery` | One or more definition(s) of node group auto-discovery.<br>A definition is expressed `<name of discoverer>:[<key>[=<value>]]`<br>The `aws`, `gce`, and `azure` cloud providers are currently supported. AWS matches by ASG tags, e.g. `asg:tag=tagKey,anotherTagKey`<br>GCE matches by IG name prefix, and requires you to specify min and max nodes per IG, e.g. `mig:namePrefix=pfx,min=0,max=10`<br> Azure matches by tags on VMSS, e.g. `label:foo=bar`, and will auto-detect `min` and `max` tags on the VMSS to set scaling limits.<br>Can be used multiple times | ""
| `node-group-auto-discovery` | One or more definition(s) of node group auto-discovery.<br>A definition is expressed `<name of discoverer>:[<key>[=<value>]]`<br>The `aws`, `gce`, and `azure` cloud providers are currently supported. AWS matches by ASG tags, e.g. `asg:tag=tagKey,anotherTagKey`<br>GCE matches by IG name prefix, and requires you to specify min and max nodes per IG, e.g. `mig:namePrefix=pfx,min=0,max=10`<br> Azure matches by VMSS tags, similar to AWS; you can also optionally specify a default min and max size for the matched VMSSs, e.g. `label:tag=tagKey,anotherTagKey=bar,min=0,max=600`.<br>Can be used multiple times | ""
| `emit-per-nodegroup-metrics` | If true, emit per node group metrics. | false
| `estimator` | Type of resource estimator to be used in scale up | binpacking
| `expander` | Type of node group expander to be used in scale up. | random
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,4 +200,8 @@ const (
// ProvisioningClassBestEffortAtomicScaleUp denotes that CA tries to provision the capacity
// in an atomic manner.
ProvisioningClassBestEffortAtomicScaleUp string = "best-effort-atomic-scale-up.autoscaling.x-k8s.io"
// ProvisioningRequestPodAnnotationKey is a key used to annotate pods consuming provisioning request.
ProvisioningRequestPodAnnotationKey = "autoscaling.x-k8s.io/consume-provisioning-request"
// ProvisioningClassPodAnnotationKey is a key used to add annotation about Provisioning Class
ProvisioningClassPodAnnotationKey = "autoscaling.x-k8s.io/provisioning-class-name"
)
115 changes: 80 additions & 35 deletions cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,46 +308,78 @@ func (m *asgCache) DeleteInstances(instances []*AwsInstanceRef) error {
}
}

placeHolderInstancesCount := m.GetPlaceHolderInstancesCount(instances)
// Check if there are any placeholder instances in the list.
if placeHolderInstancesCount > 0 {
// Log the check for placeholders in the ASG.
klog.V(4).Infof("Detected %d placeholder instance(s) in ASG %s",
placeHolderInstancesCount, commonAsg.Name)

asgNames := []string{commonAsg.Name}
asgDetail, err := m.awsService.getAutoscalingGroupsByNames(asgNames)

if err != nil {
klog.Errorf("Error retrieving ASG details %s: %v", commonAsg.Name, err)
return err
}

activeInstancesInAsg := len(asgDetail[0].Instances)
desiredCapacityInAsg := int(*asgDetail[0].DesiredCapacity)
klog.V(4).Infof("asg %s has placeholders instances with desired capacity = %d and active instances = %d. updating ASG to match active instances count",
commonAsg.Name, desiredCapacityInAsg, activeInstancesInAsg)

// If the difference between the active instances and the desired capacity is greater than 1,
// it means that the ASG is under-provisioned and the desired capacity is not being reached.
// In this case, we would reduce the size of ASG by the count of unprovisioned instances
// which is equal to the total count of active instances in ASG

err = m.setAsgSizeNoLock(commonAsg, activeInstancesInAsg)

if err != nil {
klog.Errorf("Error reducing ASG %s size to %d: %v", commonAsg.Name, activeInstancesInAsg, err)
return err
}
}

for _, instance := range instances {
// check if the instance is a placeholder - a requested instance that was never created by the node group
// if it is, just decrease the size of the node group, as there's no specific instance we can remove
if m.isPlaceholderInstance(instance) {
klog.V(4).Infof("instance %s is detected as a placeholder, decreasing ASG requested size instead "+
"of deleting instance", instance.Name)
m.decreaseAsgSizeByOneNoLock(commonAsg)
} else {
// check if the instance is already terminating - if it is, don't bother terminating again
// as doing so causes unnecessary API calls and can cause the curSize cached value to decrement
// unnecessarily.
lifecycle, err := m.findInstanceLifecycle(*instance)
if err != nil {
return err
}

if lifecycle != nil &&
*lifecycle == autoscaling.LifecycleStateTerminated ||
*lifecycle == autoscaling.LifecycleStateTerminating ||
*lifecycle == autoscaling.LifecycleStateTerminatingWait ||
*lifecycle == autoscaling.LifecycleStateTerminatingProceed {
klog.V(2).Infof("instance %s is already terminating in state %s, will skip instead", instance.Name, *lifecycle)
continue
}
if m.isPlaceholderInstance(instance) {
// skipping placeholder as placeholder instances don't exist
// and we have already reduced ASG size during placeholder check.
continue
}
// check if the instance is already terminating - if it is, don't bother terminating again
// as doing so causes unnecessary API calls and can cause the curSize cached value to decrement
// unnecessarily.
lifecycle, err := m.findInstanceLifecycle(*instance)
if err != nil {
return err
}

params := &autoscaling.TerminateInstanceInAutoScalingGroupInput{
InstanceId: aws.String(instance.Name),
ShouldDecrementDesiredCapacity: aws.Bool(true),
}
start := time.Now()
resp, err := m.awsService.TerminateInstanceInAutoScalingGroup(params)
observeAWSRequest("TerminateInstanceInAutoScalingGroup", err, start)
if err != nil {
return err
}
klog.V(4).Infof(*resp.Activity.Description)
if lifecycle != nil &&
*lifecycle == autoscaling.LifecycleStateTerminated ||
*lifecycle == autoscaling.LifecycleStateTerminating ||
*lifecycle == autoscaling.LifecycleStateTerminatingWait ||
*lifecycle == autoscaling.LifecycleStateTerminatingProceed {
klog.V(2).Infof("instance %s is already terminating in state %s, will skip instead", instance.Name, *lifecycle)
continue
}

// Proactively decrement the size so autoscaler makes better decisions
commonAsg.curSize--
params := &autoscaling.TerminateInstanceInAutoScalingGroupInput{
InstanceId: aws.String(instance.Name),
ShouldDecrementDesiredCapacity: aws.Bool(true),
}
start := time.Now()
resp, err := m.awsService.TerminateInstanceInAutoScalingGroup(params)
observeAWSRequest("TerminateInstanceInAutoScalingGroup", err, start)
if err != nil {
return err
}
klog.V(4).Infof(*resp.Activity.Description)

// Proactively decrement the size so autoscaler makes better decisions
commonAsg.curSize--

}
return nil
}
Expand Down Expand Up @@ -624,3 +656,16 @@ func (m *asgCache) buildInstanceRefFromAWS(instance *autoscaling.Instance) AwsIn
func (m *asgCache) Cleanup() {
close(m.interrupt)
}

// GetPlaceHolderInstancesCount reports how many entries of the given instances
// slice have a name that starts with placeholderInstanceNamePrefix.
func (m *asgCache) GetPlaceHolderInstancesCount(instances []*AwsInstanceRef) int {
	total := 0
	for idx := range instances {
		// Anything without the placeholder prefix is not counted.
		if !strings.HasPrefix(instances[idx].Name, placeholderInstanceNamePrefix) {
			continue
		}
		total++
	}
	return total
}
115 changes: 112 additions & 3 deletions cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@ limitations under the License.
package aws

import (
"testing"

"fmt"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
apiv1 "k8s.io/api/core/v1"
Expand All @@ -27,6 +26,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/aws/aws-sdk-go/aws"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/aws/aws-sdk-go/service/autoscaling"
"k8s.io/autoscaler/cluster-autoscaler/config"
"testing"
)

var testAwsManager = &AwsManager{
Expand Down Expand Up @@ -603,7 +603,7 @@ func TestDeleteNodesWithPlaceholder(t *testing.T) {
err = asgs[0].DeleteNodes([]*apiv1.Node{node})
assert.NoError(t, err)
a.AssertNumberOfCalls(t, "SetDesiredCapacity", 1)
a.AssertNumberOfCalls(t, "DescribeAutoScalingGroupsPages", 1)
a.AssertNumberOfCalls(t, "DescribeAutoScalingGroupsPages", 2)

newSize, err := asgs[0].TargetSize()
assert.NoError(t, err)
Expand Down Expand Up @@ -739,3 +739,112 @@ func TestHasInstance(t *testing.T) {
assert.NoError(t, err)
assert.False(t, present)
}

func TestDeleteNodesWithPlaceholderAndStaleCache(t *testing.T) {
// This test validates the scenario where the ASG cache is out of sync with the Auto Scaling group itself.
// In this example the ASG size is 10 and the cache has 3 real instances ("i-0000", "i-0001" and "i-0002"),
// but the mocked ASG reports 6 instances ("i-0000" to "i-0005"). When DeleteNodes is called with 2 real
// instances ("i-0000", "i-0001") plus placeholders, CAS is expected to first reduce the ASG size to the
// number of instances the ASG reports and then terminate only those 2 instances.

a := &autoScalingMock{}
provider := testProvider(t, newTestAwsManagerWithAsgs(t, a, nil, []string{"1:10:test-asg"}))
asgs := provider.NodeGroups()
commonAsg := &asg{
AwsRef: AwsRef{Name: asgs[0].Id()},
minSize: asgs[0].MinSize(),
maxSize: asgs[0].MaxSize(),
}

// The desired capacity is expected to be set to 6: the mocked ASG reports 6 instances
// against a size of 10, i.e. 4 requested instances were never created.
a.On("SetDesiredCapacity", &autoscaling.SetDesiredCapacityInput{
AutoScalingGroupName: aws.String(asgs[0].Id()),
DesiredCapacity: aws.Int64(6),
HonorCooldown: aws.Bool(false),
}).Return(&autoscaling.SetDesiredCapacityOutput{})

// Look up the current number of instances...
var expectedInstancesCount int64 = 10
a.On("DescribeAutoScalingGroupsPages",
&autoscaling.DescribeAutoScalingGroupsInput{
AutoScalingGroupNames: aws.StringSlice([]string{"test-asg"}),
MaxRecords: aws.Int64(maxRecordsReturnedByAPI),
},
mock.AnythingOfType("func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool"),
).Run(func(args mock.Arguments) {
fn := args.Get(1).(func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool)
fn(testNamedDescribeAutoScalingGroupsOutput("test-asg", expectedInstancesCount, "i-0000", "i-0001", "i-0002", "i-0003", "i-0004", "i-0005"), false)

// every call after the first one is answered with 4 instead of 10
expectedInstancesCount = 4
}).Return(nil)

a.On("DescribeScalingActivities",
&autoscaling.DescribeScalingActivitiesInput{
AutoScalingGroupName: aws.String("test-asg"),
},
).Return(&autoscaling.DescribeScalingActivitiesOutput{}, nil)

provider.Refresh()

initialSize, err := asgs[0].TargetSize()
assert.NoError(t, err)
assert.Equal(t, 10, initialSize)

var awsInstanceRefs []AwsInstanceRef
instanceToAsg := make(map[AwsInstanceRef]*asg)

// Build the placeholder nodes (indices 3 through 9); all of them are passed to DeleteNodes.
var nodes []*apiv1.Node
for i := 3; i <= 9; i++ {
providerId := fmt.Sprintf("aws:///us-east-1a/i-placeholder-test-asg-%d", i)
node := &apiv1.Node{
Spec: apiv1.NodeSpec{
ProviderID: providerId,
},
}
nodes = append(nodes, node)
awsInstanceRef := AwsInstanceRef{
ProviderID: providerId,
Name: fmt.Sprintf("i-placeholder-test-asg-%d", i),
}
awsInstanceRefs = append(awsInstanceRefs, awsInstanceRef)
instanceToAsg[awsInstanceRef] = commonAsg
}

// Build the 3 real instances held by the cache ("i-0000" to "i-0002").
for i := 0; i <= 2; i++ {
providerId := fmt.Sprintf("aws:///us-east-1a/i-000%d", i)
node := &apiv1.Node{
Spec: apiv1.NodeSpec{
ProviderID: providerId,
},
}
// only 2 of the 3 real instances are marked for termination (and have a terminate call mocked)
if i < 2 {
nodes = append(nodes, node)
a.On("TerminateInstanceInAutoScalingGroup", &autoscaling.TerminateInstanceInAutoScalingGroupInput{
InstanceId: aws.String(fmt.Sprintf("i-000%d", i)),
ShouldDecrementDesiredCapacity: aws.Bool(true),
}).Return(&autoscaling.TerminateInstanceInAutoScalingGroupOutput{
Activity: &autoscaling.Activity{Description: aws.String("Deleted instance")},
})
}
awsInstanceRef := AwsInstanceRef{
ProviderID: providerId,
Name: fmt.Sprintf("i-000%d", i),
}
awsInstanceRefs = append(awsInstanceRefs, awsInstanceRef)
instanceToAsg[awsInstanceRef] = commonAsg
}

// overwrite the provider's cache so that it disagrees with what the mocked ASG reports
provider.awsManager.asgCache.asgToInstances[AwsRef{Name: "test-asg"}] = awsInstanceRefs
provider.awsManager.asgCache.instanceToAsg = instanceToAsg

// call DeleteNodes with the 2 real nodes and all of the placeholders
err = asgs[0].DeleteNodes(nodes)
assert.NoError(t, err)
a.AssertNumberOfCalls(t, "SetDesiredCapacity", 1)
a.AssertNumberOfCalls(t, "DescribeAutoScalingGroupsPages", 2)

// This ensures that only the 2 instances mocked in this unit test are terminated
a.AssertNumberOfCalls(t, "TerminateInstanceInAutoScalingGroup", 2)

}
64 changes: 55 additions & 9 deletions cluster-autoscaler/cloudprovider/azure/azure_autodiscovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,31 @@ package azure

import (
"fmt"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"strconv"
"strings"

"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
)

const (
autoDiscovererTypeLabel = "label"
autoDiscovererTypeLabel = "label"
vmssAutoDiscovererKeyMinNodes = "min"
vmssAutoDiscovererKeyMaxNodes = "max"
)

// A labelAutoDiscoveryConfig specifies how to auto-discover Azure node groups.
type labelAutoDiscoveryConfig struct {
// Key-values to match on.
Selector map[string]string
// MinSize specifies the minimum size for all VMSSs that match Selector.
// It is nil when the spec does not contain a "min" key.
MinSize *int
// MaxSize specifies the maximum size for all VMSSs that match Selector.
// It is nil when the spec does not contain a "max" key.
MaxSize *int
}

// autoDiscoveryConfigSizes holds the minimum and maximum sizes that
// matchDiscoveryConfig resolves from the auto-discovery configs.
// A field is -1 when none of the configs specified that bound.
type autoDiscoveryConfigSizes struct {
Min int
Max int
}

// ParseLabelAutoDiscoverySpecs returns any provided NodeGroupAutoDiscoverySpecs
Expand Down Expand Up @@ -70,34 +83,67 @@ func parseLabelAutoDiscoverySpec(spec string) (labelAutoDiscoveryConfig, error)
if k == "" || v == "" {
return cfg, fmt.Errorf("empty value not allowed in key=value tag pairs")
}
cfg.Selector[k] = v

switch k {
case vmssAutoDiscovererKeyMinNodes:
minSize, err := strconv.Atoi(v)
if err != nil || minSize < 0 {
return cfg, fmt.Errorf("invalid minimum nodes: %s", v)
}
cfg.MinSize = &minSize
case vmssAutoDiscovererKeyMaxNodes:
maxSize, err := strconv.Atoi(v)
if err != nil || maxSize < 0 {
return cfg, fmt.Errorf("invalid maximum nodes: %s", v)
}
cfg.MaxSize = &maxSize
default:
cfg.Selector[k] = v
}
}
if cfg.MaxSize != nil && cfg.MinSize != nil && *cfg.MaxSize < *cfg.MinSize {
return cfg, fmt.Errorf("maximum size %d must be greater than or equal to minimum size %d", *cfg.MaxSize, *cfg.MinSize)
}
return cfg, nil
}

func matchDiscoveryConfig(labels map[string]*string, configs []labelAutoDiscoveryConfig) bool {
// matchDiscoveryConfig returns an autoDiscoveryConfigSizes struct if the VMSS's tags match the autodiscovery configs.
// If the VMSS's tags do not match, it returns nil.
// If multiple min/max sizes are defined, it returns the highest min value and the lowest max value.
func matchDiscoveryConfig(labels map[string]*string, configs []labelAutoDiscoveryConfig) *autoDiscoveryConfigSizes {
if len(configs) == 0 {
return false
return nil
}
minSize := -1
maxSize := -1

for _, c := range configs {
if len(c.Selector) == 0 {
return false
return nil
}

for k, v := range c.Selector {
value, ok := labels[k]
if !ok {
return false
return nil
}

if len(v) > 0 {
if value == nil || *value != v {
return false
return nil
}
}
}
if c.MinSize != nil && minSize < *c.MinSize {
minSize = *c.MinSize
}
if c.MaxSize != nil && (maxSize == -1 || maxSize > *c.MaxSize) {
maxSize = *c.MaxSize
}
}

return true
return &autoDiscoveryConfigSizes{
Min: minSize,
Max: maxSize,
}
}
Loading

0 comments on commit 74e7c5f

Please sign in to comment.