Skip to content

Commit

Permalink
Txpool opt async priced (bnb-chain#246)
Browse files Browse the repository at this point in the history
Co-authored-by: andyzhang2023 <[email protected]>
  • Loading branch information
andyzhang2023 and andyzhang2023 authored Jan 9, 2025
1 parent cb54455 commit 3474c47
Show file tree
Hide file tree
Showing 7 changed files with 553 additions and 23 deletions.
1 change: 1 addition & 0 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ var (
utils.TxPoolRejournalFlag,
utils.TxPoolPriceLimitFlag,
utils.TxPoolPriceBumpFlag,
utils.TxPoolEnableAsyncPricedFlag,
utils.TxPoolAccountSlotsFlag,
utils.TxPoolGlobalSlotsFlag,
utils.TxPoolAccountQueueFlag,
Expand Down
9 changes: 9 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,12 @@ var (
Value: ethconfig.Defaults.TxPool.PriceBump,
Category: flags.TxPoolCategory,
}
TxPoolEnableAsyncPricedFlag = &cli.BoolFlag{
Name: "txpool.asyncpriced",
Usage: "enable async-priced-sorted list for txpool",
Value: false,
Category: flags.TxPoolCategory,
}
TxPoolAccountSlotsFlag = &cli.Uint64Flag{
Name: "txpool.accountslots",
Usage: "Minimum number of executable transaction slots guaranteed per account",
Expand Down Expand Up @@ -1723,6 +1729,9 @@ func setTxPool(ctx *cli.Context, cfg *legacypool.Config) {
if ctx.IsSet(TxPoolPriceBumpFlag.Name) {
cfg.PriceBump = ctx.Uint64(TxPoolPriceBumpFlag.Name)
}
if ctx.IsSet(TxPoolEnableAsyncPricedFlag.Name) {
cfg.EnableAsyncPriced = ctx.Bool(TxPoolEnableAsyncPricedFlag.Name)
}
if ctx.IsSet(TxPoolAccountSlotsFlag.Name) {
cfg.AccountSlots = ctx.Uint64(TxPoolAccountSlotsFlag.Name)
}
Expand Down
192 changes: 192 additions & 0 deletions core/txpool/legacypool/async_priced_list.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
package legacypool

import (
"math/big"
"sync"
"sync/atomic"

"github.com/ethereum/go-ethereum/core/types"
)

var _ pricedListInterface = &asyncPricedList{}

type addEvent struct {
tx *types.Transaction
local bool
}

type asyncPricedList struct {
priced *pricedList
floatingLowest atomic.Value
urgentLowest atomic.Value
baseFee atomic.Value
mu sync.Mutex

// events
quit chan struct{}
reheap chan struct{}
add chan *addEvent
remove chan int
setBaseFee chan *big.Int
}

func newAsyncPricedList(all *lookup) *asyncPricedList {
a := &asyncPricedList{
priced: newPricedList(all),
quit: make(chan struct{}),
reheap: make(chan struct{}),
add: make(chan *addEvent),
remove: make(chan int),
setBaseFee: make(chan *big.Int),
}
go a.run()
return a
}

// run is a loop that handles async operations:
// - reheap: reheap the whole priced list, to get the lowest gas price
// - put: add a transaction to the priced list
// - remove: remove transactions from the priced list
// - discard: remove transactions to make room for new ones
func (a *asyncPricedList) run() {
var reheap bool
var newOnes []*types.Transaction
var toRemove int = 0
// current loop state
var currentDone chan struct{} = nil
var baseFee *big.Int = nil
for {
if currentDone == nil {
currentDone = make(chan struct{})
go a.handle(reheap, newOnes, toRemove, baseFee, currentDone)
reheap, newOnes, toRemove, baseFee = false, nil, 0, nil
}
select {
case <-a.reheap:
reheap = true

case add := <-a.add:
newOnes = append(newOnes, add.tx)

case remove := <-a.remove:
toRemove += remove

case baseFee = <-a.setBaseFee:

case <-currentDone:
currentDone = nil

case <-a.quit:
// Wait for current run to finish.
if currentDone != nil {
<-currentDone
}
return
}
}
}

func (a *asyncPricedList) handle(reheap bool, newOnes []*types.Transaction, toRemove int, baseFee *big.Int, finished chan struct{}) {
defer close(finished)
a.mu.Lock()
defer a.mu.Unlock()
// add new transactions to the priced list
for _, tx := range newOnes {
a.priced.Put(tx, false)
}
// remove staled transactions from the priced list
a.priced.Removed(toRemove)
// reheap if needed
if reheap {
a.priced.Reheap()
// set the lowest priced transaction when reheap is done
var emptyTx *types.Transaction = nil
if len(a.priced.floating.list) > 0 {
a.floatingLowest.Store(a.priced.floating.list[0])
} else {
a.floatingLowest.Store(emptyTx)
}
if len(a.priced.urgent.list) > 0 {
a.urgentLowest.Store(a.priced.urgent.list[0])
} else {
a.urgentLowest.Store(emptyTx)
}
}
if baseFee != nil {
a.baseFee.Store(baseFee)
a.priced.SetBaseFee(baseFee)
}
}

func (a *asyncPricedList) Staled() int {
// the Staled() of pricedList is thread-safe, so we don't need to lock here
return a.priced.Staled()
}

func (a *asyncPricedList) Put(tx *types.Transaction, local bool) {
a.add <- &addEvent{tx, local}
}

func (a *asyncPricedList) Removed(count int) {
a.remove <- count
}

func (a *asyncPricedList) Underpriced(tx *types.Transaction) bool {
var urgentLowest, floatingLowest *types.Transaction = nil, nil
ul, fl := a.urgentLowest.Load(), a.floatingLowest.Load()
if ul != nil {
// be careful that ul might be nil
urgentLowest = ul.(*types.Transaction)
}
if fl != nil {
// be careful that fl might be nil
floatingLowest = fl.(*types.Transaction)
}
a.mu.Lock()
defer a.mu.Unlock()
return (urgentLowest == nil || a.priced.urgent.cmp(urgentLowest, tx) >= 0) &&
(floatingLowest == nil || a.priced.floating.cmp(floatingLowest, tx) >= 0) &&
(floatingLowest != nil || urgentLowest != nil)
}

// Disacard cleans staled transactions to make room for new ones
func (a *asyncPricedList) Discard(slots int, force bool) (types.Transactions, bool) {
a.mu.Lock()
defer a.mu.Unlock()
return a.priced.Discard(slots, force)
}

func (a *asyncPricedList) NeedReheap(currHead *types.Header) bool {
return false
}

func (a *asyncPricedList) Reheap() {
a.reheap <- struct{}{}
}

func (a *asyncPricedList) SetBaseFee(baseFee *big.Int) {
a.setBaseFee <- baseFee
a.reheap <- struct{}{}
}

func (a *asyncPricedList) SetHead(currHead *types.Header) {
//do nothing
}

func (a *asyncPricedList) GetBaseFee() *big.Int {
baseFee := a.baseFee.Load()
if baseFee == nil {
return big.NewInt(0)
}
return baseFee.(*big.Int)
}

func (a *asyncPricedList) Close() {
close(a.quit)
}

func (a *asyncPricedList) TxCount() int {
a.mu.Lock()
defer a.mu.Unlock()
return a.priced.TxCount()
}
31 changes: 16 additions & 15 deletions core/txpool/legacypool/legacypool.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,8 @@ type BlockChain interface {

// Config are the configuration parameters of the transaction pool.
type Config struct {
EnableCache bool // enable pending cache for mining. Set as true only --mine option is enabled
EnableAsyncPriced bool // enable async pricedlist. Set as true only --txpool.enableasyncpriced option is enabled
EnableCache bool // enable pending cache for mining. Set as true only --mine option is enabled

Locals []common.Address // Addresses that should be treated by default as local
NoLocals bool // Whether local transaction handling should be disabled
Expand Down Expand Up @@ -238,6 +239,9 @@ func (config *Config) sanitize() Config {
log.Warn("Sanitizing invalid txpool reannounce time", "provided", conf.ReannounceTime, "updated", time.Minute)
conf.ReannounceTime = time.Minute
}
if config.EnableAsyncPriced {
log.Info("Enabling async pricedlist")
}
// log to inform user if the cache is enabled or not
if conf.EnableCache {
log.Info("legacytxpool Pending Cache is enabled")
Expand Down Expand Up @@ -276,7 +280,7 @@ type LegacyPool struct {
queue map[common.Address]*list // Queued but non-processable transactions
beats map[common.Address]time.Time // Last heartbeat from each known account
all *lookup // All transactions to allow lookups
priced *pricedList // All transactions sorted by price
priced pricedListInterface // All transactions sorted by price

pendingCounter int
queueCounter int
Expand Down Expand Up @@ -333,7 +337,11 @@ func New(config Config, chain BlockChain) *LegacyPool {
pool.locals.add(addr)
pool.pendingCache.markLocal(addr)
}
pool.priced = newPricedList(pool.all)
if config.EnableAsyncPriced {
pool.priced = newAsyncPricedList(pool.all)
} else {
pool.priced = newPricedList(pool.all)
}

if (!config.NoLocals || config.JournalRemote) && config.Journal != "" {
pool.journal = newTxJournal(config.Journal)
Expand Down Expand Up @@ -419,6 +427,7 @@ func (pool *LegacyPool) loop() {
defer evict.Stop()
defer journal.Stop()
defer reannounce.Stop()
defer pool.priced.Close()

// Notify tests that the init phase is done
close(pool.initDoneCh)
Expand All @@ -433,7 +442,7 @@ func (pool *LegacyPool) loop() {
pool.mu.RLock()
pending, queued := pool.stats()
pool.mu.RUnlock()
stales := int(pool.priced.stales.Load())
stales := pool.priced.Staled()

if pending != prevPending || queued != prevQueued || stales != prevStales {
log.Debug("Transaction pool status report", "executable", pending, "queued", queued, "stales", stales)
Expand Down Expand Up @@ -882,16 +891,6 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
}
// If the transaction pool is full, discard underpriced transactions
if uint64(pool.all.Slots()+numSlots(tx)) > pool.config.GlobalSlots+pool.config.GlobalQueue {
currHead := pool.currentHead.Load()
if currHead != nil && currHead.BaseFee != nil && pool.priced.NeedReheap(currHead) {
if pool.chainconfig.IsLondon(new(big.Int).Add(currHead.Number, big.NewInt(1))) {
baseFee := eip1559.CalcBaseFee(pool.chainconfig, currHead, currHead.Time+1)
pool.priced.SetBaseFee(baseFee)
}
pool.priced.Reheap()
pool.priced.currHead = currHead
}

// If the new transaction is underpriced, don't accept it
if !isLocal && pool.priced.Underpriced(tx) {
log.Trace("Discarding underpriced transaction", "hash", hash, "gasTipCap", tx.GasTipCap(), "gasFeeCap", tx.GasFeeCap())
Expand Down Expand Up @@ -1509,11 +1508,13 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest,
if reset != nil {
pool.demoteUnexecutables(demoteAddrs)
demoteTimer.UpdateSince(t0)
var pendingBaseFee = pool.priced.urgent.baseFee
var pendingBaseFee = pool.priced.GetBaseFee()
if reset.newHead != nil {
if pool.chainconfig.IsLondon(new(big.Int).Add(reset.newHead.Number, big.NewInt(1))) {
pendingBaseFee = eip1559.CalcBaseFee(pool.chainconfig, reset.newHead, reset.newHead.Time+1)
pool.priced.SetBaseFee(pendingBaseFee)
} else {
pool.priced.Reheap()
}
}
// Update all accounts to the latest known pending nonce
Expand Down
Loading

0 comments on commit 3474c47

Please sign in to comment.