From 737ae6b9bb09e5bdf05800ecfa4b2ee5d73e5ab5 Mon Sep 17 00:00:00 2001 From: Drew Sirenko <68304519+AndrewSirenko@users.noreply.github.com> Date: Wed, 1 May 2024 14:24:19 +0000 Subject: [PATCH] Tune batched EC2 Describe* maxDelay --- pkg/cloud/cloud.go | 19 +++++++++++-------- pkg/cloud/cloud_test.go | 10 ++++++---- 2 files changed, 17 insertions(+), 12 deletions(-) diff --git a/pkg/cloud/cloud.go b/pkg/cloud/cloud.go index fa8aea8987..cdb25d4a54 100644 --- a/pkg/cloud/cloud.go +++ b/pkg/cloud/cloud.go @@ -340,7 +340,8 @@ func newEC2Cloud(region string, awsSdkDebugLog bool, userAgentExtra string, batc var bm *batcherManager if batchingEnabled { klog.V(4).InfoS("newEC2Cloud: batching enabled") - bm = newBatcherManager(svc) + batchDelay := 500 * time.Millisecond // Tuned via scalability tests to minimize latency and EC2 API Calls + bm = newBatcherManager(svc, batchDelay) } rm := newRetryManager() @@ -355,24 +356,26 @@ func newEC2Cloud(region string, awsSdkDebugLog bool, userAgentExtra string, batc } // newBatcherManager initializes a new instance of batcherManager. -func newBatcherManager(svc EC2API) *batcherManager { +// Each batcher's `entries` set to maximum results returned by relevant EC2 API call without pagination. +// Each batcher's `delay` minimizes RPC latency and EC2 API calls. Tuned via scalability tests. +func newBatcherManager(svc EC2API, delay time.Duration) *batcherManager { return &batcherManager{ - volumeIDBatcher: batcher.New(500, 1*time.Second, func(ids []string) (map[string]*types.Volume, error) { + volumeIDBatcher: batcher.New(500, delay, func(ids []string) (map[string]*types.Volume, error) { return execBatchDescribeVolumes(svc, ids, volumeIDBatcher) }), - volumeTagBatcher: batcher.New(500, 1*time.Second, func(names []string) (map[string]*types.Volume, error) { + volumeTagBatcher: batcher.New(500, delay, func(names []string) (map[string]*types.Volume, error) { return execBatchDescribeVolumes(svc, names, volumeTagBatcher) }), - instanceIDBatcher: batcher.New(50, 300*time.Millisecond, func(ids []string) (map[string]*types.Instance, error) { + instanceIDBatcher: batcher.New(50, delay, func(ids []string) (map[string]*types.Instance, error) { return execBatchDescribeInstances(svc, ids) }), - snapshotIDBatcher: batcher.New(1000, 300*time.Millisecond, func(ids []string) (map[string]*types.Snapshot, error) { + snapshotIDBatcher: batcher.New(1000, delay, func(ids []string) (map[string]*types.Snapshot, error) { return execBatchDescribeSnapshots(svc, ids, snapshotIDBatcher) }), - snapshotTagBatcher: batcher.New(1000, 300*time.Millisecond, func(names []string) (map[string]*types.Snapshot, error) { + snapshotTagBatcher: batcher.New(1000, delay, func(names []string) (map[string]*types.Snapshot, error) { return execBatchDescribeSnapshots(svc, names, snapshotTagBatcher) }), - volumeModificationIDBatcher: batcher.New(500, 300*time.Millisecond, func(names []string) (map[string]*types.VolumeModification, error) { + volumeModificationIDBatcher: batcher.New(500, delay, func(names []string) (map[string]*types.VolumeModification, error) { return execBatchDescribeVolumesModifications(svc, names) }), } diff --git a/pkg/cloud/cloud_test.go b/pkg/cloud/cloud_test.go index 65fd1ceea0..c503e65c1f 100644 --- a/pkg/cloud/cloud_test.go +++ b/pkg/cloud/cloud_test.go @@ -48,6 +48,8 @@ const ( defaultPath = "/dev/xvdaa" defaultCreateDiskDeadline = time.Second * 5 + + testBatchDelay = 100 * time.Millisecond ) func generateVolumes(volIdCount, volTagCount int) []types.Volume { @@ -183,7 +185,7 @@ func TestBatchDescribeVolumes(t *testing.T) { mockEC2 := NewMockEC2API(mockCtrl) c := newCloud(mockEC2) cloudInstance := c.(*cloud) - cloudInstance.bm = newBatcherManager(cloudInstance.ec2) + cloudInstance.bm = newBatcherManager(cloudInstance.ec2, testBatchDelay) tc.mockFunc(mockEC2, tc.expErr, tc.volumes) volumeIDs, volumeNames := extractVolumeIdentifiers(tc.volumes) @@ -300,7 +302,7 @@ func TestBatchDescribeInstances(t *testing.T) { mockEC2 := NewMockEC2API(mockCtrl) c := newCloud(mockEC2) cloudInstance := c.(*cloud) - cloudInstance.bm = newBatcherManager(cloudInstance.ec2) + cloudInstance.bm = newBatcherManager(cloudInstance.ec2, testBatchDelay) // Setup mocks var instances []types.Instance @@ -475,7 +477,7 @@ func TestBatchDescribeSnapshots(t *testing.T) { mockEC2 := NewMockEC2API(mockCtrl) c := newCloud(mockEC2) cloudInstance := c.(*cloud) - cloudInstance.bm = newBatcherManager(cloudInstance.ec2) + cloudInstance.bm = newBatcherManager(cloudInstance.ec2, testBatchDelay) tc.mockFunc(mockEC2, tc.expErr, tc.snapshots) snapshotIDs, snapshotNames := extractSnapshotIdentifiers(tc.snapshots) @@ -594,7 +596,7 @@ func TestBatchDescribeVolumesModifications(t *testing.T) { mockEC2 := NewMockEC2API(mockCtrl) c := newCloud(mockEC2) cloudInstance := c.(*cloud) - cloudInstance.bm = newBatcherManager(cloudInstance.ec2) + cloudInstance.bm = newBatcherManager(cloudInstance.ec2, testBatchDelay) // Setup mocks var volumeModifications []types.VolumeModification