Skip to content

add configurable buffer sizes for Redis connections #3453

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Aug 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/wordlist.txt
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,5 @@ OAuth
Azure
StreamingCredentialsProvider
oauth
entraid
entraid
MiB
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,18 @@ func main() {
```
### Buffer Size Configuration
go-redis uses 0.5MiB read and write buffers by default for optimal performance. For high-throughput applications or large pipelines, you can customize buffer sizes:
```go
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
ReadBufferSize: 1024 * 1024, // 1MiB read buffer
WriteBufferSize: 1024 * 1024, // 1MiB write buffer
})
```
### Advanced Configuration
go-redis supports extending the client identification phase to allow projects to send their own custom client identification.
Expand Down
183 changes: 183 additions & 0 deletions internal/pool/buffer_size_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
package pool_test

import (
"bufio"
"context"
"net"
"unsafe"

. "github.com/bsm/ginkgo/v2"
. "github.com/bsm/gomega"

"github.com/redis/go-redis/v9/internal/pool"
"github.com/redis/go-redis/v9/internal/proto"
)

var _ = Describe("Buffer Size Configuration", func() {
var connPool *pool.ConnPool
ctx := context.Background()

AfterEach(func() {
if connPool != nil {
connPool.Close()
}
})

It("should use default buffer sizes when not specified", func() {
connPool = pool.NewConnPool(&pool.Options{
Dialer: dummyDialer,
PoolSize: 1,
PoolTimeout: 1000,
})

cn, err := connPool.NewConn(ctx)
Expect(err).NotTo(HaveOccurred())
defer connPool.CloseConn(cn)

// Check that default buffer sizes are used (0.5MiB)
writerBufSize := getWriterBufSizeUnsafe(cn)
readerBufSize := getReaderBufSizeUnsafe(cn)

Expect(writerBufSize).To(Equal(proto.DefaultBufferSize)) // Default 0.5MiB buffer size
Expect(readerBufSize).To(Equal(proto.DefaultBufferSize)) // Default 0.5MiB buffer size
})

It("should use custom buffer sizes when specified", func() {
customReadSize := 32 * 1024 // 32KB
customWriteSize := 64 * 1024 // 64KB

connPool = pool.NewConnPool(&pool.Options{
Dialer: dummyDialer,
PoolSize: 1,
PoolTimeout: 1000,
ReadBufferSize: customReadSize,
WriteBufferSize: customWriteSize,
})

cn, err := connPool.NewConn(ctx)
Expect(err).NotTo(HaveOccurred())
defer connPool.CloseConn(cn)

// Check that custom buffer sizes are used
writerBufSize := getWriterBufSizeUnsafe(cn)
readerBufSize := getReaderBufSizeUnsafe(cn)

Expect(writerBufSize).To(Equal(customWriteSize))
Expect(readerBufSize).To(Equal(customReadSize))
})

It("should handle zero buffer sizes by using defaults", func() {
connPool = pool.NewConnPool(&pool.Options{
Dialer: dummyDialer,
PoolSize: 1,
PoolTimeout: 1000,
ReadBufferSize: 0, // Should use default
WriteBufferSize: 0, // Should use default
})

cn, err := connPool.NewConn(ctx)
Expect(err).NotTo(HaveOccurred())
defer connPool.CloseConn(cn)

// Check that default buffer sizes are used (0.5MiB)
writerBufSize := getWriterBufSizeUnsafe(cn)
readerBufSize := getReaderBufSizeUnsafe(cn)

Expect(writerBufSize).To(Equal(proto.DefaultBufferSize)) // Default 0.5MiB buffer size
Expect(readerBufSize).To(Equal(proto.DefaultBufferSize)) // Default 0.5MiB buffer size
})

It("should use 0.5MiB default buffer sizes for standalone NewConn", func() {
// Test that NewConn (without pool) also uses 0.5MiB defaults
netConn := newDummyConn()
cn := pool.NewConn(netConn)
defer cn.Close()

writerBufSize := getWriterBufSizeUnsafe(cn)
readerBufSize := getReaderBufSizeUnsafe(cn)

Expect(writerBufSize).To(Equal(proto.DefaultBufferSize)) // Default 0.5MiB buffer size
Expect(readerBufSize).To(Equal(proto.DefaultBufferSize)) // Default 0.5MiB buffer size
})

It("should use 0.5MiB defaults even when pool is created directly without buffer sizes", func() {
// Test the scenario where someone creates a pool directly (like in tests)
// without setting ReadBufferSize and WriteBufferSize
connPool = pool.NewConnPool(&pool.Options{
Dialer: dummyDialer,
PoolSize: 1,
PoolTimeout: 1000,
// ReadBufferSize and WriteBufferSize are not set (will be 0)
})

cn, err := connPool.NewConn(ctx)
Expect(err).NotTo(HaveOccurred())
defer connPool.CloseConn(cn)

// Should still get 0.5MiB defaults because NewConnPool sets them
writerBufSize := getWriterBufSizeUnsafe(cn)
readerBufSize := getReaderBufSizeUnsafe(cn)

Expect(writerBufSize).To(Equal(proto.DefaultBufferSize)) // Default 0.5MiB buffer size
Expect(readerBufSize).To(Equal(proto.DefaultBufferSize)) // Default 0.5MiB buffer size
})
})

// Helper functions to extract buffer sizes using unsafe pointers
func getWriterBufSizeUnsafe(cn *pool.Conn) int {
cnPtr := (*struct {
usedAt int64
netConn net.Conn
rd *proto.Reader
bw *bufio.Writer
wr *proto.Writer
// ... other fields
})(unsafe.Pointer(cn))

if cnPtr.bw == nil {
return -1
}

bwPtr := (*struct {
err error
buf []byte
n int
wr interface{}
})(unsafe.Pointer(cnPtr.bw))

return len(bwPtr.buf)
}

func getReaderBufSizeUnsafe(cn *pool.Conn) int {
cnPtr := (*struct {
usedAt int64
netConn net.Conn
rd *proto.Reader
bw *bufio.Writer
wr *proto.Writer
// ... other fields
})(unsafe.Pointer(cn))

if cnPtr.rd == nil {
return -1
}

rdPtr := (*struct {
rd *bufio.Reader
})(unsafe.Pointer(cnPtr.rd))

if rdPtr.rd == nil {
return -1
}

bufReaderPtr := (*struct {
buf []byte
rd interface{}
r, w int
err error
lastByte int
lastRuneSize int
})(unsafe.Pointer(rdPtr.rd))

return len(bufReaderPtr.buf)
}
20 changes: 18 additions & 2 deletions internal/pool/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,28 @@ type Conn struct {
}

func NewConn(netConn net.Conn) *Conn {
return NewConnWithBufferSize(netConn, proto.DefaultBufferSize, proto.DefaultBufferSize)
}

func NewConnWithBufferSize(netConn net.Conn, readBufSize, writeBufSize int) *Conn {
cn := &Conn{
netConn: netConn,
createdAt: time.Now(),
}
cn.rd = proto.NewReader(netConn)
cn.bw = bufio.NewWriter(netConn)

// Use specified buffer sizes, or fall back to 0.5MiB defaults if 0
if readBufSize > 0 {
cn.rd = proto.NewReaderSize(netConn, readBufSize)
} else {
cn.rd = proto.NewReader(netConn) // Uses 0.5MiB default
}

if writeBufSize > 0 {
cn.bw = bufio.NewWriterSize(netConn, writeBufSize)
} else {
cn.bw = bufio.NewWriterSize(netConn, proto.DefaultBufferSize)
}

cn.wr = proto.NewWriter(cn.bw)
cn.SetUsedAt(time.Now())
return cn
Expand Down
5 changes: 4 additions & 1 deletion internal/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ type Options struct {
MaxActiveConns int
ConnMaxIdleTime time.Duration
ConnMaxLifetime time.Duration

ReadBufferSize int
WriteBufferSize int
}

type lastDialErrorWrap struct {
Expand Down Expand Up @@ -226,7 +229,7 @@ func (p *ConnPool) dialConn(ctx context.Context, pooled bool) (*Conn, error) {
return nil, err
}

cn := NewConn(netConn)
cn := NewConnWithBufferSize(netConn, p.cfg.ReadBufferSize, p.cfg.WriteBufferSize)
cn.pooled = pooled
return cn, nil
}
Expand Down
11 changes: 10 additions & 1 deletion internal/proto/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ import (
"github.com/redis/go-redis/v9/internal/util"
)

// DefaultBufferSize is the default size for read/write buffers (0.5MiB)
const DefaultBufferSize = 512 * 1024

// redis resp protocol data type.
const (
RespStatus = '+' // +<string>\r\n
Expand Down Expand Up @@ -58,7 +61,13 @@ type Reader struct {

func NewReader(rd io.Reader) *Reader {
return &Reader{
rd: bufio.NewReader(rd),
rd: bufio.NewReaderSize(rd, DefaultBufferSize),
}
}

func NewReaderSize(rd io.Reader, size int) *Reader {
return &Reader{
rd: bufio.NewReaderSize(rd, size),
}
}

Expand Down
23 changes: 23 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/redis/go-redis/v9/auth"
"github.com/redis/go-redis/v9/internal/pool"
"github.com/redis/go-redis/v9/internal/proto"
)

// Limiter is the interface of a rate limiter or a circuit breaker.
Expand Down Expand Up @@ -130,6 +131,20 @@ type Options struct {
// See https://redis.uptrace.dev/guide/go-redis-debugging.html#timeouts
ContextTimeoutEnabled bool

// ReadBufferSize is the size of the bufio.Reader buffer for each connection.
// Larger buffers can improve performance for commands that return large responses.
// Smaller buffers can improve memory usage for larger pools.
//
// default: 0.5MiB (524288 bytes)
ReadBufferSize int

// WriteBufferSize is the size of the bufio.Writer buffer for each connection.
// Larger buffers can improve performance for large pipelines and commands with many arguments.
// Smaller buffers can improve memory usage for larger pools.
//
// default: 0.5MiB (524288 bytes)
WriteBufferSize int

// PoolFIFO type of connection pool.
//
// - true for FIFO pool
Expand Down Expand Up @@ -241,6 +256,12 @@ func (opt *Options) init() {
if opt.PoolSize == 0 {
opt.PoolSize = 10 * runtime.GOMAXPROCS(0)
}
if opt.ReadBufferSize == 0 {
opt.ReadBufferSize = proto.DefaultBufferSize
}
if opt.WriteBufferSize == 0 {
opt.WriteBufferSize = proto.DefaultBufferSize
}
switch opt.ReadTimeout {
case -2:
opt.ReadTimeout = -1
Expand Down Expand Up @@ -592,5 +613,7 @@ func newConnPool(
MaxActiveConns: opt.MaxActiveConns,
ConnMaxIdleTime: opt.ConnMaxIdleTime,
ConnMaxLifetime: opt.ConnMaxLifetime,
ReadBufferSize: opt.ReadBufferSize,
WriteBufferSize: opt.WriteBufferSize,
})
}
22 changes: 22 additions & 0 deletions osscluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,20 @@ type ClusterOptions struct {
ConnMaxIdleTime time.Duration
ConnMaxLifetime time.Duration

// ReadBufferSize is the size of the bufio.Reader buffer for each connection.
// Larger buffers can improve performance for commands that return large responses.
// Smaller buffers can improve memory usage for larger pools.
//
// default: 0.5MiB (524288 bytes)
ReadBufferSize int

// WriteBufferSize is the size of the bufio.Writer buffer for each connection.
// Larger buffers can improve performance for large pipelines and commands with many arguments.
// Smaller buffers can improve memory usage for larger pools.
//
// default: 0.5MiB (524288 bytes)
WriteBufferSize int

TLSConfig *tls.Config

// DisableIndentity - Disable set-lib on connect.
Expand Down Expand Up @@ -127,6 +141,12 @@ func (opt *ClusterOptions) init() {
if opt.PoolSize == 0 {
opt.PoolSize = 5 * runtime.GOMAXPROCS(0)
}
if opt.ReadBufferSize == 0 {
opt.ReadBufferSize = proto.DefaultBufferSize
}
if opt.WriteBufferSize == 0 {
opt.WriteBufferSize = proto.DefaultBufferSize
}

switch opt.ReadTimeout {
case -1:
Expand Down Expand Up @@ -318,6 +338,8 @@ func (opt *ClusterOptions) clientOptions() *Options {
MaxActiveConns: opt.MaxActiveConns,
ConnMaxIdleTime: opt.ConnMaxIdleTime,
ConnMaxLifetime: opt.ConnMaxLifetime,
ReadBufferSize: opt.ReadBufferSize,
WriteBufferSize: opt.WriteBufferSize,
DisableIdentity: opt.DisableIdentity,
DisableIndentity: opt.DisableIdentity,
IdentitySuffix: opt.IdentitySuffix,
Expand Down
Loading
Loading