Skip to content

Commit

Permalink
Merge branch 'main' into pos-173-fix-ibft-liveness-issue
Browse files Browse the repository at this point in the history
  • Loading branch information
0xSasaPrsic committed Apr 26, 2022
2 parents 1f13fb7 + 013f679 commit 0a7043e
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 19 deletions.
37 changes: 19 additions & 18 deletions consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,9 +272,17 @@ func (p *Pbft) runCycle(ctx context.Context) {

func (p *Pbft) setSequence(sequence uint64) {
p.state.view = &View{
Round: 0,
Sequence: sequence,
}
p.setRound(0)
}

func (p *Pbft) setRound(round uint64) {
p.state.view.Round = round

// reset current timeout and start a new one
timeout := p.roundTimeout(round)
p.state.timeout = time.NewTimer(timeout)
}

// runAcceptState runs the Accept state loop
Expand Down Expand Up @@ -359,13 +367,11 @@ func (p *Pbft) runAcceptState(ctx context.Context) { // start new round
// we are NOT a proposer for this height/round. Then, we have to wait
// for a pre-prepare message from the proposer

timeout := p.roundTimeout(p.state.view.Round)

// We only need to wait here for one type of message, the Prepare message from the proposer.
// However, since we can receive bad Prepare messages we have to wait (or timeout) until
// we get the message from the correct proposer.
for p.getState() == AcceptState {
msg, ok := p.getNextMessage(span, timeout)
msg, ok := p.getNextMessage(span)
if !ok {
return
}
Expand Down Expand Up @@ -485,12 +491,10 @@ func (p *Pbft) runValidateState(ctx context.Context) { // start new round
}
}

timeout := p.roundTimeout(p.state.view.Round)

for p.getState() == ValidateState {
_, span := p.tracer.Start(ctx, "ValidateState")

msg, ok := p.getNextMessage(span, timeout)
msg, ok := p.getNextMessage(span)
if !ok {
// closing
span.End()
Expand All @@ -505,7 +509,7 @@ func (p *Pbft) runValidateState(ctx context.Context) { // start new round

// the message must have our local hash
if !bytes.Equal(msg.Hash, p.state.proposal.Hash) {
p.logger.Print(fmt.Sprintf("[WARN]: incorrect hash in %s message", msg.Type.String()))
p.logger.Printf(fmt.Sprintf("[WARN]: incorrect hash in %s message", msg.Type.String()))
continue
}

Expand Down Expand Up @@ -628,7 +632,7 @@ func (p *Pbft) runRoundChangeState(ctx context.Context) {
sendRoundChange := func(round uint64) {
p.logger.Printf("[DEBUG] local round change: round=%d", round)
// set the new round
p.state.view.Round = round
p.setRound(round)
// clean the round
p.state.cleanRound(round)
// send the round change message
Expand Down Expand Up @@ -676,22 +680,21 @@ func (p *Pbft) runRoundChangeState(ctx context.Context) {
}

// create a timer for the round change
timeout := p.roundTimeout(p.state.view.Round)

for p.getState() == RoundChangeState {
_, span := p.tracer.Start(ctx, "RoundChangeState")

msg, ok := p.getNextMessage(span, timeout)
msg, ok := p.getNextMessage(span)
if !ok {
// closing
span.End()
return
}
if msg == nil {
p.logger.Print("[DEBUG] round change timeout")

// checkTimeout will either produce a sync event and exit
// or restart the timeout
checkTimeout()
// update the timeout duration
timeout = p.roundTimeout(p.state.view.Round)
span.End()
continue
}
Expand All @@ -707,7 +710,6 @@ func (p *Pbft) runRoundChangeState(ctx context.Context) {
// weak certificate, try to catch up if our round number is smaller
if p.state.view.Round < msg.View.Round {
// update timer
timeout = p.roundTimeout(p.state.view.Round)
sendRoundChange(msg.View.Round)
}
}
Expand Down Expand Up @@ -819,8 +821,7 @@ func (p *Pbft) GetProposal() *Proposal {
}

// getNextMessage reads a new message from the message queue
func (p *Pbft) getNextMessage(span trace.Span, timeout time.Duration) (*MessageReq, bool) {
timeoutCh := time.After(timeout)
func (p *Pbft) getNextMessage(span trace.Span) (*MessageReq, bool) {
for {
msg, discards := p.notifier.ReadNextMessage(p)
// send the discard messages
Expand All @@ -840,7 +841,7 @@ func (p *Pbft) getNextMessage(span trace.Span, timeout time.Duration) (*MessageR
// wait until there is a new message or
// someone closes the stopCh (i.e. timeout for round change)
select {
case <-timeoutCh:
case <-p.state.timeout.C:
span.AddEvent("Timeout")
p.notifier.HandleTimeout(p.validator.NodeID(), stateToMsg(p.getState()), &View{
Round: p.state.view.Round,
Expand Down
9 changes: 8 additions & 1 deletion state.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,13 +211,20 @@ type currentState struct {
// If it is nil, it means that proposal isn't locked.
lockedRound *uint64

// timeout tracks the time left for this round
timeout *time.Timer

// Describes whether there has been an error during the computation
err error
}

// newState creates a new state with reset round messages
func newState() *currentState {
c := &currentState{}
c := &currentState{
// this is a default value, it will get reset
// at every iteration
timeout: time.NewTimer(0),
}
c.resetRoundMsgs()

return c
Expand Down

0 comments on commit 0a7043e

Please sign in to comment.