diff --git a/raft_test.go b/raft_test.go index 3bbf00a0..9d340ee2 100644 --- a/raft_test.go +++ b/raft_test.go @@ -4824,6 +4824,58 @@ func votedWithConfig(configFunc func(*Config), vote, term uint64) *raft { return sm } +func TestLogReplicationWithReorderedMessage(t *testing.T) { + r1 := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2))) + r1.becomeCandidate() + r1.becomeLeader() + r1.readMessages() + r1.trk.Progress[2].BecomeReplicate() + + r2 := newTestRaft(2, 10, 1, newTestMemoryStorage(withPeers(1, 2))) + + // r1 sends 2 MsgApp messages to r2. + mustAppendEntry(r1, pb.Entry{Data: []byte("somedata")}) + r1.sendAppend(2) + req1 := expectOneMessage(t, r1) + mustAppendEntry(r1, pb.Entry{Data: []byte("somedata")}) + r1.sendAppend(2) + req2 := expectOneMessage(t, r1) + + // r2 receives the second MsgApp first due to reordering. + r2.Step(req2) + resp2 := expectOneMessage(t, r2) + // r2 rejects req2, which arrived ahead of req1. + require.True(t, resp2.Reject) + require.Equal(t, uint64(0), resp2.RejectHint) + require.Equal(t, uint64(2), resp2.Index) + + // r2 handles the first MsgApp and responds to r1. + // r1 then updates its match index for r2 accordingly. + r2.Step(req1) + m := expectOneMessage(t, r2) + require.False(t, m.Reject) + require.Equal(t, uint64(2), m.Index) + r1.Step(m) + m = expectOneMessage(t, r1) + require.Equal(t, uint64(2), r1.trk.Progress[2].Match) + + // r1 observes a transient network issue to r2, hence transitions to probe state. + r1.Step(pb.Message{From: 2, To: 1, Type: pb.MsgUnreachable}) + require.Equal(t, tracker.StateProbe, r1.trk.Progress[2].State) + + // Now r1 receives the delayed resp2. + r1.Step(resp2) + m = expectOneMessage(t, r1) + // r1 shall re-send MsgApp from the match index even if resp2's reject hint is less than the match index.
+ require.Equal(t, r1.trk.Progress[2].Match, m.Index) +} + +func expectOneMessage(t *testing.T, r *raft) pb.Message { + msgs := r.readMessages() + require.Len(t, msgs, 1, "expect one message") + return msgs[0] +} + type network struct { t *testing.T // optional diff --git a/tracker/progress.go b/tracker/progress.go index dc4ac6b3..32c5ee24 100644 --- a/tracker/progress.go +++ b/tracker/progress.go @@ -203,11 +203,13 @@ func (pr *Progress) MaybeDecrTo(rejected, matchHint uint64) bool { // The rejection must be stale if "rejected" does not match next - 1. This // is because non-replicating followers are probed one entry at a time. + // This check is best-effort and assumes message reordering is rare. if pr.Next-1 != rejected { return false } - pr.Next = max(min(rejected, matchHint+1), 1) + // Next must always stay above Match, even when a stale rejection carries a hint below it. + pr.Next = max(min(rejected, matchHint+1), pr.Match+1) pr.MsgAppFlowPaused = false return true }