Skip to content

Commit

Permalink
merge master
Browse files Browse the repository at this point in the history
  • Loading branch information
yanhom1314 committed Jun 13, 2024
2 parents 7d4e5b7 + 7fb89d2 commit f84b378
Show file tree
Hide file tree
Showing 29 changed files with 636 additions and 130 deletions.
21 changes: 14 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ protected void afterExecute(Runnable r, Throwable t);
>
> 4. 集成常用三方中间件内部线程池管理
**经过多个版本的迭代,目前最新版本 v1.1.6.1 具有以下特性**
**经过多个版本的迭代,目前最新版本 v1.1.7 具有以下特性**

- **代码零侵入**:我们改变了线程池以往的使用姿势,所有配置均放在配置中心,服务启动时会从配置中心拉取配置生成线程池对象放到 Spring 容器中,使用时直接从 Spring 容器中获取,对业务代码零侵入

Expand Down Expand Up @@ -194,12 +194,6 @@ protected void afterExecute(Runnable r, Throwable t);

---

## 友情链接

- [HertzBeat](https://github.com/dromara/hertzbeat) : 易用友好的实时监控告警系统,无需Agent,强大自定义监控能力.

---

## 联系我

看到这儿,**请给项目一个 star**,你的支持是我们前进的动力!
Expand All @@ -214,6 +208,19 @@ protected void afterExecute(Runnable r, Throwable t);

---

## 友情链接

- [HertzBeat](https://github.com/dromara/hertzbeat) : 易用友好的实时监控告警系统,无需Agent,强大自定义监控能力.

---

## 特别赞助

**JNPF低代码开发平台**

<a href="https://www.jnpfsoft.com/?from=dynamic-tp"><img src="/resources/img/jnpfsoft.png"></a>

---
## 鸣谢

感谢 JetBrains 对开源项目的支持
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.dromara.dynamictp.common.util.ReflectionUtil;
import org.dromara.dynamictp.core.support.ThreadPoolExecutorProxy;
import org.dromara.dynamictp.jvmti.JVMTI;
import org.springframework.util.ReflectionUtils;

import java.util.Objects;
import java.util.concurrent.ThreadPoolExecutor;
Expand Down Expand Up @@ -102,6 +103,9 @@ public void adaptProducerExecutors() {
return;
}
for (DefaultMQProducer defaultMQProducer : beans) {
if (Objects.isNull(ReflectionUtils.findMethod(DefaultMQProducerImpl.class, "getAsyncSenderExecutor"))) {
continue;
}
val producer = (DefaultMQProducerImpl) ReflectionUtil.getFieldValue(DefaultMQProducer.class,
"defaultMQProducerImpl", defaultMQProducer);
if (Objects.isNull(producer)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ public class ThreadPoolStats extends Metrics {
*/
private int maximumPoolSize;

/**
* 空闲时间 (ms)
*/
private long keepAliveTime;

/**
* 队列类型
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public static void execute(Executor executor, Runnable r) {
public static void beforeExecute(Executor executor, Thread t, Runnable r) {
for (ExecutorAware aware : EXECUTOR_AWARE_LIST) {
try {
aware.beforeExecute(executor, t, r);
r = aware.beforeExecuteWrap(executor, t, r);
} catch (Exception e) {
log.error("DynamicTp aware [{}], enhance beforeExecute error.", aware.getName(), e);
}
Expand All @@ -106,7 +106,7 @@ public static void beforeExecute(Executor executor, Thread t, Runnable r) {
public static void afterExecute(Executor executor, Runnable r, Throwable t) {
for (ExecutorAware aware : EXECUTOR_AWARE_LIST) {
try {
aware.afterExecute(executor, r, t);
r = aware.afterExecuteWrap(executor, r, t);
} catch (Exception e) {
log.error("DynamicTp aware [{}], enhance afterExecute error.", aware.getName(), e);
}
Expand Down Expand Up @@ -146,7 +146,7 @@ public static void terminated(Executor executor) {
public static void beforeReject(Runnable r, Executor executor) {
for (ExecutorAware aware : EXECUTOR_AWARE_LIST) {
try {
aware.beforeReject(r, executor);
r = aware.beforeRejectWrap(r, executor);
} catch (Exception e) {
log.error("DynamicTp aware [{}], enhance beforeReject error.", aware.getName(), e);
}
Expand All @@ -156,7 +156,7 @@ public static void beforeReject(Runnable r, Executor executor) {
public static void afterReject(Runnable r, Executor executor) {
for (ExecutorAware aware : EXECUTOR_AWARE_LIST) {
try {
aware.afterReject(r, executor);
r = aware.afterRejectWrap(r, executor);
} catch (Exception e) {
log.error("DynamicTp aware [{}], enhance afterReject error.", aware.getName(), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ default void beforeExecute(Executor executor, Thread t, Runnable r) {
// default no Operation
}

default Runnable beforeExecuteWrap(Executor executor, Thread t, Runnable r) {
beforeExecute(executor, t, r);
return r;
}

/**
* enhance afterExecute
*
Expand All @@ -102,6 +107,11 @@ default void afterExecute(Executor executor, Runnable r, Throwable t) {
// default no Operation
}

default Runnable afterExecuteWrap(Executor executor, Runnable r, Throwable t) {
afterExecute(executor, r, t);
return r;
}

/**
* enhance shutdown
*
Expand Down Expand Up @@ -139,6 +149,11 @@ default void beforeReject(Runnable r, Executor executor) {
// default no Operation
}

default Runnable beforeRejectWrap(Runnable r, Executor executor) {
beforeReject(r, executor);
return r;
}

/**
* enhance after reject
* @param r runnable
Expand All @@ -147,4 +162,9 @@ default void beforeReject(Runnable r, Executor executor) {
default void afterReject(Runnable r, Executor executor) {
// default no Operation
}

default Runnable afterRejectWrap(Runnable r, Executor executor) {
afterReject(r, executor);
return r;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,12 @@ public interface TaskEnhanceAware extends DtpAware {
*/
default Runnable getEnhancedTask(Runnable command, List<TaskWrapper> taskWrappers) {
Runnable wrapRunnable = command;
String taskName = (wrapRunnable instanceof NamedRunnable) ? ((NamedRunnable) wrapRunnable).getName() : null;
if (CollectionUtils.isNotEmpty(taskWrappers)) {
for (TaskWrapper t : taskWrappers) {
wrapRunnable = t.wrap(wrapRunnable);
}
}
String taskName = (wrapRunnable instanceof NamedRunnable) ? ((NamedRunnable) wrapRunnable).getName() : null;
return new DtpRunnable(command, wrapRunnable, taskName);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ private static ThreadPoolStats convertCommon(ExecutorAdapter<?> executor) {
poolStats.setCompletedTaskCount(executor.getCompletedTaskCount());
poolStats.setWaitTaskCount(executor.getQueueSize());
poolStats.setRejectHandlerName(executor.getRejectHandlerType());
poolStats.setKeepAliveTime(executor.getKeepAliveTime(TimeUnit.MILLISECONDS));
return poolStats;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,11 @@
import org.dromara.dynamictp.common.em.CollectorTypeEnum;
import org.dromara.dynamictp.common.entity.ThreadPoolStats;
import org.dromara.dynamictp.core.monitor.collector.AbstractCollector;
import org.springframework.beans.BeanUtils;

import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import java.lang.management.ManagementFactory;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* ThreadPoolStatsInfo related
Expand All @@ -40,26 +37,15 @@ public class JMXCollector extends AbstractCollector {

public static final String DTP_METRIC_NAME_PREFIX = "dtp.thread.pool";

/**
* thread pool stats map
*/
private static final Map<String, ThreadPoolStats> GAUGE_CACHE = new ConcurrentHashMap<>();

@Override
public void collect(ThreadPoolStats threadPoolStats) {
if (GAUGE_CACHE.containsKey(threadPoolStats.getPoolName())) {
ThreadPoolStats poolStats = GAUGE_CACHE.get(threadPoolStats.getPoolName());
BeanUtils.copyProperties(threadPoolStats, poolStats);
} else {
try {
MBeanServer server = ManagementFactory.getPlatformMBeanServer();
ObjectName name = new ObjectName(DTP_METRIC_NAME_PREFIX + ":name=" + threadPoolStats.getPoolName());
ThreadPoolStatsJMX stats = new ThreadPoolStatsJMX(threadPoolStats);
server.registerMBean(stats, name);
} catch (JMException e) {
log.error("collect thread pool stats error", e);
}
GAUGE_CACHE.put(threadPoolStats.getPoolName(), threadPoolStats);
try {
MBeanServer server = ManagementFactory.getPlatformMBeanServer();
ObjectName name = new ObjectName(DTP_METRIC_NAME_PREFIX + ":name=" + threadPoolStats.getPoolName());
ThreadPoolStatsJMX stats = new ThreadPoolStatsJMX(threadPoolStats);
server.registerMBean(stats, name);
} catch (JMException e) {
log.error("collect thread pool stats error", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,22 @@

package org.dromara.dynamictp.core.spring;

import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import org.apache.commons.lang3.StringUtils;
import org.dromara.dynamictp.common.plugin.DtpInterceptorRegistry;
import org.dromara.dynamictp.common.util.ConstructorUtil;
import org.dromara.dynamictp.common.util.ReflectionUtil;
import org.dromara.dynamictp.core.DtpRegistry;
import org.dromara.dynamictp.core.executor.DtpExecutor;
import org.dromara.dynamictp.core.executor.eager.EagerDtpExecutor;
import org.dromara.dynamictp.core.executor.eager.TaskQueue;
import org.dromara.dynamictp.common.plugin.DtpInterceptorRegistry;
import org.dromara.dynamictp.core.support.DynamicTp;
import org.dromara.dynamictp.core.support.ExecutorWrapper;
import org.dromara.dynamictp.core.support.ScheduledThreadPoolExecutorProxy;
import org.dromara.dynamictp.core.support.ThreadPoolExecutorProxy;
import org.dromara.dynamictp.core.support.task.wrapper.TaskWrapper;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
Expand All @@ -41,6 +43,7 @@
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.core.Ordered;
import org.springframework.core.PriorityOrdered;
import org.springframework.core.task.TaskDecorator;
import org.springframework.core.type.MethodMetadata;
import org.springframework.lang.NonNull;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
Expand Down Expand Up @@ -139,9 +142,11 @@ private Object registerAndReturnCommon(Object bean, String beanName) {

private Object doRegisterAndReturnCommon(Object bean, String poolName) {
if (bean instanceof ThreadPoolTaskExecutor) {
val proxy = newProxy(poolName, ((ThreadPoolTaskExecutor) bean).getThreadPoolExecutor());
ThreadPoolTaskExecutor poolTaskExecutor = (ThreadPoolTaskExecutor) bean;
val proxy = newProxy(poolName, poolTaskExecutor.getThreadPoolExecutor());
try {
ReflectionUtil.setFieldValue("threadPoolExecutor", bean, proxy);
tryWrapTaskDecorator(poolTaskExecutor, proxy);
} catch (IllegalAccessException ignored) { }
DtpRegistry.registerExecutor(new ExecutorWrapper(poolName, proxy), REGISTER_SOURCE);
return bean;
Expand Down Expand Up @@ -177,4 +182,23 @@ private ScheduledThreadPoolExecutorProxy newScheduledTpProxy(String name, Schedu
shutdownGracefulAsync(originExecutor, name, 0);
return proxy;
}

private void tryWrapTaskDecorator(ThreadPoolTaskExecutor poolTaskExecutor, ThreadPoolExecutorProxy proxy) throws IllegalAccessException {
Object taskDecorator = ReflectionUtil.getFieldValue("taskDecorator", poolTaskExecutor);
if (Objects.isNull(taskDecorator)) {
return;
}
TaskWrapper taskWrapper = (taskDecorator instanceof TaskWrapper) ? (TaskWrapper) taskDecorator : new TaskWrapper() {
@Override
public String name() {
return taskDecorator.getClass().getName();
}

@Override
public Runnable wrap(Runnable runnable) {
return ((TaskDecorator) taskDecorator).decorate(runnable);
}
};
ReflectionUtil.setFieldValue("taskWrappers", proxy, Lists.newArrayList(taskWrapper));
}
}
8 changes: 7 additions & 1 deletion dependencies/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
<url>https://github.com/yanhom1314/dynamic-tp</url>

<properties>
<revision>1.1.7-3.x</revision>
<revision>1.1.8-3.x</revision>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

<maven.compiler.source>17</maven.compiler.source>
Expand Down Expand Up @@ -433,6 +433,12 @@
<version>${revision}</version>
</dependency>

<dependency>
<groupId>org.dromara.dynamictp</groupId>
<artifactId>dynamic-tp-extension-agent</artifactId>
<version>${revision}</version>
</dependency>

<dependency>
<groupId>org.dromara.dynamictp</groupId>
<artifactId>dynamic-tp-extension-notify-email</artifactId>
Expand Down
14 changes: 14 additions & 0 deletions extension/extension-agent/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.dromara.dynamictp</groupId>
<artifactId>dynamic-tp-extension</artifactId>
<version>${revision}</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>dynamic-tp-extension-agent</artifactId>

<dependencies>
</dependencies>
</project>
Loading

0 comments on commit f84b378

Please sign in to comment.