diff --git a/pkg/networking/security_group_manager.go b/pkg/networking/security_group_manager.go index c90ba36cb..18b4bc194 100644 --- a/pkg/networking/security_group_manager.go +++ b/pkg/networking/security_group_manager.go @@ -33,7 +33,7 @@ func (opts *FetchSGInfoOptions) ApplyOptions(options ...FetchSGInfoOption) { type FetchSGInfoOption func(opts *FetchSGInfoOptions) -// WithReloadIgnoringCache is a option that sets the ReloadIgnoringCache to true. +// WithReloadIgnoringCache is an option that sets the ReloadIgnoringCache to true. func WithReloadIgnoringCache() FetchSGInfoOption { return func(opts *FetchSGInfoOptions) { opts.ReloadIgnoringCache = true diff --git a/pkg/networking/security_group_reconciler.go b/pkg/networking/security_group_reconciler.go index 8b9e5731f..1a54a8655 100644 --- a/pkg/networking/security_group_reconciler.go +++ b/pkg/networking/security_group_reconciler.go @@ -77,23 +77,24 @@ func (r *defaultSecurityGroupReconciler) ReconcileIngress(ctx context.Context, s return err } sgInfo := sgInfoByID[sgID] - if err := r.reconcileIngressWithSGInfo(ctx, sgInfo, desiredPermissions, reconcileOpts); err != nil { + + if err := r.reconcileIngressWithSGInfo(ctx, sgInfo, desiredPermissions, false, reconcileOpts); err != nil { if !r.shouldRetryWithoutCache(err) { return err } + revokeFirst := r.shouldRemoveSGRulesFirst(err) + r.logger.Info("Retrying ReconcileIngress without using cache", "revokeFirst", revokeFirst) sgInfoByID, err := r.sgManager.FetchSGInfosByID(ctx, []string{sgID}, WithReloadIgnoringCache()) if err != nil { return err } sgInfo := sgInfoByID[sgID] - if err := r.reconcileIngressWithSGInfo(ctx, sgInfo, desiredPermissions, reconcileOpts); err != nil { - return err - } + return r.reconcileIngressWithSGInfo(ctx, sgInfo, desiredPermissions, revokeFirst, reconcileOpts) } return nil } -func (r *defaultSecurityGroupReconciler) reconcileIngressWithSGInfo(ctx context.Context, sgInfo SecurityGroupInfo, desiredPermissions []IPPermissionInfo, 
reconcileOpts SecurityGroupReconcileOptions) error { +func (r *defaultSecurityGroupReconciler) reconcileIngressWithSGInfo(ctx context.Context, sgInfo SecurityGroupInfo, desiredPermissions []IPPermissionInfo, revokeFirst bool, reconcileOpts SecurityGroupReconcileOptions) error { extraPermissions := diffIPPermissionInfos(sgInfo.Ingress, desiredPermissions) permissionsToRevoke := make([]IPPermissionInfo, 0, len(extraPermissions)) for _, permission := range extraPermissions { @@ -102,16 +103,29 @@ func (r *defaultSecurityGroupReconciler) reconcileIngressWithSGInfo(ctx context. } } permissionsToGrant := diffIPPermissionInfos(desiredPermissions, sgInfo.Ingress) - if len(permissionsToRevoke) > 0 && !reconcileOpts.AuthorizeOnly { - if err := r.sgManager.RevokeSGIngress(ctx, sgInfo.SecurityGroupID, permissionsToRevoke); err != nil { - return err + + if revokeFirst { + if len(permissionsToRevoke) > 0 && !reconcileOpts.AuthorizeOnly { + if err := r.sgManager.RevokeSGIngress(ctx, sgInfo.SecurityGroupID, permissionsToRevoke); err != nil { + return err + } } } + if len(permissionsToGrant) > 0 { if err := r.sgManager.AuthorizeSGIngress(ctx, sgInfo.SecurityGroupID, permissionsToGrant); err != nil { return err } } + + if !revokeFirst { + if len(permissionsToRevoke) > 0 && !reconcileOpts.AuthorizeOnly { + if err := r.sgManager.RevokeSGIngress(ctx, sgInfo.SecurityGroupID, permissionsToRevoke); err != nil { + return err + } + } + } + return nil } @@ -119,7 +133,16 @@ func (r *defaultSecurityGroupReconciler) reconcileIngressWithSGInfo(ctx context. 
func (r *defaultSecurityGroupReconciler) shouldRetryWithoutCache(err error) bool { var apiErr smithy.APIError if errors.As(err, &apiErr) { - return apiErr.ErrorCode() == "InvalidPermission.Duplicate" || apiErr.ErrorCode() == "InvalidPermission.NotFound" + return apiErr.ErrorCode() == "InvalidPermission.Duplicate" || apiErr.ErrorCode() == "InvalidPermission.NotFound" || apiErr.ErrorCode() == "RulesPerSecurityGroupLimitExceeded" + } + return false +} + +// shouldRemoveSGRulesFirst reports whether the SecurityGroup rules reconcile should be retried with existing rules revoked before new rules are added. +func (r *defaultSecurityGroupReconciler) shouldRemoveSGRulesFirst(err error) bool { + var apiErr smithy.APIError + if errors.As(err, &apiErr) { + return apiErr.ErrorCode() == "RulesPerSecurityGroupLimitExceeded" } return false } diff --git a/pkg/networking/security_group_reconciler_test.go b/pkg/networking/security_group_reconciler_test.go index 2bb3c7124..34f119db3 100644 --- a/pkg/networking/security_group_reconciler_test.go +++ b/pkg/networking/security_group_reconciler_test.go @@ -1,10 +1,15 @@ package networking import ( + "context" + "errors" awssdk "github.com/aws/aws-sdk-go-v2/aws" ec2types "github.com/aws/aws-sdk-go-v2/service/ec2/types" "github.com/aws/smithy-go" + "github.com/go-logr/logr" + "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" + "sigs.k8s.io/controller-runtime/pkg/log" "testing" ) @@ -31,6 +36,13 @@ func Test_defaultSecurityGroupReconciler_shouldRetryWithoutCache(t *testing.T) { }, want: true, }, + { + name: "should retry without cache when got too many rules error", + args: args{ + err: &smithy.GenericAPIError{Code: "RulesPerSecurityGroupLimitExceeded", Message: ""}, + }, + want: true, + }, { name: "shouldn't retry when got some other error", args: args{ @@ -215,3 +227,369 @@ func Test_diffIPPermissionInfos(t *testing.T) { }) } } + +func TestReconcileSGIngress(t *testing.T) { + sgId := "sgId" + type fetchSGInfosByIDCall struct { + req 
[]string + resp map[string]SecurityGroupInfo + err error + } + + tests := []struct { + name string + inputSGRules []IPPermissionInfo + sgGetCall fetchSGInfosByIDCall + authorizeData []IPPermissionInfo + revokeData []IPPermissionInfo + revokeCalls int + authorizeCalls int + authorizeError error + revokeError error + expectErr bool + }{ + { + name: "no permissions in either ec2 or kube", + inputSGRules: []IPPermissionInfo{}, + sgGetCall: fetchSGInfosByIDCall{ + req: []string{sgId}, + resp: map[string]SecurityGroupInfo{ + sgId: { + SecurityGroupID: sgId, + }, + }, + }, + }, + { + name: "sg get failure blocks revoke / authorize", + inputSGRules: []IPPermissionInfo{}, + sgGetCall: fetchSGInfosByIDCall{ + req: []string{sgId}, + resp: map[string]SecurityGroupInfo{ + sgId: { + SecurityGroupID: sgId, + }, + }, + err: errors.New("bad thing"), + }, + expectErr: true, + }, + { + name: "permission present in kube but not ec2 should lead to authorize call", + inputSGRules: []IPPermissionInfo{ + { + Permission: ec2types.IpPermission{ + FromPort: awssdk.Int32(10), + ToPort: awssdk.Int32(15), + IpProtocol: awssdk.String("tcp"), + }, + }, + }, + sgGetCall: fetchSGInfosByIDCall{ + req: []string{sgId}, + resp: map[string]SecurityGroupInfo{ + sgId: { + SecurityGroupID: sgId, + }, + }, + }, + authorizeData: []IPPermissionInfo{ + { + Permission: ec2types.IpPermission{ + FromPort: awssdk.Int32(10), + ToPort: awssdk.Int32(15), + IpProtocol: awssdk.String("tcp"), + }, + }, + }, + authorizeCalls: 1, + }, + { + name: "permission present in ec2 but not kube should lead to revoke call", + inputSGRules: []IPPermissionInfo{}, + sgGetCall: fetchSGInfosByIDCall{ + req: []string{sgId}, + resp: map[string]SecurityGroupInfo{ + sgId: { + SecurityGroupID: sgId, + Ingress: []IPPermissionInfo{ + { + Permission: ec2types.IpPermission{ + FromPort: awssdk.Int32(10), + ToPort: awssdk.Int32(15), + IpProtocol: awssdk.String("tcp"), + }, + }, + }, + }, + }, + }, + revokeData: []IPPermissionInfo{ + { + 
Permission: ec2types.IpPermission{ + FromPort: awssdk.Int32(10), + ToPort: awssdk.Int32(15), + IpProtocol: awssdk.String("tcp"), + }, + }, + }, + revokeCalls: 1, + }, + { + name: "revoke and authorize together", + inputSGRules: []IPPermissionInfo{ + { + Permission: ec2types.IpPermission{ + FromPort: awssdk.Int32(12), + ToPort: awssdk.Int32(15), + IpProtocol: awssdk.String("tcp"), + }, + }, + }, + sgGetCall: fetchSGInfosByIDCall{ + req: []string{sgId}, + resp: map[string]SecurityGroupInfo{ + sgId: { + SecurityGroupID: sgId, + Ingress: []IPPermissionInfo{ + { + Permission: ec2types.IpPermission{ + FromPort: awssdk.Int32(10), + ToPort: awssdk.Int32(15), + IpProtocol: awssdk.String("tcp"), + }, + }, + }, + }, + }, + }, + authorizeData: []IPPermissionInfo{ + { + Permission: ec2types.IpPermission{ + FromPort: awssdk.Int32(12), + ToPort: awssdk.Int32(15), + IpProtocol: awssdk.String("tcp"), + }, + }, + }, + authorizeCalls: 1, + revokeData: []IPPermissionInfo{ + { + Permission: ec2types.IpPermission{ + FromPort: awssdk.Int32(10), + ToPort: awssdk.Int32(15), + IpProtocol: awssdk.String("tcp"), + }, + }, + }, + revokeCalls: 1, + }, + { + name: "authorize error should block revoke call", + inputSGRules: []IPPermissionInfo{ + { + Permission: ec2types.IpPermission{ + FromPort: awssdk.Int32(12), + ToPort: awssdk.Int32(15), + IpProtocol: awssdk.String("tcp"), + }, + }, + }, + sgGetCall: fetchSGInfosByIDCall{ + req: []string{sgId}, + resp: map[string]SecurityGroupInfo{ + sgId: { + SecurityGroupID: sgId, + Ingress: []IPPermissionInfo{ + { + Permission: ec2types.IpPermission{ + FromPort: awssdk.Int32(10), + ToPort: awssdk.Int32(15), + IpProtocol: awssdk.String("tcp"), + }, + }, + }, + }, + }, + }, + authorizeError: errors.New("authorize error"), + expectErr: true, + authorizeData: []IPPermissionInfo{ + { + Permission: ec2types.IpPermission{ + FromPort: awssdk.Int32(12), + ToPort: awssdk.Int32(15), + IpProtocol: awssdk.String("tcp"), + }, + }, + }, + authorizeCalls: 1, + }, + { + 
name: "revoke error should not block authorize call", + inputSGRules: []IPPermissionInfo{ + { + Permission: ec2types.IpPermission{ + FromPort: awssdk.Int32(12), + ToPort: awssdk.Int32(15), + IpProtocol: awssdk.String("tcp"), + }, + }, + }, + sgGetCall: fetchSGInfosByIDCall{ + req: []string{sgId}, + resp: map[string]SecurityGroupInfo{ + sgId: { + SecurityGroupID: sgId, + Ingress: []IPPermissionInfo{ + { + Permission: ec2types.IpPermission{ + FromPort: awssdk.Int32(10), + ToPort: awssdk.Int32(15), + IpProtocol: awssdk.String("tcp"), + }, + }, + }, + }, + }, + }, + revokeError: errors.New("revoke error"), + expectErr: true, + authorizeData: []IPPermissionInfo{ + { + Permission: ec2types.IpPermission{ + FromPort: awssdk.Int32(12), + ToPort: awssdk.Int32(15), + IpProtocol: awssdk.String("tcp"), + }, + }, + }, + authorizeCalls: 1, + revokeData: []IPPermissionInfo{ + { + Permission: ec2types.IpPermission{ + FromPort: awssdk.Int32(10), + ToPort: awssdk.Int32(15), + IpProtocol: awssdk.String("tcp"), + }, + }, + }, + revokeCalls: 1, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + + sgManager := NewMockSecurityGroupManager(ctrl) + reconciler := &defaultSecurityGroupReconciler{ + sgManager: sgManager, + logger: logr.New(&log.NullLogSink{}), + } + + ctx := context.Background() + sgManager.EXPECT().FetchSGInfosByID(gomock.Any(), tt.sgGetCall.req, gomock.Any()).Return(tt.sgGetCall.resp, tt.sgGetCall.err) + + sgManager.EXPECT().AuthorizeSGIngress(gomock.Eq(ctx), gomock.Eq(sgId), gomock.Eq(tt.authorizeData)).Return(tt.authorizeError).Times(tt.authorizeCalls) + sgManager.EXPECT().RevokeSGIngress(gomock.Eq(ctx), gomock.Eq(sgId), gomock.Eq(tt.revokeData)).Return(tt.revokeError).Times(tt.revokeCalls) + + err := reconciler.ReconcileIngress(ctx, sgId, tt.inputSGRules) + ctrl.Finish() + if tt.expectErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + }) + } +} + +func 
TestReconcileSGIngress_RehydrateCache(t *testing.T) { + + testCases := []struct { + name string + initialAuthorizeError error + secondAuthorizeError error + revokeFirst bool + }{ + { + name: "not found error leads to cache re-hydrate", + initialAuthorizeError: &smithy.GenericAPIError{Code: "InvalidPermission.NotFound", Message: ""}, + revokeFirst: false, + }, + { + name: "too many rules error leads to cache re-hydrate and inverse operations", + initialAuthorizeError: &smithy.GenericAPIError{Code: "RulesPerSecurityGroupLimitExceeded", Message: ""}, + revokeFirst: true, + }, + } + + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + sgId := "sgId" + ctrl := gomock.NewController(t) + + sgManager := NewMockSecurityGroupManager(ctrl) + reconciler := &defaultSecurityGroupReconciler{ + sgManager: sgManager, + logger: logr.New(&log.NullLogSink{}), + } + + ctx := context.Background() + + sgManager.EXPECT().FetchSGInfosByID(gomock.Any(), gomock.Any(), gomock.Any()).Times(2).Return(map[string]SecurityGroupInfo{ + sgId: { + SecurityGroupID: sgId, + Ingress: []IPPermissionInfo{ + { + Permission: ec2types.IpPermission{ + FromPort: awssdk.Int32(10), + ToPort: awssdk.Int32(15), + IpProtocol: awssdk.String("tcp"), + }, + }, + }, + }, + }, nil) + + revokeData := []IPPermissionInfo{ + { + Permission: ec2types.IpPermission{ + FromPort: awssdk.Int32(10), + ToPort: awssdk.Int32(15), + IpProtocol: awssdk.String("tcp"), + }, + }, + } + + inputSGRules := []IPPermissionInfo{ + { + Permission: ec2types.IpPermission{ + FromPort: awssdk.Int32(12), + ToPort: awssdk.Int32(15), + IpProtocol: awssdk.String("tcp"), + }, + }, + } + + if tt.revokeFirst { + gomock.InOrder( + sgManager.EXPECT().AuthorizeSGIngress(gomock.Eq(ctx), gomock.Eq(sgId), gomock.Eq(inputSGRules)).Times(1).Return(tt.initialAuthorizeError), + sgManager.EXPECT().RevokeSGIngress(gomock.Eq(ctx), gomock.Eq(sgId), gomock.Eq(revokeData)).Return(nil).Times(1), + sgManager.EXPECT().AuthorizeSGIngress(gomock.Eq(ctx), 
gomock.Eq(sgId), gomock.Eq(inputSGRules)).Times(1).Return(tt.secondAuthorizeError), + ) + } else { + gomock.InOrder( + sgManager.EXPECT().AuthorizeSGIngress(gomock.Eq(ctx), gomock.Eq(sgId), gomock.Eq(inputSGRules)).Times(1).Return(tt.initialAuthorizeError), + sgManager.EXPECT().AuthorizeSGIngress(gomock.Eq(ctx), gomock.Eq(sgId), gomock.Eq(inputSGRules)).Times(1).Return(tt.secondAuthorizeError), + sgManager.EXPECT().RevokeSGIngress(gomock.Eq(ctx), gomock.Eq(sgId), gomock.Eq(revokeData)).Return(nil).Times(1), + ) + } + + err := reconciler.ReconcileIngress(ctx, sgId, inputSGRules) + ctrl.Finish() + assert.NoError(t, err) + }) + } +}