Skip to content

Commit

Permalink
feat: remove txs which do not end up in a block out of the mempool (#15)
Browse files Browse the repository at this point in the history
When the txs we receive from `ExecuteBlocks` call fail to end up in a
block, we keep them lying around the geth mempool but clear it from the
astria mempool.
This PR attempts to remove such txs from the geth mempool.
  • Loading branch information
bharath-123 committed May 29, 2024
1 parent cd985cd commit f93a513
Show file tree
Hide file tree
Showing 6 changed files with 202 additions and 43 deletions.
8 changes: 5 additions & 3 deletions core/txpool/blobpool/blobpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,9 +334,11 @@ func New(config Config, chain BlockChain) *BlobPool {
}
}

func (p *BlobPool) SetAstriaOrdered(types.Transactions) {}
func (p *BlobPool) ClearAstriaOrdered() {}
func (p *BlobPool) AstriaOrdered() *types.Transactions { return &types.Transactions{} }
func (p *BlobPool) SetAstriaOrdered(types.Transactions) {}
func (p *BlobPool) ClearAstriaOrdered() {}
func (p *BlobPool) AddToAstriaExcludedFromBlock(*types.Transaction) {}
func (p *BlobPool) AstriaExcludedFromBlock() *types.Transactions { return &types.Transactions{} }
func (p *BlobPool) AstriaOrdered() *types.Transactions { return &types.Transactions{} }

