diff --git a/.gitignore b/.gitignore index e5129de..5313535 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ .idea dist/ unit_coverage.out -unit_coverage.html \ No newline at end of file +unit_coverage.html +.DS_Store \ No newline at end of file diff --git a/.golangci.yml b/.golangci.yml index 8020ef1..5b30eb9 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -8,7 +8,6 @@ linters: disable-all: true enable: - bodyclose - - depguard - errcheck - dupl - exhaustive @@ -43,6 +42,7 @@ issues: - path: _test\.go linters: - errcheck + - gosec - funlen service: diff --git a/README.md b/README.md index 97a89eb..2ddba47 100644 --- a/README.md +++ b/README.md @@ -251,6 +251,7 @@ under [the specified folder](examples/with-sasl-plaintext) and then start the ap | `retryConfiguration.sasl.authType` | `SCRAM` or `PLAIN` | | | `retryConfiguration.sasl.username` | SCRAM OR PLAIN username | | | `retryConfiguration.sasl.password` | SCRAM OR PLAIN password | | +| `retryConfiguration.skipMessageByHeaderFn` | Function to filter messages based on headers, return true if you want to skip the message | nil | | `batchConfiguration.messageGroupLimit` | Maximum number of messages in a batch | | | `batchConfiguration.batchConsumeFn` | Kafka batch consumer function, if retry enabled it, is also used to consume retriable messages | | | `batchConfiguration.preBatchFn` | This function enable for transforming messages before batch consuming starts | | diff --git a/consumer_config.go b/consumer_config.go index 9c33450..d431348 100644 --- a/consumer_config.go +++ b/consumer_config.go @@ -76,6 +76,12 @@ func (cfg *ConsumerConfig) newCronsumerConfig() *kcronsumer.Config { LogLevel: lcronsumer.Level(cfg.RetryConfiguration.LogLevel), } + if cfg.RetryConfiguration.SkipMessageByHeaderFn != nil { + cronsumerCfg.Consumer.SkipMessageByHeaderFn = func(headers []kcronsumer.Header) bool { + return cfg.RetryConfiguration.SkipMessageByHeaderFn(toHeaders(headers)) + } + } + if !cfg.RetryConfiguration.SASL.IsEmpty() { cronsumerCfg.SASL.Enabled = true cronsumerCfg.SASL.AuthType = string(cfg.RetryConfiguration.SASL.Type) @@ -110,18 +116,32 @@ type DistributedTracingConfiguration struct { Propagator propagation.TextMapPropagator } +type SkipMessageByHeaderFn func(headers []Header) bool + +func toHeaders(cronsumerHeaders []kcronsumer.Header) []Header { + headers := make([]Header, 0, len(cronsumerHeaders)) + for i := range cronsumerHeaders { + headers = append(headers, Header{ + Key: cronsumerHeaders[i].Key, + Value: cronsumerHeaders[i].Value, + }) + } + return headers +} + type RetryConfiguration struct { - SASL *SASLConfig - TLS *TLSConfig - ClientID string - StartTimeCron string - Topic string - DeadLetterTopic string - Rack string - LogLevel LogLevel - Brokers []string - MaxRetry int - WorkDuration time.Duration + SASL *SASLConfig + TLS *TLSConfig + ClientID string + StartTimeCron string + Topic string + DeadLetterTopic string + Rack string + LogLevel LogLevel + Brokers []string + MaxRetry int + WorkDuration time.Duration + SkipMessageByHeaderFn SkipMessageByHeaderFn } type BatchConfiguration struct { diff --git a/consumer_config_test.go b/consumer_config_test.go index ab9c5c5..5855784 100644 --- a/consumer_config_test.go +++ b/consumer_config_test.go @@ -3,6 +3,9 @@ package kafka import ( "testing" "time" + + kcronsumer "github.com/Trendyol/kafka-cronsumer/pkg/kafka" + "github.com/google/go-cmp/cmp" ) func TestConsumerConfig_validate(t *testing.T) { @@ -61,3 +64,77 @@ func TestConsumerConfig_validate(t *testing.T) { } }) } + +func TestConsumerConfig_newCronsumerConfig(t *testing.T) { + t.Run("Should_Return_Nil_When_Client_Don't_Use_SkipMessageByHeaderFn", func(t *testing.T) { + // Given + cfg := ConsumerConfig{} + + // When + actual := cfg.newCronsumerConfig() + + // Then + if actual.Consumer.SkipMessageByHeaderFn != nil { + t.Error("SkipMessageByHeaderFn must be nil") + } + }) + t.Run("Should_Set_When_Client_Give_SkipMessageByHeaderFn", func(t *testing.T) { + // Given + cfg := ConsumerConfig{ + RetryConfiguration: RetryConfiguration{ + SkipMessageByHeaderFn: func(headers []Header) bool { + return false + }, + }, + } + + // When + actual := cfg.newCronsumerConfig() + + // Then + if actual.Consumer.SkipMessageByHeaderFn == nil { + t.Error("SkipMessageByHeaderFn mustn't be nil") + } + }) +} + +func Test_toHeader(t *testing.T) { + t.Run("Should_Return_Empty_List_When_Cronsumer_Header_Is_Nil", func(t *testing.T) { + // When + headers := toHeaders(nil) + + // Then + if len(headers) != 0 { + t.Error("Header must be nil") + } + }) + t.Run("Should_Return_Empty_List_When_Cronsumer_Header_Is_Empty", func(t *testing.T) { + // When + headers := toHeaders([]kcronsumer.Header{}) + + // Then + if len(headers) != 0 { + t.Error("Header must be nil") + } + }) + t.Run("Should_Covert_List_When_Cronsumer_Header", func(t *testing.T) { + // Given + expected := []Header{ + {Key: "key", Value: []byte("val")}, + {Key: "key2", Value: []byte("val2")}, + {Key: "key3", Value: nil}, + } + + // When + actual := toHeaders([]kcronsumer.Header{ + {Key: "key", Value: []byte("val")}, + {Key: "key2", Value: []byte("val2")}, + {Key: "key3", Value: nil}, + }) + + // Then + if diff := cmp.Diff(expected, actual); diff != "" { + t.Error(diff) + } + }) +} diff --git a/go.mod b/go.mod index 168ae3a..0da3f85 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/Trendyol/kafka-konsumer/v2 go 1.19 require ( - github.com/Trendyol/kafka-cronsumer v1.4.6 + github.com/Trendyol/kafka-cronsumer v1.4.7 github.com/Trendyol/otel-kafka-konsumer v0.0.7 github.com/ansrivas/fiberprometheus/v2 v2.6.1 github.com/gofiber/fiber/v2 v2.50.0 diff --git a/go.sum b/go.sum index db33ecc..c7cba5b 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,5 @@ -github.com/Trendyol/kafka-cronsumer v1.4.6 h1:Hc6afln69+cCAyaYJSQRnjzH5gZ9dpNa/PsBeXiL5GY= -github.com/Trendyol/kafka-cronsumer v1.4.6/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU= +github.com/Trendyol/kafka-cronsumer v1.4.7 h1:xmjxSBJzRRkuaO8k0S4baePyVVLJf3apl7nRiMXFnUY= +github.com/Trendyol/kafka-cronsumer v1.4.7/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU= github.com/Trendyol/otel-kafka-konsumer v0.0.7 h1:sT1TE2rgfsdrJWrXKz5j6dPkKJsvP+Tv0Dea4ORqJ+4= github.com/Trendyol/otel-kafka-konsumer v0.0.7/go.mod h1:zdCaFclzRCO9fzcjxkHrWOB3I2+uTPrmkq4zczkD1F0= github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs= diff --git a/test/integration/go.mod b/test/integration/go.mod index 321845b..89d79c1 100644 --- a/test/integration/go.mod +++ b/test/integration/go.mod @@ -10,7 +10,7 @@ require ( ) require ( - github.com/Trendyol/kafka-cronsumer v1.4.6 // indirect + github.com/Trendyol/kafka-cronsumer v1.4.7 // indirect github.com/Trendyol/otel-kafka-konsumer v0.0.7 // indirect github.com/andybalholm/brotli v1.0.5 // indirect github.com/ansrivas/fiberprometheus/v2 v2.6.1 // indirect diff --git a/test/integration/go.sum b/test/integration/go.sum index 2bb34ea..b75f3d8 100644 --- a/test/integration/go.sum +++ b/test/integration/go.sum @@ -1,5 +1,6 @@ github.com/Trendyol/kafka-cronsumer v1.4.6 h1:Hc6afln69+cCAyaYJSQRnjzH5gZ9dpNa/PsBeXiL5GY= github.com/Trendyol/kafka-cronsumer v1.4.6/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU= +github.com/Trendyol/kafka-cronsumer v1.4.7/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU= github.com/Trendyol/otel-kafka-konsumer v0.0.7 h1:sT1TE2rgfsdrJWrXKz5j6dPkKJsvP+Tv0Dea4ORqJ+4= github.com/Trendyol/otel-kafka-konsumer v0.0.7/go.mod h1:zdCaFclzRCO9fzcjxkHrWOB3I2+uTPrmkq4zczkD1F0= github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs=