eth/downloader: fix peer idleness tracking when restarting statesync #902

Merged: 1 commit, merged on Mar 11, 2025
31 changes: 9 additions & 22 deletions eth/downloader/downloader.go
@@ -323,7 +323,9 @@ func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode
 		return err
 	}
 
-	if errors.Is(err, errInvalidChain) {
+	if errors.Is(err, errInvalidChain) || errors.Is(err, errBadPeer) || errors.Is(err, errTimeout) ||
+		errors.Is(err, errStallingPeer) || errors.Is(err, errEmptyHeaderSet) ||
+		errors.Is(err, errPeersUnavailable) || errors.Is(err, errTooOld) || errors.Is(err, errInvalidAncestor) {
 		log.Warn("Synchronisation failed, dropping peer", "peer", id, "err", err)
 		if d.dropPeer == nil {
 			// The dropPeer method is nil when `--copydb` is used for a local copy.
@@ -334,22 +336,7 @@ func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode
 		}
 		return err
 	}
-
-	switch err {
-	case errTimeout, errBadPeer, errStallingPeer,
-		errEmptyHeaderSet, errPeersUnavailable, errTooOld,
-		errInvalidAncestor:
-		log.Warn("Synchronisation failed, dropping peer", "peer", id, "err", err)
-		if d.dropPeer == nil {
-			// The dropPeer method is nil when `--copydb` is used for a local copy.
-			// Timeouts can occur if e.g. compaction hits at the wrong time, and can be ignored
-			log.Warn("Downloader wants to drop peer, but peerdrop-function is not set", "peer", id)
-		} else {
-			d.dropPeer(id)
-		}
-	default:
-		log.Warn("Synchronisation failed, retrying", "err", err)
-	}
+	log.Warn("Synchronisation failed, retrying", "err", err)
 	return err
 }
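The consolidation above is what makes the rest of this diff safe: once the sentinel errors start arriving wrapped via fmt.Errorf("%w: ...") (see the hunks below), a switch err { case errBadPeer: ... } no longer matches them, because switch and == compare only the outermost error value, while errors.Is walks the wrap chain. A minimal standalone sketch of the difference, using a stand-in errBadPeer sentinel rather than the downloader's own:

package main

import (
	"errors"
	"fmt"
)

// Stand-in for the downloader's sentinel error.
var errBadPeer = errors.New("action from bad peer ignored")

func main() {
	// Wrapping with %w keeps the sentinel reachable in the error chain.
	err := fmt.Errorf("%w: stale headers", errBadPeer)

	fmt.Println(err == errBadPeer)          // false: == sees only the wrapper
	fmt.Println(errors.Is(err, errBadPeer)) // true: Is unwraps to the sentinel
}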

@@ -590,7 +577,7 @@ func (d *Downloader) fetchHeight(p *peerConnection, hash common.Hash) (*types.He
 			headers := packet.(*headerPack).headers
 			if len(headers) != 1 {
 				p.log.Debug("Multiple headers for single request", "headers", len(headers))
-				return nil, errBadPeer
+				return nil, fmt.Errorf("%w: multiple headers (%d) for single request", errBadPeer, len(headers))
 			}
 			head := headers[0]
 			p.log.Debug("Remote head header identified", "number", head.Number, "hash", head.Hash())
@@ -799,7 +786,7 @@ func (d *Downloader) findAncestor(p *peerConnection, remoteHeader *types.Header)
 			headers := packer.(*headerPack).headers
 			if len(headers) != 1 {
 				p.log.Debug("Multiple headers for single request", "headers", len(headers))
-				return 0, errBadPeer
+				return 0, fmt.Errorf("%w: multiple headers (%d) for single request", errBadPeer, len(headers))
 			}
 			arrived = true
 
@@ -823,7 +810,7 @@ func (d *Downloader) findAncestor(p *peerConnection, remoteHeader *types.Header)
 				header := d.lightchain.GetHeaderByHash(h) // Independent of sync mode, header surely exists
 				if header.Number.Uint64() != check {
 					p.log.Debug("Received non requested header", "number", header.Number, "hash", header.Hash(), "request", check)
-					return 0, errBadPeer
+					return 0, fmt.Errorf("%w: non-requested header (%d)", errBadPeer, header.Number)
 				}
 				start = check
 				hash = h
@@ -1008,7 +995,7 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64)
 				case d.headerProcCh <- nil:
 				case <-d.cancelCh:
 				}
-				return errBadPeer
+				return fmt.Errorf("%w: header request timed out", errBadPeer)
 			}
 		}
 	}
@@ -1436,7 +1423,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
 				inserts := d.queue.Schedule(chunk, origin)
 				if len(inserts) != len(chunk) {
 					log.Debug("Stale headers")
-					return errBadPeer
+					return fmt.Errorf("%w: stale headers", errBadPeer)
 				}
 			}
 			headers = headers[limit:]
70 changes: 58 additions & 12 deletions eth/downloader/statesync.go
@@ -63,6 +63,10 @@ func (d *Downloader) syncState(root common.Hash) *stateSync {
 	s := newStateSync(d, root)
 	select {
 	case d.stateSyncStart <- s:
+		// If we tell the statesync to restart with a new root, we also need
+		// to wait for it to actually start -- i.e. once old requests have
+		// timed out or been delivered
+		<-s.started
 	case <-d.quitCh:
 		s.err = errCancelStateFetch
 		close(s.done)
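The <-s.started receive is a start-up handshake: syncState blocks until the new sync's loop has closed started (see the close(s.started) hunk below), so a caller restarting state sync with a new root cannot race ahead while the previous sync's requests are still being spun down. A stripped-down sketch of the pattern, with hypothetical names (worker, started) rather than the downloader's types:

package main

import "fmt"

type worker struct {
	started chan struct{} // closed once the run loop is actually live
	done    chan struct{}
}

func (w *worker) run() {
	close(w.started) // first action: signal liveness to the starter
	// ... the real work loop would follow here ...
	close(w.done)
}

func main() {
	w := &worker{started: make(chan struct{}), done: make(chan struct{})}
	go w.run()
	<-w.started // block until the worker has demonstrably started
	fmt.Println("worker confirmed running")
	<-w.done
}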
Expand Down Expand Up @@ -95,15 +99,9 @@ func (d *Downloader) runStateSync(s *stateSync) *stateSync {
finished []*stateReq // Completed or failed requests
timeout = make(chan *stateReq) // Timed out active requests
)
defer func() {
// Cancel active request timers on exit. Also set peers to idle so they're
// available for the next sync.
for _, req := range active {
req.timer.Stop()
req.peer.SetNodeDataIdle(len(req.items))
}
}()

// Run the state sync.
log.Trace("State sync starting", "root", s.root)
go s.run()
defer s.Cancel()

@@ -126,9 +124,11 @@
 		select {
 		// The stateSync lifecycle:
 		case next := <-d.stateSyncStart:
+			d.spindownStateSync(active, finished, timeout, peerDrop)
 			return next
 
 		case <-s.done:
+			d.spindownStateSync(active, finished, timeout, peerDrop)
 			return nil
 
 		// Send the next finished request to the current sync:
@@ -189,11 +189,9 @@ func (d *Downloader) runStateSync(s *stateSync) *stateSync {
 			// causes valid requests to go missing and sync to get stuck.
 			if old := active[req.peer.id]; old != nil {
 				log.Warn("Busy peer assigned new state fetch", "peer", old.peer.id)
-
-				// Make sure the previous one doesn't get siletly lost
+				// Move the previous request to the finished set
 				old.timer.Stop()
 				old.dropped = true
-
 				finished = append(finished, old)
 			}
 			// Start a timer to notify the sync loop if the peer stalled.
@@ -210,6 +208,46 @@
 	}
 }
 
+// spindownStateSync 'drains' the outstanding requests; some will be delivered and others
+// will time out. This is to ensure that when the next stateSync starts working, all peers
+// are marked as idle and de facto _are_ idle.
+func (d *Downloader) spindownStateSync(active map[string]*stateReq, finished []*stateReq, timeout chan *stateReq, peerDrop chan *peerConnection) {
+	log.Trace("State sync spinning down", "active", len(active), "finished", len(finished))
+
+	for len(active) > 0 {
+		var (
+			req    *stateReq
+			reason string
+		)
+		select {
+		// Handle (drop) incoming state packs:
+		case pack := <-d.stateCh:
+			req = active[pack.PeerId()]
+			reason = "delivered"
+		// Handle dropped peer connections:
+		case p := <-peerDrop:
+			req = active[p.id]
+			reason = "peerdrop"
+		// Handle timed-out requests:
+		case req = <-timeout:
+			reason = "timeout"
+		}
+		if req == nil {
+			continue
+		}
+		req.peer.log.Trace("State peer marked idle (spindown)", "req.items", len(req.items), "reason", reason)
+		req.timer.Stop()
+		delete(active, req.peer.id)
+		req.peer.SetNodeDataIdle(len(req.items))
+	}
+	// The 'finished' set contains deliveries that we were going to pass to processing.
+	// Those are now moot, but we still need to set those peers as idle, which would
+	// otherwise have been done after processing.
+	for _, req := range finished {
+		req.peer.SetNodeDataIdle(len(req.items))
+	}
+}
 
 // stateSync schedules requests for downloading a particular state trie defined
 // by a given state root.
 type stateSync struct {
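The invariant spindownStateSync enforces: every peer that was handed an outstanding request is returned to idle exactly once, whether its request is delivered, times out, or the peer drops; otherwise a restarted sync would see those peers as busy forever and stall. A toy model of that accounting (fakePeer and its parameter-free SetNodeDataIdle are hypothetical simplifications of the downloader's peer type):

package main

import "fmt"

// fakePeer models the per-peer idle flag the downloader tracks.
type fakePeer struct {
	id   string
	idle bool
}

func (p *fakePeer) SetNodeDataIdle() { p.idle = true }

func main() {
	// Peers with in-flight state requests at the moment the sync restarts.
	active := map[string]*fakePeer{
		"a": {id: "a"},
		"b": {id: "b"},
	}
	// Spindown: drain every outstanding request and idle its peer.
	// Skipping this loop would leak both peers as permanently busy.
	for id, p := range active {
		p.SetNodeDataIdle()
		delete(active, id)
	}
	fmt.Println("still busy after spindown:", len(active)) // 0
}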
@@ -222,11 +260,15 @@ type stateSync struct {
 	numUncommitted   int
 	bytesUncommitted int
 
+	started chan struct{} // Started is signalled once the sync loop starts
+
 	deliver    chan *stateReq // Delivery channel multiplexing peer responses
 	cancel     chan struct{}  // Channel to signal a termination request
 	cancelOnce sync.Once      // Ensures cancel only ever gets called once
 	done       chan struct{}  // Channel to signal termination completion
 	err        error          // Any error hit during sync (set before completion)
+
+	root common.Hash
 }
 
 // stateTask represents a single trie node download task, containing a set of
@@ -247,6 +289,8 @@ func newStateSync(d *Downloader, root common.Hash) *stateSync {
 		deliver: make(chan *stateReq),
 		cancel:  make(chan struct{}),
 		done:    make(chan struct{}),
+		started: make(chan struct{}),
+		root:    root,
 	}
 }
 
@@ -277,6 +321,7 @@ func (s *stateSync) Cancel() error {
 // pushed here async. The reason is to decouple processing from data receipt
 // and timeouts.
 func (s *stateSync) loop() error {
+	close(s.started)
 	// Listen for new peer events to assign tasks to them
 	newPeer := make(chan *peerConnection, 1024)
 	peerSub := s.d.peers.SubscribeNewPeers(newPeer)
@@ -326,6 +371,7 @@
 			}
 			// Process all the received blobs and check for stale delivery
 			delivered, err := s.process(req)
+			req.peer.SetNodeDataIdle(delivered)
 			if err != nil {
 				log.Warn("Node data write error", "err", err)
 				return err
@@ -365,7 +411,7 @@ func (s *stateSync) assignTasks() {
 
 		// If the peer was assigned tasks to fetch, send the network request
 		if len(req.items) > 0 {
-			req.peer.log.Trace("Requesting new batch of data", "type", "state", "count", len(req.items))
+			req.peer.log.Trace("Requesting new batch of data", "type", "state", "count", len(req.items), "root", s.root)
 			select {
 			case s.d.trackStateReq <- req:
 				req.peer.FetchNodeData(req.items)