Skip to content

Commit

Permalink
Merge branch 'bnb-chain:develop' into develop
Browse files Browse the repository at this point in the history
  • Loading branch information
krish-nr authored Feb 18, 2024
2 parents b6e0968 + 8377b0e commit aff69d2
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 22 deletions.
25 changes: 9 additions & 16 deletions core/state/pruner/pruner.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,24 +274,17 @@ func (p *Pruner) Prune(root common.Hash) error {
if stateBloomRoot != (common.Hash{}) {
return RecoverPruning(p.config.Datadir, p.db, p.config.Cachedir, p.triesInMemory)
}
// If the target state root is not specified, use the HEAD-(n-1) as the
// If the target state root is not specified, use the HEAD as the
// target. The reason for picking it is:
// - in most of the normal cases, the related state is available
// - the probability of this layer being reorg is very low
// - At opBNB, the possibility of reorg is very small, we don't need to roll back the block height,
// we just need to wait for the block height to be finalized during pruning.
// - Rolling back the block height currently causes the node to get stuck after startup.
// Instead of fixing the complex logic that causes the node to get stuck,
// we choose a more gentle way to solve the problem.
var layers []snapshot.Snapshot
if root == (common.Hash{}) {
// Retrieve all snapshot layers from the current HEAD.
// In theory there are n difflayers + 1 disk layer present,
// so n diff layers are expected to be returned.
layers = p.snaptree.Snapshots(p.chainHeader.Root, int(p.triesInMemory), true)
if len(layers) != int(p.triesInMemory) {
// Reject if the accumulated diff layers are less than n. It
// means in most of normal cases, there is no associated state
// with bottom-most diff layer.
return fmt.Errorf("snapshot not old enough yet: need %d more blocks", int(p.triesInMemory)-len(layers))
}
// Use the bottom-most diff layer as the target
root = layers[len(layers)-1].Root()
// Use the latest block header root as the target
root = p.chainHeader.Root
}
// Ensure the root is really present. The weak assumption
// is the presence of root can indicate the presence of the
Expand Down Expand Up @@ -327,7 +320,7 @@ func (p *Pruner) Prune(root common.Hash) error {
if len(layers) > 0 {
log.Info("Selecting bottom-most difflayer as the pruning target", "root", root, "height", p.chainHeader.Number.Uint64()-(p.triesInMemory-1))
} else {
log.Info("Selecting user-specified state as the pruning target", "root", root)
log.Info("Selecting user-specified state as the pruning target", "root", root, "height", p.chainHeader.Number.Uint64())
}
}
// Before start the pruning, delete the clean trie cache first.
Expand Down
23 changes: 23 additions & 0 deletions eth/downloader/fetchers_concurrent.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ import (
// to each request. Failing to do so is considered a protocol violation.
var timeoutGracePeriod = 2 * time.Minute

// peersRetryInterval is the retry interval when all peers cannot get the request data.
var peersRetryInterval = 100 * time.Millisecond

// maxRetries is the max retry time for unreserved download task
var maxRetries = 5

// typedQueue is an interface defining the adaptor needed to translate the type
// specific downloader/queue schedulers into the type-agnostic general concurrent
// fetcher algorithm calls.
Expand Down Expand Up @@ -125,6 +131,8 @@ func (d *Downloader) concurrentFetch(queue typedQueue, beaconMode bool) error {

// Prepare the queue and fetch block parts until the block header fetcher's done
finished := false

requestRetried := 0
for {
// Short circuit if we lost all our peers
if d.peers.Len() == 0 && !beaconMode {
Expand Down Expand Up @@ -195,6 +203,10 @@ func (d *Downloader) concurrentFetch(queue typedQueue, beaconMode bool) error {
// to the queue, that is async, and we can do better here by
// immediately pushing the unfulfilled requests.
queue.unreserve(peer.id) // TODO(karalabe): This needs a non-expiration method
//reset progressed
if len(pending) == 0 {
progressed = false
}
continue
}
pending[peer.id] = req
Expand All @@ -212,6 +224,17 @@ func (d *Downloader) concurrentFetch(queue typedQueue, beaconMode bool) error {
if !progressed && !throttled && len(pending) == 0 && len(idles) == d.peers.Len() && queued > 0 && !beaconMode {
return errPeersUnavailable
}
// Retry the unreserved task in next loop
if beaconMode && len(pending) == 0 && queued > 0 && !progressed && !throttled && len(idles) == d.peers.Len() {
log.Warn("All idle peers are not valid for current task, will retry ...")
requestRetried++
if requestRetried > maxRetries {
log.Info("max retry exceeded, cancel request")
return errCanceled
}
time.Sleep(peersRetryInterval)
continue
}
}
// Wait for something to happen
select {
Expand Down
18 changes: 13 additions & 5 deletions eth/downloader/skeleton.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth/etherror"
"github.com/ethereum/go-ethereum/eth/protocols/eth"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
Expand Down Expand Up @@ -73,6 +74,9 @@ var errTerminated = errors.New("terminated")
// with a new header, but it does not link up to the existing sync.
var errReorgDenied = errors.New("non-forced head reorg denied")

// maxBlockNumGapTolerance is the max gap tolerance by peer
var maxBlockNumGapTolerance = uint64(30)

func init() {
// Tuning parameters is nice, but the scratch space must be assignable in
// full to peers. It's a useless cornercase to support a dangling half-group.
Expand Down Expand Up @@ -786,25 +790,29 @@ func (s *skeleton) executeTask(peer *peerConnection, req *headerRequest) {
case len(headers) == 0:
// No headers were delivered, reject the response and reschedule
peer.log.Debug("No headers delivered")
res.Done <- errors.New("no headers delivered")
res.Done <- etherror.ErrNoHeadersDelivered
s.scheduleRevertRequest(req)

case headers[0].Number.Uint64() != req.head:
// Header batch anchored at non-requested number
peer.log.Debug("Invalid header response head", "have", headers[0].Number, "want", req.head)
res.Done <- errors.New("invalid header batch anchor")
if req.head-headers[0].Number.Uint64() < maxBlockNumGapTolerance {
res.Done <- etherror.ErrHeaderBatchAnchorLow
} else {
res.Done <- etherror.ErrInvalidHeaderBatchAnchor
}
s.scheduleRevertRequest(req)

case req.head >= requestHeaders && len(headers) != requestHeaders:
// Invalid number of non-genesis headers delivered, reject the response and reschedule
peer.log.Debug("Invalid non-genesis header count", "have", len(headers), "want", requestHeaders)
res.Done <- errors.New("not enough non-genesis headers delivered")
res.Done <- etherror.ErrNotEnoughNonGenesisHeaders
s.scheduleRevertRequest(req)

case req.head < requestHeaders && uint64(len(headers)) != req.head:
// Invalid number of genesis headers delivered, reject the response and reschedule
peer.log.Debug("Invalid genesis header count", "have", len(headers), "want", headers[0].Number.Uint64())
res.Done <- errors.New("not enough genesis headers delivered")
res.Done <- etherror.ErrNotEnoughGenesisHeaders
s.scheduleRevertRequest(req)

default:
Expand All @@ -813,7 +821,7 @@ func (s *skeleton) executeTask(peer *peerConnection, req *headerRequest) {
for i := 0; i < len(headers)-1; i++ {
if headers[i].ParentHash != headers[i+1].Hash() {
peer.log.Debug("Invalid hash progression", "index", i, "wantparenthash", headers[i].ParentHash, "haveparenthash", headers[i+1].Hash())
res.Done <- errors.New("invalid hash progression")
res.Done <- etherror.ErrInvalidHashProgression
s.scheduleRevertRequest(req)
return
}
Expand Down
12 changes: 12 additions & 0 deletions eth/etherror/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package etherror

import "errors"

var (
ErrNoHeadersDelivered = errors.New("no headers delivered")
ErrInvalidHeaderBatchAnchor = errors.New("invalid header batch anchor")
ErrNotEnoughNonGenesisHeaders = errors.New("not enough non-genesis headers delivered")
ErrNotEnoughGenesisHeaders = errors.New("not enough genesis headers delivered")
ErrInvalidHashProgression = errors.New("invalid hash progression")
ErrHeaderBatchAnchorLow = errors.New("header batch anchor is lower than requested")
)
16 changes: 15 additions & 1 deletion eth/protocols/eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@
package eth

import (
"errors"
"fmt"
"math/big"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth/etherror"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
Expand Down Expand Up @@ -153,7 +155,19 @@ func nodeInfo(chain *core.BlockChain, network uint64) *NodeInfo {
// connection is torn down.
func Handle(backend Backend, peer *Peer) error {
for {
if err := handleMessage(backend, peer); err != nil {
err := handleMessage(backend, peer)
switch {
// TODO: currently no headers not ignored as it may leads to a dead peer not removing as expected
/*
case errors.Is(err, etherror.ErrNoHeadersDelivered):
// ignore no headers delivered
peer.Log().Warn("Message handling failed with no headers")
*/
case errors.Is(err, etherror.ErrHeaderBatchAnchorLow):
// ignore lower header anchor within tolerance
peer.Log().Warn("Message handling failed with lower batch anchor")

case err != nil:
peer.Log().Debug("Message handling failed in `eth`", "err", err)
return err
}
Expand Down

0 comments on commit aff69d2

Please sign in to comment.