diff --git a/core/blockchain.go b/core/blockchain.go index cb67c4005b..6378a3d128 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -148,11 +148,12 @@ type CacheConfig struct { TrieCleanRejournal time.Duration // Time interval to dump clean cache to disk periodically TrieDirtyLimit int // Memory limit (MB) at which to block on insert and force a flush of dirty trie nodes to disk TrieDirtyCommitTarget int // Memory limit (MB) to target for the dirties cache before invoking commit + TriePrefetcherParallelism int // Max concurrent disk reads trie prefetcher should perform at once CommitInterval uint64 // Commit the trie every [CommitInterval] blocks. Pruning bool // Whether to disable trie write caching and GC altogether (archive node) AcceptorQueueLimit int // Blocks to queue before blocking during acceptance PopulateMissingTries *uint64 // If non-nil, sets the starting height for re-generating historical tries. - PopulateMissingTriesParallelism int // Is the number of readers to use when trying to populate missing tries. + PopulateMissingTriesParallelism int // Number of readers to use when trying to populate missing tries. AllowMissingTries bool // Whether to allow an archive node to run with pruning enabled SnapshotDelayInit bool // Whether to initialize snapshots on startup or wait for external call SnapshotLimit int // Memory allowance (MB) to use for caching snapshot entries in memory @@ -166,14 +167,15 @@ type CacheConfig struct { } var DefaultCacheConfig = &CacheConfig{ - TrieCleanLimit: 256, - TrieDirtyLimit: 256, - TrieDirtyCommitTarget: 20, // 20% overhead in memory counting (this targets 16 MB) - Pruning: true, - CommitInterval: 4096, - AcceptorQueueLimit: 64, // Provides 2 minutes of buffer (2s block target) for a commit delay - SnapshotLimit: 256, - AcceptedCacheSize: 32, + TrieCleanLimit: 256, + TrieDirtyLimit: 256, + TrieDirtyCommitTarget: 20, // 20% overhead in memory counting (this targets 16 MB) + TriePrefetcherParallelism: 16, + Pruning: true, + CommitInterval: 4096, + AcceptorQueueLimit: 64, // Provides 2 minutes of buffer (2s block target) for a commit delay + SnapshotLimit: 256, + AcceptedCacheSize: 32, } // BlockChain represents the canonical chain given a database with a genesis @@ -1312,7 +1314,7 @@ func (bc *BlockChain) insertBlock(block *types.Block, writes bool) error { blockStateInitTimer.Inc(time.Since(substart).Milliseconds()) // Enable prefetching to pull in trie node paths while processing transactions - statedb.StartPrefetcher("chain") + statedb.StartPrefetcher("chain", bc.cacheConfig.TriePrefetcherParallelism) activeState = statedb // If we have a followup block, run that against the current state to pre-cache @@ -1687,7 +1689,7 @@ func (bc *BlockChain) reprocessBlock(parent *types.Block, current *types.Block) } // Enable prefetching to pull in trie node paths while processing transactions - statedb.StartPrefetcher("chain") + statedb.StartPrefetcher("chain", bc.cacheConfig.TriePrefetcherParallelism) defer func() { statedb.StopPrefetcher() }() @@ -2094,3 +2096,11 @@ func (bc *BlockChain) ResetToStateSyncedBlock(block *types.Block) error { bc.initSnapshot(head) return nil } + +// CacheConfig returns a reference to [bc.cacheConfig] +// +// This is used by [miner] to set prefetch parallelism +// during block building. 
+func (bc *BlockChain) CacheConfig() *CacheConfig { + return bc.cacheConfig +} diff --git a/core/blockchain_repair_test.go b/core/blockchain_repair_test.go index 4bd07e83ee..3ec47fe72f 100644 --- a/core/blockchain_repair_test.go +++ b/core/blockchain_repair_test.go @@ -524,9 +524,10 @@ func testRepair(t *testing.T, tt *rewindTest, snapshots bool) { } engine = dummy.NewFullFaker() config = &CacheConfig{ - TrieCleanLimit: 256, - TrieDirtyLimit: 256, - SnapshotLimit: 0, // Disable snapshot by default + TrieCleanLimit: 256, + TrieDirtyLimit: 256, + TriePrefetcherParallelism: 4, + SnapshotLimit: 0, // Disable snapshot by default } ) defer engine.Close() diff --git a/core/blockchain_test.go b/core/blockchain_test.go index 4b18b2dfa0..b7e62586a8 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -28,22 +28,24 @@ import ( var ( archiveConfig = &CacheConfig{ - TrieCleanLimit: 256, - TrieDirtyLimit: 256, - TrieDirtyCommitTarget: 20, - Pruning: false, // Archive mode - SnapshotLimit: 256, - AcceptorQueueLimit: 64, + TrieCleanLimit: 256, + TrieDirtyLimit: 256, + TrieDirtyCommitTarget: 20, + TriePrefetcherParallelism: 4, + Pruning: false, // Archive mode + SnapshotLimit: 256, + AcceptorQueueLimit: 64, } pruningConfig = &CacheConfig{ - TrieCleanLimit: 256, - TrieDirtyLimit: 256, - TrieDirtyCommitTarget: 20, - Pruning: true, // Enable pruning - CommitInterval: 4096, - SnapshotLimit: 256, - AcceptorQueueLimit: 64, + TrieCleanLimit: 256, + TrieDirtyLimit: 256, + TrieDirtyCommitTarget: 20, + TriePrefetcherParallelism: 4, + Pruning: true, // Enable pruning + CommitInterval: 4096, + SnapshotLimit: 256, + AcceptorQueueLimit: 64, } ) @@ -180,12 +182,13 @@ func TestArchiveBlockChainSnapsDisabled(t *testing.T) { return createBlockChain( db, &CacheConfig{ - TrieCleanLimit: 256, - TrieDirtyLimit: 256, - TrieDirtyCommitTarget: 20, - Pruning: false, // Archive mode - SnapshotLimit: 0, // Disable snapshots - AcceptorQueueLimit: 64, + TrieCleanLimit: 256, + TrieDirtyLimit: 256, + TrieDirtyCommitTarget: 20, + TriePrefetcherParallelism: 4, + Pruning: false, // Archive mode + SnapshotLimit: 0, // Disable snapshots + AcceptorQueueLimit: 64, }, gspec, lastAcceptedHash, @@ -214,13 +217,14 @@ func TestPruningBlockChainSnapsDisabled(t *testing.T) { return createBlockChain( db, &CacheConfig{ - TrieCleanLimit: 256, - TrieDirtyLimit: 256, - TrieDirtyCommitTarget: 20, - Pruning: true, // Enable pruning - CommitInterval: 4096, - SnapshotLimit: 0, // Disable snapshots - AcceptorQueueLimit: 64, + TrieCleanLimit: 256, + TrieDirtyLimit: 256, + TrieDirtyCommitTarget: 20, + TriePrefetcherParallelism: 4, + Pruning: true, // Enable pruning + CommitInterval: 4096, + SnapshotLimit: 0, // Disable snapshots + AcceptorQueueLimit: 64, }, gspec, lastAcceptedHash, @@ -263,13 +267,14 @@ func TestPruningBlockChainUngracefulShutdownSnapsDisabled(t *testing.T) { blockchain, err := createBlockChain( db, &CacheConfig{ - TrieCleanLimit: 256, - TrieDirtyLimit: 256, - TrieDirtyCommitTarget: 20, - Pruning: true, // Enable pruning - CommitInterval: 4096, - SnapshotLimit: 0, // Disable snapshots - AcceptorQueueLimit: 64, + TrieCleanLimit: 256, + TrieDirtyLimit: 256, + TrieDirtyCommitTarget: 20, + TriePrefetcherParallelism: 4, + Pruning: true, // Enable pruning + CommitInterval: 4096, + SnapshotLimit: 0, // Disable snapshots + AcceptorQueueLimit: 64, }, gspec, lastAcceptedHash, @@ -298,13 +303,14 @@ func TestEnableSnapshots(t *testing.T) { blockchain, err := createBlockChain( db, &CacheConfig{ - TrieCleanLimit: 256, - TrieDirtyLimit: 256, - 
TrieDirtyCommitTarget: 20, - Pruning: true, // Enable pruning - CommitInterval: 4096, - SnapshotLimit: snapLimit, - AcceptorQueueLimit: 64, + TrieCleanLimit: 256, + TrieDirtyLimit: 256, + TrieDirtyCommitTarget: 20, + TriePrefetcherParallelism: 4, + Pruning: true, // Enable pruning + CommitInterval: 4096, + SnapshotLimit: snapLimit, + AcceptorQueueLimit: 64, }, gspec, lastAcceptedHash, @@ -455,6 +461,7 @@ func testRepopulateMissingTriesParallel(t *testing.T, parallelism int) { TrieCleanLimit: 256, TrieDirtyLimit: 256, TrieDirtyCommitTarget: 20, + TriePrefetcherParallelism: 4, Pruning: false, // Archive mode SnapshotLimit: 256, PopulateMissingTries: &startHeight, // Starting point for re-populating. @@ -487,14 +494,15 @@ func TestUngracefulAsyncShutdown(t *testing.T) { var ( create = func(db ethdb.Database, gspec *Genesis, lastAcceptedHash common.Hash) (*BlockChain, error) { blockchain, err := createBlockChain(db, &CacheConfig{ - TrieCleanLimit: 256, - TrieDirtyLimit: 256, - TrieDirtyCommitTarget: 20, - Pruning: true, - CommitInterval: 4096, - SnapshotLimit: 256, - SnapshotNoBuild: true, // Ensure the test errors if snapshot initialization fails - AcceptorQueueLimit: 1000, // ensure channel doesn't block + TrieCleanLimit: 256, + TrieDirtyLimit: 256, + TrieDirtyCommitTarget: 20, + TriePrefetcherParallelism: 4, + Pruning: true, + CommitInterval: 4096, + SnapshotLimit: 256, + SnapshotNoBuild: true, // Ensure the test errors if snapshot initialization fails + AcceptorQueueLimit: 1000, // ensure channel doesn't block }, gspec, lastAcceptedHash) if err != nil { return nil, err @@ -681,14 +689,15 @@ func TestTransactionIndices(t *testing.T) { } conf := &CacheConfig{ - TrieCleanLimit: 256, - TrieDirtyLimit: 256, - TrieDirtyCommitTarget: 20, - Pruning: true, - CommitInterval: 4096, - SnapshotLimit: 256, - SnapshotNoBuild: true, // Ensure the test errors if snapshot initialization fails - AcceptorQueueLimit: 64, + TrieCleanLimit: 256, + TrieDirtyLimit: 256, + TrieDirtyCommitTarget: 20, + TriePrefetcherParallelism: 4, + Pruning: true, + CommitInterval: 4096, + SnapshotLimit: 256, + SnapshotNoBuild: true, // Ensure the test errors if snapshot initialization fails + AcceptorQueueLimit: 64, } // Init block chain and check all needed indices has been indexed. 
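The test fixtures above (and the ones that follow) all gain the same `TriePrefetcherParallelism: 4` line. A small helper like the sketch below (hypothetical, not part of this diff; field names and values mirror the fixtures above) could build these configs in one place — the tests use 4 rather than the production default of 16, presumably to keep goroutine counts small under `go test`:

```go
// newTestCacheConfig is a hypothetical test helper (package core) that builds
// the CacheConfig used throughout these tests, with the new
// TriePrefetcherParallelism field set explicitly.
func newTestCacheConfig(pruning bool, snapshotLimit int) *CacheConfig {
	return &CacheConfig{
		TrieCleanLimit:            256,
		TrieDirtyLimit:            256,
		TrieDirtyCommitTarget:     20,
		TriePrefetcherParallelism: 4, // bound concurrent trie-prefetch disk reads in tests
		Pruning:                   pruning,
		CommitInterval:            4096,
		SnapshotLimit:             snapshotLimit,
		AcceptorQueueLimit:        64,
	}
}
```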
@@ -851,15 +860,16 @@ func TestCanonicalHashMarker(t *testing.T) { func TestTxLookupBlockChain(t *testing.T) { cacheConf := &CacheConfig{ - TrieCleanLimit: 256, - TrieDirtyLimit: 256, - TrieDirtyCommitTarget: 20, - Pruning: true, - CommitInterval: 4096, - SnapshotLimit: 256, - SnapshotNoBuild: true, // Ensure the test errors if snapshot initialization fails - AcceptorQueueLimit: 64, // ensure channel doesn't block - TxLookupLimit: 5, + TrieCleanLimit: 256, + TrieDirtyLimit: 256, + TrieDirtyCommitTarget: 20, + TriePrefetcherParallelism: 4, + Pruning: true, + CommitInterval: 4096, + SnapshotLimit: 256, + SnapshotNoBuild: true, // Ensure the test errors if snapshot initialization fails + AcceptorQueueLimit: 64, // ensure channel doesn't block + TxLookupLimit: 5, } createTxLookupBlockChain := func(db ethdb.Database, gspec *Genesis, lastAcceptedHash common.Hash) (*BlockChain, error) { return createBlockChain(db, cacheConf, gspec, lastAcceptedHash) diff --git a/core/state/statedb.go b/core/state/statedb.go index 6c116369b5..c64534b622 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -186,13 +186,13 @@ func NewWithSnapshot(root common.Hash, db Database, snap snapshot.Snapshot) (*St // StartPrefetcher initializes a new trie prefetcher to pull in nodes from the // state trie concurrently while the state is mutated so that when we reach the // commit phase, most of the needed data is already hot. -func (s *StateDB) StartPrefetcher(namespace string) { +func (s *StateDB) StartPrefetcher(namespace string, maxConcurrency int) { if s.prefetcher != nil { s.prefetcher.close() s.prefetcher = nil } if s.snap != nil { - s.prefetcher = newTriePrefetcher(s.db, s.originalRoot, namespace) + s.prefetcher = newTriePrefetcher(s.db, s.originalRoot, namespace, maxConcurrency) } } diff --git a/core/state/trie_prefetcher.go b/core/state/trie_prefetcher.go index fcd751aae4..e6dabeb0f0 100644 --- a/core/state/trie_prefetcher.go +++ b/core/state/trie_prefetcher.go @@ -28,16 +28,16 @@ package state import ( "sync" + "time" "github.com/ava-labs/coreth/metrics" + "github.com/ava-labs/coreth/utils" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/log" ) -var ( - // triePrefetchMetricsPrefix is the prefix under which to publish the metrics. - triePrefetchMetricsPrefix = "trie/prefetch/" -) +// triePrefetchMetricsPrefix is the prefix under which to publish the metrics. +const triePrefetchMetricsPrefix = "trie/prefetch/" // triePrefetcher is an active prefetcher, which receives accounts or storage // items and does trie-loading of them. 
The goal is to get as much useful content @@ -50,63 +50,91 @@ type triePrefetcher struct { fetches map[string]Trie // Partially or fully fetcher tries fetchers map[string]*subfetcher // Subfetchers for each trie - deliveryCopyMissMeter metrics.Meter - deliveryRequestMissMeter metrics.Meter - deliveryWaitMissMeter metrics.Meter + maxConcurrency int + workers *utils.BoundedWorkers + + subfetcherWorkersMeter metrics.Meter + subfetcherWaitTimer metrics.Counter + subfetcherCopiesMeter metrics.Meter accountLoadMeter metrics.Meter accountDupMeter metrics.Meter accountSkipMeter metrics.Meter accountWasteMeter metrics.Meter - storageLoadMeter metrics.Meter - storageDupMeter metrics.Meter - storageSkipMeter metrics.Meter - storageWasteMeter metrics.Meter + + storageFetchersMeter metrics.Meter + storageLoadMeter metrics.Meter + storageLargestLoadMeter metrics.Meter + storageDupMeter metrics.Meter + storageSkipMeter metrics.Meter + storageWasteMeter metrics.Meter } -func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePrefetcher { +func newTriePrefetcher(db Database, root common.Hash, namespace string, maxConcurrency int) *triePrefetcher { prefix := triePrefetchMetricsPrefix + namespace - p := &triePrefetcher{ + return &triePrefetcher{ db: db, root: root, fetchers: make(map[string]*subfetcher), // Active prefetchers use the fetchers map - deliveryCopyMissMeter: metrics.GetOrRegisterMeter(prefix+"/deliverymiss/copy", nil), - deliveryRequestMissMeter: metrics.GetOrRegisterMeter(prefix+"/deliverymiss/request", nil), - deliveryWaitMissMeter: metrics.GetOrRegisterMeter(prefix+"/deliverymiss/wait", nil), + maxConcurrency: maxConcurrency, + workers: utils.NewBoundedWorkers(maxConcurrency), // Scale up as needed to [maxConcurrency] + + subfetcherWorkersMeter: metrics.GetOrRegisterMeter(prefix+"/subfetcher/workers", nil), + subfetcherWaitTimer: metrics.GetOrRegisterCounter(prefix+"/subfetcher/wait", nil), + subfetcherCopiesMeter: metrics.GetOrRegisterMeter(prefix+"/subfetcher/copies", nil), accountLoadMeter: metrics.GetOrRegisterMeter(prefix+"/account/load", nil), accountDupMeter: metrics.GetOrRegisterMeter(prefix+"/account/dup", nil), accountSkipMeter: metrics.GetOrRegisterMeter(prefix+"/account/skip", nil), accountWasteMeter: metrics.GetOrRegisterMeter(prefix+"/account/waste", nil), - storageLoadMeter: metrics.GetOrRegisterMeter(prefix+"/storage/load", nil), - storageDupMeter: metrics.GetOrRegisterMeter(prefix+"/storage/dup", nil), - storageSkipMeter: metrics.GetOrRegisterMeter(prefix+"/storage/skip", nil), - storageWasteMeter: metrics.GetOrRegisterMeter(prefix+"/storage/waste", nil), + + storageFetchersMeter: metrics.GetOrRegisterMeter(prefix+"/storage/fetchers", nil), + storageLoadMeter: metrics.GetOrRegisterMeter(prefix+"/storage/load", nil), + storageLargestLoadMeter: metrics.GetOrRegisterMeter(prefix+"/storage/lload", nil), + storageDupMeter: metrics.GetOrRegisterMeter(prefix+"/storage/dup", nil), + storageSkipMeter: metrics.GetOrRegisterMeter(prefix+"/storage/skip", nil), + storageWasteMeter: metrics.GetOrRegisterMeter(prefix+"/storage/waste", nil), } - return p } // close iterates over all the subfetchers, aborts any that were left spinning // and reports the stats to the metrics subsystem. 
func (p *triePrefetcher) close() { + // If the prefetcher is an inactive one, bail out + if p.fetches != nil { + return + } + + // Collect stats from all fetchers + var ( + storageFetchers int64 + largestLoad int64 + ) for _, fetcher := range p.fetchers { - fetcher.abort() // safe to do multiple times + fetcher.abort() // safe to call multiple times (should be a no-op on happy path) if metrics.Enabled { + p.subfetcherCopiesMeter.Mark(int64(fetcher.copies())) + if fetcher.root == p.root { p.accountLoadMeter.Mark(int64(len(fetcher.seen))) p.accountDupMeter.Mark(int64(fetcher.dups)) - p.accountSkipMeter.Mark(int64(len(fetcher.tasks))) + p.accountSkipMeter.Mark(int64(fetcher.skips())) for _, key := range fetcher.used { delete(fetcher.seen, string(key)) } p.accountWasteMeter.Mark(int64(len(fetcher.seen))) } else { - p.storageLoadMeter.Mark(int64(len(fetcher.seen))) + storageFetchers++ + oseen := int64(len(fetcher.seen)) + if oseen > largestLoad { + largestLoad = oseen + } + p.storageLoadMeter.Mark(oseen) p.storageDupMeter.Mark(int64(fetcher.dups)) - p.storageSkipMeter.Mark(int64(len(fetcher.tasks))) + p.storageSkipMeter.Mark(int64(fetcher.skips())) for _, key := range fetcher.used { delete(fetcher.seen, string(key)) @@ -115,6 +143,20 @@ func (p *triePrefetcher) close() { } } } + if metrics.Enabled { + p.storageFetchersMeter.Mark(storageFetchers) + p.storageLargestLoadMeter.Mark(largestLoad) + } + + // Stop all workers once fetchers are aborted (otherwise + // could stop while waiting) + // + // Record number of workers that were spawned during this run + workersUsed := int64(p.workers.Wait()) + if metrics.Enabled { + p.subfetcherWorkersMeter.Mark(workersUsed) + } + // Clear out all fetchers (will crash on a second call, deliberate) p.fetchers = nil } @@ -127,20 +169,23 @@ func (p *triePrefetcher) copy() *triePrefetcher { copy := &triePrefetcher{ db: p.db, root: p.root, - fetches: make(map[string]Trie), // Active prefetchers use the fetches map + fetches: make(map[string]Trie), // Active prefetchers use the fetchers map - deliveryCopyMissMeter: p.deliveryCopyMissMeter, - deliveryRequestMissMeter: p.deliveryRequestMissMeter, - deliveryWaitMissMeter: p.deliveryWaitMissMeter, + subfetcherWorkersMeter: p.subfetcherWorkersMeter, + subfetcherWaitTimer: p.subfetcherWaitTimer, + subfetcherCopiesMeter: p.subfetcherCopiesMeter, accountLoadMeter: p.accountLoadMeter, accountDupMeter: p.accountDupMeter, accountSkipMeter: p.accountSkipMeter, accountWasteMeter: p.accountWasteMeter, - storageLoadMeter: p.storageLoadMeter, - storageDupMeter: p.storageDupMeter, - storageSkipMeter: p.storageSkipMeter, - storageWasteMeter: p.storageWasteMeter, + + storageFetchersMeter: p.storageFetchersMeter, + storageLoadMeter: p.storageLoadMeter, + storageLargestLoadMeter: p.storageLargestLoadMeter, + storageDupMeter: p.storageDupMeter, + storageSkipMeter: p.storageSkipMeter, + storageWasteMeter: p.storageWasteMeter, } // If the prefetcher is already a copy, duplicate the data if p.fetches != nil { @@ -165,11 +210,12 @@ func (p *triePrefetcher) prefetch(owner common.Hash, root common.Hash, addr comm if p.fetches != nil { return } + // Active fetcher, schedule the retrievals id := p.trieID(owner, root) fetcher := p.fetchers[id] if fetcher == nil { - fetcher = newSubfetcher(p.db, p.root, owner, root, addr) + fetcher = newSubfetcher(p, owner, root, addr) p.fetchers[id] = fetcher } fetcher.schedule(keys) @@ -183,24 +229,27 @@ func (p *triePrefetcher) trie(owner common.Hash, root common.Hash) Trie { if p.fetches != nil { trie := 
p.fetches[id] if trie == nil { - p.deliveryCopyMissMeter.Mark(1) return nil } return p.db.CopyTrie(trie) } + // Otherwise the prefetcher is active, bail if no trie was prefetched for this root fetcher := p.fetchers[id] if fetcher == nil { - p.deliveryRequestMissMeter.Mark(1) return nil } - // Interrupt the prefetcher if it's by any chance still running and return - // a copy of any pre-loaded trie. - fetcher.abort() // safe to do multiple times + // Wait for the fetcher to finish and shutdown orchestrator, if it exists + start := time.Now() + fetcher.wait() + if metrics.Enabled { + p.subfetcherWaitTimer.Inc(time.Since(start).Milliseconds()) + } + + // Return a copy of one of the prefetched tries trie := fetcher.peek() if trie == nil { - p.deliveryWaitMissMeter.Mark(1) return nil } return trie @@ -224,20 +273,15 @@ func (p *triePrefetcher) trieID(owner common.Hash, root common.Hash) string { // main prefetcher is paused and either all requested items are processed or if // the trie being worked on is retrieved from the prefetcher. type subfetcher struct { + p *triePrefetcher + db Database // Database to load trie nodes through state common.Hash // Root hash of the state to prefetch owner common.Hash // Owner of the trie, usually account hash root common.Hash // Root hash of the trie to prefetch addr common.Address // Address of the account that the trie belongs to - trie Trie // Trie being populated with nodes - - tasks [][]byte // Items queued up for retrieval - lock sync.Mutex // Lock protecting the task queue - wake chan struct{} // Wake channel if a new task is scheduled - stop chan struct{} // Channel to interrupt processing - term chan struct{} // Channel to signal interruption - copy chan chan Trie // Channel to request a copy of the current trie + to *trieOrchestrator // Orchestrate concurrent fetching of a single trie seen map[string]struct{} // Tracks the entries already loaded dups int // Number of duplicate preload tasks @@ -246,139 +290,346 @@ type subfetcher struct { // newSubfetcher creates a goroutine to prefetch state items belonging to a // particular root hash. -func newSubfetcher(db Database, state common.Hash, owner common.Hash, root common.Hash, addr common.Address) *subfetcher { +func newSubfetcher(p *triePrefetcher, owner common.Hash, root common.Hash, addr common.Address) *subfetcher { sf := &subfetcher{ - db: db, - state: state, + p: p, + db: p.db, + state: p.root, owner: owner, root: root, addr: addr, - wake: make(chan struct{}, 1), - stop: make(chan struct{}), - term: make(chan struct{}), - copy: make(chan chan Trie), seen: make(map[string]struct{}), } - go sf.loop() + sf.to = newTrieOrchestrator(sf) + if sf.to != nil { + go sf.to.processTasks() + } + // We return [sf] here to ensure we don't try to re-create if + // we aren't able to setup a [newTrieOrchestrator] the first time. return sf } // schedule adds a batch of trie keys to the queue to prefetch. +// This should never block, so an array is used instead of a channel. +// +// This is not thread-safe. func (sf *subfetcher) schedule(keys [][]byte) { // Append the tasks to the current queue - sf.lock.Lock() - sf.tasks = append(sf.tasks, keys...) 
- sf.lock.Unlock() + tasks := make([][]byte, 0, len(keys)) + for _, key := range keys { + // Check if keys already seen + sk := string(key) + if _, ok := sf.seen[sk]; ok { + sf.dups++ + continue + } + sf.seen[sk] = struct{}{} + tasks = append(tasks, key) + } - // Notify the prefetcher, it's fine if it's already terminated - select { - case sf.wake <- struct{}{}: - default: + // After counting keys, exit if they can't be prefetched + if sf.to == nil { + return } + + // Add tasks to queue for prefetching + sf.to.enqueueTasks(tasks) } // peek tries to retrieve a deep copy of the fetcher's trie in whatever form it // is currently. func (sf *subfetcher) peek() Trie { - ch := make(chan Trie) - select { - case sf.copy <- ch: - // Subfetcher still alive, return copy from it - return <-ch + if sf.to == nil { + return nil + } + return sf.to.copyBase() +} - case <-sf.term: - // Subfetcher already terminated, return a copy directly - if sf.trie == nil { - return nil - } - return sf.db.CopyTrie(sf.trie) +// wait must only be called if [triePrefetcher] has not been closed. If this happens, +// workers will not finish. +func (sf *subfetcher) wait() { + if sf.to == nil { + // Unable to open trie + return } + sf.to.wait() } -// abort interrupts the subfetcher immediately. It is safe to call abort multiple -// times but it is not thread safe. func (sf *subfetcher) abort() { - select { - case <-sf.stop: - default: - close(sf.stop) + if sf.to == nil { + // Unable to open trie + return + } + sf.to.abort() +} + +func (sf *subfetcher) skips() int { + if sf.to == nil { + // Unable to open trie + return 0 + } + return sf.to.skipCount() +} + +func (sf *subfetcher) copies() int { + if sf.to == nil { + // Unable to open trie + return 0 } - <-sf.term + return sf.to.copies } -// loop waits for new tasks to be scheduled and keeps loading them until it runs -// out of tasks or its underlying trie is retrieved for committing. -func (sf *subfetcher) loop() { - // No matter how the loop stops, signal anyone waiting that it's terminated - defer close(sf.term) +// trieOrchestrator is not thread-safe. +type trieOrchestrator struct { + sf *subfetcher + + // base is an unmodified Trie we keep for + // creating copies for each worker goroutine. + // + // We care more about quick copies than good copies + // because most (if not all) of the nodes that will be populated + // in the copy will come from the underlying triedb cache. Ones + // that don't come from this cache probably had to be fetched + // from disk anyways. 
+ base Trie + baseLock sync.Mutex + + tasksAllowed bool + skips int // number of tasks skipped + pendingTasks [][]byte + taskLock sync.Mutex + + processingTasks sync.WaitGroup + + wake chan struct{} + stop chan struct{} + stopOnce sync.Once + loopTerm chan struct{} + + copies int + copyChan chan Trie + copySpawner chan struct{} +} +func newTrieOrchestrator(sf *subfetcher) *trieOrchestrator { // Start by opening the trie and stop processing if it fails + var ( + base Trie + err error + ) if sf.owner == (common.Hash{}) { - trie, err := sf.db.OpenTrie(sf.root) + base, err = sf.db.OpenTrie(sf.root) if err != nil { log.Warn("Trie prefetcher failed opening trie", "root", sf.root, "err", err) - return + return nil } - sf.trie = trie } else { - trie, err := sf.db.OpenStorageTrie(sf.state, sf.owner, sf.root) + base, err = sf.db.OpenStorageTrie(sf.state, sf.owner, sf.root) if err != nil { log.Warn("Trie prefetcher failed opening trie", "root", sf.root, "err", err) - return + return nil } - sf.trie = trie } - // Trie opened successfully, keep prefetching items + // Instantiate trieOrchestrator + to := &trieOrchestrator{ + sf: sf, + base: base, + + tasksAllowed: true, + wake: make(chan struct{}, 1), + stop: make(chan struct{}), + loopTerm: make(chan struct{}), + + copyChan: make(chan Trie, sf.p.maxConcurrency), + copySpawner: make(chan struct{}, sf.p.maxConcurrency), + } + + // Create initial trie copy + to.copies++ + to.copySpawner <- struct{}{} + to.copyChan <- to.copyBase() + return to +} + +func (to *trieOrchestrator) copyBase() Trie { + to.baseLock.Lock() + defer to.baseLock.Unlock() + + return to.sf.db.CopyTrie(to.base) +} + +func (to *trieOrchestrator) skipCount() int { + to.taskLock.Lock() + defer to.taskLock.Unlock() + + return to.skips +} + +func (to *trieOrchestrator) enqueueTasks(tasks [][]byte) { + to.taskLock.Lock() + defer to.taskLock.Unlock() + + if len(tasks) == 0 { + return + } + + // Add tasks to [pendingTasks] + if !to.tasksAllowed { + to.skips += len(tasks) + return + } + to.processingTasks.Add(len(tasks)) + to.pendingTasks = append(to.pendingTasks, tasks...) 
+ + // Wake up processor + select { + case to.wake <- struct{}{}: + default: + } +} + +func (to *trieOrchestrator) handleStop(remaining int) { + to.taskLock.Lock() + to.skips += remaining + to.taskLock.Unlock() + to.processingTasks.Add(-remaining) +} + +func (to *trieOrchestrator) processTasks() { + defer close(to.loopTerm) + for { + // Determine if we should process or exit select { - case <-sf.wake: - // Subfetcher was woken up, retrieve any tasks to avoid spinning the lock - sf.lock.Lock() - tasks := sf.tasks - sf.tasks = nil - sf.lock.Unlock() - - // Prefetch any tasks until the loop is interrupted - for i, task := range tasks { + case <-to.wake: + case <-to.stop: + return + } + + // Get current tasks + to.taskLock.Lock() + tasks := to.pendingTasks + to.pendingTasks = nil + to.taskLock.Unlock() + + // Enqueue more work as soon as trie copies are available + lt := len(tasks) + for i := 0; i < lt; i++ { + // Try to stop as soon as possible, if channel is closed + remaining := lt - i + select { + case <-to.stop: + to.handleStop(remaining) + return + default: + } + + // Try to create to get an active copy first (select is non-deterministic, + // so we may end up creating a new copy when we don't need to) + var t Trie + select { + case t = <-to.copyChan: + default: + // Wait for an available copy or create one, if we weren't + // able to get a previously created copy select { - case <-sf.stop: - // If termination is requested, add any leftover back and return - sf.lock.Lock() - sf.tasks = append(sf.tasks, tasks[i:]...) - sf.lock.Unlock() + case <-to.stop: + to.handleStop(remaining) return - - case ch := <-sf.copy: - // Somebody wants a copy of the current trie, grant them - ch <- sf.db.CopyTrie(sf.trie) - - default: - // No termination request yet, prefetch the next entry - if _, ok := sf.seen[string(task)]; ok { - sf.dups++ - } else { - var err error - if len(task) == common.AddressLength { - _, err = sf.trie.GetAccount(common.BytesToAddress(task)) - } else { - _, err = sf.trie.GetStorage(sf.addr, task) - } - if err != nil { - log.Error("Trie prefetcher failed fetching", "root", sf.root, "err", err) - } - sf.seen[string(task)] = struct{}{} - } + case t = <-to.copyChan: + case to.copySpawner <- struct{}{}: + to.copies++ + t = to.copyBase() } } - case ch := <-sf.copy: - // Somebody wants a copy of the current trie, grant them - ch <- sf.db.CopyTrie(sf.trie) + // Enqueue work, unless stopped. + fTask := tasks[i] + f := func() { + // Perform task + var err error + if len(fTask) == common.AddressLength { + _, err = t.GetAccount(common.BytesToAddress(fTask)) + } else { + _, err = t.GetStorage(to.sf.addr, fTask) + } + if err != nil { + log.Error("Trie prefetcher failed fetching", "root", to.sf.root, "err", err) + } + to.processingTasks.Done() + + // Return copy when we are done with it, so someone else can use it + // + // channel is buffered and will not block + to.copyChan <- t + } - case <-sf.stop: - // Termination is requested, abort and leave remaining tasks - return + // Enqueue task for processing (may spawn new goroutine + // if not at [maxConcurrency]) + // + // If workers are stopped before calling [Execute], this function may + // panic. + to.sf.p.workers.Execute(f) } } } + +func (to *trieOrchestrator) stopAcceptingTasks() { + to.taskLock.Lock() + defer to.taskLock.Unlock() + + if !to.tasksAllowed { + return + } + to.tasksAllowed = false + + // We don't clear [to.pendingTasks] here because + // it will be faster to prefetch them even though we + // are still waiting. 
+} + +// wait stops accepting new tasks and waits for ongoing tasks to complete. If +// wait is called, it is not necessary to call [abort]. +// +// It is safe to call wait multiple times. +func (to *trieOrchestrator) wait() { + // Prevent more tasks from being enqueued + to.stopAcceptingTasks() + + // Wait for processing tasks to complete + to.processingTasks.Wait() + + // Stop orchestrator loop + to.stopOnce.Do(func() { + close(to.stop) + }) + <-to.loopTerm +} + +// abort stops any ongoing tasks and shuts down the orchestrator loop. If abort +// is called, it is not necessary to call [wait]. +// +// It is safe to call abort multiple times. +func (to *trieOrchestrator) abort() { + // Prevent more tasks from being enqueued + to.stopAcceptingTasks() + + // Stop orchestrator loop + to.stopOnce.Do(func() { + close(to.stop) + }) + <-to.loopTerm + + // Capture any dangling pending tasks (processTasks + // may exit before enqueing all pendingTasks) + to.taskLock.Lock() + pendingCount := len(to.pendingTasks) + to.skips += pendingCount + to.pendingTasks = nil + to.taskLock.Unlock() + to.processingTasks.Add(-pendingCount) + + // Wait for processing tasks to complete + to.processingTasks.Wait() +} diff --git a/core/state/trie_prefetcher_test.go b/core/state/trie_prefetcher_test.go index ee119ef673..285a7b16da 100644 --- a/core/state/trie_prefetcher_test.go +++ b/core/state/trie_prefetcher_test.go @@ -36,6 +36,8 @@ import ( "github.com/ethereum/go-ethereum/common" ) +const maxConcurrency = 4 + func filledStateDB() *StateDB { state, _ := New(types.EmptyRootHash, NewDatabase(rawdb.NewMemoryDatabase()), nil) @@ -56,7 +58,7 @@ func filledStateDB() *StateDB { func TestCopyAndClose(t *testing.T) { db := filledStateDB() - prefetcher := newTriePrefetcher(db.db, db.originalRoot, "") + prefetcher := newTriePrefetcher(db.db, db.originalRoot, "", maxConcurrency) skey := common.HexToHash("aaa") prefetcher.prefetch(common.Hash{}, db.originalRoot, common.Address{}, [][]byte{skey.Bytes()}) prefetcher.prefetch(common.Hash{}, db.originalRoot, common.Address{}, [][]byte{skey.Bytes()}) @@ -81,7 +83,7 @@ func TestCopyAndClose(t *testing.T) { func TestUseAfterClose(t *testing.T) { db := filledStateDB() - prefetcher := newTriePrefetcher(db.db, db.originalRoot, "") + prefetcher := newTriePrefetcher(db.db, db.originalRoot, "", maxConcurrency) skey := common.HexToHash("aaa") prefetcher.prefetch(common.Hash{}, db.originalRoot, common.Address{}, [][]byte{skey.Bytes()}) a := prefetcher.trie(common.Hash{}, db.originalRoot) @@ -97,7 +99,7 @@ func TestUseAfterClose(t *testing.T) { func TestCopyClose(t *testing.T) { db := filledStateDB() - prefetcher := newTriePrefetcher(db.db, db.originalRoot, "") + prefetcher := newTriePrefetcher(db.db, db.originalRoot, "", maxConcurrency) skey := common.HexToHash("aaa") prefetcher.prefetch(common.Hash{}, db.originalRoot, common.Address{}, [][]byte{skey.Bytes()}) cpy := prefetcher.copy() diff --git a/eth/backend.go b/eth/backend.go index 7cabb9333f..fa7af4552e 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -190,6 +190,7 @@ func New( TrieCleanRejournal: config.TrieCleanRejournal, TrieDirtyLimit: config.TrieDirtyCache, TrieDirtyCommitTarget: config.TrieDirtyCommitTarget, + TriePrefetcherParallelism: config.TriePrefetcherParallelism, Pruning: config.Pruning, AcceptorQueueLimit: config.AcceptorQueueLimit, CommitInterval: config.CommitInterval, diff --git a/eth/ethconfig/config.go b/eth/ethconfig/config.go index 49dc251ed5..65baaa634e 100644 --- a/eth/ethconfig/config.go +++ 
b/eth/ethconfig/config.go @@ -53,18 +53,19 @@ var DefaultConfig = NewDefaultConfig() func NewDefaultConfig() Config { return Config{ - NetworkId: 1, - TrieCleanCache: 512, - TrieDirtyCache: 256, - TrieDirtyCommitTarget: 20, - SnapshotCache: 256, - AcceptedCacheSize: 32, - Miner: miner.Config{}, - TxPool: txpool.DefaultConfig, - RPCGasCap: 25000000, - RPCEVMTimeout: 5 * time.Second, - GPO: DefaultFullGPOConfig, - RPCTxFeeCap: 1, // 1 AVAX + NetworkId: 1, + TrieCleanCache: 512, + TrieDirtyCache: 256, + TrieDirtyCommitTarget: 20, + TriePrefetcherParallelism: 16, + SnapshotCache: 256, + AcceptedCacheSize: 32, + Miner: miner.Config{}, + TxPool: txpool.DefaultConfig, + RPCGasCap: 25000000, + RPCEVMTimeout: 5 * time.Second, + GPO: DefaultFullGPOConfig, + RPCTxFeeCap: 1, // 1 AVAX } } @@ -94,13 +95,14 @@ type Config struct { SkipBcVersionCheck bool `toml:"-"` // TrieDB and snapshot options - TrieCleanCache int - TrieCleanJournal string - TrieCleanRejournal time.Duration - TrieDirtyCache int - TrieDirtyCommitTarget int - SnapshotCache int - Preimages bool + TrieCleanCache int + TrieCleanJournal string + TrieCleanRejournal time.Duration + TrieDirtyCache int + TrieDirtyCommitTarget int + TriePrefetcherParallelism int + SnapshotCache int + Preimages bool // AcceptedCacheSize is the depth of accepted headers cache and accepted // logs cache at the accepted tip. diff --git a/eth/tracers/api_test.go b/eth/tracers/api_test.go index fd460fc338..e337f57f1d 100644 --- a/eth/tracers/api_test.go +++ b/eth/tracers/api_test.go @@ -87,10 +87,11 @@ func newTestBackend(t *testing.T, n int, gspec *core.Genesis, generator func(i i // Import the canonical chain cacheConfig := &core.CacheConfig{ - TrieCleanLimit: 256, - TrieDirtyLimit: 256, - SnapshotLimit: 128, - Pruning: false, // Archive mode + TrieCleanLimit: 256, + TrieDirtyLimit: 256, + TriePrefetcherParallelism: 4, + SnapshotLimit: 128, + Pruning: false, // Archive mode } chain, err := core.NewBlockChain(backend.chaindb, cacheConfig, gspec, backend.engine, vm.Config{}, common.Hash{}, false) if err != nil { diff --git a/miner/worker.go b/miner/worker.go index 3569382bb5..60a75345f9 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -162,10 +162,20 @@ func (w *worker) commitNewWork() (*types.Block, error) { return nil, fmt.Errorf("failed to prepare header for mining: %w", err) } + // Instantiate stateDB and kick off prefetcher. env, err := w.createCurrentEnvironment(parent, header, tstart) if err != nil { return nil, fmt.Errorf("failed to create new current environment: %w", err) } + + // Ensure we always stop prefetcher after block building is complete. + defer func() { + if env.state == nil { + return + } + env.state.StopPrefetcher() + }() + // Configure any stateful precompiles that should go into effect during this block. w.chainConfig.CheckConfigurePrecompiles(&parent.Time, types.NewBlockWithHeader(header), env.state) @@ -194,10 +204,13 @@ func (w *worker) commitNewWork() (*types.Block, error) { } func (w *worker) createCurrentEnvironment(parent *types.Header, header *types.Header, tstart time.Time) (*environment, error) { + // Retrieve the parent state to apply transactions to and kickoff prefetcher, + // which will retrieve intermediate nodes for all modified state. 
state, err := w.chain.StateAt(parent.Root) if err != nil { return nil, err } + state.StartPrefetcher("miner", w.eth.BlockChain().CacheConfig().TriePrefetcherParallelism) return &environment{ signer: types.MakeSigner(w.chainConfig, header.Number, header.Time), state: state, diff --git a/plugin/evm/config.go b/plugin/evm/config.go index 7fa3eb33a0..6e861430f8 100644 --- a/plugin/evm/config.go +++ b/plugin/evm/config.go @@ -21,6 +21,7 @@ const ( defaultTrieCleanCache = 512 defaultTrieDirtyCache = 512 defaultTrieDirtyCommitTarget = 20 + defaultTriePrefetcherParallelism = 16 defaultSnapshotCache = 256 defaultSyncableCommitInterval = defaultCommitInterval * 4 defaultSnapshotWait = false @@ -97,12 +98,13 @@ type Config struct { RPCTxFeeCap float64 `json:"rpc-tx-fee-cap"` // Cache settings - TrieCleanCache int `json:"trie-clean-cache"` // Size of the trie clean cache (MB) - TrieCleanJournal string `json:"trie-clean-journal"` // Directory to use to save the trie clean cache (must be populated to enable journaling the trie clean cache) - TrieCleanRejournal Duration `json:"trie-clean-rejournal"` // Frequency to re-journal the trie clean cache to disk (minimum 1 minute, must be populated to enable journaling the trie clean cache) - TrieDirtyCache int `json:"trie-dirty-cache"` // Size of the trie dirty cache (MB) - TrieDirtyCommitTarget int `json:"trie-dirty-commit-target"` // Memory limit to target in the dirty cache before performing a commit (MB) - SnapshotCache int `json:"snapshot-cache"` // Size of the snapshot disk layer clean cache (MB) + TrieCleanCache int `json:"trie-clean-cache"` // Size of the trie clean cache (MB) + TrieCleanJournal string `json:"trie-clean-journal"` // Directory to use to save the trie clean cache (must be populated to enable journaling the trie clean cache) + TrieCleanRejournal Duration `json:"trie-clean-rejournal"` // Frequency to re-journal the trie clean cache to disk (minimum 1 minute, must be populated to enable journaling the trie clean cache) + TrieDirtyCache int `json:"trie-dirty-cache"` // Size of the trie dirty cache (MB) + TrieDirtyCommitTarget int `json:"trie-dirty-commit-target"` // Memory limit to target in the dirty cache before performing a commit (MB) + TriePrefetcherParallelism int `json:"trie-prefetcher-parallelism"` // Max concurrent disk reads trie prefetcher should perform at once + SnapshotCache int `json:"snapshot-cache"` // Size of the snapshot disk layer clean cache (MB) // Eth Settings Preimages bool `json:"preimages-enabled"` @@ -232,6 +234,7 @@ func (c *Config) SetDefaults() { c.TrieCleanCache = defaultTrieCleanCache c.TrieDirtyCache = defaultTrieDirtyCache c.TrieDirtyCommitTarget = defaultTrieDirtyCommitTarget + c.TriePrefetcherParallelism = defaultTriePrefetcherParallelism c.SnapshotCache = defaultSnapshotCache c.AcceptorQueueLimit = defaultAcceptorQueueLimit c.SnapshotWait = defaultSnapshotWait diff --git a/plugin/evm/vm.go b/plugin/evm/vm.go index 286d5426a6..5947056a28 100644 --- a/plugin/evm/vm.go +++ b/plugin/evm/vm.go @@ -504,6 +504,7 @@ func (vm *VM) Initialize( vm.ethConfig.TrieCleanRejournal = vm.config.TrieCleanRejournal.Duration vm.ethConfig.TrieDirtyCache = vm.config.TrieDirtyCache vm.ethConfig.TrieDirtyCommitTarget = vm.config.TrieDirtyCommitTarget + vm.ethConfig.TriePrefetcherParallelism = vm.config.TriePrefetcherParallelism vm.ethConfig.SnapshotCache = vm.config.SnapshotCache vm.ethConfig.Pruning = vm.config.Pruning vm.ethConfig.AcceptorQueueLimit = vm.config.AcceptorQueueLimit diff --git a/utils/bounded_workers.go 
b/utils/bounded_workers.go new file mode 100644 index 0000000000..806f923fd4 --- /dev/null +++ b/utils/bounded_workers.go @@ -0,0 +1,81 @@ +// (c) 2019-2023, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package utils + +import ( + "sync" + "sync/atomic" +) + +type BoundedWorkers struct { + workerCount atomic.Int32 + workerSpawner chan struct{} + outstandingWorkers sync.WaitGroup + + work chan func() + workClose sync.Once +} + +// NewBoundedWorkers returns an instance of [BoundedWorkers] that +// will spawn up to [max] goroutines. +func NewBoundedWorkers(max int) *BoundedWorkers { + return &BoundedWorkers{ + workerSpawner: make(chan struct{}, max), + work: make(chan func()), + } +} + +// startWorker creates a new goroutine to execute [f] immediately and then keeps the goroutine +// alive to continue executing new work. +func (b *BoundedWorkers) startWorker(f func()) { + b.workerCount.Add(1) + b.outstandingWorkers.Add(1) + + go func() { + defer b.outstandingWorkers.Done() + + if f != nil { + f() + } + for f := range b.work { + f() + } + }() +} + +// Execute the given function on an existing goroutine waiting for more work, a new goroutine, +// or return if the context is canceled. +// +// Execute must not be called after Wait, otherwise it might panic. +func (b *BoundedWorkers) Execute(f func()) { + // Ensure we feed idle workers first + select { + case b.work <- f: + return + default: + } + + // Fallback to waiting for an idle worker or allocating + // a new worker (if we aren't yet at max concurrency) + select { + case b.work <- f: + case b.workerSpawner <- struct{}{}: + b.startWorker(f) + } +} + +// Wait returns after all enqueued work finishes and all goroutines to exit. +// Wait returns the number of workers that were spawned during the run. +// +// Wait can only be called after ALL calls to [Execute] have returned. +// +// It is safe to call Wait multiple times but not safe to call [Execute] +// after [Wait] has been called. +func (b *BoundedWorkers) Wait() int { + b.workClose.Do(func() { + close(b.work) + }) + b.outstandingWorkers.Wait() + return int(b.workerCount.Load()) +}
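To close, a minimal usage sketch for the new `BoundedWorkers` type (illustrative only, not part of the patch): it submits a batch of tasks with at most four goroutines, then calls `Wait` to drain the queue and report how many workers were actually spawned.

```go
package main

import (
	"fmt"
	"sync/atomic"

	"github.com/ava-labs/coreth/utils"
)

func main() {
	var processed atomic.Int64
	workers := utils.NewBoundedWorkers(4) // spawn at most 4 goroutines

	for i := 0; i < 100; i++ {
		workers.Execute(func() {
			processed.Add(1) // stand-in for a trie node read
		})
	}

	// Wait must only be called once every Execute call has returned; it closes
	// the work channel, lets the workers drain, and returns how many were spawned.
	spawned := workers.Wait()
	fmt.Printf("processed=%d workers=%d\n", processed.Load(), spawned)
}
```

`Execute` provides the backpressure that bounds the prefetcher: once the worker limit is reached and every worker is busy, `Execute` blocks until one of them pulls the task off the shared work channel, which is what caps concurrent disk reads at `TriePrefetcherParallelism`.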