Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: last committed index in BallotBox #1109

Merged
merged 6 commits into from
Jun 17, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public boolean init(final BallotBoxOptions opts) {
this.opts = opts;
this.waiter = opts.getWaiter();
this.closureQueue = opts.getClosureQueue();
this.lastCommittedIndex = opts.getLastCommittedIndex();
return true;
}

Expand Down Expand Up @@ -165,6 +166,7 @@ public void clearPendingTasks() {
* committed until a log at the new term becomes committed, so
* |newPendingIndex| should be |last_log_index| + 1.
* @param newPendingIndex pending index of new leader
*
* @return returns true if reset success
*/
public boolean resetPendingIndex(final long newPendingIndex) {
Expand All @@ -180,6 +182,7 @@ public boolean resetPendingIndex(final long newPendingIndex) {
this.opts.getNodeId(), newPendingIndex, this.lastCommittedIndex);
return false;
}

this.pendingIndex = newPendingIndex;
this.closureQueue.resetFirstIndex(newPendingIndex);
return true;
Expand Down
55 changes: 46 additions & 9 deletions jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -1013,15 +1013,6 @@ protected int adjustTimeout(final int timeoutMs) {
LOG.error("Node {} initFSMCaller failed.", getNodeId());
return false;
}
this.ballotBox = new BallotBox();
final BallotBoxOptions ballotBoxOpts = new BallotBoxOptions();
ballotBoxOpts.setWaiter(this.fsmCaller);
ballotBoxOpts.setClosureQueue(this.closureQueue);
ballotBoxOpts.setNodeId(getNodeId());
if (!this.ballotBox.init(ballotBoxOpts)) {
LOG.error("Node {} init ballotBox failed.", getNodeId());
return false;
}

if (!initSnapshotStorage()) {
LOG.error("Node {} initSnapshotStorage failed.", getNodeId());
Expand All @@ -1044,6 +1035,12 @@ protected int adjustTimeout(final int timeoutMs) {
this.targetPriority = getMaxPriorityOfNodes(this.conf.getConf().getPeers());
}

// It must be initialized after initializing conf and log storage.
if (!initBallotBox()) {
LOG.error("Node {} init ballotBox failed.", getNodeId());
return false;
}
killme2008 marked this conversation as resolved.
Show resolved Hide resolved

if (!this.conf.isEmpty()) {
Requires.requireTrue(this.conf.isValid(), "Invalid conf: %s", this.conf);
} else {
Expand Down Expand Up @@ -1120,13 +1117,41 @@ protected int adjustTimeout(final int timeoutMs) {
return true;
}

private boolean initBallotBox() {
this.ballotBox = new BallotBox();
final BallotBoxOptions ballotBoxOpts = new BallotBoxOptions();
ballotBoxOpts.setWaiter(this.fsmCaller);
ballotBoxOpts.setClosureQueue(this.closureQueue);
ballotBoxOpts.setNodeId(getNodeId());
// Try to initialize the last committed index in BallotBox to be the last snapshot index.
long lastCommittedIndex = 0;
if (this.snapshotExecutor != null) {
lastCommittedIndex = this.snapshotExecutor.getLastSnapshotIndex();
}
if (this.getQuorum() == 1) {
// It is safe to initiate lastCommittedIndex as last log one because in case of single peer no one will discard
// log records on leader election.
// Fix https://github.com/sofastack/sofa-jraft/issues/1049
lastCommittedIndex = Math.max(lastCommittedIndex, this.logManager.getLastLogIndex());
}

ballotBoxOpts.setLastCommittedIndex(lastCommittedIndex);
LOG.info("Node {} init ballot box's lastCommittedIndex={}.", lastCommittedIndex);
killme2008 marked this conversation as resolved.
Show resolved Hide resolved
return this.ballotBox.init(ballotBoxOpts);
}
killme2008 marked this conversation as resolved.
Show resolved Hide resolved

/**
* Test-only entry point: takes the write lock and then runs {@code electSelf()}.
*/
@OnlyForTest
void tryElectSelf() {
this.writeLock.lock();
// No unlock here on purpose: electSelf() is expected to release the write lock
// (it is documented as "should be in writeLock").
electSelf();
}

/**
* Test-only accessor for the ballot box held by this node.
*
* @return the current {@code ballotBox} field
*/
@OnlyForTest
BallotBox getBallotBox() {
return this.ballotBox;
}

// should be in writeLock
private void electSelf() {
long oldTerm;
Expand Down Expand Up @@ -1360,6 +1385,18 @@ public void run(final Status status) {
}

private void executeApplyingTasks(final List<LogEntryAndClosure> tasks) {
if (!this.logManager.hasAvailableCapacityToAppendEntries(1)) {
fengjiachun marked this conversation as resolved.
Show resolved Hide resolved
// It's overload, fail-fast
final List<Closure> dones = tasks.stream().map(ele -> ele.done).filter(Objects::nonNull)
.collect(Collectors.toList());
ThreadPoolsFactory.runInThread(this.groupId, () -> {
for (final Closure done : dones) {
done.run(new Status(RaftError.EBUSY, "Node %s log manager is busy.", this.getNodeId()));
}
});
return;
}

this.writeLock.lock();
try {
final int size = tasks.size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,20 @@ public class BallotBoxOptions {
private FSMCaller waiter;
private ClosureQueue closureQueue;
private NodeId nodeId;
private long lastCommittedIndex;

/**
* Returns the node id configured in these options.
*
* @return the stored node id
*/
public NodeId getNodeId() {
return nodeId;
}

/**
* Returns the last committed index configured in these options.
*
* @return the stored last committed index
*/
public long getLastCommittedIndex() {
return lastCommittedIndex;
}

/**
* Sets the last committed index carried by these options.
*
* @param lastCommittedIndex the value to store
*/
public void setLastCommittedIndex(long lastCommittedIndex) {
this.lastCommittedIndex = lastCommittedIndex;
}
killme2008 marked this conversation as resolved.
Show resolved Hide resolved

/**
* Sets the node id carried by these options.
*
* @param nodeId the value to store
*/
public void setNodeId(NodeId nodeId) {
this.nodeId = nodeId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ public interface SnapshotExecutor extends Lifecycle<SnapshotExecutorOptions>, De
*/
void doSnapshot(final Closure done);

/**
* Returns the last snapshot index.
*
* @return the last snapshot index
*/
long getLastSnapshotIndex();

killme2008 marked this conversation as resolved.
Show resolved Hide resolved
/**
* Start to snapshot StateMachine immediately with the latest log applied to state machine.
* You MUST call this method in {@link StateMachine} callback methods to trigger a snapshot synchronously, otherwise throws {@link IllegalStateException}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,6 @@ public long getLastSnapshotTerm() {
return this.lastSnapshotTerm;
}

/**
 * Returns the last snapshot index held by this executor.
 *
 * @return the current value of {@code lastSnapshotIndex}
 */
public long getLastSnapshotIndex() {
    return this.lastSnapshotIndex;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public void setup() {
this.closureQueue = new ClosureQueueImpl(GROUP_ID);
opts.setClosureQueue(this.closureQueue);
opts.setWaiter(this.waiter);
opts.setLastCommittedIndex(0);
box = new BallotBox();
assertTrue(box.init(opts));
}
Expand All @@ -60,11 +61,25 @@ public void teardown() {
box.shutdown();
}

/**
 * Verifies that a ballot box initialized with a last committed index of 9
 * reports that same value afterwards.
 */
@Test
public void initWithLastCommittedIndex() {
    BallotBoxOptions opts = new BallotBoxOptions();
    this.closureQueue = new ClosureQueueImpl(GROUP_ID);
    opts.setClosureQueue(this.closureQueue);
    opts.setWaiter(this.waiter);
    opts.setLastCommittedIndex(9);
    box = new BallotBox();
    assertTrue(box.init(opts));

    // Expected value first, actual second, so a failure message reads correctly.
    assertEquals(9, box.getLastCommittedIndex());
}

/**
* Checks that resetting the pending index to 1 succeeds, moves both the closure queue's
* first index and the box's pending index to 1, and leaves the last committed index at 0.
*/
@Test
public void testResetPendingIndex() {
// Before the reset both indexes are 0.
assertEquals(0, closureQueue.getFirstIndex());
assertEquals(0, box.getPendingIndex());
assertTrue(box.resetPendingIndex(1));
// The reset must not change the last committed index.
assertEquals(0, box.getLastCommittedIndex());
assertEquals(1, closureQueue.getFirstIndex());
assertEquals(1, box.getPendingIndex());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
*/
package com.alipay.sofa.jraft.storage;

import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.List;

import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -48,6 +50,11 @@ public class HybridLogStorage implements LogStorage {
private long thresholdIndex;

public HybridLogStorage(final String path, final StoreOptions storeOptions, final LogStorage oldStorage) {
try {
FileUtils.forceMkdir(new File(path));
} catch (IOException e) {
// ignore
}
killme2008 marked this conversation as resolved.
Show resolved Hide resolved
final String newLogStoragePath = Paths.get(path, storeOptions.getStoragePath()).toString();
this.newLogStorage = new LogitLogStorage(newLogStoragePath, storeOptions);
this.oldLogStorage = oldStorage;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

Expand Down Expand Up @@ -2067,6 +2068,8 @@ public void testRestoreSnasphot() throws Exception {
cluster.waitLeader();
assertEquals(0, cluster.getLeaderFsm().getLoadSnapshotTimes());
assertTrue(cluster.start(leaderAddr));
final NodeImpl newLeader = (NodeImpl) cluster.getLeader();
assertEquals(newLeader.getBallotBox().getLastCommittedIndex(), newLeader.getLastCommittedIndex());
cluster.ensureSame();
assertEquals(0, cluster.getLeaderFsm().getLoadSnapshotTimes());

Expand Down Expand Up @@ -2886,6 +2889,8 @@ public void readCommittedUserLog() throws Exception {
final Node leader = cluster.getLeader();
assertNotNull(leader);
this.sendTestTaskAndWait(leader);
// Waits for applying to FSM
Thread.sleep(500);

// index == 1 is a CONFIGURATION log, so real_index will be 2 when returned.
UserLog userLog = leader.readCommittedUserLog(1);
Expand Down Expand Up @@ -3402,9 +3407,13 @@ public void testChangePeersChaosApplyTasks() throws Exception {
for (final ChangeArg arg : args) {
arg.stop = true;
}
for (final Future<?> future : futures) {
future.get();
}
for (final Future<?> future : futures) {
try {
future.get(20, TimeUnit.SECONDS);
} catch (TimeoutException e) {
// ignore
}
}
killme2008 marked this conversation as resolved.
Show resolved Hide resolved

cluster.waitLeader();
final SynchronizedClosure done = new SynchronizedClosure();
Expand Down
Loading