diff --git a/integrationTests/chainSimulator/mempool/mempool_test.go b/integrationTests/chainSimulator/mempool/mempool_test.go new file mode 100644 index 0000000000..1aad3d051a --- /dev/null +++ b/integrationTests/chainSimulator/mempool/mempool_test.go @@ -0,0 +1,148 @@ +package mempool + +import ( + "testing" + "time" + + "github.com/multiversx/mx-chain-core-go/data/transaction" + "github.com/multiversx/mx-chain-go/config" + "github.com/multiversx/mx-chain-go/node/chainSimulator/configs" + "github.com/multiversx/mx-chain-go/storage" + "github.com/stretchr/testify/require" +) + +func TestMempoolWithChainSimulator_Selection(t *testing.T) { + if testing.Short() { + t.Skip("this is not a short test") + } + + numSenders := 10000 + numTransactionsPerSender := 3 + shard := 0 + + simulator := startChainSimulator(t, func(cfg *config.Configs) {}) + defer simulator.Close() + + participants := createParticipants(t, simulator, numSenders) + noncesTracker := newNoncesTracker() + + transactions := make([]*transaction.Transaction, 0, numSenders*numTransactionsPerSender) + + for i := 0; i < numSenders; i++ { + sender := participants.sendersByShard[shard][i] + receiver := participants.receiverByShard[shard] + + for j := 0; j < numTransactionsPerSender; j++ { + tx := &transaction.Transaction{ + Nonce: noncesTracker.getThenIncrementNonce(sender), + Value: oneQuarterOfEGLD, + SndAddr: sender.Bytes, + RcvAddr: receiver.Bytes, + Data: []byte{}, + GasLimit: 50_000, + GasPrice: 1_000_000_000, + ChainID: []byte(configs.ChainID), + Version: 2, + Signature: []byte("signature"), + } + + transactions = append(transactions, tx) + } + } + + sendTransactions(t, simulator, transactions) + time.Sleep(durationWaitAfterSendMany) + require.Equal(t, 30_000, getNumTransactionsInPool(simulator, shard)) + + selectedTransactions, gas := selectTransactions(t, simulator, shard) + require.Equal(t, 30_000, len(selectedTransactions)) + require.Equal(t, 50_000*30_000, int(gas)) + + err := simulator.GenerateBlocks(1) + require.Nil(t, err) + require.Equal(t, 27_756, getNumTransactionsInCurrentBlock(simulator, shard)) + + selectedTransactions, gas = selectTransactions(t, simulator, shard) + require.Equal(t, 30_000-27_756, len(selectedTransactions)) + require.Equal(t, 50_000*(30_000-27_756), int(gas)) +} + +func TestMempoolWithChainSimulator_Eviction(t *testing.T) { + if testing.Short() { + t.Skip("this is not a short test") + } + + numSenders := 10000 + numTransactionsPerSender := 30 + shard := 0 + + simulator := startChainSimulator(t, func(cfg *config.Configs) {}) + defer simulator.Close() + + participants := createParticipants(t, simulator, numSenders) + noncesTracker := newNoncesTracker() + + transactions := make([]*transaction.Transaction, 0, numSenders*numTransactionsPerSender) + + for i := 0; i < numSenders; i++ { + sender := participants.sendersByShard[shard][i] + receiver := participants.receiverByShard[shard] + + for j := 0; j < numTransactionsPerSender; j++ { + tx := &transaction.Transaction{ + Nonce: noncesTracker.getThenIncrementNonce(sender), + Value: oneQuarterOfEGLD, + SndAddr: sender.Bytes, + RcvAddr: receiver.Bytes, + Data: []byte{}, + GasLimit: 50_000, + GasPrice: 1_000_000_000, + ChainID: []byte(configs.ChainID), + Version: 2, + Signature: []byte("signature"), + } + + transactions = append(transactions, tx) + } + } + + sendTransactions(t, simulator, transactions) + time.Sleep(durationWaitAfterSendMany) + require.Equal(t, 300_000, getNumTransactionsInPool(simulator, shard)) + + // Send one more transaction (fill up the mempool) + sendTransaction(t, simulator, &transaction.Transaction{ + Nonce: 42, + Value: oneEGLD, + SndAddr: participants.sendersByShard[shard][7].Bytes, + RcvAddr: participants.receiverByShard[shard].Bytes, + Data: []byte{}, + GasLimit: 50000, + GasPrice: 1_000_000_000, + ChainID: []byte(configs.ChainID), + Version: 2, + Signature: []byte("signature"), + }) + + time.Sleep(durationWaitAfterSendSome) + require.Equal(t, 300_001, getNumTransactionsInPool(simulator, shard)) + + // Send one more transaction to trigger eviction + sendTransaction(t, simulator, &transaction.Transaction{ + Nonce: 42, + Value: oneEGLD, + SndAddr: participants.sendersByShard[shard][7].Bytes, + RcvAddr: participants.receiverByShard[shard].Bytes, + Data: []byte{}, + GasLimit: 50000, + GasPrice: 1_000_000_000, + ChainID: []byte(configs.ChainID), + Version: 2, + Signature: []byte("signature"), + }) + + time.Sleep(1 * time.Second) + + expectedNumTransactionsInPool := 300_000 + 1 + 1 - int(storage.TxPoolSourceMeNumItemsToPreemptivelyEvict) + require.Equal(t, expectedNumTransactionsInPool, getNumTransactionsInPool(simulator, shard)) +} diff --git a/integrationTests/chainSimulator/mempool/testutils_test.go b/integrationTests/chainSimulator/mempool/testutils_test.go new file mode 100644 index 0000000000..4e1b0c4430 --- /dev/null +++ b/integrationTests/chainSimulator/mempool/testutils_test.go @@ -0,0 +1,174 @@ +package mempool + +import ( + "math/big" + "strconv" + "testing" + "time" + + "github.com/multiversx/mx-chain-core-go/core" + "github.com/multiversx/mx-chain-core-go/data/transaction" + "github.com/multiversx/mx-chain-go/config" + testsChainSimulator "github.com/multiversx/mx-chain-go/integrationTests/chainSimulator" + "github.com/multiversx/mx-chain-go/node/chainSimulator" + "github.com/multiversx/mx-chain-go/node/chainSimulator/components/api" + "github.com/multiversx/mx-chain-go/node/chainSimulator/dtos" + "github.com/multiversx/mx-chain-go/process" + "github.com/multiversx/mx-chain-go/process/block/preprocess" + "github.com/multiversx/mx-chain-go/storage/txcache" + "github.com/multiversx/mx-chain-go/testscommon" + "github.com/stretchr/testify/require" +) + +var ( + oneEGLD = big.NewInt(1000000000000000000) + oneQuarterOfEGLD = big.NewInt(250000000000000000) + oneCentOfEGLD = big.NewInt(10000000000000000) + durationWaitAfterSendMany = 750 * time.Millisecond + durationWaitAfterSendSome = 10 * time.Millisecond +) + +func startChainSimulator(t *testing.T, alterConfigsFunction func(cfg *config.Configs)) testsChainSimulator.ChainSimulator { + simulator, err := chainSimulator.NewChainSimulator(chainSimulator.ArgsChainSimulator{ + BypassTxSignatureCheck: true, + TempDir: t.TempDir(), + PathToInitialConfig: "../../../cmd/node/config/", + NumOfShards: 1, + GenesisTimestamp: time.Now().Unix(), + RoundDurationInMillis: uint64(4000), + RoundsPerEpoch: core.OptionalUint64{ + HasValue: true, + Value: 10, + }, + ApiInterface: api.NewNoApiInterface(), + MinNodesPerShard: 1, + MetaChainMinNodes: 1, + NumNodesWaitingListMeta: 0, + NumNodesWaitingListShard: 0, + AlterConfigsFunction: alterConfigsFunction, + }) + require.NoError(t, err) + require.NotNil(t, simulator) + + err = simulator.GenerateBlocksUntilEpochIsReached(1) + require.NoError(t, err) + + return simulator +} + +type participantsHolder struct { + sendersByShard map[int][]dtos.WalletAddress + receiverByShard map[int]dtos.WalletAddress +} + +func newParticipantsHolder() *participantsHolder { + return &participantsHolder{ + sendersByShard: make(map[int][]dtos.WalletAddress), + receiverByShard: make(map[int]dtos.WalletAddress), + } +} + +func createParticipants(t *testing.T, simulator testsChainSimulator.ChainSimulator, numSendersPerShard int) *participantsHolder { + numShards := int(simulator.GetNodeHandler(0).GetShardCoordinator().NumberOfShards()) + participants := newParticipantsHolder() + + for shard := 0; shard < numShards; shard++ { + senders := make([]dtos.WalletAddress, 0, numSendersPerShard) + + for i := 0; i < numSendersPerShard; i++ { + sender, err := simulator.GenerateAndMintWalletAddress(uint32(shard), oneEGLD) + require.NoError(t, err) + + senders = append(senders, sender) + } + + receiver, err := simulator.GenerateAndMintWalletAddress(0, big.NewInt(0)) + require.NoError(t, err) + + participants.sendersByShard[shard] = senders + participants.receiverByShard[shard] = receiver + } + + err := simulator.GenerateBlocks(1) + require.Nil(t, err) + + return participants +} + +type noncesTracker struct { + nonceByAddress map[string]uint64 +} + +func newNoncesTracker() *noncesTracker { + return &noncesTracker{ + nonceByAddress: make(map[string]uint64), + } +} + +func (tracker *noncesTracker) getThenIncrementNonce(address dtos.WalletAddress) uint64 { + nonce, ok := tracker.nonceByAddress[address.Bech32] + if !ok { + tracker.nonceByAddress[address.Bech32] = 0 + } + + tracker.nonceByAddress[address.Bech32]++ + return nonce +} + +func sendTransactions(t *testing.T, simulator testsChainSimulator.ChainSimulator, transactions []*transaction.Transaction) { + transactionsBySenderShard := make(map[int][]*transaction.Transaction) + shardCoordinator := simulator.GetNodeHandler(0).GetShardCoordinator() + + for _, tx := range transactions { + shard := int(shardCoordinator.ComputeId(tx.SndAddr)) + transactionsBySenderShard[shard] = append(transactionsBySenderShard[shard], tx) + } + + for shard, transactionsFromShard := range transactionsBySenderShard { + node := simulator.GetNodeHandler(uint32(shard)) + numSent, err := node.GetFacadeHandler().SendBulkTransactions(transactionsFromShard) + + require.NoError(t, err) + require.Equal(t, len(transactionsFromShard), int(numSent)) + } +} + +func sendTransaction(t *testing.T, simulator testsChainSimulator.ChainSimulator, tx *transaction.Transaction) { + sendTransactions(t, simulator, []*transaction.Transaction{tx}) +} + +func selectTransactions(t *testing.T, simulator testsChainSimulator.ChainSimulator, shard int) ([]*txcache.WrappedTransaction, uint64) { + shardAsString := strconv.Itoa(shard) + node := simulator.GetNodeHandler(uint32(shard)) + accountsAdapter := node.GetStateComponents().AccountsAdapter() + poolsHolder := node.GetDataComponents().Datapool().Transactions() + + selectionSession, err := preprocess.NewSelectionSession(preprocess.ArgsSelectionSession{ + AccountsAdapter: accountsAdapter, + TransactionsProcessor: &testscommon.TxProcessorStub{}, + }) + require.NoError(t, err) + + mempool := poolsHolder.ShardDataStore(shardAsString).(*txcache.TxCache) + + selectedTransactions, gas := mempool.SelectTransactions( + selectionSession, + process.TxCacheSelectionGasRequested, + process.TxCacheSelectionMaxNumTxs, + process.TxCacheSelectionLoopMaximumDuration, + ) + + return selectedTransactions, gas +} + +func getNumTransactionsInPool(simulator testsChainSimulator.ChainSimulator, shard int) int { + node := simulator.GetNodeHandler(uint32(shard)) + poolsHolder := node.GetDataComponents().Datapool().Transactions() + return int(poolsHolder.GetCounts().GetTotal()) +} + +func getNumTransactionsInCurrentBlock(simulator testsChainSimulator.ChainSimulator, shard int) int { + node := simulator.GetNodeHandler(uint32(shard)) + currentBlock := node.GetDataComponents().Blockchain().GetCurrentBlockHeader() + return int(currentBlock.GetTxCount()) +} diff --git a/process/block/preprocess/selectionSession.go b/process/block/preprocess/selectionSession.go index c5c16a56cd..7e3b35687a 100644 --- a/process/block/preprocess/selectionSession.go +++ b/process/block/preprocess/selectionSession.go @@ -21,22 +21,24 @@ type selectionSession struct { ephemeralAccountsCache map[string]vmcommon.AccountHandler } -type argsSelectionSession struct { - accountsAdapter state.AccountsAdapter - transactionsProcessor process.TransactionProcessor +// ArgsSelectionSession holds the arguments for creating a new selection session. +type ArgsSelectionSession struct { + AccountsAdapter state.AccountsAdapter + TransactionsProcessor process.TransactionProcessor } -func newSelectionSession(args argsSelectionSession) (*selectionSession, error) { - if check.IfNil(args.accountsAdapter) { +// NewSelectionSession creates a new selection session. +func NewSelectionSession(args ArgsSelectionSession) (*selectionSession, error) { + if check.IfNil(args.AccountsAdapter) { return nil, process.ErrNilAccountsAdapter } - if check.IfNil(args.transactionsProcessor) { + if check.IfNil(args.TransactionsProcessor) { return nil, process.ErrNilTxProcessor } return &selectionSession{ - accountsAdapter: args.accountsAdapter, - transactionsProcessor: args.transactionsProcessor, + accountsAdapter: args.AccountsAdapter, + transactionsProcessor: args.TransactionsProcessor, ephemeralAccountsCache: make(map[string]vmcommon.AccountHandler), }, nil } diff --git a/process/block/preprocess/selectionSession_test.go b/process/block/preprocess/selectionSession_test.go index fc46d5ced1..939da698cd 100644 --- a/process/block/preprocess/selectionSession_test.go +++ b/process/block/preprocess/selectionSession_test.go @@ -17,23 +17,23 @@ import ( func TestNewSelectionSession(t *testing.T) { t.Parallel() - session, err := newSelectionSession(argsSelectionSession{ - accountsAdapter: nil, - transactionsProcessor: &testscommon.TxProcessorStub{}, + session, err := NewSelectionSession(ArgsSelectionSession{ + AccountsAdapter: nil, + TransactionsProcessor: &testscommon.TxProcessorStub{}, }) require.Nil(t, session) require.ErrorIs(t, err, process.ErrNilAccountsAdapter) - session, err = newSelectionSession(argsSelectionSession{ - accountsAdapter: &stateMock.AccountsStub{}, - transactionsProcessor: nil, + session, err = NewSelectionSession(ArgsSelectionSession{ + AccountsAdapter: &stateMock.AccountsStub{}, + TransactionsProcessor: nil, }) require.Nil(t, session) require.ErrorIs(t, err, process.ErrNilTxProcessor) - session, err = newSelectionSession(argsSelectionSession{ - accountsAdapter: &stateMock.AccountsStub{}, - transactionsProcessor: &testscommon.TxProcessorStub{}, + session, err = NewSelectionSession(ArgsSelectionSession{ + AccountsAdapter: &stateMock.AccountsStub{}, + TransactionsProcessor: &testscommon.TxProcessorStub{}, }) require.NoError(t, err) require.NotNil(t, session) @@ -66,9 +66,9 @@ func TestSelectionSession_GetAccountState(t *testing.T) { return nil, fmt.Errorf("account not found: %s", address) } - session, err := newSelectionSession(argsSelectionSession{ - accountsAdapter: accounts, - transactionsProcessor: processor, + session, err := NewSelectionSession(ArgsSelectionSession{ + AccountsAdapter: accounts, + TransactionsProcessor: processor, }) require.NoError(t, err) require.NotNil(t, session) @@ -111,9 +111,9 @@ func TestSelectionSession_IsIncorrectlyGuarded(t *testing.T) { return nil } - session, err := newSelectionSession(argsSelectionSession{ - accountsAdapter: accounts, - transactionsProcessor: processor, + session, err := NewSelectionSession(ArgsSelectionSession{ + AccountsAdapter: accounts, + TransactionsProcessor: processor, }) require.NoError(t, err) require.NotNil(t, session) @@ -144,9 +144,9 @@ func TestSelectionSession_ephemeralAccountsCache_IsSharedAmongCalls(t *testing.T return &stateMock.UserAccountStub{}, nil } - session, err := newSelectionSession(argsSelectionSession{ - accountsAdapter: accounts, - transactionsProcessor: processor, + session, err := NewSelectionSession(ArgsSelectionSession{ + AccountsAdapter: accounts, + TransactionsProcessor: processor, }) require.NoError(t, err) require.NotNil(t, session) diff --git a/process/block/preprocess/transactions.go b/process/block/preprocess/transactions.go index 8578a97e92..2f3fc290ef 100644 --- a/process/block/preprocess/transactions.go +++ b/process/block/preprocess/transactions.go @@ -1411,9 +1411,9 @@ func (txs *transactions) computeSortedTxs( sortedTransactionsProvider := createSortedTransactionsProvider(txShardPool) log.Debug("computeSortedTxs.GetSortedTransactions") - session, err := newSelectionSession(argsSelectionSession{ - accountsAdapter: txs.accounts, - transactionsProcessor: txs.txProcessor, + session, err := NewSelectionSession(ArgsSelectionSession{ + AccountsAdapter: txs.accounts, + TransactionsProcessor: txs.txProcessor, }) if err != nil { return nil, nil, err