Commit 0844419

Consolidate ExponentialErrorHandlers

* Introduce unified Jitter API (NONE, FULL, HALF) with builder `.jitter(Jitter)`
* HALF jitter now samples uniformly in [ceil(timeout/2), timeout]; FULL/NONE unchanged
* Centralize logic in ExponentialBackoffErrorHandler
* Update docs and tests (unit + integration)
1 parent 2ba6fd5 commit 0844419

8 files changed

+187
-466
lines changed

docs/src/main/asciidoc/sqs.adoc

Lines changed: 39 additions & 93 deletions
@@ -1348,9 +1348,8 @@ public SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFac
 ----
 
 ==== Exponential Backoff Error Handler
-This error handler implements an exponential backoff strategy for retrying failed SQS message processing.
-
-The backoff duration is computed using the `ApproximateReceiveCount` message attribute, applying an exponential function to determine the delay. Once calculated, the handler sets the message's visibility timeout to the computed backoff value, postponing the message's reprocessing accordingly.
+Implements exponential backoff for retrying failed SQS message processing. The base backoff is computed from `ApproximateReceiveCount` as
+`initialVisibilityTimeoutSeconds * multiplier^(receiveCount - 1)`, capped by `maxVisibilityTimeoutSeconds`.
 
 [cols="2,3,1,1"]
 |===
@@ -1362,7 +1361,7 @@ The backoff duration is computed using the `ApproximateReceiveCount` message att
 
 NOTE: The maximum visibility timeout allowed by SQS is 43200 seconds (12 hours). If the value provided to the `maxVisibilityTimeoutSeconds` parameter exceeds this limit, an `IllegalArgumentException` will be thrown.
 
-When using auto-configured factory, simply declare a `@Bean` and the error handler will be set
+When using the auto-configured factory, a single `@Bean` can be declared and the error handler will be set:
 
 [source, java]
 ----
@@ -1377,32 +1376,23 @@ public ExponentialBackoffErrorHandler<Object> asyncErrorHandler() {
 }
 ----
 
-Alternatively, `ExponentialBackoffErrorHandler` can be set in the `MessageListenerContainerFactory` or directly in the `MessageListenerContainer`:
+Alternatively, set it via `SqsMessageListenerContainerFactory#errorHandler(...)` or directly on a container if configuring programmatically.
 
-[source, java]
-----
-@Bean
-public SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFactory() {
-    return SqsMessageListenerContainerFactory
-        .builder()
-        .sqsAsyncClientSupplier(BaseSqsIntegrationTest::createAsyncClient)
-        .errorHandler(ExponentialBackoffErrorHandler
-            .builder()
-            .initialVisibilityTimeoutSeconds(1)
-            .multiplier(2)
-            .maxVisibilityTimeoutSeconds(10)
-            .build())
-        .build();
-}
-----
+===== Jitter
+Jitter randomizes the final visibility timeout derived from the exponential calculation to avoid synchronized retries ("thundering herd"). It can be configured on the builder via `jitter(...)`.
+
+`ExponentialBackoffErrorHandler` supports the following strategies:
+
+- `Jitter.NONE` (default): no randomization; uses the exact exponential timeout.
+- `Jitter.FULL`: picks a random value uniformly in [0, timeout].
+- `Jitter.HALF`: picks a random value uniformly in [ceil(timeout/2), timeout].
 
-==== Exponential Backoff Full Jitter Error Handler
-This error handler applies an exponential backoff strategy with *full jitter*
-when retrying failed message processing. After calculating the exponential
-visibility timeout using the `ApproximateReceiveCount` message attribute, a
-random value between zero and the computed timeout is selected. The selected
-value becomes the new visibility timeout, spreading retries and helping to
-avoid spikes caused by synchronized retries.
+The random source can be overridden with `randomSupplier(...)`.
+
+====== Full Jitter
+Applies exponential backoff with *full jitter* when retrying failed message processing. After calculating the exponential
+visibility timeout using the `ApproximateReceiveCount` message attribute, a random value between zero and the computed timeout is selected.
+The selected value becomes the new visibility timeout, spreading retries and helping to avoid spikes caused by synchronized retries.
 
 [cols="2,3,1,1"]
 |===
@@ -1421,47 +1411,24 @@ NOTE: The maximum visibility timeout allowed by SQS is 43200 seconds (12 hours).
 If the value provided to the `maxVisibilityTimeoutSeconds` parameter exceeds
 this limit, an `IllegalArgumentException` will be thrown.
 
-When using auto-configured factory, simply declare a `@Bean` and the error
-handler will be set
+To enable full jitter, `jitter(Jitter.FULL)` should be set on the builder:
 
 [source, java]
 ----
-@Bean
-public ExponentialBackoffErrorHandlerWithFullJitter<Object> asyncErrorHandler() {
-    return ExponentialBackoffErrorHandlerWithFullJitter
-        .builder()
-        .initialVisibilityTimeoutSeconds(1)
-        .multiplier(2)
-        .maxVisibilityTimeoutSeconds(10)
-        .build();
-}
+ExponentialBackoffErrorHandler<Object> handler = ExponentialBackoffErrorHandler
+    .builder()
+    .initialVisibilityTimeoutSeconds(1)
+    .multiplier(2)
+    .maxVisibilityTimeoutSeconds(10)
+    .jitter(Jitter.FULL)
+    .build();
 ----
 
-Alternatively, `ExponentialBackoffErrorHandlerWithFullJitter` can be set in the
-`MessageListenerContainerFactory` or directly in the `MessageListenerContainer`:
-
-[source, java]
-----
-@Bean
-public SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFactory() {
-    return SqsMessageListenerContainerFactory
-        .builder()
-        .sqsAsyncClientSupplier(BaseSqsIntegrationTest::createAsyncClient)
-        .errorHandler(ExponentialBackoffErrorHandlerWithFullJitter
-            .builder()
-            .initialVisibilityTimeoutSeconds(1)
-            .multiplier(2)
-            .maxVisibilityTimeoutSeconds(10)
-            .build())
-        .build();
-}
-----
+The handler can then be passed to a factory or container as needed.
 
-==== Exponential Backoff Half Jitter Error Handler
-This variant also computes the visibility timeout exponentially but applies
-*half jitter*. The exponential delay is halved and a random value between zero
-and this half is added to it. The resulting timeout is then used to change the
-message visibility.
+====== Half Jitter
+Computes the visibility timeout exponentially and applies *half jitter*. A random value is selected uniformly between ceil(timeout/2) and timeout.
+The selected value becomes the new visibility timeout, providing moderate randomization while maintaining a reasonable minimum delay.
 
 [cols="2,3,1,1"]
 |===
@@ -1480,41 +1447,20 @@ NOTE: The maximum visibility timeout allowed by SQS is 43200 seconds (12 hours).
 If the value provided to the `maxVisibilityTimeoutSeconds` parameter exceeds
 this limit, an `IllegalArgumentException` will be thrown.
 
-When using auto-configured factory, simply declare a `@Bean` and the error
-handler will be set
+To enable half jitter, `jitter(Jitter.HALF)` should be set on the builder:
 
 [source, java]
 ----
-@Bean
-public ExponentialBackoffErrorHandlerWithHalfJitter<Object> asyncErrorHandler() {
-    return ExponentialBackoffErrorHandlerWithHalfJitter
-        .builder()
-        .initialVisibilityTimeoutSeconds(1)
-        .multiplier(2)
-        .maxVisibilityTimeoutSeconds(10)
-        .build();
-}
+ExponentialBackoffErrorHandler<Object> handler = ExponentialBackoffErrorHandler
+    .builder()
+    .initialVisibilityTimeoutSeconds(1)
+    .multiplier(2)
+    .maxVisibilityTimeoutSeconds(10)
+    .jitter(Jitter.HALF)
+    .build();
 ----
 
-Alternatively, `ExponentialBackoffErrorHandlerWithHalfJitter` can be set in the
-`MessageListenerContainerFactory` or directly in the `MessageListenerContainer`:
-
-[source, java]
-----
-@Bean
-public SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFactory() {
-    return SqsMessageListenerContainerFactory
-        .builder()
-        .sqsAsyncClientSupplier(BaseSqsIntegrationTest::createAsyncClient)
-        .errorHandler(ExponentialBackoffErrorHandlerWithHalfJitter
-            .builder()
-            .initialVisibilityTimeoutSeconds(1)
-            .multiplier(2)
-            .maxVisibilityTimeoutSeconds(10)
-            .build())
-        .build();
-}
-----
+The handler can then be passed to a factory or container as needed.
 
 ==== Linear Backoff Error Handler
 `LinearBackoffErrorHandler` increases the visibility timeout linearly whenever a
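
The base backoff formula introduced in the `sqs.adoc` changes, `initialVisibilityTimeoutSeconds * multiplier^(receiveCount - 1)` capped by `maxVisibilityTimeoutSeconds`, can be illustrated with a small standalone sketch. The class `BackoffSketch` and its method `baseTimeout` are hypothetical names used only for illustration; the real computation lives in `ErrorHandlerVisibilityHelper.calculateVisibilityTimeoutExponentially`, whose body is not part of this diff, so the truncation to `int` below is an assumption rather than the library's confirmed behavior.

```java
// Hypothetical illustration of the documented formula; not the library's code.
final class BackoffSketch {

    /**
     * Computes initialSeconds * multiplier^(receiveCount - 1), capped at maxSeconds.
     * Assumption: fractional results are truncated toward zero.
     */
    static int baseTimeout(long receiveCount, int initialSeconds, double multiplier, int maxSeconds) {
        if (receiveCount < 1) {
            throw new IllegalArgumentException("receiveCount must be >= 1");
        }
        double raw = initialSeconds * Math.pow(multiplier, receiveCount - 1);
        // Capping before the cast keeps very large (or infinite) values within int range.
        return (int) Math.min(raw, maxSeconds);
    }

    public static void main(String[] args) {
        // Same settings as the documentation example: initial=1, multiplier=2, max=10.
        // Receive counts 1..6 give 1, 2, 4, 8, 10, 10 seconds.
        for (long count = 1; count <= 6; count++) {
            System.out.println("receiveCount=" + count + " -> " + baseTimeout(count, 1, 2.0, 10) + "s");
        }
    }
}
```

With the documentation's settings, the cap takes effect from the fifth delivery onward, since 2^4 = 16 already exceeds the 10 second maximum.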

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/errorhandler/ExponentialBackoffErrorHandler.java

Lines changed: 30 additions & 3 deletions
@@ -19,7 +19,10 @@
 import io.awspring.cloud.sqs.listener.BatchVisibility;
 import io.awspring.cloud.sqs.listener.Visibility;
 import java.util.Collection;
+import java.util.Random;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Supplier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.messaging.Message;
@@ -38,6 +41,8 @@
  *
  * @author Bruno Garcia
  * @author Rafael Pavarini
+ * @author Tomaz Fernandes
+ *
  */
 
 public class ExponentialBackoffErrorHandler<T> implements AsyncErrorHandler<T> {
@@ -46,12 +51,16 @@ public class ExponentialBackoffErrorHandler<T> implements AsyncErrorHandler<T> {
     private final int initialVisibilityTimeoutSeconds;
     private final double multiplier;
     private final int maxVisibilityTimeoutSeconds;
+    private final Supplier<Random> randomSupplier;
+    private final Jitter jitter;
 
     private ExponentialBackoffErrorHandler(int initialVisibilityTimeoutSeconds, double multiplier,
-            int maxVisibilityTimeoutSeconds) {
+            int maxVisibilityTimeoutSeconds, Supplier<Random> randomSupplier, Jitter jitter) {
         this.initialVisibilityTimeoutSeconds = initialVisibilityTimeoutSeconds;
         this.multiplier = multiplier;
         this.maxVisibilityTimeoutSeconds = maxVisibilityTimeoutSeconds;
+        this.randomSupplier = randomSupplier;
+        this.jitter = jitter;
     }
 
     @Override
@@ -104,8 +113,12 @@ private int calculateTimeout(Message<T> message) {
     }
 
     private int calculateTimeout(long receiveMessageCount) {
-        return ErrorHandlerVisibilityHelper.calculateVisibilityTimeoutExponentially(receiveMessageCount,
+        int timeout = ErrorHandlerVisibilityHelper.calculateVisibilityTimeoutExponentially(receiveMessageCount,
                 initialVisibilityTimeoutSeconds, multiplier, maxVisibilityTimeoutSeconds);
+        int jitteredTimeout = jitter.applyJitter(new Jitter.Context(timeout, randomSupplier));
+        logger.debug("Exponential backoff jitter applied: original={}, jittered={}, receiveCount={}, jitterType={}",
+                timeout, jitteredTimeout, receiveMessageCount, jitter.getClass().getSimpleName());
+        return jitteredTimeout;
     }
 
     public static <T> Builder<T> builder() {
@@ -117,6 +130,8 @@ public static class Builder<T> {
         private int initialVisibilityTimeoutSeconds = BackoffVisibilityConstants.DEFAULT_INITIAL_VISIBILITY_TIMEOUT_SECONDS;
         private double multiplier = BackoffVisibilityConstants.DEFAULT_MULTIPLIER;
         private int maxVisibilityTimeoutSeconds = BackoffVisibilityConstants.DEFAULT_MAX_VISIBILITY_TIMEOUT_SECONDS;
+        private Supplier<Random> randomSupplier = ThreadLocalRandom::current;
+        private Jitter jitter = Jitter.NONE;
 
         public Builder<T> initialVisibilityTimeoutSeconds(int initialVisibilityTimeoutSeconds) {
             ErrorHandlerVisibilityHelper.checkVisibilityTimeout(initialVisibilityTimeoutSeconds);
@@ -137,11 +152,23 @@ public Builder<T> maxVisibilityTimeoutSeconds(int maxVisibilityTimeoutSeconds) {
             return this;
         }
 
+        public Builder<T> randomSupplier(Supplier<Random> randomSupplier) {
+            Assert.notNull(randomSupplier, "randomSupplier cannot be null");
+            this.randomSupplier = randomSupplier;
+            return this;
+        }
+
+        public Builder<T> jitter(Jitter jitter) {
+            Assert.notNull(jitter, "jitter cannot be null");
+            this.jitter = jitter;
+            return this;
+        }
+
         public ExponentialBackoffErrorHandler<T> build() {
             Assert.isTrue(initialVisibilityTimeoutSeconds <= maxVisibilityTimeoutSeconds,
                     "Initial visibility timeout must not exceed max visibility timeout");
             return new ExponentialBackoffErrorHandler<>(initialVisibilityTimeoutSeconds, multiplier,
-                    maxVisibilityTimeoutSeconds);
+                    maxVisibilityTimeoutSeconds, randomSupplier, jitter);
         }
     }
 }
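
The `Jitter` type called from `calculateTimeout` is not among the hunks shown here, so its implementation cannot be confirmed from this page. As a rough sketch of the ranges the documentation describes (NONE returns the timeout unchanged, FULL samples uniformly in [0, timeout], HALF samples uniformly in [ceil(timeout/2), timeout]), the three strategies might look like the following. `JitterSketch` and its method names are hypothetical, and passing a `java.util.Random` directly is a simplification of the `Supplier<Random>` used by the handler.

```java
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical illustration of the documented jitter ranges; not the library's Jitter type.
final class JitterSketch {

    /** NONE: the exponential timeout is used as-is. */
    static int none(int timeout) {
        return timeout;
    }

    /** FULL: uniform integer in [0, timeout], both ends inclusive. */
    static int full(int timeout, Random random) {
        return random.nextInt(timeout + 1);
    }

    /** HALF: uniform integer in [ceil(timeout / 2), timeout], both ends inclusive. */
    static int half(int timeout, Random random) {
        int lower = (timeout + 1) / 2; // integer ceil(timeout / 2) for non-negative timeout
        return lower + random.nextInt(timeout - lower + 1);
    }

    public static void main(String[] args) {
        Random random = ThreadLocalRandom.current(); // same default source as the builder in the diff
        int timeout = 10;
        System.out.println("NONE: " + none(timeout));
        System.out.println("FULL: " + full(timeout, random));
        System.out.println("HALF: " + half(timeout, random));
    }
}
```

One consequence of the documented FULL range is that a value of 0 is possible, which would make the message visible again immediately; HALF avoids this by keeping at least half of the computed delay.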
