Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize: the high CPU usage issue of the rpcMergeMessageSend thread #6061

Merged
merged 15 commits into from
Nov 27, 2023
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions changes/en-us/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ Add changes here for all PR submitted to the 2.x branch.
- [[#PR_NO](https://github.com/seata/seata/pull/PR_NO)] A brief and accurate description of PR

### optimize:
- [[#PR_NO](https://github.com/seata/seata/pull/PR_NO)] A brief and accurate description of PR
- [[#6061](https://github.com/seata/seata/pull/6061)] merge the rpcMergeMessageSend threads of rm and tm and increase the thread hibernation duration

### security:
- [[#PR_NO](https://github.com/seata/seata/pull/PR_NO)] A brief and accurate description of PR
Expand All @@ -21,6 +21,6 @@ Thanks to these contributors for their code commits. Please report an unintended

<!-- Please make sure your Github ID is in the list below -->
- [slievrly](https://github.com/slievrly)

- [yiqi](https://github.com/PleaseGiveMeTheCoke)

Also, we receive many valuable issues, questions and advices from our community. Thanks for you all.
3 changes: 2 additions & 1 deletion changes/zh-cn/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
- [[#PR_NO](https://github.com/seata/seata/pull/PR_NO)] 准确简要的PR描述

### optimize:
- [[#PR_NO](https://github.com/seata/seata/pull/PR_NO)] 准确简要的PR描述
- [[#6061](https://github.com/seata/seata/pull/6061)] 合并rm和tm的rpcMergeMessageSend线程,增加线程休眠时长

### security:
- [[#PR_NO](https://github.com/seata/seata/pull/PR_NO)] 准确简要的PR描述
Expand All @@ -21,5 +21,6 @@

<!-- 请确保您的 GitHub ID 在以下列表中 -->
- [slievrly](https://github.com/slievrly)
- [yiqi](https://github.com/PleaseGiveMeTheCoke)

同时,我们收到了社区反馈的很多有价值的issue和建议,非常感谢大家。
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public abstract class AbstractNettyRemoting implements Disposable {
/**
* The Is sending.
*/
protected volatile boolean isSending = false;
protected static volatile boolean isSending = false;
private String group = "DEFAULT";

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
Expand Down Expand Up @@ -77,14 +79,13 @@ public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting
private static final String MSG_ID_PREFIX = "msgId:";
private static final String FUTURES_PREFIX = "futures:";
private static final String SINGLE_LOG_POSTFIX = ";";
private static final int MAX_MERGE_SEND_MILLS = 1;
private static final String THREAD_PREFIX_SPLIT_CHAR = "_";
private static final int MAX_MERGE_SEND_MILLS = 10;
private static final int MAX_MERGE_SEND_THREAD = 1;
private static final long KEEP_ALIVE_TIME = Integer.MAX_VALUE;
private static final long SCHEDULE_DELAY_MILLS = 60 * 1000L;
private static final long SCHEDULE_INTERVAL_MILLS = 10 * 1000L;
private static final String MERGE_THREAD_PREFIX = "rpcMergeMessageSend";
protected final Object mergeLock = new Object();
private static final String MERGE_THREAD_NAME = "rpcMergeMessageSend";
protected static final Object MERGE_LOCK = new Object();

/**
* When sending message type is {@link MergeMessage}, will be stored to mergeMsgMap.
Expand All @@ -96,29 +97,39 @@ public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting
* Send via asynchronous thread {@link io.seata.core.rpc.netty.AbstractNettyRemotingClient.MergedSendRunnable}
* {@link AbstractNettyRemotingClient#isEnableClientBatchSendRequest()}
*/
protected final ConcurrentHashMap<String/*serverAddress*/, BlockingQueue<RpcMessage>> basketMap = new ConcurrentHashMap<>();
protected static final ConcurrentHashMap<String/*serverAddress*/, Map<NettyPoolKey.TransactionRole, BlockingQueue<RpcMessage>>> BASKET_MAP = new ConcurrentHashMap<>();
private final NettyClientBootstrap clientBootstrap;
private final NettyClientChannelManager clientChannelManager;
private final NettyPoolKey.TransactionRole transactionRole;
private ExecutorService mergeSendExecutorService;
private static volatile ExecutorService mergeSendExecutorService;
private TransactionMessageHandler transactionMessageHandler;
protected volatile boolean enableClientBatchSendRequest;

@Override
public void init() {
timerExecutor.scheduleAtFixedRate(() -> clientChannelManager.reconnect(getTransactionServiceGroup()), SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
if (this.isEnableClientBatchSendRequest()) {
mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
MAX_MERGE_SEND_THREAD,
KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
mergeSendExecutorService.submit(new MergedSendRunnable());
startMergeSendThread();
}
super.init();
clientBootstrap.start();
}

private void startMergeSendThread() {
if (mergeSendExecutorService == null) {
synchronized (AbstractNettyRemoting.class) {
if (mergeSendExecutorService == null) {
mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
MAX_MERGE_SEND_THREAD,
KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
new NamedThreadFactory(MERGE_THREAD_NAME, MAX_MERGE_SEND_THREAD));
mergeSendExecutorService.submit(new MergedSendRunnable());
}
}
}
}

public AbstractNettyRemotingClient(NettyClientConfig nettyClientConfig, EventExecutorGroup eventExecutorGroup,
ThreadPoolExecutor messageExecutor, NettyPoolKey.TransactionRole transactionRole) {
super(messageExecutor);
Expand Down Expand Up @@ -146,8 +157,14 @@ public Object sendSyncRequest(Object msg) throws TimeoutException {
futures.put(rpcMessage.getId(), messageFuture);

// put message into basketMap
BlockingQueue<RpcMessage> basket = CollectionUtils.computeIfAbsent(basketMap, serverAddress,
key -> new LinkedBlockingQueue<>());
Map<NettyPoolKey.TransactionRole, BlockingQueue<RpcMessage>> roleMessage = CollectionUtils.computeIfAbsent(BASKET_MAP, serverAddress,
key -> {
Map<NettyPoolKey.TransactionRole, BlockingQueue<RpcMessage>> map = new HashMap<>(2);
map.put(NettyPoolKey.TransactionRole.TMROLE, new LinkedBlockingQueue<>());
map.put(NettyPoolKey.TransactionRole.RMROLE, new LinkedBlockingQueue<>());
return map;
});
BlockingQueue<RpcMessage> basket = roleMessage.get(transactionRole);
if (!basket.offer(rpcMessage)) {
LOGGER.error("put message into basketMap offer failed, serverAddress:{},rpcMessage:{}",
serverAddress, rpcMessage);
Expand All @@ -157,8 +174,8 @@ public Object sendSyncRequest(Object msg) throws TimeoutException {
LOGGER.debug("offer message: {}", rpcMessage.getBody());
}
if (!isSending) {
synchronized (mergeLock) {
mergeLock.notifyAll();
synchronized (MERGE_LOCK) {
MERGE_LOCK.notifyAll();
}
}

Expand Down Expand Up @@ -291,10 +308,6 @@ protected String getXid(Object msg) {
return StringUtils.isBlank(xid) ? String.valueOf(ThreadLocalRandom.current().nextLong(Long.MAX_VALUE)) : xid;
}

private String getThreadPrefix() {
return AbstractNettyRemotingClient.MERGE_THREAD_PREFIX + THREAD_PREFIX_SPLIT_CHAR + transactionRole.name();
}

/**
* Get pool key function.
*
Expand Down Expand Up @@ -326,59 +339,72 @@ private String getThreadPrefix() {
/**
* The type Merged send runnable.
*/
private class MergedSendRunnable implements Runnable {
private static class MergedSendRunnable implements Runnable {

@Override
public void run() {
while (true) {
synchronized (mergeLock) {
try {
mergeLock.wait(MAX_MERGE_SEND_MILLS);
} catch (InterruptedException e) {
if (BASKET_MAP.values().stream().allMatch(map -> map.values().stream().allMatch(Collection::isEmpty))) {
synchronized (MERGE_LOCK) {
if (BASKET_MAP.values().stream().allMatch(map -> map.values().stream().allMatch(Collection::isEmpty))) {
try {
MERGE_LOCK.wait(MAX_MERGE_SEND_MILLS);
} catch (InterruptedException e) {
}
}
}
}
isSending = true;
basketMap.forEach((address, basket) -> {
BASKET_MAP.forEach((address, roleMessage) -> roleMessage.forEach((role, basket) -> {
if (basket.isEmpty()) {
return;
}

AbstractNettyRemotingClient client;
if (role.equals(NettyPoolKey.TransactionRole.RMROLE)) {
client = RmNettyRemotingClient.getInstance();
} else {
client = TmNettyRemotingClient.getInstance();
}

ConcurrentHashMap<Integer, MessageFuture> clientFutures = client.getFutures();

MergedWarpMessage mergeMessage = new MergedWarpMessage();
while (!basket.isEmpty()) {
RpcMessage msg = basket.poll();
mergeMessage.msgs.add((AbstractMessage) msg.getBody());
mergeMessage.msgIds.add(msg.getId());
}
if (mergeMessage.msgIds.size() > 1) {
printMergeMessageLog(mergeMessage);
printMergeMessageLog(clientFutures, mergeMessage);
}
Channel sendChannel = null;
try {
// send batch message is sync request, but there is no need to get the return value.
// Since the messageFuture has been created before the message is placed in basketMap,
// the return value will be obtained in ClientOnResponseProcessor.
sendChannel = clientChannelManager.acquireChannel(address);
AbstractNettyRemotingClient.this.sendAsyncRequest(sendChannel, mergeMessage);
sendChannel = client.getClientChannelManager().acquireChannel(address);
client.sendAsyncRequest(sendChannel, mergeMessage);
} catch (FrameworkException e) {
if (e.getErrcode() == FrameworkErrorCode.ChannelIsNotWritable && sendChannel != null) {
destroyChannel(address, sendChannel);
client.destroyChannel(address, sendChannel);
}
// fast fail
for (Integer msgId : mergeMessage.msgIds) {
MessageFuture messageFuture = futures.remove(msgId);
MessageFuture messageFuture = clientFutures.remove(msgId);
if (messageFuture != null) {
messageFuture.setResultMessage(
new RuntimeException(String.format("%s is unreachable", address), e));
new RuntimeException(String.format("%s is unreachable", address), e));
}
}
LOGGER.error("client merge call failed: {}", e.getMessage(), e);
}
});
}));
isSending = false;
}
}

private void printMergeMessageLog(MergedWarpMessage mergeMessage) {
private void printMergeMessageLog(ConcurrentHashMap<Integer, MessageFuture> clientFutures, MergedWarpMessage mergeMessage) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("merge msg size:{}", mergeMessage.msgIds.size());
for (AbstractMessage cm : mergeMessage.msgs) {
Expand All @@ -389,7 +415,7 @@ private void printMergeMessageLog(MergedWarpMessage mergeMessage) {
sb.append(MSG_ID_PREFIX).append(l).append(SINGLE_LOG_POSTFIX);
}
sb.append("\n");
for (long l : futures.keySet()) {
for (long l : clientFutures.keySet()) {
sb.append(FUTURES_PREFIX).append(l).append(SINGLE_LOG_POSTFIX);
}
LOGGER.debug(sb.toString());
Expand Down