Skip to content

Commit

Permalink
RATIS-2152. GrpcLogAppender stucks while sending an installSnapshot n…
Browse files Browse the repository at this point in the history
…otification request (#1146)
  • Loading branch information
chungen0126 authored Sep 12, 2024
1 parent e6fe8fc commit 9ac832b
Showing 1 changed file with 13 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

/**
Expand Down Expand Up @@ -576,7 +576,7 @@ private void updateNextIndex(long replyNextIndex) {
private class InstallSnapshotResponseHandler implements StreamObserver<InstallSnapshotReplyProto> {
private final String name = getFollower().getName() + "-" + JavaUtils.getClassSimpleName(getClass());
private final Queue<Integer> pending;
private final AtomicBoolean done = new AtomicBoolean(false);
private final CompletableFuture<Void> done = new CompletableFuture<>();
private final boolean isNotificationOnly;

InstallSnapshotResponseHandler() {
Expand Down Expand Up @@ -637,12 +637,18 @@ void notifyInstallSnapshotFinished(InstallSnapshotResult result, long snapshotIn
getServer().getStateMachine().event().notifySnapshotInstalled(result, snapshotIndex, getFollower().getPeer());
}

boolean isDone() {
return done.get();
void waitForResponse() {
try {
done.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
throw new IllegalStateException("Failed to complete " + name, e);
}
}

void close() {
done.set(true);
done.complete(null);
notifyLogAppender();
}

Expand Down Expand Up @@ -776,14 +782,7 @@ private void installSnapshot(SnapshotInfo snapshot) {
}
return;
}

while (isRunning() && !responseHandler.isDone()) {
try {
getEventAwaitForSignal().await(getWaitTimeMs(), TimeUnit.MILLISECONDS);
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
}
}
responseHandler.waitForResponse();

if (responseHandler.hasAllResponse()) {
getFollower().setSnapshotIndex(snapshot.getTermIndex().getIndex());
Expand Down Expand Up @@ -821,14 +820,7 @@ private void notifyInstallSnapshot(TermIndex firstAvailable) {
}
return;
}

while (isRunning() && !responseHandler.isDone()) {
try {
getEventAwaitForSignal().await(getWaitTimeMs(), TimeUnit.MILLISECONDS);
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
}
}
responseHandler.waitForResponse();
}

/**
Expand Down

0 comments on commit 9ac832b

Please sign in to comment.