From 27b7c028be611a80d1488125ffad34156e3edbb3 Mon Sep 17 00:00:00 2001 From: Dejan Bosanac Date: Tue, 31 Oct 2023 08:42:05 +0100 Subject: [PATCH] Add configurable limit to max files that can be downloaded from the bucket --- cmd/guacone/cmd/s3.go | 9 ++++++--- pkg/handler/collector/s3/s3.go | 8 ++++++++ 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/cmd/guacone/cmd/s3.go b/cmd/guacone/cmd/s3.go index 7cc8022f87..8c886f4926 100644 --- a/cmd/guacone/cmd/s3.go +++ b/cmd/guacone/cmd/s3.go @@ -39,6 +39,7 @@ type s3Options struct { s3bucket string // name of bucket to collect from region string // AWS region, for s3/sqs configuration (defaults to us-east-1) s3item string // s3 item (only for non-polling behaviour) + limit int // max number of files to download from bucket queues string // comma-separated list of queues/topics (only for polling behaviour) mp string // message provider name (sqs or kafka, will default to kafka) mpEndpoint string // endpoint for the message provider (only for polling behaviour) @@ -58,6 +59,7 @@ var s3Cmd = &cobra.Command{ viper.GetString("s3-bucket"), viper.GetString("s3-region"), viper.GetString("s3-item"), + viper.GetInt("limit"), viper.GetString("s3-mp"), viper.GetString("s3-mp-endpoint"), viper.GetString("s3-queues"), @@ -77,6 +79,7 @@ var s3Cmd = &cobra.Command{ S3Bucket: s3Opts.s3bucket, S3Region: s3Opts.region, S3Item: s3Opts.s3item, + Limit: s3Opts.limit, MessageProvider: s3Opts.mp, MessageProviderEndpoint: s3Opts.mpEndpoint, Queues: s3Opts.queues, @@ -130,7 +133,7 @@ var s3Cmd = &cobra.Command{ }, } -func validateS3Opts(s3url string, s3bucket string, region string, s3item string, mp string, mpEndpoint string, queues string, poll bool) (s3Options, error) { +func validateS3Opts(s3url string, s3bucket string, region string, s3item string, limit int, mp string, mpEndpoint string, queues string, poll bool) (s3Options, error) { var opts s3Options if poll { @@ -148,13 +151,13 @@ func validateS3Opts(s3url string, s3bucket string, region string, s3item string, return opts, fmt.Errorf("expected s3 bucket") } - opts = s3Options{s3url, s3bucket, region, s3item, queues, mp, mpEndpoint, poll} + opts = s3Options{s3url, s3bucket, region, s3item, limit, queues, mp, mpEndpoint, poll} return opts, nil } func init() { - set, err := cli.BuildFlags([]string{"s3-url", "s3-bucket", "s3-region", "s3-item", "s3-mp", "s3-mp-endpoint", "s3-queues", "poll"}) + set, err := cli.BuildFlags([]string{"s3-url", "s3-bucket", "s3-region", "s3-item", "limit", "s3-mp", "s3-mp-endpoint", "s3-queues", "poll"}) if err != nil { fmt.Fprintf(os.Stderr, "failed to setup flag: %s", err) os.Exit(1) diff --git a/pkg/handler/collector/s3/s3.go b/pkg/handler/collector/s3/s3.go index 9336aa7b0d..90037ddcf2 100644 --- a/pkg/handler/collector/s3/s3.go +++ b/pkg/handler/collector/s3/s3.go @@ -41,6 +41,7 @@ type S3CollectorConfig struct { MessageProviderEndpoint string // optional if using the sqs message provider S3Url string // optional (uses aws sdk defaults) S3Bucket string // bucket name to collect from + Limit int // optional max number of files to download from the bucket S3Item string // optional (only for non-polling behaviour) S3Region string // optional (defaults to us-east-1, assumes same region for s3 and sqs) Queues string // optional (comma-separated list of queues/topics) @@ -100,6 +101,7 @@ func retrieve(s S3Collector, ctx context.Context, docChannel chan<- *processor.D } else { var token *string const MaxKeys = 100 + var total = 0 for { files, t, err := downloader.ListFiles(ctx, s.config.S3Bucket, token, MaxKeys) if err != nil { @@ -132,6 +134,12 @@ func retrieve(s S3Collector, ctx context.Context, docChannel chan<- *processor.D }, } docChannel <- doc + + total += 1 + if s.config.Limit > 0 && total >= s.config.Limit { + logger.Infof("Configured limit of %d reached. Exiting.", s.config.Limit) + break + } } if len(files) < MaxKeys {