Skip to content

Commit

Permalink
RATIS-1886. AppendLog sleep fixed time cause significant drop in writ…
Browse files Browse the repository at this point in the history
…e throughput.
  • Loading branch information
szetszwo committed Sep 28, 2023
1 parent d461a01 commit 8e0463b
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,14 @@ public static TimeDuration min(TimeDuration left, TimeDuration right) {
() -> new IllegalStateException("Failed to compute min(" + left + ", " + right + ")"));
}

/** @return the maximum of the given parameters. */
public static TimeDuration max(TimeDuration left, TimeDuration right) {
Objects.requireNonNull(left, "left == null");
Objects.requireNonNull(right, "right == null");
return Stream.of(left, right).max(TimeDuration::compareTo).orElseThrow(
() -> new IllegalStateException("Failed to compute max(" + left + ", " + right + ")"));
}

/** Abbreviations of {@link TimeUnit}. */
public enum Abbreviation {
NANOSECONDS("ns", "nanos"),
Expand Down Expand Up @@ -304,11 +312,21 @@ public <OUTPUT, THROWABLE extends Throwable> OUTPUT apply(
return function.apply(getDuration(), getUnit());
}

/** @return Is this {@link TimeDuration} negative? */
/** @return Is this {@link TimeDuration} less than zero? */
public boolean isNegative() {
return duration < 0;
}

/** @return Is this {@link TimeDuration} greater than or equal to zero? */
public boolean isNonNegative() {
return duration >= 0;
}

/** @return Is this {@link TimeDuration} greater than zero? */
public boolean isPositive() {
return duration > 0;
}

/** @return Is this {@link TimeDuration} less than or equal to zero? */
public boolean isNonPositive() {
return duration <= 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ public long getWaitTimeMs() {
// For normal nodes, new entries should be sent ASAP
// however for slow followers (especially when the follower is down),
// keep sending without any wait time only ends up in high CPU load
return Math.max(getMinWaitTimeMs(), 0L);
return TimeDuration.max(getRemainingWaitTime(), TimeDuration.ZERO).toLong(TimeUnit.MILLISECONDS);
}
return getHeartbeatWaitTimeMs();
}
Expand Down Expand Up @@ -246,14 +246,16 @@ private boolean haveTooManyPendingRequests() {
}

static class StreamObservers {
public static final int DEFAULT_WAIT_FOR_READY_MS = 10;
private final CallStreamObserver<AppendEntriesRequestProto> appendLog;
private final CallStreamObserver<AppendEntriesRequestProto> heartbeat;
private final TimeDuration waitForReady;
private volatile boolean running = true;

StreamObservers(GrpcServerProtocolClient client, AppendLogResponseHandler handler, boolean separateHeartbeat) {
StreamObservers(GrpcServerProtocolClient client, AppendLogResponseHandler handler, boolean separateHeartbeat,
TimeDuration waitTimeMin) {
this.appendLog = client.appendEntries(handler, false);
this.heartbeat = separateHeartbeat? client.appendEntries(handler, true): null;
this.waitForReady = waitTimeMin.isPositive()? waitTimeMin: TimeDuration.ONE_MILLISECOND;
}

void onNext(AppendEntriesRequestProto proto)
Expand All @@ -267,7 +269,7 @@ void onNext(AppendEntriesRequestProto proto)
}
// stall for stream to be ready.
while (!stream.isReady() && running) {
sleep(DEFAULT_WAIT_FOR_READY_MS, isHeartBeat);
sleep(waitForReady, isHeartBeat);
}
stream.onNext(proto);
}
Expand Down Expand Up @@ -307,23 +309,23 @@ private void appendLog(boolean heartbeat) throws IOException {
increaseNextIndex(pending);
if (appendLogRequestObserver == null) {
appendLogRequestObserver = new StreamObservers(
getClient(), new AppendLogResponseHandler(), useSeparateHBChannel);
getClient(), new AppendLogResponseHandler(), useSeparateHBChannel, getWaitTimeMin());
}
}

final long waitMs = getMinWaitTimeMs();
if (waitMs > 0) {
sleep(waitMs, heartbeat);
final TimeDuration remaining = getRemainingWaitTime();
if (remaining.isPositive()) {
sleep(remaining, heartbeat);
}
if (isRunning()) {
sendRequest(request, pending);
}
}

