Skip to content

Commit

Permalink
add rate limit to asset inventory
Browse files Browse the repository at this point in the history
  • Loading branch information
orouz committed Mar 24, 2024
1 parent d765314 commit b83c741
Showing 1 changed file with 44 additions and 2 deletions.
46 changes: 44 additions & 2 deletions internal/resources/providers/gcplib/inventory/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,19 @@ import (
"fmt"
"strings"
"sync"
"time"

asset "cloud.google.com/go/asset/apiv1"
"cloud.google.com/go/asset/apiv1/assetpb"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/googleapis/gax-go/v2"
"github.com/samber/lo"
"golang.org/x/time/rate"
"google.golang.org/api/cloudresourcemanager/v3"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/protobuf/types/known/structpb"

"github.com/elastic/cloudbeat/internal/resources/fetching"
Expand Down Expand Up @@ -129,17 +133,55 @@ type ProviderInitializerAPI interface {
Init(ctx context.Context, log *logp.Logger, gcpConfig auth.GcpFactoryConfig) (ServiceAPI, error)
}

var RetryOnResourceExhausted = gax.WithRetry(func() gax.Retryer {
return gax.OnCodes([]codes.Code{codes.ResourceExhausted}, gax.Backoff{
Initial: 1 * time.Second,
Max: 10 * time.Second,
Multiplier: 1.2,
})
})

// https://cloud.google.com/asset-inventory/docs/quota
type AssetsInventoryRateLimiter struct {
methods map[string]*rate.Limiter
log *logp.Logger
}

func NewAssetsInventoryRateLimiter(log *logp.Logger) *AssetsInventoryRateLimiter {
return &AssetsInventoryRateLimiter{
methods: map[string]*rate.Limiter{
"/google.cloud.asset.v1.AssetService/ListAssets": rate.NewLimiter(rate.Every(time.Minute/100), 1),
},
log: log,
}
}

func (rl *AssetsInventoryRateLimiter) GetInterceptorDialOption() grpc.DialOption {
return grpc.WithUnaryInterceptor(func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
limiter := rl.methods[method]
if limiter != nil {
err := limiter.Wait(ctx)
if err != nil {
rl.log.Errorf("Failed to wait for %s, error: %v", method, err)
}
}
return invoker(ctx, method, req, reply, cc, opts...)
})
}

func (p *ProviderInitializer) Init(ctx context.Context, log *logp.Logger, gcpConfig auth.GcpFactoryConfig) (ServiceAPI, error) {
limiter := NewAssetsInventoryRateLimiter(log)

// initialize GCP assets inventory client
client, err := asset.NewClient(ctx, gcpConfig.ClientOpts...)
client, err := asset.NewClient(ctx, append(gcpConfig.ClientOpts, option.WithGRPCDialOption(limiter.GetInterceptorDialOption()))...)
if err != nil {
return nil, err
}
// wrap the assets inventory client for mocking
assetsInventoryWrapper := &AssetsInventoryWrapper{
Close: client.Close,
ListAssets: func(ctx context.Context, req *assetpb.ListAssetsRequest, opts ...gax.CallOption) Iterator {
return client.ListAssets(ctx, req, opts...)
return client.ListAssets(ctx, req, append(opts, RetryOnResourceExhausted)...)
},
}

Expand Down

0 comments on commit b83c741

Please sign in to comment.