Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

raft: consolidate all append message sending #134

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ stale log entries:
rafthttp package.

'MsgApp' contains log entries to replicate. A leader calls bcastAppend,
which calls sendAppend, which sends soon-to-be-replicated logs in 'MsgApp'
which calls maybeSendAppend, which sends soon-to-be-replicated logs in 'MsgApp'
type. When 'MsgApp' is passed to candidate's Step method, candidate reverts
back to follower, because it indicates that there is a valid leader sending
'MsgApp' messages. Candidate and follower respond to this message in
Expand Down Expand Up @@ -353,8 +353,8 @@ stale log entries:

'MsgSnap' requests to install a snapshot message. When a node has just
become a leader or the leader receives 'MsgProp' message, it calls
'bcastAppend' method, which then calls 'sendAppend' method to each
follower. In 'sendAppend', if a leader fails to get term or entries,
'bcastAppend' method, which then calls 'maybeSendAppend' method to each
follower. In 'maybeSendAppend', if a leader fails to get term or entries,
the leader requests snapshot by sending 'MsgSnap' type message.

'MsgSnapStatus' tells the result of snapshot install message. When a
Expand All @@ -376,7 +376,7 @@ stale log entries:
'MsgHeartbeatResp' is a response to 'MsgHeartbeat'. When 'MsgHeartbeatResp'
is passed to leader's Step method, the leader knows which follower
responded. And only when the leader's last committed index is greater than
follower's Match index, the leader runs 'sendAppend` method.
follower's Match index, the leader runs 'maybeSendAppend` method.

'MsgUnreachable' tells that request(message) wasn't delivered. When
'MsgUnreachable' is passed to leader's Step method, the leader discovers
Expand Down
134 changes: 47 additions & 87 deletions raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -588,24 +588,24 @@ func (r *raft) send(m pb.Message) {
}
}

// sendAppend sends an append RPC with new entries (if any) and the
// current commit index to the given peer.
func (r *raft) sendAppend(to uint64) {
r.maybeSendAppend(to, true)
}

// maybeSendAppend sends an append RPC with new entries to the given peer,
// if necessary. Returns true if a message was sent. The sendIfEmpty
// argument controls whether messages with no entries will be sent
// ("empty" messages are useful to convey updated Commit indexes, but
// are undesirable when we're sending multiple messages in a batch).
// maybeSendAppend sends an append RPC with log entries (if any) that are not
// yet known to be replicated in the given peer's log, as well as the current
// commit index. Usually it sends a MsgApp message, but in some cases (e.g. the
// log has been compacted) it can send a MsgSnap.
//
// In some cases, the MsgApp message can have zero entries, and yet being sent.
// When the follower log is not fully up-to-date, we must send a MsgApp
// periodically so that eventually the flow is either accepted or rejected. Not
// doing so can result in replication stall, in cases when a MsgApp is dropped.
//
// TODO(pav-kv): make invocation of maybeSendAppend stateless. The Progress
// struct contains all the state necessary for deciding whether to send a
// message.
func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
// Returns true if a message was sent, or false otherwise. A message is not sent
// if the follower log and commit index are up-to-date, the flow is paused (for
// reasons like in-flight limits), or the message could not be constructed.
func (r *raft) maybeSendAppend(to uint64) bool {
pr := r.trk.Progress[to]
if pr.IsPaused() {

last, commit := r.raftLog.lastIndex(), r.raftLog.committed
if !pr.ShouldSendMsgApp(last, commit) {
return false
}

Expand All @@ -617,35 +617,25 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
return r.maybeSendSnapshot(to, pr)
}

var ents []pb.Entry
// In a throttled StateReplicate only send empty MsgApp, to ensure progress.
// Otherwise, if we had a full Inflights and all inflight messages were in
// fact dropped, replication to that follower would stall. Instead, an empty
// MsgApp will eventually reach the follower (heartbeats responses prompt the
// leader to send an append), allowing it to be acked or rejected, both of
// which will clear out Inflights.
if pr.State != tracker.StateReplicate || !pr.Inflights.Full() {
ents, err = r.raftLog.entries(pr.Next, r.maxMsgSize)
}
if len(ents) == 0 && !sendIfEmpty {
return false
}
// TODO(pav-kv): move this check up to where err is returned.
if err != nil { // send a snapshot if we failed to get the entries
return r.maybeSendSnapshot(to, pr)
var entries []pb.Entry
if pr.CanSendEntries(last) {
if entries, err = r.raftLog.entries(pr.Next, r.maxMsgSize); err != nil {
// Send a snapshot if we failed to get the entries.
return r.maybeSendSnapshot(to, pr)
}
}

// Send the actual MsgApp otherwise, and update the progress accordingly.
// Send the MsgApp, and update the progress accordingly.
r.send(pb.Message{
To: to,
Type: pb.MsgApp,
Index: prevIndex,
LogTerm: prevTerm,
Entries: ents,
Commit: r.raftLog.committed,
Entries: entries,
Commit: commit,
})
pr.SentEntries(len(ents), uint64(payloadsSize(ents)))
pr.SentCommit(r.raftLog.committed)
pr.SentEntries(len(entries), uint64(payloadsSize(entries)))
pr.SentCommit(commit)
return true
}

Expand Down Expand Up @@ -704,7 +694,7 @@ func (r *raft) bcastAppend() {
if id == r.id {
return
}
r.sendAppend(id)
r.maybeSendAppend(id)
})
}

