-
Notifications
You must be signed in to change notification settings - Fork 1.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(tcpreuse): add options for sharing TCP listeners amongst TCP, WS and WSS transports #2984
base: master
Are you sure you want to change the base?
Conversation
const ( | ||
Unknown DemultiplexedConnType = iota | ||
MultistreamSelect | ||
HTTP | ||
TLS | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This setup is hardcoded and doesn't allow others to add new protocols to demultiplex from the same listener without making a PR. This seems ok given we have to be careful about making sure we don't get magic byte collisions between different protocols, but if we want to set this up as a registry object instead that seems fine too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
agreed. Thought needs to put into extending this. Compared to that, adding a new enum here is easy.
|
||
func IsTLS(s Sample) bool { | ||
switch string(s[:]) { | ||
case "\x16\x03\x01", "\x16\x03\x02", "\x16\x03\x03", "\x16\x03\x04": |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm pretty sure from reading https://datatracker.ietf.org/doc/html/rfc8446#section-4.1.2 it seems like \x16\x03\x04
is never expected in the client hello and so should be here. (fyi @Jorropo since I took this from magiselect)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you mean so should not be here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ya, thanks
@@ -122,6 +133,9 @@ type TcpTransport struct { | |||
disableReuseport bool // Explicitly disable reuseport. | |||
enableMetrics bool | |||
|
|||
// share and demultiplex TCP listeners across multiple transports | |||
sharedTcp *tcpreuse.ConnMgr |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggestions on what to call this feature/option? I grabbed the ConnMgr
name because it's used in the quicreuse package, but it doesn't seem quite right and tcpreuse seems likely to be confused with reuseport.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure. I also don't like the connmgr name in quicreuse. But I don't have a better name that isn't also confusing. At least this is consistently confusing (?)
p2p/transport/tcpreuse/listener.go
Outdated
} | ||
|
||
func (m *multiplexedListener) Run() error { | ||
const numWorkers = 16 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can have preset workers, options, etc. but I wonder if the resource manager might also help with the job here. My understanding is that it's used starting the multistream level because it won't play nicely with just a net.Conn, maybe this means we can move the resource management lower in the stack.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this was refactored to use a channel semaphore with N=64. That should be good enough for most (all?) use cases.
This thing should be very fast we are reading the first 3 bytes of the first packet.
b539b9e
to
b6f1fb0
Compare
// EnvReuseportVal stores the value of envReuseport. defaults to true. | ||
var EnvReuseportVal = true |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This doesn't feel good to export as it's only used for tests and is the result of me moving things to avoid dependency cycles (and separate concerns).
Any suggestions on where this reuseport logic lives? Could potentially move the tests or have enough options to thread the testing logic through
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's a refactor to be done here, but maybe we do that in a separate PR? This one is already quite big
Overall it looks quite good. The only thing major thing that we must fix before merging is this:
We need a way to limit the number of connections being upgraded per IP/subnet. Otherwise, a single node can make the target node unresponsive. We don't have the endpoint multiaddr at this point. This makes it difficult to use the resource manager as is. It's not too bad though, The only useful part of
I think 1 is fine. AllowList anyway works with a IP / Subnet. We can make this clear in the documentation for Resource Manager. |
6f21a3f
to
60cef4d
Compare
func NewTCPTransport(upgrader transport.Upgrader, rcmgr network.ResourceManager, sharedTCP *tcpreuse.ConnMgr, opts ...Option) (*TcpTransport, error) { | ||
if rcmgr == nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a breaking change.
It simplifies the top level basic host api a lot. You can do:
libp2p.New(Transport(tcp.NewTransport), Transport(websocket.New), ShareTCPListener())
And not need to specify the rcmgr, or the connection gater.
I'm confused how to provide this API, enabling sharing by just specifying ShareTCPListener
, and pass in the sharedTCP objects as one of the optional arguments TCP Transport. If we can pass in the sharedTCP object as an optional argument that will prevent breaking this.
The simplest alternative implementation wise, without any breaking change, is an API like:
tcmgr := tcpreuse.NewConnMgr(false, gater, rcmgr)
libp2p.New(
Transport(tcp.NewTransport, tcp.WithSharedTCP(tcmgr)),
Transport(websocket.New, websocket.WithSharedTCP(tcmgr)),
)
// Gate and resource limit the connection here. | ||
// If done after sampling the connection, we'll be vulnerable to DOS attacks by a single peer | ||
// which clogs up our entire connection queue. | ||
// This duplicates the responsibility of gating and resource limiting between here and the upgrader. The | ||
// alternative without duplication requires moving the process of upgrading the connection here, which forces | ||
// us to establish the websocket connection here. That is more duplication, or a significant breaking change. | ||
// | ||
// Bugs around multiple calls to OpenConnection or InterceptAccept are prevented by the transport | ||
// integration tests. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the smallest change I could think of which provided this DoS by a single node prevention.
It's also nicely guarded by transport integration tests, so this feels okay to me.
mnc, err := manet.WrapNetConn(c) | ||
if err != nil { | ||
c.Close() | ||
return nil, err | ||
} | ||
return mnc, nil | ||
return c, nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wrapping it by manet shadows the Scope
method which we need for the upgrader. Any regression here is prevented by transport integration tests.
options.go
Outdated
// ShareTCPListener shares the same listen address between TCP and Websocket transports. | ||
func ShareTCPListener() Option { | ||
return func(cfg *Config) error { | ||
cfg.ShareTCPListener = true | ||
return nil | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This API has the nice property of keeping all current code same and enabling sharing by just specifying
libp2p.New(
...
ShareTCPListener()
)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As libp2p.New
user I will wonder why I need to explicitly pass this Option to enable sharing for TCP ports, but don't have to do anything extra for UDP ones.
No chance of making this implicit? Or at least explain in godoc comment why this needs to be explicit?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The main reason is that I want to get some confidence here before defaulting this behavior to on. In a future release it'll be the default and you wouldn't need this.
Aside: UDP transports don't currently share the port if you use port 0
across them (this does). That's also something that I'd like to change in a future release.
I'll update the comment.
…, and WSS transports
c3ff407
to
7e34d05
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this mostly looks good. Just a couple things I'll clean up in the next pass
const ( | ||
Unknown DemultiplexedConnType = iota | ||
MultistreamSelect | ||
HTTP | ||
TLS | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
agreed. Thought needs to put into extending this. Compared to that, adding a new enum here is easy.
cadd881
to
c314a2a
Compare
8bbc8a7
to
3487369
Compare
closes #2684
@MarcoPolo @sukunrt I took a stab at this as a result of looking at ipfs/kubo#10521. I also pulled from @Jorropo's work on #2737.
Initial testing seems to indicate things work ok, but this definitely needs more tests, API cleanup + eyes before merging.
Side note: Given that we're sharing the underlying TCP listener now I wonder if we basically have to tackle #1435 as well.