Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #305] config file supports plugins config #306

Merged
merged 1 commit into from
Jul 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

package org.dromara.dynamictp.common.constant;

import org.dromara.dynamictp.common.em.NotifyItemEnum;
import com.google.common.collect.ImmutableList;
import org.dromara.dynamictp.common.em.NotifyItemEnum;

import java.util.List;

Expand All @@ -30,8 +30,7 @@
**/
public final class DynamicTpConst {

private DynamicTpConst() {
}
private DynamicTpConst() { }

public static final String MAIN_PROPERTIES_PREFIX = "spring.dynamic.tp";

Expand Down Expand Up @@ -79,10 +78,8 @@ private DynamicTpConst() {
public static final String QUEUE_TIMEOUT = "queueTimeout";

public static final String TASK_WRAPPERS = "taskWrappers";
/**
* alarm
*/
public static final String ALARM_NAME = "dtp-alarm";

public static final String PLUGIN_NAMES = "pluginNames";

/**
* symbol
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@

package org.dromara.dynamictp.common.entity;

import org.dromara.dynamictp.common.em.QueueTypeEnum;
import org.dromara.dynamictp.common.em.RejectedTypeEnum;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import org.dromara.dynamictp.common.em.QueueTypeEnum;
import org.dromara.dynamictp.common.em.RejectedTypeEnum;

import java.util.Set;

Expand Down Expand Up @@ -114,6 +114,11 @@ public class DtpExecutorProps extends TpExecutorProps {
*/
private Set<String> taskWrapperNames;

/**
* Plugin names.
*/
private Set<String> pluginNames;

/**
* 检查核心参数
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
*/
public class DtpInterceptorProxyFactory {

private DtpInterceptorProxyFactory() { }

public static Object enhance(Object target, DtpInterceptor interceptor) {
return enhance(target, null, null, interceptor);
}
Expand All @@ -56,7 +58,7 @@ private static Map<Class<?>, Set<Method>> getSignatureMap(DtpInterceptor interce
throw new PluginException("No @DtpIntercepts annotation was found in interceptor " + interceptor.getClass().getName());
}

DtpSignature[] signatures = interceptsAnno.value();
DtpSignature[] signatures = interceptsAnno.signatures();
Map<Class<?>, Set<Method>> signatureMap = Maps.newHashMap();
for (DtpSignature signature : signatures) {
Set<Method> methods = signatureMap.computeIfAbsent(signature.clazz(), k -> new HashSet<>());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,22 @@

package org.dromara.dynamictp.core.plugin;

import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.dromara.dynamictp.common.util.ExtensionServiceLoader;
import org.dromara.dynamictp.common.util.StringUtil;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

import static java.util.stream.Collectors.toList;

/**
* DtpInterceptorRegistry related
Expand All @@ -37,37 +46,64 @@ public class DtpInterceptorRegistry {
/**
* Maintain all automatically registered and manually registered INTERCEPTORS
*/
private static final List<DtpInterceptor> INTERCEPTORS = new ArrayList<>();
private static final Map<String, DtpInterceptor> INTERCEPTORS = Maps.newConcurrentMap();

static {
static {
List<DtpInterceptor> loadedInterceptors = ExtensionServiceLoader.get(DtpInterceptor.class);
if (CollectionUtils.isNotEmpty(loadedInterceptors)) {
INTERCEPTORS.addAll(loadedInterceptors);
loadedInterceptors.forEach(x -> {
DtpIntercepts interceptsAnno = x.getClass().getAnnotation(DtpIntercepts.class);
if (Objects.nonNull(interceptsAnno)) {
String name = StringUtils.isBlank(interceptsAnno.name()) ? x.getClass().getSimpleName() : interceptsAnno.name();
INTERCEPTORS.put(name, x);
}
});
}
}

private DtpInterceptorRegistry() { }

public static void register(DtpInterceptor dtpInterceptor) {
log.info("DynamicTp register DtpInterceptor: {}", dtpInterceptor);
INTERCEPTORS.add(dtpInterceptor);
public static void register(String name, DtpInterceptor dtpInterceptor) {
log.info("DynamicTp register DtpInterceptor, name: {}, interceptor: {}", name, dtpInterceptor);
INTERCEPTORS.put(name, dtpInterceptor);
}

public static List<DtpInterceptor> getInterceptors() {
return Collections.unmodifiableList(INTERCEPTORS);
public static Map<String, DtpInterceptor> getInterceptors() {
return Collections.unmodifiableMap(INTERCEPTORS);
}

public static Object pluginAll(Object target) {
for (DtpInterceptor dtpInterceptor : INTERCEPTORS) {
target = dtpInterceptor.plugin(target);
return plugin(target, INTERCEPTORS.keySet());
}

public static Object pluginAll(Object target, Class<?>[] argTypes, Object[] args) {
return plugin(target, INTERCEPTORS.keySet(), argTypes, args);
}

public static Object plugin(Object target, Set<String> interceptors) {
val filterInterceptors = getInterceptors(interceptors);
for (DtpInterceptor interceptor : filterInterceptors) {
target = interceptor.plugin(target);
}
return target;
}

public static Object pluginAll(Object target, Class<?>[] argumentTypes, Object[] arguments) {
for (DtpInterceptor dtpInterceptor : INTERCEPTORS) {
target = dtpInterceptor.plugin(target, argumentTypes, arguments);
public static Object plugin(Object target, Set<String> interceptors, Class<?>[] argTypes, Object[] args) {
val filterInterceptors = getInterceptors(interceptors);
for (DtpInterceptor interceptor : filterInterceptors) {
target = interceptor.plugin(target, argTypes, args);
}
return target;
}

private static Collection<DtpInterceptor> getInterceptors(Set<String> interceptors) {
if (CollectionUtils.isEmpty(interceptors)) {
return INTERCEPTORS.values();
}
return INTERCEPTORS.entrySet()
.stream()
.filter(x -> StringUtil.containsIgnoreCase(x.getKey(), interceptors))
.map(Map.Entry::getValue)
.collect(toList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,17 @@
@Target(ElementType.TYPE)
public @interface DtpIntercepts {

DtpSignature[] value();
/**
* Intercept name.
*
* @return the intercept name.
*/
String name();

/**
* Signatures.
*
* @return the dtp signatures.
*/
DtpSignature[] signatures();
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import static org.dromara.dynamictp.common.constant.DynamicTpConst.NOTIFY_ENABLED;
import static org.dromara.dynamictp.common.constant.DynamicTpConst.NOTIFY_ITEMS;
import static org.dromara.dynamictp.common.constant.DynamicTpConst.PLATFORM_IDS;
import static org.dromara.dynamictp.common.constant.DynamicTpConst.PLUGIN_NAMES;
import static org.dromara.dynamictp.common.constant.DynamicTpConst.PRE_START_ALL_CORE_THREADS;
import static org.dromara.dynamictp.common.constant.DynamicTpConst.QUEUE_TIMEOUT;
import static org.dromara.dynamictp.common.constant.DynamicTpConst.REJECT_ENHANCED;
Expand Down Expand Up @@ -110,6 +111,7 @@ private Map<String, Object> buildPropertyValues(DtpExecutorProps props) {

val taskWrappers = TaskWrappers.getInstance().getByNames(props.getTaskWrapperNames());
propertyValues.put(TASK_WRAPPERS, taskWrappers);
propertyValues.put(PLUGIN_NAMES, props.getPluginNames());

return propertyValues;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.dromara.dynamictp.core.spring;

import lombok.extern.slf4j.Slf4j;
import lombok.val;
import org.apache.commons.lang3.StringUtils;
import org.dromara.dynamictp.common.util.ConstructorUtil;
import org.dromara.dynamictp.core.DtpRegistry;
Expand All @@ -44,6 +45,7 @@
import java.util.Collections;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

Expand All @@ -65,25 +67,25 @@ public Object postProcessAfterInitialization(@NonNull Object bean, @NonNull Stri
}

if (bean instanceof DtpExecutor) {
// register DtpExecutor
DtpExecutor dtpExecutor = (DtpExecutor) bean;
Object[] args = ConstructorUtil.buildTpExecutorConstructorArgs(dtpExecutor);
Class<?>[] argTypes = ConstructorUtil.buildTpExecutorConstructorArgTypes();
bean = DtpInterceptorRegistry.pluginAll(bean, argTypes, args);
registerDtp(bean);
} else {
// register ThreadPoolExecutor or ThreadPoolTaskExecutor
registerCommon(bean, beanName);
return registerDtp(bean);
}
// register ThreadPoolExecutor or ThreadPoolTaskExecutor
registerCommon(bean, beanName);
return bean;
}

private void registerDtp(Object bean) {
private Object registerDtp(Object bean) {
DtpExecutor dtpExecutor = (DtpExecutor) bean;
if (bean instanceof EagerDtpExecutor) {
((TaskQueue) dtpExecutor.getQueue()).setExecutor((EagerDtpExecutor) dtpExecutor);
Object[] args = ConstructorUtil.buildTpExecutorConstructorArgs(dtpExecutor);
Class<?>[] argTypes = ConstructorUtil.buildTpExecutorConstructorArgTypes();
Set<String> pluginNames = dtpExecutor.getPluginNames();

val enhancedBean = (DtpExecutor) DtpInterceptorRegistry.plugin(bean, pluginNames, argTypes, args);
if (enhancedBean instanceof EagerDtpExecutor) {
((TaskQueue) enhancedBean.getQueue()).setExecutor((EagerDtpExecutor) enhancedBean);
}
DtpRegistry.registerExecutor(ExecutorWrapper.of(dtpExecutor), "beanPostProcessor");
DtpRegistry.registerExecutor(ExecutorWrapper.of(enhancedBean), "beanPostProcessor");
return enhancedBean;
}

private void registerCommon(Object bean, String beanName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.dromara.dynamictp.core.thread;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.dromara.dynamictp.common.em.NotifyItemEnum;
Expand All @@ -33,14 +34,15 @@

import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.atomic.LongAdder;

import static org.dromara.dynamictp.common.constant.DynamicTpConst.TRACE_ID;
Expand Down Expand Up @@ -85,6 +87,11 @@ public class DtpExecutor extends ThreadPoolExecutor
*/
private List<TaskWrapper> taskWrappers = Lists.newArrayList();

/**
* Plugin names.
*/
private Set<String> pluginNames = Sets.newHashSet();

/**
* If pre start all core threads.
*/
Expand Down Expand Up @@ -313,6 +320,14 @@ public void setTaskWrappers(List<TaskWrapper> taskWrappers) {
this.taskWrappers = taskWrappers;
}

public Set<String> getPluginNames() {
return pluginNames;
}

public void setPluginNames(Set<String> pluginNames) {
this.pluginNames = pluginNames;
}

public boolean isPreStartAllCoreThreads() {
return preStartAllCoreThreads;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,13 @@
/**
* @author windsearcher.lq
*/
@DtpIntercepts({
@DtpSignature(clazz = DtpExecutor.class, method = "execute", args = {Runnable.class}),
@DtpSignature(clazz = ScheduledDtpExecutor.class, method = "execute", args = {Runnable.class})
})
@DtpIntercepts(
name = "testExecuteInterceptor1",
signatures = {
@DtpSignature(clazz = DtpExecutor.class, method = "execute", args = {Runnable.class}),
@DtpSignature(clazz = ScheduledDtpExecutor.class, method = "execute", args = {Runnable.class})
}
)
@Slf4j
public class TestExecuteInterceptor implements DtpInterceptor {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,13 @@
/**
* @author windsearcher.lq
*/
@DtpIntercepts({
@DtpSignature(clazz = DtpExecutor.class, method = "execute", args = {Runnable.class}),
@DtpSignature(clazz = ScheduledDtpExecutor.class, method = "execute", args = {Runnable.class})
})
@DtpIntercepts(
name = "testExecuteInterceptor2",
signatures = {
@DtpSignature(clazz = DtpExecutor.class, method = "execute", args = {Runnable.class}),
@DtpSignature(clazz = ScheduledDtpExecutor.class, method = "execute", args = {Runnable.class})
}
)
@Slf4j
public class TestExecuteInterceptor implements DtpInterceptor {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,13 @@
* @author yanhom
* @since 1.1.4
**/
@DtpIntercepts({
@DtpSignature(clazz = DtpExecutor.class, method = "beforeExecute", args = {Thread.class, Runnable.class}),
@DtpSignature(clazz = DtpExecutor.class, method = "afterExecute", args = {Runnable.class, Throwable.class})
})
@DtpIntercepts(
name = "TtlInterceptor",
signatures = {
@DtpSignature(clazz = DtpExecutor.class, method = "beforeExecute", args = {Thread.class, Runnable.class}),
@DtpSignature(clazz = DtpExecutor.class, method = "afterExecute", args = {Runnable.class, Throwable.class})
}
)
@Slf4j
public class TtlDtpInterceptor implements DtpInterceptor {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,12 @@
import org.dromara.dynamictp.core.plugin.DtpSignature;
import org.dromara.dynamictp.core.thread.DtpExecutor;

@DtpIntercepts({
@DtpSignature(clazz = DtpExecutor.class, method = "execute", args = {Runnable.class})
})
@DtpIntercepts(
name = "testInterceptorLoader",
signatures = {
@DtpSignature(clazz = DtpExecutor.class, method = "execute", args = {Runnable.class})
}
)
@Slf4j
public class TestInterceptorLoader implements DtpInterceptor {

Expand Down