Skip to content

Commit

Permalink
Problem: async fireEvents could overlap (#6)
Browse files Browse the repository at this point in the history
* Problem: async fireEvents could overlap

follow up on #5

Solution:
- create a task runner to run async tasks

* don't wait for quit

* cleanup
  • Loading branch information
yihuang authored Nov 1, 2024
1 parent f2b043f commit 8136e47
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 2 deletions.
19 changes: 18 additions & 1 deletion consensus/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@ var (
errPubKeyIsNotSet = errors.New("pubkey is not set. Look for \"Can't get private validator pubkey\" errors")
)

var msgQueueSize = 1000
var (
msgQueueSize = 1000
taskQueueSize = 128
)

// msgs from the reactor which may update the state
type msgInfo struct {
Expand Down Expand Up @@ -160,6 +163,7 @@ func NewState(
evpool evidencePool,
options ...StateOption,
) *State {
blockExec.SetTaskRunner(spawnTaskRunner(taskQueueSize))
cs := &State{
config: config,
blockExec: blockExec,
Expand Down Expand Up @@ -2585,3 +2589,16 @@ func repairWalFile(src, dst string) error {

return nil
}

// spawnTaskRunner spawn a single goroutine to run tasks in FIFO order.
func spawnTaskRunner(buf int) func(func()) {
taskCh := make(chan func(), buf)
go func() {
for f := range taskCh {
f()
}
}()
return func(f func()) {
taskCh <- f
}
}
22 changes: 21 additions & 1 deletion state/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ type BlockExecutor struct {
logger log.Logger

metrics *Metrics

asyncRunner func(func())
}

type BlockExecutorOption func(executor *BlockExecutor)
Expand All @@ -53,6 +55,12 @@ func BlockExecutorWithMetrics(metrics *Metrics) BlockExecutorOption {
}
}

func BlockExecutorWithAsyncRunner(runner func(func())) BlockExecutorOption {
return func(blockExec *BlockExecutor) {
blockExec.asyncRunner = runner
}
}

// NewBlockExecutor returns a new BlockExecutor with a NopEventBus.
// Call SetEventBus to provide one.
func NewBlockExecutor(
Expand Down Expand Up @@ -92,6 +100,10 @@ func (blockExec *BlockExecutor) SetEventBus(eventBus types.BlockEventPublisher)
blockExec.eventBus = eventBus
}

func (blockExec *BlockExecutor) SetTaskRunner(runner func(func())) {
blockExec.asyncRunner = runner
}

// CreateProposalBlock calls state.MakeBlock with evidence from the evpool
// and txs from the mempool. The max bytes must be big enough to fit the commit.
// The block space is first allocated to outstanding evidence.
Expand Down Expand Up @@ -318,7 +330,15 @@ func (blockExec *BlockExecutor) applyBlock(state State, blockID types.BlockID, b
if _, ok := blockExec.eventBus.(types.NopEventBus); !ok {
// Events are fired after everything else.
// NOTE: if we crash between Commit and Save, events wont be fired during replay
go fireEvents(blockExec.logger, blockExec.eventBus, block, blockID, abciResponse, validatorUpdates)
task := func() {
fireEvents(blockExec.logger, blockExec.eventBus, block, blockID, abciResponse, validatorUpdates)
}

if blockExec.asyncRunner != nil {
blockExec.asyncRunner(task)
} else {
task()
}
}

return state, nil
Expand Down

0 comments on commit 8136e47

Please sign in to comment.