Skip to content

Commit

Permalink
introduce retry parameters for publish-confirm
Browse files Browse the repository at this point in the history
Avoid reusing reconnect-attemps and reconnect interval to define
explicits new parameters for the publish mode

Fix #2726
  • Loading branch information
gcuisinier committed Aug 19, 2024
1 parent 353773d commit a8f7ea0
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@
@ConnectorAttribute(name = "default-routing-key", direction = OUTGOING, description = "The default routing key to use when sending messages to the exchange", type = "string", defaultValue = "")
@ConnectorAttribute(name = "default-ttl", direction = OUTGOING, description = "If specified, the time (ms) sent messages can remain in queues undelivered before they are dead", type = "long")
@ConnectorAttribute(name = "publish-confirms", direction = OUTGOING, description = "If set to true, published messages are acknowledged when the publish confirm is received from the broker", type = "boolean", defaultValue = "false")
@ConnectorAttribute(name = "retry-on-fail-attempts", direction = OUTGOING, description = "The number of tentative to retry on failure", type = "int", defaultValue = "6")
@ConnectorAttribute(name = "retry-on-fail-interval", direction = OUTGOING, description = "The interval (in seconds) between two sending attempts", type = "int", defaultValue = "5")

// Tracing
@ConnectorAttribute(name = "tracing.enabled", direction = INCOMING_AND_OUTGOING, description = "Whether tracing is enabled (default) or disabled", type = "boolean", defaultValue = "true")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,8 +261,8 @@ private Uni<Message<?>> send(
final Message<?> msg,
final String exchange,
final RabbitMQConnectorOutgoingConfiguration configuration) {
final int retryAttempts = configuration.getReconnectAttempts();
final int retryInterval = configuration.getReconnectInterval();
final int retryAttempts = configuration.getRetryOnFailAttempts();
final int retryInterval = configuration.getRetryOnFailInterval();
final String defaultRoutingKey = configuration.getDefaultRoutingKey();

final RabbitMQMessageConverter.OutgoingRabbitMQMessage outgoingRabbitMQMessage = RabbitMQMessageConverter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import io.smallrye.common.annotation.CheckReturnValue;
import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.config.ConfigProvider;
Expand Down Expand Up @@ -473,21 +474,83 @@ static class DeliveryTagInterceptor implements OutgoingInterceptor {

List<Long> deliveryTags = new CopyOnWriteArrayList<>();

List<Long> deliveryTagsNack = new CopyOnWriteArrayList<>();


@Override
public void onMessageAck(Message<?> message) {
message.getMetadata(OutgoingMessageMetadata.class).ifPresent(m -> deliveryTags.add((long) m.getResult()));
}

@Override
public void onMessageNack(Message<?> message, Throwable failure) {

message.getMetadata(OutgoingMessageMetadata.class).ifPresent(m -> deliveryTagsNack.add(((Integer) message.getPayload()).longValue()));
}

public List<Long> getDeliveryTags() {
return deliveryTags;
}

public List<Long> getDeliveryTagsNack() {
return deliveryTagsNack;
}

public int numberOfProcessedMessage(){
return deliveryTags.size() + deliveryTagsNack.size();
}
}

/**
* Verifies that messages can be sent to RabbitMQ with publish confirms.
*
* @throws InterruptedException
*/
@Test
void testSendingMessagesToRabbitMQPublishConfirmsWithNack() throws InterruptedException {
final String routingKey = "normal";

List<Long> receivedTags = new CopyOnWriteArrayList<>();
CountDownLatch latch = new CountDownLatch(10);
usage.prepareNackQueue(exchangeName, routingKey);/*, v -> {
receivedTags.add(v.envelope().getDeliveryTag());
latch.countDown();
});*/

weld.addBeanClasses(ProducingBean.class, DeliveryTagInterceptor.class);

new MapBasedConfig()
.put("mp.messaging.outgoing.sink.exchange.name", exchangeName)
.put("mp.messaging.outgoing.sink.exchange.declare", false)
.put("mp.messaging.outgoing.sink.default-routing-key", routingKey)
.put("mp.messaging.outgoing.sink.publish-confirms", true)
.put("mp.messaging.outgoing.sink.retry-on-fail-attempts", 0)
.put("mp.messaging.outgoing.sink.connector", RabbitMQConnector.CONNECTOR_NAME)
.put("mp.messaging.outgoing.sink.host", host)
.put("mp.messaging.outgoing.sink.port", port)
.put("mp.messaging.outgoing.sink.tracing.enabled", false)
.put("rabbitmq-username", username)
.put("rabbitmq-password", password)
.write();

container = weld.initialize();
await().until(() -> isRabbitMQConnectorAvailable(container));

DeliveryTagInterceptor interceptor = get(container, DeliveryTagInterceptor.class);
await().until(() ->
interceptor.numberOfProcessedMessage() == 10);



assertThat(interceptor.getDeliveryTags())
.hasSizeBetween(1,2);

assertThat(interceptor.getDeliveryTagsNack())
.hasSizeBetween(8,9);
}




/**
* Verifies that messages can be sent to RabbitMQ.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
import java.util.function.Consumer;
import java.util.function.Supplier;

import io.smallrye.common.annotation.CheckReturnValue;
import io.smallrye.mutiny.Uni;
import io.vertx.core.Future;
import org.jboss.logging.Logger;

import com.rabbitmq.client.AMQP;
Expand Down Expand Up @@ -157,6 +160,40 @@ public void consume(String exchange, String routingKey,
});
}

/**
* Use the supplied function to asynchronously consume messages from a queue.
*
* @param exchange the exchange
* @param routingKey the routing key
*/
public void prepareNackQueue(String exchange, String routingKey) {
final String queue = "tempConsumeMessagesNack";
// Start by the machinery to receive the messages
client.startAndAwait();
client.exchangeDeclareAndAwait(exchange, "topic", false, true);

JsonObject config = new JsonObject();
config.put("x-max-length", 1);
config.put("x-overflow", "reject-publish");

queueDeclareAndAwait(queue, false, false, true, config);

client.queueBindAndAwait(queue, exchange, routingKey);


}

public com.rabbitmq.client.AMQP.Queue.DeclareOk queueDeclareAndAwait(String queue, boolean durable, boolean exclusive, boolean autoDelete, JsonObject config) {
return (com.rabbitmq.client.AMQP.Queue.DeclareOk) queueDeclare(queue, durable, exclusive, autoDelete, config).await().indefinitely();
}

@CheckReturnValue
public io.smallrye.mutiny.Uni<com.rabbitmq.client.AMQP.Queue.DeclareOk> queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,JsonObject config) {
return io.smallrye.mutiny.vertx.AsyncResultUni.toUni(resultHandler -> {
client.getDelegate().queueDeclare(queue, durable, exclusive, autoDelete, config, resultHandler);
});
}

public void consumeIntegers(String exchange, String routingKey, Consumer<Integer> consumer) {
final String queue = "tempConsumeIntegers";
// Start by the machinery to receive the messages
Expand Down

0 comments on commit a8f7ea0

Please sign in to comment.