From bdf33ef95d11db83ac6aa6dae5ec1c0b8f0d4d00 Mon Sep 17 00:00:00 2001 From: Abdulsametileri Date: Tue, 2 Apr 2024 11:09:31 +0300 Subject: [PATCH 1/5] feat: make Resume Pause as idempotent --- consumer_base.go | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/consumer_base.go b/consumer_base.go index d429ec5..76145ee 100644 --- a/consumer_base.go +++ b/consumer_base.go @@ -78,6 +78,7 @@ type base struct { distributedTracingEnabled bool consumerState state metricPrefix string + mu sync.Mutex } func NewConsumer(cfg *ConsumerConfig) (Consumer, error) { @@ -116,6 +117,7 @@ func newBase(cfg *ConsumerConfig, messageChSize int) (*base, error) { consumerState: stateRunning, skipMessageByHeaderFn: cfg.SkipMessageByHeaderFn, metricPrefix: cfg.MetricPrefix, + mu: sync.Mutex{}, } if cfg.DistributedTracingEnabled { @@ -173,6 +175,7 @@ func (c *base) startConsume() { m := &kafka.Message{} err := c.r.FetchMessage(c.context, m) if err != nil { + c.logger.Debug("c.r.FetchMessage ", err.Error()) if c.context.Err() != nil { continue } @@ -203,7 +206,14 @@ func (c *base) startConsume() { } func (c *base) Pause() { - c.logger.Info("Consumer is paused!") + c.mu.Lock() + defer c.mu.Unlock() + + if c.consumerState == statePaused { + return + } + + c.logger.Infof("Consumer is paused!") c.cancelFn() @@ -213,6 +223,13 @@ func (c *base) Pause() { } func (c *base) Resume() { + c.mu.Lock() + defer c.mu.Unlock() + + if c.consumerState == stateRunning { + return + } + c.logger.Info("Consumer is resumed!") c.pause = make(chan struct{}) From 3a9a202d1f0fa725602b32dd0eebd548fff16f22 Mon Sep 17 00:00:00 2001 From: Abdulsametileri Date: Tue, 2 Apr 2024 11:39:51 +0300 Subject: [PATCH 2/5] chore: add log pause/resume --- consumer_base.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/consumer_base.go b/consumer_base.go index 76145ee..9894ee9 100644 --- a/consumer_base.go +++ b/consumer_base.go @@ -210,6 +210,7 @@ func (c *base) Pause() { defer c.mu.Unlock() if c.consumerState == statePaused { + c.logger.Debug("Consumer is already paused mode!") return } @@ -227,6 +228,7 @@ func (c *base) Resume() { defer c.mu.Unlock() if c.consumerState == stateRunning { + c.logger.Debug("Consumer is already running mode!") return } From 58a3234eeb9d3f14f5e48107d2a6c36547f0cae3 Mon Sep 17 00:00:00 2001 From: Abdulsametileri Date: Tue, 2 Apr 2024 11:53:42 +0300 Subject: [PATCH 3/5] chore: add description pause and resume --- consumer_base.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/consumer_base.go b/consumer_base.go index 9894ee9..95ad55d 100644 --- a/consumer_base.go +++ b/consumer_base.go @@ -21,9 +21,13 @@ type Consumer interface { Consume() // Pause function pauses consumer, it is stop consuming new messages + // It works idempotent under the hood + // Calling with multiple goroutines is safe Pause() // Resume function resumes consumer, it is start to working + // It works idempotent under the hood + // Calling with multiple goroutines is safe Resume() // GetMetricCollectors for the purpose of making metric collectors available. From b5b5c0b77850e005193668100716ea2ac80495b8 Mon Sep 17 00:00:00 2001 From: Abdulsametileri Date: Wed, 3 Apr 2024 01:00:18 +0300 Subject: [PATCH 4/5] chore: add tests --- consumer_base_test.go | 147 +++++++++++++++++++++++++++++++----------- 1 file changed, 110 insertions(+), 37 deletions(-) diff --git a/consumer_base_test.go b/consumer_base_test.go index 2bd07c9..72b1c52 100644 --- a/consumer_base_test.go +++ b/consumer_base_test.go @@ -99,50 +99,123 @@ func Test_base_startConsume(t *testing.T) { } func Test_base_Pause(t *testing.T) { - // Given - ctx, cancelFn := context.WithCancel(context.Background()) - b := base{ - logger: NewZapLogger(LogLevelDebug), - pause: make(chan struct{}), - context: ctx, cancelFn: cancelFn, - consumerState: stateRunning, - } - go func() { - <-b.pause - }() + t.Run("Call_One_Goroutine", func(t *testing.T) { + // Given + ctx, cancelFn := context.WithCancel(context.Background()) + b := base{ + logger: NewZapLogger(LogLevelDebug), + pause: make(chan struct{}), + context: ctx, cancelFn: cancelFn, + consumerState: stateRunning, + mu: sync.Mutex{}, + } + go func() { + <-b.pause + }() - // When - b.Pause() + // When + b.Pause() - // Then - if b.consumerState != statePaused { - t.Fatal("consumer state must be in paused") - } + // Then + if b.consumerState != statePaused { + t.Fatal("consumer state must be in paused") + } + }) + t.Run("Call_Multiple_Goroutine", func(t *testing.T) { + // Given + ctx, cancelFn := context.WithCancel(context.Background()) + b := base{ + logger: NewZapLogger(LogLevelDebug), + pause: make(chan struct{}), + context: ctx, cancelFn: cancelFn, + consumerState: stateRunning, + mu: sync.Mutex{}, + } + go func() { + <-b.pause + }() + + // When + var wg sync.WaitGroup + wg.Add(2) + go func() { + b.Pause() + wg.Done() + }() + go func() { + b.Pause() + wg.Done() + }() + wg.Wait() + + // Then + if b.consumerState != statePaused { + t.Fatal("consumer state must be in paused") + } + }) } func Test_base_Resume(t *testing.T) { - // Given - mc := mockReader{} - ctx, cancelFn := context.WithCancel(context.Background()) - b := base{ - r: &mc, - logger: NewZapLogger(LogLevelDebug), - pause: make(chan struct{}), - quit: make(chan struct{}), - wg: sync.WaitGroup{}, - context: ctx, cancelFn: cancelFn, - } + t.Run("Call_One_Goroutine", func(t *testing.T) { + // Given + mc := mockReader{} + ctx, cancelFn := context.WithCancel(context.Background()) + b := base{ + r: &mc, + logger: NewZapLogger(LogLevelDebug), + pause: make(chan struct{}), + quit: make(chan struct{}), + wg: sync.WaitGroup{}, + context: ctx, cancelFn: cancelFn, + mu: sync.Mutex{}, + } - // When - b.Resume() + // When + b.Resume() - // Then - if b.consumerState != stateRunning { - t.Fatal("consumer state must be in running") - } - if ctx == b.context { - t.Fatal("contexts must be differ!") - } + // Then + if b.consumerState != stateRunning { + t.Fatal("consumer state must be in running") + } + if ctx == b.context { + t.Fatal("contexts must be differ!") + } + }) + t.Run("Call_Multiple_Goroutine", func(t *testing.T) { + // Given + mc := mockReader{} + ctx, cancelFn := context.WithCancel(context.Background()) + b := base{ + r: &mc, + logger: NewZapLogger(LogLevelDebug), + pause: make(chan struct{}), + quit: make(chan struct{}), + wg: sync.WaitGroup{}, + context: ctx, cancelFn: cancelFn, + mu: sync.Mutex{}, + } + + // When + var wg sync.WaitGroup + wg.Add(2) + go func() { + b.Resume() + wg.Done() + }() + go func() { + b.Resume() + wg.Done() + }() + wg.Wait() + + // Then + if b.consumerState != stateRunning { + t.Fatal("consumer state must be in running") + } + if ctx == b.context { + t.Fatal("contexts must be differ!") + } + }) } type mockReader struct { From 37a099654cc10257ee38fd3b8513d183ca8a2ece Mon Sep 17 00:00:00 2001 From: Abdulsametileri Date: Wed, 3 Apr 2024 01:03:34 +0300 Subject: [PATCH 5/5] chore: fix test --- batch_consumer_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/batch_consumer_test.go b/batch_consumer_test.go index d36643c..55d1684 100644 --- a/batch_consumer_test.go +++ b/batch_consumer_test.go @@ -96,7 +96,7 @@ func Test_batchConsumer_startBatch_with_preBatch(t *testing.T) { messageProcessedStream: make(chan struct{}, 1), metric: &ConsumerMetric{}, wg: sync.WaitGroup{}, - messageGroupDuration: 500 * time.Millisecond, + messageGroupDuration: 20 * time.Second, r: &mc, concurrency: 1, },