diff --git a/pkg/cloud/cloud.go b/pkg/cloud/cloud.go
index 67bb5f16fc..09a9633ea5 100644
--- a/pkg/cloud/cloud.go
+++ b/pkg/cloud/cloud.go
@@ -23,6 +23,7 @@ import (
 	"errors"
 	"fmt"
 	"os"
+	"strconv"
 	"strings"
 	"sync"
 	"time"
@@ -311,13 +312,14 @@ type batcherManager struct {
 }
 
 type cloud struct {
-	region         string
-	ec2            EC2API
-	dm             dm.DeviceManager
-	bm             *batcherManager
-	rm             *retryManager
-	vwp            volumeWaitParameters
-	likelyBadNames util.ExpiringCache[string, sync.Map]
+	region             string
+	ec2                EC2API
+	dm                 dm.DeviceManager
+	bm                 *batcherManager
+	rm                 *retryManager
+	vwp                volumeWaitParameters
+	likelyBadNames     util.ExpiringCache[string, sync.Map]
+	latestClientTokens util.ExpiringCache[string, int]
 }
 
 var _ Cloud = &cloud{}
@@ -366,13 +368,14 @@ func newEC2Cloud(region string, awsSdkDebugLog bool, userAgentExtra string, batc
 	}
 
 	return &cloud{
-		region:         region,
-		dm:             dm.NewDeviceManager(),
-		ec2:            svc,
-		bm:             bm,
-		rm:             newRetryManager(),
-		vwp:            vwp,
-		likelyBadNames: util.NewExpiringCache[string, sync.Map](cacheForgetDelay),
+		region:             region,
+		dm:                 dm.NewDeviceManager(),
+		ec2:                svc,
+		bm:                 bm,
+		rm:                 newRetryManager(),
+		vwp:                vwp,
+		likelyBadNames:     util.NewExpiringCache[string, sync.Map](cacheForgetDelay),
+		latestClientTokens: util.NewExpiringCache[string, int](cacheForgetDelay),
 	}
 }
 
@@ -589,8 +592,15 @@ func (c *cloud) CreateDisk(ctx context.Context, volumeName string, diskOptions *
 		}
 	}
 
+	// If an earlier attempt for this volume name recorded a token number, append
+	// it so this request is sent with a different client token.
+	tokenBase := volumeName
+	if tokenNumber, ok := c.latestClientTokens.Get(volumeName); ok {
+		tokenBase += "-" + strconv.Itoa(*tokenNumber)
+	}
+
-	// We hash the volume name to generate a unique token that is less than or equal to 64 characters
-	clientToken := sha256.Sum256([]byte(volumeName))
+	// We hash the token base to generate a unique token that is less than or equal to 64 characters
+	clientToken := sha256.Sum256([]byte(tokenBase))
 
 	requestInput := &ec2.CreateVolumeInput{
 		AvailabilityZone:   aws.String(zone),
@@ -633,6 +643,13 @@ func (c *cloud) CreateDisk(ctx context.Context, volumeName string, diskOptions *
 			return nil, ErrNotFound
 		}
 		if isAWSErrorIdempotentParameterMismatch(err) {
+			// Record the token number the next attempt for this volume name
+			// should use; the first retry uses 2.
+			nextTokenNumber := 2
+			if tokenNumber, ok := c.latestClientTokens.Get(volumeName); ok {
+				nextTokenNumber = *tokenNumber + 1
+			}
+			c.latestClientTokens.Set(volumeName, &nextTokenNumber)
 			return nil, ErrIdempotentParameterMismatch
 		}
 		return nil, fmt.Errorf("could not create volume in EC2: %w", err)
diff --git a/pkg/cloud/cloud_test.go b/pkg/cloud/cloud_test.go
index 2fabc44d32..d29c406737 100644
--- a/pkg/cloud/cloud_test.go
+++ b/pkg/cloud/cloud_test.go
@@ -1281,6 +1281,77 @@ func TestCreateDisk(t *testing.T) {
 	}
 }
 
