diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java index 3544f3be1b..d3bf28dee1 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java @@ -34,6 +34,7 @@ import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.server.raftlog.RaftLog; import org.apache.ratis.server.util.ServerStringUtils; +import org.apache.ratis.thirdparty.io.grpc.stub.CallStreamObserver; import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver; import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto; import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto; @@ -45,6 +46,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.io.InterruptedIOException; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; @@ -121,6 +123,7 @@ private GrpcServerProtocolClient getClient() throws IOException { private void resetClient(AppendEntriesRequest request, boolean onError) { try (AutoCloseableLock writeLock = lock.writeLock(caller, LOG::trace)) { getClient().resetConnectBackoff(); + appendLogRequestObserver.stop(); appendLogRequestObserver = null; firstResponseReceived = false; // clear the pending requests queue and reset the next index of follower @@ -241,20 +244,34 @@ private boolean haveTooManyPendingRequests() { } static class StreamObservers { - private final StreamObserver appendLog; - private final StreamObserver heartbeat; + public static final int DEFAULT_WAIT_FOR_READY_MS = 10; + private final CallStreamObserver appendLog; + private final CallStreamObserver heartbeat; + private volatile boolean running = true; StreamObservers(GrpcServerProtocolClient client, AppendLogResponseHandler handler, boolean separateHeartbeat) { this.appendLog = client.appendEntries(handler, false); this.heartbeat = separateHeartbeat? client.appendEntries(handler, true): null; } - void onNext(AppendEntriesRequestProto proto) { - if (heartbeat != null && proto.getEntriesCount() == 0) { - heartbeat.onNext(proto); + void onNext(AppendEntriesRequestProto proto) + throws InterruptedIOException { + CallStreamObserver stream; + boolean isHeartBeat = heartbeat != null && proto.getEntriesCount() == 0; + if (isHeartBeat) { + stream = heartbeat; } else { - appendLog.onNext(proto); + stream = appendLog; } + // stall for stream to be ready. + while (!stream.isReady() && running) { + sleep(DEFAULT_WAIT_FOR_READY_MS, isHeartBeat); + } + stream.onNext(proto); + } + + void stop() { + running = false; } void onCompleted() { @@ -294,31 +311,34 @@ private void appendLog(boolean heartbeat) throws IOException { final long waitMs = getMinWaitTimeMs(); if (waitMs > 0) { - try { - Thread.sleep(waitMs); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw IOUtils.toInterruptedIOException( - "Interrupted appendLog, heartbeat? " + heartbeat, e); - } + sleep(waitMs, heartbeat); } if (isRunning()) { sendRequest(request, pending); } } - private void sendRequest(AppendEntriesRequest request, AppendEntriesRequestProto proto) { + private static void sleep(long waitMs, boolean heartbeat) + throws InterruptedIOException { + try { + Thread.sleep(waitMs); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw IOUtils.toInterruptedIOException( + "Interrupted appendLog, heartbeat? " + heartbeat, e); + } + } + + private void sendRequest(AppendEntriesRequest request, + AppendEntriesRequestProto proto) throws InterruptedIOException { CodeInjectionForTesting.execute(GrpcService.GRPC_SEND_SERVER_REQUEST, getServer().getId(), null, proto); resetHeartbeatTrigger(); - final boolean sent = Optional.ofNullable(appendLogRequestObserver) - .map(observer -> { - request.startRequestTimer(); - observer.onNext(proto); - return true; - }).isPresent(); - - if (sent) { + + StreamObservers observers = appendLogRequestObserver; + if (observers != null) { + request.startRequestTimer(); + observers.onNext(proto); getFollower().updateLastRpcSendTime(request.isHeartbeat()); scheduler.onTimeout(requestTimeoutDuration, () -> timeoutAppendRequest(request.getCallId(), request.isHeartbeat()), diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java index 34f014ebc2..2eb73f6aa0 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java @@ -26,6 +26,7 @@ import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts; import org.apache.ratis.thirdparty.io.grpc.netty.NegotiationType; import org.apache.ratis.thirdparty.io.grpc.netty.NettyChannelBuilder; +import org.apache.ratis.thirdparty.io.grpc.stub.CallStreamObserver; import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver; import org.apache.ratis.proto.RaftProtos.*; import org.apache.ratis.proto.grpc.RaftServerProtocolServiceGrpc; @@ -128,12 +129,12 @@ void readIndex(ReadIndexRequestProto request, StreamObserver appendEntries( + CallStreamObserver appendEntries( StreamObserver responseHandler, boolean isHeartbeat) { if (isHeartbeat && useSeparateHBChannel) { - return hbAsyncStub.appendEntries(responseHandler); + return (CallStreamObserver) hbAsyncStub.appendEntries(responseHandler); } else { - return asyncStub.appendEntries(responseHandler); + return (CallStreamObserver) asyncStub.appendEntries(responseHandler); } }