Skip to content

Commit

Permalink
p2p: use decaying tags for fetch peers (#5564)
Browse files Browse the repository at this point in the history
## Motivation

When there is peer churn (the connection manager is enabled by default), fetch can be interrupted by the connection manager dropping connections that are currently being used for fetching. This can happen on both fetch clients and fetch servers, breaking sync.
  • Loading branch information
ivan4th committed Feb 14, 2024
1 parent 737c88d commit f99e31b
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 1 deletion.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ configuration is as follows:

### Improvements

* [#5564](https://github.com/spacemeshos/go-spacemesh/pull/5564) Use decaying tags for fetch peers. This prevents
libp2p's Connection Manager from breaking sync.
* [#5418](https://github.com/spacemeshos/go-spacemesh/pull/5418) Add `grpc-post-listener` to separate post service from
`grpc-private-listener` and not require mTLS for the post service.

Expand Down
10 changes: 9 additions & 1 deletion fetch/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ type Config struct {
ServersConfig map[string]ServerConfig `mapstructure:"servers"`
PeersRateThreshold float64 `mapstructure:"peers-rate-threshold"`
// The maximum number of concurrent requests to get ATXs.
GetAtxsConcurrency int64 `mapstructure:"getatxsconcurrency"`
GetAtxsConcurrency int64 `mapstructure:"getatxsconcurrency"`
DecayingTag server.DecayingTagSpec `mapstructure:"decaying-tag"`
}

func (c Config) getServerConfig(protocol string) ServerConfig {
Expand Down Expand Up @@ -151,6 +152,12 @@ func DefaultConfig() Config {
},
PeersRateThreshold: 0.02,
GetAtxsConcurrency: 100,
DecayingTag: server.DecayingTagSpec{
Interval: time.Minute,
Inc: 1000,
Dec: 1000,
Cap: 10000,
},
}
}

Expand Down Expand Up @@ -290,6 +297,7 @@ func (f *Fetch) registerServer(
server.WithTimeout(f.cfg.RequestTimeout),
server.WithHardTimeout(f.cfg.RequestHardTimeout),
server.WithLog(f.logger),
server.WithDecayingTag(f.cfg.DecayingTag),
}
if f.cfg.EnableServerMetrics {
opts = append(opts, server.WithMetrics())
Expand Down
2 changes: 2 additions & 0 deletions p2p/server/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"io"
"time"

"github.com/libp2p/go-libp2p/core/connmgr"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
Expand All @@ -17,6 +18,7 @@ type Host interface {
SetStreamHandler(protocol.ID, network.StreamHandler)
NewStream(context.Context, peer.ID, ...protocol.ID) (network.Stream, error)
Network() network.Network
ConnManager() connmgr.ConnManager
}

type peerStream interface {
Expand Down
39 changes: 39 additions & 0 deletions p2p/server/mocks/mocks.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

36 changes: 36 additions & 0 deletions p2p/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"io"
"time"

"github.com/libp2p/go-libp2p/core/connmgr"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
Expand All @@ -20,6 +21,13 @@ import (
"github.com/spacemeshos/go-spacemesh/log"
)

// DecayingTagSpec holds the parameters of the decaying connection manager tag
// that a Server registers in New when the WithDecayingTag option is used.
type DecayingTagSpec struct {
// Interval is the decay interval passed to RegisterDecayingTag.
Interval time.Duration `mapstructure:"interval"`
// Inc is the amount by which the remote peer's tag is bumped for each
// request the server picks up for handling.
Inc int `mapstructure:"inc"`
// Dec is the amount passed to connmgr.DecayFixed when the tag is registered.
Dec int `mapstructure:"dec"`
// Cap is the upper bound passed to connmgr.BumpSumBounded when the tag is
// registered; the lower bound is 0.
Cap int `mapstructure:"cap"`
}

// ErrNotConnected is returned when the peer is not connected.
var ErrNotConnected = errors.New("peer is not connected")

Expand Down Expand Up @@ -88,6 +96,12 @@ func WithRequestsPerInterval(n int, interval time.Duration) Opt {
}
}

// WithDecayingTag returns an Opt that stores the given decaying tag spec on
// the Server. New registers a decaying tag from this spec when the host's
// connection manager supports decay.
func WithDecayingTag(tag DecayingTagSpec) Opt {
	// Take the address once; every application of the option shares this copy.
	spec := &tag
	return func(srv *Server) {
		srv.decayingTagSpec = spec
	}
}

// Handler is the request handler to be defined by the application.
// It is called with a context and a byte slice and returns a byte slice and
// an error (presumably the request and response payloads; confirm against
// the server's stream handling code).
type Handler func(context.Context, []byte) ([]byte, error)

Expand All @@ -110,6 +124,8 @@ type Server struct {
queueSize int
requestsPerInterval int
interval time.Duration
decayingTagSpec *DecayingTagSpec
decayingTag connmgr.DecayingTag

metrics *tracker // metrics can be nil

Expand All @@ -133,6 +149,23 @@ func New(h Host, proto string, handler Handler, opts ...Opt) *Server {
for _, opt := range opts {
opt(srv)
}

if srv.decayingTagSpec != nil {
decayer, supported := connmgr.SupportsDecay(h.ConnManager())
if supported {
tag, err := decayer.RegisterDecayingTag(
"server:"+proto,
srv.decayingTagSpec.Interval,
connmgr.DecayFixed(srv.decayingTagSpec.Dec),
connmgr.BumpSumBounded(0, srv.decayingTagSpec.Cap))
if err != nil {
srv.logger.Error("error registering decaying tag", log.Err(err))
} else {
srv.decayingTag = tag
}
}
}

return srv
}

Expand Down Expand Up @@ -176,6 +209,9 @@ func (s *Server) Run(ctx context.Context) error {
return nil
}
eg.Go(func() error {
if s.decayingTag != nil {
s.decayingTag.Bump(req.stream.Conn().RemotePeer(), s.decayingTagSpec.Inc)
}
ok := s.queueHandler(ctx, req.stream)
if s.metrics != nil {
s.metrics.serverLatency.Observe(time.Since(req.received).Seconds())
Expand Down

0 comments on commit f99e31b

Please sign in to comment.