Skip to content

Commit

Permalink
Reimplement eviction, using min heap.
Browse files Browse the repository at this point in the history
  • Loading branch information
andreibancioiu committed Nov 1, 2024
1 parent 7565d6e commit a6faf2e
Showing 1 changed file with 58 additions and 28 deletions.
86 changes: 58 additions & 28 deletions txcache/eviction.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
package txcache

import (
"container/heap"

"github.com/multiversx/mx-chain-core-go/core"
)

// evictionJournal keeps a short journal about the eviction process
// This is useful for debugging and reasoning about the eviction
type evictionJournal struct {
numTxs int
numPasses int
numEvicted int
numEvictedByPass []int
}

// doEviction does cache eviction.
Expand Down Expand Up @@ -51,7 +53,7 @@ func (cache *TxCache) doEviction() *evictionJournal {
"num now", cache.CountTx(),
"num senders", cache.CountSenders(),
"duration", stopWatch.GetMeasurement("eviction"),
"evicted txs", evictionJournal.numTxs,
"evicted txs", evictionJournal.numEvicted,
)

return evictionJournal
Expand Down Expand Up @@ -80,47 +82,75 @@ func (cache *TxCache) areThereTooManyTxs() bool {
return tooManyTxs
}

// Eviction tolerates concurrent transaction additions / removals.
func (cache *TxCache) evictLeastLikelyToSelectTransactions() *evictionJournal {
senders := cache.getSenders()
bunches := make([]BunchOfTransactions, 0, len(senders))

for _, sender := range senders {
// Include transactions after gaps, as well (important), unlike when selecting transactions for processing.
bunches = append(bunches, sender.getTxs())
}

transactions := mergeBunchesInParallel(bunches, numJobsForMerging)
transactionsHashes := make([][]byte, len(transactions))
bunch := sender.getTxs()
// Reverse the order of transactions (will come in handy later, when creating the min-heap).
reverseSlice(bunch)

for i, tx := range transactions {
transactionsHashes[i] = tx.TxHash
bunches = append(bunches, bunch)
}

journal := &evictionJournal{}

for pass := 1; cache.isCapacityExceeded(); pass++ {
cutoffIndex := len(transactions) - int(cache.config.NumItemsToPreemptivelyEvict)*pass
if cutoffIndex <= 0 {
cutoffIndex = 0
// Heap is reused among passes.
// Items popped from the heap are added to "transactionsToEvict" (slice is re-created in each pass).
transactionsHeap := &TransactionsMinHeap{}
heap.Init(transactionsHeap)

// Initialize the heap with the first transaction of each bunch
for i, bunch := range bunches {
if len(bunch) == 0 {
// Some senders may have no transaction anymore (hazardous concurrent removals).
continue
}

transactionsToEvict := transactions[cutoffIndex:]
transactionsToEvictHashes := transactionsHashes[cutoffIndex:]
// Items will be reused (see below). Each sender gets one (and only one) item in the heap.
heap.Push(transactionsHeap, &TransactionsHeapItem{
senderIndex: i,
transactionIndex: 0,
transaction: bunch[0],
})
}

for pass := 0; cache.isCapacityExceeded(); pass++ {
transactionsToEvict := make(BunchOfTransactions, 0, cache.config.NumItemsToPreemptivelyEvict)
transactionsToEvictHashes := make([][]byte, 0, cache.config.NumItemsToPreemptivelyEvict)

// Select transactions (sorted).
for transactionsHeap.Len() > 0 {
// Always pick the "worst" transaction.
item := heap.Pop(transactionsHeap).(*TransactionsHeapItem)

if len(transactionsToEvict) >= int(cache.config.NumItemsToPreemptivelyEvict) {
// We have enough transactions to evict in this pass.
break
}

transactions = transactions[:cutoffIndex]
transactionsHashes = transactionsHashes[:cutoffIndex]
transactionsToEvict = append(transactionsToEvict, item.transaction)
transactionsToEvictHashes = append(transactionsToEvictHashes, item.transaction.TxHash)

// If there are more transactions in the same bunch (same sender as the popped item),
// add the next one to the heap (to compete with the others in being "the worst").
item.transactionIndex++

if item.transactionIndex < len(bunches[item.senderIndex]) {
// Item is reused (same originating sender), pushed back on the heap.
item.transaction = bunches[item.senderIndex][item.transactionIndex]
heap.Push(transactionsHeap, item)
}
}

// For each sender, find the "lowest" (in nonce) transaction to evict.
lowestToEvictBySender := make(map[string]uint64)

for _, tx := range transactionsToEvict {
transactionsToEvictHashes = append(transactionsToEvictHashes, tx.TxHash)
sender := string(tx.Tx.GetSndAddr())

if _, ok := lowestToEvictBySender[sender]; ok {
continue
}

lowestToEvictBySender[sender] = tx.Tx.GetNonce()
}

Expand All @@ -131,14 +161,14 @@ func (cache *TxCache) evictLeastLikelyToSelectTransactions() *evictionJournal {
continue
}

list.evictTransactionsWithHigherNonces(nonce - 1)
list.evictTransactionsWithHigherOrEqualNonces(nonce)
}

// Remove those transactions from "txByHash".
cache.txByHash.RemoveTxsBulk(transactionsToEvictHashes)
_ = cache.txByHash.RemoveTxsBulk(transactionsToEvictHashes)

journal.numPasses = pass
journal.numTxs += len(transactionsToEvict)
journal.numEvictedByPass = append(journal.numEvictedByPass, len(transactionsToEvict))
journal.numEvicted += len(transactionsToEvict)
}

return journal
Expand Down

0 comments on commit a6faf2e

Please sign in to comment.