Skip to content

Commit

Permalink
peer: make peer meet query.Peer interface
Browse files Browse the repository at this point in the history
query.Peer is used for downloading blocks out of order during headers
first download.  Methods SubscribeRecvMsg() and OnDisconnect() are added
to abide by the interface.
  • Loading branch information
kcalvinalvin committed Dec 12, 2024
1 parent ec0b90d commit 32d7f0c
Showing 1 changed file with 62 additions and 0 deletions.
62 changes: 62 additions & 0 deletions peer/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,11 @@ type Peer struct {
queueQuit chan struct{}
outQuit chan struct{}
quit chan struct{}

// subscribers is a channel for relaying all messages that were received
// to this peer.
subscribers map[recvMsgsubscription]struct{}
subscriberLock sync.Mutex
}

// String returns the peer's address and directionality as a human-readable
Expand Down Expand Up @@ -1098,6 +1103,35 @@ func (p *Peer) readMessage(encoding wire.MessageEncoding) (wire.Message, []byte,
return msg, buf, nil
}

// recvMsgsubscription is two channels for a subscriber of recevied messages.
// msgChan for sending the messages and quit for cancelling the subscription.
type recvMsgsubscription struct {
msgChan chan wire.Message
quit chan struct{}
}

// SubscribeRecvMsg adds a OnRead subscription to the peer. All bitcoin
// messages received from this peer will be sent on the returned
// channel. A closure is also returned, that should be called to cancel
// the subscription.
func (p *Peer) SubscribeRecvMsg() (<-chan wire.Message, func()) {
p.subscriberLock.Lock()
defer p.subscriberLock.Unlock()

// No need to buffer this channel as we'll spin up a new goroutine for
// every send.
msgChan := make(chan wire.Message)
quit := make(chan struct{})
sub := recvMsgsubscription{
msgChan,
quit,
}

p.subscribers[sub] = struct{}{}

return msgChan, func() { close(quit) }
}

// writeMessage sends a bitcoin message to the peer with logging.
func (p *Peer) writeMessage(msg wire.Message, enc wire.MessageEncoding) error {
// Don't do anything if we're disconnecting.
Expand Down Expand Up @@ -1446,6 +1480,27 @@ out:
}
break out
}

// Send the received message to all the subscribers.
for sub := range p.subscribers {
select {
case <-sub.quit:
delete(p.subscribers, sub)
continue
default:
}

// Spin up a goroutine so that we don't block here.
go func(subscription chan wire.Message,
quit chan struct{}) {

select {
case subscription <- rmsg:
case <-p.quit:
}

}(sub.msgChan, p.quit)
}
atomic.StoreInt64(&p.lastRecv, time.Now().Unix())
p.stallControl <- stallControlMsg{sccReceiveMessage, rmsg}

Expand Down Expand Up @@ -1961,6 +2016,12 @@ func (p *Peer) Disconnect() {
close(p.quit)
}

// OnDisconnect returns a channel that will be closed when this peer is
// disconnected.
func (p *Peer) OnDisconnect() <-chan struct{} {
return p.quit
}

// readRemoteVersionMsg waits for the next message to arrive from the remote
// peer. If the next message is not a version message or the version is not
// acceptable then return an error.
Expand Down Expand Up @@ -2397,6 +2458,7 @@ func newPeerBase(origCfg *Config, inbound bool) *Peer {
queueQuit: make(chan struct{}),
outQuit: make(chan struct{}),
quit: make(chan struct{}),
subscribers: make(map[recvMsgsubscription]struct{}),
cfg: cfg, // Copy so caller can't mutate.
services: cfg.Services,
protocolVersion: cfg.ProtocolVersion,
Expand Down

0 comments on commit 32d7f0c

Please sign in to comment.