[fix][broker] Fix out-of-order issues with ConsistentHashingStickyKeyConsumerSelector #23327
I renamed this to "reduce" out-of-order issues, since I'm not sure whether the current model in this PR prevents a consumer from being swapped when an unrelated consumer leaves. It seems that another hash ring layer is needed for the colliding consumers; that would prevent unnecessary changes. A hash ring inside a hash ring...
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #23327      +/-   ##
============================================
+ Coverage     73.57%   74.57%   +1.00%
- Complexity    32624    33940    +1316
============================================
  Files          1877     1934      +57
  Lines        139502   145015    +5513
  Branches      15299    15846     +547
============================================
+ Hits         102638   108149    +5511
+ Misses        28908    28598     -310
- Partials       7956     8268     +312
...c/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java
    if (selectedConsumerEntry == null || consumerEntry.compareTo(selectedConsumerEntry) < 0) {
        // if the new consumer has a higher priority or lower selection count
        // than the currently selected consumer, select the new consumer
        changeSelectedConsumerEntry(consumerEntry);
    }
Isn't this a breaking change in behavior when hashes collide?
I don't think that it's a breaking change. The previous behavior is already broken.
To fix the problem there needs to be a solution where the consumer is only changed when a new consumer is added or when the currently "selected" consumer is removed.
In the previous solution, all consumers for a colliding hash were active at the same time. The consumer was selected by the input hash on this line:
Line 127 in 4f96146
    return consumerList.get(hash % consumerList.size());
That solution was wrong and broken.
One small detail in the sorting order is that I made it also consider the priority of the consumer. That was ignored before, which I think was a mistake. Since the previous behavior was broken, we aren't introducing a breaking change by fixing it.
After your comment, I debugged your code, and I noticed that my understanding was incorrect.
My understanding at the time was, "If a new consumer is added, the range will always move to a new one."
Now I understand it as follows. Is this correct?
"When the selected count is greater than that of the new consumer, the consumer's range is moved to the new one."
@equanz In the previous solution, consumerList.get(hash % consumerList.size()) could result in a different consumer any time consumers were added or removed. Since the hash passed as the parameter is the hash calculated for the message key, the result also depends on the input message. It could also result in the same consumer as before, so the consumer isn't always switched. However, the probability increases with the number of consumers, since hitting a different consumer has an (n-1)/n chance.
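A minimal sketch of why the modulo-based selection is unstable, assuming non-negative key hashes and using plain strings in place of real consumers. The class and method names here are hypothetical and only mirror the `hash % consumerList.size()` expression quoted above; this is not the broker code.

```java
import java.util.List;

public class ModuloSelectionDemo {
    // Mirrors the old expression: consumerList.get(hash % consumerList.size()).
    // Assumes a non-negative hash.
    static String select(List<String> consumerList, int hash) {
        return consumerList.get(hash % consumerList.size());
    }

    // Counts key hashes in [0, hashCount) that move from one pre-existing
    // consumer to another pre-existing consumer after the list changes.
    static int countSwitchedBetweenExisting(List<String> before, List<String> after, int hashCount) {
        int switched = 0;
        for (int hash = 0; hash < hashCount; hash++) {
            String oldConsumer = select(before, hash);
            String newConsumer = select(after, hash);
            if (!oldConsumer.equals(newConsumer) && before.contains(newConsumer)) {
                switched++;
            }
        }
        return switched;
    }

    public static void main(String[] args) {
        List<String> before = List.of("c1", "c2", "c3");
        List<String> after = List.of("c1", "c2", "c3", "c4");
        int switched = countSwitchedBetweenExisting(before, after, 1200);
        System.out.println(switched + " of 1200 key hashes switched between existing consumers");
    }
}
```

In this toy setup, adding a fourth colliding consumer moves half of the key hashes between consumers that were already connected, which is the kind of switch that can reorder messages for a key.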
    }

    public Consumer getSelectedConsumer() {
        return selectedConsumerEntry != null ? selectedConsumerEntry.consumer() : null;
The issue's occurring condition is that multiple consumers have the same hash( it always occurs when the consumer names are the same ), right?
This PR breaks the original behavior( the original behavior contains an issue ), which may cause a consumer not to receive any messages if users register consumers with the same name.
How about rewriting the logic like this?
- Register consumer-a1
  - hash: h1
- Register consumer-a2
  - hash: h1
  - We add an extra int value to consumer-a2 (such as the socket port or a sequence number), so its hash is not equal to that of consumer-a1
  - Mark the final hash for consumer-a2 in memory, so it will be the same when it reconnects.
    - Remove the cache when the socket closes.
    - Remove the cache after consumer-a2 has been offline for a long time (such as 5min)
This PR breaks the original behavior( the original behavior contains an issue ), which may cause a consumer not to receive any messages if users register consumers with the same name.
The hash ring will contain 100 entries, and the selected consumers will be evenly distributed across those entries. That's how it's already covered. Most of the complexity in this PR is about addressing that issue, so that all registered consumers with the same name get evenly distributed.
This applies when there are fewer than 100 consumers. The limit can be raised by increasing subscriptionKeySharedConsistentHashingReplicaPoints if that's not sufficient. However, users could simply use unique consumer names if there are more than subscriptionKeySharedConsistentHashingReplicaPoints consumers.
I think you have not answered my questions, such as follows
The issue's occurring condition is that multiple consumers have the same hash( it always occurs when the consumer names are the same ), right?
This PR breaks the original behavior( the original behavior contains an issue ), which may cause a consumer not to receive any messages if users register consumers with the same name.
I think you have not answered my questions, such as follows
I'll retry. 😁
The issue's occurring condition is that multiple consumers have the same hash( it always occurs when the consumer names are the same ), right?
Yes.
This PR breaks the original behavior( the original behavior contains an issue ), which may cause a consumer not to receive any messages if users register consumers with the same name.
No. I explained in the previous comment.
Fixes #23315
Fixes #23321
Motivation
See #23315 and #23321. This PR fixes a bug in ConsistentHashingStickyKeyConsumerSelector where the selected consumer for a hash could change between two existing consumers when a consumer was removed or when a new consumer was added.
In addition, it fixes the problem that ConsistentHashingStickyKeyConsumerSelector.getConsumerKeyHashRanges() returned invalid information. These are fixed together since they are in the same code area and fixing getConsumerKeyHashRanges is required for testing the consistent selection of a consumer when there are hash collisions.
Modifications
In PR #8396 a solution was added to fix a problem where all hash ring points were assigned to the last arrived consumer when there were hash collisions, for example when the consumers had the same name:
pulsar/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java
Line 127 in 4f96146
That solution is the cause of bug #23315. A different solution is needed to distribute the consumers evenly when there are hash collisions.
The solution in this PR is to keep track of how many times a specific consumer is "selected" as a consumer in the hash ring. When a consumer is added, it replaces the previously selected consumer if it sorts before the existing consumer. The sorting order takes into account the consumer priority, consumer name and the selection count.
When a consumer is removed and the consumer was selected, a new consumer is selected after sorting the remaining consumers. If the consumer wasn't previously selected, it is only removed from the consumers for that hash ring entry.
This solution ensures that consumers aren't switching across two existing consumers and that the consumers are evenly balanced when there are hash collisions, for example when the consumers have the same name.
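The selection rules described above can be sketched roughly as follows. This is a simplified illustration and not the code in this PR: `CollidingConsumersSketch`, `ConsumerRef` and its `id` field are hypothetical, every consumer is assumed to collide on every ring point, and the comparison order (priority, then selection count, then name, then id) is an assumption, since the description only lists which attributes are considered.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CollidingConsumersSketch {
    /** Hypothetical stand-in for a consumer; a lower priority value sorts first (assumption). */
    static final class ConsumerRef {
        final String name;
        final int priority;
        final long id; // hypothetical tie-breaker, e.g. arrival order

        ConsumerRef(String name, int priority, long id) {
            this.name = name;
            this.priority = priority;
            this.id = id;
        }
    }

    // Number of ring points each consumer is currently selected for.
    private final Map<ConsumerRef, Integer> selectionCounts = new HashMap<>();
    // candidates.get(i): all consumers whose hash collides at ring point i.
    private final List<List<ConsumerRef>> candidates = new ArrayList<>();
    // selected[i]: the single active consumer at ring point i, or null.
    private final ConsumerRef[] selected;

    // Assumed ordering: priority, then selection count, then name, then id.
    private final Comparator<ConsumerRef> order =
            Comparator.comparingInt((ConsumerRef c) -> c.priority)
                    .thenComparingInt(c -> selectionCounts.getOrDefault(c, 0))
                    .thenComparing(c -> c.name)
                    .thenComparingLong(c -> c.id);

    CollidingConsumersSketch(int ringPoints) {
        selected = new ConsumerRef[ringPoints];
        for (int i = 0; i < ringPoints; i++) {
            candidates.add(new ArrayList<>());
        }
    }

    void addConsumer(ConsumerRef c) {
        for (int i = 0; i < selected.length; i++) {
            candidates.get(i).add(c);
            // Replace only if the new consumer sorts strictly before the selected one.
            if (selected[i] == null || order.compare(c, selected[i]) < 0) {
                select(i, c);
            }
        }
    }

    void removeConsumer(ConsumerRef c) {
        for (int i = 0; i < selected.length; i++) {
            candidates.get(i).remove(c);
            // Reselect only where the removed consumer was the active one.
            if (selected[i] == c) {
                select(i, candidates.get(i).stream().min(order).orElse(null));
            }
        }
    }

    // Switches the active consumer at one ring point and keeps the counts in sync.
    private void select(int point, ConsumerRef next) {
        if (selected[point] != null) {
            selectionCounts.merge(selected[point], -1, Integer::sum);
        }
        selected[point] = next;
        if (next != null) {
            selectionCounts.merge(next, 1, Integer::sum);
        }
    }

    ConsumerRef selectedAt(int point) {
        return selected[point];
    }

    int selectionCount(ConsumerRef c) {
        return selectionCounts.getOrDefault(c, 0);
    }

    public static void main(String[] args) {
        CollidingConsumersSketch ring = new CollidingConsumersSketch(100);
        ConsumerRef a = new ConsumerRef("consumer", 0, 1);
        ConsumerRef b = new ConsumerRef("consumer", 0, 2);
        ring.addConsumer(a);
        ring.addConsumer(b);
        System.out.println("a=" + ring.selectionCount(a) + ", b=" + ring.selectionCount(b));
    }
}
```

With two same-named consumers on 100 colliding ring points, this sketch ends up with 50 points each. Removing a consumer only touches the points where it was the selected one, so the remaining points keep their consumer.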
Documentation
doc
doc-required
doc-not-needed
doc-complete