+// TestCreateDiskClientToken verifies that CreateDisk moves on to a new client
+// token after each IdempotentParameterMismatch error: the first request hashes
+// the bare volume name, and each following request hashes the name with the
+// next numeric suffix ("-2", then "-3").
+func TestCreateDiskClientToken(t *testing.T) {
+	t.Parallel()
+
+	const volumeName = "test-vol-client-token"
+	const volumeID = "vol-abcd1234"
+	diskOptions := &DiskOptions{
+		CapacityBytes:    util.GiBToBytes(1),
+		Tags:             map[string]string{VolumeNameTagKey: volumeName, AwsEbsDriverTagKey: "true"},
+		AvailabilityZone: defaultZone,
+	}
+
+	// Hash of "test-vol-client-token"
+	const expectedClientToken1 = "6a1b29bd7c5c5541d9d6baa2938e954fc5739dc77e97facf23590bd13f8582c2"
+	// Hash of "test-vol-client-token-2"
+	const expectedClientToken2 = "21465f5586388bb8804d0cec2df13c00f9a975c8cddec4bc35e964cdce59015b"
+	// Hash of "test-vol-client-token-3"
+	const expectedClientToken3 = "1bee5a79d83981c0041df2c414bb02e0c10aeb49343b63f50f71470edbaa736b"
+
+	mockCtrl := gomock.NewController(t)
+	mockEC2 := NewMockEC2API(mockCtrl)
+	c := newCloud(mockEC2)
+
+	gomock.InOrder(
+		mockEC2.EXPECT().CreateVolume(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
+			func(_ context.Context, input *ec2.CreateVolumeInput, _ ...func(*ec2.Options)) (*ec2.CreateVolumeOutput, error) {
+				assert.Equal(t, expectedClientToken1, *input.ClientToken)
+				return nil, &smithy.GenericAPIError{Code: "IdempotentParameterMismatch"}
+			}),
+		mockEC2.EXPECT().CreateVolume(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
+			func(_ context.Context, input *ec2.CreateVolumeInput, _ ...func(*ec2.Options)) (*ec2.CreateVolumeOutput, error) {
+				assert.Equal(t, expectedClientToken2, *input.ClientToken)
+				return nil, &smithy.GenericAPIError{Code: "IdempotentParameterMismatch"}
+			}),
+		mockEC2.EXPECT().CreateVolume(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
+			func(_ context.Context, input *ec2.CreateVolumeInput, _ ...func(*ec2.Options)) (*ec2.CreateVolumeOutput, error) {
+				assert.Equal(t, expectedClientToken3, *input.ClientToken)
+				return &ec2.CreateVolumeOutput{
+					VolumeId: aws.String(volumeID),
+					Size:     aws.Int32(util.BytesToGiB(diskOptions.CapacityBytes)),
+				}, nil
+			}),
+		mockEC2.EXPECT().DescribeVolumes(gomock.Any(), gomock.Any()).Return(&ec2.DescribeVolumesOutput{
+			Volumes: []types.Volume{
+				{
+					VolumeId:         aws.String(volumeID),
+					Size:             aws.Int32(util.BytesToGiB(diskOptions.CapacityBytes)),
+					State:            types.VolumeState("available"),
+					AvailabilityZone: aws.String(diskOptions.AvailabilityZone),
+				},
+			},
+		}, nil).AnyTimes(),
+	)
+
+	ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(defaultCreateDiskDeadline))
+	defer cancel()
+
+	// The first two attempts hit the mocked mismatch error; the third succeeds.
+	for attempt := range 3 {
+		_, err := c.CreateDisk(ctx, volumeName, diskOptions)
+		if attempt < 2 {
+			assert.ErrorIs(t, err, ErrIdempotentParameterMismatch)
+		} else {
+			assert.NoError(t, err)
+		}
+	}
+}
+
 func TestDeleteDisk(t *testing.T) {
 	testCases := []struct {
 		name     string
@@ -3069,12 +3140,13 @@ func testVolumeWaitParameters() volumeWaitParameters {
 
 func newCloud(mockEC2 EC2API) Cloud {
 	c := &cloud{
-		region:         "test-region",
-		dm:             dm.NewDeviceManager(),
-		ec2:            mockEC2,
-		rm:             newRetryManager(),
-		vwp:            testVolumeWaitParameters(),
-		likelyBadNames: util.NewExpiringCache[string, sync.Map](cacheForgetDelay),
+		region:             "test-region",
+		dm:                 dm.NewDeviceManager(),
+		ec2:                mockEC2,
+		rm:                 newRetryManager(),
+		vwp:                testVolumeWaitParameters(),
+		likelyBadNames:     util.NewExpiringCache[string, sync.Map](cacheForgetDelay),
+		latestClientTokens: util.NewExpiringCache[string, int](cacheForgetDelay),
 	}
 	return c
 }