Skip to content

Commit

Permalink
format
Browse files Browse the repository at this point in the history
  • Loading branch information
yanhom1314 committed Mar 31, 2023
1 parent fa802ae commit ca733a9
Show file tree
Hide file tree
Showing 9 changed files with 36 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,9 @@

import com.dtp.adapter.common.AbstractDtpAdapter;
import com.dtp.common.ApplicationContextHolder;
import com.dtp.common.entity.TpExecutorProps;
import com.dtp.common.entity.TpMainFields;
import com.dtp.common.entity.ThreadPoolStats;
import com.dtp.common.properties.DtpProperties;
import com.dtp.core.convert.ExecutorConverter;
import com.dtp.core.support.ExecutorWrapper;
import com.dtp.common.entity.ThreadPoolStats;
import com.dtp.core.thread.ExecutorAdapter;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -88,13 +85,6 @@ protected ExecutorAdapter<A> getExecutor() {
return (ExecutorAdapter<A>) wrapper.getExecutor();
}

@Override
protected TpMainFields getTpMainFields(ExecutorWrapper executorWrapper, TpExecutorProps props) {
final ExecutorAdapter<?> adapter = executorWrapper.getExecutor();
return ExecutorConverter.ofSimple(executorWrapper.getThreadPoolName(), adapter.getCorePoolSize(),
adapter.getMaximumPoolSize(), adapter.getKeepAliveTime(props.getUnit()));
}

/**
* Get thread pool executor wrapper.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package com.dtp.adapter.webserver;

import com.dtp.common.entity.ThreadPoolStats;
import com.dtp.common.properties.DtpProperties;
import com.dtp.common.util.ReflectionUtil;
import com.dtp.core.support.ExecutorWrapper;
import com.dtp.common.entity.ThreadPoolStats;
import com.dtp.core.thread.ExecutorAdapter;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.jetty.util.thread.MonitoredQueuedThreadPool;
Expand Down Expand Up @@ -144,13 +144,5 @@ public long getKeepAliveTime(TimeUnit unit) {
}
return 0;
}

@Override
public void setKeepAliveTime(long time, TimeUnit unit) {
if (this.executor instanceof QueuedThreadPool) {
final int keepAliveMs = (int) TimeUnit.MILLISECONDS.convert(time, unit);
((QueuedThreadPool) this.executor).setIdleTimeout(keepAliveMs);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
* <a href="http://cseweb.ucsd.edu/users/varghese/PAPERS/twheel.ps.Z">'Hashed
* and Hierarchical Timing Wheels: data structures to efficiently implement a
* timer facility'</a>. More comprehensive slides are located
* <a href="http://www.cse.wustl.edu/~cdgill/courses/cs6874/TimingWheels.ppt">here</a>.<br/>
* <a href="http://www.cse.wustl.edu/~cdgill/courses/cs6874/TimingWheels.ppt">here</a>.<br>
* Copy from dubbo, see <a href="https://github.com/apache/dubbo/blob/3.2/dubbo-common/src/main/java/org/apache/dubbo/common/timer/HashedWheelTimer.java">here</a> for more details.
*/
@Slf4j
Expand Down Expand Up @@ -394,7 +394,7 @@ public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
}

