feat(producer): add MaxBufferBytes to limit retry buffer size (#3088)
Signed-off-by: Wenli Wan <[email protected]>
wanwenli authored Jan 28, 2025
1 parent 1759595 commit 26d7c7b
Showing 3 changed files with 124 additions and 62 deletions.
58 changes: 41 additions & 17 deletions async_producer.go
@@ -16,9 +16,16 @@ import (
// ErrProducerRetryBufferOverflow is returned when the bridging retry buffer is full and OOM prevention needs to be applied.
var ErrProducerRetryBufferOverflow = errors.New("retry buffer full: message discarded to prevent buffer overflow")

// minFunctionalRetryBufferLength is the lower limit of Producer.Retry.MaxBufferLength for it to function.
// Any non-zero maxBufferLength but less than this lower limit is pushed to the lower limit.
const minFunctionalRetryBufferLength = 4 * 1024
const (
// minFunctionalRetryBufferLength defines the minimum number of messages the retry buffer must support.
// If Producer.Retry.MaxBufferLength is set to a non-zero value below this limit, it will be adjusted to this value.
// This ensures the retry buffer remains functional under typical workloads.
minFunctionalRetryBufferLength = 4 * 1024
// minFunctionalRetryBufferBytes defines the minimum total byte size the retry buffer must support.
// If Producer.Retry.MaxBufferBytes is set to a non-zero value below this limit, it will be adjusted to this value.
// A 32 MB lower limit ensures sufficient capacity for retrying larger messages without exhausting resources.
minFunctionalRetryBufferBytes = 32 * 1024 * 1024
)

// AsyncProducer publishes Kafka messages using a non-blocking API. It routes messages
// to the correct broker for the provided topic-partition, refreshing metadata as appropriate,
@@ -1214,11 +1221,22 @@ func (bp *brokerProducer) handleError(sent *produceSet, err error) {
// effectively a "bridge" between the flushers and the dispatcher in order to avoid deadlock
// based on https://godoc.org/github.com/eapache/channels#InfiniteChannel
func (p *asyncProducer) retryHandler() {
maxBufferSize := p.conf.Producer.Retry.MaxBufferLength
if 0 < maxBufferSize && maxBufferSize < minFunctionalRetryBufferLength {
maxBufferSize = minFunctionalRetryBufferLength
maxBufferLength := p.conf.Producer.Retry.MaxBufferLength
if 0 < maxBufferLength && maxBufferLength < minFunctionalRetryBufferLength {
maxBufferLength = minFunctionalRetryBufferLength
}

maxBufferBytes := p.conf.Producer.Retry.MaxBufferBytes
if 0 < maxBufferBytes && maxBufferBytes < minFunctionalRetryBufferBytes {
maxBufferBytes = minFunctionalRetryBufferBytes
}

version := 1
if p.conf.Version.IsAtLeast(V0_11_0_0) {
version = 2
}

var currentByteSize int64
var msg *ProducerMessage
buf := queue.New()

@@ -1229,7 +1247,8 @@
select {
case msg = <-p.retries:
case p.input <- buf.Peek().(*ProducerMessage):
buf.Remove()
msgToRemove := buf.Remove().(*ProducerMessage)
currentByteSize -= int64(msgToRemove.ByteSize(version))
continue
}
}
Expand All @@ -1239,17 +1258,22 @@ func (p *asyncProducer) retryHandler() {
}

buf.Add(msg)
currentByteSize += int64(msg.ByteSize(version))

if maxBufferSize > 0 && buf.Length() >= maxBufferSize {
msgToHandle := buf.Peek().(*ProducerMessage)
if msgToHandle.flags == 0 {
select {
case p.input <- msgToHandle:
buf.Remove()
default:
buf.Remove()
p.returnError(msgToHandle, ErrProducerRetryBufferOverflow)
}
if (maxBufferLength <= 0 || buf.Length() < maxBufferLength) && (maxBufferBytes <= 0 || currentByteSize < maxBufferBytes) {
continue
}

msgToHandle := buf.Peek().(*ProducerMessage)
if msgToHandle.flags == 0 {
select {
case p.input <- msgToHandle:
buf.Remove()
currentByteSize -= int64(msgToHandle.ByteSize(version))
default:
buf.Remove()
currentByteSize -= int64(msgToHandle.ByteSize(version))
p.returnError(msgToHandle, ErrProducerRetryBufferOverflow)
}
}
}
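For context, a minimal usage sketch (not part of the commit) of how an application could set the new byte limit and detect dropped messages via ErrProducerRetryBufferOverflow on the error channel; the helper name, broker list, and topic are illustrative assumptions:

package example

import (
	"errors"
	"log"

	"github.com/IBM/sarama"
)

// runProducerWithBoundedRetryBuffer is an assumed helper, not part of the commit.
func runProducerWithBoundedRetryBuffer(brokers []string) error {
	config := sarama.NewConfig()
	config.Producer.Return.Errors = true
	// Cap the bridging retry buffer at 64 MB; a non-zero value below 32 MB is
	// raised to minFunctionalRetryBufferBytes, and 0 leaves the buffer unbounded.
	config.Producer.Retry.MaxBufferBytes = 64 * 1024 * 1024

	producer, err := sarama.NewAsyncProducer(brokers, config)
	if err != nil {
		return err
	}

	go func() {
		for perr := range producer.Errors() {
			if errors.Is(perr.Err, sarama.ErrProducerRetryBufferOverflow) {
				// The message was dropped from the retry buffer to protect memory;
				// decide whether to re-enqueue it, persist it elsewhere, or give up.
				log.Printf("dropped message for topic %q", perr.Msg.Topic)
			}
		}
	}()

	producer.Input() <- &sarama.ProducerMessage{
		Topic: "events", // illustrative topic name
		Value: sarama.StringEncoder("payload"),
	}
	producer.AsyncClose()
	return nil
}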
121 changes: 76 additions & 45 deletions async_producer_test.go
@@ -4,11 +4,13 @@ package sarama

import (
"errors"
"github.com/stretchr/testify/assert"
"log"
"math"
"os"
"os/signal"
"strconv"
"strings"
"sync"
"sync/atomic"
"testing"
@@ -2176,7 +2178,7 @@ func TestTxnCanAbort(t *testing.T) {
require.NoError(t, err)
}

func TestPreventRetryBufferOverflow(t *testing.T) {
func TestProducerRetryBufferLimits(t *testing.T) {
broker := NewMockBroker(t, 1)
defer broker.Close()
topic := "test-topic"
@@ -2199,57 +2201,86 @@
"MetadataRequest": metadataRequestHandlerFunc,
})

config := NewTestConfig()
config.Producer.Flush.MaxMessages = 1
config.Producer.Retry.MaxBufferLength = minFunctionalRetryBufferLength
config.Producer.Return.Successes = true

producer, err := NewAsyncProducer([]string{broker.Addr()}, config)
if err != nil {
t.Fatal(err)
tests := []struct {
name string
configureBuffer func(*Config)
messageSize int
numMessages int
}{
{
name: "MaxBufferLength",
configureBuffer: func(config *Config) {
config.Producer.Flush.MaxMessages = 1
config.Producer.Retry.MaxBufferLength = minFunctionalRetryBufferLength
},
messageSize: 1, // Small message size
numMessages: 10000,
},
{
name: "MaxBufferBytes",
configureBuffer: func(config *Config) {
config.Producer.Flush.MaxMessages = 1
config.Producer.Retry.MaxBufferBytes = minFunctionalRetryBufferBytes
},
messageSize: 950 * 1024, // 950 KB
numMessages: 1000,
},
}

var (
wg sync.WaitGroup
successes, producerErrors int
errorFound bool
)

wg.Add(1)
go func() {
defer wg.Done()
for range producer.Successes() {
successes++
}
}()
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
config := NewTestConfig()
config.Producer.Return.Successes = true
tt.configureBuffer(config)

wg.Add(1)
go func() {
defer wg.Done()
for errMsg := range producer.Errors() {
if errors.Is(errMsg.Err, ErrProducerRetryBufferOverflow) {
errorFound = true
producer, err := NewAsyncProducer([]string{broker.Addr()}, config)
if err != nil {
t.Fatal(err)
}
producerErrors++
}
}()

numMessages := 100000
for i := 0; i < numMessages; i++ {
kv := StringEncoder(strconv.Itoa(i))
producer.Input() <- &ProducerMessage{
Topic: topic,
Key: kv,
Value: kv,
Metadata: i,
}
}
var (
wg sync.WaitGroup
successes, producerErrors int
errorFound bool
)

wg.Add(1)
go func() {
defer wg.Done()
for range producer.Successes() {
successes++
}
}()

wg.Add(1)
go func() {
defer wg.Done()
for errMsg := range producer.Errors() {
if errors.Is(errMsg.Err, ErrProducerRetryBufferOverflow) {
errorFound = true
}
producerErrors++
}
}()

producer.AsyncClose()
wg.Wait()
longString := strings.Repeat("a", tt.messageSize)
val := StringEncoder(longString)

require.Equal(t, successes+producerErrors, numMessages, "Expected all messages to be processed")
require.True(t, errorFound, "Expected at least one error matching ErrProducerRetryBufferOverflow")
for i := 0; i < tt.numMessages; i++ {
msg := &ProducerMessage{
Topic: topic,
Value: val,
}
producer.Input() <- msg
}

producer.AsyncClose()
wg.Wait()

assert.Equal(t, successes+producerErrors, tt.numMessages, "Expected all messages to be processed")
assert.True(t, errorFound, "Expected at least one error matching ErrProducerRetryBufferOverflow")
})
}
}

// This example shows how to use the producer while simultaneously
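A back-of-the-envelope check on the chosen test parameters (an illustrative calculation, not part of the commit): in the MaxBufferBytes case, 1000 messages of roughly 950 KB each (about 928 MB in total) are pushed through a retry buffer capped at 32 MB, so only around 34 retried messages fit before the byte cap trips; in the MaxBufferLength case, 10,000 tiny messages are sent against a 4,096-message cap.

// Illustrative arithmetic only; these constants mirror the test parameters.
const (
	maxBufferBytes = 32 * 1024 * 1024 // minFunctionalRetryBufferBytes
	messageSize    = 950 * 1024       // per-message payload in the MaxBufferBytes case
)

// Roughly how many retried messages fit before the byte cap trips: 34.
var messagesBeforeOverflow = maxBufferBytes / messageSize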
7 changes: 7 additions & 0 deletions config.go
@@ -276,6 +276,13 @@ type Config struct {
// Any value between 0 and 4096 is pushed to 4096.
// A zero or negative value indicates unlimited.
MaxBufferLength int
// The maximum total byte size of messages in the bridging buffer between `input`
// and `retries` channels in AsyncProducer#retryHandler.
// This limit prevents the buffer from consuming excessive memory.
// Defaults to 0 for unlimited.
// Any value between 0 and 32 MB is pushed to 32 MB.
// A zero or negative value indicates unlimited.
MaxBufferBytes int64
}

// Interceptors to be called when the producer dispatcher reads the
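A short configuration sketch under the documented semantics (the chosen values are illustrative, not defaults): exceeding either limit triggers the overflow handling in retryHandler, a positive MaxBufferBytes below 32 MB is raised to 32 MB, and zero or a negative value disables the corresponding cap.

package example

import "github.com/IBM/sarama"

// newBoundedRetryConfig is an assumed helper showing the two retry-buffer knobs.
func newBoundedRetryConfig() *sarama.Config {
	config := sarama.NewConfig()

	// Cap the retry bridge by message count and by total bytes.
	config.Producer.Retry.MaxBufferLength = 8 * 1024         // 8,192 messages
	config.Producer.Retry.MaxBufferBytes = 128 * 1024 * 1024 // 128 MB

	// A positive value below the functional minimum is raised internally:
	// 1 MB here would behave as 32 MB (minFunctionalRetryBufferBytes).
	// Zero (the default) or a negative value leaves the buffer unbounded.
	return config
}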