// Filter returns whether the given transaction can be consumed by the blob pool.
func (p *BlobPool) Filter(tx *types.Transaction) bool {
Expand Down
52 changes: 36 additions & 16 deletions core/txpool/legacypool/legacypool.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,9 @@ var (
reheapTimer = metrics.NewRegisteredTimer("txpool/reheap", nil)

// Metrics related to the astria ordered txs
astriaValidMeter = metrics.GetOrRegisterMeter("astria/txpool/valid", nil)
astriaParsedMeter = metrics.GetOrRegisterMeter("astria/txpool/parsed", nil)
astriaRequestedMeter = metrics.GetOrRegisterMeter("astria/txpool/requested", nil)
astriaValidMeter = metrics.GetOrRegisterMeter("astria/txpool/valid", nil)
astriaExcludedFromBlockMeter = metrics.GetOrRegisterMeter("astria/txpool/excludedFromBlock", nil)
astriaRequestedMeter = metrics.GetOrRegisterMeter("astria/txpool/requested", nil)
)

// BlockChain defines the minimal set of methods needed to back a tx pool with
Expand Down Expand Up @@ -281,32 +281,30 @@ func New(config Config, chain BlockChain) *LegacyPool {
}

type astriaOrdered struct {
valid types.Transactions
parsed types.Transactions
pool *LegacyPool
valid types.Transactions
excludedFromBlock types.Transactions
pool *LegacyPool
}

func newAstriaOrdered(valid types.Transactions, parsed types.Transactions, pool *LegacyPool) *astriaOrdered {
astriaParsedMeter.Mark(int64(len(parsed)))
func newAstriaOrdered(valid types.Transactions, pool *LegacyPool) *astriaOrdered {
astriaValidMeter.Mark(int64(len(valid)))

return &astriaOrdered{
valid: valid,
parsed: parsed,
pool: pool,
valid: valid,
excludedFromBlock: types.Transactions{},
pool: pool,
}
}

func (ao *astriaOrdered) clear() {
ao.valid = types.Transactions{}
ao.parsed = types.Transactions{}
ao.excludedFromBlock = types.Transactions{}
}

func (pool *LegacyPool) SetAstriaOrdered(txs types.Transactions) {
astriaRequestedMeter.Mark(int64(len(txs)))

valid := []*types.Transaction{}
parsed := []*types.Transaction{}
for idx, tx := range txs {
err := pool.validateTxBasics(tx, false)
if err != nil {
Expand All @@ -317,16 +315,38 @@ func (pool *LegacyPool) SetAstriaOrdered(txs types.Transactions) {
valid = append(valid, tx)
}

pool.astria = newAstriaOrdered(types.Transactions(valid), types.Transactions(parsed), pool)
pool.astria = newAstriaOrdered(valid, pool)
}

func (pool *LegacyPool) AddToAstriaExcludedFromBlock(tx *types.Transaction) {
if pool.astria.excludedFromBlock == nil {
pool.astria.excludedFromBlock = types.Transactions{tx}
return
}

pool.astria.excludedFromBlock = append(pool.astria.excludedFromBlock, tx)
}

func (pool *LegacyPool) AstriaExcludedFromBlock() *types.Transactions {
if pool.astria == nil {
return &types.Transactions{}
}
return &pool.astria.excludedFromBlock
}

func (pool *LegacyPool) ClearAstriaOrdered() {
if pool.astria == nil {
return
}

for _, tx := range pool.astria.parsed {
pool.removeTx(tx.Hash(), false, true)
astriaExcludedFromBlockMeter.Mark(int64(len(pool.astria.excludedFromBlock)))
for _, tx := range pool.astria.excludedFromBlock {
n := pool.removeTx(tx.Hash(), false, true)
if n == 0 {
log.Trace("astria tx excluded from block not found in mempool", "hash", tx.Hash())
} else {
log.Trace("astria tx excluded from block removed from mempool", "hash", tx.Hash())
}
}

pool.astria.clear()
Expand Down
2 changes: 2 additions & 0 deletions core/txpool/subpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,5 +165,7 @@ type SubPool interface {

SetAstriaOrdered(types.Transactions)
ClearAstriaOrdered()
AddToAstriaExcludedFromBlock(tx *types.Transaction)
AstriaExcludedFromBlock() *types.Transactions
AstriaOrdered() *types.Transactions
}
17 changes: 17 additions & 0 deletions core/txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,23 @@ func (p *TxPool) ClearAstriaOrdered() {
}
}

func (p *TxPool) AddToAstriaExcludedFromBlock(tx *types.Transaction) {
for _, subpool := range p.subpools {
subpool.AddToAstriaExcludedFromBlock(tx)
}
}

func (p *TxPool) AstriaExcludedFromBlock() *types.Transactions {
txs := types.Transactions{}

for _, subpool := range p.subpools {
subpoolTxs := subpool.AstriaExcludedFromBlock()
txs = append(txs, *subpoolTxs...)
}

return &txs
}

func (p *TxPool) AstriaOrdered() *types.Transactions {
txs := types.Transactions{}

Expand Down
156 changes: 134 additions & 22 deletions miner/payload_building_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,16 +154,7 @@ func TestBuildPayload(t *testing.T) {
w, b := newTestWorker(t, params.TestChainConfig, ethash.NewFaker(), db, 0)

timestamp := uint64(time.Now().Unix())
args := &BuildPayloadArgs{
Parent: b.chain.CurrentBlock().Hash(),
Timestamp: timestamp,
Random: common.Hash{},
FeeRecipient: recipient,
}
payload, err := w.buildPayload(args)
if err != nil {
t.Fatalf("Failed to build payload %v", err)
}

verify := func(outer *engine.ExecutionPayloadEnvelope, txs int) {
payload := outer.ExecutionPayload
if payload.ParentHash != b.chain.CurrentBlock().Hash() {
Expand All @@ -182,18 +173,139 @@ func TestBuildPayload(t *testing.T) {
t.Fatal("Unexpected transaction set")
}
}
empty := payload.ResolveEmpty()
verify(empty, 0)

full := payload.ResolveFull()
verify(full, len(pendingTxs))

// Ensure resolve can be called multiple times and the
// result should be unchanged
dataOne := payload.Resolve()
dataTwo := payload.Resolve()
if !reflect.DeepEqual(dataOne, dataTwo) {
t.Fatal("Unexpected payload data")

txGasPrice := big.NewInt(10 * params.InitialBaseFee)

tests := []struct {
name string
txsToBuildPayload types.Transactions
expectedTxsInPayload types.Transactions
txsExcludedFromBlock types.Transactions
}{
{
name: "empty",
txsToBuildPayload: types.Transactions{},
expectedTxsInPayload: types.Transactions{},
txsExcludedFromBlock: types.Transactions{},
},
{
name: "transactions with gas enough to fit into a single block",
txsToBuildPayload: types.Transactions{
types.NewTransaction(b.txPool.Nonce(testBankAddress), testUserAddress, big.NewInt(1000), params.TxGas, txGasPrice, nil),
types.NewTransaction(b.txPool.Nonce(testBankAddress)+1, testUserAddress, big.NewInt(2000), params.TxGas, txGasPrice, nil),
},
expectedTxsInPayload: types.Transactions{
types.NewTransaction(b.txPool.Nonce(testBankAddress), testUserAddress, big.NewInt(1000), params.TxGas, txGasPrice, nil),
types.NewTransaction(b.txPool.Nonce(testBankAddress)+1, testUserAddress, big.NewInt(2000), params.TxGas, txGasPrice, nil),
},
txsExcludedFromBlock: types.Transactions{},
},
{
name: "transactions with gas which doesn't fit in a single block",
txsToBuildPayload: types.Transactions{
types.NewTransaction(b.txPool.Nonce(testBankAddress), testUserAddress, big.NewInt(1000), b.BlockChain().GasLimit()-10000, txGasPrice, nil),
types.NewTransaction(b.txPool.Nonce(testBankAddress)+1, testUserAddress, big.NewInt(1000), b.BlockChain().GasLimit()-10000, txGasPrice, nil),
},
expectedTxsInPayload: types.Transactions{
types.NewTransaction(b.txPool.Nonce(testBankAddress), testUserAddress, big.NewInt(1000), b.BlockChain().GasLimit()-10000, txGasPrice, nil),
},
txsExcludedFromBlock: types.Transactions{
types.NewTransaction(b.txPool.Nonce(testBankAddress)+1, testUserAddress, big.NewInt(1000), b.BlockChain().GasLimit()-10000, txGasPrice, nil),
},
},
{
name: "transactions with nonce too high",
txsToBuildPayload: types.Transactions{
types.NewTransaction(b.txPool.Nonce(testBankAddress), testUserAddress, big.NewInt(1000), params.TxGas, txGasPrice, nil),
types.NewTransaction(b.txPool.Nonce(testBankAddress)+4, testUserAddress, big.NewInt(2000), params.TxGas, txGasPrice, nil),
},
expectedTxsInPayload: types.Transactions{
types.NewTransaction(b.txPool.Nonce(testBankAddress), testUserAddress, big.NewInt(1000), params.TxGas, txGasPrice, nil),
},
txsExcludedFromBlock: types.Transactions{
types.NewTransaction(b.txPool.Nonce(testBankAddress)+4, testUserAddress, big.NewInt(2000), params.TxGas, txGasPrice, nil),
},
},
{
name: "transactions with nonce too low",
txsToBuildPayload: types.Transactions{
types.NewTransaction(b.txPool.Nonce(testBankAddress), testUserAddress, big.NewInt(1000), params.TxGas, txGasPrice, nil),
types.NewTransaction(b.txPool.Nonce(testBankAddress)-1, testUserAddress, big.NewInt(2000), params.TxGas, txGasPrice, nil),
},
expectedTxsInPayload: types.Transactions{
types.NewTransaction(b.txPool.Nonce(testBankAddress), testUserAddress, big.NewInt(1000), params.TxGas, txGasPrice, nil),
},
txsExcludedFromBlock: types.Transactions{
types.NewTransaction(b.txPool.Nonce(testBankAddress)-1, testUserAddress, big.NewInt(2000), params.TxGas, txGasPrice, nil),
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
signedTxs := types.Transactions{}
signedInvalidTxs := types.Transactions{}

for _, tx := range tt.txsToBuildPayload {
signedTx, err := types.SignTx(tx, types.HomesteadSigner{}, testBankKey)
if err != nil {
t.Fatalf("Failed to sign tx %v", err)
}
signedTxs = append(signedTxs, signedTx)
}

for _, tx := range tt.txsExcludedFromBlock {
signedTx, err := types.SignTx(tx, types.HomesteadSigner{}, testBankKey)
if err != nil {
t.Fatalf("Failed to sign tx %v", err)
}
signedInvalidTxs = append(signedInvalidTxs, signedTx)
}

// set the astria ordered txsToBuildPayload
b.TxPool().SetAstriaOrdered(signedTxs)
astriaTxs := b.TxPool().AstriaOrdered()

if astriaTxs.Len() != len(tt.txsToBuildPayload) {
t.Fatalf("Unexpected number of astria ordered transactions: %d", astriaTxs.Len())
}

txs := types.TxDifference(*astriaTxs, signedTxs)
if txs.Len() != 0 {
t.Fatalf("Unexpected transactions in astria ordered transactions: %v", txs)
}

args := &BuildPayloadArgs{
Parent: b.chain.CurrentBlock().Hash(),
Timestamp: timestamp,
Random: common.Hash{},
FeeRecipient: recipient,
}

payload, err := w.buildPayload(args)
if err != nil {
t.Fatalf("Failed to build payload %v", err)
}
full := payload.ResolveFull()
verify(full, len(tt.expectedTxsInPayload))

// Ensure resolve can be called multiple times and the
// result should be unchanged
dataOne := payload.Resolve()
dataTwo := payload.Resolve()
if !reflect.DeepEqual(dataOne, dataTwo) {
t.Fatal("Unexpected payload data")
}

// Ensure invalid transactions are stored
if len(tt.txsExcludedFromBlock) > 0 {
invalidTxs := b.TxPool().AstriaExcludedFromBlock()
txDifference := types.TxDifference(*invalidTxs, signedInvalidTxs)
if txDifference.Len() != 0 {
t.Fatalf("Unexpected transactions in transactions excluded from block list: %v", txDifference)
}
}
})
}
}

Expand Down
10 changes: 8 additions & 2 deletions miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,8 @@ func (miner *Miner) commitAstriaTransactions(env *environment, txs *types.Transa
// If we don't have enough gas for any further transactions then we're done.
if env.gasPool.Gas() < params.TxGas {
log.Trace("Not enough gas for further transactions", "have", env.gasPool, "want", params.TxGas)
// remove txs from the mempool if they are too big for this block
miner.txpool.AddToAstriaExcludedFromBlock(tx)
break
}

Expand All @@ -267,7 +269,7 @@ func (miner *Miner) commitAstriaTransactions(env *environment, txs *types.Transa
// phase, start ignoring the sender until we do.
if tx.Protected() && !miner.chainConfig.IsEIP155(env.header.Number) {
log.Trace("Ignoring reply protected transaction", "hash", tx.Hash(), "eip155", miner.chainConfig.EIP155Block)

miner.txpool.AddToAstriaExcludedFromBlock(tx)
continue
}
// Start executing the transaction
Expand All @@ -285,7 +287,7 @@ func (miner *Miner) commitAstriaTransactions(env *environment, txs *types.Transa

case errors.Is(err, core.ErrNonceTooHigh):
// Reorg notification data race between the transaction pool and miner, skip account =
log.Trace("Skipping account with hight nonce", "sender", from, "nonce", tx.Nonce())
log.Trace("Skipping account with high nonce", "sender", from, "nonce", tx.Nonce())

case errors.Is(err, nil):
env.tcount++
Expand All @@ -299,6 +301,10 @@ func (miner *Miner) commitAstriaTransactions(env *environment, txs *types.Transa
// nonce-too-high clause will prevent us from executing in vain).
log.Debug("Transaction failed, account skipped", "hash", tx.Hash(), "err", err)
}
if err != nil {
log.Trace("Marking transaction as invalid", "hash", tx.Hash(), "err", err)
miner.txpool.AddToAstriaExcludedFromBlock(tx)
}
}
return nil
}
Expand Down

0 comments on commit f93a513

Please sign in to comment.