Skip to content

Commit

Permalink
[core/state] Prefetch Trie Nodes Concurrently (#372)
Browse files Browse the repository at this point in the history
* make prefetcher thread-safe

* outline TODOs

* add more TODOs

* add more TODOs

* update miner to use prefetcher

* remove Copy2

* work on refactoring prefetcher for concurrent use

* keep on keeping on

* spec multi-trie

* limit prefetching across all subfetchers

* making progress

* add functionality to wait

* remove addressed TODOs

* rename sm

* more cleanup

* move term to mt

* more moving around

* move loop into prefetcher

* more cleanup

* outline todo for using a single queue

* working through single channel

* continuing refactor

* make sure to return copy

* change name to trie orchestrator

* more progress on revamp

* skip prefetch if to is nil

* start task processing

* remove unnecessary changes

* add more stop behavior

* add comment for miner prefetcher

* add comments for const

* tests passing locally

* cleanup usage of wg

* track time spent waiting on fetcher

* track skips

* nit on prefetcher

* don't clear state channel

* cleanup restore outstanding requests

* add debugging logs

* remove verbose logging

* ensure inner loop uses different variables

* clean up var naming

* replace panics with logs

* set target task size as fixed

* remove loop checks

* clearer metrics tracking

* use bounded workers

* use cancelable context

* copy meter

* add doneLock

* add more comments

* simplify bounded workers

* track outstanding work

* Stop -> Wait

* allow copies up to maxConcurrentReads

* add largest load metric

* lint

* respect metrics enabled

* track num fetchers

* build works

* fix prefetcher test

* don't clear pending tasks when waiting

* cleanup trie prefetcher scale up

* update tests with parallelism

* use spawner instead of spawn

* fix more tests

* fix shutdown stall

* update default parallelism to 48

* spawn trie per key

* change prefetcher parallelism to 16

* add TODO

* remove context from bounded workers

* remove context from bounded workers

* remove delivery metrics

* handle shutdown in wait

* remove extra locking

* rename var

* update comments around shutdown path

* reorder channel vars

* use processingTasks instead of outstandingRequests

* fix comment on panic

* ensure idle workers are fed first

* try live copy first

* remove unnecessary waitgroup

* honor stop
  • Loading branch information
patrick-ogrady authored Nov 21, 2023
1 parent c609216 commit 25699fc
Show file tree
Hide file tree
Showing 13 changed files with 623 additions and 247 deletions.
32 changes: 21 additions & 11 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,11 +148,12 @@ type CacheConfig struct {
TrieCleanRejournal time.Duration // Time interval to dump clean cache to disk periodically
TrieDirtyLimit int // Memory limit (MB) at which to block on insert and force a flush of dirty trie nodes to disk
TrieDirtyCommitTarget int // Memory limit (MB) to target for the dirties cache before invoking commit
TriePrefetcherParallelism int // Max concurrent disk reads trie prefetcher should perform at once
CommitInterval uint64 // Commit the trie every [CommitInterval] blocks.
Pruning bool // Whether to disable trie write caching and GC altogether (archive node)
AcceptorQueueLimit int // Blocks to queue before blocking during acceptance
PopulateMissingTries *uint64 // If non-nil, sets the starting height for re-generating historical tries.
PopulateMissingTriesParallelism int // Is the number of readers to use when trying to populate missing tries.
PopulateMissingTriesParallelism int // Number of readers to use when trying to populate missing tries.
AllowMissingTries bool // Whether to allow an archive node to run with pruning enabled
SnapshotDelayInit bool // Whether to initialize snapshots on startup or wait for external call
SnapshotLimit int // Memory allowance (MB) to use for caching snapshot entries in memory
Expand All @@ -166,14 +167,15 @@ type CacheConfig struct {
}

var DefaultCacheConfig = &CacheConfig{
TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieDirtyCommitTarget: 20, // 20% overhead in memory counting (this targets 16 MB)
Pruning: true,
CommitInterval: 4096,
AcceptorQueueLimit: 64, // Provides 2 minutes of buffer (2s block target) for a commit delay
SnapshotLimit: 256,
AcceptedCacheSize: 32,
TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieDirtyCommitTarget: 20, // 20% overhead in memory counting (this targets 16 MB)
TriePrefetcherParallelism: 16,
Pruning: true,
CommitInterval: 4096,
AcceptorQueueLimit: 64, // Provides 2 minutes of buffer (2s block target) for a commit delay
SnapshotLimit: 256,
AcceptedCacheSize: 32,
}

// BlockChain represents the canonical chain given a database with a genesis
Expand Down Expand Up @@ -1312,7 +1314,7 @@ func (bc *BlockChain) insertBlock(block *types.Block, writes bool) error {
blockStateInitTimer.Inc(time.Since(substart).Milliseconds())

// Enable prefetching to pull in trie node paths while processing transactions
statedb.StartPrefetcher("chain")
statedb.StartPrefetcher("chain", bc.cacheConfig.TriePrefetcherParallelism)
activeState = statedb

// If we have a followup block, run that against the current state to pre-cache
Expand Down Expand Up @@ -1687,7 +1689,7 @@ func (bc *BlockChain) reprocessBlock(parent *types.Block, current *types.Block)
}

// Enable prefetching to pull in trie node paths while processing transactions
statedb.StartPrefetcher("chain")
statedb.StartPrefetcher("chain", bc.cacheConfig.TriePrefetcherParallelism)
defer func() {
statedb.StopPrefetcher()
}()
Expand Down Expand Up @@ -2094,3 +2096,11 @@ func (bc *BlockChain) ResetToStateSyncedBlock(block *types.Block) error {
bc.initSnapshot(head)
return nil
}

// CacheConfig returns a reference to [bc.cacheConfig]
//
// This is used by [miner] to set prefetch parallelism
// during block building.
func (bc *BlockChain) CacheConfig() *CacheConfig {
return bc.cacheConfig
}
7 changes: 4 additions & 3 deletions core/blockchain_repair_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,9 +524,10 @@ func testRepair(t *testing.T, tt *rewindTest, snapshots bool) {
}
engine = dummy.NewFullFaker()
config = &CacheConfig{
TrieCleanLimit: 256,
TrieDirtyLimit: 256,
SnapshotLimit: 0, // Disable snapshot by default
TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TriePrefetcherParallelism: 4,
SnapshotLimit: 0, // Disable snapshot by default
}
)
defer engine.Close()
Expand Down
140 changes: 75 additions & 65 deletions core/blockchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,22 +28,24 @@ import (

var (
archiveConfig = &CacheConfig{
TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieDirtyCommitTarget: 20,
Pruning: false, // Archive mode
SnapshotLimit: 256,
AcceptorQueueLimit: 64,
TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieDirtyCommitTarget: 20,
TriePrefetcherParallelism: 4,
Pruning: false, // Archive mode
SnapshotLimit: 256,
AcceptorQueueLimit: 64,
}

pruningConfig = &CacheConfig{
TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieDirtyCommitTarget: 20,
Pruning: true, // Enable pruning
CommitInterval: 4096,
SnapshotLimit: 256,
AcceptorQueueLimit: 64,
TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieDirtyCommitTarget: 20,
TriePrefetcherParallelism: 4,
Pruning: true, // Enable pruning
CommitInterval: 4096,
SnapshotLimit: 256,
AcceptorQueueLimit: 64,
}
)

Expand Down Expand Up @@ -180,12 +182,13 @@ func TestArchiveBlockChainSnapsDisabled(t *testing.T) {
return createBlockChain(
db,
&CacheConfig{
TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieDirtyCommitTarget: 20,
Pruning: false, // Archive mode
SnapshotLimit: 0, // Disable snapshots
AcceptorQueueLimit: 64,
TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieDirtyCommitTarget: 20,
TriePrefetcherParallelism: 4,
Pruning: false, // Archive mode
SnapshotLimit: 0, // Disable snapshots
AcceptorQueueLimit: 64,
},
gspec,
lastAcceptedHash,
Expand Down Expand Up @@ -214,13 +217,14 @@ func TestPruningBlockChainSnapsDisabled(t *testing.T) {
return createBlockChain(
db,
&CacheConfig{
TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieDirtyCommitTarget: 20,
Pruning: true, // Enable pruning
CommitInterval: 4096,
SnapshotLimit: 0, // Disable snapshots
AcceptorQueueLimit: 64,
TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieDirtyCommitTarget: 20,
TriePrefetcherParallelism: 4,
Pruning: true, // Enable pruning
CommitInterval: 4096,
SnapshotLimit: 0, // Disable snapshots
AcceptorQueueLimit: 64,
},
gspec,
lastAcceptedHash,
Expand Down Expand Up @@ -263,13 +267,14 @@ func TestPruningBlockChainUngracefulShutdownSnapsDisabled(t *testing.T) {
blockchain, err := createBlockChain(
db,
&CacheConfig{
TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieDirtyCommitTarget: 20,
Pruning: true, // Enable pruning
CommitInterval: 4096,
SnapshotLimit: 0, // Disable snapshots
AcceptorQueueLimit: 64,
TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieDirtyCommitTarget: 20,
TriePrefetcherParallelism: 4,
Pruning: true, // Enable pruning
CommitInterval: 4096,
SnapshotLimit: 0, // Disable snapshots
AcceptorQueueLimit: 64,
},
gspec,
lastAcceptedHash,
Expand Down Expand Up @@ -298,13 +303,14 @@ func TestEnableSnapshots(t *testing.T) {
blockchain, err := createBlockChain(
db,
&CacheConfig{
TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieDirtyCommitTarget: 20,
Pruning: true, // Enable pruning
CommitInterval: 4096,
SnapshotLimit: snapLimit,
AcceptorQueueLimit: 64,
TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieDirtyCommitTarget: 20,
TriePrefetcherParallelism: 4,
Pruning: true, // Enable pruning
CommitInterval: 4096,
SnapshotLimit: snapLimit,
AcceptorQueueLimit: 64,
},
gspec,
lastAcceptedHash,
Expand Down Expand Up @@ -455,6 +461,7 @@ func testRepopulateMissingTriesParallel(t *testing.T, parallelism int) {
TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieDirtyCommitTarget: 20,
TriePrefetcherParallelism: 4,
Pruning: false, // Archive mode
SnapshotLimit: 256,
PopulateMissingTries: &startHeight, // Starting point for re-populating.
Expand Down Expand Up @@ -487,14 +494,15 @@ func TestUngracefulAsyncShutdown(t *testing.T) {
var (
create = func(db ethdb.Database, gspec *Genesis, lastAcceptedHash common.Hash) (*BlockChain, error) {
blockchain, err := createBlockChain(db, &CacheConfig{
TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieDirtyCommitTarget: 20,
Pruning: true,
CommitInterval: 4096,
SnapshotLimit: 256,
SnapshotNoBuild: true, // Ensure the test errors if snapshot initialization fails
AcceptorQueueLimit: 1000, // ensure channel doesn't block
TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieDirtyCommitTarget: 20,
TriePrefetcherParallelism: 4,
Pruning: true,
CommitInterval: 4096,
SnapshotLimit: 256,
SnapshotNoBuild: true, // Ensure the test errors if snapshot initialization fails
AcceptorQueueLimit: 1000, // ensure channel doesn't block
}, gspec, lastAcceptedHash)
if err != nil {
return nil, err
Expand Down Expand Up @@ -681,14 +689,15 @@ func TestTransactionIndices(t *testing.T) {
}

conf := &CacheConfig{
TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieDirtyCommitTarget: 20,
Pruning: true,
CommitInterval: 4096,
SnapshotLimit: 256,
SnapshotNoBuild: true, // Ensure the test errors if snapshot initialization fails
AcceptorQueueLimit: 64,
TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieDirtyCommitTarget: 20,
TriePrefetcherParallelism: 4,
Pruning: true,
CommitInterval: 4096,
SnapshotLimit: 256,
SnapshotNoBuild: true, // Ensure the test errors if snapshot initialization fails
AcceptorQueueLimit: 64,
}

// Init block chain and check all needed indices has been indexed.
Expand Down Expand Up @@ -851,15 +860,16 @@ func TestCanonicalHashMarker(t *testing.T) {

func TestTxLookupBlockChain(t *testing.T) {
cacheConf := &CacheConfig{
TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieDirtyCommitTarget: 20,
Pruning: true,
CommitInterval: 4096,
SnapshotLimit: 256,
SnapshotNoBuild: true, // Ensure the test errors if snapshot initialization fails
AcceptorQueueLimit: 64, // ensure channel doesn't block
TxLookupLimit: 5,
TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieDirtyCommitTarget: 20,
TriePrefetcherParallelism: 4,
Pruning: true,
CommitInterval: 4096,
SnapshotLimit: 256,
SnapshotNoBuild: true, // Ensure the test errors if snapshot initialization fails
AcceptorQueueLimit: 64, // ensure channel doesn't block
TxLookupLimit: 5,
}
createTxLookupBlockChain := func(db ethdb.Database, gspec *Genesis, lastAcceptedHash common.Hash) (*BlockChain, error) {
return createBlockChain(db, cacheConf, gspec, lastAcceptedHash)
Expand Down
4 changes: 2 additions & 2 deletions core/state/statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,13 +186,13 @@ func NewWithSnapshot(root common.Hash, db Database, snap snapshot.Snapshot) (*St
// StartPrefetcher initializes a new trie prefetcher to pull in nodes from the
// state trie concurrently while the state is mutated so that when we reach the
// commit phase, most of the needed data is already hot.
func (s *StateDB) StartPrefetcher(namespace string) {
func (s *StateDB) StartPrefetcher(namespace string, maxConcurrency int) {
if s.prefetcher != nil {
s.prefetcher.close()
s.prefetcher = nil
}
if s.snap != nil {
s.prefetcher = newTriePrefetcher(s.db, s.originalRoot, namespace)
s.prefetcher = newTriePrefetcher(s.db, s.originalRoot, namespace, maxConcurrency)
}
}

Expand Down
Loading

0 comments on commit 25699fc

Please sign in to comment.