diff --git a/pkg/cloud/cloud.go b/pkg/cloud/cloud.go index 5fdf7af351..8bf049488d 100644 --- a/pkg/cloud/cloud.go +++ b/pkg/cloud/cloud.go @@ -479,18 +479,14 @@ func (c *cloud) ResizeOrModifyDisk(ctx context.Context, volumeID string, newSize } newSizeGiB := util.RoundUpGiB(newSizeBytes) - var oldSizeGiB int64 - if newSizeBytes != 0 { - volumeSize, err := c.validateVolumeSize(ctx, volumeID, newSizeGiB) - if err != nil || volumeSize != 0 { - return volumeSize, err - } + needsModification, volumeSize, err := c.validateModifyVolume(ctx, volumeID, newSizeGiB, options) + if err != nil || !needsModification { + return volumeSize, err } req := &ec2.ModifyVolumeInput{ VolumeId: aws.String(volumeID), } - // Only set req.size for resizing volume if newSizeBytes != 0 { req.Size = aws.Int64(newSizeGiB) } @@ -500,7 +496,7 @@ func (c *cloud) ResizeOrModifyDisk(ctx context.Context, volumeID string, newSize if options.VolumeType != "" { req.VolumeType = aws.String(options.VolumeType) } - if options.VolumeType == VolumeTypeGP3 { + if options.Throughput != 0 { req.Throughput = aws.Int64(int64(options.Throughput)) } @@ -509,26 +505,17 @@ func (c *cloud) ResizeOrModifyDisk(ctx context.Context, volumeID string, newSize return 0, fmt.Errorf("unable to modify AWS volume %q: %w", volumeID, err) } - mod := response.VolumeModification - state := aws.StringValue(mod.ModificationState) - - if volumeModificationDone(state) { - if newSizeBytes != 0 { - return c.checkDesiredSize(ctx, volumeID, newSizeGiB) - } else { - return 0, nil - } - } - - err = c.waitForVolumeSize(ctx, volumeID) - if newSizeBytes != 0 { + // If the volume modification isn't immediately completed, wait for it to finish + state := aws.StringValue(response.VolumeModification.ModificationState) + if !volumeModificationDone(state) { + err = c.waitForVolumeModification(ctx, volumeID) if err != nil { - return oldSizeGiB, err + return 0, err } - return c.checkDesiredSize(ctx, volumeID, newSizeGiB) - } else { - return 0, c.waitForVolumeSize(ctx, volumeID) } + + // Perform one final check on the volume + return c.checkDesiredState(ctx, volumeID, newSizeGiB, options) } func (c *cloud) DeleteDisk(ctx context.Context, volumeID string) (bool, error) { @@ -1170,7 +1157,7 @@ func isAWSErrorIdempotentParameterMismatch(err error) bool { // Checks for desired size on volume by also verifying volume size by describing volume. // This is to get around potential eventual consistency problems with describing volume modifications // objects and ensuring that we read two different objects to verify volume state. -func (c *cloud) checkDesiredSize(ctx context.Context, volumeID string, newSizeGiB int64) (int64, error) { +func (c *cloud) checkDesiredState(ctx context.Context, volumeID string, newSizeGiB int64, _ *ModifyDiskOptions) (int64, error) { request := &ec2.DescribeVolumesInput{ VolumeIds: []*string{ aws.String(volumeID), @@ -1189,8 +1176,8 @@ func (c *cloud) checkDesiredSize(ctx context.Context, volumeID string, newSizeGi return oldSizeGiB, fmt.Errorf("volume %q is still being expanded to %d size", volumeID, newSizeGiB) } -// waitForVolumeSize waits for a volume modification to finish. -func (c *cloud) waitForVolumeSize(ctx context.Context, volumeID string) error { +// waitForVolumeModification waits for a volume modification to finish. +func (c *cloud) waitForVolumeModification(ctx context.Context, volumeID string) error { backoff := wait.Backoff{ Duration: volumeModificationDuration, Factor: volumeModificationWaitFactor, @@ -1199,7 +1186,10 @@ func (c *cloud) waitForVolumeSize(ctx context.Context, volumeID string) error { waitErr := wait.ExponentialBackoff(backoff, func() (bool, error) { m, err := c.getLatestVolumeModification(ctx, volumeID) - if err != nil { + // Consider volumes that have never been modified as done + if err != nil && errors.Is(err, VolumeNotBeingModified) { + return true, nil + } else if err != nil { return false, err } @@ -1271,7 +1261,7 @@ func (c *cloud) AvailabilityZones(ctx context.Context) (map[string]struct{}, err return zones, nil } -func (c *cloud) validateVolumeSize(ctx context.Context, volumeID string, newSizeGiB int64) (int64, error) { +func (c *cloud) validateModifyVolume(ctx context.Context, volumeID string, newSizeGiB int64, options *ModifyDiskOptions) (bool, int64, error) { request := &ec2.DescribeVolumesInput{ VolumeIds: []*string{ aws.String(volumeID), @@ -1279,41 +1269,63 @@ func (c *cloud) validateVolumeSize(ctx context.Context, volumeID string, newSize } volume, err := c.getVolume(ctx, request) if err != nil { - return 0, err + return true, 0, err } oldSizeGiB := aws.Int64Value(volume.Size) - latestMod, modFetchError := c.getLatestVolumeModification(ctx, volumeID) + latestMod, err := c.getLatestVolumeModification(ctx, volumeID) + if err != nil && !errors.Is(err, VolumeNotBeingModified) { + return true, oldSizeGiB, fmt.Errorf("error fetching volume modifications for %q: %w", volumeID, err) + } - if latestMod != nil && modFetchError == nil { + // latestMod can be nil if the volume has never been modified + if latestMod != nil { state := aws.StringValue(latestMod.ModificationState) if state == ec2.VolumeModificationStateModifying { - err = c.waitForVolumeSize(ctx, volumeID) + // If volume is already modifying, detour to waiting for it to modify + klog.V(5).InfoS("[Debug] Watching ongoing modification", "volumeID", volumeID) + err = c.waitForVolumeModification(ctx, volumeID) if err != nil { - return oldSizeGiB, err + return true, oldSizeGiB, err } - return c.checkDesiredSize(ctx, volumeID, newSizeGiB) + returnGiB, returnErr := c.checkDesiredState(ctx, volumeID, newSizeGiB, options) + return false, returnGiB, returnErr + } else if state == ec2.VolumeModificationStateOptimizing { + return true, 0, fmt.Errorf("volume %q in OPTIMIZING state, cannot currently modify", volumeID) } } - // if there was an error fetching volume modifications and it was anything other than VolumeNotBeingModified error - // that means we have an API problem. - if modFetchError != nil && !errors.Is(modFetchError, VolumeNotBeingModified) { - return oldSizeGiB, fmt.Errorf("error fetching volume modifications for %q: %w", volumeID, modFetchError) + // At this point, we know we are starting a new volume modification + // If we're asked to modify a volume to its current state, ignore the request and immediately return a success + needsModification := false + + if newSizeGiB != 0 && oldSizeGiB < newSizeGiB { + needsModification = true + } + if options.IOPS != 0 && (volume.Iops == nil || *volume.Iops != int64(options.IOPS)) { + needsModification = true + } + if options.VolumeType != "" && strings.ToLower(*volume.VolumeType) != strings.ToLower(options.VolumeType) { + needsModification = true + } + if options.Throughput != 0 && (volume.Throughput == nil || *volume.Throughput != int64(options.Throughput)) { + needsModification = true } - // Even if existing volume size is greater than user requested size, we should ensure that there are no pending - // volume modifications objects or volume has completed previously issued modification request. - if oldSizeGiB >= newSizeGiB { - klog.V(5).InfoS("[Debug] Volume", "volumeID", volumeID, "oldSizeGiB", oldSizeGiB, "newSizeGiB", newSizeGiB) - err = c.waitForVolumeSize(ctx, volumeID) - if err != nil && !errors.Is(err, VolumeNotBeingModified) { - return oldSizeGiB, err + if !needsModification { + klog.V(5).InfoS("[Debug] Skipping modification for volume due to matching stats", "volumeID", volumeID) + // Wait for any existing modifications to prevent race conditions where DescribeVolume(s) returns the new + // state before the volume is actually finished modifying + err = c.waitForVolumeModification(ctx, volumeID) + if err != nil { + return true, oldSizeGiB, err } - return oldSizeGiB, nil + returnGiB, returnErr := c.checkDesiredState(ctx, volumeID, newSizeGiB, options) + return false, returnGiB, returnErr } - return 0, nil + + return true, 0, nil } func volumeModificationDone(state string) bool { diff --git a/pkg/cloud/cloud_test.go b/pkg/cloud/cloud_test.go index e4361749cd..fad4952e74 100644 --- a/pkg/cloud/cloud_test.go +++ b/pkg/cloud/cloud_test.go @@ -1341,6 +1341,7 @@ func TestResizeOrModifyDisk(t *testing.T) { reqSizeGiB int64 modifyDiskOptions *ModifyDiskOptions expErr error + shouldCallDescribe bool }{ { name: "success: normal resize", @@ -1360,6 +1361,7 @@ func TestResizeOrModifyDisk(t *testing.T) { reqSizeGiB: 2, modifyDiskOptions: &ModifyDiskOptions{}, expErr: nil, + shouldCallDescribe: true, }, { name: "success: normal modifying state", @@ -1388,6 +1390,7 @@ func TestResizeOrModifyDisk(t *testing.T) { reqSizeGiB: 2, modifyDiskOptions: &ModifyDiskOptions{}, expErr: nil, + shouldCallDescribe: true, }, { name: "success: with previous expansion", @@ -1409,10 +1412,15 @@ func TestResizeOrModifyDisk(t *testing.T) { reqSizeGiB: 2, modifyDiskOptions: &ModifyDiskOptions{}, expErr: nil, + shouldCallDescribe: true, }, { name: "success: modify IOPS, throughput and volume type", volumeID: "vol-test", + existingVolume: &ec2.Volume{ + VolumeId: aws.String("vol-test"), + VolumeType: aws.String("gp2"), + }, modifyDiskOptions: &ModifyDiskOptions{ VolumeType: "GP3", IOPS: 3000, @@ -1428,6 +1436,7 @@ func TestResizeOrModifyDisk(t *testing.T) { }, }, expErr: nil, + shouldCallDescribe: true, }, { name: "success: modify size, IOPS, throughput and volume type", @@ -1436,6 +1445,8 @@ func TestResizeOrModifyDisk(t *testing.T) { VolumeId: aws.String("vol-test"), Size: aws.Int64(1), AvailabilityZone: aws.String(defaultZone), + VolumeType: aws.String("gp2"), + Iops: aws.Int64(2000), }, modifyDiskOptions: &ModifyDiskOptions{ VolumeType: "GP3", @@ -1454,6 +1465,7 @@ func TestResizeOrModifyDisk(t *testing.T) { }, }, expErr: nil, + shouldCallDescribe: true, }, { name: "fail: volume doesn't exist", @@ -1489,9 +1501,44 @@ func TestResizeOrModifyDisk(t *testing.T) { VolumeType: "GP2", IOPS: 3000, }, + existingVolume: &ec2.Volume{ + VolumeId: aws.String("vol-test"), + AvailabilityZone: aws.String(defaultZone), + VolumeType: aws.String("gp2"), + }, modifiedVolumeError: awserr.New("InvalidParameterCombination", "The parameter iops is not supported for gp2 volumes", nil), expErr: awserr.New("InvalidParameterCombination", "The parameter iops is not supported for gp2 volumes", nil), }, + { + name: "success: does not call ModifyVolume when no modification required", + volumeID: "vol-test", + existingVolume: &ec2.Volume{ + VolumeId: aws.String("vol-test"), + AvailabilityZone: aws.String(defaultZone), + VolumeType: aws.String("gp3"), + Iops: aws.Int64(3000), + }, + modifyDiskOptions: &ModifyDiskOptions{ + VolumeType: "GP3", + IOPS: 3000, + }, + shouldCallDescribe: true, + }, + { + name: "success: does not call ModifyVolume when no modification required (with size)", + volumeID: "vol-test", + existingVolume: &ec2.Volume{ + VolumeId: aws.String("vol-test"), + AvailabilityZone: aws.String(defaultZone), + Size: aws.Int64(13), + Iops: aws.Int64(3000), + }, + reqSizeGiB: 13, + modifyDiskOptions: &ModifyDiskOptions{ + IOPS: 3000, + }, + shouldCallDescribe: true, + }, } for _, tc := range testCases { @@ -1511,16 +1558,26 @@ func TestResizeOrModifyDisk(t *testing.T) { }, }, tc.existingVolumeError) - if tc.expErr == nil && aws.Int64Value(tc.existingVolume.Size) != tc.reqSizeGiB { - resizedVolume := &ec2.Volume{ - VolumeId: aws.String("vol-test"), - Size: aws.Int64(tc.reqSizeGiB), - AvailabilityZone: aws.String(defaultZone), + if tc.shouldCallDescribe { + newVolume := tc.existingVolume + if tc.reqSizeGiB != 0 { + newVolume.Size = aws.Int64(tc.reqSizeGiB) + } + if tc.modifyDiskOptions != nil { + if tc.modifyDiskOptions.IOPS != 0 { + newVolume.Iops = aws.Int64(int64(tc.modifyDiskOptions.IOPS)) + } + if tc.modifyDiskOptions.Throughput != 0 { + newVolume.Throughput = aws.Int64(int64(tc.modifyDiskOptions.Throughput)) + } + if tc.modifyDiskOptions.VolumeType != "" { + newVolume.VolumeType = aws.String(tc.modifyDiskOptions.VolumeType) + } } mockEC2.EXPECT().DescribeVolumesWithContext(gomock.Any(), gomock.Any()).Return( &ec2.DescribeVolumesOutput{ Volumes: []*ec2.Volume{ - resizedVolume, + newVolume, }, }, tc.existingVolumeError) }