Skip to content

Commit

Permalink
bugfix: gracefully shut down the server (#6143)
Browse files Browse the repository at this point in the history
  • Loading branch information
laywin authored Jan 11, 2024
1 parent 77bfc08 commit 17358bb
Show file tree
Hide file tree
Showing 14 changed files with 205 additions and 10 deletions.
2 changes: 2 additions & 0 deletions changes/en-us/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@ Add changes here for all PR submitted to the 2.x branch.
- [[#6121](https://github.com/apache/incubator-seata/pull/6121)] fix the branch transaction order error when rolling back
- [[#6182](https://github.com/apache/incubator-seata/pull/6182)] fix guava-32.0.0-jre.jar zip file is empty in ci
- [[#6196](https://github.com/apache/incubator-seata/pull/6196)] fix asf config file format error
- [[#6143](https://github.com/apache/incubator-seata/pull/6143)] gracefully shut down the server
- [[#6204](https://github.com/apache/incubator-seata/pull/6204)] fix the problem that The incorrect configuration needs to be fixed
- [[#6248](https://github.com/apache/incubator-seata/pull/6248)] fix JDBC resultSet, statement, connection closing order


### optimize:
- [[#6031](https://github.com/apache/incubator-seata/pull/6031)] add a check for the existence of the undolog table
- [[#6089](https://github.com/apache/incubator-seata/pull/6089)] modify the semantics of RaftServerFactory and remove unnecessary singleton
Expand Down
2 changes: 2 additions & 0 deletions changes/zh-cn/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
- [[#6121](https://github.com/apache/incubator-seata/pull/6121)] 修复回滚分支事务时没有按照时间排序的问题
- [[#6182](https://github.com/apache/incubator-seata/pull/6182)] 修复在ci中guava-32.0.0-jre.jar zip文件为空的问题
- [[#6196](https://github.com/apache/incubator-seata/pull/6196)] 修复asf配置格式错误的问题
- [[#6143](https://github.com/apache/incubator-seata/pull/6143)] 修复优雅停机
- [[#6204](https://github.com/apache/incubator-seata/pull/6204)] 修复错误配置问题
- [[#6248](https://github.com/apache/incubator-seata/pull/6248)] 修复JDBC resultSet, statement, connection关闭顺序


### optimize:
- [[#6031](https://github.com/apache/incubator-seata/pull/6031)] 添加undo_log表的存在性校验
- [[#6089](https://github.com/apache/incubator-seata/pull/6089)] 修改RaftServerFactory语义并删除不必要的单例构建
Expand Down
2 changes: 1 addition & 1 deletion common/src/main/java/io/seata/common/DefaultValues.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public interface DefaultValues {
/**
* Shutdown timeout default 3s
*/
int DEFAULT_SHUTDOWN_TIMEOUT_SEC = 3;
int DEFAULT_SHUTDOWN_TIMEOUT_SEC = 13;
int DEFAULT_SELECTOR_THREAD_SIZE = 1;
int DEFAULT_BOSS_THREAD_SIZE = 1;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.core.rpc.netty;

import io.netty.channel.Channel;
import io.seata.common.XID;
import io.seata.common.loader.EnhancedServiceLoader;
import io.seata.core.rpc.RegisterCheckAuthHandler;
import io.seata.discovery.registry.MultiRegistryFactory;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.MockedStatic;
import org.mockito.Mockito;

import java.lang.reflect.Field;
import java.util.Collections;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;


public class NettyRemotingServerTest {

private NettyRemotingServer nettyRemotingServer;

@BeforeEach
public void init() {
nettyRemotingServer = new NettyRemotingServer(new ThreadPoolExecutor(1, 1, 0,
TimeUnit.SECONDS, new LinkedBlockingDeque<>()));
}
@Test
public void testInit() throws NoSuchFieldException, IllegalAccessException {

MockedStatic<EnhancedServiceLoader> enhancedServiceLoaderMockedStatic = Mockito.mockStatic(EnhancedServiceLoader.class);
enhancedServiceLoaderMockedStatic.when(() -> EnhancedServiceLoader.load((RegisterCheckAuthHandler.class))).thenReturn(null);

MockedStatic<MultiRegistryFactory> multiRegistryFactoryMockedStatic = Mockito.mockStatic(MultiRegistryFactory.class);
multiRegistryFactoryMockedStatic.when(MultiRegistryFactory::getInstances).thenReturn(
Collections.emptyList());

XID.setIpAddress("127.0.0.1");
XID.setPort(8093);

nettyRemotingServer.init();

multiRegistryFactoryMockedStatic.close();
enhancedServiceLoaderMockedStatic.close();

Field field = NettyRemotingServer.class.getDeclaredField("initialized");
field.setAccessible(true);

Assertions.assertTrue(((AtomicBoolean)field.get(nettyRemotingServer)).get());
}

@Test
public void testDestroyChannel() {
Channel channel = Mockito.mock(Channel.class);
nettyRemotingServer.destroyChannel("127.0.0.1:8091", channel);
Mockito.verify(channel).close();
}

@Test
public void destory() {
nettyRemotingServer.destroy();
Assertions.assertTrue(nettyRemotingServer != null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -304,10 +304,15 @@ private void refreshCluster(String cluster, List<HealthService> services) {
if (cluster == null || services == null) {
return;
}
clusterAddressMap.put(cluster, services.stream()

List<InetSocketAddress> addresses = services.stream()
.map(HealthService::getService)
.map(service -> new InetSocketAddress(service.getAddress(), service.getPort()))
.collect(Collectors.toList()));
.collect(Collectors.toList());

clusterAddressMap.put(cluster, addresses);

removeOfflineAddressesIfNecessary(cluster, addresses);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@
package io.seata.discovery.registry;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Set;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Collection;
import java.util.ArrayList;
import java.util.Collections;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import io.seata.config.ConfigurationCache;
import io.seata.config.ConfigurationFactory;

Expand Down Expand Up @@ -117,12 +120,33 @@ default String getServiceGroup(String key) {
}

default List<InetSocketAddress> aliveLookup(String transactionServiceGroup) {
return CURRENT_ADDRESS_MAP.computeIfAbsent(transactionServiceGroup, k -> new ArrayList<>());
return CURRENT_ADDRESS_MAP.computeIfAbsent(getServiceGroup(transactionServiceGroup), k -> new ArrayList<>());
}

default List<InetSocketAddress> refreshAliveLookup(String transactionServiceGroup,
List<InetSocketAddress> aliveAddress) {
return CURRENT_ADDRESS_MAP.put(transactionServiceGroup, aliveAddress);
return CURRENT_ADDRESS_MAP.put(getServiceGroup(transactionServiceGroup), aliveAddress);
}


/**
*
* remove offline addresses if necessary.
*
* Intersection of the old and new addresses
*
* @param clusterName
* @param newAddressed
*/
default void removeOfflineAddressesIfNecessary(String clusterName, Collection<InetSocketAddress> newAddressed) {

List<InetSocketAddress> currentAddresses = CURRENT_ADDRESS_MAP.getOrDefault(clusterName, Collections.emptyList());

List<InetSocketAddress> inetSocketAddresses = currentAddresses
.stream().filter(newAddressed::contains).collect(
Collectors.toList());

CURRENT_ADDRESS_MAP.put(clusterName, inetSocketAddresses);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,8 @@ private void refreshCluster(String cluster) throws Exception {
return new InetSocketAddress(instanceInfo[0], Integer.parseInt(instanceInfo[1]));
}).collect(Collectors.toList());
clusterAddressMap.put(cluster, new Pair<>(getResponse.getHeader().getRevision(), instanceList));

removeOfflineAddressesIfNecessary(cluster, instanceList);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,8 @@ private void refreshCluster(String clusterName) {
.map(instance -> new InetSocketAddress(instance.getIPAddr(), instance.getPort()))
.collect(Collectors.toList());
CLUSTER_ADDRESS_MAP.put(clusterName, newAddressList);

removeOfflineAddressesIfNecessary(clusterName, newAddressList);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,8 @@ public List<InetSocketAddress> lookup(String key) throws Exception {
.map(eachInstance -> new InetSocketAddress(eachInstance.getIp(), eachInstance.getPort()))
.collect(Collectors.toList());
CLUSTER_ADDRESS_MAP.put(clusterName, newAddressList);

removeOfflineAddressesIfNecessary(clusterName, newAddressList);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,9 @@ private void removeServerAddressByPushEmptyProtection(String notifyCluserName, S
}
}
}
CLUSTER_ADDRESS_MAP.get(notifyCluserName).remove(inetSocketAddress);
socketAddresses.remove(inetSocketAddress);

removeOfflineAddressesIfNecessary(notifyCluserName, socketAddresses);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
package io.seata.discovery.registry.sofa;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.ArrayList;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
Expand Down Expand Up @@ -173,6 +173,8 @@ public List<InetSocketAddress> lookup(String key) throws Exception {
} else {
List<InetSocketAddress> newAddressList = flatData(instances);
CLUSTER_ADDRESS_MAP.put(clusterName, newAddressList);

removeOfflineAddressesIfNecessary(clusterName, newAddressList);
}
respondRegistries.countDown();
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,8 @@ private void refreshClusterAddressMap(String clusterName, List<String> instances
}
}
CLUSTER_ADDRESS_MAP.put(clusterName, newAddressList);

removeOfflineAddressesIfNecessary(clusterName, newAddressList);
}

private String getClusterName() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@
*/
package io.seata.discovery.registry.zk;

import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -103,4 +105,69 @@ public void execute() throws Throwable {
service.unsubscribe("default", listener2);
}

@Test
public void testLookUp() throws Exception {
ZookeeperRegisterServiceImpl zookeeperRegisterService = ZookeeperRegisterServiceImpl.getInstance();

ZkClient client = service.buildZkClient("127.0.0.1:2181", 5000, 5000);
client.createPersistent("/registry/zk/cluster");
client.createEphemeral("/registry/zk/cluster/127.0.0.1:8091");

Field field = ZookeeperRegisterServiceImpl.class.getDeclaredField("zkClient");
field.setAccessible(true);
field.set(zookeeperRegisterService, client);


System.setProperty("txServiceGroup", "default_tx_group");
System.setProperty("service.vgroupMapping.default_tx_group", "cluster");


List<InetSocketAddress> addressList = zookeeperRegisterService.lookup("default_tx_group");

Assertions.assertEquals(addressList, Collections.singletonList(new InetSocketAddress("127.0.0.1", 8091)));
}

@Test
public void testRemoveOfflineAddressesIfNecessaryNoRemoveCase() {
service.CURRENT_ADDRESS_MAP.put("cluster", Collections.singletonList(new InetSocketAddress("127.0.0.1", 8091)));
service.removeOfflineAddressesIfNecessary("cluster", Collections.singletonList(new InetSocketAddress("127.0.0.1", 8091)));

Assertions.assertEquals(1, service.CURRENT_ADDRESS_MAP.get("cluster").size());
}

@Test
public void testRemoveOfflineAddressesIfNecessaryRemoveCase() {
service.CURRENT_ADDRESS_MAP.put("cluster", Collections.singletonList(new InetSocketAddress("127.0.0.1", 8091)));
service.removeOfflineAddressesIfNecessary("cluster", Collections.singletonList(new InetSocketAddress("127.0.0.2", 8091)));

Assertions.assertEquals(0, service.CURRENT_ADDRESS_MAP.get("cluster").size());
}

@Test
public void testAliveLookup() {

System.setProperty("txServiceGroup", "default_tx_group");
System.setProperty("service.vgroupMapping.default_tx_group", "cluster");

service.CURRENT_ADDRESS_MAP.put("cluster", Collections.singletonList(new InetSocketAddress("127.0.0.1", 8091)));
List<InetSocketAddress> result = service.aliveLookup("default_tx_group");

Assertions.assertEquals(result, Collections.singletonList(new InetSocketAddress("127.0.0.1", 8091)));
}


@Test
public void tesRefreshAliveLookup() {

System.setProperty("txServiceGroup", "default_tx_group");
System.setProperty("service.vgroupMapping.default_tx_group", "cluster");

service.CURRENT_ADDRESS_MAP.put("cluster", Collections.singletonList(new InetSocketAddress("127.0.0.1", 8091)));

service.refreshAliveLookup("default_tx_group",
Collections.singletonList(new InetSocketAddress("127.0.0.2", 8091)));

Assertions.assertEquals(service.CURRENT_ADDRESS_MAP.get("cluster"),
Collections.singletonList(new InetSocketAddress("127.0.0.2", 8091)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public void testTransportProperties() {

@Test
public void testShutdownProperties() {
assertEquals(3L, context.getBean(ShutdownProperties.class).getWait());
assertEquals(13L, context.getBean(ShutdownProperties.class).getWait());
}

@Test
Expand Down

0 comments on commit 17358bb

Please sign in to comment.