diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java index 1da7bb3b47..7edb3ae0b7 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java @@ -54,8 +54,8 @@ import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; /** @@ -576,7 +576,7 @@ private void updateNextIndex(long replyNextIndex) { private class InstallSnapshotResponseHandler implements StreamObserver { private final String name = getFollower().getName() + "-" + JavaUtils.getClassSimpleName(getClass()); private final Queue pending; - private final AtomicBoolean done = new AtomicBoolean(false); + private final CompletableFuture done = new CompletableFuture<>(); private final boolean isNotificationOnly; InstallSnapshotResponseHandler() { @@ -637,12 +637,18 @@ void notifyInstallSnapshotFinished(InstallSnapshotResult result, long snapshotIn getServer().getStateMachine().event().notifySnapshotInstalled(result, snapshotIndex, getFollower().getPeer()); } - boolean isDone() { - return done.get(); + void waitForResponse() { + try { + done.get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (ExecutionException e) { + throw new IllegalStateException("Failed to complete " + name, e); + } } void close() { - done.set(true); + done.complete(null); notifyLogAppender(); } @@ -776,14 +782,7 @@ private void installSnapshot(SnapshotInfo snapshot) { } return; } - - while (isRunning() && !responseHandler.isDone()) { - try { - getEventAwaitForSignal().await(getWaitTimeMs(), TimeUnit.MILLISECONDS); - } catch (InterruptedException ignored) { - Thread.currentThread().interrupt(); - } - } + responseHandler.waitForResponse(); if (responseHandler.hasAllResponse()) { getFollower().setSnapshotIndex(snapshot.getTermIndex().getIndex()); @@ -821,14 +820,7 @@ private void notifyInstallSnapshot(TermIndex firstAvailable) { } return; } - - while (isRunning() && !responseHandler.isDone()) { - try { - getEventAwaitForSignal().await(getWaitTimeMs(), TimeUnit.MILLISECONDS); - } catch (InterruptedException ignored) { - Thread.currentThread().interrupt(); - } - } + responseHandler.waitForResponse(); } /**