-
Notifications
You must be signed in to change notification settings - Fork 2.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
peer: make peer meet query.Peer interface #2287
base: master
Are you sure you want to change the base?
peer: make peer meet query.Peer interface #2287
Conversation
cd4d77c
to
2dde721
Compare
Pull Request Test Coverage Report for Build 12290679740Details
💛 - Coveralls |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Straight forward diff.
One thought I had is if we should actually overhaul the the query.Peer
interface to better suit our needs, based on the new set of requirements. Given that you've started to use it to implement the out of order block download, do you find it lacking in any notable ways?
// messages received from this peer will be sent on the returned | ||
// channel. A closure is also returned, that should be called to cancel | ||
// the subscription. | ||
func (p *Peer) SubscribeRecvMsg() (<-chan wire.Message, func()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this need to be thread safe?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's thread safe now :)
peer/peer.go
Outdated
// Cancellation is just removing the channel from the subscribers list. | ||
idx := len(p.subscribers) - 1 | ||
cancel := func() { | ||
p.subscribers = append(p.subscribers[:idx], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can use slice.Delete
here: https://pkg.go.dev/slices#Delete
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we update the go.mod to 1.18? go build fails because go.mod specifies 1.17
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually no need since I changed this to a map
peer/peer.go
Outdated
// channel. A closure is also returned, that should be called to cancel | ||
// the subscription. | ||
func (p *Peer) SubscribeRecvMsg() (<-chan wire.Message, func()) { | ||
msgChan := make(chan wire.Message, 1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, so a buffer of 1 will mean that writeMessage
will block anytime a subscriber fails to immediately read out a message. If that goroutine blocks, then all outbound message writing will also halt.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed so that it's no longer buffered but has each msg send as a new goroutine
peer/peer.go
Outdated
@@ -1402,6 +1424,10 @@ out: | |||
// needed. | |||
rmsg, buf, err := p.readMessage(p.wireEncoding) | |||
idleTimer.Stop() | |||
// Send the received message to all the subscribers. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Style nit: lack of newline above.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Addressed
peer/peer.go
Outdated
@@ -1402,6 +1424,10 @@ out: | |||
// needed. | |||
rmsg, buf, err := p.readMessage(p.wireEncoding) | |||
idleTimer.Stop() | |||
// Send the received message to all the subscribers. | |||
for _, sub := range p.subscribers { | |||
sub <- rmsg |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't we only do this in the case of a non-error case?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
Biggest thing I'm noticing is: 1: Hard to punish slow peers on the btcd side since there's no really good way of figuring out which peer is slow or which peer timed out on a block. That information is kept on the 2: The Also this is not exactly about the Peer interface but: 3: Peer string passed through |
query.Peer is used for downloading blocks out of order during headers first download. Methods SubscribeRecvMsg() and OnDisconnect() are added to abide by the interface.
2dde721
to
32d7f0c
Compare
Addressed everything in the new push |
query.Peer is used for downloading blocks out of order during headers first download. Methods SubscribeRecvMsg() and OnDisconnect() are added to abide by the interface.
This PR is part of #2226 and is split here so that it's easier to review.