Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

clean vbft event timer #1446

Merged
merged 6 commits into from
Jul 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 30 additions & 127 deletions consensus/vbft/event_timer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
package vbft

import (
"container/heap"
"fmt"
"math/rand"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -79,17 +77,14 @@ type EventTimer struct {

// peer heartbeat tickers
peerTickers map[uint32]*time.Timer
// other timers
normalTimers map[uint32]*time.Timer
}

func NewEventTimer(server *Server) *EventTimer {
timer := &EventTimer{
server: server,
C: make(chan *TimerEvent, 64),
eventTimers: make(map[TimerEventType]perBlockTimer),
peerTickers: make(map[uint32]*time.Timer),
normalTimers: make(map[uint32]*time.Timer),
server: server,
C: make(chan *TimerEvent, 64),
eventTimers: make(map[TimerEventType]perBlockTimer),
peerTickers: make(map[uint32]*time.Timer),
}

for i := 0; i < int(EventMax); i++ {
Expand All @@ -114,42 +109,6 @@ func (self *EventTimer) stop() {
stopAllTimers(self.eventTimers[TimerEventType(i)])
self.eventTimers[TimerEventType(i)] = make(map[uint32]*time.Timer)
}

// clear normal timers
stopAllTimers(self.normalTimers)
self.normalTimers = make(map[uint32]*time.Timer)
}

func (self *EventTimer) StartTimer(Idx uint32, timeout time.Duration) {
self.lock.Lock()
defer self.lock.Unlock()

if t, present := self.normalTimers[Idx]; present {
t.Stop()
log.Infof("timer for %d got reset", Idx)
}

self.normalTimers[Idx] = time.AfterFunc(timeout, func() {
// remove timer from map
self.lock.Lock()
defer self.lock.Unlock()
delete(self.normalTimers, Idx)

self.C <- &TimerEvent{
evtType: EventMax,
blockNum: Idx,
}
})
}

func (self *EventTimer) CancelTimer(idx uint32) {
self.lock.Lock()
defer self.lock.Unlock()

if t, present := self.normalTimers[idx]; present {
t.Stop()
delete(self.normalTimers, idx)
}
}

func (self *EventTimer) getEventTimeout(evtType TimerEventType) time.Duration {
Expand Down Expand Up @@ -180,15 +139,13 @@ func (self *EventTimer) getEventTimeout(evtType TimerEventType) time.Duration {
return time.Duration(txPooltimeout)
case EventTxBlockTimeout:
return time.Duration(atomic.LoadInt64(&zeroTxBlockTimeout))
default:
panic("unknown timer event type")
}

return 0
}

//
// internal helper, should call with lock held
//
func (self *EventTimer) startEventTimer(evtType TimerEventType, blockNum uint32) error {
func (self *EventTimer) startEventTimer(evtType TimerEventType, blockNum uint32) {
timers := self.eventTimers[evtType]
if t, present := timers[blockNum]; present {
t.Stop()
Expand All @@ -198,21 +155,18 @@ func (self *EventTimer) startEventTimer(evtType TimerEventType, blockNum uint32)

timeout := self.getEventTimeout(evtType)
if timeout == 0 {
log.Errorf("invalid timeout for event %d, blkNum %d", evtType, blockNum)
return fmt.Errorf("invalid timeout for event %d, blkNum %d", evtType, blockNum)
// never happen when config correctly
timeout = time.Second
}
timers[blockNum] = time.AfterFunc(timeout, func() {
self.C <- &TimerEvent{
evtType: evtType,
blockNum: blockNum,
}
})
return nil
}

//
// internal helper, should call with lock held
//
func (self *EventTimer) cancelEventTimer(evtType TimerEventType, blockNum uint32) {
timers := self.eventTimers[evtType]

Expand All @@ -222,12 +176,12 @@ func (self *EventTimer) cancelEventTimer(evtType TimerEventType, blockNum uint32
}
}

func (self *EventTimer) StartProposalTimer(blockNum uint32) error {
func (self *EventTimer) StartProposalTimer(blockNum uint32) {
self.lock.Lock()
defer self.lock.Unlock()

log.Infof("server %d started proposal timer for blk %d", self.server.Index, blockNum)
return self.startEventTimer(EventProposeBlockTimeout, blockNum)
self.startEventTimer(EventProposeBlockTimeout, blockNum)
}

func (self *EventTimer) CancelProposalTimer(blockNum uint32) {
Expand All @@ -237,12 +191,12 @@ func (self *EventTimer) CancelProposalTimer(blockNum uint32) {
self.cancelEventTimer(EventProposeBlockTimeout, blockNum)
}

func (self *EventTimer) StartEndorsingTimer(blockNum uint32) error {
func (self *EventTimer) StartEndorsingTimer(blockNum uint32) {
self.lock.Lock()
defer self.lock.Unlock()

log.Infof("server %d started endorsing timer for blk %d", self.server.Index, blockNum)
return self.startEventTimer(EventEndorseBlockTimeout, blockNum)
self.startEventTimer(EventEndorseBlockTimeout, blockNum)
}

func (self *EventTimer) CancelEndorseMsgTimer(blockNum uint32) {
Expand All @@ -252,12 +206,12 @@ func (self *EventTimer) CancelEndorseMsgTimer(blockNum uint32) {
self.cancelEventTimer(EventEndorseBlockTimeout, blockNum)
}

func (self *EventTimer) StartEndorseEmptyBlockTimer(blockNum uint32) error {
func (self *EventTimer) StartEndorseEmptyBlockTimer(blockNum uint32) {
self.lock.Lock()
defer self.lock.Unlock()

log.Infof("server %d started empty endorsing timer for blk %d", self.server.Index, blockNum)
return self.startEventTimer(EventEndorseEmptyBlockTimeout, blockNum)
self.startEventTimer(EventEndorseEmptyBlockTimeout, blockNum)
}

func (self *EventTimer) CancelEndorseEmptyBlockTimer(blockNum uint32) {
Expand All @@ -267,12 +221,12 @@ func (self *EventTimer) CancelEndorseEmptyBlockTimer(blockNum uint32) {
self.cancelEventTimer(EventEndorseEmptyBlockTimeout, blockNum)
}

func (self *EventTimer) StartCommitTimer(blockNum uint32) error {
func (self *EventTimer) StartCommitTimer(blockNum uint32) {
self.lock.Lock()
defer self.lock.Unlock()

log.Infof("server %d started commit timer for blk %d", self.server.Index, blockNum)
return self.startEventTimer(EventCommitBlockTimeout, blockNum)
self.startEventTimer(EventCommitBlockTimeout, blockNum)
}

func (self *EventTimer) CancelCommitMsgTimer(blockNum uint32) {
Expand All @@ -282,11 +236,11 @@ func (self *EventTimer) CancelCommitMsgTimer(blockNum uint32) {
self.cancelEventTimer(EventCommitBlockTimeout, blockNum)
}

func (self *EventTimer) StartProposalBackoffTimer(blockNum uint32) error {
func (self *EventTimer) StartProposalBackoffTimer(blockNum uint32) {
self.lock.Lock()
defer self.lock.Unlock()

return self.startEventTimer(EventProposalBackoff, blockNum)
self.startEventTimer(EventProposalBackoff, blockNum)
}

func (self *EventTimer) CancelProposalBackoffTimer(blockNum uint32) {
Expand All @@ -296,11 +250,11 @@ func (self *EventTimer) CancelProposalBackoffTimer(blockNum uint32) {
self.cancelEventTimer(EventProposalBackoff, blockNum)
}

func (self *EventTimer) StartBackoffTimer(blockNum uint32) error {
func (self *EventTimer) StartBackoffTimer(blockNum uint32) {
self.lock.Lock()
defer self.lock.Unlock()

return self.startEventTimer(EventRandomBackoff, blockNum)
self.startEventTimer(EventRandomBackoff, blockNum)
}

func (self *EventTimer) CancelBackoffTimer(blockNum uint32) {
Expand All @@ -310,11 +264,11 @@ func (self *EventTimer) CancelBackoffTimer(blockNum uint32) {
self.cancelEventTimer(EventRandomBackoff, blockNum)
}

func (self *EventTimer) Start2ndProposalTimer(blockNum uint32) error {
func (self *EventTimer) Start2ndProposalTimer(blockNum uint32) {
self.lock.Lock()
defer self.lock.Unlock()

return self.startEventTimer(EventPropose2ndBlockTimeout, blockNum)
self.startEventTimer(EventPropose2ndBlockTimeout, blockNum)
}

func (self *EventTimer) Cancel2ndProposalTimer(blockNum uint32) {
Expand All @@ -334,11 +288,11 @@ func (self *EventTimer) onBlockSealed(blockNum uint32) {
}
}

func (self *EventTimer) StartTxBlockTimeout(blockNum uint32) error {
func (self *EventTimer) StartTxBlockTimeout(blockNum uint32) {
self.lock.Lock()
defer self.lock.Unlock()

return self.startEventTimer(EventTxBlockTimeout, blockNum)
self.startEventTimer(EventTxBlockTimeout, blockNum)
}

func (self *EventTimer) CancelTxBlockTimeout(blockNum uint32) {
Expand All @@ -348,7 +302,7 @@ func (self *EventTimer) CancelTxBlockTimeout(blockNum uint32) {
self.cancelEventTimer(EventTxBlockTimeout, blockNum)
}

func (self *EventTimer) startPeerTicker(peerIdx uint32) error {
func (self *EventTimer) startPeerTicker(peerIdx uint32) {
self.lock.Lock()
defer self.lock.Unlock()

Expand All @@ -365,79 +319,28 @@ func (self *EventTimer) startPeerTicker(peerIdx uint32) error {
}
self.peerTickers[peerIdx].Reset(timeout)
})

return nil
}

func (self *EventTimer) stopPeerTicker(peerIdx uint32) error {
func (self *EventTimer) stopPeerTicker(peerIdx uint32) {
self.lock.Lock()
defer self.lock.Unlock()

if p, present := self.peerTickers[peerIdx]; present {
p.Stop()
delete(self.peerTickers, peerIdx)
}
return nil
}

func (self *EventTimer) startTxTicker(blockNum uint32) error {
func (self *EventTimer) startTxPoolTicker(blockNum uint32) {
self.lock.Lock()
defer self.lock.Unlock()

return self.startEventTimer(EventTxPool, blockNum)
self.startEventTimer(EventTxPool, blockNum)
}

func (self *EventTimer) stopTxTicker(blockNum uint32) {
func (self *EventTimer) stopTxPoolTicker(blockNum uint32) {
self.lock.Lock()
defer self.lock.Unlock()

self.cancelEventTimer(EventTxPool, blockNum)
}

///////////////////////////////////////////////////////////
//
// timer queue
//
///////////////////////////////////////////////////////////

type TimerItem struct {
due time.Time
evt *TimerEvent
index int
}

type TimerQueue []*TimerItem

func (tq TimerQueue) Len() int {
return len(tq)
}

func (tq TimerQueue) Less(i, j int) bool {
return tq[j].due.After(tq[i].due)
}

func (tq TimerQueue) Swap(i, j int) {
tq[i], tq[j] = tq[j], tq[i]
tq[i].index = i
tq[j].index = j
}

func (tq *TimerQueue) Push(x interface{}) {
item := x.(*TimerItem)
item.index = len(*tq)
*tq = append(*tq, item)
}

func (tq *TimerQueue) Pop() interface{} {
old := *tq
n := len(old)
item := old[n-1]
item.index = -1
*tq = old[0 : n-1]
return item
}

func (tq *TimerQueue) update(item *TimerItem, due time.Time) {
item.due = due
heap.Fix(tq, item.index)
}
16 changes: 2 additions & 14 deletions consensus/vbft/event_timer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,25 +25,13 @@ func constructEventTimer() *EventTimer {
return NewEventTimer(server)
}

func TestStartTimer(t *testing.T) {
eventtimer := constructEventTimer()
eventtimer.StartTimer(1, 10)
}

func TestCancelTimer(t *testing.T) {
eventtimer := constructEventTimer()
eventtimer.StartTimer(1, 10)
eventtimer.CancelTimer(1)
}
func TestStartEventTimer(t *testing.T) {
eventtimer := constructEventTimer()
err := eventtimer.startEventTimer(EventProposeBlockTimeout, 1)
t.Logf("TestStartEventTimer: %v", err)
eventtimer.startEventTimer(EventProposeBlockTimeout, 1)
}

func TestCancelEventTimer(t *testing.T) {
eventtimer := constructEventTimer()
err := eventtimer.startEventTimer(EventProposeBlockTimeout, 1)
t.Logf("startEventTimer: %v", err)
eventtimer.startEventTimer(EventProposeBlockTimeout, 1)
eventtimer.cancelEventTimer(EventProposeBlockTimeout, 1)
}
Loading
Loading