Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhancement(aws_s3 source): Adds an option max_number_of_messages for the aws_s3 source #20261

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changelog.d/20172_max_number_of_messages.enhancement.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Adds a `max_number_of_messages` option to the SQS configuration of the `aws_s3` source.

authors: fdamstra
1 change: 1 addition & 0 deletions src/sources/aws_s3/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -800,6 +800,7 @@ mod integration_tests {
sqs: Some(sqs::Config {
queue_url: queue_url.to_string(),
poll_secs: 1,
max_number_of_messages: 10,
visibility_timeout_secs: 0,
client_concurrency: None,
..Default::default()
Expand Down
28 changes: 27 additions & 1 deletion src/sources/aws_s3/sqs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,19 @@ pub(super) struct Config {
#[configurable(metadata(docs::examples = 5))]
pub(super) client_concurrency: Option<NonZeroUsize>,

/// Maximum number of messages to poll from SQS in a batch
///
/// Defaults to 10
///
/// Should be set to a smaller value when the files are large to help prevent the ingestion of
/// one file from causing the other files to exceed the visibility_timeout. Valid values are 1 - 10
// NOTE: Kept as u32 and range-checked (1 - 10) when the ingestor is constructed, so the later
// `as i32` conversion cannot overflow.
#[serde(default = "default_max_number_of_messages")]
#[derivative(Default(value = "default_max_number_of_messages()"))]
#[configurable(metadata(docs::human_name = "Max Messages"))]
#[configurable(metadata(docs::examples = 1))]
pub(super) max_number_of_messages: u32,

#[configurable(derived)]
#[serde(default)]
#[derivative(Default)]
Expand All @@ -136,6 +149,10 @@ const fn default_visibility_timeout_secs() -> u32 {
300
}

/// Default value for the `max_number_of_messages` SQS option.
///
/// Returns 10, which is also the largest value the configuration accepts.
const fn default_max_number_of_messages() -> u32 {
    const DEFAULT_BATCH_SIZE: u32 = 10;
    DEFAULT_BATCH_SIZE
}

/// Always returns `true`.
///
/// NOTE(review): presumably a serde default for boolean options that are enabled unless
/// overridden; the call sites are not visible here, so confirm.
const fn default_true() -> bool {
    const ENABLED: bool = true;
    ENABLED
}
Expand All @@ -147,6 +164,8 @@ pub(super) enum IngestorNewError {
source: std::num::TryFromIntError,
timeout: u64,
},
#[snafu(display("Invalid value for max_number_of_messages {}", messages))]
InvalidNumberOfMessages { messages: u32 },
}

#[allow(clippy::large_enum_variant)]
Expand Down Expand Up @@ -207,6 +226,7 @@ pub struct State {

queue_url: String,
poll_secs: i32,
max_number_of_messages: i32,
client_concurrency: usize,
visibility_timeout_secs: i32,
delete_message: bool,
Expand All @@ -228,6 +248,11 @@ impl Ingestor {
multiline: Option<line_agg::Config>,
decoder: Decoder,
) -> Result<Ingestor, IngestorNewError> {
if config.max_number_of_messages < 1 || config.max_number_of_messages > 10 {
return Err(IngestorNewError::InvalidNumberOfMessages {
messages: config.max_number_of_messages,
});
}
let state = Arc::new(State {
region,

Expand All @@ -239,6 +264,7 @@ impl Ingestor {

queue_url: config.queue_url,
poll_secs: config.poll_secs as i32,
max_number_of_messages: config.max_number_of_messages as i32,
client_concurrency: config
.client_concurrency
.map(|n| n.get())
Expand Down Expand Up @@ -637,7 +663,7 @@ impl IngestorProcess {
.sqs_client
.receive_message()
.queue_url(self.state.queue_url.clone())
.max_number_of_messages(10)
.max_number_of_messages(self.state.max_number_of_messages)
.visibility_timeout(self.state.visibility_timeout_secs)
.wait_time_seconds(self.state.poll_secs)
.send()
Expand Down
15 changes: 15 additions & 0 deletions website/cue/reference/components/sources/base/aws_s3.cue
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,21 @@ base: components: sources: aws_s3: configuration: {
required: false
type: bool: default: true
}
max_number_of_messages: {
description: """
Maximum number of messages to poll from SQS in a batch

Defaults to 10

Should be set to a smaller value when the files are large to help prevent the ingestion of
one file from causing the other files to exceed the visibility_timeout. Valid values are 1 - 10
"""
required: false
type: uint: {
default: 10
examples: [1]
}
}
poll_secs: {
description: """
How long to wait while polling the queue for new messages, in seconds.
Expand Down
Loading