Skip to content

Commit

Permalink
remove txs which do not end up in a block out of the mempool
Browse files Browse the repository at this point in the history
  • Loading branch information
bharath-123 committed May 9, 2024
1 parent fe45f4e commit bbefcc6
Show file tree
Hide file tree
Showing 9 changed files with 195 additions and 45 deletions.
8 changes: 5 additions & 3 deletions core/txpool/blobpool/blobpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,9 +334,11 @@ func New(config Config, chain BlockChain) *BlobPool {
}
}

func (p *BlobPool) SetAstriaOrdered(types.Transactions) {}
func (p *BlobPool) ClearAstriaOrdered() {}
func (p *BlobPool) AstriaOrdered() *types.Transactions { return &types.Transactions{} }
func (p *BlobPool) SetAstriaOrdered(types.Transactions) {}
func (p *BlobPool) ClearAstriaOrdered() {}
func (p *BlobPool) UpdateAstriaInvalidTxs(*types.Transaction) {}
func (p *BlobPool) AstriaInvalidTxs() *types.Transactions { return &types.Transactions{} }
func (p *BlobPool) AstriaOrdered() *types.Transactions { return &types.Transactions{} }

// Filter returns whether the given transaction can be consumed by the blob pool.
func (p *BlobPool) Filter(tx *types.Transaction) bool {
Expand Down
2 changes: 1 addition & 1 deletion core/txpool/legacypool/journal.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (journal *journal) load(add func([]*types.Transaction) []error) error {
}
break
}
// New transaction parsed, queue up for later, import if threshold is reached
// New transaction invalid, queue up for later, import if threshold is reached
total++

