Skip to content

Commit

Permalink
Batch DescribeVolume API requests
Browse files Browse the repository at this point in the history
Signed-off-by: Eddie Torres <[email protected]>
  • Loading branch information
torredil committed Nov 7, 2023
1 parent 02f4ed8 commit b1f13f4
Show file tree
Hide file tree
Showing 7 changed files with 435 additions and 53 deletions.
2 changes: 1 addition & 1 deletion docs/options.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@ There are a couple of driver options that can be passed as arguments when starti
| logging-format | json | text | Sets the log format. Permitted formats: text, json|
| user-agent-extra | csi-ebs | helm | Extra string appended to user agent|
| enable-otel-tracing | true | false | If set to true, the driver will enable opentelemetry tracing. Might need [additional env variables](https://opentelemetry.io/docs/specs/otel/configuration/sdk-environment-variables/#general-sdk-configuration) to export the traces to the right collector|
| batching | true | true | If set to true, the driver will enable batching of API calls. This is especially helpful for improving performance in workloads that are sensitive to EC2 rate limits|
| batching | true | true | If set to true, the driver will enable batching of API calls. This is especially helpful for improving performance in workloads that are sensitive to EC2 rate limits at the cost of a small increase to worst-case latency|
192 changes: 179 additions & 13 deletions pkg/cloud/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/ec2/ec2iface"
"github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/batcher"
dm "github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/cloud/devicemanager"
"github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/util"
"k8s.io/apimachinery/pkg/util/wait"
Expand Down Expand Up @@ -137,6 +138,12 @@ const (
AwsEbsDriverTagKey = "ebs.csi.aws.com/cluster"
)

// Batcher
const (
volumeIDBatcher batcherType = iota
volumeTagBatcher
)

var (
// ErrMultiDisks is an error that is returned when multiple
// disks are found with the same volume name.
Expand Down Expand Up @@ -235,21 +242,41 @@ type ec2ListSnapshotsResponse struct {
NextToken *string
}

// batcherType is an enum representing the types of batchers available.
type batcherType int

// batcherManager maintains a collection of batchers for different types of tasks.
type batcherManager struct {
batchers map[batcherType]*batcher.Batcher[string, *ec2.Volume]
}

type cloud struct {
region string
ec2 ec2iface.EC2API
dm dm.DeviceManager
bm *batcherManager
}

var _ Cloud = &cloud{}

// NewCloud returns a new instance of AWS cloud
// It panics if session is invalid
func NewCloud(region string, awsSdkDebugLog bool, userAgentExtra string) (Cloud, error) {
return newEC2Cloud(region, awsSdkDebugLog, userAgentExtra)
func NewCloud(region string, awsSdkDebugLog bool, userAgentExtra string, batching bool) (Cloud, error) {
c := newEC2Cloud(region, awsSdkDebugLog, userAgentExtra)

if batching {
klog.V(4).InfoS("NewCloud: batching enabled")
cloudInstance, ok := c.(*cloud)
if !ok {
return nil, fmt.Errorf("expected *cloud type but got %T", c)
}
cloudInstance.bm = newBatcherManager(cloudInstance.ec2)
}

return c, nil
}

func newEC2Cloud(region string, awsSdkDebugLog bool, userAgentExtra string) (Cloud, error) {
func newEC2Cloud(region string, awsSdkDebugLog bool, userAgentExtra string) Cloud {
awsConfig := &aws.Config{
Region: aws.String(region),
CredentialsChainVerboseErrors: aws.Bool(true),
Expand Down Expand Up @@ -296,7 +323,135 @@ func newEC2Cloud(region string, awsSdkDebugLog bool, userAgentExtra string) (Clo
region: region,
dm: dm.NewDeviceManager(),
ec2: svc,
}, nil
}
}

// newBatcherManager initializes a new instance of batcherManager.
func newBatcherManager(svc ec2iface.EC2API) *batcherManager {
return &batcherManager{
batchers: map[batcherType]*batcher.Batcher[string, *ec2.Volume]{
volumeIDBatcher: batcher.New(500, 1*time.Second, func(ids []string) (map[string]*ec2.Volume, error) {
return execBatchDescribeVolumes(svc, ids, volumeIDBatcher)
}),
volumeTagBatcher: batcher.New(500, 1*time.Second, func(names []string) (map[string]*ec2.Volume, error) {
return execBatchDescribeVolumes(svc, names, volumeTagBatcher)
}),
},
}
}

// getBatcher fetches a specific type of batcher from the batcherManager.
func (bm *batcherManager) getBatcher(b batcherType) *batcher.Batcher[string, *ec2.Volume] {
return bm.batchers[b]
}

// executes a batched DescribeVolumes API call depending on the type of batcher.
func execBatchDescribeVolumes(svc ec2iface.EC2API, input []string, batcher batcherType) (map[string]*ec2.Volume, error) {
var request *ec2.DescribeVolumesInput

switch batcher {
case volumeIDBatcher:
klog.V(7).InfoS("execBatchDescribeVolumes", "volumeIds", input)
request = &ec2.DescribeVolumesInput{
VolumeIds: aws.StringSlice(input),
}

case volumeTagBatcher:
klog.V(7).InfoS("execBatchDescribeVolumes", "names", input)
filters := []*ec2.Filter{
{
Name: aws.String("tag:" + VolumeNameTagKey),
Values: aws.StringSlice(input),
},
}
request = &ec2.DescribeVolumesInput{
Filters: filters,
}

default:
return nil, fmt.Errorf("execBatchDescribeVolumes: unsupported request type")
}

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

resp, err := describeVolumes(ctx, svc, request)
if err != nil {
return nil, err
}

result := make(map[string]*ec2.Volume)

for _, volume := range resp {
key, err := extractVolumeKey(volume, batcher)
if err != nil {
klog.Warningf("execBatchDescribeVolumes: skipping volume: %v, reason: %v", volume, err)
continue
}
result[key] = volume
}

klog.V(7).InfoS("execBatchDescribeVolumes: success", "result", result)
return result, nil
}

// batchDescribeVolumes processes a DescribeVolumes request. Depending on the request,
// it determines the appropriate batcher to use, queues the task, and waits for the result.
func (c *cloud) batchDescribeVolumes(request *ec2.DescribeVolumesInput) (*ec2.Volume, error) {
var bType batcherType
var task string

switch {
case len(request.VolumeIds) == 1 && request.VolumeIds[0] != nil:
bType = volumeIDBatcher
task = *request.VolumeIds[0]

case len(request.Filters) == 1 && *request.Filters[0].Name == "tag:"+VolumeNameTagKey && len(request.Filters[0].Values) == 1:
bType = volumeTagBatcher
task = *request.Filters[0].Values[0]

default:
return nil, fmt.Errorf("batchDescribeVolumes: invalid request, request: %v", request)
}

ch := make(chan batcher.BatchResult[*ec2.Volume])

b := c.bm.getBatcher(bType)
b.AddTask(task, ch)

r := <-ch

if r.Err != nil {
return nil, r.Err
}
if r.Result == nil {
return nil, fmt.Errorf("batchDescribeVolumes: no volume found %s", task)
}
return r.Result, nil
}

// extractVolumeKey retrieves the key associated with a given volume based on the batcher type.
// For the volumeIDBatcher type, it returns the volume's ID.
// For other types, it searches for the VolumeNameTagKey within the volume's tags.
func extractVolumeKey(v *ec2.Volume, batcher batcherType) (string, error) {
if batcher == volumeIDBatcher {
if v.VolumeId == nil {
return "", errors.New("extractVolumeKey: missing volume ID")
}
return *v.VolumeId, nil
}
for _, tag := range v.Tags {
klog.V(7).InfoS("extractVolumeKey: processing tag", "volume", v, "*tag.Key", *tag.Key, "VolumeNameTagKey", VolumeNameTagKey)
if tag.Key == nil || tag.Value == nil {
klog.V(7).InfoS("extractVolumeKey: skipping volume due to missing tag", "volume", v, "tag", tag)
continue
}
if *tag.Key == VolumeNameTagKey {
klog.V(7).InfoS("extractVolumeKey: found volume name tag", "volume", v, "tag", tag)
return *tag.Value, nil
}
}
return "", errors.New("extractVolumeKey: missing VolumeNameTagKey in volume tags")
}

func (c *cloud) CreateDisk(ctx context.Context, volumeName string, diskOptions *DiskOptions) (*Disk, error) {
Expand Down Expand Up @@ -704,7 +859,7 @@ func (c *cloud) WaitForAttachmentState(ctx context.Context, volumeID, expectedSt
return true, nil
}
// continue waiting
klog.V(4).InfoS("Waiting for volume state", "volumeID", volumeID, "actual", attachmentState, "desired", expectedState)
klog.InfoS("Waiting for volume state", "volumeID", volumeID, "actual", attachmentState, "desired", expectedState)
return false, nil
}

Expand Down Expand Up @@ -929,11 +1084,11 @@ func (c *cloud) EnableFastSnapshotRestores(ctx context.Context, availabilityZone
return response, nil
}

func (c *cloud) getVolume(ctx context.Context, request *ec2.DescribeVolumesInput) (*ec2.Volume, error) {
func describeVolumes(ctx context.Context, svc ec2iface.EC2API, request *ec2.DescribeVolumesInput) ([]*ec2.Volume, error) {
var volumes []*ec2.Volume
var nextToken *string
for {
response, err := c.ec2.DescribeVolumesWithContext(ctx, request)
response, err := svc.DescribeVolumesWithContext(ctx, request)
if err != nil {
return nil, err
}
Expand All @@ -944,14 +1099,25 @@ func (c *cloud) getVolume(ctx context.Context, request *ec2.DescribeVolumesInput
}
request.NextToken = nextToken
}
return volumes, nil
}

if l := len(volumes); l > 1 {
return nil, ErrMultiDisks
} else if l < 1 {
return nil, ErrNotFound
}
func (c *cloud) getVolume(ctx context.Context, request *ec2.DescribeVolumesInput) (*ec2.Volume, error) {
if c.bm == nil {
volumes, err := describeVolumes(ctx, c.ec2, request)
if err != nil {
return nil, err
}

return volumes[0], nil
if l := len(volumes); l > 1 {
return nil, ErrMultiDisks
} else if l < 1 {
return nil, ErrNotFound
}
return volumes[0], nil
} else {
return c.batchDescribeVolumes(request)
}
}

func (c *cloud) getInstance(ctx context.Context, nodeID string) (*ec2.Instance, error) {
Expand Down
Loading

0 comments on commit b1f13f4

Please sign in to comment.