diff --git a/internal/resources/providers/gcplib/inventory/provider.go b/internal/resources/providers/gcplib/inventory/provider.go index 98de5f53fb..772bb1cb00 100644 --- a/internal/resources/providers/gcplib/inventory/provider.go +++ b/internal/resources/providers/gcplib/inventory/provider.go @@ -22,15 +22,19 @@ import ( "fmt" "strings" "sync" + "time" asset "cloud.google.com/go/asset/apiv1" "cloud.google.com/go/asset/apiv1/assetpb" "github.com/elastic/elastic-agent-libs/logp" "github.com/googleapis/gax-go/v2" "github.com/samber/lo" + "golang.org/x/time/rate" "google.golang.org/api/cloudresourcemanager/v3" "google.golang.org/api/iterator" "google.golang.org/api/option" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/protobuf/types/known/structpb" "github.com/elastic/cloudbeat/internal/resources/fetching" @@ -129,9 +133,47 @@ type ProviderInitializerAPI interface { Init(ctx context.Context, log *logp.Logger, gcpConfig auth.GcpFactoryConfig) (ServiceAPI, error) } +var RetryOnResourceExhausted = gax.WithRetry(func() gax.Retryer { + return gax.OnCodes([]codes.Code{codes.ResourceExhausted}, gax.Backoff{ + Initial: 1 * time.Second, + Max: 10 * time.Second, + Multiplier: 1.2, + }) +}) + +// https://cloud.google.com/asset-inventory/docs/quota +type AssetsInventoryRateLimiter struct { + methods map[string]*rate.Limiter + log *logp.Logger +} + +func NewAssetsInventoryRateLimiter(log *logp.Logger) *AssetsInventoryRateLimiter { + return &AssetsInventoryRateLimiter{ + methods: map[string]*rate.Limiter{ + "/google.cloud.asset.v1.AssetService/ListAssets": rate.NewLimiter(rate.Every(time.Minute/100), 1), + }, + log: log, + } +} + +func (rl *AssetsInventoryRateLimiter) GetInterceptorDialOption() grpc.DialOption { + return grpc.WithUnaryInterceptor(func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { + limiter := rl.methods[method] + if limiter != nil { + err := limiter.Wait(ctx) + if err != nil { + rl.log.Errorf("Failed to wait for %s, error: %v", method, err) + } + } + return invoker(ctx, method, req, reply, cc, opts...) + }) +} + func (p *ProviderInitializer) Init(ctx context.Context, log *logp.Logger, gcpConfig auth.GcpFactoryConfig) (ServiceAPI, error) { + limiter := NewAssetsInventoryRateLimiter(log) + // initialize GCP assets inventory client - client, err := asset.NewClient(ctx, gcpConfig.ClientOpts...) + client, err := asset.NewClient(ctx, append(gcpConfig.ClientOpts, option.WithGRPCDialOption(limiter.GetInterceptorDialOption()))...) if err != nil { return nil, err } @@ -139,7 +181,7 @@ func (p *ProviderInitializer) Init(ctx context.Context, log *logp.Logger, gcpCon assetsInventoryWrapper := &AssetsInventoryWrapper{ Close: client.Close, ListAssets: func(ctx context.Context, req *assetpb.ListAssetsRequest, opts ...gax.CallOption) Iterator { - return client.ListAssets(ctx, req, opts...) + return client.ListAssets(ctx, req, append(opts, RetryOnResourceExhausted)...) }, }