diff --git a/consensus/state.go b/consensus/state.go index d47b43f2413..28372479ae6 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -42,7 +42,10 @@ var ( errPubKeyIsNotSet = errors.New("pubkey is not set. Look for \"Can't get private validator pubkey\" errors") ) -var msgQueueSize = 1000 +var ( + msgQueueSize = 1000 + taskQueueSize = 128 +) // msgs from the reactor which may update the state type msgInfo struct { @@ -160,6 +163,7 @@ func NewState( evpool evidencePool, options ...StateOption, ) *State { + blockExec.SetTaskRunner(spawnTaskRunner(taskQueueSize)) cs := &State{ config: config, blockExec: blockExec, @@ -2585,3 +2589,16 @@ func repairWalFile(src, dst string) error { return nil } + +// spawnTaskRunner spawn a single goroutine to run tasks in FIFO order. +func spawnTaskRunner(buf int) func(func()) { + taskCh := make(chan func(), buf) + go func() { + for f := range taskCh { + f() + } + }() + return func(f func()) { + taskCh <- f + } +} diff --git a/state/execution.go b/state/execution.go index 62b05be3455..5a953eb0101 100644 --- a/state/execution.go +++ b/state/execution.go @@ -43,6 +43,8 @@ type BlockExecutor struct { logger log.Logger metrics *Metrics + + asyncRunner func(func()) } type BlockExecutorOption func(executor *BlockExecutor) @@ -53,6 +55,12 @@ func BlockExecutorWithMetrics(metrics *Metrics) BlockExecutorOption { } } +func BlockExecutorWithAsyncRunner(runner func(func())) BlockExecutorOption { + return func(blockExec *BlockExecutor) { + blockExec.asyncRunner = runner + } +} + // NewBlockExecutor returns a new BlockExecutor with a NopEventBus. // Call SetEventBus to provide one. func NewBlockExecutor( @@ -92,6 +100,10 @@ func (blockExec *BlockExecutor) SetEventBus(eventBus types.BlockEventPublisher) blockExec.eventBus = eventBus } +func (blockExec *BlockExecutor) SetTaskRunner(runner func(func())) { + blockExec.asyncRunner = runner +} + // CreateProposalBlock calls state.MakeBlock with evidence from the evpool // and txs from the mempool. The max bytes must be big enough to fit the commit. // The block space is first allocated to outstanding evidence. @@ -318,7 +330,15 @@ func (blockExec *BlockExecutor) applyBlock(state State, blockID types.BlockID, b if _, ok := blockExec.eventBus.(types.NopEventBus); !ok { // Events are fired after everything else. // NOTE: if we crash between Commit and Save, events wont be fired during replay - go fireEvents(blockExec.logger, blockExec.eventBus, block, blockID, abciResponse, validatorUpdates) + task := func() { + fireEvents(blockExec.logger, blockExec.eventBus, block, blockID, abciResponse, validatorUpdates) + } + + if blockExec.asyncRunner != nil { + blockExec.asyncRunner(task) + } else { + task() + } } return state, nil