Skip to content

Commit f669be7

Browse files
authored
Re-broadcast dropped finalize votes (#279)
* Re-broadcast dropped finalize votes This commit makes the Simplex instance be able to discover when it has a round that isn't finalized and remains not finalized. If that happens, Simplex will trigger re-broadcasting of finalize votes. Signed-off-by: Yacov Manevich <[email protected]> * address nits Signed-off-by: Yacov Manevich <[email protected]> --------- Signed-off-by: Yacov Manevich <[email protected]>
1 parent e4f2748 commit f669be7

File tree

6 files changed

+373
-40
lines changed

6 files changed

+373
-40
lines changed

epoch.go

Lines changed: 78 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,11 @@ const (
2828
DefaultMaxRoundWindow = 10
2929
DefaultMaxPendingBlocks = 20
3030

31-
DefaultMaxProposalWaitTime = 5 * time.Second
32-
DefaultReplicationRequestTimeout = 5 * time.Second
33-
DefaultEmptyVoteRebroadcastTimeout = 5 * time.Second
34-
EmptyVoteTimeoutID = "rebroadcast_empty_vote"
31+
DefaultMaxProposalWaitTime = 5 * time.Second
32+
DefaultReplicationRequestTimeout = 5 * time.Second
33+
DefaultEmptyVoteRebroadcastTimeout = 5 * time.Second
34+
DefaultFinalizeVoteRebroadcastTimeout = 6 * time.Second
35+
EmptyVoteTimeoutID = "rebroadcast_empty_vote"
3536
)
3637

3738
type EmptyVoteSet struct {
@@ -59,22 +60,23 @@ func NewRound(block VerifiedBlock) *Round {
5960
}
6061

6162
type EpochConfig struct {
62-
MaxProposalWait time.Duration
63-
MaxRebroadcastWait time.Duration
64-
QCDeserializer QCDeserializer
65-
Logger Logger
66-
ID NodeID
67-
Signer Signer
68-
Verifier SignatureVerifier
69-
BlockDeserializer BlockDeserializer
70-
SignatureAggregator SignatureAggregator
71-
Comm Communication
72-
Storage Storage
73-
WAL WriteAheadLog
74-
BlockBuilder BlockBuilder
75-
Epoch uint64
76-
StartTime time.Time
77-
ReplicationEnabled bool
63+
MaxProposalWait time.Duration
64+
MaxRebroadcastWait time.Duration
65+
FinalizeRebroadcastTimeout time.Duration
66+
QCDeserializer QCDeserializer
67+
Logger Logger
68+
ID NodeID
69+
Signer Signer
70+
Verifier SignatureVerifier
71+
BlockDeserializer BlockDeserializer
72+
SignatureAggregator SignatureAggregator
73+
Comm Communication
74+
Storage Storage
75+
WAL WriteAheadLog
76+
BlockBuilder BlockBuilder
77+
Epoch uint64
78+
StartTime time.Time
79+
ReplicationEnabled bool
7880
}
7981

8082
type Epoch struct {
@@ -94,6 +96,7 @@ type Epoch struct {
9496
quorumSize int
9597
rounds map[uint64]*Round
9698
emptyVotes map[uint64]*EmptyVoteSet
99+
oldestNotFinalizedNotarization NotarizationTime
97100
futureMessages messagesFromNode
98101
round uint64 // The current round we notarize
99102
maxRoundWindow uint64
@@ -119,6 +122,7 @@ func (e *Epoch) AdvanceTime(t time.Time) {
119122
e.monitor.AdvanceTime(t)
120123
e.replicationState.AdvanceTime(t)
121124
e.timeoutHandler.Tick(t)
125+
e.oldestNotFinalizedNotarization.CheckForNotFinalizedNotarizedBlocks(t)
122126
}
123127

124128
// HandleMessage notifies the engine about a reception of a message.
@@ -174,6 +178,8 @@ func (e *Epoch) HandleMessage(msg *Message, from NodeID) error {
174178
}
175179

176180
func (e *Epoch) init() error {
181+
e.maybeAssignDefaultConfig()
182+
e.initOldestNotFinalizedNotarization()
177183
e.oneTimeVerifier = newOneTimeVerifier(e.Logger)
178184
e.sched = NewScheduler(e.Logger)
179185
e.monitor = NewMonitor(e.StartTime, e.Logger)
@@ -211,6 +217,40 @@ func (e *Epoch) init() error {
211217
return e.setMetadataFromStorage()
212218
}
213219

220+
func (e *Epoch) initOldestNotFinalizedNotarization() {
221+
rebroadcastFinalizationVotes := func() {
222+
e.lock.Lock()
223+
defer e.lock.Unlock()
224+
225+
if err := e.rebroadcastPastFinalizeVotes(); err != nil {
226+
e.Logger.Error("Could not rebroadcast past finalization votes", zap.Error(err))
227+
}
228+
}
229+
e.oldestNotFinalizedNotarization = NewNotarizationTime(
230+
e.FinalizeRebroadcastTimeout,
231+
e.haveNotFinalizedNotarizedRound,
232+
rebroadcastFinalizationVotes, e.getRound)
233+
}
234+
235+
func (e *Epoch) getRound() uint64 {
236+
e.lock.Lock()
237+
defer e.lock.Unlock()
238+
239+
return e.round
240+
}
241+
242+
func (e *Epoch) maybeAssignDefaultConfig() {
243+
if e.FinalizeRebroadcastTimeout == 0 {
244+
e.FinalizeRebroadcastTimeout = DefaultFinalizeVoteRebroadcastTimeout
245+
}
246+
if e.MaxProposalWait == 0 {
247+
e.MaxProposalWait = DefaultMaxProposalWaitTime
248+
}
249+
if e.MaxRebroadcastWait == 0 {
250+
e.MaxRebroadcastWait = DefaultEmptyVoteRebroadcastTimeout
251+
}
252+
}
253+
214254
func (e *Epoch) Start() error {
215255
if e.canReceiveMessages.Load() {
216256
return ErrAlreadyStarted
@@ -2688,6 +2728,24 @@ func (e *Epoch) locateQuorumRecord(seq uint64) *VerifiedQuorumRound {
26882728
}
26892729
}
26902730

2731+
func (e *Epoch) haveNotFinalizedNotarizedRound() (uint64, bool) {
2732+
e.lock.Lock()
2733+
defer e.lock.Unlock()
2734+
2735+
var minRoundNum uint64
2736+
var found bool
2737+
for _, round := range e.rounds {
2738+
if !found {
2739+
minRoundNum = round.num
2740+
found = true
2741+
} else if round.num < minRoundNum {
2742+
minRoundNum = round.num
2743+
}
2744+
}
2745+
2746+
return minRoundNum, found
2747+
}
2748+
26912749
func (e *Epoch) handleReplicationResponse(resp *ReplicationResponse, from NodeID) error {
26922750
if !e.ReplicationEnabled {
26932751
return nil

epoch_multinode_test.go

Lines changed: 109 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,120 @@
44
package simplex_test
55

66
import (
7-
"testing"
8-
97
"github.com/ava-labs/simplex"
108
"github.com/ava-labs/simplex/testutil"
119
"github.com/stretchr/testify/require"
10+
"sync"
11+
"sync/atomic"
12+
"testing"
13+
"time"
1214
)
1315

16+
func TestSimplexRebroadcastFinalizationVotes(t *testing.T) {
17+
nodes := []simplex.NodeID{{1}, {2}, {3}, {4}}
18+
net := testutil.NewInMemNetwork(t, nodes)
19+
20+
var allowFinalizeVotes atomic.Bool
21+
22+
var numFinalizeVotesSent atomic.Uint32
23+
24+
config := func(from simplex.NodeID) *testutil.TestNodeConfig {
25+
return &testutil.TestNodeConfig{
26+
Comm: testutil.NewTestComm(from, net, func(msg *simplex.Message, from simplex.NodeID, to simplex.NodeID) bool {
27+
if msg.Finalization != nil && !allowFinalizeVotes.Load() {
28+
return false
29+
}
30+
if allowFinalizeVotes.Load() && msg.FinalizeVote != nil {
31+
numFinalizeVotesSent.Add(1)
32+
}
33+
return allowFinalizeVotes.Load() || msg.FinalizeVote == nil
34+
}),
35+
}
36+
}
37+
38+
testutil.NewSimplexNode(t, nodes[0], net, config(nodes[0]))
39+
testutil.NewSimplexNode(t, nodes[1], net, config(nodes[1]))
40+
testutil.NewSimplexNode(t, nodes[2], net, config(nodes[2]))
41+
testutil.NewSimplexNode(t, nodes[3], net, config(nodes[3]))
42+
43+
net.StartInstances()
44+
45+
lastSeq := uint64(9)
46+
47+
for seq := uint64(0); seq <= lastSeq; seq++ {
48+
for _, n := range net.Instances {
49+
testutil.WaitToEnterRound(t, n.E, seq)
50+
}
51+
net.TriggerLeaderBlockBuilder(seq)
52+
for _, n := range net.Instances {
53+
n.WAL.AssertNotarization(seq)
54+
}
55+
}
56+
57+
var wg sync.WaitGroup
58+
wg.Add(len(net.Instances))
59+
60+
for _, n := range net.Instances {
61+
go func(n *testutil.TestNode) {
62+
defer wg.Done()
63+
n.Storage.EnsureNoBlockCommit(t, 0)
64+
}(n)
65+
}
66+
67+
wg.Wait()
68+
69+
allowFinalizeVotes.Store(true)
70+
71+
require.Eventually(t, func() bool {
72+
var allHaveFinalized bool = true
73+
for _, n := range net.Instances {
74+
if n.Storage.NumBlocks() < lastSeq+1 {
75+
allHaveFinalized = false
76+
break
77+
}
78+
}
79+
80+
if allHaveFinalized {
81+
return true
82+
}
83+
84+
for _, n := range net.Instances {
85+
n.AdvanceTime(simplex.DefaultFinalizeVoteRebroadcastTimeout / 3)
86+
}
87+
88+
return false
89+
90+
}, time.Second*10, time.Millisecond*100)
91+
92+
// Close the recorded messages channel. A message sent to this channel will cause a panic.
93+
finalizeVoteSentCount := numFinalizeVotesSent.Load()
94+
95+
// Advance the time to make sure we do not continue to send finalize votes.
96+
for _, n := range net.Instances {
97+
n.AdvanceTime(simplex.DefaultFinalizeVoteRebroadcastTimeout * 2)
98+
n.AdvanceTime(simplex.DefaultFinalizeVoteRebroadcastTimeout * 2)
99+
n.AdvanceTime(simplex.DefaultFinalizeVoteRebroadcastTimeout * 2)
100+
}
101+
102+
require.Equal(t, finalizeVoteSentCount, numFinalizeVotesSent.Load(), "no more finalize votes should have been sent")
103+
104+
// Next, we run the nodes and notarize 100 blocks, and ensure that less than 400 finalize votes were sent.
105+
previousVoteSentCount := finalizeVoteSentCount // We copy the value just to give it a better name.
106+
for seq := lastSeq + 1; seq < lastSeq+101; seq++ {
107+
for _, n := range net.Instances {
108+
testutil.WaitToEnterRound(t, n.E, seq)
109+
}
110+
net.TriggerLeaderBlockBuilder(seq)
111+
for _, n := range net.Instances {
112+
n.Storage.WaitForBlockCommit(seq)
113+
n.AdvanceTime(simplex.DefaultFinalizeVoteRebroadcastTimeout)
114+
}
115+
}
116+
117+
require.LessOrEqual(t, previousVoteSentCount+400, numFinalizeVotesSent.Load())
118+
119+
}
120+
14121
func TestSimplexMultiNodeSimple(t *testing.T) {
15122
nodes := []simplex.NodeID{{1}, {2}, {3}, {4}}
16123
net := testutil.NewInMemNetwork(t, nodes)

testutil/node.go

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,20 @@ package testutil
55

66
import (
77
"context"
8+
"sync/atomic"
89
"testing"
10+
"time"
911

1012
"github.com/ava-labs/simplex"
1113
"github.com/stretchr/testify/require"
1214
)
1315

1416
type TestNode struct {
15-
WAL *TestWAL
16-
Storage *InMemStorage
17-
E *simplex.Epoch
18-
ingress chan struct {
17+
currentTime atomic.Int64
18+
WAL *TestWAL
19+
Storage *InMemStorage
20+
E *simplex.Epoch
21+
ingress chan struct {
1922
msg *simplex.Message
2023
from simplex.NodeID
2124
}
@@ -48,6 +51,8 @@ func NewSimplexNode(t *testing.T, nodeID simplex.NodeID, net *InMemNetwork, conf
4851
from simplex.NodeID
4952
}, 100)}
5053

54+
ti.currentTime.Store(epochConfig.StartTime.UnixMilli())
55+
5156
net.addNode(ti)
5257
return ti
5358
}
@@ -87,6 +92,12 @@ func (t *TestNode) TriggerBlockShouldBeBuilt() {
8792
}
8893
}
8994

95+
func (t *TestNode) AdvanceTime(duration time.Duration) {
96+
now := time.UnixMilli(t.currentTime.Load()).Add(duration)
97+
t.currentTime.Store(now.UnixMilli())
98+
t.E.AdvanceTime(now)
99+
}
100+
90101
func (t *TestNode) Silence() {
91102
t.l.Silence()
92103
}

testutil/util.go

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,20 +18,21 @@ func DefaultTestNodeEpochConfig(t *testing.T, nodeID simplex.NodeID, comm simple
1818
storage := NewInMemStorage()
1919
wal := NewTestWAL(t)
2020
conf := simplex.EpochConfig{
21-
MaxProposalWait: simplex.DefaultMaxProposalWaitTime,
22-
MaxRebroadcastWait: simplex.DefaultEmptyVoteRebroadcastTimeout,
23-
Comm: comm,
24-
Logger: l,
25-
ID: nodeID,
26-
Signer: &testSigner{},
27-
WAL: wal,
28-
Verifier: &testVerifier{},
29-
Storage: storage,
30-
BlockBuilder: bb,
31-
SignatureAggregator: &TestSignatureAggregator{},
32-
BlockDeserializer: &BlockDeserializer{},
33-
QCDeserializer: &testQCDeserializer{t: t},
34-
StartTime: time.Now(),
21+
MaxProposalWait: simplex.DefaultMaxProposalWaitTime,
22+
MaxRebroadcastWait: simplex.DefaultEmptyVoteRebroadcastTimeout,
23+
FinalizeRebroadcastTimeout: simplex.DefaultFinalizeVoteRebroadcastTimeout,
24+
Comm: comm,
25+
Logger: l,
26+
ID: nodeID,
27+
Signer: &testSigner{},
28+
WAL: wal,
29+
Verifier: &testVerifier{},
30+
Storage: storage,
31+
BlockBuilder: bb,
32+
SignatureAggregator: &TestSignatureAggregator{},
33+
BlockDeserializer: &BlockDeserializer{},
34+
QCDeserializer: &testQCDeserializer{t: t},
35+
StartTime: time.Now(),
3536
}
3637
return conf, wal, storage
3738
}

0 commit comments

Comments
 (0)