Skip to content

Commit

Permalink
feat: libp2p: Lotus stream cleanup (#11993)
Browse files Browse the repository at this point in the history
* set stream deadlines in Lotus

* reduce timeout

* whitelist bootstrappers

* fix tests
  • Loading branch information
aarshkshah1992 authored May 14, 2024
1 parent d0bbb0b commit af31126
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 2 deletions.
1 change: 1 addition & 0 deletions chain/exchange/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ const (
ReadResMinSpeed = 50 << 10
ShufflePeersPrefix = 16
WriteResDeadline = 60 * time.Second
streamReadDeadline = 10 * time.Second
)

// FIXME: Rename. Make private.
Expand Down
4 changes: 4 additions & 0 deletions chain/exchange/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,15 @@ func (s *server) HandleStream(stream inet.Stream) {

defer stream.Close() //nolint:errcheck

_ = stream.SetReadDeadline(time.Now().Add(streamReadDeadline))
var req Request
if err := cborutil.ReadCborRPC(bufio.NewReader(stream), &req); err != nil {
_ = stream.SetReadDeadline(time.Time{})
log.Warnf("failed to read block sync request: %s", err)
return
}
_ = stream.SetReadDeadline(time.Time{})

log.Debugw("block sync request",
"start", req.Head, "len", req.Length)

Expand Down
1 change: 1 addition & 0 deletions cmd/lotus-shed/hello.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ var helloCmd = &cli.Command{

func HandleStream(s inet.Stream) {
var hmsg hello.HelloMessage
_ = s.SetReadDeadline(time.Now().Add(30 * time.Second))
if err := cborutil.ReadCborRPC(s, &hmsg); err != nil {
log.Infow("failed to read hello message, disconnecting", "error", err)
_ = s.Conn().Close()
Expand Down
10 changes: 10 additions & 0 deletions node/hello/hello.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
const ProtocolID = "/fil/hello/1.0.0"

var log = logging.Logger("hello")
var streamDeadline = 10 * time.Second

type HelloMessage struct {
HeaviestTipSet []cid.Cid
Expand Down Expand Up @@ -70,11 +71,15 @@ func NewHelloService(h host.Host, cs *store.ChainStore, syncer *chain.Syncer, co

func (hs *Service) HandleStream(s inet.Stream) {
var hmsg HelloMessage
_ = s.SetReadDeadline(time.Now().Add(streamDeadline))
if err := cborutil.ReadCborRPC(s, &hmsg); err != nil {
_ = s.SetReadDeadline(time.Time{})
log.Infow("failed to read hello message, disconnecting", "error", err)
_ = s.Conn().Close()
return
}
_ = s.SetReadDeadline(time.Time{})

arrived := build.Clock.Now()

log.Debugw("genesis from hello",
Expand All @@ -95,9 +100,11 @@ func (hs *Service) HandleStream(s inet.Stream) {
TArrival: arrived.UnixNano(),
TSent: sent.UnixNano(),
}
_ = s.SetWriteDeadline(time.Now().Add(streamDeadline))
if err := cborutil.WriteCborRPC(s, msg); err != nil {
log.Debugf("error while responding to latency: %v", err)
}
_ = s.SetWriteDeadline(time.Time{})
}()

protos, err := hs.h.Peerstore().GetProtocols(s.Conn().RemotePeer())
Expand Down Expand Up @@ -155,9 +162,12 @@ func (hs *Service) SayHello(ctx context.Context, pid peer.ID) error {
log.Debug("Sending hello message: ", hts.Cids(), hts.Height(), gen.Cid())

t0 := build.Clock.Now()
_ = s.SetWriteDeadline(time.Now().Add(streamDeadline))
if err := cborutil.WriteCborRPC(s, hmsg); err != nil {
_ = s.SetWriteDeadline(time.Time{})
return xerrors.Errorf("writing rpc to peer: %w", err)
}
_ = s.SetWriteDeadline(time.Time{})
if err := s.CloseWrite(); err != nil {
log.Warnw("CloseWrite err", "error", err)
}
Expand Down
21 changes: 19 additions & 2 deletions node/modules/lp2p/rcmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,22 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
ma "github.com/multiformats/go-multiaddr"
madns "github.com/multiformats/go-multiaddr-dns"
"github.com/prometheus/client_golang/prometheus"
"go.opencensus.io/stats"
"go.opencensus.io/tag"
"go.uber.org/fx"

"github.com/filecoin-project/lotus/metrics"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/repo"
)

var rcmgrMetricsOnce sync.Once

func ResourceManager(connMgrHi uint) func(lc fx.Lifecycle, repo repo.LockedRepo) (network.ResourceManager, error) {
return func(lc fx.Lifecycle, repo repo.LockedRepo) (network.ResourceManager, error) {
func ResourceManager(connMgrHi uint) func(lc fx.Lifecycle, repo repo.LockedRepo, bs dtypes.BootstrapPeers) (network.ResourceManager, error) {
return func(lc fx.Lifecycle, repo repo.LockedRepo, bs dtypes.BootstrapPeers) (network.ResourceManager, error) {
isFullNode := repo.RepoType().Type() == "FullNode"
envvar := os.Getenv("LOTUS_RCMGR")
if (isFullNode && envvar == "0") || // only set NullResourceManager if envvar is explicitly "0"
Expand Down Expand Up @@ -133,6 +136,20 @@ func ResourceManager(connMgrHi uint) func(lc fx.Lifecycle, repo repo.LockedRepo)
opts = append(opts, rcmgr.WithTrace(traceFile))
}

resolver := madns.DefaultResolver
var bootstrapperMaddrs []ma.Multiaddr
for _, pi := range bs {
for _, addr := range pi.Addrs {
resolved, err := resolver.Resolve(context.Background(), addr)
if err != nil {
continue
}
bootstrapperMaddrs = append(bootstrapperMaddrs, resolved...)
}
}

opts = append(opts, rcmgr.WithAllowlistedMultiaddrs(bootstrapperMaddrs))

mgr, err := rcmgr.NewResourceManager(limiter, opts...)
if err != nil {
return nil, fmt.Errorf("error creating resource manager: %w", err)
Expand Down

0 comments on commit af31126

Please sign in to comment.