Skip to content

Commit

Permalink
RATIS-2162. When closing leaderState, if the logAppender thread sends a snapshot, a deadlock may occur. (#1154)
Browse files Browse the repository at this point in the history
  • Loading branch information
133tosakarin authored Sep 30, 2024
1 parent acb4d1a commit e3d6736
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,9 @@ public class ConfigurationManager {
* The current raft configuration. If configurations is not empty, this should be
* the last entry of the map; otherwise it is initialConf.
*/
@SuppressWarnings({"squid:S3077"}) // Suppress volatile for generic type
private volatile RaftConfigurationImpl currentConf;
private RaftConfigurationImpl currentConf;
/** Cache the peer corresponding to {@link #id}. */
@SuppressWarnings({"squid:S3077"}) // Suppress volatile for generic type
private volatile RaftPeer currentPeer;
private RaftPeer currentPeer;

ConfigurationManager(RaftPeerId id, RaftConfigurationImpl initialConf) {
this.id = id;
Expand Down Expand Up @@ -78,11 +76,11 @@ private void addRaftConfigurationImpl(long logIndex, RaftConfigurationImpl conf)
}
}

RaftConfigurationImpl getCurrent() {
/** @return the value of {@code currentConf}, read while holding this object's monitor. */
synchronized RaftConfigurationImpl getCurrent() {
return currentConf;
}

RaftPeer getCurrentPeer() {
/** @return the cached {@code currentPeer}, read while holding this object's monitor. */
synchronized RaftPeer getCurrentPeer() {
return currentPeer;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.ToIntFunction;
Expand Down Expand Up @@ -62,6 +63,7 @@ int update(AtomicInteger outstanding) {
@SuppressWarnings({"squid:S3077"}) // Suppress volatile for generic type
private volatile Timestamp lastRpcTime = creationTime;
private volatile boolean isRunning = true;
private final CompletableFuture<Void> stopped = new CompletableFuture<>();
private final AtomicInteger outstandingOp = new AtomicInteger();

FollowerState(RaftServerImpl server, Object reason) {
Expand Down Expand Up @@ -93,8 +95,10 @@ boolean isCurrentLeaderValid() {
return lastRpcTime.elapsedTime().compareTo(server.properties().minRpcTimeout()) < 0;
}

void stopRunning() {
/**
 * Asks this follower state to stop: clears the {@code isRunning} flag and calls {@code interrupt()}.
 *
 * @return the {@code stopped} future, which {@code run()} completes in its finally block.
 */
CompletableFuture<Void> stopRunning() {
this.isRunning = false;
interrupt();
return stopped;
}

boolean lostMajorityHeartbeatsRecently() {
Expand Down Expand Up @@ -122,6 +126,14 @@ private boolean shouldRun() {

/** Runs {@code runImpl()} and always completes {@code stopped} afterwards, even if runImpl throws. */
@Override
public void run() {
try {
runImpl();
} finally {
// Lets holders of the future returned by stopRunning() observe that this run has ended.
stopped.complete(null);
}
}

private void runImpl() {
final TimeDuration sleepDeviationThreshold = server.getSleepDeviationThreshold();
while (shouldRun()) {
final TimeDuration electionTimeout = server.getRandomElectionTimeout();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -183,6 +184,7 @@ public String toString() {
private final String name;
private final LifeCycle lifeCycle;
private final Daemon daemon;
private final CompletableFuture<Void> stopped = new CompletableFuture<>();

private final RaftServerImpl server;
private final boolean skipPreVote;
Expand Down Expand Up @@ -223,8 +225,10 @@ private void startIfNew(Runnable starter) {
}
}

void shutdown() {
/**
 * Closes the life cycle of this election via {@code lifeCycle.checkStateAndClose()}.
 *
 * @return the {@code stopped} future, which is already completed when this method returns.
 */
CompletableFuture<Void> shutdown() {
lifeCycle.checkStateAndClose();
// NOTE(review): the future is completed here, so callers never wait for run() to exit;
// presumably this covers an election whose run() never executes -- confirm this is intended.
stopped.complete(null);
return stopped;
}

@VisibleForTesting
Expand All @@ -234,6 +238,14 @@ LifeCycle.State getCurrentState() {

/** Runs {@code runImpl()} and always completes {@code stopped} afterwards, even if runImpl throws. */
@Override
public void run() {
try {
runImpl();
} finally {
// A no-op when shutdown() has already completed the same future.
stopped.complete(null);
}
}

private void runImpl() {
if (!lifeCycle.compareAndTransition(STARTING, RUNNING)) {
final LifeCycle.State state = lifeCycle.getCurrentState();
LOG.info("{}: skip running since this is already {}", this, state);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,11 +224,8 @@ boolean removeAll(Collection<LogAppender> c) {
}

/**
 * Calls {@code stopAsync()} on every element of {@code senders}.
 *
 * @return a future that completes once all of the per-sender futures have completed.
 */
CompletableFuture<Void> stopAll() {
final CompletableFuture<?>[] futures = new CompletableFuture<?>[senders.size()];
for(int i = 0; i < futures.length; i++) {
futures[i] = senders.get(i).stopAsync();
}
return CompletableFuture.allOf(futures);
return CompletableFuture.allOf(senders.stream().
map(LogAppender::stopAsync).toArray(CompletableFuture[]::new));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -566,20 +567,23 @@ void setFirstElection(Object reason) {
* @param force Force to start a new {@link FollowerState} even if this server is already a follower.
* @return if the term/votedFor should be updated to the new term
*/
private synchronized boolean changeToFollower(
long newTerm,
boolean force,
boolean allowListener,
Object reason) {
private boolean changeToFollower(long newTerm, boolean force, boolean allowListener, Object reason) {
// Holder that changeToFollowerAsync fills with the result of state.updateCurrentTerm(newTerm).
final AtomicReference<Boolean> metadataUpdated = new AtomicReference<>();
// This method itself is not synchronized: join() runs only after the synchronized
// changeToFollowerAsync call has returned its future.
changeToFollowerAsync(newTerm, force, allowListener, reason, metadataUpdated).join();
return metadataUpdated.get();
}

private synchronized CompletableFuture<Void> changeToFollowerAsync(
long newTerm, boolean force, boolean allowListener, Object reason, AtomicReference<Boolean> metadataUpdated) {
final RaftPeerRole old = role.getCurrentRole();
if (old == RaftPeerRole.LISTENER && !allowListener) {
throw new IllegalStateException("Unexpected role " + old);
}
boolean metadataUpdated;
CompletableFuture<Void> future = CompletableFuture.completedFuture(null);
if ((old != RaftPeerRole.FOLLOWER || force) && old != RaftPeerRole.LISTENER) {
setRole(RaftPeerRole.FOLLOWER, reason);
if (old == RaftPeerRole.LEADER) {
role.shutdownLeaderState(false)
future = role.shutdownLeaderState(false)
.exceptionally(e -> {
if (e != null) {
if (!getInfo().isAlive()) {
Expand All @@ -588,21 +592,21 @@ private synchronized boolean changeToFollower(
}
}
throw new CompletionException("Failed to shutdownLeaderState: " + this, e);
})
.join();
});
state.setLeader(null, reason);
} else if (old == RaftPeerRole.CANDIDATE) {
role.shutdownLeaderElection();
future = role.shutdownLeaderElection();
} else if (old == RaftPeerRole.FOLLOWER) {
role.shutdownFollowerState();
future = role.shutdownFollowerState();
}
metadataUpdated = state.updateCurrentTerm(newTerm);

metadataUpdated.set(state.updateCurrentTerm(newTerm));
role.startFollowerState(this, reason);
setFirstElection(reason);
} else {
metadataUpdated = state.updateCurrentTerm(newTerm);
metadataUpdated.set(state.updateCurrentTerm(newTerm));
}
return metadataUpdated;
return future;
}

synchronized void changeToFollowerAndPersistMetadata(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class RoleInfo {
public static final Logger LOG = LoggerFactory.getLogger(RoleInfo.class);

private final RaftPeerId id;
private volatile RaftPeerRole role;
private final AtomicReference<RaftPeerRole> role = new AtomicReference<>();
/** Used when the peer is leader */
private final AtomicReference<LeaderStateImpl> leaderState = new AtomicReference<>();
/** Used when the peer is follower, to monitor election timeout */
Expand All @@ -64,7 +64,7 @@ class RoleInfo {
}

/**
 * Stores {@code newRole} as the current role and sets {@code transitionTime} to the current time.
 *
 * @param newRole the role to record.
 */
void transitionRole(RaftPeerRole newRole) {
this.role = newRole;
this.role.set(newRole);
this.transitionTime.set(Timestamp.currentTime());
}

Expand All @@ -73,7 +73,7 @@ long getRoleElapsedTimeMs() {
}

/** @return the value currently held in {@code role}. */
RaftPeerRole getCurrentRole() {
return role;
return role.get();
}

boolean isLeaderReady() {
Expand Down Expand Up @@ -113,13 +113,13 @@ void startFollowerState(RaftServerImpl server, Object reason) {
updateAndGet(followerState, new FollowerState(server, reason)).start();
}

void shutdownFollowerState() {
/**
 * Clears the {@code followerState} reference and asks the previously held follower state, if any, to stop.
 *
 * @return the future from {@code stopRunning()}, or an already-completed future when no follower state was set.
 */
CompletableFuture<Void> shutdownFollowerState() {
final FollowerState follower = followerState.getAndSet(null);
if (follower != null) {
LOG.info("{}: shutdown {}", id, follower);
follower.stopRunning();
follower.interrupt();
if (follower == null) {
return CompletableFuture.completedFuture(null);
}
LOG.info("{}: shutdown {}", id, follower);
return follower.stopRunning();
}

void startLeaderElection(RaftServerImpl server, boolean force) {
Expand All @@ -133,13 +133,13 @@ void setLeaderElectionPause(boolean pause) {
pauseLeaderElection.set(pause);
}

void shutdownLeaderElection() {
/**
 * Clears the {@code leaderElection} reference and shuts down the previously held election, if any.
 *
 * @return the future from {@code election.shutdown()}, or an already-completed future when no election was set.
 */
CompletableFuture<Void> shutdownLeaderElection() {
final LeaderElection election = leaderElection.getAndSet(null);
if (election != null) {
LOG.info("{}: shutdown {}", id, election);
election.shutdown();
// no need to interrupt the election thread
if (election == null) {
return CompletableFuture.completedFuture(null);
}
LOG.info("{}: shutdown {}", id, election);
return election.shutdown();
}

private <T> T updateAndGet(AtomicReference<T> ref, T current) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public void start() {

/**
 * The updated check requires both {@code daemon.isWorking()} and {@code server.getInfo().isLeader()};
 * the earlier check consulted only the daemon.
 */
@Override
public boolean isRunning() {
return daemon.isWorking();
return daemon.isWorking() && server.getInfo().isLeader();
}

@Override
Expand Down

0 comments on commit e3d6736

Please sign in to comment.