Skip to content

Commit

Permalink
On nonce notifications, don't remove transactions. On RemoveTxByHash,…
Browse files Browse the repository at this point in the history
… remove those with lower or equal nonces, as well.
  • Loading branch information
andreibancioiu committed Nov 11, 2024
1 parent b45ce82 commit fe86b68
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 208 deletions.
22 changes: 5 additions & 17 deletions txcache/txCache.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,20 +142,12 @@ func (cache *TxCache) RemoveTxByHash(txHash []byte) bool {
return false
}

foundInBySender := cache.txListBySender.removeTx(tx)
if !foundInBySender {
// This condition can arise often at high load & eviction, when two go-routines concur to remove the same transaction:
// - A = remove transactions upon commit / final
// - B = remove transactions due to high load (eviction)
//
// - A reaches "RemoveTxByHash()", then "cache.txByHash.removeTx()".
// - B reaches "cache.txByHash.RemoveTxsBulk()"
// - B reaches "cache.txListBySender.RemoveSendersBulk()"
// - A reaches "cache.txListBySender.removeTx()", but sender does not exist anymore
logRemove.Debug("RemoveTxByHash, but !foundInBySender", "tx", txHash)
evicted := cache.txListBySender.removeTxReturnEvicted(tx)
if len(evicted) > 0 {
cache.txByHash.RemoveTxsBulk(evicted)
}

logRemove.Trace("RemoveTxByHash", "tx", txHash)
logRemove.Trace("RemoveTxByHash", "tx", txHash, "len(evicted)", len(evicted))
return true
}

