// Early versions: each handler instance requests its own executor from the ThreadPool extension.
public WrappedChannelHandler(ChannelHandler handler, URL url) {
    this.handler = handler;
    this.url = url;
    executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
    String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
    if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) {
        componentKey = CONSUMER_SIDE;
    }
    DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
    dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
}
// The fixed thread pool is used by default
public class FixedThreadPool implements ThreadPool {
    @Override
    public Executor getExecutor(URL url) {
        String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
        int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);
        int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
        // Every call builds a brand-new ThreadPoolExecutor with core = max = threads
        return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }
}
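The queue selection in FixedThreadPool is easy to misread, so here is a minimal, self-contained sketch of the same rule using only java.util.concurrent. The class name FixedPoolSketch and its helper methods are hypothetical and not part of Dubbo; the sketch only illustrates that with queues == 0 there is no buffering, so each in-flight task occupies its own worker thread up to the configured size.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration class, not Dubbo code.
class FixedPoolSketch {

    // Same rule as in FixedThreadPool: 0 = direct hand-off, negative = unbounded, positive = bounded.
    static BlockingQueue<Runnable> chooseQueue(int queues) {
        if (queues == 0) {
            return new SynchronousQueue<>();
        }
        return queues < 0 ? new LinkedBlockingQueue<>() : new LinkedBlockingQueue<>(queues);
    }

    // Fixed-size pool: core size equals maximum size.
    static ThreadPoolExecutor fixedPool(int threads, int queues) {
        return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS, chooseQueue(queues));
    }

    // Submits `threads` blocking tasks and reports how many worker threads the pool started.
    static int poolSizeUnderLoad(int threads) {
        ThreadPoolExecutor pool = fixedPool(threads, 0);
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < threads; i++) {
            pool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int size = pool.getPoolSize();
        release.countDown(); // let the blocked tasks finish
        pool.shutdown();
        return size;
    }

    public static void main(String[] args) {
        System.out.println(chooseQueue(0).getClass().getSimpleName()); // prints "SynchronousQueue"
        System.out.println(poolSizeUnderLoad(4));                      // prints "4"
    }
}
```

Because a pool sized this way can grow to its full thread count under load, the number of pools that get created matters as much as the size of each one.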
Solution
See the upstream Dubbo pull request:
https://github.com/apache/dubbo/pull/4131
Reduce context switching cost by optimizing thread model on consumer side
Either upgrade Dubbo or plug in a custom thread pool.
For example, 2.7.5:
public ExecutorService getExecutor(URL url) {
    String componentKey = EXECUTOR_SERVICE_COMPONENT_KEY;
    if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) {
        componentKey = CONSUMER_SIDE;
    }
    Map<Integer, ExecutorService> executors = data.get(componentKey);
    /**
     * It's guaranteed that this method is called after {@link #createExecutorIfAbsent(URL)}, so data should already
     * have Executor instances generated and stored.
     */
    if (executors == null) {
        logger.warn("No available executors, this is not expected, framework should call createExecutorIfAbsent first " +
                "before coming to here.");
        return null;
    }
    Integer portKey = url.getPort();
    ExecutorService executor = executors.get(portKey);
    if (executor != null) {
        if (executor.isShutdown() || executor.isTerminated()) {
            executors.remove(portKey);
            executor = createExecutor(url);
            executors.put(portKey, executor); // shared by everything on the same port
        }
    }
    return executor;
}
3.x:
public synchronized ExecutorService createExecutorIfAbsent(URL url) {
    Map<Integer, ExecutorService> executors = data.computeIfAbsent(getExecutorKey(url), k -> new ConcurrentHashMap<>());
    // Consumer's executor is sharing globally, key=Integer.MAX_VALUE. Provider's executor is sharing by protocol.
    Integer portKey = CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY)) ? Integer.MAX_VALUE : url.getPort();
    if (url.getParameter(THREAD_NAME_KEY) == null) {
        url = url.putAttribute(THREAD_NAME_KEY, "Dubbo-protocol-" + portKey);
    }
    URL finalUrl = url;
    ExecutorService executor = executors.computeIfAbsent(portKey, k -> createExecutor(finalUrl));
    // If executor has been shut down, create a new one
    if (executor.isShutdown() || executor.isTerminated()) {
        executors.remove(portKey);
        executor = createExecutor(url); // create the thread pool
        executors.put(portKey, executor);
    }
    return executor;
}
/**
 * Return the executor key based on the type (internal or biz) of the current service.
 *
 * @param url
 * @return
 */
private String getExecutorKey(URL url) {
    String executorKey = INTERNAL_EXECUTOR_SERVICE_COMPONENT_KEY;
    ServiceDescriptor serviceDescriptor = applicationModel.getInternalModule().getServiceRepository().lookupService(url.getServiceInterface());
    // if not found in internal service repository, then it's biz service defined by user.
    if (serviceDescriptor == null) {
        executorKey = EXECUTOR_SERVICE_COMPONENT_KEY;
    }
    return executorKey;
}

private ExecutorService createExecutor(URL url) {
    return (ExecutorService) extensionAccessor.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
}
@Override
public ExecutorService getExecutor(URL url) {
    Map<Integer, ExecutorService> executors = data.get(getExecutorKey(url));
    /*
     * It's guaranteed that this method is called after {@link #createExecutorIfAbsent(URL)}, so data should already
     * have Executor instances generated and stored.
     */
    if (executors == null) {
        logger.warn("No available executors, this is not expected, framework should call createExecutorIfAbsent first " +
                "before coming to here.");
        return null;
    }
    // Consumer's executor is sharing globally, key=Integer.MAX_VALUE. Provider's executor is sharing by protocol.
    Integer portKey = CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY)) ? Integer.MAX_VALUE : url.getPort();
    ExecutorService executor = executors.get(portKey);
    // If the executor has already been shut down, evict it from the cache
    if (executor != null && (executor.isShutdown() || executor.isTerminated())) {
        executors.remove(portKey);
        // Does not re-create a shutdown executor, use SHARED_EXECUTOR for downgrade.
        executor = null;
        logger.info("Executor for " + url + " is shutdown.");
    }
    // If no executor is found, fall back to the default shared executor, which is a cached-type pool
    if (executor == null) {
        return frameworkExecutorRepository.getSharedExecutor();
    } else {
        return executor;
    }
}
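Symptom
During a performance test, the Dubbo consumer service exhausted its threads: it was running more than 5000 threads, even though the thread pool was configured with at most 1000.
Cause
First, look at the consumer configuration parameters.
The consumer is configured with 10 connections and a thread pool size of 1000. With only one provider, how could the consumer exceed 5000 threads? Combined with the container's maximum thread limit, this caused the consumer service to hang or go down right after startup.
In fact, early Dubbo versions create one thread pool per connection, so with 10 connections the upper bound is naturally 10 * 1000 threads.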
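The arithmetic can be checked with a small sketch that builds the pools the way each model would and sums their maximum sizes. The class PerConnectionPools and its methods are hypothetical names used only for illustration; the figures 10 and 1000 come from the configuration described above, and the pools are plain java.util.concurrent executors rather than Dubbo's own.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration class, not Dubbo code.
class PerConnectionPools {

    // A fixed-size pool; no worker threads are started until tasks are submitted.
    static ThreadPoolExecutor newPool(int threads) {
        return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS, new SynchronousQueue<>());
    }

    // Early model: every connection gets its own pool, so the bounds add up.
    static int worstCasePerConnection(int connections, int threads) {
        List<ThreadPoolExecutor> pools = new ArrayList<>();
        for (int i = 0; i < connections; i++) {
            pools.add(newPool(threads));
        }
        int total = 0;
        for (ThreadPoolExecutor pool : pools) {
            total += pool.getMaximumPoolSize();
            pool.shutdown();
        }
        return total;
    }

    // Shared model: all connections reuse one pool, so the bound is that pool's size.
    static int worstCaseShared(int connections, int threads) {
        ThreadPoolExecutor shared = newPool(threads);
        int total = shared.getMaximumPoolSize(); // independent of the connection count
        shared.shutdown();
        return total;
    }

    public static void main(String[] args) {
        System.out.println(worstCasePerConnection(10, 1000)); // prints "10000"
        System.out.println(worstCaseShared(10, 1000));        // prints "1000"
    }
}
```

An observed count above 5000 fits the first bound of 10000 and cannot be explained by the second bound of 1000.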
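Solution
See the upstream Dubbo pull request "Reduce context switching cost by optimizing thread model on consumer side" (https://github.com/apache/dubbo/pull/4131).
Either upgrade Dubbo or plug in a custom thread pool.
For example, 2.7.5.
3.x:
1. createExecutorIfAbsent is called first to create the thread pool. On the provider side the pool is keyed by port, so everything exposed on the same Dubbo port shares one pool; on the consumer side the key is Integer.MAX_VALUE, so a single pool is shared globally.
2. createExecutor performs the actual creation; this is where the fixed, cached, eager, and other ThreadPool implementations are invoked.
3. At use time, getExecutor only checks whether a pool exists; if none does, the shared executor is used as a fallback.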
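The three steps can be sketched as a small standalone repository. This is a simplified illustration under stated assumptions, not Dubbo's implementation: the class ExecutorRepositorySketch, the boolean consumerSide flag, and the use of Executors.newFixedThreadPool and Executors.newCachedThreadPool are stand-ins for the URL parameters and the ThreadPool extension lookup in the real code.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical illustration class, not Dubbo code.
class ExecutorRepositorySketch {
    private final Map<Integer, ExecutorService> executors = new ConcurrentHashMap<>();
    // Stand-in for the framework-wide shared executor used as a fallback.
    final ExecutorService sharedExecutor = Executors.newCachedThreadPool();

    // Consumers share one global key; providers are keyed by port.
    static int key(boolean consumerSide, int port) {
        return consumerSide ? Integer.MAX_VALUE : port;
    }

    // Steps 1 and 2: create the pool once per key, re-creating it only if it was shut down.
    synchronized ExecutorService createExecutorIfAbsent(boolean consumerSide, int port, int threads) {
        int k = key(consumerSide, port);
        ExecutorService executor = executors.computeIfAbsent(k, x -> Executors.newFixedThreadPool(threads));
        if (executor.isShutdown()) {
            executor = Executors.newFixedThreadPool(threads);
            executors.put(k, executor);
        }
        return executor;
    }

    // Step 3: look the pool up; never re-create here, fall back to the shared executor.
    ExecutorService getExecutor(boolean consumerSide, int port) {
        int k = key(consumerSide, port);
        ExecutorService executor = executors.get(k);
        if (executor != null && executor.isShutdown()) {
            executors.remove(k);
            executor = null;
        }
        return executor != null ? executor : sharedExecutor;
    }

    void shutdownAll() {
        executors.values().forEach(ExecutorService::shutdown);
        sharedExecutor.shutdown();
    }

    public static void main(String[] args) {
        ExecutorRepositorySketch repo = new ExecutorRepositorySketch();
        ExecutorService a = repo.createExecutorIfAbsent(true, 20880, 4);
        ExecutorService b = repo.createExecutorIfAbsent(true, 20881, 4);
        System.out.println(a == b); // prints "true": consumer connections share one pool
        repo.shutdownAll();
    }
}
```

With this shape, adding connections on the consumer side no longer multiplies the number of pools, which is the behaviour the steps above describe.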