Skip to content

Commit

Permalink
Fix handshake and dial timeout (#247)
Browse files Browse the repository at this point in the history
* Fix handshake and dial timeout

* Add some logs

* Fix bpr requester different error

* fix locking
  • Loading branch information
yzang2019 authored Oct 28, 2024
1 parent e5eddb0 commit 3ee0e7f
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 21 deletions.
41 changes: 22 additions & 19 deletions internal/blocksync/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,14 +320,15 @@ func (pool *BlockPool) AddBlock(peerID types.NodeID, block *types.Block, extComm
return fmt.Errorf("peer sent us a block we didn't expect (peer: %s, current height: %d, block height: %d)", peerID, pool.height, block.Height)
}

if requester.setBlock(block, extCommit, peerID) {
setBlockResult := requester.setBlock(block, extCommit, peerID)
if setBlockResult == 0 {
atomic.AddInt32(&pool.numPending, -1)
peer := pool.peers[peerID]
if peer != nil {
peer.decrPending(blockSize)
}
} else {
err := errors.New("requester is different or block already exists")
} else if setBlockResult < 0 {
err := errors.New("bpr requester peer is different from original peer")
pool.sendError(err, peerID)
return fmt.Errorf("%w (peer: %s, requester: %s, block height: %d)", err, peerID, requester.getPeerID(), block.Height)
}
Expand Down Expand Up @@ -671,24 +672,26 @@ func (bpr *bpRequester) OnStart(ctx context.Context) error {

func (*bpRequester) OnStop() {}

// Returns true if the peer matches and block doesn't already exist.
func (bpr *bpRequester) setBlock(block *types.Block, extCommit *types.ExtendedCommit, peerID types.NodeID) bool {
// Returns 0 if block doesn't already exist.
// Returns -1 if block exist but peers doesn't match.
// Return 1 if block exist and peer matches.
func (bpr *bpRequester) setBlock(block *types.Block, extCommit *types.ExtendedCommit, peerID types.NodeID) int {
bpr.mtx.Lock()
if bpr.block != nil || bpr.peerID != peerID {
bpr.mtx.Unlock()
return false
}
bpr.block = block
if extCommit != nil {
bpr.extCommit = extCommit
}
bpr.mtx.Unlock()

select {
case bpr.gotBlockCh <- struct{}{}:
default:
defer bpr.mtx.Unlock()
if bpr.block == nil {
bpr.block = block
if extCommit != nil {
bpr.extCommit = extCommit
}
select {
case bpr.gotBlockCh <- struct{}{}:
default:
}
return 0
} else if bpr.peerID == peerID {
return 1
}
return true
return -1
}

func (bpr *bpRequester) getBlock() *types.Block {
Expand Down
3 changes: 2 additions & 1 deletion internal/p2p/peermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,7 @@ func (m *PeerManager) TryDialNext() (NodeAddress, error) {

// We now have an eligible address to dial. If we're full but have
// upgrade capacity (as checked above), we find a lower-scored peer
// we can replace and mark it as upgrading so noone else claims it.
// we can replace and mark it as upgrading so no one else claims it.
//
// If we don't find one, there is no point in trying additional
// peers, since they will all have the same or lower score than this
Expand All @@ -567,6 +567,7 @@ func (m *PeerManager) TryDialNext() (NodeAddress, error) {
}

m.dialing[peer.ID] = true
m.logger.Info(fmt.Sprintf("Going to dial peer %s with address %s", peer.ID, addressInfo.Address))
return addressInfo.Address, nil
}
}
Expand Down
5 changes: 4 additions & 1 deletion node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -808,7 +808,10 @@ func loadStateFromDBOrGenesisDocProvider(stateStore sm.Store, genDoc *types.Gene

func getRouterConfig(conf *config.Config, appClient abciclient.Client) p2p.RouterOptions {
opts := p2p.RouterOptions{
QueueType: conf.P2P.QueueType,
QueueType: conf.P2P.QueueType,
DialTimeout: conf.P2P.DialTimeout,
HandshakeTimeout: conf.P2P.HandshakeTimeout,
ResolveTimeout: conf.P2P.HandshakeTimeout,
}

if conf.FilterPeers && appClient != nil {
Expand Down

0 comments on commit 3ee0e7f

Please sign in to comment.