Skip to content

Commit

Permalink
fix conflicts and optimize cloud refresher
Browse files Browse the repository at this point in the history
  • Loading branch information
yanhom1314 committed Dec 16, 2023
2 parents 5396aae + 5075cce commit 980c9dc
Show file tree
Hide file tree
Showing 169 changed files with 6,481 additions and 2,910 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/build-jvmti.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
run: |
cd ${{ github.workspace }}/jvmti/jvmti-build
mvn package
- uses: actions/upload-artifact@v2
- uses: actions/upload-artifact@v3
with:
name: lib
path: jvmti/jvmti-build/target/classes/lib*
Expand All @@ -35,7 +35,7 @@ jobs:
run: |
cd ${{ github.workspace }}/jvmti/jvmti-build
mvn package
- uses: actions/upload-artifact@v2
- uses: actions/upload-artifact@v3
with:
name: lib
path: jvmti/jvmti-build/target/classes/lib*
Expand All @@ -54,7 +54,7 @@ jobs:
run: |
cd ${{ github.workspace }}/jvmti/jvmti-build
mvn package
- uses: actions/upload-artifact@v2
- uses: actions/upload-artifact@v3
with:
name: lib
path: jvmti/jvmti-build/target/classes/*.dll
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.baidu.cloud.starlight.api.rpc.StarlightClient;
import com.baidu.cloud.starlight.api.rpc.threadpool.ThreadPoolFactory;
import com.baidu.cloud.starlight.core.rpc.SingleStarlightClient;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import org.apache.commons.collections4.CollectionUtils;
Expand Down Expand Up @@ -61,8 +60,7 @@ protected String getTpPrefix() {
protected void initialize() {
super.initialize();

List<StarlightClient> starlightClients = Lists.newArrayList();
starlightClients.addAll(JVMTI.getInstances(StarlightClient.class));
List<StarlightClient> starlightClients = JVMTI.getInstances(StarlightClient.class);
if (CollectionUtils.isEmpty(starlightClients)) {
log.warn("Cannot find beans of type StarlightClient.");
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,9 @@ protected void initialize() {
super.initialize();

val bean = JVMTI.getInstance(StarlightServer.class);
if (!(bean instanceof DefaultStarlightServer)) {
if (!(bean instanceof DefaultStarlightServer starlightServer)) {
return;
}
val starlightServer = (DefaultStarlightServer) bean;
val uri = (URI) ReflectionUtil.getFieldValue(DefaultStarlightServer.class, URI_FIELD, starlightServer);
val serverPeer = (ServerPeer) ReflectionUtil.getFieldValue(DefaultStarlightServer.class,
SERVER_PEER_FIELD, starlightServer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@

package org.dromara.dynamictp.adapter.common;

import org.dromara.dynamictp.common.spring.ApplicationContextHolder;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import org.dromara.dynamictp.common.event.AlarmCheckEvent;
import org.dromara.dynamictp.common.event.CollectEvent;
import org.dromara.dynamictp.common.event.RefreshEvent;
import org.dromara.dynamictp.common.properties.DtpProperties;
import org.dromara.dynamictp.common.spring.ApplicationContextHolder;
import org.dromara.dynamictp.core.handler.CollectorHandler;
import org.dromara.dynamictp.core.notifier.manager.AlarmManager;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.event.GenericApplicationListener;
import org.springframework.core.ResolvableType;
Expand Down Expand Up @@ -58,12 +58,12 @@ public boolean supportsEventType(ResolvableType resolvableType) {
@Override
public void onApplicationEvent(@NonNull ApplicationEvent event) {
try {
if (event instanceof RefreshEvent) {
doRefresh(((RefreshEvent) event).getDtpProperties());
} else if (event instanceof CollectEvent) {
doCollect(((CollectEvent) event).getDtpProperties());
} else if (event instanceof AlarmCheckEvent) {
doAlarmCheck(((AlarmCheckEvent) event).getDtpProperties());
if (event instanceof RefreshEvent refreshEvent) {
doRefresh(refreshEvent.getDtpProperties());
} else if (event instanceof CollectEvent collectEvent) {
doCollect(collectEvent.getDtpProperties());
} else if (event instanceof AlarmCheckEvent alarmCheckEvent) {
doAlarmCheck(alarmCheckEvent.getDtpProperties());
}
} catch (Exception e) {
log.error("DynamicTp adapter, event handle failed.", e);
Expand Down
5 changes: 5 additions & 0 deletions adapter/adapter-dubbo/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@
<artifactId>dynamic-tp-adapter-common</artifactId>
</dependency>

<dependency>
<groupId>org.dromara.dynamictp</groupId>
<artifactId>dynamic-tp-jvmti-runtime</artifactId>
</dependency>

<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@

import com.alibaba.dubbo.common.extension.ExtensionLoader;
import com.alibaba.dubbo.common.store.DataStore;
import com.alibaba.dubbo.remoting.transport.dispatcher.WrappedChannelHandler;
import lombok.val;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.collections4.CollectionUtils;
import org.dromara.dynamictp.adapter.common.AbstractDtpAdapter;
import org.dromara.dynamictp.common.properties.DtpProperties;
import org.dromara.dynamictp.common.spring.ApplicationContextHolder;
import org.dromara.dynamictp.core.support.ThreadPoolExecutorProxy;
import org.dromara.dynamictp.jvmti.JVMTI;
import org.springframework.beans.factory.InitializingBean;

import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
Expand All @@ -46,6 +46,8 @@ public class AlibabaDubboDtpAdapter extends AbstractDtpAdapter implements Initia

private static final String TP_PREFIX = "dubboTp";

private static final String EXECUTOR_FIELD = "executor";

private final AtomicBoolean registered = new AtomicBoolean(false);

@Override
Expand Down Expand Up @@ -74,13 +76,17 @@ public void refresh(DtpProperties dtpProperties) {
@Override
protected void initialize() {
super.initialize();
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
Map<String, Object> executorMap = dataStore.get(EXECUTOR_SERVICE_COMPONENT_KEY);
if (MapUtils.isNotEmpty(executorMap) && registered.compareAndSet(false, true)) {
executorMap.forEach((k, v) -> {
val proxy = new ThreadPoolExecutorProxy((ThreadPoolExecutor) v);
executorMap.replace(k, proxy);
putAndFinalize(genTpName(k), (ExecutorService) v, proxy);
val handlers = JVMTI.getInstances(WrappedChannelHandler.class);
if (CollectionUtils.isNotEmpty(handlers) && registered.compareAndSet(false, true)) {
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
handlers.forEach(handler -> {
val executor = handler.getExecutor();
if (executor instanceof ThreadPoolExecutor) {
String port = String.valueOf(handler.getUrl().getPort());
String tpName = genTpName(port);
enhanceOriginExecutor(tpName, (ThreadPoolExecutor) executor, EXECUTOR_FIELD, handler);
dataStore.put(EXECUTOR_SERVICE_COMPONENT_KEY, port, handler.getExecutor());
}
});
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,19 +68,19 @@ protected void initialize() {
String key = Optional.ofNullable(internalServer)
.map(server -> {
final SocketAddress address = server.getListenSocketAddress();
if (address instanceof InetSocketAddress) {
return String.valueOf(((InetSocketAddress) address).getPort());
} else if (address instanceof InProcessSocketAddress) {
return ((InProcessSocketAddress) address).getName();
if (address instanceof InetSocketAddress inetSocketAddress) {
return String.valueOf(inetSocketAddress.getPort());
} else if (address instanceof InProcessSocketAddress inProcessSocketAddress) {
return inProcessSocketAddress.getName();
}
return null;
}).orElse(null);
if (Objects.isNull(key)) {
continue;
}
val executor = (Executor) ReflectionUtil.getFieldValue(ServerImpl.class, EXECUTOR_FIELD, serverImpl);
if (Objects.nonNull(executor) && executor instanceof ThreadPoolExecutor) {
enhanceOriginExecutor(genTpName(key), (ThreadPoolExecutor) executor, EXECUTOR_FIELD, serverImpl);
if (Objects.nonNull(executor) && executor instanceof ThreadPoolExecutor threadPoolExecutor) {
enhanceOriginExecutor(genTpName(key), threadPoolExecutor, EXECUTOR_FIELD, serverImpl);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ protected void initialize() {
}
beans.forEach((k, v) -> {
val executor = v.dispatcher().executorService();
if (executor instanceof ThreadPoolExecutor) {
enhanceOriginExecutor(genTpName(k), (ThreadPoolExecutor) executor, EXECUTOR_SERVICE_FIELD, v.dispatcher());
if (executor instanceof ThreadPoolExecutor threadPoolExecutor) {
enhanceOriginExecutor(genTpName(k), threadPoolExecutor, EXECUTOR_SERVICE_FIELD, v.dispatcher());
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,16 +81,14 @@ protected void initialize() {
servers.forEach(v -> {
ThreadPoolExecutor executor = null;
ServerConfig serverConfig = null;
if (v instanceof BoltServer) {
BoltServer server = (BoltServer) v;
executor = server.getBizThreadPool();
if (v instanceof BoltServer boltServer) {
executor = boltServer.getBizThreadPool();
serverConfig = (ServerConfig) ReflectionUtil.getFieldValue(BoltServer.class,
SERVER_CONFIG_FIELD, server);
} else if (v instanceof AbstractHttpServer) {
AbstractHttpServer server = (AbstractHttpServer) v;
executor = server.getBizThreadPool();
SERVER_CONFIG_FIELD, boltServer);
} else if (v instanceof AbstractHttpServer httpServer) {
executor = httpServer.getBizThreadPool();
serverConfig = (ServerConfig) ReflectionUtil.getFieldValue(AbstractHttpServer.class,
SERVER_CONFIG_FIELD, server);
SERVER_CONFIG_FIELD, httpServer);
}
if (Objects.isNull(executor) || Objects.isNull(serverConfig)) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ private DtpProperties() { }
private boolean enabledBanner = true;

/**
* Config file type.
* Config file type, for zookeeper and etcd.
*/
private String configType;

