Skip to content

Commit

Permalink
Backport p2p fixes to v1.3 (#5500)
Browse files Browse the repository at this point in the history
Need to backport urgent p2p-related changes to v1.3
  • Loading branch information
ivan4th authored Jan 25, 2024
1 parent 1cd8627 commit f348c1e
Show file tree
Hide file tree
Showing 15 changed files with 697 additions and 95 deletions.
8 changes: 6 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ See [RELEASE](./RELEASE.md) for workflow instructions.
* [#5498](https://github.com/spacemeshos/go-spacemesh/pull/5498)
Reduce the default number of CPU cores that are used for verifying incoming ATXs to half of the available cores.

* [#5500](https://github.com/spacemeshos/go-spacemesh/pull/5500)
Make fetch request timeout configurable.
Add separate metric for failed p2p server requests.
Adjust deadline during long reads and writes, reducing "i/o deadline exceeded" errors.
Make routing discovery more configurable and less spammy by default.

## Release v1.3.5

### Improvements
Expand All @@ -38,8 +44,6 @@ See [RELEASE](./RELEASE.md) for workflow instructions.

### Improvements

* [#5464](https://github.com/spacemeshos/go-spacemesh/pull/5464) Make fetch request timeout configurable.

* [#5467](https://github.com/spacemeshos/go-spacemesh/pull/5467)
Fix a bug that could cause ATX sync to stall because of exhausted limit of concurrent requests for dependencies.
Fetching dependencies of an ATX is not limited anymore.
Expand Down
9 changes: 6 additions & 3 deletions fetch/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,10 @@ func (s ServerConfig) toOpts() []server.Opt {
type Config struct {
BatchTimeout time.Duration
BatchSize, QueueSize int
RequestTimeout time.Duration
MaxRetriesForRequest int
EnableServesMetrics bool `mapstructure:"servers-metrics"`
RequestTimeout time.Duration `mapstructure:"request-timeout"`
RequestHardTimeout time.Duration `mapstructure:"request-hard-timeout"`
EnableServerMetrics bool `mapstructure:"servers-metrics"`
ServersConfig map[string]ServerConfig `mapstructure:"servers"`
PeersRateThreshold float64 `mapstructure:"peers-rate-threshold"`
GetAtxsConcurrency int64 // The maximum number of concurrent requests to get ATXs.
Expand All @@ -127,6 +128,7 @@ func DefaultConfig() Config {
QueueSize: 20,
BatchSize: 10,
RequestTimeout: 25 * time.Second,
RequestHardTimeout: 5 * time.Minute,
MaxRetriesForRequest: 100,
ServersConfig: map[string]ServerConfig{
// serves 1 MB of data
Expand Down Expand Up @@ -287,9 +289,10 @@ func (f *Fetch) registerServer(
) {
opts := []server.Opt{
server.WithTimeout(f.cfg.RequestTimeout),
server.WithHardTimeout(f.cfg.RequestHardTimeout),
server.WithLog(f.logger),
}
if f.cfg.EnableServesMetrics {
if f.cfg.EnableServerMetrics {
opts = append(opts, server.WithMetrics())
}
opts = append(opts, f.cfg.getServerConfig(protocol).toOpts()...)
Expand Down
4 changes: 3 additions & 1 deletion fetch/fetch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func createFetch(tb testing.TB) *testFetch {
BatchSize: 3,
QueueSize: 1000,
RequestTimeout: 3 * time.Second,
RequestHardTimeout: 10 * time.Second,
MaxRetriesForRequest: 3,
GetAtxsConcurrency: DefaultConfig().GetAtxsConcurrency,
}
Expand Down Expand Up @@ -335,7 +336,8 @@ func TestFetch_PeerDroppedWhenMessageResultsInValidationReject(t *testing.T) {
BatchTimeout: 2000 * time.Minute, // make sure we never hit the batch timeout
BatchSize: 3,
QueueSize: 1000,
RequestTimeout: time.Second * time.Duration(3),
RequestTimeout: 3 * time.Second,
RequestHardTimeout: 10 * time.Second,
MaxRetriesForRequest: 3,
}
p2pconf := p2p.DefaultConfig()
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ require (
github.com/libp2p/go-libp2p-pubsub v0.10.0
github.com/libp2p/go-libp2p-record v0.2.0
github.com/libp2p/go-msgio v0.3.0
github.com/libp2p/go-yamux/v4 v4.0.1
github.com/mitchellh/mapstructure v1.5.0
github.com/multiformats/go-multiaddr v0.12.0
github.com/multiformats/go-varint v0.0.7
Expand Down Expand Up @@ -144,7 +145,6 @@ require (
github.com/libp2p/go-nat v0.2.0 // indirect
github.com/libp2p/go-netroute v0.2.1 // indirect
github.com/libp2p/go-reuseport v0.4.0 // indirect
github.com/libp2p/go-yamux/v4 v4.0.1 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd // indirect
Expand Down
5 changes: 3 additions & 2 deletions node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1320,9 +1320,10 @@ func getTestDefaultConfig(tb testing.TB) *config.Config {
cfg.DataDirParent = tmp
cfg.FileLock = filepath.Join(tmp, "LOCK")

cfg.FETCH.RequestTimeout = 10
cfg.FETCH.RequestTimeout = 10 * time.Second
cfg.FETCH.RequestHardTimeout = 20 * time.Second
cfg.FETCH.BatchSize = 5
cfg.FETCH.BatchTimeout = 5
cfg.FETCH.BatchTimeout = 5 * time.Second

cfg.Beacon = beacon.NodeSimUnitTestConfig()

Expand Down
60 changes: 44 additions & 16 deletions p2p/dhtdiscovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ const (
discoveryHighPeersDelay = 10 * time.Second
protocolPrefix = "/spacekad"
ProtocolID = protocolPrefix + "/kad/1.0.0"
findPeersRetryDelay = time.Second
advertiseRetryInterval = time.Second
)

type Opt func(*Discovery)
Expand Down Expand Up @@ -124,9 +122,27 @@ func AdvertiseForPeerDiscovery() Opt {
}
}

func WithAdvertiseInterval(aint time.Duration) Opt {
func WithAdvertiseRetryDelay(value time.Duration) Opt {
return func(d *Discovery) {
d.advertiseInterval = aint
d.advertiseRetryDelay = value
}
}

func WithAdvertiseDelay(value time.Duration) Opt {
return func(d *Discovery) {
d.advertiseDelay = value
}
}

func WithAdvertiseInterval(value time.Duration) Opt {
return func(d *Discovery) {
d.advertiseInterval = value
}
}

func WithFindPeersRetryDelay(value time.Duration) Opt {
return func(d *Discovery) {
d.findPeersRetryDelay = value
}
}

Expand All @@ -138,15 +154,17 @@ type DiscoveryHost interface {

func New(h DiscoveryHost, opts ...Opt) (*Discovery, error) {
d := Discovery{
public: true,
logger: zap.NewNop(),
h: h,
period: 10 * time.Second,
timeout: 30 * time.Second,
bootstrapDuration: 30 * time.Second,
minPeers: 20,
highPeers: 40,
advertiseInterval: time.Minute,
public: true,
logger: zap.NewNop(),
h: h,
period: 10 * time.Second,
timeout: 30 * time.Second,
bootstrapDuration: 30 * time.Second,
minPeers: 20,
highPeers: 40,
advertiseInterval: time.Minute,
advertiseRetryDelay: time.Minute,
findPeersRetryDelay: time.Minute,
}
for _, opt := range opts {
opt(&d)
Expand Down Expand Up @@ -183,7 +201,10 @@ type Discovery struct {
bootstrapDuration time.Duration
minPeers, highPeers int
backup, bootnodes []peer.AddrInfo
advertiseDelay time.Duration
advertiseInterval time.Duration
advertiseRetryDelay time.Duration
findPeersRetryDelay time.Duration
}

func (d *Discovery) FindPeer(ctx context.Context, p peer.ID) (peer.AddrInfo, error) {
Expand Down Expand Up @@ -376,6 +397,13 @@ func (d *Discovery) peerHasTag(p peer.ID) bool {
}

func (d *Discovery) advertiseNS(ctx context.Context, ns string, active func() bool) error {
if d.advertiseDelay != 0 {
select {
case <-ctx.Done():
return nil
case <-time.After(d.advertiseDelay):
}
}
for {
var ttl time.Duration
if active == nil || active() {
Expand All @@ -384,7 +412,7 @@ func (d *Discovery) advertiseNS(ctx context.Context, ns string, active func() bo
ttl, err = d.disc.Advertise(ctx, ns, p2pdisc.TTL(d.advertiseInterval))
if err != nil {
d.logger.Error("failed to re-advertise for discovery", zap.String("ns", ns), zap.Error(err))
ttl = advertiseRetryInterval
ttl = d.advertiseRetryDelay
}
} else {
ttl = d.advertiseInterval
Expand Down Expand Up @@ -465,7 +493,7 @@ func (d *Discovery) findPeersContinuously(ctx context.Context, ns string) <-chan
select {
case <-ctx.Done():
return nil
case <-time.After(findPeersRetryDelay):
case <-time.After(d.findPeersRetryDelay):
}
peerCh = nil
continue
Expand All @@ -481,7 +509,7 @@ func (d *Discovery) findPeersContinuously(ctx context.Context, ns string) <-chan
select {
case <-ctx.Done():
return nil
case <-time.After(findPeersRetryDelay):
case <-time.After(d.findPeersRetryDelay):
}
continue
}
Expand Down
5 changes: 5 additions & 0 deletions p2p/dhtdiscovery/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func TestSanity(t *testing.T) {
Private(),
WithLogger(logger),
WithMode(dht.ModeServer),
WithFindPeersRetryDelay(1*time.Second),
)
require.NoError(t, err)
bootdisc.Start()
Expand Down Expand Up @@ -111,6 +112,10 @@ func TestSanity(t *testing.T) {
EnableRoutingDiscovery(),
AdvertiseForPeerDiscovery(),
WithMode(nodeOpts[i].dhtMode),
WithAdvertiseDelay(10 * time.Millisecond),
WithAdvertiseInterval(1 * time.Minute),
WithAdvertiseRetryDelay(1 * time.Second),
WithFindPeersRetryDelay(1 * time.Second),
}
if nodeOpts[i].lookForRelays {
relayCh := make(chan peer.AddrInfo)
Expand Down
92 changes: 52 additions & 40 deletions p2p/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,18 @@ func DefaultConfig() Config {
PingInterval: time.Second,
EnableTCPTransport: true,
EnableQUICTransport: false,
AdvertiseInterval: time.Minute,
AutoNATServer: AutoNATServer{
// Defaults taken from libp2p
GlobalMax: 30,
PeerMax: 3,
ResetPeriod: time.Minute,
},
DiscoveryTimings: DiscoveryTimings{
AdvertiseDelay: time.Hour,
AdvertiseInterval: time.Hour,
AdvertiseRetryDelay: time.Minute,
FindPeersRetryDelay: time.Minute,
},
}
}

Expand All @@ -112,45 +117,52 @@ type Config struct {
MaxMessageSize int

// see https://lwn.net/Articles/542629/ for reuseport explanation
DisableReusePort bool `mapstructure:"disable-reuseport"`
DisableNatPort bool `mapstructure:"disable-natport"`
DisableConnectionManager bool `mapstructure:"disable-connection-manager"`
DisableResourceManager bool `mapstructure:"disable-resource-manager"`
DisableDHT bool `mapstructure:"disable-dht"`
Flood bool `mapstructure:"flood"`
Listen AddressList `mapstructure:"listen"`
Bootnodes []string `mapstructure:"bootnodes"`
Direct []string `mapstructure:"direct"`
MinPeers int `mapstructure:"min-peers"`
LowPeers int `mapstructure:"low-peers"`
HighPeers int `mapstructure:"high-peers"`
InboundFraction float64 `mapstructure:"inbound-fraction"`
OutboundFraction float64 `mapstructure:"outbound-fraction"`
AutoscalePeers bool `mapstructure:"autoscale-peers"`
AdvertiseAddress AddressList `mapstructure:"advertise-address"`
AcceptQueue int `mapstructure:"p2p-accept-queue"`
Metrics bool `mapstructure:"p2p-metrics"`
Bootnode bool `mapstructure:"p2p-bootnode"`
ForceReachability string `mapstructure:"p2p-reachability"`
ForceDHTServer bool `mapstructure:"force-dht-server"`
EnableHolepunching bool `mapstructure:"p2p-holepunching"`
PrivateNetwork bool `mapstructure:"p2p-private-network"`
RelayServer RelayServer `mapstructure:"relay-server"`
IP4Blocklist []string `mapstructure:"ip4-blocklist"`
IP6Blocklist []string `mapstructure:"ip6-blocklist"`
GossipQueueSize int `mapstructure:"gossip-queue-size"`
GossipValidationThrottle int `mapstructure:"gossip-validation-throttle"`
GossipAtxValidationThrottle int `mapstructure:"gossip-atx-validation-throttle"`
PingPeers []string `mapstructure:"ping-peers"`
PingInterval time.Duration `mapstructure:"ping-interval"`
Relay bool `mapstructure:"relay"`
StaticRelays []string `mapstructure:"static-relays"`
EnableTCPTransport bool `mapstructure:"enable-tcp-transport"`
EnableQUICTransport bool `mapstructure:"enable-quic-transport"`
EnableRoutingDiscovery bool `mapstructure:"enable-routing-discovery"`
RoutingDiscoveryAdvertise bool `mapstructure:"routing-discovery-advertise"`
AdvertiseInterval time.Duration `mapstructure:"advertise-interval"`
AutoNATServer AutoNATServer `mapstructure:"auto-nat-server"`
DisableReusePort bool `mapstructure:"disable-reuseport"`
DisableNatPort bool `mapstructure:"disable-natport"`
DisableConnectionManager bool `mapstructure:"disable-connection-manager"`
DisableResourceManager bool `mapstructure:"disable-resource-manager"`
DisableDHT bool `mapstructure:"disable-dht"`
Flood bool `mapstructure:"flood"`
Listen AddressList `mapstructure:"listen"`
Bootnodes []string `mapstructure:"bootnodes"`
Direct []string `mapstructure:"direct"`
MinPeers int `mapstructure:"min-peers"`
LowPeers int `mapstructure:"low-peers"`
HighPeers int `mapstructure:"high-peers"`
InboundFraction float64 `mapstructure:"inbound-fraction"`
OutboundFraction float64 `mapstructure:"outbound-fraction"`
AutoscalePeers bool `mapstructure:"autoscale-peers"`
AdvertiseAddress AddressList `mapstructure:"advertise-address"`
AcceptQueue int `mapstructure:"p2p-accept-queue"`
Metrics bool `mapstructure:"p2p-metrics"`
Bootnode bool `mapstructure:"p2p-bootnode"`
ForceReachability string `mapstructure:"p2p-reachability"`
ForceDHTServer bool `mapstructure:"force-dht-server"`
EnableHolepunching bool `mapstructure:"p2p-holepunching"`
PrivateNetwork bool `mapstructure:"p2p-private-network"`
RelayServer RelayServer `mapstructure:"relay-server"`
IP4Blocklist []string `mapstructure:"ip4-blocklist"`
IP6Blocklist []string `mapstructure:"ip6-blocklist"`
GossipQueueSize int `mapstructure:"gossip-queue-size"`
GossipValidationThrottle int `mapstructure:"gossip-validation-throttle"`
GossipAtxValidationThrottle int `mapstructure:"gossip-atx-validation-throttle"`
PingPeers []string `mapstructure:"ping-peers"`
PingInterval time.Duration `mapstructure:"ping-interval"`
Relay bool `mapstructure:"relay"`
StaticRelays []string `mapstructure:"static-relays"`
EnableTCPTransport bool `mapstructure:"enable-tcp-transport"`
EnableQUICTransport bool `mapstructure:"enable-quic-transport"`
EnableRoutingDiscovery bool `mapstructure:"enable-routing-discovery"`
RoutingDiscoveryAdvertise bool `mapstructure:"routing-discovery-advertise"`
DiscoveryTimings DiscoveryTimings `mapstructure:"discovery-timings"`
AutoNATServer AutoNATServer `mapstructure:"auto-nat-server"`
}

type DiscoveryTimings struct {
AdvertiseDelay time.Duration `mapstructure:"advertise-delay"`
AdvertiseInterval time.Duration `mapstructure:"advertise-interval"`
AdvertiseRetryDelay time.Duration `mapstructure:"advertise-retry-delay"`
FindPeersRetryDelay time.Duration `mapstructure:"find-peers-retry-delay"`
}

type AutoNATServer struct {
Expand Down
Loading

0 comments on commit f348c1e

Please sign in to comment.