diff --git a/batch_consumer_test.go b/batch_consumer_test.go index d36643c..55d1684 100644 --- a/batch_consumer_test.go +++ b/batch_consumer_test.go @@ -96,7 +96,7 @@ func Test_batchConsumer_startBatch_with_preBatch(t *testing.T) { messageProcessedStream: make(chan struct{}, 1), metric: &ConsumerMetric{}, wg: sync.WaitGroup{}, - messageGroupDuration: 500 * time.Millisecond, + messageGroupDuration: 20 * time.Second, r: &mc, concurrency: 1, }, diff --git a/consumer_base.go b/consumer_base.go index d429ec5..95ad55d 100644 --- a/consumer_base.go +++ b/consumer_base.go @@ -21,9 +21,13 @@ type Consumer interface { Consume() // Pause function pauses consumer, it is stop consuming new messages + // It works idempotent under the hood + // Calling with multiple goroutines is safe Pause() // Resume function resumes consumer, it is start to working + // It works idempotent under the hood + // Calling with multiple goroutines is safe Resume() // GetMetricCollectors for the purpose of making metric collectors available. @@ -78,6 +82,7 @@ type base struct { distributedTracingEnabled bool consumerState state metricPrefix string + mu sync.Mutex } func NewConsumer(cfg *ConsumerConfig) (Consumer, error) { @@ -116,6 +121,7 @@ func newBase(cfg *ConsumerConfig, messageChSize int) (*base, error) { consumerState: stateRunning, skipMessageByHeaderFn: cfg.SkipMessageByHeaderFn, metricPrefix: cfg.MetricPrefix, + mu: sync.Mutex{}, } if cfg.DistributedTracingEnabled { @@ -173,6 +179,7 @@ func (c *base) startConsume() { m := &kafka.Message{} err := c.r.FetchMessage(c.context, m) if err != nil { + c.logger.Debug("c.r.FetchMessage ", err.Error()) if c.context.Err() != nil { continue } @@ -203,7 +210,15 @@ func (c *base) startConsume() { } func (c *base) Pause() { - c.logger.Info("Consumer is paused!") + c.mu.Lock() + defer c.mu.Unlock() + + if c.consumerState == statePaused { + c.logger.Debug("Consumer is already paused mode!") + return + } + + c.logger.Infof("Consumer is paused!") c.cancelFn() @@ -213,6 +228,14 @@ func (c *base) Pause() { } func (c *base) Resume() { + c.mu.Lock() + defer c.mu.Unlock() + + if c.consumerState == stateRunning { + c.logger.Debug("Consumer is already running mode!") + return + } + c.logger.Info("Consumer is resumed!") c.pause = make(chan struct{}) diff --git a/consumer_base_test.go b/consumer_base_test.go index 2bd07c9..72b1c52 100644 --- a/consumer_base_test.go +++ b/consumer_base_test.go @@ -99,50 +99,123 @@ func Test_base_startConsume(t *testing.T) { } func Test_base_Pause(t *testing.T) { - // Given - ctx, cancelFn := context.WithCancel(context.Background()) - b := base{ - logger: NewZapLogger(LogLevelDebug), - pause: make(chan struct{}), - context: ctx, cancelFn: cancelFn, - consumerState: stateRunning, - } - go func() { - <-b.pause - }() + t.Run("Call_One_Goroutine", func(t *testing.T) { + // Given + ctx, cancelFn := context.WithCancel(context.Background()) + b := base{ + logger: NewZapLogger(LogLevelDebug), + pause: make(chan struct{}), + context: ctx, cancelFn: cancelFn, + consumerState: stateRunning, + mu: sync.Mutex{}, + } + go func() { + <-b.pause + }() - // When - b.Pause() + // When + b.Pause() - // Then - if b.consumerState != statePaused { - t.Fatal("consumer state must be in paused") - } + // Then + if b.consumerState != statePaused { + t.Fatal("consumer state must be in paused") + } + }) + t.Run("Call_Multiple_Goroutine", func(t *testing.T) { + // Given + ctx, cancelFn := context.WithCancel(context.Background()) + b := base{ + logger: NewZapLogger(LogLevelDebug), + pause: make(chan struct{}), + context: ctx, cancelFn: cancelFn, + consumerState: stateRunning, + mu: sync.Mutex{}, + } + go func() { + <-b.pause + }() + + // When + var wg sync.WaitGroup + wg.Add(2) + go func() { + b.Pause() + wg.Done() + }() + go func() { + b.Pause() + wg.Done() + }() + wg.Wait() + + // Then + if b.consumerState != statePaused { + t.Fatal("consumer state must be in paused") + } + }) } func Test_base_Resume(t *testing.T) { - // Given - mc := mockReader{} - ctx, cancelFn := context.WithCancel(context.Background()) - b := base{ - r: &mc, - logger: NewZapLogger(LogLevelDebug), - pause: make(chan struct{}), - quit: make(chan struct{}), - wg: sync.WaitGroup{}, - context: ctx, cancelFn: cancelFn, - } + t.Run("Call_One_Goroutine", func(t *testing.T) { + // Given + mc := mockReader{} + ctx, cancelFn := context.WithCancel(context.Background()) + b := base{ + r: &mc, + logger: NewZapLogger(LogLevelDebug), + pause: make(chan struct{}), + quit: make(chan struct{}), + wg: sync.WaitGroup{}, + context: ctx, cancelFn: cancelFn, + mu: sync.Mutex{}, + } - // When - b.Resume() + // When + b.Resume() - // Then - if b.consumerState != stateRunning { - t.Fatal("consumer state must be in running") - } - if ctx == b.context { - t.Fatal("contexts must be differ!") - } + // Then + if b.consumerState != stateRunning { + t.Fatal("consumer state must be in running") + } + if ctx == b.context { + t.Fatal("contexts must be differ!") + } + }) + t.Run("Call_Multiple_Goroutine", func(t *testing.T) { + // Given + mc := mockReader{} + ctx, cancelFn := context.WithCancel(context.Background()) + b := base{ + r: &mc, + logger: NewZapLogger(LogLevelDebug), + pause: make(chan struct{}), + quit: make(chan struct{}), + wg: sync.WaitGroup{}, + context: ctx, cancelFn: cancelFn, + mu: sync.Mutex{}, + } + + // When + var wg sync.WaitGroup + wg.Add(2) + go func() { + b.Resume() + wg.Done() + }() + go func() { + b.Resume() + wg.Done() + }() + wg.Wait() + + // Then + if b.consumerState != stateRunning { + t.Fatal("consumer state must be in running") + } + if ctx == b.context { + t.Fatal("contexts must be differ!") + } + }) } type mockReader struct {