Skip to content

Commit

Permalink
pruner: opt size statistic;
Browse files Browse the repository at this point in the history
trie/inspect: opt inspect in PBSS mode;
  • Loading branch information
0xbundler committed Oct 10, 2023
1 parent 9a0723c commit c2f70b9
Show file tree
Hide file tree
Showing 8 changed files with 62 additions and 28 deletions.
2 changes: 1 addition & 1 deletion core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -2156,7 +2156,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
stats.usedGas += usedGas

dirty, _ := bc.triedb.Size()
stats.report(chain, it.index, dirty, setHead)
stats.report(chain, it.index, dirty, setHead, bc.chainConfig)

if !setHead {
// After merge we expect few side chains. Simply count
Expand Down
5 changes: 3 additions & 2 deletions core/blockchain_insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package core

import (
"github.com/ethereum/go-ethereum/params"
"time"

"github.com/ethereum/go-ethereum/common"
Expand All @@ -39,7 +40,7 @@ const statsReportLimit = 8 * time.Second

// report prints statistics if some number of blocks have been processed
// or more than a few seconds have passed since the last message.
func (st *insertStats) report(chain []*types.Block, index int, dirty common.StorageSize, setHead bool) {
func (st *insertStats) report(chain []*types.Block, index int, dirty common.StorageSize, setHead bool, config *params.ChainConfig) {
// Fetch the timings for the batch
var (
now = mclock.Now()
Expand All @@ -56,7 +57,7 @@ func (st *insertStats) report(chain []*types.Block, index int, dirty common.Stor

// Assemble the log context and send it to the logger
context := []interface{}{
"number", end.Number(), "hash", end.Hash(),
"number", end.Number(), "hash", end.Hash(), "stateEpoch", types.GetStateEpoch(config, end.Number()),
"blocks", st.processed, "txs", txs, "mgas", float64(st.usedGas) / 1000000,
"elapsed", common.PrettyDuration(elapsed), "mgasps", float64(st.usedGas) * 1000 / float64(elapsed),
}
Expand Down
6 changes: 5 additions & 1 deletion core/rawdb/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,7 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error {
accountSnaps stat
storageSnaps stat
snapJournal stat
trieJournal stat
preimages stat
bloomBits stat
cliqueSnaps stat
Expand Down Expand Up @@ -691,6 +692,8 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error {
epochMetaMetaSize.Add(size)
case bytes.Equal(key, snapshotJournalKey):
snapJournal.Add(size)
case bytes.Equal(key, trieJournalKey):
trieJournal.Add(size)
case bytes.Equal(key, epochMetaSnapshotJournalKey):
epochMetaSnapJournalSize.Add(size)
case bytes.HasPrefix(key, EpochMetaPlainStatePrefix) && len(key) >= (len(EpochMetaPlainStatePrefix)+common.HashLength):
Expand All @@ -702,7 +705,7 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error {
lastPivotKey, fastTrieProgressKey, snapshotDisabledKey, SnapshotRootKey,
snapshotGeneratorKey, snapshotRecoveryKey, txIndexTailKey, fastTxLookupLimitKey,
uncleanShutdownKey, badBlockKey, transitionStatusKey, skeletonSyncStatusKey,
persistentStateIDKey, trieJournalKey, snapshotSyncStatusKey,
persistentStateIDKey, snapshotSyncStatusKey,
} {
if bytes.Equal(key, meta) {
metadata.Add(size)
Expand Down Expand Up @@ -735,6 +738,7 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error {
{"Key-Value store", "Path trie state lookups", stateLookups.Size(), stateLookups.Count()},
{"Key-Value store", "Path trie account nodes", accountTries.Size(), accountTries.Count()},
{"Key-Value store", "Path trie storage nodes", storageTries.Size(), storageTries.Count()},
{"Key-Value store", "Path trie snap journal", trieJournal.Size(), trieJournal.Count()},
{"Key-Value store", "Trie preimages", preimages.Size(), preimages.Count()},
{"Key-Value store", "Account snapshot", accountSnaps.Size(), accountSnaps.Count()},
{"Key-Value store", "Storage snapshot", storageSnaps.Size(), storageSnaps.Count()},
Expand Down
36 changes: 28 additions & 8 deletions core/state/pruner/pruner.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"path/filepath"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/prometheus/tsdb/fileutil"
Expand Down Expand Up @@ -740,8 +741,13 @@ func (p *Pruner) ExpiredPrune(height *big.Int, root common.Hash) error {
// here are some issues when just delete it from hash-based storage, because it's shared kv in hbss
// but it's ok for pbss.
func asyncScanExpiredInTrie(db *trie.Database, stateRoot common.Hash, epoch types.StateEpoch, expireContractCh chan *snapshot.ContractItem, pruneExpiredInDisk chan *trie.NodeInfo) error {
var (
trieCount atomic.Uint64
start = time.Now()
logged = time.Now()
)
for item := range expireContractCh {
log.Info("start scan trie expired state", "addr", item.Addr, "root", item.Root)
log.Info("start scan trie expired state", "addrHash", item.Addr, "root", item.Root)
tr, err := trie.New(&trie.ID{
StateRoot: stateRoot,
Owner: item.Addr,
Expand All @@ -752,11 +758,16 @@ func asyncScanExpiredInTrie(db *trie.Database, stateRoot common.Hash, epoch type
return err
}
tr.SetEpoch(epoch)
if err = tr.PruneExpired(pruneExpiredInDisk); err != nil {
if err = tr.PruneExpired(pruneExpiredInDisk, &trieCount); err != nil {
log.Error("asyncScanExpiredInTrie, PruneExpired err", "id", item, "err", err)
return err
}
if time.Since(logged) > 8*time.Second {
log.Info("Pruning expired states", "trieNodes", trieCount.Load())
logged = time.Now()
}
}
log.Info("Scan expired states", "trieNodes", trieCount.Load(), "elapsed", common.PrettyDuration(time.Since(start)))
close(pruneExpiredInDisk)
return nil
}
Expand All @@ -780,32 +791,39 @@ func asyncPruneExpiredStorageInDisk(diskdb ethdb.Database, pruneExpiredInDisk ch
addr := info.Addr
// delete trie kv
trieCount++
trieSize += common.StorageSize(len(info.Key) + 32)
switch scheme {
case rawdb.PathScheme:
val := rawdb.ReadTrieNode(diskdb, addr, info.Path, info.Hash, rawdb.PathScheme)
trieSize += common.StorageSize(len(val) + 33 + len(info.Path))
rawdb.DeleteTrieNode(batch, addr, info.Path, info.Hash, rawdb.PathScheme)
case rawdb.HashScheme:
// hbss has shared kv, so using bloom to filter them out.
if !bloom.Contain(info.Hash.Bytes()) {
val := rawdb.ReadTrieNode(diskdb, addr, info.Path, info.Hash, rawdb.HashScheme)
trieSize += common.StorageSize(len(val) + 33)
rawdb.DeleteTrieNode(batch, addr, info.Path, info.Hash, rawdb.HashScheme)
}
}
// delete epoch meta
if info.IsBranch {
epochMetaCount++
epochMetaSize += common.StorageSize(32 + len(info.Path) + 32)
val := rawdb.ReadEpochMetaPlainState(diskdb, addr, string(info.Path))
epochMetaSize += common.StorageSize(33 + len(info.Path) + len(val))
rawdb.DeleteEpochMetaPlainState(batch, addr, string(info.Path))
}
// replace snapshot kv only epoch
if info.IsLeaf {
snapCount++
snapSize += common.StorageSize(32)
if err := snapshot.ShrinkExpiredLeaf(batch, addr, info.Key, info.Epoch, scheme); err != nil {
size, err := snapshot.ShrinkExpiredLeaf(batch, diskdb, addr, info.Key, info.Epoch, scheme)
if err != nil {
log.Error("ShrinkExpiredLeaf err", "addr", addr, "key", info.Key, "err", err)
}
snapSize += common.StorageSize(size)
}
if batch.ValueSize() >= ethdb.IdealBatchSize {
batch.Write()
if err := batch.Write(); err != nil {
log.Error("asyncPruneExpiredStorageInDisk, batch write err", "err", err)
}
batch.Reset()
}
if time.Since(logged) > 8*time.Second {
Expand All @@ -816,7 +834,9 @@ func asyncPruneExpiredStorageInDisk(diskdb ethdb.Database, pruneExpiredInDisk ch
}
}
if batch.ValueSize() > 0 {
batch.Write()
if err := batch.Write(); err != nil {
log.Error("asyncPruneExpiredStorageInDisk, batch write err", "err", err)
}
batch.Reset()
}
log.Info("Pruned expired states", "trieNodes", trieCount, "trieSize", trieSize,
Expand Down
10 changes: 6 additions & 4 deletions core/state/snapshot/snapshot_expire.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,19 @@ import (
)

// ShrinkExpiredLeaf tool function for snapshot kv prune
func ShrinkExpiredLeaf(db ethdb.KeyValueWriter, accountHash common.Hash, storageHash common.Hash, epoch types.StateEpoch, scheme string) error {
func ShrinkExpiredLeaf(writer ethdb.KeyValueWriter, reader ethdb.KeyValueReader, accountHash common.Hash, storageHash common.Hash, epoch types.StateEpoch, scheme string) (int64, error) {
switch scheme {
case rawdb.HashScheme:
//cannot prune snapshot in hbss, because it will used for trie prune, but it's ok in pbss.
case rawdb.PathScheme:
val := rawdb.ReadStorageSnapshot(reader, accountHash, storageHash)
valWithEpoch := NewValueWithEpoch(epoch, nil)
enc, err := EncodeValueToRLPBytes(valWithEpoch)
if err != nil {
return err
return 0, err
}
rawdb.WriteStorageSnapshot(db, accountHash, storageHash, enc)
rawdb.WriteStorageSnapshot(writer, accountHash, storageHash, enc)
return int64(65 + len(val)), nil
}
return nil
return 0, nil
}
2 changes: 1 addition & 1 deletion core/state/snapshot/snapshot_expire_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func TestShrinkExpiredLeaf(t *testing.T) {
db := memorydb.New()
rawdb.WriteStorageSnapshot(db, accountHash, storageHash1, encodeSnapVal(NewRawValue([]byte("val1"))))

err := ShrinkExpiredLeaf(db, accountHash, storageHash1, types.StateEpoch0, rawdb.PathScheme)
_, err := ShrinkExpiredLeaf(db, db, accountHash, storageHash1, types.StateEpoch0, rawdb.PathScheme)
assert.NoError(t, err)

assert.Equal(t, encodeSnapVal(NewValueWithEpoch(types.StateEpoch0, nil)), rawdb.ReadStorageSnapshot(db, accountHash, storageHash1))
Expand Down
7 changes: 2 additions & 5 deletions trie/inspect_trie.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,19 +229,16 @@ func (inspect *Inspector) ConcurrentTraversal(theTrie *Trie, theTrieTreeStat *Tr
case *shortNode:
path = append(path, current.Key...)
inspect.ConcurrentTraversal(theTrie, theTrieTreeStat, current.Val, height+1, path)
path = path[:len(path)-len(current.Key)]
case *fullNode:
for idx, child := range current.Children {
if child == nil {
continue
}
childPath := path
childPath = append(childPath, byte(idx))
if len(inspect.concurrentQueue)*2 < cap(inspect.concurrentQueue) {
inspect.wg.Add(1)
go inspect.SubConcurrentTraversal(theTrie, theTrieTreeStat, child, height+1, childPath)
go inspect.SubConcurrentTraversal(theTrie, theTrieTreeStat, child, height+1, copyNewSlice(path, []byte{byte(idx)}))
} else {
inspect.ConcurrentTraversal(theTrie, theTrieTreeStat, child, height+1, childPath)
inspect.ConcurrentTraversal(theTrie, theTrieTreeStat, child, height+1, append(path, byte(idx)))
}
}
case hashNode:
Expand Down
22 changes: 16 additions & 6 deletions trie/trie.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/trie/trienode"
"sync/atomic"
)

var (
Expand Down Expand Up @@ -1430,7 +1431,7 @@ type NodeInfo struct {
}

// PruneExpired traverses the storage trie and prunes all expired nodes.
func (t *Trie) PruneExpired(pruneItemCh chan *NodeInfo) error {
func (t *Trie) PruneExpired(pruneItemCh chan *NodeInfo, stats *atomic.Uint64) error {

if !t.enableExpiry {
return nil
Expand All @@ -1444,15 +1445,15 @@ func (t *Trie) PruneExpired(pruneItemCh chan *NodeInfo) error {
if pruneErr := t.recursePruneExpiredNode(n, path, epoch, pruneItemCh); pruneErr != nil {
log.Error("recursePruneExpiredNode err", "Path", path, "err", pruneErr)
}
})
}, stats)
if err != nil {
return err
}

return nil
}

func (t *Trie) findExpiredSubTree(n node, path []byte, epoch types.StateEpoch, pruner func(n node, path []byte, epoch types.StateEpoch)) error {
func (t *Trie) findExpiredSubTree(n node, path []byte, epoch types.StateEpoch, pruner func(n node, path []byte, epoch types.StateEpoch), stats *atomic.Uint64) error {
// Upon reaching expired node, it will recursively traverse downwards to all the child nodes
// and collect their hashes. Then, the corresponding key-value pairs will be deleted from the
// database by batches.
Expand All @@ -1463,16 +1464,22 @@ func (t *Trie) findExpiredSubTree(n node, path []byte, epoch types.StateEpoch, p

switch n := n.(type) {
case *shortNode:
err := t.findExpiredSubTree(n.Val, append(path, n.Key...), epoch, pruner)
if stats != nil {
stats.Add(1)
}
err := t.findExpiredSubTree(n.Val, append(path, n.Key...), epoch, pruner, stats)
if err != nil {
return err
}
return nil
case *fullNode:
if stats != nil {
stats.Add(1)
}
var err error
// Go through every child and recursively delete expired nodes
for i, child := range n.Children {
err = t.findExpiredSubTree(child, append(path, byte(i)), epoch, pruner)
err = t.findExpiredSubTree(child, append(path, byte(i)), epoch, pruner, stats)
if err != nil {
return err
}
Expand All @@ -1481,14 +1488,17 @@ func (t *Trie) findExpiredSubTree(n node, path []byte, epoch types.StateEpoch, p

case hashNode:
resolve, err := t.resolveAndTrack(n, path)
if _, ok := err.(*MissingNodeError); ok {
return nil
}
if err != nil {
return err
}
if err = t.resolveEpochMeta(resolve, epoch, path); err != nil {
return err
}

return t.findExpiredSubTree(resolve, path, epoch, pruner)
return t.findExpiredSubTree(resolve, path, epoch, pruner, stats)
case valueNode:
return nil
case nil:
Expand Down

0 comments on commit c2f70b9

Please sign in to comment.