Skip to content

Commit

Permalink
Add a new config to speed up block sync (#244)
Browse files Browse the repository at this point in the history
* Never switch to consensus due to timeout

* Add blocksync peer config to speed up block sync rate

* Fix config parsing

* Add some logs
  • Loading branch information
yzang2019 authored Oct 17, 2024
1 parent e9348a9 commit 29b1ac3
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 11 deletions.
5 changes: 4 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -643,6 +643,9 @@ type P2PConfig struct { //nolint: maligned
// Comma separated list of nodes to keep persistent connections to
PersistentPeers string `mapstructure:"persistent-peers"`

// Comma separated list of nodes for block sync only
BlockSyncPeers string `mapstructure:"blocksync-peers"`

// UPNP port forwarding
UPNP bool `mapstructure:"upnp"`

Expand Down Expand Up @@ -712,7 +715,7 @@ func DefaultP2PConfig() *P2PConfig {
RecvRate: 5120000, // 5 mB/s
PexReactor: true,
AllowDuplicateIP: false,
HandshakeTimeout: 20 * time.Second,
HandshakeTimeout: 5 * time.Second,
DialTimeout: 3 * time.Second,
TestDialFail: false,
QueueType: "simple-priority",
Expand Down
3 changes: 3 additions & 0 deletions config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,9 @@ bootstrap-peers = "{{ .P2P.BootstrapPeers }}"
# Comma separated list of nodes to keep persistent connections to
persistent-peers = "{{ .P2P.PersistentPeers }}"
# Comma separated list of nodes for block sync only
blocksync-peers = "{{ .P2P.BlockSyncPeers }}"
# UPNP port forwarding
upnp = {{ .P2P.UPNP }}
Expand Down
16 changes: 11 additions & 5 deletions internal/blocksync/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,8 @@ eg, L = latency = 0.1s
*/

const (
requestInterval = 10 * time.Millisecond
inactiveSleepInterval = 1 * time.Second
maxTotalRequesters = 600
requestInterval = 100 * time.Millisecond
maxTotalRequesters = 50
maxPeerErrBuffer = 1000
maxPendingRequests = maxTotalRequesters
maxPendingRequestsPerPeer = 20
Expand All @@ -54,7 +53,7 @@ const (
BadBlock RetryReason = "BadBlock"
)

var peerTimeout = 10 * time.Second // not const so we can override with tests
var peerTimeout = 2 * time.Second // not const so we can override with tests

/*
Peers self report their heights when we join the block pool.
Expand Down Expand Up @@ -356,6 +355,12 @@ func (pool *BlockPool) SetPeerRange(peerID types.NodeID, base int64, height int6
pool.mtx.Lock()
defer pool.mtx.Unlock()

blockSyncPeers := pool.peerManager.GetBlockSyncPeers()
if len(blockSyncPeers) > 0 && !blockSyncPeers[peerID] {
pool.logger.Info(fmt.Sprintf("Skip adding peer %s for blocksync, num of blocksync peers: %d, num of pool peers: %d", peerID, len(blockSyncPeers), len(pool.peers)))
return
}

peer := pool.peers[peerID]
if peer != nil {
peer.base = base
Expand All @@ -370,7 +375,7 @@ func (pool *BlockPool) SetPeerRange(peerID types.NodeID, base int64, height int6
logger: pool.logger.With("peer", peerID),
startAt: time.Now(),
}

pool.logger.Info(fmt.Sprintf("Adding peer %s to blocksync pool", peerID))
pool.peers[peerID] = peer
}

Expand Down Expand Up @@ -431,6 +436,7 @@ func (pool *BlockPool) getSortedPeers(peers map[types.NodeID]*bpPeer) []types.No
for peer := range peers {
sortedPeers = append(sortedPeers, peer)
}

// Sort from high to low score
sort.Slice(sortedPeers, func(i, j int) bool {
return pool.peerManager.Score(sortedPeers[i]) > pool.peerManager.Score(sortedPeers[j])
Expand Down
6 changes: 3 additions & 3 deletions internal/blocksync/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -560,10 +560,11 @@ func (r *Reactor) poolRoutine(ctx context.Context, stateSynced bool, blockSyncCh
continue

case r.pool.IsCaughtUp() && r.previousMaxPeerHeight <= r.pool.MaxPeerHeight():
r.logger.Info("switching to consensus reactor", "height", height)
r.logger.Info("switching to consensus reactor after caught up", "height", height)

case time.Since(lastAdvance) > syncTimeout:
r.logger.Error("no progress since last advance", "last_advance", lastAdvance)
continue

default:
r.logger.Info(
Expand Down Expand Up @@ -611,8 +612,7 @@ func (r *Reactor) poolRoutine(ctx context.Context, stateSynced bool, blockSyncCh
// See https://github.com/tendermint/tendermint/pull/8433#discussion_r866790631
panic(fmt.Errorf("peeked first block without extended commit at height %d - possible node store corruption", first.Height))
} else if first == nil || second == nil {
// we need to have fetched two consecutive blocks in order to
// perform blocksync verification
// we need to have fetched two consecutive blocks in order to perform blocksync verification
continue
}

Expand Down
31 changes: 29 additions & 2 deletions internal/p2p/peermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
const (
// retryNever is returned by retryDelay() when retries are disabled.
retryNever time.Duration = math.MaxInt64
// DefaultScore is the default score for a peer during initialization
// DefaultMutableScore is the default score for a peer during initialization
DefaultMutableScore int64 = 10
)

Expand Down Expand Up @@ -101,6 +101,9 @@ type PeerManagerOptions struct {
// Peers to which a connection will be (re)established ignoring any existing limits
UnconditionalPeers []types.NodeID

// Only include those peers for block sync
BlockSyncPeers []types.NodeID

// MaxPeers is the maximum number of peers to track information about, i.e.
// store in the peer store. When exceeded, the lowest-scored unconnected peers
// will be deleted. 0 means no limit.
Expand Down Expand Up @@ -157,6 +160,9 @@ type PeerManagerOptions struct {

// List of node IDs, to which a connection will be (re)established ignoring any existing limits
unconditionalPeers map[types.NodeID]struct{}

// blocksyncPeers provides fast blocksyncPeers lookups.
blocksyncPeers map[types.NodeID]bool
}

// Validate validates the options.
Expand Down Expand Up @@ -217,6 +223,13 @@ func (o *PeerManagerOptions) isPersistent(id types.NodeID) bool {
return o.persistentPeers[id]
}

func (o *PeerManagerOptions) isBlockSync(id types.NodeID) bool {
if o.blocksyncPeers == nil {
panic("isBlockSync() called before optimize()")
}
return o.blocksyncPeers[id]
}

func (o *PeerManagerOptions) isUnconditional(id types.NodeID) bool {
if o.unconditionalPeers == nil {
panic("isUnconditional() called before optimize()")
Expand All @@ -234,6 +247,11 @@ func (o *PeerManagerOptions) optimize() {
o.persistentPeers[p] = true
}

o.blocksyncPeers = make(map[types.NodeID]bool, len(o.BlockSyncPeers))
for _, p := range o.BlockSyncPeers {
o.blocksyncPeers[p] = true
}

o.unconditionalPeers = make(map[types.NodeID]struct{}, len(o.UnconditionalPeers))
for _, p := range o.UnconditionalPeers {
o.unconditionalPeers[p] = struct{}{}
Expand Down Expand Up @@ -367,6 +385,9 @@ func (m *PeerManager) configurePeers() error {
for _, id := range m.options.UnconditionalPeers {
configure[id] = true
}
for _, id := range m.options.BlockSyncPeers {
configure[id] = true
}
for id := range m.options.PeerScores {
configure[id] = true
}
Expand All @@ -384,6 +405,7 @@ func (m *PeerManager) configurePeers() error {
func (m *PeerManager) configurePeer(peer peerInfo) peerInfo {
peer.Persistent = m.options.isPersistent(peer.ID)
peer.Unconditional = m.options.isUnconditional(peer.ID)
peer.BlockSync = m.options.isBlockSync(peer.ID)
peer.FixedScore = m.options.PeerScores[peer.ID]
return peer
}
Expand Down Expand Up @@ -464,6 +486,10 @@ func (m *PeerManager) Add(address NodeAddress) (bool, error) {
return true, nil
}

func (m *PeerManager) GetBlockSyncPeers() map[types.NodeID]bool {
return m.options.blocksyncPeers
}

// PeerRatio returns the ratio of peer addresses stored to the maximum size.
func (m *PeerManager) PeerRatio() float64 {
m.mtx.Lock()
Expand Down Expand Up @@ -1318,6 +1344,7 @@ type peerInfo struct {
// These fields are ephemeral, i.e. not persisted to the database.
Persistent bool
Unconditional bool
BlockSync bool
Seed bool
Height int64
FixedScore PeerScore // mainly for tests
Expand Down Expand Up @@ -1388,7 +1415,7 @@ func (p *peerInfo) Score() PeerScore {
}

score := p.MutableScore
if p.Persistent {
if p.Persistent || p.BlockSync {
score = int64(PeerScorePersistent)
}

Expand Down
10 changes: 10 additions & 0 deletions node/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,16 @@ func createPeerManager(
peers = append(peers, address)
}

for _, p := range tmstrings.SplitAndTrimEmpty(cfg.P2P.BlockSyncPeers, ",", " ") {
address, err := p2p.ParseNodeAddress(p)
if err != nil {
return nil, func() error { return nil }, fmt.Errorf("invalid peer address %q: %w", p, err)
}

peers = append(peers, address)
options.BlockSyncPeers = append(options.BlockSyncPeers, address.NodeID)
}

for _, p := range tmstrings.SplitAndTrimEmpty(cfg.P2P.UnconditionalPeerIDs, ",", " ") {
options.UnconditionalPeers = append(options.UnconditionalPeers, types.NodeID(p))
}
Expand Down

0 comments on commit 29b1ac3

Please sign in to comment.