Skip to content

Commit

Permalink
Fix eviction, add some tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
andreibancioiu committed Nov 1, 2024
1 parent 71de44f commit 9099c66
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 73 deletions.
24 changes: 13 additions & 11 deletions txcache/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,25 @@ const maxNumBytesUpperBound = 1_073_741_824 // one GB
const maxNumItemsPerSenderLowerBound = 1
const maxNumBytesPerSenderLowerBound = maxNumItemsPerSenderLowerBound * 1
const maxNumBytesPerSenderUpperBound = 33_554_432 // 32 MB
const numTxsToPreemptivelyEvictLowerBound = 1
const numItemsToPreemptivelyEvictLowerBound = 1

// ConfigSourceMe holds cache configuration
type ConfigSourceMe struct {
Name string
NumChunks uint32
EvictionEnabled bool
NumBytesThreshold uint32
NumBytesPerSenderThreshold uint32
CountThreshold uint32
CountPerSenderThreshold uint32
Name string
NumChunks uint32
EvictionEnabled bool
NumBytesThreshold uint32
NumBytesPerSenderThreshold uint32
CountThreshold uint32
CountPerSenderThreshold uint32
NumItemsToPreemptivelyEvict uint32
}

type senderConstraints struct {
maxNumTxs uint32
maxNumBytes uint32
}

