From 2dde721cc933abff831a0d983ba7afa2c3ed76df Mon Sep 17 00:00:00 2001 From: Calvin Kim Date: Tue, 6 Aug 2024 17:53:22 +0900 Subject: [PATCH] peer: make peer meet query.Peer interface query.Peer is used for downloading blocks out of order during headers first download. Methods SubscribeRecvMsg() and OnDisconnect() are added to abide by the interface. --- peer/peer.go | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/peer/peer.go b/peer/peer.go index 195fc0b4fe..8ae7dc552e 100644 --- a/peer/peer.go +++ b/peer/peer.go @@ -494,6 +494,10 @@ type Peer struct { queueQuit chan struct{} outQuit chan struct{} quit chan struct{} + + // subscribers is a channel for relaying all messages that were received + // to this peer. + subscribers []chan wire.Message } // String returns the peer's address and directionality as a human-readable @@ -1098,6 +1102,24 @@ func (p *Peer) readMessage(encoding wire.MessageEncoding) (wire.Message, []byte, return msg, buf, nil } +// SubscribeRecvMsg adds a OnRead subscription to the peer. All bitcoin +// messages received from this peer will be sent on the returned +// channel. A closure is also returned, that should be called to cancel +// the subscription. +func (p *Peer) SubscribeRecvMsg() (<-chan wire.Message, func()) { + msgChan := make(chan wire.Message, 1) + p.subscribers = append(p.subscribers, msgChan) + + // Cancellation is just removing the channel from the subscribers list. + idx := len(p.subscribers) - 1 + cancel := func() { + p.subscribers = append(p.subscribers[:idx], + p.subscribers[idx+1:]...) + } + + return msgChan, cancel +} + // writeMessage sends a bitcoin message to the peer with logging. func (p *Peer) writeMessage(msg wire.Message, enc wire.MessageEncoding) error { // Don't do anything if we're disconnecting. @@ -1402,6 +1424,10 @@ out: // needed. rmsg, buf, err := p.readMessage(p.wireEncoding) idleTimer.Stop() + // Send the received message to all the subscribers. + for _, sub := range p.subscribers { + sub <- rmsg + } if err != nil { // In order to allow regression tests with malformed messages, don't // disconnect the peer when we're in regression test mode and the @@ -1961,6 +1987,12 @@ func (p *Peer) Disconnect() { close(p.quit) } +// OnDisconnect returns a channel that will be closed when this peer is +// disconnected. +func (p *Peer) OnDisconnect() <-chan struct{} { + return p.quit +} + // readRemoteVersionMsg waits for the next message to arrive from the remote // peer. If the next message is not a version message or the version is not // acceptable then return an error.