Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

peer: make peer meet query.Peer interface #2287

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions peer/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,11 @@ type Peer struct {
queueQuit chan struct{}
outQuit chan struct{}
quit chan struct{}

// subscribers is a channel for relaying all messages that were received
// to this peer.
subscribers map[recvMsgsubscription]struct{}
subscriberLock sync.Mutex
}

// String returns the peer's address and directionality as a human-readable
Expand Down Expand Up @@ -1098,6 +1103,35 @@ func (p *Peer) readMessage(encoding wire.MessageEncoding) (wire.Message, []byte,
return msg, buf, nil
}

// recvMsgsubscription is two channels for a subscriber of recevied messages.
// msgChan for sending the messages and quit for cancelling the subscription.
type recvMsgsubscription struct {
msgChan chan wire.Message
quit chan struct{}
}

// SubscribeRecvMsg adds a OnRead subscription to the peer. All bitcoin
// messages received from this peer will be sent on the returned
// channel. A closure is also returned, that should be called to cancel
// the subscription.
func (p *Peer) SubscribeRecvMsg() (<-chan wire.Message, func()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need to be thread safe?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's thread safe now :)

p.subscriberLock.Lock()
defer p.subscriberLock.Unlock()

// No need to buffer this channel as we'll spin up a new goroutine for
// every send.
msgChan := make(chan wire.Message)
quit := make(chan struct{})
sub := recvMsgsubscription{
msgChan,
quit,
}

p.subscribers[sub] = struct{}{}

return msgChan, func() { close(quit) }
}

// writeMessage sends a bitcoin message to the peer with logging.
func (p *Peer) writeMessage(msg wire.Message, enc wire.MessageEncoding) error {
// Don't do anything if we're disconnecting.
Expand Down Expand Up @@ -1446,6 +1480,27 @@ out:
}
break out
}

// Send the received message to all the subscribers.
for sub := range p.subscribers {
select {
case <-sub.quit:
delete(p.subscribers, sub)
continue
default:
}

// Spin up a goroutine so that we don't block here.
go func(subscription chan wire.Message,
quit chan struct{}) {

select {
case subscription <- rmsg:
case <-p.quit:
}

}(sub.msgChan, p.quit)
}
atomic.StoreInt64(&p.lastRecv, time.Now().Unix())
p.stallControl <- stallControlMsg{sccReceiveMessage, rmsg}

Expand Down Expand Up @@ -1961,6 +2016,12 @@ func (p *Peer) Disconnect() {
close(p.quit)
}

// OnDisconnect returns a channel that will be closed when this peer is
// disconnected.
func (p *Peer) OnDisconnect() <-chan struct{} {
return p.quit
}

// readRemoteVersionMsg waits for the next message to arrive from the remote
// peer. If the next message is not a version message or the version is not
// acceptable then return an error.
Expand Down Expand Up @@ -2397,6 +2458,7 @@ func newPeerBase(origCfg *Config, inbound bool) *Peer {
queueQuit: make(chan struct{}),
outQuit: make(chan struct{}),
quit: make(chan struct{}),
subscribers: make(map[recvMsgsubscription]struct{}),
cfg: cfg, // Copy so caller can't mutate.
services: cfg.Services,
protocolVersion: cfg.ProtocolVersion,
Expand Down
Loading