From a8f7ea0c697b28b02b6dee506a1d1693822cf1f0 Mon Sep 17 00:00:00 2001 From: Gildas Cuisinier Date: Mon, 19 Aug 2024 10:18:03 +0200 Subject: [PATCH] introduce retry parameters for publish-confirm Avoid reusing reconnect-attemps and reconnect interval to define explicits new parameters for the publish mode Fix #2726 --- .../messaging/rabbitmq/RabbitMQConnector.java | 2 + .../internals/RabbitMQMessageSender.java | 4 +- .../messaging/rabbitmq/RabbitMQTest.java | 65 ++++++++++++++++++- .../messaging/rabbitmq/RabbitMQUsage.java | 37 +++++++++++ 4 files changed, 105 insertions(+), 3 deletions(-) diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQConnector.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQConnector.java index cf14e77e91..70fb99e1c0 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQConnector.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQConnector.java @@ -132,6 +132,8 @@ @ConnectorAttribute(name = "default-routing-key", direction = OUTGOING, description = "The default routing key to use when sending messages to the exchange", type = "string", defaultValue = "") @ConnectorAttribute(name = "default-ttl", direction = OUTGOING, description = "If specified, the time (ms) sent messages can remain in queues undelivered before they are dead", type = "long") @ConnectorAttribute(name = "publish-confirms", direction = OUTGOING, description = "If set to true, published messages are acknowledged when the publish confirm is received from the broker", type = "boolean", defaultValue = "false") +@ConnectorAttribute(name = "retry-on-fail-attempts", direction = OUTGOING, description = "The number of tentative to retry on failure", type = "int", defaultValue = "6") +@ConnectorAttribute(name = "retry-on-fail-interval", direction = OUTGOING, description = "The interval (in seconds) between two sending attempts", type = "int", defaultValue = "5") // Tracing @ConnectorAttribute(name = "tracing.enabled", direction = INCOMING_AND_OUTGOING, description = "Whether tracing is enabled (default) or disabled", type = "boolean", defaultValue = "true") diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/RabbitMQMessageSender.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/RabbitMQMessageSender.java index 96fa2b1050..1ccb9bb99d 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/RabbitMQMessageSender.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/RabbitMQMessageSender.java @@ -261,8 +261,8 @@ private Uni> send( final Message msg, final String exchange, final RabbitMQConnectorOutgoingConfiguration configuration) { - final int retryAttempts = configuration.getReconnectAttempts(); - final int retryInterval = configuration.getReconnectInterval(); + final int retryAttempts = configuration.getRetryOnFailAttempts(); + final int retryInterval = configuration.getRetryOnFailInterval(); final String defaultRoutingKey = configuration.getDefaultRoutingKey(); final RabbitMQMessageConverter.OutgoingRabbitMQMessage outgoingRabbitMQMessage = RabbitMQMessageConverter diff --git a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQTest.java b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQTest.java index 9bf159f181..b4379bfe25 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQTest.java +++ b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQTest.java @@ -11,6 +11,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; +import io.smallrye.common.annotation.CheckReturnValue; import jakarta.enterprise.context.ApplicationScoped; import org.eclipse.microprofile.config.ConfigProvider; @@ -473,6 +474,9 @@ static class DeliveryTagInterceptor implements OutgoingInterceptor { List deliveryTags = new CopyOnWriteArrayList<>(); + List deliveryTagsNack = new CopyOnWriteArrayList<>(); + + @Override public void onMessageAck(Message message) { message.getMetadata(OutgoingMessageMetadata.class).ifPresent(m -> deliveryTags.add((long) m.getResult())); @@ -480,14 +484,73 @@ public void onMessageAck(Message message) { @Override public void onMessageNack(Message message, Throwable failure) { - + message.getMetadata(OutgoingMessageMetadata.class).ifPresent(m -> deliveryTagsNack.add(((Integer) message.getPayload()).longValue())); } public List getDeliveryTags() { return deliveryTags; } + + public List getDeliveryTagsNack() { + return deliveryTagsNack; + } + + public int numberOfProcessedMessage(){ + return deliveryTags.size() + deliveryTagsNack.size(); + } } + /** + * Verifies that messages can be sent to RabbitMQ with publish confirms. + * + * @throws InterruptedException + */ + @Test + void testSendingMessagesToRabbitMQPublishConfirmsWithNack() throws InterruptedException { + final String routingKey = "normal"; + + List receivedTags = new CopyOnWriteArrayList<>(); + CountDownLatch latch = new CountDownLatch(10); + usage.prepareNackQueue(exchangeName, routingKey);/*, v -> { + receivedTags.add(v.envelope().getDeliveryTag()); + latch.countDown(); + });*/ + + weld.addBeanClasses(ProducingBean.class, DeliveryTagInterceptor.class); + + new MapBasedConfig() + .put("mp.messaging.outgoing.sink.exchange.name", exchangeName) + .put("mp.messaging.outgoing.sink.exchange.declare", false) + .put("mp.messaging.outgoing.sink.default-routing-key", routingKey) + .put("mp.messaging.outgoing.sink.publish-confirms", true) + .put("mp.messaging.outgoing.sink.retry-on-fail-attempts", 0) + .put("mp.messaging.outgoing.sink.connector", RabbitMQConnector.CONNECTOR_NAME) + .put("mp.messaging.outgoing.sink.host", host) + .put("mp.messaging.outgoing.sink.port", port) + .put("mp.messaging.outgoing.sink.tracing.enabled", false) + .put("rabbitmq-username", username) + .put("rabbitmq-password", password) + .write(); + + container = weld.initialize(); + await().until(() -> isRabbitMQConnectorAvailable(container)); + + DeliveryTagInterceptor interceptor = get(container, DeliveryTagInterceptor.class); + await().until(() -> + interceptor.numberOfProcessedMessage() == 10); + + + + assertThat(interceptor.getDeliveryTags()) + .hasSizeBetween(1,2); + + assertThat(interceptor.getDeliveryTagsNack()) + .hasSizeBetween(8,9); + } + + + + /** * Verifies that messages can be sent to RabbitMQ. * diff --git a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQUsage.java b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQUsage.java index 1a86c93515..e467886ece 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQUsage.java +++ b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQUsage.java @@ -25,6 +25,9 @@ import java.util.function.Consumer; import java.util.function.Supplier; +import io.smallrye.common.annotation.CheckReturnValue; +import io.smallrye.mutiny.Uni; +import io.vertx.core.Future; import org.jboss.logging.Logger; import com.rabbitmq.client.AMQP; @@ -157,6 +160,40 @@ public void consume(String exchange, String routingKey, }); } + /** + * Use the supplied function to asynchronously consume messages from a queue. + * + * @param exchange the exchange + * @param routingKey the routing key + */ + public void prepareNackQueue(String exchange, String routingKey) { + final String queue = "tempConsumeMessagesNack"; + // Start by the machinery to receive the messages + client.startAndAwait(); + client.exchangeDeclareAndAwait(exchange, "topic", false, true); + + JsonObject config = new JsonObject(); + config.put("x-max-length", 1); + config.put("x-overflow", "reject-publish"); + + queueDeclareAndAwait(queue, false, false, true, config); + + client.queueBindAndAwait(queue, exchange, routingKey); + + + } + + public com.rabbitmq.client.AMQP.Queue.DeclareOk queueDeclareAndAwait(String queue, boolean durable, boolean exclusive, boolean autoDelete, JsonObject config) { + return (com.rabbitmq.client.AMQP.Queue.DeclareOk) queueDeclare(queue, durable, exclusive, autoDelete, config).await().indefinitely(); + } + + @CheckReturnValue + public io.smallrye.mutiny.Uni queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,JsonObject config) { + return io.smallrye.mutiny.vertx.AsyncResultUni.toUni(resultHandler -> { + client.getDelegate().queueDeclare(queue, durable, exclusive, autoDelete, config, resultHandler); + }); + } + public void consumeIntegers(String exchange, String routingKey, Consumer consumer) { final String queue = "tempConsumeIntegers"; // Start by the machinery to receive the messages