Skip to content

Commit

Permalink
RATIS-1877. In JvmPauseMonitor, sleepTime should not be larger than s…
Browse files Browse the repository at this point in the history
…leepDeviationThreshold.
  • Loading branch information
szetszwo committed Aug 25, 2023
1 parent c3f78fa commit 49c2079
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,11 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

/**
* Detect pauses in the JVM caused by GC or other problems in the machine.
*/
public class JvmPauseMonitor {
public static final Logger LOG = LoggerFactory.getLogger(JvmPauseMonitor.class);
static final Logger LOG = LoggerFactory.getLogger(JvmPauseMonitor.class);
private static final AtomicInteger THREAD_COUNT = new AtomicInteger(0);

static final class GcInfo {
Expand Down Expand Up @@ -87,16 +90,54 @@ static String toString(Map<String, GcInfo> beforeSleep, TimeDuration extraSleepT
+ extraSleepTime.toString(TimeUnit.SECONDS, 3) + gc;
}

private static final TimeDuration SLEEP_TIME = TimeDuration.valueOf(500, TimeUnit.MILLISECONDS);
/**
 * Fluent builder for {@link JvmPauseMonitor}.
 * <p>
 * Every property is optional.  The defaults are: name {@code "default"},
 * a sleep deviation threshold of 300ms, a sleep time equal to that default threshold,
 * and a handler that does nothing.
 */
public static class Builder {
  /** Identifier handed to the monitor; the monitor appends it to its own class name. */
  private Object name = "default";
  /** Deviation threshold handed to the monitor. */
  private TimeDuration sleepDeviationThreshold = TimeDuration.valueOf(300, TimeUnit.MILLISECONDS);
  /**
   * Sleep time handed to the monitor.
   * It is initialized once from the default threshold above, so a later call to
   * {@link #setSleepDeviationThreshold(TimeDuration)} does not change it.
   * The monitor constructor uses the minimum of this value and the threshold.
   */
  private TimeDuration sleepTime = sleepDeviationThreshold;
  /** Callback handed to the monitor; the default accepts the duration and does nothing. */
  private CheckedConsumer<TimeDuration, IOException> handler = ignored -> {};

  /**
   * @param name the identifier of the monitor to be built.
   * @return this builder.
   */
  public Builder setName(Object name) {
    this.name = name;
    return this;
  }

  /**
   * @param sleepTime the sleep time of the monitor to be built.
   * @return this builder.
   */
  public Builder setSleepTime(TimeDuration sleepTime) {
    this.sleepTime = sleepTime;
    return this;
  }

  /**
   * @param sleepDeviationThreshold the sleep deviation threshold of the monitor to be built.
   * @return this builder.
   */
  public Builder setSleepDeviationThreshold(TimeDuration sleepDeviationThreshold) {
    this.sleepDeviationThreshold = sleepDeviationThreshold;
    return this;
  }

  /**
   * @param handler the callback of the monitor to be built.
   * @return this builder.
   */
  public Builder setHandler(CheckedConsumer<TimeDuration, IOException> handler) {
    this.handler = handler;
    return this;
  }

