Skip to content

Commit

Permalink
Merge pull request #375 from dromara/dev
Browse files Browse the repository at this point in the history
[ISSUE #374] try interrupt current thread when task run timeout
  • Loading branch information
yanhom1314 authored Dec 20, 2023
2 parents ce754f4 + c9d8c8a commit 836934f
Show file tree
Hide file tree
Showing 8 changed files with 58 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ private DynamicTpConst() { }

public static final String RUN_TIMEOUT = "runTimeout";

public static final String TRY_INTERRUPT_WHEN_TIMEOUT = "tryInterrupt";

public static final String QUEUE_TIMEOUT = "queueTimeout";

public static final String TASK_WRAPPERS = "taskWrappers";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ public class TpExecutorProps {
*/
private long runTimeout = 0;

/**
* If try interrupt task when timeout.
*/
private boolean tryInterrupt = false;

/**
* Task queue wait timeout, unit (ms), just for statistics.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ protected void refresh(TpExecutorProps props, ThreadPoolStatProvider statProvide
if (Objects.nonNull(props)) {
statProvider.setRunTimeout(props.getRunTimeout());
statProvider.setQueueTimeout(props.getQueueTimeout());
statProvider.setTryInterrupt(props.isTryInterrupt());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,11 @@ public class DtpExecutor extends ThreadPoolExecutor
*/
private long runTimeout = 0;

/**
* for manual builder thread pools only
*/
private boolean tryInterrupt = false;

/**
* for manual builder thread pools only
*/
Expand Down Expand Up @@ -355,22 +360,30 @@ public void setRejectHandlerType(String rejectHandlerType) {
this.rejectHandlerType = rejectHandlerType;
}

public long getRunTimeout() {
return runTimeout;
}

public void setRunTimeout(long runTimeout) {
this.runTimeout = runTimeout;
}

public void setQueueTimeout(long queueTimeout) {
this.queueTimeout = queueTimeout;
public boolean isTryInterrupt() {
return tryInterrupt;
}

public long getRunTimeout() {
return runTimeout;
public void setTryInterrupt(boolean tryInterrupt) {
this.tryInterrupt = tryInterrupt;
}

public long getQueueTimeout() {
return queueTimeout;
}

public void setQueueTimeout(long queueTimeout) {
this.queueTimeout = queueTimeout;
}

public boolean isWaitForTasksToCompleteOnShutdown() {
return waitForTasksToCompleteOnShutdown;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@
import org.dromara.dynamictp.common.entity.DtpExecutorProps;
import org.dromara.dynamictp.common.properties.DtpProperties;
import org.dromara.dynamictp.common.spring.SpringBeanHelper;
import org.dromara.dynamictp.core.executor.eager.EagerDtpExecutor;
import org.dromara.dynamictp.core.executor.ExecutorType;
import org.dromara.dynamictp.core.executor.NamedThreadFactory;
import org.dromara.dynamictp.core.executor.eager.EagerDtpExecutor;
import org.dromara.dynamictp.core.executor.eager.TaskQueue;
import org.dromara.dynamictp.core.reject.RejectHandlerGetter;
import org.dromara.dynamictp.core.support.BinderHelper;
import org.dromara.dynamictp.core.executor.ExecutorType;
import org.dromara.dynamictp.core.executor.eager.TaskQueue;
import org.dromara.dynamictp.core.support.task.wrapper.TaskWrappers;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.context.EnvironmentAware;
Expand All @@ -55,6 +55,7 @@
import static org.dromara.dynamictp.common.constant.DynamicTpConst.TASK_WRAPPERS;
import static org.dromara.dynamictp.common.constant.DynamicTpConst.THREAD_POOL_ALIAS_NAME;
import static org.dromara.dynamictp.common.constant.DynamicTpConst.THREAD_POOL_NAME;
import static org.dromara.dynamictp.common.constant.DynamicTpConst.TRY_INTERRUPT_WHEN_TIMEOUT;
import static org.dromara.dynamictp.common.constant.DynamicTpConst.WAIT_FOR_TASKS_TO_COMPLETE_ON_SHUTDOWN;
import static org.dromara.dynamictp.common.em.QueueTypeEnum.buildLbq;
import static org.dromara.dynamictp.common.entity.NotifyItem.mergeAllNotifyItems;
Expand Down Expand Up @@ -104,6 +105,7 @@ private Map<String, Object> buildPropertyValues(DtpExecutorProps props) {
propertyValues.put(REJECT_HANDLER_TYPE, props.getRejectedHandlerType());
propertyValues.put(REJECT_ENHANCED, props.isRejectEnhanced());
propertyValues.put(RUN_TIMEOUT, props.getRunTimeout());
propertyValues.put(TRY_INTERRUPT_WHEN_TIMEOUT, props.isTryInterrupt());
propertyValues.put(QUEUE_TIMEOUT, props.getQueueTimeout());
val notifyItems = mergeAllNotifyItems(props.getNotifyItems());
propertyValues.put(NOTIFY_ITEMS, notifyItems);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,11 @@ public class ThreadPoolBuilder {
*/
private long runTimeout = 0;

/**
* If try interrupt task when timeout.
*/
private boolean tryInterrupt = false;

/**
* Task queue wait timeout, unit (ms), just for statistics.
*/
Expand Down Expand Up @@ -355,6 +360,11 @@ public ThreadPoolBuilder runTimeout(long runTimeout) {
return this;
}

public ThreadPoolBuilder tryInterrupt(boolean tryInterrupt) {
this.tryInterrupt = tryInterrupt;
return this;
}

public ThreadPoolBuilder queueTimeout(long queueTimeout) {
this.queueTimeout = queueTimeout;
return this;
Expand Down Expand Up @@ -480,6 +490,7 @@ private DtpExecutor buildDtpExecutor(ThreadPoolBuilder builder) {
dtpExecutor.setPreStartAllCoreThreads(builder.preStartAllCoreThreads);
dtpExecutor.setRejectEnhanced(builder.rejectEnhanced);
dtpExecutor.setRunTimeout(builder.runTimeout);
dtpExecutor.setTryInterrupt(builder.tryInterrupt);
dtpExecutor.setQueueTimeout(builder.queueTimeout);
dtpExecutor.setTaskWrappers(builder.taskWrappers);
dtpExecutor.setNotifyItems(builder.notifyItems);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ public class ThreadPoolStatProvider {
*/
private long runTimeout = 0;

/**
* Try interrupt task when timeout.
*/
private boolean tryInterrupt = false;

/**
* Task queue wait timeout, unit (ms), just for statistics.
*/
Expand Down Expand Up @@ -98,6 +103,7 @@ public static ThreadPoolStatProvider of(ExecutorWrapper executorWrapper) {
val dtpExecutor = (DtpExecutor) executorWrapper.getExecutor();
provider.setRunTimeout(dtpExecutor.getRunTimeout());
provider.setQueueTimeout(dtpExecutor.getQueueTimeout());
provider.setTryInterrupt(dtpExecutor.isTryInterrupt());
}
return provider;
}
Expand All @@ -114,6 +120,14 @@ public void setRunTimeout(long runTimeout) {
this.runTimeout = runTimeout;
}

public boolean isTryInterrupt() {
return tryInterrupt;
}

public void setTryInterrupt(boolean tryInterrupt) {
this.tryInterrupt = tryInterrupt;
}

public long getQueueTimeout() {
return queueTimeout;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ protected void doRun() {
statProvider.getExecutorWrapper().getExecutor().getQueueCapacity(), executor.getQueue().size(),
executor.getQueue().remainingCapacity(), traceToString(thread.getStackTrace()));
log.warn(logMsg);
if (statProvider.isTryInterrupt()) {
thread.interrupt();
}
}

public String traceToString(StackTraceElement[] trace) {
Expand Down

0 comments on commit 836934f

Please sign in to comment.