diff --git a/CHANGELOG.md b/CHANGELOG.md index 1bf86540bc..0d4f16a088 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -98,6 +98,8 @@ configuration is as follows: ### Improvements +* [#5564](https://github.com/spacemeshos/go-spacemesh/pull/5564) Use decaying tags for fetch peers. This prevents + libp2p's Connection Manager from breaking sync. * [#5418](https://github.com/spacemeshos/go-spacemesh/pull/5418) Add `grpc-post-listener` to separate post service from `grpc-private-listener` and not require mTLS for the post service. diff --git a/fetch/fetch.go b/fetch/fetch.go index 47eebf9113..32c59dc456 100644 --- a/fetch/fetch.go +++ b/fetch/fetch.go @@ -108,7 +108,8 @@ type Config struct { ServersConfig map[string]ServerConfig `mapstructure:"servers"` PeersRateThreshold float64 `mapstructure:"peers-rate-threshold"` // The maximum number of concurrent requests to get ATXs. - GetAtxsConcurrency int64 `mapstructure:"getatxsconcurrency"` + GetAtxsConcurrency int64 `mapstructure:"getatxsconcurrency"` + DecayingTag server.DecayingTagSpec `mapstructure:"decaying-tag"` } func (c Config) getServerConfig(protocol string) ServerConfig { @@ -151,6 +152,12 @@ func DefaultConfig() Config { }, PeersRateThreshold: 0.02, GetAtxsConcurrency: 100, + DecayingTag: server.DecayingTagSpec{ + Interval: time.Minute, + Inc: 1000, + Dec: 1000, + Cap: 10000, + }, } } @@ -290,6 +297,7 @@ func (f *Fetch) registerServer( server.WithTimeout(f.cfg.RequestTimeout), server.WithHardTimeout(f.cfg.RequestHardTimeout), server.WithLog(f.logger), + server.WithDecayingTag(f.cfg.DecayingTag), } if f.cfg.EnableServerMetrics { opts = append(opts, server.WithMetrics()) diff --git a/p2p/server/interface.go b/p2p/server/interface.go index 87702d4f43..99a527cf32 100644 --- a/p2p/server/interface.go +++ b/p2p/server/interface.go @@ -5,6 +5,7 @@ import ( "io" "time" + "github.com/libp2p/go-libp2p/core/connmgr" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/protocol" @@ -17,6 +18,7 @@ type Host interface { SetStreamHandler(protocol.ID, network.StreamHandler) NewStream(context.Context, peer.ID, ...protocol.ID) (network.Stream, error) Network() network.Network + ConnManager() connmgr.ConnManager } type peerStream interface { diff --git a/p2p/server/mocks/mocks.go b/p2p/server/mocks/mocks.go index d76f0e16f9..7e496f27aa 100644 --- a/p2p/server/mocks/mocks.go +++ b/p2p/server/mocks/mocks.go @@ -13,6 +13,7 @@ import ( reflect "reflect" time "time" + connmgr "github.com/libp2p/go-libp2p/core/connmgr" network "github.com/libp2p/go-libp2p/core/network" peer "github.com/libp2p/go-libp2p/core/peer" protocol "github.com/libp2p/go-libp2p/core/protocol" @@ -42,6 +43,44 @@ func (m *MockHost) EXPECT() *MockHostMockRecorder { return m.recorder } +// ConnManager mocks base method. +func (m *MockHost) ConnManager() connmgr.ConnManager { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ConnManager") + ret0, _ := ret[0].(connmgr.ConnManager) + return ret0 +} + +// ConnManager indicates an expected call of ConnManager. +func (mr *MockHostMockRecorder) ConnManager() *HostConnManagerCall { + mr.mock.ctrl.T.Helper() + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ConnManager", reflect.TypeOf((*MockHost)(nil).ConnManager)) + return &HostConnManagerCall{Call: call} +} + +// HostConnManagerCall wrap *gomock.Call +type HostConnManagerCall struct { + *gomock.Call +} + +// Return rewrite *gomock.Call.Return +func (c *HostConnManagerCall) Return(arg0 connmgr.ConnManager) *HostConnManagerCall { + c.Call = c.Call.Return(arg0) + return c +} + +// Do rewrite *gomock.Call.Do +func (c *HostConnManagerCall) Do(f func() connmgr.ConnManager) *HostConnManagerCall { + c.Call = c.Call.Do(f) + return c +} + +// DoAndReturn rewrite *gomock.Call.DoAndReturn +func (c *HostConnManagerCall) DoAndReturn(f func() connmgr.ConnManager) *HostConnManagerCall { + c.Call = c.Call.DoAndReturn(f) + return c +} + // Network mocks base method. func (m *MockHost) Network() network.Network { m.ctrl.T.Helper() diff --git a/p2p/server/server.go b/p2p/server/server.go index 033e95a431..3ab5ce229a 100644 --- a/p2p/server/server.go +++ b/p2p/server/server.go @@ -9,6 +9,7 @@ import ( "io" "time" + "github.com/libp2p/go-libp2p/core/connmgr" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/protocol" @@ -20,6 +21,13 @@ import ( "github.com/spacemeshos/go-spacemesh/log" ) +type DecayingTagSpec struct { + Interval time.Duration `mapstructure:"interval"` + Inc int `mapstructure:"inc"` + Dec int `mapstructure:"dec"` + Cap int `mapstructure:"cap"` +} + // ErrNotConnected is returned when peer is not connected. var ErrNotConnected = errors.New("peer is not connected") @@ -88,6 +96,12 @@ func WithRequestsPerInterval(n int, interval time.Duration) Opt { } } +func WithDecayingTag(tag DecayingTagSpec) Opt { + return func(s *Server) { + s.decayingTagSpec = &tag + } +} + // Handler is the handler to be defined by the application. type Handler func(context.Context, []byte) ([]byte, error) @@ -110,6 +124,8 @@ type Server struct { queueSize int requestsPerInterval int interval time.Duration + decayingTagSpec *DecayingTagSpec + decayingTag connmgr.DecayingTag metrics *tracker // metrics can be nil @@ -133,6 +149,23 @@ func New(h Host, proto string, handler Handler, opts ...Opt) *Server { for _, opt := range opts { opt(srv) } + + if srv.decayingTagSpec != nil { + decayer, supported := connmgr.SupportsDecay(h.ConnManager()) + if supported { + tag, err := decayer.RegisterDecayingTag( + "server:"+proto, + srv.decayingTagSpec.Interval, + connmgr.DecayFixed(srv.decayingTagSpec.Dec), + connmgr.BumpSumBounded(0, srv.decayingTagSpec.Cap)) + if err != nil { + srv.logger.Error("error registering decaying tag", log.Err(err)) + } else { + srv.decayingTag = tag + } + } + } + return srv } @@ -176,6 +209,9 @@ func (s *Server) Run(ctx context.Context) error { return nil } eg.Go(func() error { + if s.decayingTag != nil { + s.decayingTag.Bump(req.stream.Conn().RemotePeer(), s.decayingTagSpec.Inc) + } ok := s.queueHandler(ctx, req.stream) if s.metrics != nil { s.metrics.serverLatency.Observe(time.Since(req.received).Seconds())