Skip to content

Commit 2151d3b

Browse files
authored
Add automatic SQS request batching support component: sqs (#1438)
1 parent fe9f347 commit 2151d3b

File tree

4 files changed

+900
-0
lines changed

4 files changed

+900
-0
lines changed

docs/src/main/asciidoc/sqs.adoc

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -806,6 +806,90 @@ NOTE: The same factory can be used to create both `single message` and `batch` c
806806

807807
IMPORTANT: In case the same factory is shared by both delivery methods, any supplied `ErrorHandler`, `MessageInterceptor` or `MessageListener` should implement the proper methods.
808808

809+
==== Automatic Request Batching with SqsAsyncBatchManager
810+
811+
Spring Cloud AWS allows you to leverage the AWS SDK's `SqsAsyncBatchManager` for automatic request batching. This feature can significantly improve performance and reduce costs by transparently combining multiple SQS API calls (`sendMessage`, `deleteMessage`, etc.) into single batch requests. Please note that this feature is primarily intended for use with `SqsTemplate` and is not recommended for use as a default `SqsAsyncClient` bean that could be injected into `@SqsListener` infrastructure.
812+
813+
IMPORTANT: This is different from the <<Batch Processing,Batch Processing>> feature for `@SqsListener`. Listener batch processing deals with handling multiple messages within a single listener invocation, whereas automatic request batching optimizes the underlying API calls to AWS.
814+
815+
===== Manual Configuration of the Batching Client
816+
817+
Since automatic batching is a powerful feature with specific trade-offs, Spring Cloud AWS does not auto-configure it. You can enable it by creating your own `SqsAsyncClient` bean using the provided `BatchingSqsClientAdapter`.
818+
819+
====== 1. Defining the Batching Client Bean
820+
The following example shows how to define a bean named `batchingSqsAsyncClient`. Notice the use of `@Qualifier("sqsAsyncClient")` in the method parameter. This is crucial to explicitly inject the standard, auto-configured `SqsAsyncClient` and avoid ambiguity.
821+
822+
[source,java]
823+
----
824+
@Configuration
825+
public class SqsBatchingConfiguration {
826+
827+
// Define a constant for the bean name to avoid typos
828+
public static final String BATCHING_SQS_ASYNC_CLIENT = "batchingSqsAsyncClient";
829+
830+
@Bean(name = BATCHING_SQS_ASYNC_CLIENT)
831+
public SqsAsyncClient batchingSqsAsyncClient(
832+
// Inject the standard, auto-configured client to wrap it
833+
@Qualifier("sqsAsyncClient") SqsAsyncClient standardSqsAsyncClient) {
834+
835+
ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(5);
836+
837+
SqsAsyncBatchManager batchManager = SqsAsyncBatchManager.builder()
838+
.sqsAsyncClient(standardSqsAsyncClient)
839+
.scheduledExecutor(scheduledExecutor)
840+
.build();
841+
842+
return new BatchingSqsClientAdapter(batchManager);
843+
}
844+
}
845+
----
846+
847+
====== 2. Using the Batching Client
848+
Now, use `@Qualifier` to inject your named bean. The most common use case is configuring a dedicated `SqsTemplate`.
849+
850+
[source,java]
851+
----
852+
@Service
853+
public class MyBatchingMessageService {
854+
855+
private final SqsTemplate batchingSqsTemplate;
856+
857+
public MyBatchingMessageService(
858+
@Qualifier(SqsBatchingConfiguration.BATCHING_SQS_ASYNC_CLIENT) SqsAsyncClient batchingClient) {
859+
this.batchingSqsTemplate = SqsTemplate.builder()
860+
.sqsAsyncClient(batchingClient)
861+
.build();
862+
}
863+
// ... service methods using batchingSqsTemplate
864+
}
865+
----
866+
867+
===== Important Considerations & Best Practices
868+
869+
WARNING: A call to `sqsTemplate.sendAsync(...)` returns a `CompletableFuture` that may complete successfully before the message is actually sent to AWS if the batch isn't yet flushed. The actual transmission happens later in a background thread, typically after the configured `sendRequestFrequency` interval. This can lead to **false positives** where the future appears to succeed prematurely. Always attach error handling to the `CompletableFuture` (e.g., via `.exceptionally()` or `.handle()`) to detect and handle real transmission failures. Note: Calling `.join()` on the future will block until the batch is flushed (after `sendRequestFrequency`), ensuring the operation is attempted, but still check for exceptions.
870+
871+
NOTE: The background buffering for `receiveMessage` operations is initialized **lazily**. It only starts after the first call to `sqsTemplate.receiveMessage(...)` is made on a specific queue URL. **This means that no background polling or resource consumption for receiving messages will occur until the application actively attempts to receive a message from that queue for the first time.**
872+
873+
[source,java]
874+
----
875+
CompletableFuture<SendResult<String>> future = batchingSqsTemplate.sendAsync(queueName, message);
876+
877+
future.whenComplete((result, ex) -> {
878+
if (ex != null) {
879+
// This is where you handle the actual transmission error
880+
log.error("Failed to send message to queue {}: {}", queueName, ex.getMessage());
881+
} else {
882+
log.info("Message acknowledged for batch sending with ID: {}", result.messageId());
883+
}
884+
});
885+
----
886+
887+
WARNING: **Not Recommended for `@SqsListener`**. While technically compatible, using this batching client with `@SqsListener` for receiving messages is **not recommended**. The `@SqsListener` infrastructure already performs efficient batch receiving and has a complex acknowledgment lifecycle. Adding another layer of asynchronous batching provides limited performance benefits while significantly increasing complexity. For listeners, it's best to rely on the default `SqsAsyncClient`.
888+
889+
IMPORTANT: **Bean Injection Safety**. By using a named bean and `@Qualifier` as shown in the configuration examples, you ensure the batching client is only used where intended. This prevents it from accidentally being injected into `@SqsListener` infrastructure, which should use the default `SqsAsyncClient`.
890+
891+
IMPORTANT: **AWS SDK Batching Bypass**. The `SqsAsyncBatchManager` will bypass batching for `receiveMessage` calls if certain parameters like `messageAttributeNames` are set on a per-request basis. To ensure batching works effectively, these should be configured globally on the `SqsAsyncBatchManager` builder, not on individual `receiveMessage` calls. See the `BatchingSqsClientAdapter` Javadoc for more details.
892+
809893
==== Container Options
810894

811895
Each `MessageListenerContainer` can have a different set of options.
Lines changed: 216 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,216 @@
1+
/*
2+
* Copyright 2013-2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.awspring.cloud.sqs.operations;
17+
18+
import java.util.concurrent.CompletableFuture;
19+
import java.util.function.Consumer;
20+
import org.springframework.util.Assert;
21+
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
22+
import software.amazon.awssdk.services.sqs.batchmanager.SqsAsyncBatchManager;
23+
import software.amazon.awssdk.services.sqs.model.*;
24+
25+
/**
26+
* An {@link SqsAsyncClient} adapter that provides automatic batching capabilities using AWS SDK's
27+
* {@link SqsAsyncBatchManager}.
28+
*
29+
* <p>
30+
* This adapter automatically batches SQS operations to improve performance and reduce costs by combining multiple
31+
* requests into fewer AWS API calls. All standard SQS operations are supported: send message, receive message, delete
32+
* message, and change message visibility.
33+
*
34+
* <p>
35+
* <strong>Important - Asynchronous Behavior:</strong> This adapter processes requests asynchronously through
36+
* batching. The returned {@link CompletableFuture} reflects the batching operation,
37+
* not the final transmission to AWS SQS. This can lead to false positives where the operation appears successful locally but fails during actual transmission.
38+
* The actual transmission happens in a background thread, up to the configured {@code sendRequestFrequency} after enqueuing.
39+
* Applications must:
40+
* <ul>
41+
* <li>Handle the returned {@link CompletableFuture} to detect transmission errors.
42+
* Calling {@code .join()} will block until the batch is sent (up to {@code sendRequestFrequency}),
43+
* while {@code .exceptionally()} or {@code .handle()} are required for non-blocking error handling.</li>
44+
* <li>Implement appropriate error handling, monitoring, and retry mechanisms for critical operations.</li>
45+
* </ul>
46+
*
47+
* <p>
48+
* <strong>Batch Optimization:</strong> The underlying {@code SqsAsyncBatchManager} from the AWS SDK bypasses batching
49+
* for {@code receiveMessage} calls that include per-request configurations for certain parameters. To ensure batching
50+
* is not bypassed, it is recommended to configure these settings globally on the {@code SqsAsyncBatchManager} builder
51+
* instead of on each {@code ReceiveMessageRequest}. The parameters that trigger this bypass are:
52+
* <ul>
53+
* <li>{@code messageAttributeNames}</li>
54+
* <li>{@code messageSystemAttributeNames}</li>
55+
* <li>{@code messageSystemAttributeNamesWithStrings}</li>
56+
* <li>{@code overrideConfiguration}</li>
57+
* </ul>
58+
* <p>
59+
* By configuring these globally on the manager, you ensure consistent batching performance. If you require per-request
60+
* attribute configurations, using the standard {@code SqsAsyncClient} without this adapter may be more appropriate.
61+
* @author Heechul Kang
62+
* @since 4.0.0
63+
* @see SqsAsyncBatchManager
64+
* @see SqsAsyncClient
65+
*/
66+
public class BatchingSqsClientAdapter implements SqsAsyncClient {
67+
private final SqsAsyncBatchManager batchManager;
68+
69+
/**
70+
* Creates a new {@code BatchingSqsClientAdapter} with the specified batch manager.
71+
*
72+
* @param batchManager the {@link SqsAsyncBatchManager} to use for batching operations
73+
* @throws IllegalArgumentException if batchManager is null
74+
*/
75+
public BatchingSqsClientAdapter(SqsAsyncBatchManager batchManager) {
76+
Assert.notNull(batchManager, "batchManager cannot be null");
77+
this.batchManager = batchManager;
78+
}
79+
80+
@Override
81+
public String serviceName() {
82+
return SqsAsyncClient.SERVICE_NAME;
83+
}
84+
85+
/**
86+
* Closes the underlying batch manager and releases associated resources.
87+
*
88+
* <p>
89+
* This method should be called when the adapter is no longer needed to ensure proper cleanup of threads and
90+
* connections.
91+
*/
92+
@Override
93+
public void close() {
94+
batchManager.close();
95+
}
96+
97+
/**
98+
* Sends a message to the specified SQS queue using automatic batching.
99+
*
100+
* <p>
101+
* <strong>Important:</strong> This method returns immediately, but the actual sending is performed asynchronously.
102+
* Handle the returned {@link CompletableFuture} to detect transmission errors.
103+
*
104+
* @param sendMessageRequest the request containing queue URL and message details
105+
* @return a {@link CompletableFuture} that completes with the send result
106+
*/
107+
@Override
108+
public CompletableFuture<SendMessageResponse> sendMessage(SendMessageRequest sendMessageRequest) {
109+
return batchManager.sendMessage(sendMessageRequest);
110+
}
111+
112+
/**
113+
* Sends a message to the specified SQS queue using automatic batching.
114+
*
115+
* <p>
116+
* <strong>Important:</strong> This method returns immediately, but the actual sending is performed asynchronously.
117+
* Handle the returned {@link CompletableFuture} to detect transmission errors.
118+
*
119+
* @param sendMessageRequest a consumer to configure the send message request
120+
* @return a {@link CompletableFuture} that completes with the send result
121+
*/
122+
@Override
123+
public CompletableFuture<SendMessageResponse> sendMessage(Consumer<SendMessageRequest.Builder> sendMessageRequest) {
124+
return batchManager.sendMessage(sendMessageRequest);
125+
}
126+
127+
/**
128+
* Receives messages from the specified SQS queue using automatic batching.
129+
*
130+
* <p>
131+
* The batching manager may combine multiple receive requests to optimize AWS API usage.
132+
*
133+
* @param receiveMessageRequest the request containing queue URL and receive options
134+
* @return a {@link CompletableFuture} that completes with the received messages
135+
*/
136+
@Override
137+
public CompletableFuture<ReceiveMessageResponse> receiveMessage(ReceiveMessageRequest receiveMessageRequest) {
138+
return batchManager.receiveMessage(receiveMessageRequest);
139+
}
140+
141+
/**
142+
* Receives messages from the specified SQS queue using automatic batching.
143+
*
144+
* <p>
145+
* The batching manager may combine multiple receive requests to optimize AWS API usage.
146+
*
147+
* @param receiveMessageRequest a consumer to configure the receive message request
148+
* @return a {@link CompletableFuture} that completes with the received messages
149+
*/
150+
@Override
151+
public CompletableFuture<ReceiveMessageResponse> receiveMessage(
152+
Consumer<ReceiveMessageRequest.Builder> receiveMessageRequest) {
153+
return batchManager.receiveMessage(receiveMessageRequest);
154+
}
155+
156+
/**
157+
* Deletes a message from the specified SQS queue using automatic batching.
158+
*
159+
* <p>
160+
* <strong>Important:</strong> The actual deletion may be delayed due to batching. Handle the returned
161+
* {@link CompletableFuture} to confirm successful deletion.
162+
*
163+
* @param deleteMessageRequest the request containing queue URL and receipt handle
164+
* @return a {@link CompletableFuture} that completes with the deletion result
165+
*/
166+
@Override
167+
public CompletableFuture<DeleteMessageResponse> deleteMessage(DeleteMessageRequest deleteMessageRequest) {
168+
return batchManager.deleteMessage(deleteMessageRequest);
169+
}
170+
171+
/**
172+
* Deletes a message from the specified SQS queue using automatic batching.
173+
*
174+
* <p>
175+
* <strong>Important:</strong> The actual deletion may be delayed due to batching. Handle the returned
176+
* {@link CompletableFuture} to confirm successful deletion.
177+
*
178+
* @param deleteMessageRequest a consumer to configure the delete message request
179+
* @return a {@link CompletableFuture} that completes with the deletion result
180+
*/
181+
@Override
182+
public CompletableFuture<DeleteMessageResponse> deleteMessage(
183+
Consumer<DeleteMessageRequest.Builder> deleteMessageRequest) {
184+
return batchManager.deleteMessage(deleteMessageRequest);
185+
}
186+
187+
/**
188+
* Changes the visibility timeout of a message in the specified SQS queue using automatic batching.
189+
*
190+
* <p>
191+
* The batching manager may combine multiple visibility change requests to optimize AWS API usage.
192+
*
193+
* @param changeMessageVisibilityRequest the request containing queue URL, receipt handle, and new timeout
194+
* @return a {@link CompletableFuture} that completes with the visibility change result
195+
*/
196+
@Override
197+
public CompletableFuture<ChangeMessageVisibilityResponse> changeMessageVisibility(
198+
ChangeMessageVisibilityRequest changeMessageVisibilityRequest) {
199+
return batchManager.changeMessageVisibility(changeMessageVisibilityRequest);
200+
}
201+
202+
/**
203+
* Changes the visibility timeout of a message in the specified SQS queue using automatic batching.
204+
*
205+
* <p>
206+
* The batching manager may combine multiple visibility change requests to optimize AWS API usage.
207+
*
208+
* @param changeMessageVisibilityRequest a consumer to configure the change visibility request
209+
* @return a {@link CompletableFuture} that completes with the visibility change result
210+
*/
211+
@Override
212+
public CompletableFuture<ChangeMessageVisibilityResponse> changeMessageVisibility(
213+
Consumer<ChangeMessageVisibilityRequest.Builder> changeMessageVisibilityRequest) {
214+
return batchManager.changeMessageVisibility(changeMessageVisibilityRequest);
215+
}
216+
}

0 commit comments

Comments
 (0)