From 214f7794c5d1b386e1947becb77f6eb2150c9b6d Mon Sep 17 00:00:00 2001 From: sukun Date: Mon, 30 Dec 2024 15:57:06 +0000 Subject: [PATCH] swarm_test: support more transports for GenSwarm This adds support for `/webtransport` andn `/webrtc-direct` to GenSwarm. Ideally, we should rewrite this to have the same semantics, opt into transports not opt out, as `libp2p.New`. But I need webtransport and webrtc support to write address inference tests for https://github.com/libp2p/go-libp2p/pull/3075 Depending on how disruptive this is to users, we can decide on whether to merge or drop this. --- p2p/net/swarm/swarm_notif_test.go | 24 ++++++- p2p/net/swarm/swarm_test.go | 7 +- p2p/net/swarm/testing/testing.go | 105 +++++++++++++++++++++++++++--- 3 files changed, 121 insertions(+), 15 deletions(-) diff --git a/p2p/net/swarm/swarm_notif_test.go b/p2p/net/swarm/swarm_notif_test.go index 0b6d56122e..fb4e119700 100644 --- a/p2p/net/swarm/swarm_notif_test.go +++ b/p2p/net/swarm/swarm_notif_test.go @@ -10,6 +10,7 @@ import ( . "github.com/libp2p/go-libp2p/p2p/net/swarm" ma "github.com/multiformats/go-multiaddr" + manet "github.com/multiformats/go-multiaddr/net" "github.com/stretchr/testify/require" ) @@ -93,11 +94,30 @@ func TestNotifications(t *testing.T) { } } + normalizeAddrs := func(a ma.Multiaddr, isLocal bool) ma.Multiaddr { + x, _ := ma.SplitFunc(a, func(c ma.Component) bool { + if c.Protocol().Code == ma.P_WEBTRANSPORT { + return true + } + return false + }) + if isLocal { + if manet.IsIPUnspecified(x) { + ip, rest := ma.SplitFirst(x) + if ip.Protocol().Code == ma.P_IP4 { + return ma.StringCast("/ip4/127.0.0.1").Encapsulate(rest) + } else { + return ma.StringCast("/ip6/::1").Encapsulate(rest) + } + } + } + return x + } complement := func(c network.Conn) (*Swarm, *netNotifiee, *Conn) { for i, s := range swarms { for _, c2 := range s.Conns() { - if c.LocalMultiaddr().Equal(c2.RemoteMultiaddr()) && - c2.LocalMultiaddr().Equal(c.RemoteMultiaddr()) { + if normalizeAddrs(c.LocalMultiaddr(), true).Equal(normalizeAddrs(c2.RemoteMultiaddr(), false)) && + normalizeAddrs(c2.LocalMultiaddr(), true).Equal(normalizeAddrs(c.RemoteMultiaddr(), false)) { return s, notifiees[i], c2.(*Conn) } } diff --git a/p2p/net/swarm/swarm_test.go b/p2p/net/swarm/swarm_test.go index 3d92690b98..3b1fdeff1b 100644 --- a/p2p/net/swarm/swarm_test.go +++ b/p2p/net/swarm/swarm_test.go @@ -79,9 +79,8 @@ func makeSwarms(t *testing.T, num int, opts ...Option) []*swarm.Swarm { func connectSwarms(t *testing.T, ctx context.Context, swarms []*swarm.Swarm) { var wg sync.WaitGroup - connect := func(s *swarm.Swarm, dst peer.ID, addr ma.Multiaddr) { - // TODO: make a DialAddr func. - s.Peerstore().AddAddr(dst, addr, peerstore.PermanentAddrTTL) + connect := func(s *swarm.Swarm, dst peer.ID, addrs []ma.Multiaddr) { + s.Peerstore().AddAddrs(dst, addrs, peerstore.TempAddrTTL) if _, err := s.DialPeer(ctx, dst); err != nil { t.Fatal("error swarm dialing to peer", err) } @@ -92,7 +91,7 @@ func connectSwarms(t *testing.T, ctx context.Context, swarms []*swarm.Swarm) { for i, s1 := range swarms { for _, s2 := range swarms[i+1:] { wg.Add(1) - connect(s1, s2.LocalPeer(), s2.ListenAddresses()[0]) // try the first. + connect(s1, s2.LocalPeer(), s2.ListenAddresses()) } } wg.Wait() diff --git a/p2p/net/swarm/testing/testing.go b/p2p/net/swarm/testing/testing.go index 773314a1b8..e0619e0929 100644 --- a/p2p/net/swarm/testing/testing.go +++ b/p2p/net/swarm/testing/testing.go @@ -2,6 +2,7 @@ package testing import ( "crypto/rand" + "net" "testing" "time" @@ -24,21 +25,26 @@ import ( libp2pquic "github.com/libp2p/go-libp2p/p2p/transport/quic" "github.com/libp2p/go-libp2p/p2p/transport/quicreuse" "github.com/libp2p/go-libp2p/p2p/transport/tcp" + libp2pwebrtc "github.com/libp2p/go-libp2p/p2p/transport/webrtc" + libp2pwebtransport "github.com/libp2p/go-libp2p/p2p/transport/webtransport" ma "github.com/multiformats/go-multiaddr" + manet "github.com/multiformats/go-multiaddr/net" "github.com/quic-go/quic-go" "github.com/stretchr/testify/require" ) type config struct { - disableReuseport bool - dialOnly bool - disableTCP bool - disableQUIC bool - connectionGater connmgr.ConnectionGater - sk crypto.PrivKey - swarmOpts []swarm.Option - eventBus event.Bus + disableReuseport bool + dialOnly bool + disableTCP bool + disableQUIC bool + disableWebTransport bool + disableWebRTC bool + connectionGater connmgr.ConnectionGater + sk crypto.PrivKey + swarmOpts []swarm.Option + eventBus event.Bus clock } @@ -88,6 +94,16 @@ var OptDisableQUIC Option = func(_ testing.TB, c *config) { c.disableQUIC = true } +// OptDisableWebTransport disables WebTransport. +var OptDisableWebTransport Option = func(_ testing.TB, c *config) { + c.disableWebTransport = true +} + +// OptDisableWebRTC disables WebRTC. +var OptDisableWebRTC Option = func(_ testing.TB, c *config) { + c.disableWebRTC = true +} + // OptConnGater configures the given connection gater on the test func OptConnGater(cg connmgr.ConnectionGater) Option { return func(_ testing.TB, c *config) { @@ -175,8 +191,10 @@ func GenSwarm(t testing.TB, opts ...Option) *swarm.Swarm { } } } + var quicListenAddr ma.Multiaddr + var reuse *quicreuse.ConnManager if !cfg.disableQUIC { - reuse, err := quicreuse.NewConnManager(quic.StatelessResetKey{}, quic.TokenGeneratorKey{}) + reuse, err = quicreuse.NewConnManager(quic.StatelessResetKey{}, quic.TokenGeneratorKey{}) if err != nil { t.Fatal(err) } @@ -191,6 +209,75 @@ func GenSwarm(t testing.TB, opts ...Option) *swarm.Swarm { if err := s.Listen(ma.StringCast("/ip4/127.0.0.1/udp/0/quic-v1")); err != nil { t.Fatal(err) } + for _, a := range s.ListenAddresses() { + if _, err := a.ValueForProtocol(ma.P_QUIC_V1); err == nil { + quicListenAddr = a + break + } + } + } + } + if !cfg.disableWebTransport { + if reuse == nil { + reuse, err = quicreuse.NewConnManager(quic.StatelessResetKey{}, quic.TokenGeneratorKey{}) + if err != nil { + t.Fatal(err) + } + } + wtTransport, err := libp2pwebtransport.New(priv, nil, reuse, cfg.connectionGater, nil) + if err != nil { + t.Fatal(err) + } + if err := s.AddTransport(wtTransport); err != nil { + t.Fatal(err) + } + if !cfg.dialOnly { + listenAddr := ma.StringCast("/ip4/127.0.0.1/udp/0/quic-v1/webtransport") + if quicListenAddr != nil { + listenAddr = quicListenAddr.Encapsulate(ma.StringCast("/webtransport")) + } + if err := s.Listen(listenAddr); err != nil { + t.Fatal(err) + } + } + } + + if !cfg.disableWebRTC { + listenUDPFn := func(network string, laddr *net.UDPAddr) (net.PacketConn, error) { + hasQuicAddrPortFor := func(network string, laddr *net.UDPAddr) bool { + quicAddrPorts := map[string]struct{}{} + for _, addr := range s.ListenAddresses() { + if _, err := addr.ValueForProtocol(ma.P_QUIC_V1); err == nil { + netw, addr, err := manet.DialArgs(addr) + if err != nil { + return false + } + quicAddrPorts[netw+"_"+addr] = struct{}{} + } + } + _, ok := quicAddrPorts[network+"_"+laddr.String()] + return ok + } + if hasQuicAddrPortFor(network, laddr) { + return reuse.SharedNonQUICPacketConn(network, laddr) + } + return net.ListenUDP(network, laddr) + } + wrtcTransport, err := libp2pwebrtc.New(priv, nil, cfg.connectionGater, nil, listenUDPFn) + if err != nil { + t.Fatal(err) + } + if err := s.AddTransport(wrtcTransport); err != nil { + t.Fatal(err) + } + if !cfg.dialOnly { + listenAddr := ma.StringCast("/ip4/127.0.0.1/udp/0/webrtc-direct") + if quicListenAddr != nil { + listenAddr = quicListenAddr.Decapsulate(ma.StringCast("/quic-v1")).Encapsulate(ma.StringCast("/webrtc-direct")) + } + if err := s.Listen(listenAddr); err != nil { + t.Fatal(err) + } } } if !cfg.dialOnly {