Skip to content

Commit

Permalink
demo: the effect of batched MsgApp ready
Browse files Browse the repository at this point in the history
Signed-off-by: Pavel Kalinnikov <[email protected]>
  • Loading branch information
pav-kv committed Feb 23, 2024
1 parent 76264df commit 419efa1
Show file tree
Hide file tree
Showing 20 changed files with 227 additions and 198 deletions.
7 changes: 6 additions & 1 deletion raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,8 @@ func (r *raft) getMessages(to uint64, fc FlowControl, buffer []pb.Message) []pb.
}
pr := r.trk.Progress[to]
buf := msgBuf(buffer)
r.maybeSendAppendBuf(to, pr, &buf)
for r.maybeSendAppendBuf(to, pr, &buf) {
}
return buf
}

Expand All @@ -650,6 +651,10 @@ func (r *raft) maybeSendAppend(to uint64, pr *tracker.Progress) bool {
return r.maybeSendAppendBuf(to, pr, &r.msgs)
}

func (r *raft) appendsReady(pr *tracker.Progress) bool {
return pr.ShouldSendMsgApp(r.raftLog.lastIndex(), r.raftLog.committed)
}

// maybeSendAppendBuf implements maybeSendAppend, and puts the messages into the
// provided buffer.
func (r *raft) maybeSendAppendBuf(to uint64, pr *tracker.Progress, buf *msgBuf) bool {
Expand Down
1 change: 1 addition & 0 deletions rafttest/interaction_env_handler_add_nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ func (env *InteractionEnv) AddNodes(n int, cfg raft.Config, snap pb.Snapshot) er
}
cfg.Logger = env.Output

cfg.DisableEagerAppends = true
rn, err := raft.NewRawNode(&cfg)
if err != nil {
return err
Expand Down
21 changes: 20 additions & 1 deletion rawnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package raft

import (
"errors"

Check failure on line 18 in rawnode.go

View workflow job for this annotation

GitHub Actions / run

File is not `goimports`-ed with -local go.etcd.io (goimports)

pb "go.etcd.io/raft/v3/raftpb"
"go.etcd.io/raft/v3/tracker"
)
Expand Down Expand Up @@ -208,6 +207,15 @@ func (rn *RawNode) readyWithoutAccept() Ready {
}
}

if r.disableEagerAppends && r.state == StateLeader {
r.trk.Visit(func(id uint64, pr *tracker.Progress) {
if id == r.id {
return
}
rd.Messages = r.getMessages(id, FlowControl{}, rd.Messages)
})
}

return rd
}

