Skip to content

Commit

Permalink
apply warning for callback also
Browse files Browse the repository at this point in the history
Signed-off-by: tison <[email protected]>
  • Loading branch information
tisonkun committed Sep 15, 2023
1 parent 90bed54 commit 0986c0e
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 5 deletions.
5 changes: 3 additions & 2 deletions ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -167,17 +167,18 @@ static <REPLY, REPLY_PROTO> void asyncCall(
StreamObserver<REPLY_PROTO> responseObserver,
CheckedSupplier<CompletableFuture<REPLY>, IOException> supplier,
Function<REPLY, REPLY_PROTO> toProto) {
asyncCall(responseObserver, supplier, toProto, e -> {});
asyncCall(responseObserver, supplier, toProto, throwable -> {});
}

static <REPLY, REPLY_PROTO> void asyncCall(
StreamObserver<REPLY_PROTO> responseObserver,
CheckedSupplier<CompletableFuture<REPLY>, IOException> supplier,
Function<REPLY, REPLY_PROTO> toProto,
Consumer<Exception> warning) {
Consumer<Throwable> warning) {
try {
supplier.get().whenComplete((reply, exception) -> {
if (exception != null) {
warning.accept(exception);
responseObserver.onError(GrpcUtil.wrapException(exception));
} else {
responseObserver.onNext(toProto.apply(reply));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.ratis.grpc.server;

import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.ratis.grpc.GrpcUtil;
import org.apache.ratis.protocol.RaftPeerId;
Expand Down Expand Up @@ -213,9 +214,9 @@ public void startLeaderElection(StartLeaderElectionRequestProto request,

@Override
public void readIndex(ReadIndexRequestProto request, StreamObserver<ReadIndexReplyProto> responseObserver) {
GrpcUtil.asyncCall(responseObserver, () -> server.readIndexAsync(request), Function.identity(),
e -> GrpcUtil.warn(LOG,
() -> getId() + ": Failed readIndex " + ProtoUtils.toString(request.getServerRequest()), e));
final Consumer<Throwable> warning = e -> GrpcUtil.warn(LOG,
() -> getId() + ": Failed readIndex " + ProtoUtils.toString(request.getServerRequest()), e);
GrpcUtil.asyncCall(responseObserver, () -> server.readIndexAsync(request), Function.identity(), warning);
}

@Override
Expand Down

0 comments on commit 0986c0e

Please sign in to comment.