Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #7509] Optimize updateTopicRouteInfoFromNameServer #7510

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,7 @@ private void updateTopicSubscribeInfoWhenSubscriptionChanged() {
if (subTable != null) {
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey();
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
this.mQClientFactory.updateTopicRouteInfoFromNameServerWithoutException(topic);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1223,7 +1223,7 @@ private void updateTopicSubscribeInfoWhenSubscriptionChanged() {
if (subTable != null) {
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey();
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
this.mQClientFactory.updateTopicRouteInfoFromNameServerWithoutException(topic);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ public ConcurrentMap<String, SubscriptionData> getSubscriptionInner() {
return subscriptionInner;
}

private boolean rebalanceByTopic(final String topic, final boolean isOrder) {
private boolean rebalanceByTopic(final String topic, final boolean isOrder) throws MQClientException {
boolean balanced = true;
switch (messageModel) {
case BROADCASTING: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ public void updateTopicRouteInfoFromNameServer() {
}

for (String topic : topicList) {
this.updateTopicRouteInfoFromNameServer(topic);
this.updateTopicRouteInfoFromNameServerWithoutException(topic);
}
}

Expand Down Expand Up @@ -562,10 +562,18 @@ public void adjustThreadPool() {
}
}

public boolean updateTopicRouteInfoFromNameServer(final String topic) {
public boolean updateTopicRouteInfoFromNameServer(final String topic) throws MQClientException {
return updateTopicRouteInfoFromNameServer(topic, false, null);
}

public boolean updateTopicRouteInfoFromNameServerWithoutException(final String topic) {
try {
return updateTopicRouteInfoFromNameServer(topic, false, null);
} catch (MQClientException e) {
return false;
}
}

private boolean isBrokerAddrExistInTopicRouteTable(final String addr) {
for (Entry<String, TopicRouteData> entry : this.topicRouteTable.entrySet()) {
TopicRouteData topicRouteData = entry.getValue();
Expand Down Expand Up @@ -711,7 +719,7 @@ private void sendHeartbeatToAllBrokerV2(boolean isRebalance) {
}

public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
DefaultMQProducer defaultMQProducer) {
DefaultMQProducer defaultMQProducer) throws MQClientException {
try {
if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
Expand Down Expand Up @@ -785,6 +793,7 @@ public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean is
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
log.warn("updateTopicRouteInfoFromNameServer Exception", e);
}
throw e;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

先log再throw的写法可能会造成上层出现重复打印error的情况

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里是为了尽可能和之前的行为保持一致,如果去掉的话,可能会导致原先打印的日志无法被打印出来。

} catch (RemotingException e) {
log.error("updateTopicRouteInfoFromNameServer Exception", e);
throw new IllegalStateException(e);
Expand Down Expand Up @@ -1125,7 +1134,7 @@ private int findBrokerVersion(String brokerName, String brokerAddr) {
return 0;
}

public List<String> findConsumerIdList(final String topic, final String group) {
public List<String> findConsumerIdList(final String topic, final String group) throws MQClientException {
String brokerAddr = this.findBrokerAddrByTopic(topic);
if (null == brokerAddr) {
this.updateTopicRouteInfoFromNameServer(topic);
Expand All @@ -1145,7 +1154,7 @@ public List<String> findConsumerIdList(final String topic, final String group) {

public Set<MessageQueueAssignment> queryAssignment(final String topic, final String consumerGroup,
final String strategyName, final MessageModel messageModel, int timeout)
throws RemotingException, InterruptedException, MQBrokerException {
throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
String brokerAddr = this.findBrokerAddrByTopic(topic);
if (null == brokerAddr) {
this.updateTopicRouteInfoFromNameServer(topic);
Expand Down Expand Up @@ -1323,12 +1332,4 @@ public ClientConfig getClientConfig() {
return clientConfig;
}

public TopicRouteData queryTopicRouteData(String topic) {
TopicRouteData data = this.getAnExistTopicRouteData(topic);
if (data == null) {
this.updateTopicRouteInfoFromNameServer(topic);
data = this.getAnExistTopicRouteData(topic);
}
return data;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -843,7 +843,7 @@ private SendResult sendDefaultImpl(
null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) throws MQClientException {
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
Expand Down Expand Up @@ -1722,7 +1722,7 @@ private void requestFail(final String correlationId) {
}
}

private void prepareSendRequest(final Message msg, long timeout) {
private void prepareSendRequest(final Message msg, long timeout) throws MQClientException {
String correlationId = CorrelationIdUtil.createCorrelationId();
String requestClientId = this.getMqClientFactory().getClientId();
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_CORRELATION_ID, correlationId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
}
}

private Set<String> tryGetMessageQueueBrokerSet(DefaultMQProducerImpl producer, String topic) {
private Set<String> tryGetMessageQueueBrokerSet(DefaultMQProducerImpl producer, String topic) throws MQClientException {
Set<String> brokerSet = new HashSet<>();
TopicPublishInfo topicPublishInfo = producer.getTopicPublishInfoTable().get(topic);
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -889,13 +889,12 @@ private PullResultExt createPullResult(PullMessageRequestHeader requestHeader, P
}

private void suppressUpdateTopicRouteInfoFromNameServer(
DefaultLitePullConsumer litePullConsumer) throws IllegalAccessException {
DefaultLitePullConsumer litePullConsumer) throws IllegalAccessException, MQClientException {
if (litePullConsumer.getMessageModel() == MessageModel.CLUSTERING) {
litePullConsumer.changeInstanceNameToPID();
}

ConcurrentMap<String, MQClientInstance> factoryTable = (ConcurrentMap<String, MQClientInstance>) FieldUtils.readDeclaredField(MQClientManager.getInstance(), "factoryTable", true);
factoryTable.put(litePullConsumer.buildMQClientId(), mQClientFactory);
doReturn(false).when(mQClientFactory).updateTopicRouteInfoFromNameServer(anyString());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public RebalancePushImplTest() {
}

@Test
public void testMessageQueueChanged_CountThreshold() {
public void testMessageQueueChanged_CountThreshold() throws MQClientException {
RebalancePushImpl rebalancePush = new RebalancePushImpl(consumerGroup, MessageModel.CLUSTERING,
new AllocateMessageQueueAveragely(), mqClientInstance, defaultMQPushConsumer);
init(rebalancePush);
Expand Down Expand Up @@ -100,7 +100,7 @@ private void doRebalanceForcibly(RebalancePushImpl rebalancePush, Set<MessageQue
rebalancePush.messageQueueChanged(topic, allocateResultSet, allocateResultSet);
}

private void init(final RebalancePushImpl rebalancePush) {
private void init(final RebalancePushImpl rebalancePush) throws MQClientException {
rebalancePush.getSubscriptionInner().putIfAbsent(topic, new SubscriptionData());

rebalancePush.subscriptionInner.putIfAbsent(topic, new SubscriptionData());
Expand All @@ -111,7 +111,7 @@ private void init(final RebalancePushImpl rebalancePush) {
}

@Test
public void testMessageQueueChanged_SizeThreshold() {
public void testMessageQueueChanged_SizeThreshold() throws MQClientException {
RebalancePushImpl rebalancePush = new RebalancePushImpl(consumerGroup, MessageModel.CLUSTERING,
new AllocateMessageQueueAveragely(), mqClientInstance, defaultMQPushConsumer);
init(rebalancePush);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
pushConsumer.changeInstanceNameToPID();
mQClientFactory = spy(MQClientManager.getInstance().getOrCreateMQClientInstance(pushConsumer, (RPCHook) FieldUtils.readDeclaredField(pushConsumerImpl, "rpcHook", true)));
factoryTable.put(pushConsumer.buildMQClientId(), mQClientFactory);
doReturn(false).when(mQClientFactory).updateTopicRouteInfoFromNameServer(anyString());
doReturn(false).when(mQClientFactory).updateTopicRouteInfoFromNameServerWithoutException(anyString());

rebalancePushImpl = spy(new RebalancePushImpl(pushConsumer.getDefaultMQPushConsumerImpl()));
Field field = DefaultMQPushConsumerImpl.class.getDeclaredField("rebalanceImpl");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.client.consumer.store.OffsetStore;
import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.CommunicationMode;
import org.apache.rocketmq.client.impl.FindBrokerResult;
import org.apache.rocketmq.client.impl.MQAdminImpl;
Expand Down Expand Up @@ -305,15 +306,15 @@ private SendResult createSendResult(SendStatus sendStatus) {
return sendResult;
}

private static void suppressUpdateTopicRouteInfoFromNameServer(DefaultLitePullConsumer litePullConsumer) throws IllegalAccessException {
private static void suppressUpdateTopicRouteInfoFromNameServer(DefaultLitePullConsumer litePullConsumer) throws IllegalAccessException, MQClientException {
DefaultLitePullConsumerImpl defaultLitePullConsumerImpl = (DefaultLitePullConsumerImpl) FieldUtils.readDeclaredField(litePullConsumer, "defaultLitePullConsumerImpl", true);
if (litePullConsumer.getMessageModel() == MessageModel.CLUSTERING) {
litePullConsumer.changeInstanceNameToPID();
}
MQClientInstance mQClientFactory = spy(MQClientManager.getInstance().getOrCreateMQClientInstance(litePullConsumer, (RPCHook) FieldUtils.readDeclaredField(defaultLitePullConsumerImpl, "rpcHook", true)));
ConcurrentMap<String, MQClientInstance> factoryTable = (ConcurrentMap<String, MQClientInstance>) FieldUtils.readDeclaredField(MQClientManager.getInstance(), "factoryTable", true);
factoryTable.put(litePullConsumer.buildMQClientId(), mQClientFactory);
doReturn(false).when(mQClientFactory).updateTopicRouteInfoFromNameServer(anyString());
doReturn(false).when(mQClientFactory).updateTopicRouteInfoFromNameServerWithoutException(anyString());
}

}