From 58dd5639c19970534c1e29bb2c305f6436cd85dc Mon Sep 17 00:00:00 2001 From: account guanyue <690010051@qq.com> Date: Fri, 31 Jan 2025 16:49:41 +0800 Subject: [PATCH 1/4] Variable modified in read method --- ...tStickyKeyDispatcherMultipleConsumers.java | 28 ++++++++----------- 1 file changed, 12 insertions(+), 16 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java index 8bddbde02c974..b5afdd4820f75 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java @@ -439,12 +439,17 @@ private Map> filterAndGroupEntriesForDispatching(List new MutableInt(getAvailablePermits(k))); // a consumer was found for the sticky key hash and the entry can be dispatched - if (permits.intValue() > 0 - && canDispatchEntry(consumer, entry, readType, stickyKeyHash, blockedByHash)) { - // decrement the permits for the consumer - permits.decrement(); - // allow the entry to be dispatched - dispatchEntry = true; + if (permits.intValue() > 0) { + boolean canDispatchEntry = canDispatchEntry(consumer, entry, readType, stickyKeyHash); + if (!canDispatchEntry) { + if(blockedByHash != null){ + blockedByHash.setTrue(); + } + // decrement the permits for the consumer + permits.decrement(); + // allow the entry to be dispatched + dispatchEntry = true; + } } } } @@ -507,27 +512,18 @@ && canDispatchEntry(consumer, entry, readType, stickyKeyHash, blockedByHash)) { // checks if the entry can be dispatched to the consumer private boolean canDispatchEntry(Consumer consumer, Entry entry, - ReadType readType, int stickyKeyHash, - MutableBoolean blockedByHash) { + ReadType readType, int stickyKeyHash) { // If redeliveryMessages contains messages that correspond to the same hash as the entry to be dispatched // do not send those messages for order guarantee if (readType == ReadType.Normal && redeliveryMessages.containsStickyKeyHash(stickyKeyHash)) { - if (blockedByHash != null) { - blockedByHash.setTrue(); - } return false; } - if (drainingHashesRequired) { // If the hash is draining, do not send the message if (drainingHashesTracker.shouldBlockStickyKeyHash(consumer, stickyKeyHash)) { - if (blockedByHash != null) { - blockedByHash.setTrue(); - } return false; } } - return true; } From 3af9bb216b524eaf2c9b2ff3d89d6227f32c4d8e Mon Sep 17 00:00:00 2001 From: account guanyue <690010051@qq.com> Date: Fri, 31 Jan 2025 19:25:23 +0800 Subject: [PATCH 2/4] Variable modified in read method --- .../PersistentStickyKeyDispatcherMultipleConsumers.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java index b5afdd4820f75..3373b796e44dd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java @@ -442,7 +442,7 @@ private Map> filterAndGroupEntriesForDispatching(List 0) { boolean canDispatchEntry = canDispatchEntry(consumer, entry, readType, stickyKeyHash); if (!canDispatchEntry) { - if(blockedByHash != null){ + if (blockedByHash != null) { blockedByHash.setTrue(); } // decrement the permits for the consumer From 4e23af19eae01ce86ef009e4f9f2b8622a513659 Mon Sep 17 00:00:00 2001 From: account guanyue <690010051@qq.com> Date: Sun, 9 Feb 2025 11:21:00 +0800 Subject: [PATCH 3/4] Variable modified in read method --- .../PersistentStickyKeyDispatcherMultipleConsumers.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java index 3373b796e44dd..374111267e362 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java @@ -441,14 +441,15 @@ private Map> filterAndGroupEntriesForDispatching(List 0) { boolean canDispatchEntry = canDispatchEntry(consumer, entry, readType, stickyKeyHash); - if (!canDispatchEntry) { - if (blockedByHash != null) { - blockedByHash.setTrue(); - } + if (canDispatchEntry) { // decrement the permits for the consumer permits.decrement(); // allow the entry to be dispatched dispatchEntry = true; + } else { + if (blockedByHash != null) { + blockedByHash.setTrue(); + } } } } From 67fb138a60edc823a90e7c4e25bf37508d4e83d6 Mon Sep 17 00:00:00 2001 From: account guanyue <690010051@qq.com> Date: Tue, 11 Feb 2025 00:32:07 +0800 Subject: [PATCH 4/4] Variable modified in read method --- ...rsistentStickyKeyDispatcherMultipleConsumers.java | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java index 374111267e362..9e92a2ab40dc1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java @@ -424,7 +424,7 @@ private Map> filterAndGroupEntriesForDispatching(List> filterAndGroupEntriesForDispatching(List new MutableInt(getAvailablePermits(k))); @@ -446,10 +446,8 @@ private Map> filterAndGroupEntriesForDispatching(List> filterAndGroupEntriesForDispatching(List