Skip to content

Commit

Permalink
Add configurable limit to max files that can be downloaded from the b…
Browse files Browse the repository at this point in the history
…ucket
  • Loading branch information
dejanb committed Oct 31, 2023
1 parent 5c2d3cb commit 27b7c02
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 3 deletions.
9 changes: 6 additions & 3 deletions cmd/guacone/cmd/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type s3Options struct {
s3bucket string // name of bucket to collect from
region string // AWS region, for s3/sqs configuration (defaults to us-east-1)
s3item string // s3 item (only for non-polling behaviour)
limit int // max number of files to download from bucket
queues string // comma-separated list of queues/topics (only for polling behaviour)
mp string // message provider name (sqs or kafka, will default to kafka)
mpEndpoint string // endpoint for the message provider (only for polling behaviour)
Expand All @@ -58,6 +59,7 @@ var s3Cmd = &cobra.Command{
viper.GetString("s3-bucket"),
viper.GetString("s3-region"),
viper.GetString("s3-item"),
viper.GetInt("limit"),
viper.GetString("s3-mp"),
viper.GetString("s3-mp-endpoint"),
viper.GetString("s3-queues"),
Expand All @@ -77,6 +79,7 @@ var s3Cmd = &cobra.Command{
S3Bucket: s3Opts.s3bucket,
S3Region: s3Opts.region,
S3Item: s3Opts.s3item,
Limit: s3Opts.limit,
MessageProvider: s3Opts.mp,
MessageProviderEndpoint: s3Opts.mpEndpoint,
Queues: s3Opts.queues,
Expand Down Expand Up @@ -130,7 +133,7 @@ var s3Cmd = &cobra.Command{
},
}

func validateS3Opts(s3url string, s3bucket string, region string, s3item string, mp string, mpEndpoint string, queues string, poll bool) (s3Options, error) {
func validateS3Opts(s3url string, s3bucket string, region string, s3item string, limit int, mp string, mpEndpoint string, queues string, poll bool) (s3Options, error) {
var opts s3Options

if poll {
Expand All @@ -148,13 +151,13 @@ func validateS3Opts(s3url string, s3bucket string, region string, s3item string,
return opts, fmt.Errorf("expected s3 bucket")
}

opts = s3Options{s3url, s3bucket, region, s3item, queues, mp, mpEndpoint, poll}
opts = s3Options{s3url, s3bucket, region, s3item, limit, queues, mp, mpEndpoint, poll}

return opts, nil
}

func init() {
set, err := cli.BuildFlags([]string{"s3-url", "s3-bucket", "s3-region", "s3-item", "s3-mp", "s3-mp-endpoint", "s3-queues", "poll"})
set, err := cli.BuildFlags([]string{"s3-url", "s3-bucket", "s3-region", "s3-item", "limit", "s3-mp", "s3-mp-endpoint", "s3-queues", "poll"})
if err != nil {
fmt.Fprintf(os.Stderr, "failed to setup flag: %s", err)
os.Exit(1)
Expand Down
8 changes: 8 additions & 0 deletions pkg/handler/collector/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type S3CollectorConfig struct {
MessageProviderEndpoint string // optional if using the sqs message provider
S3Url string // optional (uses aws sdk defaults)
S3Bucket string // bucket name to collect from
Limit int // optional max number of files to download from the bucket
S3Item string // optional (only for non-polling behaviour)
S3Region string // optional (defaults to us-east-1, assumes same region for s3 and sqs)
Queues string // optional (comma-separated list of queues/topics)
Expand Down Expand Up @@ -100,6 +101,7 @@ func retrieve(s S3Collector, ctx context.Context, docChannel chan<- *processor.D
} else {
var token *string
const MaxKeys = 100
var total = 0
for {
files, t, err := downloader.ListFiles(ctx, s.config.S3Bucket, token, MaxKeys)
if err != nil {
Expand Down Expand Up @@ -132,6 +134,12 @@ func retrieve(s S3Collector, ctx context.Context, docChannel chan<- *processor.D
},
}
docChannel <- doc

total += 1
if s.config.Limit > 0 && total >= s.config.Limit {
logger.Infof("Configured limit of %d reached. Exiting.", s.config.Limit)
break
}
}

if len(files) < MaxKeys {
Expand Down

0 comments on commit 27b7c02

Please sign in to comment.