diff --git a/CHANGELOG.md b/CHANGELOG.md index 298fe8a..7fd65b5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -64,7 +64,7 @@ ### Features -* github actions ([0f3f860](https://github.com/jaconi-io/spring-rabbitmq-retry/commit/0f3f86048fa893b60f8cb8962a15b9c9a545627a)) +* GitHub actions ([0f3f860](https://github.com/jaconi-io/spring-rabbitmq-retry/commit/0f3f86048fa893b60f8cb8962a15b9c9a545627a)) * initial commit ([7b42aec](https://github.com/jaconi-io/spring-rabbitmq-retry/commit/7b42aec084fd4fd21904a935324d40d0b476916d)) * maven publish ([840108a](https://github.com/jaconi-io/spring-rabbitmq-retry/commit/840108ad21c1b904fcea555a70cbf3adbd33c351)) * retry per queue ([21ce36b](https://github.com/jaconi-io/spring-rabbitmq-retry/commit/21ce36b750a732239af36e18e0e20f264d434e1b)) diff --git a/README.md b/README.md index 5b0ebd4..83c6689 100644 --- a/README.md +++ b/README.md @@ -15,8 +15,8 @@ class MyListener { } ``` -By default, any exception thrown in the listener will immediately lead to the message being requeued. In many cases -exceptions are caused by either malformed messages or unavailable backends. In both cases, requeuing the message will +By default, any exception thrown in the listener will immediately lead to the message being re-queued. In many cases +exceptions are caused by either malformed messages or unavailable backends. In both cases, re-queuing the message will not help. If a backend is unavailable due to being overloaded, this behavior is harmful. A solution to this problem is exponential backoff. A message will not be retried immediately, but after some delay. For @@ -37,7 +37,7 @@ class MyListener { public void handle(Message msg) { try { processMessage(msg); - } catch (BackendTimeoutExceptoin e) { + } catch (BackendTimeoutException e) { // Backend will probably come back. Retry. throw new RetryMessagesException(msg); } catch (MalformedMessageException e) { @@ -88,12 +88,45 @@ jaconi: If you set `create-resources = true` you need to ensure that the RabbitMQ user that your application is using has the required permissions to declare (configure) the required queues. +## Manual Acknowledgement + +When dealing with situations where the retry error handler cannot be used (for example, when dealing with manual `ack` +and `nack`), the `RetryService` can be used directly: + +```java +class MyListener { + + @Autowired + private RetryService retryService; + + @RabbitListener(queues = "foo") + public void handle(Message msg, Channel ch) { + try { + processMessage(msg); + + // Acknowledge successfully processed messages. + ch.basicAck(message.getMessageProperties().getDeliveryTag(), false); + } catch (BackendTimeoutException e) { + // Backend will probably come back. Retry. + retryService.retry(msg); + + // Acknowledge messages scheduled for retry. + ch.basicAck(message.getMessageProperties().getDeliveryTag(), false); + } catch (MalformedMessageException e) { + // The message will not be fixed by retrying... + // Log and discard. + ch.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); + } + } +} +``` + ## Releasing Spring RabbitMQ Retry is published to the central maven repository. Usually, publishing happens automatically via GitHub Actions. However, if you are an employee of jaconi, you can also -publish releases manually. To publish a release, you will need to configure the GPG private signing key and the keys +publish releases manually. To publish a release, you will need to configure the GPG private signing key and the key's passphrase: ``` diff --git a/src/main/java/io/jaconi/spring/rabbitmq/retry/RetryAutoConfiguration.java b/src/main/java/io/jaconi/spring/rabbitmq/retry/RetryAutoConfiguration.java index bffbb97..7696274 100644 --- a/src/main/java/io/jaconi/spring/rabbitmq/retry/RetryAutoConfiguration.java +++ b/src/main/java/io/jaconi/spring/rabbitmq/retry/RetryAutoConfiguration.java @@ -16,7 +16,7 @@ @AutoConfiguration(after = RabbitAutoConfiguration.class) @ConditionalOnProperty(value = "jaconi.rabbitmq.listener.retry.enabled", havingValue = "true") @EnableConfigurationProperties(RetryProperties.class) -@Import({RetryErrorHandler.class, RetryResourceConfiguration.class}) +@Import({RetryService.class, RetryErrorHandler.class, RetryResourceConfiguration.class}) @RequiredArgsConstructor public class RetryAutoConfiguration { private static final String NOOP_LOGGER = "io.jaconi.spring.rabbitmq.retry.noop"; diff --git a/src/main/java/io/jaconi/spring/rabbitmq/retry/RetryErrorHandler.java b/src/main/java/io/jaconi/spring/rabbitmq/retry/RetryErrorHandler.java index 114145f..6565076 100644 --- a/src/main/java/io/jaconi/spring/rabbitmq/retry/RetryErrorHandler.java +++ b/src/main/java/io/jaconi/spring/rabbitmq/retry/RetryErrorHandler.java @@ -1,27 +1,22 @@ package io.jaconi.spring.rabbitmq.retry; -import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.ImmediateAcknowledgeAmqpException; -import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler; import org.springframework.amqp.rabbit.support.ListenerExecutionFailedException; -import org.springframework.amqp.support.AmqpHeaders; import org.springframework.lang.NonNull; -import org.springframework.messaging.Message; import org.springframework.stereotype.Component; /** - * Custom error handling for RabbitMQ. By default, Spring AMQP retries most exceptions by immediately requeuing the + * Custom error handling for RabbitMQ. By default, Spring AMQP retries most exceptions by immediately re-queuing the * causing message. This error handler rejects any messages causing an exception in the listener. If messages should be * retried, the listener code can throw a {@link RetryMessagesException}. The messages in the * {@link RetryMessagesException} are retried as configured in the {@link RetryProperties}. */ -@Slf4j @Component("retryErrorHandler") public class RetryErrorHandler extends ConditionalRejectingErrorHandler { - private final AmqpTemplate amqpTemplate; + private final RetryService retryService; - public RetryErrorHandler(AmqpTemplate amqpTemplate) { + public RetryErrorHandler(RetryService retryService) { super(new DefaultExceptionStrategy() { @Override protected boolean isUserCauseFatal(@NonNull Throwable cause) { @@ -30,54 +25,16 @@ protected boolean isUserCauseFatal(@NonNull Throwable cause) { } }); - this.amqpTemplate = amqpTemplate; + this.retryService = retryService; } @Override public void handleError(@NonNull Throwable t) { if (t instanceof ListenerExecutionFailedException lefe && lefe.getCause() instanceof RetryMessagesException rme) { - rme.getMessages().forEach(this::retryMessage); + rme.getMessages().forEach(retryService::retryMessage); throw new ImmediateAcknowledgeAmqpException("acknowledge messages as they were scheduled for retry", t); } else { super.handleError(t); } } - - private void retryMessage(Message message) { - var retry = getRetry(message); - log.info("retrying message (attempt {}): {}", retry, message); - - var routingKey = message.getHeaders().get(AmqpHeaders.RECEIVED_ROUTING_KEY, String.class); - amqpTemplate.convertAndSend(getRetryExchange(message), routingKey, message.getPayload(), m -> { - m.getMessageProperties().setHeader(RetryProperties.RETRY_HEADER, retry); - TechnicalHeadersFilter.filterHeaders(message.getHeaders()) - .forEach(h -> m.getMessageProperties().setHeader(h, message.getHeaders().get(h))); - return m; - }); - } - - /** - * Determine the retry attempt for the {@link Message}. - * - * @param message the {@link Message} - * @return {@literal 1L} for the first retry, {@literal 2L} for the second, and so on - */ - private long getRetry(Message message) { - Long previousRetryAttempt = message.getHeaders().get(RetryProperties.RETRY_HEADER, Long.class); - if (previousRetryAttempt == null) { - previousRetryAttempt = 0L; - } - - return previousRetryAttempt + 1; - } - - /** - * Determine the retry exchange for a {@link Message}. - * - * @param message the {@link Message} - * @return the retry exchange - */ - private String getRetryExchange(Message message) { - return RetryProperties.RETRY_EXCHANGE_PATTERN.formatted(message.getHeaders().get(AmqpHeaders.CONSUMER_QUEUE, String.class)); - } } diff --git a/src/main/java/io/jaconi/spring/rabbitmq/retry/RetryMessagesException.java b/src/main/java/io/jaconi/spring/rabbitmq/retry/RetryMessagesException.java index 29c6784..4106ed0 100644 --- a/src/main/java/io/jaconi/spring/rabbitmq/retry/RetryMessagesException.java +++ b/src/main/java/io/jaconi/spring/rabbitmq/retry/RetryMessagesException.java @@ -1,5 +1,6 @@ package io.jaconi.spring.rabbitmq.retry; +import lombok.Getter; import org.springframework.amqp.support.converter.MessagingMessageConverter; import org.springframework.messaging.Message; @@ -7,6 +8,7 @@ import java.util.Collection; import java.util.Collections; +@Getter public class RetryMessagesException extends RuntimeException { private static final MessagingMessageConverter CONVERTER = new MessagingMessageConverter(); private final Collection> messages; @@ -70,8 +72,4 @@ public RetryMessagesException(String message, Throwable cause, Collection(); this.messages.addAll(messages); } - - public Collection> getMessages() { - return messages; - } } diff --git a/src/main/java/io/jaconi/spring/rabbitmq/retry/RetryService.java b/src/main/java/io/jaconi/spring/rabbitmq/retry/RetryService.java new file mode 100644 index 0000000..385b98d --- /dev/null +++ b/src/main/java/io/jaconi/spring/rabbitmq/retry/RetryService.java @@ -0,0 +1,75 @@ +package io.jaconi.spring.rabbitmq.retry; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.core.AmqpTemplate; +import org.springframework.amqp.support.AmqpHeaders; +import org.springframework.amqp.support.converter.MessagingMessageConverter; +import org.springframework.messaging.Message; +import org.springframework.stereotype.Component; + +/** + * Retry AMQP messages as configured in the {@link RetryProperties}. + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class RetryService { + private static final MessagingMessageConverter CONVERTER = new MessagingMessageConverter(); + + private final AmqpTemplate amqpTemplate; + + /** + * Retry a {@link org.springframework.amqp.core.Message} by increasing the retry attempt in the message header and + * sending the message to the retry exchange. + * + * @param message the {@link org.springframework.amqp.core.Message} + */ + public void retryMessage(org.springframework.amqp.core.Message message) { + retryMessage((Message) CONVERTER.fromMessage(message)); + } + + /** + * Retry a {@link Message} by increasing the retry attempt in the message header and sending the message to the + * retry exchange. + * + * @param message the {@link Message} + */ + public void retryMessage(Message message) { + var retry = getRetry(message); + log.info("retrying message (attempt {}): {}", retry, message); + + var routingKey = message.getHeaders().get(AmqpHeaders.RECEIVED_ROUTING_KEY, String.class); + amqpTemplate.convertAndSend(getRetryExchange(message), routingKey, message.getPayload(), m -> { + m.getMessageProperties().setHeader(RetryProperties.RETRY_HEADER, retry); + TechnicalHeadersFilter.filterHeaders(message.getHeaders()) + .forEach(h -> m.getMessageProperties().setHeader(h, message.getHeaders().get(h))); + return m; + }); + } + + /** + * Determine the retry attempt for the {@link Message}. + * + * @param message the {@link Message} + * @return {@literal 1L} for the first retry, {@literal 2L} for the second, and so on + */ + private long getRetry(Message message) { + Long previousRetryAttempt = message.getHeaders().get(RetryProperties.RETRY_HEADER, Long.class); + if (previousRetryAttempt == null) { + previousRetryAttempt = 0L; + } + + return previousRetryAttempt + 1; + } + + /** + * Determine the retry exchange for a {@link Message}. + * + * @param message the {@link Message} + * @return the retry exchange + */ + private String getRetryExchange(Message message) { + return RetryProperties.RETRY_EXCHANGE_PATTERN.formatted(message.getHeaders().get(AmqpHeaders.CONSUMER_QUEUE, String.class)); + } +} diff --git a/src/test/java/io/jaconi/spring/rabbitmq/retry/RabbitMQTest.java b/src/test/java/io/jaconi/spring/rabbitmq/retry/RabbitMQTest.java new file mode 100644 index 0000000..0e82bd4 --- /dev/null +++ b/src/test/java/io/jaconi/spring/rabbitmq/retry/RabbitMQTest.java @@ -0,0 +1,71 @@ +package io.jaconi.spring.rabbitmq.retry; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.springframework.amqp.core.*; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.Bean; +import org.springframework.test.context.DynamicPropertyRegistry; +import org.springframework.test.context.DynamicPropertySource; +import org.testcontainers.containers.RabbitMQContainer; + +@SpringBootTest(properties = { + "jaconi.rabbitmq.listener.retry.enabled=true", + "jaconi.rabbitmq.listener.retry.create-resources=true" +}) +abstract class RabbitMQTest { + protected static final String EXCHANGE = "test-exchange"; + protected static final String QUEUE = "test-queue"; + protected static final String ROUTING_KEY = "foo"; + + static final RabbitMQContainer rabbit = new RabbitMQContainer("rabbitmq:3.13-management-alpine"); + + @Autowired + @SuppressWarnings("unused") + protected AmqpTemplate amqpTemplate; + + @BeforeAll + static void beforeAll() { + rabbit.start(); + } + + @AfterAll + static void afterAll() { + rabbit.stop(); + } + + @DynamicPropertySource + @SuppressWarnings("unused") + static void configureProperties(DynamicPropertyRegistry registry) { + registry.add("spring.rabbitmq.addresses", rabbit::getAmqpUrl); + + registry.add("jaconi.rabbitmq.listener.retry.queues.%s.max-attempts".formatted(QUEUE), () -> 2); + registry.add("jaconi.rabbitmq.listener.retry.queues.%s.durations[0]".formatted(QUEUE), () -> "5s"); + registry.add("jaconi.rabbitmq.listener.retry.queues.%s.durations[1]".formatted(QUEUE), () -> "10s"); + } + + @SpringBootApplication + protected abstract static class RabbitMQTestApplication { + + @Bean(QUEUE) + @SuppressWarnings("unused") + Queue queue() { + return new Queue(QUEUE); + } + + @Bean(EXCHANGE) + @SuppressWarnings("unused") + DirectExchange exchange() { + return new DirectExchange(EXCHANGE); + } + + @Bean + @SuppressWarnings("unused") + Binding binding(@Qualifier(QUEUE) Queue queue, @Qualifier(EXCHANGE) Exchange exchange) { + return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY).noargs(); + } + } +} diff --git a/src/test/java/io/jaconi/spring/rabbitmq/retry/RetryAutoConfigurationTest.java b/src/test/java/io/jaconi/spring/rabbitmq/retry/RetryAutoConfigurationTest.java index 688825d..13ad43a 100644 --- a/src/test/java/io/jaconi/spring/rabbitmq/retry/RetryAutoConfigurationTest.java +++ b/src/test/java/io/jaconi/spring/rabbitmq/retry/RetryAutoConfigurationTest.java @@ -1,7 +1,6 @@ package io.jaconi.spring.rabbitmq.retry; import org.junit.jupiter.api.Test; -import org.springframework.amqp.rabbit.config.ContainerCustomizer; import org.springframework.boot.autoconfigure.AutoConfigurations; import org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration; import org.springframework.boot.test.context.runner.ApplicationContextRunner; @@ -38,10 +37,6 @@ public void testAutoConfiguration_enabled() { .run(context -> { assertThat(context).hasNotFailed(); assertThat(context).hasBean("retryContainerCustomizer"); - - context.getBean(ContainerCustomizer.class); - - }); } } diff --git a/src/test/java/io/jaconi/spring/rabbitmq/retry/RetryErrorHandlerTest.java b/src/test/java/io/jaconi/spring/rabbitmq/retry/RetryErrorHandlerTest.java index fb3aa81..9957e93 100644 --- a/src/test/java/io/jaconi/spring/rabbitmq/retry/RetryErrorHandlerTest.java +++ b/src/test/java/io/jaconi/spring/rabbitmq/retry/RetryErrorHandlerTest.java @@ -1,57 +1,14 @@ package io.jaconi.spring.rabbitmq.retry; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -import org.springframework.amqp.core.*; +import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.context.annotation.Bean; -import org.springframework.test.context.DynamicPropertyRegistry; -import org.springframework.test.context.DynamicPropertySource; -import org.testcontainers.containers.RabbitMQContainer; import java.time.Duration; import static org.assertj.core.api.Assertions.assertThat; -@SpringBootTest(properties = { - "jaconi.rabbitmq.listener.retry.enabled=true", - "jaconi.rabbitmq.listener.retry.create-resources=true" -}) -class RetryErrorHandlerTest { - public static final String EXCHANGE = "test-exchange"; - public static final String QUEUE = "test-queue"; - public static final String ROUTING_KEY = "foo"; - - static final RabbitMQContainer rabbit = new RabbitMQContainer("rabbitmq:3.13-management-alpine"); - - @Autowired - @SuppressWarnings("unused") - private AmqpTemplate amqpTemplate; - - @BeforeAll - static void beforeAll() { - rabbit.start(); - } - - @AfterAll - static void afterAll() { - rabbit.stop(); - } - - @DynamicPropertySource - @SuppressWarnings("unused") - static void configureProperties(DynamicPropertyRegistry registry) { - registry.add("spring.rabbitmq.addresses", rabbit::getAmqpUrl); - - registry.add("jaconi.rabbitmq.listener.retry.queues.%s.max-attempts".formatted(QUEUE), () -> 2); - registry.add("jaconi.rabbitmq.listener.retry.queues.%s.durations[0]".formatted(QUEUE), () -> "5s"); - registry.add("jaconi.rabbitmq.listener.retry.queues.%s.durations[1]".formatted(QUEUE), () -> "10s"); - } +class RetryErrorHandlerTest extends RabbitMQTest { @Test void handleError1() { @@ -89,29 +46,9 @@ void handleError3() { assertThat(msg).isEqualTo(payload); } - @SpringBootApplication @SuppressWarnings("unused") - public static class TestApplication { - - @Bean(QUEUE) - @SuppressWarnings("unused") - Queue queue() { - return new Queue(QUEUE); - } - - @Bean(EXCHANGE) - @SuppressWarnings("unused") - DirectExchange exchange() { - return new DirectExchange(EXCHANGE); - } - - @Bean - @SuppressWarnings("unused") - Binding binding(@Qualifier(QUEUE) Queue queue, @Qualifier(EXCHANGE) Exchange exchange) { - return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY).noargs(); - } + public static class TestApplication extends RabbitMQTest.RabbitMQTestApplication { - @SuppressWarnings("unused") @RabbitListener(queues = QUEUE) public void handle(Message message) { throw new RetryMessagesException(message); diff --git a/src/test/java/io/jaconi/spring/rabbitmq/retry/RetryServiceTest.java b/src/test/java/io/jaconi/spring/rabbitmq/retry/RetryServiceTest.java new file mode 100644 index 0000000..2238aa4 --- /dev/null +++ b/src/test/java/io/jaconi/spring/rabbitmq/retry/RetryServiceTest.java @@ -0,0 +1,79 @@ +package io.jaconi.spring.rabbitmq.retry; + +import com.rabbitmq.client.Channel; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.springframework.amqp.core.*; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; +import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.Bean; +import org.springframework.test.context.DynamicPropertyRegistry; +import org.springframework.test.context.DynamicPropertySource; +import org.testcontainers.containers.RabbitMQContainer; + +import java.io.IOException; +import java.time.Duration; + +import static org.assertj.core.api.Assertions.assertThat; + +class RetryServiceTest extends RabbitMQTest { + + @Test + void testFirstRetry() { + var payload = "Test Message 1"; + + // Send a message. Our test application will always throw a retry exception. + amqpTemplate.convertAndSend(EXCHANGE, ROUTING_KEY, payload); + + // Make sure the message is sent to the first retry queue within 5 seconds. + var msg = amqpTemplate.receiveAndConvert("%s-retry-in-PT5S".formatted(QUEUE), Duration.ofSeconds(5).toMillis()); + assertThat(msg).isEqualTo(payload); + } + + @Test + void testSecondRetry() { + var payload = "Test Message 2"; + + // Send a message. Our test application will always throw a retry exception. + amqpTemplate.convertAndSend(EXCHANGE, ROUTING_KEY, payload); + + // Make sure the message is sent to the second retry queue within 10 seconds. + var msg = amqpTemplate.receiveAndConvert("%s-retry-in-PT10S".formatted(QUEUE), Duration.ofSeconds(10).toMillis()); + assertThat(msg).isEqualTo(payload); + } + + @Test + void testThirdRetry() { + var payload = "Test Message 3"; + + // Send a message. Our test application will always throw a retry exception. + amqpTemplate.convertAndSend(EXCHANGE, ROUTING_KEY, payload); + + // Make sure the message is sent to the second retry queue within 20 seconds. + var msg = amqpTemplate.receiveAndConvert("%s-retry-never".formatted(QUEUE), Duration.ofSeconds(20).toMillis()); + assertThat(msg).isEqualTo(payload); + } + + @SuppressWarnings("unused") + public static class TestApplication extends RabbitMQTestApplication { + + @Autowired + @SuppressWarnings("unused") + private RetryService retryService; + + @RabbitListener(queues = QUEUE) + public void handle(Message message, Channel channel) throws IOException { + retryService.retryMessage(message); + + if (channel != null) { + channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); + } + } + } +}