From b97df187d9bb1849d32d9d80ff8529cb28946807 Mon Sep 17 00:00:00 2001 From: Jiawen Wang <2876645134@qq.com> Date: Tue, 31 Dec 2024 14:18:32 +0800 Subject: [PATCH] [fix][broker] Fix incorrect blockedConsumerOnUnackedMsgs value when maxUnackedMessagesPerConsumer is 1 --- .../pulsar/broker/service/Consumer.java | 7 +++- .../broker/service/BrokerServiceTest.java | 41 +++++++++++++++++++ 2 files changed, 47 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index 0baba13cc0b07..d770e09ec46c0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -595,6 +595,7 @@ private CompletableFuture individualAckNormal(CommandAck ack, Map // consumer can start again consuming messages int unAckedMsgs = UNACKED_MESSAGES_UPDATER.get(ackOwnedConsumer); @@ -1087,7 +1093,6 @@ private boolean removePendingAcks(Consumer ackOwnedConsumer, Position position) ackOwnedConsumer.blockedConsumerOnUnackedMsgs = false; flowConsumerBlockedPermits(ackOwnedConsumer); } - return true; } public PendingAcksMap getPendingAcks() { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java index 89727014be99e..fa76fdd5bf45c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java @@ -1790,6 +1790,47 @@ public void testDuplicateAcknowledgement() throws Exception { .get("sub-1").getUnackedMessages(), 0); } + @Test + public void testBlockedConsumerOnUnackedMsgs() throws Exception { + final String ns = "prop/ns-test"; + admin.namespaces().createNamespace(ns, 2); + admin.namespaces().setMaxUnackedMessagesPerConsumer(ns, 1); + + final String topicName = "persistent://prop/ns-test/testBlockedConsumerOnUnackedMsgs"; + @Cleanup + Producer producer = pulsarClient.newProducer() + .topic(topicName) + .create(); + @Cleanup + Consumer consumer = pulsarClient.newConsumer() + .topic(topicName) + .subscriptionName("sub-test") + .acknowledgmentGroupTime(0, TimeUnit.SECONDS) + .subscriptionType(SubscriptionType.Shared) + .isAckReceiptEnabled(true) + .receiverQueueSize(0) + .subscribe(); + + producer.send("1".getBytes(StandardCharsets.UTF_8)); + producer.send("2".getBytes(StandardCharsets.UTF_8)); + + // 1. receive message + Message message = consumer.receive(); + Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT); + + SubscriptionStats subscriptionStats = admin.topics().getStats(topicName).getSubscriptions().get("sub-test"); + assertEquals(subscriptionStats.getUnackedMessages(), 1); + assertTrue(subscriptionStats.getConsumers().get(0).isBlockedConsumerOnUnackedMsgs()); + + // 2、ack this message + consumer.acknowledge(message); + Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT); + + subscriptionStats = admin.topics().getStats(topicName).getSubscriptions().get("sub-test"); + assertEquals(subscriptionStats.getUnackedMessages(), 0); + assertFalse(subscriptionStats.getConsumers().get(0).isBlockedConsumerOnUnackedMsgs()); + } + @Test public void testUnsubscribeNonDurableSub() throws Exception { final String ns = "prop/ns-test";