diff --git a/changelog.d/20172_max_number_of_messages.enhancement.md b/changelog.d/20172_max_number_of_messages.enhancement.md new file mode 100644 index 0000000000000..75f12b4e2d26c --- /dev/null +++ b/changelog.d/20172_max_number_of_messages.enhancement.md @@ -0,0 +1,3 @@ +Adds a `max_number_of_messages` to the sqs configuration of the `aws_s3` source + +authors: fdamstra diff --git a/src/sources/aws_s3/mod.rs b/src/sources/aws_s3/mod.rs index 4d6b83079fec4..33418a70c54f8 100644 --- a/src/sources/aws_s3/mod.rs +++ b/src/sources/aws_s3/mod.rs @@ -800,6 +800,7 @@ mod integration_tests { sqs: Some(sqs::Config { queue_url: queue_url.to_string(), poll_secs: 1, + max_number_of_messages: 10, visibility_timeout_secs: 0, client_concurrency: None, ..Default::default() diff --git a/src/sources/aws_s3/sqs.rs b/src/sources/aws_s3/sqs.rs index 3556700211277..47c91afb78e5d 100644 --- a/src/sources/aws_s3/sqs.rs +++ b/src/sources/aws_s3/sqs.rs @@ -122,6 +122,19 @@ pub(super) struct Config { #[configurable(metadata(docs::examples = 5))] pub(super) client_concurrency: Option, + /// Maximum number of messages to poll from SQS in a batch + /// + /// Defaults to 10 + /// + /// Should be set to a smaller value when the files are large to help prevent the ingestion of + /// one file from causing the other files to exceed the visibility_timeout. Valid values are 1 - 10 + // NOTE: We restrict this to u32 for safe conversion to i32 later. + #[serde(default = "default_max_number_of_messages")] + #[derivative(Default(value = "default_max_number_of_messages()"))] + #[configurable(metadata(docs::human_name = "Max Messages"))] + #[configurable(metadata(docs::examples = 1))] + pub(super) max_number_of_messages: u32, + #[configurable(derived)] #[serde(default)] #[derivative(Default)] @@ -136,6 +149,10 @@ const fn default_visibility_timeout_secs() -> u32 { 300 } +const fn default_max_number_of_messages() -> u32 { + 10 +} + const fn default_true() -> bool { true } @@ -147,6 +164,8 @@ pub(super) enum IngestorNewError { source: std::num::TryFromIntError, timeout: u64, }, + #[snafu(display("Invalid value for max_number_of_messages {}", messages))] + InvalidNumberOfMessages { messages: u32 }, } #[allow(clippy::large_enum_variant)] @@ -207,6 +226,7 @@ pub struct State { queue_url: String, poll_secs: i32, + max_number_of_messages: i32, client_concurrency: usize, visibility_timeout_secs: i32, delete_message: bool, @@ -228,6 +248,11 @@ impl Ingestor { multiline: Option, decoder: Decoder, ) -> Result { + if config.max_number_of_messages < 1 || config.max_number_of_messages > 10 { + return Err(IngestorNewError::InvalidNumberOfMessages { + messages: config.max_number_of_messages, + }); + } let state = Arc::new(State { region, @@ -239,6 +264,7 @@ impl Ingestor { queue_url: config.queue_url, poll_secs: config.poll_secs as i32, + max_number_of_messages: config.max_number_of_messages as i32, client_concurrency: config .client_concurrency .map(|n| n.get()) @@ -637,7 +663,7 @@ impl IngestorProcess { .sqs_client .receive_message() .queue_url(self.state.queue_url.clone()) - .max_number_of_messages(10) + .max_number_of_messages(self.state.max_number_of_messages) .visibility_timeout(self.state.visibility_timeout_secs) .wait_time_seconds(self.state.poll_secs) .send() diff --git a/website/cue/reference/components/sources/base/aws_s3.cue b/website/cue/reference/components/sources/base/aws_s3.cue index 6bb6087373d07..1965dcd213277 100644 --- a/website/cue/reference/components/sources/base/aws_s3.cue +++ b/website/cue/reference/components/sources/base/aws_s3.cue @@ -605,6 +605,21 @@ base: components: sources: aws_s3: configuration: { required: false type: bool: default: true } + max_number_of_messages: { + description: """ + Maximum number of messages to poll from SQS in a batch + + Defaults to 10 + + Should be set to a smaller value when the files are large to help prevent the ingestion of + one file from causing the other files to exceed the visibility_timeout. Valid values are 1 - 10 + """ + required: false + type: uint: { + default: 10 + examples: [1] + } + } poll_secs: { description: """ How long to wait while polling the queue for new messages, in seconds.