Skip to content

Commit

Permalink
RATIS-1868. Handling Netty back pressure when streaming ratis log (#900)
Browse files Browse the repository at this point in the history
  • Loading branch information
duongkame authored Aug 9, 2023
1 parent ea6faa4 commit f3a5d4d
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.util.ServerStringUtils;
import org.apache.ratis.thirdparty.io.grpc.stub.CallStreamObserver;
import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
Expand All @@ -45,6 +46,7 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -121,6 +123,7 @@ private GrpcServerProtocolClient getClient() throws IOException {
private void resetClient(AppendEntriesRequest request, boolean onError) {
try (AutoCloseableLock writeLock = lock.writeLock(caller, LOG::trace)) {
getClient().resetConnectBackoff();
appendLogRequestObserver.stop();
appendLogRequestObserver = null;
firstResponseReceived = false;
// clear the pending requests queue and reset the next index of follower
Expand Down Expand Up @@ -241,20 +244,34 @@ private boolean haveTooManyPendingRequests() {
}

static class StreamObservers {
private final StreamObserver<AppendEntriesRequestProto> appendLog;
private final StreamObserver<AppendEntriesRequestProto> heartbeat;
public static final int DEFAULT_WAIT_FOR_READY_MS = 10;
private final CallStreamObserver<AppendEntriesRequestProto> appendLog;
private final CallStreamObserver<AppendEntriesRequestProto> heartbeat;
private volatile boolean running = true;

StreamObservers(GrpcServerProtocolClient client, AppendLogResponseHandler handler, boolean separateHeartbeat) {
this.appendLog = client.appendEntries(handler, false);
this.heartbeat = separateHeartbeat? client.appendEntries(handler, true): null;
}

void onNext(AppendEntriesRequestProto proto) {
if (heartbeat != null && proto.getEntriesCount() == 0) {
heartbeat.onNext(proto);
void onNext(AppendEntriesRequestProto proto)
throws InterruptedIOException {
CallStreamObserver<AppendEntriesRequestProto> stream;
boolean isHeartBeat = heartbeat != null && proto.getEntriesCount() == 0;
if (isHeartBeat) {
stream = heartbeat;
} else {
appendLog.onNext(proto);
stream = appendLog;
}
// stall for stream to be ready.
while (!stream.isReady() && running) {
sleep(DEFAULT_WAIT_FOR_READY_MS, isHeartBeat);
}
stream.onNext(proto);
}

void stop() {
running = false;
}

void onCompleted() {
Expand Down Expand Up @@ -294,31 +311,34 @@ private void appendLog(boolean heartbeat) throws IOException {

final long waitMs = getMinWaitTimeMs();
if (waitMs > 0) {
try {
Thread.sleep(waitMs);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw IOUtils.toInterruptedIOException(
"Interrupted appendLog, heartbeat? " + heartbeat, e);
}
sleep(waitMs, heartbeat);
}
if (isRunning()) {
sendRequest(request, pending);
}
}

private void sendRequest(AppendEntriesRequest request, AppendEntriesRequestProto proto) {
private static void sleep(long waitMs, boolean heartbeat)
throws InterruptedIOException {
try {
Thread.sleep(waitMs);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw IOUtils.toInterruptedIOException(
"Interrupted appendLog, heartbeat? " + heartbeat, e);
}
}

private void sendRequest(AppendEntriesRequest request,
AppendEntriesRequestProto proto) throws InterruptedIOException {
CodeInjectionForTesting.execute(GrpcService.GRPC_SEND_SERVER_REQUEST,
getServer().getId(), null, proto);
resetHeartbeatTrigger();
final boolean sent = Optional.ofNullable(appendLogRequestObserver)
.map(observer -> {
request.startRequestTimer();
observer.onNext(proto);
return true;
}).isPresent();

if (sent) {

StreamObservers observers = appendLogRequestObserver;
if (observers != null) {
request.startRequestTimer();
observers.onNext(proto);
getFollower().updateLastRpcSendTime(request.isHeartbeat());
scheduler.onTimeout(requestTimeoutDuration,
() -> timeoutAppendRequest(request.getCallId(), request.isHeartbeat()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts;
import org.apache.ratis.thirdparty.io.grpc.netty.NegotiationType;
import org.apache.ratis.thirdparty.io.grpc.netty.NettyChannelBuilder;
import org.apache.ratis.thirdparty.io.grpc.stub.CallStreamObserver;
import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
import org.apache.ratis.proto.RaftProtos.*;
import org.apache.ratis.proto.grpc.RaftServerProtocolServiceGrpc;
Expand Down Expand Up @@ -128,12 +129,12 @@ void readIndex(ReadIndexRequestProto request, StreamObserver<ReadIndexReplyProto
.readIndex(request, s);
}

StreamObserver<AppendEntriesRequestProto> appendEntries(
CallStreamObserver<AppendEntriesRequestProto> appendEntries(
StreamObserver<AppendEntriesReplyProto> responseHandler, boolean isHeartbeat) {
if (isHeartbeat && useSeparateHBChannel) {
return hbAsyncStub.appendEntries(responseHandler);
return (CallStreamObserver<AppendEntriesRequestProto>) hbAsyncStub.appendEntries(responseHandler);
} else {
return asyncStub.appendEntries(responseHandler);
return (CallStreamObserver<AppendEntriesRequestProto>) asyncStub.appendEntries(responseHandler);
}
}

Expand Down

0 comments on commit f3a5d4d

Please sign in to comment.