Skip to content

Commit

Permalink
Migrate likelyBadNames to ExpiringCache
Browse files Browse the repository at this point in the history
Signed-off-by: Connor Catlett <[email protected]>
  • Loading branch information
ConnorJC3 committed Jul 3, 2024
1 parent 381647f commit 8f9dbe2
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 128 deletions.
88 changes: 28 additions & 60 deletions pkg/cloud/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/aws/smithy-go"
"github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/batcher"
dm "github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/cloud/devicemanager"
"github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/expiringcache"
"github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/util"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -91,6 +92,7 @@ var (
const (
volumeDetachedState = "detached"
volumeAttachedState = "attached"
cacheForgetDelay = 1 * time.Hour
)

// AWS provisioning limits.
Expand Down Expand Up @@ -310,12 +312,13 @@ type batcherManager struct {
}

type cloud struct {
region string
ec2 EC2API
dm dm.DeviceManager
bm *batcherManager
rm *retryManager
vwp volumeWaitParameters
region string
ec2 EC2API
dm dm.DeviceManager
bm *batcherManager
rm *retryManager
vwp volumeWaitParameters
likelyBadDeviceNames expiringcache.ExpiringCache[string, sync.Map]
}

var _ Cloud = &cloud{}
Expand Down Expand Up @@ -364,12 +367,13 @@ func newEC2Cloud(region string, awsSdkDebugLog bool, userAgentExtra string, batc
}

return &cloud{
region: region,
dm: dm.NewDeviceManager(),
ec2: svc,
bm: bm,
rm: newRetryManager(),
vwp: vwp,
region: region,
dm: dm.NewDeviceManager(),
ec2: svc,
bm: bm,
rm: newRetryManager(),
vwp: vwp,
likelyBadDeviceNames: expiringcache.New[string, sync.Map](cacheForgetDelay),
}
}

Expand Down Expand Up @@ -847,34 +851,19 @@ func (c *cloud) batchDescribeInstances(request *ec2.DescribeInstancesInput) (*ty
return r.Result, nil
}

// Node likely bad device names cache
// Remember device names that are already in use on an instance and use them last when attaching volumes
// This works around device names that are used but do not appear in the mapping from DescribeInstanceStatus
const cacheForgetDelay = 1 * time.Hour

type cachedNode struct {
timer *time.Timer
likelyBadNames map[string]struct{}
}

var cacheMutex sync.Mutex
var nodeDeviceCache map[string]cachedNode = map[string]cachedNode{}

func (c *cloud) AttachDisk(ctx context.Context, volumeID, nodeID string) (string, error) {
instance, err := c.getInstance(ctx, nodeID)
if err != nil {
return "", err
}

likelyBadNames := map[string]struct{}{}
cacheMutex.Lock()
if node, ok := nodeDeviceCache[nodeID]; ok {
likelyBadNames = node.likelyBadNames
node.timer.Reset(cacheForgetDelay)
likelyBadDeviceNames, ok := c.likelyBadDeviceNames.Get(nodeID)
if !ok {
likelyBadDeviceNames = new(sync.Map)
c.likelyBadDeviceNames.Set(nodeID, likelyBadDeviceNames)
}
cacheMutex.Unlock()

device, err := c.dm.NewDevice(instance, volumeID, likelyBadNames)
device, err := c.dm.NewDevice(instance, volumeID, likelyBadDeviceNames)
if err != nil {
return "", err
}
Expand All @@ -892,37 +881,16 @@ func (c *cloud) AttachDisk(ctx context.Context, volumeID, nodeID string) (string
})
if attachErr != nil {
if isAWSErrorBlockDeviceInUse(attachErr) {
cacheMutex.Lock()
if node, ok := nodeDeviceCache[nodeID]; ok {
// Node already had existing cached bad names, add on to the list
node.likelyBadNames[device.Path] = struct{}{}
node.timer.Reset(cacheForgetDelay)
} else {
// Node has no existing cached bad device names, setup a new struct instance
nodeDeviceCache[nodeID] = cachedNode{
timer: time.AfterFunc(cacheForgetDelay, func() {
// If this ever fires, the node has not had a volume attached for an hour
// In order to prevent a semi-permanent memory leak, delete it from the map
cacheMutex.Lock()
delete(nodeDeviceCache, nodeID)
cacheMutex.Unlock()
}),
likelyBadNames: map[string]struct{}{
device.Path: {},
},
}
}
cacheMutex.Unlock()
// If block device is "in use", that likely indicates a bad name that is in use by a block
// device that we do not know about (example: block devices attached in the AMI, which are
// not reported in DescribeInstance's block device map)
//
// Store such bad names in the "likely bad" map to be considered last in future attempts
likelyBadDeviceNames.Store(device.Path, struct{}{})
}
return "", fmt.Errorf("could not attach volume %q to node %q: %w", volumeID, nodeID, attachErr)
}
cacheMutex.Lock()
if node, ok := nodeDeviceCache[nodeID]; ok {
// Remove succesfully attached devices from the "likely bad" list
delete(node.likelyBadNames, device.Path)
node.timer.Reset(cacheForgetDelay)
}
cacheMutex.Unlock()
likelyBadDeviceNames.Delete(device.Path)
klog.V(5).InfoS("[Debug] AttachVolume", "volumeID", volumeID, "nodeID", nodeID, "resp", resp)
}

