diff --git a/common/src/main/java/org/dromara/dynamictp/common/em/CollectorTypeEnum.java b/common/src/main/java/org/dromara/dynamictp/common/em/CollectorTypeEnum.java index 63515107f..7356cc52e 100644 --- a/common/src/main/java/org/dromara/dynamictp/common/em/CollectorTypeEnum.java +++ b/common/src/main/java/org/dromara/dynamictp/common/em/CollectorTypeEnum.java @@ -33,7 +33,19 @@ public enum CollectorTypeEnum { */ LOGGING, + /** + * Micrometer collect type. + */ MICROMETER, - INTERNAL_LOGGING + /** + * Logging collect type. + */ + INTERNAL_LOGGING, + + /** + * JMX collect type. + */ + JMX + } diff --git a/common/src/main/java/org/dromara/dynamictp/common/entity/AlarmInfo.java b/common/src/main/java/org/dromara/dynamictp/common/entity/AlarmInfo.java index f83d241e5..55844c3a4 100644 --- a/common/src/main/java/org/dromara/dynamictp/common/entity/AlarmInfo.java +++ b/common/src/main/java/org/dromara/dynamictp/common/entity/AlarmInfo.java @@ -17,10 +17,10 @@ package org.dromara.dynamictp.common.entity; +import lombok.Data; +import lombok.experimental.Accessors; import org.dromara.dynamictp.common.em.NotifyItemEnum; import org.dromara.dynamictp.common.util.DateUtil; -import lombok.Builder; -import lombok.Data; import java.util.concurrent.atomic.AtomicInteger; @@ -31,7 +31,7 @@ * @since 1.0.4 **/ @Data -@Builder +@Accessors(chain = true) public class AlarmInfo { private NotifyItemEnum notifyItem; diff --git a/common/src/main/java/org/dromara/dynamictp/common/entity/ThreadPoolStats.java b/common/src/main/java/org/dromara/dynamictp/common/entity/ThreadPoolStats.java index ab87007fa..c294765b5 100644 --- a/common/src/main/java/org/dromara/dynamictp/common/entity/ThreadPoolStats.java +++ b/common/src/main/java/org/dromara/dynamictp/common/entity/ThreadPoolStats.java @@ -17,19 +17,18 @@ package org.dromara.dynamictp.common.entity; -import lombok.Builder; import lombok.Data; import lombok.EqualsAndHashCode; + /** * ThreadPoolStats related * * @author yanhom * @since 1.0.0 **/ -@EqualsAndHashCode(callSuper = true) @Data -@Builder +@EqualsAndHashCode(callSuper = true) public class ThreadPoolStats extends Metrics { /** diff --git a/core/src/main/java/org/dromara/dynamictp/core/converter/ExecutorConverter.java b/core/src/main/java/org/dromara/dynamictp/core/converter/ExecutorConverter.java index 24a4f3de6..70af16371 100644 --- a/core/src/main/java/org/dromara/dynamictp/core/converter/ExecutorConverter.java +++ b/core/src/main/java/org/dromara/dynamictp/core/converter/ExecutorConverter.java @@ -36,7 +36,8 @@ **/ public class ExecutorConverter { - private ExecutorConverter() { } + private ExecutorConverter() { + } public static TpMainFields toMainFields(ExecutorWrapper executorWrapper) { TpMainFields mainFields = new TpMainFields(); @@ -82,20 +83,20 @@ public static ThreadPoolStats toMetrics(ExecutorWrapper wrapper) { } private static ThreadPoolStats convertCommon(ExecutorAdapter executor) { - return ThreadPoolStats.builder() - .corePoolSize(executor.getCorePoolSize()) - .maximumPoolSize(executor.getMaximumPoolSize()) - .poolSize(executor.getPoolSize()) - .activeCount(executor.getActiveCount()) - .largestPoolSize(executor.getLargestPoolSize()) - .queueType(executor.getQueueType()) - .queueCapacity(executor.getQueueCapacity()) - .queueSize(executor.getQueueSize()) - .queueRemainingCapacity(executor.getQueueRemainingCapacity()) - .taskCount(executor.getTaskCount()) - .completedTaskCount(executor.getCompletedTaskCount()) - .waitTaskCount(executor.getQueueSize()) - .rejectHandlerName(executor.getRejectHandlerType()) - .build(); + ThreadPoolStats poolStats = new ThreadPoolStats(); + poolStats.setCorePoolSize(executor.getCorePoolSize()); + poolStats.setMaximumPoolSize(executor.getMaximumPoolSize()); + poolStats.setPoolSize(executor.getPoolSize()); + poolStats.setActiveCount(executor.getActiveCount()); + poolStats.setLargestPoolSize(executor.getLargestPoolSize()); + poolStats.setQueueType(executor.getQueueType()); + poolStats.setQueueCapacity(executor.getQueueCapacity()); + poolStats.setQueueSize(executor.getQueueSize()); + poolStats.setQueueRemainingCapacity(executor.getQueueRemainingCapacity()); + poolStats.setTaskCount(executor.getTaskCount()); + poolStats.setCompletedTaskCount(executor.getCompletedTaskCount()); + poolStats.setWaitTaskCount(executor.getQueueSize()); + poolStats.setRejectHandlerName(executor.getRejectHandlerType()); + return poolStats; } } diff --git a/core/src/main/java/org/dromara/dynamictp/core/handler/CollectorHandler.java b/core/src/main/java/org/dromara/dynamictp/core/handler/CollectorHandler.java index 7b8a38dd3..8275a3bc0 100644 --- a/core/src/main/java/org/dromara/dynamictp/core/handler/CollectorHandler.java +++ b/core/src/main/java/org/dromara/dynamictp/core/handler/CollectorHandler.java @@ -25,6 +25,7 @@ import org.dromara.dynamictp.core.monitor.collector.LogCollector; import org.dromara.dynamictp.core.monitor.collector.MetricsCollector; import org.dromara.dynamictp.core.monitor.collector.MicroMeterCollector; +import org.dromara.dynamictp.core.monitor.collector.jmx.JMXCollector; import org.springframework.util.CollectionUtils; import java.util.List; @@ -48,9 +49,11 @@ private CollectorHandler() { MetricsCollector microMeterCollector = new MicroMeterCollector(); LogCollector logCollector = new LogCollector(); InternalLogCollector internalLogCollector = new InternalLogCollector(); + JMXCollector jmxCollector = new JMXCollector(); COLLECTORS.put(microMeterCollector.type(), microMeterCollector); COLLECTORS.put(logCollector.type(), logCollector); COLLECTORS.put(internalLogCollector.type(), internalLogCollector); + COLLECTORS.put(jmxCollector.type(), jmxCollector); } public void collect(ThreadPoolStats poolStats, List types) { diff --git a/core/src/main/java/org/dromara/dynamictp/core/monitor/collector/MicroMeterCollector.java b/core/src/main/java/org/dromara/dynamictp/core/monitor/collector/MicroMeterCollector.java index 8484fa0df..37b7f1d48 100644 --- a/core/src/main/java/org/dromara/dynamictp/core/monitor/collector/MicroMeterCollector.java +++ b/core/src/main/java/org/dromara/dynamictp/core/monitor/collector/MicroMeterCollector.java @@ -17,19 +17,19 @@ package org.dromara.dynamictp.core.monitor.collector; -import cn.hutool.core.bean.BeanUtil; import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.Tag; -import java.util.Optional; import lombok.extern.slf4j.Slf4j; import org.dromara.dynamictp.common.em.CollectorTypeEnum; import org.dromara.dynamictp.common.entity.ThreadPoolStats; import org.dromara.dynamictp.common.util.CommonUtil; +import org.springframework.beans.BeanUtils; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; /** @@ -61,7 +61,7 @@ public void collect(ThreadPoolStats threadPoolStats) { if (Objects.isNull(oldStats)) { GAUGE_CACHE.put(threadPoolStats.getPoolName(), threadPoolStats); } else { - BeanUtil.copyProperties(threadPoolStats, oldStats); + BeanUtils.copyProperties(threadPoolStats, oldStats); } gauge(GAUGE_CACHE.get(threadPoolStats.getPoolName())); } diff --git a/core/src/main/java/org/dromara/dynamictp/core/monitor/collector/jmx/JMXCollector.java b/core/src/main/java/org/dromara/dynamictp/core/monitor/collector/jmx/JMXCollector.java new file mode 100644 index 000000000..94de51c33 --- /dev/null +++ b/core/src/main/java/org/dromara/dynamictp/core/monitor/collector/jmx/JMXCollector.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.dromara.dynamictp.core.monitor.collector.jmx; + +import lombok.extern.slf4j.Slf4j; +import org.dromara.dynamictp.common.em.CollectorTypeEnum; +import org.dromara.dynamictp.common.entity.ThreadPoolStats; +import org.dromara.dynamictp.core.monitor.collector.AbstractCollector; +import org.springframework.beans.BeanUtils; + +import javax.management.JMException; +import javax.management.MBeanServer; +import javax.management.ObjectName; +import java.lang.management.ManagementFactory; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * ThreadPoolStatsInfo related + * + * @author KamTo Hung + */ +@Slf4j +public class JMXCollector extends AbstractCollector { + + public static final String DTP_METRIC_NAME_PREFIX = "dtp.thread.pool"; + + /** + * thread pool stats map + */ + private static final Map GAUGE_CACHE = new ConcurrentHashMap<>(); + + @Override + public void collect(ThreadPoolStats threadPoolStats) { + if (GAUGE_CACHE.containsKey(threadPoolStats.getPoolName())) { + ThreadPoolStats poolStats = GAUGE_CACHE.get(threadPoolStats.getPoolName()); + BeanUtils.copyProperties(threadPoolStats, poolStats); + } else { + try { + MBeanServer server = ManagementFactory.getPlatformMBeanServer(); + ObjectName name = new ObjectName(DTP_METRIC_NAME_PREFIX + ":name=" + threadPoolStats.getPoolName()); + ThreadPoolStatsJMX stats = new ThreadPoolStatsJMX(threadPoolStats); + server.registerMBean(stats, name); + } catch (JMException e) { + log.error("collect thread pool stats error", e); + } + GAUGE_CACHE.put(threadPoolStats.getPoolName(), threadPoolStats); + } + } + + @Override + public String type() { + return CollectorTypeEnum.JMX.name().toLowerCase(); + } + +} diff --git a/core/src/main/java/org/dromara/dynamictp/core/monitor/collector/jmx/ThreadPoolStatsJMX.java b/core/src/main/java/org/dromara/dynamictp/core/monitor/collector/jmx/ThreadPoolStatsJMX.java new file mode 100644 index 000000000..4b339084b --- /dev/null +++ b/core/src/main/java/org/dromara/dynamictp/core/monitor/collector/jmx/ThreadPoolStatsJMX.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.dromara.dynamictp.core.monitor.collector.jmx; + +import org.dromara.dynamictp.common.entity.ThreadPoolStats; + +/** + * @author KamTo Hung + */ +public class ThreadPoolStatsJMX implements ThreadPoolStatsMXBean { + + private ThreadPoolStats threadPoolStats; + + public ThreadPoolStatsJMX(ThreadPoolStats threadPoolStats) { + this.threadPoolStats = threadPoolStats; + } + + @Override + public ThreadPoolStats getThreadPoolStats() { + return this.threadPoolStats; + } + + @Override + public void setThreadPoolStats(ThreadPoolStats threadPoolStats) { + this.threadPoolStats = threadPoolStats; + } +} diff --git a/core/src/main/java/org/dromara/dynamictp/core/monitor/collector/jmx/ThreadPoolStatsMXBean.java b/core/src/main/java/org/dromara/dynamictp/core/monitor/collector/jmx/ThreadPoolStatsMXBean.java new file mode 100644 index 000000000..b907fe5b0 --- /dev/null +++ b/core/src/main/java/org/dromara/dynamictp/core/monitor/collector/jmx/ThreadPoolStatsMXBean.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.dromara.dynamictp.core.monitor.collector.jmx; + +import org.dromara.dynamictp.common.entity.ThreadPoolStats; + +import javax.management.MXBean; + + +/** + * ThreadPoolStatsMXBean related + * + * @author KamTo Hung + */ +@MXBean +public interface ThreadPoolStatsMXBean { + + /** + * get thread pool stats + * + * @return thread pool stats + */ + ThreadPoolStats getThreadPoolStats(); + + /** + * set thread pool stats + * + * @param threadPoolStats thread pool stats + */ + void setThreadPoolStats(ThreadPoolStats threadPoolStats); + +} diff --git a/core/src/main/java/org/dromara/dynamictp/core/notifier/alarm/AlarmCounter.java b/core/src/main/java/org/dromara/dynamictp/core/notifier/alarm/AlarmCounter.java index 1c7cdc794..5e1271d2b 100644 --- a/core/src/main/java/org/dromara/dynamictp/core/notifier/alarm/AlarmCounter.java +++ b/core/src/main/java/org/dromara/dynamictp/core/notifier/alarm/AlarmCounter.java @@ -44,9 +44,7 @@ private AlarmCounter() { } public static void init(String threadPoolName, String notifyItemType) { String key = buildKey(threadPoolName, notifyItemType); - val alarmInfo = AlarmInfo.builder() - .notifyItem(NotifyItemEnum.of(notifyItemType)) - .build(); + val alarmInfo = new AlarmInfo().setNotifyItem(NotifyItemEnum.of(notifyItemType)); ALARM_INFO_CACHE.putIfAbsent(key, alarmInfo); } diff --git a/test/test-core/src/test/resources/dynamic-tp-demo.yml b/test/test-core/src/test/resources/dynamic-tp-demo.yml index 29cd15b36..4596e712b 100644 --- a/test/test-core/src/test/resources/dynamic-tp-demo.yml +++ b/test/test-core/src/test/resources/dynamic-tp-demo.yml @@ -5,11 +5,11 @@ spring: enabled: true enabledBanner: true # 是否开启banner打印,默认true enabledCollect: true # 是否开启监控指标采集,默认false - collectorTypes: micrometer,logging # 监控数据采集器类型(logging | micrometer | internal_logging),默认micrometer + collectorTypes: jmx,micrometer,logging # 监控数据采集器类型(logging | micrometer | internal_logging),默认micrometer logPath: /home/logs # 监控日志数据路径,默认 ${user.home}/logs monitorInterval: 5 # 监控时间间隔(报警判断、指标采集),默认5s configType: yml # 配置文件类型 - executors: # 动态线程池配置,都有默认值,采用默认值的可以不配置该项,减少配置量 + executors: # 动态线程池配置,都有默认值,采用默认值的可以不配置该项,减少配置量 - threadPoolName: testRunTimeoutDtpExecutor executorType: common # 线程池类型common、eager:适用于io密集型 corePoolSize: 1 @@ -25,7 +25,7 @@ spring: preStartAllCoreThreads: false # 是否预热所有核心线程,默认false runTimeout: 200 # 任务执行超时阈值,目前只做告警用,单位(ms) queueTimeout: 100 # 任务在队列等待超时阈值,目前只做告警用,单位(ms) - taskWrapperNames: ["ttl"] # 任务包装器名称,集成TaskWrapper接口 + taskWrapperNames: [ "ttl" ] # 任务包装器名称,集成TaskWrapper接口 - threadPoolName: testQueueTimeoutDtpExecutor executorType: common # 线程池类型common、eager:适用于io密集型 corePoolSize: 1 @@ -41,7 +41,7 @@ spring: preStartAllCoreThreads: false # 是否预热所有核心线程,默认false runTimeout: 200 # 任务执行超时阈值,目前只做告警用,单位(ms) queueTimeout: 100 # 任务在队列等待超时阈值,目前只做告警用,单位(ms) - taskWrapperNames: ["ttl"] # 任务包装器名称,集成TaskWrapper接口 + taskWrapperNames: [ "ttl" ] # 任务包装器名称,集成TaskWrapper接口 - threadPoolName: testRejectedQueueTimeoutCancelDtpExecutor executorType: common # 线程池类型common、eager:适用于io密集型 corePoolSize: 1 @@ -57,7 +57,7 @@ spring: preStartAllCoreThreads: false # 是否预热所有核心线程,默认false runTimeout: 200 # 任务执行超时阈值,目前只做告警用,单位(ms) queueTimeout: 100 # 任务在队列等待超时阈值,目前只做告警用,单位(ms) - taskWrapperNames: ["ttl"] # 任务包装器名称,集成TaskWrapper接口 + taskWrapperNames: [ "ttl" ] # 任务包装器名称,集成TaskWrapper接口 - threadPoolName: eagerDtpThreadPoolExecutor executorType: eager # 线程池类型common、eager:适用于io密集型 corePoolSize: 1 @@ -73,7 +73,7 @@ spring: preStartAllCoreThreads: false # 是否预热所有核心线程,默认false runTimeout: 200 # 任务执行超时阈值,目前只做告警用,单位(ms) queueTimeout: 100 # 任务在队列等待超时阈值,目前只做告警用,单位(ms) - taskWrapperNames: ["ttl"] # 任务包装器名称,集成TaskWrapper接口 + taskWrapperNames: [ "ttl" ] # 任务包装器名称,集成TaskWrapper接口 - threadPoolName: priorityDtpThreadPoolExecutor executorType: priority # 线程池类型common、eager:适用于io密集型 corePoolSize: 1 @@ -88,4 +88,4 @@ spring: preStartAllCoreThreads: false # 是否预热所有核心线程,默认false runTimeout: 10000 # 任务执行超时阈值,目前只做告警用,单位(ms) queueTimeout: 10000 # 任务在队列等待超时阈值,目前只做告警用,单位(ms) - taskWrapperNames: ["ttl"] # 任务包装器名称,集成TaskWrapper接口 \ No newline at end of file + taskWrapperNames: [ "ttl" ] # 任务包装器名称,集成TaskWrapper接口 \ No newline at end of file