From 7d4fc7faa485d4bc202d4538841b0609617262b9 Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Wed, 24 Jan 2024 08:52:28 +0000 Subject: [PATCH] rawnode: expose per-follower MsgApp message stream Signed-off-by: Pavel Kalinnikov --- raft.go | 79 +++++++++++++++++++++++++++++++++++++++++++----------- rawnode.go | 23 ++++++++++++++++ 2 files changed, 87 insertions(+), 15 deletions(-) diff --git a/raft.go b/raft.go index 6d9178fb..4d3f5d03 100644 --- a/raft.go +++ b/raft.go @@ -218,6 +218,21 @@ type Config struct { // throughput limit of 10 MB/s for this group. With RTT of 400ms, this drops // to 2.5 MB/s. See Little's law to understand the maths behind. MaxInflightBytes uint64 + // DisableEagerAppends makes raft hold off constructing log append messages in + // response to Step() calls. The messages can be collected via a separate + // MessagesTo method. + // + // This way, the application has better control when raft may call Storage + // methods and allocate memory for entries and messages. + // + // Setting this to true also improves batching: messages are constructed on + // demand, and tend to contain more entries. The application can control the + // latency/throughput trade-off by collecting messages more or less + // frequently. + // + // With this setting set to false, messages are constructed eagerly in Step() + // calls, and typically will consist of a single / few entries. + DisableEagerAppends bool // CheckQuorum specifies if the leader should check quorum activity. Leader // steps down when quorum is not active for an electionTimeout. @@ -335,6 +350,12 @@ func (c *Config) validate() error { return nil } +type msgBuf []pb.Message + +func (mb *msgBuf) append(m pb.Message) { + *(*[]pb.Message)(mb) = append(*(*[]pb.Message)(mb), m) +} + type raft struct { id uint64 @@ -360,7 +381,7 @@ type raft struct { // other nodes. // // Messages in this list must target other nodes. - msgs []pb.Message + msgs msgBuf // msgsAfterAppend contains the list of messages that should be sent after // the accumulated unstable state (e.g. term, vote, []entry, and snapshot) // has been persisted to durable storage. This includes waiting for any @@ -372,6 +393,10 @@ type raft struct { // Messages in this list have the type MsgAppResp, MsgVoteResp, or // MsgPreVoteResp. See the comment in raft.send for details. msgsAfterAppend []pb.Message + // disableEagerAppends instructs append message construction and sending until + // the Ready() call. This improves batching and allows better resource + // allocation control by the application. + disableEagerAppends bool // the leader id lead uint64 @@ -447,6 +472,7 @@ func newRaft(c *Config) *raft { maxMsgSize: entryEncodingSize(c.MaxSizePerMsg), maxUncommittedSize: entryPayloadSize(c.MaxUncommittedEntriesSize), trk: tracker.MakeProgressTracker(c.MaxInflightMsgs, c.MaxInflightBytes), + disableEagerAppends: c.DisableEagerAppends, electionTimeout: c.ElectionTick, heartbeatTimeout: c.HeartbeatTick, logger: c.Logger, @@ -502,6 +528,11 @@ func (r *raft) hardState() pb.HardState { // send schedules persisting state to a stable storage and AFTER that // sending the message (as part of next Ready message processing). func (r *raft) send(m pb.Message) { + r.sendTo(&r.msgs, m) +} + +// sendTo prepares the given message, and puts it to the output messages buffer. +func (r *raft) sendTo(buf *msgBuf, m pb.Message) { if m.From == None { m.From = r.id } @@ -584,8 +615,19 @@ func (r *raft) send(m pb.Message) { if m.To == r.id { r.logger.Panicf("message should not be self-addressed when sending %s", m.Type) } - r.msgs = append(r.msgs, m) + buf.append(m) + } +} + +func (r *raft) getMessages(to uint64, fc FlowControl, buffer []pb.Message) []pb.Message { + if to == r.id { + // TODO(pav-kv): async log storage writes should go through this path. + return buffer } + pr := r.trk.Progress[to] + buf := msgBuf(buffer) + r.sendAppendBuf(to, pr, &buf) + return buf } // sendAppend sends an append RPC with new entries to the given peer, if @@ -595,8 +637,15 @@ func (r *raft) send(m pb.Message) { // this follower is throttled, or there are no new entries but the commit index // for the follower can be bumped. func (r *raft) sendAppend(to uint64, pr *tracker.Progress) bool { + if r.disableEagerAppends { + return false + } + return r.sendAppendBuf(to, pr, &r.msgs) +} + +func (r *raft) sendAppendBuf(to uint64, pr *tracker.Progress, buf *msgBuf) bool { if pr.State == tracker.StateProbe { - return !pr.MsgAppFlowPaused && r.maybeSendAppend(to, pr) + return !pr.MsgAppFlowPaused && r.maybeSendAppend(to, pr, buf) } else if pr.State != tracker.StateReplicate { return false } // only StateReplicate below @@ -604,14 +653,14 @@ func (r *raft) sendAppend(to uint64, pr *tracker.Progress) bool { // If there are any pending entries and the inflight tracking is not // saturated, send a regular append message (or snapshot). if pr.Next <= r.raftLog.lastIndex() && !pr.Inflights.Full() { - return r.maybeSendAppend(to, pr) + return r.maybeSendAppend(to, pr, buf) } // NB: the commit index is periodically sent in the heartbeat messages, so // technically we don't need the CanBumpCommit clause here to guarantee commit // index convergence on the follower. However, sending it via MsgApp here // allows faster (no heartbeat interval delay) convergence in some cases. if pr.CanBumpCommit(r.raftLog.committed) { - return r.maybeSendEmptyAppend(to, pr) + return r.maybeSendEmptyAppend(to, pr, buf) } // In a throttled StateReplicate, send an empty append message if we haven't // done so recently. @@ -620,13 +669,13 @@ func (r *raft) sendAppend(to uint64, pr *tracker.Progress) bool { // accepts or rejects it. If we don't do so, replication can stall if all the // in-flight appends are lost/dropped. return !pr.MsgAppFlowPaused && pr.Match < r.raftLog.lastIndex() && - r.maybeSendEmptyAppend(to, pr) + r.maybeSendEmptyAppend(to, pr, buf) } // maybeSendAppend sends a non-empty append message to the given follower. It // may send a snapshot instead if the required section of the log is no longer // available in this leader's log. Returns true if a message was sent. -func (r *raft) maybeSendAppend(to uint64, pr *tracker.Progress) bool { +func (r *raft) maybeSendAppend(to uint64, pr *tracker.Progress, buf *msgBuf) bool { // TODO(pav-kv): when pr.Next is updated, we always know the term of entry // pr.Next-1, because the previous append message contains it. We should store // (Next-1, Term) in Progress, instead of just Next. Then we don't have to @@ -636,13 +685,13 @@ func (r *raft) maybeSendAppend(to uint64, pr *tracker.Progress) bool { if err != nil { // The log probably got truncated at >= pr.Next, so we can't catch up the // follower log anymore. Send a snapshot instead. - return r.maybeSendSnapshot(to, pr) + return r.maybeSendSnapshot(to, pr, buf) } ents, err := r.raftLog.entries(pr.Next, r.maxMsgSize) if err != nil { // send a snapshot if we failed to get the entries - return r.maybeSendSnapshot(to, pr) + return r.maybeSendSnapshot(to, pr, buf) } - r.send(pb.Message{ + r.sendTo(buf, pb.Message{ To: to, Type: pb.MsgApp, Index: prevIndex, @@ -655,7 +704,7 @@ func (r *raft) maybeSendAppend(to uint64, pr *tracker.Progress) bool { return true } -func (r *raft) maybeSendEmptyAppend(to uint64, pr *tracker.Progress) bool { +func (r *raft) maybeSendEmptyAppend(to uint64, pr *tracker.Progress, buf *msgBuf) bool { // TODO(pav-kv): when pr.Next is updated, we always know the term of entry // pr.Next-1, because the append message contains it. Store (Next-1, Term) in // Progress, instead of just Next. Then we don't have to fetch the term and @@ -664,9 +713,9 @@ func (r *raft) maybeSendEmptyAppend(to uint64, pr *tracker.Progress) bool { if err != nil { // The log probably got truncated at >= pr.Next, so we can't catch up the // follower log anymore. Send a snapshot instead. - return r.maybeSendSnapshot(to, pr) + return r.maybeSendSnapshot(to, pr, buf) } - r.send(pb.Message{ + r.sendTo(buf, pb.Message{ To: to, Type: pb.MsgApp, Index: pr.Next - 1, @@ -680,7 +729,7 @@ func (r *raft) maybeSendEmptyAppend(to uint64, pr *tracker.Progress) bool { // maybeSendSnapshot fetches a snapshot from Storage, and sends it to the given // node. Returns true iff the snapshot message has been emitted successfully. -func (r *raft) maybeSendSnapshot(to uint64, pr *tracker.Progress) bool { +func (r *raft) maybeSendSnapshot(to uint64, pr *tracker.Progress, buf *msgBuf) bool { if !pr.RecentActive { r.logger.Debugf("ignore sending snapshot to %x since it is not recently active", to) return false @@ -703,7 +752,7 @@ func (r *raft) maybeSendSnapshot(to uint64, pr *tracker.Progress) bool { pr.BecomeSnapshot(sindex) r.logger.Debugf("%x paused sending replication messages to %x [%s]", r.id, to, pr) - r.send(pb.Message{To: to, Type: pb.MsgSnap, Snapshot: &snapshot}) + r.sendTo(buf, pb.Message{To: to, Type: pb.MsgSnap, Snapshot: &snapshot}) return true } diff --git a/rawnode.go b/rawnode.go index 428ef519..e7a22f96 100644 --- a/rawnode.go +++ b/rawnode.go @@ -136,6 +136,29 @@ func (rn *RawNode) Ready() Ready { return rd } +// FlowControl tunes the volume and types of messages that GetMessages call can +// return to the application. +type FlowControl struct { + // MaxMsgAppBytes limits the number of byte in append messages. Ignored if + // zero. + MaxMsgAppBytes uint64 + + // TODO(pav-kv): specify limits for local storage append messages. + // TODO(pav-kv): control the snapshots. +} + +// MessagesTo returns outstanding messages to a particular node. It appends the +// messages to the given slice, and returns the resulting slice. +// +// At the moment, MessagesTo only returns MsgApp or MsgSnap messages, and only +// if Config.DisableEagerAppends is true. All other messages are communicated +// via Ready calls. +// +// WARNING: this is an experimental API, use it with caution. +func (rn *RawNode) MessagesTo(id uint64, fc FlowControl, buffer []pb.Message) []pb.Message { + return rn.raft.getMessages(id, fc, buffer) +} + // readyWithoutAccept returns a Ready. This is a read-only operation, i.e. there // is no obligation that the Ready must be handled. func (rn *RawNode) readyWithoutAccept() Ready {