Skip to content

Commit

Permalink
feat: add try duration from last conn
Browse files Browse the repository at this point in the history
  • Loading branch information
cody committed May 28, 2024
1 parent 4a3b040 commit 1ea44e6
Showing 1 changed file with 128 additions and 62 deletions.
190 changes: 128 additions & 62 deletions core/node/peering.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"golang.org/x/sync/errgroup"
"math/rand"
"sync/atomic"
"time"
)

// Peering constructs the peering service and hooks it into fx's lifetime
Expand Down Expand Up @@ -38,74 +39,33 @@ func PeerWith(peers ...peer.AddrInfo) fx.Option {
}

const (
maxNLastConn = 10
maxTryLimit = 1000
maxNLastConn = 10
maxTryLimit = 100
maxTimeDuration = 20 * time.Second
connTimeout = 3 * time.Second
)

// PeerWithLastConn try to connect to last peers
func PeerWithLastConn() fx.Option {
return fx.Invoke(func(host host.Host, cfg *config.Config) {
peerIds := host.Peerstore().PeersWithAddrs()

bootstrap, err := cfg.BootstrapPeers()
if err != nil {
logger.Warn("failed to parse bootstrap peers from config")
}

filter := make(map[peer.ID]bool, len(bootstrap))
for _, id := range bootstrap {
filter[id.ID] = true
}

connection := make(map[peer.ID]bool)
for _, id := range peerIds {
if host.Network().Connectedness(id) == network.Connected || id == host.ID() || filter[id] {
continue
}
connection[id] = true
}

g := errgroup.Group{}
g.SetLimit(maxNLastConn)
needConnect := int32(maxNLastConn)
tryCount := 0

for {
if tryCount >= maxTryLimit {
logger.Infof("max try count limited.")
break
}

if needConnect <= 0 {
break
}
func loadConnPeers(host host.Host, cfg *config.Config) map[peer.ID]bool {
peerIds := host.Peerstore().PeersWithAddrs()

randomSubSet := randomSubsetOfPeers(connection, int(needConnect))
tryCount += len(randomSubSet)
bootstrap, err := cfg.BootstrapPeers()
if err != nil {
logger.Warn("failed to parse bootstrap peers from config")
}

if len(randomSubSet) == 0 {
break
}
filter := make(map[peer.ID]bool, len(bootstrap))
for _, id := range bootstrap {
filter[id.ID] = true
}

for id, _ := range randomSubSet {
connection[id] = false
peerId := id
g.Go(func() error {
if err = host.Connect(context.Background(), host.Peerstore().PeerInfo(peerId)); err != nil {
logger.Debugf("connect to last connection peer %s, error %v", peerId, err)
return nil
}
atomic.AddInt32(&needConnect, -1)
return nil
})
}
err = g.Wait()
if err != nil {
logger.Debugf("connect to last connection error %v", err)
return
}
canConnect := make(map[peer.ID]bool)
for _, id := range peerIds {
if host.Network().Connectedness(id) == network.Connected || id == host.ID() || filter[id] {
continue
}
})
canConnect[id] = true
}
return canConnect
}

func randomSubsetOfPeers(in map[peer.ID]bool, max int) map[peer.ID]bool {
Expand Down Expand Up @@ -135,3 +95,109 @@ func randomSubsetOfPeers(in map[peer.ID]bool, max int) map[peer.ID]bool {

return out
}

func clearTriedPeer(peers map[peer.ID]bool, triedPeer map[peer.ID]bool) map[peer.ID]bool {
for k := range triedPeer {
peers[k] = false
}
return peers
}

func doConcurrentConn(ctx context.Context, host host.Host, peers map[peer.ID]bool) int32 {
if len(peers) < 1 {
return 0
}

g := errgroup.Group{}
g.SetLimit(maxNLastConn)

connected := int32(0)

for id := range peers {
peerId := id
g.Go(func() error {
if err := host.Connect(ctx, host.Peerstore().PeerInfo(peerId)); err != nil {
logger.Debugf("connect to last connection peer %s, error %v", peerId, err)
return nil
}
atomic.AddInt32(&connected, 1)
return nil
})
}
_ = g.Wait()

return connected
}

func tryConn(host host.Host, peers map[peer.ID]bool) {

success := make(chan struct{})
useOut := make(chan struct{})
maxTry := make(chan struct{})
timeout := make(chan struct{})

timer := time.NewTimer(maxTimeDuration)

needPeerCount := maxNLastConn
canTryPeerCount := maxTryLimit

ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, connTimeout)
defer cancel()

go func() {
for {
select {
case <-timer.C:
timeout <- struct{}{}
return
default:
conns := randomSubsetOfPeers(peers, needPeerCount)
peers = clearTriedPeer(peers, conns)

connCount := doConcurrentConn(ctx, host, conns)

needPeerCount -= int(connCount)
canTryPeerCount -= len(conns)

if len(conns) <= 0 {
useOut <- struct{}{}
return
}

if needPeerCount <= 0 {
success <- struct{}{}
return
}

if canTryPeerCount <= 0 {
maxTry <- struct{}{}
return
}
}
}
}()

select {
case <-timeout:
logger.Debugf("connect to last connection timeout")
return
case <-success:
logger.Debugf("connect to last connection success")
return
case <-useOut:
logger.Debugf("connect to last connection use out")
return
case <-maxTry:
logger.Debugf("connect to last connection try limited")
return
}
}

// PeerWithLastConn tryConn to connect to last peers
func PeerWithLastConn() fx.Option {
return fx.Invoke(func(host host.Host, cfg *config.Config) {
peers := loadConnPeers(host, cfg)
tryConn(host, peers)
})
}

0 comments on commit 1ea44e6

Please sign in to comment.