diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md index 6c62b0e123c..d3a3565f340 100644 --- a/changes/en-us/2.x.md +++ b/changes/en-us/2.x.md @@ -9,6 +9,7 @@ Add changes here for all PR submitted to the 2.x branch. - [[#6075](https://github.com/seata/seata/pull/6075)] fix missing table alias for on update column of image SQL ### optimize: +- [[#6061](https://github.com/seata/seata/pull/6061)] merge the rpcMergeMessageSend threads of rm and tm and increase the thread hibernation duration - [[#6031](https://github.com/seata/seata/pull/6031)] add a check for the existence of the undolog table ### security: @@ -21,6 +22,7 @@ Thanks to these contributors for their code commits. Please report an unintended - [slievrly](https://github.com/slievrly) +- [yiqi](https://github.com/PleaseGiveMeTheCoke) - [ptyin](https://github.com/ptyin) - [laywin](https://github.com/laywin) - [imcmai](https://github.com/imcmai) diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md index 45a59e183fd..32dcc3dce6c 100644 --- a/changes/zh-cn/2.x.md +++ b/changes/zh-cn/2.x.md @@ -9,8 +9,10 @@ - [[#6075](https://github.com/seata/seata/pull/6075)] 修复镜像SQL对于on update列没有添加表别名的问题 ### optimize: +- [[#6061](https://github.com/seata/seata/pull/6061)] 合并rm和tm的rpcMergeMessageSend线程,增加线程休眠时长 - [[#6031](https://github.com/seata/seata/pull/6031)] 添加undo_log表的存在性校验 + ### security: - [[#6069](https://github.com/seata/seata/pull/6069)] 升级Guava依赖版本,修复安全漏洞 @@ -21,6 +23,7 @@ - [slievrly](https://github.com/slievrly) +- [yiqi](https://github.com/PleaseGiveMeTheCoke) - [ptyin](https://github.com/ptyin) - [laywin](https://github.com/laywin) - [imcmai](https://github.com/imcmai) diff --git a/core/src/main/java/io/seata/core/rpc/netty/AbstractNettyRemoting.java b/core/src/main/java/io/seata/core/rpc/netty/AbstractNettyRemoting.java index a4bb348aae1..ca7576e0f0e 100644 --- a/core/src/main/java/io/seata/core/rpc/netty/AbstractNettyRemoting.java +++ b/core/src/main/java/io/seata/core/rpc/netty/AbstractNettyRemoting.java @@ -92,7 +92,7 @@ public abstract class AbstractNettyRemoting implements Disposable { /** * The Is sending. */ - protected volatile boolean isSending = false; + protected static volatile boolean isSending = false; private String group = "DEFAULT"; /** diff --git a/core/src/main/java/io/seata/core/rpc/netty/AbstractNettyRemotingClient.java b/core/src/main/java/io/seata/core/rpc/netty/AbstractNettyRemotingClient.java index 9ef4925cd1a..1e052d27af8 100644 --- a/core/src/main/java/io/seata/core/rpc/netty/AbstractNettyRemotingClient.java +++ b/core/src/main/java/io/seata/core/rpc/netty/AbstractNettyRemotingClient.java @@ -17,6 +17,8 @@ import java.lang.reflect.Field; import java.net.InetSocketAddress; +import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; @@ -77,14 +79,13 @@ public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting private static final String MSG_ID_PREFIX = "msgId:"; private static final String FUTURES_PREFIX = "futures:"; private static final String SINGLE_LOG_POSTFIX = ";"; - private static final int MAX_MERGE_SEND_MILLS = 1; - private static final String THREAD_PREFIX_SPLIT_CHAR = "_"; + private static final int MAX_MERGE_SEND_MILLS = 10; private static final int MAX_MERGE_SEND_THREAD = 1; private static final long KEEP_ALIVE_TIME = Integer.MAX_VALUE; private static final long SCHEDULE_DELAY_MILLS = 60 * 1000L; private static final long SCHEDULE_INTERVAL_MILLS = 10 * 1000L; - private static final String MERGE_THREAD_PREFIX = "rpcMergeMessageSend"; - protected final Object mergeLock = new Object(); + private static final String MERGE_THREAD_NAME = "rpcMergeMessageSend"; + protected static final Object MERGE_LOCK = new Object(); /** * When sending message type is {@link MergeMessage}, will be stored to mergeMsgMap. @@ -96,11 +97,11 @@ public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting * Send via asynchronous thread {@link io.seata.core.rpc.netty.AbstractNettyRemotingClient.MergedSendRunnable} * {@link AbstractNettyRemotingClient#isEnableClientBatchSendRequest()} */ - protected final ConcurrentHashMap> basketMap = new ConcurrentHashMap<>(); + protected static final ConcurrentHashMap>> BASKET_MAP = new ConcurrentHashMap<>(); private final NettyClientBootstrap clientBootstrap; private final NettyClientChannelManager clientChannelManager; private final NettyPoolKey.TransactionRole transactionRole; - private ExecutorService mergeSendExecutorService; + private static volatile ExecutorService mergeSendExecutorService; private TransactionMessageHandler transactionMessageHandler; protected volatile boolean enableClientBatchSendRequest; @@ -108,17 +109,27 @@ public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting public void init() { timerExecutor.scheduleAtFixedRate(() -> clientChannelManager.reconnect(getTransactionServiceGroup()), SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS); if (this.isEnableClientBatchSendRequest()) { - mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD, - MAX_MERGE_SEND_THREAD, - KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue<>(), - new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD)); - mergeSendExecutorService.submit(new MergedSendRunnable()); + startMergeSendThread(); } super.init(); clientBootstrap.start(); } + private void startMergeSendThread() { + if (mergeSendExecutorService == null) { + synchronized (AbstractNettyRemoting.class) { + if (mergeSendExecutorService == null) { + mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD, + MAX_MERGE_SEND_THREAD, + KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<>(), + new NamedThreadFactory(MERGE_THREAD_NAME, MAX_MERGE_SEND_THREAD)); + mergeSendExecutorService.submit(new MergedSendRunnable()); + } + } + } + } + public AbstractNettyRemotingClient(NettyClientConfig nettyClientConfig, EventExecutorGroup eventExecutorGroup, ThreadPoolExecutor messageExecutor, NettyPoolKey.TransactionRole transactionRole) { super(messageExecutor); @@ -146,8 +157,14 @@ public Object sendSyncRequest(Object msg) throws TimeoutException { futures.put(rpcMessage.getId(), messageFuture); // put message into basketMap - BlockingQueue basket = CollectionUtils.computeIfAbsent(basketMap, serverAddress, - key -> new LinkedBlockingQueue<>()); + Map> roleMessage = CollectionUtils.computeIfAbsent(BASKET_MAP, serverAddress, + key -> { + Map> map = new HashMap<>(2); + map.put(NettyPoolKey.TransactionRole.TMROLE, new LinkedBlockingQueue<>()); + map.put(NettyPoolKey.TransactionRole.RMROLE, new LinkedBlockingQueue<>()); + return map; + }); + BlockingQueue basket = roleMessage.get(transactionRole); if (!basket.offer(rpcMessage)) { LOGGER.error("put message into basketMap offer failed, serverAddress:{},rpcMessage:{}", serverAddress, rpcMessage); @@ -157,8 +174,8 @@ public Object sendSyncRequest(Object msg) throws TimeoutException { LOGGER.debug("offer message: {}", rpcMessage.getBody()); } if (!isSending) { - synchronized (mergeLock) { - mergeLock.notifyAll(); + synchronized (MERGE_LOCK) { + MERGE_LOCK.notifyAll(); } } @@ -291,10 +308,6 @@ protected String getXid(Object msg) { return StringUtils.isBlank(xid) ? String.valueOf(ThreadLocalRandom.current().nextLong(Long.MAX_VALUE)) : xid; } - private String getThreadPrefix() { - return AbstractNettyRemotingClient.MERGE_THREAD_PREFIX + THREAD_PREFIX_SPLIT_CHAR + transactionRole.name(); - } - /** * Get pool key function. * @@ -326,23 +339,36 @@ private String getThreadPrefix() { /** * The type Merged send runnable. */ - private class MergedSendRunnable implements Runnable { + private static class MergedSendRunnable implements Runnable { @Override public void run() { while (true) { - synchronized (mergeLock) { - try { - mergeLock.wait(MAX_MERGE_SEND_MILLS); - } catch (InterruptedException e) { + if (BASKET_MAP.values().stream().allMatch(map -> map.values().stream().allMatch(Collection::isEmpty))) { + synchronized (MERGE_LOCK) { + if (BASKET_MAP.values().stream().allMatch(map -> map.values().stream().allMatch(Collection::isEmpty))) { + try { + MERGE_LOCK.wait(MAX_MERGE_SEND_MILLS); + } catch (InterruptedException e) { + } + } } } isSending = true; - basketMap.forEach((address, basket) -> { + BASKET_MAP.forEach((address, roleMessage) -> roleMessage.forEach((role, basket) -> { if (basket.isEmpty()) { return; } + AbstractNettyRemotingClient client; + if (role.equals(NettyPoolKey.TransactionRole.RMROLE)) { + client = RmNettyRemotingClient.getInstance(); + } else { + client = TmNettyRemotingClient.getInstance(); + } + + ConcurrentHashMap clientFutures = client.getFutures(); + MergedWarpMessage mergeMessage = new MergedWarpMessage(); while (!basket.isEmpty()) { RpcMessage msg = basket.poll(); @@ -350,35 +376,35 @@ public void run() { mergeMessage.msgIds.add(msg.getId()); } if (mergeMessage.msgIds.size() > 1) { - printMergeMessageLog(mergeMessage); + printMergeMessageLog(clientFutures, mergeMessage); } Channel sendChannel = null; try { // send batch message is sync request, but there is no need to get the return value. // Since the messageFuture has been created before the message is placed in basketMap, // the return value will be obtained in ClientOnResponseProcessor. - sendChannel = clientChannelManager.acquireChannel(address); - AbstractNettyRemotingClient.this.sendAsyncRequest(sendChannel, mergeMessage); + sendChannel = client.getClientChannelManager().acquireChannel(address); + client.sendAsyncRequest(sendChannel, mergeMessage); } catch (FrameworkException e) { if (e.getErrcode() == FrameworkErrorCode.ChannelIsNotWritable && sendChannel != null) { - destroyChannel(address, sendChannel); + client.destroyChannel(address, sendChannel); } // fast fail for (Integer msgId : mergeMessage.msgIds) { - MessageFuture messageFuture = futures.remove(msgId); + MessageFuture messageFuture = clientFutures.remove(msgId); if (messageFuture != null) { messageFuture.setResultMessage( - new RuntimeException(String.format("%s is unreachable", address), e)); + new RuntimeException(String.format("%s is unreachable", address), e)); } } LOGGER.error("client merge call failed: {}", e.getMessage(), e); } - }); + })); isSending = false; } } - private void printMergeMessageLog(MergedWarpMessage mergeMessage) { + private void printMergeMessageLog(ConcurrentHashMap clientFutures, MergedWarpMessage mergeMessage) { if (LOGGER.isDebugEnabled()) { LOGGER.debug("merge msg size:{}", mergeMessage.msgIds.size()); for (AbstractMessage cm : mergeMessage.msgs) { @@ -389,7 +415,7 @@ private void printMergeMessageLog(MergedWarpMessage mergeMessage) { sb.append(MSG_ID_PREFIX).append(l).append(SINGLE_LOG_POSTFIX); } sb.append("\n"); - for (long l : futures.keySet()) { + for (long l : clientFutures.keySet()) { sb.append(FUTURES_PREFIX).append(l).append(SINGLE_LOG_POSTFIX); } LOGGER.debug(sb.toString());