Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(tcpreuse): add options for sharing TCP listeners amongst TCP, WS and WSS transports #2984

Open
wants to merge 32 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
6281130
feat(tcpreuse): add options for sharing TCP listeners amongst TCP, WS…
aschmahmann Sep 25, 2024
2ae20f4
add tests for listener
sukunrt Oct 2, 2024
05ee445
create conn scope early to prevent DoS attacks
sukunrt Oct 6, 2024
3457a7b
add fx option, move rcmgr and upgrader to sharedtcp
sukunrt Oct 6, 2024
ac8b478
make fewer concurrent connections in test, it breaks mac and windows
sukunrt Oct 7, 2024
6d5fa53
add some comments
sukunrt Oct 7, 2024
0115a62
Add OS specific sampledconn
MarcoPolo Oct 29, 2024
cdbf16c
Return net.Conn unwrapped if possible
MarcoPolo Oct 29, 2024
7f4f875
feat(tcpreuse): add Windows sampledconn
aschmahmann Oct 30, 2024
00c9403
Add test for metrics+unix
MarcoPolo Oct 30, 2024
2c053d8
tcp transport: Parameterize metrics collector in TCP
MarcoPolo Oct 30, 2024
7e34d05
simplify demultiplex a bit
MarcoPolo Oct 30, 2024
e465b29
transport-testsuite(old): Parameterize subtests
MarcoPolo Oct 30, 2024
677222a
tcp-transport: selectively run only 1conn tests
MarcoPolo Oct 30, 2024
8c334af
sampledconn: update tests to be manet aware
MarcoPolo Oct 30, 2024
b686b80
Remove unused interface
MarcoPolo Oct 30, 2024
a28a244
sampledconn: Add test case back in
MarcoPolo Oct 30, 2024
a39490f
Comment nits
MarcoPolo Oct 30, 2024
76f560b
tcpreuse: flip reuseport bool
MarcoPolo Oct 31, 2024
c314a2a
tcpreuse: return an error on multiple listeners for the same addr+con…
MarcoPolo Oct 31, 2024
56dabd4
websocket: return consistent error
MarcoPolo Oct 31, 2024
9ebfc89
Remove todo
MarcoPolo Oct 31, 2024
a722a7c
tcp: revert parameterize metrics collector
MarcoPolo Oct 31, 2024
85be1f5
typo
MarcoPolo Oct 31, 2024
18af914
PR review
MarcoPolo Oct 31, 2024
f9f39b7
typo
MarcoPolo Oct 31, 2024
9ca4ae7
add timeout
MarcoPolo Oct 31, 2024
3d26583
pr nit
MarcoPolo Oct 31, 2024
3487369
remove unused option
MarcoPolo Oct 31, 2024
74e9c05
Expand comment
MarcoPolo Oct 31, 2024
fd53643
remove 0x160304 magic byte match
MarcoPolo Oct 31, 2024
ccd1609
Fix test to handle existing listener error
MarcoPolo Oct 31, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/libp2p/go-libp2p/p2p/protocol/holepunch"
"github.com/libp2p/go-libp2p/p2p/protocol/identify"
"github.com/libp2p/go-libp2p/p2p/transport/quicreuse"
"github.com/libp2p/go-libp2p/p2p/transport/tcpreuse"
libp2pwebrtc "github.com/libp2p/go-libp2p/p2p/transport/webrtc"
"github.com/prometheus/client_golang/prometheus"

Expand Down Expand Up @@ -145,6 +146,8 @@ type Config struct {
CustomIPv6BlackHoleSuccessCounter bool

UserFxOptions []fx.Option

ShareTCPListener bool
}

