From 8a0cb502901ff859795b491bc62d0d21bf814bdf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrei=20B=C4=83ncioiu?= Date: Tue, 26 Nov 2024 22:02:05 +0200 Subject: [PATCH] Fix after review (part 2). --- txcache/README.md | 4 +- txcache/constants.go | 2 +- txcache/diagnosis.go | 2 +- txcache/selection.go | 12 ++- txcache/selection_test.go | 119 +++++++++++++++++++++++++++++ txcache/transactionsHeap.go | 4 +- txcache/transactionsHeapItem.go | 2 +- txcache/wrappedTransaction.go | 9 +++ txcache/wrappedTransaction_test.go | 14 +++- 9 files changed, 156 insertions(+), 12 deletions(-) diff --git a/txcache/README.md b/txcache/README.md index 186b4284..55d11f88 100644 --- a/txcache/README.md +++ b/txcache/README.md @@ -180,11 +180,11 @@ Thus, the mempool selects transactions using an efficient and value-driven algor - The number of selected transactions reaches `maxNum`. **Additional notes:** - - Within the selection loop, the current nonce of the sender is queryied from the blockchain, lazily (when needed). + - Within the selection loop, the current nonce of the sender is queried from the blockchain, lazily (when needed). - If an initial nonce gap is detected, the sender is (completely) skipped in the current selection session. - If a middle nonce gap is detected, the sender is skipped (from now on) in the current selection session. - Transactions with nonces lower than the current nonce of the sender are skipped. - - Transactions with duplicate nonces are skipped. See paragraph 5 for more details. + - Transactions having the same nonce as a previously selected one (in the scope of a sender) are skipped. Also see paragraph 5. - Badly guarded transactions are skipped. - Once the accumulated fees of selected transactions of a given sender exceed the sender's balance, the sender is skipped (from now one). diff --git a/txcache/constants.go b/txcache/constants.go index 811cd4b5..5bb61a52 100644 --- a/txcache/constants.go +++ b/txcache/constants.go @@ -3,4 +3,4 @@ package txcache const diagnosisMaxTransactionsToDisplay = 10000 const diagnosisSelectionGasRequested = 10_000_000_000 const initialCapacityOfSelectionSlice = 30000 -const selectionLoopDurationCheckInterval = 16 +const selectionLoopDurationCheckInterval = 10 diff --git a/txcache/diagnosis.go b/txcache/diagnosis.go index 6f693c97..df2a99fe 100644 --- a/txcache/diagnosis.go +++ b/txcache/diagnosis.go @@ -73,7 +73,7 @@ func (cache *TxCache) diagnoseTransactions() { } // marshalTransactionsToNewlineDelimitedJSON converts a list of transactions to a newline-delimited JSON string. -// Note: each line is indexed, to improve readability. The index is easily removable for if separate analysis is needed. +// Note: each line is indexed, to improve readability. The index is easily removable if separate analysis is needed. func marshalTransactionsToNewlineDelimitedJSON(transactions []*WrappedTransaction, linePrefix string) string { builder := strings.Builder{} builder.WriteString("\n") diff --git a/txcache/selection.go b/txcache/selection.go index a9c427f6..acdd1d36 100644 --- a/txcache/selection.go +++ b/txcache/selection.go @@ -6,6 +6,12 @@ import ( ) func (cache *TxCache) doSelectTransactions(accountStateProvider AccountStateProvider, gasRequested uint64, maxNum int, selectionLoopMaximumDuration time.Duration) (bunchOfTransactions, uint64) { + bunches := cache.acquireBunchesOfTransactions() + + return selectTransactionsFromBunches(accountStateProvider, bunches, gasRequested, maxNum, selectionLoopMaximumDuration) +} + +func (cache *TxCache) acquireBunchesOfTransactions() []bunchOfTransactions { senders := cache.getSenders() bunches := make([]bunchOfTransactions, 0, len(senders)) @@ -13,7 +19,7 @@ func (cache *TxCache) doSelectTransactions(accountStateProvider AccountStateProv bunches = append(bunches, sender.getTxs()) } - return selectTransactionsFromBunches(accountStateProvider, bunches, gasRequested, maxNum, selectionLoopMaximumDuration) + return bunches } // Selection tolerates concurrent transaction additions / removals. @@ -72,9 +78,7 @@ func selectTransactionsFromBunches(accountStateProvider AccountStateProvider, bu } shouldSkipTransaction := detectSkippableTransaction(item) - if shouldSkipTransaction { - // Transaction isn't selected, but the sender is still in the game (will contribute with other transactions). - } else { + if !shouldSkipTransaction { accumulatedGas += gasLimit selectedTransactions = append(selectedTransactions, item.selectCurrentTransaction()) } diff --git a/txcache/selection_test.go b/txcache/selection_test.go index 7684a566..f96b3a28 100644 --- a/txcache/selection_test.go +++ b/txcache/selection_test.go @@ -271,6 +271,108 @@ func TestTxCache_selectTransactionsFromBunches(t *testing.T) { }) } +func TestBenchmarkTxCache_acquireBunchesOfTransactions(t *testing.T) { + config := ConfigSourceMe{ + Name: "untitled", + NumChunks: 16, + NumBytesThreshold: 1000000000, + NumBytesPerSenderThreshold: maxNumBytesPerSenderUpperBound, + CountThreshold: 300001, + CountPerSenderThreshold: math.MaxUint32, + EvictionEnabled: false, + NumItemsToPreemptivelyEvict: 1, + } + + txGasHandler := txcachemocks.NewTxGasHandlerMock() + + sw := core.NewStopWatch() + + t.Run("numSenders = 10000, numTransactions = 100", func(t *testing.T) { + cache, err := NewTxCache(config, txGasHandler) + require.Nil(t, err) + + addManyTransactionsWithUniformDistribution(cache, 10000, 100) + + require.Equal(t, 1000000, int(cache.CountTx())) + + sw.Start(t.Name()) + bunches := cache.acquireBunchesOfTransactions() + sw.Stop(t.Name()) + + require.Len(t, bunches, 10000) + require.Len(t, bunches[0], 100) + require.Len(t, bunches[len(bunches)-1], 100) + }) + + t.Run("numSenders = 50000, numTransactions = 2", func(t *testing.T) { + cache, err := NewTxCache(config, txGasHandler) + require.Nil(t, err) + + addManyTransactionsWithUniformDistribution(cache, 50000, 2) + + require.Equal(t, 100000, int(cache.CountTx())) + + sw.Start(t.Name()) + bunches := cache.acquireBunchesOfTransactions() + sw.Stop(t.Name()) + + require.Len(t, bunches, 50000) + require.Len(t, bunches[0], 2) + require.Len(t, bunches[len(bunches)-1], 2) + }) + + t.Run("numSenders = 100000, numTransactions = 1", func(t *testing.T) { + cache, err := NewTxCache(config, txGasHandler) + require.Nil(t, err) + + addManyTransactionsWithUniformDistribution(cache, 100000, 1) + + require.Equal(t, 100000, int(cache.CountTx())) + + sw.Start(t.Name()) + bunches := cache.acquireBunchesOfTransactions() + sw.Stop(t.Name()) + + require.Len(t, bunches, 100000) + require.Len(t, bunches[0], 1) + require.Len(t, bunches[len(bunches)-1], 1) + }) + + t.Run("numSenders = 300000, numTransactions = 1", func(t *testing.T) { + cache, err := NewTxCache(config, txGasHandler) + require.Nil(t, err) + + addManyTransactionsWithUniformDistribution(cache, 300000, 1) + + require.Equal(t, 300000, int(cache.CountTx())) + + sw.Start(t.Name()) + bunches := cache.acquireBunchesOfTransactions() + sw.Stop(t.Name()) + + require.Len(t, bunches, 300000) + require.Len(t, bunches[0], 1) + require.Len(t, bunches[len(bunches)-1], 1) + }) + + for name, measurement := range sw.GetMeasurementsMap() { + fmt.Printf("%fs (%s)\n", measurement, name) + } + + // (1) + // Vendor ID: GenuineIntel + // Model name: 11th Gen Intel(R) Core(TM) i7-1165G7 @ 2.80GHz + // CPU family: 6 + // Model: 140 + // Thread(s) per core: 2 + // Core(s) per socket: 4 + // + // 0.014468s (TestBenchmarkTxCache_acquireBunchesOfTransactions/numSenders_=_10000,_numTransactions_=_100) + // 0.019183s (TestBenchmarkTxCache_acquireBunchesOfTransactions/numSenders_=_50000,_numTransactions_=_2) + // 0.013876s (TestBenchmarkTxCache_acquireBunchesOfTransactions/numSenders_=_100000,_numTransactions_=_1) + // 0.056631s (TestBenchmarkTxCache_acquireBunchesOfTransactions/numSenders_=_300000,_numTransactions_=_1) +} + func TestBenchmarkTxCache_selectTransactionsFromBunches(t *testing.T) { sw := core.NewStopWatch() @@ -368,6 +470,22 @@ func TestBenchmarkTxCache_doSelectTransactions(t *testing.T) { sw := core.NewStopWatch() + t.Run("numSenders = 10000, numTransactions = 100, maxNum = 50_000", func(t *testing.T) { + cache, err := NewTxCache(config, txGasHandler) + require.Nil(t, err) + + addManyTransactionsWithUniformDistribution(cache, 10000, 100) + + require.Equal(t, 1000000, int(cache.CountTx())) + + sw.Start(t.Name()) + selected, accumulatedGas := cache.SelectTransactions(accountStateProvider, 10_000_000_000, 50_000, selectionLoopMaximumDuration) + sw.Stop(t.Name()) + + require.Equal(t, 50000, len(selected)) + require.Equal(t, uint64(2_500_000_000), accumulatedGas) + }) + t.Run("numSenders = 50000, numTransactions = 2, maxNum = 50_000", func(t *testing.T) { cache, err := NewTxCache(config, txGasHandler) require.Nil(t, err) @@ -428,6 +546,7 @@ func TestBenchmarkTxCache_doSelectTransactions(t *testing.T) { // Thread(s) per core: 2 // Core(s) per socket: 4 // + // 0.126612s (TestBenchmarkTxCache_doSelectTransactions/numSenders_=_10000,_numTransactions_=_100,_maxNum_=_50_000) // 0.107361s (TestBenchmarkTxCache_doSelectTransactions/numSenders_=_50000,_numTransactions_=_2,_maxNum_=_50_000) // 0.168364s (TestBenchmarkTxCache_doSelectTransactions/numSenders_=_100000,_numTransactions_=_1,_maxNum_=_50_000) // 0.305363s (TestBenchmarkTxCache_doSelectTransactions/numSenders_=_300000,_numTransactions_=_1,_maxNum_=_50_000) diff --git a/txcache/transactionsHeap.go b/txcache/transactionsHeap.go index e96ebb54..28b4e072 100644 --- a/txcache/transactionsHeap.go +++ b/txcache/transactionsHeap.go @@ -11,7 +11,7 @@ func newMinTransactionsHeap(capacity int) *transactionsHeap { } h.less = func(i, j int) bool { - return h.items[j].holdsTransactionMoreValuableForNetwork(h.items[i]) + return h.items[j].isCurrentTransactionMoreValuableForNetwork(h.items[i]) } return &h @@ -23,7 +23,7 @@ func newMaxTransactionsHeap(capacity int) *transactionsHeap { } h.less = func(i, j int) bool { - return h.items[i].holdsTransactionMoreValuableForNetwork(h.items[j]) + return h.items[i].isCurrentTransactionMoreValuableForNetwork(h.items[j]) } return &h diff --git a/txcache/transactionsHeapItem.go b/txcache/transactionsHeapItem.go index b54b46ef..25191da6 100644 --- a/txcache/transactionsHeapItem.go +++ b/txcache/transactionsHeapItem.go @@ -193,6 +193,6 @@ func (item *transactionsHeapItem) requestAccountStateIfNecessary(accountStatePro return nil } -func (item *transactionsHeapItem) holdsTransactionMoreValuableForNetwork(other *transactionsHeapItem) bool { +func (item *transactionsHeapItem) isCurrentTransactionMoreValuableForNetwork(other *transactionsHeapItem) bool { return item.currentTransaction.isTransactionMoreValuableForNetwork(other.currentTransaction) } diff --git a/txcache/wrappedTransaction.go b/txcache/wrappedTransaction.go index aace7d97..6bcaf471 100644 --- a/txcache/wrappedTransaction.go +++ b/txcache/wrappedTransaction.go @@ -37,10 +37,19 @@ func (wrappedTx *WrappedTransaction) precomputeFields(txGasHandler TxGasHandler) // Equality is out of scope (not possible in our case). func (wrappedTx *WrappedTransaction) isTransactionMoreValuableForNetwork(otherTransaction *WrappedTransaction) bool { + // First, compare by PPU (higher PPU is better). if wrappedTx.PricePerUnit != otherTransaction.PricePerUnit { return wrappedTx.PricePerUnit > otherTransaction.PricePerUnit } + // If PPU is the same, compare by gas limit (higher gas limit is better, promoting less "execution fragmentation"). + gasLimit := wrappedTx.Tx.GetGasLimit() + gasLimitOther := otherTransaction.Tx.GetGasLimit() + + if gasLimit != gasLimitOther { + return gasLimit > gasLimitOther + } + // In the end, compare by transaction hash return bytes.Compare(wrappedTx.TxHash, otherTransaction.TxHash) < 0 } diff --git a/txcache/wrappedTransaction_test.go b/txcache/wrappedTransaction_test.go index 1b486b7e..8adb0b00 100644 --- a/txcache/wrappedTransaction_test.go +++ b/txcache/wrappedTransaction_test.go @@ -58,13 +58,25 @@ func TestWrappedTransaction_isTransactionMoreValuableForNetwork(t *testing.T) { require.True(t, a.isTransactionMoreValuableForNetwork(b)) }) - t.Run("decide by transaction hash (set them up to have the same PPU)", func(t *testing.T) { + t.Run("decide by gas limit (set them up to have the same PPU)", func(t *testing.T) { + a := createTx([]byte("a-7"), "a", 7).withDataLength(30).withGasLimit(95_000).withGasPrice(oneBillion) + a.precomputeFields(txGasHandler) + + b := createTx([]byte("b-7"), "b", 7).withDataLength(60).withGasLimit(140_000).withGasPrice(oneBillion) + b.precomputeFields(txGasHandler) + + require.Equal(t, a.PricePerUnit, b.PricePerUnit) + require.True(t, b.isTransactionMoreValuableForNetwork(a)) + }) + + t.Run("decide by transaction hash (set them up to have the same PPU and gas limit)", func(t *testing.T) { a := createTx([]byte("a-7"), "a", 7) a.precomputeFields(txGasHandler) b := createTx([]byte("b-7"), "b", 7) b.precomputeFields(txGasHandler) + require.Equal(t, a.PricePerUnit, b.PricePerUnit) require.True(t, a.isTransactionMoreValuableForNetwork(b)) }) }