Expand Down
69 changes: 27 additions & 42 deletions pkg/cloud/cloud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@ import (
"context"
"errors"
"fmt"
"k8s.io/apimachinery/pkg/util/wait"
"reflect"
"strings"
"sync"
"testing"
"time"

"k8s.io/apimachinery/pkg/util/wait"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/ec2"
"github.com/aws/aws-sdk-go-v2/service/ec2/types"
Expand All @@ -35,6 +36,7 @@ import (

"github.com/golang/mock/gomock"
dm "github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/cloud/devicemanager"
"github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/expiringcache"
"github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/util"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -1341,14 +1343,13 @@ func TestAttachDisk(t *testing.T) {
}

testCases := []struct {
name string
volumeID string
nodeID string
nodeID2 string
path string
expErr error
mockFunc func(*MockEC2API, context.Context, string, string, string, string, dm.DeviceManager)
validateFunc func(t *testing.T)
name string
volumeID string
nodeID string
nodeID2 string
path string
expErr error
mockFunc func(*MockEC2API, context.Context, string, string, string, string, dm.DeviceManager)
}{
{
name: "success: AttachVolume normal",
Expand Down Expand Up @@ -1377,32 +1378,30 @@ func TestAttachDisk(t *testing.T) {
name: "success: AttachVolume skip likely bad name",
volumeID: defaultVolumeID,
nodeID: defaultNodeID,
nodeID2: defaultNodeID, // Induce second attach
path: "/dev/xvdab",
expErr: nil,
expErr: fmt.Errorf("could not attach volume %q to node %q: %w", defaultVolumeID, defaultNodeID, blockDeviceInUseErr),
mockFunc: func(mockEC2 *MockEC2API, ctx context.Context, volumeID, nodeID, nodeID2, path string, dm dm.DeviceManager) {
volumeRequest := createVolumeRequest(volumeID)
instanceRequest := createInstanceRequest(nodeID)
attachRequest := createAttachRequest(volumeID, nodeID, path)
attachRequest1 := createAttachRequest(volumeID, nodeID, defaultPath)
attachRequest2 := createAttachRequest(volumeID, nodeID, path)

gomock.InOrder(
// First call - fail with "already in use" error
mockEC2.EXPECT().DescribeInstances(gomock.Any(), gomock.Eq(instanceRequest)).Return(newDescribeInstancesOutput(nodeID), nil),
mockEC2.EXPECT().AttachVolume(gomock.Any(), gomock.Eq(attachRequest), gomock.Any()).Return(&ec2.AttachVolumeOutput{
mockEC2.EXPECT().AttachVolume(gomock.Any(), gomock.Eq(attachRequest1), gomock.Any()).Return(nil, blockDeviceInUseErr),

// Second call - succeed, expect bad device name to be skipped
mockEC2.EXPECT().DescribeInstances(gomock.Any(), gomock.Eq(instanceRequest)).Return(newDescribeInstancesOutput(nodeID), nil),
mockEC2.EXPECT().AttachVolume(gomock.Any(), gomock.Eq(attachRequest2), gomock.Any()).Return(&ec2.AttachVolumeOutput{
Device: aws.String(path),
InstanceId: aws.String(nodeID),
VolumeId: aws.String(volumeID),
State: types.VolumeAttachmentStateAttaching,
}, nil),
mockEC2.EXPECT().DescribeVolumes(gomock.Any(), volumeRequest).Return(createDescribeVolumesOutput([]*string{&volumeID}, nodeID, path, "attached"), nil),
)

nodeDeviceCache = map[string]cachedNode{
defaultNodeID: {
timer: time.NewTimer(1 * time.Hour),
likelyBadNames: map[string]struct{}{
defaultPath: {},
},
},
}
},
},
{
Expand All @@ -1416,7 +1415,7 @@ func TestAttachDisk(t *testing.T) {
instanceRequest := createInstanceRequest(nodeID)

fakeInstance := newFakeInstance(nodeID, volumeID, path)
_, err := dm.NewDevice(&fakeInstance, volumeID, map[string]struct{}{})
_, err := dm.NewDevice(&fakeInstance, volumeID, new(sync.Map))
require.NoError(t, err)

gomock.InOrder(
Expand All @@ -1439,9 +1438,6 @@ func TestAttachDisk(t *testing.T) {
mockEC2.EXPECT().AttachVolume(gomock.Any(), attachRequest, gomock.Any()).Return(nil, errors.New("AttachVolume error")),
)
},
validateFunc: func(t *testing.T) {
assert.NotContains(t, nodeDeviceCache, defaultNodeID)
},
},
{
name: "fail: AttachVolume returned block device already in use error",
Expand All @@ -1458,11 +1454,6 @@ func TestAttachDisk(t *testing.T) {
mockEC2.EXPECT().AttachVolume(ctx, attachRequest, gomock.Any()).Return(nil, blockDeviceInUseErr),
)
},
validateFunc: func(t *testing.T) {
assert.Contains(t, nodeDeviceCache, defaultNodeID)
assert.NotNil(t, nodeDeviceCache[defaultNodeID].timer)
assert.Contains(t, nodeDeviceCache[defaultNodeID].likelyBadNames, defaultPath)
},
},
{
name: "success: AttachVolume multi-attach",
Expand Down Expand Up @@ -1524,9 +1515,6 @@ func TestAttachDisk(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// Reset node likely bad names cache
nodeDeviceCache = map[string]cachedNode{}

mockCtrl := gomock.NewController(t)
mockEC2 := NewMockEC2API(mockCtrl)
c := newCloud(mockEC2)
Expand All @@ -1552,10 +1540,6 @@ func TestAttachDisk(t *testing.T) {
assert.Equal(t, tc.path, devicePath)
}

if tc.validateFunc != nil {
tc.validateFunc(t)
}

mockCtrl.Finish()
})
}
Expand Down Expand Up @@ -3086,11 +3070,12 @@ func testVolumeWaitParameters() volumeWaitParameters {

func newCloud(mockEC2 EC2API) Cloud {
c := &cloud{
region: "test-region",
dm: dm.NewDeviceManager(),
ec2: mockEC2,
rm: newRetryManager(),
vwp: testVolumeWaitParameters(),
region: "test-region",
dm: dm.NewDeviceManager(),
ec2: mockEC2,
rm: newRetryManager(),
vwp: testVolumeWaitParameters(),
likelyBadDeviceNames: expiringcache.New[string, sync.Map](cacheForgetDelay),
}
return c
}
Expand Down
23 changes: 17 additions & 6 deletions pkg/cloud/devicemanager/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package devicemanager

import (
"fmt"
"sync"
)

// ExistingNames is a map of assigned device names. Presence of a key with a device
Expand All @@ -34,7 +35,7 @@ type ExistingNames map[string]string
// call), so all available device names are used eventually and it minimizes
// device name reuse.
type NameAllocator interface {
GetNext(existingNames ExistingNames, likelyBadNames map[string]struct{}) (name string, err error)
GetNext(existingNames ExistingNames, likelyBadNames *sync.Map) (name string, err error)
}

type nameAllocator struct{}
Expand All @@ -46,18 +47,28 @@ var _ NameAllocator = &nameAllocator{}
//
// likelyBadNames is a map of names that have previously returned an "in use" error when attempting to mount to them
// These names are unlikely to result in a successful mount, and may be permanently unavailable, so use them last
func (d *nameAllocator) GetNext(existingNames ExistingNames, likelyBadNames map[string]struct{}) (string, error) {
func (d *nameAllocator) GetNext(existingNames ExistingNames, likelyBadNames *sync.Map) (string, error) {
for _, name := range deviceNames {
_, existing := existingNames[name]
_, likelyBad := likelyBadNames[name]
_, likelyBad := likelyBadNames.Load(name)
if !existing && !likelyBad {
return name, nil
}
}
for name := range likelyBadNames {
if _, existing := existingNames[name]; !existing {
return name, nil

finalResortName := ""
likelyBadNames.Range(func(name, _ interface{}) bool {
if name, ok := name.(string); ok {
fmt.Println(name)
if _, existing := existingNames[name]; !existing {
finalResortName = name
return false
}
}
return true
})
if finalResortName != "" {
return finalResortName, nil
}

return "", fmt.Errorf("there are no names available")
Expand Down
Loading

0 comments on commit 8f9dbe2

Please sign in to comment.