Skip to content

Commit

Permalink
Fix libp2p unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
georgeee committed Nov 17, 2023
1 parent 7abacae commit 8548686
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 19 deletions.
10 changes: 5 additions & 5 deletions src/app/libp2p_helper/src/libp2p_helper/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,11 @@ func (app *app) SetConnectionHandlers() {
app.setConnectionHandlersOnce.Do(func() {
app.P2p.ConnectionManager.AddOnConnectHandler(func(net net.Network, c net.Conn) {
app.updateConnectionMetrics()
app.writeMsg(mkPeerConnectedUpcall(peer.Encode(c.RemotePeer())))
app.writeMsg(mkPeerConnectedUpcall(c.RemotePeer().String()))
})
app.P2p.ConnectionManager.AddOnDisconnectHandler(func(net net.Network, c net.Conn) {
app.updateConnectionMetrics()
app.writeMsg(mkPeerDisconnectedUpcall(peer.Encode(c.RemotePeer())))
app.writeMsg(mkPeerDisconnectedUpcall(c.RemotePeer().String()))
})
})
}
Expand Down Expand Up @@ -119,7 +119,7 @@ func (app *app) WriteStream(streamId uint64, data []byte) error {
// another goroutine
close_err := stream.stream.Close()
if close_err != nil {
app.P2p.Logger.Errorf("failed to close stream %d after encountering write failure (%s): %s", streamId, err.Error(), close_err.Error())
app.P2p.Logger.Debugf("failed to close stream %d after encountering write failure (%s): %s", streamId, err.Error(), close_err.Error())
}
}
return wrapError(badp2p(err), fmt.Sprintf("only wrote %d out of %d bytes", n, len(data)))
Expand Down Expand Up @@ -308,13 +308,13 @@ func (app *app) checkPeerCount() {

err = prometheus.Register(peerCount)
if err != nil {
app.P2p.Logger.Debugf("couldn't register peer_count; perhaps we've already done so", err.Error())
app.P2p.Logger.Debugf("couldn't register peer_count; perhaps we've already done so: %s", err)
return
}

err = prometheus.Register(connectedPeerCount)
if err != nil {
app.P2p.Logger.Debugf("couldn't register connected_peer_count; perhaps we've already done so", err.Error())
app.P2p.Logger.Debugf("couldn't register connected_peer_count; perhaps we've already done so: %s", err)
return
}

Expand Down
18 changes: 12 additions & 6 deletions src/app/libp2p_helper/src/libp2p_helper/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"context"
"fmt"
"io"
"io/ioutil"
"os"
"strings"
"sync"
Expand All @@ -23,6 +22,8 @@ import (

net "github.com/libp2p/go-libp2p/core/network"

gonet "net"

ipc "libp2p_ipc"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -54,7 +55,7 @@ const (
)

func TestMplex_SendLargeMessage(t *testing.T) {
// assert we are able to send and receive a message with size up to 1 << 30 bytes
// assert we are able to send and receive a message with size up to 1 MiB
appA, _ := newTestApp(t, nil, true)
appA.NoDHT = true

Expand All @@ -68,7 +69,7 @@ func TestMplex_SendLargeMessage(t *testing.T) {
err = appB.P2p.Host.Connect(appB.Ctx, appAInfos[0])
require.NoError(t, err)

msgSize := uint64(1 << 30)
msgSize := uint64(1 << 20)

withTimeoutAsync(t, func(done chan interface{}) {
// create handler that reads `msgSize` bytes
Expand Down Expand Up @@ -274,9 +275,14 @@ func TestLibp2pMetrics(t *testing.T) {

appB.P2p.Host.SetStreamHandler(testProtocol, handler)

listener, err := gonet.Listen("tcp", ":0")
if err != nil {
panic(err)
}
port := listener.Addr().(*gonet.TCPAddr).Port
server := http.NewServeMux()
server.Handle("/metrics", promhttp.Handler())
go http.ListenAndServe(":9001", server)
go http.Serve(listener, server)

go appB.checkPeerCount()
go appB.checkMessageStats()
Expand All @@ -292,11 +298,11 @@ func TestLibp2pMetrics(t *testing.T) {
expectedPeerCount := len(appB.P2p.Host.Network().Peers())
expectedCurrentConnCount := appB.P2p.ConnectionManager.GetInfo().ConnCount

resp, err := http.Get("http://localhost:9001/metrics")
resp, err := http.Get(fmt.Sprintf("http://localhost:%d/metrics", port))
require.NoError(t, err)
defer resp.Body.Close()

body, err := ioutil.ReadAll(resp.Body)
body, err := io.ReadAll(resp.Body)
require.NoError(t, err)

respBody := string(body)
Expand Down
10 changes: 8 additions & 2 deletions src/app/libp2p_helper/src/libp2p_helper/message_id_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,20 +48,26 @@ func testPubsubMsgIdFun(t *testing.T, topic string) {

// Subscribe to the topic
testSubscribeDo(t, alice, topic, 21, 58)
// Timeouts between subscriptions are needed because otherwise each process would try to discover peers
// and will only find that no other peers are connected to the same topic.
// That said, pubsub's implementation is imperfect
time.Sleep(time.Second)
testSubscribeDo(t, bob, topic, 21, 58)
time.Sleep(time.Second)
testSubscribeDo(t, carol, topic, 21, 58)
time.Sleep(time.Second)

_ = testOpenStreamDo(t, bob, alice.P2p.Host, appAPort, 9900, string(newProtocol))
_ = testOpenStreamDo(t, carol, alice.P2p.Host, appAPort, 9900, string(newProtocol))

<-trapA.IncomingStream
<-trapA.IncomingStream

msg := []byte("hello world")
testPublishDo(t, alice, topic, msg, 21)
testPublishDo(t, bob, topic, msg, 21)

time.Sleep(time.Millisecond * 100)

time.Sleep(time.Second)
n := 0
loop:
for {
Expand Down
54 changes: 48 additions & 6 deletions src/app/libp2p_helper/src/libp2p_helper/stream_msg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"context"
"math/rand"
"testing"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -58,7 +59,10 @@ func testOpenStreamDo(t *testing.T, appA *app, appBHost host.Host, appBPort uint
require.NoError(t, pid.SetId(appBHost.ID().String()))
require.NoError(t, err)

resMsg, _ := OpenStreamReq(m).handle(appA, rpcSeqno)
resMsg, afterWriteHandler := OpenStreamReq(m).handle(appA, rpcSeqno)
if afterWriteHandler != nil {
afterWriteHandler()
}
seqno, respSuccess := checkRpcResponseSuccess(t, resMsg, "openStream")
require.Equal(t, seqno, rpcSeqno)
require.True(t, respSuccess.HasOpenStream())
Expand Down Expand Up @@ -151,7 +155,10 @@ func TestRemoveStreamHandler(t *testing.T) {
require.NoError(t, err)

var osRpcSeqno uint64 = 1026
osResMsg, _ := OpenStreamReq(os).handle(appA, osRpcSeqno)
osResMsg, afterWriteHandler := OpenStreamReq(os).handle(appA, osRpcSeqno)
if afterWriteHandler != nil {
afterWriteHandler()
}
osRpcSeqno_, errMsg := checkRpcResponseError(t, osResMsg)
require.Equal(t, osRpcSeqno, osRpcSeqno_)
require.Equal(t, "libp2p error: protocols not supported: [/mina/99]", errMsg)
Expand Down Expand Up @@ -182,6 +189,22 @@ func TestResetStream(t *testing.T) {
testResetStreamDo(t, appA, streamId, 114558)
}

func testSendStreamFailDo(t *testing.T, app *app, streamId uint64, msgBytes []byte, rpcSeqno uint64) {
_, seg, err := capnp.NewMessage(capnp.SingleSegment(nil))
require.NoError(t, err)
m, err := ipc.NewRootLibp2pHelperInterface_SendStream_Request(seg)
require.NoError(t, err)
msg, err := m.NewMsg()
require.NoError(t, err)
sid, err := msg.NewStreamId()
require.NoError(t, err)
sid.SetId(streamId)
require.NoError(t, msg.SetData(msgBytes))

resMsg, _ := SendStreamReq(m).handle(app, rpcSeqno)
checkRpcResponseError(t, resMsg)
}

func testSendStreamDo(t *testing.T, app *app, streamId uint64, msgBytes []byte, rpcSeqno uint64) {
_, seg, err := capnp.NewMessage(capnp.SingleSegment(nil))
require.NoError(t, err)
Expand Down Expand Up @@ -221,7 +244,7 @@ func TestOpenStreamBeforeAndAfterSetGatingConfig(t *testing.T) {
aUpcallErrChan := make(chan error)
launchFeedUpcallTrap(appA.P2p.Logger, appA.OutChan, aTrap, aUpcallErrChan, ctx)

appB, appBPort := newTestApp(t, appAInfos, false)
appB, appBPort := newTestApp(t, nil, false)
err = appB.P2p.Host.Connect(appB.Ctx, appAInfos[0])
require.NoError(t, err)
bTrap := newUpcallTrap("appB", 64, upcallDropAllMask^(1<<StreamMessageReceivedChan))
Expand Down Expand Up @@ -291,8 +314,27 @@ func TestOpenStreamBeforeAndAfterSetGatingConfig(t *testing.T) {
require.NoError(t, pid.SetId(appB.P2p.Host.ID().String()))
require.NoError(t, err)

resMsg, _ := OpenStreamReq(m).handle(appA, 9905)
seqno, _ := checkRpcResponseError(t, resMsg)
require.Equal(t, uint64(9905), seqno)
resMsg, afterWriteHandler := OpenStreamReq(m).handle(appA, 9905)
if afterWriteHandler != nil {
afterWriteHandler()
}
msg, err := ipc.ReadRootDaemonInterface_Message(resMsg)
require.NoError(t, err)
require.True(t, msg.HasRpcResponse())
resp, err := msg.RpcResponse()
require.NoError(t, err)
if !resp.HasError() {
_, succ := checkRpcResponseSuccess(t, resMsg, "openStream")
require.True(t, succ.HasOpenStream())
resp, err := succ.OpenStream()
require.NoError(t, err)
sid, err := resp.StreamId()
require.NoError(t, err)
streamId := sid.Id()
msg := make([]byte, 1000000)
_, err = rand.Read(msg)
require.NoError(t, err)
testSendStreamFailDo(t, appA, streamId, msg, 1983)
}
}
}

0 comments on commit 8548686

Please sign in to comment.