Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add skip message by header filter function for cronsumer #104

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
.idea
dist/
unit_coverage.out
unit_coverage.html
unit_coverage.html
.DS_Store
2 changes: 1 addition & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ linters:
disable-all: true
enable:
- bodyclose
- depguard
- errcheck
- dupl
- exhaustive
Expand Down Expand Up @@ -43,6 +42,7 @@ issues:
- path: _test\.go
linters:
- errcheck
- gosec
- funlen

service:
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ under [the specified folder](examples/with-sasl-plaintext) and then start the ap
| `retryConfiguration.sasl.authType` | `SCRAM` or `PLAIN` | |
| `retryConfiguration.sasl.username` | SCRAM OR PLAIN username | |
| `retryConfiguration.sasl.password` | SCRAM OR PLAIN password | |
| `retryConfiguration.skipMessageByHeaderFn` | Function to filter messages based on headers, return true if you want to skip the message | nil |
| `batchConfiguration.messageGroupLimit` | Maximum number of messages in a batch | |
| `batchConfiguration.batchConsumeFn` | Kafka batch consumer function, if retry enabled it, is also used to consume retriable messages | |
| `batchConfiguration.preBatchFn` | This function enable for transforming messages before batch consuming starts | |
Expand Down
42 changes: 31 additions & 11 deletions consumer_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,12 @@
LogLevel: lcronsumer.Level(cfg.RetryConfiguration.LogLevel),
}

if cfg.RetryConfiguration.SkipMessageByHeaderFn != nil {
cronsumerCfg.Consumer.SkipMessageByHeaderFn = func(headers []kcronsumer.Header) bool {
return cfg.RetryConfiguration.SkipMessageByHeaderFn(toHeaders(headers))

Check warning on line 81 in consumer_config.go

View check run for this annotation

Codecov / codecov/patch

consumer_config.go#L81

Added line #L81 was not covered by tests
}
}

if !cfg.RetryConfiguration.SASL.IsEmpty() {
cronsumerCfg.SASL.Enabled = true
cronsumerCfg.SASL.AuthType = string(cfg.RetryConfiguration.SASL.Type)
Expand Down Expand Up @@ -110,18 +116,32 @@
Propagator propagation.TextMapPropagator
}

type SkipMessageByHeaderFn func(headers []Header) bool

func toHeaders(cronsumerHeaders []kcronsumer.Header) []Header {
headers := make([]Header, 0, len(cronsumerHeaders))
for i := range cronsumerHeaders {
headers = append(headers, Header{
Key: cronsumerHeaders[i].Key,
Value: cronsumerHeaders[i].Value,
})
}
return headers
}

type RetryConfiguration struct {
SASL *SASLConfig
TLS *TLSConfig
ClientID string
StartTimeCron string
Topic string
DeadLetterTopic string
Rack string
LogLevel LogLevel
Brokers []string
MaxRetry int
WorkDuration time.Duration
SASL *SASLConfig
TLS *TLSConfig
ClientID string
StartTimeCron string
Topic string
DeadLetterTopic string
Rack string
LogLevel LogLevel
Brokers []string
MaxRetry int
WorkDuration time.Duration
SkipMessageByHeaderFn SkipMessageByHeaderFn
}

type BatchConfiguration struct {
Expand Down
77 changes: 77 additions & 0 deletions consumer_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package kafka
import (
"testing"
"time"

kcronsumer "github.com/Trendyol/kafka-cronsumer/pkg/kafka"
"github.com/google/go-cmp/cmp"
)

func TestConsumerConfig_validate(t *testing.T) {
Expand Down Expand Up @@ -61,3 +64,77 @@ func TestConsumerConfig_validate(t *testing.T) {
}
})
}

func TestConsumerConfig_newCronsumerConfig(t *testing.T) {
t.Run("Should_Return_Nil_When_Client_Don't_Use_SkipMessageByHeaderFn", func(t *testing.T) {
// Given
cfg := ConsumerConfig{}

// When
actual := cfg.newCronsumerConfig()

// Then
if actual.Consumer.SkipMessageByHeaderFn != nil {
t.Error("SkipMessageByHeaderFn must be nil")
}
})
t.Run("Should_Set_When_Client_Give_SkipMessageByHeaderFn", func(t *testing.T) {
// Given
cfg := ConsumerConfig{
RetryConfiguration: RetryConfiguration{
SkipMessageByHeaderFn: func(headers []Header) bool {
return false
},
},
}

// When
actual := cfg.newCronsumerConfig()

// Then
if actual.Consumer.SkipMessageByHeaderFn == nil {
t.Error("SkipMessageByHeaderFn mustn't be nil")
}
})
}

