From 17358bbd7c3e978b13ccb08183d36d76b88c04ca Mon Sep 17 00:00:00 2001 From: laywin Date: Thu, 11 Jan 2024 17:53:33 +0800 Subject: [PATCH] bugfix: gracefully shut down the server (#6143) --- changes/en-us/2.x.md | 2 + changes/zh-cn/2.x.md | 2 + .../java/io/seata/common/DefaultValues.java | 2 +- .../rpc/netty/NettyRemotingServerTest.java | 83 +++++++++++++++++++ .../consul/ConsulRegistryServiceImpl.java | 9 +- .../discovery/registry/RegistryService.java | 32 ++++++- .../etcd3/EtcdRegistryServiceImpl.java | 2 + .../eureka/EurekaRegistryServiceImpl.java | 2 + .../nacos/NacosRegistryServiceImpl.java | 2 + .../redis/RedisRegistryServiceImpl.java | 4 +- .../sofa/SofaRegistryServiceImpl.java | 4 +- .../zk/ZookeeperRegisterServiceImpl.java | 2 + .../zk/ZookeeperRegisterServiceImplTest.java | 67 +++++++++++++++ .../autoconfigure/CorePropertiesTest.java | 2 +- 14 files changed, 205 insertions(+), 10 deletions(-) create mode 100644 core/src/test/java/io/seata/core/rpc/netty/NettyRemotingServerTest.java diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md index c6a1cd2f55f..df52104bf81 100644 --- a/changes/en-us/2.x.md +++ b/changes/en-us/2.x.md @@ -15,9 +15,11 @@ Add changes here for all PR submitted to the 2.x branch. - [[#6121](https://github.com/apache/incubator-seata/pull/6121)] fix the branch transaction order error when rolling back - [[#6182](https://github.com/apache/incubator-seata/pull/6182)] fix guava-32.0.0-jre.jar zip file is empty in ci - [[#6196](https://github.com/apache/incubator-seata/pull/6196)] fix asf config file format error +- [[#6143](https://github.com/apache/incubator-seata/pull/6143)] gracefully shut down the server - [[#6204](https://github.com/apache/incubator-seata/pull/6204)] fix the problem that The incorrect configuration needs to be fixed - [[#6248](https://github.com/apache/incubator-seata/pull/6248)] fix JDBC resultSet, statement, connection closing order + ### optimize: - [[#6031](https://github.com/apache/incubator-seata/pull/6031)] add a check for the existence of the undolog table - [[#6089](https://github.com/apache/incubator-seata/pull/6089)] modify the semantics of RaftServerFactory and remove unnecessary singleton diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md index 0730187615b..047473b5696 100644 --- a/changes/zh-cn/2.x.md +++ b/changes/zh-cn/2.x.md @@ -15,9 +15,11 @@ - [[#6121](https://github.com/apache/incubator-seata/pull/6121)] 修复回滚分支事务时没有按照时间排序的问题 - [[#6182](https://github.com/apache/incubator-seata/pull/6182)] 修复在ci中guava-32.0.0-jre.jar zip文件为空的问题 - [[#6196](https://github.com/apache/incubator-seata/pull/6196)] 修复asf配置格式错误的问题 +- [[#6143](https://github.com/apache/incubator-seata/pull/6143)] 修复优雅停机 - [[#6204](https://github.com/apache/incubator-seata/pull/6204)] 修复错误配置问题 - [[#6248](https://github.com/apache/incubator-seata/pull/6248)] 修复JDBC resultSet, statement, connection关闭顺序 + ### optimize: - [[#6031](https://github.com/apache/incubator-seata/pull/6031)] 添加undo_log表的存在性校验 - [[#6089](https://github.com/apache/incubator-seata/pull/6089)] 修改RaftServerFactory语义并删除不必要的单例构建 diff --git a/common/src/main/java/io/seata/common/DefaultValues.java b/common/src/main/java/io/seata/common/DefaultValues.java index 0db7db8b0d7..1aa5352ab41 100644 --- a/common/src/main/java/io/seata/common/DefaultValues.java +++ b/common/src/main/java/io/seata/common/DefaultValues.java @@ -47,7 +47,7 @@ public interface DefaultValues { /** * Shutdown timeout default 3s */ - int DEFAULT_SHUTDOWN_TIMEOUT_SEC = 3; + int DEFAULT_SHUTDOWN_TIMEOUT_SEC = 13; int DEFAULT_SELECTOR_THREAD_SIZE = 1; int DEFAULT_BOSS_THREAD_SIZE = 1; diff --git a/core/src/test/java/io/seata/core/rpc/netty/NettyRemotingServerTest.java b/core/src/test/java/io/seata/core/rpc/netty/NettyRemotingServerTest.java new file mode 100644 index 00000000000..cad4252626e --- /dev/null +++ b/core/src/test/java/io/seata/core/rpc/netty/NettyRemotingServerTest.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.seata.core.rpc.netty; + +import io.netty.channel.Channel; +import io.seata.common.XID; +import io.seata.common.loader.EnhancedServiceLoader; +import io.seata.core.rpc.RegisterCheckAuthHandler; +import io.seata.discovery.registry.MultiRegistryFactory; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.MockedStatic; +import org.mockito.Mockito; + +import java.lang.reflect.Field; +import java.util.Collections; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + + +public class NettyRemotingServerTest { + + private NettyRemotingServer nettyRemotingServer; + + @BeforeEach + public void init() { + nettyRemotingServer = new NettyRemotingServer(new ThreadPoolExecutor(1, 1, 0, + TimeUnit.SECONDS, new LinkedBlockingDeque<>())); + } + @Test + public void testInit() throws NoSuchFieldException, IllegalAccessException { + + MockedStatic enhancedServiceLoaderMockedStatic = Mockito.mockStatic(EnhancedServiceLoader.class); + enhancedServiceLoaderMockedStatic.when(() -> EnhancedServiceLoader.load((RegisterCheckAuthHandler.class))).thenReturn(null); + + MockedStatic multiRegistryFactoryMockedStatic = Mockito.mockStatic(MultiRegistryFactory.class); + multiRegistryFactoryMockedStatic.when(MultiRegistryFactory::getInstances).thenReturn( + Collections.emptyList()); + + XID.setIpAddress("127.0.0.1"); + XID.setPort(8093); + + nettyRemotingServer.init(); + + multiRegistryFactoryMockedStatic.close(); + enhancedServiceLoaderMockedStatic.close(); + + Field field = NettyRemotingServer.class.getDeclaredField("initialized"); + field.setAccessible(true); + + Assertions.assertTrue(((AtomicBoolean)field.get(nettyRemotingServer)).get()); + } + + @Test + public void testDestroyChannel() { + Channel channel = Mockito.mock(Channel.class); + nettyRemotingServer.destroyChannel("127.0.0.1:8091", channel); + Mockito.verify(channel).close(); + } + + @Test + public void destory() { + nettyRemotingServer.destroy(); + Assertions.assertTrue(nettyRemotingServer != null); + } +} diff --git a/discovery/seata-discovery-consul/src/main/java/io/seata/discovery/registry/consul/ConsulRegistryServiceImpl.java b/discovery/seata-discovery-consul/src/main/java/io/seata/discovery/registry/consul/ConsulRegistryServiceImpl.java index 7ddf7a62897..9a51e56e1c9 100644 --- a/discovery/seata-discovery-consul/src/main/java/io/seata/discovery/registry/consul/ConsulRegistryServiceImpl.java +++ b/discovery/seata-discovery-consul/src/main/java/io/seata/discovery/registry/consul/ConsulRegistryServiceImpl.java @@ -304,10 +304,15 @@ private void refreshCluster(String cluster, List services) { if (cluster == null || services == null) { return; } - clusterAddressMap.put(cluster, services.stream() + + List addresses = services.stream() .map(HealthService::getService) .map(service -> new InetSocketAddress(service.getAddress(), service.getPort())) - .collect(Collectors.toList())); + .collect(Collectors.toList()); + + clusterAddressMap.put(cluster, addresses); + + removeOfflineAddressesIfNecessary(cluster, addresses); } /** diff --git a/discovery/seata-discovery-core/src/main/java/io/seata/discovery/registry/RegistryService.java b/discovery/seata-discovery-core/src/main/java/io/seata/discovery/registry/RegistryService.java index 86486b1708b..56f65f313c3 100644 --- a/discovery/seata-discovery-core/src/main/java/io/seata/discovery/registry/RegistryService.java +++ b/discovery/seata-discovery-core/src/main/java/io/seata/discovery/registry/RegistryService.java @@ -17,12 +17,15 @@ package io.seata.discovery.registry; import java.net.InetSocketAddress; -import java.util.ArrayList; +import java.util.Set; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; +import java.util.Collection; +import java.util.ArrayList; +import java.util.Collections; import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; import io.seata.config.ConfigurationCache; import io.seata.config.ConfigurationFactory; @@ -117,12 +120,33 @@ default String getServiceGroup(String key) { } default List aliveLookup(String transactionServiceGroup) { - return CURRENT_ADDRESS_MAP.computeIfAbsent(transactionServiceGroup, k -> new ArrayList<>()); + return CURRENT_ADDRESS_MAP.computeIfAbsent(getServiceGroup(transactionServiceGroup), k -> new ArrayList<>()); } default List refreshAliveLookup(String transactionServiceGroup, List aliveAddress) { - return CURRENT_ADDRESS_MAP.put(transactionServiceGroup, aliveAddress); + return CURRENT_ADDRESS_MAP.put(getServiceGroup(transactionServiceGroup), aliveAddress); + } + + + /** + * + * remove offline addresses if necessary. + * + * Intersection of the old and new addresses + * + * @param clusterName + * @param newAddressed + */ + default void removeOfflineAddressesIfNecessary(String clusterName, Collection newAddressed) { + + List currentAddresses = CURRENT_ADDRESS_MAP.getOrDefault(clusterName, Collections.emptyList()); + + List inetSocketAddresses = currentAddresses + .stream().filter(newAddressed::contains).collect( + Collectors.toList()); + + CURRENT_ADDRESS_MAP.put(clusterName, inetSocketAddresses); } } diff --git a/discovery/seata-discovery-etcd3/src/main/java/io/seata/discovery/registry/etcd3/EtcdRegistryServiceImpl.java b/discovery/seata-discovery-etcd3/src/main/java/io/seata/discovery/registry/etcd3/EtcdRegistryServiceImpl.java index a08be77e79f..3f8f9565066 100644 --- a/discovery/seata-discovery-etcd3/src/main/java/io/seata/discovery/registry/etcd3/EtcdRegistryServiceImpl.java +++ b/discovery/seata-discovery-etcd3/src/main/java/io/seata/discovery/registry/etcd3/EtcdRegistryServiceImpl.java @@ -252,6 +252,8 @@ private void refreshCluster(String cluster) throws Exception { return new InetSocketAddress(instanceInfo[0], Integer.parseInt(instanceInfo[1])); }).collect(Collectors.toList()); clusterAddressMap.put(cluster, new Pair<>(getResponse.getHeader().getRevision(), instanceList)); + + removeOfflineAddressesIfNecessary(cluster, instanceList); } /** diff --git a/discovery/seata-discovery-eureka/src/main/java/io/seata/discovery/registry/eureka/EurekaRegistryServiceImpl.java b/discovery/seata-discovery-eureka/src/main/java/io/seata/discovery/registry/eureka/EurekaRegistryServiceImpl.java index 3719349710f..62e3251f1de 100644 --- a/discovery/seata-discovery-eureka/src/main/java/io/seata/discovery/registry/eureka/EurekaRegistryServiceImpl.java +++ b/discovery/seata-discovery-eureka/src/main/java/io/seata/discovery/registry/eureka/EurekaRegistryServiceImpl.java @@ -168,6 +168,8 @@ private void refreshCluster(String clusterName) { .map(instance -> new InetSocketAddress(instance.getIPAddr(), instance.getPort())) .collect(Collectors.toList()); CLUSTER_ADDRESS_MAP.put(clusterName, newAddressList); + + removeOfflineAddressesIfNecessary(clusterName, newAddressList); } } diff --git a/discovery/seata-discovery-nacos/src/main/java/io/seata/discovery/registry/nacos/NacosRegistryServiceImpl.java b/discovery/seata-discovery-nacos/src/main/java/io/seata/discovery/registry/nacos/NacosRegistryServiceImpl.java index 708b2b54bd4..002b555bcb0 100644 --- a/discovery/seata-discovery-nacos/src/main/java/io/seata/discovery/registry/nacos/NacosRegistryServiceImpl.java +++ b/discovery/seata-discovery-nacos/src/main/java/io/seata/discovery/registry/nacos/NacosRegistryServiceImpl.java @@ -192,6 +192,8 @@ public List lookup(String key) throws Exception { .map(eachInstance -> new InetSocketAddress(eachInstance.getIp(), eachInstance.getPort())) .collect(Collectors.toList()); CLUSTER_ADDRESS_MAP.put(clusterName, newAddressList); + + removeOfflineAddressesIfNecessary(clusterName, newAddressList); } }); } diff --git a/discovery/seata-discovery-redis/src/main/java/io/seata/discovery/registry/redis/RedisRegistryServiceImpl.java b/discovery/seata-discovery-redis/src/main/java/io/seata/discovery/registry/redis/RedisRegistryServiceImpl.java index e0775a404de..483460f90cd 100644 --- a/discovery/seata-discovery-redis/src/main/java/io/seata/discovery/registry/redis/RedisRegistryServiceImpl.java +++ b/discovery/seata-discovery-redis/src/main/java/io/seata/discovery/registry/redis/RedisRegistryServiceImpl.java @@ -278,7 +278,9 @@ private void removeServerAddressByPushEmptyProtection(String notifyCluserName, S } } } - CLUSTER_ADDRESS_MAP.get(notifyCluserName).remove(inetSocketAddress); + socketAddresses.remove(inetSocketAddress); + + removeOfflineAddressesIfNecessary(notifyCluserName, socketAddresses); } @Override diff --git a/discovery/seata-discovery-sofa/src/main/java/io/seata/discovery/registry/sofa/SofaRegistryServiceImpl.java b/discovery/seata-discovery-sofa/src/main/java/io/seata/discovery/registry/sofa/SofaRegistryServiceImpl.java index 39bd39671f1..957ba02c0bf 100644 --- a/discovery/seata-discovery-sofa/src/main/java/io/seata/discovery/registry/sofa/SofaRegistryServiceImpl.java +++ b/discovery/seata-discovery-sofa/src/main/java/io/seata/discovery/registry/sofa/SofaRegistryServiceImpl.java @@ -17,9 +17,9 @@ package io.seata.discovery.registry.sofa; import java.net.InetSocketAddress; -import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.ArrayList; import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -173,6 +173,8 @@ public List lookup(String key) throws Exception { } else { List newAddressList = flatData(instances); CLUSTER_ADDRESS_MAP.put(clusterName, newAddressList); + + removeOfflineAddressesIfNecessary(clusterName, newAddressList); } respondRegistries.countDown(); }); diff --git a/discovery/seata-discovery-zk/src/main/java/io/seata/discovery/registry/zk/ZookeeperRegisterServiceImpl.java b/discovery/seata-discovery-zk/src/main/java/io/seata/discovery/registry/zk/ZookeeperRegisterServiceImpl.java index 0d26f7b6867..d0c01fd8fb9 100644 --- a/discovery/seata-discovery-zk/src/main/java/io/seata/discovery/registry/zk/ZookeeperRegisterServiceImpl.java +++ b/discovery/seata-discovery-zk/src/main/java/io/seata/discovery/registry/zk/ZookeeperRegisterServiceImpl.java @@ -308,6 +308,8 @@ private void refreshClusterAddressMap(String clusterName, List instances } } CLUSTER_ADDRESS_MAP.put(clusterName, newAddressList); + + removeOfflineAddressesIfNecessary(clusterName, newAddressList); } private String getClusterName() { diff --git a/discovery/seata-discovery-zk/src/test/java/io/seata/discovery/registry/zk/ZookeeperRegisterServiceImplTest.java b/discovery/seata-discovery-zk/src/test/java/io/seata/discovery/registry/zk/ZookeeperRegisterServiceImplTest.java index 9faaadf04d6..9a21e61cf11 100644 --- a/discovery/seata-discovery-zk/src/test/java/io/seata/discovery/registry/zk/ZookeeperRegisterServiceImplTest.java +++ b/discovery/seata-discovery-zk/src/test/java/io/seata/discovery/registry/zk/ZookeeperRegisterServiceImplTest.java @@ -16,8 +16,10 @@ */ package io.seata.discovery.registry.zk; +import java.lang.reflect.Field; import java.net.InetSocketAddress; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -103,4 +105,69 @@ public void execute() throws Throwable { service.unsubscribe("default", listener2); } + @Test + public void testLookUp() throws Exception { + ZookeeperRegisterServiceImpl zookeeperRegisterService = ZookeeperRegisterServiceImpl.getInstance(); + + ZkClient client = service.buildZkClient("127.0.0.1:2181", 5000, 5000); + client.createPersistent("/registry/zk/cluster"); + client.createEphemeral("/registry/zk/cluster/127.0.0.1:8091"); + + Field field = ZookeeperRegisterServiceImpl.class.getDeclaredField("zkClient"); + field.setAccessible(true); + field.set(zookeeperRegisterService, client); + + + System.setProperty("txServiceGroup", "default_tx_group"); + System.setProperty("service.vgroupMapping.default_tx_group", "cluster"); + + + List addressList = zookeeperRegisterService.lookup("default_tx_group"); + + Assertions.assertEquals(addressList, Collections.singletonList(new InetSocketAddress("127.0.0.1", 8091))); + } + + @Test + public void testRemoveOfflineAddressesIfNecessaryNoRemoveCase() { + service.CURRENT_ADDRESS_MAP.put("cluster", Collections.singletonList(new InetSocketAddress("127.0.0.1", 8091))); + service.removeOfflineAddressesIfNecessary("cluster", Collections.singletonList(new InetSocketAddress("127.0.0.1", 8091))); + + Assertions.assertEquals(1, service.CURRENT_ADDRESS_MAP.get("cluster").size()); + } + + @Test + public void testRemoveOfflineAddressesIfNecessaryRemoveCase() { + service.CURRENT_ADDRESS_MAP.put("cluster", Collections.singletonList(new InetSocketAddress("127.0.0.1", 8091))); + service.removeOfflineAddressesIfNecessary("cluster", Collections.singletonList(new InetSocketAddress("127.0.0.2", 8091))); + + Assertions.assertEquals(0, service.CURRENT_ADDRESS_MAP.get("cluster").size()); + } + + @Test + public void testAliveLookup() { + + System.setProperty("txServiceGroup", "default_tx_group"); + System.setProperty("service.vgroupMapping.default_tx_group", "cluster"); + + service.CURRENT_ADDRESS_MAP.put("cluster", Collections.singletonList(new InetSocketAddress("127.0.0.1", 8091))); + List result = service.aliveLookup("default_tx_group"); + + Assertions.assertEquals(result, Collections.singletonList(new InetSocketAddress("127.0.0.1", 8091))); + } + + + @Test + public void tesRefreshAliveLookup() { + + System.setProperty("txServiceGroup", "default_tx_group"); + System.setProperty("service.vgroupMapping.default_tx_group", "cluster"); + + service.CURRENT_ADDRESS_MAP.put("cluster", Collections.singletonList(new InetSocketAddress("127.0.0.1", 8091))); + + service.refreshAliveLookup("default_tx_group", + Collections.singletonList(new InetSocketAddress("127.0.0.2", 8091))); + + Assertions.assertEquals(service.CURRENT_ADDRESS_MAP.get("cluster"), + Collections.singletonList(new InetSocketAddress("127.0.0.2", 8091))); + } } diff --git a/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/test/java/io/seata/spring/boot/autoconfigure/CorePropertiesTest.java b/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/test/java/io/seata/spring/boot/autoconfigure/CorePropertiesTest.java index 829e6f83a2e..4b007b8f886 100644 --- a/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/test/java/io/seata/spring/boot/autoconfigure/CorePropertiesTest.java +++ b/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/test/java/io/seata/spring/boot/autoconfigure/CorePropertiesTest.java @@ -75,7 +75,7 @@ public void testTransportProperties() { @Test public void testShutdownProperties() { - assertEquals(3L, context.getBean(ShutdownProperties.class).getWait()); + assertEquals(13L, context.getBean(ShutdownProperties.class).getWait()); } @Test