Skip to content

Commit

Permalink
add server rate limit
Browse files Browse the repository at this point in the history
  • Loading branch information
sukunrt committed Aug 12, 2023
1 parent 8ae915d commit 7d524a9
Show file tree
Hide file tree
Showing 7 changed files with 259 additions and 100 deletions.
4 changes: 4 additions & 0 deletions p2p/net/mock/mock_peernet.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,3 +434,7 @@ func (pn *peernet) notifyAll(notification func(f network.Notifiee)) {
func (pn *peernet) ResourceManager() network.ResourceManager {
return &network.NullResourceManager{}
}

func (pn *peernet) CanDial(addr ma.Multiaddr) bool {
return true
}
35 changes: 22 additions & 13 deletions p2p/protocol/autonatv2/autonat.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/protocol/autonatv2/pbv2"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
"golang.org/x/exp/rand"
Expand All @@ -29,19 +30,27 @@ type AutoNAT struct {
allowAllAddrs bool // for testing
}

func New(h host.Host, dialer host.Host) (*AutoNAT, error) {
func New(h host.Host, dialer host.Host, opts ...AutoNATOption) (*AutoNAT, error) {
s := defaultSettings()
for _, o := range opts {
if err := o(s); err != nil {
return nil, err
}
}
sub, err := h.EventBus().Subscribe(new(event.EvtLocalReachabilityChanged))
if err != nil {
return nil, fmt.Errorf("failed to subscribe to event.EvtLocalReachabilityChanged: %w", err)
}
ctx, cancel := context.WithCancel(context.Background())

an := &AutoNAT{
host: h,
ctx: ctx,
cancel: cancel,
sub: sub,
srv: &Server{dialer: dialer, host: h},
cli: NewClient(h),
host: h,
ctx: ctx,
cancel: cancel,
sub: sub,
srv: NewServer(h, dialer, s),
cli: NewClient(h),
allowAllAddrs: s.allowAllAddrs,
}
an.cli.Register()

Expand All @@ -54,7 +63,7 @@ func (an *AutoNAT) background() {
for {
select {
case <-an.ctx.Done():
an.srv.Stop()
an.srv.Disable()
an.wg.Done()
return
case evt := <-an.sub.Out():
Expand All @@ -64,9 +73,9 @@ func (an *AutoNAT) background() {
log.Errorf("Unexpected event %s of type %T", evt, evt)
}
if revt.Reachability == network.ReachabilityPrivate {
an.srv.Stop()
an.srv.Disable()
} else {
an.srv.Start()
an.srv.Enable()
}
}
}
Expand Down Expand Up @@ -119,9 +128,9 @@ func (an *AutoNAT) validPeer() peer.ID {
}

type Result struct {
Addr ma.Multiaddr
Rch network.Reachability
Err error
Addr ma.Multiaddr
Reachability network.Reachability
Status pbv2.DialStatus
}

var (
Expand Down
39 changes: 22 additions & 17 deletions p2p/protocol/autonatv2/autonat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"
"time"

"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
Expand All @@ -19,11 +20,13 @@ import (
"github.com/stretchr/testify/require"
)

func newAutoNAT(t *testing.T) *AutoNAT {
func newAutoNAT(t *testing.T, dialer host.Host, opts ...AutoNATOption) *AutoNAT {
t.Helper()
h := bhost.NewBlankHost(swarmt.GenSwarm(t))
dialer := bhost.NewBlankHost(swarmt.GenSwarm(t))
an, err := New(h, dialer)
if dialer == nil {
dialer = bhost.NewBlankHost(swarmt.GenSwarm(t))
}
an, err := New(h, dialer, opts...)
if err != nil {
t.Error(err)
}
Expand All @@ -45,7 +48,7 @@ func parseAddrs(t *testing.T, msg *pbv2.Message) []ma.Multiaddr {
}

func TestValidPeer(t *testing.T) {
an := newAutoNAT(t)
an := newAutoNAT(t, nil)
require.Equal(t, an.validPeer(), peer.ID(""))
an.host.Peerstore().AddAddr("peer1", ma.StringCast("/ip4/127.0.0.1/tcp/1"), peerstore.PermanentAddrTTL)
an.host.Peerstore().AddAddr("peer2", ma.StringCast("/ip4/127.0.0.1/tcp/2"), peerstore.PermanentAddrTTL)
Expand All @@ -72,15 +75,14 @@ func TestValidPeer(t *testing.T) {
}

func TestAutoNATPrivateAddr(t *testing.T) {
an := newAutoNAT(t)
an := newAutoNAT(t, nil)
res, err := an.CheckReachability(context.Background(), []ma.Multiaddr{ma.StringCast("/ip4/192.168.0.1/udp/10/quic-v1")}, nil)
require.Nil(t, res)
require.NotNil(t, err)
}

func TestClientRequest(t *testing.T) {
an := newAutoNAT(t)
an.allowAllAddrs = true
an := newAutoNAT(t, nil)

addrs := an.host.Addrs()

Expand Down Expand Up @@ -111,8 +113,7 @@ func TestClientRequest(t *testing.T) {
}

func TestClientServerError(t *testing.T) {
an := newAutoNAT(t)
an.allowAllAddrs = true
an := newAutoNAT(t, nil, allowAll)
addrs := an.host.Addrs()

p := bhost.NewBlankHost(swarmt.GenSwarm(t))
Expand All @@ -122,7 +123,10 @@ func TestClientServerError(t *testing.T) {
tests := []struct {
handler func(network.Stream)
}{
{handler: func(s network.Stream) { s.Reset(); done <- true }},
{handler: func(s network.Stream) {
s.Reset()
done <- true
}},
{handler: func(s network.Stream) {
r := pbio.NewDelimitedReader(s, maxMsgSize)
var msg pbv2.Message
Expand Down Expand Up @@ -156,8 +160,7 @@ func TestClientServerError(t *testing.T) {
}

func TestClientDataRequest(t *testing.T) {
an := newAutoNAT(t)
an.allowAllAddrs = true
an := newAutoNAT(t, nil, allowAll)
addrs := an.host.Addrs()

p := bhost.NewBlankHost(swarmt.GenSwarm(t))
Expand Down Expand Up @@ -230,8 +233,7 @@ func TestClientDataRequest(t *testing.T) {
}

func TestClientDialAttempts(t *testing.T) {
an := newAutoNAT(t)
an.allowAllAddrs = true
an := newAutoNAT(t, nil, allowAll)
addrs := an.host.Addrs()

p := bhost.NewBlankHost(swarmt.GenSwarm(t))
Expand All @@ -245,6 +247,9 @@ func TestClientDialAttempts(t *testing.T) {
}{
{
handler: func(s network.Stream) {
r := pbio.NewDelimitedReader(s, maxMsgSize)
var msg pbv2.Message
r.ReadMsg(&msg)
resp := &pbv2.DialResponse{
Status: pbv2.DialResponse_ResponseStatus_OK,
DialStatuses: []pbv2.DialStatus{pbv2.DialStatus_OK},
Expand Down Expand Up @@ -399,13 +404,13 @@ func TestClientDialAttempts(t *testing.T) {
require.NoError(t, err)
if !tc.success {
for i := 0; i < len(res); i++ {
require.Error(t, res[i].Err)
require.Equal(t, res[i].Rch, network.ReachabilityUnknown)
require.NotEqual(t, res[i].Status, pbv2.DialStatus_OK)
require.Equal(t, res[i].Reachability, network.ReachabilityUnknown)
}
} else {
success := false
for i := 0; i < len(res); i++ {
if res[i].Rch == network.ReachabilityPublic {
if res[i].Reachability == network.ReachabilityPublic {
success = true
break
}
Expand Down
11 changes: 5 additions & 6 deletions p2p/protocol/autonatv2/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,27 +144,26 @@ func (ac *Client) newResults(ds []pbv2.DialStatus, highPriorityAddrs []ma.Multia
} else {
addr = lowPriorityAddrs[i-len(highPriorityAddrs)]
}
err := ErrDialNotAttempted
rch := network.ReachabilityUnknown
status := pbv2.DialStatus_SKIPPED
if i < len(ds) {
switch ds[i] {
case pbv2.DialStatus_OK:
if areAddrsConsistent(attempt, addr) {
err = nil
status = pbv2.DialStatus_OK
rch = network.ReachabilityPublic
} else {
err = errors.New("attempt error")
status = pbv2.DialStatus_E_ATTEMPT_ERROR
rch = network.ReachabilityUnknown
}
case pbv2.DialStatus_E_DIAL_ERROR:
err = errors.New("dial failed")
rch = network.ReachabilityPrivate
default:
err = errors.New("other")
status = ds[i]
rch = network.ReachabilityUnknown
}
}
res[i] = Result{Addr: addr, Rch: rch, Err: err}
res[i] = Result{Addr: addr, Reachability: rch, Status: status}
}
return res
}
Expand Down
50 changes: 50 additions & 0 deletions p2p/protocol/autonatv2/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package autonatv2

import "time"

type autoNATSettings struct {
allowAllAddrs bool
serverRPM int
serverRPMPerPeer int
dataRequestPolicy dataRequestPolicyFunc
now func() time.Time
}

func defaultSettings() *autoNATSettings {
return &autoNATSettings{
allowAllAddrs: false,
serverRPM: 20,
serverRPMPerPeer: 2,
dataRequestPolicy: defaultDataRequestPolicy,
now: time.Now,
}
}

type AutoNATOption func(s *autoNATSettings) error

func allowAll(s *autoNATSettings) error {
s.allowAllAddrs = true
return nil
}

func WithServerRateLimit(rpm, rpmPerPeer int) AutoNATOption {
return func(s *autoNATSettings) error {
s.serverRPM = rpm
s.serverRPMPerPeer = rpmPerPeer
return nil
}
}

func WithDataRequestPolicy(drp dataRequestPolicyFunc) AutoNATOption {
return func(s *autoNATSettings) error {
s.dataRequestPolicy = drp
return nil
}
}

func WithNow(now func() time.Time) AutoNATOption {
return func(s *autoNATSettings) error {
s.now = now
return nil
}
}
Loading

0 comments on commit 7d524a9

Please sign in to comment.