Skip to content

Commit

Permalink
feat: insert new slot in snapshot, prevent execution touch trie;
Browse files Browse the repository at this point in the history
feat: reuse prefetcher tire? it could fetch from remoteDB;
feat: revive from local first, then fetch from remote;
  • Loading branch information
0xbundler committed Sep 25, 2023
1 parent 5f30fc7 commit dbf9e33
Show file tree
Hide file tree
Showing 13 changed files with 224 additions and 236 deletions.
11 changes: 7 additions & 4 deletions core/state/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,17 +158,20 @@ type Trie interface {
// with the node that proves the absence of the key.
Prove(key []byte, proofDb ethdb.KeyValueWriter) error

// ProvePath generate proof state in trie.
ProvePath(key []byte, path []byte, proofDb ethdb.KeyValueWriter) error
// ProveByPath generate proof state in trie.
ProveByPath(key []byte, path []byte, proofDb ethdb.KeyValueWriter) error

// ReviveTrie revive expired state from proof.
ReviveTrie(key []byte, proof []*trie.MPTProofNub) ([]*trie.MPTProofNub, error)
// TryRevive revive expired state from proof.
TryRevive(key []byte, proof []*trie.MPTProofNub) ([]*trie.MPTProofNub, error)

// SetEpoch set current epoch in trie, it must set in initial period, or it will get error behavior.
SetEpoch(types.StateEpoch)

// Epoch get current epoch in trie
Epoch() types.StateEpoch

// TryLocalRevive it revive using local non-pruned states
TryLocalRevive(addr common.Address, key []byte) ([]byte, error)
}

