Fix: LDK remove mutex #92
@@ -0,0 +1,98 @@
```go
package main

import (
	"context"
	"slices"

	"github.com/getAlby/ldk-node-go/ldk_node"
	"github.com/sirupsen/logrus"
)

// based on https://betterprogramming.pub/how-to-broadcast-messages-in-go-using-channels-b68f42bdf32e
type ldkEventBroadcastServer struct {
	logger         *logrus.Logger
	source         <-chan *ldk_node.Event        // events coming from the LDK node
	listeners      []chan *ldk_node.Event        // active subscriber channels
	addListener    chan chan *ldk_node.Event     // registration requests
	removeListener chan (<-chan *ldk_node.Event) // deregistration requests
}

type LDKEventBroadcaster interface {
	Subscribe() chan *ldk_node.Event
	CancelSubscription(chan *ldk_node.Event)
}

func NewLDKEventBroadcaster(logger *logrus.Logger, ctx context.Context, source <-chan *ldk_node.Event) LDKEventBroadcaster {
```
Review comment: Nitpick: the common idiom in Go is "return structs, accept interfaces". That is, unless you have a good reason not to expose the concrete type, it is recommended to return structs, not interfaces, from constructors. And the interfaces are generally implemented on the consumer side, not where the type is implemented :)

Reply: I don't understand why the concrete type should be exposed. The consumer only needs to access the public interface. This is the same way we expose our LNClient to ensure the different implementations (LND, LDK, Greenlight, Breez) follow the same interface. Could you give a simple example of how we would restructure this?

Review comment: Here's a good discussion of the principle and why it is encouraged for Go: https://medium.com/@cep21/preemptive-interface-anti-pattern-in-go-54c18ac0668a. I'm not saying it's absolutely necessary to follow it, just felt I should mention it while I'm at it 😉
```go
	service := &ldkEventBroadcastServer{
		logger:         logger,
		source:         source,
		listeners:      make([]chan *ldk_node.Event, 0),
		addListener:    make(chan chan *ldk_node.Event),
		removeListener: make(chan (<-chan *ldk_node.Event)),
	}
	go service.serve(ctx)
	return service
}
```
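To make the "return structs, accept interfaces" point from the thread above concrete, here is a minimal, hypothetical sketch; Broadcaster, Subscriber, and consume are illustrative names, not code from this PR:

```go
package main

import "fmt"

// Broadcaster is a concrete type; its constructor returns the struct
// rather than an interface.
type Broadcaster struct{ name string }

func NewBroadcaster(name string) *Broadcaster {
	return &Broadcaster{name: name}
}

func (b *Broadcaster) Subscribe() chan string {
	return make(chan string, 1)
}

// Subscriber is declared by the consumer, which states only what it needs.
type Subscriber interface {
	Subscribe() chan string
}

func consume(s Subscriber) {
	ch := s.Subscribe()
	ch <- "event"
	fmt.Println(<-ch)
}

func main() {
	// *Broadcaster satisfies Subscriber implicitly; no interface needs to
	// be exported from the producing package.
	consume(NewBroadcaster("ldk"))
}
```

The producing package exports only the concrete type; each consumer declares the narrow interface it actually needs, which Go satisfies implicitly.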
```go
func (s *ldkEventBroadcastServer) Subscribe() chan *ldk_node.Event {
	newListener := make(chan *ldk_node.Event)
	s.addListener <- newListener
	return newListener
}

func (s *ldkEventBroadcastServer) CancelSubscription(channel chan *ldk_node.Event) {
	close(channel)
```
Review comment: Unfortunately, this will not work either. After you close the channel, it doesn't make sense to drain it, as there's nothing to drain (and all reads will return immediately). This approach kind of goes against the intended usage scenario of channels in Go: it's the sender who drives communication and is responsible for closing channels, which is why the language is designed to panic on a write into a closed channel. I suggest a different approach: have CancelSubscription only notify the broadcaster via removeListener, and let the serve loop, as the sending side, close the listener channel once it has removed it from the list. Additionally, you may want to update the broadcaster loop to discard events if they fail to be sent immediately (or within a small period of time).

Reply: @rdmitr removing the cancel seems to still give a chance of deadlock: even if I read all the messages at that point, a new message could be received immediately after the loop.

Reply: Here is an example I tested which I think works quite well: you can see the broadcaster is unblocked if the channel is closed.

Reply: I also added a timeout for sending events to the listeners.

Reply: I will merge this now. It's not perfect, but a definite improvement over what we have currently in master. Thanks for your review @rdmitr.
```go
	// make sure the channel is drained after closing it
out:
	for {
		select {
		case _, ok := <-channel:
			if !ok {
				break out
			}
		default:
			break out
		}
	}
	s.removeListener <- channel
}
```
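The tested example and the timeout change mentioned in the thread above are not preserved on this page. As an illustration only, here is a minimal, self-contained sketch (simplified types and hypothetical names, not code from this PR) of the two mechanisms under discussion: closing the subscription channel unblocks a pending send in the broadcaster via a recovered panic, and a timeout discards events that cannot be delivered promptly:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	listener := make(chan int)
	done := make(chan struct{})

	// Broadcaster side: try to deliver one event, give up after a timeout,
	// and recover if the listener channel is closed mid-send.
	go func() {
		defer close(done)
		defer func() {
			if r := recover(); r != nil {
				fmt.Println("recovered:", r) // "send on closed channel"
			}
		}()
		select {
		case listener <- 42:
			fmt.Println("delivered")
		case <-time.After(time.Second):
			fmt.Println("timed out, discarding event")
		}
	}()

	// Consumer side: instead of reading, close the subscription. The
	// pending send panics, the broadcaster recovers, and it is not
	// blocked forever.
	time.Sleep(100 * time.Millisecond)
	close(listener)
	<-done
}
```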
```go
func (s *ldkEventBroadcastServer) serve(ctx context.Context) {
	// close all listener channels when the broadcaster shuts down
	defer func() {
		for _, listener := range s.listeners {
			if listener != nil {
				close(listener)
			}
		}
	}()

	for {
		select {
		case <-ctx.Done():
			return
		case newListener := <-s.addListener:
			s.listeners = append(s.listeners, newListener)
		case listenerToRemove := <-s.removeListener:
			for i, listener := range s.listeners {
				if listener == listenerToRemove {
					// swap with the last listener, then shrink the slice by one
					s.listeners[i] = s.listeners[len(s.listeners)-1]
					s.listeners = slices.Delete(s.listeners, len(s.listeners)-1, len(s.listeners))
					break
				}
			}
		case event := <-s.source:
			s.logger.Debugf("Sending LDK event %+v to %d listeners", *event, len(s.listeners))
			for _, listener := range s.listeners {
				func() {
					// if we fail to send the event to the listener it was probably closed
					defer func() {
						if r := recover(); r != nil {
							s.logger.Errorf("Failed to send event to listener: %v", r)
						}
					}()
					listener <- event
				}()
			}
		}
	}
}
```
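For context, a consumer of this broadcaster might look like the following sketch; consumeEvents is a hypothetical helper, not part of the diff, and assumes it lives in the same package as the file above:

```go
package main

import (
	"context"

	"github.com/getAlby/ldk-node-go/ldk_node"
	"github.com/sirupsen/logrus"
)

// consumeEvents subscribes, handles a fixed number of events, then cancels
// its subscription to stop receiving (hypothetical usage, not PR code).
func consumeEvents(ctx context.Context, logger *logrus.Logger, source <-chan *ldk_node.Event) {
	broadcaster := NewLDKEventBroadcaster(logger, ctx, source)
	subscription := broadcaster.Subscribe()

	for i := 0; i < 10; i++ {
		event, ok := <-subscription
		if !ok {
			// the broadcaster shut down and closed our channel
			return
		}
		logger.Debugf("Received LDK event: %+v", *event)
	}

	// closes and drains the channel, then asks the serve loop to forget it
	broadcaster.CancelSubscription(subscription)
}
```

The sketch assumes the broadcaster's context outlives the consumer: CancelSubscription sends on removeListener, so it relies on the serve loop still running.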
Review comment: Thinking of it, I still feel a bit uneasy about this. Imagine this scenario:

1. A consumer stops reading from its ldkEventSubscription channel.
2. The broadcaster blocks trying to send the next event into that ldkEventSubscription.
3. The consumer calls CancelSubscription(), which blocks sending on the removeListener channel.
4. The broadcaster is still stuck on the event send, so it never reads removeListener, and CancelSubscription() never unblocks.

Not sure if this logic is correct though; WDYT?
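To make the scenario concrete, here is a minimal, self-contained sketch (simplified types, not the PR code); run as-is, it is stopped by the Go runtime with an "all goroutines are asleep - deadlock!" panic, which is the failure mode being described:

```go
package main

func main() {
	listener := make(chan int)                // an unbuffered subscription
	removeListener := make(chan (<-chan int)) // cancellation requests

	// Broadcaster: blocked sending an event to a listener whose consumer
	// has stopped reading, so it never reaches <-removeListener.
	go func() {
		listener <- 42 // blocks forever: nobody reads
		<-removeListener
	}()

	// Consumer: tries to cancel without closing or draining the channel;
	// this send blocks forever and the program deadlocks.
	removeListener <- listener
}
```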
Reply: @rdmitr I changed the code to close the channel and then drain it as part of the CancelSubscription function. I think this should remove the chance of a deadlock. Then when sending events to the channel, I also recover in case an event is sent to a closed channel. What do you think?