From 979b38a7ab09493689bf167e04e521ed18d76d0e Mon Sep 17 00:00:00 2001 From: tison Date: Sat, 16 Sep 2023 00:55:00 +0800 Subject: [PATCH] RATIS-1888. Handle exception of readIndexAsync in gRPC readIndex impl (#920) --- .../java/org/apache/ratis/grpc/GrpcUtil.java | 18 ++++++++++++++---- .../grpc/server/GrpcServerProtocolService.java | 15 +++++---------- 2 files changed, 19 insertions(+), 14 deletions(-) diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java index ee1c28dd31..22653b6efb 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java @@ -17,7 +17,7 @@ */ package org.apache.ratis.grpc; -import org.apache.ratis.protocol.RaftClientReply; +import java.util.function.Consumer; import org.apache.ratis.protocol.exceptions.ServerNotReadyException; import org.apache.ratis.protocol.exceptions.TimeoutIOException; import org.apache.ratis.security.TlsConf.TrustManagerConf; @@ -47,7 +47,7 @@ import java.util.function.Supplier; public interface GrpcUtil { - static final Logger LOG = LoggerFactory.getLogger(GrpcUtil.class); + Logger LOG = LoggerFactory.getLogger(GrpcUtil.class); Metadata.Key EXCEPTION_TYPE_KEY = Metadata.Key.of("exception-type", Metadata.ASCII_STRING_MARSHALLER); @@ -163,13 +163,22 @@ static IOException unwrapIOException(Throwable t) { return e; } - static void asyncCall( + static void asyncCall( StreamObserver responseObserver, CheckedSupplier, IOException> supplier, Function toProto) { + asyncCall(responseObserver, supplier, toProto, throwable -> {}); + } + + static void asyncCall( + StreamObserver responseObserver, + CheckedSupplier, IOException> supplier, + Function toProto, + Consumer warning) { try { - supplier.get().whenCompleteAsync((reply, exception) -> { + supplier.get().whenComplete((reply, exception) -> { if (exception != null) { + warning.accept(exception); responseObserver.onError(GrpcUtil.wrapException(exception)); } else { responseObserver.onNext(toProto.apply(reply)); @@ -177,6 +186,7 @@ static void asyncCall( } }); } catch (Exception e) { + warning.accept(e); responseObserver.onError(GrpcUtil.wrapException(e)); } } diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java index 398f2bc96d..766e14321a 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java @@ -17,6 +17,8 @@ */ package org.apache.ratis.grpc.server; +import java.util.function.Consumer; +import java.util.function.Function; import org.apache.ratis.grpc.GrpcUtil; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.RaftServer; @@ -212,16 +214,9 @@ public void startLeaderElection(StartLeaderElectionRequestProto request, @Override public void readIndex(ReadIndexRequestProto request, StreamObserver responseObserver) { - try { - server.readIndexAsync(request).thenAccept(reply -> { - responseObserver.onNext(reply); - responseObserver.onCompleted(); - }); - } catch (Throwable e) { - GrpcUtil.warn(LOG, - () -> getId() + ": Failed readIndex " + ProtoUtils.toString(request.getServerRequest()), e); - responseObserver.onError(GrpcUtil.wrapException(e)); - } + final Consumer warning = e -> GrpcUtil.warn(LOG, + () -> getId() + ": Failed readIndex " + ProtoUtils.toString(request.getServerRequest()), e); + GrpcUtil.asyncCall(responseObserver, () -> server.readIndexAsync(request), Function.identity(), warning); } @Override