// NewDatabase creates a backing store for state. The returned database is safe for
Expand Down
24 changes: 23 additions & 1 deletion core/state/state_expiry.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,33 @@ import (

var (
reviveStorageTrieTimer = metrics.NewRegisteredTimer("state/revivetrie/rt", nil)
EnableLocalRevive = true // indicate if using local revive
)

// fetchExpiredStorageFromRemote request expired state from remote full state node;
func fetchExpiredStorageFromRemote(fullDB ethdb.FullStateDB, stateRoot common.Hash, addr common.Address, root common.Hash, tr Trie, prefixKey []byte, key common.Hash) (map[string][]byte, error) {
if EnableLocalRevive {
// if there need revive expired state, try to revive locally, when the node is not being pruned, just renew the epoch
val, err := tr.TryLocalRevive(addr, key.Bytes())
log.Debug("fetchExpiredStorageFromRemote TryLocalRevive", "addr", addr, "key", key, "val", val, "err", err)
if _, ok := err.(*trie.MissingNodeError); !ok {
return nil, err
}
switch err.(type) {
case *trie.MissingNodeError:
// cannot revive locally, request from remote
case nil:
ret := make(map[string][]byte, 1)
ret[key.String()] = val
return ret, nil
default:
return nil, err
}
}

// cannot revive locally, fetch remote proof
proofs, err := fullDB.GetStorageReviveProof(stateRoot, addr, root, []string{common.Bytes2Hex(prefixKey)}, []string{common.Bytes2Hex(key[:])})
log.Debug("fetchExpiredStorageFromRemote GetStorageReviveProof", "addr", addr, "key", key, "proofs", len(proofs), "err", err)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -59,7 +81,7 @@ func reviveStorageTrie(addr common.Address, tr Trie, proof types.ReviveStoragePr
return nil, err
}

nubs, err := tr.ReviveTrie(key, proofCache.CacheNubs())
nubs, err := tr.TryRevive(key, proofCache.CacheNubs())
if err != nil {
return nil, err
}
Expand Down
122 changes: 85 additions & 37 deletions core/state/state_object.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package state

import (
"bytes"
"encoding/hex"
"fmt"
"github.com/ethereum/go-ethereum/core/state/snapshot"
"github.com/ethereum/go-ethereum/log"
Expand Down Expand Up @@ -81,10 +82,11 @@ type stateObject struct {
dirtyStorage Storage // Storage entries that have been modified in the current transaction execution, reset for every transaction

// for state expiry feature
pendingReviveTrie Trie // pendingReviveTrie it contains pending revive trie nodes, could update & commit later
pendingReviveState map[string]common.Hash // pendingReviveState for block, when R&W, access revive state first, saved in hash key
pendingAccessedState map[common.Hash]int // pendingAccessedState record which state is accessed(only read now, update/delete/insert will auto update epoch), it will update epoch index late
originStorageEpoch map[common.Hash]types.StateEpoch // originStorageEpoch record origin state epoch, prevent frequency epoch update
pendingReviveTrie Trie // pendingReviveTrie it contains pending revive trie nodes, could update & commit later
pendingReviveState map[string]common.Hash // pendingReviveState for block, when R&W, access revive state first, saved in hash key
pendingAccessedState map[common.Hash]int // pendingAccessedState record which state is accessed(only read now, update/delete/insert will auto update epoch), it will update epoch index late
originStorageEpoch map[common.Hash]types.StateEpoch // originStorageEpoch record origin state epoch, prevent frequency epoch update
pendingFutureReviveState map[common.Hash]int // pendingFutureReviveState record empty state in snapshot. it should preftech first, and allow check in updateTrie

// Cache flags.
dirtyCode bool // true if the code was updated
Expand Down Expand Up @@ -120,18 +122,19 @@ func newObject(db *StateDB, address common.Address, acct *types.StateAccount) *s
}

return &stateObject{
db: db,
address: address,
addrHash: crypto.Keccak256Hash(address[:]),
origin: origin,
data: *acct,
sharedOriginStorage: storageMap,
originStorage: make(Storage),
pendingStorage: make(Storage),
dirtyStorage: make(Storage),
pendingReviveState: make(map[string]common.Hash),
pendingAccessedState: make(map[common.Hash]int),
originStorageEpoch: make(map[common.Hash]types.StateEpoch),
db: db,
address: address,
addrHash: crypto.Keccak256Hash(address[:]),
origin: origin,
data: *acct,
sharedOriginStorage: storageMap,
originStorage: make(Storage),
pendingStorage: make(Storage),
dirtyStorage: make(Storage),
pendingReviveState: make(map[string]common.Hash),
pendingAccessedState: make(map[common.Hash]int),
pendingFutureReviveState: make(map[common.Hash]int),
originStorageEpoch: make(map[common.Hash]types.StateEpoch),
}
}

Expand Down Expand Up @@ -264,7 +267,6 @@ func (s *stateObject) GetCommittedState(key common.Hash) common.Hash {
// If no live objects are available, attempt to use snapshots
var (
enc []byte
sv snapshot.SnapValue
err error
value common.Hash
)
Expand All @@ -274,15 +276,13 @@ func (s *stateObject) GetCommittedState(key common.Hash) common.Hash {
// handle state expiry situation
if s.db.EnableExpire() {
var dbError error
sv, err, dbError = s.getExpirySnapStorage(key)
enc, err, dbError = s.getExpirySnapStorage(key)
if dbError != nil {
s.db.setError(fmt.Errorf("state expiry getExpirySnapStorage, contract: %v, key: %v, err: %v", s.address, key, dbError))
return common.Hash{}
}
// if query success, just set val, otherwise request from trie
if err == nil && sv != nil {
value.SetBytes(sv.GetVal())
s.originStorageEpoch[key] = sv.GetEpoch()
if len(enc) > 0 {
value.SetBytes(enc)
}
} else {
enc, err = s.db.snap.Storage(s.addrHash, crypto.Keccak256Hash(key.Bytes()))
Expand All @@ -300,7 +300,7 @@ func (s *stateObject) GetCommittedState(key common.Hash) common.Hash {
}

// If the snapshot is unavailable or reading from it fails, load from the database.
if s.needLoadFromTrie(err, sv) {
if s.db.snap == nil || err != nil {
getCommittedStorageTrieMeter.Mark(1)
start := time.Now()
var tr Trie
Expand Down Expand Up @@ -383,6 +383,17 @@ func (s *stateObject) finalise(prefetch bool) {
slotsToPrefetch = append(slotsToPrefetch, common.CopyBytes(key[:])) // Copy needed for closure
}
}

// try prefetch future revive states
for key := range s.pendingFutureReviveState {
if val, ok := s.dirtyStorage[key]; ok {
if val != s.originStorage[key] {
continue
}
}
slotsToPrefetch = append(slotsToPrefetch, common.CopyBytes(key[:])) // Copy needed for closure
}

if s.db.prefetcher != nil && prefetch && len(slotsToPrefetch) > 0 && s.data.Root != types.EmptyRootHash {
s.db.prefetcher.prefetch(s.addrHash, s.data.Root, s.address, slotsToPrefetch)
}
Expand Down Expand Up @@ -417,6 +428,8 @@ func (s *stateObject) updateTrie() (Trie, error) {
err error
)
if s.db.EnableExpire() {
// if EnableExpire, just use PendingReviveTrie, but prefetcher.trie is useful too, it warms up the db cache.
// and when no state expired or pruned, it will directly use prefetcher.trie too.
tr, err = s.getPendingReviveTrie()
} else {
tr, err = s.getTrie()
Expand Down Expand Up @@ -458,6 +471,23 @@ func (s *stateObject) updateTrie() (Trie, error) {
wg.Add(1)
go func() {
defer wg.Done()
if s.db.EnableExpire() {
// revive state first, to figure out if there have conflict expiry path or local revive
for key := range s.pendingFutureReviveState {
_, err = tr.GetStorage(s.address, key.Bytes())
if err == nil {
continue
}
enErr, ok := err.(*trie.ExpiredNodeError)
if !ok {
s.db.setError(fmt.Errorf("state object pendingFutureReviveState err, contract: %v, key: %v, err: %v", s.address, key, err))
continue
}
if _, err = fetchExpiredStorageFromRemote(s.db.fullStateDB, s.db.originalRoot, s.address, s.data.Root, tr, enErr.Path, key); err != nil {
s.db.setError(fmt.Errorf("state object pendingFutureReviveState fetchExpiredStorageFromRemote err, contract: %v, key: %v, err: %v", s.address, key, err))
}
}
}
for key, value := range dirtyStorage {
if len(value) == 0 {
if err := tr.DeleteStorage(s.address, key[:]); err != nil {
Expand Down Expand Up @@ -535,6 +565,9 @@ func (s *stateObject) updateTrie() (Trie, error) {
if len(s.pendingAccessedState) > 0 {
s.pendingAccessedState = make(map[common.Hash]int)
}
if len(s.pendingFutureReviveState) > 0 {
s.pendingFutureReviveState = make(map[common.Hash]int)
}
if len(s.originStorageEpoch) > 0 {
s.originStorageEpoch = make(map[common.Hash]types.StateEpoch)
}
Expand Down Expand Up @@ -682,6 +715,10 @@ func (s *stateObject) deepCopy(db *StateDB) *stateObject {
for k, v := range s.pendingAccessedState {
obj.pendingAccessedState[k] = v
}
obj.pendingFutureReviveState = make(map[common.Hash]int, len(s.pendingFutureReviveState))
for k, v := range s.pendingFutureReviveState {
obj.pendingFutureReviveState[k] = v
}
obj.originStorageEpoch = make(map[common.Hash]types.StateEpoch, len(s.originStorageEpoch))
for k, v := range s.originStorageEpoch {
obj.originStorageEpoch[k] = v
Expand Down Expand Up @@ -784,6 +821,16 @@ func (s *stateObject) accessState(key common.Hash) {
}
}

// futureReviveState record future revive state, it will load on prefetcher or updateTrie
func (s *stateObject) futureReviveState(key common.Hash) {
if !s.db.EnableExpire() {
return
}

count := s.pendingFutureReviveState[key]
s.pendingFutureReviveState[key] = count + 1
}

// TODO(0xbundler): add hash key cache later
func (s *stateObject) queryFromReviveState(reviveState map[string]common.Hash, key common.Hash) (common.Hash, bool) {
khash := crypto.HashData(s.db.hasher, key[:])
Expand All @@ -809,14 +856,10 @@ func (s *stateObject) fetchExpiredFromRemote(prefixKey []byte, key common.Hash,
}

kvs, err := fetchExpiredStorageFromRemote(s.db.fullStateDB, s.db.originalRoot, s.address, s.data.Root, tr, prefixKey, key)

if err != nil {
// Keys may not exist in the trie, so they can't be revived.
if _, ok := err.(*trie.KeyDoesNotExistError); ok {
return nil, nil
}
return nil, fmt.Errorf("revive storage trie failed, err: %v", err)
return nil, err
}

for k, v := range kvs {
s.pendingReviveState[k] = common.BytesToHash(v)
}
Expand All @@ -826,7 +869,7 @@ func (s *stateObject) fetchExpiredFromRemote(prefixKey []byte, key common.Hash,
return val.Bytes(), nil
}

func (s *stateObject) getExpirySnapStorage(key common.Hash) (snapshot.SnapValue, error, error) {
func (s *stateObject) getExpirySnapStorage(key common.Hash) ([]byte, error, error) {
enc, err := s.db.snap.Storage(s.addrHash, crypto.Keccak256Hash(key.Bytes()))
if err != nil {
return nil, err, nil
Expand All @@ -840,23 +883,28 @@ func (s *stateObject) getExpirySnapStorage(key common.Hash) (snapshot.SnapValue,
}

if val == nil {
// record access empty kv, try touch in updateTrie for duplication
s.futureReviveState(key)
return nil, nil, nil
}

s.originStorageEpoch[key] = val.GetEpoch()
if !types.EpochExpired(val.GetEpoch(), s.db.epoch) {
return val, nil, nil
return val.GetVal(), nil, nil
}

// TODO(0xbundler): if found value not been pruned, just return
//if len(val.GetVal()) > 0 {
// return val, nil, nil
//}
// if found value not been pruned, just return, local revive later
if EnableLocalRevive && len(val.GetVal()) > 0 {
s.futureReviveState(key)
log.Debug("getExpirySnapStorage GetVal", "addr", s.address, "key", key, "val", hex.EncodeToString(val.GetVal()))
return val.GetVal(), nil, nil
}

// handle from remoteDB, if got err just setError, just return to revert in consensus version.
// handle from remoteDB, if got err just setError, or return to revert in consensus version.
valRaw, err := s.fetchExpiredFromRemote(nil, key, true)
if err != nil {
return nil, nil, err
}

return snapshot.NewValueWithEpoch(val.GetEpoch(), valRaw), nil, nil
return valRaw, nil, nil
}
2 changes: 1 addition & 1 deletion internal/ethapi/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -818,7 +818,7 @@ func (s *BlockChainAPI) GetStorageReviveProof(ctx context.Context, stateRoot com

var proof proofList
prefixKey := prefixKeys[i]
if err := storageTrie.ProvePath(crypto.Keccak256(key.Bytes()), prefixKey, &proof); err != nil {
if err := storageTrie.ProveByPath(crypto.Keccak256(key.Bytes()), prefixKey, &proof); err != nil {
return nil, err
}
storageProof[i] = types.ReviveStorageProof{
Expand Down
22 changes: 13 additions & 9 deletions light/trie.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,6 @@ type odrTrie struct {
trie *trie.Trie
}

func (t *odrTrie) ReviveTrie(key []byte, proof []*trie.MPTProofNub) ([]*trie.MPTProofNub, error) {
panic("not implemented")
}

func (t *odrTrie) GetStorage(_ common.Address, key []byte) ([]byte, error) {
key = crypto.Keccak256(key)
var enc []byte
Expand Down Expand Up @@ -224,10 +220,6 @@ func (t *odrTrie) Prove(key []byte, proofDb ethdb.KeyValueWriter) error {
return errors.New("not implemented, needs client/server interface split")
}

func (t *odrTrie) ProvePath(key []byte, path []byte, proofDb ethdb.KeyValueWriter) error {
return errors.New("not implemented, needs client/server interface split")
}

func (t *odrTrie) Epoch() types.StateEpoch {
return types.StateEpoch0
}
Expand Down Expand Up @@ -259,10 +251,22 @@ func (t *odrTrie) do(key []byte, fn func() error) error {
}
}

func (db *odrTrie) NoTries() bool {
func (t *odrTrie) NoTries() bool {
return false
}

func (t *odrTrie) ProveByPath(key []byte, path []byte, proofDb ethdb.KeyValueWriter) error {
return errors.New("not implemented, needs client/server interface split")
}

func (t *odrTrie) TryRevive(key []byte, proof []*trie.MPTProofNub) ([]*trie.MPTProofNub, error) {
return nil, errors.New("not implemented, needs client/server interface split")
}

func (t *odrTrie) TryLocalRevive(addr common.Address, key []byte) ([]byte, error) {
return nil, errors.New("not implemented, needs client/server interface split")
}

type nodeIterator struct {
trie.NodeIterator
t *odrTrie
Expand Down
10 changes: 7 additions & 3 deletions trie/dummy_trie.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,18 @@ func (t *EmptyTrie) GetStorageAndUpdateEpoch(addr common.Address, key []byte) ([
return nil, nil
}

func (t *EmptyTrie) ProvePath(key []byte, path []byte, proofDb ethdb.KeyValueWriter) error {
func (t *EmptyTrie) SetEpoch(epoch types.StateEpoch) {
}

func (t *EmptyTrie) ProveByPath(key []byte, path []byte, proofDb ethdb.KeyValueWriter) error {
return nil
}

func (t *EmptyTrie) SetEpoch(epoch types.StateEpoch) {
func (t *EmptyTrie) TryRevive(key []byte, proof []*MPTProofNub) ([]*MPTProofNub, error) {
return nil, nil
}

func (t *EmptyTrie) ReviveTrie(key []byte, proof []*MPTProofNub) ([]*MPTProofNub, error) {
func (t *EmptyTrie) TryLocalRevive(addr common.Address, key []byte) ([]byte, error) {
return nil, nil
}

Expand Down
Loading

0 comments on commit dbf9e33

Please sign in to comment.