title | tags | categories | keywords | description | cover | abbrlink | date | ||||
---|---|---|---|---|---|---|---|---|---|---|---|
05.Dubbo源码系列V1-Dubbo第五节-服务导出源码解析 |
|
|
Dubbo,rpc |
Dubbo服务导出源码解析 |
48141866 |
2021-10-06 07:11:58 -0700 |
https://www.yuque.com/books/share/f2394ae6-381b-4f44-819e-c231b39c1497(密码:kyys) 《Dubbo笔记》
- 服务导出的入口为ServiceBean中的export()方法,当Spring启动完之后,通过接收Spring的ContextRefreshedEvent事件来触发export()方法的执行。
- 一个ServiceBean对象就表示一个Dubbo服务,ServiceBean对象中的参数就表示服务的参数,比如timeout,该对象的参数值来至@Service注解中所定义的。
- 服务导出主要得做两件事情:
- 根据服务的参数信息,启动对应的网络服务器(netty、tomcat、jetty等),用来接收网络请求
- 将服务的信息注册到注册中心
- 但是在做这两件事情之前得先把服务的参数确定好,因为一个Dubbo服务的参数,除开可以在@Service注解中去配置,还会继承Dubbo服务所属应用(Application)上的配置,还可以在配置中心或JVM环境变量中去配置某个服务的参数,所以首先要做的是确定好当前服务最终的(优先级最高)的参数值。
- 确定好服务参数之后,就根据所配置的协议启动对应的网络服务器。在启动网络服务器时,并且在网络服务器接收请求的过程中,都可以从服务参数中获取信息,比如最大连接数,线程数,socket超时时间等等。
- 启动完网络服务器之后,就将服务信息注册到注册中心。同时还有向注册中心注册监听器,监听Dubbo的中的动态配置信息变更。
服务导出就是服务注册的意思
- DemoService接口表示一个服务,此时的服务表示服务定义
- DemoServiceImpl表示DemoService服务的具体实现,此时的服务表示服务的具体实现
- DemoService+group+version表示一个服务,此时的服务增加了分组和版本概念
- http://192.168.1.112:80/com.tuling.DemoService表示一个服务,此时的服务增加了机器IP和Port,表示远程机器可以访问这个URL来使用com.tuling.DemoService这个服务
- http://192.168.1.112:80/com.tuling.DemoService?timeout=3000&version=1.0.1&application=dubbo-demo-provider-application表示一个服务,此时的服务是拥有参数的,比如超时时间、版本号、所属应用
在dubbo中就是用的最后一种方式来表示服务的。
服务导出要做的几件事情:
- 确定服务的参数 2. 确定服务支持的协议 2. 构造服务最终的URL
- 根据服务支持的不同协议,启动不同的Server,用来接收和处理请求
- 将服务URL注册到注册中心去
- 因为Dubbo支持动态配置服务参数,所以服务导出时还需要绑定一个监听器Listener来监听服务的参数是否有修改,如果发现有修改,则需要重新进行导出
-
在执行ServiceConfig.export()时,此时ServiceConfig对象就代表一个服务(也可以说ServiceBena代表一个服务,因为本来就是继承关系),我们已经知道了这个服务的名字(就是服务提供者接口的名字),并且此时这个服务可能已经有一些参数了,就是**@Service注解上所定义的参数**。
-
但是在Dubbo中,除开可以在@Service注解中给服务配置参数,还有很多地方也可以给服务配置参数,比如:
- dubbo.properties文件,你可以建立这个文件,dubbo会去读取这个文件的内容作为服务的参数,Dubob的源码中叫做PropertiesConfiguration
- 配置中心,dubbo在2.7版本后就支持了分布式配置中心,你可以在Dubbo-Admin中去操作配置中心,分布式配置中心就相当于一个远程的dubbo.properties文件,你可以在Dubbo-Admin中去修改这个dubbo.properties文件,当然配置中心支持按应用进行配置,也可以按全局进行配置两种,在Dubbo的源码中AppExternalConfiguration表示应用配置,ExternalConfiguration表示全局配置。
- 系统环境变量,你可以在启动应用程序时,通过-D的方式来指定参数,在Dubbo的源码中叫SystemConfiguration
- 再加上通过@Service注解所配置的参数,在Dubbo的源码中叫AbstractConfig
-
服务的参数可以从这四个位置来,这四个位置上如果配了同一个参数的话,优先级从高到低有两种情况:
- SystemConfiguration -> AppExternalConfiguration -> ExternalConfiguration -> AbstractConfig -> PropertiesConfiguration
- SystemConfiguration -> AbstractConfig -> AppExternalConfiguration -> ExternalConfiguration -> PropertiesConfiguration
-
在服务导出时,首先得确定服务的参数。当然,服务的参数除开来自于服务的自身配置外,还可以来自其上级。比如如果服务本身没有配置timeout参数,但是如果服务所属的应用的配置了timeout,那么这个应用下的服务都会继承这个timeout配置。所以在确定服务参数时,需要先从上级获取参数,获取之后,如果服务本身配置了相同的参数,那么则进行覆盖。
//当Spring启动完之后,通过接收Spring的ContextRefreshedEvent事件来触发export()方法的执行。
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
// 当前服务没有被导出并且没有卸载,才导出服务
if (!isExported() && !isUnexported()) {
if (logger.isInfoEnabled()) {
logger.info("The service ready on spring started. service: " + getInterface());
}
// 服务导出(服务注册)
export();
}
}
@Override
public void export() {
//调用ServiceConfig#export()
super.export();
// Publish ServiceBeanExportedEvent
// Spring启动完发布ContextRefreshedEvent事件--->服务导出--->发布ServiceBeanExportedEvent
// 程序员可以通过Spring中的ApplicationListener来监听服务导出是否完成
publishExportEvent();
}
private void publishExportEvent() {
//监听这个事件就可以知道Dubbo的服务有没有注册完成
ServiceBeanExportedEvent exportEvent = new ServiceBeanExportedEvent(this);
applicationEventPublisher.publishEvent(exportEvent);
}
public synchronized void export() {
//读取服务配置
checkAndUpdateSubConfigs();
// 检查服务是否需要导出
if (!shouldExport()) {
return;
}
// 检查是否需要延迟发布
if (shouldDelay()) {
DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
} else {
// 导出服务
doExport();
}
}
public void checkAndUpdateSubConfigs() {
// Use default configs defined explicitly on global configs
// ServiceConfig中的某些属性如果是空的,那么就从ProviderConfig、ModuleConfig、ApplicationConfig中获取
// 补全ServiceConfig中的属性
completeCompoundConfigs();
// Config Center should always being started first.
// 从配置中心获取配置,包括应用配置和全局配置
// 把获取到的配置放入到Environment中的externalConfigurationMap和appExternalConfigurationMap中
// 并刷新所有的XxConfig的属性(除开ServiceConfig),刷新的意思就是将配置中心的配置覆盖调用XxConfig中的属性
// 调用AbstractInterfaceConfig#startConfigCenter()
startConfigCenter();
checkDefault();
checkProtocol();
checkApplication();
// if protocol is not injvm checkRegistry
// 如果protocol不是只有injvm协议,表示服务调用不是只在本机jvm里面调用,那就需要用到注册中心
if (!isOnlyInJvm()) {
checkRegistry();
}
// 刷新ServiceConfig,调用AbstractConfig#refresh()
this.refresh();
// 如果配了metadataReportConfig,那么就刷新配置
checkMetadataReport();
if (StringUtils.isEmpty(interfaceName)) {
throw new IllegalStateException("<dubbo:service interface=\"\" /> interface not allow null!");
}
// 当前服务对应的实现类是一个GenericService,表示没有特定的接口
if (ref instanceof GenericService) {
interfaceClass = GenericService.class;
if (StringUtils.isEmpty(generic)) {
generic = Boolean.TRUE.toString();
}
} else {
// 加载接口
try {
interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
.getContextClassLoader());
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e.getMessage(), e);
}
// 刷新MethodConfig,并判断MethodConfig中对应的方法在接口中是否存在
checkInterfaceAndMethods(interfaceClass, methods);
// 实现类是不是该接口类型
checkRef();
generic = Boolean.FALSE.toString();
}
// local和stub一样,不建议使用了
if (local != null) {
// 如果本地存根为true,则存根类为interfaceName + "Local"
if (Boolean.TRUE.toString().equals(local)) {
local = interfaceName + "Local";
}
// 加载本地存根类
Class<?> localClass;
try {
localClass = ClassUtils.forNameWithThreadContextClassLoader(local);
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e.getMessage(), e);
}
if (!interfaceClass.isAssignableFrom(localClass)) {
throw new IllegalStateException("The local implementation class " + localClass.getName() + " not implement interface " + interfaceName);
}
}
// 本地存根
if (stub != null) {
// 如果本地存根为true,则存根类为interfaceName + "Stub"
if (Boolean.TRUE.toString().equals(stub)) {
stub = interfaceName + "Stub";
}
Class<?> stubClass;
try {
stubClass = ClassUtils.forNameWithThreadContextClassLoader(stub);
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e.getMessage(), e);
}
if (!interfaceClass.isAssignableFrom(stubClass)) {
throw new IllegalStateException("The stub implementation class " + stubClass.getName() + " not implement interface " + interfaceName);
}
}
// 检查local和stub
checkStubAndLocal(interfaceClass);
// 检查mock
checkMock(interfaceClass);
}
/**
* 1.上节课我们在启动类上写了这个配置
* @PropertySource("classpath:/spring/dubbo-provider.properties")
* 2.那么Spring启动的时候就会加载里面的配置到一些xxxConfig里面【Spring整合Dubbo的时候讲过】
* 3.@Service注解里配置的参数被首先读取到了ServiceBean里
* 4.接着会调用这个方法进行补全ServiceBean的配置,从哪里补全呢?就是从上面我们配置的
* dubbo-provider.properties 进行补全
* 5.ServcieBean继承了ServiceConfig,所以它两是一个意思,这里强调一下
* 免得后续看不明白
*/
private void completeCompoundConfigs() {
// 如果配置了provider,那么则从provider中获取信息赋值其他属性,在这些属性为空的情况下
if (provider != null) {
if (application == null) {
setApplication(provider.getApplication());
}
if (module == null) {
setModule(provider.getModule());
}
if (registries == null) {
setRegistries(provider.getRegistries());
}
if (monitor == null) {
setMonitor(provider.getMonitor());
}
if (protocols == null) {
setProtocols(provider.getProtocols());
}
if (configCenter == null) {
setConfigCenter(provider.getConfigCenter());
}
}
// 如果配置了module,那么则从module中获取信息赋值其他属性,在这些属性为空的情况下
if (module != null) {
if (registries == null) {
setRegistries(module.getRegistries());
}
if (monitor == null) {
setMonitor(module.getMonitor());
}
}
// 如果配置了application,那么则从application中获取信息赋值其他属性,在这些属性为空的情况下
if (application != null) {
if (registries == null) {
setRegistries(application.getRegistries());
}
if (monitor == null) {
setMonitor(application.getMonitor());
}
}
}
void startConfigCenter() {
if (configCenter == null) {
ConfigManager.getInstance().getConfigCenter().ifPresent(cc -> this.configCenter = cc);
}
// 如果配置了ConfigCenter
if (this.configCenter != null) {
// 从其他位置获取配置中心的相关属性信息,比如配置中心地址
// TODO there may have duplicate refresh
this.configCenter.refresh();
// 属性更新后,从远程配置中心获取数据(应用配置,全局配置)
prepareEnvironment();
}
// 从配置中心取到配置数据后,刷新所有的XxConfig中的属性,除开ServiceConfig
ConfigManager.getInstance().refreshAll();
}
private void prepareEnvironment() {
if (configCenter.isValid()) {
if (!configCenter.checkOrUpdateInited()) {
return;
}
// 动态配置中心,管理台上的配置中心
DynamicConfiguration dynamicConfiguration = getDynamicConfiguration(configCenter.toUrl());
// 如果是zookeeper,获取的就是/dubbo/config/dubbo/dubbo.properties节点中的内容
String configContent = dynamicConfiguration.getProperties(configCenter.getConfigFile(), configCenter.getGroup());
String appGroup = application != null ? application.getName() : null;
String appConfigContent = null;
if (StringUtils.isNotEmpty(appGroup)) {
// 获取的就是/dubbo/config/dubbo-demo-consumer-application/dubbo.properties节点中的内容
// 这里有bug
appConfigContent = dynamicConfiguration.getProperties
(StringUtils.isNotEmpty(configCenter.getAppConfigFile()) ? configCenter.getAppConfigFile() : configCenter.getConfigFile(),
appGroup
);
}
try {
Environment.getInstance().setConfigCenterFirst(configCenter.isHighestPriority());
//这个就是全局的,就是在网页上那个配置管理里的global
Environment.getInstance().updateExternalConfigurationMap(parseProperties(configContent));
//这个就是某个应用的配置
Environment.getInstance().updateAppExternalConfigurationMap(parseProperties(appConfigContent));
} catch (IOException e) {
throw new IllegalStateException("Failed to parse configurations from Config Center.", e);
}
}
}
public void refreshAll() {
// refresh all configs here,
getApplication().ifPresent(ApplicationConfig::refresh);
getMonitor().ifPresent(MonitorConfig::refresh);
getModule().ifPresent(ModuleConfig::refresh);
getProtocols().values().forEach(ProtocolConfig::refresh);
getRegistries().values().forEach(RegistryConfig::refresh);
getProviders().values().forEach(ProviderConfig::refresh);
getConsumers().values().forEach(ConsumerConfig::refresh);
}
/**
* 1.刷新XxConfig
* 2.一个XxConfig对象的属性可能是有值的,也可能是没有值的,这时需要从其他位置获取属性值,来进行属性的覆盖
* 覆盖的优先级,从大到小为系统变量->配置中心应用配置->配置中心全局配置->注解或xml中定义->dubbo.properties文件
* 3.以ServiceConfig为例,ServiceConfig中包括很多属性,比如timeout
* 但是在定义一个Service时,如果在注解上没有配置timeout,那么就会其他地方获取timeout的配置
* 比如可以从系统变量->配置中心应用配置->配置中心全局配置->注解或xml中定义->dubbo.properties文件
* refresh是刷新,将当前ServiceConfig上的set方法所对应的属性更新为优先级最高的值
*/
public void refresh() {
try {
/**
* 1.这里确定的配置优先级从高到低是这样的
* 系统环境变量【JVM环境变量->操作系统环境变量】->配置中心应用配置->配置中心全局配置->dubbo.properties文件
* 2.调用的是Environment#getConfiguration()
*/
CompositeConfiguration compositeConfiguration = Environment.getInstance().getConfiguration(getPrefix(), getId());
// 表示XxConfig对象本身- AbstractConfig
Configuration config = new ConfigConfigurationAdapter(this); // ServiceConfig
if (Environment.getInstance().isConfigCenterFirst()) {//这个是默认的
// 优先级顺序: SystemConfiguration -> AppExternalConfiguration -> ExternalConfiguration -> AbstractConfig -> PropertiesConfiguration
compositeConfiguration.addConfiguration(4, config);
} else {
// The sequence would be: SystemConfiguration -> AbstractConfig -> AppExternalConfiguration -> ExternalConfiguration -> PropertiesConfiguration
compositeConfiguration.addConfiguration(2, config);
}
// loop methods, get override value and set the new value back to method
Method[] methods = getClass().getMethods(); //ServiceBean
for (Method method : methods) {
// 是不是setXX()方法
if (MethodUtils.isSetter(method)) {
// 获取xx配置项的value
String value = StringUtils.trim(compositeConfiguration.getString(extractPropertyName(getClass(), method)));
// isTypeMatch() is called to avoid duplicate and incorrect update, for example, we have two 'setGeneric' methods in ReferenceConfig.
if (StringUtils.isNotEmpty(value) && ClassUtils.isTypeMatch(method.getParameterTypes()[0], value)) {
method.invoke(this, ClassUtils.convertPrimitive(method.getParameterTypes()[0], value));
}
// 是不是setParameters()方法
} else if (isParametersSetter(method)) {
// 获取parameter配置项的value
String value = StringUtils.trim(compositeConfiguration.getString(extractPropertyName(getClass(), method)));
if (StringUtils.isNotEmpty(value)) {
Map<String, String> map = invokeGetParameters(getClass(), this);
map = map == null ? new HashMap<>() : map;
map.putAll(convert(StringUtils.parseParameters(value), ""));
invokeSetParameters(getClass(), this, map);
}
}
}
} catch (Exception e) {
logger.error("Failed to override ", e);
}
}
public CompositeConfiguration getConfiguration(String prefix, String id) {
CompositeConfiguration compositeConfiguration = new CompositeConfiguration();
// Config center has the highest priority
// JVM环境变量
compositeConfiguration.addConfiguration(this.getSystemConfig(prefix, id));
// 操作系统环境变量
compositeConfiguration.addConfiguration(this.getEnvironmentConfig(prefix, id));
// 配置中心APP配置
compositeConfiguration.addConfiguration(this.getAppExternalConfig(prefix, id));
// 配置中心Global配置
compositeConfiguration.addConfiguration(this.getExternalConfig(prefix, id));
// dubbo.properties中的配置
compositeConfiguration.addConfiguration(this.getPropertiesConfig(prefix, id));
return compositeConfiguration;
}
确定服务所支持的协议还是比较简单的,就是看用户配了多少个Protocol。和服务参数意义,Protocol也是可以在各个配置点进行配置的。
- 首先在SpringBoot的application.properties文件中就可能配置了协议
- 也可能在dubbo.properties文件中配置了协议
- 也可能在配置中心中也配置了协议
- 也可能通过-D的方式也配置了协议
所以在服务导出时,需要从以上几个地方获取协议,结果可能是一个协议,也可能是多个协议,从而确定出协议。
-
资源
- 注册中心URL:zookeeper://ip+port?dynamic=true
- 服务:dubbo://ip+port/接口名?timeout=3000
- 服务:http://ip+port/接口名?timeout=3000
-
方便扩展
有了确定的协议,服务名,服务参数后,自然就可以组装成服务的URL了。
但是还有一点是非常重要的,在Dubbo中支持服务动态配置,注意,这个和配置中心不是同一概念,动态配置是可以在服务导出后动态的去修改服务配置的,而配置中心则不能达到这一的效果(这个我要在确定一下)。
动态配置,其实就是继续给服务增加了一些参数,所以在把服务的URL注册到注册中心去之前,得先按照动态配置中所添加的配置重写一下URL,也就是应用上动态配置中的参数。
只有这样作完之后得到的URL才是真正准确的服务提供者URL。
- 根据服务支持的不同协议,启动不同的Server,用来接收和处理请求
- 将服务URL注册到注册中心去
- 因为Dubbo支持动态配置服务参数,所以服务导出时还需要绑定一个监听器Listener来监听服务的参数是否有修改,如果发现有修改,则需要重新进行导出
这三个步骤都会在
ServiceConfig#export()#doExport()
这个方法里做,流程比较复杂,就直接看代码吧
这个部分的源码是前面三个步骤公用的
public synchronized void export() {
//读取服务配置
checkAndUpdateSubConfigs();
// 检查服务是否需要导出,@Service里可以配置
if (!shouldExport()) {
return;
}
// 检查是否需要延迟发布,@Service里可以配置
if (shouldDelay()) {
DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
} else {
// 导出服务
doExport();
}
}
protected synchronized void doExport() {
if (unexported) {
throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!");
}
// 已经导出了,就不再导出了
if (exported) {
return;
}
exported = true;
if (StringUtils.isEmpty(path)) {
path = interfaceName;
}
doExportUrls();
}
private void doExportUrls() {
// registryURL 表示一个注册中心
List<URL> registryURLs = loadRegistries(true);
//配置的每一个protocol都会产生一个dubbo服务,所以这里是循环配置的协议,
// 我们这里假设配置了dubbo,但是配了两个端口,这样也算两个protocol
for (ProtocolConfig protocolConfig : protocols) {
// pathKey = group/contextpath/path:version
// 例子:myGroup/user/org.apache.dubbo.demo.DemoService:1.0.1
String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version);
// ProviderModel中存在服务提供者访问路径,实现类,接口,以及接口中的各个方法对应的ProviderMethodModel
// ProviderMethodModel表示某一个方法,方法名,所属的服务的,
ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass);
// ApplicationModel表示应用中有哪些服务提供者和引用了哪些服务
ApplicationModel.initProviderModel(pathKey, providerModel);
// 重点,每一个协议都会注册一个服务
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
// protocolConfig表示某个协议,registryURLs表示所有的注册中心
// 如果配置的某个协议,没有配置name,那么默认为dubbo
String name = protocolConfig.getName();
if (StringUtils.isEmpty(name)) {
name = DUBBO;
}
// 这个map表示服务url的参数
Map<String, String> map = new HashMap<String, String>();
map.put(SIDE_KEY, PROVIDER_SIDE);
appendRuntimeParameters(map);
// 监控中心参数
appendParameters(map, metrics);
// 应用相关参数
appendParameters(map, application);
// 模块相关参数
appendParameters(map, module);
// remove 'default.' prefix for configs from ProviderConfig
// appendParameters(map, provider, Constants.DEFAULT_KEY);
// 提供者相关参数
appendParameters(map, provider);
// 协议相关参数
appendParameters(map, protocolConfig);
// 服务本身相关参数
appendParameters(map, this);
// 服务中某些方法参数,@Service里可以针对某些方法配置某些参数
if (CollectionUtils.isNotEmpty(methods)) {
for (MethodConfig method : methods) {
// 某个方法的配置参数,注意有prefix
appendParameters(map, method, method.getName());
String retryKey = method.getName() + ".retry";
// 如果某个方法配置存在xx.retry=false,则改成xx.retry=0
if (map.containsKey(retryKey)) {
String retryValue = map.remove(retryKey);
if (Boolean.FALSE.toString().equals(retryValue)) {
map.put(method.getName() + ".retries", "0");
}
}
List<ArgumentConfig> arguments = method.getArguments();
if (CollectionUtils.isNotEmpty(arguments)) {
// 遍历当前方法配置中的参数配置
for (ArgumentConfig argument : arguments) {
// 如果配置了type,则遍历当前接口的所有方法,然后找到方法名和当前方法名相等的方法,可能存在多个
// 如果配置了index,则看index对应位置的参数类型是否等于type,如果相等,则向map中存入argument对象中的参数
// 如果没有配置index,那么则遍历方法所有的参数类型,等于type则向map中存入argument对象中的参数
// 如果没有配置type,但配置了index,则把对应位置的argument放入map
// convert argument type
if (argument.getType() != null && argument.getType().length() > 0) {
Method[] methods = interfaceClass.getMethods();
// visit all methods
if (methods != null && methods.length > 0) {
for (int i = 0; i < methods.length; i++) {
String methodName = methods[i].getName();
// target the method, and get its signature
if (methodName.equals(method.getName())) {
Class<?>[] argtypes = methods[i].getParameterTypes();
// one callback in the method
if (argument.getIndex() != -1) {
if (argtypes[argument.getIndex()].getName().equals(argument.getType())) {
appendParameters(map, argument, method.getName() + "." + argument.getIndex());
} else {
throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
}
} else {
// multiple callbacks in the method
for (int j = 0; j < argtypes.length; j++) {
Class<?> argclazz = argtypes[j];
if (argclazz.getName().equals(argument.getType())) {
appendParameters(map, argument, method.getName() + "." + j);
if (argument.getIndex() != -1 && argument.getIndex() != j) {
throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
}
}
}
}
}
}
}
} else if (argument.getIndex() != -1) {
appendParameters(map, argument, method.getName() + "." + argument.getIndex());
} else {
throw new IllegalArgumentException("Argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>");
}
}
}
} // end of methods for
}
if (ProtocolUtils.isGeneric(generic)) {
map.put(GENERIC_KEY, generic);
map.put(METHODS_KEY, ANY_VALUE);
} else {
String revision = Version.getVersion(interfaceClass, version);
if (revision != null && revision.length() > 0) {
map.put(REVISION_KEY, revision);
}
// 通过接口对应的Wrapper,拿到接口中所有的方法名字
String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
if (methods.length == 0) {
logger.warn("No method found in service interface " + interfaceClass.getName());
map.put(METHODS_KEY, ANY_VALUE);
} else {
map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
}
}
// Token是为了防止服务被消费者直接调用(伪造http请求),可以在@Service里配置
// 这里防止的是某些消费者不是从注册中心拿到的URL调用提供者,而是消费者自己拼出的URL进行调用
// 服务调用的是会有个Tokenfilter过滤器进行拦截(后面讲)
if (!ConfigUtils.isEmpty(token)) {
if (ConfigUtils.isDefault(token)) {
map.put(TOKEN_KEY, UUID.randomUUID().toString());
} else {
map.put(TOKEN_KEY, token);
}
}
// export service
// 通过该host和port访问该服务
String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
Integer port = this.findConfigedPorts(protocolConfig, name, map);
// 服务url
URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);
/**
* url:http://192.168.40.17:80/org.apache.dubbo.demo.DemoService?anyhost=true&application=
* dubbo-demo-annotation-provider&bean.name=ServiceBean:org.apache.dubbo.demo.DemoService
* &bind.ip=192.168.40.17&bind.port=80&deprecated=false&dubbo=2.0.2&dynamic=true&
* generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=285072
* &release=&side=provider×tamp=1585206500409
*
* 1.可以通过ConfiguratorFactory,对服务url再次进行配置
* 2.意思就是可以自己实现一个ConfiguratorFactory的实现类,实现对应方法对URL进行自定义修改
* 3.这个实现类是通过SPI进行加载的
*/
if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.hasExtension(url.getProtocol())) {
url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.getExtension(url.getProtocol()).getConfigurator(url).configure(url);
}
String scope = url.getParameter(SCOPE_KEY); // scope可能为null,remote, local,none
// don't export when none is configured
if (!SCOPE_NONE.equalsIgnoreCase(scope)) {
// 如果scope为none,则不会进行任何的服务导出,既不会远程,也不会本地
// export to local if the config is not remote (export to remote only when config is remote)
if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
// 如果scope不是remote,则会进行本地导出,会把当前url的protocol改为injvm,然后进行导出
// 这样的话就只有本地的JVM才能调用
exportLocal(url);
}
// export to remote if the config is not local (export to local only when config is local)
if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
// 如果scope不是local,则会进行远程导出
if (CollectionUtils.isNotEmpty(registryURLs)) {
// 如果有注册中心,则将服务注册到注册中心
for (URL registryURL : registryURLs) {
//if protocol is only injvm ,not register
// 如果是injvm,则不需要进行注册中心注册
if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
continue;
}
// 该服务是否是动态,对应zookeeper上表示是否是临时节点,对应dubbo中的功能就是静态服务
url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
// 拿到监控中心地址
URL monitorUrl = loadMonitor(registryURL);
// 当前服务连接哪个监控中心
if (monitorUrl != null) {
url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());
}
// 服务的register参数,如果为true,则表示要注册到注册中心
if (logger.isInfoEnabled()) {
if (url.getParameter(REGISTER_KEY, true)) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
} else {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
}
// For providers, this is used to enable custom proxy to generate invoker
// 服务使用的动态代理机制,如果为空则使用javassit
String proxy = url.getParameter(PROXY_KEY);
if (StringUtils.isNotEmpty(proxy)) {
registryURL = registryURL.addParameter(PROXY_KEY, proxy);
}
/**
* 1.生成一个当前服务接口的代理对象
* 2.使用代理生成一个Invoker,Invoker表示服务提供者的代理,可以使用Invoker的invoke方法执行服务
* 就是把注册中心的URL和服务的URL拼起来,registryURL + "export" + url,对应的url为:
* <code>
registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?
application=dubbo-demo-annotation-provider&dubbo=2.0.2&export=
http://192.168.40.17:80/org.apache.dubbo.demo.DemoService?
anyhost=true&application=dubbo-demo-annotation-provider&bean.name=
ServiceBean:org.apache.dubbo.demo.DemoService&bind.ip=192.168.40.17&
bind.port=80&deprecated=false&dubbo=2.0.2&dynamic=true&generic=
false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&
pid=19472&release=&side=provider×tamp=1585207994860&pid=19472&
registry=zookeeper×tamp=1585207994828
* <code/>
* 3.这个Invoker中包括了服务的实现者、服务接口类、服务的注册地址(针对当前服务的,参数export
指定了当前服务)
* 4.此invoker表示一个可执行的服务,调用invoker的invoke()方法即可执行服务,同时此invoker也可用来导出
* 在服务导出(注册)的时候,invoker只是存在某一个地方,等消费者调用服务的时候才会执行
* 5.ref就是之前讲过的服务具体实现类
* 6.这里第二个参数传的是URL(具体就是registryURL),后面exporter马上会用
*/
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
// invoker.invoke(Invocation)
// DelegateProviderMetaDataInvoker也表示服务提供者,包括了Invoker和服务的配置
//把this(也就是serviceconfig服务参数)和invoker 服务实现类等 再包装一下
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
/**
* 使用特定的协议来对服务进行导出,这里的协议为RegistryProtocol,导出成功后得到一个Exporter
* 1.exporter导出器怎么确定用哪个实现类的export方法呢?SPI机制会判断哪个invoker里面有getURL这个方法
* 【这里不知道怎么调哪个类的哪个方法的请看前面讲的SPI】
* 2.因为前面invoker传的是registryURL,所以我们这里就会使用RegistryProtocol进行服务注册
* registryURL可以理解为注册中心的注册协议吧,debug这里,就会看到是这样的registry://127.0.0.1:2181......
* 3.注册完了之后,使用DubboProtocol进行导出
* 4.到此为止做了哪些事情? ServiceBean.export()-->刷新ServiceBean的参数-->得到注册中心URL和协议URL-->
* 遍历每个协议URL-->组成服务URL-->生成可执行服务Invoker-->导出服务
* 5.这里就会调用RegistryProtocol#export(Invoker)
*/
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
} else {
// 没有配置注册中心时,也会导出服务
if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
/**
* @since 2.7.0
* ServiceData Store
*/
// 根据服务url,讲服务的元信息存入元数据中心
MetadataReportService metadataReportService = null;
if ((metadataReportService = getMetadataReportService()) != null) {
metadataReportService.publishProvider(url);
}
}
}
this.urls.add(url);
}
@SuppressWarnings("unchecked")
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
String key = getCacheKey(originInvoker);
return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
/**
* 1.这里又是SPI的知识。protocol属性的值是哪来的,是在SPI中注入进来的,是一个代理类
* 2.InvokerDelegate的父类InvokerWrapper有getURL方法,所以最终SPI决定调哪个扩展点
* 是通过providerUrl决定的,而providerUrl这里基本就是DubboProtocol或HttpProtocol去export
* 3.我们这里用的是dubbo协议,所以会调用DubboProtocol
* 4.为什么需要ExporterChangeableWrapper?方便注销已经被导出的服务
*/
return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
});
}
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// 唯一标识一个服务的key
String key = serviceKey(url);
// 构造一个Exporter进行服务导出
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
//export an stub service for dispatching event
Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
} else {
// 服务的stub方法
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}
// 开启NettyServer
// 请求--->invocation--->服务key--->exporterMap.get(key)--->exporter--->invoker--->invoker.invoke(invocation)-->执行服务
openServer(url);
// 特殊的一些序列化机制,比如kryo提供了注册机制来注册类,提高序列化和反序列化的速度
optimizeSerialization(url);
return exporter;
}
private void openServer(URL url) {
// find server.
String key = url.getAddress(); // 获得ip地址和port, 192.168.40.17:20880
// NettyClient, NettyServer
//client can export a service which's only for server to invoke
boolean isServer = url.getParameter(IS_SERVER_KEY, true);
if (isServer) {
// 缓存Server对象
ExchangeServer server = serverMap.get(key);
// DCL,Double Check Lock
if (server == null) {
synchronized (this) {
server = serverMap.get(key);
if (server == null) {
// 创建Server,并进行缓存
serverMap.put(key, createServer(url));
}
}
} else {
// server supports reset, use together with override
// 服务重新导出时,就会走这里 这里会调用HeaderExchangeServer#reset
server.reset(url);
}
}
}
protected static String serviceKey(URL url) {
int port = url.getParameter(Constants.BIND_PORT_KEY, url.getPort());
// path就是@Service注解配的,path配了就用,不配就不用。这四个参数作用就是生成唯一标识一个服务的key
// 从这里就可以看出,协议相同的只要端口号不一样,依然算不同的服务
return serviceKey(port, url.getPath(), url.getParameter(VERSION_KEY), url.getParameter(GROUP_KEY));
}
这里不是dubbo的核心,就不贴源码了
在服务URL中指定了协议,比如Http协议、Dubbo协议。根据不同的协议启动对应的Server。
比如Http协议就启动Tomcat、Jetty。
比如Dubbo协议就启动Netty。
不能只启动Server,还需要绑定一个RequestHandler,用来处理请求。
比如,Http协议对应的就是InternalHandler。Dubbo协议对应的就是ExchangeHandler。
这里来详细分析一下Dubbo协议所启动的Server。
- 调用DubboProtocol的openServer(URL url)方法开启启动Server
- 调用DubboProtocol的createServer(url)方法,在createServer()方法中调用**Exchangers.bind(url, requestHandler)**得到一个ExchangeServer
- 其中requestHandler表示请求处理器,用来处理请求
- 在**Exchangers.bind(url, requestHandler)**中,先会根据URL得到一个Exchanger,默认为HeaderExchanger
- HeaderExchanger中包括HeaderExchangeClient、HeaderExchangeServer
- HeaderExchangeClient负责发送心跳,HeaderExchangeServer负责接收心跳,如果超时则会关闭channel
- 在构造HeaderExchangeServer之前,会通过调用Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))方法的到一个Server
- 默认会使用getTransporter去bind(URL url, ChannelHandler listener)从而得到一个Servlet,此时的listener就是外部传进来的DecodeHandler
- 在NettyTransporter的bind方法中会去new NettyServer(url, listener),所以上面返回的Server默认就是NettyServer
- 在构造NettyServer时,会调用ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME))再构造一个ChannelHandler。
- wrap中的handler就是上面的listener
- 在wrap方法中会调用new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension().dispatch(handler, url)));构造一个ChannelHandler。
- 构造完ChannelHandler后,就是真正的去开启Server了,会调用AbstractServer抽象类的doOpen方法。
- 在NettyServer中,会实现doOpen方法,会调用new NettyServerHandler(getUrl(), this)构造一个NettyServerHandler,并bind地址
- 至此,DubboProtocol协议的启动Server流程就结束。
总结一下DubboProtocol协议的RequestHandler链路:
-
NettyServerHandler:与NettyServer直接绑定的请求处理器,负责从Netty接收到请求,channelRead()方法获取到请求,然后调用下一层的Handler(NettyServer)的received()方法将请求传递下去,此时的请求还是Object msg
-
NettyServer:NettyServer的父类AbstractPeer中存在received(),该方法没有做什么,直接把msg传递给下一层Handler(MultiMessageHandler)
-
MultiMessageHandler:此Handler会判断msg是否是一个MultiMessage,如果是,则对MultiMessage进行拆分,则把拆分出来的msg传递给下层Handler(HeartbeatHandler),如果不是,则直接把msg传递给下层Handler(HeartbeatHandler)
-
HeartbeatHandler:此Handler通过received()方法接收到msg,然后判断该msg是不是一个心跳请求或心跳响应,如果是心跳请求,则此Handler返回一个Response对象(很简单的一个对象),如果是心跳响应,则打印一个日志,不会有其他逻辑,如果都不是,则把msg传递给下层Handler(AllChannelHandler)。
-
AllChannelHandler:此Handler通过received()方法接收到msg,然后把msg封装为一个ChannelEventRunnable对象,并把ChannelEventRunnable扔到线程池中去,异步去处理该msg。在ChannelEventRunnable中会把msg交给下一个Handler(DecodeHandler)
-
DecodeHandler:此Handler通过received()方法接收到msg,会对msg解析decode解码,然后交给下一个Handler(HeaderExchangeHandler)
-
HeaderExchangeHandler:此Handler通过received()方法接收到msg,会判断msg的类型
- 如果Request是TwoWay,则会调用下一个Handler(DubboProtocol中的requestHandler)的reply方法得到一个结果,然后返回
- 如果Request不是TwoWay,则会调用下一个Handler(DubboProtocol中的requestHandler)的received方法处理该请求,不会返回结果
-
requestHandler:此Handler是真正的处理请求逻辑,在received()方法中,如果msg是Invocation,则会调用reply方法,但不会返回reply方法所返回的结果,在reply方法中把msg强制转换为Invocation类型 inv,然后根据inv得到对应的服务Invoker,然后调用invoke(inv)方法,得到结果。
public void register(URL registryUrl, URL registeredProviderUrl) {
// 这里最终也是通过SPI机制,判断传过来的是什么,我们这里在前面把registry转成了zookeeper的URL
Registry registry = registryFactory.getRegistry(registryUrl);
// 所以这里会调用ZookeeperRegistry的register方法,实际上是先调用ZookeeperRegistry的父类FailbackRegistry
registry.register(registeredProviderUrl);
}
public void register(URL url) {
super.register(url);
removeFailedRegistered(url);
removeFailedUnregistered(url);
try {
// 然后在这里调用ZookeeperRegistry#doRegister,这个URL参数很明显又是一个SPI的提现
doRegister(url);
} catch (Exception e) {
Throwable t = e;
// If the startup detection is opened, the Exception is thrown directly.
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true)
&& !CONSUMER_PROTOCOL.equals(url.getProtocol());
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
// Record a failed registration request to a failed list, retry regularly
addFailedRegistered(url);
}
}
@Override
public void doRegister(URL url) {
try {
zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
动态配置不能改端口
-
服务在导出的过程中需要向动态配置中心的数据进行订阅,以便当管理人员修改了动态配置中心中对应服务的参数后,服务提供者能及时做出变化。此功能涉及到版本兼容,因为在Dubbo2.7之前也存在此功能,Dubbo2.7开始对此功能进行了调整。
-
在Dubbo2.7之前,仅支持多某个服务的动态配置
-
在Dubbo2.7之后,不仅支持对单个服务的动态配置,也支持对某个应用的动态配置(相当于对这个应用下的所有服务生效)
-
为了达到这个功能,需要利用Zookeeper的Watcher机制,所以对于服务提供者而言,我到底监听哪个Zookeeper节点的数据变化呢?
-
这个节点是有规则的,并且在Dubbo2.7前后也不一样:
- Dubbo2.7之前:监听的zk路径是:
/dubbo/org.apache.dubbo.demo.DemoService/configurators/override://0.0.0.0/org.apache.dubbo.demo.DemoService?category=configurators&compatible_config=true&dynamic=false&enabled=true&timeout=6000
注意,注意监听的是节点名字的变化,而不是节点内容 - Dubbo2.7之后,监听的zk路径是:
- 服务:
/dubbo/config/dubbo/org.apache.dubbo.demo.DemoService.configurators
节点的内容 - 应用:
/dubbo/config/dubbo/dubbo-demo-provider-application.configurators
节点的内容
- 服务:
- Dubbo2.7之前:监听的zk路径是:
-
注意,要和配置中心的路径区分开来,配置中心的路径是:
- 应用:/dubbo/config/dubbo/org.apache.dubbo.demo.DemoService/dubbo.properties节点的内容
- 全局:/dubbo/config/dubbo/dubbo.properties节点的内容
所以在一个服务进行导出时,需要在服务提供者端给当前服务生成一个对应的监听器实例,这个监听器实例为OverrideListener,它负责监听对应服务的动态配置变化,并且根据动态配置中心的参数重写服务URL。
除开有OverrideListener之外,在Dubbo2.7之后增加了另外两个:
- ProviderConfigurationListener:监听的是应用的动态配置数据修改,所以它是在RegistryProtocol类中的一个属性,并且是随着RegistryProtocol实例化而实例化好的,一个应用中只有一个
- ServiceConfigurationListener:监听的是服务的动态配置数据修改,和OverrideListener类似,也是对应一个服务的,所以在每个服务进行导出时都会生成一个,实际上ServiceConfigurationListener的内部有一个属性就是OverrideListener,所以当ServiceConfigurationListener监听数据发生了变化时,就会把配置中心的最新数据交给OverrideListener去重写服务URL。
- 同时在RegistryProtocol类中保存了所有服务所对应的OverrideListener,所以实际上当ProviderConfigurationListener监听到数据发生了变化时,也会把它所得到的最新数据依次调用每个OverrideListener去重写服务对应的服务URL。
- ProviderConfigurationListener会监听/dubbo/config/dubbo/dubbo-demo-provider-application.configurators节点
- ServiceConfigurationListener会监听/dubbo/config/dubbo/org.apache.dubbo.demo.DemoService.configurators节点
整理修改动态配置触发流程:
-
修改服务动态配置,底层会修改Zookeeper中的数据,
- /dubbo/config/dubbo/org.apache.dubbo.demo.DemoService.configurators节点的内容
-
ServiceConfigurationListener会监听到节点内容的变化,会触发ServiceConfigurationListener的父类AbstractConfiguratorListener的process(ConfigChangeEvent event)方法
-
ConfigChangeEvent表示一个事件,事件中有事件类型,还有事件内容(节点内容),还有触发这个事件的节点名字,事件类型有三个:
- ADDED
- MODIFIED
- DELETED
-
当接收到一个ConfigChangeEvent事件后,会根据事件类型做对应的处理
- ADDED、MODIFIED:会根据节点内容去生成override://协议的URL,然后根据URL去生成Configurator, Configurator对象很重要,表示一个配置器,根据配置器可以去重写URL
- DELETED:删除ServiceConfigurationListener内的所有的Configurator
-
生成了Configurator后,调用notifyOverrides()方法对服务URL进行重写
-
注意,每次重写并不仅仅只是用到上面所生成的Configurator,每次重写要用到所有的Configurator,包括本服务的Configurator,也包括本应用的Configurator,也包括老版本管理台的Configurator,重写URL的逻辑如下:
-
从exporter中获取目前已经导出了的服务URL-currentUrl
-
根据老版本管理台的Configurator重写服务URL
-
根据providerConfigurationListener中的Configurator重写服务URL
-
根据serviceConfigurationListeners中对应的服务的Configurator重写服务URL
-
如果重写之后newUrl和currentUrl相等,那么不需要做什么了
-
如果重写之后newUrl和currentUrl不相等,则需要进行服务重新导出:
- 根据newUrl进行导出,注意,这里只是就是调用DubboProtocol的export,再次去启动NettyServer
- 对newUrl进行简化,简化为registeredProviderUrl
- 调用RegistryProtocol的unregister()方法,把当前服务之前的服务提供URL从注册中心删掉
- 调用RegistryProtocol的register()方法,把新的registeredProviderUrl注册到注册中心
-
@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
// 导出服务
// registry:// ---> RegistryProtocol
// zookeeper:// ---> ZookeeperRegistry
// dubbo:// ---> DubboProtocol
/**
* 1.registry://xxx?xx=xx®istry=zookeeper ---> zookeeper://xxx?xx=xx 表示注册中心
* 这里就是把registry替换成zookeeper
* 2.示例:zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=
* dubbo-demo-provider-application&dubbo=2.0.2&export=dubbo://192.168.40.17:20880/
* org.apache.dubbo.demo.DemoService?anyhost=true&application=
* dubbo-demo-provider-application&bean.name=ServiceBean:org.apache.dubbo.demo.DemoService
* &bind.ip=192.168.40.17&bind.port=20880&deprecated=false&dubbo=2.0.2&
* dynamic=true&generic=false&interface=org.apache.dubbo.demo.DemoService&
* logger=log4j&methods=sayHello&pid=27656&release=2.7.0&side=provider&timeout=3000&
* timestamp=1590735956489&logger=log4j&pid=27656&release=2.7.0×tamp=1590735956479
*/
URL registryUrl = getRegistryUrl(originInvoker);
// 得到服务提供者url,表示服务提供者
/**
* 1.这里就是把之前export后面拼的dubbo服务url拿出来
* 2.示例:dubbo://192.168.40.17:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=
* dubbo-demo-provider-application&bean.name=ServiceBean:org.apache.dubbo.demo.DemoService&
* bind.ip=192.168.40.17&bind.port=20880&deprecated=false&dubbo=2.0.2&dynamic=true&
* generic=false&interface=org.apache.dubbo.demo.DemoService&logger=log4j&methods=sayHello
* &pid=27656&release=2.7.0&side=provider&timeout=3000×tamp=1590735956489
* 3.服务导出最终的目的就是要把providerUrl存到注册中心上,只不过中间有一些其他操作
*/
URL providerUrl = getProviderUrl(originInvoker);
// Subscribe the override data
// FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
// the same service. Because the subscribed is cached key with the name of the service, it causes the
// subscription information to cover.
// overrideSubscribeUrl是老版本的动态配置监听url,表示了需要监听的服务以及监听的类型(configurators, 这是老版本上的动态配置)
// 在服务提供者url的基础上,生成一个overrideSubscribeUrl,协议为provider://,增加参数category=configurators&check=false
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
// 一个overrideSubscribeUrl对应一个OverrideListener,用来监听变化事件,监听到overrideSubscribeUrl的变化后,
// OverrideListener就会根据变化进行相应处理,具体处理逻辑看OverrideListener的实现
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
/**
* 在这个方法里会利用providerConfigurationListener和serviceConfigurationListener去重写providerUrl
* providerConfigurationListener表示应用级别的动态配置监听器,providerConfigurationListener是RegistyProtocol的一个属性
* serviceConfigurationListener表示服务级别的动态配置监听器,serviceConfigurationListener是在每暴露一个服务时就会生成一个
* 这两个监听器都是新版本中的监听器
* 新版本监听的zk路径是:
* 服务: /dubbo/config/dubbo/org.apache.dubbo.demo.DemoService.configurators节点的内容
* 应用: /dubbo/config/dubbo/dubbo-demo-provider-application.configurators节点的内容
* 注意,要和配置中心的路径区分开来,配置中心的路径是:
* 应用:/dubbo/config/dubbo/org.apache.dubbo.demo.DemoService/dubbo.properties节点的内容
* 全局:/dubbo/config/dubbo/dubbo.properties节点的内容
*/
providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
// export invoker
// 根据动态配置重写了providerUrl之后,就会调用DubboProtocol或HttpProtocol去进行导出服务了,
// 这里会启动netty,启动tomcat这些
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
// url to registry
// 得到注册中心-ZookeeperRegistry
final Registry registry = getRegistry(originInvoker);
// 得到存入到注册中心去的providerUrl,会对服务提供者url中的参数进行简化,
// 因为有些参数存到注册中心是没有用的
final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
// 将当前服务提供者Invoker,以及该服务对应的注册中心地址,以及简化后的服务url存入ProviderConsumerRegTable
ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
registryUrl, registeredProviderUrl);
//to judge if we need to delay publish
//是否需要注册到注册中心
boolean register = providerUrl.getParameter(REGISTER_KEY, true);
if (register) {
// 注册服务,把简化后的服务提供者url注册到registryUrl中去
register(registryUrl, registeredProviderUrl);
providerInvokerWrapper.setReg(true);
}
/**
* 针对老版本的动态配置,需要把overrideSubscribeListener绑定到overrideSubscribeUrl上去进行监听
* 兼容老版本的配置修改,利用overrideSubscribeListener去监听旧版本的动态配置变化
* 监听overrideSubscribeUrl provider://192.168.40.17:20880/org.apache.dubbo.demo.DemoService?anyhost=true&
* application=dubbo-demo-annotation-provider&bean.name=ServiceBean:org.apache.dubbo.demo.DemoService&
* bind.ip=192.168.40.17&bind.port=20880&category=configurators&check=false&deprecated=false&dubbo=2.0.2&
* dynamic=true&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=416332&
* release=&side=provider×tamp=1585318241955
* 那么新版本的providerConfigurationListener和serviceConfigurationListener是在什么时候进行订阅的呢?在这两个类构造的时候
* Deprecated! Subscribe to override rules in 2.6.x or before.
* 老版本监听的zk路径是:/dubbo/org.apache.dubbo.demo.DemoService/configurators/override://0.0.0.0/org.apache.dubbo.demo.DemoService?category=configurators&compatible_config=true&dynamic=false&enabled=true&timeout=6000
* 监听的是路径的内容,不是节点的内容
*/
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
exporter.setRegisterUrl(registeredProviderUrl);
exporter.setSubscribeUrl(overrideSubscribeUrl);
//Ensure that a new exporter instance is returned every time export
return new DestroyableExporter<>(exporter);
}
private URL overrideUrlWithConfig(URL providerUrl, OverrideListener listener) {
/**
* 1.应用配置,providerConfigurationListener是在属性那里直接初始化好的,
* providerConfigurationListener会监听配置中心的应用配置信息变动
* 这个是每一个应用只有一个providerConfigurationListener
* 2.首先这里流程是:
* 1.ProviderConfigurationListener通过构造函数调用父类AbstractConfiguratorListener
* #initWith方法
* 2.在initWith方法中通过传进来的路径key,监听注册中心(常用的是zookeeper)
* key路径下的节点,会先从注册中心拿到当前配置然后转换成configurators
* 3.接着这里调用overrideUrl,用前面的configurators生成新的providerUrl
* 4.这里因为之前的providerUrl是经过@Service注解,配置中心文件(yml或properties)
* 还有-D这种启动参数里的配置,组合成的一个URL。但是这个providerUrl还没有经过
* 网页端的动态配置,所以这里需要重写下URL
* 5.ServiceConfigurationListener同理,而且ServiceConfigurationListener代码顺序在后面
* 所以很明显'服务配置'会覆盖'应用配置'
*/
providerUrl = providerConfigurationListener.overrideUrl(providerUrl);
// 服务配置,new ServiceConfigurationListener的时候回初始化,ServiceConfigurationListener会监听配置中心的服务信息配置信息变动
// 这个是每个服务都会重新new一个ServiceConfigurationListener
ServiceConfigurationListener serviceConfigurationListener = new ServiceConfigurationListener(providerUrl, listener);
serviceConfigurationListeners.put(providerUrl.getServiceKey(), serviceConfigurationListener);
return serviceConfigurationListener.overrideUrl(providerUrl);
}
//RegistryProtocol内部类
public ProviderConfigurationListener() {
//订阅 应用名+".configurators" 这里就是新版本ProviderConfigurationListener的监听路径
this.initWith(ApplicationModel.getApplication() + CONFIGURATORS_SUFFIX);
}
// 在构造ProviderConfigurationListener和ServiceConfigurationListener都会调用到这个方法
// 完成Listener自身订阅到对应的应用和服务
// 订阅关系绑定完了之后,主动从动态配置中心获取一下对应的配置数据生成configurators,后面需要重写providerUrl
protected final void initWith(String key) {
//这里拿到的就是注册中心,我们大部分情况用的是zookeeper
DynamicConfiguration dynamicConfiguration = DynamicConfiguration.getDynamicConfiguration();
// 添加Listener,进行了订阅
dynamicConfiguration.addListener(key, this);
// 从配置中心ConfigCenter获取属于当前应用的动态配置数据,从zk中拿到原始数据(主动从配置中心获取数据)
String rawConfig = dynamicConfiguration.getRule(key, DynamicConfiguration.DEFAULT_GROUP);
// 如果存在应用配置信息则根据配置信息生成Configurator
if (!StringUtils.isEmpty(rawConfig)) {
genConfiguratorsFromRawRule(rawConfig);
}
}
private boolean genConfiguratorsFromRawRule(String rawConfig) {
boolean parseSuccess = true;
try {
// parseConfigurators will recognize app/service config automatically.
// 先把应用或服务配置转成url,再根据url生成对应的Configurator
configurators = Configurator.toConfigurators(ConfigParser.parseConfigurators(rawConfig))
.orElse(configurators);
} catch (Exception e) {
logger.error("Failed to parse raw dynamic config and it will not take effect, the raw config is: " +
rawConfig, e);
parseSuccess = false;
}
return parseSuccess;
}
private class ProviderConfigurationListener extends AbstractConfiguratorListener {
public ProviderConfigurationListener() {
//订阅 应用名+".configurators" 这里就是新版本ProviderConfigurationListener的监听路径
this.initWith(ApplicationModel.getApplication() + CONFIGURATORS_SUFFIX);
}
/**
* Get existing configuration rule and override provider url before exporting.
*
* @param providerUrl
* @param <T>
* @return
*/
private <T> URL overrideUrl(URL providerUrl) {
// 通过configurators去修改/装配providerUrl
return RegistryProtocol.getConfigedInvokerUrl(configurators, providerUrl);
}
@Override
protected void notifyOverrides() {
overrideListeners.values().forEach(listener -> ((OverrideListener) listener).doOverrideIfNecessary());
}
}
private class ServiceConfigurationListener extends AbstractConfiguratorListener {
private URL providerUrl;
private OverrideListener notifyListener;
public ServiceConfigurationListener(URL providerUrl, OverrideListener notifyListener) {
this.providerUrl = providerUrl;
this.notifyListener = notifyListener;
// 订阅 服务接口名+group+version+".configurators"
this.initWith(DynamicConfiguration.getRuleKey(providerUrl) + CONFIGURATORS_SUFFIX);
}
private <T> URL overrideUrl(URL providerUrl) {
return RegistryProtocol.getConfigedInvokerUrl(configurators, providerUrl);
}
//这里是监听入口
@Override
protected void notifyOverrides() {
notifyListener.doOverrideIfNecessary();
}
}
public synchronized void doOverrideIfNecessary() {
final Invoker<?> invoker;
if (originInvoker instanceof InvokerDelegate) {
invoker = ((InvokerDelegate<?>) originInvoker).getInvoker();
} else {
invoker = originInvoker;
}
//The origin invoker 当前服务的原始服务提供者url,没有经过任何动态配置改变的URL
URL originUrl = RegistryProtocol.this.getProviderUrl(invoker);
String key = getCacheKey(originInvoker);
ExporterChangeableWrapper<?> exporter = bounds.get(key);
if (exporter == null) {
logger.warn(new IllegalStateException("error state, exporter should not be null"));
return;
}
//The current, may have been merged many times,事件触发之前,当前服务被导出的url
URL currentUrl = exporter.getInvoker().getUrl();
//根据configurators修改url,configurators是全量的,并不是某个新增的或删除的,
// 所以是基于原始的url进行修改,并不是基于currentUrl,这里是老版本的configurators
//Merged with this configuration
URL newUrl = getConfigedInvokerUrl(configurators, originUrl);
// 这是新版本的configurators
newUrl = getConfigedInvokerUrl(providerConfigurationListener.getConfigurators(), newUrl);
newUrl = getConfigedInvokerUrl(serviceConfigurationListeners.get(originUrl.getServiceKey())
.getConfigurators(), newUrl);
// 修改过的url如果和目前的url不相同,则重新按newUrl导出
if (!currentUrl.equals(newUrl)) {
RegistryProtocol.this.reExport(originInvoker, newUrl);
logger.info("exported provider url changed, origin url: " + originUrl +
", old export url: " + currentUrl + ", new export url: " + newUrl);
}
}
public <T> void reExport(final Invoker<T> originInvoker, URL newInvokerUrl) {
// 根据newInvokerUrl进行导出
// update local exporter
ExporterChangeableWrapper exporter = doChangeLocalExport(originInvoker, newInvokerUrl);
// 获取准确的ProviderUrl
// update registry
URL registryUrl = getRegistryUrl(originInvoker);
// 对于一个服务提供者url,在注册到注册中心时,会先进行简化
final URL registeredProviderUrl = getRegisteredProviderUrl(newInvokerUrl, registryUrl);
//decide if we need to re-publish
// 根据getServiceKey获取ProviderInvokerWrapper
ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.getProviderWrapper(registeredProviderUrl, originInvoker);
// 生成一个新的ProviderInvokerWrapper
ProviderInvokerWrapper<T> newProviderInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);
/**
* Only if the new url going to Registry is different with the previous one should we do unregister and register.
* 如果新的服务提供者url简化后的url和这个服务之前的服务提供者url简化后的url不相等,则需要把新的简化后的服务提供者url注册到注册中心去
*/
if (providerInvokerWrapper.isReg() && !registeredProviderUrl.equals(providerInvokerWrapper.getProviderUrl())) {
unregister(registryUrl, providerInvokerWrapper.getProviderUrl());
register(registryUrl, registeredProviderUrl);
newProviderInvokerWrapper.setReg(true);
}
exporter.setRegisterUrl(registeredProviderUrl);
}
private <T> ExporterChangeableWrapper doChangeLocalExport(final Invoker<T> originInvoker, URL newInvokerUrl) {
String key = getCacheKey(originInvoker);
final ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
logger.warn(new IllegalStateException("error state, exporter should not be null"));
} else {
// 到这里才能真正明白,为什么需要InvokerDelegate
// InvokerDelegate表示一个调用者,由invoker+url构成,invoker不变,url可变
final Invoker<T> invokerDelegate = new InvokerDelegate<T>(originInvoker, newInvokerUrl);
// 这里最后又会走到DubboProtocol#export 那里的逻辑,服务重新导出前面见过了
exporter.setExporter(protocol.export(invokerDelegate));
}
return exporter;
}
Q:这里引出一个问题,配置改变之后Netty,tomcat需要重启吗?
A:不需要,为什么?前面的DubboProtocol#export 那里的reset逻辑讲过
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
// ....省略
// 开启NettyServer
// 请求--->invocation--->服务key--->exporterMap.get(key)--->exporter--->invoker--->invoker.invoke(invocation)-->执行服务
openServer(url);
// 特殊的一些序列化机制,比如kryo提供了注册机制来注册类,提高序列化和反序列化的速度
optimizeSerialization(url);
return exporter;
}
private void openServer(URL url) {
// find server.
String key = url.getAddress(); // 获得ip地址和port, 192.168.40.17:20880
// NettyClient, NettyServer
//client can export a service which's only for server to invoke
boolean isServer = url.getParameter(IS_SERVER_KEY, true);
if (isServer) {
// 缓存Server对象
ExchangeServer server = serverMap.get(key);
// DCL,Double Check Lock
if (server == null) {
synchronized (this) {
server = serverMap.get(key);
if (server == null) {
// 创建Server,并进行缓存
serverMap.put(key, createServer(url));
}
}
} else {
// server supports reset, use together with override
// 服务重新导出时,就会走这里 这里会调用HeaderExchangeServer#reset
server.reset(url);
}
}
}
//启动netty的时候会调用这个
public HeaderExchangeServer(Server server) {
Assert.notNull(server, "server == null");
this.server = server;
// 启动定义关闭Channel(socket)的Task
startIdleCheckTask(getUrl());
}
private void startIdleCheckTask(URL url) {
if (!server.canHandleIdle()) { // 底层NettyServer自己有心跳机制,那么上层的ExchangeServer就不用开启心跳任务了
AbstractTimerTask.ChannelProvider cp = () -> unmodifiableCollection(HeaderExchangeServer.this.getChannels());
int idleTimeout = getIdleTimeout(url);
long idleTimeoutTick = calculateLeastDuration(idleTimeout);
// 定义关闭Channel的Task
CloseTimerTask closeTimerTask = new CloseTimerTask(cp, idleTimeoutTick, idleTimeout);
this.closeTimerTask = closeTimerTask;
// init task and start timer.
// 定时运行closeTimerTask
IDLE_CHECK_TIMER.newTimeout(closeTimerTask, idleTimeoutTick, TimeUnit.MILLISECONDS);
}
}
public void reset(URL url) {
server.reset(url);
try {
int currHeartbeat = getHeartbeat(getUrl());
int currIdleTimeout = getIdleTimeout(getUrl());
int heartbeat = getHeartbeat(url);
int idleTimeout = getIdleTimeout(url);
/**
* 1.动态改配置,重新导出服务时不需要重新启动netty,tomcat等等
* 2.这里直接关闭那个服务的channel任务,然后根据新的url重启一个任务就行了
*/
if (currHeartbeat != heartbeat || currIdleTimeout != idleTimeout) {
cancelCloseTask();
startIdleCheckTask(url);
}
} catch (Throwable t) {
logger.error(t.getMessage(), t);
}
}
@Override
protected void doTask(Channel channel) {
try {
Long lastRead = lastRead(channel);
Long lastWrite = lastWrite(channel);
Long now = now();
// check ping & pong at server
// 表示Server端有多长时间没有读到过数据或写出过数据了,说白就是超时了
if ((lastRead != null && now - lastRead > idleTimeout)
|| (lastWrite != null && now - lastWrite > idleTimeout)) {
logger.warn("Close channel " + channel + ", because idleCheck timeout: "
+ idleTimeout + "ms");
channel.close();
}
} catch (Throwable t) {
logger.warn("Exception when close remote channel " + channel.getRemoteAddress(), t);
}
}
-
ServiceBean.export()方法是导出的入口方法,会执行ServiceConfig.export()方法完成服务导出,导出完了之后会发布一个Spring事件ServiceBeanExportedEvent
-
在ServiceConfig.export()方法中会先调用checkAndUpdateSubConfigs(),这个方法主要完成AbstractConfig的参数刷新(从配置中心获取参数等等),AbstractConfig是指ApplicationConfig、ProtocolConfig、ServiceConfig等等,刷新完后会检查stub、local、mock等参数是否配置正确
-
参数刷新和检查完成了之后,就会开始导出服务,如果配置了延迟导出,那么则按指定的时间利用ScheduledExecutorService来进行延迟导出
-
否则调用doExport()进行服务导出
-
继续调用doExportUrls()进行服务导出
-
首先通过loadRegistries()方法获得所配置的注册中心的URL,可能配了多个配置中心,那么当前所导出的服务需要注册到每个配置中心去,这里,注册中心的是以URL的方式来表示的,使用的是什么注册中心、注册中心的地址和端口,给注册中心所配置的参数等等,都会存在在URL上,此URL以**registry://**开始
-
获得到注册中心的registryURLs之后,就会遍历当前服务所有的ProtocolConfig,调用doExportUrlsFor1Protocol(protocolConfig, registryURLs);方法把当前服务按每个协议每个注册中心分别进行导出
-
在doExportUrlsFor1Protocol()方法中,会先构造一个服务URL,包括
- 服务的协议dubbo://,
- 服务的IP和PORT,如果指定了就取指定的,没有指定IP就获取服务器上网卡的IP,
- 以及服务的PATH,如果没有指定PATH参数,则取接口名
- 以及服务的参数,参数包括服务的参数,服务中某个方法的参数
- 最终得到的URL类似: dubbo://192.168.1.110:20880/com.tuling.DemoService?timeout=3000&&sayHello.loadbalance=random
-
得到服务的URL之后,会把服务URL作为一个参数添加到registryURL中去,然后把registryURL、服务的接口、当前服务实现类ref生成一个Invoker代理对象,再把这个代理对象和当前ServiceConfig对象包装成一个DelegateProviderMetaDataInvoker对象,DelegateProviderMetaDataInvoker就表示了完整的一个服务
-
接下来就会使用Protocol去export导出服务了,导出之后将得到一个Exporter对象(该Exporter对象,可以理解为主要可以用来卸载(unexport)服务,什么时候会卸载服务?在优雅关闭Dubbo应用的时候)
-
接下来我们来详细看看Protocol是怎么导出服务的?
-
但调用protocol.export(wrapperInvoker)方法时,因为protocol是Protocol接口的一个Adaptive对象,所以此时会根据wrapperInvoker的genUrl方法得到一个url,根据此url的协议找到对应的扩展点,此时扩展点就是RegistryProtocol,但是,因为Protocol接口有两个包装类,一个是ProtocolFilterWrapper、ProtocolListenerWrapper,所以实际上在调用export方法时,会经过这两个包装类的export方法,但是在这两个包装类的export方法中都会Registry协议进行了判断,不会做过多处理,所以最终会直接调用到RegistryProtocol的export(Invoker<T> originInvoker)方法
-
在RegistryProtocol的export(Invoker<T> originInvoker)方法中,主要完成了以下几件事情:
-
生成监听器,监听动态配置中心此服务的参数数据的变化,一旦监听到变化,则重写服务URL,并且在服务导出时先重写一次服务URL
-
拿到重写之后的URL之后,调用doLocalExport()进行服务导出,在这个方法中就会调用DubboProtocol的export方法去导出服务了,导出成功后将得到一个ExporterChangeableWrapper
- 在DubboProtocol的export方法中主要要做的事情就是启动NettyServer,并且设置一系列的RequestHandler,以便在接收到请求时能依次被这些RequestHandler所处理
- 这些RequestHandler在上文已经整理过了
-
从originInvoker中获取注册中心的实现类,比如ZookeeperRegistry
-
将重写后的服务URL进行简化,把不用存到注册中心去的参数去除
-
把简化后的服务URL调用ZookeeperRegistry.registry()方法注册到注册中心去
-
最后将ExporterChangeableWrapper封装为DestroyableExporter对象返回,完成服务导出
-
一个服务导出成功后,会生成对应的Exporter:
- DestroyableExporter:Exporter的最外层包装类,这个类的主要作用是可以用来unexporter对应的服务
- ExporterChangeableWrapper:这个类主要负责在unexport对应服务之前,把服务URL从注册中心中移除,把该服务对应的动态配置监听器移除
- ListenerExporterWrapper:这个类主要负责在unexport对应服务之后,把服务导出监听器移除
- DubboExporter:这个类中保存了对应服务的Invoker对象,和当前服务的唯一标志,当NettyServer接收到请求后,会根据请求中的服务信息,找到服务对应的DubboExporter对象,然后从对象中得到Invoker对象
- ProtocolFilterWrapper$CallbackRegistrationInvoker:会去调用下层Invoker,下层Invoker执行完了之后,会遍历过滤器,查看是否有过滤器实现了ListenableFilter接口,如果有,则回调对应的onResponse方法,比如TimeoutFilter,当调用完下层Invoker之后,就会计算服务的执行时间
- ProtocolFilterWrapper$1:ProtocolFilterWrapper中的过滤器组成的Invoker,利用该Invoker,可以执行服务端的过滤器,执行完过滤器之后,调用下层Invoker
- RegistryProtocol$InvokerDelegate:服务的的委托类,里面包含了DelegateProviderMetaDataInvoker对象和服务对应的providerUrl,执行时直接调用下层Invoker
- DelegateProviderMetaDataInvoker:服务的的委托类,里面包含了AbstractProxyInvoker对象和ServiceConfig对象,执行时直接调用下层Invoker
- AbstractProxyInvoker:服务接口的代理类,绑定了对应的实现类,执行时会利用反射调用服务实现类实例的具体方法,并得到结果