From d63373a152ebd395cdce6a2e04e01b62e54c76af Mon Sep 17 00:00:00 2001 From: hqbfz <125714719+3424672656@users.noreply.github.com> Date: Wed, 25 Dec 2024 14:38:57 +0800 Subject: [PATCH] [ISSUE #8957] Remove excess traffic and fix cache inconsistencies (#8958) --- .../client/impl/consumer/RebalanceImpl.java | 53 +------------------ 1 file changed, 1 insertion(+), 52 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java index d1f0d116e05..b6f1d99b1c7 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java @@ -36,7 +36,6 @@ import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueueAssignment; import org.apache.rocketmq.common.message.MessageRequestMode; -import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.protocol.body.LockBatchRequestBody; import org.apache.rocketmq.remoting.protocol.body.UnlockBatchRequestBody; import org.apache.rocketmq.remoting.protocol.filter.FilterAPI; @@ -60,12 +59,8 @@ public abstract class RebalanceImpl { protected MessageModel messageModel; protected AllocateMessageQueueStrategy allocateMessageQueueStrategy; protected MQClientInstance mQClientFactory; - private static final int TIMEOUT_CHECK_TIMES = 3; private static final int QUERY_ASSIGNMENT_TIMEOUT = 3000; - private Map topicBrokerRebalance = new ConcurrentHashMap<>(); - private Map topicClientRebalance = new ConcurrentHashMap<>(); - public RebalanceImpl(String consumerGroup, MessageModel messageModel, AllocateMessageQueueStrategy allocateMessageQueueStrategy, MQClientInstance mQClientFactory) { @@ -241,7 +236,7 @@ public boolean doRebalance(final boolean isOrder) { for (final Map.Entry entry : subTable.entrySet()) { final String topic = entry.getKey(); try { - if (!clientRebalance(topic) && tryQueryAssignment(topic)) { + if (!clientRebalance(topic)) { boolean result = this.getRebalanceResultFromBroker(topic, isOrder); if (!result) { balanced = false; @@ -266,38 +261,6 @@ public boolean doRebalance(final boolean isOrder) { return balanced; } - private boolean tryQueryAssignment(String topic) { - if (topicClientRebalance.containsKey(topic)) { - return false; - } - - if (topicBrokerRebalance.containsKey(topic)) { - return true; - } - String strategyName = allocateMessageQueueStrategy != null ? allocateMessageQueueStrategy.getName() : null; - int retryTimes = 0; - while (retryTimes++ < TIMEOUT_CHECK_TIMES) { - try { - Set resultSet = mQClientFactory.queryAssignment(topic, consumerGroup, - strategyName, messageModel, QUERY_ASSIGNMENT_TIMEOUT / TIMEOUT_CHECK_TIMES * retryTimes); - topicBrokerRebalance.put(topic, topic); - return true; - } catch (Throwable t) { - if (!(t instanceof RemotingTimeoutException)) { - log.error("tryQueryAssignment error.", t); - topicClientRebalance.put(topic, topic); - return false; - } - } - } - if (retryTimes >= TIMEOUT_CHECK_TIMES) { - // if never success before and timeout exceed TIMEOUT_CHECK_TIMES, force client rebalance - topicClientRebalance.put(topic, topic); - return false; - } - return true; - } - public ConcurrentMap getSubscriptionInner() { return subscriptionInner; } @@ -460,20 +423,6 @@ private void truncateMessageQueueNotMyTopic() { } } } - - Iterator> clientIter = topicClientRebalance.entrySet().iterator(); - while (clientIter.hasNext()) { - if (!subTable.containsKey(clientIter.next().getKey())) { - clientIter.remove(); - } - } - - Iterator> brokerIter = topicBrokerRebalance.entrySet().iterator(); - while (brokerIter.hasNext()) { - if (!subTable.containsKey(brokerIter.next().getKey())) { - brokerIter.remove(); - } - } } private boolean updateProcessQueueTableInRebalance(final String topic, final Set mqSet,