Skip to content

Commit

Permalink
[ISSUE #8957] Remove excess traffic and fix cache inconsistencies (#8958
Browse files Browse the repository at this point in the history
)
  • Loading branch information
3424672656 authored Dec 25, 2024
1 parent 35a6426 commit d63373a
Showing 1 changed file with 1 addition and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.message.MessageQueueAssignment;
import org.apache.rocketmq.common.message.MessageRequestMode;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.protocol.body.LockBatchRequestBody;
import org.apache.rocketmq.remoting.protocol.body.UnlockBatchRequestBody;
import org.apache.rocketmq.remoting.protocol.filter.FilterAPI;
Expand All @@ -60,12 +59,8 @@ public abstract class RebalanceImpl {
protected MessageModel messageModel;
protected AllocateMessageQueueStrategy allocateMessageQueueStrategy;
protected MQClientInstance mQClientFactory;
private static final int TIMEOUT_CHECK_TIMES = 3;
private static final int QUERY_ASSIGNMENT_TIMEOUT = 3000;

private Map<String, String> topicBrokerRebalance = new ConcurrentHashMap<>();
private Map<String, String> topicClientRebalance = new ConcurrentHashMap<>();

public RebalanceImpl(String consumerGroup, MessageModel messageModel,
AllocateMessageQueueStrategy allocateMessageQueueStrategy,
MQClientInstance mQClientFactory) {
Expand Down Expand Up @@ -241,7 +236,7 @@ public boolean doRebalance(final boolean isOrder) {
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey();
try {
if (!clientRebalance(topic) && tryQueryAssignment(topic)) {
if (!clientRebalance(topic)) {
boolean result = this.getRebalanceResultFromBroker(topic, isOrder);
if (!result) {
balanced = false;
Expand All @@ -266,38 +261,6 @@ public boolean doRebalance(final boolean isOrder) {
return balanced;
}

private boolean tryQueryAssignment(String topic) {
if (topicClientRebalance.containsKey(topic)) {
return false;
}

if (topicBrokerRebalance.containsKey(topic)) {
return true;
}
String strategyName = allocateMessageQueueStrategy != null ? allocateMessageQueueStrategy.getName() : null;
int retryTimes = 0;
while (retryTimes++ < TIMEOUT_CHECK_TIMES) {
try {
Set<MessageQueueAssignment> resultSet = mQClientFactory.queryAssignment(topic, consumerGroup,
strategyName, messageModel, QUERY_ASSIGNMENT_TIMEOUT / TIMEOUT_CHECK_TIMES * retryTimes);
topicBrokerRebalance.put(topic, topic);
return true;
} catch (Throwable t) {
if (!(t instanceof RemotingTimeoutException)) {
log.error("tryQueryAssignment error.", t);
topicClientRebalance.put(topic, topic);
return false;
}
}
}
if (retryTimes >= TIMEOUT_CHECK_TIMES) {
// if never success before and timeout exceed TIMEOUT_CHECK_TIMES, force client rebalance
topicClientRebalance.put(topic, topic);
return false;
}
return true;
}

public ConcurrentMap<String, SubscriptionData> getSubscriptionInner() {
return subscriptionInner;
}
Expand Down Expand Up @@ -460,20 +423,6 @@ private void truncateMessageQueueNotMyTopic() {
}
}
}

Iterator<Map.Entry<String, String>> clientIter = topicClientRebalance.entrySet().iterator();
while (clientIter.hasNext()) {
if (!subTable.containsKey(clientIter.next().getKey())) {
clientIter.remove();
}
}

Iterator<Map.Entry<String, String>> brokerIter = topicBrokerRebalance.entrySet().iterator();
while (brokerIter.hasNext()) {
if (!subTable.containsKey(brokerIter.next().getKey())) {
brokerIter.remove();
}
}
}

private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
Expand Down

0 comments on commit d63373a

Please sign in to comment.