Skip to content

Commit

Permalink
Merge pull request #1744 from ConnorJC3/release-1.22
Browse files Browse the repository at this point in the history
Cherry pick of 0a1b70c to release-1.22
  • Loading branch information
k8s-ci-robot authored Sep 18, 2023
2 parents 97cb107 + 23df5cd commit ea48b74
Show file tree
Hide file tree
Showing 4 changed files with 159 additions and 74 deletions.
3 changes: 2 additions & 1 deletion hack/e2e/eksctl.sh
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ function eksctl_create_cluster() {
--managed=true \
--ssh-access=false \
--cluster="${CLUSTER_NAME}" \
--node-ami-family=WindowsServer2022FullContainer \
--node-ami-family=WindowsServer2022CoreContainer \
--instance-types=m5.2xlarge \
-n ng-windows \
-m 3 \
-M 3 \
Expand Down
2 changes: 1 addition & 1 deletion hack/e2e/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ K8S_VERSION_KOPS=${K8S_VERSION_KOPS:-${K8S_VERSION:-1.27.4}}
K8S_VERSION_EKSCTL=${K8S_VERSION_EKSCTL:-${K8S_VERSION:-1.27}}

KOPS_VERSION=${KOPS_VERSION:-1.27.0}
KOPS_STATE_FILE=${KOPS_STATE_FILE:-s3://k8s-kops-csi-e2e}
KOPS_STATE_FILE=${KOPS_STATE_FILE:-s3://k8s-kops-csi-shared-e2e}
KOPS_PATCH_FILE=${KOPS_PATCH_FILE:-./hack/kops-patch.yaml}
KOPS_PATCH_NODE_FILE=${KOPS_PATCH_NODE_FILE:-./hack/kops-patch-node.yaml}
AMI_ID=$(aws ssm get-parameters --names /aws/service/ami-amazon-linux-latest/al2023-ami-kernel-default-x86_64 --region ${REGION} --query 'Parameters[0].Value' --output text)
Expand Down
137 changes: 82 additions & 55 deletions pkg/cloud/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,18 +479,14 @@ func (c *cloud) ResizeOrModifyDisk(ctx context.Context, volumeID string, newSize
}

newSizeGiB := util.RoundUpGiB(newSizeBytes)
var oldSizeGiB int64
if newSizeBytes != 0 {
volumeSize, err := c.validateVolumeSize(ctx, volumeID, newSizeGiB)
if err != nil || volumeSize != 0 {
return volumeSize, err
}
needsModification, volumeSize, err := c.validateModifyVolume(ctx, volumeID, newSizeGiB, options)
if err != nil || !needsModification {
return volumeSize, err
}

req := &ec2.ModifyVolumeInput{
VolumeId: aws.String(volumeID),
}
// Only set req.size for resizing volume
if newSizeBytes != 0 {
req.Size = aws.Int64(newSizeGiB)
}
Expand All @@ -500,7 +496,7 @@ func (c *cloud) ResizeOrModifyDisk(ctx context.Context, volumeID string, newSize
if options.VolumeType != "" {
req.VolumeType = aws.String(options.VolumeType)
}
if options.VolumeType == VolumeTypeGP3 {
if options.Throughput != 0 {
req.Throughput = aws.Int64(int64(options.Throughput))
}

Expand All @@ -509,26 +505,17 @@ func (c *cloud) ResizeOrModifyDisk(ctx context.Context, volumeID string, newSize
return 0, fmt.Errorf("unable to modify AWS volume %q: %w", volumeID, err)
}

mod := response.VolumeModification
state := aws.StringValue(mod.ModificationState)

if volumeModificationDone(state) {
if newSizeBytes != 0 {
return c.checkDesiredSize(ctx, volumeID, newSizeGiB)
} else {
return 0, nil
}
}

err = c.waitForVolumeSize(ctx, volumeID)
if newSizeBytes != 0 {
// If the volume modification isn't immediately completed, wait for it to finish
state := aws.StringValue(response.VolumeModification.ModificationState)
if !volumeModificationDone(state) {
err = c.waitForVolumeModification(ctx, volumeID)
if err != nil {
return oldSizeGiB, err
return 0, err
}
return c.checkDesiredSize(ctx, volumeID, newSizeGiB)
} else {
return 0, c.waitForVolumeSize(ctx, volumeID)
}

// Perform one final check on the volume
return c.checkDesiredState(ctx, volumeID, newSizeGiB, options)
}

func (c *cloud) DeleteDisk(ctx context.Context, volumeID string) (bool, error) {
Expand Down Expand Up @@ -1170,7 +1157,7 @@ func isAWSErrorIdempotentParameterMismatch(err error) bool {
// Checks for desired size on volume by also verifying volume size by describing volume.
// This is to get around potential eventual consistency problems with describing volume modifications
// objects and ensuring that we read two different objects to verify volume state.
func (c *cloud) checkDesiredSize(ctx context.Context, volumeID string, newSizeGiB int64) (int64, error) {
func (c *cloud) checkDesiredState(ctx context.Context, volumeID string, desiredSizeGiB int64, options *ModifyDiskOptions) (int64, error) {
request := &ec2.DescribeVolumesInput{
VolumeIds: []*string{
aws.String(volumeID),
Expand All @@ -1182,15 +1169,25 @@ func (c *cloud) checkDesiredSize(ctx context.Context, volumeID string, newSizeGi
}

// AWS resizes in chunks of GiB (not GB)
oldSizeGiB := aws.Int64Value(volume.Size)
if oldSizeGiB >= newSizeGiB {
return oldSizeGiB, nil
realSizeGiB := aws.Int64Value(volume.Size)

// Check if there is a mismatch between the requested modification and the current volume
// If there is, the volume is still modifying and we should not return a success
if realSizeGiB < desiredSizeGiB {
return realSizeGiB, fmt.Errorf("volume %q is still being expanded to %d size", volumeID, desiredSizeGiB)
} else if options.IOPS != 0 && (volume.Iops == nil || *volume.Iops != int64(options.IOPS)) {
return realSizeGiB, fmt.Errorf("volume %q is still being modified to iops %d", volumeID, options.IOPS)
} else if options.VolumeType != "" && !strings.EqualFold(*volume.VolumeType, options.VolumeType) {
return realSizeGiB, fmt.Errorf("volume %q is still being modified to type %q", volumeID, options.VolumeType)
} else if options.Throughput != 0 && (volume.Throughput == nil || *volume.Throughput != int64(options.Throughput)) {
return realSizeGiB, fmt.Errorf("volume %q is still being modified to throughput %d", volumeID, options.Throughput)
}
return oldSizeGiB, fmt.Errorf("volume %q is still being expanded to %d size", volumeID, newSizeGiB)

return realSizeGiB, nil
}

// waitForVolumeSize waits for a volume modification to finish.
func (c *cloud) waitForVolumeSize(ctx context.Context, volumeID string) error {
// waitForVolumeModification waits for a volume modification to finish.
func (c *cloud) waitForVolumeModification(ctx context.Context, volumeID string) error {
backoff := wait.Backoff{
Duration: volumeModificationDuration,
Factor: volumeModificationWaitFactor,
Expand All @@ -1199,7 +1196,10 @@ func (c *cloud) waitForVolumeSize(ctx context.Context, volumeID string) error {

waitErr := wait.ExponentialBackoff(backoff, func() (bool, error) {
m, err := c.getLatestVolumeModification(ctx, volumeID)
if err != nil {
// Consider volumes that have never been modified as done
if err != nil && errors.Is(err, VolumeNotBeingModified) {
return true, nil
} else if err != nil {
return false, err
}

Expand Down Expand Up @@ -1271,49 +1271,76 @@ func (c *cloud) AvailabilityZones(ctx context.Context) (map[string]struct{}, err
return zones, nil
}

func (c *cloud) validateVolumeSize(ctx context.Context, volumeID string, newSizeGiB int64) (int64, error) {
func needsVolumeModification(volume *ec2.Volume, newSizeGiB int64, options *ModifyDiskOptions) bool {
oldSizeGiB := aws.Int64Value(volume.Size)
needsModification := false

if oldSizeGiB < newSizeGiB {
needsModification = true
}
if options.IOPS != 0 && (volume.Iops == nil || *volume.Iops != int64(options.IOPS)) {
needsModification = true
}
if options.VolumeType != "" && !strings.EqualFold(*volume.VolumeType, options.VolumeType) {
needsModification = true
}
if options.Throughput != 0 && (volume.Throughput == nil || *volume.Throughput != int64(options.Throughput)) {
needsModification = true
}

return needsModification
}

func (c *cloud) validateModifyVolume(ctx context.Context, volumeID string, newSizeGiB int64, options *ModifyDiskOptions) (bool, int64, error) {
request := &ec2.DescribeVolumesInput{
VolumeIds: []*string{
aws.String(volumeID),
},
}
volume, err := c.getVolume(ctx, request)
if err != nil {
return 0, err
return true, 0, err
}

oldSizeGiB := aws.Int64Value(volume.Size)

latestMod, modFetchError := c.getLatestVolumeModification(ctx, volumeID)
latestMod, err := c.getLatestVolumeModification(ctx, volumeID)
if err != nil && !errors.Is(err, VolumeNotBeingModified) {
return true, oldSizeGiB, fmt.Errorf("error fetching volume modifications for %q: %w", volumeID, err)
}

if latestMod != nil && modFetchError == nil {
// latestMod can be nil if the volume has never been modified
if latestMod != nil {
state := aws.StringValue(latestMod.ModificationState)
if state == ec2.VolumeModificationStateModifying {
err = c.waitForVolumeSize(ctx, volumeID)
// If volume is already modifying, detour to waiting for it to modify
klog.V(5).InfoS("[Debug] Watching ongoing modification", "volumeID", volumeID)
err = c.waitForVolumeModification(ctx, volumeID)
if err != nil {
return oldSizeGiB, err
return true, oldSizeGiB, err
}
return c.checkDesiredSize(ctx, volumeID, newSizeGiB)
returnGiB, returnErr := c.checkDesiredState(ctx, volumeID, newSizeGiB, options)
return false, returnGiB, returnErr
} else if state == ec2.VolumeModificationStateOptimizing {
return true, 0, fmt.Errorf("volume %q in OPTIMIZING state, cannot currently modify", volumeID)
}
}

// if there was an error fetching volume modifications and it was anything other than VolumeNotBeingModified error
// that means we have an API problem.
if modFetchError != nil && !errors.Is(modFetchError, VolumeNotBeingModified) {
return oldSizeGiB, fmt.Errorf("error fetching volume modifications for %q: %w", volumeID, modFetchError)
}

// Even if existing volume size is greater than user requested size, we should ensure that there are no pending
// volume modifications objects or volume has completed previously issued modification request.
if oldSizeGiB >= newSizeGiB {
klog.V(5).InfoS("[Debug] Volume", "volumeID", volumeID, "oldSizeGiB", oldSizeGiB, "newSizeGiB", newSizeGiB)
err = c.waitForVolumeSize(ctx, volumeID)
if err != nil && !errors.Is(err, VolumeNotBeingModified) {
return oldSizeGiB, err
// At this point, we know we are starting a new volume modification
// If we're asked to modify a volume to its current state, ignore the request and immediately return a success
if !needsVolumeModification(volume, newSizeGiB, options) {
klog.V(5).InfoS("[Debug] Skipping modification for volume due to matching stats", "volumeID", volumeID)
// Wait for any existing modifications to prevent race conditions where DescribeVolume(s) returns the new
// state before the volume is actually finished modifying
err = c.waitForVolumeModification(ctx, volumeID)
if err != nil {
return true, oldSizeGiB, err
}
return oldSizeGiB, nil
returnGiB, returnErr := c.checkDesiredState(ctx, volumeID, newSizeGiB, options)
return false, returnGiB, returnErr
}
return 0, nil

return true, 0, nil
}

func volumeModificationDone(state string) bool {
Expand Down
Loading

0 comments on commit ea48b74

Please sign in to comment.