Expand Down Expand Up @@ -1482,7 +1472,7 @@ func stepLeader(r *raft, m pb.Message) error {
if pr.State == tracker.StateReplicate {
pr.BecomeProbe()
}
r.sendAppend(m.From)
r.maybeSendAppend(m.From)
}
} else {
// We want to update our tracking if the response updates our
Expand Down Expand Up @@ -1521,21 +1511,13 @@ func stepLeader(r *raft, m pb.Message) error {
// to respond to pending read index requests
releasePendingReadIndexMessages(r)
r.bcastAppend()
} else if r.id != m.From && pr.CanBumpCommit(r.raftLog.committed) {
// This node may be missing the latest commit index, so send it.
// NB: this is not strictly necessary because the periodic heartbeat
// messages deliver commit indices too. However, a message sent now
// may arrive earlier than the next heartbeat fires.
r.sendAppend(m.From)
}
// We've updated flow control information above, which may
// allow us to send multiple (size-limited) in-flight messages
// at once (such as when transitioning from probe to
// replicate, or when freeTo() covers multiple messages). If
// we have more entries to send, send as many messages as we
// can (without sending empty messages for the commit index)
// We've updated flow control information above, which may allow us to
// send multiple (size-limited) in-flight messages at once (such as when
// transitioning from probe to replicate, or when freeTo() covers
// multiple messages). Send as many messages as we can.
if r.id != m.From {
for r.maybeSendAppend(m.From, false /* sendIfEmpty */) {
for r.maybeSendAppend(m.From) {
}
}
// Transfer leadership is in progress.
Expand All @@ -1547,24 +1529,8 @@ func stepLeader(r *raft, m pb.Message) error {
}
case pb.MsgHeartbeatResp:
pr.RecentActive = true
pr.MsgAppFlowPaused = false

// NB: if the follower is paused (full Inflights), this will still send an
// empty append, allowing it to recover from situations in which all the
// messages that filled up Inflights in the first place were dropped. Note
// also that the outgoing heartbeat already communicated the commit index.
//
// If the follower is fully caught up but also in StateProbe (as can happen
// if ReportUnreachable was called), we also want to send an append (it will
// be empty) to allow the follower to transition back to StateReplicate once
// it responds.
//
// Note that StateSnapshot typically satisfies pr.Match < lastIndex, but
// `pr.Paused()` is always true for StateSnapshot, so sendAppend is a
// no-op.
if pr.Match < r.raftLog.lastIndex() || pr.State == tracker.StateProbe {
r.sendAppend(m.From)
}
pr.PauseMsgAppProbes(false)
r.maybeSendAppend(m.From)

if r.readOnly.option != ReadOnlySafe || len(m.Context) == 0 {
return nil
Expand Down Expand Up @@ -1597,7 +1563,7 @@ func stepLeader(r *raft, m pb.Message) error {
// If snapshot finish, wait for the MsgAppResp from the remote node before sending
// out the next MsgApp.
// If snapshot failure, wait for a heartbeat interval before next try
pr.MsgAppFlowPaused = true
pr.PauseMsgAppProbes(true)
case pb.MsgUnreachable:
// During optimistic replication, if the remote becomes unreachable,
// there is huge probability that a MsgApp is lost.
Expand Down Expand Up @@ -1634,7 +1600,8 @@ func stepLeader(r *raft, m pb.Message) error {
r.sendTimeoutNow(leadTransferee)
r.logger.Infof("%x sends MsgTimeoutNow to %x immediately as %x already has up-to-date log", r.id, leadTransferee, leadTransferee)
} else {
r.sendAppend(leadTransferee)
pr.PauseMsgAppProbes(false)
r.maybeSendAppend(leadTransferee)
}
}
return nil
Expand Down Expand Up @@ -1982,21 +1949,14 @@ func (r *raft) switchToConfig(cfg tracker.Config, trk tracker.ProgressMap) pb.Co
return cs
}

if r.maybeCommit() {
// If the configuration change means that more entries are committed now,
// broadcast/append to everyone in the updated config.
r.bcastAppend()
} else {
// Otherwise, still probe the newly added replicas; there's no reason to
// let them wait out a heartbeat interval (or the next incoming
// proposal).
r.trk.Visit(func(id uint64, pr *tracker.Progress) {
if id == r.id {
return
}
r.maybeSendAppend(id, false /* sendIfEmpty */)
})
}
r.maybeCommit()
// If the configuration change means that more entries are committed now,
// broadcast/append to everyone in the updated config.
//
// Otherwise, still probe the newly added replicas; there's no reason to let
// them wait out a heartbeat interval (or the next incoming proposal).
r.bcastAppend()

// If the leadTransferee was removed or demoted, abort the leadership transfer.
if _, tOK := r.trk.Config.Voters.IDs()[r.leadTransferee]; !tOK && r.leadTransferee != 0 {
r.abortLeaderTransfer()
Expand Down
8 changes: 4 additions & 4 deletions raft_snap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ func TestSnapshotFailure(t *testing.T) {
if sm.trk.Progress[2].Next != 1 {
t.Fatalf("Next = %d, want 1", sm.trk.Progress[2].Next)
}
if !sm.trk.Progress[2].MsgAppFlowPaused {
t.Errorf("MsgAppFlowPaused = %v, want true", sm.trk.Progress[2].MsgAppFlowPaused)
if !sm.trk.Progress[2].MsgAppProbesPaused {
t.Errorf("msgAppProbesPaused = %v, want true", sm.trk.Progress[2].MsgAppProbesPaused)
}
}

Expand All @@ -106,8 +106,8 @@ func TestSnapshotSucceed(t *testing.T) {
if sm.trk.Progress[2].Next != 12 {
t.Fatalf("Next = %d, want 12", sm.trk.Progress[2].Next)
}
if !sm.trk.Progress[2].MsgAppFlowPaused {
t.Errorf("MsgAppFlowPaused = %v, want true", sm.trk.Progress[2].MsgAppFlowPaused)
if !sm.trk.Progress[2].MsgAppProbesPaused {
t.Errorf("MsgAppProbesPaused = %v, want true", sm.trk.Progress[2].MsgAppProbesPaused)
}
}

Expand Down
28 changes: 14 additions & 14 deletions raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,16 +123,16 @@ func TestProgressResumeByHeartbeatResp(t *testing.T) {
r.becomeCandidate()
r.becomeLeader()

r.trk.Progress[2].MsgAppFlowPaused = true
r.trk.Progress[2].PauseMsgAppProbes(true)

r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
assert.True(t, r.trk.Progress[2].MsgAppFlowPaused)
assert.True(t, r.trk.Progress[2].MsgAppProbesPaused)

r.trk.Progress[2].BecomeReplicate()
assert.False(t, r.trk.Progress[2].MsgAppFlowPaused)
r.trk.Progress[2].MsgAppFlowPaused = true
assert.False(t, r.trk.Progress[2].MsgAppProbesPaused)
r.trk.Progress[2].PauseMsgAppProbes(true)
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp})
assert.False(t, r.trk.Progress[2].MsgAppFlowPaused)
assert.True(t, r.trk.Progress[2].MsgAppProbesPaused)
}

func TestProgressPaused(t *testing.T) {
Expand Down Expand Up @@ -2299,24 +2299,24 @@ func TestSendAppendForProgressProbe(t *testing.T) {
// loop. After that, the follower is paused until a heartbeat response is
// received.
mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
r.sendAppend(2)
r.maybeSendAppend(2)
msg := r.readMessages()
assert.Len(t, msg, 1)
assert.Zero(t, msg[0].Index)
}

assert.True(t, r.trk.Progress[2].MsgAppFlowPaused)
assert.True(t, r.trk.Progress[2].MsgAppProbesPaused)
for j := 0; j < 10; j++ {
mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
r.sendAppend(2)
r.maybeSendAppend(2)
assert.Empty(t, r.readMessages())
}

// do a heartbeat
for j := 0; j < r.heartbeatTimeout; j++ {
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
}
assert.True(t, r.trk.Progress[2].MsgAppFlowPaused)
assert.True(t, r.trk.Progress[2].MsgAppProbesPaused)

// consume the heartbeat
msg := r.readMessages()
Expand All @@ -2329,7 +2329,7 @@ func TestSendAppendForProgressProbe(t *testing.T) {
msg := r.readMessages()
assert.Len(t, msg, 1)
assert.Zero(t, msg[0].Index)
assert.True(t, r.trk.Progress[2].MsgAppFlowPaused)
assert.True(t, r.trk.Progress[2].MsgAppProbesPaused)
}

func TestSendAppendForProgressReplicate(t *testing.T) {
Expand All @@ -2341,7 +2341,7 @@ func TestSendAppendForProgressReplicate(t *testing.T) {

for i := 0; i < 10; i++ {
mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
r.sendAppend(2)
r.maybeSendAppend(2)
msgs := r.readMessages()
assert.Len(t, msgs, 1, "#%d", i)
}
Expand All @@ -2356,7 +2356,7 @@ func TestSendAppendForProgressSnapshot(t *testing.T) {

for i := 0; i < 10; i++ {
mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
r.sendAppend(2)
r.maybeSendAppend(2)
msgs := r.readMessages()
assert.Empty(t, msgs, "#%d", i)
}
Expand Down Expand Up @@ -3876,10 +3876,10 @@ func TestLogReplicationWithReorderedMessage(t *testing.T) {

// r1 sends 2 MsgApp messages to r2.
mustAppendEntry(r1, pb.Entry{Data: []byte("somedata")})
r1.sendAppend(2)
r1.maybeSendAppend(2)
req1 := expectOneMessage(t, r1)
mustAppendEntry(r1, pb.Entry{Data: []byte("somedata")})
r1.sendAppend(2)
r1.maybeSendAppend(2)
req2 := expectOneMessage(t, r1)

// r2 receives the second MsgApp first due to reordering.
Expand Down
3 changes: 3 additions & 0 deletions testdata/replicate_pause.txt
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ deliver-msgs drop=3
dropped: 1->3 MsgApp Term:1 Log:1/11 Commit:11 Entries:[1/12 EntryNormal "prop_1_12"]
dropped: 1->3 MsgApp Term:1 Log:1/12 Commit:11 Entries:[1/13 EntryNormal "prop_1_13"]
dropped: 1->3 MsgApp Term:1 Log:1/13 Commit:11 Entries:[1/14 EntryNormal "prop_1_14"]
dropped: 1->3 MsgApp Term:1 Log:1/14 Commit:12
dropped: 1->3 MsgApp Term:1 Log:1/14 Commit:13
dropped: 1->3 MsgApp Term:1 Log:1/14 Commit:14


# Repeat committing 3 entries.
Expand Down
2 changes: 2 additions & 0 deletions testdata/slow_follower_after_compaction.txt
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ deliver-msgs drop=3
----
dropped: 1->3 MsgApp Term:1 Log:1/14 Commit:14 Entries:[1/15 EntryNormal "prop_1_15"]
dropped: 1->3 MsgApp Term:1 Log:1/15 Commit:14 Entries:[1/16 EntryNormal "prop_1_16"]
dropped: 1->3 MsgApp Term:1 Log:1/16 Commit:15
dropped: 1->3 MsgApp Term:1 Log:1/16 Commit:16

# Truncate the leader's log beyond node 3 log size.
compact 1 17
Expand Down
2 changes: 2 additions & 0 deletions tracker/inflights.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ type Inflights struct {
count int // number of inflight messages in the buffer
bytes uint64 // number of inflight bytes

// TODO(pav-kv): do not store the limits here, pass them to methods. For flow
// control, we need to support dynamic limits.
size int // the max number of inflight messages
maxBytes uint64 // the max total byte size of inflight messages

Expand Down
Loading
Loading