From 311a6807633fd27dba23b010f0f9ea693058821c Mon Sep 17 00:00:00 2001 From: Lakshay Kalbhor Date: Thu, 9 May 2024 12:28:48 +0530 Subject: [PATCH] fix: batch channel should be buffered --- internal/relay/relay.go | 18 +++++++++++------- internal/relay/target.go | 17 ++++++++++------- 2 files changed, 21 insertions(+), 14 deletions(-) diff --git a/internal/relay/relay.go b/internal/relay/relay.go index e029a50..31f22dd 100644 --- a/internal/relay/relay.go +++ b/internal/relay/relay.go @@ -244,17 +244,21 @@ func (re *Relay) processMessage(ctx context.Context, rec *kgo.Record) error { return nil } - // Queue the message for writing to target. - select { - case <-ctx.Done(): - return ctx.Err() - - case re.target.GetBatchCh() <- &kgo.Record{ + msg := &kgo.Record{ Key: rec.Key, Value: rec.Value, Topic: t.TargetTopic, Partition: rec.Partition, - }: + } + + // Queue the message for writing to target. + select { + case <-ctx.Done(): + return ctx.Err() + case re.target.GetBatchCh() <- msg: + default: + re.log.Error("target inlet channel blocked") + re.target.GetBatchCh() <- msg } return nil diff --git a/internal/relay/target.go b/internal/relay/target.go index 7fdf2de..0773012 100644 --- a/internal/relay/target.go +++ b/internal/relay/target.go @@ -34,8 +34,11 @@ type Target struct { // Map of target topics and their config. targetTopics Topics - batchCh chan *kgo.Record - batch []*kgo.Record + // Inlet receives relayed messages into target for batching + inletCh chan *kgo.Record + + // Holds the active batch that is produced to destination topic + batch []*kgo.Record } // NewTarget returns a new producer relay that handles target Kafka instances. @@ -49,7 +52,7 @@ func NewTarget(globalCtx context.Context, cfg TargetCfg, pCfg ProducerCfg, topic targetTopics: topics, batch: make([]*kgo.Record, 0, pCfg.BatchSize), - batchCh: make(chan *kgo.Record), + inletCh: make(chan *kgo.Record, pCfg.BatchSize*10), } // Initialize the actual Kafka client. @@ -73,12 +76,12 @@ func (tg *Target) Close() { // CloseBatchCh closes the Producer batch channel. func (tg *Target) CloseBatchCh() { - close(tg.batchCh) + close(tg.inletCh) } // GetBatchCh returns the Producer batch channel. func (tg *Target) GetBatchCh() chan *kgo.Record { - return tg.batchCh + return tg.inletCh } // prepareRecord checks if custom topic partition mapping is defined. @@ -111,7 +114,7 @@ func (tg *Target) Start(ctx context.Context) error { return ctx.Err() // Queue the message to and flush if the batch size is reached. - case msg, ok := <-tg.batchCh: + case msg, ok := <-tg.inletCh: if !ok { // Flush and cleanup on exit. if err := tg.drain(); err != nil { @@ -246,7 +249,7 @@ outerLoop: // drain drains and flushes any pending messages in the producer. func (tg *Target) drain() error { now := time.Now() - for rec := range tg.batchCh { + for rec := range tg.inletCh { tg.prepareRecord(rec) tg.batch = append(tg.batch, rec) }