diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index 0baba13cc0b07..d770e09ec46c0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -595,6 +595,7 @@ private CompletableFuture individualAckNormal(CommandAck ack, Map // consumer can start again consuming messages int unAckedMsgs = UNACKED_MESSAGES_UPDATER.get(ackOwnedConsumer); @@ -1087,7 +1093,6 @@ private boolean removePendingAcks(Consumer ackOwnedConsumer, Position position) ackOwnedConsumer.blockedConsumerOnUnackedMsgs = false; flowConsumerBlockedPermits(ackOwnedConsumer); } - return true; } public PendingAcksMap getPendingAcks() { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java index 89727014be99e..fa76fdd5bf45c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java @@ -1790,6 +1790,47 @@ public void testDuplicateAcknowledgement() throws Exception { .get("sub-1").getUnackedMessages(), 0); } + @Test + public void testBlockedConsumerOnUnackedMsgs() throws Exception { + final String ns = "prop/ns-test"; + admin.namespaces().createNamespace(ns, 2); + admin.namespaces().setMaxUnackedMessagesPerConsumer(ns, 1); + + final String topicName = "persistent://prop/ns-test/testBlockedConsumerOnUnackedMsgs"; + @Cleanup + Producer producer = pulsarClient.newProducer() + .topic(topicName) + .create(); + @Cleanup + Consumer consumer = pulsarClient.newConsumer() + .topic(topicName) + .subscriptionName("sub-test") + .acknowledgmentGroupTime(0, TimeUnit.SECONDS) + .subscriptionType(SubscriptionType.Shared) + .isAckReceiptEnabled(true) + .receiverQueueSize(0) + .subscribe(); + + producer.send("1".getBytes(StandardCharsets.UTF_8)); + producer.send("2".getBytes(StandardCharsets.UTF_8)); + + // 1. receive message + Message message = consumer.receive(); + Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT); + + SubscriptionStats subscriptionStats = admin.topics().getStats(topicName).getSubscriptions().get("sub-test"); + assertEquals(subscriptionStats.getUnackedMessages(), 1); + assertTrue(subscriptionStats.getConsumers().get(0).isBlockedConsumerOnUnackedMsgs()); + + // 2、ack this message + consumer.acknowledge(message); + Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT); + + subscriptionStats = admin.topics().getStats(topicName).getSubscriptions().get("sub-test"); + assertEquals(subscriptionStats.getUnackedMessages(), 0); + assertFalse(subscriptionStats.getConsumers().get(0).isBlockedConsumerOnUnackedMsgs()); + } + @Test public void testUnsubscribeNonDurableSub() throws Exception { final String ns = "prop/ns-test";