Merged
58 changes: 36 additions & 22 deletions internal/transport/controlbuf.go
@@ -496,6 +496,16 @@ const (
serverSide
)

// maxWriteBufSize is the maximum length (number of elements) the cached
// writeBuf can grow to. The length depends on the number of buffers
// contained within the BufferSlice produced by the codec, which is
// generally small.
//
// If a writeBuf larger than this limit is required, it will be allocated
// and freed after use, rather than being cached. This avoids holding
// on to large amounts of memory.
const maxWriteBufSize = 64

// Loopy receives frames from the control buffer.
// Each frame is handled individually; most of the work done by loopy goes
// into handling data frames. Loopy maintains a queue of active streams, and each
@@ -530,6 +540,8 @@ type loopyWriter struct {

// Side-specific handlers
ssGoAwayHandler func(*goAway) (bool, error)

writeBuf [][]byte // cached slice to avoid heap allocations for calls to mem.Reader.Peek.
}

func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator, conn net.Conn, logger *grpclog.PrefixLogger, goAwayHandler func(*goAway) (bool, error), bufferPool mem.BufferPool) *loopyWriter {
@@ -962,11 +974,11 @@ func (l *loopyWriter) processData() (bool, error) {

if len(dataItem.h) == 0 && reader.Remaining() == 0 { // Empty data frame
// Client sends out empty data frame with endStream = true
- if err := l.framer.fr.WriteData(dataItem.streamID, dataItem.endStream, nil); err != nil {
+ if err := l.framer.writeData(dataItem.streamID, dataItem.endStream, nil); err != nil {
return false, err
}
str.itl.dequeue() // remove the empty data item from stream
- _ = reader.Close()
+ reader.Close()
if str.itl.isEmpty() {
str.state = empty
} else if trailer, ok := str.itl.peek().(*headerFrame); ok { // the next item is trailers.
@@ -999,25 +1011,20 @@ func (l *loopyWriter) processData() (bool, error) {
remainingBytes := len(dataItem.h) + reader.Remaining() - hSize - dSize
size := hSize + dSize

- var buf *[]byte
-
- if hSize != 0 && dSize == 0 {
- buf = &dataItem.h
- } else {
- // Note: this is only necessary because the http2.Framer does not support
- // partially writing a frame, so the sequence must be materialized into a buffer.
- // TODO: Revisit once https://github.com/golang/go/issues/66655 is addressed.
- pool := l.bufferPool
- if pool == nil {
- // Note that this is only supposed to be nil in tests. Otherwise, stream is
- // always initialized with a BufferPool.
- pool = mem.DefaultBufferPool()
- }
- buf = pool.Get(size)
- defer pool.Put(buf)
-
- copy((*buf)[:hSize], dataItem.h)
- _, _ = reader.Read((*buf)[hSize:])
- }
+ l.writeBuf = l.writeBuf[:0]
+ if hSize > 0 {
+ l.writeBuf = append(l.writeBuf, dataItem.h[:hSize])
+ }
+ if dSize > 0 {
+ var err error
+ l.writeBuf, err = reader.Peek(dSize, l.writeBuf)
@dfawley (Member) commented on Oct 30, 2025:
It seems like this buffer can only grow and never shrinks.

1. What happens if a slice holds a pointer to a huge amount of data? I believe it isn't possible to free it, but am not certain. E.g.

   l.writeBuf = [][]byte{nil, nil, nil, nil, nil, nil, make([]byte, 10GB)}
   l.writeBuf = l.writeBuf[:0]

2. What happens if cap(l.writeBuf) grows to a large value and then we never need it to be that large ever again?

I think we need to have some way to scale this buffer back down.

The PR author (Contributor) replied:
For point 1, I've updated the code to clear the buffer after calling Write. This releases references to all the slices and allows them to be GCed.

With respect to point 2, I've now set a limit of 64 on the buffer's length. If a buffer is longer than that, it's immediately freed after use instead of being cached.

Background on the 64-element limit: The BufferSlice from the proto codec is 1 element. With a potential gRPC header, the length is almost always 2. While custom codecs might produce larger slices, 64 is a generous limit that covers common cases without caching excessive memory.

This change also mitigates a worst-case memory scenario. Since Peek() filters out empty slices, a 16KB HTTP/2 DATA frame (the maximum frame size) could theoretically be split into 16,384 distinct 1-byte slices. In that case, the memory overhead for the slice headers alone would be 24 bytes * 16,384, or roughly 393KB; with the 64-element limit, the maximum retained memory is about 1.5KB. Also note that the framer already has a data buffer that grows up to 16KB, and after this change, that buffer should no longer be used for DATA frames.
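To make the retention policy concrete, here is a minimal, self-contained sketch of the cache-or-free decision described in this thread (maxWriteBufSize and the clear step come from the PR; the recycleWriteBuf helper wrapping them is illustrative, not part of the change):

```go
package main

// maxWriteBufSize mirrors the limit introduced in this PR: a writeBuf longer
// than this is freed after use instead of being cached.
const maxWriteBufSize = 64

// recycleWriteBuf is an illustrative helper showing what loopy does after a
// write. clear zeroes the elements, dropping references to the per-message
// buffers so they can be collected; without it, a large buffer would stay
// reachable through the slice's backing array (point 1 above).
func recycleWriteBuf(writeBuf [][]byte) [][]byte {
	if cap(writeBuf) > maxWriteBufSize {
		// A rare oversized write: drop the slice entirely.
		return nil
	}
	clear(writeBuf)
	// At most 64 slice headers are retained: 64 * 24 bytes ~= 1.5KB.
	return writeBuf[:0]
}

func main() {
	buf := make([][]byte, 0, 8)
	buf = append(buf, []byte("header"), make([]byte, 1<<20))
	buf = recycleWriteBuf(buf) // references cleared, small capacity kept
	_ = buf
}
```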

+ if err != nil {
+ // This must never happen since the reader must have at least dSize
+ // bytes.
+ // Log an error to fail tests.
+ l.logger.Errorf("unexpected error while reading Data frame payload: %v", err)
+ return false, err
+ }
+ }

// Now that outgoing flow controls are checked we can replenish str's write quota
@@ -1030,15 +1037,22 @@
if dataItem.onEachWrite != nil {
dataItem.onEachWrite()
}
- if err := l.framer.fr.WriteData(dataItem.streamID, endStream, (*buf)[:size]); err != nil {
+ err := l.framer.writeData(dataItem.streamID, endStream, l.writeBuf)
+ reader.Discard(dSize)
+ if cap(l.writeBuf) > maxWriteBufSize {
+ l.writeBuf = nil
+ } else {
+ clear(l.writeBuf)
+ }
+ if err != nil {
return false, err
}
str.bytesOutStanding += size
l.sendQuota -= uint32(size)
dataItem.h = dataItem.h[hSize:]

if remainingBytes == 0 { // All the data from that message was written out.
- _ = reader.Close()
+ reader.Close()
str.itl.dequeue()
}
if str.itl.isEmpty() {
36 changes: 36 additions & 0 deletions internal/transport/http_util.go
@@ -400,6 +400,7 @@ func (df *parsedDataFrame) StreamEnded() bool {
type framer struct {
writer *bufWriter
fr *http2.Framer
headerBuf []byte // cached slice for framer headers to reduce heap allocs.
reader io.Reader
dataFrame parsedDataFrame // Cached data frame to avoid heap allocations.
pool mem.BufferPool
@@ -443,6 +444,41 @@ func newFramer(conn io.ReadWriter, writeBufferSize, readBufferSize int, sharedWr
return f
}

// writeData writes a DATA frame.
//
// It is the caller's responsibility not to violate the maximum frame size.
func (f *framer) writeData(streamID uint32, endStream bool, data [][]byte) error {
var flags http2.Flags
if endStream {
flags = http2.FlagDataEndStream
}
length := uint32(0)
for _, d := range data {
length += uint32(len(d))
}
// TODO: Replace the header write with the framer API being added in
// https://github.com/golang/go/issues/66655.
f.headerBuf = append(f.headerBuf[:0],
byte(length>>16),
byte(length>>8),
byte(length),
byte(http2.FrameData),
byte(flags),
byte(streamID>>24),
byte(streamID>>16),
byte(streamID>>8),
byte(streamID))
if _, err := f.writer.Write(f.headerBuf); err != nil {
return err
}
for _, d := range data {
if _, err := f.writer.Write(d); err != nil {
return err
}
}
return nil
}

// readFrame reads a single frame. The returned Frame is only valid
// until the next call to readFrame.
func (f *framer) readFrame() (any, error) {
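The 9 bytes assembled by writeData are the fixed HTTP/2 frame header (RFC 7540 §4.1): a 24-bit big-endian payload length, a type byte, a flags byte, and a 31-bit stream ID. A small round-trip sketch below checks that layout against x/net/http2's exported ReadFrameHeader; the encodeDataFrameHeader helper is hypothetical, written here only to mirror the byte layout in the diff:

```go
package main

import (
	"bytes"
	"fmt"

	"golang.org/x/net/http2"
)

// encodeDataFrameHeader mirrors framer.writeData's header layout:
// 24-bit length, frame type, flags, then the 31-bit stream ID.
func encodeDataFrameHeader(length uint32, flags http2.Flags, streamID uint32) []byte {
	return []byte{
		byte(length >> 16), byte(length >> 8), byte(length),
		byte(http2.FrameData),
		byte(flags),
		byte(streamID >> 24), byte(streamID >> 16), byte(streamID >> 8), byte(streamID),
	}
}

func main() {
	hdr := encodeDataFrameHeader(5, http2.FlagDataEndStream, 3)
	// ReadFrameHeader parses the same 9-byte wire format back.
	fh, err := http2.ReadFrameHeader(bytes.NewReader(hdr))
	if err != nil {
		panic(err)
	}
	fmt.Println(fh.Type, fh.Length, fh.StreamID, fh.Flags.Has(http2.FlagDataEndStream))
	// Output: DATA 5 3 true
}
```

Writing the header and then each payload slice into the buffered writer one after another is what lets writeData avoid materializing the single contiguous buffer that http2.Framer's WriteData would require.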
62 changes: 60 additions & 2 deletions mem/buffer_slice.go
@@ -19,6 +19,7 @@
package mem

import (
"fmt"
"io"
)

@@ -126,9 +127,10 @@ func (s BufferSlice) Reader() *Reader {
}

// Reader exposes a BufferSlice's data as an io.Reader, allowing it to interface
-// with other parts systems. It also provides an additional convenience method
-// Remaining(), which returns the number of unread bytes remaining in the slice.
+// with other systems.
+//
+// Buffers will be freed as they are read.
//
// A Reader can be constructed from a BufferSlice; alternatively the zero value
// of a Reader may be used after calling Reset on it.
type Reader struct {
@@ -285,3 +287,59 @@
}
}
}

// Discard skips the next n bytes, returning the number of bytes discarded.
//
// It frees buffers as they are fully consumed.
//
// If Discard skips fewer than n bytes, it also returns an error.
func (r *Reader) Discard(n int) (discarded int, err error) {
total := n
for n > 0 && r.len > 0 {
curData := r.data[0].ReadOnlyData()
curSize := min(n, len(curData)-r.bufferIdx)
n -= curSize
r.len -= curSize
r.bufferIdx += curSize
if r.bufferIdx >= len(curData) {
r.data[0].Free()
r.data = r.data[1:]
r.bufferIdx = 0
}
}
discarded = total - n
if n > 0 {
return discarded, fmt.Errorf("insufficient bytes in reader")
}
return discarded, nil
}

// Peek returns the next n bytes without advancing the reader.
//
// Peek appends results to the provided res slice and returns the updated slice.
// This pattern allows re-using the storage of res if it has sufficient
// capacity.
//
// The returned subslices are views into the underlying buffers and are only
// valid until the reader is advanced past the corresponding buffer.
//
// If Peek returns fewer than n bytes, it also returns an error.
func (r *Reader) Peek(n int, res [][]byte) ([][]byte, error) {
for i := 0; n > 0 && i < len(r.data); i++ {
curData := r.data[i].ReadOnlyData()
start := 0
if i == 0 {
start = r.bufferIdx
}
curSize := min(n, len(curData)-start)
if curSize == 0 {
continue
}
res = append(res, curData[start:start+curSize])
n -= curSize
}
if n > 0 {
return nil, fmt.Errorf("insufficient bytes in reader")
}
return res, nil
}
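Peek and Discard together support the vectored write path used by loopy: borrow read-only views of the next n bytes, hand them to the framer, then advance the reader, which frees fully consumed buffers. A brief usage sketch, assuming the mem package's existing Copy and DefaultBufferPool helpers:

```go
package main

import (
	"fmt"

	"google.golang.org/grpc/mem"
)

func main() {
	pool := mem.DefaultBufferPool()
	data := mem.BufferSlice{
		mem.Copy([]byte("hello, "), pool),
		mem.Copy([]byte("world"), pool),
	}
	r := data.Reader()

	// Borrow views of the next 7 bytes without copying or advancing.
	views, err := r.Peek(7, nil)
	if err != nil {
		panic(err)
	}
	for _, v := range views {
		fmt.Printf("%q ", v) // "hello, "
	}

	// Once the views have been written out, advance past them; buffers
	// that are fully consumed are freed here.
	if _, err := r.Discard(7); err != nil {
		panic(err)
	}
	fmt.Println(r.Remaining()) // 5 bytes ("world") left unread
}
```

This is the same sequence processData now follows: Peek to build the multi-slice frame, writeData to put it on the wire, then Discard once the bytes have been handed to the framer.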