diff --git a/consensus.go b/consensus.go index d0d9be4e..52465ca4 100644 --- a/consensus.go +++ b/consensus.go @@ -272,9 +272,17 @@ func (p *Pbft) runCycle(ctx context.Context) { func (p *Pbft) setSequence(sequence uint64) { p.state.view = &View{ - Round: 0, Sequence: sequence, } + p.setRound(0) +} + +func (p *Pbft) setRound(round uint64) { + p.state.view.Round = round + + // reset current timeout and start a new one + timeout := p.roundTimeout(round) + p.state.timeout = time.NewTimer(timeout) } // runAcceptState runs the Accept state loop @@ -359,13 +367,11 @@ func (p *Pbft) runAcceptState(ctx context.Context) { // start new round // we are NOT a proposer for this height/round. Then, we have to wait // for a pre-prepare message from the proposer - timeout := p.roundTimeout(p.state.view.Round) - // We only need to wait here for one type of message, the Prepare message from the proposer. // However, since we can receive bad Prepare messages we have to wait (or timeout) until // we get the message from the correct proposer. for p.getState() == AcceptState { - msg, ok := p.getNextMessage(span, timeout) + msg, ok := p.getNextMessage(span) if !ok { return } @@ -485,12 +491,10 @@ func (p *Pbft) runValidateState(ctx context.Context) { // start new round } } - timeout := p.roundTimeout(p.state.view.Round) - for p.getState() == ValidateState { _, span := p.tracer.Start(ctx, "ValidateState") - msg, ok := p.getNextMessage(span, timeout) + msg, ok := p.getNextMessage(span) if !ok { // closing span.End() @@ -505,7 +509,7 @@ func (p *Pbft) runValidateState(ctx context.Context) { // start new round // the message must have our local hash if !bytes.Equal(msg.Hash, p.state.proposal.Hash) { - p.logger.Print(fmt.Sprintf("[WARN]: incorrect hash in %s message", msg.Type.String())) + p.logger.Printf(fmt.Sprintf("[WARN]: incorrect hash in %s message", msg.Type.String())) continue } @@ -628,7 +632,7 @@ func (p *Pbft) runRoundChangeState(ctx context.Context) { sendRoundChange := func(round uint64) { p.logger.Printf("[DEBUG] local round change: round=%d", round) // set the new round - p.state.view.Round = round + p.setRound(round) // clean the round p.state.cleanRound(round) // send the round change message @@ -676,12 +680,10 @@ func (p *Pbft) runRoundChangeState(ctx context.Context) { } // create a timer for the round change - timeout := p.roundTimeout(p.state.view.Round) - for p.getState() == RoundChangeState { _, span := p.tracer.Start(ctx, "RoundChangeState") - msg, ok := p.getNextMessage(span, timeout) + msg, ok := p.getNextMessage(span) if !ok { // closing span.End() @@ -689,9 +691,10 @@ func (p *Pbft) runRoundChangeState(ctx context.Context) { } if msg == nil { p.logger.Print("[DEBUG] round change timeout") + + // checkTimeout will either produce a sync event and exit + // or restart the timeout checkTimeout() - // update the timeout duration - timeout = p.roundTimeout(p.state.view.Round) span.End() continue } @@ -707,7 +710,6 @@ func (p *Pbft) runRoundChangeState(ctx context.Context) { // weak certificate, try to catch up if our round number is smaller if p.state.view.Round < msg.View.Round { // update timer - timeout = p.roundTimeout(p.state.view.Round) sendRoundChange(msg.View.Round) } } @@ -819,8 +821,7 @@ func (p *Pbft) GetProposal() *Proposal { } // getNextMessage reads a new message from the message queue -func (p *Pbft) getNextMessage(span trace.Span, timeout time.Duration) (*MessageReq, bool) { - timeoutCh := time.After(timeout) +func (p *Pbft) getNextMessage(span trace.Span) (*MessageReq, bool) { for { msg, discards := p.notifier.ReadNextMessage(p) // send the discard messages @@ -840,7 +841,7 @@ func (p *Pbft) getNextMessage(span trace.Span, timeout time.Duration) (*MessageR // wait until there is a new message or // someone closes the stopCh (i.e. timeout for round change) select { - case <-timeoutCh: + case <-p.state.timeout.C: span.AddEvent("Timeout") p.notifier.HandleTimeout(p.validator.NodeID(), stateToMsg(p.getState()), &View{ Round: p.state.view.Round, diff --git a/state.go b/state.go index f2ebc239..dc623828 100644 --- a/state.go +++ b/state.go @@ -211,13 +211,20 @@ type currentState struct { // If it is nil, it means that proposal isn't locked. lockedRound *uint64 + // timeout tracks the time left for this round + timeout *time.Timer + // Describes whether there has been an error during the computation err error } // newState creates a new state with reset round messages func newState() *currentState { - c := ¤tState{} + c := ¤tState{ + // this is a default value, it will get reset + // at every iteration + timeout: time.NewTimer(0), + } c.resetRoundMsgs() return c