Skip to content

Commit

Permalink
Batch DescribeVolume API requests
Browse files Browse the repository at this point in the history
Signed-off-by: Eddie Torres <[email protected]>
  • Loading branch information
torredil committed Oct 31, 2023
1 parent 46f86af commit c1a894c
Show file tree
Hide file tree
Showing 6 changed files with 433 additions and 50 deletions.
193 changes: 180 additions & 13 deletions pkg/cloud/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/ec2/ec2iface"
"github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/batcher"
dm "github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/cloud/devicemanager"
"github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/util"
"k8s.io/apimachinery/pkg/util/wait"
Expand Down Expand Up @@ -137,6 +138,12 @@ const (
AwsEbsDriverTagKey = "ebs.csi.aws.com/cluster"
)

// Batcher
const (
volumeIDBatcher batcherType = iota
volumeTagBatcher
)

var (
// ErrMultiDisks is an error that is returned when multiple
// disks are found with the same volume name.
Expand Down Expand Up @@ -234,21 +241,41 @@ type ec2ListSnapshotsResponse struct {
NextToken *string
}

// batcherType is an enum representing the types of batchers available.
type batcherType int

// batcherManager maintains a collection of batchers for different types of tasks.
type batcherManager struct {
batchers map[batcherType]*batcher.Batcher[string, *ec2.Volume]
}

type cloud struct {
region string
ec2 ec2iface.EC2API
dm dm.DeviceManager
bm *batcherManager
}

var _ Cloud = &cloud{}

// NewCloud returns a new instance of AWS cloud
// It panics if session is invalid
func NewCloud(region string, awsSdkDebugLog bool, userAgentExtra string) (Cloud, error) {
return newEC2Cloud(region, awsSdkDebugLog, userAgentExtra)
func NewCloud(region string, awsSdkDebugLog bool, userAgentExtra string, batching bool) (Cloud, error) {
c := newEC2Cloud(region, awsSdkDebugLog, userAgentExtra)

if batching {
klog.V(4).InfoS("NewCloud: batching enabled")
cloudInstance, ok := c.(*cloud)
if !ok {
return nil, fmt.Errorf("expected *cloud type but got %T", c)
}
cloudInstance.bm = newBatcherManager(cloudInstance.ec2)
}

return c, nil
}

func newEC2Cloud(region string, awsSdkDebugLog bool, userAgentExtra string) (Cloud, error) {
func newEC2Cloud(region string, awsSdkDebugLog bool, userAgentExtra string) Cloud {
awsConfig := &aws.Config{
Region: aws.String(region),
CredentialsChainVerboseErrors: aws.Bool(true),
Expand Down Expand Up @@ -295,7 +322,135 @@ func newEC2Cloud(region string, awsSdkDebugLog bool, userAgentExtra string) (Clo
region: region,
dm: dm.NewDeviceManager(),
ec2: svc,
}, nil
}
}

// newBatcherManager initializes a new instance of batcherManager.
func newBatcherManager(svc ec2iface.EC2API) *batcherManager {
return &batcherManager{
batchers: map[batcherType]*batcher.Batcher[string, *ec2.Volume]{
volumeIDBatcher: batcher.New(500, 1*time.Second, func(ids []string) (map[string]*ec2.Volume, error) {
return execBatchDescribeVolumes(svc, ids, volumeIDBatcher)
}),
volumeTagBatcher: batcher.New(500, 1*time.Second, func(names []string) (map[string]*ec2.Volume, error) {
return execBatchDescribeVolumes(svc, names, volumeTagBatcher)
}),
},
}
}

// getBatcher fetches a specific type of batcher from the batcherManager.
func (bm *batcherManager) getBatcher(b batcherType) *batcher.Batcher[string, *ec2.Volume] {
return bm.batchers[b]
}

// executes a batched DescribeVolumes API call depending on the type of batcher.
func execBatchDescribeVolumes(svc ec2iface.EC2API, input []string, batcher batcherType) (map[string]*ec2.Volume, error) {
var request *ec2.DescribeVolumesInput

switch batcher {
case volumeIDBatcher:
klog.V(7).InfoS("execBatchDescribeVolumes", "volumeIds", input)
request = &ec2.DescribeVolumesInput{
VolumeIds: aws.StringSlice(input),
}

case volumeTagBatcher:
klog.V(7).InfoS("execBatchDescribeVolumes", "names", input)
filters := []*ec2.Filter{
{
Name: aws.String("tag:" + VolumeNameTagKey),
Values: aws.StringSlice(input),
},
}
request = &ec2.DescribeVolumesInput{
Filters: filters,
}

default:
return nil, fmt.Errorf("execBatchDescribeVolumes: unsupported request type")
}

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

resp, err := describeVolumes(ctx, svc, request)
if err != nil {
return nil, err
}

result := make(map[string]*ec2.Volume)

for _, volume := range resp {
key, err := extractVolumeKey(volume, batcher)
if err != nil {
klog.Warningf("execBatchDescribeVolumes: skipping volume: %v, reason: %v", volume, err)
continue
}
result[key] = volume
}

klog.V(7).InfoS("execBatchDescribeVolumes: success", "result", result)
return result, nil
}

// batchDescribeVolumes processes a DescribeVolumes request. Depending on the request,
// it determines the appropriate batcher to use, queues the task, and waits for the result.
func (c *cloud) batchDescribeVolumes(request *ec2.DescribeVolumesInput) (*ec2.Volume, error) {
var bType batcherType
var task string

switch {
case len(request.VolumeIds) == 1 && request.VolumeIds[0] != nil:
bType = volumeIDBatcher
task = *request.VolumeIds[0]

case len(request.Filters) == 1 && *request.Filters[0].Name == "tag:"+VolumeNameTagKey && len(request.Filters[0].Values) == 1:
bType = volumeTagBatcher
task = *request.Filters[0].Values[0]

default:
return nil, fmt.Errorf("batchDescribeVolumes: invalid request, request: %v", request)
}

ch := make(chan batcher.BatchResult[*ec2.Volume])

b := c.bm.getBatcher(bType)
b.AddTask(task, ch)

r := <-ch

if r.Err != nil {
return nil, r.Err
}
if r.Result == nil {
return nil, fmt.Errorf("batchDescribeVolumes: no volume found %s", task)
}
return r.Result, nil
}

// extractVolumeKey retrieves the key associated with a given volume based on the batcher type.
// For the volumeIDBatcher type, it returns the volume's ID.
// For other types, it searches for the VolumeNameTagKey within the volume's tags.
func extractVolumeKey(v *ec2.Volume, batcher batcherType) (string, error) {
if batcher == volumeIDBatcher {
if v.VolumeId == nil {
return "", errors.New("extractVolumeKey: missing volume ID")
}
return *v.VolumeId, nil
}
for _, tag := range v.Tags {
klog.V(7).InfoS("extractVolumeKey: processing tag", "volume", v, "*tag.Key", *tag.Key, "VolumeNameTagKey", VolumeNameTagKey)
if tag.Key == nil || tag.Value == nil {
klog.V(7).InfoS("extractVolumeKey: skipping volume due to missing tag", "volume", v, "tag", tag)
continue
}
if *tag.Key == VolumeNameTagKey {
klog.V(7).InfoS("extractVolumeKey: found volume name tag", "volume", v, "tag", tag)
return *tag.Value, nil
}
}
return "", errors.New("extractVolumeKey: missing VolumeNameTagKey in volume tags")
}

func (c *cloud) CreateDisk(ctx context.Context, volumeName string, diskOptions *DiskOptions) (*Disk, error) {
Expand Down Expand Up @@ -463,6 +618,7 @@ func (c *cloud) CreateDisk(ctx context.Context, volumeName string, diskOptions *
return nil, fmt.Errorf("could not attach tags to volume: %v. %w", volumeID, err)
}
}
klog.InfoS("volume created", "volumeID", volumeID)
return &Disk{CapacityGiB: size, VolumeID: volumeID, AvailabilityZone: zone, SnapshotID: snapshotID, OutpostArn: outpostArn}, nil
}

Expand Down Expand Up @@ -729,7 +885,7 @@ func (c *cloud) WaitForAttachmentState(ctx context.Context, volumeID, expectedSt
return true, nil
}
// continue waiting
klog.V(4).InfoS("Waiting for volume state", "volumeID", volumeID, "actual", attachmentState, "desired", expectedState)
klog.InfoS("Waiting for volume state", "volumeID", volumeID, "actual", attachmentState, "desired", expectedState)
return false, nil
}

Expand Down Expand Up @@ -954,11 +1110,11 @@ func (c *cloud) EnableFastSnapshotRestores(ctx context.Context, availabilityZone
return response, nil
}

func (c *cloud) getVolume(ctx context.Context, request *ec2.DescribeVolumesInput) (*ec2.Volume, error) {
func describeVolumes(ctx context.Context, svc ec2iface.EC2API, request *ec2.DescribeVolumesInput) ([]*ec2.Volume, error) {
var volumes []*ec2.Volume
var nextToken *string
for {
response, err := c.ec2.DescribeVolumesWithContext(ctx, request)
response, err := svc.DescribeVolumesWithContext(ctx, request)
if err != nil {
return nil, err
}
Expand All @@ -969,14 +1125,25 @@ func (c *cloud) getVolume(ctx context.Context, request *ec2.DescribeVolumesInput
}
request.NextToken = nextToken
}
return volumes, nil
}

if l := len(volumes); l > 1 {
return nil, ErrMultiDisks
} else if l < 1 {
return nil, ErrNotFound
}
func (c *cloud) getVolume(ctx context.Context, request *ec2.DescribeVolumesInput) (*ec2.Volume, error) {
if c.bm == nil {
volumes, err := describeVolumes(ctx, c.ec2, request)
if err != nil {
return nil, err
}

return volumes[0], nil
if l := len(volumes); l > 1 {
return nil, ErrMultiDisks
} else if l < 1 {
return nil, ErrNotFound
}
return volumes[0], nil
} else {
return c.batchDescribeVolumes(request)
}
}

func (c *cloud) getInstance(ctx context.Context, nodeID string) (*ec2.Instance, error) {
Expand Down
Loading

0 comments on commit c1a894c

Please sign in to comment.