diff --git a/pkg/raft/node_test.go b/pkg/raft/node_test.go index ea95da965d51..5ceddf1c94e1 100644 --- a/pkg/raft/node_test.go +++ b/pkg/raft/node_test.go @@ -765,7 +765,10 @@ func TestAppendPagination(t *testing.T) { // After the partition recovers, tick the clock to wake everything // back up and send the messages. - n.send(raftpb.Message{From: 1, To: 1, Type: raftpb.MsgBeat}) + p := n.peers[raftpb.PeerID(1)].(*raft) + for ticks := p.heartbeatTimeout; ticks > 0; ticks-- { + n.tick(p) + } assert.True(t, seenFullMessage, "didn't see any messages more than half the max size; something is wrong with this test") } diff --git a/pkg/raft/raft.go b/pkg/raft/raft.go index 0ae6b8a809fb..2cb8fcb1707d 100644 --- a/pkg/raft/raft.go +++ b/pkg/raft/raft.go @@ -850,10 +850,11 @@ func (r *raft) sendFortify(to pb.PeerID) { func (r *raft) bcastAppend() { r.trk.Visit(func(id pb.PeerID, _ *tracker.Progress) { if id == r.id { - // NB: the leader doesn't send MsgAppResp to itself here. This means that - // the leader will not have a chance to update its own - // MatchCommit/SentCommit. That is fine because the leader doesn't use - // MatchCommit/SentCommit for itself. It only uses the followers' values. + // NB: the leader doesn't send MsgApp to itself here nor does it receive + // a self directed MsgAppResp. This means that the leader will not have a + // chance to update its own MatchCommit/SentCommit. That is fine because + // the leader doesn't use MatchCommit/SentCommit for itself. It only uses + // the peers' values. return } r.maybeSendAppend(id) @@ -880,6 +881,38 @@ func (r *raft) bcastFortify() { }) } +// maybeUnpauseAndBcastAppend unpauses and attempts to send an MsgApp to all the +// followers that provide store liveness support. If there is no store liveness +// support, we skip unpausing and sending MsgApp because the message is likely +// to be dropped. +func (r *raft) maybeUnpauseAndBcastAppend() { + if !r.fortificationTracker.FortificationEnabled() { + // The underlying store liveness fabric hasn't been enabled. + return + } + + r.trk.Visit(func(id pb.PeerID, pr *tracker.Progress) { + if r.id == id { + // NB: the leader doesn't send MsgApp to itself here nor does it receive + // a self directed MsgAppResp. This means that the leader will not have a + // chance to update its own MatchCommit/SentCommit. That is fine because + // the leader doesn't use MatchCommit/SentCommit for itself. It only uses + // the peers' values. + return + } + + if _, supported := r.fortificationTracker.IsFortifiedBy(id); !supported { + // If the follower's store isn't providing active store liveness support + // to the leader's store, or it is but the leader isn't hearing about it, + // we don't send a MsgApp. + return + } + + pr.MsgAppProbesPaused = false + r.maybeSendAppend(id) + }) +} + func (r *raft) appliedTo(index uint64, size entryEncodingSize) { oldApplied := r.raftLog.applied newApplied := max(index, oldApplied) @@ -1108,7 +1141,7 @@ func (r *raft) tickHeartbeat() { // Try to refortify any followers that don't currently support us. r.bcastFortify() - // TODO(ibrahim): add/call maybeUnpauseAndBcastAppend() here. 
+ r.maybeUnpauseAndBcastAppend() } } diff --git a/pkg/raft/raft_flow_control_test.go b/pkg/raft/raft_flow_control_test.go index a4ccd9721c7e..3064bd14e7e0 100644 --- a/pkg/raft/raft_flow_control_test.go +++ b/pkg/raft/raft_flow_control_test.go @@ -21,6 +21,8 @@ import ( "testing" pb "github.com/cockroachdb/cockroach/pkg/raft/raftpb" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/stretchr/testify/require" ) // TestMsgAppFlowControlFull ensures: @@ -39,9 +41,8 @@ func TestMsgAppFlowControlFull(t *testing.T) { for i := 0; i < r.maxInflight; i++ { r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) ms := r.readMessages() - if len(ms) != 1 || ms[0].Type != pb.MsgApp { - t.Fatalf("#%d: len(ms) = %d, want 1 MsgApp", i, len(ms)) - } + require.Len(t, ms, 1) + require.Equal(t, ms[0].Type, pb.MsgApp) } // ensure 1 @@ -53,9 +54,7 @@ func TestMsgAppFlowControlFull(t *testing.T) { for i := 0; i < 10; i++ { r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) ms := r.readMessages() - if len(ms) != 0 { - t.Fatalf("#%d: len(ms) = %d, want 0", i, len(ms)) - } + require.Empty(t, ms) } } @@ -87,9 +86,8 @@ func TestMsgAppFlowControlMoveForward(t *testing.T) { // fill in the inflights window again r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) ms := r.readMessages() - if len(ms) != 1 || ms[0].Type != pb.MsgApp { - t.Fatalf("#%d: len(ms) = %d, want 1 MsgApp", tt, len(ms)) - } + require.Len(t, ms, 1) + require.Equal(t, ms[0].Type, pb.MsgApp) // ensure 1 if !pr2.IsPaused() { @@ -106,50 +104,82 @@ func TestMsgAppFlowControlMoveForward(t *testing.T) { } } -// TestMsgAppFlowControlRecvHeartbeat ensures a heartbeat response -// frees one slot if the window is full. -func TestMsgAppFlowControlRecvHeartbeat(t *testing.T) { - r := newTestRaft(1, 5, 1, newTestMemoryStorage(withPeers(1, 2))) - r.becomeCandidate() - r.becomeLeader() - - pr2 := r.trk.Progress(2) - // force the progress to be in replicate state - pr2.BecomeReplicate() - // fill in the inflights window - for i := 0; i < r.maxInflight; i++ { - r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) - r.readMessages() - } - - for tt := 1; tt < 5; tt++ { - // recv tt msgHeartbeatResp and expect one free slot - for i := 0; i < tt; i++ { - if !pr2.IsPaused() { - t.Fatalf("#%d.%d: paused = false, want true", tt, i) - } - // Unpauses the progress, sends an empty MsgApp, and pauses it again. - r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp}) - ms := r.readMessages() - if len(ms) != 1 || ms[0].Type != pb.MsgApp || len(ms[0].Entries) != 0 { - t.Fatalf("#%d.%d: len(ms) == %d, want 1 empty MsgApp", tt, i, len(ms)) +// TestMsgAppFlowControl ensures that if storeliveness is disabled, a heartbeat +// response frees one slot if the window is full. If storelivess is enabled, +// a similar thing happens but on the next heartbeat timeout. +func TestMsgAppFlowControl(t *testing.T) { + testutils.RunTrueAndFalse(t, "store-liveness-enabled", + func(t *testing.T, storeLivenessEnabled bool) { + testOptions := emptyTestConfigModifierOpt() + if !storeLivenessEnabled { + testOptions = withFortificationDisabled() } - } - // No more appends are sent if there are no heartbeats. 
- for i := 0; i < 10; i++ { - if !pr2.IsPaused() { - t.Fatalf("#%d.%d: paused = false, want true", tt, i) + r := newTestRaft(1, 5, 1, + newTestMemoryStorage(withPeers(1, 2)), testOptions) + r.becomeCandidate() + r.becomeLeader() + + pr2 := r.trk.Progress(2) + // force the progress to be in replicate state + pr2.BecomeReplicate() + // fill in the inflights window + for i := 0; i < r.maxInflight; i++ { + r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, + Entries: []pb.Entry{{Data: []byte("somedata")}}}) + r.readMessages() } - r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) - ms := r.readMessages() - if len(ms) != 0 { - t.Fatalf("#%d.%d: len(ms) = %d, want 0", tt, i, len(ms)) - } - } - // clear all pending messages. - r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp}) - r.readMessages() - } + for tt := 1; tt < 5; tt++ { + for i := 0; i < tt; i++ { + if !pr2.IsPaused() { + t.Fatalf("#%d.%d: paused = false, want true", tt, i) + } + + // Unpauses the progress, sends an empty MsgApp, and pauses it again. + // When storeliveness is enabled, we do this on the next heartbeat + // timeout. However, when storeliveness is disabled, we do this on + // the next heartbeat response. + if storeLivenessEnabled { + for ticks := r.heartbeatTimeout; ticks > 0; ticks-- { + r.tick() + } + ms := r.readMessages() + if len(ms) != 3 || ms[0].Type != pb.MsgHeartbeat || ms[1].Type != pb.MsgFortifyLeader || + ms[2].Type != pb.MsgApp || len(ms[2].Entries) != 0 { + t.Fatalf("#%d.%d: len(ms) == %d, want 3 messages including one empty MsgApp", + tt, i, len(ms)) + } + require.Len(t, ms, 3) + require.Equal(t, ms[0].Type, pb.MsgHeartbeat) + require.Equal(t, ms[1].Type, pb.MsgFortifyLeader) + require.Equal(t, ms[2].Type, pb.MsgApp) + require.Empty(t, ms[0].Entries) + } else { + r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp}) + ms := r.readMessages() + require.Len(t, ms, 1) + require.Equal(t, ms[0].Type, pb.MsgApp) + require.Empty(t, ms[0].Entries) + } + } + + // No more appends are sent if there are no heartbeats. + for i := 0; i < 10; i++ { + if !pr2.IsPaused() { + t.Fatalf("#%d.%d: paused = false, want true", tt, i) + } + r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, + Entries: []pb.Entry{{Data: []byte("somedata")}}}) + ms := r.readMessages() + require.Empty(t, ms) + } + + // clear all pending messages. + for ticks := r.heartbeatTimeout; ticks > 0; ticks-- { + r.tick() + } + r.readMessages() + } + }) } diff --git a/pkg/raft/raft_paper_test.go b/pkg/raft/raft_paper_test.go index cad6d4fe3950..f00d98972e5b 100644 --- a/pkg/raft/raft_paper_test.go +++ b/pkg/raft/raft_paper_test.go @@ -103,10 +103,12 @@ func TestStartAsFollower(t *testing.T) { // TestLeaderBcastBeat tests that if the leader receives a heartbeat tick, // it will send a MsgHeartbeat with m.Index = 0, m.LogTerm=0 and empty entries // as heartbeat to all followers. +// Note that if store liveness is enabled, the leader will also send a MsgApp +// on every heartbeat interval. 
 // Reference: section 5.2
 func TestLeaderBcastBeat(t *testing.T) {
 	// heartbeat interval
-	hi := 1
+	hi := 3
 
 	testutils.RunTrueAndFalse(t, "store-liveness-enabled",
 		func(t *testing.T, storeLivenessEnabled bool) {
@@ -126,6 +128,7 @@ func TestLeaderBcastBeat(t *testing.T) {
 			}
 
 			for i := 0; i < hi; i++ {
+				require.Empty(t, r.readMessages())
 				r.tick()
 			}
 
@@ -133,6 +136,8 @@ func TestLeaderBcastBeat(t *testing.T) {
 			slices.SortFunc(msgs, cmpMessages)
 			if storeLivenessEnabled {
 				assert.Equal(t, []pb.Message{
+					{From: 1, To: 2, Term: 1, Type: pb.MsgApp, Entries: r.raftLog.allEntries()},
+					{From: 1, To: 3, Term: 1, Type: pb.MsgApp, Entries: r.raftLog.allEntries()},
 					{From: 1, To: 2, Term: 1, Type: pb.MsgFortifyLeader},
 					{From: 1, To: 3, Term: 1, Type: pb.MsgFortifyLeader},
 					{From: 1, To: 2, Term: 1, Type: pb.MsgHeartbeat},
diff --git a/pkg/raft/raft_test.go b/pkg/raft/raft_test.go
index 5d07f0fb4627..d05dd54f6a79 100644
--- a/pkg/raft/raft_test.go
+++ b/pkg/raft/raft_test.go
@@ -28,6 +28,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/raft/raftstoreliveness"
 	"github.com/cockroachdb/cockroach/pkg/raft/tracker"
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
+	"github.com/cockroachdb/cockroach/pkg/testutils"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 )
@@ -844,13 +845,18 @@ func TestCandidateConcede(t *testing.T) {
 	// heal the partition
 	tt.recover()
 	// send heartbeat; reset wait
-	tt.send(pb.Message{From: 3, To: 3, Type: pb.MsgBeat})
+	p := tt.peers[pb.PeerID(3)].(*raft)
+	for ticks := p.heartbeatTimeout; ticks > 0; ticks-- {
+		tt.tick(p)
+	}
 
 	data := []byte("force follower")
 	// send a proposal to 3 to flush out a MsgApp to 1
 	tt.send(pb.Message{From: 3, To: 3, Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})
 	// send heartbeat; flush out commit
-	tt.send(pb.Message{From: 3, To: 3, Type: pb.MsgBeat})
+	for ticks := p.heartbeatTimeout; ticks > 0; ticks-- {
+		tt.tick(p)
+	}
 
 	a := tt.peers[1].(*raft)
 	assert.Equal(t, StateFollower, a.state)
@@ -1167,15 +1173,16 @@ func TestHandleHeartbeat(t *testing.T) {
 	}
 }
 
-// TestHandleHeartbeatResp ensures that we re-send log entries when we get a heartbeat response.
-func TestHandleHeartbeatResp(t *testing.T) {
+// TestHandleHeartbeatRespStoreLivenessDisabled ensures that we re-send log
+// entries when we get a heartbeat response.
+func TestHandleHeartbeatRespStoreLivenessDisabled(t *testing.T) {
 	storage := newTestMemoryStorage(withPeers(1, 2))
 	require.NoError(t, storage.SetHardState(pb.HardState{Term: 3}))
 	require.NoError(t, storage.Append(index(1).terms(1, 2, 3)))
-	sm := newTestRaft(1, 5, 1, storage)
+	sm := newTestRaft(1, 5, 1, storage, withFortificationDisabled())
 	sm.becomeCandidate()
 	sm.becomeLeader()
-	sm.raftLog.commitTo(LogMark{Term: 3, Index: sm.raftLog.lastIndex()})
+	sm.raftLog.commitTo(sm.raftLog.unstable.mark())
 
 	// A heartbeat response from a node that is behind; re-send MsgApp
 	sm.Step(pb.Message{From: 2, Type: pb.MsgHeartbeatResp})
@@ -1204,6 +1211,59 @@ func TestHandleHeartbeatResp(t *testing.T) {
 	require.Empty(t, msgs)
 }
 
+// TestHandleHeartbeatTimeoutStoreLivenessEnabled ensures that we re-send log
+// entries on heartbeat timeouts only if we need to.
+func TestHandleHeartbeatTimeoutStoreLivenessEnabled(t *testing.T) {
+	storage := newTestMemoryStorage(withPeers(1, 2))
+	require.NoError(t, storage.SetHardState(pb.HardState{Term: 3}))
+	require.NoError(t, storage.Append(index(1).terms(1, 2, 3)))
+	sm := newTestRaft(1, 5, 1, storage)
+	sm.becomeCandidate()
+	sm.becomeLeader()
+	sm.fortificationTracker.RecordFortification(pb.PeerID(2), 1)
+	sm.fortificationTracker.RecordFortification(pb.PeerID(3), 1)
+	sm.raftLog.commitTo(sm.raftLog.unstable.mark())
+
+	// On heartbeat timeout, the leader sends a MsgApp.
+	for ticks := sm.heartbeatTimeout; ticks > 0; ticks-- {
+		sm.tick()
+	}
+
+	msgs := sm.readMessages()
+	require.Len(t, msgs, 2)
+	assert.Equal(t, pb.MsgHeartbeat, msgs[0].Type)
+	assert.Equal(t, pb.MsgApp, msgs[1].Type)
+
+	// On another heartbeat timeout, the leader sends a MsgApp.
+	for ticks := sm.heartbeatTimeout; ticks > 0; ticks-- {
+		sm.tick()
+	}
+	msgs = sm.readMessages()
+	require.Len(t, msgs, 2)
+	assert.Equal(t, pb.MsgHeartbeat, msgs[0].Type)
+	assert.Equal(t, pb.MsgApp, msgs[1].Type)
+
+	// Once the leader receives a MsgAppResp, it doesn't send MsgApp.
+	sm.Step(pb.Message{
+		From:   2,
+		Type:   pb.MsgAppResp,
+		Index:  msgs[1].Index + uint64(len(msgs[1].Entries)),
+		Commit: sm.raftLog.lastIndex(),
+	})
+
+	// Consume the message sent in response to MsgAppResp.
+	sm.readMessages()
+
+	// On heartbeat timeout, the leader doesn't send a MsgApp because the follower
+	// is up-to-date.
+	for ticks := sm.heartbeatTimeout; ticks > 0; ticks-- {
+		sm.tick()
+	}
+	msgs = sm.readMessages()
+	require.Len(t, msgs, 1)
+	assert.Equal(t, pb.MsgHeartbeat, msgs[0].Type)
+}
+
 // TestMsgAppRespWaitReset verifies the resume behavior of a leader
 // MsgAppResp.
 func TestMsgAppRespWaitReset(t *testing.T) {
@@ -1977,42 +2037,79 @@ func TestLeaderAppResp(t *testing.T) {
 
 // TestBcastBeat is when the leader receives a heartbeat tick, it should
 // send a MsgHeartbeat with m.Index = 0, m.LogTerm=0 and empty entries.
func TestBcastBeat(t *testing.T) { - offset := uint64(1000) - // make a state machine with log.offset = 1000 - s := pb.Snapshot{ - Metadata: pb.SnapshotMetadata{ - Index: offset, - Term: 1, - ConfState: pb.ConfState{Voters: []pb.PeerID{1, 2, 3}}, - }, - } - storage := NewMemoryStorage() - storage.ApplySnapshot(s) - sm := newTestRaft(1, 10, 1, storage) - sm.Term = 1 + testutils.RunTrueAndFalse(t, "store-liveness-enabled", + func(t *testing.T, storeLivenessEnabled bool) { + offset := uint64(1000) + // make a state machine with log.offset = 1000 + s := pb.Snapshot{ + Metadata: pb.SnapshotMetadata{ + Index: offset, + Term: 1, + ConfState: pb.ConfState{Voters: []pb.PeerID{1, 2, 3}}, + }, + } + storage := NewMemoryStorage() + storage.ApplySnapshot(s) - sm.becomeCandidate() - sm.becomeLeader() - for i := 0; i < 10; i++ { - mustAppendEntry(sm, pb.Entry{Index: uint64(i) + 1}) - } - sm.advanceMessagesAfterAppend() + testOptions := emptyTestConfigModifierOpt() + if !storeLivenessEnabled { + testOptions = withFortificationDisabled() + } - // slow follower - sm.trk.Progress(2).Match, sm.trk.Progress(2).Next = 5, 6 - // normal follower - sm.trk.Progress(3).Match, sm.trk.Progress(3).Next = sm.raftLog.lastIndex(), sm.raftLog.lastIndex()+1 + sm := newTestRaft(1, 10, 1, storage, testOptions) - sm.Step(pb.Message{Type: pb.MsgBeat}) - msgs := sm.readMessages() - require.Len(t, msgs, 2) + sm.Term = 1 - for i, m := range msgs { - require.Equal(t, pb.MsgHeartbeat, m.Type, "#%d", i) - require.Zero(t, m.Index, "#%d", i) - require.Zero(t, m.LogTerm, "#%d", i) - require.Empty(t, m.Entries, "#%d", i) - } + sm.becomeCandidate() + sm.becomeLeader() + + for i := 0; i < 10; i++ { + mustAppendEntry(sm, pb.Entry{Index: uint64(i) + 1}) + } + sm.advanceMessagesAfterAppend() + + // slow follower + sm.trk.Progress(2).Match, sm.trk.Progress(2).Next = 5, 6 + // normal follower + sm.trk.Progress(3).Match, sm.trk.Progress(3).Next = sm.raftLog.lastIndex(), + sm.raftLog.lastIndex()+1 + + // TODO(ibrahim): Create a test helper function that takes the number of + // ticks and calls tick() that many times. Then we can refactor a lot of + // tests that have this pattern. + for ticks := sm.heartbeatTimeout; ticks > 0; ticks-- { + sm.tick() + } + msgs := sm.readMessages() + // If storeliveness is enabled, the heartbeat timeout will also send a + // MsgApp if it needs to. In this case since follower 2 is slow, we will + // send a MsgApp to it. + if storeLivenessEnabled { + require.Len(t, msgs, 5) + assert.Equal(t, []pb.Message{ + {From: 1, To: 2, Term: 2, Type: pb.MsgHeartbeat, Match: 5}, + {From: 1, To: 3, Term: 2, Type: pb.MsgHeartbeat, Match: 1011}, + {From: 1, To: 2, Term: 2, Type: pb.MsgFortifyLeader}, + {From: 1, To: 3, Term: 2, Type: pb.MsgFortifyLeader}, + {From: 1, To: 3, Term: 2, Type: pb.MsgApp, LogTerm: 2, Index: 1011, Commit: 1000, + Match: 1011}, + }, msgs) + } else { + require.Len(t, msgs, 2) + assert.Equal(t, []pb.Message{ + {From: 1, To: 2, Term: 2, Type: pb.MsgHeartbeat, Match: 5}, + {From: 1, To: 3, Term: 2, Type: pb.MsgHeartbeat, Match: 1011}, + }, msgs) + } + + // Make sure that the heartbeat messages contain the expected fields. 
+ for i, m := range msgs[:2] { + require.Equal(t, pb.MsgHeartbeat, m.Type, "#%d", i) + require.Zero(t, m.Index, "#%d", i) + require.Zero(t, m.LogTerm, "#%d", i) + require.Empty(t, m.Entries, "#%d", i) + } + }) } // TestRecvMsgBeat tests the output of the state machine when receiving MsgBeat @@ -2082,50 +2179,81 @@ func TestLeaderIncreaseNext(t *testing.T) { } func TestSendAppendForProgressProbe(t *testing.T) { - r := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2))) - r.becomeCandidate() - r.becomeLeader() - r.readMessages() - r.trk.Progress(2).BecomeProbe() + testutils.RunTrueAndFalse(t, "store-liveness-enabled", + func(t *testing.T, storeLivenessEnabled bool) { + testOptions := emptyTestConfigModifierOpt() + if !storeLivenessEnabled { + // TODO(ibrahim): allow the test option to take a boolean to + // enable/disable fortification. This way we can refactor the tests and + // make them less verbose. + testOptions = withFortificationDisabled() + } - // each round is a heartbeat - for i := 0; i < 3; i++ { - if i == 0 { - // we expect that raft will only send out one msgAPP on the first - // loop. After that, the follower is paused until a heartbeat response is - // received. - mustAppendEntry(r, pb.Entry{Data: []byte("somedata")}) - r.maybeSendAppend(2) - msg := r.readMessages() - assert.Len(t, msg, 1) - assert.Zero(t, msg[0].Index) - } + r := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2)), + testOptions) - assert.True(t, r.trk.Progress(2).MsgAppProbesPaused) - for j := 0; j < 10; j++ { - mustAppendEntry(r, pb.Entry{Data: []byte("somedata")}) - r.maybeSendAppend(2) - assert.Empty(t, r.readMessages()) - } + r.becomeCandidate() + r.becomeLeader() - // do a heartbeat - for j := 0; j < r.heartbeatTimeout; j++ { - r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat}) - } - assert.True(t, r.trk.Progress(2).MsgAppProbesPaused) + r.readMessages() + r.trk.Progress(2).BecomeProbe() + + // each round is a heartbeat + for i := 0; i < 3; i++ { + if i == 0 { + // we expect that raft will only send out one msgAPP on the first + // loop. After that, the follower is paused until a heartbeat response + // is received. + mustAppendEntry(r, pb.Entry{Data: []byte("somedata")}) + r.maybeSendAppend(2) + msg := r.readMessages() + assert.Len(t, msg, 1) + assert.Zero(t, msg[0].Index) + } - // consume the heartbeat - msg := r.readMessages() - assert.Len(t, msg, 1) - assert.Equal(t, pb.MsgHeartbeat, msg[0].Type) - } + assert.True(t, r.trk.Progress(2).MsgAppProbesPaused) + for j := 0; j < 10; j++ { + mustAppendEntry(r, pb.Entry{Data: []byte("somedata")}) + r.maybeSendAppend(2) + assert.Empty(t, r.readMessages()) + } - // a heartbeat response will allow another message to be sent - r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp}) - msg := r.readMessages() - assert.Len(t, msg, 1) - assert.Zero(t, msg[0].Index) - assert.True(t, r.trk.Progress(2).MsgAppProbesPaused) + // do a heartbeat + for j := 0; j < r.heartbeatTimeout; j++ { + r.tick() + } + assert.True(t, r.trk.Progress(2).MsgAppProbesPaused) + + // consume the heartbeat, and the MsgApp if storeliveness is enabled + msg := r.readMessages() + if storeLivenessEnabled { + assert.Len(t, msg, 3) + assert.Equal(t, pb.MsgHeartbeat, msg[0].Type) + assert.Equal(t, pb.MsgFortifyLeader, msg[1].Type) + assert.Equal(t, pb.MsgApp, msg[2].Type) + } else { + assert.Len(t, msg, 1) + assert.Equal(t, pb.MsgHeartbeat, msg[0].Type) + } + } + + // The next heartbeat timeout will allow another message to be sent. 
+ for ticks := r.heartbeatTimeout; ticks > 0; ticks-- { + r.tick() + } + msg := r.readMessages() + if storeLivenessEnabled { + assert.Len(t, msg, 3) + assert.Equal(t, pb.MsgHeartbeat, msg[0].Type) + assert.Equal(t, pb.MsgFortifyLeader, msg[1].Type) + assert.Equal(t, pb.MsgApp, msg[2].Type) + assert.Zero(t, msg[1].Index) + } else { + assert.Len(t, msg, 1) + assert.Equal(t, pb.MsgHeartbeat, msg[0].Type) + } + assert.True(t, r.trk.Progress(2).MsgAppProbesPaused) + }) } func TestSendAppendForProgressReplicate(t *testing.T) { @@ -4016,6 +4144,16 @@ func (nw *network) send(msgs ...pb.Message) { } } +// tick takes a raft instance and calls tick(). It then uses the network.send +// function if that generates any messages. +func (nw *network) tick(p *raft) { + p.tick() + msgs := nw.filter(p.readMessages()) + if len(msgs) > 0 { + nw.send(msgs...) + } +} + func (nw *network) drop(from, to pb.PeerID, perc float64) { nw.dropm[connem{from, to}] = perc } diff --git a/pkg/raft/testdata/async_storage_writes_append_aba_race.txt b/pkg/raft/testdata/async_storage_writes_append_aba_race.txt index 32e7cba194ad..ec65a539b1db 100644 --- a/pkg/raft/testdata/async_storage_writes_append_aba_race.txt +++ b/pkg/raft/testdata/async_storage_writes_append_aba_race.txt @@ -388,11 +388,11 @@ Messages: AppendThread->4 MsgStorageAppendResp Term:0 Log:3/12 ] -# Step 7: before the new entries reach node 1, it hears of the term change -# through a heartbeat and persists the new term. Node 1 then receives these -# entries, overwriting the previous unstable log entries that are in the process -# of being appended. The entries have a larger term than the previous entries -# but the same indexes. It begins appending these new entries asynchronously. +# Step 7: before the new entries reach node 1, it hears of the term change and +# persists the new term. Node 1 then receives these entries, overwriting the +# previous unstable log entries that are in the process of being appended. +# The entries have a larger term than the previous entries but the same indexes. +# It begins appending these new entries asynchronously. 
deliver-msgs drop=1 ---- @@ -420,6 +420,12 @@ Messages: 4->5 MsgFortifyLeader Term:3 Log:0/0 4->6 MsgFortifyLeader Term:3 Log:0/0 4->7 MsgFortifyLeader Term:3 Log:0/0 +4->1 MsgApp Term:3 Log:1/11 Commit:11 Entries:[3/12 EntryNormal ""] +4->2 MsgApp Term:3 Log:1/11 Commit:11 Entries:[3/12 EntryNormal ""] +4->3 MsgApp Term:3 Log:1/11 Commit:11 Entries:[3/12 EntryNormal ""] +4->5 MsgApp Term:3 Log:1/11 Commit:11 Entries:[3/12 EntryNormal ""] +4->6 MsgApp Term:3 Log:1/11 Commit:11 Entries:[3/12 EntryNormal ""] +4->7 MsgApp Term:3 Log:1/11 Commit:11 Entries:[3/12 EntryNormal ""] 4->AppendThread MsgStorageAppend Term:0 Log:0/0 Responses:[ 4->4 MsgFortifyLeaderResp Term:3 Log:0/0 LeadEpoch:1 ] @@ -430,29 +436,6 @@ deliver-msgs 1 INFO 1 [term: 2] received a MsgHeartbeat message with higher term from 4 [term: 3] INFO 1 became follower at term 3 4->1 MsgFortifyLeader Term:3 Log:0/0 - -process-ready 1 ----- -Ready MustSync=true: -HardState Term:3 Commit:11 Lead:4 LeadEpoch:1 -Messages: -1->4 MsgHeartbeatResp Term:3 Log:0/0 -1->AppendThread MsgStorageAppend Term:3 Log:0/0 Commit:11 Lead:4 LeadEpoch:1 Responses:[ - 1->4 MsgFortifyLeaderResp Term:3 Log:0/0 LeadEpoch:1 -] - -deliver-msgs 4 ----- -1->4 MsgHeartbeatResp Term:3 Log:0/0 - -process-ready 4 ----- -Ready MustSync=false: -Messages: -4->1 MsgApp Term:3 Log:1/11 Commit:11 Entries:[3/12 EntryNormal ""] - -deliver-msgs 1 ----- 4->1 MsgApp Term:3 Log:1/11 Commit:11 Entries:[3/12 EntryNormal ""] INFO found conflict at index 12 [existing term: 2, conflicting term: 3] INFO replace the unstable entries from index 12 @@ -460,10 +443,13 @@ INFO replace the unstable entries from index 12 process-ready 1 ---- Ready MustSync=true: +HardState Term:3 Commit:11 Lead:4 LeadEpoch:1 Entries: 3/12 EntryNormal "" Messages: -1->AppendThread MsgStorageAppend Term:0 Log:3/12 Entries:[3/12 EntryNormal ""] Responses:[ +1->4 MsgHeartbeatResp Term:3 Log:0/0 +1->AppendThread MsgStorageAppend Term:3 Log:3/12 Commit:11 Lead:4 LeadEpoch:1 Entries:[3/12 EntryNormal ""] Responses:[ + 1->4 MsgFortifyLeaderResp Term:3 Log:0/0 LeadEpoch:1 1->4 MsgAppResp Term:3 Log:0/12 Commit:11 AppendThread->1 MsgStorageAppendResp Term:0 Log:3/12 ] @@ -496,61 +482,31 @@ raft-log 1 # 5 before they have been replaced by the entries from step 7. Instead, we must # wait until we are sure that the entries are stable and that no in-progress # appends might overwrite them before removing entries from the unstable log. 
- -deliver-msgs 1 ----- -AppendThread->1 MsgStorageAppendResp Term:0 Log:1/12 -INFO mark (term,index)=(1,12) mismatched the last accepted term 3 in unstable log; ignoring - -process-append-thread 1 ----- -Processing: -1->AppendThread MsgStorageAppend Term:2 Log:2/12 Commit:11 Lead:3 LeadEpoch:1 Entries:[2/12 EntryNormal ""] -Responses: -1->3 MsgVoteResp Term:2 Log:0/0 Rejected (Hint: 0) -1->3 MsgFortifyLeaderResp Term:2 Log:0/0 LeadEpoch:1 -1->3 MsgAppResp Term:2 Log:0/12 Commit:11 -AppendThread->1 MsgStorageAppendResp Term:0 Log:2/12 - -raft-log 1 +stabilize 1 ---- -1/11 EntryNormal "" -2/12 EntryNormal "" - -deliver-msgs 1 ----- -AppendThread->1 MsgStorageAppendResp Term:0 Log:2/12 -INFO mark (term,index)=(2,12) mismatched the last accepted term 3 in unstable log; ignoring - -process-append-thread 1 ----- -Processing: -1->AppendThread MsgStorageAppend Term:3 Log:0/0 Commit:11 Lead:4 LeadEpoch:1 -Responses: -1->4 MsgFortifyLeaderResp Term:3 Log:0/0 LeadEpoch:1 - -raft-log 1 ----- -1/11 EntryNormal "" -2/12 EntryNormal "" - -deliver-msgs 1 ----- -no messages - -process-append-thread 1 ----- -Processing: -1->AppendThread MsgStorageAppend Term:0 Log:3/12 Entries:[3/12 EntryNormal ""] -Responses: -1->4 MsgAppResp Term:3 Log:0/12 Commit:11 -AppendThread->1 MsgStorageAppendResp Term:0 Log:3/12 - +> 1 receiving messages + AppendThread->1 MsgStorageAppendResp Term:0 Log:1/12 + INFO mark (term,index)=(1,12) mismatched the last accepted term 3 in unstable log; ignoring +> 1 processing append thread + Processing: + 1->AppendThread MsgStorageAppend Term:2 Log:2/12 Commit:11 Lead:3 LeadEpoch:1 Entries:[2/12 EntryNormal ""] + Responses: + 1->3 MsgVoteResp Term:2 Log:0/0 Rejected (Hint: 0) + 1->3 MsgFortifyLeaderResp Term:2 Log:0/0 LeadEpoch:1 + 1->3 MsgAppResp Term:2 Log:0/12 Commit:11 + AppendThread->1 MsgStorageAppendResp Term:0 Log:2/12 + Processing: + 1->AppendThread MsgStorageAppend Term:3 Log:3/12 Commit:11 Lead:4 LeadEpoch:1 Entries:[3/12 EntryNormal ""] + Responses: + 1->4 MsgFortifyLeaderResp Term:3 Log:0/0 LeadEpoch:1 + 1->4 MsgAppResp Term:3 Log:0/12 Commit:11 + AppendThread->1 MsgStorageAppendResp Term:0 Log:3/12 +> 1 receiving messages + AppendThread->1 MsgStorageAppendResp Term:0 Log:2/12 + INFO mark (term,index)=(2,12) mismatched the last accepted term 3 in unstable log; ignoring + AppendThread->1 MsgStorageAppendResp Term:0 Log:3/12 + raft-log 1 ---- 1/11 EntryNormal "" 3/12 EntryNormal "" - -deliver-msgs 1 ----- -AppendThread->1 MsgStorageAppendResp Term:0 Log:3/12 diff --git a/pkg/raft/testdata/heartbeat_resp_recovers_from_probing.txt b/pkg/raft/testdata/heartbeat_resp_recovers_from_probing.txt index 2647704bfb0d..ba034b8b166f 100644 --- a/pkg/raft/testdata/heartbeat_resp_recovers_from_probing.txt +++ b/pkg/raft/testdata/heartbeat_resp_recovers_from_probing.txt @@ -47,7 +47,7 @@ tick-heartbeat 1 ---- ok -# Heartbeat -> HeartbeatResp -> MsgApp -> MsgAppResp -> StateReplicate. +# Heartbeat -> MsgApp -> MsgAppResp -> StateReplicate. 
stabilize ---- > 1 handling Ready @@ -55,20 +55,24 @@ stabilize Messages: 1->2 MsgHeartbeat Term:1 Log:0/0 1->3 MsgHeartbeat Term:1 Log:0/0 + 1->2 MsgApp Term:1 Log:1/11 Commit:11 > 2 receiving messages 1->2 MsgHeartbeat Term:1 Log:0/0 + 1->2 MsgApp Term:1 Log:1/11 Commit:11 > 3 receiving messages 1->3 MsgHeartbeat Term:1 Log:0/0 > 2 handling Ready Ready MustSync=false: Messages: 2->1 MsgHeartbeatResp Term:1 Log:0/0 + 2->1 MsgAppResp Term:1 Log:0/11 Commit:11 > 3 handling Ready Ready MustSync=false: Messages: 3->1 MsgHeartbeatResp Term:1 Log:0/0 > 1 receiving messages 2->1 MsgHeartbeatResp Term:1 Log:0/0 + 2->1 MsgAppResp Term:1 Log:0/11 Commit:11 3->1 MsgHeartbeatResp Term:1 Log:0/0 > 1 handling Ready Ready MustSync=false: diff --git a/pkg/raft/testdata/lagging_commit.txt b/pkg/raft/testdata/lagging_commit.txt index f7bf4e90cbc0..15e6a1844337 100644 --- a/pkg/raft/testdata/lagging_commit.txt +++ b/pkg/raft/testdata/lagging_commit.txt @@ -126,36 +126,40 @@ tick-heartbeat 1 ---- ok -# However, the leader does not push the real commit index to the follower 3. It -# cuts the commit index at the Progress.Match mark, because it thinks that it is -# unsafe to send a commit index higher than that. +# The leader knows that the follower 3 is lagging behind, so it sends a MsgApp +# to fix that. process-ready 1 ---- Ready MustSync=false: Messages: 1->2 MsgHeartbeat Term:1 Log:0/0 1->3 MsgHeartbeat Term:1 Log:0/0 - -# Since the heartbeat message does not bump the follower's commit index, it will -# take another roundtrip with the leader to update it. As such, the total time -# it takes for the follower to learn the commit index is: -# -# delay = HeartbeatInterval + 3/2 * RTT -# -# This is suboptimal. It could have taken HeartbeatInterval + 1/2 * RTT, if the -# leader sent the up-to-date commit index in the heartbeat message. -# -# See https://github.com/etcd-io/raft/issues/138 which aims to fix this. +1->3 MsgApp Term:1 Log:1/13 Commit:13 + +# Since the leader sends a MsgApp on the heartbeat timeout, it takes this long +# for the follower to advance its commit index: +# delay = HeartbeatInterval + 1/2 * RTT +# This is better than what we previously had, which was: +# HeartbeatInterval + 3/2 * RTT. That was the case because the leader needed to +# wait to send/receive a MsgHeartbeat/MsgHeartbeatResp before it could send the +# MsgApp. 
stabilize 1 3 ---- > 3 receiving messages 1->3 MsgHeartbeat Term:1 Log:0/0 + 1->3 MsgApp Term:1 Log:1/13 Commit:13 > 3 handling Ready - Ready MustSync=false: + Ready MustSync=true: + HardState Term:1 Vote:1 Commit:13 Lead:1 LeadEpoch:1 + CommittedEntries: + 1/12 EntryNormal "data1" + 1/13 EntryNormal "data2" Messages: 3->1 MsgHeartbeatResp Term:1 Log:0/0 + 3->1 MsgAppResp Term:1 Log:0/13 Commit:13 > 1 receiving messages 3->1 MsgHeartbeatResp Term:1 Log:0/0 + 3->1 MsgAppResp Term:1 Log:0/13 Commit:13 > 1 handling Ready Ready MustSync=false: Messages: @@ -163,11 +167,7 @@ stabilize 1 3 > 3 receiving messages 1->3 MsgApp Term:1 Log:1/13 Commit:13 > 3 handling Ready - Ready MustSync=true: - HardState Term:1 Vote:1 Commit:13 Lead:1 LeadEpoch:1 - CommittedEntries: - 1/12 EntryNormal "data1" - 1/13 EntryNormal "data2" + Ready MustSync=false: Messages: 3->1 MsgAppResp Term:1 Log:0/13 Commit:13 > 1 receiving messages diff --git a/pkg/raft/testdata/lagging_commit_no_store_liveness_support.txt b/pkg/raft/testdata/lagging_commit_no_store_liveness_support.txt new file mode 100644 index 000000000000..03e7a41ca5dc --- /dev/null +++ b/pkg/raft/testdata/lagging_commit_no_store_liveness_support.txt @@ -0,0 +1,211 @@ +# This test demonstrates the effect of delayed commit on a follower node after a +# network hiccup between the leader and this follower that causes withdrawn +# support. + +# Skip logging the boilerplate. Set up a raft group of 3 nodes, and elect node 1 +# as the leader. Nodes 2 and 3 are the followers. +log-level none +---- +ok + +add-nodes 3 voters=(1,2,3) index=10 +---- +ok + +campaign 1 +---- +ok + +stabilize +---- +ok + +# Propose a couple of entries. +propose 1 data1 +---- +ok + +propose 1 data2 +---- +ok + +process-ready 1 +---- +ok + +# The interesting part starts below. +log-level debug +---- +ok + +deliver-msgs 2 3 +---- +1->2 MsgApp Term:1 Log:1/11 Commit:11 Entries:[1/12 EntryNormal "data1"] +1->2 MsgApp Term:1 Log:1/12 Commit:11 Entries:[1/13 EntryNormal "data2"] +1->3 MsgApp Term:1 Log:1/11 Commit:11 Entries:[1/12 EntryNormal "data1"] +1->3 MsgApp Term:1 Log:1/12 Commit:11 Entries:[1/13 EntryNormal "data2"] + +process-ready 3 +---- +Ready MustSync=true: +Entries: +1/12 EntryNormal "data1" +1/13 EntryNormal "data2" +Messages: +3->1 MsgAppResp Term:1 Log:0/12 Commit:11 +3->1 MsgAppResp Term:1 Log:0/13 Commit:11 + +# Suppose there is a network blip which prevents the leader learning that the +# follower 3 has appended the proposed entries to the log. +deliver-msgs drop=(1) +---- +dropped: 3->1 MsgAppResp Term:1 Log:0/12 Commit:11 +dropped: 3->1 MsgAppResp Term:1 Log:0/13 Commit:11 + +# In the meantime, the entries are committed, and the leader sends the commit +# index to all the followers. 
+stabilize 1 2 +---- +> 2 handling Ready + Ready MustSync=true: + Entries: + 1/12 EntryNormal "data1" + 1/13 EntryNormal "data2" + Messages: + 2->1 MsgAppResp Term:1 Log:0/12 Commit:11 + 2->1 MsgAppResp Term:1 Log:0/13 Commit:11 +> 1 receiving messages + 2->1 MsgAppResp Term:1 Log:0/12 Commit:11 + 2->1 MsgAppResp Term:1 Log:0/13 Commit:11 +> 1 handling Ready + Ready MustSync=true: + HardState Term:1 Vote:1 Commit:13 Lead:1 LeadEpoch:1 + CommittedEntries: + 1/12 EntryNormal "data1" + 1/13 EntryNormal "data2" + Messages: + 1->2 MsgApp Term:1 Log:1/13 Commit:12 + 1->3 MsgApp Term:1 Log:1/13 Commit:12 + 1->2 MsgApp Term:1 Log:1/13 Commit:13 + 1->3 MsgApp Term:1 Log:1/13 Commit:13 +> 2 receiving messages + 1->2 MsgApp Term:1 Log:1/13 Commit:12 + 1->2 MsgApp Term:1 Log:1/13 Commit:13 +> 2 handling Ready + Ready MustSync=true: + HardState Term:1 Vote:1 Commit:13 Lead:1 LeadEpoch:1 + CommittedEntries: + 1/12 EntryNormal "data1" + 1/13 EntryNormal "data2" + Messages: + 2->1 MsgAppResp Term:1 Log:0/13 Commit:12 + 2->1 MsgAppResp Term:1 Log:0/13 Commit:13 +> 1 receiving messages + 2->1 MsgAppResp Term:1 Log:0/13 Commit:12 + 2->1 MsgAppResp Term:1 Log:0/13 Commit:13 + +# The network blip prevents the follower 3 from learning that the previously +# appended entries are now committed. +deliver-msgs drop=(3) +---- +dropped: 1->3 MsgApp Term:1 Log:1/13 Commit:12 +dropped: 1->3 MsgApp Term:1 Log:1/13 Commit:13 + +withdraw-support 3 1 +---- + 1 2 3 +1 1 1 1 +2 1 1 1 +3 x 1 1 + +status 1 +---- +1: StateReplicate match=13 next=14 sentCommit=11 matchCommit=11 +2: StateReplicate match=13 next=14 sentCommit=13 matchCommit=13 +3: StateReplicate match=11 next=14 sentCommit=13 matchCommit=11 inflight=2 + +tick-heartbeat 1 +---- +ok + +# Although the leader knows that the follower 3 is lagging behind it doesn't +# send a MsgApp because follower 3 doesn't support the leader. +process-ready 1 +---- +Ready MustSync=false: +Messages: +1->2 MsgHeartbeat Term:1 Log:0/0 +1->3 MsgHeartbeat Term:1 Log:0/0 + +# Now that follower 3's store supports the leader's store, will send a MsgApp on +# the next heartbeat timeout. 
+grant-support 3 1 +---- + 1 2 3 +1 2 1 1 +2 1 1 1 +3 2 1 1 + +tick-heartbeat 1 +---- +ok + +process-ready 1 +---- +Ready MustSync=true: +HardState Term:1 Vote:1 Commit:13 Lead:1 LeadEpoch:2 +Messages: +1->2 MsgHeartbeat Term:1 Log:0/0 +1->3 MsgHeartbeat Term:1 Log:0/0 +1->3 MsgFortifyLeader Term:1 Log:0/0 +1->3 MsgApp Term:1 Log:1/13 Commit:13 + +stabilize +---- +> 2 receiving messages + 1->2 MsgHeartbeat Term:1 Log:0/0 + 1->2 MsgHeartbeat Term:1 Log:0/0 +> 3 receiving messages + 1->3 MsgHeartbeat Term:1 Log:0/0 + 1->3 MsgHeartbeat Term:1 Log:0/0 + 1->3 MsgFortifyLeader Term:1 Log:0/0 + 1->3 MsgApp Term:1 Log:1/13 Commit:13 +> 2 handling Ready + Ready MustSync=false: + Messages: + 2->1 MsgHeartbeatResp Term:1 Log:0/0 + 2->1 MsgHeartbeatResp Term:1 Log:0/0 +> 3 handling Ready + Ready MustSync=true: + HardState Term:1 Vote:1 Commit:13 Lead:1 LeadEpoch:2 + CommittedEntries: + 1/12 EntryNormal "data1" + 1/13 EntryNormal "data2" + Messages: + 3->1 MsgHeartbeatResp Term:1 Log:0/0 + 3->1 MsgHeartbeatResp Term:1 Log:0/0 + 3->1 MsgFortifyLeaderResp Term:1 Log:0/0 LeadEpoch:2 + 3->1 MsgAppResp Term:1 Log:0/13 Commit:13 +> 1 receiving messages + 2->1 MsgHeartbeatResp Term:1 Log:0/0 + 2->1 MsgHeartbeatResp Term:1 Log:0/0 + 3->1 MsgHeartbeatResp Term:1 Log:0/0 + 3->1 MsgHeartbeatResp Term:1 Log:0/0 + 3->1 MsgFortifyLeaderResp Term:1 Log:0/0 LeadEpoch:2 + 3->1 MsgAppResp Term:1 Log:0/13 Commit:13 +> 1 handling Ready + Ready MustSync=false: + Messages: + 1->3 MsgApp Term:1 Log:1/13 Commit:13 + 1->3 MsgApp Term:1 Log:1/13 Commit:13 +> 3 receiving messages + 1->3 MsgApp Term:1 Log:1/13 Commit:13 + 1->3 MsgApp Term:1 Log:1/13 Commit:13 +> 3 handling Ready + Ready MustSync=false: + Messages: + 3->1 MsgAppResp Term:1 Log:0/13 Commit:13 + 3->1 MsgAppResp Term:1 Log:0/13 Commit:13 +> 1 receiving messages + 3->1 MsgAppResp Term:1 Log:0/13 Commit:13 + 3->1 MsgAppResp Term:1 Log:0/13 Commit:13 diff --git a/pkg/raft/testdata/msg_app_commit_index.txt b/pkg/raft/testdata/msg_app_commit_index.txt index 5fc3ba849a15..d464faa2fe8e 100644 --- a/pkg/raft/testdata/msg_app_commit_index.txt +++ b/pkg/raft/testdata/msg_app_commit_index.txt @@ -124,6 +124,7 @@ Ready MustSync=false: Messages: 1->2 MsgHeartbeat Term:1 Log:0/0 1->3 MsgHeartbeat Term:1 Log:0/0 +1->3 MsgApp Term:1 Log:1/13 Commit:13 # On the next MsgApp sent to follower 3, the leader will include that the # commit index is 13. 
Notice that the leader doesn't send MsgApp to follower 2 @@ -134,17 +135,24 @@ stabilize 1 2 3 1->2 MsgHeartbeat Term:1 Log:0/0 > 3 receiving messages 1->3 MsgHeartbeat Term:1 Log:0/0 + 1->3 MsgApp Term:1 Log:1/13 Commit:13 > 2 handling Ready Ready MustSync=false: Messages: 2->1 MsgHeartbeatResp Term:1 Log:0/0 > 3 handling Ready - Ready MustSync=false: + Ready MustSync=true: + HardState Term:1 Vote:1 Commit:13 Lead:1 LeadEpoch:1 + CommittedEntries: + 1/12 EntryNormal "data1" + 1/13 EntryNormal "data2" Messages: 3->1 MsgHeartbeatResp Term:1 Log:0/0 + 3->1 MsgAppResp Term:1 Log:0/13 Commit:13 > 1 receiving messages 2->1 MsgHeartbeatResp Term:1 Log:0/0 3->1 MsgHeartbeatResp Term:1 Log:0/0 + 3->1 MsgAppResp Term:1 Log:0/13 Commit:13 > 1 handling Ready Ready MustSync=false: Messages: @@ -152,11 +160,7 @@ stabilize 1 2 3 > 3 receiving messages 1->3 MsgApp Term:1 Log:1/13 Commit:13 > 3 handling Ready - Ready MustSync=true: - HardState Term:1 Vote:1 Commit:13 Lead:1 LeadEpoch:1 - CommittedEntries: - 1/12 EntryNormal "data1" - 1/13 EntryNormal "data2" + Ready MustSync=false: Messages: 3->1 MsgAppResp Term:1 Log:0/13 Commit:13 > 1 receiving messages diff --git a/pkg/raft/testdata/replicate_pause.txt b/pkg/raft/testdata/replicate_pause.txt index 7ee9a71e24f8..75d8fbdd27a7 100644 --- a/pkg/raft/testdata/replicate_pause.txt +++ b/pkg/raft/testdata/replicate_pause.txt @@ -121,7 +121,9 @@ status 1 2: StateReplicate match=17 next=18 sentCommit=17 matchCommit=17 3: StateReplicate match=11 next=15 sentCommit=14 matchCommit=11 paused inflight=3[full] -# Make a heartbeat roundtrip. +# On the next heartbeat timeout, node 1 sends an empty MsgApp to a throttled +# node 3 because it hasn't yet replied to a single MsgApp, and the in-flight +# tracker is still saturated. tick-heartbeat 1 ---- ok @@ -133,13 +135,17 @@ stabilize 1 Messages: 1->2 MsgHeartbeat Term:1 Log:0/0 1->3 MsgHeartbeat Term:1 Log:0/0 + 1->3 MsgApp Term:1 Log:1/14 Commit:17 +# Node 3 finally receives a MsgApp, but there was a gap, so it rejects it. stabilize 2 3 ---- > 2 receiving messages 1->2 MsgHeartbeat Term:1 Log:0/0 > 3 receiving messages 1->3 MsgHeartbeat Term:1 Log:0/0 + 1->3 MsgApp Term:1 Log:1/14 Commit:17 + DEBUG 3 [logterm: 0, index: 14] rejected MsgApp [logterm: 1, index: 14] from 1 > 2 handling Ready Ready MustSync=false: Messages: @@ -148,42 +154,67 @@ stabilize 2 3 Ready MustSync=false: Messages: 3->1 MsgHeartbeatResp Term:1 Log:0/0 + 3->1 MsgAppResp Term:1 Log:1/14 Rejected (Hint: 11) Commit:11 -# After handling heartbeat responses, node 1 sends an empty MsgApp to a -# throttled node 3 because it hasn't yet replied to a single MsgApp, and the -# in-flight tracker is still saturated. +# Node 1 receives the rejection and adjusts the MsgApp sent to node 3. 
stabilize 1 ---- > 1 receiving messages 2->1 MsgHeartbeatResp Term:1 Log:0/0 3->1 MsgHeartbeatResp Term:1 Log:0/0 + 3->1 MsgAppResp Term:1 Log:1/14 Rejected (Hint: 11) Commit:11 + DEBUG 1 received MsgAppResp(rejected, hint: (index 11, term 1)) from 3 for index 14 + DEBUG 1 decreased progress of 3 to [StateReplicate match=11 next=12 sentCommit=11 matchCommit=11 paused inflight=3[full]] > 1 handling Ready Ready MustSync=false: Messages: 1->3 MsgApp Term:1 Log:1/14 Commit:17 + 1->3 MsgApp Term:1 Log:1/11 Commit:17 Entries:[ + 1/12 EntryNormal "prop_1_12" + 1/13 EntryNormal "prop_1_13" + 1/14 EntryNormal "prop_1_14" + 1/15 EntryNormal "prop_1_15" + 1/16 EntryNormal "prop_1_16" + 1/17 EntryNormal "prop_1_17" + ] -# Node 3 finally receives a MsgApp, but there was a gap, so it rejects it. -stabilize 3 +stabilize 1 3 ---- > 3 receiving messages 1->3 MsgApp Term:1 Log:1/14 Commit:17 DEBUG 3 [logterm: 0, index: 14] rejected MsgApp [logterm: 1, index: 14] from 1 + 1->3 MsgApp Term:1 Log:1/11 Commit:17 Entries:[ + 1/12 EntryNormal "prop_1_12" + 1/13 EntryNormal "prop_1_13" + 1/14 EntryNormal "prop_1_14" + 1/15 EntryNormal "prop_1_15" + 1/16 EntryNormal "prop_1_16" + 1/17 EntryNormal "prop_1_17" + ] > 3 handling Ready - Ready MustSync=false: + Ready MustSync=true: + HardState Term:1 Vote:1 Commit:17 Lead:1 LeadEpoch:1 + Entries: + 1/12 EntryNormal "prop_1_12" + 1/13 EntryNormal "prop_1_13" + 1/14 EntryNormal "prop_1_14" + 1/15 EntryNormal "prop_1_15" + 1/16 EntryNormal "prop_1_16" + 1/17 EntryNormal "prop_1_17" + CommittedEntries: + 1/12 EntryNormal "prop_1_12" + 1/13 EntryNormal "prop_1_13" + 1/14 EntryNormal "prop_1_14" + 1/15 EntryNormal "prop_1_15" + 1/16 EntryNormal "prop_1_16" + 1/17 EntryNormal "prop_1_17" Messages: 3->1 MsgAppResp Term:1 Log:1/14 Rejected (Hint: 11) Commit:11 - -log-level none ----- -ok - -stabilize ----- -ok - -log-level debug ----- -ok + 3->1 MsgAppResp Term:1 Log:0/17 Commit:17 +> 1 receiving messages + 3->1 MsgAppResp Term:1 Log:1/14 Rejected (Hint: 11) Commit:11 + DEBUG 1 received MsgAppResp(rejected, hint: (index 11, term 1)) from 3 for index 14 + 3->1 MsgAppResp Term:1 Log:0/17 Commit:17 # Eventually all nodes catch up on the committed state. status 1 diff --git a/pkg/raft/testdata/slow_follower_after_compaction.txt b/pkg/raft/testdata/slow_follower_after_compaction.txt index 83a94fbbb6b0..a74c15db5ebe 100644 --- a/pkg/raft/testdata/slow_follower_after_compaction.txt +++ b/pkg/raft/testdata/slow_follower_after_compaction.txt @@ -96,24 +96,49 @@ compact 1 17 ---- 1/18 EntryNormal "prop_1_18" -# Trigger a round of empty MsgApp "probe" from leader. It will reach node 3 -# which will reply with a rejection MsgApp because it sees a gap in the log. -# Node 1 will reset the MsgApp flow and send a snapshot to catch node 3 up. +# Trigger a heartbeat timeout to allow the leader to detect that it can't send a +# MsgApp to node 3 because the relevant part of the log is already compacted. +# Instead, the leader should send a snapshot. 
tick-heartbeat 1 ---- -ok - -log-level none ----- -ok +DEBUG 1 [firstindex: 18, commit: 18] sent snapshot[index: 18, term: 1] to 3 [StateReplicate match=14 next=17 sentCommit=16 matchCommit=14 inflight=2[full]] +DEBUG 1 paused sending replication messages to 3 [StateSnapshot match=14 next=19 sentCommit=18 matchCommit=14 paused pendingSnap=18] stabilize ---- -ok - -log-level debug ----- -ok +> 1 handling Ready + Ready MustSync=false: + Messages: + 1->2 MsgHeartbeat Term:1 Log:0/0 + 1->3 MsgHeartbeat Term:1 Log:0/0 + 1->3 MsgSnap Term:1 Log:0/0 + Snapshot: Index:18 Term:1 ConfState:Voters:[1 2 3] VotersOutgoing:[] Learners:[] LearnersNext:[] AutoLeave:false +> 2 receiving messages + 1->2 MsgHeartbeat Term:1 Log:0/0 +> 3 receiving messages + 1->3 MsgHeartbeat Term:1 Log:0/0 + 1->3 MsgSnap Term:1 Log:0/0 + Snapshot: Index:18 Term:1 ConfState:Voters:[1 2 3] VotersOutgoing:[] Learners:[] LearnersNext:[] AutoLeave:false + INFO log [committed=14, applied=14, applying=14, unstable.offset=15, unstable.offsetInProgress=15, len(unstable.Entries)=0] starts to restore snapshot [index: 18, term: 1] + INFO 3 switched to configuration voters=(1 2 3) + INFO 3 [commit: 18, lastindex: 18, lastterm: 1] restored snapshot [index: 18, term: 1] + INFO 3 [commit: 18] restored snapshot [index: 18, term: 1] +> 2 handling Ready + Ready MustSync=false: + Messages: + 2->1 MsgHeartbeatResp Term:1 Log:0/0 +> 3 handling Ready + Ready MustSync=true: + HardState Term:1 Vote:1 Commit:18 Lead:1 LeadEpoch:1 + Snapshot Index:18 Term:1 ConfState:Voters:[1 2 3] VotersOutgoing:[] Learners:[] LearnersNext:[] AutoLeave:false + Messages: + 3->1 MsgHeartbeatResp Term:1 Log:0/0 + 3->1 MsgAppResp Term:1 Log:0/18 Commit:18 +> 1 receiving messages + 2->1 MsgHeartbeatResp Term:1 Log:0/0 + 3->1 MsgHeartbeatResp Term:1 Log:0/0 + 3->1 MsgAppResp Term:1 Log:0/18 Commit:18 + DEBUG 1 recovered from needing snapshot, resumed sending replication messages to 3 [StateSnapshot match=18 next=19 sentCommit=18 matchCommit=18 paused pendingSnap=18] # All nodes caught up. status 1 diff --git a/pkg/raft/testdata/snapshot_succeed_via_app_resp.txt b/pkg/raft/testdata/snapshot_succeed_via_app_resp.txt index 887149e17007..2fafbdc5f4c6 100644 --- a/pkg/raft/testdata/snapshot_succeed_via_app_resp.txt +++ b/pkg/raft/testdata/snapshot_succeed_via_app_resp.txt @@ -55,9 +55,11 @@ INFO newRaft 3 [peers: [], term: 0, commit: 0, applied: 0, lastindex: 0, lastter # Time passes on the leader so that it will try the previously missing follower # again. +# TODO(ibrahim): Consider not constructing a snapshot in the first place if we +# can't send it. tick-heartbeat 1 ---- -ok +DEBUG ignore sending snapshot to 3 since it is not recently active process-ready 1 ---- diff --git a/pkg/raft/tracker/progress.go b/pkg/raft/tracker/progress.go index e39f45fdde53..55ee39b3b64f 100644 --- a/pkg/raft/tracker/progress.go +++ b/pkg/raft/tracker/progress.go @@ -109,12 +109,15 @@ type Progress struct { // This is always true on the leader. RecentActive bool - // MsgAppProbesPaused is used when the MsgApp flow to a node is throttled. This - // happens in StateProbe, or StateReplicate with saturated Inflights. In both - // cases, we need to continue sending MsgApp once in a while to guarantee - // progress, but we only do so when MsgAppProbesPaused is false (it is reset on - // receiving a heartbeat response), to not overflow the receiver. See - // IsPaused(), ShouldSendEntries(), and ShouldSendProbe(). + // MsgAppProbesPaused is used when the MsgApp flow to a node is throttled. 
+	// This happens in StateProbe, or StateReplicate with saturated Inflights. In
+	// both cases, we need to continue sending MsgApp once in a while to guarantee
+	// progress, but we only do so when MsgAppProbesPaused is false to avoid
+	// spinning.
+	// MsgAppProbesPaused is reset on the next MsgHeartbeatResp from the follower,
+	// or on the next heartbeat timeout if the follower's store supports the
+	// leader's store.
+	// See IsPaused(), ShouldSendEntries(), and ShouldSendMsgApp().
 	MsgAppProbesPaused bool
 
 	// Inflights is a sliding window for the inflight messages.
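
Note: several call sites above repeat the pattern "for ticks := r.heartbeatTimeout; ticks > 0; ticks-- { r.tick() }", and the TODO in TestBcastBeat asks for a helper that does exactly this. A minimal sketch of what that helper could look like, assuming it lives in package raft next to the existing test helpers; the name tickHeartbeatTimeout is hypothetical and not part of this patch:

	// tickHeartbeatTimeout ticks the given raft instance through one full
	// heartbeat interval, so that the heartbeat timeout fires and, with store
	// liveness enabled, the accompanying MsgApp broadcast is triggered.
	func tickHeartbeatTimeout(r *raft) {
		for ticks := r.heartbeatTimeout; ticks > 0; ticks-- {
			r.tick()
		}
	}

Tests such as TestBcastBeat or TestHandleHeartbeatTimeoutStoreLivenessEnabled could then call tickHeartbeatTimeout(sm) before reading messages, in place of the inline loops.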