Expand Down Expand Up @@ -486,6 +494,17 @@ func (rn *RawNode) HasReady() bool {
if len(r.msgs) > 0 || len(r.msgsAfterAppend) > 0 {
return true
}
if rn.raft.state == StateLeader {
ready := false
rn.raft.trk.Visit(func(id uint64, pr *tracker.Progress) {
if id != rn.raft.id && !ready {
ready = rn.raft.appendsReady(pr)
}
})
if ready {
return true
}
}
if r.raftLog.hasNextUnstableEnts() || r.raftLog.hasNextCommittedEnts(rn.applyUnstableEntries()) {
return true
}
Expand Down
52 changes: 16 additions & 36 deletions testdata/async_storage_writes.txt
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,12 @@ stabilize
Entries:
1/11 EntryNormal ""
Messages:
1->2 MsgApp Term:1 Log:1/10 Commit:10 Entries:[1/11 EntryNormal ""]
1->3 MsgApp Term:1 Log:1/10 Commit:10 Entries:[1/11 EntryNormal ""]
1->AppendThread MsgStorageAppend Term:0 Log:0/0 Entries:[1/11 EntryNormal ""] Responses:[
1->1 MsgAppResp Term:1 Log:0/11
AppendThread->1 MsgStorageAppendResp Term:1 Log:1/11
]
1->2 MsgApp Term:1 Log:1/10 Commit:10 Entries:[1/11 EntryNormal ""]
1->3 MsgApp Term:1 Log:1/10 Commit:10 Entries:[1/11 EntryNormal ""]
> 2 receiving messages
1->2 MsgApp Term:1 Log:1/10 Commit:10 Entries:[1/11 EntryNormal ""]
> 3 receiving messages
Expand Down Expand Up @@ -151,12 +151,12 @@ stabilize
CommittedEntries:
1/11 EntryNormal ""
Messages:
1->2 MsgApp Term:1 Log:1/11 Commit:11
1->3 MsgApp Term:1 Log:1/11 Commit:11
1->AppendThread MsgStorageAppend Term:1 Log:0/0 Commit:11 Vote:1
1->ApplyThread MsgStorageApply Term:0 Log:0/0 Entries:[1/11 EntryNormal ""] Responses:[
ApplyThread->1 MsgStorageApplyResp Term:0 Log:0/0 Entries:[1/11 EntryNormal ""]
]
1->2 MsgApp Term:1 Log:1/11 Commit:11
1->3 MsgApp Term:1 Log:1/11 Commit:11
> 2 receiving messages
1->2 MsgApp Term:1 Log:1/11 Commit:11
> 3 receiving messages
Expand Down Expand Up @@ -235,12 +235,12 @@ process-ready 1 2 3
Entries:
1/12 EntryNormal "prop_1"
Messages:
1->2 MsgApp Term:1 Log:1/11 Commit:11 Entries:[1/12 EntryNormal "prop_1"]
1->3 MsgApp Term:1 Log:1/11 Commit:11 Entries:[1/12 EntryNormal "prop_1"]
1->AppendThread MsgStorageAppend Term:0 Log:0/0 Entries:[1/12 EntryNormal "prop_1"] Responses:[
1->1 MsgAppResp Term:1 Log:0/12
AppendThread->1 MsgStorageAppendResp Term:1 Log:1/12
]
1->2 MsgApp Term:1 Log:1/11 Commit:11 Entries:[1/12 EntryNormal "prop_1"]
1->3 MsgApp Term:1 Log:1/11 Commit:11 Entries:[1/12 EntryNormal "prop_1"]
> 2 handling Ready
<empty Ready>
> 3 handling Ready
Expand Down Expand Up @@ -285,12 +285,12 @@ process-ready 1 2 3
Entries:
1/13 EntryNormal "prop_2"
Messages:
1->2 MsgApp Term:1 Log:1/12 Commit:11 Entries:[1/13 EntryNormal "prop_2"]
1->3 MsgApp Term:1 Log:1/12 Commit:11 Entries:[1/13 EntryNormal "prop_2"]
1->AppendThread MsgStorageAppend Term:0 Log:0/0 Entries:[1/13 EntryNormal "prop_2"] Responses:[
1->1 MsgAppResp Term:1 Log:0/13
AppendThread->1 MsgStorageAppendResp Term:1 Log:1/13
]
1->2 MsgApp Term:1 Log:1/12 Commit:11 Entries:[1/13 EntryNormal "prop_2"]
1->3 MsgApp Term:1 Log:1/12 Commit:11 Entries:[1/13 EntryNormal "prop_2"]
> 2 handling Ready
<empty Ready>
> 3 handling Ready
Expand Down Expand Up @@ -368,27 +368,23 @@ process-ready 1 2 3
CommittedEntries:
1/12 EntryNormal "prop_1"
Messages:
1->2 MsgApp Term:1 Log:1/13 Commit:12
1->3 MsgApp Term:1 Log:1/13 Commit:12
1->2 MsgApp Term:1 Log:1/13 Commit:12 Entries:[1/14 EntryNormal "prop_3"]
1->3 MsgApp Term:1 Log:1/13 Commit:12 Entries:[1/14 EntryNormal "prop_3"]
1->AppendThread MsgStorageAppend Term:1 Log:0/0 Commit:12 Vote:1 Entries:[1/14 EntryNormal "prop_3"] Responses:[
1->1 MsgAppResp Term:1 Log:0/14
AppendThread->1 MsgStorageAppendResp Term:1 Log:1/14
]
1->ApplyThread MsgStorageApply Term:0 Log:0/0 Entries:[1/12 EntryNormal "prop_1"] Responses:[
ApplyThread->1 MsgStorageApplyResp Term:0 Log:0/0 Entries:[1/12 EntryNormal "prop_1"]
]
1->2 MsgApp Term:1 Log:1/13 Commit:12 Entries:[1/14 EntryNormal "prop_3"]
1->3 MsgApp Term:1 Log:1/13 Commit:12 Entries:[1/14 EntryNormal "prop_3"]
> 2 handling Ready
<empty Ready>
> 3 handling Ready
<empty Ready>

deliver-msgs 1 2 3
----
1->2 MsgApp Term:1 Log:1/13 Commit:12
1->2 MsgApp Term:1 Log:1/13 Commit:12 Entries:[1/14 EntryNormal "prop_3"]
1->3 MsgApp Term:1 Log:1/13 Commit:12
1->3 MsgApp Term:1 Log:1/13 Commit:12 Entries:[1/14 EntryNormal "prop_3"]

process-ready 1 2 3
Expand All @@ -404,7 +400,6 @@ process-ready 1 2 3
1/12 EntryNormal "prop_1"
Messages:
2->AppendThread MsgStorageAppend Term:1 Log:0/0 Commit:12 Vote:1 Entries:[1/14 EntryNormal "prop_3"] Responses:[
2->1 MsgAppResp Term:1 Log:0/13
2->1 MsgAppResp Term:1 Log:0/14
AppendThread->2 MsgStorageAppendResp Term:1 Log:1/14
]
Expand All @@ -420,7 +415,6 @@ process-ready 1 2 3
1/12 EntryNormal "prop_1"
Messages:
3->AppendThread MsgStorageAppend Term:1 Log:0/0 Commit:12 Vote:1 Entries:[1/14 EntryNormal "prop_3"] Responses:[
3->1 MsgAppResp Term:1 Log:0/13
3->1 MsgAppResp Term:1 Log:0/14
AppendThread->3 MsgStorageAppendResp Term:1 Log:1/14
]
Expand Down Expand Up @@ -472,27 +466,23 @@ process-ready 1 2 3
CommittedEntries:
1/13 EntryNormal "prop_2"
Messages:
1->2 MsgApp Term:1 Log:1/14 Commit:13
1->3 MsgApp Term:1 Log:1/14 Commit:13
1->2 MsgApp Term:1 Log:1/14 Commit:13 Entries:[1/15 EntryNormal "prop_4"]
1->3 MsgApp Term:1 Log:1/14 Commit:13 Entries:[1/15 EntryNormal "prop_4"]
1->AppendThread MsgStorageAppend Term:1 Log:0/0 Commit:13 Vote:1 Entries:[1/15 EntryNormal "prop_4"] Responses:[
1->1 MsgAppResp Term:1 Log:0/15
AppendThread->1 MsgStorageAppendResp Term:1 Log:1/15
]
1->ApplyThread MsgStorageApply Term:0 Log:0/0 Entries:[1/13 EntryNormal "prop_2"] Responses:[
ApplyThread->1 MsgStorageApplyResp Term:0 Log:0/0 Entries:[1/13 EntryNormal "prop_2"]
]
1->2 MsgApp Term:1 Log:1/14 Commit:13 Entries:[1/15 EntryNormal "prop_4"]
1->3 MsgApp Term:1 Log:1/14 Commit:13 Entries:[1/15 EntryNormal "prop_4"]
> 2 handling Ready
<empty Ready>
> 3 handling Ready
<empty Ready>

deliver-msgs 1 2 3
----
1->2 MsgApp Term:1 Log:1/14 Commit:13
1->2 MsgApp Term:1 Log:1/14 Commit:13 Entries:[1/15 EntryNormal "prop_4"]
1->3 MsgApp Term:1 Log:1/14 Commit:13
1->3 MsgApp Term:1 Log:1/14 Commit:13 Entries:[1/15 EntryNormal "prop_4"]

process-ready 1 2 3
Expand All @@ -508,7 +498,6 @@ process-ready 1 2 3
1/13 EntryNormal "prop_2"
Messages:
2->AppendThread MsgStorageAppend Term:1 Log:0/0 Commit:13 Vote:1 Entries:[1/15 EntryNormal "prop_4"] Responses:[
2->1 MsgAppResp Term:1 Log:0/14
2->1 MsgAppResp Term:1 Log:0/15
AppendThread->2 MsgStorageAppendResp Term:1 Log:1/15
]
Expand All @@ -524,7 +513,6 @@ process-ready 1 2 3
1/13 EntryNormal "prop_2"
Messages:
3->AppendThread MsgStorageAppend Term:1 Log:0/0 Commit:13 Vote:1 Entries:[1/15 EntryNormal "prop_4"] Responses:[
3->1 MsgAppResp Term:1 Log:0/14
3->1 MsgAppResp Term:1 Log:0/15
AppendThread->3 MsgStorageAppendResp Term:1 Log:1/15
]
Expand All @@ -544,14 +532,12 @@ process-append-thread 1 2 3
Processing:
2->AppendThread MsgStorageAppend Term:1 Log:0/0 Commit:12 Vote:1 Entries:[1/14 EntryNormal "prop_3"]
Responses:
2->1 MsgAppResp Term:1 Log:0/13
2->1 MsgAppResp Term:1 Log:0/14
AppendThread->2 MsgStorageAppendResp Term:1 Log:1/14
> 3 processing append thread
Processing:
3->AppendThread MsgStorageAppend Term:1 Log:0/0 Commit:12 Vote:1 Entries:[1/14 EntryNormal "prop_3"]
Responses:
3->1 MsgAppResp Term:1 Log:0/13
3->1 MsgAppResp Term:1 Log:0/14
AppendThread->3 MsgStorageAppendResp Term:1 Log:1/14

Expand All @@ -577,9 +563,7 @@ deliver-msgs 1 2 3
----
1->1 MsgAppResp Term:1 Log:0/14
AppendThread->1 MsgStorageAppendResp Term:1 Log:1/14
2->1 MsgAppResp Term:1 Log:0/13
2->1 MsgAppResp Term:1 Log:0/14
3->1 MsgAppResp Term:1 Log:0/13
3->1 MsgAppResp Term:1 Log:0/14
ApplyThread->1 MsgStorageApplyResp Term:0 Log:0/0 Entries:[1/12 EntryNormal "prop_1"]
AppendThread->2 MsgStorageAppendResp Term:1 Log:1/14
Expand All @@ -595,14 +579,14 @@ process-ready 1 2 3
CommittedEntries:
1/14 EntryNormal "prop_3"
Messages:
1->2 MsgApp Term:1 Log:1/15 Commit:14
1->3 MsgApp Term:1 Log:1/15 Commit:14
1->AppendThread MsgStorageAppend Term:1 Log:0/0 Commit:14 Vote:1 Responses:[
AppendThread->1 MsgStorageAppendResp Term:1 Log:1/15
]
1->ApplyThread MsgStorageApply Term:0 Log:0/0 Entries:[1/14 EntryNormal "prop_3"] Responses:[
ApplyThread->1 MsgStorageApplyResp Term:0 Log:0/0 Entries:[1/14 EntryNormal "prop_3"]
]
1->2 MsgApp Term:1 Log:1/15 Commit:14
1->3 MsgApp Term:1 Log:1/15 Commit:14
> 2 handling Ready
<empty Ready>
> 3 handling Ready
Expand Down Expand Up @@ -656,14 +640,12 @@ process-append-thread 1 2 3
Processing:
2->AppendThread MsgStorageAppend Term:1 Log:0/0 Commit:13 Vote:1 Entries:[1/15 EntryNormal "prop_4"]
Responses:
2->1 MsgAppResp Term:1 Log:0/14
2->1 MsgAppResp Term:1 Log:0/15
AppendThread->2 MsgStorageAppendResp Term:1 Log:1/15
> 3 processing append thread
Processing:
3->AppendThread MsgStorageAppend Term:1 Log:0/0 Commit:13 Vote:1 Entries:[1/15 EntryNormal "prop_4"]
Responses:
3->1 MsgAppResp Term:1 Log:0/14
3->1 MsgAppResp Term:1 Log:0/15
AppendThread->3 MsgStorageAppendResp Term:1 Log:1/15

Expand All @@ -689,9 +671,7 @@ deliver-msgs 1 2 3
----
1->1 MsgAppResp Term:1 Log:0/15
AppendThread->1 MsgStorageAppendResp Term:1 Log:1/15
2->1 MsgAppResp Term:1 Log:0/14
2->1 MsgAppResp Term:1 Log:0/15
3->1 MsgAppResp Term:1 Log:0/14
3->1 MsgAppResp Term:1 Log:0/15
ApplyThread->1 MsgStorageApplyResp Term:0 Log:0/0 Entries:[1/13 EntryNormal "prop_2"]
AppendThread->2 MsgStorageAppendResp Term:1 Log:1/15
Expand All @@ -707,12 +687,12 @@ process-ready 1 2 3
CommittedEntries:
1/15 EntryNormal "prop_4"
Messages:
1->2 MsgApp Term:1 Log:1/15 Commit:15
1->3 MsgApp Term:1 Log:1/15 Commit:15
1->AppendThread MsgStorageAppend Term:1 Log:0/0 Commit:15 Vote:1
1->ApplyThread MsgStorageApply Term:0 Log:0/0 Entries:[1/15 EntryNormal "prop_4"] Responses:[
ApplyThread->1 MsgStorageApplyResp Term:0 Log:0/0 Entries:[1/15 EntryNormal "prop_4"]
]
1->2 MsgApp Term:1 Log:1/15 Commit:15
1->3 MsgApp Term:1 Log:1/15 Commit:15
> 2 handling Ready
<empty Ready>
> 3 handling Ready
Expand Down
24 changes: 12 additions & 12 deletions testdata/async_storage_writes_append_aba_race.txt
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,16 @@ Ready MustSync=true:
Entries:
1/12 EntryNormal "init_prop"
Messages:
2->AppendThread MsgStorageAppend Term:0 Log:0/0 Entries:[1/12 EntryNormal "init_prop"] Responses:[
2->2 MsgAppResp Term:1 Log:0/12
AppendThread->2 MsgStorageAppendResp Term:1 Log:1/12
]
2->1 MsgApp Term:1 Log:1/11 Commit:11 Entries:[1/12 EntryNormal "init_prop"]
2->3 MsgApp Term:1 Log:1/11 Commit:11 Entries:[1/12 EntryNormal "init_prop"]
2->4 MsgApp Term:1 Log:1/11 Commit:11 Entries:[1/12 EntryNormal "init_prop"]
2->5 MsgApp Term:1 Log:1/11 Commit:11 Entries:[1/12 EntryNormal "init_prop"]
2->6 MsgApp Term:1 Log:1/11 Commit:11 Entries:[1/12 EntryNormal "init_prop"]
2->7 MsgApp Term:1 Log:1/11 Commit:11 Entries:[1/12 EntryNormal "init_prop"]
2->AppendThread MsgStorageAppend Term:0 Log:0/0 Entries:[1/12 EntryNormal "init_prop"] Responses:[
2->2 MsgAppResp Term:1 Log:0/12
AppendThread->2 MsgStorageAppendResp Term:1 Log:1/12
]

deliver-msgs 1 drop=(3,4,5,6,7)
----
Expand Down Expand Up @@ -191,16 +191,16 @@ Lead:3 State:StateLeader
Entries:
2/12 EntryNormal ""
Messages:
3->AppendThread MsgStorageAppend Term:0 Log:0/0 Entries:[2/12 EntryNormal ""] Responses:[
3->3 MsgAppResp Term:2 Log:0/12
AppendThread->3 MsgStorageAppendResp Term:2 Log:2/12
]
3->1 MsgApp Term:2 Log:1/11 Commit:11 Entries:[2/12 EntryNormal ""]
3->2 MsgApp Term:2 Log:1/11 Commit:11 Entries:[2/12 EntryNormal ""]
3->4 MsgApp Term:2 Log:1/11 Commit:11 Entries:[2/12 EntryNormal ""]
3->5 MsgApp Term:2 Log:1/11 Commit:11 Entries:[2/12 EntryNormal ""]
3->6 MsgApp Term:2 Log:1/11 Commit:11 Entries:[2/12 EntryNormal ""]
3->7 MsgApp Term:2 Log:1/11 Commit:11 Entries:[2/12 EntryNormal ""]
3->AppendThread MsgStorageAppend Term:0 Log:0/0 Entries:[2/12 EntryNormal ""] Responses:[
3->3 MsgAppResp Term:2 Log:0/12
AppendThread->3 MsgStorageAppendResp Term:2 Log:2/12
]

deliver-msgs 1 drop=(2,4,5,6,7)
----
Expand Down Expand Up @@ -348,16 +348,16 @@ Lead:4 State:StateLeader
Entries:
3/12 EntryNormal ""
Messages:
4->AppendThread MsgStorageAppend Term:0 Log:0/0 Entries:[3/12 EntryNormal ""] Responses:[
4->4 MsgAppResp Term:3 Log:0/12
AppendThread->4 MsgStorageAppendResp Term:3 Log:3/12
]
4->1 MsgApp Term:3 Log:1/11 Commit:11 Entries:[3/12 EntryNormal ""]
4->2 MsgApp Term:3 Log:1/11 Commit:11 Entries:[3/12 EntryNormal ""]
4->3 MsgApp Term:3 Log:1/11 Commit:11 Entries:[3/12 EntryNormal ""]
4->5 MsgApp Term:3 Log:1/11 Commit:11 Entries:[3/12 EntryNormal ""]
4->6 MsgApp Term:3 Log:1/11 Commit:11 Entries:[3/12 EntryNormal ""]
4->7 MsgApp Term:3 Log:1/11 Commit:11 Entries:[3/12 EntryNormal ""]
4->AppendThread MsgStorageAppend Term:0 Log:0/0 Entries:[3/12 EntryNormal ""] Responses:[
4->4 MsgAppResp Term:3 Log:0/12
AppendThread->4 MsgStorageAppendResp Term:3 Log:3/12
]

# Step 7: before the new entries reach node 1, it hears of the term change
# through a heartbeat and persists the new term. Node 1 then receives these
Expand Down
4 changes: 2 additions & 2 deletions testdata/confchange_disable_validation.txt
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,6 @@ stabilize
CommittedEntries:
1/6 EntryConfChangeV2
Messages:
1->2 MsgApp Term:1 Log:1/5 Commit:5 Entries:[1/6 EntryConfChangeV2]
1->3 MsgApp Term:1 Log:1/5 Commit:5 Entries:[1/6 EntryConfChangeV2]
1->2 MsgApp Term:1 Log:1/5 Commit:6 Entries:[1/6 EntryConfChangeV2]
1->3 MsgApp Term:1 Log:1/5 Commit:6 Entries:[1/6 EntryConfChangeV2]
INFO 1 switched to configuration voters=(1) learners=(2 3)
2 changes: 1 addition & 1 deletion testdata/confchange_v1_add_single.txt
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ stabilize
2->1 MsgAppResp Term:1 Log:0/3 Rejected (Hint: 0)
DEBUG 1 received MsgAppResp(rejected, hint: (index 0, term 0)) from 2 for index 3
DEBUG 1 decreased progress of 2 to [StateProbe match=0 next=1]
> 1 handling Ready
DEBUG 1 [firstindex: 3, commit: 4] sent snapshot[index: 4, term: 1] to 2 [StateProbe match=0 next=1]
DEBUG 1 paused sending replication messages to 2 [StateSnapshot match=0 next=1 paused pendingSnap=4]
> 1 handling Ready
Ready MustSync=false:
Messages:
1->2 MsgSnap Term:1 Log:0/0
Expand Down
Loading

0 comments on commit 419efa1

Please sign in to comment.