From 971b48d2eeb0419e81751ad6418c0ecde265087a Mon Sep 17 00:00:00 2001 From: Ozan Gunalp Date: Thu, 29 Aug 2024 13:30:57 +0200 Subject: [PATCH] Fix mqtt buffer-size which is no longer applied correctly --- .../reactive/messaging/mqtt/MqttSource.java | 2 +- .../messaging/rabbitmq/RabbitMQTest.java | 29 ++++++++----------- .../messaging/rabbitmq/RabbitMQUsage.java | 13 ++++----- 3 files changed, 19 insertions(+), 25 deletions(-) diff --git a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSource.java b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSource.java index aa9d3121a..fb4c33bfe 100644 --- a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSource.java +++ b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSource.java @@ -70,6 +70,7 @@ public MqttSource(Vertx vertx, MqttConnectorIncomingConfiguration config, this.source = holder.stream() .select().where(m -> MqttTopicHelper.matches(topic, pattern, m)) + .onOverflow().buffer(config.getBufferSize()) .emitOn(c -> VertxContext.runOnContext(root.getDelegate(), c)) .onItem().transform(m -> new ReceivingMqttMessage(m, onNack)) .stage(multi -> { @@ -78,7 +79,6 @@ public MqttSource(Vertx vertx, MqttConnectorIncomingConfiguration config, return multi; }) - .onOverflow().buffer(config.getBufferSize()) .onCancellation().call(() -> { alive.set(false); if (config.getUnsubscribeOnDisconnection()) diff --git a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQTest.java b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQTest.java index b4379bfe2..b79c77c99 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQTest.java +++ b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQTest.java @@ -11,7 +11,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; -import io.smallrye.common.annotation.CheckReturnValue; import jakarta.enterprise.context.ApplicationScoped; import org.eclipse.microprofile.config.ConfigProvider; @@ -476,7 +475,6 @@ static class DeliveryTagInterceptor implements OutgoingInterceptor { List deliveryTagsNack = new CopyOnWriteArrayList<>(); - @Override public void onMessageAck(Message message) { message.getMetadata(OutgoingMessageMetadata.class).ifPresent(m -> deliveryTags.add((long) m.getResult())); @@ -484,7 +482,8 @@ public void onMessageAck(Message message) { @Override public void onMessageNack(Message message, Throwable failure) { - message.getMetadata(OutgoingMessageMetadata.class).ifPresent(m -> deliveryTagsNack.add(((Integer) message.getPayload()).longValue())); + message.getMetadata(OutgoingMessageMetadata.class) + .ifPresent(m -> deliveryTagsNack.add(((Integer) message.getPayload()).longValue())); } public List getDeliveryTags() { @@ -495,7 +494,7 @@ public List getDeliveryTagsNack() { return deliveryTagsNack; } - public int numberOfProcessedMessage(){ + public int numberOfProcessedMessage() { return deliveryTags.size() + deliveryTagsNack.size(); } } @@ -511,10 +510,12 @@ void testSendingMessagesToRabbitMQPublishConfirmsWithNack() throws InterruptedEx List receivedTags = new CopyOnWriteArrayList<>(); CountDownLatch latch = new CountDownLatch(10); - usage.prepareNackQueue(exchangeName, routingKey);/*, v -> { - receivedTags.add(v.envelope().getDeliveryTag()); - latch.countDown(); - });*/ + usage.prepareNackQueue(exchangeName, routingKey);/* + * , v -> { + * receivedTags.add(v.envelope().getDeliveryTag()); + * latch.countDown(); + * }); + */ weld.addBeanClasses(ProducingBean.class, DeliveryTagInterceptor.class); @@ -536,21 +537,15 @@ void testSendingMessagesToRabbitMQPublishConfirmsWithNack() throws InterruptedEx await().until(() -> isRabbitMQConnectorAvailable(container)); DeliveryTagInterceptor interceptor = get(container, DeliveryTagInterceptor.class); - await().until(() -> - interceptor.numberOfProcessedMessage() == 10); - - + await().until(() -> interceptor.numberOfProcessedMessage() == 10); assertThat(interceptor.getDeliveryTags()) - .hasSizeBetween(1,2); + .hasSizeBetween(1, 2); assertThat(interceptor.getDeliveryTagsNack()) - .hasSizeBetween(8,9); + .hasSizeBetween(8, 9); } - - - /** * Verifies that messages can be sent to RabbitMQ. * diff --git a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQUsage.java b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQUsage.java index e467886ec..d98fa691f 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQUsage.java +++ b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQUsage.java @@ -25,14 +25,12 @@ import java.util.function.Consumer; import java.util.function.Supplier; -import io.smallrye.common.annotation.CheckReturnValue; -import io.smallrye.mutiny.Uni; -import io.vertx.core.Future; import org.jboss.logging.Logger; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.BasicProperties; +import io.smallrye.common.annotation.CheckReturnValue; import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; import io.vertx.mutiny.core.Vertx; @@ -180,15 +178,16 @@ public void prepareNackQueue(String exchange, String routingKey) { client.queueBindAndAwait(queue, exchange, routingKey); - } - public com.rabbitmq.client.AMQP.Queue.DeclareOk queueDeclareAndAwait(String queue, boolean durable, boolean exclusive, boolean autoDelete, JsonObject config) { - return (com.rabbitmq.client.AMQP.Queue.DeclareOk) queueDeclare(queue, durable, exclusive, autoDelete, config).await().indefinitely(); + public com.rabbitmq.client.AMQP.Queue.DeclareOk queueDeclareAndAwait(String queue, boolean durable, boolean exclusive, + boolean autoDelete, JsonObject config) { + return queueDeclare(queue, durable, exclusive, autoDelete, config).await().indefinitely(); } @CheckReturnValue - public io.smallrye.mutiny.Uni queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,JsonObject config) { + public io.smallrye.mutiny.Uni queueDeclare(String queue, boolean durable, + boolean exclusive, boolean autoDelete, JsonObject config) { return io.smallrye.mutiny.vertx.AsyncResultUni.toUni(resultHandler -> { client.getDelegate().queueDeclare(queue, durable, exclusive, autoDelete, config, resultHandler); });