// TODO: Upon further analysis and brainstorming, add some sensible minimum accepted values for the appropriate fields.
func (config *ConfigSourceMe) verify() error {
if len(config.Name) == 0 {
return fmt.Errorf("%w: config.Name is invalid", common.ErrInvalidConfig)
Expand All @@ -54,6 +54,9 @@ func (config *ConfigSourceMe) verify() error {
if config.CountThreshold < maxNumItemsLowerBound {
return fmt.Errorf("%w: config.CountThreshold is invalid", common.ErrInvalidConfig)
}
if config.NumItemsToPreemptivelyEvict < numItemsToPreemptivelyEvictLowerBound {
return fmt.Errorf("%w: config.NumItemsToPreemptivelyEvict is invalid", common.ErrInvalidConfig)
}
}

return nil
Expand Down Expand Up @@ -85,7 +88,6 @@ type ConfigDestinationMe struct {
NumItemsToPreemptivelyEvict uint32
}

// TODO: Upon further analysis and brainstorming, add some sensible minimum accepted values for the appropriate fields.
func (config *ConfigDestinationMe) verify() error {
if len(config.Name) == 0 {
return fmt.Errorf("%w: config.Name is invalid", common.ErrInvalidConfig)
Expand All @@ -99,7 +101,7 @@ func (config *ConfigDestinationMe) verify() error {
if config.MaxNumBytes < maxNumBytesLowerBound || config.MaxNumBytes > maxNumBytesUpperBound {
return fmt.Errorf("%w: config.MaxNumBytes is invalid", common.ErrInvalidConfig)
}
if config.NumItemsToPreemptivelyEvict < numTxsToPreemptivelyEvictLowerBound {
if config.NumItemsToPreemptivelyEvict < numItemsToPreemptivelyEvictLowerBound {
return fmt.Errorf("%w: config.NumItemsToPreemptivelyEvict is invalid", common.ErrInvalidConfig)
}

Expand Down
81 changes: 49 additions & 32 deletions txcache/eviction.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@ import (
// evictionJournal keeps a short journal about the eviction process
// This is useful for debugging and reasoning about the eviction
type evictionJournal struct {
numTxs uint32
numTxs int
numPasses int
}

// doEviction does cache eviction
// We do not allow more evictions to start concurrently
// doEviction does cache eviction.
// We do not allow more evictions to start concurrently.
func (cache *TxCache) doEviction() *evictionJournal {
if cache.isEvictionInProgress.IsSet() {
return nil
Expand Down Expand Up @@ -53,11 +54,12 @@ func (cache *TxCache) doEviction() *evictionJournal {
"evicted txs", evictionJournal.numTxs,
)

return &evictionJournal
return evictionJournal
}

func (cache *TxCache) isCapacityExceeded() bool {
return cache.areThereTooManyBytes() || cache.areThereTooManySenders() || cache.areThereTooManyTxs()
exceeded := cache.areThereTooManyBytes() || cache.areThereTooManySenders() || cache.areThereTooManyTxs()
return exceeded
}

func (cache *TxCache) areThereTooManyBytes() bool {
Expand All @@ -78,7 +80,7 @@ func (cache *TxCache) areThereTooManyTxs() bool {
return tooManyTxs
}

func (cache *TxCache) evictLeastLikelyToSelectTransactions() evictionJournal {
func (cache *TxCache) evictLeastLikelyToSelectTransactions() *evictionJournal {
senders := cache.getSenders()
bunches := make([]BunchOfTransactions, 0, len(senders))

Expand All @@ -87,42 +89,57 @@ func (cache *TxCache) evictLeastLikelyToSelectTransactions() evictionJournal {
bunches = append(bunches, sender.getTxs())
}

mergedBunch := mergeBunchesOfTransactionsInParallel(bunches)

// Select a reasonable number of transactions to evict.
transactionsToEvict := mergedBunch[3*len(mergedBunch)/4:]
transactionsToEvictHashes := make([][]byte, len(transactionsToEvict))
transactions := mergeBunchesOfTransactionsInParallel(bunches)
transactionsHashes := make([][]byte, len(transactions))

// For each sender, find the "lowest" (in nonce) transaction to evict.
lowestToEvictBySender := make(map[string]uint64)
for i, tx := range transactions {
transactionsHashes[i] = tx.TxHash
}

for _, tx := range transactionsToEvict {
transactionsToEvictHashes = append(transactionsToEvictHashes, tx.TxHash)
sender := string(tx.Tx.GetSndAddr())
journal := &evictionJournal{}

if _, ok := lowestToEvictBySender[sender]; ok {
continue
for pass := 1; cache.isCapacityExceeded(); pass++ {
cutoffIndex := len(transactions) - int(cache.config.NumItemsToPreemptivelyEvict)*pass
if cutoffIndex <= 0 {
cutoffIndex = 0
}

lowestToEvictBySender[sender] = tx.Tx.GetNonce()
}
transactionsToEvict := transactions[cutoffIndex:]
transactionsToEvictHashes := transactionsHashes[cutoffIndex:]

transactions = transactions[:cutoffIndex]
transactionsHashes = transactionsHashes[:cutoffIndex]

// For each sender, find the "lowest" (in nonce) transaction to evict.
lowestToEvictBySender := make(map[string]uint64)

// Remove those transactions from "txListBySender".
for sender, nonce := range lowestToEvictBySender {
list, ok := cache.txListBySender.getListForSender(sender)
if !ok {
continue
for _, tx := range transactionsToEvict {
transactionsToEvictHashes = append(transactionsToEvictHashes, tx.TxHash)
sender := string(tx.Tx.GetSndAddr())

if _, ok := lowestToEvictBySender[sender]; ok {
continue
}

lowestToEvictBySender[sender] = tx.Tx.GetNonce()
}

list.evictTransactionsWithHigherNonces(nonce - 1)
}
// Remove those transactions from "txListBySender".
for sender, nonce := range lowestToEvictBySender {
list, ok := cache.txListBySender.getListForSender(sender)
if !ok {
continue
}

// Remove those transactions from "txByHash".
cache.txByHash.RemoveTxsBulk(transactionsToEvictHashes)
list.evictTransactionsWithHigherNonces(nonce - 1)
}

// Remove those transactions from "txByHash".
cache.txByHash.RemoveTxsBulk(transactionsToEvictHashes)

evictionJournal := evictionJournal{
numTxs: uint32(len(transactionsToEvict)),
journal.numPasses = pass
journal.numTxs += len(transactionsToEvict)
}

return evictionJournal
return journal
}
109 changes: 79 additions & 30 deletions txcache/txCache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,41 +437,90 @@ func Test_Keys(t *testing.T) {

func Test_AddWithEviction_UniformDistributionOfTxsPerSender(t *testing.T) {
txGasHandler := txcachemocks.NewTxGasHandlerMock()
config := ConfigSourceMe{
Name: "untitled",
NumChunks: 16,
EvictionEnabled: true,
NumBytesThreshold: maxNumBytesUpperBound,
NumBytesPerSenderThreshold: maxNumBytesPerSenderUpperBound,
CountThreshold: 100,
CountPerSenderThreshold: math.MaxUint32,
}

// 11 * 10
cache, err := NewTxCache(config, txGasHandler)
require.Nil(t, err)
require.NotNil(t, cache)
t.Run("numSenders = 11, numTransactions = 10, countThreshold = 100, numItemsToPreemptivelyEvict = 1", func(t *testing.T) {
config := ConfigSourceMe{
Name: "untitled",
NumChunks: 16,
EvictionEnabled: true,
NumBytesThreshold: maxNumBytesUpperBound,
NumBytesPerSenderThreshold: maxNumBytesPerSenderUpperBound,
CountThreshold: 100,
CountPerSenderThreshold: math.MaxUint32,
NumItemsToPreemptivelyEvict: 1,
}

addManyTransactionsWithUniformDistribution(cache, 11, 10)
require.LessOrEqual(t, cache.CountTx(), uint64(100))
cache, err := NewTxCache(config, txGasHandler)
require.Nil(t, err)
require.NotNil(t, cache)

config = ConfigSourceMe{
Name: "untitled",
NumChunks: 16,
EvictionEnabled: true,
NumBytesThreshold: maxNumBytesUpperBound,
NumBytesPerSenderThreshold: maxNumBytesPerSenderUpperBound,
CountThreshold: 250000,
CountPerSenderThreshold: math.MaxUint32,
}
addManyTransactionsWithUniformDistribution(cache, 11, 10)

// 100 * 1000
cache, err = NewTxCache(config, txGasHandler)
require.Nil(t, err)
require.NotNil(t, cache)
// Eviction happens if the cache capacity is already exceeded,
// but not if the capacity will be exceeded after the addition.
// Thus, for the given value of "NumItemsToPreemptivelyEvict", there will be "countThreshold" + 1 transactions in the cache.
require.Equal(t, 101, int(cache.CountTx()))
})

t.Run("numSenders = 11, numTransactions = 10, countThreshold = 100, numItemsToPreemptivelyEvict = 2", func(t *testing.T) {
config := ConfigSourceMe{
Name: "untitled",
NumChunks: 16,
EvictionEnabled: true,
NumBytesThreshold: maxNumBytesUpperBound,
NumBytesPerSenderThreshold: maxNumBytesPerSenderUpperBound,
CountThreshold: 100,
CountPerSenderThreshold: math.MaxUint32,
NumItemsToPreemptivelyEvict: 2,
}

addManyTransactionsWithUniformDistribution(cache, 100, 1000)
require.LessOrEqual(t, cache.CountTx(), uint64(250000))
cache, err := NewTxCache(config, txGasHandler)
require.Nil(t, err)
require.NotNil(t, cache)

addManyTransactionsWithUniformDistribution(cache, 11, 10)
require.Equal(t, 100, int(cache.CountTx()))
})

t.Run("numSenders = 100, numTransactions = 1000, countThreshold = 250000 (no eviction)", func(t *testing.T) {
config := ConfigSourceMe{
Name: "untitled",
NumChunks: 16,
EvictionEnabled: true,
NumBytesThreshold: maxNumBytesUpperBound,
NumBytesPerSenderThreshold: maxNumBytesPerSenderUpperBound,
CountThreshold: 250000,
CountPerSenderThreshold: math.MaxUint32,
NumItemsToPreemptivelyEvict: 1,
}

cache, err := NewTxCache(config, txGasHandler)
require.Nil(t, err)
require.NotNil(t, cache)

addManyTransactionsWithUniformDistribution(cache, 100, 1000)
require.Equal(t, 100000, int(cache.CountTx()))
})

t.Run("numSenders = 1000, numTransactions = 500, countThreshold = 250000, NumItemsToPreemptivelyEvict = 50000", func(t *testing.T) {
config := ConfigSourceMe{
Name: "untitled",
NumChunks: 16,
EvictionEnabled: true,
NumBytesThreshold: maxNumBytesUpperBound,
NumBytesPerSenderThreshold: maxNumBytesPerSenderUpperBound,
CountThreshold: 250000,
CountPerSenderThreshold: math.MaxUint32,
NumItemsToPreemptivelyEvict: 50000,
}

cache, err := NewTxCache(config, txGasHandler)
require.Nil(t, err)
require.NotNil(t, cache)

addManyTransactionsWithUniformDistribution(cache, 1000, 500)
require.Equal(t, 250000, int(cache.CountTx()))
})
}

func Test_NotImplementedFunctions(t *testing.T) {
Expand Down

0 comments on commit 9099c66

Please sign in to comment.