From e6b9a900e198e0de49c1e6bf27f0ba8a92d7403f Mon Sep 17 00:00:00 2001 From: kamtohung Date: Thu, 4 Jan 2024 11:42:18 +0800 Subject: [PATCH 01/12] feat(priority executor): init --- .../executor/priority/PriorityCallable.java | 12 ++ .../executor/priority/PriorityExecutor.java | 119 ++++++++++++++++++ 2 files changed, 131 insertions(+) create mode 100644 core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityCallable.java create mode 100644 core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityExecutor.java diff --git a/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityCallable.java b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityCallable.java new file mode 100644 index 000000000..06ee17049 --- /dev/null +++ b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityCallable.java @@ -0,0 +1,12 @@ +package org.dromara.dynamictp.core.executor.priority; + +import java.util.concurrent.Callable; + +/** + * @author KamTo Hung + */ +public interface PriorityCallable extends Callable { + + int priority(); + +} diff --git a/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityExecutor.java b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityExecutor.java new file mode 100644 index 000000000..144ff8de2 --- /dev/null +++ b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityExecutor.java @@ -0,0 +1,119 @@ +package org.dromara.dynamictp.core.executor.priority; + +import lombok.extern.slf4j.Slf4j; +import org.dromara.dynamictp.core.executor.DtpExecutor; + +import java.util.concurrent.Callable; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + +/** + * @author KamTo Hung + */ +@Slf4j +public class PriorityExecutor extends DtpExecutor { + + public PriorityExecutor(int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + int capacity) { + this(corePoolSize, maximumPoolSize, keepAliveTime, unit, new PriorityBlockingQueue<>(capacity), Executors.defaultThreadFactory(), new AbortPolicy()); + } + + public PriorityExecutor(int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + int capacity, + ThreadFactory threadFactory) { + this(corePoolSize, maximumPoolSize, keepAliveTime, unit, new PriorityBlockingQueue<>(capacity), threadFactory, new AbortPolicy()); + } + + public PriorityExecutor(int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + int capacity, + RejectedExecutionHandler handler) { + this(corePoolSize, maximumPoolSize, keepAliveTime, unit, new PriorityBlockingQueue<>(capacity), Executors.defaultThreadFactory(), handler); + } + + public PriorityExecutor(int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + int capacity, + ThreadFactory threadFactory, + RejectedExecutionHandler handler) { + this(corePoolSize, maximumPoolSize, keepAliveTime, unit, new PriorityBlockingQueue<>(capacity), threadFactory, handler); + } + + + public PriorityExecutor(int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + PriorityBlockingQueue workQueue) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), new AbortPolicy()); + } + + public PriorityExecutor(int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + PriorityBlockingQueue workQueue, + ThreadFactory threadFactory) { + this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new AbortPolicy()); + } + + public PriorityExecutor(int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + PriorityBlockingQueue workQueue, + RejectedExecutionHandler handler) { + this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); + } + + public PriorityExecutor(int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + PriorityBlockingQueue workQueue, + ThreadFactory threadFactory, + RejectedExecutionHandler handler) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); + } + + + @Override + public void execute(Runnable task, long startTimeout) { + super.execute(task, startTimeout); + } + + @Override + public void execute(Runnable command) { + super.execute(command); + } + + + @Override + public Future submit(Runnable task) { + return super.submit(task); + } + + @Override + public Future submit(Runnable task, T result) { + return super.submit(task, result); + } + + @Override + public Future submit(Callable task) { + return super.submit(task); + } +} From b015b4ff30e727f9738b2d2cd1885505393c6476 Mon Sep 17 00:00:00 2001 From: kamtohung Date: Thu, 4 Jan 2024 15:35:00 +0800 Subject: [PATCH 02/12] feat(priority executor): init --- .../executor/priority/PriorityExecutor.java | 28 ++------------ .../executor/priority/PriorityFutureTask.java | 33 +++++++++++++++++ .../executor/priority/PriorityRunnable.java | 37 +++++++++++++++++++ .../support/task/runnable/DtpRunnable.java | 11 ++---- 4 files changed, 76 insertions(+), 33 deletions(-) create mode 100644 core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityFutureTask.java create mode 100644 core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityRunnable.java diff --git a/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityExecutor.java b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityExecutor.java index 144ff8de2..bc97d52e9 100644 --- a/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityExecutor.java +++ b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityExecutor.java @@ -3,11 +3,10 @@ import lombok.extern.slf4j.Slf4j; import org.dromara.dynamictp.core.executor.DtpExecutor; -import java.util.concurrent.Callable; import java.util.concurrent.Executors; -import java.util.concurrent.Future; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.RunnableFuture; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; @@ -92,28 +91,7 @@ public PriorityExecutor(int corePoolSize, @Override - public void execute(Runnable task, long startTimeout) { - super.execute(task, startTimeout); - } - - @Override - public void execute(Runnable command) { - super.execute(command); - } - - - @Override - public Future submit(Runnable task) { - return super.submit(task); - } - - @Override - public Future submit(Runnable task, T result) { - return super.submit(task, result); - } - - @Override - public Future submit(Callable task) { - return super.submit(task); + protected RunnableFuture newTaskFor(Runnable runnable, T value) { + return new PriorityFutureTask<>(runnable, value); } } diff --git a/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityFutureTask.java b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityFutureTask.java new file mode 100644 index 000000000..5e177c4ee --- /dev/null +++ b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityFutureTask.java @@ -0,0 +1,33 @@ +package org.dromara.dynamictp.core.executor.priority; + +import lombok.extern.slf4j.Slf4j; + +import java.util.concurrent.FutureTask; + +/** + * @author KamTo Hung + */ +@Slf4j +public class PriorityFutureTask extends FutureTask implements Comparable> { + + /** + * The underlying RunnableFuture + */ + private final PriorityRunnable runnable; + + + public PriorityFutureTask(Runnable runnable, V result) { + super(runnable, result); + this.runnable = (PriorityRunnable)runnable; + } + + @Override + public void run() { + runnable.run(); + } + + @Override + public int compareTo(PriorityFutureTask o) { + return Integer.compare(o.runnable.getPriority(), this.runnable.getPriority()); + } +} diff --git a/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityRunnable.java b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityRunnable.java new file mode 100644 index 000000000..6fe644bb2 --- /dev/null +++ b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityRunnable.java @@ -0,0 +1,37 @@ +package org.dromara.dynamictp.core.executor.priority; + +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +/** + * @author KamTo Hung + */ +@Slf4j +public class PriorityRunnable implements Comparable, Runnable { + + + private final Runnable runnable; + + @Getter + private final int priority; + + public PriorityRunnable(Runnable runnable, int priority) { + this.runnable = runnable; + this.priority = priority; + } + + @Override + public void run() { + this.runnable.run(); + } + + @Override + public int compareTo(PriorityRunnable o) { + return Integer.compare(o.priority, this.priority); + } + + public static PriorityRunnable of(Runnable runnable, int priority) { + return new PriorityRunnable(runnable, priority); + } + +} diff --git a/core/src/main/java/org/dromara/dynamictp/core/support/task/runnable/DtpRunnable.java b/core/src/main/java/org/dromara/dynamictp/core/support/task/runnable/DtpRunnable.java index 218f6de80..98ad2a578 100644 --- a/core/src/main/java/org/dromara/dynamictp/core/support/task/runnable/DtpRunnable.java +++ b/core/src/main/java/org/dromara/dynamictp/core/support/task/runnable/DtpRunnable.java @@ -17,6 +17,7 @@ package org.dromara.dynamictp.core.support.task.runnable; +import lombok.Getter; import org.slf4j.MDC; import static org.dromara.dynamictp.common.constant.DynamicTpConst.TRACE_ID; @@ -31,8 +32,10 @@ public class DtpRunnable implements Runnable { private final Runnable runnable; + @Getter private final String taskName; + @Getter private final String traceId; public DtpRunnable(Runnable runnable, String taskName) { @@ -46,12 +49,4 @@ public void run() { runnable.run(); } - public String getTaskName() { - return taskName; - } - - public String getTraceId() { - return traceId; - } - } From c286d7d6fd995bcde42c3c4193ab306b42a1eb85 Mon Sep 17 00:00:00 2001 From: kamtohung Date: Fri, 5 Jan 2024 01:19:45 +0800 Subject: [PATCH 03/12] feat(priority executor): add Callable --- .../core/executor/priority/Priority.java | 12 +++++++++++ .../executor/priority/PriorityCallable.java | 21 +++++++++++++++++-- .../executor/priority/PriorityExecutor.java | 7 +++++++ .../executor/priority/PriorityFutureTask.java | 19 +++++++++-------- .../executor/priority/PriorityRunnable.java | 12 +++++------ 5 files changed, 54 insertions(+), 17 deletions(-) create mode 100644 core/src/main/java/org/dromara/dynamictp/core/executor/priority/Priority.java diff --git a/core/src/main/java/org/dromara/dynamictp/core/executor/priority/Priority.java b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/Priority.java new file mode 100644 index 000000000..d769fc32c --- /dev/null +++ b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/Priority.java @@ -0,0 +1,12 @@ +package org.dromara.dynamictp.core.executor.priority; + + +/** + * The interface Priority. + * @author KamTo Hung + */ +public interface Priority { + + int getPriority(); + +} diff --git a/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityCallable.java b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityCallable.java index 06ee17049..1db2b158b 100644 --- a/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityCallable.java +++ b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityCallable.java @@ -5,8 +5,25 @@ /** * @author KamTo Hung */ -public interface PriorityCallable extends Callable { +public class PriorityCallable implements Priority, Callable { - int priority(); + private final Callable callable; + + private final int priority; + + public PriorityCallable(Callable callable, int priority) { + this.callable = callable; + this.priority = priority; + } + + @Override + public V call() throws Exception { + return callable.call(); + } + + @Override + public int getPriority() { + return this.priority; + } } diff --git a/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityExecutor.java b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityExecutor.java index bc97d52e9..c86eb84eb 100644 --- a/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityExecutor.java +++ b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityExecutor.java @@ -3,6 +3,7 @@ import lombok.extern.slf4j.Slf4j; import org.dromara.dynamictp.core.executor.DtpExecutor; +import java.util.concurrent.Callable; import java.util.concurrent.Executors; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.RejectedExecutionHandler; @@ -94,4 +95,10 @@ public PriorityExecutor(int corePoolSize, protected RunnableFuture newTaskFor(Runnable runnable, T value) { return new PriorityFutureTask<>(runnable, value); } + + @Override + protected RunnableFuture newTaskFor(Callable callable) { + return new PriorityFutureTask<>(callable); + } + } diff --git a/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityFutureTask.java b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityFutureTask.java index 5e177c4ee..d6884845f 100644 --- a/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityFutureTask.java +++ b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityFutureTask.java @@ -1,33 +1,34 @@ package org.dromara.dynamictp.core.executor.priority; -import lombok.extern.slf4j.Slf4j; +import java.util.concurrent.Callable; import java.util.concurrent.FutureTask; /** * @author KamTo Hung */ -@Slf4j public class PriorityFutureTask extends FutureTask implements Comparable> { /** - * The underlying RunnableFuture + * The runnable. */ - private final PriorityRunnable runnable; + private final Priority obj; public PriorityFutureTask(Runnable runnable, V result) { super(runnable, result); - this.runnable = (PriorityRunnable)runnable; + this.obj = (PriorityRunnable)runnable; } - @Override - public void run() { - runnable.run(); + public PriorityFutureTask(Callable callable) { + super(callable); + this.obj = (PriorityCallable)callable; } @Override public int compareTo(PriorityFutureTask o) { - return Integer.compare(o.runnable.getPriority(), this.runnable.getPriority()); + return Integer.compare(o.obj.getPriority(), this.obj.getPriority()); } + + } diff --git a/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityRunnable.java b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityRunnable.java index 6fe644bb2..91919fdb7 100644 --- a/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityRunnable.java +++ b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityRunnable.java @@ -1,18 +1,13 @@ package org.dromara.dynamictp.core.executor.priority; -import lombok.Getter; -import lombok.extern.slf4j.Slf4j; /** * @author KamTo Hung */ -@Slf4j -public class PriorityRunnable implements Comparable, Runnable { - +public class PriorityRunnable implements Priority, Comparable, Runnable { private final Runnable runnable; - @Getter private final int priority; public PriorityRunnable(Runnable runnable, int priority) { @@ -34,4 +29,9 @@ public static PriorityRunnable of(Runnable runnable, int priority) { return new PriorityRunnable(runnable, priority); } + @Override + public int getPriority() { + return this.priority; + } + } From 17627546817f8b50e627b4b4fa0d447e63c11539 Mon Sep 17 00:00:00 2001 From: kamtohung Date: Fri, 5 Jan 2024 01:33:24 +0800 Subject: [PATCH 04/12] feat(priority executor): change comparable to Object #283 --- .../core/executor/priority/PriorityCallable.java | 7 ++++++- .../core/executor/priority/PriorityFutureTask.java | 10 ++++------ .../core/executor/priority/PriorityRunnable.java | 6 +++--- 3 files changed, 13 insertions(+), 10 deletions(-) diff --git a/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityCallable.java b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityCallable.java index 1db2b158b..3cdda34e4 100644 --- a/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityCallable.java +++ b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityCallable.java @@ -5,7 +5,7 @@ /** * @author KamTo Hung */ -public class PriorityCallable implements Priority, Callable { +public class PriorityCallable implements Priority, Comparable, Callable { private final Callable callable; @@ -26,4 +26,9 @@ public int getPriority() { return this.priority; } + @Override + public int compareTo(Object o) { + return Integer.compare(((PriorityCallable) o).priority, this.priority); + } + } diff --git a/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityFutureTask.java b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityFutureTask.java index d6884845f..6a7752957 100644 --- a/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityFutureTask.java +++ b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityFutureTask.java @@ -7,13 +7,12 @@ /** * @author KamTo Hung */ -public class PriorityFutureTask extends FutureTask implements Comparable> { +public class PriorityFutureTask extends FutureTask implements Comparable { /** * The runnable. */ - private final Priority obj; - + private final Comparable obj; public PriorityFutureTask(Runnable runnable, V result) { super(runnable, result); @@ -26,9 +25,8 @@ public PriorityFutureTask(Callable callable) { } @Override - public int compareTo(PriorityFutureTask o) { - return Integer.compare(o.obj.getPriority(), this.obj.getPriority()); + public int compareTo(Object o) { + return this.obj.compareTo(((PriorityFutureTask)o).obj); } - } diff --git a/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityRunnable.java b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityRunnable.java index 91919fdb7..a7e90e824 100644 --- a/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityRunnable.java +++ b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityRunnable.java @@ -4,7 +4,7 @@ /** * @author KamTo Hung */ -public class PriorityRunnable implements Priority, Comparable, Runnable { +public class PriorityRunnable implements Priority, Comparable, Runnable { private final Runnable runnable; @@ -21,8 +21,8 @@ public void run() { } @Override - public int compareTo(PriorityRunnable o) { - return Integer.compare(o.priority, this.priority); + public int compareTo(Object o) { + return Integer.compare(((PriorityRunnable) o).priority, this.priority); } public static PriorityRunnable of(Runnable runnable, int priority) { From a267222f0ea1d1f4f5b5e56f32784bd2d5c0775d Mon Sep 17 00:00:00 2001 From: kamtohung Date: Fri, 5 Jan 2024 18:11:07 +0800 Subject: [PATCH 05/12] feat(priority executor): add to spring #283 --- .../dynamictp/core/executor/ExecutorType.java | 23 ++- .../core/executor/priority/Priority.java | 12 -- .../executor/priority/PriorityCallable.java | 13 +- .../priority/PriorityDtpExecutor.java | 147 ++++++++++++++++++ .../executor/priority/PriorityExecutor.java | 104 ------------- .../executor/priority/PriorityRunnable.java | 9 +- .../spring/DtpBeanDefinitionRegistrar.java | 6 +- .../core/thread/PriorityDtpExecutorTest.java | 64 ++++++++ .../src/test/resources/dynamic-tp-demo.yml | 15 ++ 9 files changed, 260 insertions(+), 133 deletions(-) delete mode 100644 core/src/main/java/org/dromara/dynamictp/core/executor/priority/Priority.java create mode 100644 core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityDtpExecutor.java delete mode 100644 core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityExecutor.java create mode 100644 test/test-core/src/test/java/org/dromara/dynamictp/test/core/thread/PriorityDtpExecutorTest.java diff --git a/core/src/main/java/org/dromara/dynamictp/core/executor/ExecutorType.java b/core/src/main/java/org/dromara/dynamictp/core/executor/ExecutorType.java index e5888bb12..4f60669a9 100644 --- a/core/src/main/java/org/dromara/dynamictp/core/executor/ExecutorType.java +++ b/core/src/main/java/org/dromara/dynamictp/core/executor/ExecutorType.java @@ -19,6 +19,7 @@ import org.dromara.dynamictp.core.executor.eager.EagerDtpExecutor; import lombok.Getter; +import org.dromara.dynamictp.core.executor.priority.PriorityDtpExecutor; /** * ExecutorType related @@ -30,12 +31,29 @@ public enum ExecutorType { /** - * Executor type. + * Common executor type. */ COMMON("common", DtpExecutor.class), + + /** + * Eager executor type. + */ EAGER("eager", EagerDtpExecutor.class), + + /** + * Scheduled executor type. + */ SCHEDULED("scheduled", ScheduledDtpExecutor.class), - ORDERED("ordered", OrderedDtpExecutor.class); + + /** + * Ordered executor type. + */ + ORDERED("ordered", OrderedDtpExecutor.class), + + /** + * Priority executor type. + */ + PRIORITY("priority", PriorityDtpExecutor.class); private final String name; @@ -54,4 +72,5 @@ public static Class getClass(String name) { } return COMMON.getClazz(); } + } diff --git a/core/src/main/java/org/dromara/dynamictp/core/executor/priority/Priority.java b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/Priority.java deleted file mode 100644 index d769fc32c..000000000 --- a/core/src/main/java/org/dromara/dynamictp/core/executor/priority/Priority.java +++ /dev/null @@ -1,12 +0,0 @@ -package org.dromara.dynamictp.core.executor.priority; - - -/** - * The interface Priority. - * @author KamTo Hung - */ -public interface Priority { - - int getPriority(); - -} diff --git a/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityCallable.java b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityCallable.java index 3cdda34e4..214c56fd2 100644 --- a/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityCallable.java +++ b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityCallable.java @@ -5,25 +5,24 @@ /** * @author KamTo Hung */ -public class PriorityCallable implements Priority, Comparable, Callable { +public class PriorityCallable implements Comparable, Callable { private final Callable callable; private final int priority; - public PriorityCallable(Callable callable, int priority) { + private PriorityCallable(Callable callable, int priority) { this.callable = callable; this.priority = priority; } - @Override - public V call() throws Exception { - return callable.call(); + public static Callable of(Callable task, int i) { + return new PriorityCallable<>(task, i); } @Override - public int getPriority() { - return this.priority; + public V call() throws Exception { + return callable.call(); } @Override diff --git a/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityDtpExecutor.java b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityDtpExecutor.java new file mode 100644 index 000000000..239ff8ea4 --- /dev/null +++ b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityDtpExecutor.java @@ -0,0 +1,147 @@ +package org.dromara.dynamictp.core.executor.priority; + +import lombok.extern.slf4j.Slf4j; +import org.dromara.dynamictp.core.executor.DtpExecutor; + +import java.util.concurrent.Callable; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + +/** + * @author KamTo Hung + */ +@Slf4j +public class PriorityDtpExecutor extends DtpExecutor { + + /** + * The default priority. + */ + private static final int DEFAULT_PRIORITY = 0; + + public PriorityDtpExecutor(int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + int capacity) { + this(corePoolSize, maximumPoolSize, keepAliveTime, unit, new PriorityBlockingQueue<>(capacity), Executors.defaultThreadFactory(), new AbortPolicy()); + } + + public PriorityDtpExecutor(int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + int capacity, + ThreadFactory threadFactory) { + this(corePoolSize, maximumPoolSize, keepAliveTime, unit, new PriorityBlockingQueue<>(capacity), threadFactory, new AbortPolicy()); + } + + public PriorityDtpExecutor(int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + int capacity, + RejectedExecutionHandler handler) { + this(corePoolSize, maximumPoolSize, keepAliveTime, unit, new PriorityBlockingQueue<>(capacity), Executors.defaultThreadFactory(), handler); + } + + public PriorityDtpExecutor(int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + int capacity, + ThreadFactory threadFactory, + RejectedExecutionHandler handler) { + this(corePoolSize, maximumPoolSize, keepAliveTime, unit, new PriorityBlockingQueue<>(capacity), threadFactory, handler); + } + + + public PriorityDtpExecutor(int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + PriorityBlockingQueue workQueue) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), new AbortPolicy()); + } + + public PriorityDtpExecutor(int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + PriorityBlockingQueue workQueue, + ThreadFactory threadFactory) { + this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new AbortPolicy()); + } + + public PriorityDtpExecutor(int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + PriorityBlockingQueue workQueue, + RejectedExecutionHandler handler) { + this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); + } + + public PriorityDtpExecutor(int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + PriorityBlockingQueue workQueue, + ThreadFactory threadFactory, + RejectedExecutionHandler handler) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); + } + + + @Override + protected RunnableFuture newTaskFor(Runnable runnable, T value) { + return new PriorityFutureTask<>(runnable, value); + } + + @Override + protected RunnableFuture newTaskFor(Callable callable) { + return new PriorityFutureTask<>(callable); + } + + @Override + public void execute(Runnable command) { + this.execute(command, DEFAULT_PRIORITY); + } + + public void execute(Runnable command, int priority) { + super.execute(PriorityRunnable.of(command, priority)); + } + + @Override + public Future submit(Runnable task) { + return super.submit(PriorityRunnable. of(task, DEFAULT_PRIORITY)); + } + + public Future submit(Runnable task, int priority) { + return this.submit(PriorityRunnable.of(task, priority)); + } + + @Override + public Future submit(Runnable task, T result) { + return super.submit(PriorityRunnable.of(task, DEFAULT_PRIORITY), result); + } + + public Future submit(Runnable task, T result, int priority) { + return this.submit(PriorityRunnable.of(task, priority), result); + } + + + @Override + public Future submit(Callable task) { + return super.submit(PriorityCallable.of(task, DEFAULT_PRIORITY)); + } + + public Future submit(Callable task, int priority) { + return this.submit(PriorityCallable.of(task, priority)); + } + +} diff --git a/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityExecutor.java b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityExecutor.java deleted file mode 100644 index c86eb84eb..000000000 --- a/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityExecutor.java +++ /dev/null @@ -1,104 +0,0 @@ -package org.dromara.dynamictp.core.executor.priority; - -import lombok.extern.slf4j.Slf4j; -import org.dromara.dynamictp.core.executor.DtpExecutor; - -import java.util.concurrent.Callable; -import java.util.concurrent.Executors; -import java.util.concurrent.PriorityBlockingQueue; -import java.util.concurrent.RejectedExecutionHandler; -import java.util.concurrent.RunnableFuture; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; - -/** - * @author KamTo Hung - */ -@Slf4j -public class PriorityExecutor extends DtpExecutor { - - public PriorityExecutor(int corePoolSize, - int maximumPoolSize, - long keepAliveTime, - TimeUnit unit, - int capacity) { - this(corePoolSize, maximumPoolSize, keepAliveTime, unit, new PriorityBlockingQueue<>(capacity), Executors.defaultThreadFactory(), new AbortPolicy()); - } - - public PriorityExecutor(int corePoolSize, - int maximumPoolSize, - long keepAliveTime, - TimeUnit unit, - int capacity, - ThreadFactory threadFactory) { - this(corePoolSize, maximumPoolSize, keepAliveTime, unit, new PriorityBlockingQueue<>(capacity), threadFactory, new AbortPolicy()); - } - - public PriorityExecutor(int corePoolSize, - int maximumPoolSize, - long keepAliveTime, - TimeUnit unit, - int capacity, - RejectedExecutionHandler handler) { - this(corePoolSize, maximumPoolSize, keepAliveTime, unit, new PriorityBlockingQueue<>(capacity), Executors.defaultThreadFactory(), handler); - } - - public PriorityExecutor(int corePoolSize, - int maximumPoolSize, - long keepAliveTime, - TimeUnit unit, - int capacity, - ThreadFactory threadFactory, - RejectedExecutionHandler handler) { - this(corePoolSize, maximumPoolSize, keepAliveTime, unit, new PriorityBlockingQueue<>(capacity), threadFactory, handler); - } - - - public PriorityExecutor(int corePoolSize, - int maximumPoolSize, - long keepAliveTime, - TimeUnit unit, - PriorityBlockingQueue workQueue) { - super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), new AbortPolicy()); - } - - public PriorityExecutor(int corePoolSize, - int maximumPoolSize, - long keepAliveTime, - TimeUnit unit, - PriorityBlockingQueue workQueue, - ThreadFactory threadFactory) { - this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new AbortPolicy()); - } - - public PriorityExecutor(int corePoolSize, - int maximumPoolSize, - long keepAliveTime, - TimeUnit unit, - PriorityBlockingQueue workQueue, - RejectedExecutionHandler handler) { - this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); - } - - public PriorityExecutor(int corePoolSize, - int maximumPoolSize, - long keepAliveTime, - TimeUnit unit, - PriorityBlockingQueue workQueue, - ThreadFactory threadFactory, - RejectedExecutionHandler handler) { - super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); - } - - - @Override - protected RunnableFuture newTaskFor(Runnable runnable, T value) { - return new PriorityFutureTask<>(runnable, value); - } - - @Override - protected RunnableFuture newTaskFor(Callable callable) { - return new PriorityFutureTask<>(callable); - } - -} diff --git a/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityRunnable.java b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityRunnable.java index a7e90e824..81033ef3c 100644 --- a/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityRunnable.java +++ b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityRunnable.java @@ -4,13 +4,13 @@ /** * @author KamTo Hung */ -public class PriorityRunnable implements Priority, Comparable, Runnable { +public class PriorityRunnable implements Comparable, Runnable { private final Runnable runnable; private final int priority; - public PriorityRunnable(Runnable runnable, int priority) { + private PriorityRunnable(Runnable runnable, int priority) { this.runnable = runnable; this.priority = priority; } @@ -29,9 +29,4 @@ public static PriorityRunnable of(Runnable runnable, int priority) { return new PriorityRunnable(runnable, priority); } - @Override - public int getPriority() { - return this.priority; - } - } diff --git a/core/src/main/java/org/dromara/dynamictp/core/spring/DtpBeanDefinitionRegistrar.java b/core/src/main/java/org/dromara/dynamictp/core/spring/DtpBeanDefinitionRegistrar.java index 30a257c6b..c70c4eb98 100644 --- a/core/src/main/java/org/dromara/dynamictp/core/spring/DtpBeanDefinitionRegistrar.java +++ b/core/src/main/java/org/dromara/dynamictp/core/spring/DtpBeanDefinitionRegistrar.java @@ -28,6 +28,7 @@ import org.dromara.dynamictp.core.executor.NamedThreadFactory; import org.dromara.dynamictp.core.executor.eager.EagerDtpExecutor; import org.dromara.dynamictp.core.executor.eager.TaskQueue; +import org.dromara.dynamictp.core.executor.priority.PriorityDtpExecutor; import org.dromara.dynamictp.core.reject.RejectHandlerGetter; import org.dromara.dynamictp.core.support.BinderHelper; import org.dromara.dynamictp.core.support.task.wrapper.TaskWrappers; @@ -39,6 +40,7 @@ import java.util.Map; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.PriorityBlockingQueue; import static org.dromara.dynamictp.common.constant.DynamicTpConst.ALLOW_CORE_THREAD_TIMEOUT; import static org.dromara.dynamictp.common.constant.DynamicTpConst.AWAIT_TERMINATION_SECONDS; @@ -123,7 +125,9 @@ private Object[] buildConstructorArgs(Class clazz, DtpExecutorProps props) { BlockingQueue taskQueue; if (clazz.equals(EagerDtpExecutor.class)) { taskQueue = new TaskQueue(props.getQueueCapacity()); - } else { + } else if (clazz.equals(PriorityDtpExecutor.class)) { + taskQueue = new PriorityBlockingQueue<>(props.getQueueCapacity()); + }else { taskQueue = buildLbq(props.getQueueType(), props.getQueueCapacity(), props.isFair(), diff --git a/test/test-core/src/test/java/org/dromara/dynamictp/test/core/thread/PriorityDtpExecutorTest.java b/test/test-core/src/test/java/org/dromara/dynamictp/test/core/thread/PriorityDtpExecutorTest.java new file mode 100644 index 000000000..8041893f7 --- /dev/null +++ b/test/test-core/src/test/java/org/dromara/dynamictp/test/core/thread/PriorityDtpExecutorTest.java @@ -0,0 +1,64 @@ +package org.dromara.dynamictp.test.core.thread; + +import lombok.extern.slf4j.Slf4j; +import org.dromara.dynamictp.core.executor.priority.PriorityDtpExecutor; +import org.dromara.dynamictp.core.spring.EnableDynamicTp; +import org.dromara.dynamictp.core.spring.YamlPropertySourceFactory; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.PropertySource; +import org.springframework.test.context.junit.jupiter.SpringExtension; + +import javax.annotation.Resource; + +/** + * @author KamTo Hung + */ +@Slf4j +@EnableDynamicTp +@EnableAutoConfiguration +@ExtendWith(SpringExtension.class) +@SpringBootTest(classes = EagerDtpExecutorTest.class) +@PropertySource(value = "classpath:/dynamic-tp-demo.yml", factory = YamlPropertySourceFactory.class) +public class PriorityDtpExecutorTest { + + @Resource + private PriorityDtpExecutor priorityDtpExecutor; + + @Test + void execute() { + priorityDtpExecutor.execute(() -> { + log.info("test"); + }); + } + + @Test + void testExecute() { + } + + @Test + void submit() { + } + + @Test + void testSubmit() { + } + + @Test + void testSubmit1() { + } + + @Test + void testSubmit2() { + } + + @Test + void testSubmit3() { + } + + @Test + void testSubmit4() { + } +} diff --git a/test/test-core/src/test/resources/dynamic-tp-demo.yml b/test/test-core/src/test/resources/dynamic-tp-demo.yml index 91d5e3ec6..1ec499e5c 100644 --- a/test/test-core/src/test/resources/dynamic-tp-demo.yml +++ b/test/test-core/src/test/resources/dynamic-tp-demo.yml @@ -73,4 +73,19 @@ spring: preStartAllCoreThreads: false # 是否预热所有核心线程,默认false runTimeout: 200 # 任务执行超时阈值,目前只做告警用,单位(ms) queueTimeout: 100 # 任务在队列等待超时阈值,目前只做告警用,单位(ms) + taskWrapperNames: ["ttl"] # 任务包装器名称,集成TaskWrapper接口 + - threadPoolName: priorityDtpThreadPoolExecutor + executorType: priority # 线程池类型common、eager:适用于io密集型 + corePoolSize: 1 + maximumPoolSize: 5 + queueCapacity: 5000 + rejectedHandlerType: CallerRunsPolicy # 拒绝策略,查看RejectedTypeEnum枚举类 + keepAliveTime: 50 + allowCoreThreadTimeOut: false # 是否允许核心线程池超时 + threadNamePrefix: eagerDtp # 线程名前缀 + waitForTasksToCompleteOnShutdown: false # 参考spring线程池设计,优雅关闭线程池 + awaitTerminationSeconds: 5 # 单位(s) + preStartAllCoreThreads: false # 是否预热所有核心线程,默认false + runTimeout: 200 # 任务执行超时阈值,目前只做告警用,单位(ms) + queueTimeout: 100 # 任务在队列等待超时阈值,目前只做告警用,单位(ms) taskWrapperNames: ["ttl"] # 任务包装器名称,集成TaskWrapper接口 \ No newline at end of file From 86b519233971127833c023fe08ae94017fd99a3b Mon Sep 17 00:00:00 2001 From: kamtohung Date: Sat, 6 Jan 2024 12:01:39 +0800 Subject: [PATCH 06/12] feat(priority executor): fix priority compare error #283 --- .../executor/priority/PriorityCallable.java | 2 +- .../executor/priority/PriorityRunnable.java | 2 +- .../spring/DtpBeanDefinitionRegistrar.java | 20 +++++++- .../support/task/runnable/DtpRunnable.java | 3 +- .../core/thread/PriorityDtpExecutorTest.java | 48 ++++++++++++++++--- .../src/test/resources/dynamic-tp-demo.yml | 8 ++-- 6 files changed, 68 insertions(+), 15 deletions(-) diff --git a/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityCallable.java b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityCallable.java index 214c56fd2..0c7df6170 100644 --- a/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityCallable.java +++ b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityCallable.java @@ -27,7 +27,7 @@ public V call() throws Exception { @Override public int compareTo(Object o) { - return Integer.compare(((PriorityCallable) o).priority, this.priority); + return Integer.compare(this.priority, ((PriorityCallable) o).priority); } } diff --git a/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityRunnable.java b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityRunnable.java index 81033ef3c..1b8b302da 100644 --- a/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityRunnable.java +++ b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityRunnable.java @@ -22,7 +22,7 @@ public void run() { @Override public int compareTo(Object o) { - return Integer.compare(((PriorityRunnable) o).priority, this.priority); + return Integer.compare(this.priority, ((PriorityRunnable) o).priority); } public static PriorityRunnable of(Runnable runnable, int priority) { diff --git a/core/src/main/java/org/dromara/dynamictp/core/spring/DtpBeanDefinitionRegistrar.java b/core/src/main/java/org/dromara/dynamictp/core/spring/DtpBeanDefinitionRegistrar.java index c70c4eb98..5e03550ea 100644 --- a/core/src/main/java/org/dromara/dynamictp/core/spring/DtpBeanDefinitionRegistrar.java +++ b/core/src/main/java/org/dromara/dynamictp/core/spring/DtpBeanDefinitionRegistrar.java @@ -29,8 +29,10 @@ import org.dromara.dynamictp.core.executor.eager.EagerDtpExecutor; import org.dromara.dynamictp.core.executor.eager.TaskQueue; import org.dromara.dynamictp.core.executor.priority.PriorityDtpExecutor; +import org.dromara.dynamictp.core.executor.priority.PriorityRunnable; import org.dromara.dynamictp.core.reject.RejectHandlerGetter; import org.dromara.dynamictp.core.support.BinderHelper; +import org.dromara.dynamictp.core.support.task.runnable.DtpRunnable; import org.dromara.dynamictp.core.support.task.wrapper.TaskWrappers; import org.springframework.beans.factory.support.BeanDefinitionRegistry; import org.springframework.context.EnvironmentAware; @@ -38,6 +40,7 @@ import org.springframework.core.env.Environment; import org.springframework.core.type.AnnotationMetadata; +import java.util.Comparator; import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.PriorityBlockingQueue; @@ -126,7 +129,7 @@ private Object[] buildConstructorArgs(Class clazz, DtpExecutorProps props) { if (clazz.equals(EagerDtpExecutor.class)) { taskQueue = new TaskQueue(props.getQueueCapacity()); } else if (clazz.equals(PriorityDtpExecutor.class)) { - taskQueue = new PriorityBlockingQueue<>(props.getQueueCapacity()); + taskQueue = new PriorityBlockingQueue<>(props.getQueueCapacity(), getRunnableComparator()); }else { taskQueue = buildLbq(props.getQueueType(), props.getQueueCapacity(), @@ -144,4 +147,19 @@ private Object[] buildConstructorArgs(Class clazz, DtpExecutorProps props) { RejectHandlerGetter.buildRejectedHandler(props.getRejectedHandlerType()) }; } + + private Comparator getRunnableComparator() { + return (o1, o2) -> { + if (!(o1 instanceof DtpRunnable) || !(o2 instanceof DtpRunnable)) { + return 0; + } + Runnable po1 = ((DtpRunnable) o1).getRunnable(); + Runnable po2 = ((DtpRunnable) o2).getRunnable(); + if (!(po1 instanceof PriorityRunnable) || !(po2 instanceof PriorityRunnable)) { + return 0; + } + return ((PriorityRunnable) po1).compareTo(po2); + }; + } + } diff --git a/core/src/main/java/org/dromara/dynamictp/core/support/task/runnable/DtpRunnable.java b/core/src/main/java/org/dromara/dynamictp/core/support/task/runnable/DtpRunnable.java index 98ad2a578..3c82c9b76 100644 --- a/core/src/main/java/org/dromara/dynamictp/core/support/task/runnable/DtpRunnable.java +++ b/core/src/main/java/org/dromara/dynamictp/core/support/task/runnable/DtpRunnable.java @@ -28,14 +28,13 @@ * @author yanhom * @since 1.0.4 */ +@Getter public class DtpRunnable implements Runnable { private final Runnable runnable; - @Getter private final String taskName; - @Getter private final String traceId; public DtpRunnable(Runnable runnable, String taskName) { diff --git a/test/test-core/src/test/java/org/dromara/dynamictp/test/core/thread/PriorityDtpExecutorTest.java b/test/test-core/src/test/java/org/dromara/dynamictp/test/core/thread/PriorityDtpExecutorTest.java index 8041893f7..25ac036cc 100644 --- a/test/test-core/src/test/java/org/dromara/dynamictp/test/core/thread/PriorityDtpExecutorTest.java +++ b/test/test-core/src/test/java/org/dromara/dynamictp/test/core/thread/PriorityDtpExecutorTest.java @@ -12,6 +12,8 @@ import org.springframework.test.context.junit.jupiter.SpringExtension; import javax.annotation.Resource; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; /** * @author KamTo Hung @@ -20,7 +22,7 @@ @EnableDynamicTp @EnableAutoConfiguration @ExtendWith(SpringExtension.class) -@SpringBootTest(classes = EagerDtpExecutorTest.class) +@SpringBootTest(classes = PriorityDtpExecutorTest.class) @PropertySource(value = "classpath:/dynamic-tp-demo.yml", factory = YamlPropertySourceFactory.class) public class PriorityDtpExecutorTest { @@ -28,14 +30,48 @@ public class PriorityDtpExecutorTest { private PriorityDtpExecutor priorityDtpExecutor; @Test - void execute() { - priorityDtpExecutor.execute(() -> { - log.info("test"); - }); + void execute() throws InterruptedException { + int count = 5; + CountDownLatch countDownLatch = new CountDownLatch(count); + for (int i = count; i > 0; i--) { + priorityDtpExecutor.execute(new TestPriorityRunnable(i, countDownLatch)); + } + countDownLatch.await(); } @Test - void testExecute() { + void priorityExecute() throws InterruptedException { + int count = 5; + CountDownLatch countDownLatch = new CountDownLatch(count); + for (int i = count; i > 0; i--) { + priorityDtpExecutor.execute(new TestPriorityRunnable(i, countDownLatch), i); + } + countDownLatch.await(); + } + + private static class TestPriorityRunnable implements Runnable { + + private final int number; + + private final CountDownLatch countDownLatch; + + public TestPriorityRunnable(int number, CountDownLatch countDownLatch) { + this.number = number; + this.countDownLatch = countDownLatch; + } + + @Override + public void run() { + try { + log.info("work-{} triggered successfully", number); + TimeUnit.MILLISECONDS.sleep(100); + log.info("work-{} completed successfully", number); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } finally { + countDownLatch.countDown(); + } + } } @Test diff --git a/test/test-core/src/test/resources/dynamic-tp-demo.yml b/test/test-core/src/test/resources/dynamic-tp-demo.yml index 1ec499e5c..cae5577b3 100644 --- a/test/test-core/src/test/resources/dynamic-tp-demo.yml +++ b/test/test-core/src/test/resources/dynamic-tp-demo.yml @@ -77,7 +77,7 @@ spring: - threadPoolName: priorityDtpThreadPoolExecutor executorType: priority # 线程池类型common、eager:适用于io密集型 corePoolSize: 1 - maximumPoolSize: 5 + maximumPoolSize: 1 queueCapacity: 5000 rejectedHandlerType: CallerRunsPolicy # 拒绝策略,查看RejectedTypeEnum枚举类 keepAliveTime: 50 @@ -86,6 +86,6 @@ spring: waitForTasksToCompleteOnShutdown: false # 参考spring线程池设计,优雅关闭线程池 awaitTerminationSeconds: 5 # 单位(s) preStartAllCoreThreads: false # 是否预热所有核心线程,默认false - runTimeout: 200 # 任务执行超时阈值,目前只做告警用,单位(ms) - queueTimeout: 100 # 任务在队列等待超时阈值,目前只做告警用,单位(ms) - taskWrapperNames: ["ttl"] # 任务包装器名称,集成TaskWrapper接口 \ No newline at end of file + runTimeout: 10000 # 任务执行超时阈值,目前只做告警用,单位(ms) + queueTimeout: 10000 # 任务在队列等待超时阈值,目前只做告警用,单位(ms) +# taskWrapperNames: ["ttl"] # 任务包装器名称,集成TaskWrapper接口 \ No newline at end of file From d3bbe571e7b00d1af8e7ce88bcb1e6d11c83b901 Mon Sep 17 00:00:00 2001 From: kamtohung Date: Sat, 6 Jan 2024 15:37:33 +0800 Subject: [PATCH 07/12] feat(priority executor): fix priority compare error #283 --- .../priority/PriorityDtpExecutor.java | 11 +--- .../spring/DtpBeanDefinitionRegistrar.java | 9 ++- .../core/thread/PriorityDtpExecutorTest.java | 61 +++++++++++-------- 3 files changed, 46 insertions(+), 35 deletions(-) diff --git a/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityDtpExecutor.java b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityDtpExecutor.java index 239ff8ea4..13773640e 100644 --- a/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityDtpExecutor.java +++ b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityDtpExecutor.java @@ -107,11 +107,6 @@ protected RunnableFuture newTaskFor(Callable callable) { return new PriorityFutureTask<>(callable); } - @Override - public void execute(Runnable command) { - this.execute(command, DEFAULT_PRIORITY); - } - public void execute(Runnable command, int priority) { super.execute(PriorityRunnable.of(command, priority)); } @@ -122,7 +117,7 @@ public Future submit(Runnable task) { } public Future submit(Runnable task, int priority) { - return this.submit(PriorityRunnable.of(task, priority)); + return super.submit(PriorityRunnable.of(task, priority)); } @Override @@ -131,7 +126,7 @@ public Future submit(Runnable task, T result) { } public Future submit(Runnable task, T result, int priority) { - return this.submit(PriorityRunnable.of(task, priority), result); + return super.submit(PriorityRunnable.of(task, priority), result); } @@ -141,7 +136,7 @@ public Future submit(Callable task) { } public Future submit(Callable task, int priority) { - return this.submit(PriorityCallable.of(task, priority)); + return super.submit(PriorityCallable.of(task, priority)); } } diff --git a/core/src/main/java/org/dromara/dynamictp/core/spring/DtpBeanDefinitionRegistrar.java b/core/src/main/java/org/dromara/dynamictp/core/spring/DtpBeanDefinitionRegistrar.java index 5e03550ea..fed945fdf 100644 --- a/core/src/main/java/org/dromara/dynamictp/core/spring/DtpBeanDefinitionRegistrar.java +++ b/core/src/main/java/org/dromara/dynamictp/core/spring/DtpBeanDefinitionRegistrar.java @@ -29,6 +29,7 @@ import org.dromara.dynamictp.core.executor.eager.EagerDtpExecutor; import org.dromara.dynamictp.core.executor.eager.TaskQueue; import org.dromara.dynamictp.core.executor.priority.PriorityDtpExecutor; +import org.dromara.dynamictp.core.executor.priority.PriorityFutureTask; import org.dromara.dynamictp.core.executor.priority.PriorityRunnable; import org.dromara.dynamictp.core.reject.RejectHandlerGetter; import org.dromara.dynamictp.core.support.BinderHelper; @@ -155,10 +156,12 @@ private Comparator getRunnableComparator() { } Runnable po1 = ((DtpRunnable) o1).getRunnable(); Runnable po2 = ((DtpRunnable) o2).getRunnable(); - if (!(po1 instanceof PriorityRunnable) || !(po2 instanceof PriorityRunnable)) { - return 0; + if (po1 instanceof PriorityRunnable && po2 instanceof PriorityRunnable) { + return ((PriorityRunnable) po1).compareTo(po2); + } else if (po1 instanceof PriorityFutureTask && po2 instanceof PriorityFutureTask) { + return ((PriorityFutureTask) po1).compareTo(po2); } - return ((PriorityRunnable) po1).compareTo(po2); + return 0; }; } diff --git a/test/test-core/src/test/java/org/dromara/dynamictp/test/core/thread/PriorityDtpExecutorTest.java b/test/test-core/src/test/java/org/dromara/dynamictp/test/core/thread/PriorityDtpExecutorTest.java index 25ac036cc..0f57e8b91 100644 --- a/test/test-core/src/test/java/org/dromara/dynamictp/test/core/thread/PriorityDtpExecutorTest.java +++ b/test/test-core/src/test/java/org/dromara/dynamictp/test/core/thread/PriorityDtpExecutorTest.java @@ -49,6 +49,42 @@ void priorityExecute() throws InterruptedException { countDownLatch.await(); } + @Test + void submit() throws InterruptedException { + int count = 5; + CountDownLatch countDownLatch = new CountDownLatch(count); + for (int i = count; i > 0; i--) { + priorityDtpExecutor.submit(new TestPriorityRunnable(i, countDownLatch)); + } + countDownLatch.await(); + } + + @Test + void prioritySubmit() throws InterruptedException { + int count = 5; + CountDownLatch countDownLatch = new CountDownLatch(count); + for (int i = count; i > 0; i--) { + priorityDtpExecutor.submit(new TestPriorityRunnable(i, countDownLatch), i); + } + countDownLatch.await(); + } + + @Test + void prioritySubmit1() { + } + + @Test + void prioritySubmit2() { + } + + @Test + void prioritySubmit3() { + } + + @Test + void prioritySubmit4() { + } + private static class TestPriorityRunnable implements Runnable { private final int number; @@ -64,7 +100,7 @@ public TestPriorityRunnable(int number, CountDownLatch countDownLatch) { public void run() { try { log.info("work-{} triggered successfully", number); - TimeUnit.MILLISECONDS.sleep(100); + TimeUnit.MILLISECONDS.sleep(10); log.info("work-{} completed successfully", number); } catch (InterruptedException e) { throw new RuntimeException(e); @@ -74,27 +110,4 @@ public void run() { } } - @Test - void submit() { - } - - @Test - void testSubmit() { - } - - @Test - void testSubmit1() { - } - - @Test - void testSubmit2() { - } - - @Test - void testSubmit3() { - } - - @Test - void testSubmit4() { - } } From 36dd3325f8e12fbe1bb953576738db7d95df1a6a Mon Sep 17 00:00:00 2001 From: kamtohung Date: Sat, 6 Jan 2024 16:23:57 +0800 Subject: [PATCH 08/12] feat(priority executor): fix priority compare error #283 --- .../executor/priority/PriorityCallable.java | 6 ++ .../executor/priority/PriorityFutureTask.java | 5 ++ .../executor/priority/PriorityRunnable.java | 6 ++ .../core/thread/PriorityDtpExecutorTest.java | 85 ++++++++++++++++++- 4 files changed, 98 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityCallable.java b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityCallable.java index 0c7df6170..4cfccee23 100644 --- a/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityCallable.java +++ b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityCallable.java @@ -1,5 +1,7 @@ package org.dromara.dynamictp.core.executor.priority; +import lombok.Getter; + import java.util.concurrent.Callable; /** @@ -9,6 +11,7 @@ public class PriorityCallable implements Comparable, Callable { private final Callable callable; + @Getter private final int priority; private PriorityCallable(Callable callable, int priority) { @@ -27,6 +30,9 @@ public V call() throws Exception { @Override public int compareTo(Object o) { + if (o instanceof PriorityRunnable) { + return Integer.compare(this.priority, ((PriorityRunnable) o).getPriority()); + } return Integer.compare(this.priority, ((PriorityCallable) o).priority); } diff --git a/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityFutureTask.java b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityFutureTask.java index 6a7752957..047129727 100644 --- a/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityFutureTask.java +++ b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityFutureTask.java @@ -1,6 +1,8 @@ package org.dromara.dynamictp.core.executor.priority; +import lombok.var; + import java.util.concurrent.Callable; import java.util.concurrent.FutureTask; @@ -26,6 +28,9 @@ public PriorityFutureTask(Callable callable) { @Override public int compareTo(Object o) { + if (this.obj == null || o == null) { + return 0; + } return this.obj.compareTo(((PriorityFutureTask)o).obj); } diff --git a/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityRunnable.java b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityRunnable.java index 1b8b302da..303d78d17 100644 --- a/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityRunnable.java +++ b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityRunnable.java @@ -1,6 +1,8 @@ package org.dromara.dynamictp.core.executor.priority; +import lombok.Getter; + /** * @author KamTo Hung */ @@ -8,6 +10,7 @@ public class PriorityRunnable implements Comparable, Runnable { private final Runnable runnable; + @Getter private final int priority; private PriorityRunnable(Runnable runnable, int priority) { @@ -22,6 +25,9 @@ public void run() { @Override public int compareTo(Object o) { + if (o instanceof PriorityCallable) { + return Integer.compare(this.priority, ((PriorityCallable) o).getPriority()); + } return Integer.compare(this.priority, ((PriorityRunnable) o).priority); } diff --git a/test/test-core/src/test/java/org/dromara/dynamictp/test/core/thread/PriorityDtpExecutorTest.java b/test/test-core/src/test/java/org/dromara/dynamictp/test/core/thread/PriorityDtpExecutorTest.java index 0f57e8b91..944581802 100644 --- a/test/test-core/src/test/java/org/dromara/dynamictp/test/core/thread/PriorityDtpExecutorTest.java +++ b/test/test-core/src/test/java/org/dromara/dynamictp/test/core/thread/PriorityDtpExecutorTest.java @@ -12,7 +12,12 @@ import org.springframework.test.context.junit.jupiter.SpringExtension; import javax.annotation.Resource; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; /** @@ -70,19 +75,65 @@ void prioritySubmit() throws InterruptedException { } @Test - void prioritySubmit1() { + void submitWithResult() throws InterruptedException, ExecutionException { + int count = 5; + CountDownLatch countDownLatch = new CountDownLatch(count); + List> list = new ArrayList<>(); + for (int i = count; i > 0; i--) { + String name = "test-" + i; + Future result = priorityDtpExecutor.submit(new TestPriorityRunnable(i, countDownLatch), name); + list.add(result); + } + countDownLatch.await(); + for (Future future : list) { + log.info("result: {}", future.get()); + } } @Test - void prioritySubmit2() { + void prioritySubmitWithResult() throws InterruptedException, ExecutionException { + int count = 5; + CountDownLatch countDownLatch = new CountDownLatch(count); + List> list = new ArrayList<>(); + for (int i = count; i > 0; i--) { + String name = "test-" + i; + Future result = priorityDtpExecutor.submit(new TestPriorityRunnable(i, countDownLatch), name, i); + list.add(result); + } + countDownLatch.await(); + for (Future future : list) { + log.info("result: {}", future.get()); + } } @Test - void prioritySubmit3() { + void prioritySubmit3() throws InterruptedException, ExecutionException { + int count = 5; + CountDownLatch countDownLatch = new CountDownLatch(count); + List> list = new ArrayList<>(); + for (int i = count; i > 0; i--) { + Future result = priorityDtpExecutor.submit(new TestPriorityCallable(i, countDownLatch)); + list.add(result); + } + countDownLatch.await(); + for (Future future : list) { + log.info("result: {}", future.get()); + } } @Test - void prioritySubmit4() { + void prioritySubmit4() throws InterruptedException, ExecutionException { + int count = 5; + CountDownLatch countDownLatch = new CountDownLatch(count); + List> list = new ArrayList<>(); + for (int i = count; i > 0; i--) { + Future result = priorityDtpExecutor.submit(new TestPriorityCallable(i, countDownLatch), i); + list.add(result); + } + countDownLatch.await(); + for (Future future : list) { + log.info("result: {}", future.get()); + } } private static class TestPriorityRunnable implements Runnable { @@ -110,4 +161,30 @@ public void run() { } } + private static class TestPriorityCallable implements Callable { + + private final int number; + + private final CountDownLatch countDownLatch; + + private TestPriorityCallable(int number, CountDownLatch countDownLatch) { + this.number = number; + this.countDownLatch = countDownLatch; + } + + @Override + public String call() { + try { + log.info("work-{} triggered successfully", number); + TimeUnit.MILLISECONDS.sleep(10); + return "work-" + number; + } catch (InterruptedException e) { + throw new RuntimeException(e); + } finally { + log.info("work-{} completed successfully", number); + countDownLatch.countDown(); + } + } + } + } From e9af9c6c47f63e91829877981b42f232715b1215 Mon Sep 17 00:00:00 2001 From: kamtohung Date: Sat, 6 Jan 2024 16:48:44 +0800 Subject: [PATCH 09/12] feat(priority executor): fix priority compare error #283 --- .../core/executor/priority/Priority.java | 10 ++++++++++ .../core/executor/priority/PriorityCallable.java | 10 +--------- .../executor/priority/PriorityFutureTask.java | 15 ++++++++------- .../core/executor/priority/PriorityRunnable.java | 10 +--------- .../core/spring/DtpBeanDefinitionRegistrar.java | 8 ++++---- .../test/core/thread/OrderedDtpExecutorTest.java | 1 - .../test/core/thread/PriorityDtpExecutorTest.java | 4 ++-- .../src/test/resources/dynamic-tp-demo.yml | 2 +- .../src/test/resources/junit-platform.properties | 10 ---------- 9 files changed, 27 insertions(+), 43 deletions(-) create mode 100644 core/src/main/java/org/dromara/dynamictp/core/executor/priority/Priority.java diff --git a/core/src/main/java/org/dromara/dynamictp/core/executor/priority/Priority.java b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/Priority.java new file mode 100644 index 000000000..0bf689a57 --- /dev/null +++ b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/Priority.java @@ -0,0 +1,10 @@ +package org.dromara.dynamictp.core.executor.priority; + +/** + * @author KamTo Hung + */ +public interface Priority { + + int getPriority(); + +} diff --git a/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityCallable.java b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityCallable.java index 4cfccee23..bd9fff6c1 100644 --- a/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityCallable.java +++ b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityCallable.java @@ -7,7 +7,7 @@ /** * @author KamTo Hung */ -public class PriorityCallable implements Comparable, Callable { +public class PriorityCallable implements Priority, Callable { private final Callable callable; @@ -28,12 +28,4 @@ public V call() throws Exception { return callable.call(); } - @Override - public int compareTo(Object o) { - if (o instanceof PriorityRunnable) { - return Integer.compare(this.priority, ((PriorityRunnable) o).getPriority()); - } - return Integer.compare(this.priority, ((PriorityCallable) o).priority); - } - } diff --git a/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityFutureTask.java b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityFutureTask.java index 047129727..61dacb23a 100644 --- a/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityFutureTask.java +++ b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityFutureTask.java @@ -9,29 +9,30 @@ /** * @author KamTo Hung */ -public class PriorityFutureTask extends FutureTask implements Comparable { +public class PriorityFutureTask extends FutureTask implements Priority { /** * The runnable. */ - private final Comparable obj; + private final Priority obj; + + private final int priority; public PriorityFutureTask(Runnable runnable, V result) { super(runnable, result); this.obj = (PriorityRunnable)runnable; + this.priority = this.obj.getPriority(); } public PriorityFutureTask(Callable callable) { super(callable); this.obj = (PriorityCallable)callable; + this.priority = this.obj.getPriority(); } @Override - public int compareTo(Object o) { - if (this.obj == null || o == null) { - return 0; - } - return this.obj.compareTo(((PriorityFutureTask)o).obj); + public int getPriority() { + return this.priority; } } diff --git a/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityRunnable.java b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityRunnable.java index 303d78d17..3e26a8c9b 100644 --- a/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityRunnable.java +++ b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityRunnable.java @@ -6,7 +6,7 @@ /** * @author KamTo Hung */ -public class PriorityRunnable implements Comparable, Runnable { +public class PriorityRunnable implements Priority, Runnable { private final Runnable runnable; @@ -23,14 +23,6 @@ public void run() { this.runnable.run(); } - @Override - public int compareTo(Object o) { - if (o instanceof PriorityCallable) { - return Integer.compare(this.priority, ((PriorityCallable) o).getPriority()); - } - return Integer.compare(this.priority, ((PriorityRunnable) o).priority); - } - public static PriorityRunnable of(Runnable runnable, int priority) { return new PriorityRunnable(runnable, priority); } diff --git a/core/src/main/java/org/dromara/dynamictp/core/spring/DtpBeanDefinitionRegistrar.java b/core/src/main/java/org/dromara/dynamictp/core/spring/DtpBeanDefinitionRegistrar.java index fed945fdf..052f4c07d 100644 --- a/core/src/main/java/org/dromara/dynamictp/core/spring/DtpBeanDefinitionRegistrar.java +++ b/core/src/main/java/org/dromara/dynamictp/core/spring/DtpBeanDefinitionRegistrar.java @@ -28,6 +28,7 @@ import org.dromara.dynamictp.core.executor.NamedThreadFactory; import org.dromara.dynamictp.core.executor.eager.EagerDtpExecutor; import org.dromara.dynamictp.core.executor.eager.TaskQueue; +import org.dromara.dynamictp.core.executor.priority.Priority; import org.dromara.dynamictp.core.executor.priority.PriorityDtpExecutor; import org.dromara.dynamictp.core.executor.priority.PriorityFutureTask; import org.dromara.dynamictp.core.executor.priority.PriorityRunnable; @@ -156,10 +157,9 @@ private Comparator getRunnableComparator() { } Runnable po1 = ((DtpRunnable) o1).getRunnable(); Runnable po2 = ((DtpRunnable) o2).getRunnable(); - if (po1 instanceof PriorityRunnable && po2 instanceof PriorityRunnable) { - return ((PriorityRunnable) po1).compareTo(po2); - } else if (po1 instanceof PriorityFutureTask && po2 instanceof PriorityFutureTask) { - return ((PriorityFutureTask) po1).compareTo(po2); + // TODO 如果使用了别的Runnable包装器会走不到这里 + if (po1 instanceof Priority && po2 instanceof Priority) { + return Integer.compare(((Priority) po1).getPriority(), ((Priority) po2).getPriority()); } return 0; }; diff --git a/test/test-core/src/test/java/org/dromara/dynamictp/test/core/thread/OrderedDtpExecutorTest.java b/test/test-core/src/test/java/org/dromara/dynamictp/test/core/thread/OrderedDtpExecutorTest.java index cfb662ea5..5443e3fb4 100644 --- a/test/test-core/src/test/java/org/dromara/dynamictp/test/core/thread/OrderedDtpExecutorTest.java +++ b/test/test-core/src/test/java/org/dromara/dynamictp/test/core/thread/OrderedDtpExecutorTest.java @@ -67,7 +67,6 @@ void orderedExecute() throws InterruptedException { MDC.put("traceId", String.valueOf(i)); orderedDtpExecutor.execute(new TestOrderedRunnable("TEST")); } -// new CountDownLatch(1).await(); } @Test diff --git a/test/test-core/src/test/java/org/dromara/dynamictp/test/core/thread/PriorityDtpExecutorTest.java b/test/test-core/src/test/java/org/dromara/dynamictp/test/core/thread/PriorityDtpExecutorTest.java index 944581802..0b62232d9 100644 --- a/test/test-core/src/test/java/org/dromara/dynamictp/test/core/thread/PriorityDtpExecutorTest.java +++ b/test/test-core/src/test/java/org/dromara/dynamictp/test/core/thread/PriorityDtpExecutorTest.java @@ -107,7 +107,7 @@ void prioritySubmitWithResult() throws InterruptedException, ExecutionException } @Test - void prioritySubmit3() throws InterruptedException, ExecutionException { + void submitCallable() throws InterruptedException, ExecutionException { int count = 5; CountDownLatch countDownLatch = new CountDownLatch(count); List> list = new ArrayList<>(); @@ -122,7 +122,7 @@ void prioritySubmit3() throws InterruptedException, ExecutionException { } @Test - void prioritySubmit4() throws InterruptedException, ExecutionException { + void prioritySubmitCallable() throws InterruptedException, ExecutionException { int count = 5; CountDownLatch countDownLatch = new CountDownLatch(count); List> list = new ArrayList<>(); diff --git a/test/test-core/src/test/resources/dynamic-tp-demo.yml b/test/test-core/src/test/resources/dynamic-tp-demo.yml index cae5577b3..29cd15b36 100644 --- a/test/test-core/src/test/resources/dynamic-tp-demo.yml +++ b/test/test-core/src/test/resources/dynamic-tp-demo.yml @@ -88,4 +88,4 @@ spring: preStartAllCoreThreads: false # 是否预热所有核心线程,默认false runTimeout: 10000 # 任务执行超时阈值,目前只做告警用,单位(ms) queueTimeout: 10000 # 任务在队列等待超时阈值,目前只做告警用,单位(ms) -# taskWrapperNames: ["ttl"] # 任务包装器名称,集成TaskWrapper接口 \ No newline at end of file + taskWrapperNames: ["ttl"] # 任务包装器名称,集成TaskWrapper接口 \ No newline at end of file diff --git a/test/test-core/src/test/resources/junit-platform.properties b/test/test-core/src/test/resources/junit-platform.properties index a6d5ae56c..11fdcdb39 100644 --- a/test/test-core/src/test/resources/junit-platform.properties +++ b/test/test-core/src/test/resources/junit-platform.properties @@ -1,15 +1,5 @@ -# 并行开关true/false junit.jupiter.execution.parallel.enabled=true -# 方法级多线程开关 same_thread/concurrent junit.jupiter.execution.parallel.mode.default = concurrent -# 类级多线程开关 same_thread/concurrent junit.jupiter.execution.parallel.mode.classes.default = concurrent - -# 并发策略有以下三种可选: -# fixed:固定线程数,此时还要通过junit.jupiter.execution.parallel.config.fixed.parallelism指定线程数 -# dynamic:表示根据处理器和核数计算线程数 -# custom:自定义并发策略,通过这个配置来指定:junit.jupiter.execution.parallel.config.custom.class junit.jupiter.execution.parallel.config.strategy = fixed - -# 并发线程数,该配置项只有当并发策略为fixed的时候才有用 junit.jupiter.execution.parallel.config.fixed.parallelism = 5 From a51ae40c1e4870ae99214ae680c6a349ef7b518c Mon Sep 17 00:00:00 2001 From: kamtohung Date: Sun, 7 Jan 2024 12:33:25 +0800 Subject: [PATCH 10/12] feat(priority executor): add origin command in dtpRunnable #283 --- .../dromara/dynamictp/core/aware/TaskEnhanceAware.java | 10 +++++----- .../core/spring/DtpBeanDefinitionRegistrar.java | 5 ++--- .../core/support/task/runnable/DtpRunnable.java | 5 ++++- 3 files changed, 11 insertions(+), 9 deletions(-) diff --git a/core/src/main/java/org/dromara/dynamictp/core/aware/TaskEnhanceAware.java b/core/src/main/java/org/dromara/dynamictp/core/aware/TaskEnhanceAware.java index 15f2466ee..fadf9508c 100644 --- a/core/src/main/java/org/dromara/dynamictp/core/aware/TaskEnhanceAware.java +++ b/core/src/main/java/org/dromara/dynamictp/core/aware/TaskEnhanceAware.java @@ -35,19 +35,19 @@ public interface TaskEnhanceAware extends DtpAware { /** * Enhance task * - * @param command command + * @param command command * @param taskWrappers task wrappers * @return enhanced task */ default Runnable getEnhancedTask(Runnable command, List taskWrappers) { + Runnable wrapRunnable = command; if (CollectionUtils.isNotEmpty(taskWrappers)) { for (TaskWrapper t : taskWrappers) { - command = t.wrap(command); + wrapRunnable = t.wrap(wrapRunnable); } } - String taskName = (command instanceof NamedRunnable) ? ((NamedRunnable) command).getName() : null; - command = new DtpRunnable(command, taskName); - return command; + String taskName = (wrapRunnable instanceof NamedRunnable) ? ((NamedRunnable) wrapRunnable).getName() : null; + return new DtpRunnable(command, wrapRunnable, taskName); } /** diff --git a/core/src/main/java/org/dromara/dynamictp/core/spring/DtpBeanDefinitionRegistrar.java b/core/src/main/java/org/dromara/dynamictp/core/spring/DtpBeanDefinitionRegistrar.java index 052f4c07d..1dd6fb315 100644 --- a/core/src/main/java/org/dromara/dynamictp/core/spring/DtpBeanDefinitionRegistrar.java +++ b/core/src/main/java/org/dromara/dynamictp/core/spring/DtpBeanDefinitionRegistrar.java @@ -155,9 +155,8 @@ private Comparator getRunnableComparator() { if (!(o1 instanceof DtpRunnable) || !(o2 instanceof DtpRunnable)) { return 0; } - Runnable po1 = ((DtpRunnable) o1).getRunnable(); - Runnable po2 = ((DtpRunnable) o2).getRunnable(); - // TODO 如果使用了别的Runnable包装器会走不到这里 + Runnable po1 = ((DtpRunnable) o1).getOriginRunnable(); + Runnable po2 = ((DtpRunnable) o2).getOriginRunnable(); if (po1 instanceof Priority && po2 instanceof Priority) { return Integer.compare(((Priority) po1).getPriority(), ((Priority) po2).getPriority()); } diff --git a/core/src/main/java/org/dromara/dynamictp/core/support/task/runnable/DtpRunnable.java b/core/src/main/java/org/dromara/dynamictp/core/support/task/runnable/DtpRunnable.java index 3c82c9b76..796bf0216 100644 --- a/core/src/main/java/org/dromara/dynamictp/core/support/task/runnable/DtpRunnable.java +++ b/core/src/main/java/org/dromara/dynamictp/core/support/task/runnable/DtpRunnable.java @@ -31,13 +31,16 @@ @Getter public class DtpRunnable implements Runnable { + private final Runnable originRunnable; + private final Runnable runnable; private final String taskName; private final String traceId; - public DtpRunnable(Runnable runnable, String taskName) { + public DtpRunnable(Runnable originRunnable, Runnable runnable, String taskName) { + this.originRunnable = originRunnable; this.runnable = runnable; this.taskName = taskName; this.traceId = MDC.get(TRACE_ID); From 76034e9a2e1c479b340460b0a436a80c2a4ae420 Mon Sep 17 00:00:00 2001 From: kamtohung Date: Sun, 7 Jan 2024 20:57:53 +0800 Subject: [PATCH 11/12] feat(priority executor): add license header #283 --- .../core/executor/priority/Priority.java | 16 ++++++++++++++ .../executor/priority/PriorityCallable.java | 16 ++++++++++++++ .../priority/PriorityDtpExecutor.java | 18 ++++++++++++++- .../executor/priority/PriorityFutureTask.java | 22 +++++++++++++++---- .../executor/priority/PriorityRunnable.java | 16 ++++++++++++++ .../spring/DtpBeanDefinitionRegistrar.java | 10 ++++----- 6 files changed, 87 insertions(+), 11 deletions(-) diff --git a/core/src/main/java/org/dromara/dynamictp/core/executor/priority/Priority.java b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/Priority.java index 0bf689a57..abcc2058b 100644 --- a/core/src/main/java/org/dromara/dynamictp/core/executor/priority/Priority.java +++ b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/Priority.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.dromara.dynamictp.core.executor.priority; /** diff --git a/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityCallable.java b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityCallable.java index bd9fff6c1..18701f3d1 100644 --- a/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityCallable.java +++ b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityCallable.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.dromara.dynamictp.core.executor.priority; import lombok.Getter; diff --git a/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityDtpExecutor.java b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityDtpExecutor.java index 13773640e..753b8a92e 100644 --- a/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityDtpExecutor.java +++ b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityDtpExecutor.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.dromara.dynamictp.core.executor.priority; import lombok.extern.slf4j.Slf4j; @@ -113,7 +129,7 @@ public void execute(Runnable command, int priority) { @Override public Future submit(Runnable task) { - return super.submit(PriorityRunnable. of(task, DEFAULT_PRIORITY)); + return super.submit(PriorityRunnable.of(task, DEFAULT_PRIORITY)); } public Future submit(Runnable task, int priority) { diff --git a/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityFutureTask.java b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityFutureTask.java index 61dacb23a..f4873cb8f 100644 --- a/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityFutureTask.java +++ b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityFutureTask.java @@ -1,8 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.dromara.dynamictp.core.executor.priority; -import lombok.var; - import java.util.concurrent.Callable; import java.util.concurrent.FutureTask; @@ -20,13 +34,13 @@ public class PriorityFutureTask extends FutureTask implements Priority { public PriorityFutureTask(Runnable runnable, V result) { super(runnable, result); - this.obj = (PriorityRunnable)runnable; + this.obj = (PriorityRunnable) runnable; this.priority = this.obj.getPriority(); } public PriorityFutureTask(Callable callable) { super(callable); - this.obj = (PriorityCallable)callable; + this.obj = (PriorityCallable) callable; this.priority = this.obj.getPriority(); } diff --git a/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityRunnable.java b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityRunnable.java index 3e26a8c9b..3c0efaaf9 100644 --- a/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityRunnable.java +++ b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityRunnable.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.dromara.dynamictp.core.executor.priority; diff --git a/core/src/main/java/org/dromara/dynamictp/core/spring/DtpBeanDefinitionRegistrar.java b/core/src/main/java/org/dromara/dynamictp/core/spring/DtpBeanDefinitionRegistrar.java index 1dd6fb315..88922e13b 100644 --- a/core/src/main/java/org/dromara/dynamictp/core/spring/DtpBeanDefinitionRegistrar.java +++ b/core/src/main/java/org/dromara/dynamictp/core/spring/DtpBeanDefinitionRegistrar.java @@ -30,8 +30,6 @@ import org.dromara.dynamictp.core.executor.eager.TaskQueue; import org.dromara.dynamictp.core.executor.priority.Priority; import org.dromara.dynamictp.core.executor.priority.PriorityDtpExecutor; -import org.dromara.dynamictp.core.executor.priority.PriorityFutureTask; -import org.dromara.dynamictp.core.executor.priority.PriorityRunnable; import org.dromara.dynamictp.core.reject.RejectHandlerGetter; import org.dromara.dynamictp.core.support.BinderHelper; import org.dromara.dynamictp.core.support.task.runnable.DtpRunnable; @@ -132,14 +130,14 @@ private Object[] buildConstructorArgs(Class clazz, DtpExecutorProps props) { taskQueue = new TaskQueue(props.getQueueCapacity()); } else if (clazz.equals(PriorityDtpExecutor.class)) { taskQueue = new PriorityBlockingQueue<>(props.getQueueCapacity(), getRunnableComparator()); - }else { + } else { taskQueue = buildLbq(props.getQueueType(), props.getQueueCapacity(), props.isFair(), props.getMaxFreeMemory()); } - return new Object[] { + return new Object[]{ props.getCorePoolSize(), props.getMaximumPoolSize(), props.getKeepAliveTime(), @@ -155,8 +153,8 @@ private Comparator getRunnableComparator() { if (!(o1 instanceof DtpRunnable) || !(o2 instanceof DtpRunnable)) { return 0; } - Runnable po1 = ((DtpRunnable) o1).getOriginRunnable(); - Runnable po2 = ((DtpRunnable) o2).getOriginRunnable(); + Runnable po1 = ((DtpRunnable) o1).getOriginRunnable(); + Runnable po2 = ((DtpRunnable) o2).getOriginRunnable(); if (po1 instanceof Priority && po2 instanceof Priority) { return Integer.compare(((Priority) po1).getPriority(), ((Priority) po2).getPriority()); } From 683b647fcc693acc4f168f4f36f3e3931b4a1286 Mon Sep 17 00:00:00 2001 From: kamtohung Date: Sun, 7 Jan 2024 21:35:49 +0800 Subject: [PATCH 12/12] feat(priority executor): add ThreadPoolBuilder #283 --- .../core/executor/priority/Priority.java | 2 + .../executor/priority/PriorityCallable.java | 2 + .../priority/PriorityDtpExecutor.java | 24 +++ .../executor/priority/PriorityFutureTask.java | 2 + .../executor/priority/PriorityRunnable.java | 2 + .../spring/DtpBeanDefinitionRegistrar.java | 19 +- .../core/support/ThreadPoolBuilder.java | 35 +++- .../thread/PriorityDtpExecutorStaticTest.java | 193 ++++++++++++++++++ 8 files changed, 260 insertions(+), 19 deletions(-) create mode 100644 test/test-core/src/test/java/org/dromara/dynamictp/test/core/thread/PriorityDtpExecutorStaticTest.java diff --git a/core/src/main/java/org/dromara/dynamictp/core/executor/priority/Priority.java b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/Priority.java index abcc2058b..594a7523a 100644 --- a/core/src/main/java/org/dromara/dynamictp/core/executor/priority/Priority.java +++ b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/Priority.java @@ -17,6 +17,8 @@ package org.dromara.dynamictp.core.executor.priority; /** + * Priority related + * * @author KamTo Hung */ public interface Priority { diff --git a/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityCallable.java b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityCallable.java index 18701f3d1..f8262ca91 100644 --- a/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityCallable.java +++ b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityCallable.java @@ -21,6 +21,8 @@ import java.util.concurrent.Callable; /** + * PriorityCallable related + * * @author KamTo Hung */ public class PriorityCallable implements Priority, Callable { diff --git a/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityDtpExecutor.java b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityDtpExecutor.java index 753b8a92e..5f9eb432f 100644 --- a/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityDtpExecutor.java +++ b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityDtpExecutor.java @@ -18,7 +18,9 @@ import lombok.extern.slf4j.Slf4j; import org.dromara.dynamictp.core.executor.DtpExecutor; +import org.dromara.dynamictp.core.support.task.runnable.DtpRunnable; +import java.util.Comparator; import java.util.concurrent.Callable; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -29,6 +31,8 @@ import java.util.concurrent.TimeUnit; /** + * PriorityDtpExecutor related, extending DtpExecutor, implements priority feature + * * @author KamTo Hung */ @Slf4j @@ -155,4 +159,24 @@ public Future submit(Callable task, int priority) { return super.submit(PriorityCallable.of(task, priority)); } + + /** + * Priority Comparator + * + * @return Comparator + */ + public static Comparator getRunnableComparator() { + return (o1, o2) -> { + if (!(o1 instanceof DtpRunnable) || !(o2 instanceof DtpRunnable)) { + return 0; + } + Runnable po1 = ((DtpRunnable) o1).getOriginRunnable(); + Runnable po2 = ((DtpRunnable) o2).getOriginRunnable(); + if (po1 instanceof Priority && po2 instanceof Priority) { + return Integer.compare(((Priority) po1).getPriority(), ((Priority) po2).getPriority()); + } + return 0; + }; + } + } diff --git a/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityFutureTask.java b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityFutureTask.java index f4873cb8f..d9dc03c6f 100644 --- a/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityFutureTask.java +++ b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityFutureTask.java @@ -21,6 +21,8 @@ import java.util.concurrent.FutureTask; /** + * PriorityFutureTask related + * * @author KamTo Hung */ public class PriorityFutureTask extends FutureTask implements Priority { diff --git a/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityRunnable.java b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityRunnable.java index 3c0efaaf9..d6eb62450 100644 --- a/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityRunnable.java +++ b/core/src/main/java/org/dromara/dynamictp/core/executor/priority/PriorityRunnable.java @@ -20,6 +20,8 @@ import lombok.Getter; /** + * PriorityRunnable related + * * @author KamTo Hung */ public class PriorityRunnable implements Priority, Runnable { diff --git a/core/src/main/java/org/dromara/dynamictp/core/spring/DtpBeanDefinitionRegistrar.java b/core/src/main/java/org/dromara/dynamictp/core/spring/DtpBeanDefinitionRegistrar.java index 88922e13b..a01caf296 100644 --- a/core/src/main/java/org/dromara/dynamictp/core/spring/DtpBeanDefinitionRegistrar.java +++ b/core/src/main/java/org/dromara/dynamictp/core/spring/DtpBeanDefinitionRegistrar.java @@ -28,11 +28,9 @@ import org.dromara.dynamictp.core.executor.NamedThreadFactory; import org.dromara.dynamictp.core.executor.eager.EagerDtpExecutor; import org.dromara.dynamictp.core.executor.eager.TaskQueue; -import org.dromara.dynamictp.core.executor.priority.Priority; import org.dromara.dynamictp.core.executor.priority.PriorityDtpExecutor; import org.dromara.dynamictp.core.reject.RejectHandlerGetter; import org.dromara.dynamictp.core.support.BinderHelper; -import org.dromara.dynamictp.core.support.task.runnable.DtpRunnable; import org.dromara.dynamictp.core.support.task.wrapper.TaskWrappers; import org.springframework.beans.factory.support.BeanDefinitionRegistry; import org.springframework.context.EnvironmentAware; @@ -40,7 +38,6 @@ import org.springframework.core.env.Environment; import org.springframework.core.type.AnnotationMetadata; -import java.util.Comparator; import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.PriorityBlockingQueue; @@ -129,7 +126,7 @@ private Object[] buildConstructorArgs(Class clazz, DtpExecutorProps props) { if (clazz.equals(EagerDtpExecutor.class)) { taskQueue = new TaskQueue(props.getQueueCapacity()); } else if (clazz.equals(PriorityDtpExecutor.class)) { - taskQueue = new PriorityBlockingQueue<>(props.getQueueCapacity(), getRunnableComparator()); + taskQueue = new PriorityBlockingQueue<>(props.getQueueCapacity(), PriorityDtpExecutor.getRunnableComparator()); } else { taskQueue = buildLbq(props.getQueueType(), props.getQueueCapacity(), @@ -148,18 +145,4 @@ private Object[] buildConstructorArgs(Class clazz, DtpExecutorProps props) { }; } - private Comparator getRunnableComparator() { - return (o1, o2) -> { - if (!(o1 instanceof DtpRunnable) || !(o2 instanceof DtpRunnable)) { - return 0; - } - Runnable po1 = ((DtpRunnable) o1).getOriginRunnable(); - Runnable po2 = ((DtpRunnable) o2).getOriginRunnable(); - if (po1 instanceof Priority && po2 instanceof Priority) { - return Integer.compare(((Priority) po1).getPriority(), ((Priority) po2).getPriority()); - } - return 0; - }; - } - } diff --git a/core/src/main/java/org/dromara/dynamictp/core/support/ThreadPoolBuilder.java b/core/src/main/java/org/dromara/dynamictp/core/support/ThreadPoolBuilder.java index c21d7e2f9..3b81eaf89 100644 --- a/core/src/main/java/org/dromara/dynamictp/core/support/ThreadPoolBuilder.java +++ b/core/src/main/java/org/dromara/dynamictp/core/support/ThreadPoolBuilder.java @@ -34,6 +34,7 @@ import org.dromara.dynamictp.core.executor.ScheduledDtpExecutor; import org.dromara.dynamictp.core.executor.eager.EagerDtpExecutor; import org.dromara.dynamictp.core.executor.eager.TaskQueue; +import org.dromara.dynamictp.core.executor.priority.PriorityDtpExecutor; import org.dromara.dynamictp.core.reject.RejectHandlerGetter; import org.dromara.dynamictp.core.support.task.wrapper.TaskWrapper; import org.springframework.util.Assert; @@ -42,6 +43,7 @@ import java.util.Objects; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; +import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -149,6 +151,12 @@ public class ThreadPoolBuilder { */ private boolean scheduled = false; + /** + * If priority thread pool. + * default false, true priority thread pool. + */ + private boolean priority = false; + /** * If pre start all core threads. */ @@ -194,7 +202,8 @@ public class ThreadPoolBuilder { */ private List platformIds = Lists.newArrayList(); - private ThreadPoolBuilder() { } + private ThreadPoolBuilder() { + } public static ThreadPoolBuilder newBuilder() { return new ThreadPoolBuilder(); @@ -340,6 +349,11 @@ public ThreadPoolBuilder scheduled(boolean scheduled) { return this; } + public ThreadPoolBuilder priority(boolean priority) { + this.priority = priority; + return this; + } + public ThreadPoolBuilder preStartAllCoreThreads(boolean preStartAllCoreThreads) { this.preStartAllCoreThreads = preStartAllCoreThreads; return this; @@ -459,6 +473,16 @@ public EagerDtpExecutor buildEager() { return (EagerDtpExecutor) buildDtpExecutor(this); } + /** + * Build priority thread pool executor. + * + * @return the newly created PriorityDtpExecutor instance + */ + public PriorityDtpExecutor buildPriority() { + priority = true; + return (PriorityDtpExecutor) buildDtpExecutor(this); + } + /** * Build thread pool executor and wrapper with ttl * @@ -531,6 +555,15 @@ private DtpExecutor createInternal(ThreadPoolBuilder builder) { builder.workQueue, builder.threadFactory, builder.rejectedExecutionHandler); + } else if (priority) { + dtpExecutor = new PriorityDtpExecutor( + builder.corePoolSize, + builder.maximumPoolSize, + builder.keepAliveTime, + builder.timeUnit, + new PriorityBlockingQueue<>(builder.queueCapacity, PriorityDtpExecutor.getRunnableComparator()), + builder.threadFactory, + builder.rejectedExecutionHandler); } else { dtpExecutor = new DtpExecutor( builder.corePoolSize, diff --git a/test/test-core/src/test/java/org/dromara/dynamictp/test/core/thread/PriorityDtpExecutorStaticTest.java b/test/test-core/src/test/java/org/dromara/dynamictp/test/core/thread/PriorityDtpExecutorStaticTest.java new file mode 100644 index 000000000..a37301a10 --- /dev/null +++ b/test/test-core/src/test/java/org/dromara/dynamictp/test/core/thread/PriorityDtpExecutorStaticTest.java @@ -0,0 +1,193 @@ +package org.dromara.dynamictp.test.core.thread; + +import lombok.extern.slf4j.Slf4j; +import org.dromara.dynamictp.core.executor.priority.PriorityDtpExecutor; +import org.dromara.dynamictp.core.support.ThreadPoolBuilder; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +/** + * @author KamTo Hung + */ +@Slf4j +public class PriorityDtpExecutorStaticTest { + + private static PriorityDtpExecutor priorityDtpExecutor; + + @BeforeAll + public static void setUp() { + priorityDtpExecutor = ThreadPoolBuilder.newBuilder() + .threadPoolName("dtpExecutor1") + .corePoolSize(1) + .maximumPoolSize(1000) + .keepAliveTime(15000) + .timeUnit(TimeUnit.MILLISECONDS) + .waitForTasksToCompleteOnShutdown(true) + .awaitTerminationSeconds(5) + .runTimeout(10000) + .queueTimeout(10000) + .buildPriority(); + } + + @Test + void execute() throws InterruptedException { + int count = 5; + CountDownLatch countDownLatch = new CountDownLatch(count); + for (int i = count; i > 0; i--) { + priorityDtpExecutor.execute(new TestPriorityRunnable(i, countDownLatch)); + } + countDownLatch.await(); + } + + @Test + void priorityExecute() throws InterruptedException { + int count = 5; + CountDownLatch countDownLatch = new CountDownLatch(count); + for (int i = count; i > 0; i--) { + priorityDtpExecutor.execute(new TestPriorityRunnable(i, countDownLatch), i); + } + countDownLatch.await(); + } + + @Test + void submit() throws InterruptedException { + int count = 5; + CountDownLatch countDownLatch = new CountDownLatch(count); + for (int i = count; i > 0; i--) { + priorityDtpExecutor.submit(new TestPriorityRunnable(i, countDownLatch)); + } + countDownLatch.await(); + } + + @Test + void prioritySubmit() throws InterruptedException { + int count = 5; + CountDownLatch countDownLatch = new CountDownLatch(count); + for (int i = count; i > 0; i--) { + priorityDtpExecutor.submit(new TestPriorityRunnable(i, countDownLatch), i); + } + countDownLatch.await(); + } + + @Test + void submitWithResult() throws InterruptedException, ExecutionException { + int count = 5; + CountDownLatch countDownLatch = new CountDownLatch(count); + List> list = new ArrayList<>(); + for (int i = count; i > 0; i--) { + String name = "test-" + i; + Future result = priorityDtpExecutor.submit(new TestPriorityRunnable(i, countDownLatch), name); + list.add(result); + } + countDownLatch.await(); + for (Future future : list) { + log.info("result: {}", future.get()); + } + } + + @Test + void prioritySubmitWithResult() throws InterruptedException, ExecutionException { + int count = 5; + CountDownLatch countDownLatch = new CountDownLatch(count); + List> list = new ArrayList<>(); + for (int i = count; i > 0; i--) { + String name = "test-" + i; + Future result = priorityDtpExecutor.submit(new TestPriorityRunnable(i, countDownLatch), name, i); + list.add(result); + } + countDownLatch.await(); + for (Future future : list) { + log.info("result: {}", future.get()); + } + } + + @Test + void submitCallable() throws InterruptedException, ExecutionException { + int count = 5; + CountDownLatch countDownLatch = new CountDownLatch(count); + List> list = new ArrayList<>(); + for (int i = count; i > 0; i--) { + Future result = priorityDtpExecutor.submit(new TestPriorityCallable(i, countDownLatch)); + list.add(result); + } + countDownLatch.await(); + for (Future future : list) { + log.info("result: {}", future.get()); + } + } + + @Test + void prioritySubmitCallable() throws InterruptedException, ExecutionException { + int count = 5; + CountDownLatch countDownLatch = new CountDownLatch(count); + List> list = new ArrayList<>(); + for (int i = count; i > 0; i--) { + Future result = priorityDtpExecutor.submit(new TestPriorityCallable(i, countDownLatch), i); + list.add(result); + } + countDownLatch.await(); + for (Future future : list) { + log.info("result: {}", future.get()); + } + } + + private static class TestPriorityRunnable implements Runnable { + + private final int number; + + private final CountDownLatch countDownLatch; + + public TestPriorityRunnable(int number, CountDownLatch countDownLatch) { + this.number = number; + this.countDownLatch = countDownLatch; + } + + @Override + public void run() { + try { + log.info("work-{} triggered successfully", number); + TimeUnit.MILLISECONDS.sleep(10); + log.info("work-{} completed successfully", number); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } finally { + countDownLatch.countDown(); + } + } + } + + private static class TestPriorityCallable implements Callable { + + private final int number; + + private final CountDownLatch countDownLatch; + + private TestPriorityCallable(int number, CountDownLatch countDownLatch) { + this.number = number; + this.countDownLatch = countDownLatch; + } + + @Override + public String call() { + try { + log.info("work-{} triggered successfully", number); + TimeUnit.MILLISECONDS.sleep(10); + return "work-" + number; + } catch (InterruptedException e) { + throw new RuntimeException(e); + } finally { + log.info("work-{} completed successfully", number); + countDownLatch.countDown(); + } + } + } + +}