From 1d7cb0dab85a0af88cafcb34a76cd8cee2eded65 Mon Sep 17 00:00:00 2001 From: andyzhang2023 <147463846+andyzhang2023@users.noreply.github.com> Date: Fri, 10 Jan 2025 10:43:58 +0800 Subject: [PATCH] chore: txpool optimization metrics (#247) Co-authored-by: andyzhang2023 --- core/txpool/legacypool/legacypool.go | 103 +++++++++++++++++++-------- eth/fetcher/tx_fetcher.go | 8 +-- eth/handler.go | 10 +-- eth/protocols/eth/broadcast.go | 6 +- eth/protocols/eth/peer.go | 2 +- 5 files changed, 83 insertions(+), 46 deletions(-) diff --git a/core/txpool/legacypool/legacypool.go b/core/txpool/legacypool/legacypool.go index c82b2eeb73..740ffffd15 100644 --- a/core/txpool/legacypool/legacypool.go +++ b/core/txpool/legacypool/legacypool.go @@ -115,6 +115,8 @@ var ( // demote metrics // demoteDuration measures how long time a demotion takes. demoteTxMeter = metrics.NewRegisteredMeter("txpool/demote/tx", nil) + promoteTxMeter = metrics.NewRegisteredMeter("txpool/promote/tx", nil) + sendFeedTxMeter = metrics.NewRegisteredMeter("txpool/sendfeed/tx", nil) resetDepthMeter = metrics.NewRegisteredMeter("txpool/reset/depth", nil) //reorg depth of blocks which causes demote // mutex latency metrics @@ -123,16 +125,27 @@ var ( journalMutexTimer = metrics.NewRegisteredTimer("txpool/mutex/journal/duration", nil) // latency of add() method - addTimer = metrics.NewRegisteredTimer("txpool/addtime", nil) - addWithLockTimer = metrics.NewRegisteredTimer("txpool/locked/addtime", nil) + addTimer = metrics.NewRegisteredTimer("txpool/addtime", nil) + addWithLockTimer = metrics.NewRegisteredTimer("txpool/locked/addtime", nil) + addWaitLockTimer = metrics.NewRegisteredTimer("txpool/locked/waittime", nil) + validateBasicTimer = metrics.NewRegisteredTimer("txpool/validate/basic", nil) + + // reset detail metrics + resetCount = metrics.NewRegisteredCounter("txpool/reset/count", nil) + resetTimer = metrics.NewRegisteredTimer("txpool/reset/time", nil) + resetWaitLockTimer = metrics.NewRegisteredTimer("txpool/reset/wait/time", nil) + resetResetTimer = metrics.NewRegisteredTimer("txpool/reset/reset/time", nil) + resetPromoteTimer = metrics.NewRegisteredTimer("txpool/reset/promote/time", nil) + resetDemoteTimer = metrics.NewRegisteredTimer("txpool/reset/demote/time", nil) + resetReheapTimer = metrics.NewRegisteredTimer("txpool/reset/reheap/time", nil) + resetFeedTimer = metrics.NewRegisteredTimer("txpool/reset/feed/time", nil) // reorg detail metrics - resetTimer = metrics.NewRegisteredTimer("txpool/resettime", nil) - promoteTimer = metrics.NewRegisteredTimer("txpool/promotetime", nil) - demoteTimer = metrics.NewRegisteredTimer("txpool/demotetime", nil) - reorgresetTimer = metrics.NewRegisteredTimer("txpool/reorgresettime", nil) - truncateTimer = metrics.NewRegisteredTimer("txpool/truncatetime", nil) - reorgresetNoblockingTimer = metrics.NewRegisteredTimer("txpool/noblocking/reorgresettime", nil) + reorgCount = metrics.NewRegisteredCounter("txpool/reorg/count", nil) + reorgTimer = metrics.NewRegisteredTimer("txpool/reorg/time", nil) + reorgWaitLockTimer = metrics.NewRegisteredTimer("txpool/reorg/wait/time", nil) + reorgPromoteTimer = metrics.NewRegisteredTimer("txpool/reorg/promote/time", nil) + reorgFeedTimer = metrics.NewRegisteredTimer("txpool/reorg/feed/time", nil) ) // BlockChain defines the minimal set of methods needed to back a tx pool with @@ -1162,11 +1175,12 @@ func (pool *LegacyPool) addRemoteSync(tx *types.Transaction) error { // If sync is set, the method will block until all internal maintenance related // to the add is 
finished. Only use this during tests for determinism! func (pool *LegacyPool) Add(txs []*types.Transaction, local, sync bool) []error { + start := time.Now() defer func(t0 time.Time) { if len(txs) > 0 { addTimer.Update(time.Since(t0) / time.Duration(len(txs))) } - }(time.Now()) + }(start) // Do not treat as local if local transactions have been disabled local = local && !pool.config.NoLocals @@ -1199,7 +1213,10 @@ func (pool *LegacyPool) Add(txs []*types.Transaction, local, sync bool) []error } // Process all the new transaction and merge any errors into the original slice + validateBasicTimer.UpdateSince(start) + tw := time.Now() pool.mu.Lock() + addWaitLockTimer.UpdateSince(tw) t0 := time.Now() newErrs, dirtyAddrs := pool.addTxsLocked(news, local) if len(news) > 0 { @@ -1458,15 +1475,41 @@ func (pool *LegacyPool) scheduleReorgLoop() { // runReorg runs reset and promoteExecutables on behalf of scheduleReorgLoop. func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest, dirtyAccounts *accountSet, events map[common.Address]*sortedMap) { + var reorgCost, waittime, resetDur, demoteDur, reheapDur, promoteDur, sendfeedDur time.Duration + var t0 time.Time + var promoted []*types.Transaction + var demoted, sendFeed int + var start = time.Now() defer func(t0 time.Time) { - reorgDurationTimer.Update(time.Since(t0)) + reorgCost = time.Since(t0) + demoteTxMeter.Mark(int64(demoted)) + promoteTxMeter.Mark(int64(len(promoted))) + sendFeedTxMeter.Mark(int64(sendFeed)) + if reset != nil { - reorgresetTimer.UpdateSince(t0) - if reset.newHead != nil { - log.Info("Transaction pool reorged", "from", reset.oldHead.Number.Uint64(), "to", reset.newHead.Number.Uint64()) + resetCount.Inc(1) + resetTimer.Update(reorgCost) + resetWaitLockTimer.Update(waittime) + resetResetTimer.Update(resetDur) + resetDemoteTimer.Update(demoteDur) + resetReheapTimer.Update(reheapDur) + resetPromoteTimer.Update(promoteDur) + resetFeedTimer.Update(sendfeedDur) + + pending, queued := pool.stats() + if reset.newHead != nil && reset.oldHead != nil { + log.Info("Transaction pool reorged", "from", reset.oldHead.Number.Uint64(), "to", reset.newHead.Number.Uint64(), + "reorgCost", reorgCost, "waittime", waittime, "promoted", len(promoted), "demoted", demoted, "sendFeed", sendFeed, "pending", pending, "queued", queued, + "resetDur", resetDur, "demoteDur", demoteDur, "reheapDur", reheapDur, "promoteDur", promoteDur, "sendfeedDur", sendfeedDur) } + } else { + reorgCount.Inc(1) + reorgTimer.Update(reorgCost) + reorgWaitLockTimer.Update(waittime) + reorgPromoteTimer.Update(promoteDur) + reorgFeedTimer.Update(sendfeedDur) } - }(time.Now()) + }(start) defer close(done) var promoteAddrs, demoteAddrs []common.Address @@ -1476,12 +1519,12 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest, // the flatten operation can be avoided. 
promoteAddrs = dirtyAccounts.flatten() } + tw := time.Now() pool.mu.Lock() - tl, t0 := time.Now(), time.Now() + waittime, t0 = time.Since(tw), time.Now() if reset != nil { // Reset from the old head to the new, rescheduling any reorged transactions demoteAddrs = pool.reset(reset.oldHead, reset.newHead) - resetTimer.UpdateSince(t0) // Nonces were reset, discard any events that became stale for addr := range events { @@ -1496,18 +1539,18 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest, promoteAddrs = append(promoteAddrs, addr) } } + resetDur = time.Since(t0) // Check for pending transactions for every account that sent new ones t0 = time.Now() - promoted := pool.promoteExecutables(promoteAddrs) - promoteTimer.UpdateSince(t0) + promoted = pool.promoteExecutables(promoteAddrs) + promoteDur, t0 = time.Since(t0), time.Now() // If a new block appeared, validate the pool of pending transactions. This will // remove any transaction that has been included in the block or was invalidated // because of another transaction (e.g. higher gas price). - t0 = time.Now() if reset != nil { - pool.demoteUnexecutables(demoteAddrs) - demoteTimer.UpdateSince(t0) + demoted = pool.demoteUnexecutables(demoteAddrs) + demoteDur, t0 = time.Since(t0), time.Now() var pendingBaseFee = pool.priced.GetBaseFee() if reset.newHead != nil { if pool.chainconfig.IsLondon(new(big.Int).Add(reset.newHead.Number, big.NewInt(1))) { @@ -1524,17 +1567,16 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest, nonces[addr] = highestPending.Nonce() + 1 } pool.pendingNonces.setAll(nonces) + reheapDur, t0 = time.Since(t0), time.Now() } // Ensure pool.queue and pool.pending sizes stay within the configured limits. - t0 = time.Now() pool.truncatePending() pool.truncateQueue() - truncateTimer.UpdateSince(t0) dropBetweenReorgHistogram.Update(int64(pool.changesSinceReorg)) pool.changesSinceReorg = 0 // Reset change counter - reorgresetNoblockingTimer.UpdateSince(tl) pool.mu.Unlock() + t0 = time.Now() // Notify subsystems for newly added transactions for _, tx := range promoted { @@ -1550,7 +1592,9 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest, txs = append(txs, set.Flatten()...) 
} pool.txFeed.Send(core.NewTxsEvent{Txs: txs}) + sendFeed = len(txs) } + sendfeedDur = time.Since(t0) } // reset retrieves the current state of the blockchain and ensures the content @@ -1666,14 +1710,13 @@ func (pool *LegacyPool) reset(oldHead, newHead *types.Header) (demoteAddrs []com } } resetDepthMeter.Mark(int64(depth)) - log.Info("reset block depth", "depth", depth) // Initialize the internal state to the current head if newHead == nil { newHead = pool.chain.CurrentBlock() // Special case during testing } statedb, err := pool.chain.StateAt(newHead.Root) if err != nil { - log.Error("Failed to reset txpool state", "err", err) + log.Error("Failed to reset txpool state", "err", err, "depth", depth) return } pool.currentHead.Store(newHead) @@ -1687,7 +1730,7 @@ func (pool *LegacyPool) reset(oldHead, newHead *types.Header) (demoteAddrs []com } // Inject any transactions discarded due to reorgs - log.Debug("Reinjecting stale transactions", "count", len(reinject)) + log.Debug("reset state of txpool", "reinject", len(reinject), "depth", depth) core.SenderCacher.Recover(pool.signer, reinject) pool.addTxsLocked(reinject, false) return @@ -1925,14 +1968,14 @@ func (pool *LegacyPool) truncateQueue() { // Note: transactions are not marked as removed in the priced list because re-heaping // is always explicitly triggered by SetBaseFee and it would be unnecessary and wasteful // to trigger a re-heap is this function -func (pool *LegacyPool) demoteUnexecutables(demoteAddrs []common.Address) { +func (pool *LegacyPool) demoteUnexecutables(demoteAddrs []common.Address) int { + var demoteTxNum = 0 if demoteAddrs == nil { demoteAddrs = make([]common.Address, 0, len(pool.pending)) for addr := range pool.pending { demoteAddrs = append(demoteAddrs, addr) } } - demoteTxMeter.Mark(int64(len(demoteAddrs))) var removed = 0 // Iterate over all accounts and demote any non-executable transactions @@ -2001,8 +2044,10 @@ func (pool *LegacyPool) demoteUnexecutables(demoteAddrs []common.Address) { } pool.pendingCache.del(dropPendingCache, pool.signer) removed += len(dropPendingCache) + demoteTxNum += len(dropPendingCache) } pool.priced.Removed(removed) + return demoteTxNum } // addressByHeartbeat is an account address tagged with its last activity timestamp. diff --git a/eth/fetcher/tx_fetcher.go b/eth/fetcher/tx_fetcher.go index e24cbaf25e..cd7ed32c29 100644 --- a/eth/fetcher/tx_fetcher.go +++ b/eth/fetcher/tx_fetcher.go @@ -257,7 +257,7 @@ func (f *TxFetcher) Notify(peer string, types []byte, sizes []uint32, hashes []c duplicate++ case f.isKnownUnderpriced(hash): underpriced++ - log.Info("announced transaction is underpriced", "hash", hash.String()) + log.Trace("announced transaction is underpriced", "hash", hash.String()) default: unknownHashes = append(unknownHashes, hash) if types == nil { @@ -431,13 +431,13 @@ func (f *TxFetcher) loop() { // check. Should be fine as the limit is in the thousands and the // request size in the hundreds. 
txAnnounceDOSMeter.Mark(int64(len(ann.hashes))) - log.Info("announced transaction DOS overflow", "hashes", joinHashes(ann.hashes), "num", len(ann.hashes)) + log.Debug("announced transaction DOS overflow", "hashes", joinHashes(ann.hashes), "num", len(ann.hashes)) break } want := used + len(ann.hashes) if want > maxTxAnnounces { txAnnounceDOSMeter.Mark(int64(want - maxTxAnnounces)) - log.Info("announced transaction DOS overflow", "hashes", joinHashes(ann.hashes[want-maxTxAnnounces:]), "num", len(ann.hashes)) + log.Debug("announced transaction DOS overflow", "hashes", joinHashes(ann.hashes[want-maxTxAnnounces:]), "num", len(ann.hashes)) ann.hashes = ann.hashes[:want-maxTxAnnounces] ann.metas = ann.metas[:want-maxTxAnnounces] } @@ -556,7 +556,7 @@ func (f *TxFetcher) loop() { for peer, req := range f.requests { if time.Duration(f.clock.Now()-req.time)+txGatherSlack > txFetchTimeout { txRequestTimeoutMeter.Mark(int64(len(req.hashes))) - log.Info("announced transaction request timeout", "hashes", joinHashes(req.hashes), "num", len(req.hashes)) + log.Debug("announced transaction request timeout", "hashes", joinHashes(req.hashes), "num", len(req.hashes)) // Reschedule all the not-yet-delivered fetches to alternate peers for _, hash := range req.hashes { diff --git a/eth/handler.go b/eth/handler.go index bcb56d5b1e..b138b66bb6 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -688,31 +688,23 @@ func (h *handler) BroadcastTransactions(txs types.Transactions) { // Send the tx unconditionally to a subset of our peers for _, peer := range direct { txset[peer] = append(txset[peer], tx.Hash()) - log.Trace("Broadcast transaction", "peer", peer.ID(), "hash", tx.Hash()) } // For the remaining peers, send announcement only for _, peer := range announce { annos[peer] = append(annos[peer], tx.Hash()) - log.Trace("Announce transaction", "peer", peer.ID(), "hash", tx.Hash()) } } for peer, hashes := range txset { directPeers++ directCount += len(hashes) peer.AsyncSendTransactions(hashes) - log.Trace("Transaction broadcast bodies", "txs", len(hashes), - "peer.id", peer.Node().ID().String(), "peer.IP", peer.Node().IP().String(), - ) } for peer, hashes := range annos { annPeers++ annCount += len(hashes) peer.AsyncSendPooledTransactionHashes(hashes) - log.Trace("Transaction broadcast hashes", "txs", len(hashes), - "peer.id", peer.Node().ID().String(), "peer.IP", peer.Node().IP().String(), - ) } - log.Debug("Distributed transactions", "plaintxs", len(txs)-blobTxs-largeTxs, "blobtxs", blobTxs, "largetxs", largeTxs, + log.Trace("Distributed transactions", "plaintxs", len(txs)-blobTxs-largeTxs, "blobtxs", blobTxs, "largetxs", largeTxs, "bcastpeers", directPeers, "bcastcount", directCount, "annpeers", annPeers, "anncount", annCount) } diff --git a/eth/protocols/eth/broadcast.go b/eth/protocols/eth/broadcast.go index 89c848a429..1fb4d92d8b 100644 --- a/eth/protocols/eth/broadcast.go +++ b/eth/protocols/eth/broadcast.go @@ -115,7 +115,7 @@ func (p *Peer) broadcastTransactions() { return } close(done) - p.Log().Trace("Sent transaction bodies", "count", len(txs), "peer.id", p.Node().ID().String(), "peer.ip", p.Node().IP().String(), "hashes", concat(collectHashes(txs))) + p.Log().Trace("Sent transaction bodies", "count", len(txs), "peer.id", p.Node().ID().String(), "peer.ip", p.Node().IP().String()) }() } } @@ -129,7 +129,7 @@ func (p *Peer) broadcastTransactions() { // New batch of transactions to be broadcast, queue them (with cap) queue = append(queue, hashes...) 
if len(queue) > maxQueuedTxs { - p.Log().Warn("Broadcast hashes abandon", "peerId", p.ID(), "peerIP", safeGetPeerIP(p), "abandon", len(queue)-maxQueuedTxs, "hashes", concat(queue[:len(queue)-maxQueuedTxs])) + p.Log().Warn("Broadcast hashes abandon", "peerId", p.ID(), "peerIP", safeGetPeerIP(p), "abandon", len(queue)-maxQueuedTxs) txBroadcastAbandonMeter.Mark(int64(len(queue) - maxQueuedTxs)) // Fancy copy and resize to ensure buffer doesn't grow indefinitely queue = queue[:copy(queue, queue[len(queue)-maxQueuedTxs:])] @@ -192,7 +192,7 @@ func (p *Peer) announceTransactions() { return } close(done) - p.Log().Trace("Sent transaction announcements", "count", len(pending), "peer.Id", p.ID(), "peer.IP", p.Node().IP().String(), "hashes", concat(pending)) + p.Log().Trace("Sent transaction announcements", "count", len(pending), "peer.Id", p.ID(), "peer.IP", p.Node().IP().String()) }() } } diff --git a/eth/protocols/eth/peer.go b/eth/protocols/eth/peer.go index fc7c2a18ea..c597e247fe 100644 --- a/eth/protocols/eth/peer.go +++ b/eth/protocols/eth/peer.go @@ -450,7 +450,7 @@ func (p *Peer) RequestReceipts(hashes []common.Hash, sink chan *Response) (*Requ // RequestTxs fetches a batch of transactions from a remote node. func (p *Peer) RequestTxs(hashes []common.Hash) error { - p.Log().Debug("Fetching batch of transactions", "count", len(hashes)) + p.Log().Trace("Fetching batch of transactions", "count", len(hashes)) id := rand.Uint64() requestTracker.Track(p.id, p.version, GetPooledTransactionsMsg, PooledTransactionsMsg, id)
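
Note on the instrumentation pattern (editor's addition, not part of the patch): the new timers split lock-acquisition latency from the time spent working under the lock (addWaitLockTimer vs. addWithLockTimer in Add(), reorgWaitLockTimer vs. the per-phase timers in runReorg()), and the new meters count how many transactions each phase touched. Below is a minimal, self-contained sketch of that pattern using go-ethereum's metrics package; the "example/..." metric names and the pool type are hypothetical, for illustration only.

package main

import (
	"sync"
	"time"

	"github.com/ethereum/go-ethereum/metrics"
)

// Hypothetical metric names, following the patch's naming scheme
// ("txpool/locked/waittime", "txpool/locked/addtime").
var (
	waitLockTimer = metrics.NewRegisteredTimer("example/locked/waittime", nil)
	withLockTimer = metrics.NewRegisteredTimer("example/locked/worktime", nil)
	itemMeter     = metrics.NewRegisteredMeter("example/items", nil)
)

type pool struct {
	mu    sync.Mutex
	items []int
}

// add times how long the caller waits for the lock separately from how long
// the locked work takes, and marks a meter with the number of items handled.
func (p *pool) add(items []int) {
	tw := time.Now()
	p.mu.Lock()
	waitLockTimer.UpdateSince(tw) // lock acquisition latency

	t0 := time.Now()
	p.items = append(p.items, items...)
	withLockTimer.UpdateSince(t0) // time spent doing work under the lock
	p.mu.Unlock()

	itemMeter.Mark(int64(len(items)))
}

func main() {
	p := new(pool)
	p.add([]int{1, 2, 3})
}

Keeping the wait timer separate from the work timer is what lets the resulting dashboards distinguish lock contention from genuinely slow work inside the critical section, which is the point of the txpool/locked/waittime and txpool/reorg/wait/time metrics added above.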