From 1f13fb7cbc4707c780d93899293b4b657a15bc3f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stefan=20Negovanovi=C4=87?= Date: Wed, 6 Apr 2022 13:27:24 +0200 Subject: [PATCH] Locked iterator implementation --- consensus.go | 12 ++++++------ msg_queue.go | 20 ++++++++++++++++++-- 2 files changed, 24 insertions(+), 8 deletions(-) diff --git a/consensus.go b/consensus.go index 867a6a0f..d0d9be4e 100644 --- a/consensus.go +++ b/consensus.go @@ -437,12 +437,7 @@ func (p *Pbft) shouldRelock(preprepareMsg *MessageReq) bool { } commitMsgsFound := 0 - commitMsgsQueue := p.msgQueue.getQueue(msgToState(MessageReq_Commit)) - if commitMsgsQueue == nil { - p.logger.Printf("[ERROR] Failed to resolve message queue for %s message type.", MessageReq_Commit) - return false - } - commitMsgsQueue.Iterator(func(currentMsg *MessageReq) { + err := p.msgQueue.iterator(msgToState(MessageReq_Commit), func(currentMsg *MessageReq) { // Logical condition below decomposes to the following. // Count messages that have following properties: // 1. message is from the current round (and sequence), @@ -456,6 +451,11 @@ func (p *Pbft) shouldRelock(preprepareMsg *MessageReq) bool { commitMsgsFound++ } }) + if err != nil { + p.logger.Printf("[ERROR] Iterator failed. Reason: %v", err) + return false + } + // 2*F Commit messages (+1 commit message will correspond to the current non-proposer node COMMIT message) return commitMsgsFound >= p.state.NumValid() } diff --git a/msg_queue.go b/msg_queue.go index 4735e883..8b8d20b5 100644 --- a/msg_queue.go +++ b/msg_queue.go @@ -2,6 +2,7 @@ package pbft import ( "container/heap" + "fmt" "sync" ) @@ -92,6 +93,20 @@ func (m *msgQueue) getQueue(state PbftState) *msgQueueImpl { } } +// iterator fetches corresponding message queue based on pbft state and invokes iteratorLocked method against it. +func (m *msgQueue) iterator(pbftState PbftState, iterateHandler func(*MessageReq)) error { + m.queueLock.Lock() + defer m.queueLock.Unlock() + + queueImpl := m.getQueue(pbftState) + if queueImpl == nil { + return fmt.Errorf("failed to resolve message queue for %s message type", MessageReq_Commit) + } + + queueImpl.iteratorLocked(iterateHandler) + return nil +} + // newMsgQueue creates a new message queue structure func newMsgQueue() *msgQueue { return &msgQueue{ @@ -177,8 +192,9 @@ func (m *msgQueueImpl) Pop() interface{} { return item } -// Iterator with custom handler which contains some arbitrary logic. -func (m *msgQueueImpl) Iterator(iterateHandler func(*MessageReq)) { +// iteratorLocked is iterator with custom handler which contains some arbitrary logic. +// It is assumed that method is invoked within the lock. +func (m *msgQueueImpl) iteratorLocked(iterateHandler func(*MessageReq)) { for _, msg := range *m { if iterateHandler != nil { iterateHandler(msg.Copy())