Skip to content

Commit

Permalink
Tune batched EC2 Describe* maxDelay
Browse files Browse the repository at this point in the history
  • Loading branch information
AndrewSirenko committed May 1, 2024
1 parent c124663 commit c8b79c2
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 12 deletions.
19 changes: 11 additions & 8 deletions pkg/cloud/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,8 @@ func newEC2Cloud(region string, awsSdkDebugLog bool, userAgentExtra string, batc
var bm *batcherManager
if batchingEnabled {
klog.V(4).InfoS("newEC2Cloud: batching enabled")
bm = newBatcherManager(svc)
batchDelay := 500 * time.Millisecond // Tuned via scalability tests to minimize latency and EC2 API Calls
bm = newBatcherManager(svc, batchDelay)
}

rm := newRetryManager()
Expand All @@ -355,24 +356,26 @@ func newEC2Cloud(region string, awsSdkDebugLog bool, userAgentExtra string, batc
}

// newBatcherManager initializes a new instance of batcherManager.
func newBatcherManager(svc EC2API) *batcherManager {
// Each batcher's `entries` set to maximum results returned by relevant EC2 API call without pagination.
// Each batcher's `delay` minimizes RPC latency and EC2 API calls. Tuned via scalability tests.
func newBatcherManager(svc EC2API, delay time.Duration) *batcherManager {
return &batcherManager{
volumeIDBatcher: batcher.New(500, 1*time.Second, func(ids []string) (map[string]*types.Volume, error) {
volumeIDBatcher: batcher.New(500, delay, func(ids []string) (map[string]*types.Volume, error) {
return execBatchDescribeVolumes(svc, ids, volumeIDBatcher)
}),
volumeTagBatcher: batcher.New(500, 1*time.Second, func(names []string) (map[string]*types.Volume, error) {
volumeTagBatcher: batcher.New(500, delay, func(names []string) (map[string]*types.Volume, error) {
return execBatchDescribeVolumes(svc, names, volumeTagBatcher)
}),
instanceIDBatcher: batcher.New(50, 300*time.Millisecond, func(ids []string) (map[string]*types.Instance, error) {
instanceIDBatcher: batcher.New(50, delay, func(ids []string) (map[string]*types.Instance, error) {
return execBatchDescribeInstances(svc, ids)
}),
snapshotIDBatcher: batcher.New(1000, 300*time.Millisecond, func(ids []string) (map[string]*types.Snapshot, error) {
snapshotIDBatcher: batcher.New(1000, delay, func(ids []string) (map[string]*types.Snapshot, error) {
return execBatchDescribeSnapshots(svc, ids, snapshotIDBatcher)
}),
snapshotTagBatcher: batcher.New(1000, 300*time.Millisecond, func(names []string) (map[string]*types.Snapshot, error) {
snapshotTagBatcher: batcher.New(1000, delay, func(names []string) (map[string]*types.Snapshot, error) {
return execBatchDescribeSnapshots(svc, names, snapshotTagBatcher)
}),
volumeModificationIDBatcher: batcher.New(500, 300*time.Millisecond, func(names []string) (map[string]*types.VolumeModification, error) {
volumeModificationIDBatcher: batcher.New(500, delay, func(names []string) (map[string]*types.VolumeModification, error) {
return execBatchDescribeVolumesModifications(svc, names)
}),
}
Expand Down
10 changes: 6 additions & 4 deletions pkg/cloud/cloud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ const (
defaultPath = "/dev/xvdaa"

defaultCreateDiskDeadline = time.Second * 5

testBatchDelay = 50 * time.Millisecond
)

func generateVolumes(volIdCount, volTagCount int) []types.Volume {
Expand Down Expand Up @@ -183,7 +185,7 @@ func TestBatchDescribeVolumes(t *testing.T) {
mockEC2 := NewMockEC2API(mockCtrl)
c := newCloud(mockEC2)
cloudInstance := c.(*cloud)
cloudInstance.bm = newBatcherManager(cloudInstance.ec2)
cloudInstance.bm = newBatcherManager(cloudInstance.ec2, testBatchDelay)

tc.mockFunc(mockEC2, tc.expErr, tc.volumes)
volumeIDs, volumeNames := extractVolumeIdentifiers(tc.volumes)
Expand Down Expand Up @@ -300,7 +302,7 @@ func TestBatchDescribeInstances(t *testing.T) {
mockEC2 := NewMockEC2API(mockCtrl)
c := newCloud(mockEC2)
cloudInstance := c.(*cloud)
cloudInstance.bm = newBatcherManager(cloudInstance.ec2)
cloudInstance.bm = newBatcherManager(cloudInstance.ec2, testBatchDelay)

// Setup mocks
var instances []types.Instance
Expand Down Expand Up @@ -475,7 +477,7 @@ func TestBatchDescribeSnapshots(t *testing.T) {
mockEC2 := NewMockEC2API(mockCtrl)
c := newCloud(mockEC2)
cloudInstance := c.(*cloud)
cloudInstance.bm = newBatcherManager(cloudInstance.ec2)
cloudInstance.bm = newBatcherManager(cloudInstance.ec2, testBatchDelay)

tc.mockFunc(mockEC2, tc.expErr, tc.snapshots)
snapshotIDs, snapshotNames := extractSnapshotIdentifiers(tc.snapshots)
Expand Down Expand Up @@ -594,7 +596,7 @@ func TestBatchDescribeVolumesModifications(t *testing.T) {
mockEC2 := NewMockEC2API(mockCtrl)
c := newCloud(mockEC2)
cloudInstance := c.(*cloud)
cloudInstance.bm = newBatcherManager(cloudInstance.ec2)
cloudInstance.bm = newBatcherManager(cloudInstance.ec2, testBatchDelay)

// Setup mocks
var volumeModifications []types.VolumeModification
Expand Down

0 comments on commit c8b79c2

Please sign in to comment.