Expand Down Expand Up @@ -79,11 +79,6 @@ private DtpProperties() { }
*/
private List<NotifyPlatform> platforms;

/**
* Apollo config.
*/
private Apollo apollo;

/**
* Zookeeper config.
*/
Expand Down Expand Up @@ -168,12 +163,6 @@ public static DtpProperties getInstance() {
return Holder.INSTANCE;
}

@Data
public static class Apollo {

private String namespace;
}

@Data
public static class Zookeeper {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import lombok.extern.slf4j.Slf4j;
import lombok.val;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.dromara.dynamictp.common.em.ConfigFileTypeEnum;
Expand All @@ -34,6 +35,10 @@
import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

import static org.dromara.dynamictp.common.constant.DynamicTpConst.MAIN_PROPERTIES_PREFIX;

/**
* AbstractRefresher related
Expand Down Expand Up @@ -93,6 +98,13 @@ protected void doRefresh(DtpProperties properties) {
publishEvent(properties);
}

protected boolean needRefresh(Set<String> keys) {
keys = keys.stream()
.filter(str -> str.startsWith(MAIN_PROPERTIES_PREFIX))
.collect(Collectors.toSet());
return CollectionUtils.isNotEmpty(keys);
}

private void publishEvent(DtpProperties dtpProperties) {
RefreshEvent event = new RefreshEvent(this, dtpProperties);
ApplicationContextHolder.publishEvent(event);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public class ThreadPoolBuilder {
* on shutdown in order to wait for remaining tasks to complete their execution
* before the rest of the container continues to shut down.
*/
private int awaitTerminationSeconds = 0;
private int awaitTerminationSeconds = 3;

/**
* If io intensive thread pool.
Expand Down Expand Up @@ -429,6 +429,16 @@ public ScheduledExecutorService buildScheduled() {
}
}

/**
* Build ordered thread pool executor.
*
* @return the newly created DtpExecutor instance
*/
public OrderedDtpExecutor buildOrdered() {
ordered = true;
return (OrderedDtpExecutor) buildDtpExecutor(this);
}

/**
* Build thread pool executor and wrapper with ttl
*
Expand Down
2 changes: 1 addition & 1 deletion dependencies/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
<url>https://github.com/yanhom1314/dynamic-tp</url>

<properties>
<revision>1.1.6-3.x-alpha</revision>
<revision>1.1.6-3.x</revision>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

<maven.compiler.source>17</maven.compiler.source>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,8 @@ spring:
dynamic:
tp:
enabled: true
enabledBanner: true # 是否开启banner打印,默认true
enabledCollect: true # 是否开启监控指标采集,默认false
collectorTypes: logging # 监控数据采集器类型(logging | micrometer | internal_logging),默认micrometer
collectorTypes: micrometer # 监控数据采集器类型(logging | micrometer | internal_logging),默认micrometer
monitorInterval: 5
platforms: # 通知报警平台配置
- platform: wechat
Expand All @@ -25,35 +24,23 @@ spring:
- platform: lark
urlKey: 0d944ae7-b24a-40 # 替换
receivers: test1,test2 # 接受人飞书名称/openid
- platform: email
receivers: [email protected],[email protected] # 收件人
brpcTp: # brpc 线程池配置
- threadPoolName: rpc#server # 名称规则:biz_thread_pool_name + "#" + client/server
corePoolSize: 100
maximumPoolSize: 200
keepAliveTime: 60

# email notify configuration
mail:
# (optional) email subject, default:ThreadPool Notify
title: ThreadPool Notify
# mail service address
host: smtp.qq.com
port: 465
# send from
username: [email protected]
# authorization code eg: rlpadadtcugh4152e
password: xxxxxxxxxxxxxxxx
default-encoding: UTF-8
properties:
mail:
smtp:
socketFactoryClass: javax.net.ssl.SSLSocketFactory
ssl:
enable: true
debug: false

starlight:
server:
enable: true
port: 8777
port: 8777

management:
endpoints:
web:
exposure:
include: '*'
prometheus:
metrics:
export:
enabled: true
Loading

0 comments on commit 980c9dc

Please sign in to comment.