Skip to content

Commit

Permalink
Aggressive reconnect (#973)
Browse files Browse the repository at this point in the history
* Reconnect disconnected libp2p peers in 10 seconds, not 5 minutes

* Use direct peering arrangements to form the network backbone, rather than one-off connections that won't retry/reconnect on failure

* placate the linter - although I personally disagree with 'check-shadowing: true' in govet, not allowing shadowed variables leads to complaining on reuse of 'err' inside for loop, which results in less readable code
  • Loading branch information
lukemarsden authored Oct 27, 2022
1 parent 9028956 commit 4512c3f
Showing 1 changed file with 13 additions and 38 deletions.
51 changes: 13 additions & 38 deletions pkg/transport/libp2p/libp2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/multiformats/go-multiaddr"
"github.com/rs/zerolog/log"
Expand Down Expand Up @@ -97,9 +96,22 @@ func NewTransportFromOptions(ctx context.Context,
pubsub.ScoreParameterDecay(2*time.Minute), //nolint:gomnd
pubsub.ScoreParameterDecay(10*time.Minute), //nolint:gomnd
)

pis := []peer.AddrInfo{}
for _, p := range peers {
var pi *peer.AddrInfo
pi, err = peer.AddrInfoFromP2pAddr(p)
if err != nil {
return nil, err
}
pis = append(pis, *pi)
}

ps, err := pubsub.NewGossipSub(
ctx,
h,
pubsub.WithDirectPeers(pis),
pubsub.WithDirectConnectTicks(10), //nolint:gomnd
pubsub.WithFloodPublish(true),
pubsub.WithPeerExchange(true),
pubsub.WithPeerGater(pgParams),
Expand Down Expand Up @@ -177,11 +189,6 @@ func (t *LibP2PTransport) Start(ctx context.Context) error {
return t.Shutdown(ctx)
})

err := t.connectToPeers(ctx)
if err != nil {
return err
}

go t.listenForEvents(ctx)

log.Ctx(ctx).Trace().Msg("Libp2p transport has started")
Expand Down Expand Up @@ -270,38 +277,6 @@ func (t *LibP2PTransport) Decrypt(ctx context.Context, data []byte) ([]byte, err
)
}

/*
libp2p
*/

func (t *LibP2PTransport) connectToPeers(ctx context.Context) error {
ctx, span := system.GetTracer().Start(ctx, "pkg/transport/libp2p.Subscribe")
defer span.End()

if len(t.peers) == 0 {
return nil
}

for _, peerAddress := range t.peers {
// Extract the peer ID from the multiaddr.
info, err := peer.AddrInfoFromP2pAddr(peerAddress)
if err != nil {
return err
}

t.host.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.PermanentAddrTTL)
err = t.host.Connect(ctx, *info)
if err != nil {
return err
}
log.Ctx(ctx).Trace().Msgf("Libp2p transport connected to: %s", peerAddress)
}

return nil
}

/*
pub / sub
Expand Down

0 comments on commit 4512c3f

Please sign in to comment.