[core/state] Prefetch Trie Nodes Concurrently #372

Merged Nov 21, 2023 (88 commits)

Changes from 1 commit

Commits (88)
29e0bb0
make prefetcher thread-safe
patrick-ogrady Oct 31, 2023
51e08fa
outline TODOs
patrick-ogrady Oct 31, 2023
9079234
add more TODOs
patrick-ogrady Oct 31, 2023
6e1db61
add more TODOs
patrick-ogrady Oct 31, 2023
e080e51
update miner to use prefetcher
patrick-ogrady Nov 1, 2023
9063e9a
remove Copy2
patrick-ogrady Nov 1, 2023
460d2ab
work on refactoring prefetcher for concurrent use
patrick-ogrady Nov 1, 2023
f2b97bf
keep on keeping on
patrick-ogrady Nov 1, 2023
5cbf77e
spec multi-trie
patrick-ogrady Nov 1, 2023
48bdd09
limit prefetching across all subfetchers
patrick-ogrady Nov 1, 2023
2df8bf4
making progress
patrick-ogrady Nov 1, 2023
a83b012
add functionality to wait
patrick-ogrady Nov 1, 2023
4a936ee
remove addressed TODOs
patrick-ogrady Nov 1, 2023
9844502
rename sm
patrick-ogrady Nov 1, 2023
42334cc
more cleanup
patrick-ogrady Nov 1, 2023
4889438
move term to mt
patrick-ogrady Nov 1, 2023
c4bc74a
more moving around
patrick-ogrady Nov 1, 2023
820e667
move loop into prefetcher
patrick-ogrady Nov 1, 2023
8f8cae9
more cleanup
patrick-ogrady Nov 1, 2023
c9c5131
outline todo for using a single queue
patrick-ogrady Nov 1, 2023
8d88eaf
working through single channel
patrick-ogrady Nov 1, 2023
dc5bc42
continuing refactor
patrick-ogrady Nov 1, 2023
023298e
make sure to return copy
patrick-ogrady Nov 1, 2023
77dad32
change name to trie orchestrator
patrick-ogrady Nov 1, 2023
bd6abd6
more progress on revamp
patrick-ogrady Nov 1, 2023
bdcc7ce
skip prefetch if to is nil
patrick-ogrady Nov 1, 2023
cddcab0
start task processing
patrick-ogrady Nov 1, 2023
96fa231
remove unnecessary changes
patrick-ogrady Nov 1, 2023
06a8b7a
add more stop behavior
patrick-ogrady Nov 1, 2023
7d92dbf
add comment for miner prefetcher
patrick-ogrady Nov 1, 2023
d98355b
add comments for const
patrick-ogrady Nov 2, 2023
3c6d1dc
tests passing locally
patrick-ogrady Nov 2, 2023
134d105
cleanup usage of wg
patrick-ogrady Nov 2, 2023
1d33c99
track time spent waiting on fetcher
patrick-ogrady Nov 2, 2023
5a910dd
track skips
patrick-ogrady Nov 2, 2023
506e762
nit on prefetcher
patrick-ogrady Nov 2, 2023
7664a16
don't clear state channel
patrick-ogrady Nov 2, 2023
4048bb6
cleanup restore outstanding requests
patrick-ogrady Nov 2, 2023
5bf07f1
add debugging logs
patrick-ogrady Nov 2, 2023
c208d41
remove verbose logging
patrick-ogrady Nov 2, 2023
101e0ec
ensure inner loop uses different variables
patrick-ogrady Nov 2, 2023
c385a40
clean up var naming
patrick-ogrady Nov 2, 2023
4e103de
replace panics with logs
patrick-ogrady Nov 2, 2023
807402c
set target task size as fixed
patrick-ogrady Nov 2, 2023
fd14e92
remove loop checks
patrick-ogrady Nov 2, 2023
fb5d8ff
clearer metrics tracking
patrick-ogrady Nov 2, 2023
00e5d62
use bounded workers
patrick-ogrady Nov 2, 2023
88fe25e
use cancelable context
patrick-ogrady Nov 2, 2023
6177b8d
copy meter
patrick-ogrady Nov 2, 2023
e6ba6d4
add doneLock
patrick-ogrady Nov 3, 2023
35746e4
add more comments
patrick-ogrady Nov 3, 2023
ca71b83
simplify bounded workers
patrick-ogrady Nov 3, 2023
6155f8c
track outstanding work
patrick-ogrady Nov 3, 2023
1e0b7b6
Stop -> Wait
patrick-ogrady Nov 3, 2023
3a38102
allow copies up to maxConcurrentReads
patrick-ogrady Nov 3, 2023
b35eea6
add largest load metric
patrick-ogrady Nov 3, 2023
0c33e40
lint
patrick-ogrady Nov 3, 2023
5b9590d
respect metrics enabled
patrick-ogrady Nov 3, 2023
85132f9
track num fetchers
patrick-ogrady Nov 3, 2023
3c13bb2
build works
patrick-ogrady Nov 3, 2023
6a383ea
fix prefetcher test
patrick-ogrady Nov 3, 2023
947cad8
don't clear pending tasks when waiting
patrick-ogrady Nov 3, 2023
cc3f8db
cleanup trie prefetcher scale up
patrick-ogrady Nov 3, 2023
c35669f
update tests with parallelism
patrick-ogrady Nov 3, 2023
82fd0ab
use spawner instead of spawn
patrick-ogrady Nov 3, 2023
dcfd8f2
fix more tests
patrick-ogrady Nov 3, 2023
c37f8f4
fix shutdown stall
patrick-ogrady Nov 3, 2023
e20cd79
update default parallelism to 48
patrick-ogrady Nov 3, 2023
27c58f8
spawn trie per key
patrick-ogrady Nov 3, 2023
7a607a9
change prefetcher parallelism to 16
patrick-ogrady Nov 18, 2023
5dad6a6
Merge pull request #394 from ava-labs/prefetch-tries-aggr-cons
patrick-ogrady Nov 20, 2023
267b9ce
add TODO
patrick-ogrady Nov 20, 2023
10c1846
remove context from bounded workers
patrick-ogrady Nov 20, 2023
bbdf849
remove context from bounded workers
patrick-ogrady Nov 20, 2023
5d6b265
remove delivery metrics
patrick-ogrady Nov 20, 2023
5fa0e9d
handle shutdown in wait
patrick-ogrady Nov 20, 2023
9360317
remove extra locking
patrick-ogrady Nov 20, 2023
097ef5b
rename var
patrick-ogrady Nov 20, 2023
4c6b3d3
update comments around shutdown path
patrick-ogrady Nov 20, 2023
a297573
reorder channel vars
patrick-ogrady Nov 20, 2023
ddb0d63
use processingTasks instead of outstandingRequests
patrick-ogrady Nov 20, 2023
a99d1a4
fix comment on panic
patrick-ogrady Nov 20, 2023
0ff7c10
Merge pull request #398 from ava-labs/prefetch-tries-nits
patrick-ogrady Nov 20, 2023
69fa652
Merge branch 'master' into prefetch-tries
patrick-ogrady Nov 20, 2023
4fc35d9
ensure idle workers are fed first
patrick-ogrady Nov 21, 2023
0209e1c
try live copy first
patrick-ogrady Nov 21, 2023
374fc18
remove unnecessary waitgroup
patrick-ogrady Nov 21, 2023
346f239
honor stop
patrick-ogrady Nov 21, 2023
continuing refactor
patrick-ogrady committed Nov 1, 2023

Verified: this commit was signed with the committer's verified signature (patrick-ogrady, Patrick O'Grady); the key has since expired.
commit dc5bc42bdf507cf56f2f344f4c8a73ab95926eae
247 changes: 83 additions & 164 deletions core/state/trie_prefetcher.go
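Before the diff: this commit moves the prefetcher away from per-subfetcher worker goroutines and a semaphore request limiter, toward a shared task queue fed with closures plus a channel that acts as a pool of lock-guarded trie copies (the copiableTrie type below). The following is a minimal, self-contained sketch of that coordination pattern only; fakeTrie, maxConcurrency, and the buffer sizes are illustrative stand-ins, not code from the PR.

package main

import (
	"fmt"
	"sync"
)

// fakeTrie stands in for state.Trie; only the access pattern matters here.
type fakeTrie struct{ id int }

func (t *fakeTrie) Get(key []byte) { /* pretend to walk the trie */ }

// copiableTrie pairs a trie with a mutex so each copy is used by at most
// one task at a time (mirrors the PR's copiableTrie).
type copiableTrie struct {
	t *fakeTrie
	l sync.Mutex
}

func main() {
	const maxConcurrency = 4 // stand-in for subfetcherMaxConcurrency

	// Shared task queue consumed by a bounded worker pool (the PR routes
	// these closures through p.taskQueue).
	taskQueue := make(chan func(), 64)
	var workers sync.WaitGroup
	for i := 0; i < maxConcurrency; i++ {
		workers.Add(1)
		go func() {
			defer workers.Done()
			for task := range taskQueue {
				task()
			}
		}()
	}

	// Pool of trie copies handed out over a channel; a task takes a copy,
	// uses it under its lock, and hands it back when done.
	copyChan := make(chan *copiableTrie, maxConcurrency)
	copyChan <- &copiableTrie{t: &fakeTrie{id: 0}}

	keys := [][]byte{[]byte("a"), []byte("b"), []byte("c"), []byte("d")}
	var pending sync.WaitGroup
	for _, key := range keys {
		ct := <-copyChan // block until some trie copy is free
		key := key
		pending.Add(1)
		taskQueue <- func() {
			defer pending.Done()
			ct.l.Lock()
			ct.t.Get(key)
			ct.l.Unlock()
			copyChan <- ct // hand the copy back for the next task
		}
	}
	pending.Wait()
	close(taskQueue)
	workers.Wait()
	fmt.Println("prefetched", len(keys), "keys")
}

The real code additionally chunks each scheduled batch across the available copies and scales the number of copies up on demand, as the diff below shows.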
@@ -27,7 +27,6 @@
package state

import (
"context"
"sync"

"github.com/ava-labs/coreth/metrics"
@@ -131,11 +130,17 @@ func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePre
// close iterates over all the subfetchers, aborts any that were left spinning
// and reports the stats to the metrics subsystem.
func (p *triePrefetcher) close() {
// TODO: stop workers, if they were spawned
// If the prefetcher is an inactive one, bail out
if p.fetches != nil {
return
}

for _, fetcher := range p.fetchers {
fetcher.abort() // safe to do multiple times
// Stop all workers
close(p.stopWorkers)
<-p.workersTerm

// Collect stats from all fetchers
for _, fetcher := range p.fetchers {
if metrics.Enabled {
if fetcher.root == p.root {
p.accountLoadMeter.Mark(int64(len(fetcher.seen)))
@@ -156,6 +161,7 @@ func (p *triePrefetcher) close() {
}
}
}

// Clear out all fetchers (will crash on a second call, deliberate)
p.fetchers = nil
}
@@ -204,12 +210,12 @@ func (p *triePrefetcher) prefetch(owner common.Hash, root common.Hash, addr comm
if p.fetches != nil {
return
}

// Active fetcher, schedule the retrievals
id := p.trieID(owner, root)
fetcher := p.fetchers[id]
if fetcher == nil {
// TODO: use single [p] instead of each
fetcher = newSubfetcher(p, p.db, p.requestLimiter, p.root, owner, root, addr)
fetcher = newSubfetcher(p, owner, root, addr)
p.fetchers[id] = fetcher
}
fetcher.schedule(keys)
@@ -228,18 +234,19 @@ func (p *triePrefetcher) trie(owner common.Hash, root common.Hash) Trie {
}
return p.db.CopyTrie(trie)
}

// Otherwise the prefetcher is active, bail if no trie was prefetched for this root
fetcher := p.fetchers[id]
if fetcher == nil {
p.deliveryRequestMissMeter.Mark(1)
return nil
}

// Wait for the fetcher to finish, if it exists.
// Wait for the fetcher to finish, if it exists (this will prevent any future tasks from
// being enqueued).
fetcher.wait()

// Return a copy of one of the prefetched tries (this is still backed
// by a node cache).
// Return a copy of one of the prefetched tries
trie := fetcher.peek()
if trie == nil {
p.deliveryWaitMissMeter.Mark(1)
@@ -274,11 +281,8 @@ type subfetcher struct {
root common.Hash // Root hash of the trie to prefetch
addr common.Address // Address of the account that the trie belongs to

mt *multiTrie
requestLimiter *semaphore.Weighted

stop chan struct{} // Channel to interrupt processing
stopOnce sync.Once
mt *multiTrie
outstandingRequests sync.WaitGroup

seen map[string]struct{} // Tracks the entries already loaded
dups int // Number of duplicate preload tasks
@@ -287,17 +291,15 @@ type subfetcher struct {

// newSubfetcher creates a goroutine to prefetch state items belonging to a
// particular root hash.
func newSubfetcher(p *triePrefetcher, db Database, requestLimiter *semaphore.Weighted, state common.Hash, owner common.Hash, root common.Hash, addr common.Address) *subfetcher {
func newSubfetcher(p *triePrefetcher, owner common.Hash, root common.Hash, addr common.Address) *subfetcher {
sf := &subfetcher{
p: p,
requestLimiter: requestLimiter,
db: db,
state: state,
owner: owner,
root: root,
addr: addr,
stop: make(chan struct{}),
seen: make(map[string]struct{}),
p: p,
db: p.db,
state: p.root,
owner: owner,
root: root,
addr: addr,
seen: make(map[string]struct{}),
}
sf.mt = newMultiTrie(sf)
return sf
@@ -318,6 +320,7 @@ func (sf *subfetcher) schedule(keys [][]byte) {
sf.seen[sk] = struct{}{}
tasks = append(tasks, key)
}
sf.outstandingRequests.Add(len(tasks))
sf.mt.enqueueTasks(tasks)
}

@@ -332,42 +335,27 @@ func (sf *subfetcher) peek() Trie {

func (sf *subfetcher) wait() {
if sf.mt == nil {
// Unable to open trie
return
}
sf.mt.Wait()

// TODO: handle case where work aborted
sf.outstandingRequests.Wait()
}

// abort interrupts the subfetcher immediately. It is safe to call abort multiple
// times but it is not thread safe.
func (sf *subfetcher) abort() {
if sf.mt == nil {
// If a multiTrie was never created, we can exit right away because
// we never started a loop.
return
}
sf.stopOnce.Do(func() {
close(sf.stop)
})
// TODO: term must include loop
<-sf.mt.term
type copiableTrie struct {
t Trie
l sync.Mutex
}

// multiTrie is not thread-safe.
type multiTrie struct {
sf *subfetcher

term chan struct{} // Channel to signal interruption

base Trie
baseLock sync.Mutex
base *copiableTrie

workers int
wg sync.WaitGroup

taskChunks chan [][]byte
tasks chan []byte

closeTasks sync.Once
copies int
copyChan chan *copiableTrie
}

func newMultiTrie(sf *subfetcher) *multiTrie {
@@ -390,139 +378,70 @@ func newMultiTrie(sf *subfetcher) *multiTrie {
}
}

// Start primary fetcher
// Create initial trie copy
ct := &copiableTrie{t: base}
mt := &multiTrie{
sf: sf,
term: make(chan struct{}),
base: base,
taskChunks: make(chan [][]byte, 1024), // TODO: make a const, should never block?
tasks: make(chan []byte),
sf: sf,
base: ct,

copies: 1,
copyChan: make(chan *copiableTrie, subfetcherMaxConcurrency),
}
mt.wg.Add(2)
go func() {
mt.wg.Wait()
close(mt.term)
}()
go mt.handleTasks()
mt.workers++
go mt.work(true)
mt.copyChan <- ct
return mt
}

func (mt *multiTrie) copyBase() Trie {
mt.baseLock.Lock()
defer mt.baseLock.Unlock()
mt.base.l.Lock()
defer mt.base.l.Unlock()

return mt.sf.db.CopyTrie(mt.base)
return mt.sf.db.CopyTrie(mt.base.t)
}

func (mt *multiTrie) enqueueTasks(tasks [][]byte) {
if len(tasks) == 0 {
lt := len(tasks)
if lt == 0 {
return
}
mt.taskChunks <- tasks
}

// loop waits for new tasks to be scheduled and keeps loading them until it runs
// out of tasks or its underlying trie is retrieved for committing.
func (mt *multiTrie) handleTasks() {
defer mt.wg.Done()

for {
select {
case <-mt.sf.stop:
// Termination is requested, abort and leave remaining tasks
return
case tasks, ok := <-mt.taskChunks:
if !ok {
return
}
// We don't do this in [enqueueTasks] because it can block.
// Determine if we need to spawn more workers
mt.addWorkers(len(tasks))

// Enqueue work
for _, key := range tasks {
select {
case mt.tasks <- key:
case <-mt.sf.stop:
return
}
}
// Create more copies if we have a lot of work
tasksPerWorker := lt / mt.copies
if tasksPerWorker > targetTasksPerWorker && mt.copies < subfetcherMaxConcurrency {
extraWork := (tasksPerWorker - targetTasksPerWorker) * mt.copies
newWorkers := extraWork / targetTasksPerWorker
for i := 0; i < newWorkers && mt.copies+1 <= subfetcherMaxConcurrency; i++ {
mt.copies++
mt.copyChan <- &copiableTrie{t: mt.copyBase()}
}
}
}
workSize := lt / mt.copies

func (mt *multiTrie) work(base bool) {
defer mt.wg.Done()
// Enqueue more work as soon as trie copies are available
current := 0
for current < lt {
var theseTasks [][]byte
if len(tasks[current:]) < workSize {
theseTasks = tasks[current:]
} else {

// Create reference to Trie used for prefetching
var t Trie
if base {
t = mt.base
} else {
t = mt.copyBase()
}

// Wait for prefetching requests or for tasks to be done
for {
select {
case <-mt.sf.stop:
return
case task, ok := <-mt.tasks:
// Exit because there are no more tasks to do.
if !ok {
return
}

// Ensure we don't perform more than the permitted reads concurrently
_ = mt.sf.requestLimiter.Acquire(context.TODO(), 1)
defer mt.sf.requestLimiter.Release(1)

// No termination request yet, prefetch the next entry
//
// TODO: save trie in each goroutine that is run concurrently rather than
// creating a new one for each key.
var err error
if len(task) == common.AddressLength {
_, err = t.GetAccount(common.BytesToAddress(task))
} else {
_, err = t.GetStorage(mt.sf.addr, task)
}
if err != nil {
log.Error("Trie prefetcher failed fetching", "root", mt.sf.root, "err", err)
}
}
}
}

// addWorkers determines if more workers should be spawned to process
// [tasks].
func (mt *multiTrie) addWorkers(tasks int) {
tasksPerWorker := tasks / mt.workers
if tasksPerWorker > targetTasksPerWorker {
extraWork := (tasksPerWorker - targetTasksPerWorker) * mt.workers
newWorkers := extraWork / targetTasksPerWorker
for i := 0; i < newWorkers && mt.workers+1 <= subfetcherMaxConcurrency; i++ {
mt.wg.Add(1)
mt.workers++
go mt.work(false)
t := <-mt.copyChan
mt.sf.p.taskQueue <- func() {
// TODO: add option to abort here

for _, task := range theseTasks {
t.l.Lock()
var err error
if len(task) == common.AddressLength {
_, err = t.t.GetAccount(common.BytesToAddress(task))
} else {
_, err = t.t.GetStorage(mt.sf.addr, task)
}
t.l.Unlock()
if err != nil {
log.Error("Trie prefetcher failed fetching", "root", mt.sf.root, "err", err)
}
}
}
}
}

func (mt *multiTrie) Wait() {
// Return if already terminated (it is ok if didn't complete)
select {
case <-mt.term:
return
default:
}

// Otherwise, wait for shutdown
mt.closeTasks.Do(func() {
close(mt.tasks)
close(mt.taskChunks)
})
<-mt.term
}
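The scale-up heuristic in enqueueTasks above divides the incoming batch among the existing copies and, if each copy would handle more than targetTasksPerWorker keys, mints additional copies until the per-copy load approaches the target or subfetcherMaxConcurrency is reached. A standalone sketch of that arithmetic follows; the constant values are assumed for illustration, since the diff references targetTasksPerWorker and subfetcherMaxConcurrency but does not show their definitions.

package main

import "fmt"

const (
	targetTasksPerWorker = 32 // assumed value, for illustration only
	maxConcurrency       = 16 // stand-in for subfetcherMaxConcurrency
)

// copiesAfterEnqueue returns how many trie copies exist after enqueueing
// `tasks` new keys when `copies` copies already exist, following the same
// arithmetic as enqueueTasks.
func copiesAfterEnqueue(tasks, copies int) int {
	tasksPerWorker := tasks / copies
	if tasksPerWorker > targetTasksPerWorker && copies < maxConcurrency {
		extraWork := (tasksPerWorker - targetTasksPerWorker) * copies
		newWorkers := extraWork / targetTasksPerWorker
		for i := 0; i < newWorkers && copies+1 <= maxConcurrency; i++ {
			copies++
		}
	}
	return copies
}

func main() {
	// 256 tasks across 2 copies => 128 tasks per copy, well above the
	// target of 32, so (128-32)*2/32 = 6 extra copies are created.
	fmt.Println(copiesAfterEnqueue(256, 2)) // prints 8
}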