Skip to content

Commit

Permalink
feat: support to send trace for batchSend
Browse files Browse the repository at this point in the history
  • Loading branch information
cserwen authored and dengzhiwen1 committed Dec 20, 2023
1 parent a376fbc commit dc252f8
Show file tree
Hide file tree
Showing 3 changed files with 277 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
package org.apache.rocketmq.client.trace.hook;

import java.util.ArrayList;
import java.util.List;

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.hook.SendMessageContext;
import org.apache.rocketmq.client.hook.SendMessageHook;
import org.apache.rocketmq.client.producer.SendStatus;
Expand All @@ -26,6 +29,9 @@
import org.apache.rocketmq.client.trace.TraceDispatcher;
import org.apache.rocketmq.client.trace.TraceType;
import org.apache.rocketmq.remoting.protocol.NamespaceUtil;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageBatch;
import org.apache.rocketmq.common.message.MessageType;

public class SendMessageTraceHookImpl implements SendMessageHook {

Expand All @@ -48,19 +54,35 @@ public void sendMessageBefore(SendMessageContext context) {
}
//build the context content of TraceContext
TraceContext traceContext = new TraceContext();
traceContext.setTraceBeans(new ArrayList<>(1));
Message message = context.getMessage();
List<TraceBean> traceBeans;
if (message instanceof MessageBatch) {
MessageBatch messageBatch = (MessageBatch) message;
traceBeans = new ArrayList<>(messageBatch.getBatchCount());
for (Message batch : messageBatch) {
traceBeans.add(buildBeforeTraceBean(batch, context.getMsgType(), context.getBrokerAddr()));
}
} else {
traceBeans = new ArrayList<>(1);
traceBeans.add(buildBeforeTraceBean(message, context.getMsgType(), context.getBrokerAddr()));
}

traceContext.setTraceBeans(traceBeans);
context.setMqTraceContext(traceContext);
traceContext.setTraceType(TraceType.Pub);
traceContext.setGroupName(NamespaceUtil.withoutNamespace(context.getProducerGroup()));
}

public TraceBean buildBeforeTraceBean(Message message, MessageType msgType, String brokerAddr) {
//build the data bean object of message trace
TraceBean traceBean = new TraceBean();
traceBean.setTopic(NamespaceUtil.withoutNamespace(context.getMessage().getTopic()));
traceBean.setTags(context.getMessage().getTags());
traceBean.setKeys(context.getMessage().getKeys());
traceBean.setStoreHost(context.getBrokerAddr());
traceBean.setBodyLength(context.getMessage().getBody().length);
traceBean.setMsgType(context.getMsgType());
traceContext.getTraceBeans().add(traceBean);
traceBean.setTopic(NamespaceUtil.withoutNamespace(message.getTopic()));
traceBean.setTags(message.getTags());
traceBean.setKeys(message.getKeys());
traceBean.setStoreHost(brokerAddr);
traceBean.setBodyLength(message.getBody().length);
traceBean.setMsgType(msgType);
return traceBean;
}

@Override
Expand All @@ -75,24 +97,38 @@ public void sendMessageAfter(SendMessageContext context) {
}

if (context.getSendResult().getRegionId() == null
|| !context.getSendResult().isTraceOn()) {
|| !context.getSendResult().isTraceOn()
|| StringUtils.isEmpty(context.getSendResult().getMsgId())
|| StringUtils.isEmpty(context.getSendResult().getOffsetMsgId())) {
// if switch is false,skip it
return;
}

