From b5059eb0e05a38a3c0fcd2f133f6359f27259ec4 Mon Sep 17 00:00:00 2001 From: Lakshay Kalbhor Date: Thu, 19 Sep 2024 11:18:36 +0530 Subject: [PATCH] fix: make unhealthy signal send non-blocking (#32) --- internal/relay/relay.go | 21 ++++++++++++++++++--- internal/relay/source_pool.go | 7 ++++++- 2 files changed, 24 insertions(+), 4 deletions(-) diff --git a/internal/relay/relay.go b/internal/relay/relay.go index 606805b..6363da5 100644 --- a/internal/relay/relay.go +++ b/internal/relay/relay.go @@ -103,7 +103,12 @@ func (re *Relay) Start(globalCtx context.Context) error { // Start the consumer group worker by trigger a signal to the relay loop to fetch // a consumer worker to fetch initial healthy node. re.log.Info("starting consumer worker") - re.signalCh <- struct{}{} + // The push is non-blocking to avoid getting stuck trying to send on the poll loop + // if the threshold checker go-routine might have already sent on the channel concurrently. + select { + case re.signalCh <- struct{}{}: + default: + } wg.Add(1) // Relay teardown. @@ -179,7 +184,12 @@ loop: of, err := re.source.GetHighWatermark(ctx, server.Client) if err != nil { re.log.Error("could not get end offsets (first poll); sending unhealthy signal", "id", server.ID, "server", server.Config.BootstrapBrokers, "error", err) - re.signalCh <- struct{}{} + // The push is non-blocking to avoid getting stuck trying to send on the poll loop + // if the threshold checker go-routine might have already sent on the channel concurrently. + select { + case re.signalCh <- struct{}{}: + default: + } continue loop } @@ -197,7 +207,12 @@ loop: fetches, err := re.source.GetFetches(server) if err != nil { re.log.Error("marking server as unhealthy", "server", server.ID) - re.signalCh <- struct{}{} + // The push is non-blocking to avoid getting stuck trying to send on the poll loop + // if the threshold checker go-routine might have already sent on the channel concurrently. + select { + case re.signalCh <- struct{}{}: + default: + } continue loop } diff --git a/internal/relay/source_pool.go b/internal/relay/source_pool.go index 0f197bf..814c0ab 100644 --- a/internal/relay/source_pool.go +++ b/internal/relay/source_pool.go @@ -368,7 +368,12 @@ func (sp *SourcePool) healthcheck(ctx context.Context, signal chan struct{}) err sp.fetchCancel() // Signal the relay poll loop to start asking for a healthy client. - signal <- struct{}{} + // The push is non-blocking to avoid getting stuck trying to send on the poll loop + // if the poll loop's subsection (checking for errors) has already sent a signal + select { + case signal <- struct{}{}: + default: + } } } }