-
Notifications
You must be signed in to change notification settings - Fork 548
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Protect shared data accesses in libp2p_helper
#14560
Protect shared data accesses in libp2p_helper
#14560
Conversation
defer app.StreamsMutex.Unlock() | ||
app.Streams[streamIdx] = stream | ||
go func() { | ||
// FIXME HACK: allow time for the openStreamResult to get printed before we start inserting stream events |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I need confirmation but I think this is a leftover from an older implementation that didn't track sequence numbers and is not needed anymore?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This hack might still be needed.
The code tries to synchronize calls to app.writeMsg
: call corresponding to openStream
response should precede the call about message received.
Is the change necessary?
I know how we could do it the right way, but it would require some effort.
P.S. seems like the ordering on openStream
and messages within might not be necessary, as it seems like the first message on stream always comes from the stream initiator. But I'd prefer to avoid relying on this much, hence if we are to remove the hack, I'd preserve ordering in a different way.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, I will restore it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I removed this hack in commit 036464d
libp2p_helper
func (app *app) AddPeers(infos ...peer.AddrInfo) { | ||
app.addedPeersMutex.Lock() | ||
app._addedPeers = append(app._addedPeers, infos...) | ||
app.addedPeersMutex.Unlock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's rewrite all unlocks to an idiomatic version with defer
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, I will update them all
defer app.StreamsMutex.Unlock() | ||
app.Streams[streamIdx] = stream | ||
go func() { | ||
// FIXME HACK: allow time for the openStreamResult to get printed before we start inserting stream events |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This hack might still be needed.
The code tries to synchronize calls to app.writeMsg
: call corresponding to openStream
response should precede the call about message received.
Is the change necessary?
I know how we could do it the right way, but it would require some effort.
P.S. seems like the ordering on openStream
and messages within might not be necessary, as it seems like the first message on stream always comes from the stream initiator. But I'd prefer to avoid relying on this much, hence if we are to remove the hack, I'd preserve ordering in a different way.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some more comments, I'm going to fix them myself in PR #14467
handleStreamReads(app, stream, streamIdx) | ||
app.writeMsg(mkIncomingStreamUpcall(peerinfo, streamIdx, protocolId)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This reordering shouldn't have happened.
func (app *app) GetAddedPeers() []peer.AddrInfo { | ||
app.addedPeersMutex.RLock() | ||
defer app.addedPeersMutex.RUnlock() | ||
copyOfAddedPeers := make([]peer.AddrInfo, len(app._addedPeers)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can avoid explicit copying by explicitly specifying non-modification contract in comment to the function
app.ValidatorMutex.Lock() | ||
defer app.ValidatorMutex.Unlock() | ||
if st, ok := app.Validators[seqno]; ok { | ||
found := app.FinishValidator(seqno, func(st *validationStatus) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider moving function out of mutex
defer app.subsMutex.Unlock() | ||
|
||
if sub, ok := app._subs[subId]; ok { | ||
sub.Sub.Cancel() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider moving I/O code out of the locked section
defer app.streamsMutex.Unlock() | ||
if stream, ok := app._streams[streamId]; ok { | ||
delete(app._streams, streamId) | ||
err := stream.Reset() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider moving I/O code out of the locked section
defer app.streamsMutex.Unlock() | ||
if stream, ok := app._streams[streamId]; ok { | ||
delete(app._streams, streamId) | ||
err := stream.Close() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider moving I/O code out of the locked section
if stream, ok := app._streams[streamId]; ok { | ||
n, err := stream.Write(data) | ||
if err != nil { | ||
// TODO check that it's correct to error out, not repeat writing |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider moving I/O code out of the locked section
e64d3d5
into
MinaProtocol:fix/libp2p-helper-concurrent-writes
Explain your changes:
This is an ad-hoc solution, ideally a single thread would be touching this state and no mutexes would be required.
Explain how you tested your changes:
libp2p_helper
with the-race
flag, launched the node and left it running for a while.These still showed up with the original patch but don't show anymore once the patch in this branch is applied too:
#14467 (comment)
Checklist: