From cb77779cb361e459de7f51f9117362b8d55b06e9 Mon Sep 17 00:00:00 2001
From: Ibrahim Kettaneh
Date: Mon, 26 Aug 2024 16:43:56 -0400
Subject: [PATCH] raft: resume probe at heartbeat intervals

When store liveness is enabled, the leader no longer relies on heartbeat
responses to unpause followers and resume sending them MsgApps. Instead, on
every heartbeat interval, it unpauses the followers that currently support
its leadership and sends them a MsgApp if they are not up-to-date, and it
sends a MsgFortifyLeader to the followers that don't support it.

Release note: None
---
 pkg/raft/BUILD.bazel                           |   1 +
 pkg/raft/node_test.go                          |   5 +-
 pkg/raft/raft.go                               |  69 +++-
 pkg/raft/raft_flow_control_test.go             | 116 +++---
 pkg/raft/raft_paper_test.go                    |  59 +++-
 pkg/raft/raft_test.go                          | 329 +++++++++++++-----
 .../interaction_env_handler_add_nodes.go       |  11 +-
 .../async_storage_writes_append_aba_race.txt   | 176 +++++-----
 pkg/raft/testdata/checkquorum.txt              |  45 ++-
 pkg/raft/testdata/forget_leader.txt            |   8 +
 .../forget_leader_prevote_checkquorum.txt      |   4 +
 .../heartbeat_resp_recovers_from_probing.txt   |  18 +-
 ...rs_from_probing_storeliveness_disabled.txt  |  91 +++++
 pkg/raft/testdata/lagging_commit.txt           |  38 +-
 pkg/raft/testdata/msg_app_commit_index.txt     |  18 +-
 pkg/raft/testdata/replicate_pause.txt          |  45 ++-
 .../slow_follower_after_compaction.txt         |   8 +-
 .../snapshot_succeed_via_app_resp.txt          |  35 +-
 pkg/raft/tracker/progress.go                   |   6 +-
 pkg/raft/tracker/supporttracker.go             |  24 ++
 pkg/raft/tracker/supporttracker_test.go        |  72 ++++
 21 files changed, 874 insertions(+), 304 deletions(-)
 create mode 100644 pkg/raft/testdata/heartbeat_resp_recovers_from_probing_storeliveness_disabled.txt

diff --git a/pkg/raft/BUILD.bazel b/pkg/raft/BUILD.bazel
index 4061909348c0..1761144ae87a 100644
--- a/pkg/raft/BUILD.bazel
+++ b/pkg/raft/BUILD.bazel
@@ -58,6 +58,7 @@ go_test(
         "//pkg/raft/rafttest",
         "//pkg/raft/tracker",
         "//pkg/settings/cluster",
+        "//pkg/testutils",
         "@com_github_cockroachdb_datadriven//:datadriven",
         "@com_github_stretchr_testify//assert",
         "@com_github_stretchr_testify//require",
diff --git a/pkg/raft/node_test.go b/pkg/raft/node_test.go
index 7fc88435f193..9647cccb3863 100644
--- a/pkg/raft/node_test.go
+++ b/pkg/raft/node_test.go
@@ -765,7 +765,10 @@ func TestAppendPagination(t *testing.T) {
 
 	// After the partition recovers, tick the clock to wake everything
 	// back up and send the messages.
-	n.send(raftpb.Message{From: 1, To: 1, Type: raftpb.MsgBeat})
+	p := n.peers[raftpb.PeerID(1)].(*raft)
+	for ticks := p.heartbeatTimeout; ticks > 0; ticks-- {
+		n.tickRaftHeartbeat(p)
+	}
 	assert.True(t, seenFullMessage,
 		"didn't see any messages more than half the max size; something is wrong with this test")
 }
diff --git a/pkg/raft/raft.go b/pkg/raft/raft.go
index 0a361724a162..b01be79c7866 100644
--- a/pkg/raft/raft.go
+++ b/pkg/raft/raft.go
@@ -761,6 +769,36 @@ func (r *raft) bcastAppend() {
 	})
 }
 
+// maybeSendAppendOrFortify checks, for each follower, whether it is
+// supporting the leader. If it is, the leader unpauses the follower (clears
+// MsgAppProbesPaused) and sends it a MsgApp with entries if it is not
+// up-to-date according to the progress recorded in r.trk. If the follower
+// doesn't currently support the leader, the leader sends it a fortify
+// message instead.
+func (r *raft) maybeSendAppendOrFortify() {
+	r.trk.Visit(func(id pb.PeerID, pr *tracker.Progress) {
+		if id == r.id {
+			// NB: the leader doesn't send MsgAppResp to itself here. This means that
+			// the leader will not have a chance to update its own
+			// MatchCommit/SentCommit. That is fine because the leader doesn't use
+			// MatchCommit/SentCommit for itself. It only uses the followers' values.
+			return
+		}
+
+		if r.supportTracker.IsLeadSupportedByFollower(id) {
+			pr.MsgAppProbesPaused = false
+			r.maybeSendAppend(id)
+		} else {
+			r.sendFortify(id)
+		}
+	})
+}
+
 // bcastHeartbeat sends RPC, without entries to all the peers.
 func (r *raft) bcastHeartbeat() {
 	r.trk.Visit(func(id pb.PeerID, _ *tracker.Progress) {
@@ -996,6 +1034,18 @@ func (r *raft) tickHeartbeat() {
 		if err := r.Step(pb.Message{From: r.id, Type: pb.MsgBeat}); err != nil {
 			r.logger.Debugf("error occurred during checking sending heartbeat: %v", err)
 		}
+
+		// If storeliveness is NOT enabled, heartbeat responses unpause followers
+		// and trigger the leader to send MsgApp if the follower is not up-to-date.
+		// If storeliveness is enabled, we don't depend on heartbeat responses for
+		// that, and we do it here on heartbeat timeout if the follower is
+		// supporting the leader. If the follower doesn't support the leader, the
+		// leader tries to fortify the follower by sending a MsgFortifyLeader to
+		// that follower.
+		if r.storeLiveness.SupportFromEnabled() {
+			r.maybeSendAppendOrFortify()
+		}
 	}
 }
 
@@ -1721,8 +1780,13 @@ func stepLeader(r *raft, m pb.Message) error {
 		}
 	case pb.MsgHeartbeatResp:
 		pr.RecentActive = true
-		pr.MsgAppProbesPaused = false
-		r.maybeSendAppend(m.From)
+		// If storeLiveness is enabled, we don't rely on heartbeat responses to
+		// unpause followers; instead, the leader unpauses followers and sends
+		// them MsgApps on every heartbeat timeout (see tickHeartbeat).
+		if !r.storeLiveness.SupportFromEnabled() {
+			pr.MsgAppProbesPaused = false
+			r.maybeSendAppend(m.From)
+		}
 
 	case pb.MsgSnapStatus:
 		if pr.State != tracker.StateSnapshot {
diff --git a/pkg/raft/raft_flow_control_test.go b/pkg/raft/raft_flow_control_test.go
index b5aa80a0720c..17d15e8db3ca 100644
--- a/pkg/raft/raft_flow_control_test.go
+++ b/pkg/raft/raft_flow_control_test.go
@@ -21,6 +21,7 @@ import (
 	"testing"
 
 	pb "github.com/cockroachdb/cockroach/pkg/raft/raftpb"
+	"github.com/cockroachdb/cockroach/pkg/testutils"
 )
 
 // TestMsgAppFlowControlFull ensures:
@@ -106,50 +107,83 @@ func TestMsgAppFlowControlMoveForward(t *testing.T) {
 	}
 }
 
-// TestMsgAppFlowControlRecvHeartbeat ensures a heartbeat response
-// frees one slot if the window is full.
-func TestMsgAppFlowControlRecvHeartbeat(t *testing.T) {
-	r := newTestRaft(1, 5, 1, newTestMemoryStorage(withPeers(1, 2)))
-	r.becomeCandidate()
-	r.becomeLeader()
-
-	pr2 := r.trk.Progress(2)
-	// force the progress to be in replicate state
-	pr2.BecomeReplicate()
-	// fill in the inflights window
-	for i := 0; i < r.maxInflight; i++ {
-		r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
-		r.readMessages()
-	}
-
-	for tt := 1; tt < 5; tt++ {
-		// recv tt msgHeartbeatResp and expect one free slot
-		for i := 0; i < tt; i++ {
-			if !pr2.IsPaused() {
-				t.Fatalf("#%d.%d: paused = false, want true", tt, i)
+// TestMsgAppFlowControl ensures that if store-liveness is disabled, a heartbeat
+// response frees one slot if the window is full. If store-liveness is enabled,
+// the same thing happens, but on the next heartbeat interval (not on the
+// heartbeat response).
+func TestMsgAppFlowControl(t *testing.T) {
+	var r *raft
+	testutils.RunTrueAndFalse(t, "store-liveness",
+		func(t *testing.T, storeLivenessEnabled bool) {
+			if storeLivenessEnabled {
+				r = newTestRaft(1, 5, 1,
+					newTestMemoryStorage(withPeers(1, 2)))
+			} else {
+				r = newTestRaft(1, 5, 1,
+					newTestMemoryStorage(withPeers(1, 2)), withFortificationDisabled())
 			}
-			// Unpauses the progress, sends an empty MsgApp, and pauses it again.
-			r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp})
-			ms := r.readMessages()
-			if len(ms) != 1 || ms[0].Type != pb.MsgApp || len(ms[0].Entries) != 0 {
-				t.Fatalf("#%d.%d: len(ms) == %d, want 1 empty MsgApp", tt, i, len(ms))
-			}
-		}
 
-		// No more appends are sent if there are no heartbeats.
-		for i := 0; i < 10; i++ {
-			if !pr2.IsPaused() {
-				t.Fatalf("#%d.%d: paused = false, want true", tt, i)
+			r.becomeCandidate()
+			r.becomeLeader()
+			if storeLivenessEnabled {
+				r.supportTracker.RecordSupport(pb.PeerID(2), 1)
+				r.supportTracker.RecordSupport(pb.PeerID(3), 1)
 			}
-			r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
-			ms := r.readMessages()
-			if len(ms) != 0 {
-				t.Fatalf("#%d.%d: len(ms) = %d, want 0", tt, i, len(ms))
+
+			pr2 := r.trk.Progress(2)
+			// force the progress to be in replicate state
+			pr2.BecomeReplicate()
+			// fill in the inflights window
+			for i := 0; i < r.maxInflight; i++ {
+				r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp,
+					Entries: []pb.Entry{{Data: []byte("somedata")}}})
+				r.readMessages()
 			}
-		}
-
-		// clear all pending messages.
-		r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp})
-		r.readMessages()
-	}
+			for tt := 1; tt < 5; tt++ {
+				// recv tt msgHeartbeatResp and expect one free slot
+				for i := 0; i < tt; i++ {
+					if !pr2.IsPaused() {
+						t.Fatalf("#%d.%d: paused = false, want true", tt, i)
+					}
+
+					// Unpauses the progress, sends an empty MsgApp, and pauses it again.
+					// When store-liveness is enabled, we do this on the next heartbeat
+					// interval. However, when store-liveness is disabled, we do this on
+					// the next heartbeat response.
+					if storeLivenessEnabled {
+						for ticks := r.heartbeatTimeout; ticks > 0; ticks-- {
+							r.tickHeartbeat()
+						}
+						ms := r.readMessages()
+						if len(ms) != 2 || ms[1].Type != pb.MsgApp || len(ms[1].Entries) != 0 {
+							t.Fatalf("#%d.%d: len(ms) == %d, want a MsgHeartbeat and 1 empty MsgApp", tt, i, len(ms))
+						}
+					} else {
+						r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp})
+						ms := r.readMessages()
+						if len(ms) != 1 || ms[0].Type != pb.MsgApp || len(ms[0].Entries) != 0 {
+							t.Fatalf("#%d.%d: len(ms) == %d, want 1 empty MsgApp", tt, i, len(ms))
+						}
+					}
+				}
+
+				// No more appends are sent if there are no heartbeats.
+				for i := 0; i < 10; i++ {
+					if !pr2.IsPaused() {
+						t.Fatalf("#%d.%d: paused = false, want true", tt, i)
+					}
+					r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp,
+						Entries: []pb.Entry{{Data: []byte("somedata")}}})
+					ms := r.readMessages()
+					if len(ms) != 0 {
+						t.Fatalf("#%d.%d: len(ms) = %d, want 0", tt, i, len(ms))
+					}
+				}
+
+				// clear all pending messages.
+				r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp})
+				r.readMessages()
+			}
+		})
 }
diff --git a/pkg/raft/raft_paper_test.go b/pkg/raft/raft_paper_test.go
index 866e51450853..2b6c9b761037 100644
--- a/pkg/raft/raft_paper_test.go
+++ b/pkg/raft/raft_paper_test.go
@@ -35,6 +35,7 @@ import (
 	"testing"
 
 	pb "github.com/cockroachdb/cockroach/pkg/raft/raftpb"
+	"github.com/cockroachdb/cockroach/pkg/testutils"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 )
@@ -101,27 +102,55 @@ func TestStartAsFollower(t *testing.T) {
 // TestLeaderBcastBeat tests that if the leader receives a heartbeat tick,
 // it will send a MsgHeartbeat with m.Index = 0, m.LogTerm=0 and empty entries
 // as heartbeat to all followers.
+// If store-liveness is enabled, the leader also sends MsgApp messages, as we
+// will eventually rip out MsgHeartbeat altogether.
// Reference: section 5.2 func TestLeaderBcastBeat(t *testing.T) { // heartbeat interval hi := 1 - r := newTestRaft(1, 10, hi, newTestMemoryStorage(withPeers(1, 2, 3))) - r.becomeCandidate() - r.becomeLeader() - for i := 0; i < 10; i++ { - mustAppendEntry(r, pb.Entry{Index: uint64(i) + 1}) - } - for i := 0; i < hi; i++ { - r.tick() - } + testutils.RunTrueAndFalse(t, "store-liveness", func(t *testing.T, storeLivenessEnabled bool) { + var r *raft + if storeLivenessEnabled { + r = newTestRaft(1, 10, hi, + newTestMemoryStorage(withPeers(1, 2, 3))) + } else { + r = newTestRaft(1, 10, hi, + newTestMemoryStorage(withPeers(1, 2, 3)), withFortificationDisabled()) + } - msgs := r.readMessages() - sort.Sort(messageSlice(msgs)) - assert.Equal(t, []pb.Message{ - {From: 1, To: 2, Term: 1, Type: pb.MsgHeartbeat}, - {From: 1, To: 3, Term: 1, Type: pb.MsgHeartbeat}, - }, msgs) + r.becomeCandidate() + r.becomeLeader() + if storeLivenessEnabled { + r.supportTracker.RecordSupport(pb.PeerID(2), 1) + r.supportTracker.RecordSupport(pb.PeerID(3), 1) + } + + for i := 0; i < 10; i++ { + mustAppendEntry(r, pb.Entry{Index: uint64(i) + 1}) + } + + for i := 0; i < hi; i++ { + r.tick() + } + + msgs := r.readMessages() + sort.Sort(messageSlice(msgs)) + + if storeLivenessEnabled { + assert.Equal(t, []pb.Message{ + {From: 1, To: 2, Term: 1, Entries: r.raftLog.allEntries(), Type: pb.MsgApp}, + {From: 1, To: 3, Term: 1, Entries: r.raftLog.allEntries(), Type: pb.MsgApp}, + {From: 1, To: 2, Term: 1, Type: pb.MsgHeartbeat}, + {From: 1, To: 3, Term: 1, Type: pb.MsgHeartbeat}, + }, msgs) + } else { + assert.Equal(t, []pb.Message{ + {From: 1, To: 2, Term: 1, Type: pb.MsgHeartbeat}, + {From: 1, To: 3, Term: 1, Type: pb.MsgHeartbeat}, + }, msgs) + } + }) } func TestFollowerStartElection(t *testing.T) { diff --git a/pkg/raft/raft_test.go b/pkg/raft/raft_test.go index 487f3e5f3f68..1ada921d865f 100644 --- a/pkg/raft/raft_test.go +++ b/pkg/raft/raft_test.go @@ -28,6 +28,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/raft/raftstoreliveness" "github.com/cockroachdb/cockroach/pkg/raft/tracker" "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -843,14 +844,12 @@ func TestCandidateConcede(t *testing.T) { // heal the partition tt.recover() - // send heartbeat; reset wait - tt.send(pb.Message{From: 3, To: 3, Type: pb.MsgBeat}) + // send append; reset wait + tt.send(pb.Message{From: 3, To: 1, Type: pb.MsgApp}) data := []byte("force follower") // send a proposal to 3 to flush out a MsgApp to 1 tt.send(pb.Message{From: 3, To: 3, Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}}) - // send heartbeat; flush out commit - tt.send(pb.Message{From: 3, To: 3, Type: pb.MsgBeat}) a := tt.peers[1].(*raft) assert.Equal(t, StateFollower, a.state) @@ -1167,12 +1166,13 @@ func TestHandleHeartbeat(t *testing.T) { } } -// TestHandleHeartbeatResp ensures that we re-send log entries when we get a heartbeat response. -func TestHandleHeartbeatResp(t *testing.T) { +// TestHandleHeartbeatRespStoreLivenessDisabled ensures that we re-send log +// entries when we get a heartbeat response. 
+func TestHandleHeartbeatRespStoreLivenessDisabled(t *testing.T) {
 	storage := newTestMemoryStorage(withPeers(1, 2))
 	require.NoError(t, storage.SetHardState(pb.HardState{Term: 3}))
 	require.NoError(t, storage.Append(index(1).terms(1, 2, 3)))
-	sm := newTestRaft(1, 5, 1, storage)
+	sm := newTestRaft(1, 5, 1, storage, withFortificationDisabled())
 	sm.becomeCandidate()
 	sm.becomeLeader()
 	sm.raftLog.commitTo(LogMark{Term: 3, Index: sm.raftLog.lastIndex()})
@@ -1204,6 +1204,58 @@
 	require.Empty(t, msgs)
 }
 
+// TestHandleHeartbeatTimeoutStoreLivenessEnabled ensures that we re-send log
+// entries on heartbeat intervals, but only if we need to.
+func TestHandleHeartbeatTimeoutStoreLivenessEnabled(t *testing.T) {
+	storage := newTestMemoryStorage(withPeers(1, 2))
+	require.NoError(t, storage.SetHardState(pb.HardState{Term: 3}))
+	require.NoError(t, storage.Append(index(1).terms(1, 2, 3)))
+	sm := newTestRaft(1, 5, 1, storage)
+	sm.becomeCandidate()
+	sm.becomeLeader()
+	sm.supportTracker.RecordSupport(pb.PeerID(2), 1)
+	sm.supportTracker.RecordSupport(pb.PeerID(3), 1)
+	sm.raftLog.commitTo(LogMark{Term: 3, Index: sm.raftLog.lastIndex()})
+
+	// On a heartbeat interval we send a MsgApp.
+	for ticks := sm.heartbeatTimeout; ticks > 0; ticks-- {
+		sm.tickHeartbeat()
+	}
+
+	msgs := sm.readMessages()
+	require.Len(t, msgs, 2)
+	assert.Equal(t, pb.MsgHeartbeat, msgs[0].Type)
+	assert.Equal(t, pb.MsgApp, msgs[1].Type)
+
+	// On another heartbeat interval we send a MsgApp.
+	for ticks := sm.heartbeatTimeout; ticks > 0; ticks-- {
+		sm.tickHeartbeat()
+	}
+	msgs = sm.readMessages()
+	require.Len(t, msgs, 2)
+	assert.Equal(t, pb.MsgHeartbeat, msgs[0].Type)
+	assert.Equal(t, pb.MsgApp, msgs[1].Type)
+
+	// Once we have received a MsgAppResp, heartbeat timeouts no longer send a
+	// MsgApp.
+	sm.Step(pb.Message{
+		From:   2,
+		Type:   pb.MsgAppResp,
+		Index:  msgs[1].Index + uint64(len(msgs[1].Entries)),
+		Commit: sm.raftLog.lastIndex(),
+	})
+
+	// Consume the message sent in response to MsgAppResp.
+	sm.readMessages()
+
+	// On a heartbeat interval we don't send a MsgApp.
+	for ticks := sm.heartbeatTimeout; ticks > 0; ticks-- {
+		sm.tickHeartbeat()
+	}
+	msgs = sm.readMessages()
+	require.Len(t, msgs, 1)
+	assert.Equal(t, pb.MsgHeartbeat, msgs[0].Type)
+}
+
 // TestMsgAppRespWaitReset verifies the resume behavior of a leader
 // MsgAppResp.
 func TestMsgAppRespWaitReset(t *testing.T) {
@@ -1235,7 +1287,8 @@ func TestMsgAppRespWaitReset(t *testing.T) {
 	})
 
 	// The command is broadcast to all nodes not in the wait state.
-	// Node 2 left the wait state due to its MsgAppResp, but node 3 is still waiting.
+	// Node 2 left the wait state due to its MsgAppResp, but node 3 is still
+	// waiting.
 	msgs := sm.readMessages()
 	require.Len(t, msgs, 1)
 	assert.Equal(t, pb.MsgApp, msgs[0].Type)
@@ -1243,7 +1296,8 @@ func TestMsgAppRespWaitReset(t *testing.T) {
 	assert.Len(t, msgs[0].Entries, 1)
 	assert.Equal(t, uint64(2), msgs[0].Entries[0].Index)
 
-	// Now Node 3 acks the first entry. This releases the wait and entry 2 is sent.
+	// Now Node 3 acks the first entry. This releases the wait and entry 2 is
+	// sent.
 	sm.Step(pb.Message{
 		From:  3,
 		Type:  pb.MsgAppResp,
@@ -1977,42 +2031,77 @@ func TestLeaderAppResp(t *testing.T) {
 
 // TestBcastBeat is when the leader receives a heartbeat tick, it should
 // send a MsgHeartbeat with m.Index = 0, m.LogTerm=0 and empty entries.
 func TestBcastBeat(t *testing.T) {
-	offset := uint64(1000)
-	// make a state machine with log.offset = 1000
-	s := pb.Snapshot{
-		Metadata: pb.SnapshotMetadata{
-			Index:     offset,
-			Term:      1,
-			ConfState: pb.ConfState{Voters: []pb.PeerID{1, 2, 3}},
-		},
-	}
-	storage := NewMemoryStorage()
-	storage.ApplySnapshot(s)
-	sm := newTestRaft(1, 10, 1, storage)
-	sm.Term = 1
+	var sm *raft
+	testutils.RunTrueAndFalse(t, "store-liveness",
+		func(t *testing.T, storeLivenessEnabled bool) {
+			offset := uint64(1000)
+			// make a state machine with log.offset = 1000
+			s := pb.Snapshot{
+				Metadata: pb.SnapshotMetadata{
+					Index:     offset,
+					Term:      1,
+					ConfState: pb.ConfState{Voters: []pb.PeerID{1, 2, 3}},
+				},
+			}
+			storage := NewMemoryStorage()
+			storage.ApplySnapshot(s)
 
-	sm.becomeCandidate()
-	sm.becomeLeader()
-	for i := 0; i < 10; i++ {
-		mustAppendEntry(sm, pb.Entry{Index: uint64(i) + 1})
-	}
-	sm.advanceMessagesAfterAppend()
+			if storeLivenessEnabled {
+				sm = newTestRaft(1, 10, 1, storage)
+			} else {
+				sm = newTestRaft(1, 10, 1, storage, withFortificationDisabled())
+			}
+			sm.Term = 1
 
-	// slow follower
-	sm.trk.Progress(2).Match, sm.trk.Progress(2).Next = 5, 6
-	// normal follower
-	sm.trk.Progress(3).Match, sm.trk.Progress(3).Next = sm.raftLog.lastIndex(), sm.raftLog.lastIndex()+1
+			sm.becomeCandidate()
+			sm.becomeLeader()
+			if storeLivenessEnabled {
+				sm.supportTracker.RecordSupport(pb.PeerID(2), 1)
+				sm.supportTracker.RecordSupport(pb.PeerID(3), 1)
+			}
 
-	sm.Step(pb.Message{Type: pb.MsgBeat})
-	msgs := sm.readMessages()
-	require.Len(t, msgs, 2)
+			for i := 0; i < 10; i++ {
+				mustAppendEntry(sm, pb.Entry{Index: uint64(i) + 1})
+			}
+			sm.advanceMessagesAfterAppend()
 
-	for i, m := range msgs {
-		require.Equal(t, pb.MsgHeartbeat, m.Type, "#%d", i)
-		require.Zero(t, m.Index, "#%d", i)
-		require.Zero(t, m.LogTerm, "#%d", i)
-		require.Empty(t, m.Entries, "#%d", i)
-	}
+			// slow follower
+			sm.trk.Progress(2).Match, sm.trk.Progress(2).Next = 5, 6
+			// normal follower
+			sm.trk.Progress(3).Match, sm.trk.Progress(3).Next = sm.raftLog.lastIndex(),
+				sm.raftLog.lastIndex()+1
+
+			for ticks := sm.heartbeatTimeout; ticks > 0; ticks-- {
+				sm.tickHeartbeat()
+			}
+			msgs := sm.readMessages()
+			// If store-liveness is enabled, the heartbeat timeout also sends a
+			// MsgApp where needed. In this case, the leader sends an empty MsgApp
+			// to follower 3, carrying the commit index.
+			if storeLivenessEnabled {
+				require.Len(t, msgs, 3)
+				assert.Equal(t, []pb.Message{
+					{From: 1, To: 2, Term: 2, Type: pb.MsgHeartbeat, Match: 5},
+					{From: 1, To: 3, Term: 2, Type: pb.MsgHeartbeat, Match: 1011},
+					{From: 1, To: 3, Term: 2, LogTerm: 2, Index: 1011, Commit: 1000, Match: 1011,
+						Type: pb.MsgApp},
+				}, msgs)
+			} else {
+				require.Len(t, msgs, 2)
+				assert.Equal(t, []pb.Message{
+					{From: 1, To: 2, Term: 2, Type: pb.MsgHeartbeat, Match: 5},
+					{From: 1, To: 3, Term: 2, Type: pb.MsgHeartbeat, Match: 1011},
+				}, msgs)
+			}
+
+			// Check only the first two messages; the third (present when
+			// store-liveness is enabled) is the MsgApp.
+ for i, m := range msgs[:2] { + require.Equal(t, pb.MsgHeartbeat, m.Type, "#%d", i) + require.Zero(t, m.Index, "#%d", i) + require.Zero(t, m.LogTerm, "#%d", i) + require.Empty(t, m.Entries, "#%d", i) + } + }) } // TestRecvMsgBeat tests the output of the state machine when receiving MsgBeat @@ -2082,50 +2171,93 @@ func TestLeaderIncreaseNext(t *testing.T) { } func TestSendAppendForProgressProbe(t *testing.T) { - r := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2))) - r.becomeCandidate() - r.becomeLeader() - r.readMessages() - r.trk.Progress(2).BecomeProbe() + var r *raft + testutils.RunTrueAndFalse(t, "store-liveness", + func(t *testing.T, storeLivenessEnabled bool) { + if storeLivenessEnabled { + r = newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2))) + } else { + r = newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2)), + withFortificationDisabled()) + } - // each round is a heartbeat - for i := 0; i < 3; i++ { - if i == 0 { - // we expect that raft will only send out one msgAPP on the first - // loop. After that, the follower is paused until a heartbeat response is - // received. - mustAppendEntry(r, pb.Entry{Data: []byte("somedata")}) - r.maybeSendAppend(2) - msg := r.readMessages() - assert.Len(t, msg, 1) - assert.Zero(t, msg[0].Index) - } + r.becomeCandidate() + r.becomeLeader() + if storeLivenessEnabled { + r.supportTracker.RecordSupport(pb.PeerID(2), 1) + r.supportTracker.RecordSupport(pb.PeerID(3), 1) + } - assert.True(t, r.trk.Progress(2).MsgAppProbesPaused) - for j := 0; j < 10; j++ { - mustAppendEntry(r, pb.Entry{Data: []byte("somedata")}) - r.maybeSendAppend(2) - assert.Empty(t, r.readMessages()) - } + r.readMessages() + r.trk.Progress(2).BecomeProbe() + + // each round is a heartbeat + for i := 0; i < 3; i++ { + if i == 0 { + // we expect that raft will only send out one msgAPP on the first + // loop. After that, the follower is paused until a heartbeat response + // is received. + mustAppendEntry(r, pb.Entry{Data: []byte("somedata")}) + r.maybeSendAppend(2) + msg := r.readMessages() + assert.Len(t, msg, 1) + assert.Zero(t, msg[0].Index) + } - // do a heartbeat - for j := 0; j < r.heartbeatTimeout; j++ { - r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat}) - } - assert.True(t, r.trk.Progress(2).MsgAppProbesPaused) + assert.True(t, r.trk.Progress(2).MsgAppProbesPaused) + for j := 0; j < 10; j++ { + mustAppendEntry(r, pb.Entry{Data: []byte("somedata")}) + r.maybeSendAppend(2) + assert.Empty(t, r.readMessages()) + } - // consume the heartbeat - msg := r.readMessages() - assert.Len(t, msg, 1) - assert.Equal(t, pb.MsgHeartbeat, msg[0].Type) - } + // do a heartbeat + for ticks := r.heartbeatTimeout; ticks > 0; ticks-- { + r.tickHeartbeat() + } + assert.True(t, r.trk.Progress(2).MsgAppProbesPaused) + + // consume the heartbeat and the MsgApp if store-liveness is enabled + msg := r.readMessages() + if storeLivenessEnabled { + assert.Len(t, msg, 2) + assert.Equal(t, pb.MsgHeartbeat, msg[0].Type) + assert.Equal(t, pb.MsgApp, msg[1].Type) + } else { + assert.Len(t, msg, 1) + assert.Equal(t, pb.MsgHeartbeat, msg[0].Type) + } + } - // a heartbeat response will allow another message to be sent - r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp}) - msg := r.readMessages() - assert.Len(t, msg, 1) - assert.Zero(t, msg[0].Index) - assert.True(t, r.trk.Progress(2).MsgAppProbesPaused) + // The next heartbeat timeout will allow another message to be sent if + // store-liveness is enabled. 
+			for ticks := r.heartbeatTimeout; ticks > 0; ticks-- {
+				r.tickHeartbeat()
+			}
+			msg := r.readMessages()
+			if storeLivenessEnabled {
+				assert.Len(t, msg, 2)
+				assert.Equal(t, pb.MsgHeartbeat, msg[0].Type)
+				assert.Equal(t, pb.MsgApp, msg[1].Type)
+				assert.Zero(t, msg[1].Index)
+			} else {
+				assert.Len(t, msg, 1)
+				assert.Equal(t, pb.MsgHeartbeat, msg[0].Type)
+			}
+			assert.True(t, r.trk.Progress(2).MsgAppProbesPaused)
+
+			// The next heartbeat response will allow another message to be sent if
+			// store-liveness is disabled.
+			r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp})
+			msg = r.readMessages()
+			if storeLivenessEnabled {
+				assert.Len(t, msg, 0)
+			} else {
+				assert.Len(t, msg, 1)
+				assert.Zero(t, msg[0].Index)
+			}
+			assert.True(t, r.trk.Progress(2).MsgAppProbesPaused)
+		})
 }
 
 func TestSendAppendForProgressReplicate(t *testing.T) {
@@ -3786,8 +3918,11 @@ func TestFastLogRejection(t *testing.T) {
 				Commit: last.Index,
 			})
 			n1 := newTestRaft(1, 10, 1, s1)
+
 			n1.becomeCandidate() // bumps Term to last.Term
 			n1.becomeLeader()
+			n1.supportTracker.RecordSupport(pb.PeerID(2), 1)
+			n1.supportTracker.RecordSupport(pb.PeerID(3), 1)
 
 			s2 := NewMemoryStorage()
 			s2.snapshot.Metadata.ConfState = pb.ConfState{Voters: []pb.PeerID{1, 2, 3}}
 			s2.Append(test.followerLog)
 			s2.SetHardState(pb.HardState{
 				Term:   last.Term,
 				Vote:   0,
 				Commit: 0,
 			})
 			n2 := newTestRaft(2, 10, 1, s2)
+
 			if test.followerCompact != 0 {
 				s2.Compact(test.followerCompact)
 				// NB: the state of n2 after this compaction isn't realistic because the
 				// edge case behaviour, in case it still does happen in some other way.
 			}
 
-			require.NoError(t, n2.Step(pb.Message{From: 1, To: 2, Type: pb.MsgHeartbeat}))
-			msgs := n2.readMessages()
-			require.Len(t, msgs, 1, "can't read 1 message from peer 2")
-			require.Equal(t, pb.MsgHeartbeatResp, msgs[0].Type)
-
-			require.NoError(t, n1.Step(msgs[0]))
-			msgs = n1.readMessages()
-			require.Len(t, msgs, 1, "can't read 1 message from peer 1")
-			require.Equal(t, pb.MsgApp, msgs[0].Type)
-
-			require.NoError(t, n2.Step(msgs[0]), "peer 2 step append fail")
+			for ticks := n1.heartbeatTimeout; ticks > 0; ticks-- {
+				n1.tickHeartbeat()
+			}
+			msgs := n1.readMessages()
+			require.Len(t, msgs, 4, "expected 4 messages from peer 1")
+			require.Equal(t, pb.MsgHeartbeat, msgs[0].Type)
+			require.Equal(t, pb.MsgHeartbeat, msgs[1].Type)
+			require.Equal(t, pb.MsgApp, msgs[2].Type)
+			require.Equal(t, pb.MsgApp, msgs[3].Type)
+
+			require.NoError(t, n2.Step(msgs[2]), "peer 2 step append fail")
 			msgs = n2.readMessages()
 			require.Len(t, msgs, 1, "can't read 1 message from peer 2")
 			require.Equal(t, pb.MsgAppResp, msgs[0].Type)
@@ -4016,6 +4152,17 @@ func (nw *network) send(msgs ...pb.Message) {
 	}
 }
 
+// tickRaftHeartbeat calls tickHeartbeat() on the given raft instance and then
+// delivers any messages this generates via network.send.
+func (nw *network) tickRaftHeartbeat(p *raft) {
+	p.tickHeartbeat()
+	p.advanceMessagesAfterAppend()
+	msgs := nw.filter(p.readMessages())
+	if len(msgs) > 0 {
+		nw.send(msgs...)
+ } +} + func (nw *network) drop(from, to pb.PeerID, perc float64) { nw.dropm[connem{from, to}] = perc } diff --git a/pkg/raft/rafttest/interaction_env_handler_add_nodes.go b/pkg/raft/rafttest/interaction_env_handler_add_nodes.go index fd4f157d1be6..d53b39b7575f 100644 --- a/pkg/raft/rafttest/interaction_env_handler_add_nodes.go +++ b/pkg/raft/rafttest/interaction_env_handler_add_nodes.go @@ -26,6 +26,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/raft" pb "github.com/cockroachdb/cockroach/pkg/raft/raftpb" + "github.com/cockroachdb/cockroach/pkg/raft/raftstoreliveness" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/datadriven" @@ -78,6 +79,12 @@ func (env *InteractionEnv) handleAddNodes(t *testing.T, d datadriven.TestData) e clusterversion.RemoveDevOffset(clusterversion.MinSupported.Version()), true /* initializeVersion */) cfg.CRDBVersion = settings.Version + case "store-liveness-disabled": + var disableSL bool + arg.Scan(t, i, &disableSL) + if disableSL { + cfg.StoreLiveness = raftstoreliveness.Disabled{} + } } } } @@ -142,7 +149,9 @@ func (env *InteractionEnv) AddNodes(n int, cfg raft.Config, snap pb.Snapshot) er cfg.ID, cfg.Storage = id, s env.Fabric.addNode() - cfg.StoreLiveness = newStoreLiveness(env.Fabric, id) + if cfg.StoreLiveness == nil { + cfg.StoreLiveness = newStoreLiveness(env.Fabric, id) + } // If the node creating command hasn't specified the CRDBVersion, use the // latest one. diff --git a/pkg/raft/testdata/async_storage_writes_append_aba_race.txt b/pkg/raft/testdata/async_storage_writes_append_aba_race.txt index c1f6e0a6ce79..2db054590b46 100644 --- a/pkg/raft/testdata/async_storage_writes_append_aba_race.txt +++ b/pkg/raft/testdata/async_storage_writes_append_aba_race.txt @@ -389,10 +389,11 @@ Messages: ] # Step 7: before the new entries reach node 1, it hears of the term change -# through a heartbeat and persists the new term. Node 1 then receives these -# entries, overwriting the previous unstable log entries that are in the process -# of being appended. The entries have a larger term than the previous entries -# but the same indexes. It begins appending these new entries asynchronously. +# through a MsgFortifyLeader and persists the new term. Node 1 then receives +# these entries, overwriting the previous unstable log entries that are in the +# process of being appended. The entries have a larger term than the previous +# entries but the same indexes. It begins appending these new entries +# asynchronously. 
deliver-msgs drop=1 ---- @@ -414,52 +415,30 @@ Messages: 4->5 MsgHeartbeat Term:3 Log:0/0 4->6 MsgHeartbeat Term:3 Log:0/0 4->7 MsgHeartbeat Term:3 Log:0/0 - +4->1 MsgFortifyLeader Term:3 Log:0/0 +4->2 MsgFortifyLeader Term:3 Log:0/0 +4->3 MsgFortifyLeader Term:3 Log:0/0 +4->5 MsgFortifyLeader Term:3 Log:0/0 +4->6 MsgFortifyLeader Term:3 Log:0/0 +4->7 MsgFortifyLeader Term:3 Log:0/0 + deliver-msgs 1 ---- 4->1 MsgHeartbeat Term:3 Log:0/0 INFO 1 [term: 2] received a MsgHeartbeat message with higher term from 4 [term: 3] INFO 1 became follower at term 3 +4->1 MsgFortifyLeader Term:3 Log:0/0 process-ready 1 ---- Ready MustSync=true: -HardState Term:3 Commit:11 Lead:4 LeadEpoch:0 +HardState Term:3 Commit:11 Lead:4 LeadEpoch:1 Messages: 1->4 MsgHeartbeatResp Term:3 Log:0/0 -1->AppendThread MsgStorageAppend Term:3 Log:0/0 Commit:11 Lead:4 - -deliver-msgs 4 ----- -1->4 MsgHeartbeatResp Term:3 Log:0/0 - -process-ready 4 ----- -Ready MustSync=false: -Messages: -4->1 MsgApp Term:3 Log:1/11 Commit:11 Entries:[3/12 EntryNormal ""] - -deliver-msgs 1 ----- -4->1 MsgApp Term:3 Log:1/11 Commit:11 Entries:[3/12 EntryNormal ""] -INFO found conflict at index 12 [existing term: 2, conflicting term: 3] -INFO replace the unstable entries from index 12 - -process-ready 1 ----- -Ready MustSync=true: -Entries: -3/12 EntryNormal "" -Messages: -1->AppendThread MsgStorageAppend Term:0 Log:3/12 Entries:[3/12 EntryNormal ""] Responses:[ - 1->4 MsgAppResp Term:3 Log:0/12 Commit:11 - AppendThread->1 MsgStorageAppendResp Term:0 Log:3/12 +1->AppendThread MsgStorageAppend Term:3 Log:0/0 Commit:11 Lead:4 LeadEpoch:1 Responses:[ + 1->4 MsgFortifyLeaderResp Term:3 Log:0/0 LeadEpoch:1 ] -# Step 8: The asynchronous log appends from the first Ready complete and the -# MsgStorageAppendResp is returned to the raft node state machine. A decision -# is made about whether to truncate the unstable log. - raft-log 1 ---- 1/11 EntryNormal "" @@ -477,67 +456,106 @@ raft-log 1 1/11 EntryNormal "" 1/12 EntryNormal "init_prop" -# Step 9: However, the log entries from the second Ready are still in the -# asynchronous append pipeline and will overwrite (in stable storage) the -# entries from the first Ready at some future point. We can't truncate the -# unstable log yet or a future read from Storage might see the entries from step -# 5 before they have been replaced by the entries from step 7. Instead, we must -# wait until we are sure that the entries are stable and that no in-progress -# appends might overwrite them before removing entries from the unstable log. 
- -deliver-msgs 1 ----- -AppendThread->1 MsgStorageAppendResp Term:0 Log:1/12 -INFO mark (term,index)=(1,12) mismatched the last accepted term 3 in unstable log; ignoring - -process-append-thread 1 +stabilize 1 ---- -Processing: -1->AppendThread MsgStorageAppend Term:2 Log:2/12 Commit:11 Lead:3 LeadEpoch:1 Entries:[2/12 EntryNormal ""] -Responses: -1->3 MsgVoteResp Term:2 Log:0/0 Rejected (Hint: 0) -1->3 MsgFortifyLeaderResp Term:2 Log:0/0 LeadEpoch:1 -1->3 MsgAppResp Term:2 Log:0/12 Commit:11 -AppendThread->1 MsgStorageAppendResp Term:0 Log:2/12 +> 1 receiving messages + AppendThread->1 MsgStorageAppendResp Term:0 Log:1/12 + INFO mark (term,index)=(1,12) mismatched the last accepted term 2 in unstable log; ignoring +> 1 processing append thread + Processing: + 1->AppendThread MsgStorageAppend Term:2 Log:2/12 Commit:11 Lead:3 LeadEpoch:1 Entries:[2/12 EntryNormal ""] + Responses: + 1->3 MsgVoteResp Term:2 Log:0/0 Rejected (Hint: 0) + 1->3 MsgFortifyLeaderResp Term:2 Log:0/0 LeadEpoch:1 + 1->3 MsgAppResp Term:2 Log:0/12 Commit:11 + AppendThread->1 MsgStorageAppendResp Term:0 Log:2/12 + Processing: + 1->AppendThread MsgStorageAppend Term:3 Log:0/0 Commit:11 Lead:4 LeadEpoch:1 + Responses: + 1->4 MsgFortifyLeaderResp Term:3 Log:0/0 LeadEpoch:1 +> 1 receiving messages + AppendThread->1 MsgStorageAppendResp Term:0 Log:2/12 raft-log 1 ---- 1/11 EntryNormal "" 2/12 EntryNormal "" -deliver-msgs 1 +deliver-msgs 4 ---- -AppendThread->1 MsgStorageAppendResp Term:0 Log:2/12 -INFO mark (term,index)=(2,12) mismatched the last accepted term 3 in unstable log; ignoring +1->4 MsgHeartbeatResp Term:3 Log:0/0 +1->4 MsgFortifyLeaderResp Term:3 Log:0/0 LeadEpoch:1 -process-append-thread 1 +stabilize 4 ---- -Processing: -1->AppendThread MsgStorageAppend Term:3 Log:0/0 Commit:11 Lead:4 -Responses: +> 4 processing append thread + Processing: + 4->AppendThread MsgStorageAppend Term:3 Log:3/12 Commit:11 Vote:4 Lead:4 LeadEpoch:1 Entries:[3/12 EntryNormal ""] + Responses: + 4->4 MsgAppResp Term:3 Log:0/12 Commit:11 + 4->4 MsgFortifyLeaderResp Term:3 Log:0/0 LeadEpoch:1 + AppendThread->4 MsgStorageAppendResp Term:0 Log:3/12 +> 4 receiving messages + 4->4 MsgAppResp Term:3 Log:0/12 Commit:11 + 4->4 MsgFortifyLeaderResp Term:3 Log:0/0 LeadEpoch:1 + AppendThread->4 MsgStorageAppendResp Term:0 Log:3/12 -raft-log 1 +tick-heartbeat 4 ---- -1/11 EntryNormal "" -2/12 EntryNormal "" +ok + +process-ready 4 +---- +Ready MustSync=false: +Messages: +4->1 MsgHeartbeat Term:3 Log:0/0 +4->2 MsgHeartbeat Term:3 Log:0/0 +4->3 MsgHeartbeat Term:3 Log:0/0 +4->5 MsgHeartbeat Term:3 Log:0/0 +4->6 MsgHeartbeat Term:3 Log:0/0 +4->7 MsgHeartbeat Term:3 Log:0/0 +4->1 MsgApp Term:3 Log:1/11 Commit:11 Entries:[3/12 EntryNormal ""] +4->2 MsgFortifyLeader Term:3 Log:0/0 +4->3 MsgFortifyLeader Term:3 Log:0/0 +4->5 MsgFortifyLeader Term:3 Log:0/0 +4->6 MsgFortifyLeader Term:3 Log:0/0 +4->7 MsgFortifyLeader Term:3 Log:0/0 deliver-msgs 1 ---- -no messages +4->1 MsgHeartbeat Term:3 Log:0/0 +4->1 MsgApp Term:3 Log:1/11 Commit:11 Entries:[3/12 EntryNormal ""] +INFO found conflict at index 12 [existing term: 2, conflicting term: 3] +INFO replace the unstable entries from index 12 -process-append-thread 1 +process-ready 1 ---- -Processing: -1->AppendThread MsgStorageAppend Term:0 Log:3/12 Entries:[3/12 EntryNormal ""] -Responses: -1->4 MsgAppResp Term:3 Log:0/12 Commit:11 -AppendThread->1 MsgStorageAppendResp Term:0 Log:3/12 +Ready MustSync=true: +Entries: +3/12 EntryNormal "" +Messages: +1->4 MsgHeartbeatResp Term:3 Log:0/0 +1->AppendThread 
MsgStorageAppend Term:0 Log:3/12 Entries:[3/12 EntryNormal ""] Responses:[ + 1->4 MsgAppResp Term:3 Log:0/12 Commit:11 + AppendThread->1 MsgStorageAppendResp Term:0 Log:3/12 +] + +# Step 8: The asynchronous log appends from the first Ready complete and the +# MsgStorageAppendResp is returned to the raft node state machine. A decision +# is made about whether to truncate the unstable log. + +stabilize 1 +---- +> 1 processing append thread + Processing: + 1->AppendThread MsgStorageAppend Term:0 Log:3/12 Entries:[3/12 EntryNormal ""] + Responses: + 1->4 MsgAppResp Term:3 Log:0/12 Commit:11 + AppendThread->1 MsgStorageAppendResp Term:0 Log:3/12 +> 1 receiving messages + AppendThread->1 MsgStorageAppendResp Term:0 Log:3/12 raft-log 1 ---- 1/11 EntryNormal "" 3/12 EntryNormal "" - -deliver-msgs 1 ----- -AppendThread->1 MsgStorageAppendResp Term:0 Log:3/12 diff --git a/pkg/raft/testdata/checkquorum.txt b/pkg/raft/testdata/checkquorum.txt index 57a5c14841c7..ae34ec431d95 100644 --- a/pkg/raft/testdata/checkquorum.txt +++ b/pkg/raft/testdata/checkquorum.txt @@ -76,26 +76,51 @@ stabilize Messages: 1->2 MsgHeartbeat Term:1 Log:0/0 1->3 MsgHeartbeat Term:1 Log:0/0 + 1->2 MsgFortifyLeader Term:1 Log:0/0 + 1->3 MsgFortifyLeader Term:1 Log:0/0 1->2 MsgHeartbeat Term:1 Log:0/0 1->3 MsgHeartbeat Term:1 Log:0/0 + 1->2 MsgFortifyLeader Term:1 Log:0/0 + 1->3 MsgFortifyLeader Term:1 Log:0/0 1->2 MsgHeartbeat Term:1 Log:0/0 1->3 MsgHeartbeat Term:1 Log:0/0 + 1->2 MsgFortifyLeader Term:1 Log:0/0 + 1->3 MsgFortifyLeader Term:1 Log:0/0 1->2 MsgHeartbeat Term:1 Log:0/0 1->3 MsgHeartbeat Term:1 Log:0/0 + 1->2 MsgFortifyLeader Term:1 Log:0/0 + 1->3 MsgFortifyLeader Term:1 Log:0/0 1->2 MsgHeartbeat Term:1 Log:0/0 1->3 MsgHeartbeat Term:1 Log:0/0 + 1->2 MsgFortifyLeader Term:1 Log:0/0 + 1->3 MsgFortifyLeader Term:1 Log:0/0 > 2 receiving messages 1->2 MsgHeartbeat Term:1 Log:0/0 + 1->2 MsgFortifyLeader Term:1 Log:0/0 + INFO 2 [term: 2] ignored a MsgFortifyLeader message with lower term from 1 [term: 1] 1->2 MsgHeartbeat Term:1 Log:0/0 + 1->2 MsgFortifyLeader Term:1 Log:0/0 + INFO 2 [term: 2] ignored a MsgFortifyLeader message with lower term from 1 [term: 1] 1->2 MsgHeartbeat Term:1 Log:0/0 + 1->2 MsgFortifyLeader Term:1 Log:0/0 + INFO 2 [term: 2] ignored a MsgFortifyLeader message with lower term from 1 [term: 1] 1->2 MsgHeartbeat Term:1 Log:0/0 + 1->2 MsgFortifyLeader Term:1 Log:0/0 + INFO 2 [term: 2] ignored a MsgFortifyLeader message with lower term from 1 [term: 1] 1->2 MsgHeartbeat Term:1 Log:0/0 + 1->2 MsgFortifyLeader Term:1 Log:0/0 + INFO 2 [term: 2] ignored a MsgFortifyLeader message with lower term from 1 [term: 1] > 3 receiving messages 1->3 MsgHeartbeat Term:1 Log:0/0 + 1->3 MsgFortifyLeader Term:1 Log:0/0 1->3 MsgHeartbeat Term:1 Log:0/0 + 1->3 MsgFortifyLeader Term:1 Log:0/0 1->3 MsgHeartbeat Term:1 Log:0/0 + 1->3 MsgFortifyLeader Term:1 Log:0/0 1->3 MsgHeartbeat Term:1 Log:0/0 + 1->3 MsgFortifyLeader Term:1 Log:0/0 1->3 MsgHeartbeat Term:1 Log:0/0 + 1->3 MsgFortifyLeader Term:1 Log:0/0 > 2 handling Ready Ready MustSync=false: Messages: @@ -105,13 +130,19 @@ stabilize 2->1 MsgAppResp Term:2 Log:0/0 2->1 MsgAppResp Term:2 Log:0/0 > 3 handling Ready - Ready MustSync=false: + Ready MustSync=true: + HardState Term:1 Vote:1 Commit:11 Lead:1 LeadEpoch:2 Messages: 3->1 MsgHeartbeatResp Term:1 Log:0/0 3->1 MsgHeartbeatResp Term:1 Log:0/0 3->1 MsgHeartbeatResp Term:1 Log:0/0 3->1 MsgHeartbeatResp Term:1 Log:0/0 3->1 MsgHeartbeatResp Term:1 Log:0/0 + 3->1 MsgFortifyLeaderResp Term:1 Log:0/0 LeadEpoch:2 + 3->1 
MsgFortifyLeaderResp Term:1 Log:0/0 LeadEpoch:2 + 3->1 MsgFortifyLeaderResp Term:1 Log:0/0 LeadEpoch:2 + 3->1 MsgFortifyLeaderResp Term:1 Log:0/0 LeadEpoch:2 + 3->1 MsgFortifyLeaderResp Term:1 Log:0/0 LeadEpoch:2 > 1 receiving messages 2->1 MsgAppResp Term:2 Log:0/0 INFO 1 [term: 1] received a MsgAppResp message with higher term from 2 [term: 2] @@ -130,6 +161,16 @@ stabilize INFO 1 [term: 2] ignored a MsgHeartbeatResp message with lower term from 3 [term: 1] 3->1 MsgHeartbeatResp Term:1 Log:0/0 INFO 1 [term: 2] ignored a MsgHeartbeatResp message with lower term from 3 [term: 1] + 3->1 MsgFortifyLeaderResp Term:1 Log:0/0 LeadEpoch:2 + INFO 1 [term: 2] ignored a MsgFortifyLeaderResp message with lower term from 3 [term: 1] + 3->1 MsgFortifyLeaderResp Term:1 Log:0/0 LeadEpoch:2 + INFO 1 [term: 2] ignored a MsgFortifyLeaderResp message with lower term from 3 [term: 1] + 3->1 MsgFortifyLeaderResp Term:1 Log:0/0 LeadEpoch:2 + INFO 1 [term: 2] ignored a MsgFortifyLeaderResp message with lower term from 3 [term: 1] + 3->1 MsgFortifyLeaderResp Term:1 Log:0/0 LeadEpoch:2 + INFO 1 [term: 2] ignored a MsgFortifyLeaderResp message with lower term from 3 [term: 1] + 3->1 MsgFortifyLeaderResp Term:1 Log:0/0 LeadEpoch:2 + INFO 1 [term: 2] ignored a MsgFortifyLeaderResp message with lower term from 3 [term: 1] > 1 handling Ready Ready MustSync=true: HardState Term:2 Commit:11 Lead:0 LeadEpoch:0 @@ -163,7 +204,7 @@ INFO 1 [logterm: 1, index: 11, vote: 0] cast MsgVote for 2 [logterm: 1, index: 1 deliver-msgs 3 ---- 2->3 MsgVote Term:3 Log:1/11 -INFO 3 [logterm: 1, index: 11, vote: 1] ignored MsgVote from 2 [logterm: 1, index: 11] at term 1: recently received communication from leader (remaining ticks: 3) +INFO 3 [logterm: 1, index: 11, vote: 1] ignored MsgVote from 2 [logterm: 1, index: 11] at term 1: recently received communication from leader (remaining ticks: 3) and supporting fortified leader 1 at epoch 2 stabilize ---- diff --git a/pkg/raft/testdata/forget_leader.txt b/pkg/raft/testdata/forget_leader.txt index 08971d2827f5..3538605e3dff 100644 --- a/pkg/raft/testdata/forget_leader.txt +++ b/pkg/raft/testdata/forget_leader.txt @@ -106,6 +106,8 @@ stabilize 1->2 MsgHeartbeat Term:1 Log:0/0 1->3 MsgHeartbeat Term:1 Log:0/0 1->4 MsgHeartbeat Term:1 Log:0/0 + 1->2 MsgFortifyLeader Term:1 Log:0/0 + 1->4 MsgFortifyLeader Term:1 Log:0/0 > 2 handling Ready Ready MustSync=true: HardState Term:1 Vote:1 Commit:11 Lead:0 LeadEpoch:0 @@ -114,15 +116,18 @@ stabilize HardState Term:1 Commit:11 Lead:0 LeadEpoch:0 > 2 receiving messages 1->2 MsgHeartbeat Term:1 Log:0/0 + 1->2 MsgFortifyLeader Term:1 Log:0/0 > 3 receiving messages 1->3 MsgHeartbeat Term:1 Log:0/0 > 4 receiving messages 1->4 MsgHeartbeat Term:1 Log:0/0 + 1->4 MsgFortifyLeader Term:1 Log:0/0 > 2 handling Ready Ready MustSync=true: HardState Term:1 Vote:1 Commit:11 Lead:1 LeadEpoch:0 Messages: 2->1 MsgHeartbeatResp Term:1 Log:0/0 + 2->1 MsgFortifyLeaderResp Term:1 Log:0/0 Rejected (Hint: 0) > 3 handling Ready Ready MustSync=false: Messages: @@ -132,10 +137,13 @@ stabilize HardState Term:1 Commit:11 Lead:1 LeadEpoch:0 Messages: 4->1 MsgHeartbeatResp Term:1 Log:0/0 + 4->1 MsgFortifyLeaderResp Term:1 Log:0/0 Rejected (Hint: 0) > 1 receiving messages 2->1 MsgHeartbeatResp Term:1 Log:0/0 + 2->1 MsgFortifyLeaderResp Term:1 Log:0/0 Rejected (Hint: 0) 3->1 MsgHeartbeatResp Term:1 Log:0/0 4->1 MsgHeartbeatResp Term:1 Log:0/0 + 4->1 MsgFortifyLeaderResp Term:1 Log:0/0 Rejected (Hint: 0) raft-state ---- diff --git 
a/pkg/raft/testdata/forget_leader_prevote_checkquorum.txt b/pkg/raft/testdata/forget_leader_prevote_checkquorum.txt index e3f73f374a12..19cee2338c1e 100644 --- a/pkg/raft/testdata/forget_leader_prevote_checkquorum.txt +++ b/pkg/raft/testdata/forget_leader_prevote_checkquorum.txt @@ -69,11 +69,13 @@ stabilize Messages: 1->2 MsgHeartbeat Term:1 Log:0/0 1->3 MsgHeartbeat Term:1 Log:0/0 + 1->3 MsgFortifyLeader Term:1 Log:0/0 > 2 receiving messages 1->2 MsgHeartbeat Term:1 Log:0/0 > 3 receiving messages 1->3 MsgHeartbeat Term:1 Log:0/0 INFO 3 became follower at term 1 + 1->3 MsgFortifyLeader Term:1 Log:0/0 > 2 handling Ready Ready MustSync=false: Messages: @@ -84,9 +86,11 @@ stabilize HardState Term:1 Vote:1 Commit:11 Lead:1 LeadEpoch:0 Messages: 3->1 MsgHeartbeatResp Term:1 Log:0/0 + 3->1 MsgFortifyLeaderResp Term:1 Log:0/0 Rejected (Hint: 0) > 1 receiving messages 2->1 MsgHeartbeatResp Term:1 Log:0/0 3->1 MsgHeartbeatResp Term:1 Log:0/0 + 3->1 MsgFortifyLeaderResp Term:1 Log:0/0 Rejected (Hint: 0) raft-state ---- diff --git a/pkg/raft/testdata/heartbeat_resp_recovers_from_probing.txt b/pkg/raft/testdata/heartbeat_resp_recovers_from_probing.txt index 2647704bfb0d..7c7829b1e262 100644 --- a/pkg/raft/testdata/heartbeat_resp_recovers_from_probing.txt +++ b/pkg/raft/testdata/heartbeat_resp_recovers_from_probing.txt @@ -47,7 +47,7 @@ tick-heartbeat 1 ---- ok -# Heartbeat -> HeartbeatResp -> MsgApp -> MsgAppResp -> StateReplicate. +# Heartbeat -> MsgApp -> MsgAppResp -> StateReplicate. stabilize ---- > 1 handling Ready @@ -55,33 +55,25 @@ stabilize Messages: 1->2 MsgHeartbeat Term:1 Log:0/0 1->3 MsgHeartbeat Term:1 Log:0/0 + 1->2 MsgApp Term:1 Log:1/11 Commit:11 > 2 receiving messages 1->2 MsgHeartbeat Term:1 Log:0/0 + 1->2 MsgApp Term:1 Log:1/11 Commit:11 > 3 receiving messages 1->3 MsgHeartbeat Term:1 Log:0/0 > 2 handling Ready Ready MustSync=false: Messages: 2->1 MsgHeartbeatResp Term:1 Log:0/0 + 2->1 MsgAppResp Term:1 Log:0/11 Commit:11 > 3 handling Ready Ready MustSync=false: Messages: 3->1 MsgHeartbeatResp Term:1 Log:0/0 > 1 receiving messages 2->1 MsgHeartbeatResp Term:1 Log:0/0 - 3->1 MsgHeartbeatResp Term:1 Log:0/0 -> 1 handling Ready - Ready MustSync=false: - Messages: - 1->2 MsgApp Term:1 Log:1/11 Commit:11 -> 2 receiving messages - 1->2 MsgApp Term:1 Log:1/11 Commit:11 -> 2 handling Ready - Ready MustSync=false: - Messages: - 2->1 MsgAppResp Term:1 Log:0/11 Commit:11 -> 1 receiving messages 2->1 MsgAppResp Term:1 Log:0/11 Commit:11 + 3->1 MsgHeartbeatResp Term:1 Log:0/0 status 1 ---- diff --git a/pkg/raft/testdata/heartbeat_resp_recovers_from_probing_storeliveness_disabled.txt b/pkg/raft/testdata/heartbeat_resp_recovers_from_probing_storeliveness_disabled.txt new file mode 100644 index 000000000000..c71834dd2578 --- /dev/null +++ b/pkg/raft/testdata/heartbeat_resp_recovers_from_probing_storeliveness_disabled.txt @@ -0,0 +1,91 @@ +# This test checks that if a fully caught-up follower transitions +# into StateProbe (for example due to a call to ReportUnreachable), the +# leader will react to a subsequent heartbeat response from the probing +# follower by sending an empty MsgApp, the response of which restores +# StateReplicate for the follower. In other words, we don't end up in +# a stable state with a fully caught up follower in StateProbe. + +# Turn off output during the setup of the test. +log-level none +---- +ok + +# Create 3 nodes with store-liveness disabled. 
+add-nodes 3 voters=(1,2,3) index=10 store-liveness-disabled=true +---- +ok + +campaign 1 +---- +ok + +stabilize +---- +ok + +log-level debug +---- +ok + +status 1 +---- +1: StateReplicate match=11 next=12 sentCommit=10 matchCommit=10 +2: StateReplicate match=11 next=12 sentCommit=11 matchCommit=11 +3: StateReplicate match=11 next=12 sentCommit=11 matchCommit=11 + +# On the first replica, report the second one as not reachable. +report-unreachable 1 2 +---- +DEBUG 1 failed to send message to 2 because it is unreachable [StateProbe match=11 next=12 sentCommit=11 matchCommit=11] + +status 1 +---- +1: StateReplicate match=11 next=12 sentCommit=10 matchCommit=10 +2: StateProbe match=11 next=12 sentCommit=11 matchCommit=11 +3: StateReplicate match=11 next=12 sentCommit=11 matchCommit=11 + +tick-heartbeat 1 +---- +ok + +# Heartbeat -> HeartbeatResp -> MsgApp -> MsgAppResp -> StateReplicate. +stabilize +---- +> 1 handling Ready + Ready MustSync=false: + Messages: + 1->2 MsgHeartbeat Term:1 Log:0/0 + 1->3 MsgHeartbeat Term:1 Log:0/0 +> 2 receiving messages + 1->2 MsgHeartbeat Term:1 Log:0/0 +> 3 receiving messages + 1->3 MsgHeartbeat Term:1 Log:0/0 +> 2 handling Ready + Ready MustSync=false: + Messages: + 2->1 MsgHeartbeatResp Term:1 Log:0/0 +> 3 handling Ready + Ready MustSync=false: + Messages: + 3->1 MsgHeartbeatResp Term:1 Log:0/0 +> 1 receiving messages + 2->1 MsgHeartbeatResp Term:1 Log:0/0 + 3->1 MsgHeartbeatResp Term:1 Log:0/0 +> 1 handling Ready + Ready MustSync=false: + Messages: + 1->2 MsgApp Term:1 Log:1/11 Commit:11 +> 2 receiving messages + 1->2 MsgApp Term:1 Log:1/11 Commit:11 +> 2 handling Ready + Ready MustSync=false: + Messages: + 2->1 MsgAppResp Term:1 Log:0/11 Commit:11 +> 1 receiving messages + 2->1 MsgAppResp Term:1 Log:0/11 Commit:11 + +status 1 +---- +1: StateReplicate match=11 next=12 sentCommit=10 matchCommit=10 +2: StateReplicate match=11 next=12 sentCommit=11 matchCommit=11 +3: StateReplicate match=11 next=12 sentCommit=11 matchCommit=11 diff --git a/pkg/raft/testdata/lagging_commit.txt b/pkg/raft/testdata/lagging_commit.txt index f7bf4e90cbc0..a533f5e1f93e 100644 --- a/pkg/raft/testdata/lagging_commit.txt +++ b/pkg/raft/testdata/lagging_commit.txt @@ -126,41 +126,27 @@ tick-heartbeat 1 ---- ok -# However, the leader does not push the real commit index to the follower 3. It -# cuts the commit index at the Progress.Match mark, because it thinks that it is -# unsafe to send a commit index higher than that. +# The leader knows that the follower 3 is lagging behind, so it sends a MsgApp +# to fix that. process-ready 1 ---- Ready MustSync=false: Messages: 1->2 MsgHeartbeat Term:1 Log:0/0 1->3 MsgHeartbeat Term:1 Log:0/0 - -# Since the heartbeat message does not bump the follower's commit index, it will -# take another roundtrip with the leader to update it. As such, the total time -# it takes for the follower to learn the commit index is: -# -# delay = HeartbeatInterval + 3/2 * RTT -# -# This is suboptimal. It could have taken HeartbeatInterval + 1/2 * RTT, if the -# leader sent the up-to-date commit index in the heartbeat message. -# -# See https://github.com/etcd-io/raft/issues/138 which aims to fix this. +1->3 MsgApp Term:1 Log:1/13 Commit:13 + +# Since the leader sends a MsgApp on the heartbeat timeout, it takes this long +# for the follower to advance its commit index: +# delay = HeartbeatInterval + 1/2 * RTT +# This is better than what we previously had, which was: +# HeartbeatInterval + 3/2 * RTT. 
That was the case because the leader needed to +# wait to send/receive a MsgHeartbeat/MsgHeartbeatResp before it could send the +# MsgApp. stabilize 1 3 ---- > 3 receiving messages 1->3 MsgHeartbeat Term:1 Log:0/0 -> 3 handling Ready - Ready MustSync=false: - Messages: - 3->1 MsgHeartbeatResp Term:1 Log:0/0 -> 1 receiving messages - 3->1 MsgHeartbeatResp Term:1 Log:0/0 -> 1 handling Ready - Ready MustSync=false: - Messages: - 1->3 MsgApp Term:1 Log:1/13 Commit:13 -> 3 receiving messages 1->3 MsgApp Term:1 Log:1/13 Commit:13 > 3 handling Ready Ready MustSync=true: @@ -169,6 +155,8 @@ stabilize 1 3 1/12 EntryNormal "data1" 1/13 EntryNormal "data2" Messages: + 3->1 MsgHeartbeatResp Term:1 Log:0/0 3->1 MsgAppResp Term:1 Log:0/13 Commit:13 > 1 receiving messages + 3->1 MsgHeartbeatResp Term:1 Log:0/0 3->1 MsgAppResp Term:1 Log:0/13 Commit:13 diff --git a/pkg/raft/testdata/msg_app_commit_index.txt b/pkg/raft/testdata/msg_app_commit_index.txt index 5fc3ba849a15..b98932c3bd1e 100644 --- a/pkg/raft/testdata/msg_app_commit_index.txt +++ b/pkg/raft/testdata/msg_app_commit_index.txt @@ -124,6 +124,7 @@ Ready MustSync=false: Messages: 1->2 MsgHeartbeat Term:1 Log:0/0 1->3 MsgHeartbeat Term:1 Log:0/0 +1->3 MsgApp Term:1 Log:1/13 Commit:13 # On the next MsgApp sent to follower 3, the leader will include that the # commit index is 13. Notice that the leader doesn't send MsgApp to follower 2 @@ -134,23 +135,11 @@ stabilize 1 2 3 1->2 MsgHeartbeat Term:1 Log:0/0 > 3 receiving messages 1->3 MsgHeartbeat Term:1 Log:0/0 + 1->3 MsgApp Term:1 Log:1/13 Commit:13 > 2 handling Ready Ready MustSync=false: Messages: 2->1 MsgHeartbeatResp Term:1 Log:0/0 -> 3 handling Ready - Ready MustSync=false: - Messages: - 3->1 MsgHeartbeatResp Term:1 Log:0/0 -> 1 receiving messages - 2->1 MsgHeartbeatResp Term:1 Log:0/0 - 3->1 MsgHeartbeatResp Term:1 Log:0/0 -> 1 handling Ready - Ready MustSync=false: - Messages: - 1->3 MsgApp Term:1 Log:1/13 Commit:13 -> 3 receiving messages - 1->3 MsgApp Term:1 Log:1/13 Commit:13 > 3 handling Ready Ready MustSync=true: HardState Term:1 Vote:1 Commit:13 Lead:1 LeadEpoch:1 @@ -158,8 +147,11 @@ stabilize 1 2 3 1/12 EntryNormal "data1" 1/13 EntryNormal "data2" Messages: + 3->1 MsgHeartbeatResp Term:1 Log:0/0 3->1 MsgAppResp Term:1 Log:0/13 Commit:13 > 1 receiving messages + 2->1 MsgHeartbeatResp Term:1 Log:0/0 + 3->1 MsgHeartbeatResp Term:1 Log:0/0 3->1 MsgAppResp Term:1 Log:0/13 Commit:13 # If the commit index is up-to-date, no MsgApp will be sent. 
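
The testdata changes above all fall out of one leader-side rule. As a rough
Go sketch of the flow this patch converges on (illustrative only:
onHeartbeatTimeout and peerIDs are hypothetical stand-ins for the real
tickHeartbeat and r.trk.Visit shown in the raft.go hunks above):

	func (r *raft) onHeartbeatTimeout() {
		// MsgBeat still drives bcastHeartbeat, exactly as before.
		_ = r.Step(pb.Message{From: r.id, Type: pb.MsgBeat})
		if !r.storeLiveness.SupportFromEnabled() {
			// Without store liveness, followers are instead unpaused when
			// their MsgHeartbeatResp arrives (see stepLeader above).
			return
		}
		for _, id := range r.peerIDs() { // hypothetical iteration helper
			if id == r.id {
				continue // the leader doesn't probe or fortify itself
			}
			if r.supportTracker.IsLeadSupportedByFollower(id) {
				// Supporting followers are unpaused and, if they are not
				// up-to-date, immediately sent a MsgApp.
				r.trk.Progress(id).MsgAppProbesPaused = false
				r.maybeSendAppend(id)
			} else {
				// Non-supporting followers are sent a MsgFortifyLeader first.
				r.sendFortify(id)
			}
		}
	}

This is why, in the testdata above, the MsgApp now rides along with the
heartbeats in the same Ready instead of waiting for a MsgHeartbeatResp round
trip.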
diff --git a/pkg/raft/testdata/replicate_pause.txt b/pkg/raft/testdata/replicate_pause.txt index 7ee9a71e24f8..37e0ee6e796b 100644 --- a/pkg/raft/testdata/replicate_pause.txt +++ b/pkg/raft/testdata/replicate_pause.txt @@ -133,6 +133,7 @@ stabilize 1 Messages: 1->2 MsgHeartbeat Term:1 Log:0/0 1->3 MsgHeartbeat Term:1 Log:0/0 + 1->3 MsgApp Term:1 Log:1/14 Commit:17 stabilize 2 3 ---- @@ -140,6 +141,8 @@ stabilize 2 3 1->2 MsgHeartbeat Term:1 Log:0/0 > 3 receiving messages 1->3 MsgHeartbeat Term:1 Log:0/0 + 1->3 MsgApp Term:1 Log:1/14 Commit:17 + DEBUG 3 [logterm: 0, index: 14] rejected MsgApp [logterm: 1, index: 14] from 1 > 2 handling Ready Ready MustSync=false: Messages: @@ -148,6 +151,7 @@ stabilize 2 3 Ready MustSync=false: Messages: 3->1 MsgHeartbeatResp Term:1 Log:0/0 + 3->1 MsgAppResp Term:1 Log:1/14 Rejected (Hint: 11) Commit:11 # After handling heartbeat responses, node 1 sends an empty MsgApp to a # throttled node 3 because it hasn't yet replied to a single MsgApp, and the @@ -157,21 +161,52 @@ stabilize 1 > 1 receiving messages 2->1 MsgHeartbeatResp Term:1 Log:0/0 3->1 MsgHeartbeatResp Term:1 Log:0/0 + 3->1 MsgAppResp Term:1 Log:1/14 Rejected (Hint: 11) Commit:11 + DEBUG 1 received MsgAppResp(rejected, hint: (index 11, term 1)) from 3 for index 14 + DEBUG 1 decreased progress of 3 to [StateReplicate match=11 next=12 sentCommit=11 matchCommit=11 paused inflight=3[full]] > 1 handling Ready Ready MustSync=false: Messages: - 1->3 MsgApp Term:1 Log:1/14 Commit:17 + 1->3 MsgApp Term:1 Log:1/11 Commit:17 Entries:[ + 1/12 EntryNormal "prop_1_12" + 1/13 EntryNormal "prop_1_13" + 1/14 EntryNormal "prop_1_14" + 1/15 EntryNormal "prop_1_15" + 1/16 EntryNormal "prop_1_16" + 1/17 EntryNormal "prop_1_17" + ] # Node 3 finally receives a MsgApp, but there was a gap, so it rejects it. stabilize 3 ---- > 3 receiving messages - 1->3 MsgApp Term:1 Log:1/14 Commit:17 - DEBUG 3 [logterm: 0, index: 14] rejected MsgApp [logterm: 1, index: 14] from 1 + 1->3 MsgApp Term:1 Log:1/11 Commit:17 Entries:[ + 1/12 EntryNormal "prop_1_12" + 1/13 EntryNormal "prop_1_13" + 1/14 EntryNormal "prop_1_14" + 1/15 EntryNormal "prop_1_15" + 1/16 EntryNormal "prop_1_16" + 1/17 EntryNormal "prop_1_17" + ] > 3 handling Ready - Ready MustSync=false: + Ready MustSync=true: + HardState Term:1 Vote:1 Commit:17 Lead:1 LeadEpoch:1 + Entries: + 1/12 EntryNormal "prop_1_12" + 1/13 EntryNormal "prop_1_13" + 1/14 EntryNormal "prop_1_14" + 1/15 EntryNormal "prop_1_15" + 1/16 EntryNormal "prop_1_16" + 1/17 EntryNormal "prop_1_17" + CommittedEntries: + 1/12 EntryNormal "prop_1_12" + 1/13 EntryNormal "prop_1_13" + 1/14 EntryNormal "prop_1_14" + 1/15 EntryNormal "prop_1_15" + 1/16 EntryNormal "prop_1_16" + 1/17 EntryNormal "prop_1_17" Messages: - 3->1 MsgAppResp Term:1 Log:1/14 Rejected (Hint: 11) Commit:11 + 3->1 MsgAppResp Term:1 Log:0/17 Commit:17 log-level none ---- diff --git a/pkg/raft/testdata/slow_follower_after_compaction.txt b/pkg/raft/testdata/slow_follower_after_compaction.txt index 83a94fbbb6b0..7f417a539766 100644 --- a/pkg/raft/testdata/slow_follower_after_compaction.txt +++ b/pkg/raft/testdata/slow_follower_after_compaction.txt @@ -96,6 +96,10 @@ compact 1 17 ---- 1/18 EntryNormal "prop_1_18" +log-level none +---- +ok + # Trigger a round of empty MsgApp "probe" from leader. It will reach node 3 # which will reply with a rejection MsgApp because it sees a gap in the log. # Node 1 will reset the MsgApp flow and send a snapshot to catch node 3 up. 
@@ -103,10 +107,6 @@ tick-heartbeat 1 ---- ok -log-level none ----- -ok - stabilize ---- ok diff --git a/pkg/raft/testdata/snapshot_succeed_via_app_resp.txt b/pkg/raft/testdata/snapshot_succeed_via_app_resp.txt index 9c96104e3dc6..c1d91f0077f0 100644 --- a/pkg/raft/testdata/snapshot_succeed_via_app_resp.txt +++ b/pkg/raft/testdata/snapshot_succeed_via_app_resp.txt @@ -65,6 +65,7 @@ Ready MustSync=false: Messages: 1->2 MsgHeartbeat Term:1 Log:0/0 1->3 MsgHeartbeat Term:1 Log:0/0 +1->3 MsgFortifyLeader Term:1 Log:0/0 # Iterate until no more work is done by the new peer. It receives the heartbeat # and responds. @@ -74,24 +75,34 @@ stabilize 3 1->3 MsgHeartbeat Term:1 Log:0/0 INFO 3 [term: 0] received a MsgHeartbeat message with higher term from 1 [term: 1] INFO 3 became follower at term 1 + 1->3 MsgFortifyLeader Term:1 Log:0/0 > 3 handling Ready Ready MustSync=true: - HardState Term:1 Commit:0 Lead:1 LeadEpoch:0 + HardState Term:1 Commit:0 Lead:1 LeadEpoch:1 Messages: 3->1 MsgHeartbeatResp Term:1 Log:0/0 + 3->1 MsgFortifyLeaderResp Term:1 Log:0/0 LeadEpoch:1 # The leader in turn will realize that n3 needs a snapshot, which it initiates. stabilize 1 ---- > 1 receiving messages 3->1 MsgHeartbeatResp Term:1 Log:0/0 - DEBUG 1 [firstindex: 12, commit: 11] sent snapshot[index: 11, term: 1] to 3 [StateProbe match=0 next=11 sentCommit=10 matchCommit=0] - DEBUG 1 paused sending replication messages to 3 [StateSnapshot match=0 next=12 sentCommit=11 matchCommit=0 paused pendingSnap=11] -> 1 handling Ready - Ready MustSync=false: - Messages: - 1->3 MsgSnap Term:1 Log:0/0 - Snapshot: Index:11 Term:1 ConfState:Voters:[1 2 3] VotersOutgoing:[] Learners:[] LearnersNext:[] AutoLeave:false + 3->1 MsgFortifyLeaderResp Term:1 Log:0/0 LeadEpoch:1 + +tick-heartbeat 1 +---- +DEBUG 1 [firstindex: 12, commit: 11] sent snapshot[index: 11, term: 1] to 3 [StateProbe match=0 next=11 sentCommit=10 matchCommit=0] +DEBUG 1 paused sending replication messages to 3 [StateSnapshot match=0 next=12 sentCommit=11 matchCommit=0 paused pendingSnap=11] + +process-ready 1 +---- +Ready MustSync=false: +Messages: +1->2 MsgHeartbeat Term:1 Log:0/0 +1->3 MsgHeartbeat Term:1 Log:0/0 +1->3 MsgSnap Term:1 Log:0/0 + Snapshot: Index:11 Term:1 ConfState:Voters:[1 2 3] VotersOutgoing:[] Learners:[] LearnersNext:[] AutoLeave:false status 1 ---- @@ -106,6 +117,7 @@ status 1 stabilize 3 ---- > 3 receiving messages + 1->3 MsgHeartbeat Term:1 Log:0/0 1->3 MsgSnap Term:1 Log:0/0 Snapshot: Index:11 Term:1 ConfState:Voters:[1 2 3] VotersOutgoing:[] Learners:[] LearnersNext:[] AutoLeave:false INFO log [committed=0, applied=0, applying=0, unstable.offset=1, unstable.offsetInProgress=1, len(unstable.Entries)=0] starts to restore snapshot [index: 11, term: 1] @@ -114,9 +126,10 @@ stabilize 3 INFO 3 [commit: 11] restored snapshot [index: 11, term: 1] > 3 handling Ready Ready MustSync=true: - HardState Term:1 Commit:11 Lead:1 LeadEpoch:0 + HardState Term:1 Commit:11 Lead:1 LeadEpoch:1 Snapshot Index:11 Term:1 ConfState:Voters:[1 2 3] VotersOutgoing:[] Learners:[] LearnersNext:[] AutoLeave:false Messages: + 3->1 MsgHeartbeatResp Term:1 Log:0/0 3->1 MsgAppResp Term:1 Log:0/11 Commit:11 # The MsgAppResp lets the leader move the follower back to replicating state. 
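The snapshot_succeed_via_app_resp.txt changes above also exercise the fortification handshake that now precedes tick-time sends: the leader sends MsgFortifyLeader, the follower persists LeadEpoch:1 in its HardState and echoes it in MsgFortifyLeaderResp, and the leader records that epoch so later ticks can check support. A compact sketch of that bookkeeping (hypothetical names; the real code lives in the raft state machine and tracker.SupportTracker):

package main

import "fmt"

type epoch uint64

// leaderSupport mirrors the leader-side bookkeeping: follower id -> the epoch
// at which that follower last promised support (learned from a
// MsgFortifyLeaderResp).
type leaderSupport map[uint64]epoch

// liveness is a stand-in for the store-liveness fabric: each follower's
// current epoch, if known.
type liveness map[uint64]epoch

// supported reports whether a recorded promise is still current: the follower
// must be known to liveness and its live epoch must equal the promised one.
// A bumped epoch means the old promise expired and the leader must
// re-fortify before counting on this follower.
func supported(s leaderSupport, l liveness, id uint64) bool {
	promised, recorded := s[id]
	live, ok := l[id]
	return recorded && ok && promised == live
}

func main() {
	s := leaderSupport{}
	l := liveness{3: 1}

	// Follower 3 answers MsgFortifyLeader with LeadEpoch:1; the leader records it.
	s[3] = 1
	fmt.Println(supported(s, l, 3)) // true: epochs match

	// Store liveness later moves follower 3 to a newer epoch.
	l[3] = 2
	fmt.Println(supported(s, l, 3)) // false: the leader must re-fortify
}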
@@ -124,6 +137,7 @@ stabilize 3
 stabilize 1
 ----
 > 1 receiving messages
+  3->1 MsgHeartbeatResp Term:1 Log:0/0
   3->1 MsgAppResp Term:1 Log:0/11 Commit:11
   DEBUG 1 recovered from needing snapshot, resumed sending replication messages to 3 [StateSnapshot match=11 next=12 sentCommit=11 matchCommit=11 paused pendingSnap=11]
@@ -138,9 +152,12 @@ stabilize
 ----
 > 2 receiving messages
   1->2 MsgHeartbeat Term:1 Log:0/0
+  1->2 MsgHeartbeat Term:1 Log:0/0
 > 2 handling Ready
   Ready MustSync=false:
   Messages:
   2->1 MsgHeartbeatResp Term:1 Log:0/0
+  2->1 MsgHeartbeatResp Term:1 Log:0/0
 > 1 receiving messages
   2->1 MsgHeartbeatResp Term:1 Log:0/0
+  2->1 MsgHeartbeatResp Term:1 Log:0/0
diff --git a/pkg/raft/tracker/progress.go b/pkg/raft/tracker/progress.go
index 07d9ae9a9cfc..ea926fd2dbab 100644
--- a/pkg/raft/tracker/progress.go
+++ b/pkg/raft/tracker/progress.go
@@ -109,9 +109,9 @@ type Progress struct {
 	// MsgAppProbesPaused is used when the MsgApp flow to a node is throttled. This
 	// happens in StateProbe, or StateReplicate with saturated Inflights. In both
 	// cases, we need to continue sending MsgApp once in a while to guarantee
-	// progress, but we only do so when MsgAppProbesPaused is false (it is reset on
-	// receiving a heartbeat response), to not overflow the receiver. See
-	// IsPaused() and ShouldSendMsgApp().
+	// progress, but we only do so when MsgAppProbesPaused is false (it is reset
+	// on the next heartbeat timeout if the follower supports the leader), to not
+	// overflow the receiver. See IsPaused() and ShouldSendMsgApp().
 	MsgAppProbesPaused bool

 	// Inflights is a sliding window for the inflight messages.
diff --git a/pkg/raft/tracker/supporttracker.go b/pkg/raft/tracker/supporttracker.go
index 02b231d7aea4..9f2936004b3c 100644
--- a/pkg/raft/tracker/supporttracker.go
+++ b/pkg/raft/tracker/supporttracker.go
@@ -81,6 +81,30 @@ func (st *SupportTracker) LeadSupportUntil() hlc.Timestamp {
 	return st.config.Voters.LeadSupportExpiration(supportExpMap)
 }

+// IsLeadSupportedByFollower returns true if the leader is currently supported
+// by the specified follower. This means that (1) the leader has recorded that
+// follower's support epoch in the support map, and (2) the follower's current
+// support epoch matches the epoch in (1).
+func (st *SupportTracker) IsLeadSupportedByFollower(id pb.PeerID) bool {
+	// If the leader didn't add the follower's id to the support map, it means
+	// that the leader hasn't yet received a MsgFortifyLeaderResp from that
+	// follower.
+	supportEpoch, exist := st.support[id]
+	if !exist {
+		return false
+	}
+
+	// If store liveness reports no support from the follower, or reports an
+	// epoch that differs from the leader's recorded supportEpoch, the follower
+	// is not supporting the leader at the current term.
+	curEpoch, _, ok := st.storeLiveness.SupportFrom(id)
+	if !ok || curEpoch != supportEpoch {
+		return false
+	}
+
+	return true
+}
+
 func (st *SupportTracker) String() string {
 	if len(st.support) == 0 {
 		return "empty"
diff --git a/pkg/raft/tracker/supporttracker_test.go b/pkg/raft/tracker/supporttracker_test.go
index 36f52cea1a33..fcce2854ab25 100644
--- a/pkg/raft/tracker/supporttracker_test.go
+++ b/pkg/raft/tracker/supporttracker_test.go
@@ -143,6 +143,78 @@ func TestLeadSupportUntil(t *testing.T) {
 	}
 }

+func TestIsLeadSupportedByFollower(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	ts := func(ts int64) hlc.Timestamp {
+		return hlc.Timestamp{
+			WallTime: ts,
+		}
+	}
+
+	mockLivenessOnePeer := makeMockStoreLiveness(
+		map[pb.PeerID]mockLivenessEntry{
+			1: makeMockLivenessEntry(10, ts(10)),
+		},
+	)
+
+	testCases := []struct {
+		ids           []pb.PeerID
+		storeLiveness raftstoreliveness.StoreLiveness
+		setup         func(tracker *SupportTracker)
+		expSupported  bool
+	}{
+		{
+			ids:           []pb.PeerID{1},
+			storeLiveness: mockLivenessOnePeer,
+			setup: func(supportTracker *SupportTracker) {
+				// No support recorded.
+			},
+			expSupported: false,
+		},
+		{
+			ids:           []pb.PeerID{2},
+			storeLiveness: mockLivenessOnePeer,
+			setup: func(supportTracker *SupportTracker) {
+				// Support recorded for a different follower (2) than the one
+				// whose support is checked below (1).
+				supportTracker.RecordSupport(2, 10)
+			},
+			expSupported: false,
+		},
+		{
+			ids:           []pb.PeerID{1},
+			storeLiveness: mockLivenessOnePeer,
+			setup: func(supportTracker *SupportTracker) {
+				// Support recorded for a different epoch.
+				supportTracker.RecordSupport(1, 15)
+			},
+			expSupported: false,
+		},
+		{
+			ids:           []pb.PeerID{1},
+			storeLiveness: mockLivenessOnePeer,
+			setup: func(supportTracker *SupportTracker) {
+				// Support recorded for the same epoch as the one in storeLiveness.
+				supportTracker.RecordSupport(1, 10)
+			},
+			expSupported: true,
+		},
+	}
+
+	for _, tc := range testCases {
+		cfg := quorum.MakeEmptyConfig()
+		for _, id := range tc.ids {
+			cfg.Voters[0][id] = struct{}{}
+		}
+		supportTracker := MakeSupportTracker(&cfg, tc.storeLiveness)
+
+		tc.setup(&supportTracker)
+		require.Equal(t, tc.expSupported, supportTracker.IsLeadSupportedByFollower(1))
+	}
+}
+
 type mockLivenessEntry struct {
 	epoch pb.Epoch
 	ts    hlc.Timestamp
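If the table in TestIsLeadSupportedByFollower grows, one optional follow-up (not part of this patch) is to name the cases and run them as subtests, so a failing epoch case is easier to spot. A sketch of the loop, assuming a hypothetical name string field is added to the testCases struct:

	// Sketch only: drop-in replacement for the loop above, assuming each
	// test case gains a descriptive name field.
	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			cfg := quorum.MakeEmptyConfig()
			for _, id := range tc.ids {
				cfg.Voters[0][id] = struct{}{}
			}
			supportTracker := MakeSupportTracker(&cfg, tc.storeLiveness)
			tc.setup(&supportTracker)
			require.Equal(t, tc.expSupported, supportTracker.IsLeadSupportedByFollower(1))
		})
	}

Note also a design choice visible in IsLeadSupportedByFollower above: it compares epochs and deliberately discards the support-expiration timestamp returned by SupportFrom; expiration accounting stays in LeadSupportUntil, and epoch equality suffices here because a follower that withdraws and re-grants support does so at a higher epoch.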