Skip to content
This repository has been archived by the owner on Sep 2, 2024. It is now read-only.

Fix: LDK remove mutex #92

Merged
merged 6 commits into from
Mar 8, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 27 additions & 63 deletions ldk.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"sort"
"strconv"
"strings"
"sync"
"time"

"github.com/getAlby/ldk-node-go/ldk_node"
Expand All @@ -20,12 +19,12 @@ import (
)

type LDKService struct {
svc *Service
workdir string
node *ldk_node.LdkNode
cancelLdkEventListenerCtx context.CancelFunc
subscribeLdkEvents func() chan ldk_node.Event
unsubscribeLdkEvents func(chan ldk_node.Event)
svc *Service
workdir string
node *ldk_node.LdkNode
ldkEventBroadcaster LDKEventBroadcaster
ctx context.Context
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpick: storing context in structs is discouraged. It's better to pass them to specific methods instead.

(I see it has been done this way before in other places in the codebase, so just FYI 😉)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, this wasn't even used. Thanks.

For the other ones I think it should be possible to remove them. We need to move the signal.NotifyContext out of service into the wails/http main functions and then it shouldn't be necessary to store it on the service. I'm not sure why I did it that way. I will create a separate issue for this.

cancel context.CancelFunc
}

func NewLDKService(svc *Service, mnemonic, workDir string, network string, esploraServer string, gossipSource string) (result lnclient.LNClient, err error) {
Expand Down Expand Up @@ -72,43 +71,14 @@ func NewLDKService(svc *Service, mnemonic, workDir string, network string, esplo
return nil, err
}

// TODO: move this event handler code
ldkEventListenerCtx, cancelLdkEventListenerCtx := context.WithCancel(context.Background())
ldkEventHandlers := []chan ldk_node.Event{}
var ldkEventHandlersMutex sync.Mutex

subscribeLdkEvents := func() chan ldk_node.Event {
ldkEventHandler := make(chan ldk_node.Event)
svc.Logger.Debugf("Locking event handler mutex")
ldkEventHandlersMutex.Lock()
svc.Logger.Debugf("Locked event handler mutex")
ldkEventHandlers = append(ldkEventHandlers, ldkEventHandler)
ldkEventHandlersMutex.Unlock()
svc.Logger.Debugf("Unlocked event handler mutex")
return ldkEventHandler
}

unsubscribeLdkEvents := func(eventHandler chan ldk_node.Event) {
svc.Logger.Debugf("Locking event handler mutex")
ldkEventHandlersMutex.Lock()
svc.Logger.Debugf("Locked event handler mutex")
for i := 0; i < len(ldkEventHandlers); i++ {
if eventHandler == ldkEventHandlers[i] {
// Replace the element to be removed with the last element of the slice
ldkEventHandlers[i] = ldkEventHandlers[len(ldkEventHandlers)-1]
// Slice off the last element
ldkEventHandlers = ldkEventHandlers[:len(ldkEventHandlers)-1]
break
}
}
ldkEventHandlersMutex.Unlock()
svc.Logger.Debugf("Unlocked event handler mutex")
}
ldkEventConsumer := make(chan *ldk_node.Event)
ctx, cancel := context.WithCancel(svc.ctx)

// check for and forward new LDK events to LDKEventBroadcaster (through ldkEventConsumer)
go func() {
for {
select {
case <-ldkEventListenerCtx.Done():
case <-ctx.Done():
return
default:
// NOTE: do not use WaitNextEvent() as it can block the LDK thread
Expand All @@ -117,15 +87,9 @@ func NewLDKService(svc *Service, mnemonic, workDir string, network string, esplo
time.Sleep(time.Duration(1) * time.Millisecond)
continue
}
svc.Logger.Debugf("Locking event handler mutex")
ldkEventHandlersMutex.Lock()
svc.Logger.Debugf("Locked event handler mutex")
svc.Logger.Infof("Received LDK event %+v (%d listeners)", *event, len(ldkEventHandlers))
for _, eventHandler := range ldkEventHandlers {
eventHandler <- *event
}
ldkEventHandlersMutex.Unlock()
svc.Logger.Debugf("Unlocked event handler mutex")

svc.Logger.Infof("Received LDK event %+v", *event)
ldkEventConsumer <- event

node.EventHandled()
}
Expand All @@ -136,10 +100,10 @@ func NewLDKService(svc *Service, mnemonic, workDir string, network string, esplo
workdir: newpath,
node: node,
//listener: &listener,
svc: svc,
cancelLdkEventListenerCtx: cancelLdkEventListenerCtx,
subscribeLdkEvents: subscribeLdkEvents,
unsubscribeLdkEvents: unsubscribeLdkEvents,
svc: svc,
ctx: ctx,
cancel: cancel,
ldkEventBroadcaster: NewLDKEventBroadcaster(svc.Logger, ctx, ldkEventConsumer),
}

nodeId := node.NodeId()
Expand All @@ -155,16 +119,16 @@ func NewLDKService(svc *Service, mnemonic, workDir string, network string, esplo

func (gs *LDKService) Shutdown() error {
gs.svc.Logger.Infof("shutting down LDK client")
gs.cancelLdkEventListenerCtx()
gs.cancel()
gs.node.Destroy()

return nil
}

func (gs *LDKService) SendPaymentSync(ctx context.Context, payReq string) (preimage string, err error) {
paymentStart := time.Now()
eventListener := gs.subscribeLdkEvents()
defer gs.unsubscribeLdkEvents(eventListener)
ldkEventSubscription := gs.ldkEventBroadcaster.Subscribe()
defer gs.ldkEventBroadcaster.CancelSubscription(ldkEventSubscription)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking of it, I still feel a bit uneasy about this. Imagine this scenario:

  • We subscribe and defer cancelling the subscription;
  • When we initiate cancellation, we don't listen to ldkEventSubscription;
  • If the broadcaster happens to broadcast an event at this exact moment, it will block trying to send to ldkEventSubscription;
  • Since the dispatch loop is blocked, it will never get to read the removeListener channel;
  • Since that channel is never read, CancelSubscription() never unblocks;
  • Deadlock?

Not sure if this logic is correct though; WDYT?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rdmitr I changed the code to close the channel and then drain it as part of the CancelSubscription function. I think this should remove the chance of a deadlock. Then when sending events to the channel, I also recover in case an event is sent to a closed channel. What do you think?


paymentHash, err := gs.node.SendPayment(payReq)
if err != nil {
Expand All @@ -173,10 +137,10 @@ func (gs *LDKService) SendPaymentSync(ctx context.Context, payReq string) (preim
}

for start := time.Now(); time.Since(start) < time.Second*60; {
event := <-eventListener
event := <-ldkEventSubscription

eventPaymentSuccessful, isEventPaymentSuccessfulEvent := event.(ldk_node.EventPaymentSuccessful)
eventPaymentFailed, isEventPaymentFailedEvent := event.(ldk_node.EventPaymentFailed)
eventPaymentSuccessful, isEventPaymentSuccessfulEvent := (*event).(ldk_node.EventPaymentSuccessful)
eventPaymentFailed, isEventPaymentFailedEvent := (*event).(ldk_node.EventPaymentFailed)

if isEventPaymentSuccessfulEvent && eventPaymentSuccessful.PaymentHash == paymentHash {
gs.svc.Logger.Infof("Got payment success event")
Expand Down Expand Up @@ -432,8 +396,8 @@ func (gs *LDKService) OpenChannel(ctx context.Context, openChannelRequest *lncli
return nil, errors.New("node is not peered yet")
}

eventListener := gs.subscribeLdkEvents()
defer gs.unsubscribeLdkEvents(eventListener)
ldkEventSubscription := gs.ldkEventBroadcaster.Subscribe()
defer gs.ldkEventBroadcaster.CancelSubscription(ldkEventSubscription)

gs.svc.Logger.Infof("Opening channel with: %v", foundPeer.NodeId)
userChannelId, err := gs.node.ConnectOpenChannel(foundPeer.NodeId, foundPeer.Address, uint64(openChannelRequest.Amount), nil, nil, openChannelRequest.Public)
Expand All @@ -446,9 +410,9 @@ func (gs *LDKService) OpenChannel(ctx context.Context, openChannelRequest *lncli
gs.svc.Logger.Infof("Funded channel: %v", userChannelId)

for start := time.Now(); time.Since(start) < time.Second*60; {
event := <-eventListener
event := <-ldkEventSubscription

channelPendingEvent, isChannelPendingEvent := event.(ldk_node.EventChannelPending)
channelPendingEvent, isChannelPendingEvent := (*event).(ldk_node.EventChannelPending)

if !isChannelPendingEvent {
continue
Expand Down
77 changes: 77 additions & 0 deletions ldk_event_broadcaster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package main

import (
"context"
"slices"

"github.com/getAlby/ldk-node-go/ldk_node"
"github.com/sirupsen/logrus"
)

// based on https://betterprogramming.pub/how-to-broadcast-messages-in-go-using-channels-b68f42bdf32e
type ldkEventBroadcastServer struct {
logger *logrus.Logger
source <-chan *ldk_node.Event
listeners []chan *ldk_node.Event
addListener chan chan *ldk_node.Event
removeListener chan (<-chan *ldk_node.Event)
}

type LDKEventBroadcaster interface {
Subscribe() <-chan *ldk_node.Event
CancelSubscription(<-chan *ldk_node.Event)
}

func NewLDKEventBroadcaster(logger *logrus.Logger, ctx context.Context, source <-chan *ldk_node.Event) LDKEventBroadcaster {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpick: the common idiom in Go is "return structs, accept interfaces". That is, unless you have a good reason to not expose the concrete type, it is recommended to return structs, not interfaces, from constructors. And the interfaces are generally implemented on the consumer side, not where the type is implemented :)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand why the concrete type should be exposed. The consumer only needs to access the public interface.

This is the same how we expose our LNClient to ensure the different implementations (LND, LDK, Greenlight, Breez) follow the same interface. Could you give a simple example of how we would re-structure this?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here's a good discussion of the principle and why it is encouraged for Go: https://medium.com/@cep21/preemptive-interface-anti-pattern-in-go-54c18ac0668a

I'm not saying it's absolutely necessary to follow it — just felt I should mention it while I'm at it 😉

service := &ldkEventBroadcastServer{
logger: logger,
source: source,
listeners: make([]chan *ldk_node.Event, 0),
addListener: make(chan chan *ldk_node.Event),
removeListener: make(chan (<-chan *ldk_node.Event)),
}
go service.serve(ctx)
return service
}

func (s *ldkEventBroadcastServer) Subscribe() <-chan *ldk_node.Event {
newListener := make(chan *ldk_node.Event)
s.addListener <- newListener
return newListener
}

func (s *ldkEventBroadcastServer) CancelSubscription(channel <-chan *ldk_node.Event) {
s.removeListener <- channel
}

func (s *ldkEventBroadcastServer) serve(ctx context.Context) {
defer func() {
for _, listener := range s.listeners {
if listener != nil {
close(listener)
}
}
}()

for {
select {
case <-ctx.Done():
return
case newListener := <-s.addListener:
s.listeners = append(s.listeners, newListener)
case listenerToRemove := <-s.removeListener:
for i, listener := range s.listeners {
if listener == listenerToRemove {
s.listeners[i] = s.listeners[len(s.listeners)-1]
s.listeners = slices.Delete(s.listeners, len(s.listeners)-1, len(s.listeners))
close(listener)
break
}
}
case event := <-s.source:
for _, listener := range s.listeners {
listener <- event
}
}
}
}
Loading