From b14b377aa3bdd23420c206cfc7b4adb466672739 Mon Sep 17 00:00:00 2001 From: Srdjan S Date: Thu, 21 Nov 2024 10:12:24 +0100 Subject: [PATCH] Daily commit --- p2p/test/transport/transport_test.go | 182 +++++++++++++-------------- p2p/transport/memory/conn.go | 6 +- p2p/transport/memory/stream.go | 58 +++++---- p2p/transport/memory/stream_test.go | 24 +++- 4 files changed, 149 insertions(+), 121 deletions(-) diff --git a/p2p/test/transport/transport_test.go b/p2p/test/transport/transport_test.go index e353ba6526..8a3ce3a979 100644 --- a/p2p/test/transport/transport_test.go +++ b/p2p/test/transport/transport_test.go @@ -26,13 +26,9 @@ import ( "github.com/libp2p/go-libp2p/core/peerstore" "github.com/libp2p/go-libp2p/core/sec" rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager" - "github.com/libp2p/go-libp2p/p2p/muxer/yamux" "github.com/libp2p/go-libp2p/p2p/net/swarm" "github.com/libp2p/go-libp2p/p2p/protocol/ping" - "github.com/libp2p/go-libp2p/p2p/security/noise" - tls "github.com/libp2p/go-libp2p/p2p/security/tls" libp2pmemory "github.com/libp2p/go-libp2p/p2p/transport/memory" - libp2pwebrtc "github.com/libp2p/go-libp2p/p2p/transport/webrtc" "go.uber.org/mock/gomock" ma "github.com/multiformats/go-multiaddr" @@ -68,95 +64,95 @@ func transformOpts(opts TransportTestCaseOpts) []config.Option { } var transportsToTest = []TransportTestCase{ - { - Name: "TCP / Noise / Yamux", - HostGenerator: func(t *testing.T, opts TransportTestCaseOpts) host.Host { - libp2pOpts := transformOpts(opts) - libp2pOpts = append(libp2pOpts, libp2p.Security(noise.ID, noise.New)) - libp2pOpts = append(libp2pOpts, libp2p.Muxer(yamux.ID, yamux.DefaultTransport)) - if opts.NoListen { - libp2pOpts = append(libp2pOpts, libp2p.NoListenAddrs) - } else { - libp2pOpts = append(libp2pOpts, libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0")) - } - h, err := libp2p.New(libp2pOpts...) - require.NoError(t, err) - return h - }, - }, - { - Name: "TCP / TLS / Yamux", - HostGenerator: func(t *testing.T, opts TransportTestCaseOpts) host.Host { - libp2pOpts := transformOpts(opts) - libp2pOpts = append(libp2pOpts, libp2p.Security(tls.ID, tls.New)) - libp2pOpts = append(libp2pOpts, libp2p.Muxer(yamux.ID, yamux.DefaultTransport)) - if opts.NoListen { - libp2pOpts = append(libp2pOpts, libp2p.NoListenAddrs) - } else { - libp2pOpts = append(libp2pOpts, libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0")) - } - h, err := libp2p.New(libp2pOpts...) - require.NoError(t, err) - return h - }, - }, - { - Name: "WebSocket", - HostGenerator: func(t *testing.T, opts TransportTestCaseOpts) host.Host { - libp2pOpts := transformOpts(opts) - if opts.NoListen { - libp2pOpts = append(libp2pOpts, libp2p.NoListenAddrs) - } else { - libp2pOpts = append(libp2pOpts, libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0/ws")) - } - h, err := libp2p.New(libp2pOpts...) - require.NoError(t, err) - return h - }, - }, - { - Name: "QUIC", - HostGenerator: func(t *testing.T, opts TransportTestCaseOpts) host.Host { - libp2pOpts := transformOpts(opts) - if opts.NoListen { - libp2pOpts = append(libp2pOpts, libp2p.NoListenAddrs) - } else { - libp2pOpts = append(libp2pOpts, libp2p.ListenAddrStrings("/ip4/127.0.0.1/udp/0/quic-v1")) - } - h, err := libp2p.New(libp2pOpts...) - require.NoError(t, err) - return h - }, - }, - { - Name: "WebTransport", - HostGenerator: func(t *testing.T, opts TransportTestCaseOpts) host.Host { - libp2pOpts := transformOpts(opts) - if opts.NoListen { - libp2pOpts = append(libp2pOpts, libp2p.NoListenAddrs) - } else { - libp2pOpts = append(libp2pOpts, libp2p.ListenAddrStrings("/ip4/127.0.0.1/udp/0/quic-v1/webtransport")) - } - h, err := libp2p.New(libp2pOpts...) - require.NoError(t, err) - return h - }, - }, - { - Name: "WebRTC", - HostGenerator: func(t *testing.T, opts TransportTestCaseOpts) host.Host { - libp2pOpts := transformOpts(opts) - libp2pOpts = append(libp2pOpts, libp2p.Transport(libp2pwebrtc.New)) - if opts.NoListen { - libp2pOpts = append(libp2pOpts, libp2p.NoListenAddrs) - } else { - libp2pOpts = append(libp2pOpts, libp2p.ListenAddrStrings("/ip4/127.0.0.1/udp/0/webrtc-direct")) - } - h, err := libp2p.New(libp2pOpts...) - require.NoError(t, err) - return h - }, - }, + // { + // Name: "TCP / Noise / Yamux", + // HostGenerator: func(t *testing.T, opts TransportTestCaseOpts) host.Host { + // libp2pOpts := transformOpts(opts) + // libp2pOpts = append(libp2pOpts, libp2p.Security(noise.ID, noise.New)) + // libp2pOpts = append(libp2pOpts, libp2p.Muxer(yamux.ID, yamux.DefaultTransport)) + // if opts.NoListen { + // libp2pOpts = append(libp2pOpts, libp2p.NoListenAddrs) + // } else { + // libp2pOpts = append(libp2pOpts, libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0")) + // } + // h, err := libp2p.New(libp2pOpts...) + // require.NoError(t, err) + // return h + // }, + // }, + // { + // Name: "TCP / TLS / Yamux", + // HostGenerator: func(t *testing.T, opts TransportTestCaseOpts) host.Host { + // libp2pOpts := transformOpts(opts) + // libp2pOpts = append(libp2pOpts, libp2p.Security(tls.ID, tls.New)) + // libp2pOpts = append(libp2pOpts, libp2p.Muxer(yamux.ID, yamux.DefaultTransport)) + // if opts.NoListen { + // libp2pOpts = append(libp2pOpts, libp2p.NoListenAddrs) + // } else { + // libp2pOpts = append(libp2pOpts, libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0")) + // } + // h, err := libp2p.New(libp2pOpts...) + // require.NoError(t, err) + // return h + // }, + // }, + // { + // Name: "WebSocket", + // HostGenerator: func(t *testing.T, opts TransportTestCaseOpts) host.Host { + // libp2pOpts := transformOpts(opts) + // if opts.NoListen { + // libp2pOpts = append(libp2pOpts, libp2p.NoListenAddrs) + // } else { + // libp2pOpts = append(libp2pOpts, libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0/ws")) + // } + // h, err := libp2p.New(libp2pOpts...) + // require.NoError(t, err) + // return h + // }, + // }, + // { + // Name: "QUIC", + // HostGenerator: func(t *testing.T, opts TransportTestCaseOpts) host.Host { + // libp2pOpts := transformOpts(opts) + // if opts.NoListen { + // libp2pOpts = append(libp2pOpts, libp2p.NoListenAddrs) + // } else { + // libp2pOpts = append(libp2pOpts, libp2p.ListenAddrStrings("/ip4/127.0.0.1/udp/0/quic-v1")) + // } + // h, err := libp2p.New(libp2pOpts...) + // require.NoError(t, err) + // return h + // }, + // }, + // { + // Name: "WebTransport", + // HostGenerator: func(t *testing.T, opts TransportTestCaseOpts) host.Host { + // libp2pOpts := transformOpts(opts) + // if opts.NoListen { + // libp2pOpts = append(libp2pOpts, libp2p.NoListenAddrs) + // } else { + // libp2pOpts = append(libp2pOpts, libp2p.ListenAddrStrings("/ip4/127.0.0.1/udp/0/quic-v1/webtransport")) + // } + // h, err := libp2p.New(libp2pOpts...) + // require.NoError(t, err) + // return h + // }, + // }, + // { + // Name: "WebRTC", + // HostGenerator: func(t *testing.T, opts TransportTestCaseOpts) host.Host { + // libp2pOpts := transformOpts(opts) + // libp2pOpts = append(libp2pOpts, libp2p.Transport(libp2pwebrtc.New)) + // if opts.NoListen { + // libp2pOpts = append(libp2pOpts, libp2p.NoListenAddrs) + // } else { + // libp2pOpts = append(libp2pOpts, libp2p.ListenAddrStrings("/ip4/127.0.0.1/udp/0/webrtc-direct")) + // } + // h, err := libp2p.New(libp2pOpts...) + // require.NoError(t, err) + // return h + // }, + // }, { Name: "Memory", HostGenerator: func(t *testing.T, opts TransportTestCaseOpts) host.Host { diff --git a/p2p/transport/memory/conn.go b/p2p/transport/memory/conn.go index 515fb43625..d08b942a70 100644 --- a/p2p/transport/memory/conn.go +++ b/p2p/transport/memory/conn.go @@ -63,7 +63,8 @@ func newConnection( func (c *conn) Close() error { c.closed.Store(true) - for _, s := range c.streams { + for id, s := range c.streams { + c.removeStream(id) s.Close() } @@ -83,8 +84,7 @@ func (c *conn) OpenStream(ctx context.Context) (network.MuxedStream, error) { func (c *conn) AcceptStream() (network.MuxedStream, error) { in := <-c.streamC - id := streamCounter.Add(1) - c.addStream(id, in) + c.addStream(in.id, in) return in, nil } diff --git a/p2p/transport/memory/stream.go b/p2p/transport/memory/stream.go index 66d8879f88..aa20eebd23 100644 --- a/p2p/transport/memory/stream.go +++ b/p2p/transport/memory/stream.go @@ -27,10 +27,9 @@ var ErrClosed = errors.New("stream closed") func newStreamPair() (*stream, *stream) { ra, rb := make(chan byte, 4096), make(chan byte, 4096) - wa, wb := rb, ra - in := newStream(rb, wb, network.DirInbound) - out := newStream(ra, wa, network.DirOutbound) + in := newStream(rb, ra, network.DirInbound) + out := newStream(ra, rb, network.DirOutbound) return in, out } @@ -47,28 +46,34 @@ func newStream(r, w chan byte, _ network.Direction) *stream { return s } -// How to handle errors with writes? func (s *stream) Write(p []byte) (n int, err error) { if s.closed.Load() { return 0, ErrClosed } - for i := 0; i < len(p); i++ { + select { + case <-s.reset: + return 0, network.ErrReset + case <-s.closeWrite: + return 0, ErrClosed + default: + } + + for n < len(p) { select { - case <-s.reset: - err = network.ErrReset - return case <-s.closeWrite: - err = ErrClosed - return - case s.write <- p[i]: - n = i + return n, ErrClosed + case <-s.reset: + return n, network.ErrReset + case s.write <- p[n]: + n++ default: err = io.ErrClosedPipe + return } } - return n + 1, err + return } func (s *stream) Read(p []byte) (n int, err error) { @@ -76,27 +81,32 @@ func (s *stream) Read(p []byte) (n int, err error) { return 0, ErrClosed } - for n = 0; n < len(p); n++ { + select { + case <-s.reset: + return 0, network.ErrReset + case <-s.closeRead: + return 0, ErrClosed + default: + } + + for n < len(p) { select { - case <-s.reset: - err = network.ErrReset - return case <-s.closeRead: - err = ErrClosed - return + return n, ErrClosed + case <-s.reset: + return n, network.ErrReset case b, ok := <-s.read: if !ok { - err = io.EOF - return + return n, ErrClosed } p[n] = b + n++ default: - err = io.EOF - return + return n, io.EOF } } - return + return n, nil } func (s *stream) CloseWrite() error { diff --git a/p2p/transport/memory/stream_test.go b/p2p/transport/memory/stream_test.go index cd5149c685..b23d3f7bb4 100644 --- a/p2p/transport/memory/stream_test.go +++ b/p2p/transport/memory/stream_test.go @@ -8,6 +8,7 @@ import ( ) func TestStreamSimpleReadWriteClose(t *testing.T) { + // t.Parallel() clientStr, serverStr := newStreamPair() // send a foobar from the client @@ -24,6 +25,7 @@ func TestStreamSimpleReadWriteClose(t *testing.T) { b, err := io.ReadAll(serverStr) require.NoError(t, err) require.Equal(t, []byte("foobar"), b) + // reading again should give another io.EOF n, err = serverStr.Read(make([]byte, 10)) require.Zero(t, n) @@ -35,7 +37,6 @@ func TestStreamSimpleReadWriteClose(t *testing.T) { require.NoError(t, serverStr.CloseWrite()) // and read it at the client - //require.False(t, clientDone.Load()) b, err = io.ReadAll(clientStr) require.NoError(t, err) require.Equal(t, []byte("lorem ipsum"), b) @@ -46,3 +47,24 @@ func TestStreamSimpleReadWriteClose(t *testing.T) { // Need to call Close for cleanup. Otherwise the FIN_ACK is never read require.NoError(t, serverStr.Close()) } + +func TestStreamPartialReads(t *testing.T) { + // t.Parallel() + clientStr, serverStr := newStreamPair() + + _, err := serverStr.Write([]byte("foobar")) + require.NoError(t, err) + require.NoError(t, serverStr.CloseWrite()) + + n, err := clientStr.Read([]byte{}) // empty read + require.NoError(t, err) + require.Zero(t, n) + b := make([]byte, 3) + n, err = clientStr.Read(b) + require.Equal(t, 3, n) + require.NoError(t, err) + require.Equal(t, []byte("foo"), b) + b, err = io.ReadAll(clientStr) + require.NoError(t, err) + require.Equal(t, []byte("bar"), b) +}