diff --git a/core/state/pruner/pruner.go b/core/state/pruner/pruner.go index 3353cfbb1d..0b580a8ee4 100644 --- a/core/state/pruner/pruner.go +++ b/core/state/pruner/pruner.go @@ -22,6 +22,7 @@ import ( "errors" "fmt" "math" + "math/big" "os" "path/filepath" "sync" @@ -62,6 +63,8 @@ type Config struct { BloomSize uint64 // The Megabytes of memory allocated to bloom-filter Threads int // The maximum number of threads spawned in dumpRawTrieDescendants and removeOtherRoots CleanCacheSize int // The Megabytes of clean cache size used in dumpRawTrieDescendants + + ParallelStorageTraversal bool // Whether to prune in parallel per account } // Pruner is an offline tool to prune the stale state with the @@ -401,54 +404,75 @@ func dumpRawTrieDescendants(db ethdb.Database, root common.Hash, output *stateBl output.Put(data.CodeHash, nil) } if data.Root != (common.Hash{}) { - // note: we are passing data.Root as stateRoot here, to skip the check for stateRoot existence in trie.newTrieReader, - // we already check that when opening state trie and reading the account node - trieID := trie.StorageTrieID(data.Root, key, data.Root) - storageTr, err := trie.NewStateTrie(trieID, sdb.TrieDB()) - if err != nil { - return err - } - err = <-results - if err != nil { - return err + numParallelIteration := int64(1) + if config.ParallelStorageTraversal { + numParallelIteration = int64(32) } - go func() { - threadsRunning.Add(1) - defer threadsRunning.Add(-1) - var err error - defer func() { - results <- err - }() - threadStartedAt := time.Now() - threadLastLog := time.Now() - - storageIt, err := storageTr.NodeIterator(nil) + for i := int64(1); i <= numParallelIteration; i++ { + err = <-results if err != nil { - return + return err } - var processedNodes uint64 - for storageIt.Next(true) { - storageTrieHash := storageIt.Hash() - if storageTrieHash != (common.Hash{}) { - // The inner bloomfilter library has a mutex so concurrency is fine here - err = output.Put(storageTrieHash.Bytes(), nil) - if err != nil { - return + go func(iteration int64) { + threadsRunning.Add(1) + defer threadsRunning.Add(-1) + var processedNodes uint64 + threadStartedAt := time.Now() + threadLastLog := time.Now() + var err error + defer func() { + results <- err + }() + // note: we are passing data.Root as stateRoot here, to skip the check for stateRoot existence in trie.newTrieReader, + // we already check that when opening state trie and reading the account node + // Need to create a new trie for each iteration, to avoid race conditions. + trieID := trie.StorageTrieID(data.Root, key, data.Root) + var storageTr *trie.StateTrie + storageTr, err = trie.NewStateTrie(trieID, sdb.TrieDB()) + if err != nil { + return + } + var startIt trie.NodeIterator + startIt, err = storageTr.NodeIterator(big.NewInt((iteration - 1) << 3).Bytes()) + if err != nil { + return + } + // Traverse the storage trie, and stop if we reach the end of the trie or the end of the current part + var startItPath, endItPath []byte + + key := trie.KeybytesToHex(big.NewInt((iteration) << 3).Bytes()) + key = key[:len(key)-1] + for startIt.Next(true) { + if iteration != numParallelIteration && bytes.Compare(startIt.Path(), key) > 0 { + break + } + if startItPath == nil { + startItPath = startIt.Path() + } + endItPath = startIt.Path() + storageTrieHash := startIt.Hash() + if storageTrieHash != (common.Hash{}) { + // The inner bloomfilter library has a mutex so concurrency is fine here + err = output.Put(storageTrieHash.Bytes(), nil) + if err != nil { + return + } + } + processedNodes++ + if time.Since(threadLastLog) > 5*time.Minute { + elapsedTotal := time.Since(startedAt) + elapsedThread := time.Since(threadStartedAt) + log.Info("traversing trie database - traversing storage trie taking long", "key", key, "elapsedTotal", elapsedTotal, "elapsedThread", elapsedThread, "processedNodes", processedNodes, "threadsRunning", threadsRunning.Load()) + threadLastLog = time.Now() } } - processedNodes++ - if time.Since(threadLastLog) > 5*time.Minute { - elapsedTotal := time.Since(startedAt) - elapsedThread := time.Since(threadStartedAt) - log.Info("traversing trie database - traversing storage trie taking long", "key", key, "elapsedTotal", elapsedTotal, "elapsedThread", elapsedThread, "processedNodes", processedNodes, "threadsRunning", threadsRunning.Load()) - threadLastLog = time.Now() + err = startIt.Error() + if err != nil { + return } - } - err = storageIt.Error() - if err != nil { - return - } - }() + log.Trace("Finished traversing storage trie", "key", key, "startPath", startItPath, "endPath", endItPath) + }(i) + } } } } diff --git a/trie/encoding.go b/trie/encoding.go index 3284d3f8f0..4e21ee7e77 100644 --- a/trie/encoding.go +++ b/trie/encoding.go @@ -93,6 +93,8 @@ func compactToHex(compact []byte) []byte { return base[chop:] } +func KeybytesToHex(str []byte) []byte { return keybytesToHex(str) } + func keybytesToHex(str []byte) []byte { l := len(str)*2 + 1 var nibbles = make([]byte, l)