Skip to content

Commit

Permalink
optimize: fall back to any of available cluster address when query cl…
Browse files Browse the repository at this point in the history
…uster address is empty (#6797)
  • Loading branch information
laywin authored Aug 30, 2024
1 parent 525d1b9 commit b0c2bb2
Show file tree
Hide file tree
Showing 12 changed files with 113 additions and 50 deletions.
2 changes: 2 additions & 0 deletions changes/en-us/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ Add changes here for all PR submitted to the 2.x branch.
- [[#6778](https://github.com/apache/incubator-seata/pull/6778)] fix namingserver node term
- [[#6765](https://github.com/apache/incubator-seata/pull/6765)] fix MySQL driver loading by replacing custom classloader with system classloader for better compatibility and simplified process
- [[#6781](https://github.com/apache/incubator-seata/pull/6781)] the issue where the TC occasionally fails to go offline from the NamingServer
- [[#6797](https://github.com/apache/incubator-seata/pull/6797)] fall back to any of available cluster address when query cluster address is empty


### optimize:
Expand Down Expand Up @@ -113,6 +114,7 @@ Thanks to these contributors for their code commits. Please report an unintended
- [imashimaro](https://github.com/hmj776521114)
- [lyl2008dsg](https://github.com/lyl2008dsg)
- [l81893521](https://github.com/l81893521)
- [laywin](https://github.com/laywin)


Also, we receive many valuable issues, questions and advices from our community. Thanks for you all.
3 changes: 2 additions & 1 deletion changes/zh-cn/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
- [[#6778](https://github.com/apache/incubator-seata/pull/6778)] 修复namingserver的节点term为0问题
- [[#6765](https://github.com/apache/incubator-seata/pull/6765)] 改进MySQL驱动加载机制,将自定义类加载器替换为系统类加载器,更兼容简化流程
- [[#6781](https://github.com/apache/incubator-seata/pull/6781)] 修复tc下线时,由于定时任务没有先关闭,导致下线后还会被注册上,需要靠namingserver的健康检查来下线的bug
- [[#6797](https://github.com/apache/incubator-seata/pull/6797)] 当查询的集群地址为空时,获取可用的任意集群地址


### optimize:
Expand Down Expand Up @@ -73,7 +74,6 @@
- [[#6793](https://github.com/apache/incubator-seata/pull/6793)] 修复 npmjs 依赖冲突问题
- [[#6794](https://github.com/apache/incubator-seata/pull/6794)] 优化 NacosMockTest 单测问题


### refactor:


Expand Down Expand Up @@ -117,6 +117,7 @@
- [imashimaro](https://github.com/hmj776521114)
- [lyl2008dsg](https://github.com/lyl2008dsg)
- [l81893521](https://github.com/l81893521)
- [laywin](https://github.com/laywin)



Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ public class ConsulRegistryServiceImpl implements RegistryService<ConsulListener
private static final int THREAD_POOL_NUM = 1;
private static final int MAP_INITIAL_CAPACITY = 8;

private String transactionServiceGroup;

/**
* default tcp check interval
*/
Expand Down Expand Up @@ -161,6 +163,7 @@ public void unsubscribe(String cluster, ConsulListener listener) throws Exceptio

@Override
public List<InetSocketAddress> lookup(String key) throws Exception {
transactionServiceGroup = key;
final String cluster = getServiceGroup(key);
if (cluster == null) {
String missingDataId = PREFIX_SERVICE_ROOT + CONFIG_SPLIT_CHAR + PREFIX_SERVICE_MAPPING + key;
Expand Down Expand Up @@ -311,7 +314,7 @@ private void refreshCluster(String cluster, List<HealthService> services) {

clusterAddressMap.put(cluster, addresses);

removeOfflineAddressesIfNecessary(cluster, addresses);
removeOfflineAddressesIfNecessary(transactionServiceGroup, cluster, addresses);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@
*/
package org.apache.seata.discovery.registry;

import org.apache.seata.common.util.CollectionUtils;
import org.apache.seata.config.ConfigurationFactory;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
Expand All @@ -27,8 +29,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import org.apache.seata.config.ConfigurationFactory;

/**
* The interface Registry service.
*
Expand All @@ -54,7 +54,7 @@ public interface RegistryService<T> {
/**
* Service node health check
*/
Map<String,List<InetSocketAddress>> CURRENT_ADDRESS_MAP = new ConcurrentHashMap<>();
Map<String, Map<String, List<InetSocketAddress>>> CURRENT_ADDRESS_MAP = new ConcurrentHashMap<>();
/**
* Register.
*
Expand Down Expand Up @@ -119,12 +119,29 @@ default String getServiceGroup(String key) {
}

default List<InetSocketAddress> aliveLookup(String transactionServiceGroup) {
return CURRENT_ADDRESS_MAP.computeIfAbsent(getServiceGroup(transactionServiceGroup), k -> new ArrayList<>());
Map<String, List<InetSocketAddress>> clusterAddressMap = CURRENT_ADDRESS_MAP.computeIfAbsent(transactionServiceGroup,
k -> new ConcurrentHashMap<>());

String clusterName = getServiceGroup(transactionServiceGroup);
List<InetSocketAddress> inetSocketAddresses = clusterAddressMap.get(clusterName);
if (CollectionUtils.isNotEmpty(inetSocketAddresses)) {
return inetSocketAddresses;
}

// fall back to addresses of any cluster
return clusterAddressMap.values().stream().filter(CollectionUtils::isNotEmpty)
.findAny().orElse(Collections.emptyList());
}

default List<InetSocketAddress> refreshAliveLookup(String transactionServiceGroup,
List<InetSocketAddress> aliveAddress) {
return CURRENT_ADDRESS_MAP.put(getServiceGroup(transactionServiceGroup), aliveAddress);

Map<String, List<InetSocketAddress>> clusterAddressMap = CURRENT_ADDRESS_MAP.computeIfAbsent(transactionServiceGroup,
key -> new ConcurrentHashMap<>());

String clusterName = getServiceGroup(transactionServiceGroup);

return clusterAddressMap.put(clusterName, aliveAddress);
}


Expand All @@ -137,15 +154,21 @@ default List<InetSocketAddress> refreshAliveLookup(String transactionServiceGrou
* @param clusterName
* @param newAddressed
*/
default void removeOfflineAddressesIfNecessary(String clusterName, Collection<InetSocketAddress> newAddressed) {
default void removeOfflineAddressesIfNecessary(String transactionGroupService, String clusterName, Collection<InetSocketAddress> newAddressed) {

Map<String, List<InetSocketAddress>> clusterAddressMap = CURRENT_ADDRESS_MAP.computeIfAbsent(transactionGroupService,
key -> new ConcurrentHashMap<>());

List<InetSocketAddress> currentAddresses = CURRENT_ADDRESS_MAP.getOrDefault(clusterName, Collections.emptyList());
List<InetSocketAddress> currentAddresses = clusterAddressMap.getOrDefault(clusterName, Collections.emptyList());

List<InetSocketAddress> inetSocketAddresses = currentAddresses
.stream().filter(newAddressed::contains).collect(
Collectors.toList());

CURRENT_ADDRESS_MAP.put(clusterName, inetSocketAddresses);
// prevent empty update
if (CollectionUtils.isNotEmpty(inetSocketAddresses)) {
clusterAddressMap.put(clusterName, inetSocketAddresses);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ public class EtcdRegistryServiceImpl implements RegistryService<Watch.Listener>
private static final int MAP_INITIAL_CAPACITY = 8;
private static final int THREAD_POOL_SIZE = 2;
private ExecutorService executorService;

private String transactionServiceGroup;
/**
* TTL for lease
*/
Expand Down Expand Up @@ -181,6 +183,7 @@ public void unsubscribe(String cluster, Watch.Listener listener) throws Exceptio

@Override
public List<InetSocketAddress> lookup(String key) throws Exception {
transactionServiceGroup = key;
final String cluster = getServiceGroup(key);
if (cluster == null) {
String missingDataId = PREFIX_SERVICE_ROOT + CONFIG_SPLIT_CHAR + PREFIX_SERVICE_MAPPING + key;
Expand Down Expand Up @@ -252,7 +255,7 @@ private void refreshCluster(String cluster) throws Exception {
}).collect(Collectors.toList());
clusterAddressMap.put(cluster, new Pair<>(getResponse.getHeader().getRevision(), instanceList));

removeOfflineAddressesIfNecessary(cluster, instanceList);
removeOfflineAddressesIfNecessary(transactionServiceGroup, cluster, instanceList);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ public class EurekaRegistryServiceImpl implements RegistryService<EurekaEventLis
private static volatile EurekaRegistryServiceImpl instance;
private static volatile EurekaClient eurekaClient;

private String transactionServiceGroup;

private EurekaRegistryServiceImpl() {
}

Expand Down Expand Up @@ -130,6 +132,7 @@ public void unsubscribe(String cluster, EurekaEventListener listener) throws Exc

@Override
public List<InetSocketAddress> lookup(String key) throws Exception {
transactionServiceGroup = key;
String clusterName = getServiceGroup(key);
if (clusterName == null) {
String missingDataId = PREFIX_SERVICE_ROOT + CONFIG_SPLIT_CHAR + PREFIX_SERVICE_MAPPING + key;
Expand Down Expand Up @@ -169,7 +172,7 @@ private void refreshCluster(String clusterName) {
.collect(Collectors.toList());
CLUSTER_ADDRESS_MAP.put(clusterName, newAddressList);

removeOfflineAddressesIfNecessary(clusterName, newAddressList);
removeOfflineAddressesIfNecessary(transactionServiceGroup, clusterName, newAddressList);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ public class NacosRegistryServiceImpl implements RegistryService<EventListener>
private static final Pattern DEFAULT_SLB_REGISTRY_PATTERN = Pattern.compile("(?!.*internal)(?=.*seata).*mse.aliyuncs.com");
private static volatile Boolean useSLBWay;

private String transactionServiceGroup;

private NacosRegistryServiceImpl() {
String configForNacosSLB = FILE_CONFIG.getConfig(getNacosUrlPatternOfSLB());
Pattern patternOfNacosRegistryForSLB = StringUtils.isBlank(configForNacosSLB)
Expand Down Expand Up @@ -193,7 +195,7 @@ public List<InetSocketAddress> lookup(String key) throws Exception {
.collect(Collectors.toList());
CLUSTER_ADDRESS_MAP.put(clusterName, newAddressList);

removeOfflineAddressesIfNecessary(clusterName, newAddressList);
removeOfflineAddressesIfNecessary(transactionServiceGroup, clusterName, newAddressList);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,16 @@
*/
package org.apache.seata.discovery.registry.namingserver;


import java.io.IOException;
import java.net.InetSocketAddress;
import java.rmi.RemoteException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.HashMap;
import java.util.Objects;
import java.util.Map;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Objects;
import java.util.Collections;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -41,22 +41,23 @@
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.http.HttpStatus;
import org.apache.http.StatusLine;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.entity.ContentType;
import org.apache.http.protocol.HTTP;
import org.apache.http.util.EntityUtils;
import org.apache.seata.common.metadata.Node;
import org.apache.seata.common.metadata.namingserver.Instance;
import org.apache.seata.common.metadata.namingserver.MetaResponse;
import org.apache.seata.common.thread.NamedThreadFactory;
import org.apache.seata.common.util.CollectionUtils;
import org.apache.seata.common.util.HttpClientUtil;
import org.apache.seata.common.util.NetUtil;
import org.apache.seata.common.util.StringUtils;
import org.apache.seata.config.Configuration;
import org.apache.seata.config.ConfigurationFactory;
import org.apache.seata.common.util.HttpClientUtil;
import org.apache.seata.discovery.registry.RegistryService;
import org.apache.http.HttpStatus;
import org.apache.http.StatusLine;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -322,17 +323,6 @@ public void unsubscribe(String vGroup) throws Exception {
isSubscribed = false;
}

@Override
public List<InetSocketAddress> aliveLookup(String transactionServiceGroup) {
return CURRENT_ADDRESS_MAP.computeIfAbsent(transactionServiceGroup, k -> new ArrayList<>());
}

@Override
public List<InetSocketAddress> refreshAliveLookup(String transactionServiceGroup,
List<InetSocketAddress> aliveAddress) {
return CURRENT_ADDRESS_MAP.put(transactionServiceGroup, aliveAddress);
}

/**
* @param key vGroup name
* @return List<InetSocketAddress> available instance list
Expand Down Expand Up @@ -413,6 +403,31 @@ public String getNamespace() {
return namespace;
}

@Override
public List<InetSocketAddress> aliveLookup(String transactionServiceGroup) {
Map<String, List<InetSocketAddress>> clusterAddressMap = CURRENT_ADDRESS_MAP.computeIfAbsent(transactionServiceGroup,
k -> new ConcurrentHashMap<>());

List<InetSocketAddress> inetSocketAddresses = clusterAddressMap.get(transactionServiceGroup);
if (CollectionUtils.isNotEmpty(inetSocketAddresses)) {
return inetSocketAddresses;
}

// fall back to addresses of any cluster
return clusterAddressMap.values().stream().filter(CollectionUtils::isNotEmpty)
.findAny().orElse(Collections.emptyList());
}

@Override
public List<InetSocketAddress> refreshAliveLookup(String transactionServiceGroup,
List<InetSocketAddress> aliveAddress) {
Map<String, List<InetSocketAddress>> clusterAddressMap = CURRENT_ADDRESS_MAP.computeIfAbsent(transactionServiceGroup,
key -> new ConcurrentHashMap<>());


return clusterAddressMap.put(transactionServiceGroup, aliveAddress);
}


/**
* get one namingserver url
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ public class RedisRegistryServiceImpl implements RegistryService<RedisListener>
private static final long KEY_TTL = 5L;
private static final long KEY_REFRESH_PERIOD = 2000L;

private String transactionServiceGroup;

private ScheduledExecutorService threadPoolExecutorForSubscribe = new ScheduledThreadPoolExecutor(1,
new NamedThreadFactory("RedisRegistryService-subscribe", 1));
private ScheduledExecutorService threadPoolExecutorForUpdateMap = new ScheduledThreadPoolExecutor(1,
Expand Down Expand Up @@ -219,6 +221,7 @@ public void unsubscribe(String cluster, RedisListener listener) {

@Override
public List<InetSocketAddress> lookup(String key) {
transactionServiceGroup = key;
String clusterName = getServiceGroup(key);
if (clusterName == null) {
String missingDataId = PREFIX_SERVICE_ROOT + CONFIG_SPLIT_CHAR + PREFIX_SERVICE_MAPPING + key;
Expand Down Expand Up @@ -280,7 +283,7 @@ private void removeServerAddressByPushEmptyProtection(String notifyCluserName, S
}
socketAddresses.remove(inetSocketAddress);

removeOfflineAddressesIfNecessary(notifyCluserName, socketAddresses);
removeOfflineAddressesIfNecessary(transactionServiceGroup, notifyCluserName, socketAddresses);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ public class SofaRegistryServiceImpl implements RegistryService<SubscriberDataOb

private static volatile SofaRegistryServiceImpl instance;

private String transactionServiceGroup;

private SofaRegistryServiceImpl() {
}

Expand Down Expand Up @@ -159,6 +161,7 @@ public void unsubscribe(String cluster, SubscriberDataObserver listener) throws

@Override
public List<InetSocketAddress> lookup(String key) throws Exception {
transactionServiceGroup = key;
String clusterName = getServiceGroup(key);
if (clusterName == null) {
String missingDataId = PREFIX_SERVICE_ROOT + CONFIG_SPLIT_CHAR + PREFIX_SERVICE_MAPPING + key;
Expand All @@ -174,7 +177,7 @@ public List<InetSocketAddress> lookup(String key) throws Exception {
List<InetSocketAddress> newAddressList = flatData(instances);
CLUSTER_ADDRESS_MAP.put(clusterName, newAddressList);

removeOfflineAddressesIfNecessary(clusterName, newAddressList);
removeOfflineAddressesIfNecessary(transactionServiceGroup, clusterName, newAddressList);
}
respondRegistries.countDown();
});
Expand Down
Loading

0 comments on commit b0c2bb2

Please sign in to comment.