From 49c20792703b0ce4ce2845feb7348a307304b1bf Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Fri, 25 Aug 2023 09:47:44 -0700 Subject: [PATCH] RATIS-1877. In JvmPauseMonitor, sleepTime should not be larger than sleepDeviationThreshold. --- .../apache/ratis/util/JvmPauseMonitor.java | 53 +++++++++++++++++-- .../org/apache/ratis/util/TimeDuration.java | 9 ++++ .../ratis/server/impl/RaftServerProxy.java | 8 +-- 3 files changed, 62 insertions(+), 8 deletions(-) diff --git a/ratis-common/src/main/java/org/apache/ratis/util/JvmPauseMonitor.java b/ratis-common/src/main/java/org/apache/ratis/util/JvmPauseMonitor.java index fac3d0ede9..f46e20ecf9 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/JvmPauseMonitor.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/JvmPauseMonitor.java @@ -32,8 +32,11 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; +/** + * Detect pauses in JVM causing by GC or other problems in the machine. + */ public class JvmPauseMonitor { - public static final Logger LOG = LoggerFactory.getLogger(JvmPauseMonitor.class); + static final Logger LOG = LoggerFactory.getLogger(JvmPauseMonitor.class); private static final AtomicInteger THREAD_COUNT = new AtomicInteger(0); static final class GcInfo { @@ -87,16 +90,54 @@ static String toString(Map beforeSleep, TimeDuration extraSleepT + extraSleepTime.toString(TimeUnit.SECONDS, 3) + gc; } - private static final TimeDuration SLEEP_TIME = TimeDuration.valueOf(500, TimeUnit.MILLISECONDS); + /** To build {@link JvmPauseMonitor}. */ + public static class Builder { + private Object name = "default"; + private TimeDuration sleepDeviationThreshold = TimeDuration.valueOf(300, TimeUnit.MILLISECONDS); + private TimeDuration sleepTime = sleepDeviationThreshold; + private CheckedConsumer handler = t -> {}; + + public Builder setName(Object name) { + this.name = name; + return this; + } + + public Builder setSleepTime(TimeDuration sleepTime) { + this.sleepTime = sleepTime; + return this; + } + + public Builder setSleepDeviationThreshold(TimeDuration sleepDeviationThreshold) { + this.sleepDeviationThreshold = sleepDeviationThreshold; + return this; + } + + public Builder setHandler(CheckedConsumer handler) { + this.handler = handler; + return this; + } + + public JvmPauseMonitor build() { + return new JvmPauseMonitor(name, sleepTime, sleepDeviationThreshold, handler); + } + } + + public static Builder newBuilder() { + return new Builder(); + } + + private final TimeDuration sleepTime; private final TimeDuration sleepDeviationThreshold; private final String name; private final AtomicReference threadRef = new AtomicReference<>(); private final CheckedConsumer handler; - public JvmPauseMonitor(Object name, TimeDuration sleepDeviationThreshold, + private JvmPauseMonitor(Object name, TimeDuration sleepTime, TimeDuration sleepDeviationThreshold, CheckedConsumer handler) { this.name = JavaUtils.getClassSimpleName(getClass()) + "-" + name; + // use min -- if the sleep time is too long, it may not be able to detect the given deviation. + this.sleepTime = TimeDuration.min(sleepTime, sleepDeviationThreshold); this.sleepDeviationThreshold = sleepDeviationThreshold; this.handler = handler; } @@ -116,7 +157,7 @@ private void detectPause() { final Map before = getGcTimes(); final TimeDuration extraSleep; try { - extraSleep = SLEEP_TIME.sleep(); + extraSleep = sleepTime.sleep(); } catch (InterruptedException ie) { return; } @@ -140,7 +181,9 @@ private void handle(TimeDuration extraSleep) { /** Start this monitor. */ public void start() { final MemoizedSupplier supplier = JavaUtils.memoize(() -> Daemon.newBuilder() - .setName("JvmPauseMonitor" + THREAD_COUNT.getAndIncrement()).setRunnable(this::run).build()); + .setName(JavaUtils.getClassSimpleName(getClass()) + THREAD_COUNT.getAndIncrement()) + .setRunnable(this::run) + .build()); Optional.of(threadRef.updateAndGet(previous -> Optional.ofNullable(previous).orElseGet(supplier))) .filter(t -> supplier.isInitialized()) .ifPresent(Thread::start); diff --git a/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java b/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java index 98fb3694e3..10c5c6bd16 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java @@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.function.LongUnaryOperator; +import java.util.stream.Stream; /** * Time duration is represented by a long together with a {@link TimeUnit}. @@ -59,6 +60,14 @@ public static TimeUnit higherUnit(TimeUnit unit) { return ordinal == timeUnits.length - 1? unit: timeUnits[ordinal + 1]; } + /** @return the minimum of the given parameters. */ + public static TimeDuration min(TimeDuration left, TimeDuration right) { + Objects.requireNonNull(left, "left == null"); + Objects.requireNonNull(right, "right == null"); + return Stream.of(left, right).min(TimeDuration::compareTo).orElseThrow( + () -> new IllegalStateException("Failed to compute min(" + left + ", " + right + ")")); + } + /** Abbreviations of {@link TimeUnit}. */ public enum Abbreviation { NANOSECONDS("ns", "nanos"), diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java index 89f49210eb..f93120b3ab 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java @@ -74,7 +74,7 @@ class RaftServerProxy implements RaftServer { /** * A map: {@link RaftGroupId} -> {@link RaftServerImpl} futures. - * + *

* The map is synchronized for mutations and the bulk {@link #getGroupIds()}/{@link #getAll()} methods * but the (non-bulk) {@link #get(RaftGroupId)} and {@link #containsGroup(RaftGroupId)} methods are not. * The thread safety and atomicity guarantees for the non-bulk methods are provided by {@link ConcurrentMap}. @@ -222,8 +222,10 @@ String toString(RaftGroupId groupId, CompletableFuture f) { final TimeDuration sleepDeviationThreshold = RaftServerConfigKeys.sleepDeviationThreshold(properties); final TimeDuration rpcSlownessTimeout = RaftServerConfigKeys.Rpc.slownessTimeout(properties); final TimeDuration leaderStepDownWaitTime = RaftServerConfigKeys.LeaderElection.leaderStepDownWaitTime(properties); - this.pauseMonitor = new JvmPauseMonitor(id, sleepDeviationThreshold, - extraSleep -> handleJvmPause(extraSleep, rpcSlownessTimeout, leaderStepDownWaitTime)); + this.pauseMonitor = JvmPauseMonitor.newBuilder().setName(id) + .setSleepDeviationThreshold(sleepDeviationThreshold) + .setHandler(extraSleep -> handleJvmPause(extraSleep, rpcSlownessTimeout, leaderStepDownWaitTime)) + .build(); this.threadGroup = threadGroup == null ? new ThreadGroup(this.id.toString()) : threadGroup; }