Merge pull request #1889 from ConnorJC3/remember-bad-block-device-names
Remember likely bad device names for an hour
k8s-ci-robot authored Jan 18, 2024
2 parents 889b1e4 + 872f2d4 commit 80d1adf
Showing 6 changed files with 177 additions and 31 deletions.
69 changes: 65 additions & 4 deletions pkg/cloud/cloud.go
@@ -24,6 +24,7 @@ import (
"fmt"
"os"
"strings"
"sync"
"time"

"github.com/aws/aws-sdk-go/aws"
@@ -162,9 +163,6 @@ var (
// ErrAlreadyExists is returned when a resource already exists.
ErrAlreadyExists = errors.New("Resource already exists")

// ErrVolumeInUse is returned when a volume is already attached to an instance.
ErrVolumeInUse = errors.New("Request volume is already attached to an instance")

// ErrMultiSnapshots is returned when multiple snapshots are found
// with the same ID
ErrMultiSnapshots = errors.New("Multiple snapshots with the same name found")
@@ -686,13 +684,34 @@ func (c *cloud) DeleteDisk(ctx context.Context, volumeID string) (bool, error) {
return true, nil
}

// Node likely-bad device names cache
// Remembers device names that are already in use on an instance so they are tried last when attaching volumes.
// This works around device names that are in use but do not appear in the block device mapping returned by DescribeInstances.
const cacheForgetDelay = 1 * time.Hour

type cachedNode struct {
timer *time.Timer
likelyBadNames map[string]struct{}
}

var cacheMutex sync.Mutex
var nodeDeviceCache map[string]cachedNode = map[string]cachedNode{}

func (c *cloud) AttachDisk(ctx context.Context, volumeID, nodeID string) (string, error) {
instance, err := c.getInstance(ctx, nodeID)
if err != nil {
return "", err
}

device, err := c.dm.NewDevice(instance, volumeID)
likelyBadNames := map[string]struct{}{}
cacheMutex.Lock()
if node, ok := nodeDeviceCache[nodeID]; ok {
likelyBadNames = node.likelyBadNames
node.timer.Reset(cacheForgetDelay)
}
cacheMutex.Unlock()

device, err := c.dm.NewDevice(instance, volumeID, likelyBadNames)
if err != nil {
return "", err
}
@@ -707,8 +726,38 @@ func (c *cloud) AttachDisk(ctx context.Context, volumeID, nodeID string) (string

resp, attachErr := c.ec2.AttachVolumeWithContext(ctx, request)
if attachErr != nil {
if isAWSErrorBlockDeviceInUse(attachErr) {
cacheMutex.Lock()
if node, ok := nodeDeviceCache[nodeID]; ok {
// Node already has cached bad names; add this one to the list
node.likelyBadNames[device.Path] = struct{}{}
node.timer.Reset(cacheForgetDelay)
} else {
// Node has no cached bad device names; set up a new cache entry
nodeDeviceCache[nodeID] = cachedNode{
timer: time.AfterFunc(cacheForgetDelay, func() {
// If this ever fires, the node has not had a volume attached for an hour
// In order to prevent a semi-permanent memory leak, delete it from the map
cacheMutex.Lock()
delete(nodeDeviceCache, nodeID)
cacheMutex.Unlock()
}),
likelyBadNames: map[string]struct{}{
device.Path: {},
},
}
}
cacheMutex.Unlock()
}
return "", fmt.Errorf("could not attach volume %q to node %q: %w", volumeID, nodeID, attachErr)
}
cacheMutex.Lock()
if node, ok := nodeDeviceCache[nodeID]; ok {
// Remove successfully attached devices from the "likely bad" list
delete(node.likelyBadNames, device.Path)
node.timer.Reset(cacheForgetDelay)
}
cacheMutex.Unlock()
klog.V(5).InfoS("[Debug] AttachVolume", "volumeID", volumeID, "nodeID", nodeID, "resp", resp)
}

@@ -1291,6 +1340,18 @@ func isAWSErrorIdempotentParameterMismatch(err error) bool {
return isAWSError(err, "IdempotentParameterMismatch")
}

// isAWSErrorBlockDeviceInUse returns a boolean indicating whether the
// given error appears to be a block device name already in use error.
func isAWSErrorBlockDeviceInUse(err error) bool {
var awsErr awserr.Error
if errors.As(err, &awsErr) {
if awsErr.Code() == "InvalidParameterValue" && strings.Contains(awsErr.Message(), "already in use") {
return true
}
}
return false
}

// Checks for desired size on volume by also verifying volume size by describing volume.
// This is to get around potential eventual consistency problems with describing volume modifications
// objects and ensuring that we read two different objects to verify volume state.
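The core of this change is an expiring per-node cache: each nodeID maps to a set of likely-bad device names plus a time.AfterFunc timer that evicts the whole entry after an hour of inactivity, and every cache touch re-arms that timer. Below is a standalone, minimal sketch of that pattern — not part of the diff; the names and the shortened delay are mine for illustration:

package main

import (
	"fmt"
	"sync"
	"time"
)

// forgetDelay mirrors cacheForgetDelay from the diff; shortened here so
// the example observably expires.
const forgetDelay = 2 * time.Second

type entry struct {
	timer *time.Timer
	names map[string]struct{}
}

var (
	mu    sync.Mutex
	cache = map[string]entry{}
)

// remember records a bad name for a node and (re)arms the eviction timer,
// following the same pattern as nodeDeviceCache in cloud.go.
func remember(nodeID, name string) {
	mu.Lock()
	defer mu.Unlock()
	if e, ok := cache[nodeID]; ok {
		e.names[name] = struct{}{}
		e.timer.Reset(forgetDelay)
		return
	}
	cache[nodeID] = entry{
		timer: time.AfterFunc(forgetDelay, func() {
			// No activity for the full delay: drop the node's entry
			// so the map cannot grow without bound.
			mu.Lock()
			delete(cache, nodeID)
			mu.Unlock()
		}),
		names: map[string]struct{}{name: {}},
	}
}

func main() {
	remember("i-012", "/dev/xvdba")
	time.Sleep(3 * time.Second)
	mu.Lock()
	_, ok := cache["i-012"]
	mu.Unlock()
	fmt.Println("entry still cached after expiry window:", ok) // false
}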
68 changes: 57 additions & 11 deletions pkg/cloud/cloud_test.go
@@ -25,6 +25,7 @@ import (
"strings"
"sync"
"testing"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
@@ -917,14 +918,17 @@ func TestDeleteDisk(t *testing.T) {
}

func TestAttachDisk(t *testing.T) {
blockDeviceInUseErr := awserr.New("InvalidParameterValue", "Invalid value '"+defaultPath+"' for unixDevice. Attachment point "+defaultPath+" is already in use", nil)

testCases := []struct {
name string
volumeID string
nodeID string
nodeID2 string
path string
expErr error
mockFunc func(*MockEC2API, context.Context, string, string, string, string, dm.DeviceManager)
name string
volumeID string
nodeID string
nodeID2 string
path string
expErr error
mockFunc func(*MockEC2API, context.Context, string, string, string, string, dm.DeviceManager)
validateFunc func(t *testing.T)
}{
{
name: "success: AttachVolume normal",
@@ -944,6 +948,33 @@
)
},
},
{
name: "success: AttachVolume skip likely bad name",
volumeID: defaultVolumeID,
nodeID: defaultNodeID,
path: "/dev/xvdab",
expErr: nil,
mockFunc: func(mockEC2 *MockEC2API, ctx context.Context, volumeID, nodeID, nodeID2, path string, dm dm.DeviceManager) {
volumeRequest := createVolumeRequest(volumeID)
instanceRequest := createInstanceRequest(nodeID)
attachRequest := createAttachRequest(volumeID, nodeID, path)

gomock.InOrder(
mockEC2.EXPECT().DescribeInstancesWithContext(gomock.Any(), instanceRequest).Return(newDescribeInstancesOutput(nodeID), nil),
mockEC2.EXPECT().AttachVolumeWithContext(gomock.Any(), attachRequest).Return(createAttachVolumeOutput(volumeID, nodeID, path), nil),
mockEC2.EXPECT().DescribeVolumesWithContext(gomock.Any(), volumeRequest).Return(createDescribeVolumesOutput([]*string{&volumeID}, nodeID, path, "attached"), nil),
)

nodeDeviceCache = map[string]cachedNode{
defaultNodeID: {
timer: time.NewTimer(1 * time.Hour),
likelyBadNames: map[string]struct{}{
defaultPath: {},
},
},
}
},
},
{
name: "success: AttachVolume device already assigned",
volumeID: defaultVolumeID,
@@ -955,7 +986,7 @@ func TestAttachDisk(t *testing.T) {
instanceRequest := createInstanceRequest(nodeID)

fakeInstance := newFakeInstance(nodeID, volumeID, path)
_, err := dm.NewDevice(fakeInstance, volumeID)
_, err := dm.NewDevice(fakeInstance, volumeID, map[string]struct{}{})
assert.NoError(t, err)

gomock.InOrder(
@@ -978,22 +1009,30 @@
mockEC2.EXPECT().AttachVolumeWithContext(gomock.Any(), attachRequest).Return(nil, errors.New("AttachVolume error")),
)
},
validateFunc: func(t *testing.T) {
assert.NotContains(t, nodeDeviceCache, defaultNodeID)
},
},
{
name: "fail: AttachVolume returned error volumeInUse",
name: "fail: AttachVolume returned block device already in use error",
volumeID: defaultVolumeID,
nodeID: defaultNodeID,
path: defaultPath,
expErr: fmt.Errorf("could not attach volume %q to node %q: %w", defaultVolumeID, defaultNodeID, ErrVolumeInUse),
expErr: fmt.Errorf("could not attach volume %q to node %q: %w", defaultVolumeID, defaultNodeID, blockDeviceInUseErr),
mockFunc: func(mockEC2 *MockEC2API, ctx context.Context, volumeID, nodeID, nodeID2, path string, dm dm.DeviceManager) {
instanceRequest := createInstanceRequest(nodeID)
attachRequest := createAttachRequest(volumeID, nodeID, path)

gomock.InOrder(
mockEC2.EXPECT().DescribeInstancesWithContext(ctx, instanceRequest).Return(newDescribeInstancesOutput(nodeID), nil),
mockEC2.EXPECT().AttachVolumeWithContext(ctx, attachRequest).Return(nil, ErrVolumeInUse),
mockEC2.EXPECT().AttachVolumeWithContext(ctx, attachRequest).Return(nil, blockDeviceInUseErr),
)
},
validateFunc: func(t *testing.T) {
assert.Contains(t, nodeDeviceCache, defaultNodeID)
assert.NotNil(t, nodeDeviceCache[defaultNodeID].timer)
assert.Contains(t, nodeDeviceCache[defaultNodeID].likelyBadNames, defaultPath)
},
},
{
name: "success: AttachVolume multi-attach",
@@ -1045,6 +1084,9 @@

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// Reset node likely bad names cache
nodeDeviceCache = map[string]cachedNode{}

mockCtrl := gomock.NewController(t)
mockEC2 := NewMockEC2API(mockCtrl)
c := newCloud(mockEC2)
@@ -1070,6 +1112,10 @@
assert.Equal(t, tc.path, devicePath)
}

if tc.validateFunc != nil {
tc.validateFunc(t)
}

mockCtrl.Finish()
})
}
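The blockDeviceInUseErr fixture above mirrors how EC2 actually signals a taken device name: there is no dedicated error code, only InvalidParameterValue with an "already in use" message, which is why the classifier matches on the message text. Here is a self-contained sketch of that classification, assuming only the aws-sdk-go awserr package (the wrapper and the shortened names are mine):

package main

import (
	"errors"
	"fmt"
	"strings"

	"github.com/aws/aws-sdk-go/aws/awserr"
)

// isBlockDeviceInUse mirrors isAWSErrorBlockDeviceInUse from cloud.go:
// it unwraps to an awserr.Error and matches code plus message substring.
func isBlockDeviceInUse(err error) bool {
	var awsErr awserr.Error
	if errors.As(err, &awsErr) {
		return awsErr.Code() == "InvalidParameterValue" &&
			strings.Contains(awsErr.Message(), "already in use")
	}
	return false
}

func main() {
	// Shaped like the fixture in cloud_test.go.
	err := awserr.New("InvalidParameterValue",
		"Invalid value '/dev/xvdba' for unixDevice. Attachment point /dev/xvdba is already in use", nil)
	fmt.Println(isBlockDeviceInUse(err))                          // true
	fmt.Println(isBlockDeviceInUse(errors.New("something else"))) // false
}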
16 changes: 13 additions & 3 deletions pkg/cloud/devicemanager/allocator.go
@@ -34,7 +34,7 @@ type ExistingNames map[string]string
// call), so all available device names are used eventually and it minimizes
// device name reuse.
type NameAllocator interface {
GetNext(existingNames ExistingNames) (name string, err error)
GetNext(existingNames ExistingNames, likelyBadNames map[string]struct{}) (name string, err error)
}

type nameAllocator struct{}
@@ -43,9 +43,19 @@ var _ NameAllocator = &nameAllocator{}

// GetNext returns a free device name or error when there is no free device name
// It does this by using a list of legal EBS device names from device_names.go
func (d *nameAllocator) GetNext(existingNames ExistingNames) (string, error) {
//
// likelyBadNames is a map of names that have previously returned an "in use" error when attempting to attach to them
// These names are unlikely to result in a successful attachment, and may be permanently unavailable, so they are used last
func (d *nameAllocator) GetNext(existingNames ExistingNames, likelyBadNames map[string]struct{}) (string, error) {
for _, name := range deviceNames {
if _, found := existingNames[name]; !found {
_, existing := existingNames[name]
_, likelyBad := likelyBadNames[name]
if !existing && !likelyBad {
return name, nil
}
}
for name := range likelyBadNames {
if _, existing := existingNames[name]; !existing {
return name, nil
}
}
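GetNext is now a two-pass search: the first loop hands out names that are neither in use nor flagged as likely bad, and only when every remaining name is flagged does the second loop fall back to a flagged name that is at least not currently in use — so likely-bad names are tried last rather than never. A minimal standalone sketch of the same strategy (the names here are mine, not the driver's):

package main

import (
	"errors"
	"fmt"
)

// pickName illustrates the two-pass strategy from GetNext: prefer names
// that are neither in use nor flagged, then fall back to flagged names
// that are not currently in use.
func pickName(candidates []string, existing map[string]string, likelyBad map[string]struct{}) (string, error) {
	for _, n := range candidates {
		_, inUse := existing[n]
		_, bad := likelyBad[n]
		if !inUse && !bad {
			return n, nil
		}
	}
	for n := range likelyBad {
		if _, inUse := existing[n]; !inUse {
			return n, nil
		}
	}
	return "", errors.New("no available device name")
}

func main() {
	candidates := []string{"/dev/xvdaa", "/dev/xvdab"}
	existing := map[string]string{"/dev/xvdaa": "vol-1"}
	likelyBad := map[string]struct{}{"/dev/xvdab": {}}
	// Both names fail the first pass, so the likely-bad name is
	// returned as a last resort.
	name, err := pickName(candidates, existing, likelyBad)
	fmt.Println(name, err) // /dev/xvdab <nil>
}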
35 changes: 32 additions & 3 deletions pkg/cloud/devicemanager/allocator_test.go
@@ -26,7 +26,7 @@ func TestNameAllocator(t *testing.T) {

for _, name := range deviceNames {
t.Run(name, func(t *testing.T) {
actual, err := allocator.GetNext(existingNames)
actual, err := allocator.GetNext(existingNames, map[string]struct{}{})
if err != nil {
t.Errorf("test %q: unexpected error: %v", name, err)
}
@@ -38,15 +38,44 @@
}
}

func TestNameAllocatorLikelyBadName(t *testing.T) {
skippedName := deviceNames[32]
existingNames := map[string]string{}
allocator := nameAllocator{}

for _, name := range deviceNames {
if name == skippedName {
// Name in likelyBadNames should be skipped until it is the last available name
continue
}

t.Run(name, func(t *testing.T) {
actual, err := allocator.GetNext(existingNames, map[string]struct{}{skippedName: {}})
if err != nil {
t.Errorf("test %q: unexpected error: %v", name, err)
}
if actual != name {
t.Errorf("test %q: expected %q, got %q", name, name, actual)
}
existingNames[actual] = ""
})
}

lastName, _ := allocator.GetNext(existingNames, map[string]struct{}{skippedName: {}})
if lastName != skippedName {
t.Errorf("test %q: expected %q, got %q (likelyBadNames fallback)", skippedName, skippedName, lastName)
}
}

func TestNameAllocatorError(t *testing.T) {
allocator := nameAllocator{}
existingNames := map[string]string{}

for i := 0; i < len(deviceNames); i++ {
name, _ := allocator.GetNext(existingNames)
name, _ := allocator.GetNext(existingNames, map[string]struct{}{})
existingNames[name] = ""
}
name, err := allocator.GetNext(existingNames)
name, err := allocator.GetNext(existingNames, map[string]struct{}{})
if err == nil {
t.Errorf("expected error, got device %q", name)
}
6 changes: 3 additions & 3 deletions pkg/cloud/devicemanager/manager.go
@@ -52,7 +52,7 @@ type DeviceManager interface {
// NewDevice retrieves the device if one is already assigned to the volume.
// Otherwise it creates a new device with the next available device name
// and marks it as an unassigned device.
NewDevice(instance *ec2.Instance, volumeID string) (device *Device, err error)
NewDevice(instance *ec2.Instance, volumeID string, likelyBadNames map[string]struct{}) (device *Device, err error)

// GetDevice returns the device already assigned to the volume.
GetDevice(instance *ec2.Instance, volumeID string) (device *Device, err error)
@@ -103,7 +103,7 @@ func NewDeviceManager() DeviceManager {
}
}

func (d *deviceManager) NewDevice(instance *ec2.Instance, volumeID string) (*Device, error) {
func (d *deviceManager) NewDevice(instance *ec2.Instance, volumeID string, likelyBadNames map[string]struct{}) (*Device, error) {
d.mux.Lock()
defer d.mux.Unlock()

@@ -124,7 +124,7 @@ func (d *deviceManager) NewDevice(instance *ec2.Instance, volumeID string) (*Dev
return nil, err
}

name, err := d.nameAllocator.GetNext(inUse)
name, err := d.nameAllocator.GetNext(inUse, likelyBadNames)
if err != nil {
return nil, fmt.Errorf("could not get a free device name to assign to node %s", nodeID)
}
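Throughout the PR, callers with no cached state pass an explicit empty map (map[string]struct{}{}) into NewDevice and GetNext. A nil map would also be safe for the lookups, since reads on a nil Go map simply miss; a tiny standalone sketch of that language guarantee (names mine):

package main

import "fmt"

func main() {
	// Lookups like the ones in GetNext are safe even on a nil map:
	// a read on a nil map never panics, it just reports a miss.
	var likelyBad map[string]struct{} // nil, no allocation
	_, flagged := likelyBad["/dev/xvdba"]
	fmt.Println(flagged) // false
}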