Skip to content

Commit

Permalink
new task wrapper with task decorator
Browse files Browse the repository at this point in the history
  • Loading branch information
yanhom1314 committed Apr 3, 2024
1 parent 82dddae commit 0f2801c
Showing 1 changed file with 21 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import org.apache.commons.lang3.StringUtils;
import org.dromara.dynamictp.common.plugin.DtpInterceptorRegistry;
import org.dromara.dynamictp.common.util.ConstructorUtil;
import org.dromara.dynamictp.common.util.ReflectionUtil;
import org.dromara.dynamictp.core.DtpRegistry;
import org.dromara.dynamictp.core.executor.DtpExecutor;
import org.dromara.dynamictp.core.executor.eager.EagerDtpExecutor;
import org.dromara.dynamictp.core.executor.eager.TaskQueue;
import org.dromara.dynamictp.common.plugin.DtpInterceptorRegistry;
import org.dromara.dynamictp.core.support.DynamicTp;
import org.dromara.dynamictp.core.support.ExecutorWrapper;
import org.dromara.dynamictp.core.support.ScheduledThreadPoolExecutorProxy;
Expand Down Expand Up @@ -145,22 +145,8 @@ private Object doRegisterAndReturnCommon(Object bean, String poolName) {
ThreadPoolTaskExecutor poolTaskExecutor = (ThreadPoolTaskExecutor) bean;
val proxy = newProxy(poolName, poolTaskExecutor.getThreadPoolExecutor());
try {
Object taskDecorator = ReflectionUtil.getFieldValue("taskDecorator", poolTaskExecutor);
if (Objects.nonNull(taskDecorator)) {
TaskWrapper taskWrapper = (taskDecorator instanceof TaskWrapper) ? (TaskWrapper)taskDecorator : new TaskWrapper() {
@Override
public String name() {
return taskDecorator.getClass().getName();
}

@Override
public Runnable wrap(Runnable runnable) {
return ((TaskDecorator) taskDecorator).decorate(runnable);
}
};
ReflectionUtil.setFieldValue("taskWrappers",proxy, Lists.newArrayList(taskWrapper));
}
ReflectionUtil.setFieldValue("threadPoolExecutor", bean, proxy);
tryWrapTaskDecorator(poolTaskExecutor, proxy);
} catch (IllegalAccessException ignored) { }
DtpRegistry.registerExecutor(new ExecutorWrapper(poolName, proxy), REGISTER_SOURCE);
return bean;
Expand Down Expand Up @@ -196,4 +182,23 @@ private ScheduledThreadPoolExecutorProxy newScheduledTpProxy(String name, Schedu
shutdownGracefulAsync(originExecutor, name, 0);
return proxy;
}

private void tryWrapTaskDecorator(ThreadPoolTaskExecutor poolTaskExecutor, ThreadPoolExecutorProxy proxy) throws IllegalAccessException {
Object taskDecorator = ReflectionUtil.getFieldValue("taskDecorator", poolTaskExecutor);
if (Objects.isNull(taskDecorator)) {
return;
}
TaskWrapper taskWrapper = (taskDecorator instanceof TaskWrapper) ? (TaskWrapper) taskDecorator : new TaskWrapper() {
@Override
public String name() {
return taskDecorator.getClass().getName();
}

@Override
public Runnable wrap(Runnable runnable) {
return ((TaskDecorator) taskDecorator).decorate(runnable);
}
};
ReflectionUtil.setFieldValue("taskWrappers", proxy, Lists.newArrayList(taskWrapper));
}
}

0 comments on commit 0f2801c

Please sign in to comment.