Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: use mutex lock for tx removal #51

Merged
merged 5 commits into from
Oct 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions core/txpool/legacypool/legacypool.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,8 @@ func (pool *LegacyPool) ClearAstriaOrdered() {
if pool.astria == nil {
return
}
pool.mu.Lock()
defer pool.mu.Unlock()

astriaExcludedFromBlockMeter.Mark(int64(len(pool.astria.excludedFromBlock)))
for _, tx := range pool.astria.excludedFromBlock {
Expand Down
61 changes: 61 additions & 0 deletions core/txpool/legacypool/legacypool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,67 @@ func TestChainFork(t *testing.T) {
}
}

func TestRemoveTxSanity(t *testing.T) {
t.Parallel()

pool, key := setupPool()
defer pool.Close()

addr := crypto.PubkeyToAddress(key.PublicKey)
resetState := func() {
statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(rawdb.NewMemoryDatabase()), nil)
statedb.AddBalance(addr, uint256.NewInt(100000000000000), tracing.BalanceChangeUnspecified)

pool.chain = newTestBlockChain(pool.chainconfig, 1000000, statedb, new(event.Feed))
<-pool.requestReset(nil, nil)
}
resetState()

tx1 := transaction(0, 100000, key)
tx2 := transaction(1, 100000, key)
tx3 := transaction(2, 100000, key)

if err := pool.addLocal(tx1); err != nil {
t.Error("didn't expect error", err)
}
if err := pool.addLocal(tx2); err != nil {
t.Error("didn't expect error", err)
}
if err := pool.addLocal(tx3); err != nil {
t.Error("didn't expect error", err)
}

pendingTxs := pool.pending[addr]
if pendingTxs.Len() != 3 {
t.Error("expected 3 pending transactions, got", pendingTxs.Len())
}

if err := validatePoolInternals(pool); err != nil {
t.Errorf("pool internals validation failed: %v", err)
}

n := pool.removeTx(tx1.Hash(), false, true)
if n != 3 {
t.Error("expected 3 transactions to be removed, got", n)
}
n = pool.removeTx(tx2.Hash(), false, true)
if n != 0 {
t.Error("expected 0 transactions to be removed, got", n)
}
n = pool.removeTx(tx3.Hash(), false, true)
if n != 0 {
t.Error("expected 0 transactions to be removed, got", n)
}

if len(pool.pending) != 0 {
t.Error("expected 0 pending transactions, got", pendingTxs.Len())
}

if err := validatePoolInternals(pool); err != nil {
t.Errorf("pool internals validation failed: %v", err)
}
}

func TestDoubleNonce(t *testing.T) {
t.Parallel()

Expand Down
137 changes: 137 additions & 0 deletions grpc/execution/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"google.golang.org/protobuf/types/known/timestamppb"
"math/big"
"testing"
"time"
)