TraceContext traceContext = (TraceContext) context.getMqTraceContext();
TraceBean traceBean = traceContext.getTraceBeans().get(0);
int costTime = (int) ((System.currentTimeMillis() - traceContext.getTimeStamp()) / traceContext.getTraceBeans().size());
traceContext.setCostTime(costTime);
if (context.getSendResult().getSendStatus().equals(SendStatus.SEND_OK)) {
traceContext.setSuccess(true);
} else {
traceContext.setSuccess(false);
String[] uniqMsgIds = context.getSendResult().getMsgId().split(",");
String[] offsetMsgIds = context.getSendResult().getOffsetMsgId().split(",");
if (uniqMsgIds.length != traceContext.getTraceBeans().size() || offsetMsgIds.length != traceContext.getTraceBeans().size()) {
return;
}
int costTime = (int) (System.currentTimeMillis() - traceContext.getTimeStamp());
for (int i = 0; i < traceContext.getTraceBeans().size(); i++) {
// build traceBean
TraceBean traceBean = traceContext.getTraceBeans().get(i);
traceBean.setMsgId(uniqMsgIds[i]);
traceBean.setOffsetMsgId(offsetMsgIds[i]);
traceBean.setStoreTime(traceContext.getTimeStamp() + costTime / 2);

// build traceContext
TraceContext tmpContext = new TraceContext();
tmpContext.setTraceType(traceContext.getTraceType());
tmpContext.setRegionId(context.getSendResult().getRegionId());
tmpContext.setGroupName(traceContext.getGroupName());
tmpContext.setCostTime(costTime);
tmpContext.setSuccess(context.getSendResult().getSendStatus().equals(SendStatus.SEND_OK));
tmpContext.setContextCode(traceContext.getContextCode());
tmpContext.setTraceBeans(new ArrayList<>(1));
tmpContext.getTraceBeans().add(traceBean);
localDispatcher.append(tmpContext);
}
traceContext.setRegionId(context.getSendResult().getRegionId());
traceBean.setMsgId(context.getSendResult().getMsgId());
traceBean.setOffsetMsgId(context.getSendResult().getOffsetMsgId());
traceBean.setStoreTime(traceContext.getTimeStamp() + costTime / 2);
localDispatcher.append(traceContext);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.client.trace.hook;

import org.apache.rocketmq.client.hook.SendMessageContext;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
import org.apache.rocketmq.client.trace.TraceBean;
import org.apache.rocketmq.client.trace.TraceContext;
import org.apache.rocketmq.client.trace.TraceDispatcher;
import org.apache.rocketmq.client.trace.TraceType;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageBatch;
import org.apache.rocketmq.common.message.MessageType;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.remoting.RPCHook;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;

public class SendMessageTraceHookImplTest {

private static final String TOPIC = "TopicTest";
private static final String TAGS = "tags";
private static final String KEYS = "keys";
private static final String BODY = "bodyLength";
private static final String BROKER = "127.0.0.1:10911";
private static final String GROUP = "producer";

private static final String MSG_ID = "msgId";
private static final String OFFSET_MSG_ID = "offsetMsgId";
private static final String REGION_ID = "testRegion";

private SendMessageTraceHookImpl sendMessageTraceHook;
private TestAsyncTraceDispatcher traceDispatcher;

private static class TestAsyncTraceDispatcher extends AsyncTraceDispatcher {

private BlockingDeque<TraceContext> stashQueue = new LinkedBlockingDeque<>(1024);

public TestAsyncTraceDispatcher(String group, Type type, String traceTopicName, RPCHook rpcHook) {
super(group, type, traceTopicName, rpcHook);
}

@Override
public boolean append(Object ctx) {
return stashQueue.offer((TraceContext) ctx);
}

public BlockingDeque<TraceContext> getStashQueue() {
return stashQueue;
}
}

@Before
public void init() {
traceDispatcher = new TestAsyncTraceDispatcher(null, TraceDispatcher.Type.PRODUCE, TopicValidator.RMQ_SYS_TRACE_TOPIC, null);
sendMessageTraceHook = new SendMessageTraceHookImpl(traceDispatcher);
}

@Test
public void testBuildBeforeTraceBean() {
TraceBean traceBean = sendMessageTraceHook.buildBeforeTraceBean(buildTestMessage(), MessageType.Normal_Msg, BROKER);
Assert.assertEquals(traceBean.getTopic(), TOPIC);
Assert.assertEquals(traceBean.getTags(), TAGS);
Assert.assertEquals(traceBean.getKeys(), KEYS);
Assert.assertEquals(traceBean.getBodyLength(), BODY.length());
Assert.assertEquals(traceBean.getMsgType(), MessageType.Normal_Msg);
Assert.assertEquals(traceBean.getStoreHost(), BROKER);
}

@Test
public void testSendSingleMessageTrace() throws InterruptedException {
SendMessageContext context = new SendMessageContext();
context.setMessage(buildTestMessage());
context.setMsgType(MessageType.Normal_Msg);
context.setBrokerAddr(BROKER);
context.setProducerGroup(GROUP);

sendMessageTraceHook.sendMessageBefore(context);
Thread.sleep(100);
context.setSendResult(buildTestResult());
sendMessageTraceHook.sendMessageAfter(context);

Assert.assertEquals(traceDispatcher.getStashQueue().size(), 1);
TraceContext traceContext = traceDispatcher.getStashQueue().poll();
Assert.assertNotNull(traceContext);
Assert.assertNotNull(traceContext.getTraceBeans());
Assert.assertEquals(traceContext.getTraceBeans().size(), 1);
Assert.assertEquals(traceContext.getTraceType(), TraceType.Pub);
Assert.assertEquals(traceContext.getRegionId(), REGION_ID);
Assert.assertEquals(traceContext.getGroupName(), GROUP);
Assert.assertTrue(traceContext.getCostTime() >= 100 && traceContext.getCostTime() < 200);
Assert.assertTrue(traceContext.isSuccess());
Assert.assertEquals(traceContext.getTraceBeans().size(), 1);

TraceBean traceBean = traceContext.getTraceBeans().get(0);
Assert.assertEquals(traceBean.getTopic(), TOPIC);
Assert.assertEquals(traceBean.getTags(), TAGS);
Assert.assertEquals(traceBean.getKeys(), KEYS);
Assert.assertEquals(traceBean.getBodyLength(), BODY.length());
Assert.assertEquals(traceBean.getMsgType(), MessageType.Normal_Msg);
Assert.assertEquals(traceBean.getStoreHost(), BROKER);
}

@Test
public void testSendBatchMessageTrace() throws InterruptedException {
int batchSize = 10;
SendMessageContext context = new SendMessageContext();
context.setMessage(buildTestBatchMessage(batchSize));
context.setMsgType(MessageType.Normal_Msg);
context.setBrokerAddr(BROKER);
context.setProducerGroup(GROUP);

sendMessageTraceHook.sendMessageBefore(context);
Thread.sleep(100);
context.setSendResult(buildTestBatchResult(batchSize));
sendMessageTraceHook.sendMessageAfter(context);

Assert.assertEquals(traceDispatcher.getStashQueue().size(), batchSize);
for (int i = 0; i < batchSize; i++) {
TraceContext traceContext = traceDispatcher.getStashQueue().poll();
Assert.assertNotNull(traceContext);
Assert.assertNotNull(traceContext.getTraceBeans());
Assert.assertEquals(traceContext.getTraceBeans().size(), 1);
Assert.assertEquals(traceContext.getTraceType(), TraceType.Pub);
Assert.assertEquals(traceContext.getRegionId(), REGION_ID);
Assert.assertEquals(traceContext.getGroupName(), GROUP);
Assert.assertTrue(traceContext.getCostTime() >= 100 && traceContext.getCostTime() < 200);
Assert.assertTrue(traceContext.isSuccess());
Assert.assertEquals(traceContext.getTraceBeans().size(), 1);

TraceBean traceBean = traceContext.getTraceBeans().get(0);
Assert.assertEquals(traceBean.getTopic(), TOPIC);
Assert.assertEquals(traceBean.getTags(), TAGS + i);
Assert.assertEquals(traceBean.getKeys(), KEYS + i);
Assert.assertEquals(traceBean.getBodyLength(), (BODY + i).length());
Assert.assertEquals(traceBean.getMsgType(), MessageType.Normal_Msg);
Assert.assertEquals(traceBean.getStoreHost(), BROKER);
}
}

private Message buildTestMessage() {
Message message = new Message();
message.setTopic(TOPIC);
message.setTags(TAGS);
message.setKeys(KEYS);
message.setBody(BODY.getBytes(StandardCharsets.UTF_8));
return message;
}

private SendResult buildTestResult() {
SendResult sendResult = new SendResult();
sendResult.setMsgId(MSG_ID);
sendResult.setOffsetMsgId(OFFSET_MSG_ID);
sendResult.setSendStatus(SendStatus.SEND_OK);
sendResult.setRegionId(REGION_ID);
return sendResult;
}

private Message buildTestBatchMessage(int batchSize) {
List<Message> messages = new ArrayList<>(batchSize);
for (int i = 0; i < batchSize; i++) {
Message message = new Message();
message.setTopic(TOPIC);
message.setTags(TAGS + i);
message.setKeys(KEYS + i);
message.setBody((BODY + i).getBytes(StandardCharsets.UTF_8));
messages.add(message);
}
return MessageBatch.generateFromList(messages);
}

private SendResult buildTestBatchResult(int batchSize) {
StringBuilder batchMsgId = new StringBuilder();
StringBuilder batchOffsetMsgId = new StringBuilder();
for (int i = 0; i < batchSize; i++) {
batchMsgId.append(MSG_ID);
batchOffsetMsgId.append(OFFSET_MSG_ID);
if (i < batchSize - 1) {
batchMsgId.append(",");
batchOffsetMsgId.append(",");
}
}

SendResult sendResult = new SendResult();
sendResult.setMsgId(batchMsgId.toString());
sendResult.setOffsetMsgId(batchOffsetMsgId.toString());
sendResult.setSendStatus(SendStatus.SEND_OK);
sendResult.setRegionId(REGION_ID);
return sendResult;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,8 @@ public static MessageBatch generateFromList(Collection<? extends Message> messag
return messageBatch;
}

public int getBatchCount() {
return this.messages.size();
}

}

0 comments on commit dc252f8

Please sign in to comment.