func Test_toHeader(t *testing.T) {
t.Run("Should_Return_Empty_List_When_Cronsumer_Header_Is_Nil", func(t *testing.T) {
// When
headers := toHeaders(nil)

// Then
if len(headers) != 0 {
t.Error("Header must be nil")
}
})
t.Run("Should_Return_Empty_List_When_Cronsumer_Header_Is_Empty", func(t *testing.T) {
// When
headers := toHeaders([]kcronsumer.Header{})

// Then
if len(headers) != 0 {
t.Error("Header must be nil")
}
})
t.Run("Should_Covert_List_When_Cronsumer_Header", func(t *testing.T) {
// Given
expected := []Header{
{Key: "key", Value: []byte("val")},
{Key: "key2", Value: []byte("val2")},
{Key: "key3", Value: nil},
}

// When
actual := toHeaders([]kcronsumer.Header{
{Key: "key", Value: []byte("val")},
{Key: "key2", Value: []byte("val2")},
{Key: "key3", Value: nil},
})

// Then
if diff := cmp.Diff(expected, actual); diff != "" {
t.Error(diff)
}
})
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/Trendyol/kafka-konsumer/v2
go 1.19

require (
github.com/Trendyol/kafka-cronsumer v1.4.6
github.com/Trendyol/kafka-cronsumer v1.4.7
github.com/Trendyol/otel-kafka-konsumer v0.0.7
github.com/ansrivas/fiberprometheus/v2 v2.6.1
github.com/gofiber/fiber/v2 v2.50.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
github.com/Trendyol/kafka-cronsumer v1.4.6 h1:Hc6afln69+cCAyaYJSQRnjzH5gZ9dpNa/PsBeXiL5GY=
github.com/Trendyol/kafka-cronsumer v1.4.6/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU=
github.com/Trendyol/kafka-cronsumer v1.4.7 h1:xmjxSBJzRRkuaO8k0S4baePyVVLJf3apl7nRiMXFnUY=
github.com/Trendyol/kafka-cronsumer v1.4.7/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU=
github.com/Trendyol/otel-kafka-konsumer v0.0.7 h1:sT1TE2rgfsdrJWrXKz5j6dPkKJsvP+Tv0Dea4ORqJ+4=
github.com/Trendyol/otel-kafka-konsumer v0.0.7/go.mod h1:zdCaFclzRCO9fzcjxkHrWOB3I2+uTPrmkq4zczkD1F0=
github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs=
Expand Down
2 changes: 1 addition & 1 deletion test/integration/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
)

require (
github.com/Trendyol/kafka-cronsumer v1.4.6 // indirect
github.com/Trendyol/kafka-cronsumer v1.4.7 // indirect
github.com/Trendyol/otel-kafka-konsumer v0.0.7 // indirect
github.com/andybalholm/brotli v1.0.5 // indirect
github.com/ansrivas/fiberprometheus/v2 v2.6.1 // indirect
Expand Down
1 change: 1 addition & 0 deletions test/integration/go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
github.com/Trendyol/kafka-cronsumer v1.4.6 h1:Hc6afln69+cCAyaYJSQRnjzH5gZ9dpNa/PsBeXiL5GY=
github.com/Trendyol/kafka-cronsumer v1.4.6/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU=
github.com/Trendyol/kafka-cronsumer v1.4.7/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU=
github.com/Trendyol/otel-kafka-konsumer v0.0.7 h1:sT1TE2rgfsdrJWrXKz5j6dPkKJsvP+Tv0Dea4ORqJ+4=
github.com/Trendyol/otel-kafka-konsumer v0.0.7/go.mod h1:zdCaFclzRCO9fzcjxkHrWOB3I2+uTPrmkq4zczkD1F0=
github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs=
Expand Down
Loading