diff --git a/batch_consumer.go b/batch_consumer.go index e97228a..e949e90 100644 --- a/batch_consumer.go +++ b/batch_consumer.go @@ -35,7 +35,7 @@ func newBatchConsumer(cfg *ConsumerConfig) (Consumer, error) { return nil, err } - messageGroupByteSizeLimit, err := ResolveUnionIntOrStringValue(cfg.BatchConfiguration.MessageGroupByteSizeLimit) + messageGroupByteSizeLimit, err := resolveUnionIntOrStringValue(cfg.BatchConfiguration.MessageGroupByteSizeLimit) if err != nil { return nil, err } @@ -112,14 +112,18 @@ func (b *batchConsumer) startBatch() { return } - if maximumMessageByteSizeLimit != 0 && messageByteSize+len(msg.message.Value) > maximumMessageByteSizeLimit { + msgSize := msg.message.TotalSize() + + // Flush the current batch first if adding this message would exceed the byte size limit. + if maximumMessageByteSizeLimit != 0 && messageByteSize+msgSize > maximumMessageByteSizeLimit { b.consume(&messages, &commitMessages, &messageByteSize) } messages = append(messages, msg.message) commitMessages = append(commitMessages, *msg.kafkaMessage) - messageByteSize += len(msg.message.Value) + messageByteSize += msgSize + // Flush the batch once it reaches the maximum message count. 
if len(messages) == maximumMessageLimit { b.consume(&messages, &commitMessages, &messageByteSize) } diff --git a/data_units.go b/data_units.go index 99f86e9..3b7b7ee 100644 --- a/data_units.go +++ b/data_units.go @@ -6,7 +6,7 @@ import ( "strings" ) -func ResolveUnionIntOrStringValue(input any) (int, error) { +func resolveUnionIntOrStringValue(input any) (int, error) { switch value := input.(type) { case int: return value, nil diff --git a/data_units_test.go b/data_units_test.go index 7856095..a9c762e 100644 --- a/data_units_test.go +++ b/data_units_test.go @@ -36,7 +36,7 @@ func TestDcp_ResolveConnectionBufferSize(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if got, _ := ResolveUnionIntOrStringValue(tt.input); got != tt.want { + if got, _ := resolveUnionIntOrStringValue(tt.input); got != tt.want { t.Errorf("ResolveConnectionBufferSize() = %v, want %v", got, tt.want) } }) diff --git a/message.go b/message.go index 58cb114..8f9f09c 100644 --- a/message.go +++ b/message.go @@ -39,6 +39,27 @@ type Message struct { ErrDescription string } +func (msg *Message) TotalSize() int { + return 14 + msg.keySize() + msg.valueSize() + msg.headerSize() +} + +func (msg *Message) headerSize() int { + s := 0 + for _, header := range msg.Headers { + s += sizeofString(header.Key) + s += len(header.Value) + } + return s +} + +func (msg *Message) keySize() int { + return sizeofBytes(msg.Key) +} + +func (msg *Message) valueSize() int { + return 4 + len(msg.Value) +} + type IncomingMessage struct { kafkaMessage *kafka.Message message *Message @@ -157,3 +178,11 @@ func (m *Message) RemoveHeader(header Header) { } } } + +func sizeofBytes(b []byte) int { + return 4 + len(b) +} + +func sizeofString(s string) int { + return 2 + len(s) +}