Skip to content

Commit

Permalink
complete poc
Browse files Browse the repository at this point in the history
  • Loading branch information
sukunrt committed Jul 23, 2023
1 parent 27cd718 commit d8c2516
Show file tree
Hide file tree
Showing 7 changed files with 311 additions and 41 deletions.
3 changes: 3 additions & 0 deletions core/network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,9 @@ type Dialer interface {
// Notify/StopNotify register and unregister a notifiee for signals
Notify(Notifiee)
StopNotify(Notifiee)

// CanDial returns whether an address is dialable
CanDial(a ma.Multiaddr) bool
}

// AddrDelay provides an address along with the delay after which the address
Expand Down
4 changes: 2 additions & 2 deletions p2p/net/swarm/swarm_dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ func (s *Swarm) dialNextAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr,
return nil
}

func (s *Swarm) canDial(addr ma.Multiaddr) bool {
func (s *Swarm) CanDial(addr ma.Multiaddr) bool {
t := s.TransportForDialing(addr)
return t != nil && t.CanDial(addr)
}
Expand Down Expand Up @@ -436,7 +436,7 @@ func (s *Swarm) filterKnownUndialables(p peer.ID, addrs []ma.Multiaddr) []ma.Mul
// address

// filter addresses we cannot dial
addrs = ma.FilterAddrs(addrs, s.canDial)
addrs = ma.FilterAddrs(addrs, s.CanDial)

// filter low priority addresses among the addresses we can dial
addrs = filterLowPriorityAddresses(addrs)
Expand Down
3 changes: 2 additions & 1 deletion p2p/protocol/autonatv2/autonat.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
package autonatv2

const protocol = "/libp2p/autonat/2"
const dialProtocol = "/libp2p/autonat/2/dial"
const attemptProtocol = "/libp2p/autonat/2/attempt"
const maxMsgSize = 4096
112 changes: 80 additions & 32 deletions p2p/protocol/autonatv2/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package autonatv2

import (
"context"
"errors"
"fmt"
"sync"
"time"

Expand All @@ -17,29 +19,31 @@ import (

//go:generate protoc --go_out=. --go_opt=Mpb/autonat.proto=./pb pb/autonat.proto

type AutoNAT struct {
var (
ErrNoValidPeers = errors.New("autonat v2: No valid peers")
)

type Client struct {
h host.Host
dialCharge []byte

mu sync.Mutex
attemptChs map[uint64]chan attempt
mu sync.Mutex
attemptQueues map[uint64]chan attempt
}

func (c *AutoNAT) DialRequest(addrs []ma.Multiaddr) (DialResponse, error) {
func (c *Client) DialRequest(addrs []ma.Multiaddr) (DialResponse, error) {
peers := c.validPeers()
for _, p := range peers {
if resp, err := c.tryPeer(p, addrs); err == nil {
return resp, nil
}
if len(peers) == 0 {
return DialResponse{}, ErrNoValidPeers
}
return DialResponse{}, nil
return c.tryPeer(peers[0], addrs)
}

func (c *AutoNAT) validPeers() []peer.ID {
func (c *Client) validPeers() []peer.ID {
peers := c.h.Peerstore().Peers()
idx := 0
for _, p := range c.h.Peerstore().Peers() {
if proto, err := c.h.Peerstore().SupportsProtocols(p, protocol); len(proto) == 0 || err != nil {
if proto, err := c.h.Peerstore().SupportsProtocols(p, dialProtocol); len(proto) == 0 || err != nil {
continue
}
peers[idx] = p
Expand All @@ -50,35 +54,43 @@ func (c *AutoNAT) validPeers() []peer.ID {
return peers
}

func (c *AutoNAT) tryPeer(p peer.ID, addrs []ma.Multiaddr) (DialResponse, error) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
func (c *Client) tryPeer(p peer.ID, addrs []ma.Multiaddr) (DialResponse, error) {
c.h.SetStreamHandler(attemptProtocol, c.handleDialAttempt)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
s, err := c.h.NewStream(ctx, p, protocol)
s, err := c.h.NewStream(ctx, p, dialProtocol)
if err != nil {
return DialResponse{}, err
}
s.SetDeadline(time.Now().Add(1 * time.Minute))
if err != nil {
return DialResponse{}, err
}
nonce := rand.Uint64()
ch := make(chan attempt, 1)
c.mu.Lock()
c.attemptChs[nonce] = ch
c.attemptQueues[nonce] = ch
c.mu.Unlock()

addrsb := make([][]byte, len(addrs))
for i, a := range addrs {
addrsb[i] = a.Bytes()
}
msg := &pb.Message{
Msg: &pb.Message_DialRequest{
DialRequest: &pb.DialRequest{
Addrs: make([][]byte, len(addrs)),
Addrs: addrsb,
Nonce: nonce,
},
},
}
r := pbio.NewDelimitedReader(s, maxMsgSize)
w := pbio.NewDelimitedWriter(s)
if err := w.WriteMsg(msg); err != nil {
return DialResponse{}, err
}

msg.Reset()
r := pbio.NewDelimitedReader(s, maxMsgSize)
if err := r.ReadMsg(msg); err != nil {
return DialResponse{}, err
}
Expand All @@ -90,12 +102,14 @@ func (c *AutoNAT) tryPeer(p peer.ID, addrs []ma.Multiaddr) (DialResponse, error)
if rem > len(c.dialCharge) {
n, err := s.Write(c.dialCharge)
if err != nil {
fmt.Println("done here")
return DialResponse{}, err
}
rem -= n
} else {
n, err := s.Write(c.dialCharge[:rem])
if err != nil {
fmt.Println("done for here")
return DialResponse{}, err
}
rem -= n
Expand All @@ -107,28 +121,62 @@ func (c *AutoNAT) tryPeer(p peer.ID, addrs []ma.Multiaddr) (DialResponse, error)
}
}

select {
case <-ch:
return DialResponse{
Status: msg.GetDialResponse().GetStatus(),
Results: nil,
}, nil
case <-ctx.Done():
return DialResponse{}, ctx.Err()
if msg.GetDialResponse().GetStatus() != pb.DialResponse_ResponseStatus_OK {
return DialResponse{Status: msg.GetDialResponse().GetStatus()}, nil
}

dialSuccess := false
statuses := msg.GetDialResponse().GetDialStatuses()
for _, st := range statuses {
if st == pb.DialResponse_DialStaus_OK {
dialSuccess = true
break
}
}

var attemptLocalAddr ma.Multiaddr
if dialSuccess {
select {
case at := <-ch:
fmt.Println("received nonce")
if at.nonce == nonce {
attemptLocalAddr = at.addr
}
case <-ctx.Done():
}
}
fmt.Println("writing")
results := make([]DialResult, len(addrs))
for i := 0; i < len(addrs); i++ {
if i >= len(statuses) {
results[i] = DialResult{Status: -1}
continue
}
results[i] = DialResult{Status: statuses[i], Addr: addrs[i]}
if results[i].Status == pb.DialResponse_DialStaus_OK {
if attemptLocalAddr != nil {
results[i].LocalAddr = attemptLocalAddr
} else {
results[i].Status = pb.DialResponse_E_ATTEMPT_ERROR
}
}
}
return DialResponse{
Status: msg.GetDialResponse().GetStatus(),
Results: results,
}, nil
}

func (c *AutoNAT) handleDialAttempt(s network.Stream) {
func (c *Client) handleDialAttempt(s network.Stream) {
r := pbio.NewDelimitedReader(s, maxMsgSize)
msg := &pb.DialAttempt{}
if err := r.ReadMsg(msg); err != nil {
return
}
// This is necessary to allow implementations that choose to keep a
// separate dialer with a different peerID from the nodes peerID

nonce := msg.GetNonce()
c.mu.Lock()
ch := c.attemptChs[nonce]
ch := c.attemptQueues[nonce]
c.mu.Unlock()
select {
case ch <- attempt{addr: s.Conn().LocalMultiaddr(), nonce: nonce}:
Expand All @@ -142,9 +190,9 @@ type DialResponse struct {
}

type DialResult struct {
ExternalAddr ma.Multiaddr
LocalAddr ma.Multiaddr
Status pb.DialResponse_DialStatus
Addr ma.Multiaddr
LocalAddr ma.Multiaddr
Status pb.DialResponse_DialStatus
}

type attempt struct {
Expand Down
125 changes: 125 additions & 0 deletions p2p/protocol/autonatv2/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package autonatv2

import (
"context"
"fmt"
"testing"

"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peerstore"
blankhost "github.com/libp2p/go-libp2p/p2p/host/blank"
swarmt "github.com/libp2p/go-libp2p/p2p/net/swarm/testing"
"github.com/libp2p/go-libp2p/p2p/protocol/autonatv2/pb"
"github.com/libp2p/go-msgio/pbio"
ma "github.com/multiformats/go-multiaddr"
)

func TestClient(t *testing.T) {
h := blankhost.NewBlankHost(swarmt.GenSwarm(t))
cli := &Client{
h: h,
attemptQueues: make(map[uint64]chan attempt),
}

p := blankhost.NewBlankHost(swarmt.GenSwarm(t))
pp := blankhost.NewBlankHost(swarmt.GenSwarm(t))
p.SetStreamHandler(dialProtocol, func(s network.Stream) {
defer s.Close()
r := pbio.NewDelimitedReader(s, maxMsgSize)
msg := new(pb.Message)
if err := r.ReadMsg(msg); err != nil {
t.Fatal(err)
}
var addrs []ma.Multiaddr
for _, a := range msg.GetDialRequest().GetAddrs() {
addr, err := ma.NewMultiaddrBytes(a)
if err != nil {
s.Close()
t.Fatal(err, addr)
break
}
addrs = append(addrs, addr)
}
peer := s.Conn().RemotePeer()
for _, a := range addrs {
pp.Peerstore().ClearAddrs(s.Conn().RemotePeer())
pp.Peerstore().AddAddr(peer, a, peerstore.PermanentAddrTTL)
s, err := pp.NewStream(context.Background(), peer, attemptProtocol)
if err != nil {
continue
}
amsg := &pb.DialAttempt{Nonce: msg.GetDialRequest().Nonce}
w := pbio.NewDelimitedWriter(s)
err = w.WriteMsg(amsg)
if err != nil {
t.Fatal(err)
break
}
break
}
msg.Reset()
msg.Msg = &pb.Message_DialResponse{
DialResponse: &pb.DialResponse{
Status: pb.DialResponse_ResponseStatus_OK,
DialStatuses: []pb.DialResponse_DialStatus{pb.DialResponse_E_ADDRESS_UNKNOWN, pb.DialResponse_DialStaus_OK},
},
}
w := pbio.NewDelimitedWriter(s)
if err := w.WriteMsg(msg); err != nil {
t.Fatal(err)
}
})

h.Peerstore().AddAddrs(p.ID(), p.Addrs(), peerstore.PermanentAddrTTL)
h.Peerstore().AddProtocols(p.ID(), dialProtocol)

resp, err := cli.DialRequest(h.Addrs())
if err != nil {
t.Fatal(err)
}
fmt.Println(resp)
}

func TestClientWithServer(t *testing.T) {
h := blankhost.NewBlankHost(swarmt.GenSwarm(t))
cli := &Client{
h: h,
attemptQueues: make(map[uint64]chan attempt),
}

p := blankhost.NewBlankHost(swarmt.GenSwarm(t))
pp := blankhost.NewBlankHost(swarmt.GenSwarm(t))
NewService(p, pp)

h.Peerstore().AddAddrs(p.ID(), p.Addrs(), peerstore.PermanentAddrTTL)
h.Peerstore().AddProtocols(p.ID(), dialProtocol)

resp, err := cli.DialRequest(h.Addrs())
if err != nil {
t.Fatal(err)
}
fmt.Println(resp)
}

func TestClientWithServerWithCharge(t *testing.T) {
h := blankhost.NewBlankHost(swarmt.GenSwarm(t))
cli := &Client{
h: h,
attemptQueues: make(map[uint64]chan attempt),
dialCharge: make([]byte, 1024),
}

p := blankhost.NewBlankHost(swarmt.GenSwarm(t))
pp := blankhost.NewBlankHost(swarmt.GenSwarm(t))
NewService(p, pp)

h.Peerstore().AddAddrs(p.ID(), p.Addrs(), peerstore.PermanentAddrTTL)
h.Peerstore().AddProtocols(p.ID(), dialProtocol)

addr := ma.StringCast("/ip4/127.0.0.1/tcp/12345")
resp, err := cli.DialRequest([]ma.Multiaddr{addr})
if err != nil {
t.Fatal(err)
}
fmt.Println(resp)
}
Loading

0 comments on commit d8c2516

Please sign in to comment.