Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry pick of 0a1b70c792226d5ac31b33b6f21eee25a5bc24b1 to release-1.22 #1744

Merged
merged 3 commits into from
Sep 18, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion hack/e2e/eksctl.sh
Original file line number Diff line number Diff line change
@@ -67,7 +67,8 @@ function eksctl_create_cluster() {
--managed=true \
--ssh-access=false \
--cluster="${CLUSTER_NAME}" \
--node-ami-family=WindowsServer2022FullContainer \
--node-ami-family=WindowsServer2022CoreContainer \
--instance-types=m5.2xlarge \
-n ng-windows \
-m 3 \
-M 3 \
2 changes: 1 addition & 1 deletion hack/e2e/run.sh
Original file line number Diff line number Diff line change
@@ -53,7 +53,7 @@ K8S_VERSION_KOPS=${K8S_VERSION_KOPS:-${K8S_VERSION:-1.27.4}}
K8S_VERSION_EKSCTL=${K8S_VERSION_EKSCTL:-${K8S_VERSION:-1.27}}

KOPS_VERSION=${KOPS_VERSION:-1.27.0}
KOPS_STATE_FILE=${KOPS_STATE_FILE:-s3://k8s-kops-csi-e2e}
KOPS_STATE_FILE=${KOPS_STATE_FILE:-s3://k8s-kops-csi-shared-e2e}
KOPS_PATCH_FILE=${KOPS_PATCH_FILE:-./hack/kops-patch.yaml}
KOPS_PATCH_NODE_FILE=${KOPS_PATCH_NODE_FILE:-./hack/kops-patch-node.yaml}
AMI_ID=$(aws ssm get-parameters --names /aws/service/ami-amazon-linux-latest/al2023-ami-kernel-default-x86_64 --region ${REGION} --query 'Parameters[0].Value' --output text)
137 changes: 82 additions & 55 deletions pkg/cloud/cloud.go
Original file line number Diff line number Diff line change
@@ -479,18 +479,14 @@ func (c *cloud) ResizeOrModifyDisk(ctx context.Context, volumeID string, newSize
}

newSizeGiB := util.RoundUpGiB(newSizeBytes)
var oldSizeGiB int64
if newSizeBytes != 0 {
volumeSize, err := c.validateVolumeSize(ctx, volumeID, newSizeGiB)
if err != nil || volumeSize != 0 {
return volumeSize, err
}
needsModification, volumeSize, err := c.validateModifyVolume(ctx, volumeID, newSizeGiB, options)
if err != nil || !needsModification {
return volumeSize, err
}

req := &ec2.ModifyVolumeInput{
VolumeId: aws.String(volumeID),
}
// Only set req.size for resizing volume
if newSizeBytes != 0 {
req.Size = aws.Int64(newSizeGiB)
}
@@ -500,7 +496,7 @@ func (c *cloud) ResizeOrModifyDisk(ctx context.Context, volumeID string, newSize
if options.VolumeType != "" {
req.VolumeType = aws.String(options.VolumeType)
}
if options.VolumeType == VolumeTypeGP3 {
if options.Throughput != 0 {
req.Throughput = aws.Int64(int64(options.Throughput))
}

@@ -509,26 +505,17 @@ func (c *cloud) ResizeOrModifyDisk(ctx context.Context, volumeID string, newSize
return 0, fmt.Errorf("unable to modify AWS volume %q: %w", volumeID, err)
}

mod := response.VolumeModification
state := aws.StringValue(mod.ModificationState)

if volumeModificationDone(state) {
if newSizeBytes != 0 {
return c.checkDesiredSize(ctx, volumeID, newSizeGiB)
} else {
return 0, nil
}
}

err = c.waitForVolumeSize(ctx, volumeID)
if newSizeBytes != 0 {
// If the volume modification isn't immediately completed, wait for it to finish
state := aws.StringValue(response.VolumeModification.ModificationState)
if !volumeModificationDone(state) {
err = c.waitForVolumeModification(ctx, volumeID)
if err != nil {
return oldSizeGiB, err
return 0, err
}
return c.checkDesiredSize(ctx, volumeID, newSizeGiB)
} else {
return 0, c.waitForVolumeSize(ctx, volumeID)
}

// Perform one final check on the volume
return c.checkDesiredState(ctx, volumeID, newSizeGiB, options)
}

func (c *cloud) DeleteDisk(ctx context.Context, volumeID string) (bool, error) {
@@ -1170,7 +1157,7 @@ func isAWSErrorIdempotentParameterMismatch(err error) bool {
// Checks whether the volume has reached the desired size by describing the volume itself.
// This works around potential eventual-consistency problems with describing volume modification
// objects by ensuring that we read two different objects to verify the volume state.
func (c *cloud) checkDesiredSize(ctx context.Context, volumeID string, newSizeGiB int64) (int64, error) {
func (c *cloud) checkDesiredState(ctx context.Context, volumeID string, desiredSizeGiB int64, options *ModifyDiskOptions) (int64, error) {
request := &ec2.DescribeVolumesInput{
VolumeIds: []*string{
aws.String(volumeID),
@@ -1182,15 +1169,25 @@ func (c *cloud) checkDesiredSize(ctx context.Context, volumeID string, newSizeGi
}

// AWS resizes in chunks of GiB (not GB)
oldSizeGiB := aws.Int64Value(volume.Size)
if oldSizeGiB >= newSizeGiB {
return oldSizeGiB, nil
realSizeGiB := aws.Int64Value(volume.Size)

// Check if there is a mismatch between the requested modification and the current volume
// If there is, the volume is still modifying and we should not return a success
if realSizeGiB < desiredSizeGiB {
return realSizeGiB, fmt.Errorf("volume %q is still being expanded to %d size", volumeID, desiredSizeGiB)
} else if options.IOPS != 0 && (volume.Iops == nil || *volume.Iops != int64(options.IOPS)) {
return realSizeGiB, fmt.Errorf("volume %q is still being modified to iops %d", volumeID, options.IOPS)
} else if options.VolumeType != "" && !strings.EqualFold(*volume.VolumeType, options.VolumeType) {
return realSizeGiB, fmt.Errorf("volume %q is still being modified to type %q", volumeID, options.VolumeType)
} else if options.Throughput != 0 && (volume.Throughput == nil || *volume.Throughput != int64(options.Throughput)) {
return realSizeGiB, fmt.Errorf("volume %q is still being modified to throughput %d", volumeID, options.Throughput)
}
return oldSizeGiB, fmt.Errorf("volume %q is still being expanded to %d size", volumeID, newSizeGiB)

return realSizeGiB, nil
}

// waitForVolumeSize waits for a volume modification to finish.
func (c *cloud) waitForVolumeSize(ctx context.Context, volumeID string) error {
// waitForVolumeModification waits for a volume modification to finish.
func (c *cloud) waitForVolumeModification(ctx context.Context, volumeID string) error {
backoff := wait.Backoff{
Duration: volumeModificationDuration,
Factor: volumeModificationWaitFactor,
@@ -1199,7 +1196,10 @@ func (c *cloud) waitForVolumeSize(ctx context.Context, volumeID string) error {

waitErr := wait.ExponentialBackoff(backoff, func() (bool, error) {
m, err := c.getLatestVolumeModification(ctx, volumeID)
if err != nil {
// Consider volumes that have never been modified as done
if err != nil && errors.Is(err, VolumeNotBeingModified) {
return true, nil
} else if err != nil {
return false, err
}

@@ -1271,49 +1271,76 @@ func (c *cloud) AvailabilityZones(ctx context.Context) (map[string]struct{}, err
return zones, nil
}

func (c *cloud) validateVolumeSize(ctx context.Context, volumeID string, newSizeGiB int64) (int64, error) {
// needsVolumeModification reports whether volume differs from the requested
// state and therefore requires a modification request.
//
// A modification is needed when any of the following holds:
//   - the volume's current size is smaller than newSizeGiB (a request that
//     would not grow the volume never counts as a size change);
//   - options.IOPS is non-zero and differs from the volume's IOPS;
//   - options.VolumeType is non-empty and differs (case-insensitively) from
//     the volume's type;
//   - options.Throughput is non-zero and differs from the volume's throughput.
//
// Zero-valued option fields are ignored. An attribute that is unset (nil) on
// volume is treated as differing from any requested value, so a volume
// description that lacks a field never causes a nil dereference here.
func needsVolumeModification(volume *ec2.Volume, newSizeGiB int64, options *ModifyDiskOptions) bool {
	switch {
	case aws.Int64Value(volume.Size) < newSizeGiB:
		return true
	case options.IOPS != 0 && (volume.Iops == nil || *volume.Iops != int64(options.IOPS)):
		return true
	// Guard VolumeType the same way as Iops/Throughput: a nil type cannot
	// match a requested type, and must not be dereferenced.
	case options.VolumeType != "" && (volume.VolumeType == nil || !strings.EqualFold(*volume.VolumeType, options.VolumeType)):
		return true
	case options.Throughput != 0 && (volume.Throughput == nil || *volume.Throughput != int64(options.Throughput)):
		return true
	}
	return false
}

func (c *cloud) validateModifyVolume(ctx context.Context, volumeID string, newSizeGiB int64, options *ModifyDiskOptions) (bool, int64, error) {
request := &ec2.DescribeVolumesInput{
VolumeIds: []*string{
aws.String(volumeID),
},
}
volume, err := c.getVolume(ctx, request)
if err != nil {
return 0, err
return true, 0, err
}

oldSizeGiB := aws.Int64Value(volume.Size)

latestMod, modFetchError := c.getLatestVolumeModification(ctx, volumeID)
latestMod, err := c.getLatestVolumeModification(ctx, volumeID)
if err != nil && !errors.Is(err, VolumeNotBeingModified) {
return true, oldSizeGiB, fmt.Errorf("error fetching volume modifications for %q: %w", volumeID, err)
}

if latestMod != nil && modFetchError == nil {
// latestMod can be nil if the volume has never been modified
if latestMod != nil {
state := aws.StringValue(latestMod.ModificationState)
if state == ec2.VolumeModificationStateModifying {
err = c.waitForVolumeSize(ctx, volumeID)
// If volume is already modifying, detour to waiting for it to modify
klog.V(5).InfoS("[Debug] Watching ongoing modification", "volumeID", volumeID)
err = c.waitForVolumeModification(ctx, volumeID)
if err != nil {
return oldSizeGiB, err
return true, oldSizeGiB, err
}
return c.checkDesiredSize(ctx, volumeID, newSizeGiB)
returnGiB, returnErr := c.checkDesiredState(ctx, volumeID, newSizeGiB, options)
return false, returnGiB, returnErr
} else if state == ec2.VolumeModificationStateOptimizing {
return true, 0, fmt.Errorf("volume %q in OPTIMIZING state, cannot currently modify", volumeID)
}
}

// if there was an error fetching volume modifications and it was anything other than VolumeNotBeingModified error
// that means we have an API problem.
if modFetchError != nil && !errors.Is(modFetchError, VolumeNotBeingModified) {
return oldSizeGiB, fmt.Errorf("error fetching volume modifications for %q: %w", volumeID, modFetchError)
}

// Even if existing volume size is greater than user requested size, we should ensure that there are no pending
// volume modifications objects or volume has completed previously issued modification request.
if oldSizeGiB >= newSizeGiB {
klog.V(5).InfoS("[Debug] Volume", "volumeID", volumeID, "oldSizeGiB", oldSizeGiB, "newSizeGiB", newSizeGiB)
err = c.waitForVolumeSize(ctx, volumeID)
if err != nil && !errors.Is(err, VolumeNotBeingModified) {
return oldSizeGiB, err
// At this point, we know we are starting a new volume modification
// If we're asked to modify a volume to its current state, ignore the request and immediately return a success
if !needsVolumeModification(volume, newSizeGiB, options) {
klog.V(5).InfoS("[Debug] Skipping modification for volume due to matching stats", "volumeID", volumeID)
// Wait for any existing modifications to prevent race conditions where DescribeVolume(s) returns the new
// state before the volume is actually finished modifying
err = c.waitForVolumeModification(ctx, volumeID)
if err != nil {
return true, oldSizeGiB, err
}
return oldSizeGiB, nil
returnGiB, returnErr := c.checkDesiredState(ctx, volumeID, newSizeGiB, options)
return false, returnGiB, returnErr
}
return 0, nil

return true, 0, nil
}

func volumeModificationDone(state string) bool {
91 changes: 74 additions & 17 deletions pkg/cloud/cloud_test.go
Original file line number Diff line number Diff line change
@@ -1341,6 +1341,7 @@ func TestResizeOrModifyDisk(t *testing.T) {
reqSizeGiB int64
modifyDiskOptions *ModifyDiskOptions
expErr error
shouldCallDescribe bool
}{
{
name: "success: normal resize",
@@ -1357,9 +1358,10 @@ func TestResizeOrModifyDisk(t *testing.T) {
ModificationState: aws.String(ec2.VolumeModificationStateCompleted),
},
},
reqSizeGiB: 2,
modifyDiskOptions: &ModifyDiskOptions{},
expErr: nil,
reqSizeGiB: 2,
modifyDiskOptions: &ModifyDiskOptions{},
expErr: nil,
shouldCallDescribe: true,
},
{
name: "success: normal modifying state",
@@ -1385,9 +1387,10 @@ func TestResizeOrModifyDisk(t *testing.T) {
},
},
},
reqSizeGiB: 2,
modifyDiskOptions: &ModifyDiskOptions{},
expErr: nil,
reqSizeGiB: 2,
modifyDiskOptions: &ModifyDiskOptions{},
expErr: nil,
shouldCallDescribe: true,
},
{
name: "success: with previous expansion",
@@ -1406,13 +1409,18 @@ func TestResizeOrModifyDisk(t *testing.T) {
},
},
},
reqSizeGiB: 2,
modifyDiskOptions: &ModifyDiskOptions{},
expErr: nil,
reqSizeGiB: 2,
modifyDiskOptions: &ModifyDiskOptions{},
expErr: nil,
shouldCallDescribe: true,
},
{
name: "success: modify IOPS, throughput and volume type",
volumeID: "vol-test",
existingVolume: &ec2.Volume{
VolumeId: aws.String("vol-test"),
VolumeType: aws.String("gp2"),
},
modifyDiskOptions: &ModifyDiskOptions{
VolumeType: "GP3",
IOPS: 3000,
@@ -1427,7 +1435,8 @@ func TestResizeOrModifyDisk(t *testing.T) {
ModificationState: aws.String(ec2.VolumeModificationStateCompleted),
},
},
expErr: nil,
expErr: nil,
shouldCallDescribe: true,
},
{
name: "success: modify size, IOPS, throughput and volume type",
@@ -1436,6 +1445,8 @@ func TestResizeOrModifyDisk(t *testing.T) {
VolumeId: aws.String("vol-test"),
Size: aws.Int64(1),
AvailabilityZone: aws.String(defaultZone),
VolumeType: aws.String("gp2"),
Iops: aws.Int64(2000),
},
modifyDiskOptions: &ModifyDiskOptions{
VolumeType: "GP3",
@@ -1453,7 +1464,8 @@ func TestResizeOrModifyDisk(t *testing.T) {
ModificationState: aws.String(ec2.VolumeModificationStateCompleted),
},
},
expErr: nil,
expErr: nil,
shouldCallDescribe: true,
},
{
name: "fail: volume doesn't exist",
@@ -1489,9 +1501,44 @@ func TestResizeOrModifyDisk(t *testing.T) {
VolumeType: "GP2",
IOPS: 3000,
},
existingVolume: &ec2.Volume{
VolumeId: aws.String("vol-test"),
AvailabilityZone: aws.String(defaultZone),
VolumeType: aws.String("gp2"),
},
modifiedVolumeError: awserr.New("InvalidParameterCombination", "The parameter iops is not supported for gp2 volumes", nil),
expErr: awserr.New("InvalidParameterCombination", "The parameter iops is not supported for gp2 volumes", nil),
},
{
name: "success: does not call ModifyVolume when no modification required",
volumeID: "vol-test",
existingVolume: &ec2.Volume{
VolumeId: aws.String("vol-test"),
AvailabilityZone: aws.String(defaultZone),
VolumeType: aws.String("gp3"),
Iops: aws.Int64(3000),
},
modifyDiskOptions: &ModifyDiskOptions{
VolumeType: "GP3",
IOPS: 3000,
},
shouldCallDescribe: true,
},
{
name: "success: does not call ModifyVolume when no modification required (with size)",
volumeID: "vol-test",
existingVolume: &ec2.Volume{
VolumeId: aws.String("vol-test"),
AvailabilityZone: aws.String(defaultZone),
Size: aws.Int64(13),
Iops: aws.Int64(3000),
},
reqSizeGiB: 13,
modifyDiskOptions: &ModifyDiskOptions{
IOPS: 3000,
},
shouldCallDescribe: true,
},
}

for _, tc := range testCases {
@@ -1511,16 +1558,26 @@ func TestResizeOrModifyDisk(t *testing.T) {
},
}, tc.existingVolumeError)

if tc.expErr == nil && aws.Int64Value(tc.existingVolume.Size) != tc.reqSizeGiB {
resizedVolume := &ec2.Volume{
VolumeId: aws.String("vol-test"),
Size: aws.Int64(tc.reqSizeGiB),
AvailabilityZone: aws.String(defaultZone),
if tc.shouldCallDescribe {
newVolume := tc.existingVolume
if tc.reqSizeGiB != 0 {
newVolume.Size = aws.Int64(tc.reqSizeGiB)
}
if tc.modifyDiskOptions != nil {
if tc.modifyDiskOptions.IOPS != 0 {
newVolume.Iops = aws.Int64(int64(tc.modifyDiskOptions.IOPS))
}
if tc.modifyDiskOptions.Throughput != 0 {
newVolume.Throughput = aws.Int64(int64(tc.modifyDiskOptions.Throughput))
}
if tc.modifyDiskOptions.VolumeType != "" {
newVolume.VolumeType = aws.String(tc.modifyDiskOptions.VolumeType)
}
}
mockEC2.EXPECT().DescribeVolumesWithContext(gomock.Any(), gomock.Any()).Return(
&ec2.DescribeVolumesOutput{
Volumes: []*ec2.Volume{
resizedVolume,
newVolume,
},
}, tc.existingVolumeError)
}