State expiry mvp0.1: opt performance & opt prune #125

Merged
54 changes: 38 additions & 16 deletions core/state/pruner/pruner.go
@@ -64,6 +64,8 @@ const (
// triggering range compaction. It's a quite arbitrary number but just
// to avoid triggering range compaction because of small deletion.
rangeCompactionThreshold = 100000

FixedPrefixAndAddrSize = 33
)

// Config includes all the configurations for pruning.
@@ -136,6 +138,9 @@ func NewPruner(db ethdb.Database, config Config, triesInMemory uint64) (*Pruner,

flattenBlockHash := rawdb.ReadCanonicalHash(db, headBlock.NumberU64()-triesInMemory)
flattenBlock := rawdb.ReadHeader(db, flattenBlockHash, headBlock.NumberU64()-triesInMemory)
if flattenBlock == nil {
return nil, fmt.Errorf("cannot find block at depth %v, cannot prune", triesInMemory)
}
return &Pruner{
config: config,
chainHeader: headBlock.Header(),
@@ -831,7 +836,7 @@ func asyncScanUnExpiredInTrie(db *trie.Database, stateRoot common.Hash, epoch ty
return err
}
if time.Since(logged) > 8*time.Second {
log.Info("Pruning expired states", "trieNodes", trieCount.Load())
log.Info("Scan unexpired states", "trieNodes", trieCount.Load())
logged = time.Now()
}
}
@@ -850,7 +855,7 @@ func asyncScanExpiredInTrie(db *trie.Database, stateRoot common.Hash, epoch type
logged = time.Now()
)
for item := range expireContractCh {
log.Info("start scan trie expired state", "addrHash", item.Addr, "root", item.Root)
log.Debug("start scan trie expired state", "addrHash", item.Addr, "root", item.Root)
tr, err := trie.New(&trie.ID{
StateRoot: stateRoot,
Owner: item.Addr,
@@ -866,7 +871,7 @@ func asyncScanExpiredInTrie(db *trie.Database, stateRoot common.Hash, epoch type
return err
}
if time.Since(logged) > 8*time.Second {
log.Info("Pruning expired states", "trieNodes", trieCount.Load())
log.Info("Scan unexpired states", "trieNodes", trieCount.Load())
logged = time.Now()
}
}
@@ -877,6 +882,7 @@ func asyncScanExpiredInTrie(db *trie.Database, stateRoot common.Hash, epoch type

func asyncPruneExpiredStorageInDisk(diskdb ethdb.Database, pruneExpiredInDisk chan *trie.NodeInfo, bloom *bloomfilter.Filter, scheme string) error {
var (
itemCount = 0
trieCount = 0
epochMetaCount = 0
snapCount = 0
@@ -891,37 +897,53 @@ func asyncPruneExpiredStorageInDisk(diskdb ethdb.Database, pruneExpiredInDisk ch
log.Debug("found expired state", "addr", info.Addr, "path",
hex.EncodeToString(info.Path), "epoch", info.Epoch, "isBranch",
info.IsBranch, "isLeaf", info.IsLeaf)
itemCount++
addr := info.Addr
// delete trie kv
- trieCount++
switch scheme {
case rawdb.PathScheme:
val := rawdb.ReadTrieNode(diskdb, addr, info.Path, info.Hash, rawdb.PathScheme)
- trieSize += common.StorageSize(len(val) + 33 + len(info.Path))
- rawdb.DeleteTrieNode(batch, addr, info.Path, info.Hash, rawdb.PathScheme)
+ if len(val) == 0 {
+ log.Debug("cannot find source trie?", "addr", addr, "path", info.Path, "hash", info.Hash, "epoch", info.Epoch)
+ } else {
+ trieCount++
+ trieSize += common.StorageSize(len(val) + FixedPrefixAndAddrSize + len(info.Path))
+ rawdb.DeleteTrieNode(batch, addr, info.Path, info.Hash, rawdb.PathScheme)
+ }
case rawdb.HashScheme:
// hbss has shared kv, so use the bloom filter to skip shared nodes.
if bloom == nil || !bloom.Contains(stateBloomHasher(info.Hash.Bytes())) {
val := rawdb.ReadTrieNode(diskdb, addr, info.Path, info.Hash, rawdb.HashScheme)
- trieSize += common.StorageSize(len(val) + 33)
- rawdb.DeleteTrieNode(batch, addr, info.Path, info.Hash, rawdb.HashScheme)
+ if len(val) == 0 {
+ log.Debug("cannot find source trie?", "addr", addr, "path", info.Path, "hash", info.Hash, "epoch", info.Epoch)
+ } else {
+ trieCount++
+ trieSize += common.StorageSize(len(val) + FixedPrefixAndAddrSize)
+ rawdb.DeleteTrieNode(batch, addr, info.Path, info.Hash, rawdb.HashScheme)
+ }
}
}
// delete epoch meta
if info.IsBranch {
- epochMetaCount++
val := rawdb.ReadEpochMetaPlainState(diskdb, addr, string(info.Path))
- epochMetaSize += common.StorageSize(33 + len(info.Path) + len(val))
- rawdb.DeleteEpochMetaPlainState(batch, addr, string(info.Path))
+ if len(val) == 0 && info.Epoch > types.StateEpoch0 {
+ log.Debug("cannot find source epochmeta?", "addr", addr, "path", info.Path, "hash", info.Hash, "epoch", info.Epoch)
+ }
+ if len(val) > 0 {
+ epochMetaCount++
+ epochMetaSize += common.StorageSize(FixedPrefixAndAddrSize + len(info.Path) + len(val))
+ rawdb.DeleteEpochMetaPlainState(batch, addr, string(info.Path))
+ }
}
// replace snapshot kv with an epoch-only value
if info.IsLeaf {
- snapCount++
size, err := snapshot.ShrinkExpiredLeaf(batch, diskdb, addr, info.Key, info.Epoch, scheme)
if err != nil {
log.Error("ShrinkExpiredLeaf err", "addr", addr, "key", info.Key, "err", err)
}
- snapSize += common.StorageSize(size)
+ if size > 0 {
+ snapCount++
+ snapSize += common.StorageSize(size)
+ }
}
if batch.ValueSize() >= ethdb.IdealBatchSize {
if err := batch.Write(); err != nil {
@@ -930,7 +952,7 @@ func asyncPruneExpiredStorageInDisk(diskdb ethdb.Database, pruneExpiredInDisk ch
batch.Reset()
}
if time.Since(logged) > 8*time.Second {
log.Info("Pruning expired states", "trieNodes", trieCount, "trieSize", trieSize,
log.Info("Pruning expired states", "items", itemCount, "trieNodes", trieCount, "trieSize", trieSize,
"SnapKV", snapCount, "SnapKVSize", snapSize, "EpochMeta", epochMetaCount,
"EpochMetaSize", epochMetaSize)
logged = time.Now()
@@ -942,7 +964,7 @@ func asyncPruneExpiredStorageInDisk(diskdb ethdb.Database, pruneExpiredInDisk ch
}
batch.Reset()
}
log.Info("Pruned expired states", "trieNodes", trieCount, "trieSize", trieSize,
log.Info("Pruned expired states", "items", itemCount, "trieNodes", trieCount, "trieSize", trieSize,
"SnapKV", snapCount, "SnapKVSize", snapSize, "EpochMeta", epochMetaCount,
"EpochMetaSize", epochMetaSize, "elapsed", common.PrettyDuration(time.Since(start)))
// Start compactions, will remove the deleted data from the disk immediately.
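Note on the size accounting above: `FixedPrefixAndAddrSize = 33` replaces the magic number 33 and appears to stand for the 1-byte database key prefix plus the 32-byte owner/address hash, so each deleted entry reclaims key overhead plus value size (path-scheme keys also embed the node path). A minimal sketch of that arithmetic, with names assumed rather than taken from the PR:

```go
package main

import "fmt"

// Assumed key layout: 1-byte prefix + 32-byte owner/addr hash.
const fixedPrefixAndAddrSize = 1 + 32

// reclaimedBytes estimates the on-disk bytes freed by deleting one trie-node
// entry: key overhead plus the stored value. Path-scheme keys additionally
// embed the node path; hash-scheme keys do not.
func reclaimedBytes(path, val []byte, pathScheme bool) int {
	if pathScheme {
		return fixedPrefixAndAddrSize + len(path) + len(val)
	}
	return fixedPrefixAndAddrSize + len(val)
}

func main() {
	fmt.Println(reclaimedBytes([]byte{0x0, 0x1}, make([]byte, 100), true)) // 135
}
```

The other behavioral change in this file: the per-kind counters (`trieCount`, `epochMetaCount`, `snapCount`) are now bumped only when the key actually exists on disk, while the new `itemCount` tracks every channel item, so the final log separates "states visited" from "entries deleted".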
11 changes: 10 additions & 1 deletion core/state/snapshot/snapshot_expire.go
@@ -5,6 +5,7 @@ import (
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
)

// ShrinkExpiredLeaf is a tool function for snapshot kv pruning.
@@ -14,13 +15,21 @@ func ShrinkExpiredLeaf(writer ethdb.KeyValueWriter, reader ethdb.KeyValueReader,
// cannot prune snapshot in hbss, because it will be used for trie pruning; it's fine in pbss.
case rawdb.PathScheme:
val := rawdb.ReadStorageSnapshot(reader, accountHash, storageHash)
if len(val) == 0 {
log.Warn("cannot find source snapshot?", "addr", accountHash, "key", storageHash, "epoch", epoch)
return 0, nil
}
valWithEpoch := NewValueWithEpoch(epoch, nil)
enc, err := EncodeValueToRLPBytes(valWithEpoch)
if err != nil {
return 0, err
}
rawdb.WriteStorageSnapshot(writer, accountHash, storageHash, enc)
- return int64(65 + len(val)), nil
+ shrinkSize := len(val) - len(enc)
+ if shrinkSize < 0 {
+ shrinkSize = 0
+ }
+ return int64(shrinkSize), nil
}
return 0, nil
}
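The return-value change deserves a note: the old code reported a flat `65 + len(val)` even though the snapshot slot is rewritten with an epoch-only marker rather than deleted, so savings were over-counted. The new code reports the net shrinkage clamped at zero, and the missing-entry path also returns zero, which tells the caller in pruner.go to skip the `snapCount`/`snapSize` accounting. The arithmetic, as a standalone sketch (names assumed):

```go
// shrinkSavings mirrors the new accounting: the net bytes saved by replacing
// the original snapshot value with its epoch-only encoding, never negative.
func shrinkSavings(origVal, epochOnlyEnc []byte) int64 {
	saved := len(origVal) - len(epochOnlyEnc)
	if saved < 0 {
		saved = 0 // a tiny slot may encode larger once wrapped with an epoch
	}
	return int64(saved)
}
```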
15 changes: 11 additions & 4 deletions core/state/state_expiry.go
@@ -13,7 +13,10 @@ import (
)

var (
- reviveStorageTrieTimer = metrics.NewRegisteredTimer("state/revivetrie/rt", nil)
+ reviveTrieTimer = metrics.NewRegisteredTimer("state/revivetrie/rt", nil)
+ reviveTrieMeter = metrics.NewRegisteredMeter("state/revivetrie", nil)
+ reviveFromLocalMeter = metrics.NewRegisteredMeter("state/revivetrie/local", nil)
+ reviveFromRemoteMeter = metrics.NewRegisteredMeter("state/revivetrie/remote", nil)
)

// stateExpiryMeta contains all state expiry meta for the target block
@@ -33,6 +36,7 @@ func defaultStateExpiryMeta() *stateExpiryMeta {
// fetchExpiredStorageFromRemote requests expired state from a remote full state node;
func fetchExpiredStorageFromRemote(meta *stateExpiryMeta, addr common.Address, root common.Hash, tr Trie, prefixKey []byte, key common.Hash) (map[string][]byte, error) {
log.Debug("fetching expired storage from remoteDB", "addr", addr, "prefix", prefixKey, "key", key)
reviveTrieMeter.Mark(1)
if meta.enableLocalRevive {
// if expired state needs reviving, try to revive locally first; when the node has not been pruned yet, just renew the epoch
val, err := tr.TryLocalRevive(addr, key.Bytes())
@@ -44,6 +48,7 @@ func fetchExpiredStorageFromRemote(meta *stateExpiryMeta, addr common.Address, r
case *trie.MissingNodeError:
// cannot revive locally, request from remote
case nil:
reviveFromLocalMeter.Mark(1)
ret := make(map[string][]byte, 1)
ret[key.String()] = val
return ret, nil
@@ -52,6 +57,7 @@ func fetchExpiredStorageFromRemote(meta *stateExpiryMeta, addr common.Address, r
}
}

reviveFromRemoteMeter.Mark(1)
// cannot revive locally, fetch remote proof
proofs, err := meta.fullStateDB.GetStorageReviveProof(meta.originalRoot, addr, root, []string{common.Bytes2Hex(prefixKey)}, []string{common.Bytes2Hex(key[:])})
log.Debug("fetchExpiredStorageFromRemote GetStorageReviveProof", "addr", addr, "key", key, "proofs", len(proofs), "err", err)
@@ -69,7 +75,7 @@ func fetchExpiredStorageFromRemote(meta *stateExpiryMeta, addr common.Address, r

// batchFetchExpiredFromRemote requests expired state from the remote full state node with a list of keys and prefixes.
func batchFetchExpiredFromRemote(expiryMeta *stateExpiryMeta, addr common.Address, root common.Hash, tr Trie, prefixKeys [][]byte, keys []common.Hash) ([]map[string][]byte, error) {

reviveTrieMeter.Mark(int64(len(keys)))
ret := make([]map[string][]byte, len(keys))
prefixKeysStr := make([]string, len(prefixKeys))
keysStr := make([]string, len(keys))
@@ -95,7 +101,7 @@ func batchFetchExpiredFromRemote(expiryMeta *stateExpiryMeta, addr common.Addres
return nil, err
}
}

reviveFromLocalMeter.Mark(int64(len(keys) - len(expiredKeys)))
for i, prefix := range expiredPrefixKeys {
prefixKeysStr[i] = common.Bytes2Hex(prefix)
}
@@ -117,6 +123,7 @@ func batchFetchExpiredFromRemote(expiryMeta *stateExpiryMeta, addr common.Addres
}

// cannot revive locally, fetch remote proof
reviveFromRemoteMeter.Mark(int64(len(keysStr)))
proofs, err := expiryMeta.fullStateDB.GetStorageReviveProof(expiryMeta.originalRoot, addr, root, prefixKeysStr, keysStr)
log.Debug("fetchExpiredStorageFromRemote GetStorageReviveProof", "addr", addr, "keys", keysStr, "prefixKeys", prefixKeysStr, "proofs", len(proofs), "err", err)
if err != nil {
@@ -144,7 +151,7 @@ func batchFetchExpiredFromRemote(expiryMeta *stateExpiryMeta, addr common.Addres
// ReviveStorageTrie revives a trie's expired state from proof
func ReviveStorageTrie(addr common.Address, tr Trie, proof types.ReviveStorageProof, targetKey common.Hash) (map[string][]byte, error) {
defer func(start time.Time) {
- reviveStorageTrieTimer.Update(time.Since(start))
+ reviveTrieTimer.Update(time.Since(start))
}(time.Now())

// Decode keys and proofs
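The three new meters decompose revive traffic: every revive attempt is marked on `reviveTrieMeter` and then attributed to either the local or the remote meter, so local + remote should track the total. For the batch path above, the relationship reads roughly as this sketch (counts paraphrased from the diff, not copied verbatim):

```go
reviveTrieMeter.Mark(int64(len(keys)))                         // every attempt
reviveFromLocalMeter.Mark(int64(len(keys) - len(expiredKeys))) // served by TryLocalRevive
reviveFromRemoteMeter.Mark(int64(len(expiredKeys)))            // still need a remote proof
```

Note the timer rename: `reviveStorageTrieTimer` becomes `reviveTrieTimer` but keeps the `state/revivetrie/rt` metric name, so existing dashboards stay valid.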
13 changes: 10 additions & 3 deletions trie/committer.go
@@ -142,18 +142,25 @@ func (c *committer) store(path []byte, n node) node {
}
// Collect the dirty node to nodeset for return.
nhash := common.BytesToHash(hash)
- c.nodes.AddNode(path, trienode.New(nhash, nodeToBytes(n)))
+ blob := nodeToBytes(n)
+ changed := c.tracer.checkNodeChanged(path, blob)
+ if changed {
+ c.nodes.AddNode(path, trienode.New(nhash, blob))
+ }
if c.enableStateExpiry {
switch n := n.(type) {
case *fullNode:
- c.nodes.AddBranchNodeEpochMeta(path, epochmeta.NewBranchNodeEpochMeta(n.EpochMap))
+ metaBlob := epochmeta.BranchMeta2Bytes(epochmeta.NewBranchNodeEpochMeta(n.EpochMap))
+ if c.tracer.checkEpochMetaChanged(path, metaBlob) {
+ c.nodes.AddBranchNodeEpochMeta(path, metaBlob)
+ }
}
}

// Collect the corresponding leaf node if it's required. We don't check
// full node since it's impossible to store value in fullNode. The key
// length of leaves should be exactly same.
- if c.collectLeaf {
+ if changed && c.collectLeaf {
if sn, ok := n.(*shortNode); ok {
if val, ok := sn.Val.(valueNode); ok {
c.nodes.AddLeaf(nhash, val)
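The committer change is the core performance win of this PR: `store` now serializes the node once, asks the tracer whether the blob (or, under state expiry, the branch node's epoch-meta blob) actually differs, and only then emits it into the nodeset; leaf collection is likewise gated on `changed`, so untouched subtrees produce no writes. The tracer methods are not shown in this diff; one plausible shape, assuming the tracer caches blobs when nodes are resolved from disk, would be:

```go
// Assumed sketch, not the PR's implementation: compare the freshly committed
// blob against the blob recorded when the node was first resolved from disk.
// bytes.Equal is from the standard library "bytes" package.
func (t *tracer) checkNodeChanged(path []byte, blob []byte) bool {
	prev, ok := t.accessList[string(path)] // blob cached on first access
	if !ok {
		return true // never resolved before: treat as new/changed
	}
	return !bytes.Equal(prev, blob)
}
```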
16 changes: 16 additions & 0 deletions trie/epochmeta/database.go
@@ -74,3 +74,19 @@ func (s *Reader) Get(addr common.Hash, path string) ([]byte, error) {
metaAccessMeter.Mark(1)
return s.snap.EpochMeta(addr, path)
}

func BranchMeta2Bytes(meta *BranchNodeEpochMeta) []byte {
if meta == nil || *meta == (BranchNodeEpochMeta{}) {
return []byte{}
}
buf := rlp.NewEncoderBuffer(nil)
meta.Encode(buf)
return buf.ToBytes()
}

func AccountMeta2Bytes(meta types.StateMeta) ([]byte, error) {
if meta == nil {
return []byte{}, nil
}
return meta.EncodeToRLPBytes()
}
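`BranchMeta2Bytes` encodes a zero-value meta as an empty slice instead of RLP, which lets callers treat "no epoch map" and "nothing to store" uniformly; the committer's `checkEpochMetaChanged` gate relies on exactly this. A hedged usage sketch (`epochMap`, `nodes`, and `path` are illustrative):

```go
meta := epochmeta.NewBranchNodeEpochMeta(epochMap)
blob := epochmeta.BranchMeta2Bytes(meta)
if len(blob) == 0 {
	// zero-value meta: skip persisting anything for this branch node
} else {
	nodes.AddBranchNodeEpochMeta(path, blob)
}
```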
2 changes: 1 addition & 1 deletion trie/epochmeta/disklayer.go
@@ -14,7 +14,7 @@ import (
)

const (
- defaultDiskLayerCacheSize = 100000
+ defaultDiskLayerCacheSize = 1024000
)

type diskLayer struct {
2 changes: 1 addition & 1 deletion trie/inspect_trie.go
@@ -36,7 +36,7 @@
trie *Trie // traverse trie
blocknum uint64
root node // root of triedb
- num uint64 // block number

(Check failure: GitHub Actions / golang-lint (1.19.x, ubuntu-latest): field `num` is unused)
result *TotalTrieTreeStat // inspector result
totalNum uint64
concurrentQueue chan struct{}
@@ -236,7 +236,7 @@
}
if len(inspect.concurrentQueue)*2 < cap(inspect.concurrentQueue) {
inspect.wg.Add(1)
- go inspect.SubConcurrentTraversal(theTrie, theTrieTreeStat, child, height+1, copyNewSlice(path, []byte{byte(idx)}))
+ go inspect.SubConcurrentTraversal(theTrie, theTrieTreeStat, child, height+1, copy2NewBytes(path, []byte{byte(idx)}))
} else {
inspect.ConcurrentTraversal(theTrie, theTrieTreeStat, child, height+1, append(path, byte(idx)))
}
12 changes: 9 additions & 3 deletions trie/proof.go
@@ -919,7 +919,7 @@ func (m *MPTProofCache) VerifyProof() error {
prefix := m.RootKeyHex
for i := 0; i < len(m.cacheNodes); i++ {
if i-1 >= 0 {
- prefix = copyNewSlice(prefix, m.cacheHexPath[i-1])
+ prefix = copy2NewBytes(prefix, m.cacheHexPath[i-1])
}
// prefix = append(prefix, m.cacheHexPath[i]...)
n1 := m.cacheNodes[i]
@@ -938,7 +938,7 @@
}
if merge {
i++
- prefix = copyNewSlice(prefix, m.cacheHexPath[i-1])
+ prefix = copy2NewBytes(prefix, m.cacheHexPath[i-1])
nub.n2 = m.cacheNodes[i]
nub.n2PrefixKey = prefix
}
@@ -948,13 +948,19 @@
return nil
}

- func copyNewSlice(s1, s2 []byte) []byte {
+ func copy2NewBytes(s1, s2 []byte) []byte {
ret := make([]byte, len(s1)+len(s2))
copy(ret, s1)
copy(ret[len(s1):], s2)
return ret
}

func renewBytes(s []byte) []byte {
ret := make([]byte, len(s))
copy(ret, s)
return ret
}

func (m *MPTProofCache) CacheNubs() []*MPTProofNub {
return m.cacheNubs
}
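The rename from `copyNewSlice` to `copy2NewBytes` (and the new, currently unused `renewBytes`) emphasizes that these helpers always allocate a fresh backing array. That matters because `append` may reuse the source's array when spare capacity exists, so two prefixes derived from the same slice can clobber each other, which is why the concurrent traversal in inspect_trie.go uses the copying helper while the sequential branch can still use `append`. A self-contained demonstration of the aliasing hazard (not from the PR):

```go
package main

import "fmt"

func copy2NewBytes(s1, s2 []byte) []byte {
	ret := make([]byte, len(s1)+len(s2))
	copy(ret, s1)
	copy(ret[len(s1):], s2)
	return ret
}

func main() {
	base := make([]byte, 1, 8) // len 1, spare capacity
	a := append(base, 0x01)    // reuses base's backing array
	b := append(base, 0x02)    // overwrites the same slot
	fmt.Println(a[1], b[1])    // 2 2: a was silently corrupted

	safe := copy2NewBytes(base, []byte{0x01}) // fresh array every time
	_ = append(base, 0x02)
	fmt.Println(safe[1]) // 1: unaffected
}
```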