Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] Refactor a private method to eliminate an unnecessary parameter #23915

Merged
merged 4 commits into from
Feb 11, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ private Map<Consumer, List<Entry>> filterAndGroupEntriesForDispatching(List<Entr
}
int stickyKeyHash = getStickyKeyHash(entry);
Consumer consumer = null;
MutableBoolean blockedByHash = null;
boolean blockedByHash = false;
boolean dispatchEntry = false;
// check if the hash is already blocked
boolean hashIsAlreadyBlocked = alreadyBlockedHashes.contains(stickyKeyHash);
Expand All @@ -434,17 +434,21 @@ private Map<Consumer, List<Entry>> filterAndGroupEntriesForDispatching(List<Entr
if (lookAheadAllowed) {
consumersForEntriesForLookaheadCheck.add(consumer);
}
blockedByHash = lookAheadAllowed && readType == ReadType.Normal ? new MutableBoolean(false) : null;
final var canUpdateBlockedByHash = lookAheadAllowed && readType == ReadType.Normal;
MutableInt permits =
permitsForConsumer.computeIfAbsent(consumer,
k -> new MutableInt(getAvailablePermits(k)));
// a consumer was found for the sticky key hash and the entry can be dispatched
if (permits.intValue() > 0
&& canDispatchEntry(consumer, entry, readType, stickyKeyHash, blockedByHash)) {
lhotari marked this conversation as resolved.
Show resolved Hide resolved
// decrement the permits for the consumer
permits.decrement();
// allow the entry to be dispatched
dispatchEntry = true;
if (permits.intValue() > 0) {
boolean canDispatchEntry = canDispatchEntry(consumer, entry, readType, stickyKeyHash);
if (canDispatchEntry) {
// decrement the permits for the consumer
permits.decrement();
// allow the entry to be dispatched
dispatchEntry = true;
} else if (canUpdateBlockedByHash) {
blockedByHash = true;
}
}
}
}
Expand All @@ -458,7 +462,7 @@ && canDispatchEntry(consumer, entry, readType, stickyKeyHash, blockedByHash)) {
// the hash is blocked, add it to the set of blocked hashes
alreadyBlockedHashes.add(stickyKeyHash);
}
if (blockedByHash != null && blockedByHash.isTrue()) {
if (blockedByHash) {
// the entry is blocked by hash, add the consumer to the blocked set
blockedByHashConsumers.add(consumer);
}
Expand Down Expand Up @@ -507,27 +511,18 @@ && canDispatchEntry(consumer, entry, readType, stickyKeyHash, blockedByHash)) {

// checks if the entry can be dispatched to the consumer
private boolean canDispatchEntry(Consumer consumer, Entry entry,
ReadType readType, int stickyKeyHash,
MutableBoolean blockedByHash) {
ReadType readType, int stickyKeyHash) {
// If redeliveryMessages contains messages that correspond to the same hash as the entry to be dispatched
// do not send those messages for order guarantee
if (readType == ReadType.Normal && redeliveryMessages.containsStickyKeyHash(stickyKeyHash)) {
if (blockedByHash != null) {
blockedByHash.setTrue();
}
return false;
}

if (drainingHashesRequired) {
// If the hash is draining, do not send the message
if (drainingHashesTracker.shouldBlockStickyKeyHash(consumer, stickyKeyHash)) {
if (blockedByHash != null) {
blockedByHash.setTrue();
}
return false;
}
}

return true;
}

Expand Down
Loading