Skip to content

Commit

Permalink
[fix][broker] PIP-379 Key_Shared implementation race condition causing out-of-order message delivery (#23874)
Browse files Browse the repository at this point in the history

(cherry picked from commit eaf9ac1)
  • Loading branch information
lhotari committed Jan 23, 2025
1 parent 75c9ed7 commit a5992e7
Show file tree
Hide file tree
Showing 4 changed files with 399 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1422,6 +1422,9 @@ public void cursorIsReset() {

protected boolean addMessageToReplay(long ledgerId, long entryId, long stickyKeyHash) {
if (checkIfMessageIsUnacked(ledgerId, entryId)) {
if (log.isDebugEnabled()) {
log.debug("[{}] Adding message to replay for {}:{} hash: {}", name, ledgerId, entryId, stickyKeyHash);
}
redeliveryMessages.add(ledgerId, entryId, stickyKeyHash);
return true;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import static org.apache.pulsar.broker.service.StickyKeyConsumerSelector.STICKY_KEY_HASH_NOT_SET;
import com.google.common.annotations.VisibleForTesting;
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import it.unimi.dsi.fastutil.ints.IntSet;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -407,6 +409,8 @@ private Map<Consumer, List<Entry>> filterAndGroupEntriesForDispatching(List<Entr
Set<Consumer> blockedByHashConsumers = lookAheadAllowed && readType == ReadType.Normal ? new HashSet<>() : null;
// in replay read mode, keep track of consumers for entries, used for look-ahead check
Set<Consumer> consumersForEntriesForLookaheadCheck = lookAheadAllowed ? new HashSet<>() : null;
// track already blocked hashes to block any further messages with the same hash
IntSet alreadyBlockedHashes = new IntOpenHashSet();

for (Entry inputEntry : entries) {
EntryAndMetadata entry;
Expand All @@ -419,24 +423,29 @@ private Map<Consumer, List<Entry>> filterAndGroupEntriesForDispatching(List<Entr
Commands.peekAndCopyMessageMetadata(inputEntry.getDataBuffer(), getSubscriptionName(), -1));
}
int stickyKeyHash = getStickyKeyHash(entry);
Consumer consumer = selector.select(stickyKeyHash);
Consumer consumer = null;
MutableBoolean blockedByHash = null;
boolean dispatchEntry = false;
if (consumer != null) {
if (lookAheadAllowed) {
consumersForEntriesForLookaheadCheck.add(consumer);
}
blockedByHash = lookAheadAllowed && readType == ReadType.Normal ? new MutableBoolean(false) : null;
MutableInt permits =
permitsForConsumer.computeIfAbsent(consumer,
k -> new MutableInt(getAvailablePermits(consumer)));
// a consumer was found for the sticky key hash and the entry can be dispatched
if (permits.intValue() > 0
&& canDispatchEntry(consumer, entry, readType, stickyKeyHash, blockedByHash)) {
// decrement the permits for the consumer
permits.decrement();
// allow the entry to be dispatched
dispatchEntry = true;
// check if the hash is already blocked
boolean hashIsAlreadyBlocked = alreadyBlockedHashes.contains(stickyKeyHash);
if (!hashIsAlreadyBlocked) {
consumer = selector.select(stickyKeyHash);
if (consumer != null) {
if (lookAheadAllowed) {
consumersForEntriesForLookaheadCheck.add(consumer);
}
blockedByHash = lookAheadAllowed && readType == ReadType.Normal ? new MutableBoolean(false) : null;
MutableInt permits =
permitsForConsumer.computeIfAbsent(consumer,
k -> new MutableInt(getAvailablePermits(k)));
// a consumer was found for the sticky key hash and the entry can be dispatched
if (permits.intValue() > 0
&& canDispatchEntry(consumer, entry, readType, stickyKeyHash, blockedByHash)) {
// decrement the permits for the consumer
permits.decrement();
// allow the entry to be dispatched
dispatchEntry = true;
}
}
}
if (dispatchEntry) {
Expand All @@ -445,6 +454,10 @@ && canDispatchEntry(consumer, entry, readType, stickyKeyHash, blockedByHash)) {
entriesGroupedByConsumer.computeIfAbsent(consumer, k -> new ArrayList<>());
consumerEntries.add(entry);
} else {
if (!hashIsAlreadyBlocked) {
// the hash is blocked, add it to the set of blocked hashes
alreadyBlockedHashes.add(stickyKeyHash);
}
if (blockedByHash != null && blockedByHash.isTrue()) {
// the entry is blocked by hash, add the consumer to the blocked set
blockedByHashConsumers.add(consumer);
Expand Down Expand Up @@ -536,6 +549,9 @@ private class ReplayPositionFilter implements Predicate<Position> {
// tracks the available permits for each consumer for the duration of the filter usage
// the filter is stateful and shouldn't be shared or reused later
private final Map<Consumer, MutableInt> availablePermitsMap = new HashMap<>();
// tracks the hashes that have been blocked during the filtering
// it is necessary to block all later messages after a hash gets blocked so that ordering is preserved
private final Set<Long> alreadyBlockedHashes = new HashSet<>();

@Override
public boolean test(Position position) {
Expand All @@ -553,25 +569,34 @@ public boolean test(Position position) {
}
return true;
}
// check if the hash is already blocked, if so, then replaying of the position should be skipped
// to preserve ordering
if (alreadyBlockedHashes.contains(stickyKeyHash)) {
return false;
}

// find the consumer for the sticky key hash
Consumer consumer = selector.select(stickyKeyHash.intValue());
// skip replaying the message position if there's no assigned consumer
if (consumer == null) {
alreadyBlockedHashes.add(stickyKeyHash);
return false;
}

// lookup the available permits for the consumer
MutableInt availablePermits =
availablePermitsMap.computeIfAbsent(consumer,
k -> new MutableInt(getAvailablePermits(consumer)));
// skip replaying the message position if the consumer has no available permits
if (availablePermits.intValue() <= 0) {
alreadyBlockedHashes.add(stickyKeyHash);
return false;
}

if (drainingHashesRequired
&& drainingHashesTracker.shouldBlockStickyKeyHash(consumer, stickyKeyHash.intValue())) {
// the hash is draining and the consumer is not the draining consumer
alreadyBlockedHashes.add(stickyKeyHash);
return false;
}

Expand Down
Loading

0 comments on commit a5992e7

Please sign in to comment.