Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/dromara/dynamic-tp
Browse files Browse the repository at this point in the history
  • Loading branch information
yanhom1314 committed Jan 22, 2024
2 parents f078e80 + fb7505a commit 6f65745
Show file tree
Hide file tree
Showing 16 changed files with 244 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,19 @@ public enum CollectorTypeEnum {
*/
LOGGING,

/**
* Micrometer collect type.
*/
MICROMETER,

INTERNAL_LOGGING
/**
* Logging collect type.
*/
INTERNAL_LOGGING,

/**
* JMX collect type.
*/
JMX

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@

package org.dromara.dynamictp.common.entity;

import lombok.Data;
import lombok.experimental.Accessors;
import org.dromara.dynamictp.common.em.NotifyItemEnum;
import org.dromara.dynamictp.common.util.DateUtil;
import lombok.Builder;
import lombok.Data;

import java.util.concurrent.atomic.AtomicInteger;

Expand All @@ -31,7 +31,7 @@
* @since 1.0.4
**/
@Data
@Builder
@Accessors(chain = true)
public class AlarmInfo {

private NotifyItemEnum notifyItem;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,18 @@

package org.dromara.dynamictp.common.entity;

import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;


/**
* ThreadPoolStats related
*
* @author yanhom
* @since 1.0.0
**/
@EqualsAndHashCode(callSuper = true)
@Data
@Builder
@EqualsAndHashCode(callSuper = true)
public class ThreadPoolStats extends Metrics {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@
**/
public class ExecutorConverter {

private ExecutorConverter() { }
private ExecutorConverter() {
}

public static TpMainFields toMainFields(ExecutorWrapper executorWrapper) {
TpMainFields mainFields = new TpMainFields();
Expand Down Expand Up @@ -82,20 +83,20 @@ public static ThreadPoolStats toMetrics(ExecutorWrapper wrapper) {
}

private static ThreadPoolStats convertCommon(ExecutorAdapter<?> executor) {
return ThreadPoolStats.builder()
.corePoolSize(executor.getCorePoolSize())
.maximumPoolSize(executor.getMaximumPoolSize())
.poolSize(executor.getPoolSize())
.activeCount(executor.getActiveCount())
.largestPoolSize(executor.getLargestPoolSize())
.queueType(executor.getQueueType())
.queueCapacity(executor.getQueueCapacity())
.queueSize(executor.getQueueSize())
.queueRemainingCapacity(executor.getQueueRemainingCapacity())
.taskCount(executor.getTaskCount())
.completedTaskCount(executor.getCompletedTaskCount())
.waitTaskCount(executor.getQueueSize())
.rejectHandlerName(executor.getRejectHandlerType())
.build();
ThreadPoolStats poolStats = new ThreadPoolStats();
poolStats.setCorePoolSize(executor.getCorePoolSize());
poolStats.setMaximumPoolSize(executor.getMaximumPoolSize());
poolStats.setPoolSize(executor.getPoolSize());
poolStats.setActiveCount(executor.getActiveCount());
poolStats.setLargestPoolSize(executor.getLargestPoolSize());
poolStats.setQueueType(executor.getQueueType());
poolStats.setQueueCapacity(executor.getQueueCapacity());
poolStats.setQueueSize(executor.getQueueSize());
poolStats.setQueueRemainingCapacity(executor.getQueueRemainingCapacity());
poolStats.setTaskCount(executor.getTaskCount());
poolStats.setCompletedTaskCount(executor.getCompletedTaskCount());
poolStats.setWaitTaskCount(executor.getQueueSize());
poolStats.setRejectHandlerName(executor.getRejectHandlerType());
return poolStats;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,42 @@

/**
* Priority related
* <p>The {@link #getPriority()} is optional and represents a priority value as defined in the
* {@link Priority} interface. Lower values have higher priority. The default value is
* {@code Priority.LOWEST_PRECEDENCE}, indicating the lowest priority (losing to any
* other specified priority value).
*
* @author <a href = "mailto:[email protected]">KamTo Hung</a>
* @since 1.1.7
*/
public interface Priority {

/**
* Useful constant for the highest precedence value.
*
* @see java.lang.Integer#MIN_VALUE
*/
int HIGHEST_PRECEDENCE = Integer.MIN_VALUE;

/**
* Useful constant for the lowest precedence value.
*
* @see java.lang.Integer#MAX_VALUE
*/
int LOWEST_PRECEDENCE = Integer.MAX_VALUE;

/**
* Get the priority value of this object.
* <p>Higher values are interpreted as lower priority. As a consequence,
* the object with the lowest value has the highest priority (somewhat
* analogous to Servlet {@code load-on-startup} values).
* <p>Same priority values will result in arbitrary sort positions for the
* affected objects.
*
* @return the priority value
* @see #HIGHEST_PRECEDENCE
* @see #LOWEST_PRECEDENCE
*/
int getPriority();

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
* PriorityCallable related
*
* @author <a href = "mailto:[email protected]">KamTo Hung</a>
* @since 1.1.7
*/
public class PriorityCallable<V> implements Priority, Callable<V> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,11 @@
* PriorityDtpExecutor related, extending DtpExecutor, implements priority feature
*
* @author <a href = "mailto:[email protected]">KamTo Hung</a>
* @since 1.1.7
*/
@Slf4j
public class PriorityDtpExecutor extends DtpExecutor {

/**
* The default priority.
*/
private static final int DEFAULT_PRIORITY = 0;

public PriorityDtpExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
Expand Down Expand Up @@ -133,7 +129,7 @@ public void execute(Runnable command, int priority) {

@Override
public Future<?> submit(Runnable task) {
return super.submit(PriorityRunnable.of(task, DEFAULT_PRIORITY));
return super.submit(PriorityRunnable.of(task, Priority.LOWEST_PRECEDENCE));
}

public Future<?> submit(Runnable task, int priority) {
Expand All @@ -142,7 +138,7 @@ public Future<?> submit(Runnable task, int priority) {

@Override
public <T> Future<T> submit(Runnable task, T result) {
return super.submit(PriorityRunnable.of(task, DEFAULT_PRIORITY), result);
return super.submit(PriorityRunnable.of(task, Priority.LOWEST_PRECEDENCE), result);
}

public <T> Future<T> submit(Runnable task, T result, int priority) {
Expand All @@ -151,7 +147,7 @@ public <T> Future<T> submit(Runnable task, T result, int priority) {

@Override
public <T> Future<T> submit(Callable<T> task) {
return super.submit(PriorityCallable.of(task, DEFAULT_PRIORITY));
return super.submit(PriorityCallable.of(task, Priority.LOWEST_PRECEDENCE));
}

public <T> Future<T> submit(Callable<T> task, int priority) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
* PriorityFutureTask related
*
* @author <a href = "mailto:[email protected]">KamTo Hung</a>
* @since 1.1.7
*/
public class PriorityFutureTask<V> extends FutureTask<V> implements Priority {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
* PriorityRunnable related
*
* @author <a href = "mailto:[email protected]">KamTo Hung</a>
* @since 1.1.7
*/
public class PriorityRunnable implements Priority, Runnable {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.dromara.dynamictp.core.monitor.collector.LogCollector;
import org.dromara.dynamictp.core.monitor.collector.MetricsCollector;
import org.dromara.dynamictp.core.monitor.collector.MicroMeterCollector;
import org.dromara.dynamictp.core.monitor.collector.jmx.JMXCollector;
import org.springframework.util.CollectionUtils;

import java.util.List;
Expand All @@ -48,9 +49,11 @@ private CollectorHandler() {
MetricsCollector microMeterCollector = new MicroMeterCollector();
LogCollector logCollector = new LogCollector();
InternalLogCollector internalLogCollector = new InternalLogCollector();
JMXCollector jmxCollector = new JMXCollector();
COLLECTORS.put(microMeterCollector.type(), microMeterCollector);
COLLECTORS.put(logCollector.type(), logCollector);
COLLECTORS.put(internalLogCollector.type(), internalLogCollector);
COLLECTORS.put(jmxCollector.type(), jmxCollector);
}

public void collect(ThreadPoolStats poolStats, List<String> types) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,19 @@

package org.dromara.dynamictp.core.monitor.collector;

import cn.hutool.core.bean.BeanUtil;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import org.dromara.dynamictp.common.em.CollectorTypeEnum;
import org.dromara.dynamictp.common.entity.ThreadPoolStats;
import org.dromara.dynamictp.common.util.CommonUtil;
import org.springframework.beans.BeanUtils;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/**
Expand Down Expand Up @@ -61,7 +61,7 @@ public void collect(ThreadPoolStats threadPoolStats) {
if (Objects.isNull(oldStats)) {
GAUGE_CACHE.put(threadPoolStats.getPoolName(), threadPoolStats);
} else {
BeanUtil.copyProperties(threadPoolStats, oldStats);
BeanUtils.copyProperties(threadPoolStats, oldStats);
}
gauge(GAUGE_CACHE.get(threadPoolStats.getPoolName()));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dromara.dynamictp.core.monitor.collector.jmx;

import lombok.extern.slf4j.Slf4j;
import org.dromara.dynamictp.common.em.CollectorTypeEnum;
import org.dromara.dynamictp.common.entity.ThreadPoolStats;
import org.dromara.dynamictp.core.monitor.collector.AbstractCollector;
import org.springframework.beans.BeanUtils;

import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import java.lang.management.ManagementFactory;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* ThreadPoolStatsInfo related
*
* @author <a href = "mailto:[email protected]">KamTo Hung</a>
*/
@Slf4j
public class JMXCollector extends AbstractCollector {

public static final String DTP_METRIC_NAME_PREFIX = "dtp.thread.pool";

/**
* thread pool stats map
*/
private static final Map<String, ThreadPoolStats> GAUGE_CACHE = new ConcurrentHashMap<>();

@Override
public void collect(ThreadPoolStats threadPoolStats) {
if (GAUGE_CACHE.containsKey(threadPoolStats.getPoolName())) {
ThreadPoolStats poolStats = GAUGE_CACHE.get(threadPoolStats.getPoolName());
BeanUtils.copyProperties(threadPoolStats, poolStats);
} else {
try {
MBeanServer server = ManagementFactory.getPlatformMBeanServer();
ObjectName name = new ObjectName(DTP_METRIC_NAME_PREFIX + ":name=" + threadPoolStats.getPoolName());
ThreadPoolStatsJMX stats = new ThreadPoolStatsJMX(threadPoolStats);
server.registerMBean(stats, name);
} catch (JMException e) {
log.error("collect thread pool stats error", e);
}
GAUGE_CACHE.put(threadPoolStats.getPoolName(), threadPoolStats);
}
}

@Override
public String type() {
return CollectorTypeEnum.JMX.name().toLowerCase();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dromara.dynamictp.core.monitor.collector.jmx;

import org.dromara.dynamictp.common.entity.ThreadPoolStats;

/**
* @author <a href = "mailto:[email protected]">KamTo Hung</a>
*/
public class ThreadPoolStatsJMX implements ThreadPoolStatsMXBean {

private ThreadPoolStats threadPoolStats;

public ThreadPoolStatsJMX(ThreadPoolStats threadPoolStats) {
this.threadPoolStats = threadPoolStats;
}

@Override
public ThreadPoolStats getThreadPoolStats() {
return this.threadPoolStats;
}

@Override
public void setThreadPoolStats(ThreadPoolStats threadPoolStats) {
this.threadPoolStats = threadPoolStats;
}
}
Loading

0 comments on commit 6f65745

Please sign in to comment.