From fe86b6841b96ef45441225084d90182dfd154e1e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrei=20B=C4=83ncioiu?= Date: Mon, 11 Nov 2024 14:59:11 +0200 Subject: [PATCH] On nonce notifications, don't remove transactions. On RemoveTxByHash, remove those with lower or equal nonces, as well. --- txcache/txCache.go | 22 ++------- txcache/txCache_test.go | 50 +------------------ txcache/txListBySenderMap.go | 20 ++++---- txcache/txListBySenderMap_test.go | 12 ++--- txcache/txListForSender.go | 78 +++++++----------------------- txcache/txListForSender_test.go | 80 +++++++------------------------ 6 files changed, 54 insertions(+), 208 deletions(-) diff --git a/txcache/txCache.go b/txcache/txCache.go index 2b4acc67..140adbef 100644 --- a/txcache/txCache.go +++ b/txcache/txCache.go @@ -142,20 +142,12 @@ func (cache *TxCache) RemoveTxByHash(txHash []byte) bool { return false } - foundInBySender := cache.txListBySender.removeTx(tx) - if !foundInBySender { - // This condition can arise often at high load & eviction, when two go-routines concur to remove the same transaction: - // - A = remove transactions upon commit / final - // - B = remove transactions due to high load (eviction) - // - // - A reaches "RemoveTxByHash()", then "cache.txByHash.removeTx()". - // - B reaches "cache.txByHash.RemoveTxsBulk()" - // - B reaches "cache.txListBySender.RemoveSendersBulk()" - // - A reaches "cache.txListBySender.removeTx()", but sender does not exist anymore - logRemove.Debug("RemoveTxByHash, but !foundInBySender", "tx", txHash) + evicted := cache.txListBySender.removeTxReturnEvicted(tx) + if len(evicted) > 0 { + cache.txByHash.RemoveTxsBulk(evicted) } - logRemove.Trace("RemoveTxByHash", "tx", txHash) + logRemove.Trace("RemoveTxByHash", "tx", txHash, "len(evicted)", len(evicted)) return true } @@ -285,11 +277,7 @@ func (cache *TxCache) UnRegisterHandler(string) { // NotifyAccountNonce should be called by external components (such as interceptors and transactions processor) // in order to inform the cache about initial nonce gap phenomena func (cache *TxCache) NotifyAccountNonce(accountKey []byte, nonce uint64) { - evicted := cache.txListBySender.notifyAccountNonceReturnEvictedTransactions(accountKey, nonce) - - if len(evicted) > 0 { - cache.txByHash.RemoveTxsBulk(evicted) - } + cache.txListBySender.notifyAccountNonce(accountKey, nonce) } // ImmunizeTxsAgainstEviction does nothing for this type of cache diff --git a/txcache/txCache_test.go b/txcache/txCache_test.go index 14ed0db2..ed14a6c4 100644 --- a/txcache/txCache_test.go +++ b/txcache/txCache_test.go @@ -13,7 +13,6 @@ import ( "github.com/multiversx/mx-chain-storage-go/common" "github.com/multiversx/mx-chain-storage-go/testscommon/txcachemocks" "github.com/multiversx/mx-chain-storage-go/types" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -217,7 +216,7 @@ func Test_RemoveByTxHash_RemovesFromByHash_WhenMapsInconsistency(t *testing.T) { cache.AddTx(tx) // Cause an inconsistency between the two internal maps (theoretically possible in case of misbehaving eviction) - cache.txListBySender.removeTx(tx) + _ = cache.txListBySender.removeTxReturnEvicted(tx) _ = cache.RemoveTxByHash(txHash) require.Equal(t, 0, cache.txByHash.backingMap.Count()) @@ -508,7 +507,7 @@ func TestTxCache_TransactionIsAdded_EvenWhenInternalMapsAreInconsistent(t *testi func TestTxCache_NoCriticalInconsistency_WhenConcurrentAdditionsAndRemovals(t *testing.T) { cache := newUnconstrainedCacheToTest() - // A lot of routines concur to add & remove THE FIRST transaction of a sender + // A lot of routines concur to add & remove a transaction for try := 0; try < 100; try++ { var wg sync.WaitGroup @@ -544,51 +543,6 @@ func TestTxCache_NoCriticalInconsistency_WhenConcurrentAdditionsAndRemovals(t *t require.True(t, cache.Has([]byte("alice-x"))) require.Equal(t, []string{"alice-x"}, cache.getHashesForSender("alice")) } - - cache.Clear() - - // A lot of routines concur to add & remove subsequent transactions of a sender - cache.AddTx(createTx([]byte("alice-w"), "alice", 41)) - - for try := 0; try < 100; try++ { - var wg sync.WaitGroup - - for i := 0; i < 50; i++ { - wg.Add(1) - go func() { - cache.AddTx(createTx([]byte("alice-x"), "alice", 42)) - _ = cache.RemoveTxByHash([]byte("alice-x")) - wg.Done() - }() - } - - wg.Wait() - - // In this case, there is the slight chance that: - // go A: add to map by hash - // go B: won't add in map by hash, already there - // go A: add to map by sender (existing sender/list) - // go A: remove from map by hash - // go A: remove from map by sender - // go B: add to map by sender (existing sender/list) - // go B: can't remove from map by hash, not found - // go B: won't remove from map by sender (sender unknown) - - // Therefore, Alice may have one or two transactions in her list. - require.Equal(t, 1, cache.txByHash.backingMap.Count()) - expectedTxsConsistent := []string{"alice-w"} - expectedTxsSlightlyInconsistent := []string{"alice-w", "alice-x"} - actualTxs := cache.getHashesForSender("alice") - require.True(t, assert.ObjectsAreEqual(expectedTxsConsistent, actualTxs) || assert.ObjectsAreEqual(expectedTxsSlightlyInconsistent, actualTxs)) - - // A further addition works: - cache.AddTx(createTx([]byte("alice-x"), "alice", 42)) - require.True(t, cache.Has([]byte("alice-w"))) - require.True(t, cache.Has([]byte("alice-x"))) - require.Equal(t, []string{"alice-w", "alice-x"}, cache.getHashesForSender("alice")) - } - - cache.Clear() } func newUnconstrainedCacheToTest() *TxCache { diff --git a/txcache/txListBySenderMap.go b/txcache/txListBySenderMap.go index 4e981bf3..72bfdec3 100644 --- a/txcache/txListBySenderMap.go +++ b/txcache/txListBySenderMap.go @@ -75,21 +75,21 @@ func (txMap *txListBySenderMap) addSender(sender string) *txListForSender { return listForSender } -// removeTx removes a transaction from the map -func (txMap *txListBySenderMap) removeTx(tx *WrappedTransaction) bool { +// removeTxReturnEvicted removes a transaction from the map +func (txMap *txListBySenderMap) removeTxReturnEvicted(tx *WrappedTransaction) [][]byte { sender := string(tx.Tx.GetSndAddr()) listForSender, ok := txMap.getListForSender(sender) if !ok { // This happens when a sender whose transactions were selected for processing is removed from cache in the meantime. // When it comes to remove one if its transactions due to processing (commited / finalized block), they don't exist in cache anymore. - log.Trace("txListBySenderMap.removeTx detected slight inconsistency: sender of tx not in cache", "tx", tx.TxHash, "sender", []byte(sender)) - return false + log.Trace("txListBySenderMap.removeTxReturnEvicted detected slight inconsistency: sender of tx not in cache", "tx", tx.TxHash, "sender", []byte(sender)) + return nil } - isFound := listForSender.RemoveTx(tx) + evicted := listForSender.evictTransactionsWithLowerOrEqualNonces(tx.Tx.GetNonce()) txMap.removeSenderIfEmpty(listForSender) - return isFound + return evicted } func (txMap *txListBySenderMap) removeSenderIfEmpty(listForSender *txListForSender) { @@ -123,16 +123,14 @@ func (txMap *txListBySenderMap) RemoveSendersBulk(senders []string) uint32 { return numRemoved } -func (txMap *txListBySenderMap) notifyAccountNonceReturnEvictedTransactions(accountKey []byte, nonce uint64) [][]byte { +func (txMap *txListBySenderMap) notifyAccountNonce(accountKey []byte, nonce uint64) { sender := string(accountKey) listForSender, ok := txMap.getListForSender(sender) if !ok { - return nil + return } - evictedTxHashes := listForSender.notifyAccountNonceReturnEvictedTransactions(nonce) - txMap.removeSenderIfEmpty(listForSender) - return evictedTxHashes + listForSender.notifyAccountNonce(nonce) } // evictTransactionsWithHigherOrEqualNonces removes transactions with nonces higher or equal to the given nonce. diff --git a/txcache/txListBySenderMap_test.go b/txcache/txListBySenderMap_test.go index 3d0ae10f..3fda916b 100644 --- a/txcache/txListBySenderMap_test.go +++ b/txcache/txListBySenderMap_test.go @@ -33,16 +33,16 @@ func TestSendersMap_RemoveTx_AlsoRemovesSenderWhenNoTransactionLeft(t *testing.T require.Equal(t, uint64(2), myMap.testGetListForSender("alice").countTx()) require.Equal(t, uint64(1), myMap.testGetListForSender("bob").countTx()) - myMap.removeTx(txAlice1) + _ = myMap.removeTxReturnEvicted(txAlice1) require.Equal(t, int64(2), myMap.counter.Get()) require.Equal(t, uint64(1), myMap.testGetListForSender("alice").countTx()) require.Equal(t, uint64(1), myMap.testGetListForSender("bob").countTx()) - myMap.removeTx(txAlice2) + _ = myMap.removeTxReturnEvicted(txAlice2) // All alice's transactions have been removed now require.Equal(t, int64(1), myMap.counter.Get()) - myMap.removeTx(txBob) + _ = myMap.removeTxReturnEvicted(txBob) // Also Bob has no more transactions require.Equal(t, int64(0), myMap.counter.Get()) } @@ -100,14 +100,14 @@ func TestSendersMap_notifyAccountNonce(t *testing.T) { myMap := newSendersMapToTest() // Discarded notification, since sender not added yet - myMap.notifyAccountNonceReturnEvictedTransactions([]byte("alice"), 42) + myMap.notifyAccountNonce([]byte("alice"), 42) - myMap.addTxReturnEvicted(createTx([]byte("tx-42"), "alice", 42)) + _, _ = myMap.addTxReturnEvicted(createTx([]byte("tx-42"), "alice", 42)) alice, _ := myMap.getListForSender("alice") require.Equal(t, uint64(0), alice.accountNonce.Get()) require.False(t, alice.accountNonceKnown.IsSet()) - myMap.notifyAccountNonceReturnEvictedTransactions([]byte("alice"), 42) + myMap.notifyAccountNonce([]byte("alice"), 42) require.Equal(t, uint64(42), alice.accountNonce.Get()) require.True(t, alice.accountNonceKnown.IsSet()) } diff --git a/txcache/txListForSender.go b/txcache/txListForSender.go index 728350bd..1767b277 100644 --- a/txcache/txListForSender.go +++ b/txcache/txListForSender.go @@ -146,52 +146,11 @@ func (listForSender *txListForSender) findInsertionPlace(incomingTx *WrappedTran return nil, nil } -// RemoveTx removes a transaction from the sender's list -func (listForSender *txListForSender) RemoveTx(tx *WrappedTransaction) bool { - // We don't allow concurrent interceptor goroutines to mutate a given sender's list - listForSender.mutex.Lock() - defer listForSender.mutex.Unlock() - - marker := listForSender.findListElementWithTx(tx) - isFound := marker != nil - if isFound { - listForSender.items.Remove(marker) - listForSender.onRemovedListElement(marker) - } - - return isFound -} - func (listForSender *txListForSender) onRemovedListElement(element *list.Element) { tx := element.Value.(*WrappedTransaction) listForSender.totalBytes.Subtract(tx.Size) } -// This function should only be used in critical section (listForSender.mutex) -func (listForSender *txListForSender) findListElementWithTx(txToFind *WrappedTransaction) *list.Element { - txToFindHash := txToFind.TxHash - txToFindNonce := txToFind.Tx.GetNonce() - - for element := listForSender.items.Front(); element != nil; element = element.Next() { - value := element.Value.(*WrappedTransaction) - nonce := value.Tx.GetNonce() - - // Optimization: first, compare nonces, then hashes. - if nonce == txToFindNonce { - if bytes.Equal(value.TxHash, txToFindHash) { - return element - } - } - - // Optimization: stop search at this point, since the list is sorted by nonce - if nonce > txToFindNonce { - break - } - } - - return nil -} - // IsEmpty checks whether the list is empty func (listForSender *txListForSender) IsEmpty() bool { return listForSender.countTxWithLock() == 0 @@ -245,8 +204,14 @@ func (listForSender *txListForSender) getSequentialTxs() []*WrappedTransaction { isFirstTx := len(result) == 0 if isFirstTx { + // Handle lower nonces. + if accountNonce > nonce { + log.Trace("txListForSender.getSequentialTxs, lower nonce", "sender", listForSender.sender, "nonce", nonce, "accountNonce", accountNonce) + continue + } + // Handle initial gaps. - if accountNonceKnown && accountNonce != nonce { + if accountNonceKnown && accountNonce < nonce { log.Trace("txListForSender.getSequentialTxs, initial gap", "sender", listForSender.sender, "nonce", nonce, "accountNonce", accountNonce) break } @@ -282,36 +247,25 @@ func (listForSender *txListForSender) countTxWithLock() uint64 { return uint64(listForSender.items.Len()) } -// notifyAccountNonceReturnEvictedTransactions sets the known account nonce, removes the transactions with lower nonces, and returns their hashes -func (listForSender *txListForSender) notifyAccountNonceReturnEvictedTransactions(nonce uint64) [][]byte { - // Optimization: if nonce is the same, do nothing (good for heavy load). - if listForSender.accountNonce.Get() == nonce { - logRemove.Trace("notifyAccountNonceReturnEvictedTransactions, nonce is the same", "sender", listForSender.sender, "nonce", nonce) - return nil - } - - listForSender.mutex.Lock() - defer listForSender.mutex.Unlock() - +// notifyAccountNonce sets the known account nonce, removes the transactions with lower nonces, and returns their hashes +func (listForSender *txListForSender) notifyAccountNonce(nonce uint64) { listForSender.accountNonce.Set(nonce) _ = listForSender.accountNonceKnown.SetReturningPrevious() - - evicted := listForSender.evictTransactionsWithLowerNoncesNoLockReturnEvicted(nonce) - - logRemove.Trace("notifyAccountNonceReturnEvictedTransactions, nonce changed", "sender", listForSender.sender, "nonce", nonce, "num evicted txs", len(evicted)) - - return evicted } -// This function should only be used in critical section (listForSender.mutex) -func (listForSender *txListForSender) evictTransactionsWithLowerNoncesNoLockReturnEvicted(givenNonce uint64) [][]byte { +// evictTransactionsWithLowerOrEqualNonces removes transactions with nonces lower or equal to the given nonce +func (listForSender *txListForSender) evictTransactionsWithLowerOrEqualNonces(targetNonce uint64) [][]byte { evictedTxHashes := make([][]byte, 0) + // We don't allow concurrent goroutines to mutate a given sender's list + listForSender.mutex.Lock() + defer listForSender.mutex.Unlock() + for element := listForSender.items.Front(); element != nil; { tx := element.Value.(*WrappedTransaction) txNonce := tx.Tx.GetNonce() - if txNonce >= givenNonce { + if txNonce > targetNonce { break } diff --git a/txcache/txListForSender_test.go b/txcache/txListForSender_test.go index 9fdee4d7..3f8b075a 100644 --- a/txcache/txListForSender_test.go +++ b/txcache/txListForSender_test.go @@ -107,74 +107,19 @@ func TestListForSender_AddTx_AppliesSizeConstraintsForNumBytes(t *testing.T) { require.Equal(t, []string{"tx4"}, hashesAsStrings(evicted)) } -func TestListForSender_findTx(t *testing.T) { - list := newUnconstrainedListToTest() - - txA := createTx([]byte("A"), ".", 41) - txANewer := createTx([]byte("ANewer"), ".", 41) - txB := createTx([]byte("B"), ".", 42) - txD := createTx([]byte("none"), ".", 43) - list.AddTx(txA) - list.AddTx(txANewer) - list.AddTx(txB) - - elementWithA := list.findListElementWithTx(txA) - elementWithANewer := list.findListElementWithTx(txANewer) - elementWithB := list.findListElementWithTx(txB) - noElementWithD := list.findListElementWithTx(txD) - - require.NotNil(t, elementWithA) - require.NotNil(t, elementWithANewer) - require.NotNil(t, elementWithB) - - require.Equal(t, txA, elementWithA.Value.(*WrappedTransaction)) - require.Equal(t, txANewer, elementWithANewer.Value.(*WrappedTransaction)) - require.Equal(t, txB, elementWithB.Value.(*WrappedTransaction)) - require.Nil(t, noElementWithD) -} - -func TestListForSender_findTx_CoverNonceComparisonOptimization(t *testing.T) { - list := newUnconstrainedListToTest() - - list.AddTx(createTx([]byte("A"), ".", 42)) - - // Find one with a lower nonce, not added to cache - noElement := list.findListElementWithTx(createTx(nil, ".", 41)) - require.Nil(t, noElement) -} - -func TestListForSender_RemoveTransaction(t *testing.T) { - list := newUnconstrainedListToTest() - tx := createTx([]byte("a"), ".", 1) - - list.AddTx(tx) - require.Equal(t, 1, list.items.Len()) - - list.RemoveTx(tx) - require.Equal(t, 0, list.items.Len()) -} - -func TestListForSender_RemoveTransaction_NoPanicWhenTxMissing(t *testing.T) { - list := newUnconstrainedListToTest() - tx := createTx([]byte(""), ".", 1) - - list.RemoveTx(tx) - require.Equal(t, 0, list.items.Len()) -} - func TestListForSender_NotifyAccountNonce(t *testing.T) { list := newUnconstrainedListToTest() require.Equal(t, uint64(0), list.accountNonce.Get()) require.False(t, list.accountNonceKnown.IsSet()) - list.notifyAccountNonceReturnEvictedTransactions(42) + list.notifyAccountNonce(42) require.Equal(t, uint64(42), list.accountNonce.Get()) require.True(t, list.accountNonceKnown.IsSet()) } -func TestListForSender_evictTransactionsWithLowerNoncesNoLock(t *testing.T) { +func TestListForSender_evictTransactionsWithLowerOrEqualNonces(t *testing.T) { list := newUnconstrainedListToTest() list.AddTx(createTx([]byte("tx-42"), ".", 42)) @@ -184,19 +129,19 @@ func TestListForSender_evictTransactionsWithLowerNoncesNoLock(t *testing.T) { require.Equal(t, 4, list.items.Len()) - list.evictTransactionsWithLowerNoncesNoLockReturnEvicted(43) - require.Equal(t, 3, list.items.Len()) - - list.evictTransactionsWithLowerNoncesNoLockReturnEvicted(44) + _ = list.evictTransactionsWithLowerOrEqualNonces(43) require.Equal(t, 2, list.items.Len()) - list.evictTransactionsWithLowerNoncesNoLockReturnEvicted(99) + _ = list.evictTransactionsWithLowerOrEqualNonces(44) + require.Equal(t, 1, list.items.Len()) + + _ = list.evictTransactionsWithLowerOrEqualNonces(99) require.Equal(t, 0, list.items.Len()) } func TestListForSender_getTxs(t *testing.T) { list := newUnconstrainedListToTest() - list.notifyAccountNonceReturnEvictedTransactions(42) + list.notifyAccountNonce(42) // No transaction, no gap require.Len(t, list.getTxs(), 0) @@ -241,6 +186,13 @@ func TestListForSender_getTxs(t *testing.T) { require.Equal(t, []byte("tx-43++"), list.getTxsReversed()[1].TxHash) require.Equal(t, []byte("tx-42"), list.getTxsReversed()[2].TxHash) require.Equal(t, []byte("tx-42++"), list.getTxsReversed()[3].TxHash) + + // With lower nonces + list.notifyAccountNonce(43) + require.Len(t, list.getTxs(), 4) + require.Len(t, list.getTxsReversed(), 4) + require.Len(t, list.getSequentialTxs(), 1) + require.Equal(t, []byte("tx-43++"), list.getSequentialTxs()[0].TxHash) } func TestListForSender_DetectRaceConditions(t *testing.T) { @@ -255,7 +207,7 @@ func TestListForSender_DetectRaceConditions(t *testing.T) { _ = list.getTxsReversed() _ = list.getSequentialTxs() _ = list.countTxWithLock() - _ = list.notifyAccountNonceReturnEvictedTransactions(42) + list.notifyAccountNonce(42) _, _ = list.AddTx(createTx([]byte("test"), ".", 42)) wg.Done()