Skip to content

Commit

Permalink
feat(priority executor): add ThreadPoolBuilder #283
Browse files Browse the repository at this point in the history
  • Loading branch information
KamToHung committed Jan 7, 2024
1 parent 76034e9 commit 683b647
Show file tree
Hide file tree
Showing 8 changed files with 260 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package org.dromara.dynamictp.core.executor.priority;

/**
* Priority related
*
* @author <a href = "mailto:[email protected]">KamTo Hung</a>
*/
public interface Priority {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.util.concurrent.Callable;

/**
* PriorityCallable related
*
* @author <a href = "mailto:[email protected]">KamTo Hung</a>
*/
public class PriorityCallable<V> implements Priority, Callable<V> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@

import lombok.extern.slf4j.Slf4j;
import org.dromara.dynamictp.core.executor.DtpExecutor;
import org.dromara.dynamictp.core.support.task.runnable.DtpRunnable;

import java.util.Comparator;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
Expand All @@ -29,6 +31,8 @@
import java.util.concurrent.TimeUnit;

/**
* PriorityDtpExecutor related, extending DtpExecutor, implements priority feature
*
* @author <a href = "mailto:[email protected]">KamTo Hung</a>
*/
@Slf4j
Expand Down Expand Up @@ -155,4 +159,24 @@ public <T> Future<T> submit(Callable<T> task, int priority) {
return super.submit(PriorityCallable.of(task, priority));
}


/**
* Priority Comparator
*
* @return Comparator
*/
public static Comparator<Runnable> getRunnableComparator() {
return (o1, o2) -> {
if (!(o1 instanceof DtpRunnable) || !(o2 instanceof DtpRunnable)) {
return 0;
}
Runnable po1 = ((DtpRunnable) o1).getOriginRunnable();
Runnable po2 = ((DtpRunnable) o2).getOriginRunnable();
if (po1 instanceof Priority && po2 instanceof Priority) {
return Integer.compare(((Priority) po1).getPriority(), ((Priority) po2).getPriority());
}
return 0;
};
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.util.concurrent.FutureTask;

/**
* PriorityFutureTask related
*
* @author <a href = "mailto:[email protected]">KamTo Hung</a>
*/
public class PriorityFutureTask<V> extends FutureTask<V> implements Priority {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import lombok.Getter;

/**
* PriorityRunnable related
*
* @author <a href = "mailto:[email protected]">KamTo Hung</a>
*/
public class PriorityRunnable implements Priority, Runnable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,16 @@
import org.dromara.dynamictp.core.executor.NamedThreadFactory;
import org.dromara.dynamictp.core.executor.eager.EagerDtpExecutor;
import org.dromara.dynamictp.core.executor.eager.TaskQueue;
import org.dromara.dynamictp.core.executor.priority.Priority;
import org.dromara.dynamictp.core.executor.priority.PriorityDtpExecutor;
import org.dromara.dynamictp.core.reject.RejectHandlerGetter;
import org.dromara.dynamictp.core.support.BinderHelper;
import org.dromara.dynamictp.core.support.task.runnable.DtpRunnable;
import org.dromara.dynamictp.core.support.task.wrapper.TaskWrappers;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.context.EnvironmentAware;
import org.springframework.context.annotation.ImportBeanDefinitionRegistrar;
import org.springframework.core.env.Environment;
import org.springframework.core.type.AnnotationMetadata;

import java.util.Comparator;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
Expand Down Expand Up @@ -129,7 +126,7 @@ private Object[] buildConstructorArgs(Class<?> clazz, DtpExecutorProps props) {
if (clazz.equals(EagerDtpExecutor.class)) {
taskQueue = new TaskQueue(props.getQueueCapacity());
} else if (clazz.equals(PriorityDtpExecutor.class)) {
taskQueue = new PriorityBlockingQueue<>(props.getQueueCapacity(), getRunnableComparator());
taskQueue = new PriorityBlockingQueue<>(props.getQueueCapacity(), PriorityDtpExecutor.getRunnableComparator());
} else {
taskQueue = buildLbq(props.getQueueType(),
props.getQueueCapacity(),
Expand All @@ -148,18 +145,4 @@ private Object[] buildConstructorArgs(Class<?> clazz, DtpExecutorProps props) {
};
}

private Comparator<Runnable> getRunnableComparator() {
return (o1, o2) -> {
if (!(o1 instanceof DtpRunnable) || !(o2 instanceof DtpRunnable)) {
return 0;
}
Runnable po1 = ((DtpRunnable) o1).getOriginRunnable();
Runnable po2 = ((DtpRunnable) o2).getOriginRunnable();
if (po1 instanceof Priority && po2 instanceof Priority) {
return Integer.compare(((Priority) po1).getPriority(), ((Priority) po2).getPriority());
}
return 0;
};
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.dromara.dynamictp.core.executor.ScheduledDtpExecutor;
import org.dromara.dynamictp.core.executor.eager.EagerDtpExecutor;
import org.dromara.dynamictp.core.executor.eager.TaskQueue;
import org.dromara.dynamictp.core.executor.priority.PriorityDtpExecutor;
import org.dromara.dynamictp.core.reject.RejectHandlerGetter;
import org.dromara.dynamictp.core.support.task.wrapper.TaskWrapper;
import org.springframework.util.Assert;
Expand All @@ -42,6 +43,7 @@
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
Expand Down Expand Up @@ -149,6 +151,12 @@ public class ThreadPoolBuilder {
*/
private boolean scheduled = false;

/**
* If priority thread pool.
* default false, true priority thread pool.
*/
private boolean priority = false;

/**
* If pre start all core threads.
*/
Expand Down Expand Up @@ -194,7 +202,8 @@ public class ThreadPoolBuilder {
*/
private List<String> platformIds = Lists.newArrayList();

private ThreadPoolBuilder() { }
private ThreadPoolBuilder() {
}

public static ThreadPoolBuilder newBuilder() {
return new ThreadPoolBuilder();
Expand Down Expand Up @@ -340,6 +349,11 @@ public ThreadPoolBuilder scheduled(boolean scheduled) {
return this;
}

public ThreadPoolBuilder priority(boolean priority) {
this.priority = priority;
return this;
}

public ThreadPoolBuilder preStartAllCoreThreads(boolean preStartAllCoreThreads) {
this.preStartAllCoreThreads = preStartAllCoreThreads;
return this;
Expand Down Expand Up @@ -459,6 +473,16 @@ public EagerDtpExecutor buildEager() {
return (EagerDtpExecutor) buildDtpExecutor(this);
}

/**
* Build priority thread pool executor.
*
* @return the newly created PriorityDtpExecutor instance
*/
public PriorityDtpExecutor buildPriority() {
priority = true;
return (PriorityDtpExecutor) buildDtpExecutor(this);
}

/**
* Build thread pool executor and wrapper with ttl
*
Expand Down Expand Up @@ -531,6 +555,15 @@ private DtpExecutor createInternal(ThreadPoolBuilder builder) {
builder.workQueue,
builder.threadFactory,
builder.rejectedExecutionHandler);
} else if (priority) {
dtpExecutor = new PriorityDtpExecutor(
builder.corePoolSize,
builder.maximumPoolSize,
builder.keepAliveTime,
builder.timeUnit,
new PriorityBlockingQueue<>(builder.queueCapacity, PriorityDtpExecutor.getRunnableComparator()),
builder.threadFactory,
builder.rejectedExecutionHandler);
} else {
dtpExecutor = new DtpExecutor(
builder.corePoolSize,
Expand Down
Loading

0 comments on commit 683b647

Please sign in to comment.