Skip to content

Commit

Permalink
Merge pull request #2741 from ozangunalp/mqtt_buffer_fix
Browse files Browse the repository at this point in the history
Fix mqtt buffer-size which is no longer applied correctly
  • Loading branch information
ozangunalp authored Aug 29, 2024
2 parents 233e114 + 971b48d commit 88148be
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public MqttSource(Vertx vertx, MqttConnectorIncomingConfiguration config,

this.source = holder.stream()
.select().where(m -> MqttTopicHelper.matches(topic, pattern, m))
.onOverflow().buffer(config.getBufferSize())
.emitOn(c -> VertxContext.runOnContext(root.getDelegate(), c))
.onItem().transform(m -> new ReceivingMqttMessage(m, onNack))
.stage(multi -> {
Expand All @@ -78,7 +79,6 @@ public MqttSource(Vertx vertx, MqttConnectorIncomingConfiguration config,

return multi;
})
.onOverflow().buffer(config.getBufferSize())
.onCancellation().call(() -> {
alive.set(false);
if (config.getUnsubscribeOnDisconnection())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import io.smallrye.common.annotation.CheckReturnValue;
import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.config.ConfigProvider;
Expand Down Expand Up @@ -476,15 +475,15 @@ static class DeliveryTagInterceptor implements OutgoingInterceptor {

List<Long> deliveryTagsNack = new CopyOnWriteArrayList<>();


@Override
public void onMessageAck(Message<?> message) {
message.getMetadata(OutgoingMessageMetadata.class).ifPresent(m -> deliveryTags.add((long) m.getResult()));
}

@Override
public void onMessageNack(Message<?> message, Throwable failure) {
message.getMetadata(OutgoingMessageMetadata.class).ifPresent(m -> deliveryTagsNack.add(((Integer) message.getPayload()).longValue()));
message.getMetadata(OutgoingMessageMetadata.class)
.ifPresent(m -> deliveryTagsNack.add(((Integer) message.getPayload()).longValue()));
}

public List<Long> getDeliveryTags() {
Expand All @@ -495,7 +494,7 @@ public List<Long> getDeliveryTagsNack() {
return deliveryTagsNack;
}

public int numberOfProcessedMessage(){
public int numberOfProcessedMessage() {
return deliveryTags.size() + deliveryTagsNack.size();
}
}
Expand All @@ -511,10 +510,12 @@ void testSendingMessagesToRabbitMQPublishConfirmsWithNack() throws InterruptedEx

List<Long> receivedTags = new CopyOnWriteArrayList<>();
CountDownLatch latch = new CountDownLatch(10);
usage.prepareNackQueue(exchangeName, routingKey);/*, v -> {
receivedTags.add(v.envelope().getDeliveryTag());
latch.countDown();
});*/
usage.prepareNackQueue(exchangeName, routingKey);/*
* , v -> {
* receivedTags.add(v.envelope().getDeliveryTag());
* latch.countDown();
* });
*/

weld.addBeanClasses(ProducingBean.class, DeliveryTagInterceptor.class);

Expand All @@ -536,21 +537,15 @@ void testSendingMessagesToRabbitMQPublishConfirmsWithNack() throws InterruptedEx
await().until(() -> isRabbitMQConnectorAvailable(container));

DeliveryTagInterceptor interceptor = get(container, DeliveryTagInterceptor.class);
await().until(() ->
interceptor.numberOfProcessedMessage() == 10);


await().until(() -> interceptor.numberOfProcessedMessage() == 10);

assertThat(interceptor.getDeliveryTags())
.hasSizeBetween(1,2);
.hasSizeBetween(1, 2);

assertThat(interceptor.getDeliveryTagsNack())
.hasSizeBetween(8,9);
.hasSizeBetween(8, 9);
}




/**
* Verifies that messages can be sent to RabbitMQ.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,12 @@
import java.util.function.Consumer;
import java.util.function.Supplier;

import io.smallrye.common.annotation.CheckReturnValue;
import io.smallrye.mutiny.Uni;
import io.vertx.core.Future;
import org.jboss.logging.Logger;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BasicProperties;

import io.smallrye.common.annotation.CheckReturnValue;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.mutiny.core.Vertx;
Expand Down Expand Up @@ -180,15 +178,16 @@ public void prepareNackQueue(String exchange, String routingKey) {

client.queueBindAndAwait(queue, exchange, routingKey);


}

public com.rabbitmq.client.AMQP.Queue.DeclareOk queueDeclareAndAwait(String queue, boolean durable, boolean exclusive, boolean autoDelete, JsonObject config) {
return (com.rabbitmq.client.AMQP.Queue.DeclareOk) queueDeclare(queue, durable, exclusive, autoDelete, config).await().indefinitely();
public com.rabbitmq.client.AMQP.Queue.DeclareOk queueDeclareAndAwait(String queue, boolean durable, boolean exclusive,
boolean autoDelete, JsonObject config) {
return queueDeclare(queue, durable, exclusive, autoDelete, config).await().indefinitely();
}

@CheckReturnValue
public io.smallrye.mutiny.Uni<com.rabbitmq.client.AMQP.Queue.DeclareOk> queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,JsonObject config) {
public io.smallrye.mutiny.Uni<com.rabbitmq.client.AMQP.Queue.DeclareOk> queueDeclare(String queue, boolean durable,
boolean exclusive, boolean autoDelete, JsonObject config) {
return io.smallrye.mutiny.vertx.AsyncResultUni.toUni(resultHandler -> {
client.getDelegate().queueDeclare(queue, durable, exclusive, autoDelete, config, resultHandler);
});
Expand Down

0 comments on commit 88148be

Please sign in to comment.