Skip to content

Commit

Permalink
rawnode: expose per-follower MsgApp message stream
Browse files Browse the repository at this point in the history
Signed-off-by: Pavel Kalinnikov <[email protected]>
  • Loading branch information
pav-kv committed Feb 8, 2024
1 parent b4da150 commit 7d4fc7f
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 15 deletions.
79 changes: 64 additions & 15 deletions raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,21 @@ type Config struct {
// throughput limit of 10 MB/s for this group. With RTT of 400ms, this drops
// to 2.5 MB/s. See Little's law to understand the maths behind.
MaxInflightBytes uint64
// DisableEagerAppends makes raft hold off constructing log append messages in
// response to Step() calls. The messages can be collected via a separate
// MessagesTo method.
//
// This way, the application has better control when raft may call Storage
// methods and allocate memory for entries and messages.
//
// Setting this to true also improves batching: messages are constructed on
// demand, and tend to contain more entries. The application can control the
// latency/throughput trade-off by collecting messages more or less
// frequently.
//
// With this setting set to false, messages are constructed eagerly in Step()
// calls, and typically will consist of a single / few entries.
DisableEagerAppends bool

// CheckQuorum specifies if the leader should check quorum activity. Leader
// steps down when quorum is not active for an electionTimeout.
Expand Down Expand Up @@ -335,6 +350,12 @@ func (c *Config) validate() error {
return nil
}

type msgBuf []pb.Message

func (mb *msgBuf) append(m pb.Message) {
*(*[]pb.Message)(mb) = append(*(*[]pb.Message)(mb), m)
}

type raft struct {
id uint64

Expand All @@ -360,7 +381,7 @@ type raft struct {
// other nodes.
//
// Messages in this list must target other nodes.
msgs []pb.Message
msgs msgBuf
// msgsAfterAppend contains the list of messages that should be sent after
// the accumulated unstable state (e.g. term, vote, []entry, and snapshot)
// has been persisted to durable storage. This includes waiting for any
Expand All @@ -372,6 +393,10 @@ type raft struct {
// Messages in this list have the type MsgAppResp, MsgVoteResp, or
// MsgPreVoteResp. See the comment in raft.send for details.
msgsAfterAppend []pb.Message
// disableEagerAppends instructs append message construction and sending until
// the Ready() call. This improves batching and allows better resource
// allocation control by the application.
disableEagerAppends bool

// the leader id
lead uint64
Expand Down Expand Up @@ -447,6 +472,7 @@ func newRaft(c *Config) *raft {
maxMsgSize: entryEncodingSize(c.MaxSizePerMsg),
maxUncommittedSize: entryPayloadSize(c.MaxUncommittedEntriesSize),
trk: tracker.MakeProgressTracker(c.MaxInflightMsgs, c.MaxInflightBytes),
disableEagerAppends: c.DisableEagerAppends,
electionTimeout: c.ElectionTick,
heartbeatTimeout: c.HeartbeatTick,
logger: c.Logger,
Expand Down Expand Up @@ -502,6 +528,11 @@ func (r *raft) hardState() pb.HardState {
// send schedules persisting state to a stable storage and AFTER that
// sending the message (as part of next Ready message processing).
func (r *raft) send(m pb.Message) {
r.sendTo(&r.msgs, m)
}

// sendTo prepares the given message, and puts it to the output messages buffer.
func (r *raft) sendTo(buf *msgBuf, m pb.Message) {
if m.From == None {
m.From = r.id
}
Expand Down Expand Up @@ -584,8 +615,19 @@ func (r *raft) send(m pb.Message) {
if m.To == r.id {
r.logger.Panicf("message should not be self-addressed when sending %s", m.Type)
}
r.msgs = append(r.msgs, m)
buf.append(m)
}
}

func (r *raft) getMessages(to uint64, fc FlowControl, buffer []pb.Message) []pb.Message {

Check warning on line 622 in raft.go

View workflow job for this annotation

GitHub Actions / run

unused-parameter: parameter 'fc' seems to be unused, consider removing or renaming it as _ (revive)
if to == r.id {
// TODO(pav-kv): async log storage writes should go through this path.
return buffer
}
pr := r.trk.Progress[to]
buf := msgBuf(buffer)
r.sendAppendBuf(to, pr, &buf)
return buf
}

// sendAppend sends an append RPC with new entries to the given peer, if
Expand All @@ -595,23 +637,30 @@ func (r *raft) send(m pb.Message) {
// this follower is throttled, or there are no new entries but the commit index
// for the follower can be bumped.
func (r *raft) sendAppend(to uint64, pr *tracker.Progress) bool {
if r.disableEagerAppends {
return false
}
return r.sendAppendBuf(to, pr, &r.msgs)
}

func (r *raft) sendAppendBuf(to uint64, pr *tracker.Progress, buf *msgBuf) bool {
if pr.State == tracker.StateProbe {
return !pr.MsgAppFlowPaused && r.maybeSendAppend(to, pr)
return !pr.MsgAppFlowPaused && r.maybeSendAppend(to, pr, buf)
} else if pr.State != tracker.StateReplicate {
return false
} // only StateReplicate below

// If there are any pending entries and the inflight tracking is not
// saturated, send a regular append message (or snapshot).
if pr.Next <= r.raftLog.lastIndex() && !pr.Inflights.Full() {
return r.maybeSendAppend(to, pr)
return r.maybeSendAppend(to, pr, buf)
}
// NB: the commit index is periodically sent in the heartbeat messages, so
// technically we don't need the CanBumpCommit clause here to guarantee commit
// index convergence on the follower. However, sending it via MsgApp here
// allows faster (no heartbeat interval delay) convergence in some cases.
if pr.CanBumpCommit(r.raftLog.committed) {
return r.maybeSendEmptyAppend(to, pr)
return r.maybeSendEmptyAppend(to, pr, buf)
}
// In a throttled StateReplicate, send an empty append message if we haven't
// done so recently.
Expand All @@ -620,13 +669,13 @@ func (r *raft) sendAppend(to uint64, pr *tracker.Progress) bool {
// accepts or rejects it. If we don't do so, replication can stall if all the
// in-flight appends are lost/dropped.
return !pr.MsgAppFlowPaused && pr.Match < r.raftLog.lastIndex() &&
r.maybeSendEmptyAppend(to, pr)
r.maybeSendEmptyAppend(to, pr, buf)
}

// maybeSendAppend sends a non-empty append message to the given follower. It
// may send a snapshot instead if the required section of the log is no longer
// available in this leader's log. Returns true if a message was sent.
func (r *raft) maybeSendAppend(to uint64, pr *tracker.Progress) bool {
func (r *raft) maybeSendAppend(to uint64, pr *tracker.Progress, buf *msgBuf) bool {
// TODO(pav-kv): when pr.Next is updated, we always know the term of entry
// pr.Next-1, because the previous append message contains it. We should store
// (Next-1, Term) in Progress, instead of just Next. Then we don't have to
Expand All @@ -636,13 +685,13 @@ func (r *raft) maybeSendAppend(to uint64, pr *tracker.Progress) bool {
if err != nil {
// The log probably got truncated at >= pr.Next, so we can't catch up the
// follower log anymore. Send a snapshot instead.
return r.maybeSendSnapshot(to, pr)
return r.maybeSendSnapshot(to, pr, buf)
}
ents, err := r.raftLog.entries(pr.Next, r.maxMsgSize)
if err != nil { // send a snapshot if we failed to get the entries
return r.maybeSendSnapshot(to, pr)
return r.maybeSendSnapshot(to, pr, buf)
}
r.send(pb.Message{
r.sendTo(buf, pb.Message{
To: to,
Type: pb.MsgApp,
Index: prevIndex,
Expand All @@ -655,7 +704,7 @@ func (r *raft) maybeSendAppend(to uint64, pr *tracker.Progress) bool {
return true
}

func (r *raft) maybeSendEmptyAppend(to uint64, pr *tracker.Progress) bool {
func (r *raft) maybeSendEmptyAppend(to uint64, pr *tracker.Progress, buf *msgBuf) bool {
// TODO(pav-kv): when pr.Next is updated, we always know the term of entry
// pr.Next-1, because the append message contains it. Store (Next-1, Term) in
// Progress, instead of just Next. Then we don't have to fetch the term and
Expand All @@ -664,9 +713,9 @@ func (r *raft) maybeSendEmptyAppend(to uint64, pr *tracker.Progress) bool {
if err != nil {
// The log probably got truncated at >= pr.Next, so we can't catch up the
// follower log anymore. Send a snapshot instead.
return r.maybeSendSnapshot(to, pr)
return r.maybeSendSnapshot(to, pr, buf)
}
r.send(pb.Message{
r.sendTo(buf, pb.Message{
To: to,
Type: pb.MsgApp,
Index: pr.Next - 1,
Expand All @@ -680,7 +729,7 @@ func (r *raft) maybeSendEmptyAppend(to uint64, pr *tracker.Progress) bool {

// maybeSendSnapshot fetches a snapshot from Storage, and sends it to the given
// node. Returns true iff the snapshot message has been emitted successfully.
func (r *raft) maybeSendSnapshot(to uint64, pr *tracker.Progress) bool {
func (r *raft) maybeSendSnapshot(to uint64, pr *tracker.Progress, buf *msgBuf) bool {
if !pr.RecentActive {
r.logger.Debugf("ignore sending snapshot to %x since it is not recently active", to)
return false
Expand All @@ -703,7 +752,7 @@ func (r *raft) maybeSendSnapshot(to uint64, pr *tracker.Progress) bool {
pr.BecomeSnapshot(sindex)
r.logger.Debugf("%x paused sending replication messages to %x [%s]", r.id, to, pr)

r.send(pb.Message{To: to, Type: pb.MsgSnap, Snapshot: &snapshot})
r.sendTo(buf, pb.Message{To: to, Type: pb.MsgSnap, Snapshot: &snapshot})
return true
}

Expand Down
23 changes: 23 additions & 0 deletions rawnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,29 @@ func (rn *RawNode) Ready() Ready {
return rd
}

// FlowControl tunes the volume and types of messages that GetMessages call can
// return to the application.
type FlowControl struct {
// MaxMsgAppBytes limits the number of byte in append messages. Ignored if
// zero.
MaxMsgAppBytes uint64

// TODO(pav-kv): specify limits for local storage append messages.
// TODO(pav-kv): control the snapshots.
}

// MessagesTo returns outstanding messages to a particular node. It appends the
// messages to the given slice, and returns the resulting slice.
//
// At the moment, MessagesTo only returns MsgApp or MsgSnap messages, and only
// if Config.DisableEagerAppends is true. All other messages are communicated
// via Ready calls.
//
// WARNING: this is an experimental API, use it with caution.
func (rn *RawNode) MessagesTo(id uint64, fc FlowControl, buffer []pb.Message) []pb.Message {
return rn.raft.getMessages(id, fc, buffer)
}

// readyWithoutAccept returns a Ready. This is a read-only operation, i.e. there
// is no obligation that the Ready must be handled.
func (rn *RawNode) readyWithoutAccept() Ready {
Expand Down

0 comments on commit 7d4fc7f

Please sign in to comment.