Skip to content

Commit

Permalink
Use timer only at the round level (#50)
Browse files Browse the repository at this point in the history
* Use timer only at the round level

* Create a new ticker instead of reset since Reset function does not behave as expected if timer is not expired.

Co-authored-by: Sasa Prsic <[email protected]>
  • Loading branch information
ferranbt and 0xSasaPrsic committed Apr 26, 2022
1 parent 27b452b commit 013f679
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 19 deletions.
37 changes: 19 additions & 18 deletions consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,9 +272,17 @@ func (p *Pbft) runCycle(ctx context.Context) {

func (p *Pbft) setSequence(sequence uint64) {
p.state.view = &View{
Round: 0,
Sequence: sequence,
}
p.setRound(0)
}

func (p *Pbft) setRound(round uint64) {
p.state.view.Round = round

// reset current timeout and start a new one
timeout := p.roundTimeout(round)
p.state.timeout = time.NewTimer(timeout)
}

// runAcceptState runs the Accept state loop
Expand Down Expand Up @@ -355,13 +363,11 @@ func (p *Pbft) runAcceptState(ctx context.Context) { // start new round
// we are NOT a proposer for this height/round. Then, we have to wait
// for a pre-prepare message from the proposer

timeout := p.roundTimeout(p.state.view.Round)

// We only need to wait here for one type of message, the Prepare message from the proposer.
// However, since we can receive bad Prepare messages we have to wait (or timeout) until
// we get the message from the correct proposer.
for p.getState() == AcceptState {
msg, ok := p.getNextMessage(span, timeout)
msg, ok := p.getNextMessage(span)
if !ok {
return
}
Expand Down Expand Up @@ -426,12 +432,10 @@ func (p *Pbft) runValidateState(ctx context.Context) { // start new round
}
}

timeout := p.roundTimeout(p.state.view.Round)

for p.getState() == ValidateState {
_, span := p.tracer.Start(ctx, "ValidateState")

msg, ok := p.getNextMessage(span, timeout)
msg, ok := p.getNextMessage(span)
if !ok {
// closing
span.End()
Expand All @@ -446,7 +450,7 @@ func (p *Pbft) runValidateState(ctx context.Context) { // start new round

// the message must have our local hash
if !bytes.Equal(msg.Hash, p.state.proposal.Hash) {
p.logger.Print(fmt.Sprintf("[WARN]: incorrect hash in %s message", msg.Type.String()))
p.logger.Printf(fmt.Sprintf("[WARN]: incorrect hash in %s message", msg.Type.String()))
continue
}

Expand Down Expand Up @@ -566,7 +570,7 @@ func (p *Pbft) runRoundChangeState(ctx context.Context) {
sendRoundChange := func(round uint64) {
p.logger.Printf("[DEBUG] local round change: round=%d", round)
// set the new round
p.state.view.Round = round
p.setRound(round)
// clean the round
p.state.cleanRound(round)
// send the round change message
Expand Down Expand Up @@ -614,22 +618,21 @@ func (p *Pbft) runRoundChangeState(ctx context.Context) {
}

// create a timer for the round change
timeout := p.roundTimeout(p.state.view.Round)

for p.getState() == RoundChangeState {
_, span := p.tracer.Start(ctx, "RoundChangeState")

msg, ok := p.getNextMessage(span, timeout)
msg, ok := p.getNextMessage(span)
if !ok {
// closing
span.End()
return
}
if msg == nil {
p.logger.Print("[DEBUG] round change timeout")

// checkTimeout will either produce a sync event and exit
// or restart the timeout
checkTimeout()
// update the timeout duration
timeout = p.roundTimeout(p.state.view.Round)
span.End()
continue
}
Expand All @@ -645,7 +648,6 @@ func (p *Pbft) runRoundChangeState(ctx context.Context) {
// weak certificate, try to catch up if our round number is smaller
if p.state.view.Round < msg.View.Round {
// update timer
timeout = p.roundTimeout(p.state.view.Round)
sendRoundChange(msg.View.Round)
}
}
Expand Down Expand Up @@ -757,8 +759,7 @@ func (p *Pbft) GetProposal() *Proposal {
}

// getNextMessage reads a new message from the message queue
func (p *Pbft) getNextMessage(span trace.Span, timeout time.Duration) (*MessageReq, bool) {
timeoutCh := time.After(timeout)
func (p *Pbft) getNextMessage(span trace.Span) (*MessageReq, bool) {
for {
msg, discards := p.notifier.ReadNextMessage(p)
// send the discard messages
Expand All @@ -778,7 +779,7 @@ func (p *Pbft) getNextMessage(span trace.Span, timeout time.Duration) (*MessageR
// wait until there is a new message or
// someone closes the stopCh (i.e. timeout for round change)
select {
case <-timeoutCh:
case <-p.state.timeout.C:
span.AddEvent("Timeout")
p.notifier.HandleTimeout(p.validator.NodeID(), stateToMsg(p.getState()), &View{
Round: p.state.view.Round,
Expand Down
9 changes: 8 additions & 1 deletion state.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,13 +210,20 @@ type currentState struct {
// Locked signals whether the proposal is locked
locked bool

// timeout tracks the time left for this round
timeout *time.Timer

// Describes whether there has been an error during the computation
err error
}

// newState creates a new state with reset round messages
func newState() *currentState {
c := &currentState{}
c := &currentState{
// this is a default value, it will get reset
// at every iteration
timeout: time.NewTimer(0),
}
c.resetRoundMsgs()

return c
Expand Down

0 comments on commit 013f679

Please sign in to comment.