chore: txpool optimization metrics (bnb-chain#247)
Co-authored-by: andyzhang2023 <[email protected]>
andyzhang2023 authored Jan 10, 2025
1 parent 3474c47 commit 1d7cb0d
Showing 5 changed files with 83 additions and 46 deletions.
103 changes: 74 additions & 29 deletions core/txpool/legacypool/legacypool.go
@@ -115,6 +115,8 @@ var (
// demote metrics
// demoteDuration measures how long a demotion takes.
demoteTxMeter = metrics.NewRegisteredMeter("txpool/demote/tx", nil)
promoteTxMeter = metrics.NewRegisteredMeter("txpool/promote/tx", nil)
sendFeedTxMeter = metrics.NewRegisteredMeter("txpool/sendfeed/tx", nil)
resetDepthMeter = metrics.NewRegisteredMeter("txpool/reset/depth", nil) // depth (in blocks) of the reorg that caused the demotion

// mutex latency metrics
@@ -123,16 +125,27 @@ var (
journalMutexTimer = metrics.NewRegisteredTimer("txpool/mutex/journal/duration", nil)

// latency of add() method
addTimer = metrics.NewRegisteredTimer("txpool/addtime", nil)
addWithLockTimer = metrics.NewRegisteredTimer("txpool/locked/addtime", nil)
addTimer = metrics.NewRegisteredTimer("txpool/addtime", nil)
addWithLockTimer = metrics.NewRegisteredTimer("txpool/locked/addtime", nil)
addWaitLockTimer = metrics.NewRegisteredTimer("txpool/locked/waittime", nil)
validateBasicTimer = metrics.NewRegisteredTimer("txpool/validate/basic", nil)

// reset detail metrics
resetCount = metrics.NewRegisteredCounter("txpool/reset/count", nil)
resetTimer = metrics.NewRegisteredTimer("txpool/reset/time", nil)
resetWaitLockTimer = metrics.NewRegisteredTimer("txpool/reset/wait/time", nil)
resetResetTimer = metrics.NewRegisteredTimer("txpool/reset/reset/time", nil)
resetPromoteTimer = metrics.NewRegisteredTimer("txpool/reset/promote/time", nil)
resetDemoteTimer = metrics.NewRegisteredTimer("txpool/reset/demote/time", nil)
resetReheapTimer = metrics.NewRegisteredTimer("txpool/reset/reheap/time", nil)
resetFeedTimer = metrics.NewRegisteredTimer("txpool/reset/feed/time", nil)

// reorg detail metrics
resetTimer = metrics.NewRegisteredTimer("txpool/resettime", nil)
promoteTimer = metrics.NewRegisteredTimer("txpool/promotetime", nil)
demoteTimer = metrics.NewRegisteredTimer("txpool/demotetime", nil)
reorgresetTimer = metrics.NewRegisteredTimer("txpool/reorgresettime", nil)
truncateTimer = metrics.NewRegisteredTimer("txpool/truncatetime", nil)
reorgresetNoblockingTimer = metrics.NewRegisteredTimer("txpool/noblocking/reorgresettime", nil)
reorgCount = metrics.NewRegisteredCounter("txpool/reorg/count", nil)
reorgTimer = metrics.NewRegisteredTimer("txpool/reorg/time", nil)
reorgWaitLockTimer = metrics.NewRegisteredTimer("txpool/reorg/wait/time", nil)
reorgPromoteTimer = metrics.NewRegisteredTimer("txpool/reorg/promote/time", nil)
reorgFeedTimer = metrics.NewRegisteredTimer("txpool/reorg/feed/time", nil)
)
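
The new meters count discrete events per cycle (transactions promoted, demoted, or sent on the feed) while the timers record phase latencies. A minimal, self-contained sketch of how such metrics are driven, assuming go-ethereum's metrics package (the metric names here are illustrative, not the ones registered above):

	package main

	import (
		"time"

		"github.com/ethereum/go-ethereum/metrics"
	)

	var (
		exampleTxMeter = metrics.NewRegisteredMeter("example/tx", nil)       // event rate, e.g. transactions handled
		exampleTimer   = metrics.NewRegisteredTimer("example/duration", nil) // latency distribution of one phase
	)

	func main() {
		start := time.Now()
		time.Sleep(2 * time.Millisecond) // stand-in for real work
		exampleTxMeter.Mark(42)          // record 42 processed items
		exampleTimer.UpdateSince(start)  // record elapsed time since start
	}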

// BlockChain defines the minimal set of methods needed to back a tx pool with
@@ -1162,11 +1175,12 @@ func (pool *LegacyPool) addRemoteSync(tx *types.Transaction) error {
// If sync is set, the method will block until all internal maintenance related
// to the add is finished. Only use this during tests for determinism!
func (pool *LegacyPool) Add(txs []*types.Transaction, local, sync bool) []error {
start := time.Now()
defer func(t0 time.Time) {
if len(txs) > 0 {
addTimer.Update(time.Since(t0) / time.Duration(len(txs)))
}
}(time.Now())
}(start)
// Do not treat as local if local transactions have been disabled
local = local && !pool.config.NoLocals

@@ -1199,7 +1213,10 @@ func (pool *LegacyPool) Add(txs []*types.Transaction, local, sync bool) []error {
}

// Process all the new transactions and merge any errors into the original slice
validateBasicTimer.UpdateSince(start)
tw := time.Now()
pool.mu.Lock()
addWaitLockTimer.UpdateSince(tw)
t0 := time.Now()
newErrs, dirtyAddrs := pool.addTxsLocked(news, local)
if len(news) > 0 {
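
The two timestamps added to Add() split its cost into basic validation (validateBasicTimer), waiting on the pool mutex (addWaitLockTimer), and the per-transaction average of the whole call (addTimer). A minimal sketch of the wait-versus-hold split, using nothing beyond the standard library:

	package main

	import (
		"fmt"
		"sync"
		"time"
	)

	func main() {
		var mu sync.Mutex
		tw := time.Now()
		mu.Lock()                // may block behind other writers
		waited := time.Since(tw) // -> what addWaitLockTimer records
		t0 := time.Now()
		time.Sleep(time.Millisecond) // stand-in for addTxsLocked under the lock
		held := time.Since(t0)       // -> what addWithLockTimer records
		mu.Unlock()
		fmt.Println("waited:", waited, "held:", held)
	}

A long wait with a short hold points at lock contention rather than slow pool work.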
@@ -1458,15 +1475,41 @@ func (pool *LegacyPool) scheduleReorgLoop() {

// runReorg runs reset and promoteExecutables on behalf of scheduleReorgLoop.
func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest, dirtyAccounts *accountSet, events map[common.Address]*sortedMap) {
var reorgCost, waittime, resetDur, demoteDur, reheapDur, promoteDur, sendfeedDur time.Duration
var t0 time.Time
var promoted []*types.Transaction
var demoted, sendFeed int
var start = time.Now()
defer func(t0 time.Time) {
reorgDurationTimer.Update(time.Since(t0))
reorgCost = time.Since(t0)
demoteTxMeter.Mark(int64(demoted))
promoteTxMeter.Mark(int64(len(promoted)))
sendFeedTxMeter.Mark(int64(sendFeed))

if reset != nil {
reorgresetTimer.UpdateSince(t0)
if reset.newHead != nil {
log.Info("Transaction pool reorged", "from", reset.oldHead.Number.Uint64(), "to", reset.newHead.Number.Uint64())
resetCount.Inc(1)
resetTimer.Update(reorgCost)
resetWaitLockTimer.Update(waittime)
resetResetTimer.Update(resetDur)
resetDemoteTimer.Update(demoteDur)
resetReheapTimer.Update(reheapDur)
resetPromoteTimer.Update(promoteDur)
resetFeedTimer.Update(sendfeedDur)

pending, queued := pool.stats()
if reset.newHead != nil && reset.oldHead != nil {
log.Info("Transaction pool reorged", "from", reset.oldHead.Number.Uint64(), "to", reset.newHead.Number.Uint64(),
"reorgCost", reorgCost, "waittime", waittime, "promoted", len(promoted), "demoted", demoted, "sendFeed", sendFeed, "pending", pending, "queued", queued,
"resetDur", resetDur, "demoteDur", demoteDur, "reheapDur", reheapDur, "promoteDur", promoteDur, "sendfeedDur", sendfeedDur)
}
} else {
reorgCount.Inc(1)
reorgTimer.Update(reorgCost)
reorgWaitLockTimer.Update(waittime)
reorgPromoteTimer.Update(promoteDur)
reorgFeedTimer.Update(sendfeedDur)
}
}(time.Now())
}(start)
defer close(done)

var promoteAddrs, demoteAddrs []common.Address
@@ -1476,12 +1519,12 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest, dirtyAccounts *accountSet, events map[common.Address]*sortedMap) {
// the flatten operation can be avoided.
promoteAddrs = dirtyAccounts.flatten()
}
tw := time.Now()
pool.mu.Lock()
tl, t0 := time.Now(), time.Now()
waittime, t0 = time.Since(tw), time.Now()
if reset != nil {
// Reset from the old head to the new, rescheduling any reorged transactions
demoteAddrs = pool.reset(reset.oldHead, reset.newHead)
resetTimer.UpdateSince(t0)

// Nonces were reset, discard any events that became stale
for addr := range events {
@@ -1496,18 +1539,18 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest, dirtyAccounts *accountSet, events map[common.Address]*sortedMap) {
promoteAddrs = append(promoteAddrs, addr)
}
}
resetDur = time.Since(t0)
// Check for pending transactions for every account that sent new ones
t0 = time.Now()
promoted := pool.promoteExecutables(promoteAddrs)
promoteTimer.UpdateSince(t0)
promoted = pool.promoteExecutables(promoteAddrs)
promoteDur, t0 = time.Since(t0), time.Now()

// If a new block appeared, validate the pool of pending transactions. This will
// remove any transaction that has been included in the block or was invalidated
// because of another transaction (e.g. higher gas price).
t0 = time.Now()
if reset != nil {
pool.demoteUnexecutables(demoteAddrs)
demoteTimer.UpdateSince(t0)
demoted = pool.demoteUnexecutables(demoteAddrs)
demoteDur, t0 = time.Since(t0), time.Now()
var pendingBaseFee = pool.priced.GetBaseFee()
if reset.newHead != nil {
if pool.chainconfig.IsLondon(new(big.Int).Add(reset.newHead.Number, big.NewInt(1))) {
@@ -1524,17 +1567,16 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest, dirtyAccounts *accountSet, events map[common.Address]*sortedMap) {
nonces[addr] = highestPending.Nonce() + 1
}
pool.pendingNonces.setAll(nonces)
reheapDur, t0 = time.Since(t0), time.Now()
}
// Ensure pool.queue and pool.pending sizes stay within the configured limits.
t0 = time.Now()
pool.truncatePending()
pool.truncateQueue()
truncateTimer.UpdateSince(t0)

dropBetweenReorgHistogram.Update(int64(pool.changesSinceReorg))
pool.changesSinceReorg = 0 // Reset change counter
reorgresetNoblockingTimer.UpdateSince(tl)
pool.mu.Unlock()
t0 = time.Now()

// Notify subsystems for newly added transactions
for _, tx := range promoted {
Expand All @@ -1550,7 +1592,9 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest,
txs = append(txs, set.Flatten()...)
}
pool.txFeed.Send(core.NewTxsEvent{Txs: txs})
sendFeed = len(txs)
}
sendfeedDur = time.Since(t0)
}
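
runReorg now times each phase with a single rolling stopwatch: every phase boundary captures time.Since(t0) and restarts t0 in one statement, so the recorded durations partition the function's total cost without nested timers. A minimal sketch of the idiom, with illustrative phase names:

	package main

	import (
		"fmt"
		"time"
	)

	func main() {
		var resetDur, promoteDur time.Duration
		t0 := time.Now()

		time.Sleep(10 * time.Millisecond) // stand-in for the reset phase
		resetDur, t0 = time.Since(t0), time.Now()

		time.Sleep(5 * time.Millisecond) // stand-in for the promote phase
		promoteDur, t0 = time.Since(t0), time.Now()

		fmt.Println("reset:", resetDur, "promote:", promoteDur)
	}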

// reset retrieves the current state of the blockchain and ensures the content
@@ -1666,14 +1710,13 @@ func (pool *LegacyPool) reset(oldHead, newHead *types.Header) (demoteAddrs []common.Address) {
}
}
resetDepthMeter.Mark(int64(depth))
log.Info("reset block depth", "depth", depth)
// Initialize the internal state to the current head
if newHead == nil {
newHead = pool.chain.CurrentBlock() // Special case during testing
}
statedb, err := pool.chain.StateAt(newHead.Root)
if err != nil {
log.Error("Failed to reset txpool state", "err", err)
log.Error("Failed to reset txpool state", "err", err, "depth", depth)
return
}
pool.currentHead.Store(newHead)
@@ -1687,7 +1730,7 @@ func (pool *LegacyPool) reset(oldHead, newHead *types.Header) (demoteAddrs []common.Address) {
}

// Inject any transactions discarded due to reorgs
log.Debug("Reinjecting stale transactions", "count", len(reinject))
log.Debug("reset state of txpool", "reinject", len(reinject), "depth", depth)
core.SenderCacher.Recover(pool.signer, reinject)
pool.addTxsLocked(reinject, false)
return
@@ -1925,14 +1968,14 @@ func (pool *LegacyPool) truncateQueue() {
// Note: transactions are not marked as removed in the priced list because re-heaping
// is always explicitly triggered by SetBaseFee and it would be unnecessary and wasteful
// to trigger a re-heap in this function
func (pool *LegacyPool) demoteUnexecutables(demoteAddrs []common.Address) {
func (pool *LegacyPool) demoteUnexecutables(demoteAddrs []common.Address) int {
var demoteTxNum = 0
if demoteAddrs == nil {
demoteAddrs = make([]common.Address, 0, len(pool.pending))
for addr := range pool.pending {
demoteAddrs = append(demoteAddrs, addr)
}
}
demoteTxMeter.Mark(int64(len(demoteAddrs)))

var removed = 0
// Iterate over all accounts and demote any non-executable transactions
Expand Down Expand Up @@ -2001,8 +2044,10 @@ func (pool *LegacyPool) demoteUnexecutables(demoteAddrs []common.Address) {
}
pool.pendingCache.del(dropPendingCache, pool.signer)
removed += len(dropPendingCache)
demoteTxNum += len(dropPendingCache)
}
pool.priced.Removed(removed)
return demoteTxNum
}
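
demoteUnexecutables now returns the number of demoted transactions instead of marking demoteTxMeter itself; the deferred block in runReorg marks the meter once with the true per-transaction count, where the removed line marked it with the number of addresses. A minimal sketch of the count-at-the-call-site pattern, with illustrative names:

	package main

	import "fmt"

	// demote drops every queued item and reports how many it removed.
	func demote(pending map[string][]int) int {
		demoted := 0
		for addr, txs := range pending {
			demoted += len(txs)
			delete(pending, addr)
		}
		return demoted
	}

	func main() {
		pending := map[string][]int{"0xabc": {1, 2}, "0xdef": {3}}
		n := demote(pending)
		fmt.Println("demoted:", n) // the caller would Mark(int64(n)) on a meter here
	}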

// addressByHeartbeat is an account address tagged with its last activity timestamp.
8 changes: 4 additions & 4 deletions eth/fetcher/tx_fetcher.go
@@ -257,7 +257,7 @@ func (f *TxFetcher) Notify(peer string, types []byte, sizes []uint32, hashes []common.Hash) error {
duplicate++
case f.isKnownUnderpriced(hash):
underpriced++
log.Info("announced transaction is underpriced", "hash", hash.String())
log.Trace("announced transaction is underpriced", "hash", hash.String())
default:
unknownHashes = append(unknownHashes, hash)
if types == nil {
@@ -431,13 +431,13 @@ func (f *TxFetcher) loop() {
// check. Should be fine as the limit is in the thousands and the
// request size in the hundreds.
txAnnounceDOSMeter.Mark(int64(len(ann.hashes)))
log.Info("announced transaction DOS overflow", "hashes", joinHashes(ann.hashes), "num", len(ann.hashes))
log.Debug("announced transaction DOS overflow", "hashes", joinHashes(ann.hashes), "num", len(ann.hashes))
break
}
want := used + len(ann.hashes)
if want > maxTxAnnounces {
txAnnounceDOSMeter.Mark(int64(want - maxTxAnnounces))
log.Info("announced transaction DOS overflow", "hashes", joinHashes(ann.hashes[want-maxTxAnnounces:]), "num", len(ann.hashes))
log.Debug("announced transaction DOS overflow", "hashes", joinHashes(ann.hashes[want-maxTxAnnounces:]), "num", len(ann.hashes))
ann.hashes = ann.hashes[:want-maxTxAnnounces]
ann.metas = ann.metas[:want-maxTxAnnounces]
}
@@ -556,7 +556,7 @@ func (f *TxFetcher) loop() {
for peer, req := range f.requests {
if time.Duration(f.clock.Now()-req.time)+txGatherSlack > txFetchTimeout {
txRequestTimeoutMeter.Mark(int64(len(req.hashes)))
log.Info("announced transaction request timeout", "hashes", joinHashes(req.hashes), "num", len(req.hashes))
log.Debug("announced transaction request timeout", "hashes", joinHashes(req.hashes), "num", len(req.hashes))

// Reschedule all the not-yet-delivered fetches to alternate peers
for _, hash := range req.hashes {
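
The tx_fetcher changes are pure log-level downgrades: per-hash and per-announcement messages that can fire thousands of times per second (underpriced announcements, DOS overflows, request timeouts) move from Info to Trace or Debug, so default logging stays quiet under announcement floods while meters such as txAnnounceDOSMeter and txRequestTimeoutMeter still capture the events. A minimal sketch of the leveling convention, assuming go-ethereum's log package:

	package main

	import "github.com/ethereum/go-ethereum/log"

	func main() {
		log.Trace("announced transaction is underpriced", "hash", "0x1234") // hot path: hidden by default
		log.Debug("announced transaction DOS overflow", "num", 4096)        // diagnostic detail
		log.Info("Transaction pool reorged", "from", 100, "to", 101)        // operator-visible event
	}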
10 changes: 1 addition & 9 deletions eth/handler.go
@@ -688,31 +688,23 @@ func (h *handler) BroadcastTransactions(txs types.Transactions) {
// Send the tx unconditionally to a subset of our peers
for _, peer := range direct {
txset[peer] = append(txset[peer], tx.Hash())
log.Trace("Broadcast transaction", "peer", peer.ID(), "hash", tx.Hash())
}
// For the remaining peers, send announcement only
for _, peer := range announce {
annos[peer] = append(annos[peer], tx.Hash())
log.Trace("Announce transaction", "peer", peer.ID(), "hash", tx.Hash())
}
}
for peer, hashes := range txset {
directPeers++
directCount += len(hashes)
peer.AsyncSendTransactions(hashes)
log.Trace("Transaction broadcast bodies", "txs", len(hashes),
"peer.id", peer.Node().ID().String(), "peer.IP", peer.Node().IP().String(),
)
}
for peer, hashes := range annos {
annPeers++
annCount += len(hashes)
peer.AsyncSendPooledTransactionHashes(hashes)
log.Trace("Transaction broadcast hashes", "txs", len(hashes),
"peer.id", peer.Node().ID().String(), "peer.IP", peer.Node().IP().String(),
)
}
log.Debug("Distributed transactions", "plaintxs", len(txs)-blobTxs-largeTxs, "blobtxs", blobTxs, "largetxs", largeTxs,
log.Trace("Distributed transactions", "plaintxs", len(txs)-blobTxs-largeTxs, "blobtxs", blobTxs, "largetxs", largeTxs,
"bcastpeers", directPeers, "bcastcount", directCount, "annpeers", annPeers, "anncount", annCount)
}

6 changes: 3 additions & 3 deletions eth/protocols/eth/broadcast.go
@@ -115,7 +115,7 @@ func (p *Peer) broadcastTransactions() {
return
}
close(done)
p.Log().Trace("Sent transaction bodies", "count", len(txs), "peer.id", p.Node().ID().String(), "peer.ip", p.Node().IP().String(), "hashes", concat(collectHashes(txs)))
p.Log().Trace("Sent transaction bodies", "count", len(txs), "peer.id", p.Node().ID().String(), "peer.ip", p.Node().IP().String())
}()
}
}
@@ -129,7 +129,7 @@ func (p *Peer) broadcastTransactions() {
// New batch of transactions to be broadcast, queue them (with cap)
queue = append(queue, hashes...)
if len(queue) > maxQueuedTxs {
p.Log().Warn("Broadcast hashes abandon", "peerId", p.ID(), "peerIP", safeGetPeerIP(p), "abandon", len(queue)-maxQueuedTxs, "hashes", concat(queue[:len(queue)-maxQueuedTxs]))
p.Log().Warn("Broadcast hashes abandon", "peerId", p.ID(), "peerIP", safeGetPeerIP(p), "abandon", len(queue)-maxQueuedTxs)
txBroadcastAbandonMeter.Mark(int64(len(queue) - maxQueuedTxs))
// Fancy copy and resize to ensure buffer doesn't grow indefinitely
queue = queue[:copy(queue, queue[len(queue)-maxQueuedTxs:])]
@@ -192,7 +192,7 @@ func (p *Peer) announceTransactions() {
return
}
close(done)
p.Log().Trace("Sent transaction announcements", "count", len(pending), "peer.Id", p.ID(), "peer.IP", p.Node().IP().String(), "hashes", concat(pending))
p.Log().Trace("Sent transaction announcements", "count", len(pending), "peer.Id", p.ID(), "peer.IP", p.Node().IP().String())
}()
}
}
2 changes: 1 addition & 1 deletion eth/protocols/eth/peer.go
@@ -450,7 +450,7 @@ func (p *Peer) RequestReceipts(hashes []common.Hash, sink chan *Response) (*Request, error) {

// RequestTxs fetches a batch of transactions from a remote node.
func (p *Peer) RequestTxs(hashes []common.Hash) error {
p.Log().Debug("Fetching batch of transactions", "count", len(hashes))
p.Log().Trace("Fetching batch of transactions", "count", len(hashes))
id := rand.Uint64()

requestTracker.Track(p.id, p.version, GetPooledTransactionsMsg, PooledTransactionsMsg, id)
