Skip to content

Commit

Permalink
[fix][broker] Fix race condition in unacked message updating and disp…
Browse files Browse the repository at this point in the history
…atcher blocking
  • Loading branch information
lhotari committed Jun 5, 2024
1 parent 342d88d commit 6806e33
Show file tree
Hide file tree
Showing 4 changed files with 301 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3338,7 +3338,7 @@ public void checkUnAckMessageDispatching() {
} else if (blockedDispatcherOnHighUnackedMsgs.get() && unAckedMessages < maxUnackedMessages / 2) {
// unblock broker-dispatching if received enough acked messages back
if (blockedDispatcherOnHighUnackedMsgs.compareAndSet(true, false)) {
unblockDispatchersOnUnAckMessages(blockedDispatchers.values());
unblockDispatchersOnUnAckMessages(blockedDispatchers.values(), true);
}
}

Expand Down Expand Up @@ -3376,13 +3376,17 @@ private void blockDispatchersWithLargeUnAckMessages() {
* Unblocks the dispatchers and removes it from the {@link #blockedDispatchers} list.
*
* @param dispatcherList
* @param allowReadMore
*/
public void unblockDispatchersOnUnAckMessages(List<PersistentDispatcherMultipleConsumers> dispatcherList) {
public void unblockDispatchersOnUnAckMessages(List<PersistentDispatcherMultipleConsumers> dispatcherList,
boolean allowReadMore) {
lock.writeLock().lock();
try {
dispatcherList.forEach(dispatcher -> {
dispatcher.unBlockDispatcherOnUnackedMsgs();
executor().execute(() -> dispatcher.readMoreEntries());
if (allowReadMore) {
executor().execute(() -> dispatcher.readMoreEntries());
}
log.info("[{}] Dispatcher is unblocked", dispatcher.getName());
blockedDispatchers.remove(dispatcher);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ public Future<Void> sendMessages(final List<? extends Entry> entries,
+ " for consumerId: {}; avgMessagesPerEntry is {}",
topicName, subscription, ackedCount, totalMessages, consumerId, avgMessagesPerEntry.get());
}
incrementUnackedMessages(unackedMessages);
addAndGetUnAckedMsgs(this, unackedMessages);
Future<Void> writeAndFlushPromise =
cnx.getCommandSender().sendMessagesToConsumer(consumerId, topicName, subscription, partitionIdx,
entries, batchSizes, batchIndexesAcks, redeliveryTracker, epoch);
Expand All @@ -412,14 +412,6 @@ public Future<Void> sendMessages(final List<? extends Entry> entries,
return writeAndFlushPromise;
}

private void incrementUnackedMessages(int unackedMessages) {
if (Subscription.isIndividualAckMode(subType)
&& addAndGetUnAckedMsgs(this, unackedMessages) >= getMaxUnackedMessages()
&& getMaxUnackedMessages() > 0) {
blockedConsumerOnUnackedMsgs = true;
}
}

public boolean isWritable() {
return cnx.isWritable();
}
Expand Down Expand Up @@ -793,10 +785,6 @@ public void flowPermits(int additionalNumberOfMessages) {
checkArgument(additionalNumberOfMessages > 0);
this.lastConsumedFlowTimestamp = System.currentTimeMillis();

// block shared consumer when unacked-messages reaches limit
if (shouldBlockConsumerOnUnackMsgs() && unackedMessages >= getMaxUnackedMessages()) {
blockedConsumerOnUnackedMsgs = true;
}
int oldPermits;
if (!blockedConsumerOnUnackedMsgs) {
oldPermits = MESSAGE_PERMITS_UPDATER.getAndAdd(this, additionalNumberOfMessages);
Expand Down Expand Up @@ -879,16 +867,6 @@ public boolean checkAndApplyTopicMigration() {
}
return false;
}
/**
* Checks if consumer-blocking on unAckedMessages is allowed for below conditions:<br/>
* a. consumer must have Shared-subscription<br/>
* b. {@link this#getMaxUnackedMessages()} value > 0
*
* @return
*/
private boolean shouldBlockConsumerOnUnackMsgs() {
return Subscription.isIndividualAckMode(subType) && getMaxUnackedMessages() > 0;
}

public void updateRates() {
msgOut.calculateRate();
Expand Down Expand Up @@ -1044,15 +1022,6 @@ private boolean removePendingAcks(PositionImpl position) {
if (log.isDebugEnabled()) {
log.debug("[{}-{}] consumer {} received ack {}", topicName, subscription, consumerId, position);
}
// unblock consumer-throttling when limit check is disabled or receives half of maxUnackedMessages =>
// consumer can start again consuming messages
int unAckedMsgs = UNACKED_MESSAGES_UPDATER.get(ackOwnedConsumer);
if ((((unAckedMsgs <= getMaxUnackedMessages() / 2) && ackOwnedConsumer.blockedConsumerOnUnackedMsgs)
&& ackOwnedConsumer.shouldBlockConsumerOnUnackMsgs())
|| !shouldBlockConsumerOnUnackMsgs()) {
ackOwnedConsumer.blockedConsumerOnUnackedMsgs = false;
flowConsumerBlockedPermits(ackOwnedConsumer);
}
return true;
}
return false;
Expand All @@ -1068,7 +1037,7 @@ public int getPriorityLevel() {

public void redeliverUnacknowledgedMessages(long consumerEpoch) {
// cleanup unackedMessage bucket and redeliver those unack-msgs again
clearUnAckedMsgs();
UNACKED_MESSAGES_UPDATER.set(this, 0);
blockedConsumerOnUnackedMsgs = false;
if (log.isDebugEnabled()) {
log.debug("[{}-{}] consumer {} received redelivery", topicName, subscription, consumerId);
Expand Down Expand Up @@ -1143,10 +1112,24 @@ public Subscription getSubscription() {
}

private int addAndGetUnAckedMsgs(Consumer consumer, int ackedMessages) {
int unackedMsgs = 0;
if (isPersistentTopic && Subscription.isIndividualAckMode(subType)) {
subscription.addUnAckedMessages(ackedMessages);
unackedMsgs = UNACKED_MESSAGES_UPDATER.addAndGet(consumer, ackedMessages);
if (!isPersistentTopic || !Subscription.isIndividualAckMode(subType)) {
return 0;
}
subscription.addUnAckedMessages(ackedMessages);
int unackedMsgs = UNACKED_MESSAGES_UPDATER.addAndGet(consumer, ackedMessages);
int maxUnackedMessages = getMaxUnackedMessages();
if (maxUnackedMessages > 0) {
if (ackedMessages < 0) {
if (unackedMsgs <= maxUnackedMessages / 2 && blockedConsumerOnUnackedMsgs) {
blockedConsumerOnUnackedMsgs = false;
flowConsumerBlockedPermits(this);
}
} else if (ackedMessages > 0) {
// block shared consumer when unacked-messages reaches limit
if (unackedMsgs >= getMaxUnackedMessages()) {
blockedConsumerOnUnackedMsgs = true;
}
}
}
if (unackedMsgs < 0 && System.currentTimeMillis() - negativeUnackedMsgsTimestamp >= 10_000) {
negativeUnackedMsgsTimestamp = System.currentTimeMillis();
Expand All @@ -1155,11 +1138,6 @@ private int addAndGetUnAckedMsgs(Consumer consumer, int ackedMessages) {
return unackedMsgs;
}

private void clearUnAckedMsgs() {
int unaAckedMsgs = UNACKED_MESSAGES_UPDATER.getAndSet(this, 0);
subscription.addUnAckedMessages(-unaAckedMsgs);
}

public boolean isPreciseDispatcherFlowControl() {
return preciseDispatcherFlowControl;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.pulsar.broker.service.persistent;

import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS;
import com.google.common.collect.Lists;
import com.google.common.collect.Range;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -1005,6 +1004,7 @@ public boolean isConsumerAvailable(Consumer consumer) {

@Override
public synchronized void redeliverUnacknowledgedMessages(Consumer consumer, long consumerEpoch) {
doAddUnAckedMessages(-totalUnackedMessages, false);
consumer.getPendingAcks().forEach((ledgerId, entryId, batchSize, stickyKeyHash) -> {
if (addMessageToReplay(ledgerId, entryId, stickyKeyHash)) {
redeliveryTracker.incrementAndGetRedeliveryCount((PositionImpl.get(ledgerId, entryId)));
Expand All @@ -1019,6 +1019,7 @@ public synchronized void redeliverUnacknowledgedMessages(Consumer consumer, long

@Override
public synchronized void redeliverUnacknowledgedMessages(Consumer consumer, List<PositionImpl> positions) {
doAddUnAckedMessages(-totalUnackedMessages, false);
positions.forEach(position -> {
// TODO: We want to pass a sticky key hash as a third argument to guarantee the order of the messages
// on Key_Shared subscription, but it's difficult to get the sticky key here
Expand All @@ -1034,12 +1035,18 @@ public synchronized void redeliverUnacknowledgedMessages(Consumer consumer, List

@Override
public void addUnAckedMessages(int numberOfMessages) {
doAddUnAckedMessages(numberOfMessages, true);
}

private void doAddUnAckedMessages(int numberOfMessages, boolean allowReadMore) {
int maxUnackedMessages = topic.getMaxUnackedMessagesOnSubscription();
// don't block dispatching if maxUnackedMessages = 0
if (maxUnackedMessages <= 0 && blockedDispatcherOnUnackedMsgs == TRUE
&& BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.compareAndSet(this, TRUE, FALSE)) {
log.info("[{}] Dispatcher is unblocked, since maxUnackedMessagesPerSubscription=0", name);
readMoreEntriesAsync();
if (allowReadMore) {
readMoreEntriesAsync();
}
}

int unAckedMessages = TOTAL_UNACKED_MESSAGES_UPDATER.addAndGet(this, numberOfMessages);
Expand All @@ -1055,14 +1062,16 @@ public void addUnAckedMessages(int numberOfMessages) {
if (totalUnackedMessages < (topic.getBrokerService().maxUnackedMsgsPerDispatcher / 2)) {
if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.compareAndSet(this, TRUE, FALSE)) {
// it removes dispatcher from blocked list and unblocks dispatcher by scheduling read
topic.getBrokerService().unblockDispatchersOnUnAckMessages(Lists.newArrayList(this));
topic.getBrokerService().unblockDispatchersOnUnAckMessages(List.of(this), allowReadMore);
}
}
} else if (blockedDispatcherOnUnackedMsgs == TRUE && unAckedMessages < maxUnackedMessages / 2) {
} else if (blockedDispatcherOnUnackedMsgs == TRUE && unAckedMessages <= maxUnackedMessages / 2) {
// unblock dispatcher if it acks back enough messages
if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.compareAndSet(this, TRUE, FALSE)) {
log.debug("[{}] Dispatcher is unblocked", name);
readMoreEntriesAsync();
if (allowReadMore) {
readMoreEntriesAsync();
}
}
}
// increment broker-level count
Expand Down
Loading

0 comments on commit 6806e33

Please sign in to comment.