Skip to content

Commit

Permalink
Do not call ModifyVolume if the volume is already in the desired state
Browse files Browse the repository at this point in the history
Signed-off-by: Connor Catlett <[email protected]>
  • Loading branch information
ConnorJC3 committed Sep 13, 2023
1 parent ff266e6 commit 2ad0f0c
Show file tree
Hide file tree
Showing 2 changed files with 137 additions and 60 deletions.
128 changes: 74 additions & 54 deletions pkg/cloud/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,18 +479,14 @@ func (c *cloud) ResizeOrModifyDisk(ctx context.Context, volumeID string, newSize
}

newSizeGiB := util.RoundUpGiB(newSizeBytes)
var oldSizeGiB int64
if newSizeBytes != 0 {
volumeSize, err := c.validateVolumeSize(ctx, volumeID, newSizeGiB)
if err != nil || volumeSize != 0 {
return volumeSize, err
}
needsModification, volumeSize, err := c.validateModifyVolume(ctx, volumeID, newSizeGiB, options)
if err != nil || !needsModification {
return volumeSize, err
}

req := &ec2.ModifyVolumeInput{
VolumeId: aws.String(volumeID),
}
// Only set req.size for resizing volume
if newSizeBytes != 0 {
req.Size = aws.Int64(newSizeGiB)
}
Expand All @@ -500,7 +496,7 @@ func (c *cloud) ResizeOrModifyDisk(ctx context.Context, volumeID string, newSize
if options.VolumeType != "" {
req.VolumeType = aws.String(options.VolumeType)
}
if options.VolumeType == VolumeTypeGP3 {
if options.Throughput != 0 {
req.Throughput = aws.Int64(int64(options.Throughput))
}

Expand All @@ -509,26 +505,17 @@ func (c *cloud) ResizeOrModifyDisk(ctx context.Context, volumeID string, newSize
return 0, fmt.Errorf("unable to modify AWS volume %q: %w", volumeID, err)
}

mod := response.VolumeModification
state := aws.StringValue(mod.ModificationState)

if volumeModificationDone(state) {
if newSizeBytes != 0 {
return c.checkDesiredSize(ctx, volumeID, newSizeGiB)
} else {
return 0, nil
}
}

err = c.waitForVolumeSize(ctx, volumeID)
if newSizeBytes != 0 {
// If the volume modification isn't immediately completed, wait for it to finish
state := aws.StringValue(response.VolumeModification.ModificationState)
if !volumeModificationDone(state) {
err = c.waitForVolumeModification(ctx, volumeID)
if err != nil {
return oldSizeGiB, err
return 0, err
}
return c.checkDesiredSize(ctx, volumeID, newSizeGiB)
} else {
return 0, c.waitForVolumeSize(ctx, volumeID)
}

// Perform one final check on the volume
return c.checkDesiredState(ctx, volumeID, newSizeGiB, options)
}

func (c *cloud) DeleteDisk(ctx context.Context, volumeID string) (bool, error) {
Expand Down Expand Up @@ -1170,7 +1157,7 @@ func isAWSErrorIdempotentParameterMismatch(err error) bool {
// Checks for desired size on volume by also verifying volume size by describing volume.
// This is to get around potential eventual consistency problems with describing volume modifications
// objects and ensuring that we read two different objects to verify volume state.
func (c *cloud) checkDesiredSize(ctx context.Context, volumeID string, newSizeGiB int64) (int64, error) {
func (c *cloud) checkDesiredState(ctx context.Context, volumeID string, desiredSizeGiB int64, options *ModifyDiskOptions) (int64, error) {
request := &ec2.DescribeVolumesInput{
VolumeIds: []*string{
aws.String(volumeID),
Expand All @@ -1182,15 +1169,23 @@ func (c *cloud) checkDesiredSize(ctx context.Context, volumeID string, newSizeGi
}

// AWS resizes in chunks of GiB (not GB)
oldSizeGiB := aws.Int64Value(volume.Size)
if oldSizeGiB >= newSizeGiB {
return oldSizeGiB, nil
}
return oldSizeGiB, fmt.Errorf("volume %q is still being expanded to %d size", volumeID, newSizeGiB)
realSizeGiB := aws.Int64Value(volume.Size)

if realSizeGiB < desiredSizeGiB {
return realSizeGiB, fmt.Errorf("volume %q is still being expanded to %d size", volumeID, desiredSizeGiB)
} else if options.IOPS != 0 && (volume.Iops == nil || *volume.Iops != int64(options.IOPS)) {
return realSizeGiB, fmt.Errorf("volume %q is still being modified to iops %d", volumeID, options.IOPS)
} else if options.VolumeType != "" && strings.ToLower(*volume.VolumeType) != strings.ToLower(options.VolumeType) {
return realSizeGiB, fmt.Errorf("volume %q is still being modified to type %q", volumeID, options.VolumeType)
} else if options.Throughput != 0 && (volume.Throughput == nil || *volume.Throughput != int64(options.Throughput)) {
return realSizeGiB, fmt.Errorf("volume %q is still being modified to throughput %d", volumeID, options.Throughput)
}

return realSizeGiB, nil
}

// waitForVolumeSize waits for a volume modification to finish.
func (c *cloud) waitForVolumeSize(ctx context.Context, volumeID string) error {
// waitForVolumeModification waits for a volume modification to finish.
func (c *cloud) waitForVolumeModification(ctx context.Context, volumeID string) error {
backoff := wait.Backoff{
Duration: volumeModificationDuration,
Factor: volumeModificationWaitFactor,
Expand All @@ -1199,7 +1194,10 @@ func (c *cloud) waitForVolumeSize(ctx context.Context, volumeID string) error {

waitErr := wait.ExponentialBackoff(backoff, func() (bool, error) {
m, err := c.getLatestVolumeModification(ctx, volumeID)
if err != nil {
// Consider volumes that have never been modified as done
if err != nil && errors.Is(err, VolumeNotBeingModified) {
return true, nil
} else if err != nil {
return false, err
}

Expand Down Expand Up @@ -1271,49 +1269,71 @@ func (c *cloud) AvailabilityZones(ctx context.Context) (map[string]struct{}, err
return zones, nil
}

func (c *cloud) validateVolumeSize(ctx context.Context, volumeID string, newSizeGiB int64) (int64, error) {
func (c *cloud) validateModifyVolume(ctx context.Context, volumeID string, newSizeGiB int64, options *ModifyDiskOptions) (bool, int64, error) {
request := &ec2.DescribeVolumesInput{
VolumeIds: []*string{
aws.String(volumeID),
},
}
volume, err := c.getVolume(ctx, request)
if err != nil {
return 0, err
return true, 0, err
}

oldSizeGiB := aws.Int64Value(volume.Size)

latestMod, modFetchError := c.getLatestVolumeModification(ctx, volumeID)
latestMod, err := c.getLatestVolumeModification(ctx, volumeID)
if err != nil && !errors.Is(err, VolumeNotBeingModified) {
return true, oldSizeGiB, fmt.Errorf("error fetching volume modifications for %q: %w", volumeID, err)
}

if latestMod != nil && modFetchError == nil {
// latestMod can be nil if the volume has never been modified
if latestMod != nil {
state := aws.StringValue(latestMod.ModificationState)
if state == ec2.VolumeModificationStateModifying {
err = c.waitForVolumeSize(ctx, volumeID)
// If volume is already modifying, detour to waiting for it to modify
klog.V(5).InfoS("[Debug] Watching ongoing modification", "volumeID", volumeID)
err = c.waitForVolumeModification(ctx, volumeID)
if err != nil {
return oldSizeGiB, err
return true, oldSizeGiB, err
}
return c.checkDesiredSize(ctx, volumeID, newSizeGiB)
returnGiB, returnErr := c.checkDesiredState(ctx, volumeID, newSizeGiB, options)
return false, returnGiB, returnErr
} else if state == ec2.VolumeModificationStateOptimizing {
return true, 0, fmt.Errorf("volume %q in OPTIMIZING state, cannot currently modify", volumeID)
}
}

// if there was an error fetching volume modifications and it was anything other than VolumeNotBeingModified error
// that means we have an API problem.
if modFetchError != nil && !errors.Is(modFetchError, VolumeNotBeingModified) {
return oldSizeGiB, fmt.Errorf("error fetching volume modifications for %q: %w", volumeID, modFetchError)
// At this point, we know we are starting a new volume modification
// If we're asked to modify a volume to its current state, ignore the request and immediately return a success
needsModification := false

if newSizeGiB != 0 && oldSizeGiB < newSizeGiB {
needsModification = true
}
if options.IOPS != 0 && (volume.Iops == nil || *volume.Iops != int64(options.IOPS)) {
needsModification = true
}
if options.VolumeType != "" && strings.ToLower(*volume.VolumeType) != strings.ToLower(options.VolumeType) {
needsModification = true
}
if options.Throughput != 0 && (volume.Throughput == nil || *volume.Throughput != int64(options.Throughput)) {
needsModification = true
}

// Even if existing volume size is greater than user requested size, we should ensure that there are no pending
// volume modifications objects or volume has completed previously issued modification request.
if oldSizeGiB >= newSizeGiB {
klog.V(5).InfoS("[Debug] Volume", "volumeID", volumeID, "oldSizeGiB", oldSizeGiB, "newSizeGiB", newSizeGiB)
err = c.waitForVolumeSize(ctx, volumeID)
if err != nil && !errors.Is(err, VolumeNotBeingModified) {
return oldSizeGiB, err
if !needsModification {
klog.V(5).InfoS("[Debug] Skipping modification for volume due to matching stats", "volumeID", volumeID)
// Wait for any existing modifications to prevent race conditions where DescribeVolume(s) returns the new
// state before the volume is actually finished modifying
err = c.waitForVolumeModification(ctx, volumeID)
if err != nil {
return true, oldSizeGiB, err
}
return oldSizeGiB, nil
returnGiB, returnErr := c.checkDesiredState(ctx, volumeID, newSizeGiB, options)
return false, returnGiB, returnErr
}
return 0, nil

return true, 0, nil
}

func volumeModificationDone(state string) bool {
Expand Down
69 changes: 63 additions & 6 deletions pkg/cloud/cloud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1341,6 +1341,7 @@ func TestResizeOrModifyDisk(t *testing.T) {
reqSizeGiB int64
modifyDiskOptions *ModifyDiskOptions
expErr error
shouldCallDescribe bool
}{
{
name: "success: normal resize",
Expand All @@ -1360,6 +1361,7 @@ func TestResizeOrModifyDisk(t *testing.T) {
reqSizeGiB: 2,
modifyDiskOptions: &ModifyDiskOptions{},
expErr: nil,
shouldCallDescribe: true,
},
{
name: "success: normal modifying state",
Expand Down Expand Up @@ -1388,6 +1390,7 @@ func TestResizeOrModifyDisk(t *testing.T) {
reqSizeGiB: 2,
modifyDiskOptions: &ModifyDiskOptions{},
expErr: nil,
shouldCallDescribe: true,
},
{
name: "success: with previous expansion",
Expand All @@ -1409,10 +1412,15 @@ func TestResizeOrModifyDisk(t *testing.T) {
reqSizeGiB: 2,
modifyDiskOptions: &ModifyDiskOptions{},
expErr: nil,
shouldCallDescribe: true,
},
{
name: "success: modify IOPS, throughput and volume type",
volumeID: "vol-test",
existingVolume: &ec2.Volume{
VolumeId: aws.String("vol-test"),
VolumeType: aws.String("gp2"),
},
modifyDiskOptions: &ModifyDiskOptions{
VolumeType: "GP3",
IOPS: 3000,
Expand All @@ -1428,6 +1436,7 @@ func TestResizeOrModifyDisk(t *testing.T) {
},
},
expErr: nil,
shouldCallDescribe: true,
},
{
name: "success: modify size, IOPS, throughput and volume type",
Expand All @@ -1436,6 +1445,8 @@ func TestResizeOrModifyDisk(t *testing.T) {
VolumeId: aws.String("vol-test"),
Size: aws.Int64(1),
AvailabilityZone: aws.String(defaultZone),
VolumeType: aws.String("gp2"),
Iops: aws.Int64(2000),
},
modifyDiskOptions: &ModifyDiskOptions{
VolumeType: "GP3",
Expand All @@ -1454,6 +1465,7 @@ func TestResizeOrModifyDisk(t *testing.T) {
},
},
expErr: nil,
shouldCallDescribe: true,
},
{
name: "fail: volume doesn't exist",
Expand Down Expand Up @@ -1489,9 +1501,44 @@ func TestResizeOrModifyDisk(t *testing.T) {
VolumeType: "GP2",
IOPS: 3000,
},
existingVolume: &ec2.Volume{
VolumeId: aws.String("vol-test"),
AvailabilityZone: aws.String(defaultZone),
VolumeType: aws.String("gp2"),
},
modifiedVolumeError: awserr.New("InvalidParameterCombination", "The parameter iops is not supported for gp2 volumes", nil),
expErr: awserr.New("InvalidParameterCombination", "The parameter iops is not supported for gp2 volumes", nil),
},
{
name: "success: does not call ModifyVolume when no modification required",
volumeID: "vol-test",
existingVolume: &ec2.Volume{
VolumeId: aws.String("vol-test"),
AvailabilityZone: aws.String(defaultZone),
VolumeType: aws.String("gp3"),
Iops: aws.Int64(3000),
},
modifyDiskOptions: &ModifyDiskOptions{
VolumeType: "GP3",
IOPS: 3000,
},
shouldCallDescribe: true,
},
{
name: "success: does not call ModifyVolume when no modification required (with size)",
volumeID: "vol-test",
existingVolume: &ec2.Volume{
VolumeId: aws.String("vol-test"),
AvailabilityZone: aws.String(defaultZone),
Size: aws.Int64(13),
Iops: aws.Int64(3000),
},
reqSizeGiB: 13,
modifyDiskOptions: &ModifyDiskOptions{
IOPS: 3000,
},
shouldCallDescribe: true,
},
}

for _, tc := range testCases {
Expand All @@ -1511,16 +1558,26 @@ func TestResizeOrModifyDisk(t *testing.T) {
},
}, tc.existingVolumeError)

if tc.expErr == nil && aws.Int64Value(tc.existingVolume.Size) != tc.reqSizeGiB {
resizedVolume := &ec2.Volume{
VolumeId: aws.String("vol-test"),
Size: aws.Int64(tc.reqSizeGiB),
AvailabilityZone: aws.String(defaultZone),
if tc.shouldCallDescribe {
newVolume := tc.existingVolume
if tc.reqSizeGiB != 0 {
newVolume.Size = aws.Int64(tc.reqSizeGiB)
}
if tc.modifyDiskOptions != nil {
if tc.modifyDiskOptions.IOPS != 0 {
newVolume.Iops = aws.Int64(int64(tc.modifyDiskOptions.IOPS))
}
if tc.modifyDiskOptions.Throughput != 0 {
newVolume.Throughput = aws.Int64(int64(tc.modifyDiskOptions.Throughput))
}
if tc.modifyDiskOptions.VolumeType != "" {
newVolume.VolumeType = aws.String(tc.modifyDiskOptions.VolumeType)
}
}
mockEC2.EXPECT().DescribeVolumesWithContext(gomock.Any(), gomock.Any()).Return(
&ec2.DescribeVolumesOutput{
Volumes: []*ec2.Volume{
resizedVolume,
newVolume,
},
}, tc.existingVolumeError)
}
Expand Down

0 comments on commit 2ad0f0c

Please sign in to comment.