diff --git a/core/src/main/java/org/dromara/dynamictp/core/aware/TaskEnhanceAware.java b/core/src/main/java/org/dromara/dynamictp/core/aware/TaskEnhanceAware.java index 15f2466ee..fadf9508c 100644 --- a/core/src/main/java/org/dromara/dynamictp/core/aware/TaskEnhanceAware.java +++ b/core/src/main/java/org/dromara/dynamictp/core/aware/TaskEnhanceAware.java @@ -35,19 +35,19 @@ public interface TaskEnhanceAware extends DtpAware { /** * Enhance task * - * @param command command + * @param command command * @param taskWrappers task wrappers * @return enhanced task */ default Runnable getEnhancedTask(Runnable command, List taskWrappers) { + Runnable wrapRunnable = command; if (CollectionUtils.isNotEmpty(taskWrappers)) { for (TaskWrapper t : taskWrappers) { - command = t.wrap(command); + wrapRunnable = t.wrap(wrapRunnable); } } - String taskName = (command instanceof NamedRunnable) ? ((NamedRunnable) command).getName() : null; - command = new DtpRunnable(command, taskName); - return command; + String taskName = (wrapRunnable instanceof NamedRunnable) ? ((NamedRunnable) wrapRunnable).getName() : null; + return new DtpRunnable(command, wrapRunnable, taskName); } /** diff --git a/core/src/main/java/org/dromara/dynamictp/core/executor/ExecutorType.java b/core/src/main/java/org/dromara/dynamictp/core/executor/ExecutorType.java index e5888bb12..4f60669a9 100644 --- a/core/src/main/java/org/dromara/dynamictp/core/executor/ExecutorType.java +++ b/core/src/main/java/org/dromara/dynamictp/core/executor/ExecutorType.java @@ -19,6 +19,7 @@ import org.dromara.dynamictp.core.executor.eager.EagerDtpExecutor; import lombok.Getter; +import org.dromara.dynamictp.core.executor.priority.PriorityDtpExecutor; /** * ExecutorType related @@ -30,12 +31,29 @@ public enum ExecutorType { /** - * Executor type. + * Common executor type. */ COMMON("common", DtpExecutor.class), + + /** + * Eager executor type. + */ EAGER("eager", EagerDtpExecutor.class), + + /** + * Scheduled executor type. + */ SCHEDULED("scheduled", ScheduledDtpExecutor.class), - ORDERED("ordered", OrderedDtpExecutor.class); + + /** + * Ordered executor type. + */ + ORDERED("ordered", OrderedDtpExecutor.class), + + /** + * Priority executor type. + */ + PRIORITY("priority", PriorityDtpExecutor.class); private final String name; @@ -54,4 +72,5 @@ public static Class getClass(String name) { } return COMMON.getClazz(); } + } diff --git a/core/src/main/java/org/dromara/dynamictp/core/executor/priority/Priority.java b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/Priority.java new file mode 100644 index 000000000..594a7523a --- /dev/null +++ b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/Priority.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.dromara.dynamictp.core.executor.priority; + +/** + * Priority related + * + * @author KamTo Hung + */ +public interface Priority { + + int getPriority(); + +} diff --git a/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityCallable.java b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityCallable.java new file mode 100644 index 000000000..f8262ca91 --- /dev/null +++ b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityCallable.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.dromara.dynamictp.core.executor.priority; + +import lombok.Getter; + +import java.util.concurrent.Callable; + +/** + * PriorityCallable related + * + * @author KamTo Hung + */ +public class PriorityCallable implements Priority, Callable { + + private final Callable callable; + + @Getter + private final int priority; + + private PriorityCallable(Callable callable, int priority) { + this.callable = callable; + this.priority = priority; + } + + public static Callable of(Callable task, int i) { + return new PriorityCallable<>(task, i); + } + + @Override + public V call() throws Exception { + return callable.call(); + } + +} diff --git a/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityDtpExecutor.java b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityDtpExecutor.java new file mode 100644 index 000000000..5f9eb432f --- /dev/null +++ b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityDtpExecutor.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.dromara.dynamictp.core.executor.priority; + +import lombok.extern.slf4j.Slf4j; +import org.dromara.dynamictp.core.executor.DtpExecutor; +import org.dromara.dynamictp.core.support.task.runnable.DtpRunnable; + +import java.util.Comparator; +import java.util.concurrent.Callable; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + +/** + * PriorityDtpExecutor related, extending DtpExecutor, implements priority feature + * + * @author KamTo Hung + */ +@Slf4j +public class PriorityDtpExecutor extends DtpExecutor { + + /** + * The default priority. + */ + private static final int DEFAULT_PRIORITY = 0; + + public PriorityDtpExecutor(int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + int capacity) { + this(corePoolSize, maximumPoolSize, keepAliveTime, unit, new PriorityBlockingQueue<>(capacity), Executors.defaultThreadFactory(), new AbortPolicy()); + } + + public PriorityDtpExecutor(int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + int capacity, + ThreadFactory threadFactory) { + this(corePoolSize, maximumPoolSize, keepAliveTime, unit, new PriorityBlockingQueue<>(capacity), threadFactory, new AbortPolicy()); + } + + public PriorityDtpExecutor(int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + int capacity, + RejectedExecutionHandler handler) { + this(corePoolSize, maximumPoolSize, keepAliveTime, unit, new PriorityBlockingQueue<>(capacity), Executors.defaultThreadFactory(), handler); + } + + public PriorityDtpExecutor(int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + int capacity, + ThreadFactory threadFactory, + RejectedExecutionHandler handler) { + this(corePoolSize, maximumPoolSize, keepAliveTime, unit, new PriorityBlockingQueue<>(capacity), threadFactory, handler); + } + + + public PriorityDtpExecutor(int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + PriorityBlockingQueue workQueue) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), new AbortPolicy()); + } + + public PriorityDtpExecutor(int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + PriorityBlockingQueue workQueue, + ThreadFactory threadFactory) { + this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new AbortPolicy()); + } + + public PriorityDtpExecutor(int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + PriorityBlockingQueue workQueue, + RejectedExecutionHandler handler) { + this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); + } + + public PriorityDtpExecutor(int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + PriorityBlockingQueue workQueue, + ThreadFactory threadFactory, + RejectedExecutionHandler handler) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); + } + + + @Override + protected RunnableFuture newTaskFor(Runnable runnable, T value) { + return new PriorityFutureTask<>(runnable, value); + } + + @Override + protected RunnableFuture newTaskFor(Callable callable) { + return new PriorityFutureTask<>(callable); + } + + public void execute(Runnable command, int priority) { + super.execute(PriorityRunnable.of(command, priority)); + } + + @Override + public Future submit(Runnable task) { + return super.submit(PriorityRunnable.of(task, DEFAULT_PRIORITY)); + } + + public Future submit(Runnable task, int priority) { + return super.submit(PriorityRunnable.of(task, priority)); + } + + @Override + public Future submit(Runnable task, T result) { + return super.submit(PriorityRunnable.of(task, DEFAULT_PRIORITY), result); + } + + public Future submit(Runnable task, T result, int priority) { + return super.submit(PriorityRunnable.of(task, priority), result); + } + + + @Override + public Future submit(Callable task) { + return super.submit(PriorityCallable.of(task, DEFAULT_PRIORITY)); + } + + public Future submit(Callable task, int priority) { + return super.submit(PriorityCallable.of(task, priority)); + } + + + /** + * Priority Comparator + * + * @return Comparator + */ + public static Comparator getRunnableComparator() { + return (o1, o2) -> { + if (!(o1 instanceof DtpRunnable) || !(o2 instanceof DtpRunnable)) { + return 0; + } + Runnable po1 = ((DtpRunnable) o1).getOriginRunnable(); + Runnable po2 = ((DtpRunnable) o2).getOriginRunnable(); + if (po1 instanceof Priority && po2 instanceof Priority) { + return Integer.compare(((Priority) po1).getPriority(), ((Priority) po2).getPriority()); + } + return 0; + }; + } + +} diff --git a/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityFutureTask.java b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityFutureTask.java new file mode 100644 index 000000000..d9dc03c6f --- /dev/null +++ b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityFutureTask.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.dromara.dynamictp.core.executor.priority; + + +import java.util.concurrent.Callable; +import java.util.concurrent.FutureTask; + +/** + * PriorityFutureTask related + * + * @author KamTo Hung + */ +public class PriorityFutureTask extends FutureTask implements Priority { + + /** + * The runnable. + */ + private final Priority obj; + + private final int priority; + + public PriorityFutureTask(Runnable runnable, V result) { + super(runnable, result); + this.obj = (PriorityRunnable) runnable; + this.priority = this.obj.getPriority(); + } + + public PriorityFutureTask(Callable callable) { + super(callable); + this.obj = (PriorityCallable) callable; + this.priority = this.obj.getPriority(); + } + + @Override + public int getPriority() { + return this.priority; + } + +} diff --git a/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityRunnable.java b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityRunnable.java new file mode 100644 index 000000000..d6eb62450 --- /dev/null +++ b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityRunnable.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.dromara.dynamictp.core.executor.priority; + + +import lombok.Getter; + +/** + * PriorityRunnable related + * + * @author KamTo Hung + */ +public class PriorityRunnable implements Priority, Runnable { + + private final Runnable runnable; + + @Getter + private final int priority; + + private PriorityRunnable(Runnable runnable, int priority) { + this.runnable = runnable; + this.priority = priority; + } + + @Override + public void run() { + this.runnable.run(); + } + + public static PriorityRunnable of(Runnable runnable, int priority) { + return new PriorityRunnable(runnable, priority); + } + +} diff --git a/core/src/main/java/org/dromara/dynamictp/core/spring/DtpBeanDefinitionRegistrar.java b/core/src/main/java/org/dromara/dynamictp/core/spring/DtpBeanDefinitionRegistrar.java index 30a257c6b..a01caf296 100644 --- a/core/src/main/java/org/dromara/dynamictp/core/spring/DtpBeanDefinitionRegistrar.java +++ b/core/src/main/java/org/dromara/dynamictp/core/spring/DtpBeanDefinitionRegistrar.java @@ -28,6 +28,7 @@ import org.dromara.dynamictp.core.executor.NamedThreadFactory; import org.dromara.dynamictp.core.executor.eager.EagerDtpExecutor; import org.dromara.dynamictp.core.executor.eager.TaskQueue; +import org.dromara.dynamictp.core.executor.priority.PriorityDtpExecutor; import org.dromara.dynamictp.core.reject.RejectHandlerGetter; import org.dromara.dynamictp.core.support.BinderHelper; import org.dromara.dynamictp.core.support.task.wrapper.TaskWrappers; @@ -39,6 +40,7 @@ import java.util.Map; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.PriorityBlockingQueue; import static org.dromara.dynamictp.common.constant.DynamicTpConst.ALLOW_CORE_THREAD_TIMEOUT; import static org.dromara.dynamictp.common.constant.DynamicTpConst.AWAIT_TERMINATION_SECONDS; @@ -123,6 +125,8 @@ private Object[] buildConstructorArgs(Class clazz, DtpExecutorProps props) { BlockingQueue taskQueue; if (clazz.equals(EagerDtpExecutor.class)) { taskQueue = new TaskQueue(props.getQueueCapacity()); + } else if (clazz.equals(PriorityDtpExecutor.class)) { + taskQueue = new PriorityBlockingQueue<>(props.getQueueCapacity(), PriorityDtpExecutor.getRunnableComparator()); } else { taskQueue = buildLbq(props.getQueueType(), props.getQueueCapacity(), @@ -130,7 +134,7 @@ private Object[] buildConstructorArgs(Class clazz, DtpExecutorProps props) { props.getMaxFreeMemory()); } - return new Object[] { + return new Object[]{ props.getCorePoolSize(), props.getMaximumPoolSize(), props.getKeepAliveTime(), @@ -140,4 +144,5 @@ private Object[] buildConstructorArgs(Class clazz, DtpExecutorProps props) { RejectHandlerGetter.buildRejectedHandler(props.getRejectedHandlerType()) }; } + } diff --git a/core/src/main/java/org/dromara/dynamictp/core/support/ThreadPoolBuilder.java b/core/src/main/java/org/dromara/dynamictp/core/support/ThreadPoolBuilder.java index c21d7e2f9..3b81eaf89 100644 --- a/core/src/main/java/org/dromara/dynamictp/core/support/ThreadPoolBuilder.java +++ b/core/src/main/java/org/dromara/dynamictp/core/support/ThreadPoolBuilder.java @@ -34,6 +34,7 @@ import org.dromara.dynamictp.core.executor.ScheduledDtpExecutor; import org.dromara.dynamictp.core.executor.eager.EagerDtpExecutor; import org.dromara.dynamictp.core.executor.eager.TaskQueue; +import org.dromara.dynamictp.core.executor.priority.PriorityDtpExecutor; import org.dromara.dynamictp.core.reject.RejectHandlerGetter; import org.dromara.dynamictp.core.support.task.wrapper.TaskWrapper; import org.springframework.util.Assert; @@ -42,6 +43,7 @@ import java.util.Objects; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; +import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -149,6 +151,12 @@ public class ThreadPoolBuilder { */ private boolean scheduled = false; + /** + * If priority thread pool. + * default false, true priority thread pool. + */ + private boolean priority = false; + /** * If pre start all core threads. */ @@ -194,7 +202,8 @@ public class ThreadPoolBuilder { */ private List platformIds = Lists.newArrayList(); - private ThreadPoolBuilder() { } + private ThreadPoolBuilder() { + } public static ThreadPoolBuilder newBuilder() { return new ThreadPoolBuilder(); @@ -340,6 +349,11 @@ public ThreadPoolBuilder scheduled(boolean scheduled) { return this; } + public ThreadPoolBuilder priority(boolean priority) { + this.priority = priority; + return this; + } + public ThreadPoolBuilder preStartAllCoreThreads(boolean preStartAllCoreThreads) { this.preStartAllCoreThreads = preStartAllCoreThreads; return this; @@ -459,6 +473,16 @@ public EagerDtpExecutor buildEager() { return (EagerDtpExecutor) buildDtpExecutor(this); } + /** + * Build priority thread pool executor. + * + * @return the newly created PriorityDtpExecutor instance + */ + public PriorityDtpExecutor buildPriority() { + priority = true; + return (PriorityDtpExecutor) buildDtpExecutor(this); + } + /** * Build thread pool executor and wrapper with ttl * @@ -531,6 +555,15 @@ private DtpExecutor createInternal(ThreadPoolBuilder builder) { builder.workQueue, builder.threadFactory, builder.rejectedExecutionHandler); + } else if (priority) { + dtpExecutor = new PriorityDtpExecutor( + builder.corePoolSize, + builder.maximumPoolSize, + builder.keepAliveTime, + builder.timeUnit, + new PriorityBlockingQueue<>(builder.queueCapacity, PriorityDtpExecutor.getRunnableComparator()), + builder.threadFactory, + builder.rejectedExecutionHandler); } else { dtpExecutor = new DtpExecutor( builder.corePoolSize, diff --git a/core/src/main/java/org/dromara/dynamictp/core/support/task/runnable/DtpRunnable.java b/core/src/main/java/org/dromara/dynamictp/core/support/task/runnable/DtpRunnable.java index 218f6de80..796bf0216 100644 --- a/core/src/main/java/org/dromara/dynamictp/core/support/task/runnable/DtpRunnable.java +++ b/core/src/main/java/org/dromara/dynamictp/core/support/task/runnable/DtpRunnable.java @@ -17,6 +17,7 @@ package org.dromara.dynamictp.core.support.task.runnable; +import lombok.Getter; import org.slf4j.MDC; import static org.dromara.dynamictp.common.constant.DynamicTpConst.TRACE_ID; @@ -27,15 +28,19 @@ * @author yanhom * @since 1.0.4 */ +@Getter public class DtpRunnable implements Runnable { + private final Runnable originRunnable; + private final Runnable runnable; private final String taskName; private final String traceId; - public DtpRunnable(Runnable runnable, String taskName) { + public DtpRunnable(Runnable originRunnable, Runnable runnable, String taskName) { + this.originRunnable = originRunnable; this.runnable = runnable; this.taskName = taskName; this.traceId = MDC.get(TRACE_ID); @@ -46,12 +51,4 @@ public void run() { runnable.run(); } - public String getTaskName() { - return taskName; - } - - public String getTraceId() { - return traceId; - } - } diff --git a/test/test-core/src/test/java/org/dromara/dynamictp/test/core/thread/OrderedDtpExecutorTest.java b/test/test-core/src/test/java/org/dromara/dynamictp/test/core/thread/OrderedDtpExecutorTest.java index cfb662ea5..5443e3fb4 100644 --- a/test/test-core/src/test/java/org/dromara/dynamictp/test/core/thread/OrderedDtpExecutorTest.java +++ b/test/test-core/src/test/java/org/dromara/dynamictp/test/core/thread/OrderedDtpExecutorTest.java @@ -67,7 +67,6 @@ void orderedExecute() throws InterruptedException { MDC.put("traceId", String.valueOf(i)); orderedDtpExecutor.execute(new TestOrderedRunnable("TEST")); } -// new CountDownLatch(1).await(); } @Test diff --git a/test/test-core/src/test/java/org/dromara/dynamictp/test/core/thread/PriorityDtpExecutorStaticTest.java b/test/test-core/src/test/java/org/dromara/dynamictp/test/core/thread/PriorityDtpExecutorStaticTest.java new file mode 100644 index 000000000..a37301a10 --- /dev/null +++ b/test/test-core/src/test/java/org/dromara/dynamictp/test/core/thread/PriorityDtpExecutorStaticTest.java @@ -0,0 +1,193 @@ +package org.dromara.dynamictp.test.core.thread; + +import lombok.extern.slf4j.Slf4j; +import org.dromara.dynamictp.core.executor.priority.PriorityDtpExecutor; +import org.dromara.dynamictp.core.support.ThreadPoolBuilder; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +/** + * @author KamTo Hung + */ +@Slf4j +public class PriorityDtpExecutorStaticTest { + + private static PriorityDtpExecutor priorityDtpExecutor; + + @BeforeAll + public static void setUp() { + priorityDtpExecutor = ThreadPoolBuilder.newBuilder() + .threadPoolName("dtpExecutor1") + .corePoolSize(1) + .maximumPoolSize(1000) + .keepAliveTime(15000) + .timeUnit(TimeUnit.MILLISECONDS) + .waitForTasksToCompleteOnShutdown(true) + .awaitTerminationSeconds(5) + .runTimeout(10000) + .queueTimeout(10000) + .buildPriority(); + } + + @Test + void execute() throws InterruptedException { + int count = 5; + CountDownLatch countDownLatch = new CountDownLatch(count); + for (int i = count; i > 0; i--) { + priorityDtpExecutor.execute(new TestPriorityRunnable(i, countDownLatch)); + } + countDownLatch.await(); + } + + @Test + void priorityExecute() throws InterruptedException { + int count = 5; + CountDownLatch countDownLatch = new CountDownLatch(count); + for (int i = count; i > 0; i--) { + priorityDtpExecutor.execute(new TestPriorityRunnable(i, countDownLatch), i); + } + countDownLatch.await(); + } + + @Test + void submit() throws InterruptedException { + int count = 5; + CountDownLatch countDownLatch = new CountDownLatch(count); + for (int i = count; i > 0; i--) { + priorityDtpExecutor.submit(new TestPriorityRunnable(i, countDownLatch)); + } + countDownLatch.await(); + } + + @Test + void prioritySubmit() throws InterruptedException { + int count = 5; + CountDownLatch countDownLatch = new CountDownLatch(count); + for (int i = count; i > 0; i--) { + priorityDtpExecutor.submit(new TestPriorityRunnable(i, countDownLatch), i); + } + countDownLatch.await(); + } + + @Test + void submitWithResult() throws InterruptedException, ExecutionException { + int count = 5; + CountDownLatch countDownLatch = new CountDownLatch(count); + List> list = new ArrayList<>(); + for (int i = count; i > 0; i--) { + String name = "test-" + i; + Future result = priorityDtpExecutor.submit(new TestPriorityRunnable(i, countDownLatch), name); + list.add(result); + } + countDownLatch.await(); + for (Future future : list) { + log.info("result: {}", future.get()); + } + } + + @Test + void prioritySubmitWithResult() throws InterruptedException, ExecutionException { + int count = 5; + CountDownLatch countDownLatch = new CountDownLatch(count); + List> list = new ArrayList<>(); + for (int i = count; i > 0; i--) { + String name = "test-" + i; + Future result = priorityDtpExecutor.submit(new TestPriorityRunnable(i, countDownLatch), name, i); + list.add(result); + } + countDownLatch.await(); + for (Future future : list) { + log.info("result: {}", future.get()); + } + } + + @Test + void submitCallable() throws InterruptedException, ExecutionException { + int count = 5; + CountDownLatch countDownLatch = new CountDownLatch(count); + List> list = new ArrayList<>(); + for (int i = count; i > 0; i--) { + Future result = priorityDtpExecutor.submit(new TestPriorityCallable(i, countDownLatch)); + list.add(result); + } + countDownLatch.await(); + for (Future future : list) { + log.info("result: {}", future.get()); + } + } + + @Test + void prioritySubmitCallable() throws InterruptedException, ExecutionException { + int count = 5; + CountDownLatch countDownLatch = new CountDownLatch(count); + List> list = new ArrayList<>(); + for (int i = count; i > 0; i--) { + Future result = priorityDtpExecutor.submit(new TestPriorityCallable(i, countDownLatch), i); + list.add(result); + } + countDownLatch.await(); + for (Future future : list) { + log.info("result: {}", future.get()); + } + } + + private static class TestPriorityRunnable implements Runnable { + + private final int number; + + private final CountDownLatch countDownLatch; + + public TestPriorityRunnable(int number, CountDownLatch countDownLatch) { + this.number = number; + this.countDownLatch = countDownLatch; + } + + @Override + public void run() { + try { + log.info("work-{} triggered successfully", number); + TimeUnit.MILLISECONDS.sleep(10); + log.info("work-{} completed successfully", number); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } finally { + countDownLatch.countDown(); + } + } + } + + private static class TestPriorityCallable implements Callable { + + private final int number; + + private final CountDownLatch countDownLatch; + + private TestPriorityCallable(int number, CountDownLatch countDownLatch) { + this.number = number; + this.countDownLatch = countDownLatch; + } + + @Override + public String call() { + try { + log.info("work-{} triggered successfully", number); + TimeUnit.MILLISECONDS.sleep(10); + return "work-" + number; + } catch (InterruptedException e) { + throw new RuntimeException(e); + } finally { + log.info("work-{} completed successfully", number); + countDownLatch.countDown(); + } + } + } + +} diff --git a/test/test-core/src/test/java/org/dromara/dynamictp/test/core/thread/PriorityDtpExecutorTest.java b/test/test-core/src/test/java/org/dromara/dynamictp/test/core/thread/PriorityDtpExecutorTest.java new file mode 100644 index 000000000..0b62232d9 --- /dev/null +++ b/test/test-core/src/test/java/org/dromara/dynamictp/test/core/thread/PriorityDtpExecutorTest.java @@ -0,0 +1,190 @@ +package org.dromara.dynamictp.test.core.thread; + +import lombok.extern.slf4j.Slf4j; +import org.dromara.dynamictp.core.executor.priority.PriorityDtpExecutor; +import org.dromara.dynamictp.core.spring.EnableDynamicTp; +import org.dromara.dynamictp.core.spring.YamlPropertySourceFactory; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.PropertySource; +import org.springframework.test.context.junit.jupiter.SpringExtension; + +import javax.annotation.Resource; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +/** + * @author KamTo Hung + */ +@Slf4j +@EnableDynamicTp +@EnableAutoConfiguration +@ExtendWith(SpringExtension.class) +@SpringBootTest(classes = PriorityDtpExecutorTest.class) +@PropertySource(value = "classpath:/dynamic-tp-demo.yml", factory = YamlPropertySourceFactory.class) +public class PriorityDtpExecutorTest { + + @Resource + private PriorityDtpExecutor priorityDtpExecutor; + + @Test + void execute() throws InterruptedException { + int count = 5; + CountDownLatch countDownLatch = new CountDownLatch(count); + for (int i = count; i > 0; i--) { + priorityDtpExecutor.execute(new TestPriorityRunnable(i, countDownLatch)); + } + countDownLatch.await(); + } + + @Test + void priorityExecute() throws InterruptedException { + int count = 5; + CountDownLatch countDownLatch = new CountDownLatch(count); + for (int i = count; i > 0; i--) { + priorityDtpExecutor.execute(new TestPriorityRunnable(i, countDownLatch), i); + } + countDownLatch.await(); + } + + @Test + void submit() throws InterruptedException { + int count = 5; + CountDownLatch countDownLatch = new CountDownLatch(count); + for (int i = count; i > 0; i--) { + priorityDtpExecutor.submit(new TestPriorityRunnable(i, countDownLatch)); + } + countDownLatch.await(); + } + + @Test + void prioritySubmit() throws InterruptedException { + int count = 5; + CountDownLatch countDownLatch = new CountDownLatch(count); + for (int i = count; i > 0; i--) { + priorityDtpExecutor.submit(new TestPriorityRunnable(i, countDownLatch), i); + } + countDownLatch.await(); + } + + @Test + void submitWithResult() throws InterruptedException, ExecutionException { + int count = 5; + CountDownLatch countDownLatch = new CountDownLatch(count); + List> list = new ArrayList<>(); + for (int i = count; i > 0; i--) { + String name = "test-" + i; + Future result = priorityDtpExecutor.submit(new TestPriorityRunnable(i, countDownLatch), name); + list.add(result); + } + countDownLatch.await(); + for (Future future : list) { + log.info("result: {}", future.get()); + } + } + + @Test + void prioritySubmitWithResult() throws InterruptedException, ExecutionException { + int count = 5; + CountDownLatch countDownLatch = new CountDownLatch(count); + List> list = new ArrayList<>(); + for (int i = count; i > 0; i--) { + String name = "test-" + i; + Future result = priorityDtpExecutor.submit(new TestPriorityRunnable(i, countDownLatch), name, i); + list.add(result); + } + countDownLatch.await(); + for (Future future : list) { + log.info("result: {}", future.get()); + } + } + + @Test + void submitCallable() throws InterruptedException, ExecutionException { + int count = 5; + CountDownLatch countDownLatch = new CountDownLatch(count); + List> list = new ArrayList<>(); + for (int i = count; i > 0; i--) { + Future result = priorityDtpExecutor.submit(new TestPriorityCallable(i, countDownLatch)); + list.add(result); + } + countDownLatch.await(); + for (Future future : list) { + log.info("result: {}", future.get()); + } + } + + @Test + void prioritySubmitCallable() throws InterruptedException, ExecutionException { + int count = 5; + CountDownLatch countDownLatch = new CountDownLatch(count); + List> list = new ArrayList<>(); + for (int i = count; i > 0; i--) { + Future result = priorityDtpExecutor.submit(new TestPriorityCallable(i, countDownLatch), i); + list.add(result); + } + countDownLatch.await(); + for (Future future : list) { + log.info("result: {}", future.get()); + } + } + + private static class TestPriorityRunnable implements Runnable { + + private final int number; + + private final CountDownLatch countDownLatch; + + public TestPriorityRunnable(int number, CountDownLatch countDownLatch) { + this.number = number; + this.countDownLatch = countDownLatch; + } + + @Override + public void run() { + try { + log.info("work-{} triggered successfully", number); + TimeUnit.MILLISECONDS.sleep(10); + log.info("work-{} completed successfully", number); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } finally { + countDownLatch.countDown(); + } + } + } + + private static class TestPriorityCallable implements Callable { + + private final int number; + + private final CountDownLatch countDownLatch; + + private TestPriorityCallable(int number, CountDownLatch countDownLatch) { + this.number = number; + this.countDownLatch = countDownLatch; + } + + @Override + public String call() { + try { + log.info("work-{} triggered successfully", number); + TimeUnit.MILLISECONDS.sleep(10); + return "work-" + number; + } catch (InterruptedException e) { + throw new RuntimeException(e); + } finally { + log.info("work-{} completed successfully", number); + countDownLatch.countDown(); + } + } + } + +} diff --git a/test/test-core/src/test/resources/dynamic-tp-demo.yml b/test/test-core/src/test/resources/dynamic-tp-demo.yml index 91d5e3ec6..29cd15b36 100644 --- a/test/test-core/src/test/resources/dynamic-tp-demo.yml +++ b/test/test-core/src/test/resources/dynamic-tp-demo.yml @@ -73,4 +73,19 @@ spring: preStartAllCoreThreads: false # 是否预热所有核心线程,默认false runTimeout: 200 # 任务执行超时阈值,目前只做告警用,单位(ms) queueTimeout: 100 # 任务在队列等待超时阈值,目前只做告警用,单位(ms) + taskWrapperNames: ["ttl"] # 任务包装器名称,集成TaskWrapper接口 + - threadPoolName: priorityDtpThreadPoolExecutor + executorType: priority # 线程池类型common、eager:适用于io密集型 + corePoolSize: 1 + maximumPoolSize: 1 + queueCapacity: 5000 + rejectedHandlerType: CallerRunsPolicy # 拒绝策略,查看RejectedTypeEnum枚举类 + keepAliveTime: 50 + allowCoreThreadTimeOut: false # 是否允许核心线程池超时 + threadNamePrefix: eagerDtp # 线程名前缀 + waitForTasksToCompleteOnShutdown: false # 参考spring线程池设计,优雅关闭线程池 + awaitTerminationSeconds: 5 # 单位(s) + preStartAllCoreThreads: false # 是否预热所有核心线程,默认false + runTimeout: 10000 # 任务执行超时阈值,目前只做告警用,单位(ms) + queueTimeout: 10000 # 任务在队列等待超时阈值,目前只做告警用,单位(ms) taskWrapperNames: ["ttl"] # 任务包装器名称,集成TaskWrapper接口 \ No newline at end of file diff --git a/test/test-core/src/test/resources/junit-platform.properties b/test/test-core/src/test/resources/junit-platform.properties index a6d5ae56c..11fdcdb39 100644 --- a/test/test-core/src/test/resources/junit-platform.properties +++ b/test/test-core/src/test/resources/junit-platform.properties @@ -1,15 +1,5 @@ -# 并行开关true/false junit.jupiter.execution.parallel.enabled=true -# 方法级多线程开关 same_thread/concurrent junit.jupiter.execution.parallel.mode.default = concurrent -# 类级多线程开关 same_thread/concurrent junit.jupiter.execution.parallel.mode.classes.default = concurrent - -# 并发策略有以下三种可选: -# fixed:固定线程数,此时还要通过junit.jupiter.execution.parallel.config.fixed.parallelism指定线程数 -# dynamic:表示根据处理器和核数计算线程数 -# custom:自定义并发策略,通过这个配置来指定:junit.jupiter.execution.parallel.config.custom.class junit.jupiter.execution.parallel.config.strategy = fixed - -# 并发线程数,该配置项只有当并发策略为fixed的时候才有用 junit.jupiter.execution.parallel.config.fixed.parallelism = 5