/**
* Returns the number of pending timeouts of this {@link Timer}.
* @return the number of pending timeouts of this {@link Timer}.
*/
public long pendingTimeouts() {
return pendingTimeouts.get();
Expand Down
10 changes: 9 additions & 1 deletion common/src/main/java/com/dtp/common/timer/Timeout.java
Original file line number Diff line number Diff line change
@@ -1,30 +1,38 @@
package com.dtp.common.timer;

/**
* A handle associated with a {@link TimerTask} that is returned by a{@link Timer}.<br/>
* A handle associated with a {@link TimerTask} that is returned by a{@link Timer}.<br>
* Copy from dubbo, see <a href="https://github.com/apache/dubbo/blob/3.2/dubbo-common/src/main/java/org/apache/dubbo/common/timer/Timeout.java">here</a> for more details.
*/
public interface Timeout {

/**
* Returns the {@link Timer} that created this handle.
*
* @return the {@link Timer} that created this handle
*/
Timer timer();

/**
* Returns the {@link TimerTask} which is associated with this handle.
*
* @return the {@link TimerTask} which is associated with this handle
*/
TimerTask task();

/**
* Returns {@code true} if and only if the {@link TimerTask} associated
* with this handle has been expired.
*
* @return {@code true} if and only if the {@link TimerTask} associated
*/
boolean isExpired();

/**
* Returns {@code true} if and only if the {@link TimerTask} associated
* with this handle has been cancelled.
*
* @return {@code true} if and only if the {@link TimerTask} associated
*/
boolean isCancelled();

Expand Down
3 changes: 3 additions & 0 deletions common/src/main/java/com/dtp/common/timer/Timer.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ public interface Timer {
/**
* Schedules the specified {@link TimerTask} for one-time execution after
* the specified delay.
* @param task the task to execute
* @param delay the time from now to delay execution
* @param unit the time unit of the delay parameter
*
* @return a handle which is associated with the specified task
* @throws IllegalStateException if this timer has been {@linkplain #stop() stopped} already
Expand Down
3 changes: 2 additions & 1 deletion common/src/main/java/com/dtp/common/timer/TimerTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

/**
* A task which is executed after the delay specified with
* {@link Timer#newTimeout(TimerTask, long, TimeUnit)} (TimerTask, long, TimeUnit)}. <br/>
* {@link Timer#newTimeout(TimerTask, long, TimeUnit)} (TimerTask, long, TimeUnit)}. <br>
* Copy from dubbo, see <a href="https://github.com/apache/dubbo/blob/3.2/dubbo-common/src/main/java/org/apache/dubbo/common/timer/TimeTask.java">here</a> for more details.
*/
public interface TimerTask {
Expand All @@ -14,6 +14,7 @@ public interface TimerTask {
* {@link Timer#newTimeout(TimerTask, long, TimeUnit)}.
*
* @param timeout a handle which is associated with this task
* @throws Exception if an error occurs
*/
void run(Timeout timeout) throws Exception;
}
11 changes: 4 additions & 7 deletions core/src/main/java/com/dtp/core/reject/RejectedAware.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

import com.dtp.core.notify.manager.AlarmManager;
import com.dtp.core.support.runnable.DtpRunnable;
import com.dtp.core.support.runnable.NamedRunnable;
import com.dtp.core.thread.DtpExecutor;
import lombok.val;
import org.slf4j.Logger;
import org.slf4j.MDC;

Expand All @@ -29,19 +29,16 @@ public interface RejectedAware {
*/
default void beforeReject(Runnable runnable, ThreadPoolExecutor executor, Logger log) {
if (executor instanceof DtpExecutor) {
if (runnable instanceof DtpRunnable) {
((DtpRunnable) runnable).cancelQueueTimeoutTask();
log.warn("DynamicTp execute, thread pool is exhausted, cancel queue timeout task, traceId: {}", MDC.get(TRACE_ID));
}
val dtpRunnable = (DtpRunnable) runnable;
dtpRunnable.cancelQueueTimeoutTask();
DtpExecutor dtpExecutor = (DtpExecutor) executor;
dtpExecutor.incRejectCount(1);
AlarmManager.doAlarmAsync(dtpExecutor, REJECT);
String taskName = (runnable instanceof NamedRunnable) ? ((NamedRunnable) runnable).getName() : null;
log.warn("DynamicTp execute, thread pool is exhausted, tpName: {}, taskName: {}, traceId: {}, " +
"poolSize: {} (active: {}, core: {}, max: {}, largest: {}), " +
"task: {} (completed: {}), queueCapacity: {}, (currSize: {}, remaining: {}), " +
"executorStatus: (isShutdown: {}, isTerminated: {}, isTerminating: {})",
dtpExecutor.getThreadPoolName(), taskName, MDC.get(TRACE_ID), executor.getPoolSize(),
dtpExecutor.getThreadPoolName(), dtpRunnable.getTaskName(), MDC.get(TRACE_ID), executor.getPoolSize(),
executor.getActiveCount(), executor.getCorePoolSize(), executor.getMaximumPoolSize(),
executor.getLargestPoolSize(), executor.getTaskCount(), executor.getCompletedTaskCount(),
dtpExecutor.getQueueCapacity(), dtpExecutor.getQueue().size(), executor.getQueue().remainingCapacity(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,19 @@
import com.dtp.core.thread.DtpExecutor;
import lombok.extern.slf4j.Slf4j;

/**
* A timer task used to handle queued timeout.
*
* @author kamtohung
**/
@Slf4j
public class QueueTimeoutTimerTask implements TimerTask {

private final DtpExecutor dtpExecutor;

private final DtpRunnable runnable;

public QueueTimeoutTimerTask(DtpExecutor dtpExecutor,
DtpRunnable runnable) {
public QueueTimeoutTimerTask(DtpExecutor dtpExecutor, DtpRunnable runnable) {
this.dtpExecutor = dtpExecutor;
this.runnable = runnable;
}
Expand All @@ -28,5 +32,4 @@ public void run(Timeout timeout) {
log.warn("DynamicTp execute, queue timeout, tpName: {}, taskName: {}, traceId: {}",
dtpExecutor.getThreadPoolName(), runnable.getTaskName(), runnable.getTraceId());
}

}
14 changes: 8 additions & 6 deletions core/src/main/java/com/dtp/core/timer/RunTimeoutTimerTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@
import com.dtp.core.thread.DtpExecutor;
import lombok.extern.slf4j.Slf4j;


/**
* A timer task used to handle run timeout.
*
* @author kamtohung
**/
@Slf4j
public class RunTimeoutTimerTask implements TimerTask {

Expand All @@ -18,9 +22,7 @@ public class RunTimeoutTimerTask implements TimerTask {

private final Thread thread;

public RunTimeoutTimerTask(DtpExecutor dtpExecutor,
DtpRunnable runnable,
Thread thread) {
public RunTimeoutTimerTask(DtpExecutor dtpExecutor, DtpRunnable runnable, Thread thread) {
this.dtpExecutor = dtpExecutor;
this.runnable = runnable;
this.thread = thread;
Expand All @@ -31,7 +33,8 @@ public void run(Timeout timeout) {
dtpExecutor.getRunTimeoutCount().increment();
AlarmManager.doAlarmAsync(dtpExecutor, NotifyItemEnum.RUN_TIMEOUT);
log.warn("DynamicTp execute, run timeout, tpName: {}, taskName: {}, traceId: {}, stackTrace: {}",
dtpExecutor.getThreadPoolName(), runnable.getTaskName(), runnable.getTraceId(), traceToString(thread.getStackTrace()));
dtpExecutor.getThreadPoolName(), runnable.getTaskName(),
runnable.getTraceId(), traceToString(thread.getStackTrace()));
}


Expand All @@ -43,5 +46,4 @@ public String traceToString(StackTraceElement[] trace) {
}
return builder.toString();
}

}

0 comments on commit ca733a9

Please sign in to comment.