From 1c35adb3dc3824ef39f39d1375cf224428fdc4fb Mon Sep 17 00:00:00 2001
From: dingshuangxi888 <dingshuangxi888@gmail.com>
Date: Wed, 25 Dec 2024 17:57:47 +0800
Subject: [PATCH] [ISSUE #9075]Avoid message type validate in message sync
 scenario. (#9076)

* Avoid message type validate in message sync scenario.
---
 .../apache/rocketmq/common/message/Message.java    |  7 +++++++
 .../proxy/processor/ProducerProcessor.java         |  6 +++++-
 .../remoting/activity/SendMessageActivity.java     | 14 ++++++++++----
 3 files changed, 22 insertions(+), 5 deletions(-)

diff --git a/common/src/main/java/org/apache/rocketmq/common/message/Message.java b/common/src/main/java/org/apache/rocketmq/common/message/Message.java
index c7997c47318..acd4df96d28 100644
--- a/common/src/main/java/org/apache/rocketmq/common/message/Message.java
+++ b/common/src/main/java/org/apache/rocketmq/common/message/Message.java
@@ -108,6 +108,13 @@ public String getProperty(final String name) {
         return this.properties.get(name);
     }
 
+    public boolean hasProperty(final String name) {
+        if (null == this.properties) {
+            return false;
+        }
+        return this.properties.containsKey(name);
+    }
+
     public String getTopic() {
         return topic;
     }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java
index 43e16ddd2d7..17a2f27fa74 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java
@@ -74,7 +74,7 @@ public CompletableFuture<List<SendResult>> sendMessage(ProxyContext ctx, QueueSe
         try {
             Message message = messageList.get(0);
             String topic = message.getTopic();
-            if (ConfigurationManager.getProxyConfig().isEnableTopicMessageTypeCheck()) {
+            if (isNeedCheckTopicMessageType(message)) {
                 if (topicMessageTypeValidator != null) {
                     // Do not check retry or dlq topic
                     if (!NamespaceUtil.isRetryTopic(topic) && !NamespaceUtil.isDLQTopic(topic)) {
@@ -261,4 +261,8 @@ public CompletableFuture<RemotingCommand> forwardMessageToDeadLetterQueue(ProxyC
         return FutureUtils.addExecutor(future, this.executor);
     }
 
+    private boolean isNeedCheckTopicMessageType(Message message) {
+        return ConfigurationManager.getProxyConfig().isEnableTopicMessageTypeCheck()
+            && !message.hasProperty(MessageConst.PROPERTY_TRANSFER_FLAG);
+    }
 }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivity.java
index 17af0fdcb37..22d9efd9347 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivity.java
@@ -21,17 +21,18 @@
 import java.time.Duration;
 import java.util.Map;
 import org.apache.rocketmq.common.attribute.TopicMessageType;
+import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageDecoder;
-import org.apache.rocketmq.remoting.protocol.NamespaceUtil;
-import org.apache.rocketmq.remoting.protocol.RequestCode;
-import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader;
 import org.apache.rocketmq.proxy.common.ProxyContext;
 import org.apache.rocketmq.proxy.config.ConfigurationManager;
 import org.apache.rocketmq.proxy.processor.MessagingProcessor;
 import org.apache.rocketmq.proxy.processor.validator.DefaultTopicMessageTypeValidator;
 import org.apache.rocketmq.proxy.processor.validator.TopicMessageTypeValidator;
 import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline;
+import org.apache.rocketmq.remoting.protocol.NamespaceUtil;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.protocol.RequestCode;
+import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader;
 
 public class SendMessageActivity extends AbstractRemotingActivity {
     TopicMessageTypeValidator topicMessageTypeValidator;
@@ -66,7 +67,7 @@ protected RemotingCommand sendMessage(ChannelHandlerContext ctx, RemotingCommand
         String topic = requestHeader.getTopic();
         Map<String, String> property = MessageDecoder.string2messageProperties(requestHeader.getProperties());
         TopicMessageType messageType = TopicMessageType.parseFromMessageProperty(property);
-        if (ConfigurationManager.getProxyConfig().isEnableTopicMessageTypeCheck()) {
+        if (isNeedCheckTopicMessageType(property)) {
             if (topicMessageTypeValidator != null) {
                 // Do not check retry or dlq topic
                 if (!NamespaceUtil.isRetryTopic(topic) && !NamespaceUtil.isDLQTopic(topic)) {
@@ -87,4 +88,9 @@ protected RemotingCommand consumerSendMessage(ChannelHandlerContext ctx, Remotin
         ProxyContext context) throws Exception {
         return request(ctx, request, context, Duration.ofSeconds(3).toMillis());
     }
+
+    private boolean isNeedCheckTopicMessageType(Map<String, String> property) {
+        return ConfigurationManager.getProxyConfig().isEnableTopicMessageTypeCheck()
+            && !property.containsKey(MessageConst.PROPERTY_TRANSFER_FLAG);
+    }
 }