Skip to content

Commit

Permalink
Handling flaky tests.
Browse files Browse the repository at this point in the history
Executing the tests with taskset results in an almost fixed set of tests
failing. Some other tests fail only rarely, and others I still need
to investigate further to identify the cause.

Most of the failures happen because of timeouts. We have some fixed
threads for election, handling messages, etc., which might block. I
guess this would be enough to create flaky tests when running with
taskset.

This commit is part of issue #222. Although not all tests are fixed,
we have less room for flaky tests.
  • Loading branch information
jabolina committed Dec 25, 2023
1 parent 596488c commit 70cbbb8
Show file tree
Hide file tree
Showing 11 changed files with 198 additions and 73 deletions.
8 changes: 8 additions & 0 deletions src/org/jgroups/protocols/raft/RAFT.java
Original file line number Diff line number Diff line change
Expand Up @@ -618,7 +618,15 @@ public void flushCommitTable() {
commit_table.forEach(this::sendAppendEntriesMessage);
}

/**
* Triggers a flush of the entries to the given member.
*
* @param member The non-null member address to send the entries to.
* @throws IllegalStateException Thrown in case the current node is <b>not</b> the leader.
* @throws NullPointerException Thrown in case {@code member} is null.
*/
public void flushCommitTable(Address member) {
if (!isLeader()) throw new IllegalStateException("Currently not the leader, should be " + leader());
CommitTable.Entry e=commit_table.get(Objects.requireNonNull(member));
if(e != null)
sendAppendEntriesMessage(member, e);
Expand Down
15 changes: 15 additions & 0 deletions src/org/jgroups/protocols/raft/election/BaseElection.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,21 @@ public abstract class BaseElection extends Protocol {
protected volatile View view;

/** Returns the value currently held in {@code vote_timeout}. */
public long voteTimeout() {
    return vote_timeout;
}

/**
 * Defines the default timeout in milliseconds to utilize during any election operation.
 *
 * @param timeoutMs timeout value in milliseconds; must be greater than 0
 * @return this election instance, so that calls can be chained
 * @throws IllegalArgumentException if {@code timeoutMs} is less than or equal to 0
 */
public BaseElection voteTimeout(long timeoutMs) {
    // Reject non-positive values up front and report the offending value to ease debugging.
    if (timeoutMs <= 0) {
        throw new IllegalArgumentException("Timeout should be greater than 0, but was " + timeoutMs);
    }

    this.vote_timeout = timeoutMs;
    return this;
}

/** Returns the {@code RAFT} instance referenced by this election protocol. */
public RAFT raft() {
    return raft;
}
/**
 * Sets the {@code RAFT} instance referenced by this election protocol.
 *
 * @param r the instance to store
 * @return this election instance, so that calls can be chained
 */
public BaseElection raft(RAFT r) {
    this.raft=r;
    return this;
}
/** Returns the collector held in {@code votes}. Use for testing only! */
public ResponseCollector<VoteResponse> getVotes() {
    return votes;
}
Expand Down
68 changes: 53 additions & 15 deletions tests/junit-functional/org/jgroups/tests/AppendEntriesTest.java
Original file line number Diff line number Diff line change
@@ -1,15 +1,22 @@
package org.jgroups.tests;
package org.jgroups.tests;

import org.jgroups.Address;
import org.jgroups.Address;
import org.jgroups.Global;
import org.jgroups.JChannel;
import org.jgroups.protocols.DISCARD;
import org.jgroups.protocols.TP;
import org.jgroups.protocols.raft.*;
import org.jgroups.protocols.raft.AppendResult;
import org.jgroups.protocols.raft.ELECTION;
import org.jgroups.protocols.raft.Log;
import org.jgroups.protocols.raft.LogEntries;
import org.jgroups.protocols.raft.LogEntry;
import org.jgroups.protocols.raft.RAFT;
import org.jgroups.protocols.raft.REDIRECT;
import org.jgroups.protocols.raft.RaftImpl;
import org.jgroups.raft.RaftHandle;
import org.jgroups.raft.blocks.ReplicatedStateMachine;
import org.jgroups.raft.util.Utils;
import org.jgroups.stack.ProtocolStack;
import org.jgroups.raft.util.Utils;
import org.jgroups.stack.ProtocolStack;
import org.jgroups.util.Util;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;
Expand All @@ -20,11 +27,14 @@
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.testng.Assert.*;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;

/**
* Tests the AppendEntries functionality: appending log entries in regular operation, new members, late joiners etc
Expand Down Expand Up @@ -622,12 +632,12 @@ protected static void close(JChannel... channels) {
for(JChannel ch: channels) {
if(ch == null)
continue;
Util.close(ch);
RAFT raft=ch.getProtocolStack().findProtocol(RAFT.class);
try {
Utils.deleteLog(raft);
}
catch(Exception ignored) {}
Util.close(ch);
}
}

Expand Down Expand Up @@ -733,13 +743,41 @@ protected final void assertSame(ReplicatedStateMachine<Integer,Integer>... rsms)

@SafeVarargs
protected final void assertSame(long timeout, long interval, ReplicatedStateMachine<Integer,Integer>... rsms) {
Util.waitUntilTrue(timeout, interval, () -> Stream.of(rsms).allMatch(r -> r.equals(rsms[0])));
for(ReplicatedStateMachine<Integer,Integer> rsm: rsms)
System.out.println(rsm.channel().getName() + ": " + rsm);
for(int i=1; i < rsms.length; i++) {
assert rsms[i].equals(rsms[0])
: String.format("commit-table of A: %s", ((RAFT)a.getProtocolStack().findProtocol(RAFT.class)).dumpCommitTable());
// Wait until the leader receives the responses and all have matching commit indexes.
List<JChannel> channels = Stream.of(rsms).map(ReplicatedStateMachine::channel).collect(Collectors.toList());
BooleanSupplier bs = () -> channels.stream()
.map(AppendEntriesTest::raft)
.map(RAFT::commitIndex)
.distinct()
.count() == 1;
assert Util.waitUntilTrue(timeout, interval, bs) : generateErrorMessage(rsms);

System.out.println(dumpStateMachines(rsms));

// Wait until all state machines are equal, meaning they all applied the same commands.
// At this point, the commit indexes are equal, meaning everything up to that point was applied.
assert Util.waitUntilTrue(timeout, interval, () -> Stream.of(rsms).distinct().count() == 1)
: generateErrorMessage(rsms) + " where " + dumpStateMachines(rsms);
}

/**
 * Renders each given state machine as one line of the form {@code <raftId> -> <state machine>},
 * every line (including the last) terminated by a newline.
 */
private static String dumpStateMachines(ReplicatedStateMachine<?,?>... rsms) {
    return Stream.of(rsms)
      .map(machine -> machine.raftId() + " -> " + machine + "\n")
      .collect(Collectors.joining());
}

/**
 * Builds the assertion failure message from the RAFT protocol found on the channel of the first
 * state machine (treated here as the leader): its raft-id, commit index and commit table dump.
 */
private static String generateErrorMessage(ReplicatedStateMachine<?,?>... rsms) {
    final RAFT first = raft(rsms[0].channel());
    return first.raftId() + ": commit-index=" + first.commitIndex() + first.dumpCommitTable();
}

protected static void assertLogIndices(Log log, int last_appended, int commit_index, int term) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,11 +229,11 @@ public void testAddServerOnLeaderWhichCantCommit() throws Exception {

RAFT raft=raft(leader);
try { // this will fail as leader A stepped down when it found that the view's size dropped below the majority
raft.addServer("C");
raft.addServer("C").get(10, TimeUnit.SECONDS);
assert false : "Adding server C should fail as the leader stepped down";
}
catch(Exception ex) {
} catch(Exception ex) {
System.out.println("Caught exception (as expected) trying to add C: " + ex);
assert !raft.isLeader() : "Still seen as leader!";
}

// Now start B again, so that addServer("C") can succeed
Expand Down
24 changes: 20 additions & 4 deletions tests/junit-functional/org/jgroups/tests/ElectionsTest.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package org.jgroups.tests;
package org.jgroups.tests;

import org.jgroups.Address;
import org.jgroups.Address;
import org.jgroups.Global;
import org.jgroups.JChannel;
import org.jgroups.protocols.raft.*;
import org.jgroups.protocols.raft.Log;
import org.jgroups.protocols.raft.LogEntries;
import org.jgroups.protocols.raft.LogEntry;
import org.jgroups.protocols.raft.RAFT;
import org.jgroups.protocols.raft.REDIRECT;
import org.jgroups.protocols.raft.election.BaseElection;
import org.jgroups.raft.util.Utils;
import org.jgroups.tests.election.BaseElectionTest;
Expand Down Expand Up @@ -57,11 +61,17 @@ public void testElectionWithLongLog(Class<?> ignore) throws Exception {
JChannel coord=findCoord(a,b,c);
System.out.printf("\n\n-- starting the voting process on %s:\n", coord.getAddress());
BaseElection el=coord.getProtocolStack().findProtocol(electionClass);

// Assert that B and C have a longer log.
long aSize = logSize(a);
assert aSize < logSize(b) : "A log longer than B";
assert aSize < logSize(c) : "A log longer than C";

el.startVotingThread();
Util.waitUntilTrue(5000, 500, () -> !el.isVotingThreadRunning());

Address leader=assertLeader(20, 500, null, a, b, c);
assert leader.equals(b.getAddress()) || leader.equals(c.getAddress());
assert leader.equals(b.getAddress()) || leader.equals(c.getAddress()) : "Leader was " + leader;
assert !leader.equals(a.getAddress());
}

Expand Down Expand Up @@ -126,6 +136,12 @@ protected static void setLog(JChannel ch, int... terms) {
log.append(index+1, le);
}

/** Returns the size reported by the log of the RAFT protocol found in the given channel's stack. */
private static long logSize(JChannel ch) {
    final RAFT protocol=ch.getProtocolStack().findProtocol(RAFT.class);
    return protocol.log().size();
}


protected static boolean isLeader(JChannel ch) {
RAFT raft=ch.getProtocolStack().findProtocol(RAFT.class);
Expand Down
12 changes: 11 additions & 1 deletion tests/junit-functional/org/jgroups/tests/MergeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,13 @@
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.util.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -102,6 +108,10 @@ protected static void assertNoMoreThanOneLeaderInSameTerm(JChannel... channels)
RAFT r=ch.getProtocolStack().findProtocol(RAFT.class);
long current_term=r.currentTerm();
Address leader=r.leader();

// A node can still have a null leader. Meaning it didn't identify the new leader for a term.
if (leader == null) continue;

m.compute(current_term, (k,v) -> {
Set<Address> set=v == null? new HashSet<>() : v;
set.add(leader);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,12 @@
import org.jgroups.Address;
import org.jgroups.Global;
import org.jgroups.View;
import org.jgroups.protocols.raft.*;
import org.jgroups.protocols.raft.ELECTION;
import org.jgroups.protocols.raft.Log;
import org.jgroups.protocols.raft.LogEntries;
import org.jgroups.protocols.raft.LogEntry;
import org.jgroups.protocols.raft.RAFT;
import org.jgroups.protocols.raft.RaftImpl;
import org.jgroups.protocols.raft.election.BaseElection;
import org.jgroups.raft.testfwk.RaftCluster;
import org.jgroups.raft.testfwk.RaftNode;
Expand Down Expand Up @@ -59,14 +64,14 @@ protected void destroy() throws Exception {
nodes[i].destroy();
nodes[i]=null;
}
if(rafts[i] != null) {
Utils.deleteLog(rafts[i]);
rafts[i]=null;
}
if(elections[i] != null) {
elections[i].stopVotingThread();
elections[i]=null;
}
if(rafts[i] != null) {
Utils.deleteLog(rafts[i]);
rafts[i]=null;
}
}
cluster.clear();
}
Expand Down Expand Up @@ -102,7 +107,7 @@ public void testScenarioD(Class<?> ignore) throws Exception {
assertTerms(null, expected, expected, expected, expected);
}

/** Tests scenario E in fig.8 5.4.1 [1]: S1 replicates term=4 to S2 and S2, then crashes: either S2 or S3 will be
/** Tests scenario E in fig.8 5.4.1 [1]: S1 replicates term=4 to S2 and S3, then crashes: either S2 or S3 will be
* the new leader because they have the highest term (4):
* <pre>
* 1 2 3 1 2 3 1 2 3
Expand Down Expand Up @@ -140,7 +145,7 @@ public void testScenarioE(Class<?> ignore) throws Exception {

List<Address> leaders=Stream.of(rafts).filter(Objects::nonNull)
.filter(RAFT::isLeader).map(Protocol::getAddress).collect(Collectors.toList());
assert leaders.size() == 1;
assert leaders.size() == 1 : "Should have a single leader: " + leaders;
assert leaders.contains(s2) || leaders.contains(s3);
RAFT leader_raft=rafts[1].isLeader()? rafts[1] : rafts[2];

Expand Down Expand Up @@ -195,10 +200,23 @@ protected void assertTerms(long[] ... exp_terms) {
}

protected void waitUntilTerms(long timeout, long interval, long[] expexted_terms, Runnable action) {
waitUntilTrue(timeout, interval,
assert waitUntilTrue(timeout, interval,
() -> Stream.of(rafts).filter(Objects::nonNull)
.allMatch(r -> Arrays.equals(terms(r),expexted_terms)),
action);
action) : generateErrorMessage();
}

/**
 * Lists the terms of every non-null RAFT instance in {@code rafts}, one
 * {@code <raftId> -> <terms>} line per instance, preceded by a leading newline.
 */
private String generateErrorMessage() {
    return Stream.of(rafts)
      .filter(Objects::nonNull)
      .map(r -> r.raftId() + " -> " + Arrays.toString(terms(r)) + "\n")
      .collect(Collectors.joining("", "\n", ""));
}

public static boolean waitUntilTrue(long timeout, long interval, BooleanSupplier condition,
Expand Down Expand Up @@ -281,7 +299,7 @@ protected RaftNode createNode(int index, String name) throws Exception {
.resendInterval(600_000) // long to disable resending by default
.stateMachine(new DummyStateMachine())
.synchronous(true).setAddress(addrs[index]);
elections[index]=instantiate().raft(rafts[index]).setAddress(addrs[index]);
elections[index]=instantiate().raft(rafts[index]).voteTimeout(1_000).setAddress(addrs[index]);
RaftNode node=nodes[index]=new RaftNode(cluster, new Protocol[]{elections[index], rafts[index]});
node.init();
cluster.add(addrs[index], node);
Expand Down
18 changes: 12 additions & 6 deletions tests/junit-functional/org/jgroups/tests/SyncLeaderCrashTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,12 @@
import org.jgroups.Address;
import org.jgroups.Global;
import org.jgroups.View;
import org.jgroups.protocols.raft.*;
import org.jgroups.protocols.raft.AppendResult;
import org.jgroups.protocols.raft.ELECTION2;
import org.jgroups.protocols.raft.Log;
import org.jgroups.protocols.raft.LogEntries;
import org.jgroups.protocols.raft.LogEntry;
import org.jgroups.protocols.raft.RAFT;
import org.jgroups.protocols.raft.election.BaseElection;
import org.jgroups.raft.testfwk.RaftCluster;
import org.jgroups.raft.testfwk.RaftNode;
Expand Down Expand Up @@ -67,14 +72,14 @@ protected void destroy() throws Exception {
nodes[i].destroy();
nodes[i]=null;
}
if(rafts[i] != null) {
Utils.deleteLog(rafts[i]);
rafts[i]=null;
}
if(elections[i] != null) {
elections[i].stopVotingThread();
elections[i]=null;
}
if(rafts[i] != null) {
Utils.deleteLog(rafts[i]);
rafts[i]=null;
}
}
cluster.clear();
}
Expand All @@ -88,7 +93,8 @@ public void testsLeaderCrash(Class<?> ignore) throws Exception {
Log l=r.log();
long prev_term=l.get(i-1).term();
LogEntries entries=new LogEntries().add(new LogEntry(9, DATA));
r.impl().handleAppendEntriesRequest(entries, a,i-1, prev_term, 9, 4);
AppendResult ar = r.impl().handleAppendEntriesRequest(entries, a,i-1, prev_term, 9, 4);
assert ar != null && ar.success() : String.format("%s failed on %d with %s", r.raftId(), i, ar);
}
}
kill(0);
Expand Down
Loading

0 comments on commit 70cbbb8

Please sign in to comment.