From 7bad2e05ba392b7228b198f0838f98d76b1458d3 Mon Sep 17 00:00:00 2001 From: zhouxiang Date: Fri, 27 Oct 2023 11:47:46 +0800 Subject: [PATCH 1/5] Optimize updateTopicRouteInfoFromNameServer * Throw MQClientException for updateTopicRouteInfoFromNameServer * Add compatible updateTopicRouteInfoFromNameServerWithoutException method --- .../consumer/DefaultLitePullConsumerImpl.java | 2 +- .../consumer/DefaultMQPushConsumerImpl.java | 2 +- .../client/impl/factory/MQClientInstance.java | 33 +++++++++++-------- .../impl/producer/DefaultMQProducerImpl.java | 4 +-- .../client/trace/AsyncTraceDispatcher.java | 2 +- .../consumer/DefaultLitePullConsumerTest.java | 2 +- ...efaultMQLitePullConsumerWithTraceTest.java | 3 +- 7 files changed, 27 insertions(+), 21 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java index 20ca4770086..31d30995933 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java @@ -473,7 +473,7 @@ private void updateTopicSubscribeInfoWhenSubscriptionChanged() { if (subTable != null) { for (final Map.Entry entry : subTable.entrySet()) { final String topic = entry.getKey(); - this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); + this.mQClientFactory.updateTopicRouteInfoFromNameServerWithoutException(topic); } } } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java index e57579321cb..1230fe7b0df 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java @@ -1223,7 +1223,7 @@ private void updateTopicSubscribeInfoWhenSubscriptionChanged() { if (subTable != null) { for (final Map.Entry entry : subTable.entrySet()) { final String topic = entry.getKey(); - this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); + this.mQClientFactory.updateTopicRouteInfoFromNameServerWithoutException(topic); } } } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index ba72a6dce77..759fd871080 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -372,7 +372,7 @@ public String getClientId() { return clientId; } - public void updateTopicRouteInfoFromNameServer() { + public void updateTopicRouteInfoFromNameServer() throws MQClientException { Set topicList = new HashSet<>(); // Consumer @@ -402,7 +402,7 @@ public void updateTopicRouteInfoFromNameServer() { } for (String topic : topicList) { - this.updateTopicRouteInfoFromNameServer(topic); + this.updateTopicRouteInfoFromNameServerWithoutException(topic); } } @@ -562,10 +562,18 @@ public void adjustThreadPool() { } } - public boolean updateTopicRouteInfoFromNameServer(final String topic) { + public boolean updateTopicRouteInfoFromNameServer(final String topic) throws MQClientException { return updateTopicRouteInfoFromNameServer(topic, false, null); } + public boolean updateTopicRouteInfoFromNameServerWithoutException(final String topic) { + try { + return updateTopicRouteInfoFromNameServer(topic, false, null); + } catch (MQClientException e) { + return false; + } + } + private boolean isBrokerAddrExistInTopicRouteTable(final String addr) { for (Entry entry : this.topicRouteTable.entrySet()) { TopicRouteData topicRouteData = entry.getValue(); @@ -711,7 +719,7 @@ private void sendHeartbeatToAllBrokerV2(boolean isRebalance) { } public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault, - DefaultMQProducer defaultMQProducer) { + DefaultMQProducer defaultMQProducer) throws MQClientException { try { if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { try { @@ -785,6 +793,7 @@ public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean is if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC)) { log.warn("updateTopicRouteInfoFromNameServer Exception", e); } + throw e; } catch (RemotingException e) { log.error("updateTopicRouteInfoFromNameServer Exception", e); throw new IllegalStateException(e); @@ -1128,7 +1137,11 @@ private int findBrokerVersion(String brokerName, String brokerAddr) { public List findConsumerIdList(final String topic, final String group) { String brokerAddr = this.findBrokerAddrByTopic(topic); if (null == brokerAddr) { - this.updateTopicRouteInfoFromNameServer(topic); + try { + this.updateTopicRouteInfoFromNameServer(topic); + } catch (MQClientException e) { + return null; + } brokerAddr = this.findBrokerAddrByTopic(topic); } @@ -1145,7 +1158,7 @@ public List findConsumerIdList(final String topic, final String group) { public Set queryAssignment(final String topic, final String consumerGroup, final String strategyName, final MessageModel messageModel, int timeout) - throws RemotingException, InterruptedException, MQBrokerException { + throws RemotingException, InterruptedException, MQBrokerException, MQClientException { String brokerAddr = this.findBrokerAddrByTopic(topic); if (null == brokerAddr) { this.updateTopicRouteInfoFromNameServer(topic); @@ -1323,12 +1336,4 @@ public ClientConfig getClientConfig() { return clientConfig; } - public TopicRouteData queryTopicRouteData(String topic) { - TopicRouteData data = this.getAnExistTopicRouteData(topic); - if (data == null) { - this.updateTopicRouteInfoFromNameServer(topic); - data = this.getAnExistTopicRouteData(topic); - } - return data; - } } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index b0c212e46b6..340e7732946 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -843,7 +843,7 @@ private SendResult sendDefaultImpl( null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION); } - private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) { + private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) throws MQClientException { TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic); if (null == topicPublishInfo || !topicPublishInfo.ok()) { this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo()); @@ -1722,7 +1722,7 @@ private void requestFail(final String correlationId) { } } - private void prepareSendRequest(final Message msg, long timeout) { + private void prepareSendRequest(final Message msg, long timeout) throws MQClientException { String correlationId = CorrelationIdUtil.createCorrelationId(); String requestClientId = this.getMqClientFactory().getClientId(); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_CORRELATION_ID, correlationId); diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java index ea423b71766..1be8edaf1f5 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java @@ -438,7 +438,7 @@ public MessageQueue select(List mqs, Message msg, Object arg) { } } - private Set tryGetMessageQueueBrokerSet(DefaultMQProducerImpl producer, String topic) { + private Set tryGetMessageQueueBrokerSet(DefaultMQProducerImpl producer, String topic) throws MQClientException { Set brokerSet = new HashSet<>(); TopicPublishInfo topicPublishInfo = producer.getTopicPublishInfoTable().get(topic); if (null == topicPublishInfo || !topicPublishInfo.ok()) { diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java index 24e39f56689..88dfe1694e1 100644 --- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java @@ -889,7 +889,7 @@ private PullResultExt createPullResult(PullMessageRequestHeader requestHeader, P } private void suppressUpdateTopicRouteInfoFromNameServer( - DefaultLitePullConsumer litePullConsumer) throws IllegalAccessException { + DefaultLitePullConsumer litePullConsumer) throws IllegalAccessException, MQClientException { if (litePullConsumer.getMessageModel() == MessageModel.CLUSTERING) { litePullConsumer.changeInstanceNameToPID(); } diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQLitePullConsumerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQLitePullConsumerWithTraceTest.java index e0573bdfb0b..803c41a8ad2 100644 --- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQLitePullConsumerWithTraceTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQLitePullConsumerWithTraceTest.java @@ -34,6 +34,7 @@ import org.apache.rocketmq.client.consumer.PullStatus; import org.apache.rocketmq.client.consumer.store.OffsetStore; import org.apache.rocketmq.client.consumer.store.ReadOffsetType; +import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.impl.CommunicationMode; import org.apache.rocketmq.client.impl.FindBrokerResult; import org.apache.rocketmq.client.impl.MQAdminImpl; @@ -305,7 +306,7 @@ private SendResult createSendResult(SendStatus sendStatus) { return sendResult; } - private static void suppressUpdateTopicRouteInfoFromNameServer(DefaultLitePullConsumer litePullConsumer) throws IllegalAccessException { + private static void suppressUpdateTopicRouteInfoFromNameServer(DefaultLitePullConsumer litePullConsumer) throws IllegalAccessException, MQClientException { DefaultLitePullConsumerImpl defaultLitePullConsumerImpl = (DefaultLitePullConsumerImpl) FieldUtils.readDeclaredField(litePullConsumer, "defaultLitePullConsumerImpl", true); if (litePullConsumer.getMessageModel() == MessageModel.CLUSTERING) { litePullConsumer.changeInstanceNameToPID(); From 6dbdcef8fdc9ef42ac90f51e3475eb762bc83f5c Mon Sep 17 00:00:00 2001 From: zhouxiang Date: Fri, 27 Oct 2023 15:15:12 +0800 Subject: [PATCH 2/5] fix unit test --- .../rocketmq/client/consumer/DefaultLitePullConsumerTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java index 88dfe1694e1..17f354b42ba 100644 --- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java @@ -896,6 +896,5 @@ private void suppressUpdateTopicRouteInfoFromNameServer( ConcurrentMap factoryTable = (ConcurrentMap) FieldUtils.readDeclaredField(MQClientManager.getInstance(), "factoryTable", true); factoryTable.put(litePullConsumer.buildMQClientId(), mQClientFactory); - doReturn(false).when(mQClientFactory).updateTopicRouteInfoFromNameServer(anyString()); } } From 033690e199e7a80daf52a15438f95bf6e5cfb580 Mon Sep 17 00:00:00 2001 From: zhouxiang Date: Fri, 27 Oct 2023 15:34:53 +0800 Subject: [PATCH 3/5] fix --- .../apache/rocketmq/client/impl/factory/MQClientInstance.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index 759fd871080..f44ce90b8d8 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -372,7 +372,7 @@ public String getClientId() { return clientId; } - public void updateTopicRouteInfoFromNameServer() throws MQClientException { + public void updateTopicRouteInfoFromNameServer() { Set topicList = new HashSet<>(); // Consumer From c10b5d315d0f8e6030a72541aefc3295f49743bf Mon Sep 17 00:00:00 2001 From: zhouxiang Date: Fri, 27 Oct 2023 17:25:29 +0800 Subject: [PATCH 4/5] fix --- .../rocketmq/client/impl/consumer/RebalanceImpl.java | 2 +- .../rocketmq/client/impl/factory/MQClientInstance.java | 8 ++------ .../client/impl/consumer/RebalancePushImplTest.java | 6 +++--- .../trace/DefaultMQLitePullConsumerWithTraceTest.java | 2 +- 4 files changed, 7 insertions(+), 11 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java index 97d9460f827..b05e333cfb4 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java @@ -296,7 +296,7 @@ public ConcurrentMap getSubscriptionInner() { return subscriptionInner; } - private boolean rebalanceByTopic(final String topic, final boolean isOrder) { + private boolean rebalanceByTopic(final String topic, final boolean isOrder) throws MQClientException { boolean balanced = true; switch (messageModel) { case BROADCASTING: { diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index f44ce90b8d8..042585f6672 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -1134,14 +1134,10 @@ private int findBrokerVersion(String brokerName, String brokerAddr) { return 0; } - public List findConsumerIdList(final String topic, final String group) { + public List findConsumerIdList(final String topic, final String group) throws MQClientException { String brokerAddr = this.findBrokerAddrByTopic(topic); if (null == brokerAddr) { - try { - this.updateTopicRouteInfoFromNameServer(topic); - } catch (MQClientException e) { - return null; - } + this.updateTopicRouteInfoFromNameServer(topic); brokerAddr = this.findBrokerAddrByTopic(topic); } diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImplTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImplTest.java index f55b5869e56..d5832ab8b95 100644 --- a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImplTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImplTest.java @@ -70,7 +70,7 @@ public RebalancePushImplTest() { } @Test - public void testMessageQueueChanged_CountThreshold() { + public void testMessageQueueChanged_CountThreshold() throws MQClientException { RebalancePushImpl rebalancePush = new RebalancePushImpl(consumerGroup, MessageModel.CLUSTERING, new AllocateMessageQueueAveragely(), mqClientInstance, defaultMQPushConsumer); init(rebalancePush); @@ -100,7 +100,7 @@ private void doRebalanceForcibly(RebalancePushImpl rebalancePush, Set factoryTable = (ConcurrentMap) FieldUtils.readDeclaredField(MQClientManager.getInstance(), "factoryTable", true); factoryTable.put(litePullConsumer.buildMQClientId(), mQClientFactory); - doReturn(false).when(mQClientFactory).updateTopicRouteInfoFromNameServer(anyString()); + doReturn(false).when(mQClientFactory).updateTopicRouteInfoFromNameServerWithoutException(anyString()); } } From 01b16c9b3be73f5ce22e3f3d9e536e3ef2e579e3 Mon Sep 17 00:00:00 2001 From: zhouxiang Date: Fri, 27 Oct 2023 19:25:58 +0800 Subject: [PATCH 5/5] fix --- .../rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java index 6283abd6b2f..082f8af8389 100644 --- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java @@ -147,7 +147,7 @@ public ConsumeConcurrentlyStatus consumeMessage(List msgs, pushConsumer.changeInstanceNameToPID(); mQClientFactory = spy(MQClientManager.getInstance().getOrCreateMQClientInstance(pushConsumer, (RPCHook) FieldUtils.readDeclaredField(pushConsumerImpl, "rpcHook", true))); factoryTable.put(pushConsumer.buildMQClientId(), mQClientFactory); - doReturn(false).when(mQClientFactory).updateTopicRouteInfoFromNameServer(anyString()); + doReturn(false).when(mQClientFactory).updateTopicRouteInfoFromNameServerWithoutException(anyString()); rebalancePushImpl = spy(new RebalancePushImpl(pushConsumer.getDefaultMQPushConsumerImpl())); Field field = DefaultMQPushConsumerImpl.class.getDeclaredField("rebalanceImpl");