From 4c791ff931a9836eb68565ebf8c9b4ed5c888015 Mon Sep 17 00:00:00 2001 From: yiqi <1455432762@qq.com> Date: Mon, 20 Nov 2023 20:45:09 +0800 Subject: [PATCH 01/10] optimize: 1. no longer wait every 1ms, directly wait for 10ms or more, anyway, sendSyncRequest will wake up the thread when it comes in and puts the message into the queue.2. tm and rm threads are merged, no need for a separate thread. --- .../core/rpc/netty/AbstractNettyRemoting.java | 2 +- .../netty/AbstractNettyRemotingClient.java | 72 ++++++++++++------- 2 files changed, 46 insertions(+), 28 deletions(-) diff --git a/core/src/main/java/io/seata/core/rpc/netty/AbstractNettyRemoting.java b/core/src/main/java/io/seata/core/rpc/netty/AbstractNettyRemoting.java index a4bb348aae1..ca7576e0f0e 100644 --- a/core/src/main/java/io/seata/core/rpc/netty/AbstractNettyRemoting.java +++ b/core/src/main/java/io/seata/core/rpc/netty/AbstractNettyRemoting.java @@ -92,7 +92,7 @@ public abstract class AbstractNettyRemoting implements Disposable { /** * The Is sending. */ - protected volatile boolean isSending = false; + protected static volatile boolean isSending = false; private String group = "DEFAULT"; /** diff --git a/core/src/main/java/io/seata/core/rpc/netty/AbstractNettyRemotingClient.java b/core/src/main/java/io/seata/core/rpc/netty/AbstractNettyRemotingClient.java index 9ef4925cd1a..2c8ad061d7b 100644 --- a/core/src/main/java/io/seata/core/rpc/netty/AbstractNettyRemotingClient.java +++ b/core/src/main/java/io/seata/core/rpc/netty/AbstractNettyRemotingClient.java @@ -77,14 +77,13 @@ public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting private static final String MSG_ID_PREFIX = "msgId:"; private static final String FUTURES_PREFIX = "futures:"; private static final String SINGLE_LOG_POSTFIX = ";"; - private static final int MAX_MERGE_SEND_MILLS = 1; - private static final String THREAD_PREFIX_SPLIT_CHAR = "_"; + private static final int MAX_MERGE_SEND_MILLS = 10; private static final int MAX_MERGE_SEND_THREAD = 1; private static final long KEEP_ALIVE_TIME = Integer.MAX_VALUE; private static final long SCHEDULE_DELAY_MILLS = 60 * 1000L; private static final long SCHEDULE_INTERVAL_MILLS = 10 * 1000L; - private static final String MERGE_THREAD_PREFIX = "rpcMergeMessageSend"; - protected final Object mergeLock = new Object(); + private static final String MERGE_THREAD_NAME = "rpcMergeMessageSend"; + protected static final Object mergeLock = new Object(); /** * When sending message type is {@link MergeMessage}, will be stored to mergeMsgMap. @@ -96,11 +95,11 @@ public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting * Send via asynchronous thread {@link io.seata.core.rpc.netty.AbstractNettyRemotingClient.MergedSendRunnable} * {@link AbstractNettyRemotingClient#isEnableClientBatchSendRequest()} */ - protected final ConcurrentHashMap> basketMap = new ConcurrentHashMap<>(); + protected static final ConcurrentHashMap>> basketMap = new ConcurrentHashMap<>(); private final NettyClientBootstrap clientBootstrap; private final NettyClientChannelManager clientChannelManager; private final NettyPoolKey.TransactionRole transactionRole; - private ExecutorService mergeSendExecutorService; + private volatile static ExecutorService mergeSendExecutorService; private TransactionMessageHandler transactionMessageHandler; protected volatile boolean enableClientBatchSendRequest; @@ -108,17 +107,27 @@ public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting public void init() { timerExecutor.scheduleAtFixedRate(() -> clientChannelManager.reconnect(getTransactionServiceGroup()), SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS); if (this.isEnableClientBatchSendRequest()) { - mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD, - MAX_MERGE_SEND_THREAD, - KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue<>(), - new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD)); - mergeSendExecutorService.submit(new MergedSendRunnable()); + startMergeSendThread(); } super.init(); clientBootstrap.start(); } + private void startMergeSendThread() { + if(mergeSendExecutorService == null) { + synchronized (AbstractNettyRemoting.class) { + if(mergeSendExecutorService == null) { + mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD, + MAX_MERGE_SEND_THREAD, + KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<>(), + new NamedThreadFactory(MERGE_THREAD_NAME, MAX_MERGE_SEND_THREAD)); + mergeSendExecutorService.submit(new MergedSendRunnable()); + } + } + } + } + public AbstractNettyRemotingClient(NettyClientConfig nettyClientConfig, EventExecutorGroup eventExecutorGroup, ThreadPoolExecutor messageExecutor, NettyPoolKey.TransactionRole transactionRole) { super(messageExecutor); @@ -146,8 +155,9 @@ public Object sendSyncRequest(Object msg) throws TimeoutException { futures.put(rpcMessage.getId(), messageFuture); // put message into basketMap - BlockingQueue basket = CollectionUtils.computeIfAbsent(basketMap, serverAddress, - key -> new LinkedBlockingQueue<>()); + Pair> roleMessage = CollectionUtils.computeIfAbsent(basketMap, serverAddress, + key -> new Pair<>(transactionRole, new LinkedBlockingQueue<>())); + BlockingQueue basket = roleMessage.getSecond(); if (!basket.offer(rpcMessage)) { LOGGER.error("put message into basketMap offer failed, serverAddress:{},rpcMessage:{}", serverAddress, rpcMessage); @@ -291,10 +301,6 @@ protected String getXid(Object msg) { return StringUtils.isBlank(xid) ? String.valueOf(ThreadLocalRandom.current().nextLong(Long.MAX_VALUE)) : xid; } - private String getThreadPrefix() { - return AbstractNettyRemotingClient.MERGE_THREAD_PREFIX + THREAD_PREFIX_SPLIT_CHAR + transactionRole.name(); - } - /** * Get pool key function. * @@ -326,7 +332,7 @@ private String getThreadPrefix() { /** * The type Merged send runnable. */ - private class MergedSendRunnable implements Runnable { + private static class MergedSendRunnable implements Runnable { @Override public void run() { @@ -338,11 +344,23 @@ public void run() { } } isSending = true; - basketMap.forEach((address, basket) -> { + basketMap.forEach((address, roleMessage) -> { + NettyPoolKey.TransactionRole messageRole = roleMessage.getFirst(); + + BlockingQueue basket = roleMessage.getSecond(); if (basket.isEmpty()) { return; } + AbstractNettyRemotingClient client = null; + if(messageRole.equals(NettyPoolKey.TransactionRole.RMROLE)){ + client = RmNettyRemotingClient.getInstance(); + }else{ + client = TmNettyRemotingClient.getInstance(); + } + + ConcurrentHashMap clientFutures = client.getFutures(); + MergedWarpMessage mergeMessage = new MergedWarpMessage(); while (!basket.isEmpty()) { RpcMessage msg = basket.poll(); @@ -350,22 +368,22 @@ public void run() { mergeMessage.msgIds.add(msg.getId()); } if (mergeMessage.msgIds.size() > 1) { - printMergeMessageLog(mergeMessage); + printMergeMessageLog(clientFutures, mergeMessage); } Channel sendChannel = null; try { // send batch message is sync request, but there is no need to get the return value. // Since the messageFuture has been created before the message is placed in basketMap, // the return value will be obtained in ClientOnResponseProcessor. - sendChannel = clientChannelManager.acquireChannel(address); - AbstractNettyRemotingClient.this.sendAsyncRequest(sendChannel, mergeMessage); + sendChannel = client.getClientChannelManager().acquireChannel(address); + client.sendAsyncRequest(sendChannel, mergeMessage); } catch (FrameworkException e) { if (e.getErrcode() == FrameworkErrorCode.ChannelIsNotWritable && sendChannel != null) { - destroyChannel(address, sendChannel); + client.destroyChannel(address, sendChannel); } // fast fail for (Integer msgId : mergeMessage.msgIds) { - MessageFuture messageFuture = futures.remove(msgId); + MessageFuture messageFuture = clientFutures.remove(msgId); if (messageFuture != null) { messageFuture.setResultMessage( new RuntimeException(String.format("%s is unreachable", address), e)); @@ -378,7 +396,7 @@ public void run() { } } - private void printMergeMessageLog(MergedWarpMessage mergeMessage) { + private void printMergeMessageLog(ConcurrentHashMap clientFutures, MergedWarpMessage mergeMessage) { if (LOGGER.isDebugEnabled()) { LOGGER.debug("merge msg size:{}", mergeMessage.msgIds.size()); for (AbstractMessage cm : mergeMessage.msgs) { @@ -389,7 +407,7 @@ private void printMergeMessageLog(MergedWarpMessage mergeMessage) { sb.append(MSG_ID_PREFIX).append(l).append(SINGLE_LOG_POSTFIX); } sb.append("\n"); - for (long l : futures.keySet()) { + for (long l : clientFutures.keySet()) { sb.append(FUTURES_PREFIX).append(l).append(SINGLE_LOG_POSTFIX); } LOGGER.debug(sb.toString()); From 941f665b30c854857884af428c29342f685f4325 Mon Sep 17 00:00:00 2001 From: yiqi <1455432762@qq.com> Date: Mon, 20 Nov 2023 21:05:35 +0800 Subject: [PATCH 02/10] optimize code style --- .../netty/AbstractNettyRemotingClient.java | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/core/src/main/java/io/seata/core/rpc/netty/AbstractNettyRemotingClient.java b/core/src/main/java/io/seata/core/rpc/netty/AbstractNettyRemotingClient.java index 2c8ad061d7b..6888f83ba94 100644 --- a/core/src/main/java/io/seata/core/rpc/netty/AbstractNettyRemotingClient.java +++ b/core/src/main/java/io/seata/core/rpc/netty/AbstractNettyRemotingClient.java @@ -83,7 +83,7 @@ public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting private static final long SCHEDULE_DELAY_MILLS = 60 * 1000L; private static final long SCHEDULE_INTERVAL_MILLS = 10 * 1000L; private static final String MERGE_THREAD_NAME = "rpcMergeMessageSend"; - protected static final Object mergeLock = new Object(); + protected static final Object MERGE_LOCK = new Object(); /** * When sending message type is {@link MergeMessage}, will be stored to mergeMsgMap. @@ -95,7 +95,7 @@ public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting * Send via asynchronous thread {@link io.seata.core.rpc.netty.AbstractNettyRemotingClient.MergedSendRunnable} * {@link AbstractNettyRemotingClient#isEnableClientBatchSendRequest()} */ - protected static final ConcurrentHashMap>> basketMap = new ConcurrentHashMap<>(); + protected static final ConcurrentHashMap>> BASKET_MAP = new ConcurrentHashMap<>(); private final NettyClientBootstrap clientBootstrap; private final NettyClientChannelManager clientChannelManager; private final NettyPoolKey.TransactionRole transactionRole; @@ -114,7 +114,7 @@ public void init() { } private void startMergeSendThread() { - if(mergeSendExecutorService == null) { + if (mergeSendExecutorService == null) { synchronized (AbstractNettyRemoting.class) { if(mergeSendExecutorService == null) { mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD, @@ -155,8 +155,8 @@ public Object sendSyncRequest(Object msg) throws TimeoutException { futures.put(rpcMessage.getId(), messageFuture); // put message into basketMap - Pair> roleMessage = CollectionUtils.computeIfAbsent(basketMap, serverAddress, - key -> new Pair<>(transactionRole, new LinkedBlockingQueue<>())); + Pair> roleMessage = CollectionUtils.computeIfAbsent(BASKET_MAP, serverAddress, + key -> new Pair<>(transactionRole, new LinkedBlockingQueue<>())); BlockingQueue basket = roleMessage.getSecond(); if (!basket.offer(rpcMessage)) { LOGGER.error("put message into basketMap offer failed, serverAddress:{},rpcMessage:{}", @@ -167,8 +167,8 @@ public Object sendSyncRequest(Object msg) throws TimeoutException { LOGGER.debug("offer message: {}", rpcMessage.getBody()); } if (!isSending) { - synchronized (mergeLock) { - mergeLock.notifyAll(); + synchronized (MERGE_LOCK) { + MERGE_LOCK.notifyAll(); } } @@ -337,14 +337,14 @@ private static class MergedSendRunnable implements Runnable { @Override public void run() { while (true) { - synchronized (mergeLock) { + synchronized (MERGE_LOCK) { try { - mergeLock.wait(MAX_MERGE_SEND_MILLS); + MERGE_LOCK.wait(MAX_MERGE_SEND_MILLS); } catch (InterruptedException e) { } } isSending = true; - basketMap.forEach((address, roleMessage) -> { + BASKET_MAP.forEach((address, roleMessage) -> { NettyPoolKey.TransactionRole messageRole = roleMessage.getFirst(); BlockingQueue basket = roleMessage.getSecond(); @@ -353,9 +353,9 @@ public void run() { } AbstractNettyRemotingClient client = null; - if(messageRole.equals(NettyPoolKey.TransactionRole.RMROLE)){ + if (messageRole.equals(NettyPoolKey.TransactionRole.RMROLE)) { client = RmNettyRemotingClient.getInstance(); - }else{ + } else { client = TmNettyRemotingClient.getInstance(); } From d7a1b7da4bbdd62d0f8fee608b2ef07dea1e446a Mon Sep 17 00:00:00 2001 From: yiqi <1455432762@qq.com> Date: Mon, 20 Nov 2023 21:12:40 +0800 Subject: [PATCH 03/10] optimize code style --- .../io/seata/core/rpc/netty/AbstractNettyRemotingClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/io/seata/core/rpc/netty/AbstractNettyRemotingClient.java b/core/src/main/java/io/seata/core/rpc/netty/AbstractNettyRemotingClient.java index 6888f83ba94..caf52327d69 100644 --- a/core/src/main/java/io/seata/core/rpc/netty/AbstractNettyRemotingClient.java +++ b/core/src/main/java/io/seata/core/rpc/netty/AbstractNettyRemotingClient.java @@ -116,7 +116,7 @@ public void init() { private void startMergeSendThread() { if (mergeSendExecutorService == null) { synchronized (AbstractNettyRemoting.class) { - if(mergeSendExecutorService == null) { + if (mergeSendExecutorService == null) { mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD, MAX_MERGE_SEND_THREAD, KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS, From 7e8200cf9c0b5b82835d2439c5d271a094cfb0bc Mon Sep 17 00:00:00 2001 From: yiqi <1455432762@qq.com> Date: Mon, 20 Nov 2023 21:35:41 +0800 Subject: [PATCH 04/10] optimize code style --- .../netty/AbstractNettyRemotingClient.java | 25 +++++++++++-------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/core/src/main/java/io/seata/core/rpc/netty/AbstractNettyRemotingClient.java b/core/src/main/java/io/seata/core/rpc/netty/AbstractNettyRemotingClient.java index caf52327d69..138759a1386 100644 --- a/core/src/main/java/io/seata/core/rpc/netty/AbstractNettyRemotingClient.java +++ b/core/src/main/java/io/seata/core/rpc/netty/AbstractNettyRemotingClient.java @@ -17,6 +17,7 @@ import java.lang.reflect.Field; import java.net.InetSocketAddress; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; @@ -95,7 +96,7 @@ public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting * Send via asynchronous thread {@link io.seata.core.rpc.netty.AbstractNettyRemotingClient.MergedSendRunnable} * {@link AbstractNettyRemotingClient#isEnableClientBatchSendRequest()} */ - protected static final ConcurrentHashMap>> BASKET_MAP = new ConcurrentHashMap<>(); + protected static final ConcurrentHashMap>> BASKET_MAP = new ConcurrentHashMap<>(); private final NettyClientBootstrap clientBootstrap; private final NettyClientChannelManager clientChannelManager; private final NettyPoolKey.TransactionRole transactionRole; @@ -155,9 +156,14 @@ public Object sendSyncRequest(Object msg) throws TimeoutException { futures.put(rpcMessage.getId(), messageFuture); // put message into basketMap - Pair> roleMessage = CollectionUtils.computeIfAbsent(BASKET_MAP, serverAddress, - key -> new Pair<>(transactionRole, new LinkedBlockingQueue<>())); - BlockingQueue basket = roleMessage.getSecond(); + Map> roleMessage = CollectionUtils.computeIfAbsent(BASKET_MAP, serverAddress, + key -> { + Map> map = new HashMap<>(2); + map.put(NettyPoolKey.TransactionRole.TMROLE, new LinkedBlockingQueue<>()); + map.put(NettyPoolKey.TransactionRole.RMROLE, new LinkedBlockingQueue<>()); + return map; + }); + BlockingQueue basket = roleMessage.get(transactionRole); if (!basket.offer(rpcMessage)) { LOGGER.error("put message into basketMap offer failed, serverAddress:{},rpcMessage:{}", serverAddress, rpcMessage); @@ -344,16 +350,13 @@ public void run() { } } isSending = true; - BASKET_MAP.forEach((address, roleMessage) -> { - NettyPoolKey.TransactionRole messageRole = roleMessage.getFirst(); - - BlockingQueue basket = roleMessage.getSecond(); + BASKET_MAP.forEach((address, roleMessage) -> roleMessage.forEach((role, basket) -> { if (basket.isEmpty()) { return; } AbstractNettyRemotingClient client = null; - if (messageRole.equals(NettyPoolKey.TransactionRole.RMROLE)) { + if (role.equals(NettyPoolKey.TransactionRole.RMROLE)) { client = RmNettyRemotingClient.getInstance(); } else { client = TmNettyRemotingClient.getInstance(); @@ -386,12 +389,12 @@ public void run() { MessageFuture messageFuture = clientFutures.remove(msgId); if (messageFuture != null) { messageFuture.setResultMessage( - new RuntimeException(String.format("%s is unreachable", address), e)); + new RuntimeException(String.format("%s is unreachable", address), e)); } } LOGGER.error("client merge call failed: {}", e.getMessage(), e); } - }); + })); isSending = false; } } From 9aa5b40d1b29d1c3fae28d44d2fc0f97cdbf411c Mon Sep 17 00:00:00 2001 From: yiqi <1455432762@qq.com> Date: Tue, 21 Nov 2023 11:27:36 +0800 Subject: [PATCH 05/10] 1. add pr and author information 2. optimize code style --- changes/en-us/2.0.0.md | 2 ++ changes/zh-cn/2.0.0.md | 3 +++ .../io/seata/core/rpc/netty/AbstractNettyRemotingClient.java | 2 +- 3 files changed, 6 insertions(+), 1 deletion(-) diff --git a/changes/en-us/2.0.0.md b/changes/en-us/2.0.0.md index 6768f6311cb..e718e9a487d 100644 --- a/changes/en-us/2.0.0.md +++ b/changes/en-us/2.0.0.md @@ -155,6 +155,7 @@ The version is updated as follows: - [[#5959](https://github.com/seata/seata/pull/5959)] modify code style and remove unused import - [[#6002](https://github.com/seata/seata/pull/6002)] remove fst serialization - [[#6045](https://github.com/seata/seata/pull/6045)] optimize derivative product check base on mysql +- [[#6061](https://github.com/seata/seata/pull/6061)] merge the rpcMergeMessageSend threads of rm and tm and increase the thread hibernation duration ### security: @@ -225,6 +226,7 @@ Thanks to these contributors for their code commits. Please report an unintended - [ptyin](https://github.com/ptyin) - [jsbxyyx](https://github.com/jsbxyyx) - [xxxcrel](https://github.com/xxxcrel) +- [yiqi](https://github.com/PleaseGiveMeTheCoke) Also, we receive many valuable issues, questions and advices from our community. Thanks for you all. diff --git a/changes/zh-cn/2.0.0.md b/changes/zh-cn/2.0.0.md index 08559915448..079b8dc9b15 100644 --- a/changes/zh-cn/2.0.0.md +++ b/changes/zh-cn/2.0.0.md @@ -156,6 +156,7 @@ Seata 是一款开源的分布式事务解决方案,提供高性能和简单 - [[#5959](https://github.com/seata/seata/pull/5959)] 修正代码风格问题及去除无用的类引用 - [[#6002](https://github.com/seata/seata/pull/6002)] 移除fst序列化模块 - [[#6045](https://github.com/seata/seata/pull/6045)] 优化MySQL衍生数据库判断逻辑 +- [[#6061](https://github.com/seata/seata/pull/6061)] 合并rm和tm的rpcMergeMessageSend线程,增加线程休眠时长 ### security: @@ -227,6 +228,8 @@ Seata 是一款开源的分布式事务解决方案,提供高性能和简单 - [ptyin](https://github.com/ptyin) - [jsbxyyx](https://github.com/jsbxyyx) - [xxxcrel](https://github.com/xxxcrel) +- [xxxcrel](https://github.com/xxxcrel) +- [yiqi](https://github.com/PleaseGiveMeTheCoke) diff --git a/core/src/main/java/io/seata/core/rpc/netty/AbstractNettyRemotingClient.java b/core/src/main/java/io/seata/core/rpc/netty/AbstractNettyRemotingClient.java index 138759a1386..e476c9a1a73 100644 --- a/core/src/main/java/io/seata/core/rpc/netty/AbstractNettyRemotingClient.java +++ b/core/src/main/java/io/seata/core/rpc/netty/AbstractNettyRemotingClient.java @@ -100,7 +100,7 @@ public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting private final NettyClientBootstrap clientBootstrap; private final NettyClientChannelManager clientChannelManager; private final NettyPoolKey.TransactionRole transactionRole; - private volatile static ExecutorService mergeSendExecutorService; + private static volatile ExecutorService mergeSendExecutorService; private TransactionMessageHandler transactionMessageHandler; protected volatile boolean enableClientBatchSendRequest; From 0646cc107c33d491cdd39a236a46cdac56d97e38 Mon Sep 17 00:00:00 2001 From: yiqi <1455432762@qq.com> Date: Tue, 21 Nov 2023 11:34:32 +0800 Subject: [PATCH 06/10] 1. add pr and author information 2. optimize code style --- changes/en-us/2.0.0.md | 2 -- changes/en-us/2.x.md | 3 ++- changes/zh-cn/2.0.0.md | 3 --- changes/zh-cn/2.x.md | 2 ++ 4 files changed, 4 insertions(+), 6 deletions(-) diff --git a/changes/en-us/2.0.0.md b/changes/en-us/2.0.0.md index e718e9a487d..6768f6311cb 100644 --- a/changes/en-us/2.0.0.md +++ b/changes/en-us/2.0.0.md @@ -155,7 +155,6 @@ The version is updated as follows: - [[#5959](https://github.com/seata/seata/pull/5959)] modify code style and remove unused import - [[#6002](https://github.com/seata/seata/pull/6002)] remove fst serialization - [[#6045](https://github.com/seata/seata/pull/6045)] optimize derivative product check base on mysql -- [[#6061](https://github.com/seata/seata/pull/6061)] merge the rpcMergeMessageSend threads of rm and tm and increase the thread hibernation duration ### security: @@ -226,7 +225,6 @@ Thanks to these contributors for their code commits. Please report an unintended - [ptyin](https://github.com/ptyin) - [jsbxyyx](https://github.com/jsbxyyx) - [xxxcrel](https://github.com/xxxcrel) -- [yiqi](https://github.com/PleaseGiveMeTheCoke) Also, we receive many valuable issues, questions and advices from our community. Thanks for you all. diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md index af9ed444f59..95a8bf38bf4 100644 --- a/changes/en-us/2.x.md +++ b/changes/en-us/2.x.md @@ -10,6 +10,7 @@ Add changes here for all PR submitted to the 2.x branch. ### optimize: - [[#PR_NO](https://github.com/seata/seata/pull/PR_NO)] A brief and accurate description of PR +- [[#6061](https://github.com/seata/seata/pull/6061)] merge the rpcMergeMessageSend threads of rm and tm and increase the thread hibernation duration ### security: - [[#PR_NO](https://github.com/seata/seata/pull/PR_NO)] A brief and accurate description of PR @@ -21,6 +22,6 @@ Thanks to these contributors for their code commits. Please report an unintended - [slievrly](https://github.com/slievrly) - +- [yiqi](https://github.com/PleaseGiveMeTheCoke) Also, we receive many valuable issues, questions and advices from our community. Thanks for you all. diff --git a/changes/zh-cn/2.0.0.md b/changes/zh-cn/2.0.0.md index 079b8dc9b15..08559915448 100644 --- a/changes/zh-cn/2.0.0.md +++ b/changes/zh-cn/2.0.0.md @@ -156,7 +156,6 @@ Seata 是一款开源的分布式事务解决方案,提供高性能和简单 - [[#5959](https://github.com/seata/seata/pull/5959)] 修正代码风格问题及去除无用的类引用 - [[#6002](https://github.com/seata/seata/pull/6002)] 移除fst序列化模块 - [[#6045](https://github.com/seata/seata/pull/6045)] 优化MySQL衍生数据库判断逻辑 -- [[#6061](https://github.com/seata/seata/pull/6061)] 合并rm和tm的rpcMergeMessageSend线程,增加线程休眠时长 ### security: @@ -228,8 +227,6 @@ Seata 是一款开源的分布式事务解决方案,提供高性能和简单 - [ptyin](https://github.com/ptyin) - [jsbxyyx](https://github.com/jsbxyyx) - [xxxcrel](https://github.com/xxxcrel) -- [xxxcrel](https://github.com/xxxcrel) -- [yiqi](https://github.com/PleaseGiveMeTheCoke) diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md index dba99a3d98e..05e217f7e4b 100644 --- a/changes/zh-cn/2.x.md +++ b/changes/zh-cn/2.x.md @@ -10,6 +10,7 @@ ### optimize: - [[#PR_NO](https://github.com/seata/seata/pull/PR_NO)] 准确简要的PR描述 +- [[#6061](https://github.com/seata/seata/pull/6061)] 合并rm和tm的rpcMergeMessageSend线程,增加线程休眠时长 ### security: - [[#PR_NO](https://github.com/seata/seata/pull/PR_NO)] 准确简要的PR描述 @@ -21,5 +22,6 @@ - [slievrly](https://github.com/slievrly) +- [yiqi](https://github.com/PleaseGiveMeTheCoke) 同时,我们收到了社区反馈的很多有价值的issue和建议,非常感谢大家。 From c4d6c41d7a87020fa8c96449c05951f36ca81625 Mon Sep 17 00:00:00 2001 From: yiqi <1455432762@qq.com> Date: Tue, 21 Nov 2023 11:46:05 +0800 Subject: [PATCH 07/10] remove a example line of changes doc --- changes/en-us/2.x.md | 1 - changes/zh-cn/2.x.md | 1 - 2 files changed, 2 deletions(-) diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md index 95a8bf38bf4..bad8e7dadfc 100644 --- a/changes/en-us/2.x.md +++ b/changes/en-us/2.x.md @@ -9,7 +9,6 @@ Add changes here for all PR submitted to the 2.x branch. - [[#PR_NO](https://github.com/seata/seata/pull/PR_NO)] A brief and accurate description of PR ### optimize: -- [[#PR_NO](https://github.com/seata/seata/pull/PR_NO)] A brief and accurate description of PR - [[#6061](https://github.com/seata/seata/pull/6061)] merge the rpcMergeMessageSend threads of rm and tm and increase the thread hibernation duration ### security: diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md index 05e217f7e4b..569ad27c825 100644 --- a/changes/zh-cn/2.x.md +++ b/changes/zh-cn/2.x.md @@ -9,7 +9,6 @@ - [[#PR_NO](https://github.com/seata/seata/pull/PR_NO)] 准确简要的PR描述 ### optimize: -- [[#PR_NO](https://github.com/seata/seata/pull/PR_NO)] 准确简要的PR描述 - [[#6061](https://github.com/seata/seata/pull/6061)] 合并rm和tm的rpcMergeMessageSend线程,增加线程休眠时长 ### security: From c9b5863616f7e1f39179af50730addc941a0f872 Mon Sep 17 00:00:00 2001 From: yiqi <1455432762@qq.com> Date: Sat, 25 Nov 2023 11:16:59 +0800 Subject: [PATCH 08/10] Add judgment on whether basketMap is empty before wait() --- .../io/seata/core/protocol/VersionInfo.java | 27 +++++++++++++++++++ .../netty/AbstractNettyRemotingClient.java | 13 +++++---- 2 files changed, 35 insertions(+), 5 deletions(-) create mode 100644 core/src/main/java/io/seata/core/protocol/VersionInfo.java diff --git a/core/src/main/java/io/seata/core/protocol/VersionInfo.java b/core/src/main/java/io/seata/core/protocol/VersionInfo.java new file mode 100644 index 00000000000..218631b6e4d --- /dev/null +++ b/core/src/main/java/io/seata/core/protocol/VersionInfo.java @@ -0,0 +1,27 @@ +/* + * Copyright 1999-2019 Seata.io Group. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.seata.core.protocol; + +/** + * The interface VersionInfo. + * + * @author wang.liang + */ +interface VersionInfo { + + String VERSION = "${project.version}"; + +} diff --git a/core/src/main/java/io/seata/core/rpc/netty/AbstractNettyRemotingClient.java b/core/src/main/java/io/seata/core/rpc/netty/AbstractNettyRemotingClient.java index e476c9a1a73..883704d9852 100644 --- a/core/src/main/java/io/seata/core/rpc/netty/AbstractNettyRemotingClient.java +++ b/core/src/main/java/io/seata/core/rpc/netty/AbstractNettyRemotingClient.java @@ -17,6 +17,7 @@ import java.lang.reflect.Field; import java.net.InetSocketAddress; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -343,10 +344,12 @@ private static class MergedSendRunnable implements Runnable { @Override public void run() { while (true) { - synchronized (MERGE_LOCK) { - try { - MERGE_LOCK.wait(MAX_MERGE_SEND_MILLS); - } catch (InterruptedException e) { + if(BASKET_MAP.values().stream().allMatch(map -> map.values().stream().allMatch(Collection::isEmpty))){ + synchronized (MERGE_LOCK) { + try { + MERGE_LOCK.wait(MAX_MERGE_SEND_MILLS); + } catch (InterruptedException e) { + } } } isSending = true; @@ -355,7 +358,7 @@ public void run() { return; } - AbstractNettyRemotingClient client = null; + AbstractNettyRemotingClient client; if (role.equals(NettyPoolKey.TransactionRole.RMROLE)) { client = RmNettyRemotingClient.getInstance(); } else { From fbe822278bfba3446fb68b38914b2fd00c1fbdfd Mon Sep 17 00:00:00 2001 From: yiqi <1455432762@qq.com> Date: Sat, 25 Nov 2023 11:19:56 +0800 Subject: [PATCH 09/10] delete repetitive file --- .../io/seata/core/protocol/VersionInfo.java | 27 ------------------- 1 file changed, 27 deletions(-) delete mode 100644 core/src/main/java/io/seata/core/protocol/VersionInfo.java diff --git a/core/src/main/java/io/seata/core/protocol/VersionInfo.java b/core/src/main/java/io/seata/core/protocol/VersionInfo.java deleted file mode 100644 index 218631b6e4d..00000000000 --- a/core/src/main/java/io/seata/core/protocol/VersionInfo.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Copyright 1999-2019 Seata.io Group. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.seata.core.protocol; - -/** - * The interface VersionInfo. - * - * @author wang.liang - */ -interface VersionInfo { - - String VERSION = "${project.version}"; - -} From 30122a39b82694cf2c88452f7fba706b6dcf9b10 Mon Sep 17 00:00:00 2001 From: yiqi <1455432762@qq.com> Date: Sat, 25 Nov 2023 23:41:34 +0800 Subject: [PATCH 10/10] double check lock --- .../core/rpc/netty/AbstractNettyRemotingClient.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/io/seata/core/rpc/netty/AbstractNettyRemotingClient.java b/core/src/main/java/io/seata/core/rpc/netty/AbstractNettyRemotingClient.java index 883704d9852..1e052d27af8 100644 --- a/core/src/main/java/io/seata/core/rpc/netty/AbstractNettyRemotingClient.java +++ b/core/src/main/java/io/seata/core/rpc/netty/AbstractNettyRemotingClient.java @@ -344,11 +344,13 @@ private static class MergedSendRunnable implements Runnable { @Override public void run() { while (true) { - if(BASKET_MAP.values().stream().allMatch(map -> map.values().stream().allMatch(Collection::isEmpty))){ + if (BASKET_MAP.values().stream().allMatch(map -> map.values().stream().allMatch(Collection::isEmpty))) { synchronized (MERGE_LOCK) { - try { - MERGE_LOCK.wait(MAX_MERGE_SEND_MILLS); - } catch (InterruptedException e) { + if (BASKET_MAP.values().stream().allMatch(map -> map.values().stream().allMatch(Collection::isEmpty))) { + try { + MERGE_LOCK.wait(MAX_MERGE_SEND_MILLS); + } catch (InterruptedException e) { + } } } }