Skip to content

Commit

Permalink
test
Browse files Browse the repository at this point in the history
  • Loading branch information
yfei-z committed Oct 11, 2024
1 parent cf80231 commit d4079f3
Showing 1 changed file with 40 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,27 @@

import org.jgroups.Address;
import org.jgroups.Global;
import org.jgroups.Header;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.View;
import org.jgroups.protocols.pbcast.GMS;
import org.jgroups.protocols.raft.RAFT;
import org.jgroups.protocols.raft.election.BaseElection;
import org.jgroups.protocols.raft.election.VoteResponse;
import org.jgroups.tests.harness.BaseRaftElectionTest;
import org.jgroups.util.Util;

import java.util.List;
import java.util.Map;
import java.util.OptionalInt;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import static java.util.Arrays.stream;
Expand All @@ -32,6 +40,8 @@
@Test(groups = Global.FUNCTIONAL, singleThreaded = true, dataProvider = ALL_ELECTION_CLASSES_PROVIDER)
public class NetworkPartitionChannelTest extends BaseRaftElectionTest.ChannelBased {
private final int[] indexes;
private volatile Semaphore newTerm;

{
clusterSize = 5;
indexes = IntStream.range(0, clusterSize).toArray();
Expand Down Expand Up @@ -64,7 +74,25 @@ public void electionAfterMerge(Class<?> ignore) throws Exception {
System.out.println("partition1: " + view(leader));
System.out.println("partition2: " + view(coord));

// block the new coordinator to advance the term in voting thread
newTerm = new Semaphore(0);

merge(leader, coord);

// since the term is not advanced yet, new coordinator has accepted the existing leader, and stopping the
// voting thread, but voting thread is just interrupted, it's still running, the term will be advanced anyway,
// and the VoteRequest will be sent, if the voting process goes wrong, e.g. waiting response timeout then it
// won't retry the voting process since the thread has been interrupted.
waitUntilLeaderElected(3000, indexes);
System.out.println(dumpLeaderAndTerms());

// try to make waiting VoteResponse timeout
election(channel(coord)).voteTimeout(1);

// unblock the voting thread
newTerm.release();
newTerm = null;

int finalLeader = leader;
assertThat(eventually(() -> coordIndex(finalLeader) == coord && coordIndex(coord) == coord, 10, TimeUnit.SECONDS))
.isTrue();
Expand All @@ -75,6 +103,18 @@ public void electionAfterMerge(Class<?> ignore) throws Exception {
System.out.println(dumpLeaderAndTerms());
}

@Override
protected RAFT newRaftInstance() {
return new RAFT() {
@Override
public long createNewTerm() {
Semaphore s = newTerm;
if (s != null) s.acquireUninterruptibly();
return super.createNewTerm();
}
};
}

private void partition(int[]... partitions) throws TimeoutException {
List<List<JChannel>> parts = stream(partitions).map(t -> stream(t).mapToObj(this::channel).collect(toList()))
.collect(toList());
Expand Down

0 comments on commit d4079f3

Please sign in to comment.