Skip to content

Commit

Permalink
rawnode: expose per-follower MsgApp message stream
Browse files Browse the repository at this point in the history
Signed-off-by: Pavel Kalinnikov <[email protected]>
  • Loading branch information
pav-kv committed Feb 23, 2024
1 parent 017bdda commit 76264df
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 7 deletions.
65 changes: 58 additions & 7 deletions raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,21 @@ type Config struct {
// throughput limit of 10 MB/s for this group. With RTT of 400ms, this drops
// to 2.5 MB/s. See Little's law to understand the maths behind.
MaxInflightBytes uint64
// DisableEagerAppends makes raft hold off constructing log append messages in
// response to Step() calls. The messages can be collected via a separate
// MessagesTo method.
//
// This way, the application has better control over when raft may call
// Storage methods and allocate memory for entries and messages.
//
// Setting this to true also improves batching: messages are constructed on
// demand, and tend to contain more entries. The application can control the
// latency/throughput trade-off by collecting messages more or less
// frequently.
//
// With this setting set to false, messages are constructed eagerly in Step()
// calls, and typically will consist of a single / few entries.
DisableEagerAppends bool

// CheckQuorum specifies if the leader should check quorum activity. Leader
// steps down when quorum is not active for an electionTimeout.
Expand Down Expand Up @@ -335,6 +350,12 @@ func (c *Config) validate() error {
return nil
}

// msgBuf is a buffer of outgoing messages. It is a named slice type so that
// code holding a *msgBuf can grow the underlying slice in place.
type msgBuf []pb.Message

// append adds m to the end of the buffer.
func (mb *msgBuf) append(m pb.Message) {
	// Inside the method body "append" resolves to the builtin, which accepts
	// the named slice type directly, so no conversion is needed.
	*mb = append(*mb, m)
}

type raft struct {
id uint64

Expand All @@ -360,7 +381,7 @@ type raft struct {
// other nodes.
//
// Messages in this list must target other nodes.
msgs []pb.Message
msgs msgBuf
// msgsAfterAppend contains the list of messages that should be sent after
// the accumulated unstable state (e.g. term, vote, []entry, and snapshot)
// has been persisted to durable storage. This includes waiting for any
Expand All @@ -372,6 +393,10 @@ type raft struct {
// Messages in this list have the type MsgAppResp, MsgVoteResp, or
// MsgPreVoteResp. See the comment in raft.send for details.
msgsAfterAppend []pb.Message
// disableEagerAppends instructs raft to hold off append message construction
// and sending in Step() calls until the application collects the messages via
// MessagesTo. This improves batching and allows better resource allocation
// control by the application.
disableEagerAppends bool

// the leader id
lead uint64
Expand Down Expand Up @@ -447,6 +472,7 @@ func newRaft(c *Config) *raft {
maxMsgSize: entryEncodingSize(c.MaxSizePerMsg),
maxUncommittedSize: entryPayloadSize(c.MaxUncommittedEntriesSize),
trk: tracker.MakeProgressTracker(c.MaxInflightMsgs, c.MaxInflightBytes),
disableEagerAppends: c.DisableEagerAppends,
electionTimeout: c.ElectionTick,
heartbeatTimeout: c.HeartbeatTick,
logger: c.Logger,
Expand Down Expand Up @@ -502,6 +528,11 @@ func (r *raft) hardState() pb.HardState {
// send schedules persisting state to a stable storage and AFTER that
// sending the message (as part of next Ready message processing).
//
// It is a thin wrapper that calls sendTo with r.msgs as the output buffer.
func (r *raft) send(m pb.Message) {
r.sendTo(&r.msgs, m)
}

// sendTo prepares the given message, and puts it to the output messages buffer.
func (r *raft) sendTo(buf *msgBuf, m pb.Message) {
if m.From == None {
m.From = r.id
}
Expand Down Expand Up @@ -584,10 +615,21 @@ func (r *raft) send(m pb.Message) {
if m.To == r.id {
r.logger.Panicf("message should not be self-addressed when sending %s", m.Type)
}
r.msgs = append(r.msgs, m)
buf.append(m)
}
}

// getMessages appends the outstanding messages for the given peer, as produced
// by maybeSendAppendBuf, to buffer and returns the resulting slice.
//
// The buffer is returned unchanged if the target is this node, or if no
// progress is tracked for the target.
//
// NOTE: fc is accepted but not consulted yet; no flow control limit is applied
// by this method.
func (r *raft) getMessages(to uint64, fc FlowControl, buffer []pb.Message) []pb.Message {
	if to == r.id {
		// TODO(pav-kv): async log storage writes should go through this path.
		return buffer
	}
	// The id is supplied by the caller, so it may refer to a peer that is not
	// tracked. In that case the lookup yields a nil progress, which must not
	// be handed to maybeSendAppendBuf: there is nothing to send to this peer.
	pr := r.trk.Progress[to]
	if pr == nil {
		return buffer
	}
	buf := msgBuf(buffer)
	r.maybeSendAppendBuf(to, pr, &buf)
	return buf
}

// maybeSendAppend sends an append RPC with log entries (if any) that are not
// yet known to be replicated in the given peer's log, as well as the current
// commit index. Usually it sends a MsgApp message, but in some cases (e.g. the
Expand All @@ -602,6 +644,15 @@ func (r *raft) send(m pb.Message) {
// if the follower log and commit index are up-to-date, the flow is paused (for
// reasons like in-flight limits), or the message could not be constructed.
func (r *raft) maybeSendAppend(to uint64, pr *tracker.Progress) bool {
	// With eager appends disabled, the short-circuit skips message construction
	// entirely and false is returned. Otherwise the message is built by
	// maybeSendAppendBuf and lands in the regular r.msgs buffer.
	return !r.disableEagerAppends && r.maybeSendAppendBuf(to, pr, &r.msgs)
}

// maybeSendAppendBuf implements maybeSendAppend, and puts the messages into the
// provided buffer.
func (r *raft) maybeSendAppendBuf(to uint64, pr *tracker.Progress, buf *msgBuf) bool {
last, commit := r.raftLog.lastIndex(), r.raftLog.committed
if !pr.ShouldSendMsgApp(last, commit) {
return false
Expand All @@ -612,19 +663,19 @@ func (r *raft) maybeSendAppend(to uint64, pr *tracker.Progress) bool {
if err != nil {
// The log probably got truncated at >= pr.Next, so we can't catch up the
// follower log anymore. Send a snapshot instead.
return r.maybeSendSnapshot(to, pr)
return r.maybeSendSnapshot(to, pr, buf)
}

var entries []pb.Entry
if pr.CanSendEntries(last) {
if entries, err = r.raftLog.entries(pr.Next, r.maxMsgSize); err != nil {
// Send a snapshot if we failed to get the entries.
return r.maybeSendSnapshot(to, pr)
return r.maybeSendSnapshot(to, pr, buf)
}
}

// Send the MsgApp, and update the progress accordingly.
r.send(pb.Message{
r.sendTo(buf, pb.Message{
To: to,
Type: pb.MsgApp,
Index: prevIndex,
Expand All @@ -639,7 +690,7 @@ func (r *raft) maybeSendAppend(to uint64, pr *tracker.Progress) bool {

// maybeSendSnapshot fetches a snapshot from Storage, and sends it to the given
// node. Returns true iff the snapshot message has been emitted successfully.
func (r *raft) maybeSendSnapshot(to uint64, pr *tracker.Progress) bool {
func (r *raft) maybeSendSnapshot(to uint64, pr *tracker.Progress, buf *msgBuf) bool {
if !pr.RecentActive {
r.logger.Debugf("ignore sending snapshot to %x since it is not recently active", to)
return false
Expand All @@ -662,7 +713,7 @@ func (r *raft) maybeSendSnapshot(to uint64, pr *tracker.Progress) bool {
pr.BecomeSnapshot(sindex)
r.logger.Debugf("%x paused sending replication messages to %x [%s]", r.id, to, pr)

r.send(pb.Message{To: to, Type: pb.MsgSnap, Snapshot: &snapshot})
r.sendTo(buf, pb.Message{To: to, Type: pb.MsgSnap, Snapshot: &snapshot})
return true
}

Expand Down
23 changes: 23 additions & 0 deletions rawnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,29 @@ func (rn *RawNode) Ready() Ready {
return rd
}

// FlowControl tunes the volume and types of messages that a MessagesTo call
// can return to the application.
type FlowControl struct {
// MaxMsgAppBytes limits the number of bytes in append messages. Ignored if
// zero.
MaxMsgAppBytes uint64

// TODO(pav-kv): specify limits for local storage append messages.
// TODO(pav-kv): control the snapshots.
}

// MessagesTo returns outstanding messages to a particular node. It appends the
// messages to the given slice, and returns the resulting slice.
//
// At the moment, MessagesTo only returns MsgApp or MsgSnap messages, and only
// if Config.DisableEagerAppends is true. All other messages are communicated
// via Ready calls.
//
// The id argument is the recipient node, fc carries the flow control settings
// for this call, and buffer is the slice that the messages are appended to.
// All three are forwarded unchanged to raft.getMessages.
//
// WARNING: this is an experimental API, use it with caution.
func (rn *RawNode) MessagesTo(id uint64, fc FlowControl, buffer []pb.Message) []pb.Message {
return rn.raft.getMessages(id, fc, buffer)
}

// readyWithoutAccept returns a Ready. This is a read-only operation, i.e. there
// is no obligation that the Ready must be handled.
func (rn *RawNode) readyWithoutAccept() Ready {
Expand Down

0 comments on commit 76264df

Please sign in to comment.