Skip to content

Commit

Permalink
Merge pull request #7075 from comtalyst/comtalyst/azure-changes-from-…
Browse files Browse the repository at this point in the history
…managed-1.27

refactor: upstream most of Azure managed CAS changes in cloudprovider/azure for 1.27
  • Loading branch information
k8s-ci-robot authored Jul 31, 2024
2 parents 06465a3 + ba52b16 commit 16175ea
Show file tree
Hide file tree
Showing 51 changed files with 1,067 additions and 15,507 deletions.
57 changes: 30 additions & 27 deletions cluster-autoscaler/cloudprovider/azure/azure_agent_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,10 @@ import (
"sync"
"time"

"github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2022-03-01/compute"
"github.com/Azure/azure-sdk-for-go/services/resources/mgmt/2017-05-10/resources"
"github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2022-03-01/compute" //nolint SA1019 - deprecated package
"github.com/Azure/azure-sdk-for-go/services/resources/mgmt/2017-05-10/resources" //nolint SA1019 - deprecated package
azStorage "github.com/Azure/azure-sdk-for-go/storage"
"github.com/Azure/go-autorest/autorest/to"

apiv1 "k8s.io/api/core/v1"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
Expand Down Expand Up @@ -145,7 +144,7 @@ func (as *AgentPool) getVMsFromCache() ([]compute.VirtualMachine, error) {
}

// GetVMIndexes gets indexes of all virtual machines belonging to the agent pool.
func (as *AgentPool) GetVMIndexes() ([]int, map[int]string, error) {
func (as *AgentPool) GetVMIndexes() (sortedIndexes []int, indexToVM map[int]string, err error) {
klog.V(6).Infof("GetVMIndexes: starts for as %v", as)

instances, err := as.getVMsFromCache()
Expand All @@ -155,23 +154,23 @@ func (as *AgentPool) GetVMIndexes() ([]int, map[int]string, error) {
klog.V(6).Infof("GetVMIndexes: got instances, length = %d", len(instances))

indexes := make([]int, 0)
indexToVM := make(map[int]string)
indexToVM = make(map[int]string)
for _, instance := range instances {
index, err := GetVMNameIndex(instance.StorageProfile.OsDisk.OsType, *instance.Name)
if err != nil {
return nil, nil, err
}

indexes = append(indexes, index)
resourceID, err := convertResourceGroupNameToLower("azure://" + *instance.ID)
resourceID, err := convertResourceGroupNameToLower(azurePrefix + *instance.ID)
if err != nil {
return nil, nil, err
}
indexToVM[index] = resourceID
}

sortedIndexes := sort.IntSlice(indexes)
sortedIndexes.Sort()
sortedIndexes = indexes
sort.Ints(sortedIndexes)
return sortedIndexes, indexToVM, nil
}

Expand Down Expand Up @@ -216,7 +215,8 @@ func (as *AgentPool) getAllSucceededAndFailedDeployments() (succeededAndFailedDe
defer cancel()

deploymentsFilter := "provisioningState eq 'Succeeded' or provisioningState eq 'Failed'"
succeededAndFailedDeployments, err = as.manager.azClient.deploymentsClient.List(ctx, as.manager.config.ResourceGroup, deploymentsFilter, nil)
succeededAndFailedDeployments, err = as.manager.azClient.deploymentsClient.List(ctx, as.manager.config.ResourceGroup,
deploymentsFilter, nil)
if err != nil {
klog.Errorf("getAllSucceededAndFailedDeployments: failed to list succeeded or failed deployments with error: %v", err)
return nil, err
Expand Down Expand Up @@ -258,9 +258,12 @@ func (as *AgentPool) deleteOutdatedDeployments() (err error) {
errList := make([]error, 0)
for _, deployment := range toBeDeleted {
klog.V(4).Infof("deleteOutdatedDeployments: starts deleting outdated deployment (%s)", *deployment.Name)
_, err := as.manager.azClient.deploymentsClient.Delete(ctx, as.manager.config.ResourceGroup, *deployment.Name)
if err != nil {
errList = append(errList, err)
resp, errResp := as.manager.azClient.deploymentsClient.Delete(ctx, as.manager.config.ResourceGroup, *deployment.Name)
if errResp != nil {
errList = append(errList, errResp)
}
if resp != nil && resp.Body != nil {
resp.Body.Close()
}
}

Expand Down Expand Up @@ -317,8 +320,12 @@ func (as *AgentPool) IncreaseSize(delta int) error {
}
ctx, cancel := getContextWithCancel()
defer cancel()
klog.V(3).Infof("Waiting for deploymentsClient.CreateOrUpdate(%s, %s, %v)", as.manager.config.ResourceGroup, newDeploymentName, newDeployment)
klog.V(3).Infof("Waiting for deploymentsClient.CreateOrUpdate(%s, %s, %v)", as.manager.config.ResourceGroup,
newDeploymentName, newDeployment)
resp, err := as.manager.azClient.deploymentsClient.CreateOrUpdate(ctx, as.manager.config.ResourceGroup, newDeploymentName, newDeployment)
if resp != nil && resp.Body != nil {
defer resp.Body.Close()
}
isSuccess, realError := isSuccessHTTPResponse(resp, err)
if isSuccess {
klog.V(3).Infof("deploymentsClient.CreateOrUpdate(%s, %s, %v) success", as.manager.config.ResourceGroup, newDeploymentName, newDeployment)
Expand Down Expand Up @@ -404,7 +411,7 @@ func (as *AgentPool) DeleteInstances(instances []*azureRef) error {
}

for _, instance := range instances {
name, err := resourceName((*instance).Name)
name, err := resourceName(instance.Name)
if err != nil {
klog.Errorf("Get name for instance %q failed: %v", *instance, err)
return err
Expand Down Expand Up @@ -436,12 +443,12 @@ func (as *AgentPool) DeleteNodes(nodes []*apiv1.Node) error {

refs := make([]*azureRef, 0, len(nodes))
for _, node := range nodes {
belongs, err := as.Belongs(node)
if err != nil {
return err
belongs, err2 := as.Belongs(node)
if err2 != nil {
return err2
}

if belongs != true {
if !belongs {
return fmt.Errorf("%s belongs to a different asg than %s", node.Name, as.Name)
}

Expand Down Expand Up @@ -478,13 +485,13 @@ func (as *AgentPool) Nodes() ([]cloudprovider.Instance, error) {

nodes := make([]cloudprovider.Instance, 0, len(instances))
for _, instance := range instances {
if len(*instance.ID) == 0 {
if *instance.ID == "" {
continue
}

// To keep consistent with providerID from kubernetes cloud provider, convert
// resourceGroupName in the ID to lower case.
resourceID, err := convertResourceGroupNameToLower("azure://" + *instance.ID)
resourceID, err := convertResourceGroupNameToLower(azurePrefix + *instance.ID)
if err != nil {
return nil, err
}
Expand All @@ -504,7 +511,7 @@ func (as *AgentPool) deleteBlob(accountName, vhdContainer, vhdBlob string) error
}

keys := *storageKeysResult.Keys
client, err := azStorage.NewBasicClientOnSovereignCloud(accountName, to.String(keys[0].Value), as.manager.env)
client, err := azStorage.NewBasicClientOnSovereignCloud(accountName, to.String(keys[0].Value), *as.manager.env)
if err != nil {
return err
}
Expand Down Expand Up @@ -541,11 +548,12 @@ func (as *AgentPool) deleteVirtualMachine(name string) error {

osDiskName := vm.VirtualMachineProperties.StorageProfile.OsDisk.Name
var nicName string
var err error
nicID := (*vm.VirtualMachineProperties.NetworkProfile.NetworkInterfaces)[0].ID
if nicID == nil {
klog.Warningf("NIC ID is not set for VM (%s/%s)", as.manager.config.ResourceGroup, name)
} else {
nicName, err := resourceName(*nicID)
nicName, err = resourceName(*nicID)
if err != nil {
return err
}
Expand Down Expand Up @@ -611,8 +619,3 @@ func (as *AgentPool) deleteVirtualMachine(name string) error {

return nil
}

// getAzureRef gets AzureRef for the as.
func (as *AgentPool) getAzureRef() azureRef {
return as.azureRef
}
71 changes: 38 additions & 33 deletions cluster-autoscaler/cloudprovider/azure/azure_agent_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,29 +19,26 @@ package azure
import (
"context"
"fmt"
"net/http"
"strings"
"testing"
"time"

apiv1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/cloud-provider-azure/pkg/azureclients/storageaccountclient/mockstorageaccountclient"
"sigs.k8s.io/cloud-provider-azure/pkg/azureclients/vmclient/mockvmclient"
"sigs.k8s.io/cloud-provider-azure/pkg/retry"

"github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2022-03-01/compute"
"github.com/Azure/azure-sdk-for-go/services/resources/mgmt/2017-05-10/resources"
"github.com/Azure/azure-sdk-for-go/services/storage/mgmt/2021-09-01/storage"
"github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2022-03-01/compute" //nolint SA1019 - deprecated package
"github.com/Azure/azure-sdk-for-go/services/resources/mgmt/2017-05-10/resources" //nolint SA1019 - deprecated package
"github.com/Azure/azure-sdk-for-go/services/storage/mgmt/2021-09-01/storage" //nolint SA1019 - deprecated package
"github.com/Azure/go-autorest/autorest/date"
"github.com/Azure/go-autorest/autorest/to"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"

apiv1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"sigs.k8s.io/cloud-provider-azure/pkg/azureclients/storageaccountclient/mockstorageaccountclient"
"sigs.k8s.io/cloud-provider-azure/pkg/azureclients/vmclient/mockvmclient"
)

var (
rerrTooManyReqs = retry.Error{HTTPStatusCode: http.StatusTooManyRequests}
rerrInternalErr = retry.Error{HTTPStatusCode: http.StatusInternalServerError}
testValidProviderID0 = "/subscriptions/sub/resourceGroups/rg/providers/Microsoft.Compute/virtualMachines/as-vm-0"
testValidProviderID1 = "/subscriptions/sub/resourceGroups/rg/providers/Microsoft.Compute/virtualMachines/as-vm-1"
testInvalidProviderID = "/subscriptions/sub/resourceGroups/rg/providers/provider/virtualMachines/as-vm-0/"
Expand Down Expand Up @@ -163,6 +160,7 @@ func TestDeleteOutdatedDeployments(t *testing.T) {
err := testAS.deleteOutdatedDeployments()
assert.Equal(t, test.expectedErr, err, test.desc)
existedDeployments, err := testAS.manager.azClient.deploymentsClient.List(context.Background(), "", "", to.Int32Ptr(0))
assert.Nil(t, err)
existedDeploymentsNames := make(map[string]bool)
for _, deployment := range existedDeployments {
existedDeploymentsNames[*deployment.Name] = true
Expand All @@ -185,8 +183,8 @@ func TestGetVMsFromCache(t *testing.T) {
mockVMClient := mockvmclient.NewMockInterface(ctrl)
testAS.manager.azClient.virtualMachinesClient = mockVMClient
mockVMClient.EXPECT().List(gomock.Any(), testAS.manager.config.ResourceGroup).Return(expectedVMs, nil)
ac, err := newAzureCache(testAS.manager.azClient, refreshInterval, testAS.manager.config.ResourceGroup, vmTypeStandard, false, "")
assert.NoError(t, err)
testAS.manager.config.VMType = vmTypeStandard
ac := newAzureCache(testAS.manager.azClient, refreshInterval, testAS.manager.config)
testAS.manager.azureCache = ac

vms, err := testAS.getVMsFromCache()
Expand All @@ -203,8 +201,8 @@ func TestGetVMIndexes(t *testing.T) {
mockVMClient := mockvmclient.NewMockInterface(ctrl)
as.manager.azClient.virtualMachinesClient = mockVMClient
mockVMClient.EXPECT().List(gomock.Any(), as.manager.config.ResourceGroup).Return(expectedVMs, nil)
ac, err := newAzureCache(as.manager.azClient, refreshInterval, as.manager.config.ResourceGroup, vmTypeStandard, false, "")
assert.NoError(t, err)
as.manager.config.VMType = vmTypeStandard
ac := newAzureCache(as.manager.azClient, refreshInterval, as.manager.config)
as.manager.azureCache = ac

sortedIndexes, indexToVM, err := as.GetVMIndexes()
Expand All @@ -225,6 +223,8 @@ func TestGetVMIndexes(t *testing.T) {
expectedVMs[0].Name = to.StringPtr("foo")
mockVMClient.EXPECT().List(gomock.Any(), as.manager.config.ResourceGroup).Return(expectedVMs, nil)
err = as.manager.forceRefresh()
assert.NoError(t, err)

sortedIndexes, indexToVM, err = as.GetVMIndexes()
expectedErr = fmt.Errorf("resource name was missing from identifier")
assert.Equal(t, expectedErr, err)
Expand All @@ -242,8 +242,8 @@ func TestGetCurSize(t *testing.T) {
mockVMClient := mockvmclient.NewMockInterface(ctrl)
as.manager.azClient.virtualMachinesClient = mockVMClient
mockVMClient.EXPECT().List(gomock.Any(), as.manager.config.ResourceGroup).Return(expectedVMs, nil)
ac, err := newAzureCache(as.manager.azClient, refreshInterval, as.manager.config.ResourceGroup, vmTypeStandard, false, "")
assert.NoError(t, err)
as.manager.config.VMType = vmTypeStandard
ac := newAzureCache(as.manager.azClient, refreshInterval, as.manager.config)
as.manager.azureCache = ac

as.lastRefresh = time.Now()
Expand All @@ -266,8 +266,8 @@ func TestAgentPoolTargetSize(t *testing.T) {
as.manager.azClient.virtualMachinesClient = mockVMClient
expectedVMs := getExpectedVMs()
mockVMClient.EXPECT().List(gomock.Any(), as.manager.config.ResourceGroup).Return(expectedVMs, nil)
ac, err := newAzureCache(as.manager.azClient, refreshInterval, as.manager.config.ResourceGroup, vmTypeStandard, false, "")
assert.NoError(t, err)
as.manager.config.VMType = vmTypeStandard
ac := newAzureCache(as.manager.azClient, refreshInterval, as.manager.config)
as.manager.azureCache = ac

as.lastRefresh = time.Now().Add(-1 * 15 * time.Second)
Expand All @@ -285,19 +285,21 @@ func TestAgentPoolIncreaseSize(t *testing.T) {
as.manager.azClient.virtualMachinesClient = mockVMClient
expectedVMs := getExpectedVMs()
mockVMClient.EXPECT().List(gomock.Any(), as.manager.config.ResourceGroup).Return(expectedVMs, nil).MaxTimes(2)
ac, err := newAzureCache(as.manager.azClient, refreshInterval, as.manager.config.ResourceGroup, vmTypeStandard, false, "")
assert.NoError(t, err)
as.manager.config.VMType = vmTypeStandard
ac := newAzureCache(as.manager.azClient, refreshInterval, as.manager.config)
as.manager.azureCache = ac

err = as.IncreaseSize(-1)
err := as.IncreaseSize(-1)
expectedErr := fmt.Errorf("size increase must be positive")
assert.Equal(t, expectedErr, err)

mockVMClient.EXPECT().List(gomock.Any(), as.manager.config.ResourceGroup).Return(expectedVMs, nil).MaxTimes(2)
err = as.manager.Refresh()
assert.NoError(t, err)

err = as.IncreaseSize(4)
expectedErr = fmt.Errorf("size increase too large - desired:6 max:5")
assert.Equal(t, expectedErr, err)

err = as.IncreaseSize(2)
assert.NoError(t, err)
Expand All @@ -313,11 +315,11 @@ func TestDecreaseTargetSize(t *testing.T) {
as.manager.azClient.virtualMachinesClient = mockVMClient
expectedVMs := getExpectedVMs()
mockVMClient.EXPECT().List(gomock.Any(), as.manager.config.ResourceGroup).Return(expectedVMs, nil).MaxTimes(3)
ac, err := newAzureCache(as.manager.azClient, refreshInterval, as.manager.config.ResourceGroup, vmTypeStandard, false, "")
assert.NoError(t, err)
as.manager.config.VMType = vmTypeStandard
ac := newAzureCache(as.manager.azClient, refreshInterval, as.manager.config)
as.manager.azureCache = ac

err = as.DecreaseTargetSize(-1)
err := as.DecreaseTargetSize(-1)
assert.NoError(t, err)
assert.Equal(t, int64(2), as.curSize)

Expand Down Expand Up @@ -409,7 +411,8 @@ func TestDeleteInstances(t *testing.T) {
}
mockVMClient.EXPECT().Get(gomock.Any(), as.manager.config.ResourceGroup, "as-vm-0", gomock.Any()).Return(getExpectedVMs()[0], nil)
mockVMClient.EXPECT().Delete(gomock.Any(), as.manager.config.ResourceGroup, "as-vm-0").Return(nil)
mockSAClient.EXPECT().ListKeys(gomock.Any(), as.manager.config.SubscriptionID, as.manager.config.ResourceGroup, "foo").Return(storage.AccountListKeysResult{
mockSAClient.EXPECT().ListKeys(gomock.Any(), as.manager.config.SubscriptionID, as.manager.config.ResourceGroup,
"foo").Return(storage.AccountListKeysResult{
Keys: &[]storage.AccountKey{
{Value: to.StringPtr("dmFsdWUK")},
},
Expand All @@ -431,11 +434,12 @@ func TestAgentPoolDeleteNodes(t *testing.T) {
mockSAClient := mockstorageaccountclient.NewMockInterface(ctrl)
as.manager.azClient.storageAccountsClient = mockSAClient
mockVMClient.EXPECT().List(gomock.Any(), as.manager.config.ResourceGroup).Return(expectedVMs, nil)
ac, err := newAzureCache(as.manager.azClient, refreshInterval, as.manager.config.ResourceGroup, vmTypeStandard, false, "")
assert.NoError(t, err)
as.manager.config.VMType = vmTypeStandard
ac := newAzureCache(as.manager.azClient, refreshInterval, as.manager.config)
as.manager.config.VMType = vmTypeVMSS
as.manager.azureCache = ac

err = as.DeleteNodes([]*apiv1.Node{
err := as.DeleteNodes([]*apiv1.Node{
{
Spec: apiv1.NodeSpec{ProviderID: testInvalidProviderID},
ObjectMeta: v1.ObjectMeta{Name: "node"},
Expand All @@ -458,7 +462,8 @@ func TestAgentPoolDeleteNodes(t *testing.T) {
as.manager.azureCache.instanceToNodeGroup[azureRef{Name: testValidProviderID0}] = as
mockVMClient.EXPECT().Get(gomock.Any(), as.manager.config.ResourceGroup, "as-vm-0", gomock.Any()).Return(getExpectedVMs()[0], nil)
mockVMClient.EXPECT().Delete(gomock.Any(), as.manager.config.ResourceGroup, "as-vm-0").Return(nil)
mockSAClient.EXPECT().ListKeys(gomock.Any(), as.manager.config.SubscriptionID, as.manager.config.ResourceGroup, "foo").Return(storage.AccountListKeysResult{
mockSAClient.EXPECT().ListKeys(gomock.Any(), as.manager.config.SubscriptionID, as.manager.config.ResourceGroup,
"foo").Return(storage.AccountListKeysResult{
Keys: &[]storage.AccountKey{
{Value: to.StringPtr("dmFsdWUK")},
},
Expand Down Expand Up @@ -497,8 +502,8 @@ func TestAgentPoolNodes(t *testing.T) {
mockVMClient := mockvmclient.NewMockInterface(ctrl)
as.manager.azClient.virtualMachinesClient = mockVMClient
mockVMClient.EXPECT().List(gomock.Any(), as.manager.config.ResourceGroup).Return(expectedVMs, nil)
ac, err := newAzureCache(as.manager.azClient, refreshInterval, as.manager.config.ResourceGroup, vmTypeStandard, false, "")
assert.NoError(t, err)
as.manager.config.VMType = vmTypeStandard
ac := newAzureCache(as.manager.azClient, refreshInterval, as.manager.config)
as.manager.azureCache = ac

nodes, err := as.Nodes()
Expand Down
5 changes: 3 additions & 2 deletions cluster-autoscaler/cloudprovider/azure/azure_autodiscovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ package azure

import (
"fmt"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"strings"

"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
)

const (
Expand Down Expand Up @@ -91,7 +92,7 @@ func matchDiscoveryConfig(labels map[string]*string, configs []labelAutoDiscover
return false
}

if len(v) > 0 {
if v != "" {
if value == nil || *value != v {
return false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ limitations under the License.
package azure

import (
"testing"

"github.com/stretchr/testify/assert"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"testing"
)

func TestParseLabelAutoDiscoverySpecs(t *testing.T) {
Expand Down
Loading

0 comments on commit 16175ea

Please sign in to comment.