diff --git a/src/app/libp2p_helper/src/libp2p_helper/app.go b/src/app/libp2p_helper/src/libp2p_helper/app.go index a740ba37d52..2c49af18ea3 100644 --- a/src/app/libp2p_helper/src/libp2p_helper/app.go +++ b/src/app/libp2p_helper/src/libp2p_helper/app.go @@ -46,11 +46,11 @@ func (app *app) SetConnectionHandlers() { app.setConnectionHandlersOnce.Do(func() { app.P2p.ConnectionManager.AddOnConnectHandler(func(net net.Network, c net.Conn) { app.updateConnectionMetrics() - app.writeMsg(mkPeerConnectedUpcall(peer.Encode(c.RemotePeer()))) + app.writeMsg(mkPeerConnectedUpcall(c.RemotePeer().String())) }) app.P2p.ConnectionManager.AddOnDisconnectHandler(func(net net.Network, c net.Conn) { app.updateConnectionMetrics() - app.writeMsg(mkPeerDisconnectedUpcall(peer.Encode(c.RemotePeer()))) + app.writeMsg(mkPeerDisconnectedUpcall(c.RemotePeer().String())) }) }) } @@ -119,7 +119,7 @@ func (app *app) WriteStream(streamId uint64, data []byte) error { // another goroutine close_err := stream.stream.Close() if close_err != nil { - app.P2p.Logger.Errorf("failed to close stream %d after encountering write failure (%s): %s", streamId, err.Error(), close_err.Error()) + app.P2p.Logger.Debugf("failed to close stream %d after encountering write failure (%s): %s", streamId, err.Error(), close_err.Error()) } } return wrapError(badp2p(err), fmt.Sprintf("only wrote %d out of %d bytes", n, len(data))) @@ -308,13 +308,13 @@ func (app *app) checkPeerCount() { err = prometheus.Register(peerCount) if err != nil { - app.P2p.Logger.Debugf("couldn't register peer_count; perhaps we've already done so", err.Error()) + app.P2p.Logger.Debugf("couldn't register peer_count; perhaps we've already done so: %s", err) return } err = prometheus.Register(connectedPeerCount) if err != nil { - app.P2p.Logger.Debugf("couldn't register connected_peer_count; perhaps we've already done so", err.Error()) + app.P2p.Logger.Debugf("couldn't register connected_peer_count; perhaps we've already done so: %s", err) return } diff --git a/src/app/libp2p_helper/src/libp2p_helper/main_test.go b/src/app/libp2p_helper/src/libp2p_helper/main_test.go index 2c8c67bdf4b..bc135bafeae 100644 --- a/src/app/libp2p_helper/src/libp2p_helper/main_test.go +++ b/src/app/libp2p_helper/src/libp2p_helper/main_test.go @@ -5,7 +5,6 @@ import ( "context" "fmt" "io" - "io/ioutil" "os" "strings" "sync" @@ -23,6 +22,8 @@ import ( net "github.com/libp2p/go-libp2p/core/network" + gonet "net" + ipc "libp2p_ipc" "github.com/stretchr/testify/require" @@ -54,7 +55,7 @@ const ( ) func TestMplex_SendLargeMessage(t *testing.T) { - // assert we are able to send and receive a message with size up to 1 << 30 bytes + // assert we are able to send and receive a message with size up to 1 MiB appA, _ := newTestApp(t, nil, true) appA.NoDHT = true @@ -68,7 +69,7 @@ func TestMplex_SendLargeMessage(t *testing.T) { err = appB.P2p.Host.Connect(appB.Ctx, appAInfos[0]) require.NoError(t, err) - msgSize := uint64(1 << 30) + msgSize := uint64(1 << 20) withTimeoutAsync(t, func(done chan interface{}) { // create handler that reads `msgSize` bytes @@ -274,9 +275,14 @@ func TestLibp2pMetrics(t *testing.T) { appB.P2p.Host.SetStreamHandler(testProtocol, handler) + listener, err := gonet.Listen("tcp", ":0") + if err != nil { + panic(err) + } + port := listener.Addr().(*gonet.TCPAddr).Port server := http.NewServeMux() server.Handle("/metrics", promhttp.Handler()) - go http.ListenAndServe(":9001", server) + go http.Serve(listener, server) go appB.checkPeerCount() go appB.checkMessageStats() @@ -292,11 +298,11 @@ func TestLibp2pMetrics(t *testing.T) { expectedPeerCount := len(appB.P2p.Host.Network().Peers()) expectedCurrentConnCount := appB.P2p.ConnectionManager.GetInfo().ConnCount - resp, err := http.Get("http://localhost:9001/metrics") + resp, err := http.Get(fmt.Sprintf("http://localhost:%d/metrics", port)) require.NoError(t, err) defer resp.Body.Close() - body, err := ioutil.ReadAll(resp.Body) + body, err := io.ReadAll(resp.Body) require.NoError(t, err) respBody := string(body) diff --git a/src/app/libp2p_helper/src/libp2p_helper/message_id_test.go b/src/app/libp2p_helper/src/libp2p_helper/message_id_test.go index 8677e352718..215bf0a1bd6 100644 --- a/src/app/libp2p_helper/src/libp2p_helper/message_id_test.go +++ b/src/app/libp2p_helper/src/libp2p_helper/message_id_test.go @@ -48,11 +48,18 @@ func testPubsubMsgIdFun(t *testing.T, topic string) { // Subscribe to the topic testSubscribeDo(t, alice, topic, 21, 58) + // Timeouts between subscriptions are needed because otherwise each process would try to discover peers + // and will only find that no other peers are connected to the same topic. + // That said, pubsub's implementation is imperfect + time.Sleep(time.Second) testSubscribeDo(t, bob, topic, 21, 58) + time.Sleep(time.Second) testSubscribeDo(t, carol, topic, 21, 58) + time.Sleep(time.Second) _ = testOpenStreamDo(t, bob, alice.P2p.Host, appAPort, 9900, string(newProtocol)) _ = testOpenStreamDo(t, carol, alice.P2p.Host, appAPort, 9900, string(newProtocol)) + <-trapA.IncomingStream <-trapA.IncomingStream @@ -60,8 +67,7 @@ func testPubsubMsgIdFun(t *testing.T, topic string) { testPublishDo(t, alice, topic, msg, 21) testPublishDo(t, bob, topic, msg, 21) - time.Sleep(time.Millisecond * 100) - + time.Sleep(time.Second) n := 0 loop: for { diff --git a/src/app/libp2p_helper/src/libp2p_helper/stream_msg_test.go b/src/app/libp2p_helper/src/libp2p_helper/stream_msg_test.go index 9de883a8a67..8e4cd233a5d 100644 --- a/src/app/libp2p_helper/src/libp2p_helper/stream_msg_test.go +++ b/src/app/libp2p_helper/src/libp2p_helper/stream_msg_test.go @@ -2,6 +2,7 @@ package main import ( "context" + "math/rand" "testing" "github.com/stretchr/testify/require" @@ -58,7 +59,10 @@ func testOpenStreamDo(t *testing.T, appA *app, appBHost host.Host, appBPort uint require.NoError(t, pid.SetId(appBHost.ID().String())) require.NoError(t, err) - resMsg, _ := OpenStreamReq(m).handle(appA, rpcSeqno) + resMsg, afterWriteHandler := OpenStreamReq(m).handle(appA, rpcSeqno) + if afterWriteHandler != nil { + afterWriteHandler() + } seqno, respSuccess := checkRpcResponseSuccess(t, resMsg, "openStream") require.Equal(t, seqno, rpcSeqno) require.True(t, respSuccess.HasOpenStream()) @@ -151,7 +155,10 @@ func TestRemoveStreamHandler(t *testing.T) { require.NoError(t, err) var osRpcSeqno uint64 = 1026 - osResMsg, _ := OpenStreamReq(os).handle(appA, osRpcSeqno) + osResMsg, afterWriteHandler := OpenStreamReq(os).handle(appA, osRpcSeqno) + if afterWriteHandler != nil { + afterWriteHandler() + } osRpcSeqno_, errMsg := checkRpcResponseError(t, osResMsg) require.Equal(t, osRpcSeqno, osRpcSeqno_) require.Equal(t, "libp2p error: protocols not supported: [/mina/99]", errMsg) @@ -182,6 +189,22 @@ func TestResetStream(t *testing.T) { testResetStreamDo(t, appA, streamId, 114558) } +func testSendStreamFailDo(t *testing.T, app *app, streamId uint64, msgBytes []byte, rpcSeqno uint64) { + _, seg, err := capnp.NewMessage(capnp.SingleSegment(nil)) + require.NoError(t, err) + m, err := ipc.NewRootLibp2pHelperInterface_SendStream_Request(seg) + require.NoError(t, err) + msg, err := m.NewMsg() + require.NoError(t, err) + sid, err := msg.NewStreamId() + require.NoError(t, err) + sid.SetId(streamId) + require.NoError(t, msg.SetData(msgBytes)) + + resMsg, _ := SendStreamReq(m).handle(app, rpcSeqno) + checkRpcResponseError(t, resMsg) +} + func testSendStreamDo(t *testing.T, app *app, streamId uint64, msgBytes []byte, rpcSeqno uint64) { _, seg, err := capnp.NewMessage(capnp.SingleSegment(nil)) require.NoError(t, err) @@ -221,7 +244,7 @@ func TestOpenStreamBeforeAndAfterSetGatingConfig(t *testing.T) { aUpcallErrChan := make(chan error) launchFeedUpcallTrap(appA.P2p.Logger, appA.OutChan, aTrap, aUpcallErrChan, ctx) - appB, appBPort := newTestApp(t, appAInfos, false) + appB, appBPort := newTestApp(t, nil, false) err = appB.P2p.Host.Connect(appB.Ctx, appAInfos[0]) require.NoError(t, err) bTrap := newUpcallTrap("appB", 64, upcallDropAllMask^(1<