From ad9cced53857e47f70a0af6bf3a83cd9d4559f76 Mon Sep 17 00:00:00 2001 From: Bharath Date: Thu, 9 May 2024 20:04:20 +0530 Subject: [PATCH] remove txs which do not end up in a block out of the mempool --- core/txpool/blobpool/blobpool.go | 8 +- core/txpool/legacypool/legacypool.go | 40 ++++--- core/txpool/subpool.go | 2 + core/txpool/txpool.go | 17 +++ miner/ordering_test.go | 2 +- miner/payload_building.go | 9 +- miner/payload_building_test.go | 157 +++++++++++++++++++++++---- miner/worker.go | 16 ++- miner/worker_test.go | 1 - 9 files changed, 201 insertions(+), 51 deletions(-) diff --git a/core/txpool/blobpool/blobpool.go b/core/txpool/blobpool/blobpool.go index b569657bea..f369e1f717 100644 --- a/core/txpool/blobpool/blobpool.go +++ b/core/txpool/blobpool/blobpool.go @@ -334,9 +334,11 @@ func New(config Config, chain BlockChain) *BlobPool { } } -func (p *BlobPool) SetAstriaOrdered(types.Transactions) {} -func (p *BlobPool) ClearAstriaOrdered() {} -func (p *BlobPool) AstriaOrdered() *types.Transactions { return &types.Transactions{} } +func (p *BlobPool) SetAstriaOrdered(types.Transactions) {} +func (p *BlobPool) ClearAstriaOrdered() {} +func (p *BlobPool) UpdateAstriaInvalid(*types.Transaction) {} +func (p *BlobPool) AstriaInvalid() *types.Transactions { return &types.Transactions{} } +func (p *BlobPool) AstriaOrdered() *types.Transactions { return &types.Transactions{} } // Filter returns whether the given transaction can be consumed by the blob pool. func (p *BlobPool) Filter(tx *types.Transaction) bool { diff --git a/core/txpool/legacypool/legacypool.go b/core/txpool/legacypool/legacypool.go index 24dd320658..77fa8dfdd4 100644 --- a/core/txpool/legacypool/legacypool.go +++ b/core/txpool/legacypool/legacypool.go @@ -103,7 +103,7 @@ var ( // Metrics related to the astria ordered txs astriaValidMeter = metrics.GetOrRegisterMeter("astria/txpool/valid", nil) - astriaParsedMeter = metrics.GetOrRegisterMeter("astria/txpool/parsed", nil) + astriaInvalidMeter = metrics.GetOrRegisterMeter("astria/txpool/invalid", nil) astriaRequestedMeter = metrics.GetOrRegisterMeter("astria/txpool/requested", nil) ) @@ -280,32 +280,30 @@ func New(config Config, chain BlockChain) *LegacyPool { } type astriaOrdered struct { - valid types.Transactions - parsed types.Transactions - pool *LegacyPool + valid types.Transactions + invalid types.Transactions + pool *LegacyPool } -func newAstriaOrdered(valid types.Transactions, parsed types.Transactions, pool *LegacyPool) *astriaOrdered { - astriaParsedMeter.Mark(int64(len(parsed))) +func newAstriaOrdered(valid types.Transactions, pool *LegacyPool) *astriaOrdered { astriaValidMeter.Mark(int64(len(valid))) return &astriaOrdered{ - valid: valid, - parsed: parsed, - pool: pool, + valid: valid, + invalid: types.Transactions{}, + pool: pool, } } func (ao *astriaOrdered) clear() { ao.valid = types.Transactions{} - ao.parsed = types.Transactions{} + ao.invalid = types.Transactions{} } func (pool *LegacyPool) SetAstriaOrdered(txs types.Transactions) { astriaRequestedMeter.Mark(int64(len(txs))) valid := []*types.Transaction{} - parsed := []*types.Transaction{} for idx, tx := range txs { err := pool.validateTxBasics(tx, false) if err != nil { @@ -316,7 +314,22 @@ func (pool *LegacyPool) SetAstriaOrdered(txs types.Transactions) { valid = append(valid, tx) } - pool.astria = newAstriaOrdered(types.Transactions(valid), types.Transactions(parsed), pool) + pool.astria = newAstriaOrdered(valid, pool) +} + +func (pool *LegacyPool) UpdateAstriaInvalid(tx *types.Transaction) { + if pool.astria.invalid == nil { + pool.astria.invalid = types.Transactions{tx} + } + + pool.astria.invalid = append(pool.astria.invalid, tx) +} + +func (pool *LegacyPool) AstriaInvalid() *types.Transactions { + if pool.astria == nil { + return &types.Transactions{} + } + return &pool.astria.invalid } func (pool *LegacyPool) ClearAstriaOrdered() { @@ -324,7 +337,8 @@ func (pool *LegacyPool) ClearAstriaOrdered() { return } - for _, tx := range pool.astria.parsed { + astriaInvalidMeter.Mark(int64(len(pool.astria.invalid))) + for _, tx := range pool.astria.invalid { pool.removeTx(tx.Hash(), false, true) } diff --git a/core/txpool/subpool.go b/core/txpool/subpool.go index 47393465d2..a6661a1d95 100644 --- a/core/txpool/subpool.go +++ b/core/txpool/subpool.go @@ -140,5 +140,7 @@ type SubPool interface { SetAstriaOrdered(types.Transactions) ClearAstriaOrdered() + UpdateAstriaInvalid(tx *types.Transaction) + AstriaInvalid() *types.Transactions AstriaOrdered() *types.Transactions } diff --git a/core/txpool/txpool.go b/core/txpool/txpool.go index 89ced56194..38cf0f103a 100644 --- a/core/txpool/txpool.go +++ b/core/txpool/txpool.go @@ -273,6 +273,23 @@ func (p *TxPool) ClearAstriaOrdered() { } } +func (p *TxPool) UpdateAstriaInvalid(tx *types.Transaction) { + for _, subpool := range p.subpools { + subpool.UpdateAstriaInvalid(tx) + } +} + +func (p *TxPool) AstriaInvalid() *types.Transactions { + txs := types.Transactions{} + + for _, subpool := range p.subpools { + subpoolTxs := subpool.AstriaInvalid() + txs = append(txs, *subpoolTxs...) + } + + return &txs +} + func (p *TxPool) AstriaOrdered() *types.Transactions { txs := types.Transactions{} diff --git a/miner/ordering_test.go b/miner/ordering_test.go index 59d478274d..22fb57621a 100644 --- a/miner/ordering_test.go +++ b/miner/ordering_test.go @@ -183,7 +183,7 @@ func TestTransactionTimeSort(t *testing.T) { if txi.GasPrice().Cmp(next.GasPrice()) < 0 { t.Errorf("invalid gasprice ordering: tx #%d (A=%x P=%v) < tx #%d (A=%x P=%v)", i, fromi[:4], txi.GasPrice(), i+1, fromNext[:4], next.GasPrice()) } - // Make sure time order is ascending if the txs have the same gas price + // Make sure time order is ascending if the txsToBuildPayload have the same gas price if txi.GasPrice().Cmp(next.GasPrice()) == 0 && txi.Time().After(next.Time()) { t.Errorf("invalid received time ordering: tx #%d (A=%x T=%v) > tx #%d (A=%x T=%v)", i, fromi[:4], txi.Time(), i+1, fromNext[:4], next.Time()) } diff --git a/miner/payload_building.go b/miner/payload_building.go index c005779338..8701ce466a 100644 --- a/miner/payload_building.go +++ b/miner/payload_building.go @@ -111,7 +111,7 @@ func (payload *Payload) update(r *newPayloadResult, elapsed time.Duration) { "id", payload.id, "number", r.block.NumberU64(), "hash", r.block.Hash(), - "txs", len(r.block.Transactions()), + "txsToBuildPayload", len(r.block.Transactions()), "withdrawals", len(r.block.Withdrawals()), "gas", r.block.GasUsed(), "fees", feesInEther, @@ -176,10 +176,7 @@ func (payload *Payload) ResolveFull() *engine.ExecutionPayloadEnvelope { // buildPayload builds the payload according to the provided parameters. func (w *worker) buildPayload(args *BuildPayloadArgs) (*Payload, error) { - // Build the initial version with no transaction included. It should be fast - // enough to run. The empty payload can at least make sure there is something - // to deliver for not missing slot. - emptyParams := &generateParams{ + fullParams := &generateParams{ timestamp: args.Timestamp, forceTime: true, parentHash: args.Parent, @@ -190,7 +187,7 @@ func (w *worker) buildPayload(args *BuildPayloadArgs) (*Payload, error) { noTxs: false, } start := time.Now() - full := w.getSealingBlock(emptyParams) + full := w.getSealingBlock(fullParams) if full.err != nil { return nil, full.err } diff --git a/miner/payload_building_test.go b/miner/payload_building_test.go index 6f57363441..9a377056bb 100644 --- a/miner/payload_building_test.go +++ b/miner/payload_building_test.go @@ -17,6 +17,7 @@ package miner import ( + "math/big" "reflect" "testing" "time" @@ -38,16 +39,7 @@ func TestBuildPayload(t *testing.T) { defer w.close() timestamp := uint64(time.Now().Unix()) - args := &BuildPayloadArgs{ - Parent: b.chain.CurrentBlock().Hash(), - Timestamp: timestamp, - Random: common.Hash{}, - FeeRecipient: recipient, - } - payload, err := w.buildPayload(args) - if err != nil { - t.Fatalf("Failed to build payload %v", err) - } + verify := func(outer *engine.ExecutionPayloadEnvelope, txs int) { payload := outer.ExecutionPayload if payload.ParentHash != b.chain.CurrentBlock().Hash() { @@ -66,18 +58,139 @@ func TestBuildPayload(t *testing.T) { t.Fatal("Unexpect transaction set") } } - empty := payload.ResolveEmpty() - verify(empty, 0) - - full := payload.ResolveFull() - verify(full, len(pendingTxs)) - - // Ensure resolve can be called multiple times and the - // result should be unchanged - dataOne := payload.Resolve() - dataTwo := payload.Resolve() - if !reflect.DeepEqual(dataOne, dataTwo) { - t.Fatal("Unexpected payload data") + + txGasPrice := big.NewInt(10 * params.InitialBaseFee) + + tests := []struct { + name string + txsToBuildPayload types.Transactions + expectedTxsInPayload types.Transactions + invalidTxs types.Transactions + }{ + { + name: "empty", + txsToBuildPayload: types.Transactions{}, + expectedTxsInPayload: types.Transactions{}, + invalidTxs: types.Transactions{}, + }, + { + name: "transactions with gas enough to fit into a single block", + txsToBuildPayload: types.Transactions{ + types.NewTransaction(b.txPool.Nonce(testBankAddress), testUserAddress, big.NewInt(1000), params.TxGas, txGasPrice, nil), + types.NewTransaction(b.txPool.Nonce(testBankAddress)+1, testUserAddress, big.NewInt(2000), params.TxGas, txGasPrice, nil), + }, + expectedTxsInPayload: types.Transactions{ + types.NewTransaction(b.txPool.Nonce(testBankAddress), testUserAddress, big.NewInt(1000), params.TxGas, txGasPrice, nil), + types.NewTransaction(b.txPool.Nonce(testBankAddress)+1, testUserAddress, big.NewInt(2000), params.TxGas, txGasPrice, nil), + }, + invalidTxs: types.Transactions{}, + }, + { + name: "transactions with gas which doesn't fit in a single block", + txsToBuildPayload: types.Transactions{ + types.NewTransaction(b.txPool.Nonce(testBankAddress), testUserAddress, big.NewInt(1000), b.BlockChain().GasLimit()-10000, txGasPrice, nil), + types.NewTransaction(b.txPool.Nonce(testBankAddress)+1, testUserAddress, big.NewInt(1000), b.BlockChain().GasLimit()-10000, txGasPrice, nil), + }, + expectedTxsInPayload: types.Transactions{ + types.NewTransaction(b.txPool.Nonce(testBankAddress), testUserAddress, big.NewInt(1000), b.BlockChain().GasLimit()-10000, txGasPrice, nil), + }, + invalidTxs: types.Transactions{ + types.NewTransaction(b.txPool.Nonce(testBankAddress)+1, testUserAddress, big.NewInt(1000), b.BlockChain().GasLimit()-10000, txGasPrice, nil), + }, + }, + { + name: "transactions with nonce too high", + txsToBuildPayload: types.Transactions{ + types.NewTransaction(b.txPool.Nonce(testBankAddress), testUserAddress, big.NewInt(1000), params.TxGas, txGasPrice, nil), + types.NewTransaction(b.txPool.Nonce(testBankAddress)+4, testUserAddress, big.NewInt(2000), params.TxGas, txGasPrice, nil), + }, + expectedTxsInPayload: types.Transactions{ + types.NewTransaction(b.txPool.Nonce(testBankAddress), testUserAddress, big.NewInt(1000), params.TxGas, txGasPrice, nil), + }, + invalidTxs: types.Transactions{ + types.NewTransaction(b.txPool.Nonce(testBankAddress)+4, testUserAddress, big.NewInt(2000), params.TxGas, txGasPrice, nil), + }, + }, + { + name: "transactions with nonce too low", + txsToBuildPayload: types.Transactions{ + types.NewTransaction(b.txPool.Nonce(testBankAddress), testUserAddress, big.NewInt(1000), params.TxGas, txGasPrice, nil), + types.NewTransaction(b.txPool.Nonce(testBankAddress)-1, testUserAddress, big.NewInt(2000), params.TxGas, txGasPrice, nil), + }, + expectedTxsInPayload: types.Transactions{ + types.NewTransaction(b.txPool.Nonce(testBankAddress), testUserAddress, big.NewInt(1000), params.TxGas, txGasPrice, nil), + }, + invalidTxs: types.Transactions{ + types.NewTransaction(b.txPool.Nonce(testBankAddress)-1, testUserAddress, big.NewInt(2000), params.TxGas, txGasPrice, nil), + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + signedTxs := types.Transactions{} + signedInvalidTxs := types.Transactions{} + + for _, tx := range tt.txsToBuildPayload { + signedTx, err := types.SignTx(tx, types.HomesteadSigner{}, testBankKey) + if err != nil { + t.Fatalf("Failed to sign tx %v", err) + } + signedTxs = append(signedTxs, signedTx) + } + + for _, tx := range tt.invalidTxs { + signedTx, err := types.SignTx(tx, types.HomesteadSigner{}, testBankKey) + if err != nil { + t.Fatalf("Failed to sign tx %v", err) + } + signedInvalidTxs = append(signedInvalidTxs, signedTx) + } + + // set the astria ordered txsToBuildPayload + b.TxPool().SetAstriaOrdered(signedTxs) + astriaTxs := b.TxPool().AstriaOrdered() + + if astriaTxs.Len() != len(tt.txsToBuildPayload) { + t.Fatalf("Unexpected number of astria ordered transactions: %d", astriaTxs.Len()) + } + + txs := types.TxDifference(*astriaTxs, signedTxs) + if txs.Len() != 0 { + t.Fatalf("Unexpected transactions in astria ordered transactions: %v", txs) + } + + args := &BuildPayloadArgs{ + Parent: b.chain.CurrentBlock().Hash(), + Timestamp: timestamp, + Random: common.Hash{}, + FeeRecipient: recipient, + } + + payload, err := w.buildPayload(args) + if err != nil { + t.Fatalf("Failed to build payload %v", err) + } + full := payload.ResolveFull() + verify(full, len(tt.expectedTxsInPayload)) + + // Ensure resolve can be called multiple times and the + // result should be unchanged + dataOne := payload.Resolve() + dataTwo := payload.Resolve() + if !reflect.DeepEqual(dataOne, dataTwo) { + t.Fatal("Unexpected payload data") + } + + // Ensure invalid transactions are stored + if len(tt.invalidTxs) > 0 { + invalidTxs := b.TxPool().AstriaInvalid() + txDifference := types.TxDifference(*invalidTxs, signedInvalidTxs) + if txDifference.Len() != 0 { + t.Fatalf("Unexpected invalid transactions in astria invalid transactions: %v", txDifference) + } + } + }) } } diff --git a/miner/worker.go b/miner/worker.go index e77c135b5f..1e19c13601 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -792,7 +792,7 @@ func (w *worker) applyTransaction(env *environment, tx *types.Transaction) (*typ return receipt, err } -// This is a copy of commitTransactions, but updated to take a list of txs instead of using heap +// This is a copy of commitTransactions, but updated to take a list of txsToBuildPayload instead of using heap func (w *worker) commitAstriaTransactions(env *environment, txs *types.Transactions, interrupt *atomic.Int32) error { gasLimit := env.header.GasLimit if env.gasPool == nil { @@ -810,6 +810,8 @@ func (w *worker) commitAstriaTransactions(env *environment, txs *types.Transacti // If we don't have enough gas for any further transactions then we're done. if env.gasPool.Gas() < params.TxGas { log.Trace("Not enough gas for further transactions", "have", env.gasPool, "want", params.TxGas) + // remove txsToBuildPayload from the mempool if they are too big for this block + w.eth.TxPool().UpdateAstriaInvalid(tx) break } @@ -821,7 +823,7 @@ func (w *worker) commitAstriaTransactions(env *environment, txs *types.Transacti // phase, start ignoring the sender until we do. if tx.Protected() && !w.chainConfig.IsEIP155(env.header.Number) { log.Trace("Ignoring reply protected transaction", "hash", tx.Hash(), "eip155", w.chainConfig.EIP155Block) - + w.eth.TxPool().UpdateAstriaInvalid(tx) continue } // Start executing the transaction @@ -839,7 +841,7 @@ func (w *worker) commitAstriaTransactions(env *environment, txs *types.Transacti case errors.Is(err, core.ErrNonceTooHigh): // Reorg notification data race between the transaction pool and miner, skip account = - log.Trace("Skipping account with hight nonce", "sender", from, "nonce", tx.Nonce()) + log.Trace("Skipping account with high nonce", "sender", from, "nonce", tx.Nonce()) case errors.Is(err, nil): // Everything ok, collect the logs and shift in the next transaction from the same account @@ -855,6 +857,10 @@ func (w *worker) commitAstriaTransactions(env *environment, txs *types.Transacti // nonce-too-high clause will prevent us from executing in vain). log.Debug("Transaction failed, account skipped", "hash", tx.Hash(), "err", err) } + if err != nil { + log.Trace("Marking transaction as invalid", "hash", tx.Hash(), "err", err) + w.eth.TxPool().UpdateAstriaInvalid(tx) + } } if !w.isRunning() && len(coalescedLogs) > 0 { // We don't push the pendingLogsEvent while we are sealing. The reason is that @@ -1064,7 +1070,7 @@ func (w *worker) prepareWork(genParams *generateParams) (*environment, error) { } func (w *worker) fillAstriaTransactions(interrupt *atomic.Int32, env *environment) error { - // Use pre ordered array of txs + // Use pre ordered array of txsToBuildPayload astriaTxs := w.eth.TxPool().AstriaOrdered() if len(*astriaTxs) > 0 { if err := w.commitAstriaTransactions(env, astriaTxs, interrupt); err != nil { @@ -1226,7 +1232,7 @@ func (w *worker) commit(env *environment, interval func(), update bool, start ti fees := totalFees(block, env.receipts) feesInEther := new(big.Float).Quo(new(big.Float).SetInt(fees), big.NewFloat(params.Ether)) log.Info("Commit new sealing work", "number", block.Number(), "sealhash", w.engine.SealHash(block.Header()), - "txs", env.tcount, "gas", block.GasUsed(), "fees", feesInEther, + "txsToBuildPayload", env.tcount, "gas", block.GasUsed(), "fees", feesInEther, "elapsed", common.PrettyDuration(time.Since(start))) case <-w.exitCh: diff --git a/miner/worker_test.go b/miner/worker_test.go index 9c4694c0e2..e992a89703 100644 --- a/miner/worker_test.go +++ b/miner/worker_test.go @@ -160,7 +160,6 @@ func (b *testWorkerBackend) newRandomTx(creation bool) *types.Transaction { func newTestWorker(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine, db ethdb.Database, blocks int) (*worker, *testWorkerBackend) { backend := newTestWorkerBackend(t, chainConfig, engine, db, blocks) - backend.txPool.Add(pendingTxs, true, false) w := newWorker(testConfig, chainConfig, engine, backend, new(event.TypeMux), nil, false) w.setEtherbase(testBankAddress) return w, backend