Replies: 2 comments 2 replies
-
关于seata-go集成注册中心的方案,我在这里提出初步的想法,大家可以先看一下,有好的建议可以提出来。 seata-go目前没有实现TC,只有RM和TM,RM和TM没有被发现的需要,不用进行服务注册,在seata-java中也是如此。 因此seata-go集成注册中心的大致方向如下:
如果大家觉得方向没有问题,我会尽快开展后续工作。 |
Beta Was this translation helpful? Give feedback.
-
seata-go注册中心集成方案详述seata-go是seata体系中的一部分,因此需要首先对seata-java的注册中心方案进行详细梳理,seata-go的方案也应当以完全覆盖java侧能力为最终目标。@georgehao 曾建议参考arana中的注册中心代码,因此我也对arana的方案进行了梳理。希望能够结合seata-java和arana来决定seata-go注册中心的最后方案。 seata-java注册中心方案根据seata多语言目前的发展方向,所有客户端都依赖java版本的server。Seata Server(后续简称TC)将自身服务地址注册到注册中心,RM、TM客户端则从注册中心获取TC地址列表,也就是说TM和RM仅需要服务发现能力,不需要进行服务注册。 seata-java中的注册中心代码主要涉及以下几个模块:config、discovery、core、tm、rm。 https://github.com/seata/seata/tree/1.5.2 我所使用的源码版本为1.5.2,代码依赖Spring。在后续的代码片段中我会适当对源码进行简化,以更加方便地看清脉络。 config该模块核心内容是Configuration接口以及用于获取Configuration实例的ConfigurationFactory。 seata-java集成了完整的配置中心能力,Configuration接口是对配置中心能力的抽象,定义了查询、写入配置相关的一系列重载方法。 public interface Configuration {
/**
* Gets config.
*
* @param dataId the data id
* @return the config
*/
String getConfig(String dataId);
} ConfigurationFactory使用static代码块实现在类加载阶段初始化一个Configuration实例,该实例用于获取初始配置。 public static Configuration CURRENT_FILE_INSTANCE;
static {
load();
}
private static void load() {
Configuration configuration = (envValue == null) ? new FileConfiguration(seataConfigName,false) : new FileConfiguration(seataConfigName + "-" + envValue, false);
Configuration extConfiguration = EnhancedServiceLoader.load(ExtConfigurationProvider.class).provide(configuration);
CURRENT_FILE_INSTANCE = extConfiguration == null ? configuration : extConfiguration;
}
ConfigurationFactory提供getInstance函数,用于获取Configuration实例,该实例用于获取初始配置中指定的配置文件或配置中心中的配置信息。请务必和CURRENT_FILE_INSTANCE区分开。 public static Configuration getInstance() {
if (instance == null) {
synchronized (Configuration.class) {
if (instance == null) {
instance = buildConfiguration();
}
}
}
return instance;
} buildConfiguration的过程如下:
seata-java支持的配置类型包括:nacos、consul、apollo、zk、etcd3、file。 discovery服务发现模块中包括核心部分以及各类注册中心的实现,而核心部分主要是registry和loadbalance。registry中是对服务注册和服务发现能力的抽象,而loadbalance则是一组负载均衡均衡策略,用于后续对同一TC的多个服务地址进行负载均衡。 registryregistry子模块中是RegistryService接口以及获取RegistryService实例的RegistryFactory。 RegistryService接口是对注册中心能力的抽象,包含了完成的服务注册和服务发现。 public interface RegistryService<T> {
void register(InetSocketAddress address) throws Exception;
void unregister(InetSocketAddress address) throws Exception;
void subscribe(String cluster, T listener) throws Exception;
void unsubscribe(String cluster, T listener) throws Exception;
List<InetSocketAddress> lookup(String key) throws Exception;
void close() throws Exception;
} RegistryFactory通过静态内部类RegistryFactoryHolder的加载过程,初始化RegistryService实例。这里是一次单例模式的应用。 public class RegistryFactory {
public static RegistryService getInstance() {
return RegistryFactoryHolder.INSTANCE;
}
private static RegistryService buildRegistryService() {
RegistryType registryType;
String registryTypeName = ConfigurationFactory.CURRENT_FILE_INSTANCE.getConfig(
ConfigurationKeys.FILE_ROOT_REGISTRY + ConfigurationKeys.FILE_CONFIG_SPLIT_CHAR
+ ConfigurationKeys.FILE_ROOT_TYPE);
try {
registryType = RegistryType.getType(registryTypeName);
} catch (Exception exx) {
throw new NotSupportYetException("not support registry type: " + registryTypeName);
}
return EnhancedServiceLoader.load(RegistryProvider.class, Objects.requireNonNull(registryType).name()).provide();
}
private static class RegistryFactoryHolder {
private static final RegistryService INSTANCE = buildRegistryService();
}
} buildRegistryService过程如下:
seata-java支持的注册中心类型包括:nacos、eureka、redis、zk、consul、etcd3、sofa、file、custom。 loadbalanceloadbalance子模块中是LoadBalance接口、5种接口实现以及用于获取负载均衡器实例的LoadBalanceFactory。 public interface LoadBalance {
<T> T select(List<T> invokers, String xid) throws Exception;
}
public class LoadBalanceFactory {
public static LoadBalance getInstance() {
String config = ConfigurationFactory.getInstance().getConfig(LOAD_BALANCE_TYPE, DEFAULT_LOAD_BALANCE);
return EnhancedServiceLoader.load(LoadBalance.class, config);
}
} LoadBalance接口中使用了泛型,抽象级别较高。select方法用于从一个对象列表中选出一个,xid是一个全局事务唯一标识。在seata-java服务发现场景下,T的实际类型将会是TC的服务地址(InetSocketAddress)。 LoadBalanceFactory通过Conguration实例查询client.loadbalance.type值,默认是XID。 5种client.loadbalance.type值对应5种负载均衡器实现:
coreseata-java核心模块中与注册中心相关的是rpc子模块,其中RM和TM的NettyRemotingClient初始化、数据发送功能需要使用注册中心获取TC地址。 NettyRemotingClient初始化//io.seata.core.rpc.netty.RmNettyRemotingClient
public void init() {
// registry processor
registerProcessor();
if (initialized.compareAndSet(false, true)) {
super.init();
// Found one or more resources that were registered before initialization
if (resourceManager != null
&& !resourceManager.getManagedResources().isEmpty()
&& StringUtils.isNotBlank(transactionServiceGroup)) {
getClientChannelManager().reconnect(transactionServiceGroup);
}
}
}
//io.seata.core.rpc.netty.TmNettyRemotingClient
public void init() {
// registry processor
registerProcessor();
if (initialized.compareAndSet(false, true)) {
super.init();
if (io.seata.common.util.StringUtils.isNotBlank(transactionServiceGroup)) {
getClientChannelManager().reconnect(transactionServiceGroup);
}
}
} RmNettyRemotingClient和TmNettyRemotingClient在初始化时会与每个TC地址建立长连接。 //io.seata.core.rpc.netty.NettyClientChannelManager
private List<String> getAvailServerList(String transactionServiceGroup) throws Exception {
List<InetSocketAddress> availInetSocketAddressList = RegistryFactory.getInstance()
.lookup(transactionServiceGroup);
if (CollectionUtils.isEmpty(availInetSocketAddressList)) {
return Collections.emptyList();
}
return availInetSocketAddressList.stream()
.map(NetUtil::toStringAddress)
.collect(Collectors.toList());
}
void reconnect(String transactionServiceGroup) {
List<String> availList = getAvailServerList(transactionServiceGroup);
Set<String> channelAddress = new HashSet<>(availList.size());
for (String serverAddress : availList) {
acquireChannel(serverAddress);
channelAddress.add(serverAddress);
}
if (CollectionUtils.isNotEmpty(channelAddress)) {
List<InetSocketAddress> aliveAddress = new ArrayList<>(channelAddress.size());
for (String address : channelAddress) {
String[] array = address.split(":");
aliveAddress.add(new InetSocketAddress(array[0], Integer.parseInt(array[1])));
}
RegistryFactory.getInstance().refreshAliveLookup(transactionServiceGroup, aliveAddress);
} else {
RegistryFactory.getInstance().refreshAliveLookup(transactionServiceGroup, Collections.emptyList());
}
} TC地址列表则是通过RegistryFactory.getInstance()获取到的注册中心实例查询到的。transactionServiceGroup是查询TC地址列表的key,对应着配置文件中的seata.tx-service-group配置项。TC地址列表会被注册中心实例缓存到本地。 NettyRemotingClient的数据发送public Object sendSyncRequest(Object msg) throws TimeoutException {
String serverAddress = loadBalance(getTransactionServiceGroup(), msg);
long timeoutMillis = this.getRpcRequestTimeout();
RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC);
Channel channel = clientChannelManager.acquireChannel(serverAddress);
return super.sendSync(channel, rpcMessage, timeoutMillis);
}
protected String loadBalance(String transactionServiceGroup, Object msg) {
InetSocketAddress address = null;
List<InetSocketAddress> inetSocketAddressList = RegistryFactory.getInstance().aliveLookup(transactionServiceGroup);
address = this.doSelect(inetSocketAddressList, msg);
return NetUtil.toStringAddress(address);
}
protected InetSocketAddress doSelect(List<InetSocketAddress> list, Object msg) throws Exception {
if (CollectionUtils.isNotEmpty(list)) {
if (list.size() > 1) {
return LoadBalanceFactory.getInstance().select(list, getXid(msg));
} else {
return list.get(0);
}
}
return null;
} 在数据发送时通过RegistryFactory.getInstance()获取RegistryService实例,查询TC地址列表。通过LoadBalanceFactory.getInstance()获取LoadBalance实例,并从TC地址列表中选出一个地址。 arana-db注册中心方案https://github.com/arana-db/arana arana-db是一个云原生数据库代理,也可以作为数据库网格的sidecar部署。其中与注册中心相关的是registry模块,整个模块并不复杂,非常简洁。 arana-db仅支持etcd和nacos两种注册中心。registry下分为base、etcd、nacos、store四部分,并对外提供了Discovery和Registry的初始化函数入口。 basebase部分包含ServiceInstance结构体,以及Registry和Discovery两个接口。ServiceInstance代表注册到注册中心的一个服务,Registry和Discovery则是对服务注册和服务发现能力的抽象。 // ServiceInstance is an instance of a service in a discovery system.
type ServiceInstance struct {
// ID is the unique instance ID as registered.
ID string `json:"id"`
// Name is the service name as registered.
Name string `json:"name"`
// Version is the version of the compiled.
Version string `json:"version"`
// Endpoint addresses of the service instance.
Endpoint *config.Listener
}
func (p ServiceInstance) String() string {
return fmt.Sprintf("Service instance: id:%s, name:%s, version:%s, endpoints:%s", p.ID, p.Name, p.Version, p.Endpoint)
}
type Registry interface {
Register(ctx context.Context, serviceInstance *ServiceInstance) error
Unregister(ctx context.Context, name string) error
UnregisterAllService(ctx context.Context) error
}
type Discovery interface {
GetServices() []*ServiceInstance
WatchService() <-chan []*ServiceInstance
Close()
} 我注意到arana-db将服务注册和服务发现能力定义在两个不同的接口中,而seata-java是统一放在了RegistryService一个接口中。 storestore部分的核心内容是Store接口,定义了一组k/v操作函数。 type Store interface {
// Put a value at the specified key
Put(ctx context.Context, key string, value []byte, ttl int64) error
// Get a value given its key
Get(ctx context.Context, key string) ([]byte, error)
// Delete the value at the specified key
Delete(ctx context.Context, key string) error
// Exists if a Key exists in the store
Exists(ctx context.Context, key string) (bool, error)
// Watch for changes on a key
Watch(ctx context.Context, key string, stopCh <-chan struct{}) (<-chan []byte, error)
// WatchTree watches for changes on child nodes under
// a given directory
WatchTree(ctx context.Context, directory string, stopCh <-chan struct{}) (<-chan [][]byte, error)
// List the content of a given prefix
List(ctx context.Context, directory string) ([][]byte, error)
// Close the store connection
Close()
} 在etcdv3.go中对Store接口进行了实现,以支持对EtcdV3的k/v数据操作。 etcdetcd部分依赖了[EtcdV3客户端](https://github.com/etcd-io/etcd/tree/main/client/v3)。 分别在discovery.go和registery.go中对Discovery和Registry两个接口进行了实现,通过调用store中的api实现对etcdv3的数据进行存取。 nacosnaco部分依赖了[nacos的Go语言sdk](https://github.com/nacos-group/nacos-sdk-go)。 分别在discovery.go和registery.go中对Discovery和Registry两个接口进行了实现,通过多nacos sdk实现对nacos配置中心的操作。 初始化流程无论是Registry还是Discovery,都是根据配置中获取的注册中心类型去初始化对应类型的实例。这和seata-java中通过类型初始化实例是一致的。 func InitDiscovery(storeType string, options map[string]interface{}) (base.Discovery, error) {
var serviceDiscovery base.Discovery
var err error
switch storeType {
case base.ETCD:
serviceDiscovery, err = initEtcdDiscovery(options)
case base.NACOS:
initNacosV2Discovery(options)
default:
err = errors.Errorf("Service registry not support store:%s", storeType)
}
if err != nil {
err = errors.Wrap(err, "init service registry err:%v")
log.Fatal(err.Error())
return nil, err
}
return serviceDiscovery, nil
} seata-go 注册中心方案的规划通过对seata-java和arana-db两个项目的注册中心方案梳理,seata-go注册中心方案基本清晰了。 方向
实施步骤seata-go需要新增一个服务发现模块,并修改client、remoting两个模块,将服务发现能力集成进去。 目前可以拆分出6个高优先级事项,10个低优先级事项。 discovery在该模块中完成基础抽象代码以及总体框架的编写,为后续多人并行参与打好基础。 高优先级工作
低优先级工作调研eureka、zk、redis、sofa、custom注册中心接入方案,尽量基于已有的Go语言SDK去实现。
client部分配置项在现有的seata-go代码中已经支持,该项工作难度并不大。
remoting需要对remoting模块下的getty和loadbalance子模块进行扩展。 getty目前seata-go已经支持使用getty与TC地址列表中的每个地址建立长连接,该项工作难度也不大。
loadbalance目前seata-go已经支持在发送数据时通过负载均衡选择一个TC地址,但只实现了XID和Random两种负载均衡策略。待支持的如下:
另外,XID和Random负载均衡代码缺少对应的单元测试,可以进行补充。
|
Beta Was this translation helpful? Give feedback.
-
注册中心方案设计
Beta Was this translation helpful? Give feedback.
All reactions