
Commit

Merge pull request #2075 from ConnorJC3/fix-createvolume-idempotency-race-condition

Use new client token when `CreateVolume` returns `IdempotentParameterMismatch`
k8s-ci-robot authored Jul 8, 2024
2 parents 3ea9041 + 0a0d5ef commit c257e5b
Showing 8 changed files with 359 additions and 130 deletions.
114 changes: 52 additions & 62 deletions pkg/cloud/cloud.go
@@ -23,6 +23,7 @@ import (
"errors"
"fmt"
"os"
"strconv"
"strings"
"sync"
"time"
@@ -34,6 +35,7 @@ import (
"github.com/aws/smithy-go"
"github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/batcher"
dm "github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/cloud/devicemanager"
"github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/expiringcache"
"github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/util"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
@@ -91,6 +93,7 @@ var (
const (
volumeDetachedState = "detached"
volumeAttachedState = "attached"
cacheForgetDelay = 1 * time.Hour
)

// AWS provisioning limits.
@@ -310,12 +313,14 @@ type batcherManager struct {
}

type cloud struct {
region string
ec2 EC2API
dm dm.DeviceManager
bm *batcherManager
rm *retryManager
vwp volumeWaitParameters
region string
ec2 EC2API
dm dm.DeviceManager
bm *batcherManager
rm *retryManager
vwp volumeWaitParameters
likelyBadDeviceNames expiringcache.ExpiringCache[string, sync.Map]
latestClientTokens expiringcache.ExpiringCache[string, int]
}

var _ Cloud = &cloud{}
@@ -364,12 +369,14 @@ func newEC2Cloud(region string, awsSdkDebugLog bool, userAgentExtra string, batc
}

return &cloud{
region: region,
dm: dm.NewDeviceManager(),
ec2: svc,
bm: bm,
rm: newRetryManager(),
vwp: vwp,
region: region,
dm: dm.NewDeviceManager(),
ec2: svc,
bm: bm,
rm: newRetryManager(),
vwp: vwp,
likelyBadDeviceNames: expiringcache.New[string, sync.Map](cacheForgetDelay),
latestClientTokens: expiringcache.New[string, int](cacheForgetDelay),
}
}
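
The pkg/expiringcache package used above is introduced elsewhere in this PR; its implementation is not part of this file's diff. Going only by the calls visible here (New with a delay, Get returning a pointer plus an ok flag, Set taking a pointer) and by the timer-backed map it replaces in AttachDisk below, a minimal sketch of such a cache could look like the following. The internals (a per-entry time.AfterFunc, expiry reset on every access) are assumptions for illustration, not the package's actual implementation.

// Minimal sketch of a generic expiring cache matching the Get/Set shape used
// in cloud.go. Entries expire after sitting idle for the configured delay;
// any Get or Set pushes the expiry back out. Illustrative only.
package expiringcache

import (
    "sync"
    "time"
)

type ExpiringCache[K comparable, V any] interface {
    Get(key K) (*V, bool)
    Set(key K, value *V)
}

type entry[V any] struct {
    value *V
    timer *time.Timer
}

type cache[K comparable, V any] struct {
    mu     sync.Mutex
    delay  time.Duration
    values map[K]*entry[V]
}

func New[K comparable, V any](delay time.Duration) ExpiringCache[K, V] {
    return &cache[K, V]{delay: delay, values: map[K]*entry[V]{}}
}

func (c *cache[K, V]) Get(key K) (*V, bool) {
    c.mu.Lock()
    defer c.mu.Unlock()
    e, ok := c.values[key]
    if !ok {
        return nil, false
    }
    e.timer.Reset(c.delay) // accessing an entry keeps it alive
    return e.value, true
}

func (c *cache[K, V]) Set(key K, value *V) {
    c.mu.Lock()
    defer c.mu.Unlock()
    if e, ok := c.values[key]; ok {
        e.timer.Stop()
    }
    c.values[key] = &entry[V]{
        value: value,
        timer: time.AfterFunc(c.delay, func() {
            // The entry has been idle for the full delay; drop it so the
            // cache cannot grow without bound.
            c.mu.Lock()
            delete(c.values, key)
            c.mu.Unlock()
        }),
    }
}

Under these assumed semantics, a latestClientTokens or likelyBadDeviceNames entry that goes unused for cacheForgetDelay (one hour) simply disappears, which is what replaces the hand-rolled cacheMutex and time.AfterFunc bookkeeping removed from AttachDisk further down.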

@@ -586,8 +593,22 @@ func (c *cloud) CreateDisk(ctx context.Context, volumeName string, diskOptions *
}
}

// We hash the volume name to generate a unique token that is less than or equal to 64 characters
clientToken := sha256.Sum256([]byte(volumeName))
// The first client token used for any volume is the volume name as provided via CSI
// However, if a volume fails to create asynchronously (that is, the CreateVolume call
// succeeds but the volume ultimately fails to create), the client token is burned until
// EC2 forgets about its use (measured as 12 hours under normal conditions)
//
// To prevent becoming stuck for 12 hours when this occurs, we sequentially append "-2",
// "-3", "-4", etc to the volume name before hashing on the subsequent attempt after a
// volume fails to create because of an IdempotentParameterMismatch AWS error
// The most recent appended value is stored in an expiring cache to prevent memory leaks
tokenBase := volumeName
if tokenNumber, ok := c.latestClientTokens.Get(volumeName); ok {
tokenBase += "-" + strconv.Itoa(*tokenNumber)
}

// We use a sha256 hash to guarantee the token is less than or equal to 64 characters
clientToken := sha256.Sum256([]byte(tokenBase))

requestInput := &ec2.CreateVolumeInput{
AvailabilityZone: aws.String(zone),
Expand Down Expand Up @@ -630,6 +651,11 @@ func (c *cloud) CreateDisk(ctx context.Context, volumeName string, diskOptions *
return nil, ErrNotFound
}
if isAWSErrorIdempotentParameterMismatch(err) {
nextTokenNumber := 2
if tokenNumber, ok := c.latestClientTokens.Get(volumeName); ok {
nextTokenNumber = *tokenNumber + 1
}
c.latestClientTokens.Set(volumeName, &nextTokenNumber)
return nil, ErrIdempotentParameterMismatch
}
return nil, fmt.Errorf("could not create volume in EC2: %w", err)
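
For context on the hashing above: EC2 client tokens must be at most 64 characters, and a hex-encoded SHA-256 digest is exactly 64 characters, so hashing the token base always yields a valid token regardless of how long the volume name plus suffix grows. The standalone sketch below shows the retry scheme in isolation; it uses a plain map in place of the driver's expiring cache, the function names are hypothetical, and the hex encoding is an assumption (the diff above shows only the sha256.Sum256 call).

// Illustrative sketch of the client-token rotation, not the driver's code.
package main

import (
    "crypto/sha256"
    "encoding/hex"
    "fmt"
    "strconv"
)

// clientTokenFor derives the EC2 client token for a volume. The first attempt
// hashes the bare name; after an IdempotentParameterMismatch the cached
// counter appends "-2", "-3", ... so the next attempt uses a fresh token.
func clientTokenFor(volumeName string, latestToken map[string]int) string {
    tokenBase := volumeName
    if n, ok := latestToken[volumeName]; ok {
        tokenBase += "-" + strconv.Itoa(n)
    }
    sum := sha256.Sum256([]byte(tokenBase))
    return hex.EncodeToString(sum[:]) // exactly 64 hex characters
}

// onIdempotentParameterMismatch records that the current token is burned so
// the next attempt derives a different one.
func onIdempotentParameterMismatch(volumeName string, latestToken map[string]int) {
    next := 2
    if n, ok := latestToken[volumeName]; ok {
        next = n + 1
    }
    latestToken[volumeName] = next
}

func main() {
    latestToken := map[string]int{}
    fmt.Println(clientTokenFor("pvc-1234", latestToken)) // hash of "pvc-1234"
    onIdempotentParameterMismatch("pvc-1234", latestToken)
    fmt.Println(clientTokenFor("pvc-1234", latestToken)) // hash of "pvc-1234-2"
}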
@@ -847,34 +873,19 @@ func (c *cloud) batchDescribeInstances(request *ec2.DescribeInstancesInput) (*ty
return r.Result, nil
}

// Node likely bad device names cache
// Remember device names that are already in use on an instance and use them last when attaching volumes
// This works around device names that are used but do not appear in the mapping from DescribeInstanceStatus
const cacheForgetDelay = 1 * time.Hour

type cachedNode struct {
timer *time.Timer
likelyBadNames map[string]struct{}
}

var cacheMutex sync.Mutex
var nodeDeviceCache map[string]cachedNode = map[string]cachedNode{}

func (c *cloud) AttachDisk(ctx context.Context, volumeID, nodeID string) (string, error) {
instance, err := c.getInstance(ctx, nodeID)
if err != nil {
return "", err
}

likelyBadNames := map[string]struct{}{}
cacheMutex.Lock()
if node, ok := nodeDeviceCache[nodeID]; ok {
likelyBadNames = node.likelyBadNames
node.timer.Reset(cacheForgetDelay)
likelyBadDeviceNames, ok := c.likelyBadDeviceNames.Get(nodeID)
if !ok {
likelyBadDeviceNames = new(sync.Map)
c.likelyBadDeviceNames.Set(nodeID, likelyBadDeviceNames)
}
cacheMutex.Unlock()

device, err := c.dm.NewDevice(instance, volumeID, likelyBadNames)
device, err := c.dm.NewDevice(instance, volumeID, likelyBadDeviceNames)
if err != nil {
return "", err
}
Expand All @@ -892,37 +903,16 @@ func (c *cloud) AttachDisk(ctx context.Context, volumeID, nodeID string) (string
})
if attachErr != nil {
if isAWSErrorBlockDeviceInUse(attachErr) {
cacheMutex.Lock()
if node, ok := nodeDeviceCache[nodeID]; ok {
// Node already had existing cached bad names, add on to the list
node.likelyBadNames[device.Path] = struct{}{}
node.timer.Reset(cacheForgetDelay)
} else {
// Node has no existing cached bad device names, setup a new struct instance
nodeDeviceCache[nodeID] = cachedNode{
timer: time.AfterFunc(cacheForgetDelay, func() {
// If this ever fires, the node has not had a volume attached for an hour
// In order to prevent a semi-permanent memory leak, delete it from the map
cacheMutex.Lock()
delete(nodeDeviceCache, nodeID)
cacheMutex.Unlock()
}),
likelyBadNames: map[string]struct{}{
device.Path: {},
},
}
}
cacheMutex.Unlock()
// If block device is "in use", that likely indicates a bad name that is in use by a block
// device that we do not know about (example: block devices attached in the AMI, which are
// not reported in DescribeInstance's block device map)
//
// Store such bad names in the "likely bad" map to be considered last in future attempts
likelyBadDeviceNames.Store(device.Path, struct{}{})
}
return "", fmt.Errorf("could not attach volume %q to node %q: %w", volumeID, nodeID, attachErr)
}
cacheMutex.Lock()
if node, ok := nodeDeviceCache[nodeID]; ok {
// Remove successfully attached devices from the "likely bad" list
delete(node.likelyBadNames, device.Path)
node.timer.Reset(cacheForgetDelay)
}
cacheMutex.Unlock()
likelyBadDeviceNames.Delete(device.Path)
klog.V(5).InfoS("[Debug] AttachVolume", "volumeID", volumeID, "nodeID", nodeID, "resp", resp)
}
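
With the per-node set of bad device names now kept as a *sync.Map inside the expiring cache, the explicit cacheMutex and manual timer handling above disappear. The condensed sketch below shows only the cache handling of the new flow; pickDeviceName, attachVolume, and ErrBlockDeviceInUse are hypothetical stand-ins for dm.NewDevice, the EC2 AttachVolume call, and the isAWSErrorBlockDeviceInUse check.

// Condensed, illustrative sketch of the likely-bad-device-name handling in
// the new AttachDisk flow. The helpers passed in are hypothetical.
package cloudsketch

import (
    "errors"
    "fmt"
    "sync"
)

// ErrBlockDeviceInUse stands in for the AWS "block device in use" error.
var ErrBlockDeviceInUse = errors.New("block device is in use")

func attachWithBadNameTracking(
    volumeID, nodeID string,
    likelyBad *sync.Map, // this node's entry from the expiring cache
    pickDeviceName func(likelyBad *sync.Map) string,
    attachVolume func(device string) error,
) (string, error) {
    // The device manager is expected to try names recorded here last.
    device := pickDeviceName(likelyBad)

    if err := attachVolume(device); err != nil {
        if errors.Is(err, ErrBlockDeviceInUse) {
            // The name is occupied by a device not visible in the instance's
            // block device mapping (for example, one attached in the AMI);
            // remember it so future attaches on this node try it last.
            likelyBad.Store(device, struct{}{})
        }
        return "", fmt.Errorf("could not attach volume %q to node %q: %w", volumeID, nodeID, err)
    }

    // The name worked after all, so stop treating it as likely bad.
    likelyBad.Delete(device)
    return device, nil
}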