func (cfg *Config) makeSwarm(eventBus event.Bus, enableMetrics bool) (*swarm.Swarm, error) {
Expand Down Expand Up @@ -289,6 +292,12 @@ func (cfg *Config) addTransports() ([]fx.Option, error) {
fx.Provide(func() connmgr.ConnectionGater { return cfg.ConnectionGater }),
fx.Provide(func() pnet.PSK { return cfg.PSK }),
fx.Provide(func() network.ResourceManager { return cfg.ResourceManager }),
fx.Provide(func(gater connmgr.ConnectionGater, rcmgr network.ResourceManager) *tcpreuse.ConnMgr {
if !cfg.ShareTCPListener {
return nil
}
return tcpreuse.NewConnMgr(tcpreuse.EnvReuseportVal, gater, rcmgr)
MarcoPolo marked this conversation as resolved.
Show resolved Hide resolved
}),
fx.Provide(func(cm *quicreuse.ConnManager, sw *swarm.Swarm) libp2pwebrtc.ListenUDPFn {
hasQuicAddrPortFor := func(network string, laddr *net.UDPAddr) bool {
quicAddrPorts := map[string]struct{}{}
Expand Down
15 changes: 14 additions & 1 deletion libp2p_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func TestTransportConstructor(t *testing.T) {
_ connmgr.ConnectionGater,
upgrader transport.Upgrader,
) transport.Transport {
tpt, err := tcp.NewTCPTransport(upgrader, nil)
tpt, err := tcp.NewTCPTransport(upgrader, nil, nil)
require.NoError(t, err)
return tpt
}
Expand Down Expand Up @@ -751,3 +751,16 @@ func getTLSConf(t *testing.T, ip net.IP, start, end time.Time) *tls.Config {
}},
}
}

func TestSharedTCPAddr(t *testing.T) {
h, err := New(
ShareTCPListener(),
Transport(tcp.NewTCPTransport),
Transport(websocket.New),
ListenAddrStrings("/ip4/0.0.0.0/tcp/8888"),
ListenAddrStrings("/ip4/0.0.0.0/tcp/8888/ws"),
)
require.NoError(t, err)
fmt.Println(h.Addrs())
MarcoPolo marked this conversation as resolved.
Show resolved Hide resolved
h.Close()
}
8 changes: 8 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -643,3 +643,11 @@ func WithFxOption(opts ...fx.Option) Option {
return nil
}
}

// ShareTCPListener shares the same listen address between TCP and Websocket transports.
func ShareTCPListener() Option {
return func(cfg *Config) error {
cfg.ShareTCPListener = true
return nil
}
}
Copy link
Member

@sukunrt sukunrt Oct 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This API has the nice property of keeping all current code same and enabling sharing by just specifying

libp2p.New(
...
ShareTCPListener()
)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As libp2p.New user I will wonder why I need to explicitly pass this Option to enable sharing for TCP ports, but don't have to do anything extra for UDP ones.

No chance of making this implicit? Or at least explain in godoc comment why this needs to be explicit?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The main reason is that I want to get some confidence here before defaulting this behavior to on. In a future release it'll be the default and you wouldn't need this.

Aside: UDP transports don't currently share the port if you use port 0 across them (this does). That's also something that I'd like to change in a future release.

I'll update the comment.

2 changes: 1 addition & 1 deletion p2p/net/swarm/dial_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func makeSwarmWithNoListenAddrs(t *testing.T, opts ...Option) *Swarm {
upgrader := makeUpgrader(t, s)
var tcpOpts []tcp.Option
tcpOpts = append(tcpOpts, tcp.DisableReuseport())
tcpTransport, err := tcp.NewTCPTransport(upgrader, nil, tcpOpts...)
tcpTransport, err := tcp.NewTCPTransport(upgrader, nil, nil, tcpOpts...)
require.NoError(t, err)
if err := s.AddTransport(tcpTransport); err != nil {
t.Fatal(err)
Expand Down
2 changes: 1 addition & 1 deletion p2p/net/swarm/swarm_addr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func TestDialAddressSelection(t *testing.T) {
s, err := swarm.NewSwarm("local", nil, eventbus.NewBus())
require.NoError(t, err)

tcpTr, err := tcp.NewTCPTransport(nil, nil)
tcpTr, err := tcp.NewTCPTransport(nil, nil, nil)
require.NoError(t, err)
require.NoError(t, s.AddTransport(tcpTr))
reuse, err := quicreuse.NewConnManager(quic.StatelessResetKey{}, quic.TokenGeneratorKey{})
Expand Down
8 changes: 4 additions & 4 deletions p2p/net/swarm/swarm_dial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestAddrsForDial(t *testing.T) {
ps.AddPrivKey(id, priv)
t.Cleanup(func() { ps.Close() })

tpt, err := websocket.New(nil, &network.NullResourceManager{})
tpt, err := websocket.New(nil, &network.NullResourceManager{}, nil)
require.NoError(t, err)
s, err := NewSwarm(id, ps, eventbus.NewBus(), WithMultiaddrResolver(ResolverFromMaDNS{resolver}))
require.NoError(t, err)
Expand Down Expand Up @@ -100,7 +100,7 @@ func TestDedupAddrsForDial(t *testing.T) {
require.NoError(t, err)
defer s.Close()

tpt, err := tcp.NewTCPTransport(nil, &network.NullResourceManager{})
tpt, err := tcp.NewTCPTransport(nil, &network.NullResourceManager{}, nil)
require.NoError(t, err)
err = s.AddTransport(tpt)
require.NoError(t, err)
Expand Down Expand Up @@ -134,7 +134,7 @@ func newTestSwarmWithResolver(t *testing.T, resolver *madns.Resolver) *Swarm {
})

// Add a tcp transport so that we know we can dial a tcp multiaddr and we don't filter it out.
tpt, err := tcp.NewTCPTransport(nil, &network.NullResourceManager{})
tpt, err := tcp.NewTCPTransport(nil, &network.NullResourceManager{}, nil)
require.NoError(t, err)
err = s.AddTransport(tpt)
require.NoError(t, err)
Expand All @@ -151,7 +151,7 @@ func newTestSwarmWithResolver(t *testing.T, resolver *madns.Resolver) *Swarm {
err = s.AddTransport(wtTpt)
require.NoError(t, err)

wsTpt, err := websocket.New(nil, &network.NullResourceManager{})
wsTpt, err := websocket.New(nil, &network.NullResourceManager{}, nil)
require.NoError(t, err)
err = s.AddTransport(wsTpt)
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion p2p/net/swarm/testing/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func GenSwarm(t testing.TB, opts ...Option) *swarm.Swarm {
if cfg.disableReuseport {
tcpOpts = append(tcpOpts, tcp.DisableReuseport())
}
tcpTransport, err := tcp.NewTCPTransport(upgrader, nil, tcpOpts...)
tcpTransport, err := tcp.NewTCPTransport(upgrader, nil, nil, tcpOpts...)
require.NoError(t, err)
if err := s.AddTransport(tcpTransport); err != nil {
t.Fatal(err)
Expand Down
38 changes: 24 additions & 14 deletions p2p/net/upgrader/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,23 +84,33 @@ func (l *listener) handleIncoming() {
}
catcher.Reset()

// gate the connection if applicable
if l.upgrader.connGater != nil && !l.upgrader.connGater.InterceptAccept(maconn) {
log.Debugf("gater blocked incoming connection on local addr %s from %s",
maconn.LocalMultiaddr(), maconn.RemoteMultiaddr())
if err := maconn.Close(); err != nil {
log.Warnf("failed to close incoming connection rejected by gater: %s", err)
}
continue
// Check if we already have a connection scope. See the comment in tcpreuse/listener.go for an explanation.
var connScope network.ConnManagementScope
if sc, ok := maconn.(interface {
Scope() network.ConnManagementScope
}); ok {
connScope = sc.Scope()
}
if connScope == nil {
// gate the connection if applicable
if l.upgrader.connGater != nil && !l.upgrader.connGater.InterceptAccept(maconn) {
log.Debugf("gater blocked incoming connection on local addr %s from %s",
maconn.LocalMultiaddr(), maconn.RemoteMultiaddr())
if err := maconn.Close(); err != nil {
log.Warnf("failed to close incoming connection rejected by gater: %s", err)
}
continue
}

connScope, err := l.rcmgr.OpenConnection(network.DirInbound, true, maconn.RemoteMultiaddr())
if err != nil {
log.Debugw("resource manager blocked accept of new connection", "error", err)
if err := maconn.Close(); err != nil {
log.Warnf("failed to incoming connection rejected by resource manager: %s", err)
var err error
connScope, err = l.rcmgr.OpenConnection(network.DirInbound, true, maconn.RemoteMultiaddr())
if err != nil {
log.Debugw("resource manager blocked accept of new connection", "error", err)
if err := maconn.Close(); err != nil {
log.Warnf("failed to incoming connection rejected by resource manager: %s", err)
MarcoPolo marked this conversation as resolved.
Show resolved Hide resolved
}
continue
}
continue
}

// The go routine below calls Release when the context is
Expand Down
2 changes: 1 addition & 1 deletion p2p/protocol/circuitv2/relay/relay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func getNetHosts(t *testing.T, ctx context.Context, n int) (hosts []host.Host, u
upgrader := swarmt.GenUpgrader(t, netw, nil)
upgraders = append(upgraders, upgrader)

tpt, err := tcp.NewTCPTransport(upgrader, nil)
tpt, err := tcp.NewTCPTransport(upgrader, nil, nil)
if err != nil {
t.Fatal(err)
}
Expand Down
25 changes: 24 additions & 1 deletion p2p/test/transport/gating_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package transport_integration

import (
"context"
"encoding/binary"
"net/netip"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -30,6 +32,23 @@ func stripCertHash(addr ma.Multiaddr) ma.Multiaddr {
return addr
}

func addrPort(addr ma.Multiaddr) netip.AddrPort {
a := netip.Addr{}
p := uint16(0)
ma.ForEach(addr, func(c ma.Component) bool {
if c.Protocol().Code == ma.P_IP4 || c.Protocol().Code == ma.P_IP6 {
a, _ = netip.AddrFromSlice(c.RawValue())
return false
}
if c.Protocol().Code == ma.P_UDP || c.Protocol().Code == ma.P_TCP {
p = binary.BigEndian.Uint16(c.RawValue())
return true
}
return false
})
return netip.AddrPortFrom(a, p)
}

func TestInterceptPeerDial(t *testing.T) {
if race.WithRace() {
t.Skip("The upgrader spawns a new Go routine, which leads to race conditions when using GoMock.")
Expand Down Expand Up @@ -173,10 +192,14 @@ func TestInterceptAccept(t *testing.T) {
// remove the certhash component from WebTransport addresses
require.Equal(t, stripCertHash(h2.Addrs()[0]), addrs.LocalMultiaddr())
}).AnyTimes()
} else if strings.Contains(tc.Name, "WebSocket-Shared") {
connGater.EXPECT().InterceptAccept(gomock.Any()).Do(func(addrs network.ConnMultiaddrs) {
require.Equal(t, addrPort(h2.Addrs()[0]), addrPort(addrs.LocalMultiaddr()))
})
} else {
connGater.EXPECT().InterceptAccept(gomock.Any()).Do(func(addrs network.ConnMultiaddrs) {
// remove the certhash component from WebTransport addresses
require.Equal(t, stripCertHash(h2.Addrs()[0]), addrs.LocalMultiaddr())
require.Equal(t, stripCertHash(h2.Addrs()[0]), addrs.LocalMultiaddr(), "%s\n%s", h2.Addrs()[0], addrs.LocalMultiaddr())
})
}

Expand Down
32 changes: 32 additions & 0 deletions p2p/test/transport/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,38 @@ var transportsToTest = []TransportTestCase{
return h
},
},
{
Name: "TCP-Shared / TLS / Yamux",
HostGenerator: func(t *testing.T, opts TransportTestCaseOpts) host.Host {
libp2pOpts := transformOpts(opts)
libp2pOpts = append(libp2pOpts, libp2p.ShareTCPListener())
libp2pOpts = append(libp2pOpts, libp2p.Security(tls.ID, tls.New))
libp2pOpts = append(libp2pOpts, libp2p.Muxer(yamux.ID, yamux.DefaultTransport))
if opts.NoListen {
libp2pOpts = append(libp2pOpts, libp2p.NoListenAddrs)
} else {
libp2pOpts = append(libp2pOpts, libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0"))
}
h, err := libp2p.New(libp2pOpts...)
require.NoError(t, err)
return h
},
},
{
Name: "WebSocket-Shared",
HostGenerator: func(t *testing.T, opts TransportTestCaseOpts) host.Host {
libp2pOpts := transformOpts(opts)
libp2pOpts = append(libp2pOpts, libp2p.ShareTCPListener())
if opts.NoListen {
libp2pOpts = append(libp2pOpts, libp2p.NoListenAddrs)
} else {
libp2pOpts = append(libp2pOpts, libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0/ws"))
}
h, err := libp2p.New(libp2pOpts...)
require.NoError(t, err)
return h
},
},
{
Name: "WebSocket",
HostGenerator: func(t *testing.T, opts TransportTestCaseOpts) host.Host {
Expand Down
29 changes: 19 additions & 10 deletions p2p/transport/tcp/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ var (

const collectFrequency = 10 * time.Second

var collector *aggregatingCollector
var defaultCollector *aggregatingCollector
MarcoPolo marked this conversation as resolved.
Show resolved Hide resolved

var initMetricsOnce sync.Once

Expand All @@ -34,8 +34,8 @@ func initMetrics() {
bytesSentDesc = prometheus.NewDesc("tcp_sent_bytes", "TCP bytes sent", nil, nil)
bytesRcvdDesc = prometheus.NewDesc("tcp_rcvd_bytes", "TCP bytes received", nil, nil)

collector = newAggregatingCollector()
prometheus.MustRegister(collector)
defaultCollector = newAggregatingCollector()
prometheus.MustRegister(defaultCollector)

const direction = "direction"

Expand Down Expand Up @@ -196,14 +196,16 @@ func (c *aggregatingCollector) Collect(metrics chan<- prometheus.Metric) {

func (c *aggregatingCollector) ClosedConn(conn *tracingConn, direction string) {
c.mutex.Lock()
collector.removeConn(conn.id)
c.removeConn(conn.id)
c.mutex.Unlock()
closedConns.WithLabelValues(direction).Inc()
}

type tracingConn struct {
id uint64

collector *aggregatingCollector

startTime time.Time
isClient bool

Expand All @@ -213,7 +215,8 @@ type tracingConn struct {
closeErr error
}

func newTracingConn(c manet.Conn, isClient bool) (*tracingConn, error) {
// newTracingConn wraps a manet.Conn with a tracingConn. A nil collector will use the default collector.
func newTracingConn(c manet.Conn, collector *aggregatingCollector, isClient bool) (*tracingConn, error) {
initMetricsOnce.Do(func() { initMetrics() })
conn, err := tcp.NewConn(c)
if err != nil {
Expand All @@ -224,8 +227,12 @@ func newTracingConn(c manet.Conn, isClient bool) (*tracingConn, error) {
isClient: isClient,
Conn: c,
tcpConn: conn,
collector: collector,
}
if tc.collector == nil {
tc.collector = defaultCollector
}
tc.id = collector.AddConn(tc)
tc.id = tc.collector.AddConn(tc)
newConns.WithLabelValues(tc.getDirection()).Inc()
return tc, nil
}
Expand All @@ -239,7 +246,7 @@ func (c *tracingConn) getDirection() string {

func (c *tracingConn) Close() error {
c.closeOnce.Do(func() {
collector.ClosedConn(c, c.getDirection())
c.collector.ClosedConn(c, c.getDirection())
c.closeErr = c.Conn.Close()
})
return c.closeErr
Expand All @@ -258,16 +265,18 @@ func (c *tracingConn) getTCPInfo() (*tcpinfo.Info, error) {

type tracingListener struct {
manet.Listener
collector *aggregatingCollector
}

func newTracingListener(l manet.Listener) *tracingListener {
return &tracingListener{Listener: l}
// newTracingListener wraps a manet.Listener with a tracingListener. A nil collector will use the default collector.
func newTracingListener(l manet.Listener, collector *aggregatingCollector) *tracingListener {
return &tracingListener{Listener: l, collector: collector}
}

func (l *tracingListener) Accept() (manet.Conn, error) {
conn, err := l.Listener.Accept()
if err != nil {
return nil, err
}
return newTracingConn(conn, false)
return newTracingConn(conn, l.collector, false)
}
8 changes: 6 additions & 2 deletions p2p/transport/tcp/metrics_none.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,9 @@ package tcp

import manet "github.com/multiformats/go-multiaddr/net"

func newTracingConn(c manet.Conn, _ bool) (manet.Conn, error) { return c, nil }
func newTracingListener(l manet.Listener) manet.Listener { return l }
type aggregatingCollector struct{}

func newTracingConn(c manet.Conn, collector *aggregatingCollector, isClient bool) (manet.Conn, error) {
return c, nil
}
func newTracingListener(l manet.Listener, collector *aggregatingCollector) manet.Listener { return l }
Loading
Loading