Skip to content

Commit

Permalink
[ISSUE #305] config file supports plugins config
Browse files Browse the repository at this point in the history
  • Loading branch information
yanhom1314 committed Jul 8, 2023
1 parent 820ff83 commit 4e01d20
Show file tree
Hide file tree
Showing 12 changed files with 137 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

package org.dromara.dynamictp.common.constant;

import org.dromara.dynamictp.common.em.NotifyItemEnum;
import com.google.common.collect.ImmutableList;
import org.dromara.dynamictp.common.em.NotifyItemEnum;

import java.util.List;

Expand All @@ -30,8 +30,7 @@
**/
public final class DynamicTpConst {

private DynamicTpConst() {
}
private DynamicTpConst() { }

public static final String MAIN_PROPERTIES_PREFIX = "spring.dynamic.tp";

Expand Down Expand Up @@ -79,10 +78,8 @@ private DynamicTpConst() {
public static final String QUEUE_TIMEOUT = "queueTimeout";

public static final String TASK_WRAPPERS = "taskWrappers";
/**
* alarm
*/
public static final String ALARM_NAME = "dtp-alarm";

public static final String PLUGIN_NAMES = "pluginNames";

/**
* symbol
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@

package org.dromara.dynamictp.common.entity;

import org.dromara.dynamictp.common.em.QueueTypeEnum;
import org.dromara.dynamictp.common.em.RejectedTypeEnum;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import org.dromara.dynamictp.common.em.QueueTypeEnum;
import org.dromara.dynamictp.common.em.RejectedTypeEnum;

import java.util.Set;

Expand Down Expand Up @@ -114,6 +114,11 @@ public class DtpExecutorProps extends TpExecutorProps {
*/
private Set<String> taskWrapperNames;

/**
* Plugin names.
*/
private Set<String> pluginNames;

/**
* 检查核心参数
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
*/
public class DtpInterceptorProxyFactory {

private DtpInterceptorProxyFactory() { }

public static Object enhance(Object target, DtpInterceptor interceptor) {
return enhance(target, null, null, interceptor);
}
Expand All @@ -56,7 +58,7 @@ private static Map<Class<?>, Set<Method>> getSignatureMap(DtpInterceptor interce
throw new PluginException("No @DtpIntercepts annotation was found in interceptor " + interceptor.getClass().getName());
}

DtpSignature[] signatures = interceptsAnno.value();
DtpSignature[] signatures = interceptsAnno.signatures();
Map<Class<?>, Set<Method>> signatureMap = Maps.newHashMap();
for (DtpSignature signature : signatures) {
Set<Method> methods = signatureMap.computeIfAbsent(signature.clazz(), k -> new HashSet<>());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,22 @@

package org.dromara.dynamictp.core.plugin;

import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.dromara.dynamictp.common.util.ExtensionServiceLoader;
import org.dromara.dynamictp.common.util.StringUtil;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

import static java.util.stream.Collectors.toList;

/**
* DtpInterceptorRegistry related
Expand All @@ -37,37 +46,64 @@ public class DtpInterceptorRegistry {
/**
* Maintain all automatically registered and manually registered INTERCEPTORS
*/
private static final List<DtpInterceptor> INTERCEPTORS = new ArrayList<>();
private static final Map<String, DtpInterceptor> INTERCEPTORS = Maps.newConcurrentMap();

static {
static {
List<DtpInterceptor> loadedInterceptors = ExtensionServiceLoader.get(DtpInterceptor.class);
if (CollectionUtils.isNotEmpty(loadedInterceptors)) {
INTERCEPTORS.addAll(loadedInterceptors);
loadedInterceptors.forEach(x -> {
DtpIntercepts interceptsAnno = x.getClass().getAnnotation(DtpIntercepts.class);
if (Objects.nonNull(interceptsAnno)) {
String name = StringUtils.isBlank(interceptsAnno.name()) ? x.getClass().getSimpleName() : interceptsAnno.name();
INTERCEPTORS.put(name, x);
}
});
}
}

private DtpInterceptorRegistry() { }

public static void register(DtpInterceptor dtpInterceptor) {
log.info("DynamicTp register DtpInterceptor: {}", dtpInterceptor);
INTERCEPTORS.add(dtpInterceptor);
public static void register(String name, DtpInterceptor dtpInterceptor) {
log.info("DynamicTp register DtpInterceptor, name: {}, interceptor: {}", name, dtpInterceptor);
INTERCEPTORS.put(name, dtpInterceptor);
}

public static List<DtpInterceptor> getInterceptors() {
return Collections.unmodifiableList(INTERCEPTORS);
public static Map<String, DtpInterceptor> getInterceptors() {
return Collections.unmodifiableMap(INTERCEPTORS);
}

public static Object pluginAll(Object target) {
for (DtpInterceptor dtpInterceptor : INTERCEPTORS) {
target = dtpInterceptor.plugin(target);
return plugin(target, INTERCEPTORS.keySet());
}

public static Object pluginAll(Object target, Class<?>[] argTypes, Object[] args) {
return plugin(target, INTERCEPTORS.keySet(), argTypes, args);
}

public static Object plugin(Object target, Set<String> interceptors) {
val filterInterceptors = getInterceptors(interceptors);
for (DtpInterceptor interceptor : filterInterceptors) {
target = interceptor.plugin(target);
}
return target;
}

public static Object pluginAll(Object target, Class<?>[] argumentTypes, Object[] arguments) {
for (DtpInterceptor dtpInterceptor : INTERCEPTORS) {
target = dtpInterceptor.plugin(target, argumentTypes, arguments);
public static Object plugin(Object target, Set<String> interceptors, Class<?>[] argTypes, Object[] args) {
val filterInterceptors = getInterceptors(interceptors);
for (DtpInterceptor interceptor : filterInterceptors) {
target = interceptor.plugin(target, argTypes, args);
}
return target;
}

private static Collection<DtpInterceptor> getInterceptors(Set<String> interceptors) {
if (CollectionUtils.isEmpty(interceptors)) {
return INTERCEPTORS.values();
}
return INTERCEPTORS.entrySet()
.stream()
.filter(x -> StringUtil.containsIgnoreCase(x.getKey(), interceptors))
.map(Map.Entry::getValue)
.collect(toList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,17 @@
@Target(ElementType.TYPE)
public @interface DtpIntercepts {

DtpSignature[] value();
/**
* Intercept name.
*
* @return the intercept name.
*/
String name();

/**
* Signatures.
*
* @return the dtp signatures.
*/
DtpSignature[] signatures();
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import static org.dromara.dynamictp.common.constant.DynamicTpConst.NOTIFY_ENABLED;
import static org.dromara.dynamictp.common.constant.DynamicTpConst.NOTIFY_ITEMS;
import static org.dromara.dynamictp.common.constant.DynamicTpConst.PLATFORM_IDS;
import static org.dromara.dynamictp.common.constant.DynamicTpConst.PLUGIN_NAMES;
import static org.dromara.dynamictp.common.constant.DynamicTpConst.PRE_START_ALL_CORE_THREADS;
import static org.dromara.dynamictp.common.constant.DynamicTpConst.QUEUE_TIMEOUT;
import static org.dromara.dynamictp.common.constant.DynamicTpConst.REJECT_ENHANCED;
Expand Down Expand Up @@ -110,6 +111,7 @@ private Map<String, Object> buildPropertyValues(DtpExecutorProps props) {

val taskWrappers = TaskWrappers.getInstance().getByNames(props.getTaskWrapperNames());
propertyValues.put(TASK_WRAPPERS, taskWrappers);
propertyValues.put(PLUGIN_NAMES, props.getPluginNames());

return propertyValues;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.dromara.dynamictp.core.spring;

import lombok.extern.slf4j.Slf4j;
import lombok.val;
import org.apache.commons.lang3.StringUtils;
import org.dromara.dynamictp.common.util.ConstructorUtil;
import org.dromara.dynamictp.core.DtpRegistry;
Expand All @@ -44,6 +45,7 @@
import java.util.Collections;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

Expand All @@ -65,25 +67,25 @@ public Object postProcessAfterInitialization(@NonNull Object bean, @NonNull Stri
}

if (bean instanceof DtpExecutor) {
// register DtpExecutor
DtpExecutor dtpExecutor = (DtpExecutor) bean;
Object[] args = ConstructorUtil.buildTpExecutorConstructorArgs(dtpExecutor);
Class<?>[] argTypes = ConstructorUtil.buildTpExecutorConstructorArgTypes();
bean = DtpInterceptorRegistry.pluginAll(bean, argTypes, args);
registerDtp(bean);
} else {
// register ThreadPoolExecutor or ThreadPoolTaskExecutor
registerCommon(bean, beanName);
return registerDtp(bean);
}
// register ThreadPoolExecutor or ThreadPoolTaskExecutor
registerCommon(bean, beanName);
return bean;
}

private void registerDtp(Object bean) {
private Object registerDtp(Object bean) {
DtpExecutor dtpExecutor = (DtpExecutor) bean;
if (bean instanceof EagerDtpExecutor) {
((TaskQueue) dtpExecutor.getQueue()).setExecutor((EagerDtpExecutor) dtpExecutor);
Object[] args = ConstructorUtil.buildTpExecutorConstructorArgs(dtpExecutor);
Class<?>[] argTypes = ConstructorUtil.buildTpExecutorConstructorArgTypes();
Set<String> pluginNames = dtpExecutor.getPluginNames();

val enhancedBean = (DtpExecutor) DtpInterceptorRegistry.plugin(bean, pluginNames, argTypes, args);
if (enhancedBean instanceof EagerDtpExecutor) {
((TaskQueue) enhancedBean.getQueue()).setExecutor((EagerDtpExecutor) enhancedBean);
}
DtpRegistry.registerExecutor(ExecutorWrapper.of(dtpExecutor), "beanPostProcessor");
DtpRegistry.registerExecutor(ExecutorWrapper.of(enhancedBean), "beanPostProcessor");
return enhancedBean;
}

private void registerCommon(Object bean, String beanName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.dromara.dynamictp.core.thread;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.dromara.dynamictp.common.em.NotifyItemEnum;
Expand All @@ -33,14 +34,15 @@

import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.atomic.LongAdder;

import static org.dromara.dynamictp.common.constant.DynamicTpConst.TRACE_ID;
Expand Down Expand Up @@ -85,6 +87,11 @@ public class DtpExecutor extends ThreadPoolExecutor
*/
private List<TaskWrapper> taskWrappers = Lists.newArrayList();

/**
* Plugin names.
*/
private Set<String> pluginNames = Sets.newHashSet();

/**
* If pre start all core threads.
*/
Expand Down Expand Up @@ -313,6 +320,14 @@ public void setTaskWrappers(List<TaskWrapper> taskWrappers) {
this.taskWrappers = taskWrappers;
}

public Set<String> getPluginNames() {
return pluginNames;
}

public void setPluginNames(Set<String> pluginNames) {
this.pluginNames = pluginNames;
}

public boolean isPreStartAllCoreThreads() {
return preStartAllCoreThreads;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,13 @@
/**
* @author windsearcher.lq
*/
@DtpIntercepts({
@DtpSignature(clazz = DtpExecutor.class, method = "execute", args = {Runnable.class}),
@DtpSignature(clazz = ScheduledDtpExecutor.class, method = "execute", args = {Runnable.class})
})
@DtpIntercepts(
name = "testExecuteInterceptor1",
signatures = {
@DtpSignature(clazz = DtpExecutor.class, method = "execute", args = {Runnable.class}),
@DtpSignature(clazz = ScheduledDtpExecutor.class, method = "execute", args = {Runnable.class})
}
)
@Slf4j
public class TestExecuteInterceptor implements DtpInterceptor {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,13 @@
/**
* @author windsearcher.lq
*/
@DtpIntercepts({
@DtpSignature(clazz = DtpExecutor.class, method = "execute", args = {Runnable.class}),
@DtpSignature(clazz = ScheduledDtpExecutor.class, method = "execute", args = {Runnable.class})
})
@DtpIntercepts(
name = "testExecuteInterceptor2",
signatures = {
@DtpSignature(clazz = DtpExecutor.class, method = "execute", args = {Runnable.class}),
@DtpSignature(clazz = ScheduledDtpExecutor.class, method = "execute", args = {Runnable.class})
}
)
@Slf4j
public class TestExecuteInterceptor implements DtpInterceptor {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,13 @@
* @author yanhom
* @since 1.1.4
**/
@DtpIntercepts({
@DtpSignature(clazz = DtpExecutor.class, method = "beforeExecute", args = {Thread.class, Runnable.class}),
@DtpSignature(clazz = DtpExecutor.class, method = "afterExecute", args = {Runnable.class, Throwable.class})
})
@DtpIntercepts(
name = "TtlInterceptor",
signatures = {
@DtpSignature(clazz = DtpExecutor.class, method = "beforeExecute", args = {Thread.class, Runnable.class}),
@DtpSignature(clazz = DtpExecutor.class, method = "afterExecute", args = {Runnable.class, Throwable.class})
}
)
@Slf4j
public class TtlDtpInterceptor implements DtpInterceptor {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,12 @@
import org.dromara.dynamictp.core.plugin.DtpSignature;
import org.dromara.dynamictp.core.thread.DtpExecutor;

@DtpIntercepts({
@DtpSignature(clazz = DtpExecutor.class, method = "execute", args = {Runnable.class})
})
@DtpIntercepts(
name = "testInterceptorLoader",
signatures = {
@DtpSignature(clazz = DtpExecutor.class, method = "execute", args = {Runnable.class})
}
)
@Slf4j
public class TestInterceptorLoader implements DtpInterceptor {

Expand Down

0 comments on commit 4e01d20

Please sign in to comment.