Skip to content

Commit

Permalink
[fix][broker] Fix incorrect blockedConsumerOnUnackedMsgs value when m…
Browse files Browse the repository at this point in the history
…axUnackedMessagesPerConsumer is 1
  • Loading branch information
summeriiii committed Jan 3, 2025
1 parent f199e88 commit b97df18
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,7 @@ private CompletableFuture<Long> individualAckNormal(CommandAck ack, Map<String,
ackedCount = getAckedCountForMsgIdNoAckSets(batchSize, position, ackOwnerConsumer);
if (checkCanRemovePendingAcksAndHandle(ackOwnerConsumer, position, msgId)) {
addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount);
updateBlockedConsumerOnUnackedMsgs(ackOwnerConsumer);
}
}

Expand Down Expand Up @@ -1078,6 +1079,11 @@ private boolean removePendingAcks(Consumer ackOwnedConsumer, Position position)
if (log.isDebugEnabled()) {
log.debug("[{}-{}] consumer {} received ack {}", topicName, subscription, consumerId, position);
}
updateBlockedConsumerOnUnackedMsgs(ackOwnedConsumer);
return true;
}

public void updateBlockedConsumerOnUnackedMsgs(Consumer ackOwnedConsumer) {
// unblock consumer-throttling when limit check is disabled or receives half of maxUnackedMessages =>
// consumer can start again consuming messages
int unAckedMsgs = UNACKED_MESSAGES_UPDATER.get(ackOwnedConsumer);
Expand All @@ -1087,7 +1093,6 @@ private boolean removePendingAcks(Consumer ackOwnedConsumer, Position position)
ackOwnedConsumer.blockedConsumerOnUnackedMsgs = false;
flowConsumerBlockedPermits(ackOwnedConsumer);
}
return true;
}

public PendingAcksMap getPendingAcks() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1790,6 +1790,47 @@ public void testDuplicateAcknowledgement() throws Exception {
.get("sub-1").getUnackedMessages(), 0);
}

@Test
public void testBlockedConsumerOnUnackedMsgs() throws Exception {
final String ns = "prop/ns-test";
admin.namespaces().createNamespace(ns, 2);
admin.namespaces().setMaxUnackedMessagesPerConsumer(ns, 1);

final String topicName = "persistent://prop/ns-test/testBlockedConsumerOnUnackedMsgs";
@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topicName)
.create();
@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topicName)
.subscriptionName("sub-test")
.acknowledgmentGroupTime(0, TimeUnit.SECONDS)
.subscriptionType(SubscriptionType.Shared)
.isAckReceiptEnabled(true)
.receiverQueueSize(0)
.subscribe();

producer.send("1".getBytes(StandardCharsets.UTF_8));
producer.send("2".getBytes(StandardCharsets.UTF_8));

// 1. receive message
Message<byte[]> message = consumer.receive();
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);

SubscriptionStats subscriptionStats = admin.topics().getStats(topicName).getSubscriptions().get("sub-test");
assertEquals(subscriptionStats.getUnackedMessages(), 1);
assertTrue(subscriptionStats.getConsumers().get(0).isBlockedConsumerOnUnackedMsgs());

// 2、ack this message
consumer.acknowledge(message);
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);

subscriptionStats = admin.topics().getStats(topicName).getSubscriptions().get("sub-test");
assertEquals(subscriptionStats.getUnackedMessages(), 0);
assertFalse(subscriptionStats.getConsumers().get(0).isBlockedConsumerOnUnackedMsgs());
}

@Test
public void testUnsubscribeNonDurableSub() throws Exception {
final String ns = "prop/ns-test";
Expand Down

0 comments on commit b97df18

Please sign in to comment.