func TestExecutionService_GetGenesisInfo(t *testing.T) {
Expand Down Expand Up @@ -475,3 +476,139 @@ func TestExecutionServiceServerV1Alpha2_ExecuteBlockAndUpdateCommitment(t *testi
balanceDiff := new(uint256.Int).Sub(chainDestinationAddressBalanceAfter, chainDestinationAddressBalanceBefore)
require.True(t, balanceDiff.Cmp(uint256.NewInt(1000000000000000000)) == 0, "Chain destination address balance is not correct")
}

// Check that invalid transactions are not added into a block and are removed from the mempool
func TestExecutionServiceServerV1Alpha2_ExecuteBlockAndUpdateCommitmentWithInvalidTransactions(t *testing.T) {
ethservice, serviceV1Alpha1 := setupExecutionService(t, 10)

// call genesis info
genesisInfo, err := serviceV1Alpha1.GetGenesisInfo(context.Background(), &astriaPb.GetGenesisInfoRequest{})
require.Nil(t, err, "GetGenesisInfo failed")
require.NotNil(t, genesisInfo, "GenesisInfo is nil")

// call get commitment state
commitmentState, err := serviceV1Alpha1.GetCommitmentState(context.Background(), &astriaPb.GetCommitmentStateRequest{})
require.Nil(t, err, "GetCommitmentState failed")
require.NotNil(t, commitmentState, "CommitmentState is nil")

ethservice.BlockChain().SetSafe(ethservice.BlockChain().CurrentBlock())

// get previous block hash
previousBlock := ethservice.BlockChain().CurrentSafeBlock()
require.NotNil(t, previousBlock, "Previous block not found")

gasLimit := ethservice.BlockChain().GasLimit()

stateDb, err := ethservice.BlockChain().StateAt(previousBlock.Root)
require.Nil(t, err, "Failed to get state db")

latestNonce := stateDb.GetNonce(testAddr)

// create 5 txs
txs := []*types.Transaction{}
marshalledTxs := []*sequencerblockv1alpha1.RollupData{}
for i := 0; i < 5; i++ {
unsignedTx := types.NewTransaction(latestNonce+uint64(i), testToAddress, big.NewInt(1), params.TxGas, big.NewInt(params.InitialBaseFee*2), nil)
tx, err := types.SignTx(unsignedTx, types.LatestSigner(ethservice.BlockChain().Config()), testKey)
require.Nil(t, err, "Failed to sign tx")
txs = append(txs, tx)

marshalledTx, err := tx.MarshalBinary()
require.Nil(t, err, "Failed to marshal tx")
marshalledTxs = append(marshalledTxs, &sequencerblockv1alpha1.RollupData{
Value: &sequencerblockv1alpha1.RollupData_SequencedData{SequencedData: marshalledTx},
})
}

// add a tx with lesser gas than the base gas
unsignedTx := types.NewTransaction(latestNonce+uint64(5), testToAddress, big.NewInt(1), gasLimit, big.NewInt(params.InitialBaseFee*2), nil)
tx, err := types.SignTx(unsignedTx, types.LatestSigner(ethservice.BlockChain().Config()), testKey)
require.Nil(t, err, "Failed to sign tx")
txs = append(txs, tx)

marshalledTx, err := tx.MarshalBinary()
require.Nil(t, err, "Failed to marshal tx")
marshalledTxs = append(marshalledTxs, &sequencerblockv1alpha1.RollupData{
Value: &sequencerblockv1alpha1.RollupData_SequencedData{SequencedData: marshalledTx},
})

errors := ethservice.TxPool().Add(txs, true, false)
for _, err := range errors {
require.Nil(t, err, "Failed to add tx to pool")
}

pending, queued := ethservice.TxPool().Stats()
require.Equal(t, 6, pending, "Pending txs should be 6")
require.Equal(t, 0, queued, "Queued txs should be 0")

executeBlockReq := &astriaPb.ExecuteBlockRequest{
PrevBlockHash: previousBlock.Hash().Bytes(),
Timestamp: &timestamppb.Timestamp{
Seconds: int64(previousBlock.Time + 2),
},
Transactions: marshalledTxs,
}

executeBlockRes, err := serviceV1Alpha1.ExecuteBlock(context.Background(), executeBlockReq)
require.Nil(t, err, "ExecuteBlock failed")

require.NotNil(t, executeBlockRes, "ExecuteBlock response is nil")

// check if astria ordered txs are cleared
astriaOrdered := ethservice.TxPool().AstriaOrdered()
require.Equal(t, 0, astriaOrdered.Len(), "AstriaOrdered should be empty")

// call update commitment state to set the block we executed as soft and firm
updateCommitmentStateReq := &astriaPb.UpdateCommitmentStateRequest{
CommitmentState: &astriaPb.CommitmentState{
Soft: &astriaPb.Block{
Hash: executeBlockRes.Hash,
ParentBlockHash: executeBlockRes.ParentBlockHash,
Number: executeBlockRes.Number,
Timestamp: executeBlockRes.Timestamp,
},
Firm: &astriaPb.Block{
Hash: executeBlockRes.Hash,
ParentBlockHash: executeBlockRes.ParentBlockHash,
Number: executeBlockRes.Number,
Timestamp: executeBlockRes.Timestamp,
},
BaseCelestiaHeight: commitmentState.BaseCelestiaHeight + 1,
},
}

updateCommitmentStateRes, err := serviceV1Alpha1.UpdateCommitmentState(context.Background(), updateCommitmentStateReq)
require.Nil(t, err, "UpdateCommitmentState failed")
require.NotNil(t, updateCommitmentStateRes, "UpdateCommitmentState response should not be nil")
require.Equal(t, updateCommitmentStateRes, updateCommitmentStateReq.CommitmentState, "CommitmentState response should match request")

// get the soft and firm block
softBlock := ethservice.BlockChain().CurrentSafeBlock()
require.NotNil(t, softBlock, "SoftBlock is nil")
firmBlock := ethservice.BlockChain().CurrentFinalBlock()
require.NotNil(t, firmBlock, "FirmBlock is nil")

block := ethservice.BlockChain().GetBlockByNumber(softBlock.Number.Uint64())
require.NotNil(t, block, "Soft Block not found")
require.Equal(t, block.Transactions().Len(), 5, "Soft Block should have 5 txs")

// give the tx loop time to run
time.Sleep(1 * time.Millisecond)

// after the tx loop is run, all pending txs should be removed
pending, queued = ethservice.TxPool().Stats()
require.Equal(t, 0, pending, "Pending txs should be 0")
require.Equal(t, 0, queued, "Queued txs should be 0")

// check if the soft and firm block are set correctly
require.True(t, bytes.Equal(softBlock.Hash().Bytes(), updateCommitmentStateRes.Soft.Hash), "Soft Block Hashes do not match")
require.True(t, bytes.Equal(softBlock.ParentHash.Bytes(), updateCommitmentStateRes.Soft.ParentBlockHash), "Soft Block Parent Hash do not match")
require.Equal(t, softBlock.Number.Uint64(), uint64(updateCommitmentStateRes.Soft.Number), "Soft Block Number do not match")

require.True(t, bytes.Equal(firmBlock.Hash().Bytes(), updateCommitmentStateRes.Firm.Hash), "Firm Block Hashes do not match")
require.True(t, bytes.Equal(firmBlock.ParentHash.Bytes(), updateCommitmentStateRes.Firm.ParentBlockHash), "Firm Block Parent Hash do not match")
require.Equal(t, firmBlock.Number.Uint64(), uint64(updateCommitmentStateRes.Firm.Number), "Firm Block Number do not match")

celestiaBaseHeight := ethservice.BlockChain().CurrentBaseCelestiaHeight()
require.Equal(t, celestiaBaseHeight, updateCommitmentStateRes.BaseCelestiaHeight, "BaseCelestiaHeight should be updated in db")
}
Loading