diff --git a/consensus.go b/consensus.go index 6d399d43..73bfd13f 100644 --- a/consensus.go +++ b/consensus.go @@ -272,9 +272,17 @@ func (p *Pbft) runCycle(ctx context.Context) { func (p *Pbft) setSequence(sequence uint64) { p.state.view = &View{ - Round: 0, Sequence: sequence, } + p.setRound(0) +} + +func (p *Pbft) setRound(round uint64) { + p.state.view.Round = round + + // reset current timeout and start a new one + timeout := p.roundTimeout(round) + p.state.timeout = time.NewTimer(timeout) } // runAcceptState runs the Accept state loop @@ -355,13 +363,11 @@ func (p *Pbft) runAcceptState(ctx context.Context) { // start new round // we are NOT a proposer for this height/round. Then, we have to wait // for a pre-prepare message from the proposer - timeout := p.roundTimeout(p.state.view.Round) - // We only need to wait here for one type of message, the Prepare message from the proposer. // However, since we can receive bad Prepare messages we have to wait (or timeout) until // we get the message from the correct proposer. for p.getState() == AcceptState { - msg, ok := p.getNextMessage(span, timeout) + msg, ok := p.getNextMessage(span) if !ok { return } @@ -426,12 +432,10 @@ func (p *Pbft) runValidateState(ctx context.Context) { // start new round } } - timeout := p.roundTimeout(p.state.view.Round) - for p.getState() == ValidateState { _, span := p.tracer.Start(ctx, "ValidateState") - msg, ok := p.getNextMessage(span, timeout) + msg, ok := p.getNextMessage(span) if !ok { // closing span.End() @@ -446,7 +450,7 @@ func (p *Pbft) runValidateState(ctx context.Context) { // start new round // the message must have our local hash if !bytes.Equal(msg.Hash, p.state.proposal.Hash) { - p.logger.Print(fmt.Sprintf("[WARN]: incorrect hash in %s message", msg.Type.String())) + p.logger.Printf(fmt.Sprintf("[WARN]: incorrect hash in %s message", msg.Type.String())) continue } @@ -566,7 +570,7 @@ func (p *Pbft) runRoundChangeState(ctx context.Context) { sendRoundChange := func(round uint64) { p.logger.Printf("[DEBUG] local round change: round=%d", round) // set the new round - p.state.view.Round = round + p.setRound(round) // clean the round p.state.cleanRound(round) // send the round change message @@ -614,12 +618,10 @@ func (p *Pbft) runRoundChangeState(ctx context.Context) { } // create a timer for the round change - timeout := p.roundTimeout(p.state.view.Round) - for p.getState() == RoundChangeState { _, span := p.tracer.Start(ctx, "RoundChangeState") - msg, ok := p.getNextMessage(span, timeout) + msg, ok := p.getNextMessage(span) if !ok { // closing span.End() @@ -627,9 +629,10 @@ func (p *Pbft) runRoundChangeState(ctx context.Context) { } if msg == nil { p.logger.Print("[DEBUG] round change timeout") + + // checkTimeout will either produce a sync event and exit + // or restart the timeout checkTimeout() - // update the timeout duration - timeout = p.roundTimeout(p.state.view.Round) span.End() continue } @@ -645,7 +648,6 @@ func (p *Pbft) runRoundChangeState(ctx context.Context) { // weak certificate, try to catch up if our round number is smaller if p.state.view.Round < msg.View.Round { // update timer - timeout = p.roundTimeout(p.state.view.Round) sendRoundChange(msg.View.Round) } } @@ -757,8 +759,7 @@ func (p *Pbft) GetProposal() *Proposal { } // getNextMessage reads a new message from the message queue -func (p *Pbft) getNextMessage(span trace.Span, timeout time.Duration) (*MessageReq, bool) { - timeoutCh := time.After(timeout) +func (p *Pbft) getNextMessage(span trace.Span) (*MessageReq, bool) { for { msg, discards := p.notifier.ReadNextMessage(p) // send the discard messages @@ -778,7 +779,7 @@ func (p *Pbft) getNextMessage(span trace.Span, timeout time.Duration) (*MessageR // wait until there is a new message or // someone closes the stopCh (i.e. timeout for round change) select { - case <-timeoutCh: + case <-p.state.timeout.C: span.AddEvent("Timeout") p.notifier.HandleTimeout(p.validator.NodeID(), stateToMsg(p.getState()), &View{ Round: p.state.view.Round, diff --git a/state.go b/state.go index b02f768c..1517ee7a 100644 --- a/state.go +++ b/state.go @@ -210,13 +210,20 @@ type currentState struct { // Locked signals whether the proposal is locked locked bool + // timeout tracks the time left for this round + timeout *time.Timer + // Describes whether there has been an error during the computation err error } // newState creates a new state with reset round messages func newState() *currentState { - c := ¤tState{} + c := ¤tState{ + // this is a default value, it will get reset + // at every iteration + timeout: time.NewTimer(0), + } c.resetRoundMsgs() return c