Skip to content

Commit

Permalink
RATIS-1888. Handle exception of readIndexAsync in gRPC readIndex impl (
Browse files Browse the repository at this point in the history
  • Loading branch information
tisonkun authored Sep 15, 2023
1 parent b8ce6d1 commit 979b38a
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 14 deletions.
18 changes: 14 additions & 4 deletions ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/
package org.apache.ratis.grpc;

import org.apache.ratis.protocol.RaftClientReply;
import java.util.function.Consumer;
import org.apache.ratis.protocol.exceptions.ServerNotReadyException;
import org.apache.ratis.protocol.exceptions.TimeoutIOException;
import org.apache.ratis.security.TlsConf.TrustManagerConf;
Expand Down Expand Up @@ -47,7 +47,7 @@
import java.util.function.Supplier;

public interface GrpcUtil {
static final Logger LOG = LoggerFactory.getLogger(GrpcUtil.class);
Logger LOG = LoggerFactory.getLogger(GrpcUtil.class);

Metadata.Key<String> EXCEPTION_TYPE_KEY =
Metadata.Key.of("exception-type", Metadata.ASCII_STRING_MARSHALLER);
Expand Down Expand Up @@ -163,20 +163,30 @@ static IOException unwrapIOException(Throwable t) {
return e;
}

static <REPLY extends RaftClientReply, REPLY_PROTO> void asyncCall(
static <REPLY, REPLY_PROTO> void asyncCall(
StreamObserver<REPLY_PROTO> responseObserver,
CheckedSupplier<CompletableFuture<REPLY>, IOException> supplier,
Function<REPLY, REPLY_PROTO> toProto) {
asyncCall(responseObserver, supplier, toProto, throwable -> {});
}

static <REPLY, REPLY_PROTO> void asyncCall(
StreamObserver<REPLY_PROTO> responseObserver,
CheckedSupplier<CompletableFuture<REPLY>, IOException> supplier,
Function<REPLY, REPLY_PROTO> toProto,
Consumer<Throwable> warning) {
try {
supplier.get().whenCompleteAsync((reply, exception) -> {
supplier.get().whenComplete((reply, exception) -> {
if (exception != null) {
warning.accept(exception);
responseObserver.onError(GrpcUtil.wrapException(exception));
} else {
responseObserver.onNext(toProto.apply(reply));
responseObserver.onCompleted();
}
});
} catch (Exception e) {
warning.accept(e);
responseObserver.onError(GrpcUtil.wrapException(e));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.ratis.grpc.server;

import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.ratis.grpc.GrpcUtil;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServer;
Expand Down Expand Up @@ -212,16 +214,9 @@ public void startLeaderElection(StartLeaderElectionRequestProto request,

@Override
public void readIndex(ReadIndexRequestProto request, StreamObserver<ReadIndexReplyProto> responseObserver) {
try {
server.readIndexAsync(request).thenAccept(reply -> {
responseObserver.onNext(reply);
responseObserver.onCompleted();
});
} catch (Throwable e) {
GrpcUtil.warn(LOG,
() -> getId() + ": Failed readIndex " + ProtoUtils.toString(request.getServerRequest()), e);
responseObserver.onError(GrpcUtil.wrapException(e));
}
final Consumer<Throwable> warning = e -> GrpcUtil.warn(LOG,
() -> getId() + ": Failed readIndex " + ProtoUtils.toString(request.getServerRequest()), e);
GrpcUtil.asyncCall(responseObserver, () -> server.readIndexAsync(request), Function.identity(), warning);
}

@Override
Expand Down

0 comments on commit 979b38a

Please sign in to comment.