Skip to content

Commit

Permalink
feature: refactor the code
Browse files Browse the repository at this point in the history
  • Loading branch information
mhmtszr committed May 28, 2024
1 parent ec0ebc6 commit 41cda82
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 5 deletions.
10 changes: 7 additions & 3 deletions batch_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func newBatchConsumer(cfg *ConsumerConfig) (Consumer, error) {
return nil, err
}

messageGroupByteSizeLimit, err := ResolveUnionIntOrStringValue(cfg.BatchConfiguration.MessageGroupByteSizeLimit)
messageGroupByteSizeLimit, err := resolveUnionIntOrStringValue(cfg.BatchConfiguration.MessageGroupByteSizeLimit)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -112,14 +112,18 @@ func (b *batchConsumer) startBatch() {
return
}

if maximumMessageByteSizeLimit != 0 && messageByteSize+len(msg.message.Value) > maximumMessageByteSizeLimit {
msgSize := msg.message.TotalSize()

// Check if there is an enough byte in batch, if not flush it.
if maximumMessageByteSizeLimit != 0 && messageByteSize+msgSize > maximumMessageByteSizeLimit {
b.consume(&messages, &commitMessages, &messageByteSize)
}

messages = append(messages, msg.message)
commitMessages = append(commitMessages, *msg.kafkaMessage)
messageByteSize += len(msg.message.Value)
messageByteSize += msgSize

// Check if there is an enough size in batch, if not flush it.
if len(messages) == maximumMessageLimit {
b.consume(&messages, &commitMessages, &messageByteSize)
}
Expand Down
2 changes: 1 addition & 1 deletion data_units.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"strings"
)

func ResolveUnionIntOrStringValue(input any) (int, error) {
func resolveUnionIntOrStringValue(input any) (int, error) {
switch value := input.(type) {
case int:
return value, nil
Expand Down
2 changes: 1 addition & 1 deletion data_units_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestDcp_ResolveConnectionBufferSize(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got, _ := ResolveUnionIntOrStringValue(tt.input); got != tt.want {
if got, _ := resolveUnionIntOrStringValue(tt.input); got != tt.want {
t.Errorf("ResolveConnectionBufferSize() = %v, want %v", got, tt.want)
}
})
Expand Down
29 changes: 29 additions & 0 deletions message.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,27 @@ type Message struct {
ErrDescription string
}

func (msg *Message) TotalSize() int {
return 14 + msg.keySize() + msg.valueSize() + msg.headerSize()
}

func (msg *Message) headerSize() int {
s := 0
for _, header := range msg.Headers {
s += sizeofString(header.Key)
s += len(header.Value)
}
return s
}

func (msg *Message) keySize() int {
return sizeofBytes(msg.Key)
}

func (msg *Message) valueSize() int {
return 4 + len(msg.Value)
}

type IncomingMessage struct {
kafkaMessage *kafka.Message
message *Message
Expand Down Expand Up @@ -157,3 +178,11 @@ func (m *Message) RemoveHeader(header Header) {
}
}
}

func sizeofBytes(b []byte) int {
return 4 + len(b)
}

func sizeofString(s string) int {
return 2 + len(s)
}

0 comments on commit 41cda82

Please sign in to comment.