From 872f2d48885eb3e3aef37b758601940fc560880e Mon Sep 17 00:00:00 2001
From: Connor Catlett
Date: Tue, 9 Jan 2024 20:31:23 +0000
Subject: [PATCH] Remember likely bad device names for an hour

Signed-off-by: Connor Catlett
---
 pkg/cloud/cloud.go                        | 69 +++++++++++++++++++++--
 pkg/cloud/cloud_test.go                   | 68 ++++++++++++++++++----
 pkg/cloud/devicemanager/allocator.go      | 16 +++++-
 pkg/cloud/devicemanager/allocator_test.go | 35 +++++++++++-
 pkg/cloud/devicemanager/manager.go        |  6 +-
 pkg/cloud/devicemanager/manager_test.go   | 14 ++---
 6 files changed, 177 insertions(+), 31 deletions(-)

diff --git a/pkg/cloud/cloud.go b/pkg/cloud/cloud.go
index 178c0e722a..761c124778 100644
--- a/pkg/cloud/cloud.go
+++ b/pkg/cloud/cloud.go
@@ -24,6 +24,7 @@ import (
 	"fmt"
 	"os"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/aws/aws-sdk-go/aws"
@@ -162,9 +163,6 @@ var (
 	// ErrAlreadyExists is returned when a resource is already existent.
 	ErrAlreadyExists = errors.New("Resource already exists")
 
-	// ErrVolumeInUse is returned when a volume is already attached to an instance.
-	ErrVolumeInUse = errors.New("Request volume is already attached to an instance")
-
 	// ErrMultiSnapshots is returned when multiple snapshots are found
 	// with the same ID
 	ErrMultiSnapshots = errors.New("Multiple snapshots with the same name found")
@@ -689,13 +687,34 @@ func (c *cloud) DeleteDisk(ctx context.Context, volumeID string) (bool, error) {
 	return true, nil
 }
 
+// Node likely bad device names cache
+// Remember device names that are already in use on an instance and use them last when attaching volumes
+// This works around device names that are in use but do not appear in the block device mapping returned by DescribeInstances
+const cacheForgetDelay = 1 * time.Hour
+
+type cachedNode struct {
+	timer          *time.Timer
+	likelyBadNames map[string]struct{}
+}
+
+var cacheMutex sync.Mutex
+var nodeDeviceCache map[string]cachedNode = map[string]cachedNode{}
+
 func (c *cloud) AttachDisk(ctx context.Context, volumeID, nodeID string) (string, error) {
 	instance, err := c.getInstance(ctx, nodeID)
 	if err != nil {
 		return "", err
 	}
 
-	device, err := c.dm.NewDevice(instance, volumeID)
+	likelyBadNames := map[string]struct{}{}
+	cacheMutex.Lock()
+	if node, ok := nodeDeviceCache[nodeID]; ok {
+		likelyBadNames = node.likelyBadNames
+		node.timer.Reset(cacheForgetDelay)
+	}
+	cacheMutex.Unlock()
+
+	device, err := c.dm.NewDevice(instance, volumeID, likelyBadNames)
 	if err != nil {
 		return "", err
 	}
@@ -710,8 +729,38 @@
 
 	resp, attachErr := c.ec2.AttachVolumeWithContext(ctx, request)
 	if attachErr != nil {
+		if isAWSErrorBlockDeviceInUse(attachErr) {
+			cacheMutex.Lock()
+			if node, ok := nodeDeviceCache[nodeID]; ok {
+				// Node already has cached bad names, add this one to the list
+				node.likelyBadNames[device.Path] = struct{}{}
+				node.timer.Reset(cacheForgetDelay)
+			} else {
+				// Node has no cached bad device names, set up a new cache entry
+				nodeDeviceCache[nodeID] = cachedNode{
+					timer: time.AfterFunc(cacheForgetDelay, func() {
+						// If this ever fires, the node has not had a volume attached for an hour
+						// In order to prevent a semi-permanent memory leak, delete it from the map
+						cacheMutex.Lock()
+						delete(nodeDeviceCache, nodeID)
+						cacheMutex.Unlock()
+					}),
+					likelyBadNames: map[string]struct{}{
+						device.Path: {},
+					},
+				}
+			}
+			cacheMutex.Unlock()
+		}
 		return "", fmt.Errorf("could not attach volume %q to node %q: %w", volumeID, nodeID, attachErr)
 	}
+	cacheMutex.Lock()
+	if node, ok := nodeDeviceCache[nodeID]; ok {
+		// Remove successfully attached devices from the "likely bad" list
+		delete(node.likelyBadNames, device.Path)
+		node.timer.Reset(cacheForgetDelay)
+	}
+	cacheMutex.Unlock()
 
 	klog.V(5).InfoS("[Debug] AttachVolume", "volumeID", volumeID, "nodeID", nodeID, "resp", resp)
 }
@@ -1294,6 +1343,18 @@ func isAWSErrorIdempotentParameterMismatch(err error) bool {
 	return isAWSError(err, "IdempotentParameterMismatch")
 }
 
+// isAWSErrorBlockDeviceInUse returns a boolean indicating whether the
+// given error appears to be a block device name already in use error.
+func isAWSErrorBlockDeviceInUse(err error) bool {
+	var awsErr awserr.Error
+	if errors.As(err, &awsErr) {
+		if awsErr.Code() == "InvalidParameterValue" && strings.Contains(awsErr.Message(), "already in use") {
+			return true
+		}
+	}
+	return false
+}
+
 // Checks for desired size on volume by also verifying volume size by describing volume.
 // This is to get around potential eventual consistency problems with describing volume modifications
 // objects and ensuring that we read two different objects to verify volume state.
diff --git a/pkg/cloud/cloud_test.go b/pkg/cloud/cloud_test.go
index 806ee9845e..36bc4dfd5f 100644
--- a/pkg/cloud/cloud_test.go
+++ b/pkg/cloud/cloud_test.go
@@ -25,6 +25,7 @@ import (
 	"strings"
 	"sync"
 	"testing"
+	"time"
 
 	"github.com/aws/aws-sdk-go/aws"
 	"github.com/aws/aws-sdk-go/aws/awserr"
@@ -951,14 +952,17 @@ func TestDeleteDisk(t *testing.T) {
 }
 
 func TestAttachDisk(t *testing.T) {
+	blockDeviceInUseErr := awserr.New("InvalidParameterValue", "Invalid value '"+defaultPath+"' for unixDevice. Attachment point "+defaultPath+" is already in use", nil)
+
 	testCases := []struct {
-		name     string
-		volumeID string
-		nodeID   string
-		nodeID2  string
-		path     string
-		expErr   error
-		mockFunc func(*MockEC2API, context.Context, string, string, string, string, dm.DeviceManager)
+		name         string
+		volumeID     string
+		nodeID       string
+		nodeID2      string
+		path         string
+		expErr       error
+		mockFunc     func(*MockEC2API, context.Context, string, string, string, string, dm.DeviceManager)
+		validateFunc func(t *testing.T)
 	}{
 		{
 			name:     "success: AttachVolume normal",
 			volumeID: defaultVolumeID,
 			nodeID:   defaultNodeID,
 			path:     defaultPath,
 			expErr:   nil,
 			mockFunc: func(mockEC2 *MockEC2API, ctx context.Context, volumeID, nodeID, nodeID2, path string, dm dm.DeviceManager) {
 				volumeRequest := createVolumeRequest(volumeID)
 				instanceRequest := createInstanceRequest(nodeID)
 				attachRequest := createAttachRequest(volumeID, nodeID, path)
 
 				gomock.InOrder(
 					mockEC2.EXPECT().DescribeInstancesWithContext(gomock.Any(), instanceRequest).Return(newDescribeInstancesOutput(nodeID), nil),
 					mockEC2.EXPECT().AttachVolumeWithContext(gomock.Any(), attachRequest).Return(createAttachVolumeOutput(volumeID, nodeID, path), nil),
 					mockEC2.EXPECT().DescribeVolumesWithContext(gomock.Any(), volumeRequest).Return(createDescribeVolumesOutput([]*string{&volumeID}, nodeID, path, "attached"), nil),
 				)
 			},
 		},
+		{
+			name:     "success: AttachVolume skip likely bad name",
+			volumeID: defaultVolumeID,
+			nodeID:   defaultNodeID,
+			path:     "/dev/xvdab",
+			expErr:   nil,
+			mockFunc: func(mockEC2 *MockEC2API, ctx context.Context, volumeID, nodeID, nodeID2, path string, dm dm.DeviceManager) {
+				volumeRequest := createVolumeRequest(volumeID)
+				instanceRequest := createInstanceRequest(nodeID)
+				attachRequest := createAttachRequest(volumeID, nodeID, path)
+
+				gomock.InOrder(
+					mockEC2.EXPECT().DescribeInstancesWithContext(gomock.Any(), instanceRequest).Return(newDescribeInstancesOutput(nodeID), nil),
+					mockEC2.EXPECT().AttachVolumeWithContext(gomock.Any(), attachRequest).Return(createAttachVolumeOutput(volumeID, nodeID, path), nil),
+					mockEC2.EXPECT().DescribeVolumesWithContext(gomock.Any(), volumeRequest).Return(createDescribeVolumesOutput([]*string{&volumeID}, nodeID, path, "attached"), nil),
+				)
+
+				nodeDeviceCache = map[string]cachedNode{
+					defaultNodeID: {
+						timer: time.NewTimer(1 * time.Hour),
+						likelyBadNames: map[string]struct{}{
+							defaultPath: {},
+						},
+					},
+				}
+			},
+		},
 		{
 			name:     "success: AttachVolume device already assigned",
 			volumeID: defaultVolumeID,
 			nodeID:   defaultNodeID,
 			path:     defaultPath,
 			expErr:   nil,
 			mockFunc: func(mockEC2 *MockEC2API, ctx context.Context, volumeID, nodeID, nodeID2, path string, dm dm.DeviceManager) {
 				instanceRequest := createInstanceRequest(nodeID)
 				fakeInstance := newFakeInstance(nodeID, volumeID, path)
 
-				_, err := dm.NewDevice(fakeInstance, volumeID)
+				_, err := dm.NewDevice(fakeInstance, volumeID, map[string]struct{}{})
 				assert.NoError(t, err)
 
 				gomock.InOrder(
@@ -1012,22 +1043,30 @@
 					mockEC2.EXPECT().AttachVolumeWithContext(gomock.Any(), attachRequest).Return(nil, errors.New("AttachVolume error")),
 				)
 			},
+			validateFunc: func(t *testing.T) {
+				assert.NotContains(t, nodeDeviceCache, defaultNodeID)
+			},
 		},
 		{
-			name:     "fail: AttachVolume returned error volumeInUse",
+			name:     "fail: AttachVolume returned block device already in use error",
 			volumeID: defaultVolumeID,
 			nodeID:   defaultNodeID,
 			path:     defaultPath,
-			expErr:   fmt.Errorf("could not attach volume %q to node %q: %w", defaultVolumeID, defaultNodeID, ErrVolumeInUse),
+			expErr:   fmt.Errorf("could not attach volume %q to node %q: %w", defaultVolumeID, defaultNodeID, blockDeviceInUseErr),
 			mockFunc: func(mockEC2 *MockEC2API, ctx context.Context, volumeID, nodeID, nodeID2, path string, dm dm.DeviceManager) {
 				instanceRequest := createInstanceRequest(nodeID)
 				attachRequest := createAttachRequest(volumeID, nodeID, path)
 
 				gomock.InOrder(
 					mockEC2.EXPECT().DescribeInstancesWithContext(ctx, instanceRequest).Return(newDescribeInstancesOutput(nodeID), nil),
-					mockEC2.EXPECT().AttachVolumeWithContext(ctx, attachRequest).Return(nil, ErrVolumeInUse),
+					mockEC2.EXPECT().AttachVolumeWithContext(ctx, attachRequest).Return(nil, blockDeviceInUseErr),
 				)
 			},
+			validateFunc: func(t *testing.T) {
+				assert.Contains(t, nodeDeviceCache, defaultNodeID)
+				assert.NotNil(t, nodeDeviceCache[defaultNodeID].timer)
+				assert.Contains(t, nodeDeviceCache[defaultNodeID].likelyBadNames, defaultPath)
+			},
 		},
 		{
 			name:     "success: AttachVolume multi-attach",
@@ -1079,6 +1118,9 @@
 
 	for _, tc := range testCases {
 		t.Run(tc.name, func(t *testing.T) {
+			// Reset node likely bad names cache
+			nodeDeviceCache = map[string]cachedNode{}
+
 			mockCtrl := gomock.NewController(t)
 			mockEC2 := NewMockEC2API(mockCtrl)
 			c := newCloud(mockEC2)
@@ -1104,6 +1146,10 @@
 				assert.Equal(t, tc.path, devicePath)
 			}
 
+			if tc.validateFunc != nil {
+				tc.validateFunc(t)
+			}
+
 			mockCtrl.Finish()
 		})
 	}
diff --git a/pkg/cloud/devicemanager/allocator.go b/pkg/cloud/devicemanager/allocator.go
index 911a64efea..52ebb7829c 100644
--- a/pkg/cloud/devicemanager/allocator.go
+++ b/pkg/cloud/devicemanager/allocator.go
@@ -34,7 +34,7 @@ type ExistingNames map[string]string
 // call), so all available device names are used eventually and it minimizes
 // device name reuse.
 type NameAllocator interface {
-	GetNext(existingNames ExistingNames) (name string, err error)
+	GetNext(existingNames ExistingNames, likelyBadNames map[string]struct{}) (name string, err error)
 }
 
 type nameAllocator struct{}
@@ -43,9 +43,19 @@ var _ NameAllocator = &nameAllocator{}
 
 // GetNext returns a free device name or an error when there is no free device name
 // It does this by using a list of legal EBS device names from device_names.go
-func (d *nameAllocator) GetNext(existingNames ExistingNames) (string, error) {
+//
+// likelyBadNames is a map of names that have previously returned an "in use" error when attempting to attach to them
+// These names are unlikely to result in a successful attachment, and may be permanently unavailable, so use them last
+func (d *nameAllocator) GetNext(existingNames ExistingNames, likelyBadNames map[string]struct{}) (string, error) {
 	for _, name := range deviceNames {
-		if _, found := existingNames[name]; !found {
+		_, existing := existingNames[name]
+		_, likelyBad := likelyBadNames[name]
+		if !existing && !likelyBad {
+			return name, nil
+		}
+	}
+	for name := range likelyBadNames {
+		if _, existing := existingNames[name]; !existing {
 			return name, nil
 		}
 	}
diff --git a/pkg/cloud/devicemanager/allocator_test.go b/pkg/cloud/devicemanager/allocator_test.go
index eae46d9001..d1b3f8082e 100644
--- a/pkg/cloud/devicemanager/allocator_test.go
+++ b/pkg/cloud/devicemanager/allocator_test.go
@@ -26,7 +26,7 @@ func TestNameAllocator(t *testing.T) {
 
 	for _, name := range deviceNames {
 		t.Run(name, func(t *testing.T) {
-			actual, err := allocator.GetNext(existingNames)
+			actual, err := allocator.GetNext(existingNames, map[string]struct{}{})
 			if err != nil {
 				t.Errorf("test %q: unexpected error: %v", name, err)
 			}
@@ -38,15 +38,44 @@ func TestNameAllocator(t *testing.T) {
 	}
 }
 
+func TestNameAllocatorLikelyBadName(t *testing.T) {
+	skippedName := deviceNames[32]
+	existingNames := map[string]string{}
+	allocator := nameAllocator{}
+
+	for _, name := range deviceNames {
+		if name == skippedName {
+			// Name in likelyBadNames should be skipped until it is the last available name
+			continue
+		}
+
+		t.Run(name, func(t *testing.T) {
+			actual, err := allocator.GetNext(existingNames, map[string]struct{}{skippedName: {}})
+			if err != nil {
+				t.Errorf("test %q: unexpected error: %v", name, err)
+			}
+			if actual != name {
+				t.Errorf("test %q: expected %q, got %q", name, name, actual)
+			}
+			existingNames[actual] = ""
+		})
+	}
+
+	lastName, _ := allocator.GetNext(existingNames, map[string]struct{}{skippedName: {}})
+	if lastName != skippedName {
+		t.Errorf("test %q: expected %q, got %q (likelyBadNames fallback)", skippedName, skippedName, lastName)
+	}
+}
+
 func TestNameAllocatorError(t *testing.T) {
 	allocator := nameAllocator{}
 	existingNames := map[string]string{}
 
 	for i := 0; i < len(deviceNames); i++ {
-		name, _ := allocator.GetNext(existingNames)
+		name, _ := allocator.GetNext(existingNames, map[string]struct{}{})
 		existingNames[name] = ""
 	}
 
-	name, err := allocator.GetNext(existingNames)
+	name, err := allocator.GetNext(existingNames, map[string]struct{}{})
 	if err == nil {
 		t.Errorf("expected error, got device %q", name)
 	}
diff --git a/pkg/cloud/devicemanager/manager.go b/pkg/cloud/devicemanager/manager.go
index 05612edb15..8ab6b78997 100644
--- a/pkg/cloud/devicemanager/manager.go
+++ b/pkg/cloud/devicemanager/manager.go
@@ -52,7 +52,7 @@ type DeviceManager interface {
 	// NewDevice retrieves the device if the device is already assigned.
 	// Otherwise it creates a new device with next available device name
 	// and mark it as unassigned device.
-	NewDevice(instance *ec2.Instance, volumeID string) (device *Device, err error)
+	NewDevice(instance *ec2.Instance, volumeID string, likelyBadNames map[string]struct{}) (device *Device, err error)
 
 	// GetDevice returns the device already assigned to the volume.
 	GetDevice(instance *ec2.Instance, volumeID string) (device *Device, err error)
@@ -103,7 +103,7 @@ func NewDeviceManager() DeviceManager {
 	}
 }
 
-func (d *deviceManager) NewDevice(instance *ec2.Instance, volumeID string) (*Device, error) {
+func (d *deviceManager) NewDevice(instance *ec2.Instance, volumeID string, likelyBadNames map[string]struct{}) (*Device, error) {
 	d.mux.Lock()
 	defer d.mux.Unlock()
 
@@ -124,7 +124,7 @@ func (d *deviceManager) NewDevice(instance *ec2.Instance, volumeID string) (*Device, error) {
 		return nil, err
 	}
 
-	name, err := d.nameAllocator.GetNext(inUse)
+	name, err := d.nameAllocator.GetNext(inUse, likelyBadNames)
 	if err != nil {
 		return nil, fmt.Errorf("could not get a free device name to assign to node %s", nodeID)
 	}
diff --git a/pkg/cloud/devicemanager/manager_test.go b/pkg/cloud/devicemanager/manager_test.go
index 9a9055297e..8c41f6210a 100644
--- a/pkg/cloud/devicemanager/manager_test.go
+++ b/pkg/cloud/devicemanager/manager_test.go
@@ -59,7 +59,7 @@ func TestNewDevice(t *testing.T) {
 	for _, tc := range testCases {
 		t.Run(tc.name, func(t *testing.T) {
 			// Should fail if instance is nil
-			dev1, err := dm.NewDevice(nil, tc.volumeID)
+			dev1, err := dm.NewDevice(nil, tc.volumeID, map[string]struct{}{})
 			if err == nil {
 				t.Fatalf("Expected error when nil instance is passed in, got nothing")
 			}
@@ -70,11 +70,11 @@ func TestNewDevice(t *testing.T) {
 			fakeInstance := newFakeInstance(tc.instanceID, tc.existingVolumeID, tc.existingDevicePath)
 
 			// Should create valid Device with valid path
-			dev1, err = dm.NewDevice(fakeInstance, tc.volumeID)
+			dev1, err = dm.NewDevice(fakeInstance, tc.volumeID, map[string]struct{}{})
 			assertDevice(t, dev1, false, err)
 
 			// Devices with same instance and volume should have same paths
-			dev2, err := dm.NewDevice(fakeInstance, tc.volumeID)
+			dev2, err := dm.NewDevice(fakeInstance, tc.volumeID, map[string]struct{}{})
 			assertDevice(t, dev2, true /*IsAlreadyAssigned*/, err)
 			if dev1.Path != dev2.Path {
 				t.Fatalf("Expected equal paths, got %v and %v", dev1.Path, dev2.Path)
@@ -82,7 +82,7 @@ func TestNewDevice(t *testing.T) {
 
 			// Should create new Device with the same path after releasing
 			dev2.Release(false)
-			dev3, err := dm.NewDevice(fakeInstance, tc.volumeID)
+			dev3, err := dm.NewDevice(fakeInstance, tc.volumeID, map[string]struct{}{})
 			assertDevice(t, dev3, false, err)
 			if dev3.Path != dev1.Path {
 				t.Fatalf("Expected equal paths, got %v and %v", dev1.Path, dev3.Path)
@@ -136,7 +136,7 @@ func TestNewDeviceWithExistingDevice(t *testing.T) {
 		t.Run(tc.name, func(t *testing.T) {
 			fakeInstance := newFakeInstance("fake-instance", tc.existingID, tc.existingPath)
 
-			dev, err := dm.NewDevice(fakeInstance, tc.volumeID)
+			dev, err := dm.NewDevice(fakeInstance, tc.volumeID, map[string]struct{}{})
 			assertDevice(t, dev, tc.existingID == tc.volumeID, err)
 
 			if dev.Path != tc.expectedPath {
@@ -169,7 +169,7 @@ func TestGetDevice(t *testing.T) {
 			fakeInstance := newFakeInstance(tc.instanceID, tc.existingVolumeID, tc.existingDevicePath)
 
 			// Should create valid Device with valid path
-			dev1, err := dm.NewDevice(fakeInstance, tc.volumeID)
+			dev1, err := dm.NewDevice(fakeInstance, tc.volumeID, map[string]struct{}{})
 			assertDevice(t, dev1, false /*IsAlreadyAssigned*/, err)
 
 			// Devices with same instance and volume should have same paths
@@ -205,7 +205,7 @@ func TestReleaseDevice(t *testing.T) {
 			fakeInstance := newFakeInstance(tc.instanceID, tc.existingVolumeID, tc.existingDevicePath)
 
 			// Should get assigned Device after releasing tainted device
-			dev, err := dm.NewDevice(fakeInstance, tc.volumeID)
+			dev, err := dm.NewDevice(fakeInstance, tc.volumeID, map[string]struct{}{})
 			assertDevice(t, dev, false /*IsAlreadyAssigned*/, err)
 			dev.Taint()
 			dev.Release(false)
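
For illustration, the per-node cache added in cloud.go above follows a common Go pattern: a mutex-guarded map whose entries each carry a time.AfterFunc timer that evicts the entry unless further activity resets it. Below is a minimal standalone sketch of that pattern, not driver code; the helper name remember and the example IDs are hypothetical, and forgetDelay mirrors the patch's cacheForgetDelay constant:

package main

import (
	"fmt"
	"sync"
	"time"
)

// forgetDelay mirrors cacheForgetDelay: entries expire after an hour idle.
const forgetDelay = 1 * time.Hour

type entry struct {
	timer          *time.Timer
	likelyBadNames map[string]struct{}
}

var (
	mu    sync.Mutex
	cache = map[string]entry{}
)

// remember (hypothetical helper) records a bad device name for a node and
// re-arms the expiry timer, the same bookkeeping AttachDisk does inline.
func remember(nodeID, name string) {
	mu.Lock()
	defer mu.Unlock()
	if e, ok := cache[nodeID]; ok {
		e.likelyBadNames[name] = struct{}{}
		e.timer.Reset(forgetDelay)
		return
	}
	cache[nodeID] = entry{
		// time.AfterFunc fires once after forgetDelay unless Reset pushes
		// it back; the callback removes the entry to bound memory use.
		timer: time.AfterFunc(forgetDelay, func() {
			mu.Lock()
			delete(cache, nodeID)
			mu.Unlock()
		}),
		likelyBadNames: map[string]struct{}{name: {}},
	}
}

func main() {
	remember("i-0123456789abcdef0", "/dev/xvdaa")
	remember("i-0123456789abcdef0", "/dev/xvdab")
	mu.Lock()
	fmt.Println(len(cache["i-0123456789abcdef0"].likelyBadNames)) // 2
	mu.Unlock()
}

Note that the callback takes the same mutex as the writers, so an eviction and a concurrent remember serialize cleanly; a Reset that loses the race simply recreates the entry on the next bad-name report.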
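Likewise, the two-pass selection GetNext performs in allocator.go can be sketched standalone, assuming a three-element stand-in for the real deviceNames list in device_names.go: names that are neither in use nor likely bad are preferred, and likely-bad names are handed out only when nothing else is free:

package main

import (
	"errors"
	"fmt"
)

// Stand-in for the much longer list in device_names.go.
var deviceNames = []string{"/dev/xvdaa", "/dev/xvdab", "/dev/xvdac"}

// getNext mirrors the two-pass selection: first skip names that are in use
// or previously failed with "already in use", then fall back to likely-bad
// names before giving up entirely.
func getNext(existing map[string]string, likelyBad map[string]struct{}) (string, error) {
	for _, name := range deviceNames {
		_, inUse := existing[name]
		_, bad := likelyBad[name]
		if !inUse && !bad {
			return name, nil
		}
	}
	// Every remaining free name previously failed with "already in use";
	// trying one anyway beats failing outright.
	for name := range likelyBad {
		if _, inUse := existing[name]; !inUse {
			return name, nil
		}
	}
	return "", errors.New("no free device names")
}

func main() {
	existing := map[string]string{"/dev/xvdaa": "vol-1"}
	likelyBad := map[string]struct{}{"/dev/xvdab": {}}

	name, _ := getNext(existing, likelyBad)
	fmt.Println(name) // /dev/xvdac: the likely-bad name is used last
}

This ordering is why TestNameAllocatorLikelyBadName expects the skipped name to be returned only after every other device name has been exhausted.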