if batch = append(batch, tx); batch.Len() > 1024 {
Expand Down
36 changes: 25 additions & 11 deletions core/txpool/legacypool/legacypool.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ var (

// Metrics related to the astria ordered txs
astriaValidMeter = metrics.GetOrRegisterMeter("astria/txpool/valid", nil)
astriaParsedMeter = metrics.GetOrRegisterMeter("astria/txpool/parsed", nil)
astriaParsedMeter = metrics.GetOrRegisterMeter("astria/txpool/invalid", nil)
astriaRequestedMeter = metrics.GetOrRegisterMeter("astria/txpool/requested", nil)
)

Expand Down Expand Up @@ -280,32 +280,31 @@ func New(config Config, chain BlockChain) *LegacyPool {
}

type astriaOrdered struct {
valid types.Transactions
parsed types.Transactions
pool *LegacyPool
valid types.Transactions
invalid types.Transactions
pool *LegacyPool
}

func newAstriaOrdered(valid types.Transactions, parsed types.Transactions, pool *LegacyPool) *astriaOrdered {
astriaParsedMeter.Mark(int64(len(parsed)))
astriaValidMeter.Mark(int64(len(valid)))

return &astriaOrdered{
valid: valid,
parsed: parsed,
pool: pool,
valid: valid,
invalid: parsed,
pool: pool,
}
}

func (ao *astriaOrdered) clear() {
ao.valid = types.Transactions{}
ao.parsed = types.Transactions{}
ao.invalid = types.Transactions{}
}

func (pool *LegacyPool) SetAstriaOrdered(txs types.Transactions) {
astriaRequestedMeter.Mark(int64(len(txs)))

valid := []*types.Transaction{}
parsed := []*types.Transaction{}
for idx, tx := range txs {
err := pool.validateTxBasics(tx, false)
if err != nil {
Expand All @@ -316,15 +315,30 @@ func (pool *LegacyPool) SetAstriaOrdered(txs types.Transactions) {
valid = append(valid, tx)
}

pool.astria = newAstriaOrdered(types.Transactions(valid), types.Transactions(parsed), pool)
pool.astria = newAstriaOrdered(valid, types.Transactions{}, pool)
}

func (pool *LegacyPool) UpdateAstriaInvalidTxs(tx *types.Transaction) {
if pool.astria.invalid == nil {
pool.astria.invalid = types.Transactions{tx}
}

pool.astria.invalid = append(pool.astria.invalid, tx)
}

func (pool *LegacyPool) AstriaInvalidTxs() *types.Transactions {
if pool.astria == nil {
return &types.Transactions{}
}
return &pool.astria.invalid
}

func (pool *LegacyPool) ClearAstriaOrdered() {
if pool.astria == nil {
return
}

for _, tx := range pool.astria.parsed {
for _, tx := range pool.astria.invalid {
pool.removeTx(tx.Hash(), false, true)
}

Expand Down
2 changes: 2 additions & 0 deletions core/txpool/subpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,5 +140,7 @@ type SubPool interface {

SetAstriaOrdered(types.Transactions)
ClearAstriaOrdered()
UpdateAstriaInvalidTxs(tx *types.Transaction)
AstriaInvalidTxs() *types.Transactions
AstriaOrdered() *types.Transactions
}
17 changes: 17 additions & 0 deletions core/txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,23 @@ func (p *TxPool) ClearAstriaOrdered() {
}
}

func (p *TxPool) UpdateAstriaInvalidTxs(tx *types.Transaction) {
for _, subpool := range p.subpools {
subpool.UpdateAstriaInvalidTxs(tx)
}
}

func (p *TxPool) AstriaInvalidTxs() *types.Transactions {
txs := types.Transactions{}

for _, subpool := range p.subpools {
subpoolTxs := subpool.AstriaInvalidTxs()
txs = append(txs, *subpoolTxs...)
}

return &txs
}

func (p *TxPool) AstriaOrdered() *types.Transactions {
txs := types.Transactions{}

Expand Down
7 changes: 2 additions & 5 deletions miner/payload_building.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,10 +176,7 @@ func (payload *Payload) ResolveFull() *engine.ExecutionPayloadEnvelope {

// buildPayload builds the payload according to the provided parameters.
func (w *worker) buildPayload(args *BuildPayloadArgs) (*Payload, error) {
// Build the initial version with no transaction included. It should be fast
// enough to run. The empty payload can at least make sure there is something
// to deliver for not missing slot.
emptyParams := &generateParams{
fullParams := &generateParams{
timestamp: args.Timestamp,
forceTime: true,
parentHash: args.Parent,
Expand All @@ -190,7 +187,7 @@ func (w *worker) buildPayload(args *BuildPayloadArgs) (*Payload, error) {
noTxs: false,
}
start := time.Now()
full := w.getSealingBlock(emptyParams)
full := w.getSealingBlock(fullParams)
if full.err != nil {
return nil, full.err
}
Expand Down
157 changes: 135 additions & 22 deletions miner/payload_building_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package miner

import (
"math/big"
"reflect"
"testing"
"time"
Expand All @@ -38,16 +39,7 @@ func TestBuildPayload(t *testing.T) {
defer w.close()

timestamp := uint64(time.Now().Unix())
args := &BuildPayloadArgs{
Parent: b.chain.CurrentBlock().Hash(),
Timestamp: timestamp,
Random: common.Hash{},
FeeRecipient: recipient,
}
payload, err := w.buildPayload(args)
if err != nil {
t.Fatalf("Failed to build payload %v", err)
}

verify := func(outer *engine.ExecutionPayloadEnvelope, txs int) {
payload := outer.ExecutionPayload
if payload.ParentHash != b.chain.CurrentBlock().Hash() {
Expand All @@ -66,18 +58,139 @@ func TestBuildPayload(t *testing.T) {
t.Fatal("Unexpect transaction set")
}
}
empty := payload.ResolveEmpty()
verify(empty, 0)

full := payload.ResolveFull()
verify(full, len(pendingTxs))

// Ensure resolve can be called multiple times and the
// result should be unchanged
dataOne := payload.Resolve()
dataTwo := payload.Resolve()
if !reflect.DeepEqual(dataOne, dataTwo) {
t.Fatal("Unexpected payload data")

txGasPrice := big.NewInt(10 * params.InitialBaseFee)

tests := []struct {
name string
txs types.Transactions
expectedTxsInPayload types.Transactions
invalidTxs types.Transactions
}{
{
name: "empty",
txs: types.Transactions{},
expectedTxsInPayload: types.Transactions{},
invalidTxs: types.Transactions{},
},
{
name: "transactions with gas enough to fit into a single block",
txs: types.Transactions{
types.NewTransaction(b.txPool.Nonce(testBankAddress), testUserAddress, big.NewInt(1000), params.TxGas, txGasPrice, nil),
types.NewTransaction(b.txPool.Nonce(testBankAddress)+1, testUserAddress, big.NewInt(2000), params.TxGas, txGasPrice, nil),
},
expectedTxsInPayload: types.Transactions{
types.NewTransaction(b.txPool.Nonce(testBankAddress), testUserAddress, big.NewInt(1000), params.TxGas, txGasPrice, nil),
types.NewTransaction(b.txPool.Nonce(testBankAddress)+1, testUserAddress, big.NewInt(2000), params.TxGas, txGasPrice, nil),
},
invalidTxs: types.Transactions{},
},
{
name: "transactions with gas which doesn't fit in a single block",
txs: types.Transactions{
types.NewTransaction(b.txPool.Nonce(testBankAddress), testUserAddress, big.NewInt(1000), b.BlockChain().GasLimit()-10000, txGasPrice, nil),
types.NewTransaction(b.txPool.Nonce(testBankAddress)+1, testUserAddress, big.NewInt(1000), b.BlockChain().GasLimit()-10000, txGasPrice, nil),
},
expectedTxsInPayload: types.Transactions{
types.NewTransaction(b.txPool.Nonce(testBankAddress), testUserAddress, big.NewInt(1000), b.BlockChain().GasLimit()-10000, txGasPrice, nil),
},
invalidTxs: types.Transactions{
types.NewTransaction(b.txPool.Nonce(testBankAddress)+1, testUserAddress, big.NewInt(1000), b.BlockChain().GasLimit()-10000, txGasPrice, nil),
},
},
{
name: "transactions with nonce too high",
txs: types.Transactions{
types.NewTransaction(b.txPool.Nonce(testBankAddress), testUserAddress, big.NewInt(1000), params.TxGas, txGasPrice, nil),
types.NewTransaction(b.txPool.Nonce(testBankAddress)+4, testUserAddress, big.NewInt(2000), params.TxGas, txGasPrice, nil),
},
expectedTxsInPayload: types.Transactions{
types.NewTransaction(b.txPool.Nonce(testBankAddress), testUserAddress, big.NewInt(1000), params.TxGas, txGasPrice, nil),
},
invalidTxs: types.Transactions{
types.NewTransaction(b.txPool.Nonce(testBankAddress)+4, testUserAddress, big.NewInt(2000), params.TxGas, txGasPrice, nil),
},
},
{
name: "transactions with nonce too low",
txs: types.Transactions{
types.NewTransaction(b.txPool.Nonce(testBankAddress), testUserAddress, big.NewInt(1000), params.TxGas, txGasPrice, nil),
types.NewTransaction(b.txPool.Nonce(testBankAddress)-1, testUserAddress, big.NewInt(2000), params.TxGas, txGasPrice, nil),
},
expectedTxsInPayload: types.Transactions{
types.NewTransaction(b.txPool.Nonce(testBankAddress), testUserAddress, big.NewInt(1000), params.TxGas, txGasPrice, nil),
},
invalidTxs: types.Transactions{
types.NewTransaction(b.txPool.Nonce(testBankAddress)-1, testUserAddress, big.NewInt(2000), params.TxGas, txGasPrice, nil),
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
signedTxs := types.Transactions{}
signedInvalidTxs := types.Transactions{}

for _, tx := range tt.txs {
signedTx, err := types.SignTx(tx, types.HomesteadSigner{}, testBankKey)
if err != nil {
t.Fatalf("Failed to sign tx %v", err)
}
signedTxs = append(signedTxs, signedTx)
}

for _, tx := range tt.invalidTxs {
signedTx, err := types.SignTx(tx, types.HomesteadSigner{}, testBankKey)
if err != nil {
t.Fatalf("Failed to sign tx %v", err)
}
signedInvalidTxs = append(signedInvalidTxs, signedTx)
}

// set the astria ordered txs
b.TxPool().SetAstriaOrdered(signedTxs)
astriaTxs := b.TxPool().AstriaOrdered()

if astriaTxs.Len() != len(tt.txs) {
t.Fatalf("Unexpected number of astria ordered transactions: %d", astriaTxs.Len())
}

txs := types.TxDifference(*astriaTxs, signedTxs)
if txs.Len() != 0 {
t.Fatalf("Unexpected transactions in astria ordered transactions: %v", txs)
}

args := &BuildPayloadArgs{
Parent: b.chain.CurrentBlock().Hash(),
Timestamp: timestamp,
Random: common.Hash{},
FeeRecipient: recipient,
}

payload, err := w.buildPayload(args)
if err != nil {
t.Fatalf("Failed to build payload %v", err)
}
full := payload.ResolveFull()
verify(full, len(tt.expectedTxsInPayload))

// Ensure resolve can be called multiple times and the
// result should be unchanged
dataOne := payload.Resolve()
dataTwo := payload.Resolve()
if !reflect.DeepEqual(dataOne, dataTwo) {
t.Fatal("Unexpected payload data")
}

// Ensure invalid transactions are stored
if len(tt.invalidTxs) > 0 {
invalidTxs := b.TxPool().AstriaInvalidTxs()
txDifference := types.TxDifference(*invalidTxs, signedInvalidTxs)
if txDifference.Len() != 0 {
t.Fatalf("Unexpected invalid transactions in astria invalid transactions: %v", txDifference)
}
}
})
}
}

Expand Down
10 changes: 8 additions & 2 deletions miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -810,6 +810,8 @@ func (w *worker) commitAstriaTransactions(env *environment, txs *types.Transacti
// If we don't have enough gas for any further transactions then we're done.
if env.gasPool.Gas() < params.TxGas {
log.Trace("Not enough gas for further transactions", "have", env.gasPool, "want", params.TxGas)
// remove txs from the mempool if they are too big for this block
w.eth.TxPool().UpdateAstriaInvalidTxs(tx)
break
}

Expand All @@ -821,7 +823,7 @@ func (w *worker) commitAstriaTransactions(env *environment, txs *types.Transacti
// phase, start ignoring the sender until we do.
if tx.Protected() && !w.chainConfig.IsEIP155(env.header.Number) {
log.Trace("Ignoring reply protected transaction", "hash", tx.Hash(), "eip155", w.chainConfig.EIP155Block)

w.eth.TxPool().UpdateAstriaInvalidTxs(tx)
continue
}
// Start executing the transaction
Expand All @@ -839,7 +841,7 @@ func (w *worker) commitAstriaTransactions(env *environment, txs *types.Transacti

case errors.Is(err, core.ErrNonceTooHigh):
// Reorg notification data race between the transaction pool and miner, skip account =
log.Trace("Skipping account with hight nonce", "sender", from, "nonce", tx.Nonce())
log.Trace("Skipping account with high nonce", "sender", from, "nonce", tx.Nonce())

case errors.Is(err, nil):
// Everything ok, collect the logs and shift in the next transaction from the same account
Expand All @@ -855,6 +857,10 @@ func (w *worker) commitAstriaTransactions(env *environment, txs *types.Transacti
// nonce-too-high clause will prevent us from executing in vain).
log.Debug("Transaction failed, account skipped", "hash", tx.Hash(), "err", err)
}
if err != nil {
log.Trace("Removing transaction from mempool", "hash", tx.Hash(), "err", err)
w.eth.TxPool().UpdateAstriaInvalidTxs(tx)
}
}
if !w.isRunning() && len(coalescedLogs) > 0 {
// We don't push the pendingLogsEvent while we are sealing. The reason is that
Expand Down
1 change: 0 additions & 1 deletion miner/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,6 @@ func (b *testWorkerBackend) newRandomTx(creation bool) *types.Transaction {

func newTestWorker(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine, db ethdb.Database, blocks int) (*worker, *testWorkerBackend) {
backend := newTestWorkerBackend(t, chainConfig, engine, db, blocks)
backend.txPool.Add(pendingTxs, true, false)
w := newWorker(testConfig, chainConfig, engine, backend, new(event.TypeMux), nil, false)
w.setEtherbase(testBankAddress)
return w, backend
Expand Down

0 comments on commit bbefcc6

Please sign in to comment.