From b0bb6cec8e41edb37f7ee07062f452760609d85d Mon Sep 17 00:00:00 2001 From: ofekshenawa Date: Wed, 30 Jul 2025 16:43:48 +0300 Subject: [PATCH 1/3] add configurable buffer sizes for Redis connections --- README.md | 12 ++ internal/pool/buffer_size_test.go | 183 ++++++++++++++++++++++++++++++ internal/pool/conn.go | 20 +++- internal/pool/pool.go | 5 +- internal/proto/reader.go | 11 +- options.go | 21 ++++ osscluster.go | 20 ++++ ring.go | 22 ++++ 8 files changed, 290 insertions(+), 4 deletions(-) create mode 100644 internal/pool/buffer_size_test.go diff --git a/README.md b/README.md index c37a52ec70..356870b174 100644 --- a/README.md +++ b/README.md @@ -297,6 +297,18 @@ func main() { ``` +### Buffer Size Configuration + +go-redis uses 0.5MiB read and write buffers by default for optimal performance. For high-throughput applications or large pipelines, you can customize buffer sizes: + +```go +rdb := redis.NewClient(&redis.Options{ + Addr: "localhost:6379", + ReadBufferSize: 1024 * 1024, // 1MiB read buffer + WriteBufferSize: 1024 * 1024, // 1MiB write buffer +}) +``` + ### Advanced Configuration go-redis supports extending the client identification phase to allow projects to send their own custom client identification. diff --git a/internal/pool/buffer_size_test.go b/internal/pool/buffer_size_test.go new file mode 100644 index 0000000000..a54230102a --- /dev/null +++ b/internal/pool/buffer_size_test.go @@ -0,0 +1,183 @@ +package pool_test + +import ( + "bufio" + "context" + "net" + "unsafe" + + . "github.com/bsm/ginkgo/v2" + . "github.com/bsm/gomega" + + "github.com/redis/go-redis/v9/internal/pool" + "github.com/redis/go-redis/v9/internal/proto" +) + +var _ = Describe("Buffer Size Configuration", func() { + var connPool *pool.ConnPool + ctx := context.Background() + + AfterEach(func() { + if connPool != nil { + connPool.Close() + } + }) + + It("should use default buffer sizes when not specified", func() { + connPool = pool.NewConnPool(&pool.Options{ + Dialer: dummyDialer, + PoolSize: 1, + PoolTimeout: 1000, + }) + + cn, err := connPool.NewConn(ctx) + Expect(err).NotTo(HaveOccurred()) + defer connPool.CloseConn(cn) + + // Check that default buffer sizes are used (0.5MiB) + writerBufSize := getWriterBufSizeUnsafe(cn) + readerBufSize := getReaderBufSizeUnsafe(cn) + + Expect(writerBufSize).To(Equal(proto.DefaultBufferSize)) // Default 0.5MiB buffer size + Expect(readerBufSize).To(Equal(proto.DefaultBufferSize)) // Default 0.5MiB buffer size + }) + + It("should use custom buffer sizes when specified", func() { + customReadSize := 32 * 1024 // 32KB + customWriteSize := 64 * 1024 // 64KB + + connPool = pool.NewConnPool(&pool.Options{ + Dialer: dummyDialer, + PoolSize: 1, + PoolTimeout: 1000, + ReadBufferSize: customReadSize, + WriteBufferSize: customWriteSize, + }) + + cn, err := connPool.NewConn(ctx) + Expect(err).NotTo(HaveOccurred()) + defer connPool.CloseConn(cn) + + // Check that custom buffer sizes are used + writerBufSize := getWriterBufSizeUnsafe(cn) + readerBufSize := getReaderBufSizeUnsafe(cn) + + Expect(writerBufSize).To(Equal(customWriteSize)) + Expect(readerBufSize).To(Equal(customReadSize)) + }) + + It("should handle zero buffer sizes by using defaults", func() { + connPool = pool.NewConnPool(&pool.Options{ + Dialer: dummyDialer, + PoolSize: 1, + PoolTimeout: 1000, + ReadBufferSize: 0, // Should use default + WriteBufferSize: 0, // Should use default + }) + + cn, err := connPool.NewConn(ctx) + Expect(err).NotTo(HaveOccurred()) + defer connPool.CloseConn(cn) + + // Check that default buffer sizes are used (0.5MiB) + writerBufSize := getWriterBufSizeUnsafe(cn) + readerBufSize := getReaderBufSizeUnsafe(cn) + + Expect(writerBufSize).To(Equal(proto.DefaultBufferSize)) // Default 0.5MiB buffer size + Expect(readerBufSize).To(Equal(proto.DefaultBufferSize)) // Default 0.5MiB buffer size + }) + + It("should use 0.5MiB default buffer sizes for standalone NewConn", func() { + // Test that NewConn (without pool) also uses 0.5MiB defaults + netConn := newDummyConn() + cn := pool.NewConn(netConn) + defer cn.Close() + + writerBufSize := getWriterBufSizeUnsafe(cn) + readerBufSize := getReaderBufSizeUnsafe(cn) + + Expect(writerBufSize).To(Equal(proto.DefaultBufferSize)) // Default 0.5MiB buffer size + Expect(readerBufSize).To(Equal(proto.DefaultBufferSize)) // Default 0.5MiB buffer size + }) + + It("should use 0.5MiB defaults even when pool is created directly without buffer sizes", func() { + // Test the scenario where someone creates a pool directly (like in tests) + // without setting ReadBufferSize and WriteBufferSize + connPool = pool.NewConnPool(&pool.Options{ + Dialer: dummyDialer, + PoolSize: 1, + PoolTimeout: 1000, + // ReadBufferSize and WriteBufferSize are not set (will be 0) + }) + + cn, err := connPool.NewConn(ctx) + Expect(err).NotTo(HaveOccurred()) + defer connPool.CloseConn(cn) + + // Should still get 0.5MiB defaults because NewConnPool sets them + writerBufSize := getWriterBufSizeUnsafe(cn) + readerBufSize := getReaderBufSizeUnsafe(cn) + + Expect(writerBufSize).To(Equal(proto.DefaultBufferSize)) // Default 0.5MiB buffer size + Expect(readerBufSize).To(Equal(proto.DefaultBufferSize)) // Default 0.5MiB buffer size + }) +}) + +// Helper functions to extract buffer sizes using unsafe pointers +func getWriterBufSizeUnsafe(cn *pool.Conn) int { + cnPtr := (*struct { + usedAt int64 + netConn net.Conn + rd *proto.Reader + bw *bufio.Writer + wr *proto.Writer + // ... other fields + })(unsafe.Pointer(cn)) + + if cnPtr.bw == nil { + return -1 + } + + bwPtr := (*struct { + err error + buf []byte + n int + wr interface{} + })(unsafe.Pointer(cnPtr.bw)) + + return len(bwPtr.buf) +} + +func getReaderBufSizeUnsafe(cn *pool.Conn) int { + cnPtr := (*struct { + usedAt int64 + netConn net.Conn + rd *proto.Reader + bw *bufio.Writer + wr *proto.Writer + // ... other fields + })(unsafe.Pointer(cn)) + + if cnPtr.rd == nil { + return -1 + } + + rdPtr := (*struct { + rd *bufio.Reader + })(unsafe.Pointer(cnPtr.rd)) + + if rdPtr.rd == nil { + return -1 + } + + bufReaderPtr := (*struct { + buf []byte + rd interface{} + r, w int + err error + lastByte int + lastRuneSize int + })(unsafe.Pointer(rdPtr.rd)) + + return len(bufReaderPtr.buf) +} diff --git a/internal/pool/conn.go b/internal/pool/conn.go index c1087b401a..989ab10d23 100644 --- a/internal/pool/conn.go +++ b/internal/pool/conn.go @@ -28,12 +28,28 @@ type Conn struct { } func NewConn(netConn net.Conn) *Conn { + return NewConnWithBufferSize(netConn, proto.DefaultBufferSize, proto.DefaultBufferSize) +} + +func NewConnWithBufferSize(netConn net.Conn, readBufSize, writeBufSize int) *Conn { cn := &Conn{ netConn: netConn, createdAt: time.Now(), } - cn.rd = proto.NewReader(netConn) - cn.bw = bufio.NewWriter(netConn) + + // Use specified buffer sizes, or fall back to 0.5MiB defaults if 0 + if readBufSize > 0 { + cn.rd = proto.NewReaderSize(netConn, readBufSize) + } else { + cn.rd = proto.NewReader(netConn) // Uses 0.5MiB default + } + + if writeBufSize > 0 { + cn.bw = bufio.NewWriterSize(netConn, writeBufSize) + } else { + cn.bw = bufio.NewWriterSize(netConn, proto.DefaultBufferSize) + } + cn.wr = proto.NewWriter(cn.bw) cn.SetUsedAt(time.Now()) return cn diff --git a/internal/pool/pool.go b/internal/pool/pool.go index 3ee3dea6d8..6d3381c9fa 100644 --- a/internal/pool/pool.go +++ b/internal/pool/pool.go @@ -71,6 +71,9 @@ type Options struct { MaxActiveConns int ConnMaxIdleTime time.Duration ConnMaxLifetime time.Duration + + ReadBufferSize int + WriteBufferSize int } type lastDialErrorWrap struct { @@ -226,7 +229,7 @@ func (p *ConnPool) dialConn(ctx context.Context, pooled bool) (*Conn, error) { return nil, err } - cn := NewConn(netConn) + cn := NewConnWithBufferSize(netConn, p.cfg.ReadBufferSize, p.cfg.WriteBufferSize) cn.pooled = pooled return cn, nil } diff --git a/internal/proto/reader.go b/internal/proto/reader.go index 8d23817fe8..a447809989 100644 --- a/internal/proto/reader.go +++ b/internal/proto/reader.go @@ -12,6 +12,9 @@ import ( "github.com/redis/go-redis/v9/internal/util" ) +// DefaultBufferSize is the default size for read/write buffers (0.5MiB) +const DefaultBufferSize = 512 * 1024 + // redis resp protocol data type. const ( RespStatus = '+' // +\r\n @@ -58,7 +61,13 @@ type Reader struct { func NewReader(rd io.Reader) *Reader { return &Reader{ - rd: bufio.NewReader(rd), + rd: bufio.NewReaderSize(rd, DefaultBufferSize), + } +} + +func NewReaderSize(rd io.Reader, size int) *Reader { + return &Reader{ + rd: bufio.NewReaderSize(rd, size), } } diff --git a/options.go b/options.go index b87a234a41..d717e56704 100644 --- a/options.go +++ b/options.go @@ -15,6 +15,7 @@ import ( "github.com/redis/go-redis/v9/auth" "github.com/redis/go-redis/v9/internal/pool" + "github.com/redis/go-redis/v9/internal/proto" ) // Limiter is the interface of a rate limiter or a circuit breaker. @@ -130,6 +131,18 @@ type Options struct { // See https://redis.uptrace.dev/guide/go-redis-debugging.html#timeouts ContextTimeoutEnabled bool + // ReadBufferSize is the size of the bufio.Reader buffer for each connection. + // Larger buffers can improve performance for commands that return large responses. + // + // default: 0.5MiB (524288 bytes) + ReadBufferSize int + + // WriteBufferSize is the size of the bufio.Writer buffer for each connection. + // Larger buffers can improve performance for large pipelines and commands with many arguments. + // + // default: 0.5MiB (524288 bytes) + WriteBufferSize int + // PoolFIFO type of connection pool. // // - true for FIFO pool @@ -241,6 +254,12 @@ func (opt *Options) init() { if opt.PoolSize == 0 { opt.PoolSize = 10 * runtime.GOMAXPROCS(0) } + if opt.ReadBufferSize == 0 { + opt.ReadBufferSize = proto.DefaultBufferSize + } + if opt.WriteBufferSize == 0 { + opt.WriteBufferSize = proto.DefaultBufferSize + } switch opt.ReadTimeout { case -2: opt.ReadTimeout = -1 @@ -592,5 +611,7 @@ func newConnPool( MaxActiveConns: opt.MaxActiveConns, ConnMaxIdleTime: opt.ConnMaxIdleTime, ConnMaxLifetime: opt.ConnMaxLifetime, + ReadBufferSize: opt.ReadBufferSize, + WriteBufferSize: opt.WriteBufferSize, }) } diff --git a/osscluster.go b/osscluster.go index 0526022ba0..197bf97890 100644 --- a/osscluster.go +++ b/osscluster.go @@ -92,6 +92,18 @@ type ClusterOptions struct { ConnMaxIdleTime time.Duration ConnMaxLifetime time.Duration + // ReadBufferSize is the size of the bufio.Reader buffer for each connection. + // Larger buffers can improve performance for commands that return large responses. + // + // default: 0.5MiB (524288 bytes) + ReadBufferSize int + + // WriteBufferSize is the size of the bufio.Writer buffer for each connection. + // Larger buffers can improve performance for large pipelines and commands with many arguments. + // + // default: 0.5MiB (524288 bytes) + WriteBufferSize int + TLSConfig *tls.Config // DisableIndentity - Disable set-lib on connect. @@ -127,6 +139,12 @@ func (opt *ClusterOptions) init() { if opt.PoolSize == 0 { opt.PoolSize = 5 * runtime.GOMAXPROCS(0) } + if opt.ReadBufferSize == 0 { + opt.ReadBufferSize = proto.DefaultBufferSize + } + if opt.WriteBufferSize == 0 { + opt.WriteBufferSize = proto.DefaultBufferSize + } switch opt.ReadTimeout { case -1: @@ -318,6 +336,8 @@ func (opt *ClusterOptions) clientOptions() *Options { MaxActiveConns: opt.MaxActiveConns, ConnMaxIdleTime: opt.ConnMaxIdleTime, ConnMaxLifetime: opt.ConnMaxLifetime, + ReadBufferSize: opt.ReadBufferSize, + WriteBufferSize: opt.WriteBufferSize, DisableIdentity: opt.DisableIdentity, DisableIndentity: opt.DisableIdentity, IdentitySuffix: opt.IdentitySuffix, diff --git a/ring.go b/ring.go index 0c15660197..3a8ab750fd 100644 --- a/ring.go +++ b/ring.go @@ -18,6 +18,7 @@ import ( "github.com/redis/go-redis/v9/internal" "github.com/redis/go-redis/v9/internal/hashtag" "github.com/redis/go-redis/v9/internal/pool" + "github.com/redis/go-redis/v9/internal/proto" "github.com/redis/go-redis/v9/internal/rand" ) @@ -113,6 +114,18 @@ type RingOptions struct { ConnMaxIdleTime time.Duration ConnMaxLifetime time.Duration + // ReadBufferSize is the size of the bufio.Reader buffer for each connection. + // Larger buffers can improve performance for commands that return large responses. + // + // default: 0.5MiB (524288 bytes) + ReadBufferSize int + + // WriteBufferSize is the size of the bufio.Writer buffer for each connection. + // Larger buffers can improve performance for large pipelines and commands with many arguments. + // + // default: 0.5MiB (524288 bytes) + WriteBufferSize int + TLSConfig *tls.Config Limiter Limiter @@ -164,6 +177,13 @@ func (opt *RingOptions) init() { case 0: opt.MaxRetryBackoff = 512 * time.Millisecond } + + if opt.ReadBufferSize == 0 { + opt.ReadBufferSize = proto.DefaultBufferSize + } + if opt.WriteBufferSize == 0 { + opt.WriteBufferSize = proto.DefaultBufferSize + } } func (opt *RingOptions) clientOptions() *Options { @@ -195,6 +215,8 @@ func (opt *RingOptions) clientOptions() *Options { MaxActiveConns: opt.MaxActiveConns, ConnMaxIdleTime: opt.ConnMaxIdleTime, ConnMaxLifetime: opt.ConnMaxLifetime, + ReadBufferSize: opt.ReadBufferSize, + WriteBufferSize: opt.WriteBufferSize, TLSConfig: opt.TLSConfig, Limiter: opt.Limiter, From c4cc80bfc79166db1a8eed19ced6616e36a24bc4 Mon Sep 17 00:00:00 2001 From: ofekshenawa Date: Wed, 30 Jul 2025 16:49:25 +0300 Subject: [PATCH 2/3] add MiB to wordlist --- .github/wordlist.txt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/wordlist.txt b/.github/wordlist.txt index a922d99bac..e0c73eb506 100644 --- a/.github/wordlist.txt +++ b/.github/wordlist.txt @@ -73,4 +73,5 @@ OAuth Azure StreamingCredentialsProvider oauth -entraid \ No newline at end of file +entraid +MiB \ No newline at end of file From c2fd8dc1983f92bb93cba668541f29c7646b3586 Mon Sep 17 00:00:00 2001 From: ofekshenawa Date: Sun, 3 Aug 2025 17:19:43 +0300 Subject: [PATCH 3/3] Add description for buffer size parameter --- options.go | 2 ++ osscluster.go | 2 ++ ring.go | 2 ++ 3 files changed, 6 insertions(+) diff --git a/options.go b/options.go index d717e56704..2ce807e4c6 100644 --- a/options.go +++ b/options.go @@ -133,12 +133,14 @@ type Options struct { // ReadBufferSize is the size of the bufio.Reader buffer for each connection. // Larger buffers can improve performance for commands that return large responses. + // Smaller buffers can improve memory usage for larger pools. // // default: 0.5MiB (524288 bytes) ReadBufferSize int // WriteBufferSize is the size of the bufio.Writer buffer for each connection. // Larger buffers can improve performance for large pipelines and commands with many arguments. + // Smaller buffers can improve memory usage for larger pools. // // default: 0.5MiB (524288 bytes) WriteBufferSize int diff --git a/osscluster.go b/osscluster.go index 197bf97890..ad654821d1 100644 --- a/osscluster.go +++ b/osscluster.go @@ -94,12 +94,14 @@ type ClusterOptions struct { // ReadBufferSize is the size of the bufio.Reader buffer for each connection. // Larger buffers can improve performance for commands that return large responses. + // Smaller buffers can improve memory usage for larger pools. // // default: 0.5MiB (524288 bytes) ReadBufferSize int // WriteBufferSize is the size of the bufio.Writer buffer for each connection. // Larger buffers can improve performance for large pipelines and commands with many arguments. + // Smaller buffers can improve memory usage for larger pools. // // default: 0.5MiB (524288 bytes) WriteBufferSize int diff --git a/ring.go b/ring.go index 3a8ab750fd..0d73e0101c 100644 --- a/ring.go +++ b/ring.go @@ -116,12 +116,14 @@ type RingOptions struct { // ReadBufferSize is the size of the bufio.Reader buffer for each connection. // Larger buffers can improve performance for commands that return large responses. + // Smaller buffers can improve memory usage for larger pools. // // default: 0.5MiB (524288 bytes) ReadBufferSize int // WriteBufferSize is the size of the bufio.Writer buffer for each connection. // Larger buffers can improve performance for large pipelines and commands with many arguments. + // Smaller buffers can improve memory usage for larger pools. // // default: 0.5MiB (524288 bytes) WriteBufferSize int