Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(snap): add state expiry support to snap sync #121

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions core/state/state_expiry.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func fetchExpiredStorageFromRemote(meta *stateExpiryMeta, addr common.Address, r
return nil, fmt.Errorf("cannot find any revive proof from remoteDB")
}

return reviveStorageTrie(addr, tr, proofs[0], key)
return ReviveStorageTrie(addr, tr, proofs[0], key)
}

// batchFetchExpiredStorageFromRemote request expired state from remote full state node with a list of keys and prefixes.
Expand Down Expand Up @@ -126,8 +126,8 @@ func batchFetchExpiredFromRemote(expiryMeta *stateExpiryMeta, addr common.Addres
}

for i, proof := range proofs {
// kvs, err := reviveStorageTrie(addr, tr, proof, common.HexToHash(keysStr[i])) // TODO(asyukii): this logically should work but it doesn't because of some reason, will need to investigate
kvs, err := reviveStorageTrie(addr, tr, proof, common.HexToHash(proof.Key))
// kvs, err := ReviveStorageTrie(addr, tr, proof, common.HexToHash(keysStr[i])) // TODO(asyukii): this logically should work but it doesn't because of some reason, will need to investigate
kvs, err := ReviveStorageTrie(addr, tr, proof, common.HexToHash(proof.Key))
if err != nil {
log.Error("reviveStorageTrie failed", "addr", addr, "key", keys[i], "err", err)
continue
Expand All @@ -138,8 +138,8 @@ func batchFetchExpiredFromRemote(expiryMeta *stateExpiryMeta, addr common.Addres
return ret, nil
}

// reviveStorageTrie revive trie's expired state from proof
func reviveStorageTrie(addr common.Address, tr Trie, proof types.ReviveStorageProof, targetKey common.Hash) (map[string][]byte, error) {
// ReviveStorageTrie revive trie's expired state from proof
func ReviveStorageTrie(addr common.Address, tr Trie, proof types.ReviveStorageProof, targetKey common.Hash) (map[string][]byte, error) {
defer func(start time.Time) {
reviveStorageTrieTimer.Update(time.Since(start))
}(time.Now())
Expand Down
29 changes: 29 additions & 0 deletions core/state/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,32 @@ func NewStateSync(root common.Hash, database ethdb.KeyValueReader, onLeaf func(k
syncer = trie.NewSync(root, database, onAccount, scheme)
return syncer
}

func NewStateSyncWithExpiry(root common.Hash, database ethdb.KeyValueReader, onLeaf func(keys [][]byte, leaf []byte) error, scheme string, epoch types.StateEpoch) *trie.Sync {
// Register the storage slot callback if the external callback is specified.
var onSlot func(keys [][]byte, path []byte, leaf []byte, parent common.Hash, parentPath []byte) error
if onLeaf != nil {
onSlot = func(keys [][]byte, path []byte, leaf []byte, parent common.Hash, parentPath []byte) error {
return onLeaf(keys, leaf)
}
}
// Register the account callback to connect the state trie and the storage
// trie belongs to the contract.
var syncer *trie.Sync
onAccount := func(keys [][]byte, path []byte, leaf []byte, parent common.Hash, parentPath []byte) error {
if onLeaf != nil {
if err := onLeaf(keys, leaf); err != nil {
return err
}
}
var obj types.StateAccount
if err := rlp.Decode(bytes.NewReader(leaf), &obj); err != nil {
return err
}
syncer.AddSubTrie(obj.Root, path, parent, parentPath, onSlot)
syncer.AddCodeEntry(common.BytesToHash(obj.CodeHash), path, parent, parentPath)
return nil
}
syncer = trie.NewSyncWithEpoch(root, database, onAccount, scheme, epoch)
return syncer
}
1 change: 1 addition & 0 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
DirectBroadcast: config.DirectBroadcast,
DisablePeerTxBroadcast: config.DisablePeerTxBroadcast,
PeerSet: peers,
EnableStateExpiry: config.StateExpiryCfg.Enable,
}); err != nil {
return nil, err
}
Expand Down
41 changes: 38 additions & 3 deletions eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ type Downloader struct {
SnapSyncer *snap.Syncer // TODO(karalabe): make private! hack for now
stateSyncStart chan *stateSync

enableStateExpiry bool

// Cancellation and termination
cancelPeer string // Identifier of the peer currently being used as the master (cancel on drop)
cancelCh chan struct{} // Channel to cancel mid-flight syncs
Expand Down Expand Up @@ -207,6 +209,10 @@ type BlockChain interface {
// TrieDB retrieves the low level trie database used for interacting
// with trie nodes.
TrieDB() *trie.Database

Config() *params.ChainConfig

StateExpiryConfig() *types.StateExpiryConfig
}

type DownloadOption func(downloader *Downloader) *Downloader
Expand Down Expand Up @@ -235,6 +241,30 @@ func New(stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, lightchai
return dl
}

func NewWithExpiry(stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, lightchain LightChain, enableStateExpiry bool, dropPeer peerDropFn, options ...DownloadOption) *Downloader {
if lightchain == nil {
lightchain = chain
}
dl := &Downloader{
stateDB: stateDb,
mux: mux,
queue: newQueue(blockCacheMaxItems, blockCacheInitialItems),
peers: newPeerSet(),
blockchain: chain,
lightchain: lightchain,
dropPeer: dropPeer,
enableStateExpiry: enableStateExpiry,
headerProcCh: make(chan *headerTask, 1),
quitCh: make(chan struct{}),
SnapSyncer: snap.NewSyncerWithStateExpiry(stateDb, chain.TrieDB().Scheme(), enableStateExpiry),
stateSyncStart: make(chan *stateSync),
syncStartBlock: chain.CurrentSnapBlock().Number.Uint64(),
}

go dl.stateFetcher()
return dl
}

// Progress retrieves the synchronisation boundaries, specifically the origin
// block where synchronisation started at (may have failed/suspended); the block
// or header sync is currently at; and the latest known block which the sync targets.
Expand Down Expand Up @@ -1464,10 +1494,14 @@ func (d *Downloader) importBlockResults(results []*fetchResult) error {
func (d *Downloader) processSnapSyncContent() error {
// Start syncing state of the reported head block. This should get us most of
// the state of the pivot block.
var epoch types.StateEpoch

d.pivotLock.RLock()
sync := d.syncState(d.pivotHeader.Root)
sync := d.syncStateWithEpoch(d.pivotHeader.Root, epoch)
d.pivotLock.RUnlock()

epoch = types.GetStateEpoch(d.blockchain.StateExpiryConfig(), new(big.Int).SetUint64(d.pivotHeader.Number.Uint64()))

defer func() {
// The `sync` object is replaced every time the pivot moves. We need to
// defer close the very last active one, hence the lazy evaluation vs.
Expand Down Expand Up @@ -1516,11 +1550,12 @@ func (d *Downloader) processSnapSyncContent() error {
d.pivotLock.RLock()
pivot := d.pivotHeader
d.pivotLock.RUnlock()
epoch = types.GetStateEpoch(d.blockchain.StateExpiryConfig(), new(big.Int).SetUint64(pivot.Number.Uint64()))

if oldPivot == nil {
if pivot.Root != sync.root {
sync.Cancel()
sync = d.syncState(pivot.Root)
sync = d.syncStateWithEpoch(pivot.Root, epoch)

go closeOnErr(sync)
}
Expand Down Expand Up @@ -1558,7 +1593,7 @@ func (d *Downloader) processSnapSyncContent() error {
// If new pivot block found, cancel old state retrieval and restart
if oldPivot != P {
sync.Cancel()
sync = d.syncState(P.Header.Root)
sync = d.syncStateWithEpoch(P.Header.Root, types.GetStateEpoch(d.blockchain.StateExpiryConfig(), new(big.Int).SetUint64(P.Header.Number.Uint64())))

go closeOnErr(sync)
oldPivot = P
Expand Down
38 changes: 36 additions & 2 deletions eth/downloader/statesync.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"sync"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
)

Expand All @@ -40,6 +41,22 @@ func (d *Downloader) syncState(root common.Hash) *stateSync {
return s
}

func (d *Downloader) syncStateWithEpoch(root common.Hash, epoch types.StateEpoch) *stateSync {
// Create the state sync
s := newStateSyncWithEpoch(d, root, epoch)
select {
case d.stateSyncStart <- s:
// If we tell the statesync to restart with a new root, we also need
// to wait for it to actually also start -- when old requests have timed
// out or been delivered
<-s.started
case <-d.quitCh:
s.err = errCancelStateFetch
close(s.done)
}
return s
}

// stateFetcher manages the active state sync and accepts requests
// on its behalf.
func (d *Downloader) stateFetcher() {
Expand Down Expand Up @@ -77,8 +94,10 @@ func (d *Downloader) runStateSync(s *stateSync) *stateSync {
// stateSync schedules requests for downloading a particular state trie defined
// by a given state root.
type stateSync struct {
d *Downloader // Downloader instance to access and manage current peerset
root common.Hash // State root currently being synced
d *Downloader // Downloader instance to access and manage current peerset
root common.Hash // State root currently being synced
epoch types.StateEpoch
enableStateExpiry bool

started chan struct{} // Started is signalled once the sync loop starts
cancel chan struct{} // Channel to signal a termination request
Expand All @@ -99,11 +118,26 @@ func newStateSync(d *Downloader, root common.Hash) *stateSync {
}
}

func newStateSyncWithEpoch(d *Downloader, root common.Hash, epoch types.StateEpoch) *stateSync {
return &stateSync{
d: d,
root: root,
epoch: epoch,
enableStateExpiry: true,
cancel: make(chan struct{}),
done: make(chan struct{}),
started: make(chan struct{}),
}
}

// run starts the task assignment and response processing loop, blocking until
// it finishes, and finally notifying any goroutines waiting for the loop to
// finish.
func (s *stateSync) run() {
close(s.started)
if s.enableStateExpiry {
s.d.SnapSyncer.UpdateEpoch(s.epoch)
}
s.err = s.d.SnapSyncer.Sync(s.root, s.cancel)
close(s.done)
}
Expand Down
6 changes: 5 additions & 1 deletion eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ type handlerConfig struct {
DirectBroadcast bool
DisablePeerTxBroadcast bool
PeerSet *peerSet
EnableStateExpiry bool
}

type handler struct {
Expand All @@ -134,6 +135,8 @@ type handler struct {
acceptTxs atomic.Bool // Flag whether we're considered synchronised (enables transaction processing)
directBroadcast bool

enableStateExpiry bool

database ethdb.Database
txpool txPool
votepool votePool
Expand Down Expand Up @@ -196,6 +199,7 @@ func newHandler(config *handlerConfig) (*handler, error) {
peersPerIP: make(map[string]int),
requiredBlocks: config.RequiredBlocks,
directBroadcast: config.DirectBroadcast,
enableStateExpiry: config.EnableStateExpiry,
quitSync: make(chan struct{}),
handlerDoneCh: make(chan struct{}),
handlerStartCh: make(chan struct{}),
Expand Down Expand Up @@ -249,7 +253,7 @@ func newHandler(config *handlerConfig) (*handler, error) {
downloadOptions = append(downloadOptions, success)
*/

h.downloader = downloader.New(config.Database, h.eventMux, h.chain, nil, h.removePeer, downloadOptions...)
h.downloader = downloader.NewWithExpiry(config.Database, h.eventMux, h.chain, nil, config.EnableStateExpiry, h.removePeer, downloadOptions...)

// Construct the fetcher (short sync)
validator := func(header *types.Header) error {
Expand Down
54 changes: 49 additions & 5 deletions eth/protocols/snap/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ import (

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/state/snapshot"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/light"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
Expand Down Expand Up @@ -381,13 +384,26 @@ func ServiceGetStorageRangesQuery(chain *core.BlockChain, req *GetStorageRangesP
storage []*StorageData
last common.Hash
abort bool
sv snapshot.SnapValue
hash common.Hash
slot []byte
enc []byte
)
for it.Next() {
if size >= hardLimit {
abort = true
break
}
hash, slot := it.Hash(), common.CopyBytes(it.Slot())
hash, enc = it.Hash(), common.CopyBytes(it.Slot())
if len(enc) > 0 {
sv, err = snapshot.DecodeValueFromRLPBytes(enc)
if err != nil || sv == nil {
log.Warn("Failed to decode storage slot", "err", err)
return nil, nil
}
}

slot = sv.GetVal()

// Track the returned interval for the Merkle proofs
last = hash
Expand Down Expand Up @@ -429,13 +445,23 @@ func ServiceGetStorageRangesQuery(chain *core.BlockChain, req *GetStorageRangesP
}
proof := light.NewNodeSet()
if err := stTrie.Prove(origin[:], proof); err != nil {
log.Warn("Failed to prove storage range", "origin", req.Origin, "err", err)
return nil, nil
if enErr, ok := err.(*trie.ExpiredNodeError); ok {
err := reviveAndGetProof(chain.FullStateDB(), stTrie, req.Root, common.BytesToAddress(account[:]), acc.Root, enErr.Path, origin, proof)
if err != nil {
log.Warn("Failed to prove storage range", "origin", origin, "err", err)
return nil, nil
}
}
}
if last != (common.Hash{}) {
if err := stTrie.Prove(last[:], proof); err != nil {
log.Warn("Failed to prove storage range", "last", last, "err", err)
return nil, nil
if enErr, ok := err.(*trie.ExpiredNodeError); ok {
err := reviveAndGetProof(chain.FullStateDB(), stTrie, req.Root, common.BytesToAddress(account[:]), acc.Root, enErr.Path, last, proof)
if err != nil {
log.Warn("Failed to prove storage range", "origin", origin, "err", err)
return nil, nil
}
}
}
}
for _, blob := range proof.NodeList() {
Expand Down Expand Up @@ -567,6 +593,24 @@ func ServiceGetTrieNodesQuery(chain *core.BlockChain, req *GetTrieNodesPacket, s
return nodes, nil
}

func reviveAndGetProof(fullStateDB ethdb.FullStateDB, tr *trie.StateTrie, stateRoot common.Hash, account common.Address, root common.Hash, prefixKey []byte, key common.Hash, proofDb *light.NodeSet) error {
proofs, err := fullStateDB.GetStorageReviveProof(stateRoot, account, root, []string{common.Bytes2Hex(prefixKey)}, []string{common.Bytes2Hex(key[:])})
if err != nil || len(proofs) == 0 {
return err
}

_, err = state.ReviveStorageTrie(account, tr, proofs[0], key)
if err != nil {
return err
}

if err := tr.Prove(key[:], proofDb); err != nil {
return err
}

return nil
}

// NodeInfo represents a short summary of the `snap` sub-protocol metadata
// known about the host peer.
type NodeInfo struct{}
Expand Down
Loading
Loading