From 812a9ba708ff7be71df3c713ac522595f7d7baa7 Mon Sep 17 00:00:00 2001 From: Darioush Jalali Date: Tue, 19 Jul 2022 07:31:10 -0700 Subject: [PATCH] [AV-1919] addTrieInProgress before segments to the work chan (#860) * addTrieInProgress before segments to the work chan * fix * more explicit fix * rm unneeded file * add DrainAcceptorQueue to test * add CopyBytes for db invariant * nit * use channel as semaphore * Add comments * Remove bad comment --- plugin/evm/syncervm_test.go | 1 + sync/statesync/state_syncer.go | 83 +++++++++++++++---------------- sync/statesync/trie_queue.go | 9 ++++ sync/statesync/trie_segments.go | 10 +++- sync/statesync/trie_sync_tasks.go | 2 +- 5 files changed, 58 insertions(+), 47 deletions(-) diff --git a/plugin/evm/syncervm_test.go b/plugin/evm/syncervm_test.go index 4abd4d7fd2..6498007cf9 100644 --- a/plugin/evm/syncervm_test.go +++ b/plugin/evm/syncervm_test.go @@ -178,6 +178,7 @@ func TestStateSyncToggleEnabledToDisabled(t *testing.T) { if err := syncDisabledVM.blockChain.Snapshots().Verify(lastRoot); err != nil { t.Fatal(err) } + syncDisabledVM.blockChain.DrainAcceptorQueue() // Create a new VM from the same database with state sync enabled. syncReEnabledVM := &VM{} diff --git a/sync/statesync/state_syncer.go b/sync/statesync/state_syncer.go index a99467c535..de56677a8a 100644 --- a/sync/statesync/state_syncer.go +++ b/sync/statesync/state_syncer.go @@ -5,6 +5,7 @@ package statesync import ( "context" + "fmt" "sync" "github.com/ava-labs/coreth/core/state/snapshot" @@ -53,10 +54,10 @@ type stateSync struct { triesInProgress map[common.Hash]*trieToSync // track completion and progress of work - mainTrieDone chan struct{} - trieCompleted chan struct{} - done chan error - stats *trieSyncStats + mainTrieDone chan struct{} + triesInProgressSem chan struct{} + done chan error + stats *trieSyncStats } func NewStateSyncer(config *StateSyncerConfig) (*stateSync, error) { @@ -70,13 +71,16 @@ func NewStateSyncer(config *StateSyncerConfig) (*stateSync, error) { stats: newTrieSyncStats(), triesInProgress: make(map[common.Hash]*trieToSync), + // [triesInProgressSem] is used to keep the number of tries syncing + // less than or equal to [defaultNumThreads]. + triesInProgressSem: make(chan struct{}, defaultNumThreads), + // Each [trieToSync] will have a maximum of [numSegments] segments. // We set the capacity of [segments] such that [defaultNumThreads] // storage tries can sync concurrently. - segments: make(chan syncclient.LeafSyncTask, defaultNumThreads*numStorageTrieSegments), - mainTrieDone: make(chan struct{}), - trieCompleted: make(chan struct{}, 1), - done: make(chan error, 1), + segments: make(chan syncclient.LeafSyncTask, defaultNumThreads*numStorageTrieSegments), + mainTrieDone: make(chan struct{}), + done: make(chan error, 1), } ss.syncer = syncclient.NewCallbackLeafSyncer(config.Client, ss.segments) ss.codeSyncer = newCodeSyncer(CodeSyncerConfig{ @@ -98,14 +102,19 @@ func NewStateSyncer(config *StateSyncerConfig) (*stateSync, error) { return nil, err } ss.addTrieInProgress(ss.root, ss.mainTrie) + ss.mainTrie.startSyncing() // start syncing after tracking the trie as in progress return ss, nil } // onStorageTrieFinished is called after a storage trie finishes syncing. 
 func (t *stateSync) onStorageTrieFinished(root common.Hash) error {
+	<-t.triesInProgressSem // allow another trie to start (release the semaphore)
+	// mark the storage trie as done in trieQueue
+	if err := t.trieQueue.StorageTrieDone(root); err != nil {
+		return err
+	}
 	// track the completion of this storage trie
-	t.removeTrieInProgress(root)
-	return t.trieQueue.StorageTrieDone(root)
+	return t.removeTrieInProgress(root)
 }
 
 // onMainTrieFinished is called after the main trie finishes syncing.
@@ -119,10 +128,9 @@ func (t *stateSync) onMainTrieFinished() error {
 	}
 	t.stats.setTriesRemaining(numStorageTries)
 
-	// mark the main trie done and check if the sync operation is complete
+	// mark the main trie done
 	close(t.mainTrieDone)
-	t.removeTrieInProgress(t.root)
-	return nil
+	return t.removeTrieInProgress(t.root)
 }
 
 // onSyncComplete is called after the account trie and
@@ -147,22 +155,7 @@ func (t *stateSync) storageTrieProducer(ctx context.Context) error {
 		return ctx.Err()
 	}
 
-	// Each storage trie may split up to [numStorageTrieSegments],
-	// so we keep the number of storage tries in progress limited
-	// to ensure the segments channel has capacity when segments
-	// are created.
-	maxStorageTriesInProgess := cap(t.segments) / numStorageTrieSegments
 	for {
-
-		for t.countTriesInProgress() >= maxStorageTriesInProgess {
-			// wait for a trie to complete to avoid spinning
-			select {
-			case <-t.trieCompleted:
-			case <-ctx.Done():
-				return ctx.Err()
-			}
-		}
-
 		// check ctx here to exit the loop early
 		if err := ctx.Err(); err != nil {
 			return err
@@ -172,15 +165,26 @@ func (t *stateSync) storageTrieProducer(ctx context.Context) error {
 		if err != nil {
 			return err
 		}
-		// it is possible there are no storage tries.
+		// If there are no storage tries, then root will be the empty hash on the first pass.
 		if root != (common.Hash{}) {
+			// acquire the semaphore (to keep the number of tries in progress limited)
+			select {
+			case t.triesInProgressSem <- struct{}{}:
+			case <-ctx.Done():
+				return ctx.Err()
+			}
+
+			// Arbitrarily use the first account for making requests to the server.
+			// Note: getNextTrie guarantees that if a non-empty storage root is returned, then the
+			// slice of account hashes is non-empty.
+			syncAccount := accounts[0]
 			// create a trieToSync for the storage trie and mark it as in progress.
-			syncAccount := accounts[0] // arbitrarily use the first account for making requests to the server
 			storageTrie, err := NewTrieToSync(t, root, syncAccount, NewStorageTrieTask(t, root, accounts))
 			if err != nil {
 				return err
 			}
 			t.addTrieInProgress(root, storageTrie)
+			storageTrie.startSyncing() // start syncing after tracking the trie as in progress
 		}
 		// if there are no more storage tries, close
 		// the task queue and exit the producer.
@@ -232,25 +236,16 @@ func (t *stateSync) addTrieInProgress(root common.Hash, trie *trieToSync) {
 // tries in progress. It returns an error if [root] was not
 // being tracked as in progress, since that indicates the
 // sync has reached an unexpected state.
-func (t *stateSync) removeTrieInProgress(root common.Hash) {
+func (t *stateSync) removeTrieInProgress(root common.Hash) error {
 	t.lock.Lock()
 	defer t.lock.Unlock()
 
 	t.stats.trieDone(root)
-	delete(t.triesInProgress, root)
-
-	select {
-	case t.trieCompleted <- struct{}{}:
-	default:
+	if _, ok := t.triesInProgress[root]; !ok {
+		return fmt.Errorf("removeTrieInProgress for unexpected root: %s", root)
 	}
-}
-
-// countTriesInProgress returns the number of tries in progress.
-func (t *stateSync) countTriesInProgress() int {
-	t.lock.RLock()
-	defer t.lock.RUnlock()
-
-	return len(t.triesInProgress)
+	delete(t.triesInProgress, root)
+	return nil
 }
 
 // onSyncFailure is called if the sync fails; this writes all
diff --git a/sync/statesync/trie_queue.go b/sync/statesync/trie_queue.go
index f069711c78..feb26b2258 100644
--- a/sync/statesync/trie_queue.go
+++ b/sync/statesync/trie_queue.go
@@ -59,6 +59,9 @@ func (t *trieQueue) StorageTrieDone(root common.Hash) error {
 
 // getNextTrie returns the next storage trie to sync, along with a slice
 // of accounts that point to the returned storage trie.
+// The returned bool is true if there are more storage tries to sync and false otherwise.
+// Note: if a non-empty root is returned, getNextTrie guarantees that there will be at least
+// one account hash in the returned slice.
 func (t *trieQueue) getNextTrie() (common.Hash, []common.Hash, bool, error) {
 	it := rawdb.NewSyncStorageTriesIterator(t.db, t.nextStorageRoot)
 	defer it.Release()
@@ -69,16 +72,22 @@ func (t *trieQueue) getNextTrie() (common.Hash, []common.Hash, bool, error) {
 		more bool
 	)
 
+	// Iterate over the keys to find the next storage trie root and all of the account hashes that share that storage root.
 	for it.Next() {
+		// Unpack the state root and account hash from the current key
 		nextRoot, nextAccount := rawdb.UnpackSyncStorageTrieKey(it.Key())
+		// Set the root for the first pass
 		if root == (common.Hash{}) {
 			root = nextRoot
 		}
+		// If the next root is different from the originally set root, then we've iterated over all of the account hashes that
+		// share the same storage trie root. Set more to true, since there is at least one more storage trie.
 		if root != nextRoot {
 			t.nextStorageRoot = nextRoot[:]
 			more = true
 			break
 		}
+		// If we found another account with the same root, add its account hash.
 		accounts = append(accounts, nextAccount)
 	}
 
diff --git a/sync/statesync/trie_segments.go b/sync/statesync/trie_segments.go
index 0c7f17fa03..d0feda61c4 100644
--- a/sync/statesync/trie_segments.go
+++ b/sync/statesync/trie_segments.go
@@ -121,7 +121,7 @@ func (t *trieToSync) loadSegments() error {
 				// don't go past the end of the segment
 				break
 			}
-			lastKey = it.Key()
+			lastKey = common.CopyBytes(it.Key())
 			segment.leafs++
 		}
 		if lastKey != nil {
@@ -129,11 +129,17 @@ func (t *trieToSync) loadSegments() error {
 			segment.pos = lastKey // syncing will start from this key
 		}
 		log.Debug("statesync: loading segment", "segment", segment)
-		t.sync.segments <- segment // this will queue the segment for syncing
 	}
 	return it.Error()
 }
 
+// startSyncing adds the trieToSync's segments to the work queue.
+func (t *trieToSync) startSyncing() {
+	for _, segment := range t.segments {
+		t.sync.segments <- segment // this will queue the segment for syncing
+	}
+}
+
 // addSegment appends a newly created segment specified by [start] and
 // [end] to [t.segments] and returns it.
 // note: addSegment does not take a lock and therefore is called only
diff --git a/sync/statesync/trie_sync_tasks.go b/sync/statesync/trie_sync_tasks.go
index 82c237a14d..9ecb755912 100644
--- a/sync/statesync/trie_sync_tasks.go
+++ b/sync/statesync/trie_sync_tasks.go
@@ -70,7 +70,7 @@ func (m *mainTrieTask) OnLeafs(db ethdb.KeyValueWriter, keys, vals [][]byte) err
 	}
 
 	// persist the account data
-	writeAccountSnapshot(db, common.BytesToHash(key), acc)
+	writeAccountSnapshot(db, accountHash, acc)
 
 	// check if this account has a storage root that we need to fetch
 	if acc.Root != (common.Hash{}) && acc.Root != types.EmptyRootHash {
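---

Two self-contained sketches follow to illustrate the patterns this patch relies on.
They are illustrations only, not code from this repository: identifiers such as
maxTriesInProgress, produce, and the key-a/key-b fixtures are invented for the examples.

First, the buffered channel [triesInProgressSem] acts as a counting semaphore:
sending acquires a slot (blocking once [defaultNumThreads] tries are in flight) and
receiving releases one. The producer selects on ctx.Done() alongside the acquire so
a cancelled sync never deadlocks waiting for a slot. A minimal sketch, assuming a
string stands in for a trie root:

package main

import (
	"context"
	"fmt"
	"sync"
)

const maxTriesInProgress = 4 // stands in for [defaultNumThreads]

// produce mirrors the storage trie producer: acquire a slot before
// starting each trie, aborting promptly if the context is cancelled.
func produce(ctx context.Context, sem chan struct{}, wg *sync.WaitGroup, roots []string) error {
	for _, root := range roots {
		select {
		case sem <- struct{}{}: // acquire: blocks while the buffer is full
		case <-ctx.Done():
			return ctx.Err()
		}
		wg.Add(1)
		go func(root string) {
			defer wg.Done()
			defer func() { <-sem }() // release: lets the producer start another trie
			fmt.Println("synced trie", root)
		}(root)
	}
	return nil
}

func main() {
	sem := make(chan struct{}, maxTriesInProgress) // buffered channel as a counting semaphore
	var wg sync.WaitGroup
	if err := produce(context.Background(), sem, &wg, []string{"r0", "r1", "r2", "r3", "r4", "r5"}); err != nil {
		fmt.Println("producer aborted:", err)
	}
	wg.Wait()
}

Second, the switch from it.Key() to common.CopyBytes(it.Key()) matters because the
ethdb.Iterator contract allows the slice returned by Key() to be invalidated by the
next call to Next(). A key retained across iterations, like [lastKey] in
loadSegments, must therefore be copied. A sketch against go-ethereum's in-memory
database:

package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/rawdb"
)

func main() {
	db := rawdb.NewMemoryDatabase()
	_ = db.Put([]byte("key-a"), []byte{1})
	_ = db.Put([]byte("key-b"), []byte{2})

	it := db.NewIterator(nil, nil)
	defer it.Release()

	var lastKey []byte
	for it.Next() {
		// Copying keeps lastKey valid after the next call to Next(),
		// which may reuse the backing array returned by Key().
		lastKey = common.CopyBytes(it.Key())
	}
	fmt.Printf("last key: %s\n", lastKey)
}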