Expand Down Expand Up @@ -285,11 +277,7 @@ func (cache *TxCache) UnRegisterHandler(string) {
// NotifyAccountNonce should be called by external components (such as interceptors and transactions processor)
// in order to inform the cache about initial nonce gap phenomena
func (cache *TxCache) NotifyAccountNonce(accountKey []byte, nonce uint64) {
evicted := cache.txListBySender.notifyAccountNonceReturnEvictedTransactions(accountKey, nonce)

if len(evicted) > 0 {
cache.txByHash.RemoveTxsBulk(evicted)
}
cache.txListBySender.notifyAccountNonce(accountKey, nonce)
}

// ImmunizeTxsAgainstEviction does nothing for this type of cache
Expand Down
50 changes: 2 additions & 48 deletions txcache/txCache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/multiversx/mx-chain-storage-go/common"
"github.com/multiversx/mx-chain-storage-go/testscommon/txcachemocks"
"github.com/multiversx/mx-chain-storage-go/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -217,7 +216,7 @@ func Test_RemoveByTxHash_RemovesFromByHash_WhenMapsInconsistency(t *testing.T) {
cache.AddTx(tx)

// Cause an inconsistency between the two internal maps (theoretically possible in case of misbehaving eviction)
cache.txListBySender.removeTx(tx)
_ = cache.txListBySender.removeTxReturnEvicted(tx)

_ = cache.RemoveTxByHash(txHash)
require.Equal(t, 0, cache.txByHash.backingMap.Count())
Expand Down Expand Up @@ -508,7 +507,7 @@ func TestTxCache_TransactionIsAdded_EvenWhenInternalMapsAreInconsistent(t *testi
func TestTxCache_NoCriticalInconsistency_WhenConcurrentAdditionsAndRemovals(t *testing.T) {
cache := newUnconstrainedCacheToTest()

// A lot of routines concur to add & remove THE FIRST transaction of a sender
// A lot of routines concur to add & remove a transaction
for try := 0; try < 100; try++ {
var wg sync.WaitGroup

Expand Down Expand Up @@ -544,51 +543,6 @@ func TestTxCache_NoCriticalInconsistency_WhenConcurrentAdditionsAndRemovals(t *t
require.True(t, cache.Has([]byte("alice-x")))
require.Equal(t, []string{"alice-x"}, cache.getHashesForSender("alice"))
}

cache.Clear()

// A lot of routines concur to add & remove subsequent transactions of a sender
cache.AddTx(createTx([]byte("alice-w"), "alice", 41))

for try := 0; try < 100; try++ {
var wg sync.WaitGroup

for i := 0; i < 50; i++ {
wg.Add(1)
go func() {
cache.AddTx(createTx([]byte("alice-x"), "alice", 42))
_ = cache.RemoveTxByHash([]byte("alice-x"))
wg.Done()
}()
}

wg.Wait()

// In this case, there is the slight chance that:
// go A: add to map by hash
// go B: won't add in map by hash, already there
// go A: add to map by sender (existing sender/list)
// go A: remove from map by hash
// go A: remove from map by sender
// go B: add to map by sender (existing sender/list)
// go B: can't remove from map by hash, not found
// go B: won't remove from map by sender (sender unknown)

// Therefore, Alice may have one or two transactions in her list.
require.Equal(t, 1, cache.txByHash.backingMap.Count())
expectedTxsConsistent := []string{"alice-w"}
expectedTxsSlightlyInconsistent := []string{"alice-w", "alice-x"}
actualTxs := cache.getHashesForSender("alice")
require.True(t, assert.ObjectsAreEqual(expectedTxsConsistent, actualTxs) || assert.ObjectsAreEqual(expectedTxsSlightlyInconsistent, actualTxs))

// A further addition works:
cache.AddTx(createTx([]byte("alice-x"), "alice", 42))
require.True(t, cache.Has([]byte("alice-w")))
require.True(t, cache.Has([]byte("alice-x")))
require.Equal(t, []string{"alice-w", "alice-x"}, cache.getHashesForSender("alice"))
}

cache.Clear()
}

func newUnconstrainedCacheToTest() *TxCache {
Expand Down
20 changes: 9 additions & 11 deletions txcache/txListBySenderMap.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,21 +75,21 @@ func (txMap *txListBySenderMap) addSender(sender string) *txListForSender {
return listForSender
}

// removeTx removes a transaction from the map
func (txMap *txListBySenderMap) removeTx(tx *WrappedTransaction) bool {
// removeTxReturnEvicted removes a transaction from the map
func (txMap *txListBySenderMap) removeTxReturnEvicted(tx *WrappedTransaction) [][]byte {
sender := string(tx.Tx.GetSndAddr())

listForSender, ok := txMap.getListForSender(sender)
if !ok {
// This happens when a sender whose transactions were selected for processing is removed from cache in the meantime.
// When it comes to remove one if its transactions due to processing (commited / finalized block), they don't exist in cache anymore.
log.Trace("txListBySenderMap.removeTx detected slight inconsistency: sender of tx not in cache", "tx", tx.TxHash, "sender", []byte(sender))
return false
log.Trace("txListBySenderMap.removeTxReturnEvicted detected slight inconsistency: sender of tx not in cache", "tx", tx.TxHash, "sender", []byte(sender))
return nil
}

isFound := listForSender.RemoveTx(tx)
evicted := listForSender.evictTransactionsWithLowerOrEqualNonces(tx.Tx.GetNonce())
txMap.removeSenderIfEmpty(listForSender)
return isFound
return evicted
}

func (txMap *txListBySenderMap) removeSenderIfEmpty(listForSender *txListForSender) {
Expand Down Expand Up @@ -123,16 +123,14 @@ func (txMap *txListBySenderMap) RemoveSendersBulk(senders []string) uint32 {
return numRemoved
}

func (txMap *txListBySenderMap) notifyAccountNonceReturnEvictedTransactions(accountKey []byte, nonce uint64) [][]byte {
func (txMap *txListBySenderMap) notifyAccountNonce(accountKey []byte, nonce uint64) {
sender := string(accountKey)
listForSender, ok := txMap.getListForSender(sender)
if !ok {
return nil
return
}

evictedTxHashes := listForSender.notifyAccountNonceReturnEvictedTransactions(nonce)
txMap.removeSenderIfEmpty(listForSender)
return evictedTxHashes
listForSender.notifyAccountNonce(nonce)
}

// evictTransactionsWithHigherOrEqualNonces removes transactions with nonces higher or equal to the given nonce.
Expand Down
12 changes: 6 additions & 6 deletions txcache/txListBySenderMap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,16 @@ func TestSendersMap_RemoveTx_AlsoRemovesSenderWhenNoTransactionLeft(t *testing.T
require.Equal(t, uint64(2), myMap.testGetListForSender("alice").countTx())
require.Equal(t, uint64(1), myMap.testGetListForSender("bob").countTx())

myMap.removeTx(txAlice1)
_ = myMap.removeTxReturnEvicted(txAlice1)
require.Equal(t, int64(2), myMap.counter.Get())
require.Equal(t, uint64(1), myMap.testGetListForSender("alice").countTx())
require.Equal(t, uint64(1), myMap.testGetListForSender("bob").countTx())

myMap.removeTx(txAlice2)
_ = myMap.removeTxReturnEvicted(txAlice2)
// All alice's transactions have been removed now
require.Equal(t, int64(1), myMap.counter.Get())

myMap.removeTx(txBob)
_ = myMap.removeTxReturnEvicted(txBob)
// Also Bob has no more transactions
require.Equal(t, int64(0), myMap.counter.Get())
}
Expand Down Expand Up @@ -100,14 +100,14 @@ func TestSendersMap_notifyAccountNonce(t *testing.T) {
myMap := newSendersMapToTest()

// Discarded notification, since sender not added yet
myMap.notifyAccountNonceReturnEvictedTransactions([]byte("alice"), 42)
myMap.notifyAccountNonce([]byte("alice"), 42)

myMap.addTxReturnEvicted(createTx([]byte("tx-42"), "alice", 42))
_, _ = myMap.addTxReturnEvicted(createTx([]byte("tx-42"), "alice", 42))
alice, _ := myMap.getListForSender("alice")
require.Equal(t, uint64(0), alice.accountNonce.Get())
require.False(t, alice.accountNonceKnown.IsSet())

myMap.notifyAccountNonceReturnEvictedTransactions([]byte("alice"), 42)
myMap.notifyAccountNonce([]byte("alice"), 42)
require.Equal(t, uint64(42), alice.accountNonce.Get())
require.True(t, alice.accountNonceKnown.IsSet())
}
Expand Down
78 changes: 16 additions & 62 deletions txcache/txListForSender.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,52 +146,11 @@ func (listForSender *txListForSender) findInsertionPlace(incomingTx *WrappedTran
return nil, nil
}

// RemoveTx removes a transaction from the sender's list
func (listForSender *txListForSender) RemoveTx(tx *WrappedTransaction) bool {
// We don't allow concurrent interceptor goroutines to mutate a given sender's list
listForSender.mutex.Lock()
defer listForSender.mutex.Unlock()

marker := listForSender.findListElementWithTx(tx)
isFound := marker != nil
if isFound {
listForSender.items.Remove(marker)
listForSender.onRemovedListElement(marker)
}

return isFound
}

func (listForSender *txListForSender) onRemovedListElement(element *list.Element) {
tx := element.Value.(*WrappedTransaction)
listForSender.totalBytes.Subtract(tx.Size)
}

// This function should only be used in critical section (listForSender.mutex)
func (listForSender *txListForSender) findListElementWithTx(txToFind *WrappedTransaction) *list.Element {
txToFindHash := txToFind.TxHash
txToFindNonce := txToFind.Tx.GetNonce()

for element := listForSender.items.Front(); element != nil; element = element.Next() {
value := element.Value.(*WrappedTransaction)
nonce := value.Tx.GetNonce()

// Optimization: first, compare nonces, then hashes.
if nonce == txToFindNonce {
if bytes.Equal(value.TxHash, txToFindHash) {
return element
}
}

// Optimization: stop search at this point, since the list is sorted by nonce
if nonce > txToFindNonce {
break
}
}

return nil
}

// IsEmpty checks whether the list is empty
func (listForSender *txListForSender) IsEmpty() bool {
return listForSender.countTxWithLock() == 0
Expand Down Expand Up @@ -245,8 +204,14 @@ func (listForSender *txListForSender) getSequentialTxs() []*WrappedTransaction {
isFirstTx := len(result) == 0

if isFirstTx {
// Handle lower nonces.
if accountNonce > nonce {
log.Trace("txListForSender.getSequentialTxs, lower nonce", "sender", listForSender.sender, "nonce", nonce, "accountNonce", accountNonce)
continue
}

// Handle initial gaps.
if accountNonceKnown && accountNonce != nonce {
if accountNonceKnown && accountNonce < nonce {
log.Trace("txListForSender.getSequentialTxs, initial gap", "sender", listForSender.sender, "nonce", nonce, "accountNonce", accountNonce)
break
}
Expand Down Expand Up @@ -282,36 +247,25 @@ func (listForSender *txListForSender) countTxWithLock() uint64 {
return uint64(listForSender.items.Len())
}

// notifyAccountNonceReturnEvictedTransactions sets the known account nonce, removes the transactions with lower nonces, and returns their hashes
func (listForSender *txListForSender) notifyAccountNonceReturnEvictedTransactions(nonce uint64) [][]byte {
// Optimization: if nonce is the same, do nothing (good for heavy load).
if listForSender.accountNonce.Get() == nonce {
logRemove.Trace("notifyAccountNonceReturnEvictedTransactions, nonce is the same", "sender", listForSender.sender, "nonce", nonce)
return nil
}

listForSender.mutex.Lock()
defer listForSender.mutex.Unlock()

// notifyAccountNonce sets the known account nonce, removes the transactions with lower nonces, and returns their hashes
func (listForSender *txListForSender) notifyAccountNonce(nonce uint64) {
listForSender.accountNonce.Set(nonce)
_ = listForSender.accountNonceKnown.SetReturningPrevious()

evicted := listForSender.evictTransactionsWithLowerNoncesNoLockReturnEvicted(nonce)

logRemove.Trace("notifyAccountNonceReturnEvictedTransactions, nonce changed", "sender", listForSender.sender, "nonce", nonce, "num evicted txs", len(evicted))

return evicted
}

// This function should only be used in critical section (listForSender.mutex)
func (listForSender *txListForSender) evictTransactionsWithLowerNoncesNoLockReturnEvicted(givenNonce uint64) [][]byte {
// evictTransactionsWithLowerOrEqualNonces removes transactions with nonces lower or equal to the given nonce
func (listForSender *txListForSender) evictTransactionsWithLowerOrEqualNonces(targetNonce uint64) [][]byte {
evictedTxHashes := make([][]byte, 0)

// We don't allow concurrent goroutines to mutate a given sender's list
listForSender.mutex.Lock()
defer listForSender.mutex.Unlock()

for element := listForSender.items.Front(); element != nil; {
tx := element.Value.(*WrappedTransaction)
txNonce := tx.Tx.GetNonce()

if txNonce >= givenNonce {
if txNonce > targetNonce {
break
}

Expand Down
Loading

0 comments on commit fe86b68

Please sign in to comment.