From b210f832f9296c8625d288b810de06cda9a7e881 Mon Sep 17 00:00:00 2001 From: dilaragorum Date: Fri, 16 Feb 2024 00:37:32 +0300 Subject: [PATCH] feat: add header filter feature --- consumer_base.go | 10 ++++ consumer_base_test.go | 39 ++++++++++++++ consumer_config.go | 5 +- examples/with-header-filter-consumer/main.go | 47 ++++++++++++++++ test/integration/go.sum | 3 +- test/integration/integration_test.go | 57 ++++++++++++++++++++ 6 files changed, 157 insertions(+), 4 deletions(-) create mode 100644 examples/with-header-filter-consumer/main.go diff --git a/consumer_base.go b/consumer_base.go index fcb7cf2..35f057b 100644 --- a/consumer_base.go +++ b/consumer_base.go @@ -54,6 +54,7 @@ type base struct { context context.Context r Reader cancelFn context.CancelFunc + skipMessageByHeaderFn SkipMessageByHeaderFn metric *ConsumerMetric pause chan struct{} quit chan struct{} @@ -107,6 +108,7 @@ func newBase(cfg *ConsumerConfig, messageChSize int) (*base, error) { singleConsumingStream: make(chan *Message, cfg.Concurrency), batchConsumingStream: make(chan []*Message, cfg.Concurrency), consumerState: stateRunning, + skipMessageByHeaderFn: cfg.SkipMessageByHeaderFn, } if cfg.DistributedTracingEnabled { @@ -159,6 +161,14 @@ func (c *base) startConsume() { continue } + if c.skipMessageByHeaderFn != nil && c.skipMessageByHeaderFn(m.Headers) { + c.logger.Infof("Message is not processed. Header filter applied. Headers: %v", (*m).Headers) + if err = c.r.CommitMessages([]kafka.Message{*m}); err != nil { + c.logger.Errorf("Commit Error %s,", err.Error()) + } + continue + } + incomingMessage := &IncomingMessage{ kafkaMessage: m, message: fromKafkaMessage(m), diff --git a/consumer_base_test.go b/consumer_base_test.go index 65f60e0..602006f 100644 --- a/consumer_base_test.go +++ b/consumer_base_test.go @@ -57,6 +57,45 @@ func Test_base_startConsume(t *testing.T) { t.Error(diff) } }) + + t.Run("Skip_Incoming_Messages_When_SkipMessageByHeaderFn_Is_Applied", func(t *testing.T) { + // Given + mc := mockReader{} + skipMessageCh := make(chan struct{}) + b := base{ + wg: sync.WaitGroup{}, + r: &mc, + logger: NewZapLogger(LogLevelError), + incomingMessageStream: make(chan *IncomingMessage), + skipMessageByHeaderFn: func(header []kafka.Header) bool { + defer func() { + skipMessageCh <- struct{}{} + }() + + for _, h := range header { + if h.Key == "header" { + return true + } + } + return false + }, + } + + b.wg.Add(1) + + // When + go b.startConsume() + + // Then + <-skipMessageCh + + // assert incomingMessageStream does not receive any value because message is skipped + select { + case <-b.incomingMessageStream: + t.Fatal("incoming message stream must equal to 0") + case <-time.After(1 * time.Second): + } + }) } func Test_base_Pause(t *testing.T) { diff --git a/consumer_config.go b/consumer_config.go index d431348..a09436f 100644 --- a/consumer_config.go +++ b/consumer_config.go @@ -21,6 +21,8 @@ type PreBatchFn func([]*Message) []*Message type ConsumeFn func(*Message) error +type SkipMessageByHeaderFn func(header []kafka.Header) bool + type DialConfig struct { Timeout time.Duration KeepAlive time.Duration @@ -36,6 +38,7 @@ type ConsumerConfig struct { Dial *DialConfig BatchConfiguration *BatchConfiguration ConsumeFn ConsumeFn + SkipMessageByHeaderFn SkipMessageByHeaderFn TransactionalRetry *bool RetryConfiguration RetryConfiguration LogLevel LogLevel @@ -116,8 +119,6 @@ type DistributedTracingConfiguration struct { Propagator propagation.TextMapPropagator } -type SkipMessageByHeaderFn func(headers []Header) bool - func toHeaders(cronsumerHeaders []kcronsumer.Header) []Header { headers := make([]Header, 0, len(cronsumerHeaders)) for i := range cronsumerHeaders { diff --git a/examples/with-header-filter-consumer/main.go b/examples/with-header-filter-consumer/main.go new file mode 100644 index 0000000..0c699cb --- /dev/null +++ b/examples/with-header-filter-consumer/main.go @@ -0,0 +1,47 @@ +package main + +import ( + "fmt" + "github.com/Trendyol/kafka-konsumer/v2" + "os" + "os/signal" +) + +func main() { + consumerCfg := &kafka.ConsumerConfig{ + Concurrency: 1, + Reader: kafka.ReaderConfig{ + Brokers: []string{"localhost:29092"}, + Topic: "standart-topic", + GroupID: "standart-cg", + }, + RetryEnabled: false, + SkipMessageByHeaderFn: skipMessageByHeaderFn, + ConsumeFn: consumeFn, + } + + consumer, _ := kafka.NewConsumer(consumerCfg) + defer consumer.Stop() + + consumer.Consume() + + fmt.Println("Consumer started...!") + + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt) + <-c +} + +func skipMessageByHeaderFn(headers []kafka.Header) bool { + for _, header := range headers { + if header.Key == "SkipMessage" { + return true + } + } + return false +} + +func consumeFn(message *kafka.Message) error { + fmt.Printf("Message From %s with value %s\n", message.Topic, string(message.Value)) + return nil +} diff --git a/test/integration/go.sum b/test/integration/go.sum index b75f3d8..5eb6b0d 100644 --- a/test/integration/go.sum +++ b/test/integration/go.sum @@ -1,5 +1,4 @@ -github.com/Trendyol/kafka-cronsumer v1.4.6 h1:Hc6afln69+cCAyaYJSQRnjzH5gZ9dpNa/PsBeXiL5GY= -github.com/Trendyol/kafka-cronsumer v1.4.6/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU= +github.com/Trendyol/kafka-cronsumer v1.4.7 h1:xmjxSBJzRRkuaO8k0S4baePyVVLJf3apl7nRiMXFnUY= github.com/Trendyol/kafka-cronsumer v1.4.7/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU= github.com/Trendyol/otel-kafka-konsumer v0.0.7 h1:sT1TE2rgfsdrJWrXKz5j6dPkKJsvP+Tv0Dea4ORqJ+4= github.com/Trendyol/otel-kafka-konsumer v0.0.7/go.mod h1:zdCaFclzRCO9fzcjxkHrWOB3I2+uTPrmkq4zczkD1F0= diff --git a/test/integration/integration_test.go b/test/integration/integration_test.go index f4b4295..a7733f1 100644 --- a/test/integration/integration_test.go +++ b/test/integration/integration_test.go @@ -436,6 +436,63 @@ func Test_Should_Batch_Consume_With_PreBatch_Enabled(t *testing.T) { } } +func Test_Should_Skip_Message_When_Header_Filter_Given(t *testing.T) { + // Given + topic := "header-filter-topic" + consumerGroup := "header-filter-cg" + brokerAddress := "localhost:9092" + + incomingMessage := []segmentio.Message{ + { + Topic: topic, + Headers: []segmentio.Header{ + {Key: "SkipMessage", Value: []byte("any")}, + }, + Key: []byte("1"), + Value: []byte(`foo`), + }, + } + + _, cleanUp := createTopicAndWriteMessages(t, topic, incomingMessage) + defer cleanUp() + + consumeCh := make(chan struct{}) + skipMessageCh := make(chan struct{}) + + consumerCfg := &kafka.ConsumerConfig{ + Reader: kafka.ReaderConfig{Brokers: []string{brokerAddress}, Topic: topic, GroupID: consumerGroup}, + SkipMessageByHeaderFn: func(header []kafka.Header) bool { + defer func() { + skipMessageCh <- struct{}{} + }() + for _, h := range header { + if h.Key == "SkipMessage" { + return true + } + } + return false + }, + ConsumeFn: func(message *kafka.Message) error { + consumeCh <- struct{}{} + return nil + }, + } + + consumer, _ := kafka.NewConsumer(consumerCfg) + defer consumer.Stop() + + consumer.Consume() + + // Then + <-skipMessageCh + + select { + case <-consumeCh: + t.Fatal("Message must be skipped! consumeCh mustn't receive any value") + case <-time.After(1 * time.Second): + } +} + func createTopicAndWriteMessages(t *testing.T, topicName string, messages []segmentio.Message) (*segmentio.Conn, func()) { t.Helper()