Skip to content

Commit

Permalink
Adds an option max_number_of_messages for the aws_s3 source
Browse files Browse the repository at this point in the history
  • Loading branch information
fdamstra committed Apr 8, 2024
1 parent 36d9cce commit 919bf14
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 1 deletion.
1 change: 1 addition & 0 deletions src/sources/aws_s3/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -800,6 +800,7 @@ mod integration_tests {
sqs: Some(sqs::Config {
queue_url: queue_url.to_string(),
poll_secs: 1,
max_number_of_messages: 10,
visibility_timeout_secs: 0,
client_concurrency: None,
..Default::default()
Expand Down
21 changes: 20 additions & 1 deletion src/sources/aws_s3/sqs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,19 @@ pub(super) struct Config {
#[configurable(metadata(docs::examples = 5))]
pub(super) client_concurrency: Option<NonZeroUsize>,

/// Maximum number of messages to poll from sqs in a batch
///
/// Defaults to 10
///
/// Should be set to smaller numbers when the files are larger to help prevent large ingestions

Check failure on line 129 in src/sources/aws_s3/sqs.rs

View workflow job for this annotation

GitHub Actions / Check Spelling

`ingestions` is not a recognized word. (unrecognized-spelling)
/// from causing other files to exceed the visibility_timeout. Valid values are 1 - 10
// NOTE: We restrict this to u32 for safe conversion to i32 later.
#[serde(default = "default_max_number_of_messages")]
#[derivative(Default(value = "default_max_number_of_messages()"))]
#[configurable(metadata(docs::human_name = "Max Messages"))]
#[configurable(metadata(docs::examples = 1))]
pub(super) max_number_of_messages: u32,

#[configurable(derived)]
#[serde(default)]
#[derivative(Default)]
Expand All @@ -136,6 +149,10 @@ const fn default_visibility_timeout_secs() -> u32 {
300
}

const fn default_max_number_of_messages() -> u32 {
10
}

const fn default_true() -> bool {
true
}
Expand Down Expand Up @@ -207,6 +224,7 @@ pub struct State {

queue_url: String,
poll_secs: i32,
max_number_of_messages: i32,
client_concurrency: usize,
visibility_timeout_secs: i32,
delete_message: bool,
Expand Down Expand Up @@ -239,6 +257,7 @@ impl Ingestor {

queue_url: config.queue_url,
poll_secs: config.poll_secs as i32,
max_number_of_messages: config.max_number_of_messages as i32,
client_concurrency: config
.client_concurrency
.map(|n| n.get())
Expand Down Expand Up @@ -637,7 +656,7 @@ impl IngestorProcess {
.sqs_client
.receive_message()
.queue_url(self.state.queue_url.clone())
.max_number_of_messages(10)
.max_number_of_messages(self.state.max_number_of_messages)
.visibility_timeout(self.state.visibility_timeout_secs)
.wait_time_seconds(self.state.poll_secs)
.send()
Expand Down
15 changes: 15 additions & 0 deletions website/cue/reference/components/sources/base/aws_s3.cue
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,21 @@ base: components: sources: aws_s3: configuration: {
required: false
type: bool: default: true
}
max_number_of_messages: {
description: """
Maximum number of messages to poll from sqs in a batch
Defaults to 10
Should be set to smaller numbers when the files are larger to help prevent large ingestions

Check failure on line 614 in website/cue/reference/components/sources/base/aws_s3.cue

View workflow job for this annotation

GitHub Actions / Check Spelling

`ingestions` is not a recognized word. (unrecognized-spelling)
from causing other files to exceed the visibility_timeout. Valid values are 1 - 10
"""
required: false
type: uint: {
default: 10
examples: [1]
}
}
poll_secs: {
description: """
How long to wait while polling the queue for new messages, in seconds.
Expand Down

0 comments on commit 919bf14

Please sign in to comment.