Skip to content

Commit

Permalink
Merge pull request #3 from yihuang/backport-async-mempool-update
Browse files Browse the repository at this point in the history
perf: Make mempool update async from block.Commit (backport cometbft#3008)
  • Loading branch information
yihuang authored Oct 25, 2024
2 parents 53dcd9b + 89aa199 commit 05dc302
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 8 deletions.
3 changes: 3 additions & 0 deletions .changelog/unreleased/features/3008-mempool-async-update.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
- `[consensus]` Make mempool updates asynchronous from consensus Commit's,
reducing latency for reaching consensus timeouts.
([#3008](https://github.com/cometbft/cometbft/pull/3008))
4 changes: 3 additions & 1 deletion spec/abci/abci++_app_requirements.md
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,9 @@ will be received on the mempool connection during this processing step, providin
update all four
connection states to the latest committed state at the same time.

When `Commit` returns, CometBFT unlocks the mempool.
CometBFT unlocks the mempool after it has finished updating for the new block,
which occurs asynchronously from `Commit`.
See [Mempool Update](../mempool/mempool.md) for more information on what the `update` task does.

WARNING: if the ABCI app logic processing the `Commit` message sends a
`/broadcast_tx_sync` or `/broadcast_tx` and waits for the response
Expand Down
39 changes: 32 additions & 7 deletions state/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,34 +375,38 @@ func (blockExec *BlockExecutor) VerifyVoteExtension(ctx context.Context, vote *t
return nil
}

// Commit locks the mempool, runs the ABCI Commit message, and updates the
// Commit locks the mempool, runs the ABCI Commit message, and asynchronously starts updating the
// mempool.
// It returns the result of calling abci.Commit which is the height to retain (if any)).
// Commit returns the result of calling abci.Commit which is the height to retain (if any)).
// The application is expected to have persisted its state (if any) before returning
// from the ABCI Commit call. This is the only place where the application should
// persist its state.
// The Mempool must be locked during commit and update because state is
// typically reset on Commit and old txs must be replayed against committed
// state before new txs are run in the mempool, lest they be invalid.
// The mempool is unlocked when the Update routine completes, which is
// asynchronous from Commit.
func (blockExec *BlockExecutor) Commit(
state State,
block *types.Block,
abciResponse *abci.ResponseFinalizeBlock,
) (int64, error) {
blockExec.mempool.Lock()
defer blockExec.mempool.Unlock()
unlockMempool := func() { blockExec.mempool.Unlock() }

// while mempool is Locked, flush to ensure all async requests have completed
// in the ABCI app before Commit.
err := blockExec.mempool.FlushAppConn()
if err != nil {
blockExec.logger.Error("client error during mempool.FlushAppConn", "err", err)
unlockMempool()
blockExec.logger.Error("client error during mempool.FlushAppConn, flushing mempool", "err", err)
return 0, err
}

// Commit block, get hash back
res, err := blockExec.proxyApp.Commit(context.TODO())
if err != nil {
unlockMempool()
blockExec.logger.Error("client error during proxyAppConn.CommitSync", "err", err)
return 0, err
}
Expand All @@ -415,15 +419,36 @@ func (blockExec *BlockExecutor) Commit(
)

// Update mempool.
err = blockExec.mempool.Update(
go blockExec.asyncUpdateMempool(unlockMempool, block, state.Copy(), abciResponse)

return res.RetainHeight, nil
}

// updates the mempool with the latest state asynchronously.
func (blockExec *BlockExecutor) asyncUpdateMempool(
unlockMempool func(),
block *types.Block,
state State,
abciResponse *abci.ResponseFinalizeBlock,
) {
defer unlockMempool()

err := blockExec.mempool.Update(
block.Height,
block.Txs,
abciResponse.TxResults,
TxPreCheck(state),
TxPostCheck(state),
)

return res.RetainHeight, err
if err != nil {
// We panic in this case, out of legacy behavior. Before we made the mempool
// update complete asynchronously from Commit, we would panic if the mempool
// update failed. This is because we panic on any error within commit.
// We should consider changing this behavior in the future, as there is no
// need to panic if the mempool update failed. The most severe thing we
// would need to do is dump the mempool and restart it.
panic(fmt.Sprintf("client error during mempool.Update; error %v", err))
}
}

//---------------------------------------------------------
Expand Down

0 comments on commit 05dc302

Please sign in to comment.