Skip to content

Commit

Permalink
enhancement(aws_s3 source): Adds an option max_number_of_messages f…
Browse files Browse the repository at this point in the history
…or the aws_s3 source (#20261)

* Adds an option `max_number_of_messages` for the aws_s3 source

* Adds range checking to `max_number_of_messages`; improves documentation

* Adds a changelog entry

* Ran `make fmt`
  • Loading branch information
fdamstra authored Apr 11, 2024
1 parent f1439bc commit 153919d
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 1 deletion.
3 changes: 3 additions & 0 deletions changelog.d/20172_max_number_of_messages.enhancement.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Adds a `max_number_of_messages` to the sqs configuration of the `aws_s3` source

authors: fdamstra
1 change: 1 addition & 0 deletions src/sources/aws_s3/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -800,6 +800,7 @@ mod integration_tests {
sqs: Some(sqs::Config {
queue_url: queue_url.to_string(),
poll_secs: 1,
max_number_of_messages: 10,
visibility_timeout_secs: 0,
client_concurrency: None,
..Default::default()
Expand Down
28 changes: 27 additions & 1 deletion src/sources/aws_s3/sqs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,19 @@ pub(super) struct Config {
#[configurable(metadata(docs::examples = 5))]
pub(super) client_concurrency: Option<NonZeroUsize>,

/// Maximum number of messages to poll from SQS in a batch
///
/// Defaults to 10
///
/// Should be set to a smaller value when the files are large to help prevent the ingestion of
/// one file from causing the other files to exceed the visibility_timeout. Valid values are 1 - 10
// NOTE: We restrict this to u32 for safe conversion to i32 later.
#[serde(default = "default_max_number_of_messages")]
#[derivative(Default(value = "default_max_number_of_messages()"))]
#[configurable(metadata(docs::human_name = "Max Messages"))]
#[configurable(metadata(docs::examples = 1))]
pub(super) max_number_of_messages: u32,

#[configurable(derived)]
#[serde(default)]
#[derivative(Default)]
Expand All @@ -136,6 +149,10 @@ const fn default_visibility_timeout_secs() -> u32 {
300
}

const fn default_max_number_of_messages() -> u32 {
10
}

const fn default_true() -> bool {
true
}
Expand All @@ -147,6 +164,8 @@ pub(super) enum IngestorNewError {
source: std::num::TryFromIntError,
timeout: u64,
},
#[snafu(display("Invalid value for max_number_of_messages {}", messages))]
InvalidNumberOfMessages { messages: u32 },
}

#[allow(clippy::large_enum_variant)]
Expand Down Expand Up @@ -207,6 +226,7 @@ pub struct State {

queue_url: String,
poll_secs: i32,
max_number_of_messages: i32,
client_concurrency: usize,
visibility_timeout_secs: i32,
delete_message: bool,
Expand All @@ -228,6 +248,11 @@ impl Ingestor {
multiline: Option<line_agg::Config>,
decoder: Decoder,
) -> Result<Ingestor, IngestorNewError> {
if config.max_number_of_messages < 1 || config.max_number_of_messages > 10 {
return Err(IngestorNewError::InvalidNumberOfMessages {
messages: config.max_number_of_messages,
});
}
let state = Arc::new(State {
region,

Expand All @@ -239,6 +264,7 @@ impl Ingestor {

queue_url: config.queue_url,
poll_secs: config.poll_secs as i32,
max_number_of_messages: config.max_number_of_messages as i32,
client_concurrency: config
.client_concurrency
.map(|n| n.get())
Expand Down Expand Up @@ -637,7 +663,7 @@ impl IngestorProcess {
.sqs_client
.receive_message()
.queue_url(self.state.queue_url.clone())
.max_number_of_messages(10)
.max_number_of_messages(self.state.max_number_of_messages)
.visibility_timeout(self.state.visibility_timeout_secs)
.wait_time_seconds(self.state.poll_secs)
.send()
Expand Down
15 changes: 15 additions & 0 deletions website/cue/reference/components/sources/base/aws_s3.cue
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,21 @@ base: components: sources: aws_s3: configuration: {
required: false
type: bool: default: true
}
max_number_of_messages: {
description: """
Maximum number of messages to poll from SQS in a batch
Defaults to 10
Should be set to a smaller value when the files are large to help prevent the ingestion of
one file from causing the other files to exceed the visibility_timeout. Valid values are 1 - 10
"""
required: false
type: uint: {
default: 10
examples: [1]
}
}
poll_secs: {
description: """
How long to wait while polling the queue for new messages, in seconds.
Expand Down

0 comments on commit 153919d

Please sign in to comment.