From 72864f5bc1d32c16788e5619d843df186dd88925 Mon Sep 17 00:00:00 2001 From: Roland Bewick Date: Fri, 8 Mar 2024 12:02:06 +0700 Subject: [PATCH] fix: ldk broadcaster cleanup channels, add timeout for sending events --- ldk_event_broadcaster.go | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/ldk_event_broadcaster.go b/ldk_event_broadcaster.go index be814a5a..4a6c2ffd 100644 --- a/ldk_event_broadcaster.go +++ b/ldk_event_broadcaster.go @@ -3,6 +3,7 @@ package main import ( "context" "slices" + "time" "github.com/getAlby/ldk-node-go/ldk_node" "github.com/sirupsen/logrus" @@ -48,9 +49,14 @@ func (s *ldkEventBroadcastServer) CancelSubscription(channel chan *ldk_node.Even func (s *ldkEventBroadcastServer) serve(ctx context.Context) { defer func() { for _, listener := range s.listeners { - if listener != nil { + func() { + defer func() { + if r := recover(); r != nil { + s.logger.Errorf("Failed to close channel: %v", r) + } + }() close(listener) - } + }() } }() @@ -69,7 +75,7 @@ func (s *ldkEventBroadcastServer) serve(ctx context.Context) { } } case event := <-s.source: - s.logger.Debugf("Sending LDK event +%v to %d listeners", *event, len(s.listeners)) + s.logger.Debugf("Sending LDK event %+v to %d listeners", *event, len(s.listeners)) for _, listener := range s.listeners { func() { // if we fail to send the event to the listener it was probably closed @@ -78,7 +84,13 @@ func (s *ldkEventBroadcastServer) serve(ctx context.Context) { s.logger.Errorf("Failed to send event to listener: %v", r) } }() - listener <- event + + select { + case listener <- event: + s.logger.Debugln("sent event to listener") + case <-time.After(5 * time.Second): + s.logger.Errorf("Timeout sending %+v to listener", *event) + } }() } }