From 0986c0edcca8f443e0014477e89d34056f9d82f7 Mon Sep 17 00:00:00 2001 From: tison Date: Fri, 15 Sep 2023 15:21:50 +0800 Subject: [PATCH] apply warning for callback also Signed-off-by: tison --- .../src/main/java/org/apache/ratis/grpc/GrpcUtil.java | 5 +++-- .../ratis/grpc/server/GrpcServerProtocolService.java | 7 ++++--- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java index fd198e6d88..22653b6efb 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java @@ -167,17 +167,18 @@ static void asyncCall( StreamObserver responseObserver, CheckedSupplier, IOException> supplier, Function toProto) { - asyncCall(responseObserver, supplier, toProto, e -> {}); + asyncCall(responseObserver, supplier, toProto, throwable -> {}); } static void asyncCall( StreamObserver responseObserver, CheckedSupplier, IOException> supplier, Function toProto, - Consumer warning) { + Consumer warning) { try { supplier.get().whenComplete((reply, exception) -> { if (exception != null) { + warning.accept(exception); responseObserver.onError(GrpcUtil.wrapException(exception)); } else { responseObserver.onNext(toProto.apply(reply)); diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java index 5bfc3562ea..766e14321a 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java @@ -17,6 +17,7 @@ */ package org.apache.ratis.grpc.server; +import java.util.function.Consumer; import java.util.function.Function; import org.apache.ratis.grpc.GrpcUtil; import org.apache.ratis.protocol.RaftPeerId; @@ -213,9 +214,9 @@ public void startLeaderElection(StartLeaderElectionRequestProto request, @Override public void readIndex(ReadIndexRequestProto request, StreamObserver responseObserver) { - GrpcUtil.asyncCall(responseObserver, () -> server.readIndexAsync(request), Function.identity(), - e -> GrpcUtil.warn(LOG, - () -> getId() + ": Failed readIndex " + ProtoUtils.toString(request.getServerRequest()), e)); + final Consumer warning = e -> GrpcUtil.warn(LOG, + () -> getId() + ": Failed readIndex " + ProtoUtils.toString(request.getServerRequest()), e); + GrpcUtil.asyncCall(responseObserver, () -> server.readIndexAsync(request), Function.identity(), warning); } @Override