private static void sleep(long waitMs, boolean heartbeat)
private static void sleep(TimeDuration waitTime, boolean heartbeat)
throws InterruptedIOException {
try {
Thread.sleep(waitMs);
waitTime.sleep();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw IOUtils.toInterruptedIOException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.SizeInBytes;
import org.apache.ratis.util.TimeDuration;

import java.util.Collections;
import java.util.List;
Expand All @@ -56,7 +57,7 @@ public abstract class LogAppenderBase implements LogAppender {
private final AwaitForSignal eventAwaitForSignal;

private final AtomicBoolean heartbeatTrigger = new AtomicBoolean();
private final long waitTimeMinMs;
private final TimeDuration waitTimeMin;

protected LogAppenderBase(RaftServer.Division server, LeaderState leaderState, FollowerInfo f) {
this.follower = f;
Expand All @@ -73,7 +74,7 @@ protected LogAppenderBase(RaftServer.Division server, LeaderState leaderState, F
this.daemon = new LogAppenderDaemon(this);
this.eventAwaitForSignal = new AwaitForSignal(name);

this.waitTimeMinMs = RaftServerConfigKeys.Log.Appender.waitTimeMin(properties).toLong(TimeUnit.MILLISECONDS);
this.waitTimeMin = RaftServerConfigKeys.Log.Appender.waitTimeMin(properties);
}

@Override
Expand Down Expand Up @@ -136,8 +137,12 @@ void restart() {
getLeaderState().restart(this);
}

public long getMinWaitTimeMs() {
return waitTimeMinMs - getFollower().getLastRpcSendTime().elapsedTimeMs();
protected TimeDuration getWaitTimeMin() {
return waitTimeMin;
}

protected TimeDuration getRemainingWaitTime() {
return waitTimeMin.add(getFollower().getLastRpcSendTime().elapsedTime().negate());
}

@Override
Expand Down Expand Up @@ -203,7 +208,8 @@ public AppendEntriesRequestProto newAppendEntriesRequest(long callId, boolean he
}

final List<LogEntryProto> protos = buffer.pollList(getHeartbeatWaitTimeMs(), EntryWithData::getEntry,
(entry, time, exception) -> LOG.warn("Failed to get {} in {}: {}", entry, time, exception));
(entry, time, exception) -> LOG.warn("Failed to get " + entry
+ " in " + time.toString(TimeUnit.MILLISECONDS, 3), exception));
buffer.clear();
assertProtos(protos, followerNext, previous, snapshotIndex);
return leaderState.newAppendEntriesRequestProto(follower, protos, previous, callId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@
import java.util.stream.Collectors;

import static org.apache.ratis.util.TimeDuration.Abbreviation;
import static org.apache.ratis.util.TimeDuration.LOG;
import static org.apache.ratis.util.TimeDuration.parse;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

Expand Down Expand Up @@ -249,14 +249,35 @@ public void testHigherLower() {

@Test(timeout = 1000)
public void testCompareTo() {
assertTrue(TimeDuration.ONE_SECOND.compareTo(TimeDuration.ONE_MINUTE) < 0);
assertTrue(TimeDuration.ONE_MINUTE.compareTo(TimeDuration.ONE_SECOND) > 0);
assertTimeDurationCompareTo(TimeDuration.ONE_MINUTE, TimeDuration.ONE_SECOND);

assertTrue(TimeDuration.valueOf(15, TimeUnit.SECONDS).compareTo(TimeDuration.ONE_MINUTE) < 0);
assertTrue(TimeDuration.ONE_MINUTE.compareTo(TimeDuration.valueOf(15, TimeUnit.SECONDS)) > 0);
final TimeDuration fifteenSecond = TimeDuration.valueOf(15, TimeUnit.SECONDS);
assertTimeDurationCompareTo(TimeDuration.ONE_DAY, fifteenSecond);

assertEquals(0, TimeDuration.valueOf(60, TimeUnit.SECONDS).compareTo(TimeDuration.ONE_MINUTE));
assertEquals(0, TimeDuration.ONE_MINUTE.compareTo(TimeDuration.valueOf(60, TimeUnit.SECONDS)));
assertTimeDurationEquals(TimeDuration.ONE_MINUTE, fifteenSecond.multiply(4));
assertTimeDurationEquals(TimeDuration.ONE_DAY, TimeDuration.ONE_MINUTE.multiply(60).multiply(24));
}

static void assertTimeDurationEquals(TimeDuration left, TimeDuration right) {
assertEquals(0, left.compareTo(right));
assertEquals(0, right.compareTo(left));
assertEquals(left, right);
assertEquals(right, left);
}

static void assertTimeDurationCompareTo(TimeDuration larger, TimeDuration smaller) {
assertTrue(smaller.compareTo(larger) < 0);
assertTrue(larger.compareTo(smaller) > 0);
assertEquals(smaller, TimeDuration.min(smaller, larger));
assertEquals(smaller, TimeDuration.min(larger, smaller));
assertEquals(larger, TimeDuration.max(smaller, larger));
assertEquals(larger, TimeDuration.max(larger, smaller));

final TimeDuration diff = larger.add(smaller.negate());
assertTrue(diff.isPositive());
assertTrue(diff.isNonNegative());
assertFalse(diff.isNegative());
assertFalse(diff.isNonPositive());
}

private static void assertHigherLower(TimeUnit lower, TimeUnit higher) {
Expand Down

0 comments on commit 8e0463b

Please sign in to comment.