From 9099c66ba7bfef0dd0a396af6f85079ba4eb4160 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrei=20B=C4=83ncioiu?= Date: Fri, 1 Nov 2024 14:50:11 +0200 Subject: [PATCH] Fix eviction, add some tests. --- txcache/config.go | 24 +++++---- txcache/eviction.go | 81 +++++++++++++++++------------ txcache/txCache_test.go | 109 +++++++++++++++++++++++++++++----------- 3 files changed, 141 insertions(+), 73 deletions(-) diff --git a/txcache/config.go b/txcache/config.go index e9f11475..a0752044 100644 --- a/txcache/config.go +++ b/txcache/config.go @@ -15,17 +15,18 @@ const maxNumBytesUpperBound = 1_073_741_824 // one GB const maxNumItemsPerSenderLowerBound = 1 const maxNumBytesPerSenderLowerBound = maxNumItemsPerSenderLowerBound * 1 const maxNumBytesPerSenderUpperBound = 33_554_432 // 32 MB -const numTxsToPreemptivelyEvictLowerBound = 1 +const numItemsToPreemptivelyEvictLowerBound = 1 // ConfigSourceMe holds cache configuration type ConfigSourceMe struct { - Name string - NumChunks uint32 - EvictionEnabled bool - NumBytesThreshold uint32 - NumBytesPerSenderThreshold uint32 - CountThreshold uint32 - CountPerSenderThreshold uint32 + Name string + NumChunks uint32 + EvictionEnabled bool + NumBytesThreshold uint32 + NumBytesPerSenderThreshold uint32 + CountThreshold uint32 + CountPerSenderThreshold uint32 + NumItemsToPreemptivelyEvict uint32 } type senderConstraints struct { @@ -33,7 +34,6 @@ type senderConstraints struct { maxNumBytes uint32 } -// TODO: Upon further analysis and brainstorming, add some sensible minimum accepted values for the appropriate fields. func (config *ConfigSourceMe) verify() error { if len(config.Name) == 0 { return fmt.Errorf("%w: config.Name is invalid", common.ErrInvalidConfig) @@ -54,6 +54,9 @@ func (config *ConfigSourceMe) verify() error { if config.CountThreshold < maxNumItemsLowerBound { return fmt.Errorf("%w: config.CountThreshold is invalid", common.ErrInvalidConfig) } + if config.NumItemsToPreemptivelyEvict < numItemsToPreemptivelyEvictLowerBound { + return fmt.Errorf("%w: config.NumItemsToPreemptivelyEvict is invalid", common.ErrInvalidConfig) + } } return nil @@ -85,7 +88,6 @@ type ConfigDestinationMe struct { NumItemsToPreemptivelyEvict uint32 } -// TODO: Upon further analysis and brainstorming, add some sensible minimum accepted values for the appropriate fields. func (config *ConfigDestinationMe) verify() error { if len(config.Name) == 0 { return fmt.Errorf("%w: config.Name is invalid", common.ErrInvalidConfig) @@ -99,7 +101,7 @@ func (config *ConfigDestinationMe) verify() error { if config.MaxNumBytes < maxNumBytesLowerBound || config.MaxNumBytes > maxNumBytesUpperBound { return fmt.Errorf("%w: config.MaxNumBytes is invalid", common.ErrInvalidConfig) } - if config.NumItemsToPreemptivelyEvict < numTxsToPreemptivelyEvictLowerBound { + if config.NumItemsToPreemptivelyEvict < numItemsToPreemptivelyEvictLowerBound { return fmt.Errorf("%w: config.NumItemsToPreemptivelyEvict is invalid", common.ErrInvalidConfig) } diff --git a/txcache/eviction.go b/txcache/eviction.go index ca930bd0..68394408 100644 --- a/txcache/eviction.go +++ b/txcache/eviction.go @@ -7,11 +7,12 @@ import ( // evictionJournal keeps a short journal about the eviction process // This is useful for debugging and reasoning about the eviction type evictionJournal struct { - numTxs uint32 + numTxs int + numPasses int } -// doEviction does cache eviction -// We do not allow more evictions to start concurrently +// doEviction does cache eviction. +// We do not allow more evictions to start concurrently. func (cache *TxCache) doEviction() *evictionJournal { if cache.isEvictionInProgress.IsSet() { return nil @@ -53,11 +54,12 @@ func (cache *TxCache) doEviction() *evictionJournal { "evicted txs", evictionJournal.numTxs, ) - return &evictionJournal + return evictionJournal } func (cache *TxCache) isCapacityExceeded() bool { - return cache.areThereTooManyBytes() || cache.areThereTooManySenders() || cache.areThereTooManyTxs() + exceeded := cache.areThereTooManyBytes() || cache.areThereTooManySenders() || cache.areThereTooManyTxs() + return exceeded } func (cache *TxCache) areThereTooManyBytes() bool { @@ -78,7 +80,7 @@ func (cache *TxCache) areThereTooManyTxs() bool { return tooManyTxs } -func (cache *TxCache) evictLeastLikelyToSelectTransactions() evictionJournal { +func (cache *TxCache) evictLeastLikelyToSelectTransactions() *evictionJournal { senders := cache.getSenders() bunches := make([]BunchOfTransactions, 0, len(senders)) @@ -87,42 +89,57 @@ func (cache *TxCache) evictLeastLikelyToSelectTransactions() evictionJournal { bunches = append(bunches, sender.getTxs()) } - mergedBunch := mergeBunchesOfTransactionsInParallel(bunches) - - // Select a reasonable number of transactions to evict. - transactionsToEvict := mergedBunch[3*len(mergedBunch)/4:] - transactionsToEvictHashes := make([][]byte, len(transactionsToEvict)) + transactions := mergeBunchesOfTransactionsInParallel(bunches) + transactionsHashes := make([][]byte, len(transactions)) - // For each sender, find the "lowest" (in nonce) transaction to evict. - lowestToEvictBySender := make(map[string]uint64) + for i, tx := range transactions { + transactionsHashes[i] = tx.TxHash + } - for _, tx := range transactionsToEvict { - transactionsToEvictHashes = append(transactionsToEvictHashes, tx.TxHash) - sender := string(tx.Tx.GetSndAddr()) + journal := &evictionJournal{} - if _, ok := lowestToEvictBySender[sender]; ok { - continue + for pass := 1; cache.isCapacityExceeded(); pass++ { + cutoffIndex := len(transactions) - int(cache.config.NumItemsToPreemptivelyEvict)*pass + if cutoffIndex <= 0 { + cutoffIndex = 0 } - lowestToEvictBySender[sender] = tx.Tx.GetNonce() - } + transactionsToEvict := transactions[cutoffIndex:] + transactionsToEvictHashes := transactionsHashes[cutoffIndex:] + + transactions = transactions[:cutoffIndex] + transactionsHashes = transactionsHashes[:cutoffIndex] + + // For each sender, find the "lowest" (in nonce) transaction to evict. + lowestToEvictBySender := make(map[string]uint64) - // Remove those transactions from "txListBySender". - for sender, nonce := range lowestToEvictBySender { - list, ok := cache.txListBySender.getListForSender(sender) - if !ok { - continue + for _, tx := range transactionsToEvict { + transactionsToEvictHashes = append(transactionsToEvictHashes, tx.TxHash) + sender := string(tx.Tx.GetSndAddr()) + + if _, ok := lowestToEvictBySender[sender]; ok { + continue + } + + lowestToEvictBySender[sender] = tx.Tx.GetNonce() } - list.evictTransactionsWithHigherNonces(nonce - 1) - } + // Remove those transactions from "txListBySender". + for sender, nonce := range lowestToEvictBySender { + list, ok := cache.txListBySender.getListForSender(sender) + if !ok { + continue + } - // Remove those transactions from "txByHash". - cache.txByHash.RemoveTxsBulk(transactionsToEvictHashes) + list.evictTransactionsWithHigherNonces(nonce - 1) + } + + // Remove those transactions from "txByHash". + cache.txByHash.RemoveTxsBulk(transactionsToEvictHashes) - evictionJournal := evictionJournal{ - numTxs: uint32(len(transactionsToEvict)), + journal.numPasses = pass + journal.numTxs += len(transactionsToEvict) } - return evictionJournal + return journal } diff --git a/txcache/txCache_test.go b/txcache/txCache_test.go index 2047ca80..26be1b17 100644 --- a/txcache/txCache_test.go +++ b/txcache/txCache_test.go @@ -437,41 +437,90 @@ func Test_Keys(t *testing.T) { func Test_AddWithEviction_UniformDistributionOfTxsPerSender(t *testing.T) { txGasHandler := txcachemocks.NewTxGasHandlerMock() - config := ConfigSourceMe{ - Name: "untitled", - NumChunks: 16, - EvictionEnabled: true, - NumBytesThreshold: maxNumBytesUpperBound, - NumBytesPerSenderThreshold: maxNumBytesPerSenderUpperBound, - CountThreshold: 100, - CountPerSenderThreshold: math.MaxUint32, - } - // 11 * 10 - cache, err := NewTxCache(config, txGasHandler) - require.Nil(t, err) - require.NotNil(t, cache) + t.Run("numSenders = 11, numTransactions = 10, countThreshold = 100, numItemsToPreemptivelyEvict = 1", func(t *testing.T) { + config := ConfigSourceMe{ + Name: "untitled", + NumChunks: 16, + EvictionEnabled: true, + NumBytesThreshold: maxNumBytesUpperBound, + NumBytesPerSenderThreshold: maxNumBytesPerSenderUpperBound, + CountThreshold: 100, + CountPerSenderThreshold: math.MaxUint32, + NumItemsToPreemptivelyEvict: 1, + } - addManyTransactionsWithUniformDistribution(cache, 11, 10) - require.LessOrEqual(t, cache.CountTx(), uint64(100)) + cache, err := NewTxCache(config, txGasHandler) + require.Nil(t, err) + require.NotNil(t, cache) - config = ConfigSourceMe{ - Name: "untitled", - NumChunks: 16, - EvictionEnabled: true, - NumBytesThreshold: maxNumBytesUpperBound, - NumBytesPerSenderThreshold: maxNumBytesPerSenderUpperBound, - CountThreshold: 250000, - CountPerSenderThreshold: math.MaxUint32, - } + addManyTransactionsWithUniformDistribution(cache, 11, 10) - // 100 * 1000 - cache, err = NewTxCache(config, txGasHandler) - require.Nil(t, err) - require.NotNil(t, cache) + // Eviction happens if the cache capacity is already exceeded, + // but not if the capacity will be exceeded after the addition. + // Thus, for the given value of "NumItemsToPreemptivelyEvict", there will be "countThreshold" + 1 transactions in the cache. + require.Equal(t, 101, int(cache.CountTx())) + }) + + t.Run("numSenders = 11, numTransactions = 10, countThreshold = 100, numItemsToPreemptivelyEvict = 2", func(t *testing.T) { + config := ConfigSourceMe{ + Name: "untitled", + NumChunks: 16, + EvictionEnabled: true, + NumBytesThreshold: maxNumBytesUpperBound, + NumBytesPerSenderThreshold: maxNumBytesPerSenderUpperBound, + CountThreshold: 100, + CountPerSenderThreshold: math.MaxUint32, + NumItemsToPreemptivelyEvict: 2, + } - addManyTransactionsWithUniformDistribution(cache, 100, 1000) - require.LessOrEqual(t, cache.CountTx(), uint64(250000)) + cache, err := NewTxCache(config, txGasHandler) + require.Nil(t, err) + require.NotNil(t, cache) + + addManyTransactionsWithUniformDistribution(cache, 11, 10) + require.Equal(t, 100, int(cache.CountTx())) + }) + + t.Run("numSenders = 100, numTransactions = 1000, countThreshold = 250000 (no eviction)", func(t *testing.T) { + config := ConfigSourceMe{ + Name: "untitled", + NumChunks: 16, + EvictionEnabled: true, + NumBytesThreshold: maxNumBytesUpperBound, + NumBytesPerSenderThreshold: maxNumBytesPerSenderUpperBound, + CountThreshold: 250000, + CountPerSenderThreshold: math.MaxUint32, + NumItemsToPreemptivelyEvict: 1, + } + + cache, err := NewTxCache(config, txGasHandler) + require.Nil(t, err) + require.NotNil(t, cache) + + addManyTransactionsWithUniformDistribution(cache, 100, 1000) + require.Equal(t, 100000, int(cache.CountTx())) + }) + + t.Run("numSenders = 1000, numTransactions = 500, countThreshold = 250000, NumItemsToPreemptivelyEvict = 50000", func(t *testing.T) { + config := ConfigSourceMe{ + Name: "untitled", + NumChunks: 16, + EvictionEnabled: true, + NumBytesThreshold: maxNumBytesUpperBound, + NumBytesPerSenderThreshold: maxNumBytesPerSenderUpperBound, + CountThreshold: 250000, + CountPerSenderThreshold: math.MaxUint32, + NumItemsToPreemptivelyEvict: 50000, + } + + cache, err := NewTxCache(config, txGasHandler) + require.Nil(t, err) + require.NotNil(t, cache) + + addManyTransactionsWithUniformDistribution(cache, 1000, 500) + require.Equal(t, 250000, int(cache.CountTx())) + }) } func Test_NotImplementedFunctions(t *testing.T) {