feat: increase batch performance (#156)
* feat: add rolling timer implementation for batch consumer (#155)

* feat: improve chunkMessages function in terms of cpu/mem (#153)

* feat: implement rolling timer in single consumer mode
Abdulsametileri authored Dec 29, 2024
1 parent 6bda451 commit 1771050
Showing 8 changed files with 473 additions and 110 deletions.
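For context on the change described above: the rolling-timer approach replaces the previous fixed time.Ticker with a time.Timer that is reset whenever a new message arrives, so a time-based flush only fires after messageGroupDuration of inactivity instead of on a fixed cadence. A minimal, self-contained sketch of the pattern (illustrative only, not code from this repository):

package main

import (
    "fmt"
    "time"
)

func main() {
    const groupDuration = 200 * time.Millisecond
    incoming := make(chan string)

    // Simulated producer: a short burst of messages, then the channel closes.
    go func() {
        for i := 0; i < 5; i++ {
            incoming <- fmt.Sprintf("msg-%d", i)
            time.Sleep(50 * time.Millisecond)
        }
        close(incoming)
    }()

    var batch []string
    flushTimer := time.NewTimer(groupDuration)
    defer flushTimer.Stop()

    for {
        select {
        case <-flushTimer.C:
            if len(batch) > 0 {
                fmt.Println("flush (idle):", batch)
                batch = batch[:0]
            }
            flushTimer.Reset(groupDuration)
        case msg, ok := <-incoming:
            if !ok {
                fmt.Println("final flush:", batch)
                return
            }
            batch = append(batch, msg)
            // Rolling behaviour: every new message pushes the flush deadline out.
            if !flushTimer.Stop() {
                select { // non-blocking drain, mirroring the drainTimer helper used below
                case <-flushTimer.C:
                default:
                }
            }
            flushTimer.Reset(groupDuration)
        }
    }
}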
3 changes: 3 additions & 0 deletions Makefile
@@ -39,3 +39,6 @@ integration-test:
## run-act: act for running github actions on your local machine
run-act:
act -j test --container-architecture linux/arm64

run-benchmarks:
go test -run none -bench . -benchtime=5s
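The new run-benchmarks target runs every Benchmark* function for five seconds each, while -run none skips regular tests. As an illustration of what such a benchmark could look like for the chunking path touched by this commit (the benchmark name and setup are assumptions, not part of the commit; it assumes a _test.go file in the package where Message and chunkMessagesOptimized are defined, with the standard testing package imported):

func BenchmarkChunkMessagesOptimized(b *testing.B) {
    // 1,000 messages of 128 bytes each, split into chunks of at most 100 messages.
    messages := make([]*Message, 1000)
    for i := range messages {
        messages[i] = &Message{Value: make([]byte, 128)}
    }

    b.ReportAllocs()
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        chunkMessagesOptimized(messages, 100, 0)
    }
}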
74 changes: 51 additions & 23 deletions batch_consumer.go
@@ -90,24 +90,42 @@ func (b *batchConsumer) Consume() {
func (b *batchConsumer) startBatch() {
defer b.wg.Done()

ticker := time.NewTicker(b.messageGroupDuration)
defer ticker.Stop()
flushTimer := time.NewTimer(b.messageGroupDuration)
defer flushTimer.Stop()

maximumMessageLimit := b.messageGroupLimit * b.concurrency
maximumMessageByteSizeLimit := b.messageGroupByteSizeLimit * b.concurrency

messages := make([]*Message, 0, maximumMessageLimit)
commitMessages := make([]kafka.Message, 0, maximumMessageLimit)
messageByteSize := 0

flushBatch := func(reason string) {
if len(messages) == 0 {
return
}

b.consume(&messages, &commitMessages, &messageByteSize)

b.logger.Debugf("[batchConsumer] Flushed batch, reason=%s", reason)

// After flushing, we always reset the timer
// But first we need to stop it and drain any event that might be pending
if !flushTimer.Stop() {
drainTimer(flushTimer)
}

// Now reset to start a new "rolling" interval
flushTimer.Reset(b.messageGroupDuration)
}

for {
select {
case <-ticker.C:
if len(messages) == 0 {
continue
}

b.consume(&messages, &commitMessages, &messageByteSize)
case <-flushTimer.C:
flushBatch("time-based (rolling timer)")
case msg, ok := <-b.incomingMessageStream:
if !ok {
flushBatch("channel-closed (final flush)")
close(b.batchConsumingStream)
close(b.messageProcessedStream)
return
@@ -117,7 +135,7 @@ func (b *batchConsumer) startBatch() {

// If adding this message would exceed the byte-size limit, flush the current batch first.
if maximumMessageByteSizeLimit != 0 && messageByteSize+msgSize > maximumMessageByteSizeLimit {
b.consume(&messages, &commitMessages, &messageByteSize)
flushBatch("byte-size-limit")
}

messages = append(messages, msg.message)
@@ -126,7 +144,14 @@

// If the batch has reached the message-count limit, flush it.
if len(messages) == maximumMessageLimit {
b.consume(&messages, &commitMessages, &messageByteSize)
flushBatch("message-count-limit")
} else {
// Rolling timer logic: reset the timer each time we get a new message
// Because we "stop" it, we might need to drain the channel
if !flushTimer.Stop() {
drainTimer(flushTimer)
}
flushTimer.Reset(b.messageGroupDuration)
}
}
}
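drainTimer is referenced above but defined outside this hunk. Assuming it does nothing more than a non-blocking receive on the timer's channel — which is what the Stop/drain/Reset sequence requires so that a pending fire from the old deadline cannot trigger a spurious flush — a minimal version would be:

// drainTimer empties the timer's channel without blocking, so a
// subsequent Reset starts from a clean state even if the timer
// had already fired.
func drainTimer(t *time.Timer) {
    select {
    case <-t.C:
    default:
    }
}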
@@ -144,33 +169,36 @@ func (b *batchConsumer) setupConcurrentWorkers() {
}
}

func chunkMessages(allMessages *[]*Message, chunkSize int, chunkByteSize int) [][]*Message {
func chunkMessagesOptimized(allMessages []*Message, chunkSize int, chunkByteSize int) [][]*Message {
if chunkSize <= 0 {
panic("chunkSize must be greater than 0")
}

var chunks [][]*Message
totalMessages := len(allMessages)
estimatedChunks := (totalMessages + chunkSize - 1) / chunkSize
chunks = make([][]*Message, 0, estimatedChunks)

allMessageList := *allMessages
var currentChunk []*Message
currentChunkSize := 0
currentChunk = make([]*Message, 0, chunkSize)
currentChunkBytes := 0

for _, message := range allMessageList {
for _, message := range allMessages {
messageByteSize := len(message.Value)

// Check if adding this message would exceed either the chunk size or the byte size
if len(currentChunk) >= chunkSize || (chunkByteSize != 0 && currentChunkBytes+messageByteSize > chunkByteSize) {
// Guard against a chunkByteSize too small to hold even a single message
if len(currentChunk) >= chunkSize || (chunkByteSize > 0 && currentChunkBytes+messageByteSize > chunkByteSize) {
if len(currentChunk) == 0 {
panic("invalid chunk byte size, please increase it")
panic(fmt.Sprintf("invalid chunk byte size (messageGroupByteSizeLimit) %d, "+
"message byte size is %d, bigger!, increase chunk byte size limit", chunkByteSize, messageByteSize))
}
// If it does, finalize the current chunk and start a new one
chunks = append(chunks, currentChunk)
currentChunk = []*Message{}
currentChunkSize = 0
currentChunk = make([]*Message, 0, chunkSize)
currentChunkBytes = 0
}

// Add the message to the current chunk
currentChunk = append(currentChunk, message)
currentChunkSize++
currentChunkBytes += messageByteSize
}

@@ -183,11 +211,11 @@ func chunkMessages(allMessages *[]*Message, chunkSize int, chunkByteSize int) [][]*Message {
}

func (b *batchConsumer) consume(allMessages *[]*Message, commitMessages *[]kafka.Message, messageByteSizeLimit *int) {
chunks := chunkMessages(allMessages, b.messageGroupLimit, b.messageGroupByteSizeLimit)
chunks := chunkMessagesOptimized(*allMessages, b.messageGroupLimit, b.messageGroupByteSizeLimit)

if b.preBatchFn != nil {
preBatchResult := b.preBatchFn(*allMessages)
chunks = chunkMessages(&preBatchResult, b.messageGroupLimit, b.messageGroupByteSizeLimit)
chunks = chunkMessagesOptimized(preBatchResult, b.messageGroupLimit, b.messageGroupByteSizeLimit)
}

// Send the messages to process
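As a worked example of the chunking call above: with messageGroupLimit = 100 and 1,000 buffered messages, chunkMessagesOptimized preallocates (1000 + 100 - 1) / 100 = 10 chunk slots and a 100-element slice per chunk, so the loop appends without growing slices. A hedged usage sketch (assumed to run inside the package, since Message and chunkMessagesOptimized are unexported; fmt imported):

msgs := make([]*Message, 0, 1000)
for i := 0; i < 1000; i++ {
    msgs = append(msgs, &Message{Value: []byte("payload")})
}

// 100 messages per chunk; 0 disables the byte-size check.
chunks := chunkMessagesOptimized(msgs, 100, 0)
fmt.Println(len(chunks)) // 10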