diff --git a/mempool/mempool.go b/mempool/mempool.go index a3a7aa62c..6143a1f1b 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -4,7 +4,9 @@ import ( "context" "errors" "fmt" + "github.com/cosmos/evm/mempool/txpool/locals" "sync" + "time" ethtypes "github.com/ethereum/go-ethereum/core/types" "github.com/holiman/uint256" @@ -46,9 +48,10 @@ type ( vmKeeper VMKeeperI /** Mempools **/ - txPool *txpool.TxPool - legacyTxPool *legacypool.LegacyPool - cosmosPool sdkmempool.ExtMempool + txPool *txpool.TxPool + legacyTxPool *legacypool.LegacyPool + localTxTracker *locals.TxTracker + cosmosPool sdkmempool.ExtMempool /** Utils **/ logger log.Logger @@ -127,6 +130,7 @@ func NewExperimentalEVMMempool(getCtxCallback func(height int64, prove bool) (sd // from queued into pending, noting their readiness to be executed. legacyPool.BroadcastTxFn = func(txs []*ethtypes.Transaction) error { logger.Debug("broadcasting EVM transactions", "tx_count", len(txs)) + fmt.Println(clientCtx) return broadcastEVMTransactions(clientCtx, txConfig, txs) } } @@ -143,6 +147,21 @@ func NewExperimentalEVMMempool(getCtxCallback func(height int64, prove bool) (sd panic("tx pool should contain only legacypool") } + var localTxTracker *locals.TxTracker + + if !legacyConfig.NoLocals { + rejournal := legacyConfig.Rejournal + if rejournal < time.Second { + logger.Debug("Sanitizing invalid txpool journal time", "provided", rejournal, "updated", time.Second) + rejournal = time.Second + } + localTxTracker = locals.New(legacyConfig.Journal, rejournal, blockchain.Config(), txPool) + err := localTxTracker.Start() + if err != nil { + return nil + } + } + // Create Cosmos Mempool from configuration cosmosPoolConfig := config.CosmosPoolConfig if cosmosPoolConfig == nil { @@ -174,18 +193,19 @@ func NewExperimentalEVMMempool(getCtxCallback func(height int64, prove bool) (sd cosmosPool = sdkmempool.NewPriorityMempool(*cosmosPoolConfig) evmMempool := &ExperimentalEVMMempool{ - vmKeeper: vmKeeper, - txPool: txPool, - legacyTxPool: txPool.Subpools[0].(*legacypool.LegacyPool), - cosmosPool: cosmosPool, - logger: logger, - txConfig: txConfig, - blockchain: blockchain, - bondDenom: bondDenom, - evmDenom: evmDenom, - blockGasLimit: config.BlockGasLimit, - minTip: config.MinTip, - anteHandler: config.AnteHandler, + vmKeeper: vmKeeper, + txPool: txPool, + legacyTxPool: txPool.Subpools[0].(*legacypool.LegacyPool), + localTxTracker: localTxTracker, + cosmosPool: cosmosPool, + logger: logger, + txConfig: txConfig, + blockchain: blockchain, + bondDenom: bondDenom, + evmDenom: evmDenom, + blockGasLimit: config.BlockGasLimit, + minTip: config.MinTip, + anteHandler: config.AnteHandler, } vmKeeper.SetEvmMempool(evmMempool) @@ -303,6 +323,10 @@ func (m *ExperimentalEVMMempool) Remove(tx sdk.Tx) error { m.mtx.Lock() defer m.mtx.Unlock() + if m.blockchain.latestCtx.BlockHeight() == 0 { + return nil + } + m.logger.Debug("removing transaction from mempool") msg, err := m.getEVMMessage(tx) @@ -419,6 +443,10 @@ func (m *ExperimentalEVMMempool) Close() error { errs = append(errs, fmt.Errorf("failed to close txpool: %w", err)) } + if err := m.localTxTracker.Stop(); err != nil { + errs = append(errs, fmt.Errorf("failed to close localTxTracker: %w", err)) + } + return errors.Join(errs...) } diff --git a/mempool/txpool/locals/errors.go b/mempool/txpool/locals/errors.go new file mode 100644 index 000000000..fda50bf21 --- /dev/null +++ b/mempool/txpool/locals/errors.go @@ -0,0 +1,46 @@ +// Copyright 2025 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package locals + +import ( + "errors" + + "github.com/ethereum/go-ethereum/core/txpool" + "github.com/ethereum/go-ethereum/core/txpool/legacypool" +) + +// IsTemporaryReject determines whether the given error indicates a temporary +// reason to reject a transaction from being included in the txpool. The result +// may change if the txpool's state changes later. +func IsTemporaryReject(err error) bool { + switch { + case errors.Is(err, legacypool.ErrOutOfOrderTxFromDelegated): + return true + case errors.Is(err, txpool.ErrInflightTxLimitReached): + return true + case errors.Is(err, legacypool.ErrAuthorityReserved): + return true + case errors.Is(err, txpool.ErrUnderpriced): + return true + case errors.Is(err, legacypool.ErrTxPoolOverflow): + return true + case errors.Is(err, legacypool.ErrFutureReplacePending): + return true + default: + return false + } +} diff --git a/mempool/txpool/locals/journal.go b/mempool/txpool/locals/journal.go new file mode 100644 index 000000000..46fd6de34 --- /dev/null +++ b/mempool/txpool/locals/journal.go @@ -0,0 +1,186 @@ +// Copyright 2017 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package locals + +import ( + "errors" + "io" + "io/fs" + "os" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/rlp" +) + +// errNoActiveJournal is returned if a transaction is attempted to be inserted +// into the journal, but no such file is currently open. +var errNoActiveJournal = errors.New("no active journal") + +// devNull is a WriteCloser that just discards anything written into it. Its +// goal is to allow the transaction journal to write into a fake journal when +// loading transactions on startup without printing warnings due to no file +// being read for write. +type devNull struct{} + +func (*devNull) Write(p []byte) (n int, err error) { return len(p), nil } +func (*devNull) Close() error { return nil } + +// journal is a rotating log of transactions with the aim of storing locally +// created transactions to allow non-executed ones to survive node restarts. +type journal struct { + path string // Filesystem path to store the transactions at + writer io.WriteCloser // Output stream to write new transactions into +} + +// newTxJournal creates a new transaction journal to +func newTxJournal(path string) *journal { + return &journal{ + path: path, + } +} + +// load parses a transaction journal dump from disk, loading its contents into +// the specified pool. +func (journal *journal) load(add func([]*types.Transaction) []error) error { + // Open the journal for loading any past transactions + input, err := os.Open(journal.path) + if errors.Is(err, fs.ErrNotExist) { + // Skip the parsing if the journal file doesn't exist at all + return nil + } + if err != nil { + return err + } + defer input.Close() + + // Temporarily discard any journal additions (don't double add on load) + journal.writer = new(devNull) + defer func() { journal.writer = nil }() + + // Inject all transactions from the journal into the pool + stream := rlp.NewStream(input, 0) + total, dropped := 0, 0 + + // Create a method to load a limited batch of transactions and bump the + // appropriate progress counters. Then use this method to load all the + // journaled transactions in small-ish batches. + loadBatch := func(txs types.Transactions) { + for _, err := range add(txs) { + if err != nil { + log.Debug("Failed to add journaled transaction", "err", err) + dropped++ + } + } + } + var ( + failure error + batch types.Transactions + ) + for { + // Parse the next transaction and terminate on error + tx := new(types.Transaction) + if err = stream.Decode(tx); err != nil { + if err != io.EOF { + failure = err + } + if batch.Len() > 0 { + loadBatch(batch) + } + break + } + // New transaction parsed, queue up for later, import if threshold is reached + total++ + + if batch = append(batch, tx); batch.Len() > 1024 { + loadBatch(batch) + batch = batch[:0] + } + } + log.Info("Loaded local transaction journal", "transactions", total, "dropped", dropped) + + return failure +} + +// insert adds the specified transaction to the local disk journal. +func (journal *journal) insert(tx *types.Transaction) error { + if journal.writer == nil { + return errNoActiveJournal + } + if err := rlp.Encode(journal.writer, tx); err != nil { + return err + } + return nil +} + +// rotate regenerates the transaction journal based on the current contents of +// the transaction pool. +func (journal *journal) rotate(all map[common.Address]types.Transactions) error { + // Close the current journal (if any is open) + if journal.writer != nil { + if err := journal.writer.Close(); err != nil { + return err + } + journal.writer = nil + } + // Generate a new journal with the contents of the current pool + replacement, err := os.OpenFile(journal.path+".new", os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) + if err != nil { + return err + } + journaled := 0 + for _, txs := range all { + for _, tx := range txs { + if err = rlp.Encode(replacement, tx); err != nil { + replacement.Close() + return err + } + } + journaled += len(txs) + } + replacement.Close() + + // Replace the live journal with the newly generated one + if err = os.Rename(journal.path+".new", journal.path); err != nil { + return err + } + sink, err := os.OpenFile(journal.path, os.O_WRONLY|os.O_APPEND, 0644) + if err != nil { + return err + } + journal.writer = sink + + logger := log.Info + if len(all) == 0 { + logger = log.Debug + } + logger("Regenerated local transaction journal", "transactions", journaled, "accounts", len(all)) + + return nil +} + +// close flushes the transaction journal contents to disk and closes the file. +func (journal *journal) close() error { + var err error + + if journal.writer != nil { + err = journal.writer.Close() + journal.writer = nil + } + return err +} diff --git a/mempool/txpool/locals/tx_tracker.go b/mempool/txpool/locals/tx_tracker.go new file mode 100644 index 000000000..0f22b1c42 --- /dev/null +++ b/mempool/txpool/locals/tx_tracker.go @@ -0,0 +1,216 @@ +// Copyright 2023 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +// Package locals implements tracking for "local" transactions +package locals + +import ( + "github.com/cosmos/evm/mempool/txpool" + "github.com/cosmos/evm/mempool/txpool/legacypool" + "slices" + "sync" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" + "github.com/ethereum/go-ethereum/params" +) + +var ( + recheckInterval = time.Minute + localGauge = metrics.GetOrRegisterGauge("txpool/local", nil) +) + +// TxTracker is a struct used to track priority transactions; it will check from +// time to time if the main pool has forgotten about any of the transaction +// it is tracking, and if so, submit it again. +// This is used to track 'locals'. +// This struct does not care about transaction validity, price-bumps or account limits, +// but optimistically accepts transactions. +type TxTracker struct { + all map[common.Hash]*types.Transaction // All tracked transactions + byAddr map[common.Address]*legacypool.SortedMap // Transactions by address + + journal *journal // Journal of local transaction to back up to disk + rejournal time.Duration // How often to rotate journal + pool *txpool.TxPool // The tx pool to interact with + signer types.Signer + + shutdownCh chan struct{} + mu sync.Mutex + wg sync.WaitGroup +} + +// New creates a new TxTracker +func New(journalPath string, journalTime time.Duration, chainConfig *params.ChainConfig, next *txpool.TxPool) *TxTracker { + pool := &TxTracker{ + all: make(map[common.Hash]*types.Transaction), + byAddr: make(map[common.Address]*legacypool.SortedMap), + signer: types.LatestSigner(chainConfig), + shutdownCh: make(chan struct{}), + pool: next, + } + if journalPath != "" { + pool.journal = newTxJournal(journalPath) + pool.rejournal = journalTime + } + return pool +} + +// Track adds a transaction to the tracked set. +// Note: blob-type transactions are ignored. +func (tracker *TxTracker) Track(tx *types.Transaction) { + tracker.TrackAll([]*types.Transaction{tx}) +} + +// TrackAll adds a list of transactions to the tracked set. +// Note: blob-type transactions are ignored. +func (tracker *TxTracker) TrackAll(txs []*types.Transaction) { + tracker.mu.Lock() + defer tracker.mu.Unlock() + + for _, tx := range txs { + if tx.Type() == types.BlobTxType { + continue + } + // If we're already tracking it, it's a no-op + if _, ok := tracker.all[tx.Hash()]; ok { + continue + } + // Theoretically, checking the error here is unnecessary since sender recovery + // is already part of basic validation. However, retrieving the sender address + // from the transaction cache is effectively a no-op if it was previously verified. + // Therefore, the error is still checked just in case. + addr, err := types.Sender(tracker.signer, tx) + if err != nil { + continue + } + tracker.all[tx.Hash()] = tx + if tracker.byAddr[addr] == nil { + tracker.byAddr[addr] = legacypool.NewSortedMap() + } + tracker.byAddr[addr].Put(tx) + + if tracker.journal != nil { + _ = tracker.journal.insert(tx) + } + } + localGauge.Update(int64(len(tracker.all))) +} + +// recheck checks and returns any transactions that needs to be resubmitted. +func (tracker *TxTracker) recheck(journalCheck bool) (resubmits []*types.Transaction, rejournal map[common.Address]types.Transactions) { + tracker.mu.Lock() + defer tracker.mu.Unlock() + + var ( + numStales = 0 + numOk = 0 + ) + for sender, txs := range tracker.byAddr { + // Wipe the stales + stales := txs.Forward(tracker.pool.Nonce(sender)) + for _, tx := range stales { + delete(tracker.all, tx.Hash()) + } + numStales += len(stales) + + // Check the non-stale + for _, tx := range txs.Flatten() { + if tracker.pool.Has(tx.Hash()) { + numOk++ + continue + } + resubmits = append(resubmits, tx) + } + } + + if journalCheck { // rejournal + rejournal = make(map[common.Address]types.Transactions) + for _, tx := range tracker.all { + addr, _ := types.Sender(tracker.signer, tx) + rejournal[addr] = append(rejournal[addr], tx) + } + // Sort them + for _, list := range rejournal { + // cmp(a, b) should return a negative number when a < b, + slices.SortFunc(list, func(a, b *types.Transaction) int { + return int(a.Nonce() - b.Nonce()) + }) + } + } + localGauge.Update(int64(len(tracker.all))) + log.Debug("Tx tracker status", "need-resubmit", len(resubmits), "stale", numStales, "ok", numOk) + return resubmits, rejournal +} + +// Start implements node.Lifecycle interface +// Start is called after all services have been constructed and the networking +// layer was also initialized to spawn any goroutines required by the service. +func (tracker *TxTracker) Start() error { + tracker.wg.Add(1) + go tracker.loop() + return nil +} + +// Stop implements node.Lifecycle interface +// Stop terminates all goroutines belonging to the service, blocking until they +// are all terminated. +func (tracker *TxTracker) Stop() error { + close(tracker.shutdownCh) + tracker.wg.Wait() + return nil +} + +func (tracker *TxTracker) loop() { + defer tracker.wg.Done() + + if tracker.journal != nil { + tracker.journal.load(func(transactions []*types.Transaction) []error { + tracker.TrackAll(transactions) + return nil + }) + defer tracker.journal.close() + } + var ( + lastJournal = time.Now() + timer = time.NewTimer(10 * time.Second) // Do initial check after 10 seconds, do rechecks more seldom. + ) + for { + select { + case <-tracker.shutdownCh: + return + case <-timer.C: + checkJournal := tracker.journal != nil && time.Since(lastJournal) > tracker.rejournal + resubmits, rejournal := tracker.recheck(checkJournal) + if len(resubmits) > 0 { + tracker.pool.Add(resubmits, false) + } + if checkJournal { + // Lock to prevent journal.rotate <-> journal.insert (via TrackAll) conflicts + tracker.mu.Lock() + lastJournal = time.Now() + if err := tracker.journal.rotate(rejournal); err != nil { + log.Warn("Transaction journal rotation failed", "err", err) + } + tracker.mu.Unlock() + } + timer.Reset(recheckInterval) + } + } +} diff --git a/mempool/txpool/locals/tx_tracker_test.go b/mempool/txpool/locals/tx_tracker_test.go new file mode 100644 index 000000000..367fb6b6d --- /dev/null +++ b/mempool/txpool/locals/tx_tracker_test.go @@ -0,0 +1,165 @@ +// Copyright 2025 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package locals + +import ( + "math/big" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/consensus/ethash" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/core/txpool" + "github.com/ethereum/go-ethereum/core/txpool/legacypool" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/params" +) + +var ( + key, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") + address = crypto.PubkeyToAddress(key.PublicKey) + funds = big.NewInt(1000000000000000) + gspec = &core.Genesis{ + Config: params.TestChainConfig, + Alloc: types.GenesisAlloc{ + address: {Balance: funds}, + }, + BaseFee: big.NewInt(params.InitialBaseFee), + } + signer = types.LatestSigner(gspec.Config) +) + +type testEnv struct { + chain *core.BlockChain + pool *txpool.TxPool + tracker *TxTracker + genDb ethdb.Database +} + +func newTestEnv(t *testing.T, n int, gasTip uint64, journal string) *testEnv { + genDb, blocks, _ := core.GenerateChainWithGenesis(gspec, ethash.NewFaker(), n, func(i int, gen *core.BlockGen) { + tx, err := types.SignTx(types.NewTransaction(gen.TxNonce(address), common.Address{0x00}, big.NewInt(1000), params.TxGas, gen.BaseFee(), nil), signer, key) + if err != nil { + panic(err) + } + gen.AddTx(tx) + }) + + db := rawdb.NewMemoryDatabase() + chain, _ := core.NewBlockChain(db, gspec, ethash.NewFaker(), nil) + + legacyPool := legacypool.New(legacypool.DefaultConfig, chain) + pool, err := txpool.New(gasTip, chain, []txpool.SubPool{legacyPool}) + if err != nil { + t.Fatalf("Failed to create tx pool: %v", err) + } + if n, err := chain.InsertChain(blocks); err != nil { + t.Fatalf("Failed to process block %d: %v", n, err) + } + if err := pool.Sync(); err != nil { + t.Fatalf("Failed to sync the txpool, %v", err) + } + return &testEnv{ + chain: chain, + pool: pool, + tracker: New(journal, time.Minute, gspec.Config, pool), + genDb: genDb, + } +} + +func (env *testEnv) close() { + env.chain.Stop() +} + +// nolint:unused +func (env *testEnv) setGasTip(gasTip uint64) { + env.pool.SetGasTip(new(big.Int).SetUint64(gasTip)) +} + +// nolint:unused +func (env *testEnv) makeTx(nonce uint64, gasPrice *big.Int) *types.Transaction { + if nonce == 0 { + head := env.chain.CurrentHeader() + state, _ := env.chain.StateAt(head.Root) + nonce = state.GetNonce(address) + } + if gasPrice == nil { + gasPrice = big.NewInt(params.GWei) + } + tx, _ := types.SignTx(types.NewTransaction(nonce, common.Address{0x00}, big.NewInt(1000), params.TxGas, gasPrice, nil), signer, key) + return tx +} + +func (env *testEnv) makeTxs(n int) []*types.Transaction { + head := env.chain.CurrentHeader() + state, _ := env.chain.StateAt(head.Root) + nonce := state.GetNonce(address) + + var txs []*types.Transaction + for i := 0; i < n; i++ { + tx, _ := types.SignTx(types.NewTransaction(nonce+uint64(i), common.Address{0x00}, big.NewInt(1000), params.TxGas, big.NewInt(params.GWei), nil), signer, key) + txs = append(txs, tx) + } + return txs +} + +// nolint:unused +func (env *testEnv) commit() { + head := env.chain.CurrentBlock() + block := env.chain.GetBlock(head.Hash(), head.Number.Uint64()) + blocks, _ := core.GenerateChain(env.chain.Config(), block, ethash.NewFaker(), env.genDb, 1, func(i int, gen *core.BlockGen) { + tx, err := types.SignTx(types.NewTransaction(gen.TxNonce(address), common.Address{0x00}, big.NewInt(1000), params.TxGas, gen.BaseFee(), nil), signer, key) + if err != nil { + panic(err) + } + gen.AddTx(tx) + }) + env.chain.InsertChain(blocks) + if err := env.pool.Sync(); err != nil { + panic(err) + } +} + +func TestResubmit(t *testing.T) { + env := newTestEnv(t, 10, 0, "") + defer env.close() + + txs := env.makeTxs(10) + txsA := txs[:len(txs)/2] + txsB := txs[len(txs)/2:] + env.pool.Add(txsA, true) + pending, queued := env.pool.ContentFrom(address) + if len(pending) != len(txsA) || len(queued) != 0 { + t.Fatalf("Unexpected txpool content: %d, %d", len(pending), len(queued)) + } + env.tracker.TrackAll(txs) + + resubmit, all := env.tracker.recheck(true) + if len(resubmit) != len(txsB) { + t.Fatalf("Unexpected transactions to resubmit, got: %d, want: %d", len(resubmit), len(txsB)) + } + if len(all) == 0 || len(all[address]) == 0 { + t.Fatalf("Unexpected transactions being tracked, got: %d, want: %d", 0, len(txs)) + } + if len(all[address]) != len(txs) { + t.Fatalf("Unexpected transactions being tracked, got: %d, want: %d", len(all[address]), len(txs)) + } +}