  /** @return a new {@link JvmPauseMonitor} created from the current values of this builder. */
  public JvmPauseMonitor build() {
    return new JvmPauseMonitor(name, sleepTime, sleepDeviationThreshold, handler);
  }
}

/** @return a new {@link Builder} holding its default values. */
public static Builder newBuilder() {
  return new Builder();
}

private final TimeDuration sleepTime;
private final TimeDuration sleepDeviationThreshold;

private final String name;
private final AtomicReference<Thread> threadRef = new AtomicReference<>();
private final CheckedConsumer<TimeDuration, IOException> handler;

public JvmPauseMonitor(Object name, TimeDuration sleepDeviationThreshold,
private JvmPauseMonitor(Object name, TimeDuration sleepTime, TimeDuration sleepDeviationThreshold,
CheckedConsumer<TimeDuration, IOException> handler) {
this.name = JavaUtils.getClassSimpleName(getClass()) + "-" + name;
// use min -- if the sleep time is too long, it may not be able to detect the given deviation.
this.sleepTime = TimeDuration.min(sleepTime, sleepDeviationThreshold);
this.sleepDeviationThreshold = sleepDeviationThreshold;
this.handler = handler;
}
Expand All @@ -116,7 +157,7 @@ private void detectPause() {
final Map<String, GcInfo> before = getGcTimes();
final TimeDuration extraSleep;
try {
extraSleep = SLEEP_TIME.sleep();
extraSleep = sleepTime.sleep();
} catch (InterruptedException ie) {
return;
}
Expand All @@ -140,7 +181,9 @@ private void handle(TimeDuration extraSleep) {
/** Start this monitor. */
public void start() {
final MemoizedSupplier<Thread> supplier = JavaUtils.memoize(() -> Daemon.newBuilder()
.setName("JvmPauseMonitor" + THREAD_COUNT.getAndIncrement()).setRunnable(this::run).build());
.setName(JavaUtils.getClassSimpleName(getClass()) + THREAD_COUNT.getAndIncrement())
.setRunnable(this::run)
.build());
Optional.of(threadRef.updateAndGet(previous -> Optional.ofNullable(previous).orElseGet(supplier)))
.filter(t -> supplier.isInitialized())
.ifPresent(Thread::start);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.LongUnaryOperator;
import java.util.stream.Stream;

/**
* Time duration is represented by a long together with a {@link TimeUnit}.
Expand Down Expand Up @@ -59,6 +60,14 @@ public static TimeUnit higherUnit(TimeUnit unit) {
return ordinal == timeUnits.length - 1? unit: timeUnits[ordinal + 1];
}

/**
 * Compare the two given durations using {@code compareTo} and return the smaller one.
 *
 * @param left the first duration; must not be null.
 * @param right the second duration; must not be null.
 * @return the minimum of the given parameters.
 * @throws NullPointerException if either parameter is null.
 */
public static TimeDuration min(TimeDuration left, TimeDuration right) {
  // The null checks run left-to-right while the stream arguments are evaluated.
  return Stream.of(
          Objects.requireNonNull(left, "left == null"),
          Objects.requireNonNull(right, "right == null"))
      .min(TimeDuration::compareTo)
      // A two-element stream always yields a minimum; this is only a safeguard.
      .orElseThrow(() -> new IllegalStateException(
          "Failed to compute min(" + left + ", " + right + ")"));
}

/** Abbreviations of {@link TimeUnit}. */
public enum Abbreviation {
NANOSECONDS("ns", "nanos"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@
class RaftServerProxy implements RaftServer {
/**
* A map: {@link RaftGroupId} -> {@link RaftServerImpl} futures.
*
* <p>
* The map is synchronized for mutations and the bulk {@link #getGroupIds()}/{@link #getAll()} methods
* but the (non-bulk) {@link #get(RaftGroupId)} and {@link #containsGroup(RaftGroupId)} methods are not.
* The thread safety and atomicity guarantees for the non-bulk methods are provided by {@link ConcurrentMap}.
Expand Down Expand Up @@ -222,8 +222,10 @@ String toString(RaftGroupId groupId, CompletableFuture<RaftServerImpl> f) {
final TimeDuration sleepDeviationThreshold = RaftServerConfigKeys.sleepDeviationThreshold(properties);
final TimeDuration rpcSlownessTimeout = RaftServerConfigKeys.Rpc.slownessTimeout(properties);
final TimeDuration leaderStepDownWaitTime = RaftServerConfigKeys.LeaderElection.leaderStepDownWaitTime(properties);
this.pauseMonitor = new JvmPauseMonitor(id, sleepDeviationThreshold,
extraSleep -> handleJvmPause(extraSleep, rpcSlownessTimeout, leaderStepDownWaitTime));
this.pauseMonitor = JvmPauseMonitor.newBuilder().setName(id)
.setSleepDeviationThreshold(sleepDeviationThreshold)
.setHandler(extraSleep -> handleJvmPause(extraSleep, rpcSlownessTimeout, leaderStepDownWaitTime))
.build();
this.threadGroup = threadGroup == null ? new ThreadGroup(this.id.toString()) : threadGroup;
}

Expand Down

0 comments on commit 49c2079

Please sign in to comment.