Skip to content

Commit

Permalink
refactor: create MeshPeer function in Pubsub
Browse files Browse the repository at this point in the history
  • Loading branch information
richard-ramos authored and chaitanyaprem committed Aug 7, 2024
1 parent b12b0e1 commit fab982a
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 5 deletions.
2 changes: 1 addition & 1 deletion gossipsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -1993,7 +1993,7 @@ func (gs *GossipSubRouter) getPeers(topic string, count int, filter func(peer.ID
return peers
}

func (gs *GossipSubRouter) MeshPeers(topic string) []peer.ID {
func (gs *GossipSubRouter) meshPeers(topic string) []peer.ID {
peers, ok := gs.mesh[topic]
if !ok {
return nil
Expand Down
29 changes: 25 additions & 4 deletions pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ type PubSub struct {
// get chan of peers we are connected to
getPeers chan *listPeerReq

// get chan to obtain list of full mesh peers (only applies when ussing gossipsub)
getMeshPeers chan *listPeerReq

// send subscription here to cancel it
cancelCh chan *Subscription

Expand Down Expand Up @@ -271,6 +274,7 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option
deadPeerBackoff: newBackoff(ctx, 1000, BackoffCleanupInterval, MaxBackoffAttempts),
cancelCh: make(chan *Subscription),
getPeers: make(chan *listPeerReq),
getMeshPeers: make(chan *listPeerReq),
addSub: make(chan *addSubReq),
addRelay: make(chan *addRelayReq),
rmRelay: make(chan string),
Expand Down Expand Up @@ -616,6 +620,13 @@ func (p *PubSub) processLoop(ctx context.Context) {
p.handleAddRelay(relay)
case topic := <-p.rmRelay:
p.handleRemoveRelay(topic)
case meshpreq := <-p.getMeshPeers:
var peers []peer.ID
rt, ok := p.rt.(*GossipSubRouter)
if ok {
peers = rt.meshPeers(meshpreq.topic)
}
meshpreq.resp <- peers
case preq := <-p.getPeers:
tmap, ok := p.topics[preq.topic]
if preq.topic != "" && !ok {
Expand Down Expand Up @@ -1364,6 +1375,20 @@ func (p *PubSub) ListPeers(topic string) []peer.ID {
return <-out
}

// MeshPeers returns a list of full mesh peers for a given topic
func (p *PubSub) MeshPeers(topic string) []peer.ID {
out := make(chan []peer.ID)
select {
case p.getMeshPeers <- &listPeerReq{
resp: out,
topic: topic,
}:
case <-p.ctx.Done():
return nil
}
return <-out
}

// BlacklistPeer blacklists a peer; all messages from this peer will be unconditionally dropped.
func (p *PubSub) BlacklistPeer(pid peer.ID) {
select {
Expand Down Expand Up @@ -1420,7 +1445,3 @@ type addRelayReq struct {
topic string
resp chan RelayCancelFunc
}

func (p *PubSub) Router() PubSubRouter {
return p.rt
}

0 comments on commit fab982a

Please sign in to comment.