Skip to content

Commit

Permalink
moved the worker pool on NotifyEviction calls instead of notifications
Browse files Browse the repository at this point in the history
  • Loading branch information
sstanculeanu committed Sep 19, 2023
1 parent ac3c529 commit 8bc7cfe
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 15 deletions.
8 changes: 5 additions & 3 deletions txcache/baseTxCache.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,18 @@ func (cache *baseTxCache) RegisterEvictionHandler(handler types.EvictionNotifier
return nil
}

// notifyEvictionHandlers will be called on a separate go routine
func (cache *baseTxCache) notifyEvictionHandlers(txHashes [][]byte) {
// enqueueEvictedHashesForNotification will enqueue the provided hashes on the workers pool
func (cache *baseTxCache) enqueueEvictedHashesForNotification(txHashes [][]byte) {
cache.mutEvictionHandlers.RLock()
handlers := make([]types.EvictionNotifier, len(cache.evictionHandlers))
copy(handlers, cache.evictionHandlers)
cache.mutEvictionHandlers.RUnlock()

for _, handler := range handlers {
for _, txHash := range txHashes {
handler.NotifyEviction(txHash)
cache.evictionWorkerPool.Submit(func() {
handler.NotifyEviction(txHash)
})
}
}
}
4 changes: 1 addition & 3 deletions txcache/crossTxCache.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,7 @@ func (cache *CrossTxCache) Peek(key []byte) (value interface{}, ok bool) {
func (cache *CrossTxCache) RemoveTxByHash(txHash []byte) bool {
ok := cache.RemoveWithResult(txHash)
if ok {
cache.evictionWorkerPool.Submit(func() {
cache.notifyEvictionHandlers([][]byte{txHash})
})
cache.enqueueEvictedHashesForNotification([][]byte{txHash})
}
return ok
}
Expand Down
5 changes: 2 additions & 3 deletions txcache/eviction.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,8 @@ func (cache *TxCache) areThereTooManyTxs() bool {

// This is called concurrently by two goroutines: the eviction one and the sweeping one
func (cache *TxCache) doEvictItems(txsToEvict [][]byte, sendersToEvict []string) (countTxs uint32, countSenders uint32) {
cache.evictionWorkerPool.Submit(func() {
cache.notifyEvictionHandlers(txsToEvict)
})
cache.enqueueEvictedHashesForNotification(txsToEvict)

countTxs = cache.txByHash.RemoveTxsBulk(txsToEvict)
countSenders = cache.txListBySender.RemoveSendersBulk(sendersToEvict)
return
Expand Down
8 changes: 2 additions & 6 deletions txcache/txCache.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,7 @@ func (cache *TxCache) AddTx(tx *WrappedTransaction) (ok bool, added bool) {

if len(evicted) > 0 {
cache.monitorEvictionWrtSenderLimit(tx.Tx.GetSndAddr(), evicted)
cache.evictionWorkerPool.Submit(func() {
cache.notifyEvictionHandlers(evicted)
})
cache.enqueueEvictedHashesForNotification(evicted)
cache.txByHash.RemoveTxsBulk(evicted)
}

Expand Down Expand Up @@ -174,9 +172,7 @@ func (cache *TxCache) doAfterSelection() {

// RemoveTxByHash removes tx by hash
func (cache *TxCache) RemoveTxByHash(txHash []byte) bool {
cache.evictionWorkerPool.Submit(func() {
cache.notifyEvictionHandlers([][]byte{txHash})
})
cache.enqueueEvictedHashesForNotification([][]byte{txHash})

cache.mutTxOperation.Lock()
defer cache.mutTxOperation.Unlock()
Expand Down

0 comments on commit 8bc7cfe

Please sign in to comment.