From 0a0d5efd7d82bebc7363d7d7a44a627bd7f3cbe8 Mon Sep 17 00:00:00 2001 From: Connor Catlett Date: Tue, 25 Jun 2024 21:35:54 +0000 Subject: [PATCH] Use new client token when CreateVolume returns IdempotentParameterMismatch Signed-off-by: Connor Catlett --- pkg/cloud/cloud.go | 26 ++++++++- pkg/cloud/cloud_test.go | 67 +++++++++++++++++++++++ pkg/cloud/devicemanager/allocator.go | 1 - pkg/cloud/devicemanager/allocator_test.go | 7 +++ 4 files changed, 98 insertions(+), 3 deletions(-) diff --git a/pkg/cloud/cloud.go b/pkg/cloud/cloud.go index 268ca13ee2..e31e6752ad 100644 --- a/pkg/cloud/cloud.go +++ b/pkg/cloud/cloud.go @@ -23,6 +23,7 @@ import ( "errors" "fmt" "os" + "strconv" "strings" "sync" "time" @@ -319,6 +320,7 @@ type cloud struct { rm *retryManager vwp volumeWaitParameters likelyBadDeviceNames expiringcache.ExpiringCache[string, sync.Map] + latestClientTokens expiringcache.ExpiringCache[string, int] } var _ Cloud = &cloud{} @@ -374,6 +376,7 @@ func newEC2Cloud(region string, awsSdkDebugLog bool, userAgentExtra string, batc rm: newRetryManager(), vwp: vwp, likelyBadDeviceNames: expiringcache.New[string, sync.Map](cacheForgetDelay), + latestClientTokens: expiringcache.New[string, int](cacheForgetDelay), } } @@ -590,8 +593,22 @@ func (c *cloud) CreateDisk(ctx context.Context, volumeName string, diskOptions * } } - // We hash the volume name to generate a unique token that is less than or equal to 64 characters - clientToken := sha256.Sum256([]byte(volumeName)) + // The first client token used for any volume is the volume name as provided via CSI + // However, if a volume fails to create asyncronously (that is, the CreateVolume call + // succeeds but the volume ultimately fails to create), the client token is burned until + // EC2 forgets about its use (measured as 12 hours under normal conditions) + // + // To prevent becoming stuck for 12 hours when this occurs, we sequentially append "-2", + // "-3", "-4", etc to the volume name before hashing on the subsequent attempt after a + // volume fails to create because of an IdempotentParameterMismatch AWS error + // The most recent appended value is stored in an expiring cache to prevent memory leaks + tokenBase := volumeName + if tokenNumber, ok := c.latestClientTokens.Get(volumeName); ok { + tokenBase += "-" + strconv.Itoa(*tokenNumber) + } + + // We use a sha256 hash to guarantee the token that is less than or equal to 64 characters + clientToken := sha256.Sum256([]byte(tokenBase)) requestInput := &ec2.CreateVolumeInput{ AvailabilityZone: aws.String(zone), @@ -634,6 +651,11 @@ func (c *cloud) CreateDisk(ctx context.Context, volumeName string, diskOptions * return nil, ErrNotFound } if isAWSErrorIdempotentParameterMismatch(err) { + nextTokenNumber := 2 + if tokenNumber, ok := c.latestClientTokens.Get(volumeName); ok { + nextTokenNumber = *tokenNumber + 1 + } + c.latestClientTokens.Set(volumeName, &nextTokenNumber) return nil, ErrIdempotentParameterMismatch } return nil, fmt.Errorf("could not create volume in EC2: %w", err) diff --git a/pkg/cloud/cloud_test.go b/pkg/cloud/cloud_test.go index e825fa4e75..f0142510eb 100644 --- a/pkg/cloud/cloud_test.go +++ b/pkg/cloud/cloud_test.go @@ -1282,6 +1282,72 @@ func TestCreateDisk(t *testing.T) { } } +// Test client error IdempotentParameterMismatch by forcing it to progress twice +func TestCreateDiskClientToken(t *testing.T) { + t.Parallel() + + const volumeName = "test-vol-client-token" + const volumeId = "vol-abcd1234" + diskOptions := &DiskOptions{ + CapacityBytes: util.GiBToBytes(1), + Tags: map[string]string{VolumeNameTagKey: volumeName, AwsEbsDriverTagKey: "true"}, + AvailabilityZone: defaultZone, + } + + // Hash of "test-vol-client-token" + const expectedClientToken1 = "6a1b29bd7c5c5541d9d6baa2938e954fc5739dc77e97facf23590bd13f8582c2" + // Hash of "test-vol-client-token-2" + const expectedClientToken2 = "21465f5586388bb8804d0cec2df13c00f9a975c8cddec4bc35e964cdce59015b" + // Hash of "test-vol-client-token-3" + const expectedClientToken3 = "1bee5a79d83981c0041df2c414bb02e0c10aeb49343b63f50f71470edbaa736b" + + mockCtrl := gomock.NewController(t) + mockEC2 := NewMockEC2API(mockCtrl) + c := newCloud(mockEC2) + + gomock.InOrder( + mockEC2.EXPECT().CreateVolume(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(_ context.Context, input *ec2.CreateVolumeInput, _ ...func(*ec2.Options)) (*ec2.CreateVolumeOutput, error) { + assert.Equal(t, expectedClientToken1, *input.ClientToken) + return nil, &smithy.GenericAPIError{Code: "IdempotentParameterMismatch"} + }), + mockEC2.EXPECT().CreateVolume(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(_ context.Context, input *ec2.CreateVolumeInput, _ ...func(*ec2.Options)) (*ec2.CreateVolumeOutput, error) { + assert.Equal(t, expectedClientToken2, *input.ClientToken) + return nil, &smithy.GenericAPIError{Code: "IdempotentParameterMismatch"} + }), + mockEC2.EXPECT().CreateVolume(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(_ context.Context, input *ec2.CreateVolumeInput, _ ...func(*ec2.Options)) (*ec2.CreateVolumeOutput, error) { + assert.Equal(t, expectedClientToken3, *input.ClientToken) + return &ec2.CreateVolumeOutput{ + VolumeId: aws.String(volumeId), + Size: aws.Int32(util.BytesToGiB(diskOptions.CapacityBytes)), + }, nil + }), + mockEC2.EXPECT().DescribeVolumes(gomock.Any(), gomock.Any()).Return(&ec2.DescribeVolumesOutput{ + Volumes: []types.Volume{ + { + VolumeId: aws.String(volumeId), + Size: aws.Int32(util.BytesToGiB(diskOptions.CapacityBytes)), + State: types.VolumeState("available"), + AvailabilityZone: aws.String(diskOptions.AvailabilityZone), + }, + }, + }, nil).AnyTimes(), + ) + + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(defaultCreateDiskDeadline)) + defer cancel() + for i := range 3 { + _, err := c.CreateDisk(ctx, volumeName, diskOptions) + if i < 2 { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + } +} + func TestDeleteDisk(t *testing.T) { testCases := []struct { name string @@ -3076,6 +3142,7 @@ func newCloud(mockEC2 EC2API) Cloud { rm: newRetryManager(), vwp: testVolumeWaitParameters(), likelyBadDeviceNames: expiringcache.New[string, sync.Map](cacheForgetDelay), + latestClientTokens: expiringcache.New[string, int](cacheForgetDelay), } return c } diff --git a/pkg/cloud/devicemanager/allocator.go b/pkg/cloud/devicemanager/allocator.go index bc95ba0af3..683eee5b03 100644 --- a/pkg/cloud/devicemanager/allocator.go +++ b/pkg/cloud/devicemanager/allocator.go @@ -59,7 +59,6 @@ func (d *nameAllocator) GetNext(existingNames ExistingNames, likelyBadNames *syn finalResortName := "" likelyBadNames.Range(func(name, _ interface{}) bool { if name, ok := name.(string); ok { - fmt.Println(name) if _, existing := existingNames[name]; !existing { finalResortName = name return false diff --git a/pkg/cloud/devicemanager/allocator_test.go b/pkg/cloud/devicemanager/allocator_test.go index e7c9d907fe..eb05e2e889 100644 --- a/pkg/cloud/devicemanager/allocator_test.go +++ b/pkg/cloud/devicemanager/allocator_test.go @@ -68,6 +68,13 @@ func TestNameAllocatorLikelyBadName(t *testing.T) { }) } + onlyExisting := new(sync.Map) + onlyExisting.Store(skippedNameExisting, struct{}{}) + _, err := allocator.GetNext(existingNames, onlyExisting) + if err != nil { + t.Errorf("got nil when error expected (likelyBadNames with only existing names)") + } + lastName, _ := allocator.GetNext(existingNames, likelyBadNames) if lastName != skippedNameNew { t.Errorf("test %q: expected %q, got %q (likelyBadNames fallback)", skippedNameNew, skippedNameNew, lastName)