From 1f5b81fb614a6095baa1193f8e4140306911f02c Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Thu, 11 Jul 2024 10:32:18 +0000 Subject: [PATCH 01/18] test: use the regular libp2p host (#565) This removes dependencies on swarm/testing and the blank host. 1. swarm/testing really shouldn't be used at all except for internal libp2p stuff. 2. The blank host should only be used in _very_ special cases (autonat, mostly). --- blacklist_test.go | 6 +-- discovery_test.go | 4 +- floodsub_test.go | 85 ++++++++++++++++------------------- gossipsub_connmgr_test.go | 17 ++++--- gossipsub_feat_test.go | 2 +- gossipsub_matchfn_test.go | 2 +- gossipsub_spam_test.go | 12 ++--- gossipsub_test.go | 88 ++++++++++++++----------------------- pubsub_test.go | 21 ++++++++- randomsub_test.go | 8 ++-- subscription_filter_test.go | 2 +- topic_test.go | 32 +++++++------- trace_test.go | 12 ++--- validation_builtin_test.go | 4 +- validation_test.go | 12 ++--- 15 files changed, 151 insertions(+), 156 deletions(-) diff --git a/blacklist_test.go b/blacklist_test.go index 045a9c85..a19c46e4 100644 --- a/blacklist_test.go +++ b/blacklist_test.go @@ -38,7 +38,7 @@ func TestBlacklist(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 2) + hosts := getDefaultHosts(t, 2) psubs := getPubsubs(ctx, hosts) connect(t, hosts[0], hosts[1]) @@ -66,7 +66,7 @@ func TestBlacklist2(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 2) + hosts := getDefaultHosts(t, 2) psubs := getPubsubs(ctx, hosts) connect(t, hosts[0], hosts[1]) @@ -99,7 +99,7 @@ func TestBlacklist3(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 2) + hosts := getDefaultHosts(t, 2) psubs := getPubsubs(ctx, hosts) psubs[1].BlacklistPeer(hosts[0].ID()) diff --git a/discovery_test.go b/discovery_test.go index 66c9c80e..f539e69d 100644 --- a/discovery_test.go +++ b/discovery_test.go @@ -134,7 +134,7 @@ func TestSimpleDiscovery(t *testing.T) { server := newDiscoveryServer() discOpts := []discovery.Option{discovery.Limit(numHosts), discovery.TTL(1 * time.Minute)} - hosts := getNetHosts(t, ctx, numHosts) + hosts := getDefaultHosts(t, numHosts) psubs := make([]*PubSub, numHosts) topicHandlers := make([]*Topic, numHosts) @@ -234,7 +234,7 @@ func TestGossipSubDiscoveryAfterBootstrap(t *testing.T) { discOpts := []discovery.Option{discovery.Limit(numHosts), discovery.TTL(ttl)} // Put the pubsub clients into two partitions - hosts := getNetHosts(t, ctx, numHosts) + hosts := getDefaultHosts(t, numHosts) psubs := make([]*PubSub, numHosts) topicHandlers := make([]*Topic, numHosts) diff --git a/floodsub_test.go b/floodsub_test.go index 35dd0d53..0168b15f 100644 --- a/floodsub_test.go +++ b/floodsub_test.go @@ -20,9 +20,6 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/protocol" - bhost "github.com/libp2p/go-libp2p/p2p/host/blank" - swarmt "github.com/libp2p/go-libp2p/p2p/net/swarm/testing" - "github.com/libp2p/go-msgio/protoio" ) @@ -42,19 +39,6 @@ func checkMessageRouting(t *testing.T, topic string, pubs []*PubSub, subs []*Sub } } -func getNetHosts(t *testing.T, ctx context.Context, n int) []host.Host { - var out []host.Host - - for i := 0; i < n; i++ { - netw := swarmt.GenSwarm(t) - h := bhost.NewBlankHost(netw) - t.Cleanup(func() { h.Close() }) - out = append(out, h) - } - - return out -} - func connect(t *testing.T, a, b host.Host) { pinfo := a.Peerstore().PeerInfo(a.ID()) err := b.Connect(context.Background(), pinfo) @@ -151,7 +135,7 @@ func assertNeverReceives(t *testing.T, ch *Subscription, timeout time.Duration) func TestBasicFloodsub(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 20) + hosts := getDefaultHosts(t, 20) psubs := getPubsubs(ctx, hosts) @@ -193,7 +177,7 @@ func TestMultihops(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 6) + hosts := getDefaultHosts(t, 6) psubs := getPubsubs(ctx, hosts) @@ -235,7 +219,7 @@ func TestReconnects(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 3) + hosts := getDefaultHosts(t, 3) psubs := getPubsubs(ctx, hosts) @@ -309,7 +293,7 @@ func TestNoConnection(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 10) + hosts := getDefaultHosts(t, 10) psubs := getPubsubs(ctx, hosts) @@ -334,7 +318,7 @@ func TestSelfReceive(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - host := getNetHosts(t, ctx, 1)[0] + host := getDefaultHosts(t, 1)[0] psub, err := NewFloodSub(ctx, host) if err != nil { @@ -368,7 +352,7 @@ func TestOneToOne(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 2) + hosts := getDefaultHosts(t, 2) psubs := getPubsubs(ctx, hosts) connect(t, hosts[0], hosts[1]) @@ -401,7 +385,7 @@ func TestTreeTopology(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 10) + hosts := getDefaultHosts(t, 10) psubs := getPubsubs(ctx, hosts) connect(t, hosts[0], hosts[1]) @@ -464,7 +448,7 @@ func TestFloodSubPluggableProtocol(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 3) + hosts := getDefaultHosts(t, 3) psubA := mustCreatePubSub(ctx, t, hosts[0], "/esh/floodsub", "/lsr/floodsub") psubB := mustCreatePubSub(ctx, t, hosts[1], "/esh/floodsub") @@ -496,7 +480,7 @@ func TestFloodSubPluggableProtocol(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 2) + hosts := getDefaultHosts(t, 2) psubA := mustCreatePubSub(ctx, t, hosts[0], "/esh/floodsub") psubB := mustCreatePubSub(ctx, t, hosts[1], "/lsr/floodsub") @@ -551,7 +535,7 @@ func TestSubReporting(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - host := getNetHosts(t, ctx, 1)[0] + host := getDefaultHosts(t, 1)[0] psub, err := NewFloodSub(ctx, host) if err != nil { t.Fatal(err) @@ -593,7 +577,7 @@ func TestPeerTopicReporting(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 4) + hosts := getDefaultHosts(t, 4) psubs := getPubsubs(ctx, hosts) connect(t, hosts[0], hosts[1]) @@ -650,7 +634,7 @@ func TestSubscribeMultipleTimes(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 2) + hosts := getDefaultHosts(t, 2) psubs := getPubsubs(ctx, hosts) connect(t, hosts[0], hosts[1]) @@ -695,7 +679,7 @@ func TestPeerDisconnect(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 2) + hosts := getDefaultHosts(t, 2) psubs := getPubsubs(ctx, hosts) connect(t, hosts[0], hosts[1]) @@ -743,7 +727,7 @@ func TestWithNoSigning(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 2) + hosts := getDefaultHosts(t, 2) psubs := getPubsubs(ctx, hosts, WithNoAuthor(), WithMessageIdFn(func(pmsg *pb.Message) string { // silly content-based test message-ID: just use the data as whole return base64.URLEncoding.EncodeToString(pmsg.Data) @@ -788,7 +772,7 @@ func TestWithSigning(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 2) + hosts := getDefaultHosts(t, 2) psubs := getPubsubs(ctx, hosts, WithStrictSignatureVerification(true)) connect(t, hosts[0], hosts[1]) @@ -830,7 +814,7 @@ func TestImproperlySignedMessageRejected(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 2) + hosts := getDefaultHosts(t, 2) adversary := hosts[0] honestPeer := hosts[1] @@ -948,7 +932,7 @@ func TestMessageSender(t *testing.T) { const topic = "foobar" - hosts := getNetHosts(t, ctx, 3) + hosts := getDefaultHosts(t, 3) psubs := getPubsubs(ctx, hosts) var msgs []*Subscription @@ -1002,7 +986,7 @@ func TestConfigurableMaxMessageSize(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 10) + hosts := getDefaultHosts(t, 10) // use a 4mb limit; default is 1mb; we'll test with a 2mb payload. psubs := getPubsubs(ctx, hosts, WithMaxMessageSize(1<<22)) @@ -1045,7 +1029,7 @@ func TestAnnounceRetry(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 2) + hosts := getDefaultHosts(t, 2) ps := getPubsub(ctx, hosts[0]) watcher := &announceWatcher{} hosts[1].SetStreamHandler(FloodSubID, watcher.handleStream) @@ -1117,7 +1101,7 @@ func TestPubsubWithAssortedOptions(t *testing.T) { return string(hash[:]) } - hosts := getNetHosts(t, ctx, 2) + hosts := getDefaultHosts(t, 2) psubs := getPubsubs(ctx, hosts, WithMessageIdFn(hashMsgID), WithPeerOutboundQueueSize(10), @@ -1152,8 +1136,7 @@ func TestWithInvalidMessageAuthor(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - h := bhost.NewBlankHost(swarmt.GenSwarm(t)) - defer h.Close() + h := getDefaultHosts(t, 1)[0] _, err := NewFloodSub(ctx, h, WithMessageAuthor("bogotr0n")) if err == nil { t.Fatal("expected error") @@ -1168,10 +1151,9 @@ func TestPreconnectedNodes(t *testing.T) { defer cancel() // Create hosts - h1 := bhost.NewBlankHost(swarmt.GenSwarm(t)) - h2 := bhost.NewBlankHost(swarmt.GenSwarm(t)) - defer h1.Close() - defer h2.Close() + hosts := getDefaultHosts(t, 2) + h1 := hosts[0] + h2 := hosts[1] opts := []Option{WithDiscovery(&dummyDiscovery{})} // Setup first PubSub @@ -1229,10 +1211,9 @@ func TestDedupInboundStreams(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - h1 := bhost.NewBlankHost(swarmt.GenSwarm(t)) - h2 := bhost.NewBlankHost(swarmt.GenSwarm(t)) - defer h1.Close() - defer h2.Close() + hosts := getDefaultHosts(t, 2) + h1 := hosts[0] + h2 := hosts[1] _, err := NewFloodSub(ctx, h1) if err != nil { @@ -1247,18 +1228,30 @@ func TestDedupInboundStreams(t *testing.T) { if err != nil { t.Fatal(err) } + _, err = s1.Read(nil) // force protocol negotiation to complete + if err != nil { + t.Fatal(err) + } time.Sleep(100 * time.Millisecond) s2, err := h2.NewStream(ctx, h1.ID(), FloodSubID) if err != nil { t.Fatal(err) } + _, err = s2.Read(nil) // force protocol negotiation to complete + if err != nil { + t.Fatal(err) + } time.Sleep(100 * time.Millisecond) s3, err := h2.NewStream(ctx, h1.ID(), FloodSubID) if err != nil { t.Fatal(err) } + _, err = s3.Read(nil) // force protocol negotiation to complete + if err != nil { + t.Fatal(err) + } time.Sleep(100 * time.Millisecond) // check that s1 and s2 have been reset diff --git a/gossipsub_connmgr_test.go b/gossipsub_connmgr_test.go index 0a97312c..accf57dd 100644 --- a/gossipsub_connmgr_test.go +++ b/gossipsub_connmgr_test.go @@ -7,10 +7,10 @@ import ( "github.com/benbjohnson/clock" "github.com/libp2p/go-libp2p/core/host" - swarmt "github.com/libp2p/go-libp2p/p2p/net/swarm/testing" + "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" - bhost "github.com/libp2p/go-libp2p/p2p/host/blank" "github.com/libp2p/go-libp2p/p2p/net/connmgr" ) @@ -70,9 +70,14 @@ func TestGossipsubConnTagMessageDeliveries(t *testing.T) { t.Fatal(err) } - netw := swarmt.GenSwarm(t) - defer netw.Close() - h := bhost.NewBlankHost(netw, bhost.WithConnectionManager(connmgrs[i])) + h, err := libp2p.New( + libp2p.ResourceManager(&network.NullResourceManager{}), + libp2p.ConnectionManager(connmgrs[i]), + ) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { h.Close() }) honestHosts[i] = h honestPeers[h.ID()] = struct{}{} } @@ -83,7 +88,7 @@ func TestGossipsubConnTagMessageDeliveries(t *testing.T) { WithFloodPublish(true)) // sybil squatters to be connected later - sybilHosts := getNetHosts(t, ctx, nSquatter) + sybilHosts := getDefaultHosts(t, nSquatter) for _, h := range sybilHosts { squatter := &sybilSquatter{h: h} h.SetStreamHandler(GossipSubID_v10, squatter.handleStream) diff --git a/gossipsub_feat_test.go b/gossipsub_feat_test.go index 712f16df..93cfb4c3 100644 --- a/gossipsub_feat_test.go +++ b/gossipsub_feat_test.go @@ -42,7 +42,7 @@ func TestGossipSubCustomProtocols(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 3) + hosts := getDefaultHosts(t, 3) gsubs := getGossipsubs(ctx, hosts[:2], WithGossipSubProtocols(protos, features)) fsub := getPubsub(ctx, hosts[2]) diff --git a/gossipsub_matchfn_test.go b/gossipsub_matchfn_test.go index 279f0d34..4d688d25 100644 --- a/gossipsub_matchfn_test.go +++ b/gossipsub_matchfn_test.go @@ -17,7 +17,7 @@ func TestGossipSubMatchingFn(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - h := getNetHosts(t, ctx, 4) + h := getDefaultHosts(t, 4) psubs := []*PubSub{ getGossipsub(ctx, h[0], WithProtocolMatchFn(protocolNameMatch), WithGossipSubProtocols([]protocol.ID{customsubA100, GossipSubID_v11}, GossipSubDefaultFeatures)), getGossipsub(ctx, h[1], WithProtocolMatchFn(protocolNameMatch), WithGossipSubProtocols([]protocol.ID{customsubA101Beta}, GossipSubDefaultFeatures)), diff --git a/gossipsub_spam_test.go b/gossipsub_spam_test.go index f31daaab..3ccb1ab4 100644 --- a/gossipsub_spam_test.go +++ b/gossipsub_spam_test.go @@ -25,7 +25,7 @@ func TestGossipsubAttackSpamIWANT(t *testing.T) { defer cancel() // Create legitimate and attacker hosts - hosts := getNetHosts(t, ctx, 2) + hosts := getDefaultHosts(t, 2) legit := hosts[0] attacker := hosts[1] @@ -142,7 +142,7 @@ func TestGossipsubAttackSpamIHAVE(t *testing.T) { defer cancel() // Create legitimate and attacker hosts - hosts := getNetHosts(t, ctx, 2) + hosts := getDefaultHosts(t, 2) legit := hosts[0] attacker := hosts[1] @@ -195,6 +195,7 @@ func TestGossipsubAttackSpamIHAVE(t *testing.T) { Control: &pb.ControlMessage{Graft: []*pb.ControlGraft{{TopicID: sub.Topicid}}}, }) + sub := sub go func() { defer cancel() @@ -292,7 +293,7 @@ func TestGossipsubAttackGRAFTNonExistentTopic(t *testing.T) { defer cancel() // Create legitimate and attacker hosts - hosts := getNetHosts(t, ctx, 2) + hosts := getDefaultHosts(t, 2) legit := hosts[0] attacker := hosts[1] @@ -376,7 +377,7 @@ func TestGossipsubAttackGRAFTDuringBackoff(t *testing.T) { defer cancel() // Create legitimate and attacker hosts - hosts := getNetHosts(t, ctx, 2) + hosts := getDefaultHosts(t, 2) legit := hosts[0] attacker := hosts[1] @@ -430,6 +431,7 @@ func TestGossipsubAttackGRAFTDuringBackoff(t *testing.T) { Control: &pb.ControlMessage{Graft: graft}, }) + sub := sub go func() { defer cancel() @@ -617,7 +619,7 @@ func TestGossipsubAttackInvalidMessageSpam(t *testing.T) { defer cancel() // Create legitimate and attacker hosts - hosts := getNetHosts(t, ctx, 2) + hosts := getDefaultHosts(t, 2) legit := hosts[0] attacker := hosts[1] diff --git a/gossipsub_test.go b/gossipsub_test.go index 5933f4b5..8c9419d8 100644 --- a/gossipsub_test.go +++ b/gossipsub_test.go @@ -13,16 +13,12 @@ import ( pb "github.com/libp2p/go-libp2p-pubsub/pb" - "github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" "github.com/libp2p/go-libp2p/core/record" - bhost "github.com/libp2p/go-libp2p/p2p/host/blank" - swarmt "github.com/libp2p/go-libp2p/p2p/net/swarm/testing" - "github.com/libp2p/go-msgio/protoio" ) @@ -45,7 +41,7 @@ func getGossipsubs(ctx context.Context, hs []host.Host, opts ...Option) []*PubSu func TestSparseGossipsub(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 20) + hosts := getDefaultHosts(t, 20) psubs := getGossipsubs(ctx, hosts) @@ -86,7 +82,7 @@ func TestSparseGossipsub(t *testing.T) { func TestDenseGossipsub(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 20) + hosts := getDefaultHosts(t, 20) psubs := getGossipsubs(ctx, hosts) @@ -127,7 +123,7 @@ func TestDenseGossipsub(t *testing.T) { func TestGossipsubFanout(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 20) + hosts := getDefaultHosts(t, 20) psubs := getGossipsubs(ctx, hosts) @@ -196,7 +192,7 @@ func TestGossipsubFanout(t *testing.T) { func TestGossipsubFanoutMaintenance(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 20) + hosts := getDefaultHosts(t, 20) psubs := getGossipsubs(ctx, hosts) @@ -281,7 +277,7 @@ func TestGossipsubFanoutExpiry(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 10) + hosts := getDefaultHosts(t, 10) psubs := getGossipsubs(ctx, hosts) @@ -340,7 +336,7 @@ func TestGossipsubFanoutExpiry(t *testing.T) { func TestGossipsubGossip(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 20) + hosts := getDefaultHosts(t, 20) psubs := getGossipsubs(ctx, hosts) @@ -388,7 +384,7 @@ func TestGossipsubGossipPiggyback(t *testing.T) { t.Skip("test no longer relevant; gossip propagation has become eager") ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 20) + hosts := getDefaultHosts(t, 20) psubs := getGossipsubs(ctx, hosts) @@ -457,7 +453,7 @@ func TestGossipsubGossipPropagation(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 20) + hosts := getDefaultHosts(t, 20) psubs := getGossipsubs(ctx, hosts) hosts1 := hosts[:GossipSubD+1] @@ -537,7 +533,7 @@ func TestGossipsubGossipPropagation(t *testing.T) { func TestGossipsubPrune(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 20) + hosts := getDefaultHosts(t, 20) psubs := getGossipsubs(ctx, hosts) @@ -586,7 +582,7 @@ func TestGossipsubPrune(t *testing.T) { func TestGossipsubPruneBackoffTime(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 10) + hosts := getDefaultHosts(t, 10) // App specific score that we'll change later. currentScoreForHost0 := int32(0) @@ -684,7 +680,7 @@ func TestGossipsubPruneBackoffTime(t *testing.T) { func TestGossipsubGraft(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 20) + hosts := getDefaultHosts(t, 20) psubs := getGossipsubs(ctx, hosts) @@ -729,7 +725,7 @@ func TestGossipsubGraft(t *testing.T) { func TestGossipsubRemovePeer(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 20) + hosts := getDefaultHosts(t, 20) psubs := getGossipsubs(ctx, hosts) @@ -779,7 +775,7 @@ func TestGossipsubGraftPruneRetry(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 10) + hosts := getDefaultHosts(t, 10) psubs := getGossipsubs(ctx, hosts) denseConnect(t, hosts) @@ -829,7 +825,7 @@ func TestGossipsubControlPiggyback(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 10) + hosts := getDefaultHosts(t, 10) psubs := getGossipsubs(ctx, hosts) denseConnect(t, hosts) @@ -910,7 +906,7 @@ func TestGossipsubControlPiggyback(t *testing.T) { func TestMixedGossipsub(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 30) + hosts := getDefaultHosts(t, 30) gsubs := getGossipsubs(ctx, hosts[:20]) fsubs := getPubsubs(ctx, hosts[20:]) @@ -954,7 +950,7 @@ func TestGossipsubMultihops(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 6) + hosts := getDefaultHosts(t, 6) psubs := getGossipsubs(ctx, hosts) @@ -997,7 +993,7 @@ func TestGossipsubTreeTopology(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 10) + hosts := getDefaultHosts(t, 10) psubs := getGossipsubs(ctx, hosts) connect(t, hosts[0], hosts[1]) @@ -1061,7 +1057,7 @@ func TestGossipsubStarTopology(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 20) + hosts := getDefaultHosts(t, 20) psubs := getGossipsubs(ctx, hosts, WithPeerExchange(true), WithFloodPublish(true)) // configure the center of the star with a very low D @@ -1223,7 +1219,7 @@ func TestGossipsubDirectPeers(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - h := getNetHosts(t, ctx, 3) + h := getDefaultHosts(t, 3) psubs := []*PubSub{ getGossipsub(ctx, h[0], WithDirectConnectTicks(2)), getGossipsub(ctx, h[1], WithDirectPeers([]peer.AddrInfo{{ID: h[2].ID(), Addrs: h[2].Addrs()}}), WithDirectConnectTicks(2)), @@ -1287,7 +1283,7 @@ func TestGossipSubPeerFilter(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - h := getNetHosts(t, ctx, 3) + h := getDefaultHosts(t, 3) psubs := []*PubSub{ getGossipsub(ctx, h[0], WithPeerFilter(func(pid peer.ID, topic string) bool { return pid == h[1].ID() @@ -1329,7 +1325,7 @@ func TestGossipsubDirectPeersFanout(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - h := getNetHosts(t, ctx, 3) + h := getDefaultHosts(t, 3) psubs := []*PubSub{ getGossipsub(ctx, h[0]), getGossipsub(ctx, h[1], WithDirectPeers([]peer.AddrInfo{{ID: h[2].ID(), Addrs: h[2].Addrs()}})), @@ -1416,7 +1412,7 @@ func TestGossipsubFloodPublish(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 20) + hosts := getDefaultHosts(t, 20) psubs := getGossipsubs(ctx, hosts, WithFloodPublish(true)) // build the star @@ -1451,7 +1447,7 @@ func TestGossipsubEnoughPeers(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 20) + hosts := getDefaultHosts(t, 20) psubs := getGossipsubs(ctx, hosts) for _, ps := range psubs { @@ -1500,7 +1496,7 @@ func TestGossipsubCustomParams(t *testing.T) { wantedMaxPendingConns := 23 params.MaxPendingConnections = wantedMaxPendingConns - hosts := getNetHosts(t, ctx, 1) + hosts := getDefaultHosts(t, 1) psubs := getGossipsubs(ctx, hosts, WithGossipSubParams(params)) @@ -1529,7 +1525,7 @@ func TestGossipsubNegativeScore(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 20) + hosts := getDefaultHosts(t, 20) psubs := getGossipsubs(ctx, hosts, WithPeerScore( &PeerScoreParams{ @@ -1613,7 +1609,7 @@ func TestGossipsubScoreValidatorEx(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 3) + hosts := getDefaultHosts(t, 3) psubs := getGossipsubs(ctx, hosts, WithPeerScore( &PeerScoreParams{ @@ -1701,8 +1697,7 @@ func TestGossipsubPiggybackControl(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - h := bhost.NewBlankHost(swarmt.GenSwarm(t)) - defer h.Close() + h := getDefaultHosts(t, 1)[0] ps := getGossipsub(ctx, h) blah := peer.ID("bogotr0n") @@ -1750,7 +1745,7 @@ func TestGossipsubMultipleGraftTopics(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 2) + hosts := getDefaultHosts(t, 2) psubs := getGossipsubs(ctx, hosts) sparseConnect(t, hosts) @@ -1818,7 +1813,7 @@ func TestGossipsubOpportunisticGrafting(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 50) + hosts := getDefaultHosts(t, 50) // pubsubs for the first 10 hosts psubs := getGossipsubs(ctx, hosts[:10], WithFloodPublish(true), @@ -1919,7 +1914,7 @@ func TestGossipSubLeaveTopic(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - h := getNetHosts(t, ctx, 2) + h := getDefaultHosts(t, 2) psubs := []*PubSub{ getGossipsub(ctx, h[0]), getGossipsub(ctx, h[1]), @@ -1990,7 +1985,7 @@ func TestGossipSubJoinTopic(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - h := getNetHosts(t, ctx, 3) + h := getDefaultHosts(t, 3) psubs := []*PubSub{ getGossipsub(ctx, h[0]), getGossipsub(ctx, h[1]), @@ -2072,7 +2067,7 @@ func TestGossipsubPeerScoreInspect(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 2) + hosts := getDefaultHosts(t, 2) inspector := &mockPeerScoreInspector{} psub1 := getGossipsub(ctx, hosts[0], @@ -2132,7 +2127,7 @@ func TestGossipsubPeerScoreResetTopicParams(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 1) + hosts := getDefaultHosts(t, 1) ps := getGossipsub(ctx, hosts[0], WithPeerScore( @@ -2199,7 +2194,7 @@ func TestGossipsubRPCFragmentation(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 2) + hosts := getDefaultHosts(t, 2) ps := getGossipsub(ctx, hosts[0]) // make a fake peer that requests everything through IWANT gossip @@ -2553,21 +2548,6 @@ func FuzzAppendOrMergeRPC(f *testing.F) { }) } -func getDefaultHosts(t *testing.T, n int) []host.Host { - var out []host.Host - - for i := 0; i < n; i++ { - h, err := libp2p.New(libp2p.ResourceManager(&network.NullResourceManager{})) - if err != nil { - t.Fatal(err) - } - t.Cleanup(func() { h.Close() }) - out = append(out, h) - } - - return out -} - func TestGossipsubManagesAnAddressBook(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/pubsub_test.go b/pubsub_test.go index 4a033159..245a69df 100644 --- a/pubsub_test.go +++ b/pubsub_test.go @@ -4,13 +4,32 @@ import ( "context" "testing" "time" + + "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/network" ) +func getDefaultHosts(t *testing.T, n int) []host.Host { + var out []host.Host + + for i := 0; i < n; i++ { + h, err := libp2p.New(libp2p.ResourceManager(&network.NullResourceManager{})) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { h.Close() }) + out = append(out, h) + } + + return out +} + // See https://github.com/libp2p/go-libp2p-pubsub/issues/426 func TestPubSubRemovesBlacklistedPeer(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) - hosts := getNetHosts(t, ctx, 2) + hosts := getDefaultHosts(t, 2) bl := NewMapBlacklist() diff --git a/randomsub_test.go b/randomsub_test.go index 8eb640ea..5c817b7c 100644 --- a/randomsub_test.go +++ b/randomsub_test.go @@ -40,7 +40,7 @@ func TestRandomsubSmall(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 10) + hosts := getDefaultHosts(t, 10) psubs := getRandomsubs(ctx, hosts, 10) connectAll(t, hosts) @@ -77,7 +77,7 @@ func TestRandomsubBig(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 50) + hosts := getDefaultHosts(t, 50) psubs := getRandomsubs(ctx, hosts, 50) connectSome(t, hosts, 12) @@ -114,7 +114,7 @@ func TestRandomsubMixed(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 40) + hosts := getDefaultHosts(t, 40) fsubs := getPubsubs(ctx, hosts[:10]) rsubs := getRandomsubs(ctx, hosts[10:], 30) psubs := append(fsubs, rsubs...) @@ -153,7 +153,7 @@ func TestRandomsubEnoughPeers(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 40) + hosts := getDefaultHosts(t, 40) fsubs := getPubsubs(ctx, hosts[:10]) rsubs := getRandomsubs(ctx, hosts[10:], 30) psubs := append(fsubs, rsubs...) diff --git a/subscription_filter_test.go b/subscription_filter_test.go index 8a4fe4db..7ee54a86 100644 --- a/subscription_filter_test.go +++ b/subscription_filter_test.go @@ -150,7 +150,7 @@ func TestSubscriptionFilterRPC(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 2) + hosts := getDefaultHosts(t, 2) ps1 := getPubsub(ctx, hosts[0], WithSubscriptionFilter(NewAllowlistSubscriptionFilter("test1", "test2"))) ps2 := getPubsub(ctx, hosts[1], WithSubscriptionFilter(NewAllowlistSubscriptionFilter("test2", "test3"))) diff --git a/topic_test.go b/topic_test.go index 9ad3146d..a27113b2 100644 --- a/topic_test.go +++ b/topic_test.go @@ -99,7 +99,7 @@ func testTopicCloseWithOpenResource(t *testing.T, openResource func(topic *Topic const numHosts = 1 topicID := "foobar" - hosts := getNetHosts(t, ctx, numHosts) + hosts := getDefaultHosts(t, numHosts) ps := getPubsub(ctx, hosts[0]) // Try create and cancel topic @@ -139,7 +139,7 @@ func TestTopicReuse(t *testing.T) { const numHosts = 2 topicID := "foobar" - hosts := getNetHosts(t, ctx, numHosts) + hosts := getDefaultHosts(t, numHosts) sender := getPubsub(ctx, hosts[0], WithDiscovery(&dummyDiscovery{})) receiver := getPubsub(ctx, hosts[1]) @@ -233,7 +233,7 @@ func TestTopicEventHandlerCancel(t *testing.T) { const numHosts = 5 topicID := "foobar" - hosts := getNetHosts(t, ctx, numHosts) + hosts := getDefaultHosts(t, numHosts) ps := getPubsub(ctx, hosts[0]) // Try create and cancel topic @@ -265,7 +265,7 @@ func TestSubscriptionJoinNotification(t *testing.T) { const numLateSubscribers = 10 const numHosts = 20 - hosts := getNetHosts(t, ctx, numHosts) + hosts := getDefaultHosts(t, numHosts) topics := getTopics(getPubsubs(ctx, hosts), "foobar") evts := getTopicEvts(topics) @@ -331,7 +331,7 @@ func TestSubscriptionLeaveNotification(t *testing.T) { defer cancel() const numHosts = 20 - hosts := getNetHosts(t, ctx, numHosts) + hosts := getDefaultHosts(t, numHosts) psubs := getPubsubs(ctx, hosts) topics := getTopics(psubs, "foobar") evts := getTopicEvts(topics) @@ -416,7 +416,7 @@ func TestSubscriptionManyNotifications(t *testing.T) { const topic = "foobar" const numHosts = 33 - hosts := getNetHosts(t, ctx, numHosts) + hosts := getDefaultHosts(t, numHosts) topics := getTopics(getPubsubs(ctx, hosts), topic) evts := getTopicEvts(topics) @@ -521,7 +521,7 @@ func TestSubscriptionNotificationSubUnSub(t *testing.T) { const topic = "foobar" const numHosts = 35 - hosts := getNetHosts(t, ctx, numHosts) + hosts := getDefaultHosts(t, numHosts) topics := getTopics(getPubsubs(ctx, hosts), topic) for i := 1; i < numHosts; i++ { @@ -539,7 +539,7 @@ func TestTopicRelay(t *testing.T) { const topic = "foobar" const numHosts = 5 - hosts := getNetHosts(t, ctx, numHosts) + hosts := getDefaultHosts(t, numHosts) topics := getTopics(getPubsubs(ctx, hosts), topic) // [0.Rel] - [1.Rel] - [2.Sub] @@ -603,7 +603,7 @@ func TestTopicRelayReuse(t *testing.T) { const topic = "foobar" const numHosts = 1 - hosts := getNetHosts(t, ctx, numHosts) + hosts := getDefaultHosts(t, numHosts) pubsubs := getPubsubs(ctx, hosts) topics := getTopics(pubsubs, topic) @@ -670,7 +670,7 @@ func TestTopicRelayOnClosedTopic(t *testing.T) { const topic = "foobar" const numHosts = 1 - hosts := getNetHosts(t, ctx, numHosts) + hosts := getDefaultHosts(t, numHosts) topics := getTopics(getPubsubs(ctx, hosts), topic) err := topics[0].Close() @@ -690,7 +690,7 @@ func TestProducePanic(t *testing.T) { const numHosts = 5 topicID := "foobar" - hosts := getNetHosts(t, ctx, numHosts) + hosts := getDefaultHosts(t, numHosts) ps := getPubsub(ctx, hosts[0]) // Create topic @@ -792,7 +792,7 @@ func TestMinTopicSizeNoDiscovery(t *testing.T) { const numHosts = 3 topicID := "foobar" - hosts := getNetHosts(t, ctx, numHosts) + hosts := getDefaultHosts(t, numHosts) sender := getPubsub(ctx, hosts[0]) receiver1 := getPubsub(ctx, hosts[1]) @@ -872,7 +872,7 @@ func TestWithTopicMsgIdFunction(t *testing.T) { const topicA, topicB = "foobarA", "foobarB" const numHosts = 2 - hosts := getNetHosts(t, ctx, numHosts) + hosts := getDefaultHosts(t, numHosts) pubsubs := getPubsubs(ctx, hosts, WithMessageIdFn(func(pmsg *pb.Message) string { hash := sha256.Sum256(pmsg.Data) return string(hash[:]) @@ -932,7 +932,7 @@ func TestTopicPublishWithKeyInvalidParameters(t *testing.T) { const numHosts = 5 virtualPeer := tnet.RandPeerNetParamsOrFatal(t) - hosts := getNetHosts(t, ctx, numHosts) + hosts := getDefaultHosts(t, numHosts) topics := getTopics(getPubsubs(ctx, hosts), topic) t.Run("nil sign private key should error", func(t *testing.T) { @@ -959,7 +959,7 @@ func TestTopicRelayPublishWithKey(t *testing.T) { const numHosts = 5 virtualPeer := tnet.RandPeerNetParamsOrFatal(t) - hosts := getNetHosts(t, ctx, numHosts) + hosts := getDefaultHosts(t, numHosts) topics := getTopics(getPubsubs(ctx, hosts), topic) // [0.Rel] - [1.Rel] - [2.Sub] @@ -1026,7 +1026,7 @@ func TestWithLocalPublication(t *testing.T) { const topic = "test" - hosts := getNetHosts(t, ctx, 2) + hosts := getDefaultHosts(t, 2) pubsubs := getPubsubs(ctx, hosts) topics := getTopics(pubsubs, topic) connectAll(t, hosts) diff --git a/trace_test.go b/trace_test.go index fb8cb56d..7717a7e2 100644 --- a/trace_test.go +++ b/trace_test.go @@ -17,9 +17,6 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" - bhost "github.com/libp2p/go-libp2p/p2p/host/blank" - swarmt "github.com/libp2p/go-libp2p/p2p/net/swarm/testing" - "github.com/libp2p/go-msgio/protoio" ) @@ -27,7 +24,7 @@ func testWithTracer(t *testing.T, tracer EventTracer) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 20) + hosts := getDefaultHosts(t, 20) psubs := getGossipsubs(ctx, hosts, WithEventTracer(tracer), // to bootstrap from star topology @@ -302,10 +299,9 @@ func TestRemoteTracer(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - h1 := bhost.NewBlankHost(swarmt.GenSwarm(t)) - h2 := bhost.NewBlankHost(swarmt.GenSwarm(t)) - defer h1.Close() - defer h2.Close() + hosts := getDefaultHosts(t, 2) + h1 := hosts[0] + h2 := hosts[1] mrt := &mockRemoteTracer{} h1.SetStreamHandler(RemoteTracerProtoID, mrt.handleStream) diff --git a/validation_builtin_test.go b/validation_builtin_test.go index df406f26..267cc6be 100644 --- a/validation_builtin_test.go +++ b/validation_builtin_test.go @@ -38,7 +38,7 @@ func testBasicSeqnoValidator(t *testing.T, ttl time.Duration) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 20) + hosts := getDefaultHosts(t, 20) psubs := getPubsubsWithOptionC(ctx, hosts, func(i int) Option { return WithDefaultValidator(NewBasicSeqnoValidator(newMockPeerMetadataStore())) @@ -86,7 +86,7 @@ func TestBasicSeqnoValidatorReplay(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 20) + hosts := getDefaultHosts(t, 20) psubs := getPubsubsWithOptionC(ctx, hosts[:19], func(i int) Option { return WithDefaultValidator(NewBasicSeqnoValidator(newMockPeerMetadataStore())) diff --git a/validation_test.go b/validation_test.go index b56e7677..0a09f70b 100644 --- a/validation_test.go +++ b/validation_test.go @@ -15,7 +15,7 @@ func TestRegisterUnregisterValidator(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 1) + hosts := getDefaultHosts(t, 1) psubs := getPubsubs(ctx, hosts) err := psubs[0].RegisterTopicValidator("foo", func(context.Context, peer.ID, *Message) bool { @@ -40,7 +40,7 @@ func TestRegisterValidatorEx(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 3) + hosts := getDefaultHosts(t, 3) psubs := getPubsubs(ctx, hosts) err := psubs[0].RegisterTopicValidator("test", @@ -69,7 +69,7 @@ func TestValidate(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 2) + hosts := getDefaultHosts(t, 2) psubs := getPubsubs(ctx, hosts) connect(t, hosts[0], hosts[1]) @@ -123,7 +123,7 @@ func TestValidate2(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 1) + hosts := getDefaultHosts(t, 1) psubs := getPubsubs(ctx, hosts) topic := "foobar" @@ -201,7 +201,7 @@ func TestValidateOverload(t *testing.T) { for tci, tc := range tcs { t.Run(fmt.Sprintf("%d", tci), func(t *testing.T) { - hosts := getNetHosts(t, ctx, 2) + hosts := getDefaultHosts(t, 2) psubs := getPubsubs(ctx, hosts) connect(t, hosts[0], hosts[1]) @@ -273,7 +273,7 @@ func TestValidateAssortedOptions(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hosts := getNetHosts(t, ctx, 10) + hosts := getDefaultHosts(t, 10) psubs := getPubsubs(ctx, hosts, WithValidateQueueSize(10), WithValidateThrottle(10), From b23b3ee559c5989b9b98aa94f95ed53fcf9033d4 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Thu, 11 Jul 2024 11:46:35 +0000 Subject: [PATCH 02/18] Switch to the new peer notify mechanism (#564) 1. Only listen for peers added and identify events. 2. Remove the old "Limited" check. Peers only show up as "Connected" if they have non-limited connections. 3. Don't bother listening for new connections directly and/or connectivity changes. We'll get a new identify event per new connection regardless. fixes #546 --- notify.go | 75 --------------------------------- peer_notify.go | 112 +++++++++++++++++++++++++++++++++++++++++++++++++ pubsub.go | 6 +-- 3 files changed, 115 insertions(+), 78 deletions(-) delete mode 100644 notify.go create mode 100644 peer_notify.go diff --git a/notify.go b/notify.go deleted file mode 100644 index f560d398..00000000 --- a/notify.go +++ /dev/null @@ -1,75 +0,0 @@ -package pubsub - -import ( - "github.com/libp2p/go-libp2p/core/network" - "github.com/libp2p/go-libp2p/core/peer" - ma "github.com/multiformats/go-multiaddr" -) - -var _ network.Notifiee = (*PubSubNotif)(nil) - -type PubSubNotif PubSub - -func (p *PubSubNotif) OpenedStream(n network.Network, s network.Stream) { -} - -func (p *PubSubNotif) ClosedStream(n network.Network, s network.Stream) { -} - -func (p *PubSubNotif) Connected(n network.Network, c network.Conn) { - // ignore transient connections - if c.Stat().Limited { - return - } - - go func() { - p.newPeersPrioLk.RLock() - p.newPeersMx.Lock() - p.newPeersPend[c.RemotePeer()] = struct{}{} - p.newPeersMx.Unlock() - p.newPeersPrioLk.RUnlock() - - select { - case p.newPeers <- struct{}{}: - default: - } - }() -} - -func (p *PubSubNotif) Disconnected(n network.Network, c network.Conn) { -} - -func (p *PubSubNotif) Listen(n network.Network, _ ma.Multiaddr) { -} - -func (p *PubSubNotif) ListenClose(n network.Network, _ ma.Multiaddr) { -} - -func (p *PubSubNotif) Initialize() { - isTransient := func(pid peer.ID) bool { - for _, c := range p.host.Network().ConnsToPeer(pid) { - if !c.Stat().Limited { - return false - } - } - - return true - } - - p.newPeersPrioLk.RLock() - p.newPeersMx.Lock() - for _, pid := range p.host.Network().Peers() { - if isTransient(pid) { - continue - } - - p.newPeersPend[pid] = struct{}{} - } - p.newPeersMx.Unlock() - p.newPeersPrioLk.RUnlock() - - select { - case p.newPeers <- struct{}{}: - default: - } -} diff --git a/peer_notify.go b/peer_notify.go new file mode 100644 index 00000000..44aceeef --- /dev/null +++ b/peer_notify.go @@ -0,0 +1,112 @@ +package pubsub + +import ( + "context" + + "github.com/libp2p/go-libp2p/core/event" + "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/protocol" +) + +func (ps *PubSub) watchForNewPeers(ctx context.Context) { + // We don't bother subscribing to "connectivity" events because we always run identify after + // every new connection. + sub, err := ps.host.EventBus().Subscribe([]interface{}{ + &event.EvtPeerIdentificationCompleted{}, + &event.EvtPeerProtocolsUpdated{}, + }) + if err != nil { + log.Errorf("failed to subscribe to peer identification events: %v", err) + return + } + defer sub.Close() + + ps.newPeersPrioLk.RLock() + ps.newPeersMx.Lock() + for _, pid := range ps.host.Network().Peers() { + if ps.host.Network().Connectedness(pid) != network.Connected { + continue + } + ps.newPeersPend[pid] = struct{}{} + } + ps.newPeersMx.Unlock() + ps.newPeersPrioLk.RUnlock() + + select { + case ps.newPeers <- struct{}{}: + default: + } + + var supportsProtocol func(protocol.ID) bool + if ps.protoMatchFunc != nil { + var supportedProtocols []func(protocol.ID) bool + for _, proto := range ps.rt.Protocols() { + + supportedProtocols = append(supportedProtocols, ps.protoMatchFunc(proto)) + } + supportsProtocol = func(proto protocol.ID) bool { + for _, fn := range supportedProtocols { + if (fn)(proto) { + return true + } + } + return false + } + } else { + supportedProtocols := make(map[protocol.ID]struct{}) + for _, proto := range ps.rt.Protocols() { + supportedProtocols[proto] = struct{}{} + } + supportsProtocol = func(proto protocol.ID) bool { + _, ok := supportedProtocols[proto] + return ok + } + } + + for ctx.Err() == nil { + var ev any + select { + case <-ctx.Done(): + return + case ev = <-sub.Out(): + } + + var protos []protocol.ID + var peer peer.ID + switch ev := ev.(type) { + case event.EvtPeerIdentificationCompleted: + peer = ev.Peer + protos = ev.Protocols + case event.EvtPeerProtocolsUpdated: + peer = ev.Peer + protos = ev.Added + default: + continue + } + + // We don't bother checking connectivity (connected and non-"limited") here because + // we'll check when actually handling the new peer. + + for _, p := range protos { + if supportsProtocol(p) { + ps.notifyNewPeer(peer) + break + } + } + } + +} + +func (ps *PubSub) notifyNewPeer(peer peer.ID) { + ps.newPeersPrioLk.RLock() + ps.newPeersMx.Lock() + ps.newPeersPend[peer] = struct{}{} + ps.newPeersMx.Unlock() + ps.newPeersPrioLk.RUnlock() + + select { + case ps.newPeers <- struct{}{}: + default: + } +} diff --git a/pubsub.go b/pubsub.go index c4ecae65..24c297dd 100644 --- a/pubsub.go +++ b/pubsub.go @@ -327,14 +327,12 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option h.SetStreamHandler(id, ps.handleNewStream) } } - h.Network().Notify((*PubSubNotif)(ps)) + go ps.watchForNewPeers(ctx) ps.val.Start(ps) go ps.processLoop(ctx) - (*PubSubNotif)(ps).Initialize() - return ps, nil } @@ -687,6 +685,8 @@ func (p *PubSub) handlePendingPeers() { p.newPeersPrioLk.Unlock() for pid := range newPeers { + // Make sure we have a non-limited connection. We do this late because we may have + // disconnected in the meantime. if p.host.Network().Connectedness(pid) != network.Connected { continue } From 093f13ce165f007a2375cffb93205b1b21701d62 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Thu, 11 Jul 2024 12:50:34 +0000 Subject: [PATCH 03/18] test: test notify protocols updated (#567) Signed-off-by: gfanton <8671905+gfanton@users.noreply.github.com> Co-authored-by: gfanton <8671905+gfanton@users.noreply.github.com> --- notify_test.go | 76 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 76 insertions(+) create mode 100644 notify_test.go diff --git a/notify_test.go b/notify_test.go new file mode 100644 index 00000000..fa5b755a --- /dev/null +++ b/notify_test.go @@ -0,0 +1,76 @@ +package pubsub + +import ( + "context" + "testing" + "time" + + "github.com/libp2p/go-libp2p/p2p/protocol/identify" +) + +func TestNotifyPeerProtocolsUpdated(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + hosts := getDefaultHosts(t, 2) + + // Initialize id services. + { + ids1, err := identify.NewIDService(hosts[0]) + if err != nil { + t.Fatal(err) + } + ids1.Start() + defer ids1.Close() + + ids2, err := identify.NewIDService(hosts[1]) + if err != nil { + t.Fatal(err) + } + ids2.Start() + defer ids2.Close() + } + + psubs0 := getPubsub(ctx, hosts[0]) + connect(t, hosts[0], hosts[1]) + // Delay to make sure that peers are connected. + <-time.After(time.Millisecond * 100) + psubs1 := getPubsub(ctx, hosts[1]) + + // Pubsub 0 joins topic "test". + topic0, err := psubs0.Join("test") + if err != nil { + t.Fatal(err) + } + defer topic0.Close() + + sub0, err := topic0.Subscribe() + if err != nil { + t.Fatal(err) + } + defer sub0.Cancel() + + // Pubsub 1 joins topic "test". + topic1, err := psubs1.Join("test") + if err != nil { + t.Fatal(err) + } + defer topic1.Close() + + sub1, err := topic1.Subscribe() + if err != nil { + t.Fatal(err) + } + defer sub1.Cancel() + + // Delay before checking results (similar to most tests). + <-time.After(time.Millisecond * 100) + + if len(topic0.ListPeers()) == 0 { + t.Fatalf("topic0 should at least have 1 peer") + } + + if len(topic1.ListPeers()) == 0 { + t.Fatalf("topic1 should at least have 1 peer") + } +} From e508d8643ddb0b9557dd97a827380f267a18082e Mon Sep 17 00:00:00 2001 From: Sorin Stanculeanu <34831323+sstanculeanu@users.noreply.github.com> Date: Thu, 11 Jul 2024 18:25:52 +0300 Subject: [PATCH 04/18] added missing Close call on the AddrBook member of GossipSubRouter (#568) --- gossipsub.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/gossipsub.go b/gossipsub.go index a36049f7..3121a210 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -3,6 +3,7 @@ package pubsub import ( "context" "fmt" + "io" "math/rand" "sort" "time" @@ -543,6 +544,13 @@ func (gs *GossipSubRouter) manageAddrBook() { for { select { case <-gs.p.ctx.Done(): + cabCloser, ok := gs.cab.(io.Closer) + if ok { + errClose := cabCloser.Close() + if errClose != nil { + log.Warnf("failed to close addr book: %v", errClose) + } + } return case ev := <-sub.Out(): switch ev := ev.(type) { From 88c73f4a89bbf7bb1ca04af386577d5ea589e7f8 Mon Sep 17 00:00:00 2001 From: web3-bot Date: Mon, 5 Aug 2024 17:25:23 +0000 Subject: [PATCH 05/18] chore: add or force update .github/workflows/go-test.yml --- .github/workflows/go-test.yml | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) create mode 100644 .github/workflows/go-test.yml diff --git a/.github/workflows/go-test.yml b/.github/workflows/go-test.yml new file mode 100644 index 00000000..505ece58 --- /dev/null +++ b/.github/workflows/go-test.yml @@ -0,0 +1,21 @@ +name: Go Test + +on: + pull_request: + push: + branches: ["master"] + workflow_dispatch: + merge_group: + +permissions: + contents: read + +concurrency: + group: ${{ github.workflow }}-${{ github.event_name }}-${{ github.event_name == 'push' && github.sha || github.ref }} + cancel-in-progress: true + +jobs: + go-test: + uses: ipdxco/unified-github-workflows/.github/workflows/go-test.yml@v1.0 + secrets: + CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} From db70c1d6784547fbdf3ed5195088f1eafc29c586 Mon Sep 17 00:00:00 2001 From: web3-bot Date: Mon, 5 Aug 2024 17:25:23 +0000 Subject: [PATCH 06/18] chore: add or force update .github/workflows/go-check.yml --- .github/workflows/go-check.yml | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) create mode 100644 .github/workflows/go-check.yml diff --git a/.github/workflows/go-check.yml b/.github/workflows/go-check.yml new file mode 100644 index 00000000..724ef8e3 --- /dev/null +++ b/.github/workflows/go-check.yml @@ -0,0 +1,19 @@ +name: Go Checks + +on: + pull_request: + push: + branches: ["master"] + workflow_dispatch: + merge_group: + +permissions: + contents: read + +concurrency: + group: ${{ github.workflow }}-${{ github.event_name }}-${{ github.event_name == 'push' && github.sha || github.ref }} + cancel-in-progress: true + +jobs: + go-check: + uses: ipdxco/unified-github-workflows/.github/workflows/go-check.yml@v1.0 From 7c54be0278015a172a520232d698ade3e05c8f57 Mon Sep 17 00:00:00 2001 From: web3-bot Date: Mon, 5 Aug 2024 17:25:23 +0000 Subject: [PATCH 07/18] chore: add or force update .github/workflows/releaser.yml --- .github/workflows/releaser.yml | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) create mode 100644 .github/workflows/releaser.yml diff --git a/.github/workflows/releaser.yml b/.github/workflows/releaser.yml new file mode 100644 index 00000000..a2b2a044 --- /dev/null +++ b/.github/workflows/releaser.yml @@ -0,0 +1,21 @@ +name: Releaser + +on: + push: + paths: ["version.json"] + workflow_dispatch: + +permissions: + contents: write + +concurrency: + group: ${{ github.workflow }}-${{ github.sha }} + cancel-in-progress: true + +jobs: + releaser: + uses: ipdxco/unified-github-workflows/.github/workflows/releaser.yml@v1.0 + with: + sources: '["version.json"]' + secrets: + UCI_GITHUB_TOKEN: ${{ secrets.UCI_GITHUB_TOKEN }} From b32ed641c0da8bee16dc5956ddaae9ba98878242 Mon Sep 17 00:00:00 2001 From: web3-bot Date: Mon, 5 Aug 2024 17:25:23 +0000 Subject: [PATCH 08/18] chore: add or force update .github/workflows/release-check.yml --- .github/workflows/release-check.yml | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) create mode 100644 .github/workflows/release-check.yml diff --git a/.github/workflows/release-check.yml b/.github/workflows/release-check.yml new file mode 100644 index 00000000..681b5ef1 --- /dev/null +++ b/.github/workflows/release-check.yml @@ -0,0 +1,21 @@ +name: Release Checker + +on: + pull_request_target: + paths: ["version.json"] + types: [ opened, synchronize, reopened, labeled, unlabeled ] + workflow_dispatch: + +permissions: + contents: write + pull-requests: write + +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + +jobs: + release-check: + uses: ipdxco/unified-github-workflows/.github/workflows/release-check.yml@v1.0 + with: + sources: '["version.json"]' From ca1b3dabb768821a48d93b3db36be3f8ef1b7d25 Mon Sep 17 00:00:00 2001 From: web3-bot Date: Mon, 5 Aug 2024 17:25:23 +0000 Subject: [PATCH 09/18] chore: add or force update .github/workflows/tagpush.yml --- .github/workflows/tagpush.yml | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) create mode 100644 .github/workflows/tagpush.yml diff --git a/.github/workflows/tagpush.yml b/.github/workflows/tagpush.yml new file mode 100644 index 00000000..5ef3fb9e --- /dev/null +++ b/.github/workflows/tagpush.yml @@ -0,0 +1,18 @@ +name: Tag Push Checker + +on: + push: + tags: + - v* + +permissions: + contents: read + issues: write + +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + +jobs: + releaser: + uses: ipdxco/unified-github-workflows/.github/workflows/tagpush.yml@v1.0 From 03952ea658cf46d66f3224e51fa4960c02092de2 Mon Sep 17 00:00:00 2001 From: web3-bot Date: Mon, 5 Aug 2024 17:25:23 +0000 Subject: [PATCH 10/18] chore: add or force update version.json --- version.json | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 version.json diff --git a/version.json b/version.json new file mode 100644 index 00000000..ea22ea59 --- /dev/null +++ b/version.json @@ -0,0 +1,3 @@ +{ + "version": "v0.11.0" +} From 435b99e317fabd8b495a3e8ef648253a546f0d21 Mon Sep 17 00:00:00 2001 From: galargh Date: Mon, 5 Aug 2024 19:38:07 +0200 Subject: [PATCH 11/18] chore: go fmt --- subscription_filter_test.go | 16 ++++++++-------- validation_builtin_test.go | 2 +- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/subscription_filter_test.go b/subscription_filter_test.go index 7ee54a86..0057cdcf 100644 --- a/subscription_filter_test.go +++ b/subscription_filter_test.go @@ -19,15 +19,15 @@ func TestBasicSubscriptionFilter(t *testing.T) { topic3 := "test3" yes := true subs := []*pb.RPC_SubOpts{ - &pb.RPC_SubOpts{ + { Topicid: &topic1, Subscribe: &yes, }, - &pb.RPC_SubOpts{ + { Topicid: &topic2, Subscribe: &yes, }, - &pb.RPC_SubOpts{ + { Topicid: &topic3, Subscribe: &yes, }, @@ -108,24 +108,24 @@ func TestSubscriptionFilterDeduplication(t *testing.T) { yes := true no := false subs := []*pb.RPC_SubOpts{ - &pb.RPC_SubOpts{ + { Topicid: &topic1, Subscribe: &yes, }, - &pb.RPC_SubOpts{ + { Topicid: &topic1, Subscribe: &yes, }, - &pb.RPC_SubOpts{ + { Topicid: &topic2, Subscribe: &yes, }, - &pb.RPC_SubOpts{ + { Topicid: &topic2, Subscribe: &no, }, - &pb.RPC_SubOpts{ + { Topicid: &topic3, Subscribe: &yes, }, diff --git a/validation_builtin_test.go b/validation_builtin_test.go index 267cc6be..bca8774c 100644 --- a/validation_builtin_test.go +++ b/validation_builtin_test.go @@ -246,7 +246,7 @@ func (r *replayActor) replay(msg *pb.Message) { var peers []peer.ID r.mx.Lock() - for p, _ := range r.out { + for p := range r.out { if rng.Intn(2) > 0 { peers = append(peers, p) } From 8f56e8c97ae2df675d33d7a624fc5315ff61d8f8 Mon Sep 17 00:00:00 2001 From: galargh Date: Mon, 5 Aug 2024 19:43:28 +0200 Subject: [PATCH 12/18] chore: update rand usage --- backoff.go | 1 - floodsub_test.go | 11 ++++++----- gossipsub_spam_test.go | 2 +- gossipsub_test.go | 35 ++++++++++++++++++----------------- 4 files changed, 25 insertions(+), 24 deletions(-) diff --git a/backoff.go b/backoff.go index 4909e153..99ca7fd0 100644 --- a/backoff.go +++ b/backoff.go @@ -43,7 +43,6 @@ func newBackoff(ctx context.Context, sizeThreshold int, cleanupInterval time.Dur info: make(map[peer.ID]*backoffHistory), } - rand.Seed(time.Now().UnixNano()) // used for jitter go b.cleanupLoop(ctx) return b diff --git a/floodsub_test.go b/floodsub_test.go index 0168b15f..8a2db35b 100644 --- a/floodsub_test.go +++ b/floodsub_test.go @@ -3,11 +3,12 @@ package pubsub import ( "bytes" "context" + crand "crypto/rand" "crypto/sha256" "encoding/base64" "fmt" "io" - "math/rand" + mrand "math/rand" "sort" "sync" "testing" @@ -25,7 +26,7 @@ import ( func checkMessageRouting(t *testing.T, topic string, pubs []*PubSub, subs []*Subscription) { data := make([]byte, 16) - rand.Read(data) + crand.Read(data) for _, p := range pubs { err := p.Publish(topic, data) @@ -58,7 +59,7 @@ func denseConnect(t *testing.T, hosts []host.Host) { func connectSome(t *testing.T, hosts []host.Host, d int) { for i, a := range hosts { for j := 0; j < d; j++ { - n := rand.Intn(len(hosts)) + n := mrand.Intn(len(hosts)) if n == i { j-- continue @@ -157,7 +158,7 @@ func TestBasicFloodsub(t *testing.T) { for i := 0; i < 100; i++ { msg := []byte(fmt.Sprintf("%d the flooooooood %d", i, i)) - owner := rand.Intn(len(psubs)) + owner := mrand.Intn(len(psubs)) psubs[owner].Publish("foobar", msg) @@ -1006,7 +1007,7 @@ func TestConfigurableMaxMessageSize(t *testing.T) { // 2mb payload. msg := make([]byte, 1<<21) - rand.Read(msg) + crand.Read(msg) err := psubs[0].Publish(topic, msg) if err != nil { t.Fatal(err) diff --git a/gossipsub_spam_test.go b/gossipsub_spam_test.go index 3ccb1ab4..8e9b40e3 100644 --- a/gossipsub_spam_test.go +++ b/gossipsub_spam_test.go @@ -2,7 +2,7 @@ package pubsub import ( "context" - "math/rand" + "crypto/rand" "strconv" "sync" "testing" diff --git a/gossipsub_test.go b/gossipsub_test.go index 8c9419d8..d4a8a79d 100644 --- a/gossipsub_test.go +++ b/gossipsub_test.go @@ -3,9 +3,10 @@ package pubsub import ( "bytes" "context" + crand "crypto/rand" "fmt" "io" - "math/rand" + mrand "math/rand" "sync" "sync/atomic" "testing" @@ -63,7 +64,7 @@ func TestSparseGossipsub(t *testing.T) { for i := 0; i < 100; i++ { msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) - owner := rand.Intn(len(psubs)) + owner := mrand.Intn(len(psubs)) psubs[owner].Publish("foobar", msg) @@ -104,7 +105,7 @@ func TestDenseGossipsub(t *testing.T) { for i := 0; i < 100; i++ { msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) - owner := rand.Intn(len(psubs)) + owner := mrand.Intn(len(psubs)) psubs[owner].Publish("foobar", msg) @@ -358,7 +359,7 @@ func TestGossipsubGossip(t *testing.T) { for i := 0; i < 100; i++ { msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) - owner := rand.Intn(len(psubs)) + owner := mrand.Intn(len(psubs)) psubs[owner].Publish("foobar", msg) @@ -416,7 +417,7 @@ func TestGossipsubGossipPiggyback(t *testing.T) { for i := 0; i < 100; i++ { msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) - owner := rand.Intn(len(psubs)) + owner := mrand.Intn(len(psubs)) psubs[owner].Publish("foobar", msg) psubs[owner].Publish("bazcrux", msg) @@ -563,7 +564,7 @@ func TestGossipsubPrune(t *testing.T) { for i := 0; i < 10; i++ { msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) - owner := rand.Intn(len(psubs)) + owner := mrand.Intn(len(psubs)) psubs[owner].Publish("foobar", msg) @@ -661,7 +662,7 @@ func TestGossipsubPruneBackoffTime(t *testing.T) { msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) // Don't publish from host 0, since everyone should have pruned it. - owner := rand.Intn(len(psubs)-1) + 1 + owner := mrand.Intn(len(psubs)-1) + 1 psubs[owner].Publish("foobar", msg) @@ -706,7 +707,7 @@ func TestGossipsubGraft(t *testing.T) { for i := 0; i < 100; i++ { msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) - owner := rand.Intn(len(psubs)) + owner := mrand.Intn(len(psubs)) psubs[owner].Publish("foobar", msg) @@ -755,7 +756,7 @@ func TestGossipsubRemovePeer(t *testing.T) { for i := 0; i < 10; i++ { msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) - owner := 5 + rand.Intn(len(psubs)-5) + owner := 5 + mrand.Intn(len(psubs)-5) psubs[owner].Publish("foobar", msg) @@ -803,7 +804,7 @@ func TestGossipsubGraftPruneRetry(t *testing.T) { for i, topic := range topics { msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) - owner := rand.Intn(len(psubs)) + owner := mrand.Intn(len(psubs)) psubs[owner].Publish(topic, msg) @@ -849,7 +850,7 @@ func TestGossipsubControlPiggyback(t *testing.T) { // create a background flood of messages that overloads the queues done := make(chan struct{}) go func() { - owner := rand.Intn(len(psubs)) + owner := mrand.Intn(len(psubs)) for i := 0; i < 10000; i++ { msg := []byte("background flooooood") psubs[owner].Publish("flood", msg) @@ -887,7 +888,7 @@ func TestGossipsubControlPiggyback(t *testing.T) { for i, topic := range topics { msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) - owner := rand.Intn(len(psubs)) + owner := mrand.Intn(len(psubs)) psubs[owner].Publish(topic, msg) @@ -930,7 +931,7 @@ func TestMixedGossipsub(t *testing.T) { for i := 0; i < 100; i++ { msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) - owner := rand.Intn(len(psubs)) + owner := mrand.Intn(len(psubs)) psubs[owner].Publish("foobar", msg) @@ -2217,7 +2218,7 @@ func TestGossipsubRPCFragmentation(t *testing.T) { msgSize := 20000 for i := 0; i < nMessages; i++ { msg := make([]byte, msgSize) - rand.Read(msg) + crand.Read(msg) ps.Publish("test", msg) time.Sleep(20 * time.Millisecond) } @@ -2357,7 +2358,7 @@ func TestFragmentRPCFunction(t *testing.T) { mkMsg := func(size int) *pb.Message { msg := &pb.Message{} msg.Data = make([]byte, size-4) // subtract the protobuf overhead, so msg.Size() returns requested size - rand.Read(msg.Data) + crand.Read(msg.Data) return msg } @@ -2471,7 +2472,7 @@ func TestFragmentRPCFunction(t *testing.T) { messageIds := make([]string, msgsPerTopic) for m := 0; m < msgsPerTopic; m++ { mid := make([]byte, messageIdSize) - rand.Read(mid) + crand.Read(mid) messageIds[m] = string(mid) } rpc.Control.Ihave[i] = &pb.ControlIHave{MessageIDs: messageIds} @@ -2492,7 +2493,7 @@ func TestFragmentRPCFunction(t *testing.T) { // It should not be present in the fragmented messages, but smaller IDs should be rpc.Reset() giantIdBytes := make([]byte, limit*2) - rand.Read(giantIdBytes) + crand.Read(giantIdBytes) rpc.Control = &pb.ControlMessage{ Iwant: []*pb.ControlIWant{ {MessageIDs: []string{"hello", string(giantIdBytes)}}, From 097b4671b033d40918dc2ed9bbbabf1ab411300b Mon Sep 17 00:00:00 2001 From: galargh Date: Mon, 5 Aug 2024 19:51:38 +0200 Subject: [PATCH 13/18] chore: staticcheck --- floodsub_test.go | 1 + gossipsub_spam_test.go | 1 + gossipsub_test.go | 9 +++------ timecache/time_cache.go | 4 ---- topic_test.go | 2 +- trace_test.go | 1 + tracer.go | 1 + 7 files changed, 8 insertions(+), 11 deletions(-) diff --git a/floodsub_test.go b/floodsub_test.go index 8a2db35b..e7bf379f 100644 --- a/floodsub_test.go +++ b/floodsub_test.go @@ -21,6 +21,7 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/protocol" + //lint:ignore SA1019 "github.com/libp2p/go-msgio/protoio" is deprecated "github.com/libp2p/go-msgio/protoio" ) diff --git a/gossipsub_spam_test.go b/gossipsub_spam_test.go index 8e9b40e3..ab22e7a9 100644 --- a/gossipsub_spam_test.go +++ b/gossipsub_spam_test.go @@ -15,6 +15,7 @@ import ( pb "github.com/libp2p/go-libp2p-pubsub/pb" + //lint:ignore SA1019 "github.com/libp2p/go-msgio/protoio" is deprecated "github.com/libp2p/go-msgio/protoio" ) diff --git a/gossipsub_test.go b/gossipsub_test.go index d4a8a79d..8c2a216e 100644 --- a/gossipsub_test.go +++ b/gossipsub_test.go @@ -20,6 +20,7 @@ import ( "github.com/libp2p/go-libp2p/core/peerstore" "github.com/libp2p/go-libp2p/core/record" + //lint:ignore SA1019 "github.com/libp2p/go-msgio/protoio" is deprecated "github.com/libp2p/go-msgio/protoio" ) @@ -1924,13 +1925,11 @@ func TestGossipSubLeaveTopic(t *testing.T) { connect(t, h[0], h[1]) // Join all peers - var subs []*Subscription for _, ps := range psubs { - sub, err := ps.Subscribe("test") + _, err := ps.Subscribe("test") if err != nil { t.Fatal(err) } - subs = append(subs, sub) } time.Sleep(time.Second) @@ -2005,13 +2004,11 @@ func TestGossipSubJoinTopic(t *testing.T) { router0.backoff["test"] = peerMap // Join all peers - var subs []*Subscription for _, ps := range psubs { - sub, err := ps.Subscribe("test") + _, err := ps.Subscribe("test") if err != nil { t.Fatal(err) } - subs = append(subs, sub) } time.Sleep(time.Second) diff --git a/timecache/time_cache.go b/timecache/time_cache.go index e33bc354..ee34fd5b 100644 --- a/timecache/time_cache.go +++ b/timecache/time_cache.go @@ -2,12 +2,8 @@ package timecache import ( "time" - - logger "github.com/ipfs/go-log/v2" ) -var log = logger.Logger("pubsub/timecache") - // Stategy is the TimeCache expiration strategy to use. type Strategy uint8 diff --git a/topic_test.go b/topic_test.go index a27113b2..ef05feb4 100644 --- a/topic_test.go +++ b/topic_test.go @@ -743,7 +743,7 @@ func notifSubThenUnSub(ctx context.Context, t *testing.T, topics []*Topic) { } // Wait for the unsubscribe messages to reach the primary peer - for len(primaryTopic.ListPeers()) < 0 { + for len(primaryTopic.ListPeers()) > 0 { time.Sleep(time.Millisecond * 100) } diff --git a/trace_test.go b/trace_test.go index 7717a7e2..287216f1 100644 --- a/trace_test.go +++ b/trace_test.go @@ -17,6 +17,7 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" + //lint:ignore SA1019 "github.com/libp2p/go-msgio/protoio" is deprecated "github.com/libp2p/go-msgio/protoio" ) diff --git a/tracer.go b/tracer.go index 8e744c91..cbb92ad7 100644 --- a/tracer.go +++ b/tracer.go @@ -17,6 +17,7 @@ import ( "github.com/libp2p/go-libp2p/core/peerstore" "github.com/libp2p/go-libp2p/core/protocol" + //lint:ignore SA1019 "github.com/libp2p/go-msgio/protoio" is deprecated "github.com/libp2p/go-msgio/protoio" ) From 5c9a4d053625681baddf5cbe1d37fee8005f9820 Mon Sep 17 00:00:00 2001 From: Piotr Galar Date: Mon, 5 Aug 2024 20:03:41 +0200 Subject: [PATCH 14/18] ci: create go-test-config.json --- .github/workflows/go-test-config.json | 4 ++++ 1 file changed, 4 insertions(+) create mode 100644 .github/workflows/go-test-config.json diff --git a/.github/workflows/go-test-config.json b/.github/workflows/go-test-config.json new file mode 100644 index 00000000..d59c1fbf --- /dev/null +++ b/.github/workflows/go-test-config.json @@ -0,0 +1,4 @@ +{ + "skipOSes": ["windows"], + "skipRace": true +} From dc33a34d4d4976ec48e44b32a59344513c560446 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Tue, 6 Aug 2024 20:43:14 +0000 Subject: [PATCH 15/18] ci: disable testing on macos (#571) It appears to be a bit flaky and we have nothing macos specific in this repo. --- .github/workflows/go-test-config.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/go-test-config.json b/.github/workflows/go-test-config.json index d59c1fbf..b0642fbe 100644 --- a/.github/workflows/go-test-config.json +++ b/.github/workflows/go-test-config.json @@ -1,4 +1,4 @@ { - "skipOSes": ["windows"], + "skipOSes": ["windows", "macos"], "skipRace": true } From 19ffbb3a482caecabb8520917c631e3047a78094 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Tue, 6 Aug 2024 20:43:28 +0000 Subject: [PATCH 16/18] Re-enable disabled gossipsub test (#566) And change it to take into account the fact that libp2p now trims connections immediately (when no grace-period is specified) instead of waiting for a timeout. --- gossipsub_connmgr_test.go | 15 +-------------- gossipsub_test.go | 13 ++++++++++--- 2 files changed, 11 insertions(+), 17 deletions(-) diff --git a/gossipsub_connmgr_test.go b/gossipsub_connmgr_test.go index accf57dd..a5477026 100644 --- a/gossipsub_connmgr_test.go +++ b/gossipsub_connmgr_test.go @@ -15,7 +15,6 @@ import ( ) func TestGossipsubConnTagMessageDeliveries(t *testing.T) { - t.Skip("Test disabled with go-libp2p v0.22.0") // TODO: reenable test when updating to v0.23.0 ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -90,7 +89,7 @@ func TestGossipsubConnTagMessageDeliveries(t *testing.T) { // sybil squatters to be connected later sybilHosts := getDefaultHosts(t, nSquatter) for _, h := range sybilHosts { - squatter := &sybilSquatter{h: h} + squatter := &sybilSquatter{h: h, ignoreErrors: true} h.SetStreamHandler(GossipSubID_v10, squatter.handleStream) } @@ -144,18 +143,6 @@ func TestGossipsubConnTagMessageDeliveries(t *testing.T) { allHosts := append(honestHosts, sybilHosts...) connectAll(t, allHosts) - // verify that we have a bunch of connections - for _, h := range honestHosts { - if len(h.Network().Conns()) != nHonest+nSquatter-1 { - t.Errorf("expected to have conns to all peers, have %d", len(h.Network().Conns())) - } - } - - // force the connection managers to trim, so we don't need to muck about with timing as much - for _, cm := range connmgrs { - cm.TrimOpenConns(ctx) - } - // we should still have conns to all the honest peers, but not the sybils for _, h := range honestHosts { nHonestConns := 0 diff --git a/gossipsub_test.go b/gossipsub_test.go index 8c2a216e..4481be9e 100644 --- a/gossipsub_test.go +++ b/gossipsub_test.go @@ -2025,7 +2025,8 @@ func TestGossipSubJoinTopic(t *testing.T) { } type sybilSquatter struct { - h host.Host + h host.Host + ignoreErrors bool // set to false to ignore connection/stream errors. } func (sq *sybilSquatter) handleStream(s network.Stream) { @@ -2033,7 +2034,10 @@ func (sq *sybilSquatter) handleStream(s network.Stream) { os, err := sq.h.NewStream(context.Background(), s.Conn().RemotePeer(), GossipSubID_v10) if err != nil { - panic(err) + if !sq.ignoreErrors { + panic(err) + } + return } // send a subscription for test in the output stream to become candidate for GRAFT @@ -2044,7 +2048,10 @@ func (sq *sybilSquatter) handleStream(s network.Stream) { topic := "test" err = w.WriteMsg(&pb.RPC{Subscriptions: []*pb.RPC_SubOpts{{Subscribe: &truth, Topicid: &topic}}}) if err != nil { - panic(err) + if !sq.ignoreErrors { + panic(err) + } + return } var rpc pb.RPC From b12b0e138b65e13c409fb988e38eba111c71004d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?rich=CE=9Brd?= Date: Sun, 30 Jun 2024 20:54:50 -0400 Subject: [PATCH 17/18] feat: expose router and mesh peers (#1) --- gossipsub.go | 16 ++++++++++++++++ pubsub.go | 4 ++++ 2 files changed, 20 insertions(+) diff --git a/gossipsub.go b/gossipsub.go index 3121a210..cdbd7345 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -1993,6 +1993,22 @@ func (gs *GossipSubRouter) getPeers(topic string, count int, filter func(peer.ID return peers } +func (gs *GossipSubRouter) MeshPeers(topic string) []peer.ID { + peers, ok := gs.mesh[topic] + if !ok { + return nil + } + + result := make([]peer.ID, len(peers)) + i := 0 + for p := range peers { + result[i] = p + i++ + } + + return result +} + // WithDefaultTagTracer returns the tag tracer of the GossipSubRouter as a PubSub option. // This is useful for cases where the GossipSubRouter is instantiated externally, and is // injected into the GossipSub constructor as a dependency. This allows the tag tracer to be diff --git a/pubsub.go b/pubsub.go index 24c297dd..8956abd0 100644 --- a/pubsub.go +++ b/pubsub.go @@ -1420,3 +1420,7 @@ type addRelayReq struct { topic string resp chan RelayCancelFunc } + +func (p *PubSub) Router() PubSubRouter { + return p.rt +} From fab982a94aed5a6be77748894233c06d1022b6c0 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Wed, 3 Jul 2024 15:16:59 -0400 Subject: [PATCH 18/18] refactor: create MeshPeer function in Pubsub --- gossipsub.go | 2 +- pubsub.go | 29 +++++++++++++++++++++++++---- 2 files changed, 26 insertions(+), 5 deletions(-) diff --git a/gossipsub.go b/gossipsub.go index cdbd7345..da841615 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -1993,7 +1993,7 @@ func (gs *GossipSubRouter) getPeers(topic string, count int, filter func(peer.ID return peers } -func (gs *GossipSubRouter) MeshPeers(topic string) []peer.ID { +func (gs *GossipSubRouter) meshPeers(topic string) []peer.ID { peers, ok := gs.mesh[topic] if !ok { return nil diff --git a/pubsub.go b/pubsub.go index 8956abd0..330b6f11 100644 --- a/pubsub.go +++ b/pubsub.go @@ -90,6 +90,9 @@ type PubSub struct { // get chan of peers we are connected to getPeers chan *listPeerReq + // get chan to obtain list of full mesh peers (only applies when ussing gossipsub) + getMeshPeers chan *listPeerReq + // send subscription here to cancel it cancelCh chan *Subscription @@ -271,6 +274,7 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option deadPeerBackoff: newBackoff(ctx, 1000, BackoffCleanupInterval, MaxBackoffAttempts), cancelCh: make(chan *Subscription), getPeers: make(chan *listPeerReq), + getMeshPeers: make(chan *listPeerReq), addSub: make(chan *addSubReq), addRelay: make(chan *addRelayReq), rmRelay: make(chan string), @@ -616,6 +620,13 @@ func (p *PubSub) processLoop(ctx context.Context) { p.handleAddRelay(relay) case topic := <-p.rmRelay: p.handleRemoveRelay(topic) + case meshpreq := <-p.getMeshPeers: + var peers []peer.ID + rt, ok := p.rt.(*GossipSubRouter) + if ok { + peers = rt.meshPeers(meshpreq.topic) + } + meshpreq.resp <- peers case preq := <-p.getPeers: tmap, ok := p.topics[preq.topic] if preq.topic != "" && !ok { @@ -1364,6 +1375,20 @@ func (p *PubSub) ListPeers(topic string) []peer.ID { return <-out } +// MeshPeers returns a list of full mesh peers for a given topic +func (p *PubSub) MeshPeers(topic string) []peer.ID { + out := make(chan []peer.ID) + select { + case p.getMeshPeers <- &listPeerReq{ + resp: out, + topic: topic, + }: + case <-p.ctx.Done(): + return nil + } + return <-out +} + // BlacklistPeer blacklists a peer; all messages from this peer will be unconditionally dropped. func (p *PubSub) BlacklistPeer(pid peer.ID) { select { @@ -1420,7 +1445,3 @@ type addRelayReq struct { topic string resp chan RelayCancelFunc } - -func (p *PubSub) Router() PubSubRouter { - return p.rt -}