Skip to content

Commit

Permalink
Locked iterator implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
Stefan-Ethernal authored and 0xSasaPrsic committed Jun 14, 2022
1 parent 9049a41 commit 938b9be
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 8 deletions.
12 changes: 6 additions & 6 deletions consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,12 +443,7 @@ func (p *Pbft) shouldRelock(preprepareMsg *MessageReq) bool {
}

commitMsgsFound := 0
commitMsgsQueue := p.msgQueue.getQueue(msgToState(MessageReq_Commit))
if commitMsgsQueue == nil {
p.logger.Printf("[ERROR] Failed to resolve message queue for %s message type.", MessageReq_Commit)
return false
}
commitMsgsQueue.Iterator(func(currentMsg *MessageReq) {
err := p.msgQueue.iterator(msgToState(MessageReq_Commit), func(currentMsg *MessageReq) {
// Logical condition below decomposes to the following.
// Count messages that have following properties:
// 1. message is from the current round (and sequence),
Expand All @@ -462,6 +457,11 @@ func (p *Pbft) shouldRelock(preprepareMsg *MessageReq) bool {
commitMsgsFound++
}
})
if err != nil {
p.logger.Printf("[ERROR] Iterator failed. Reason: %v", err)
return false
}

// 2*F Commit messages (+1 commit message will correspond to the current non-proposer node COMMIT message)
return commitMsgsFound >= p.state.NumValid()
}
Expand Down
20 changes: 18 additions & 2 deletions msg_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package pbft

import (
"container/heap"
"fmt"
"sync"
)

Expand Down Expand Up @@ -92,6 +93,20 @@ func (m *msgQueue) getQueue(state PbftState) *msgQueueImpl {
}
}

// iterator fetches corresponding message queue based on pbft state and invokes iteratorLocked method against it.
func (m *msgQueue) iterator(pbftState PbftState, iterateHandler func(*MessageReq)) error {
m.queueLock.Lock()
defer m.queueLock.Unlock()

queueImpl := m.getQueue(pbftState)
if queueImpl == nil {
return fmt.Errorf("failed to resolve message queue for %s message type", MessageReq_Commit)
}

queueImpl.iteratorLocked(iterateHandler)
return nil
}

// newMsgQueue creates a new message queue structure
func newMsgQueue() *msgQueue {
return &msgQueue{
Expand Down Expand Up @@ -177,8 +192,9 @@ func (m *msgQueueImpl) Pop() interface{} {
return item
}

// Iterator with custom handler which contains some arbitrary logic.
func (m *msgQueueImpl) Iterator(iterateHandler func(*MessageReq)) {
// iteratorLocked is iterator with custom handler which contains some arbitrary logic.
// It is assumed that method is invoked within the lock.
func (m *msgQueueImpl) iteratorLocked(iterateHandler func(*MessageReq)) {
for _, msg := range *m {
if iterateHandler != nil {
iterateHandler(msg.Copy())
Expand Down

0 comments on commit 938b9be

Please sign in to comment.