Skip to content

Commit

Permalink
http2: add support to configure transport flow control values
Browse files Browse the repository at this point in the history
Adds to the transport configuration the capability to configure the maximum flow control values
instead of using default ones.
For applications that use a lot of client connections, being able to configure these values
allows for better control on memory usage, particulary when connections are long-lived.

related: golang/go#20448
  • Loading branch information
João Miguel Forte Oliveirinha authored and joliveirinha committed Jan 13, 2024
1 parent 689bbc7 commit 2aaa1ad
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 26 deletions.
102 changes: 77 additions & 25 deletions http2/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,17 @@ import (
)

const (
// transportDefaultConnFlow is how many connection-level flow control
// maxWriteFrameSize is the maximum possible frame size used to write to server.
maxWriteFrameSize = 512 << 10

// defaultTransportDefaultConnFlow is how many connection-level flow control
// tokens we give the server at start-up, past the default 64k.
transportDefaultConnFlow = 1 << 30
defaultTransportDefaultConnFlow = 1 << 30

// transportDefaultStreamFlow is how many stream-level flow
// defaultTransportDefaultStreamFlow is how many stream-level flow
// control tokens we announce to the peer, and how many bytes
// we buffer per stream.
transportDefaultStreamFlow = 4 << 20
defaultTransportDefaultStreamFlow = 4 << 20

defaultUserAgent = "Go-http-client/2.0"

Expand Down Expand Up @@ -124,6 +127,21 @@ type Transport struct {
// Values are bounded in the range 16k to 16M.
MaxReadFrameSize uint32

// MaxWriteFrameSize is the maximum frame size that we can a client
// connection can send to a server, even if server advertises a higher value.
// If 0, then a default value is used.
MaxWriteFrameSize uint32

// MaxDownloadBufferPerConnection is the maximum per connection buffer size,
// used for receiving data from the server.
// If 0, then a default value is used.
MaxDownloadBufferPerConnection uint32

// MaxDownloadBufferPerStream is the maximum buffer to use for inflow data sent
// by the server.
// If 0, then a default value is used.
MaxDownloadBufferPerStream uint32

// MaxDecoderHeaderTableSize optionally specifies the http2
// SETTINGS_HEADER_TABLE_SIZE to send in the initial settings frame. It
// informs the remote endpoint of the maximum size of the header compression
Expand Down Expand Up @@ -304,6 +322,9 @@ type ClientConn struct {
idleTimeout time.Duration // or 0 for never
idleTimer *time.Timer

maxWriteFrameSize uint32
maxDownloadBufferPerStream uint32

mu sync.Mutex // guards following
cond *sync.Cond // hold mu; broadcast on flow/closed changes
flow outflow // our conn-level flow control quota (cs.outflow is per stream)
Expand Down Expand Up @@ -731,25 +752,55 @@ func (t *Transport) maxEncoderHeaderTableSize() uint32 {
return initialHeaderTableSize
}

func (t *Transport) maxDownloadBufferPerConnection() uint32 {
maxWindow := uint32((2 << 31) - 1 - initialWindowSize)

if v := t.MaxDownloadBufferPerConnection; v >= initialWindowSize {
if v > maxWindow {
return maxWindow
} else {
return v
}
}

return defaultTransportDefaultConnFlow
}

func (t *Transport) maxDownloadBufferPerStream() uint32 {
if v := t.MaxDownloadBufferPerStream; v > 0 {
return v
}
return defaultTransportDefaultStreamFlow
}

func (t *Transport) maxWriteFrameSize() uint32 {
if v := t.MaxWriteFrameSize; v > 0 && v <= maxWriteFrameSize {
return v
}
return maxWriteFrameSize
}

func (t *Transport) NewClientConn(c net.Conn) (*ClientConn, error) {
return t.newClientConn(c, t.disableKeepAlives())
}

func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, error) {
cc := &ClientConn{
t: t,
tconn: c,
readerDone: make(chan struct{}),
nextStreamID: 1,
maxFrameSize: 16 << 10, // spec default
initialWindowSize: 65535, // spec default
maxConcurrentStreams: initialMaxConcurrentStreams, // "infinite", per spec. Use a smaller value until we have received server settings.
peerMaxHeaderListSize: 0xffffffffffffffff, // "infinite", per spec. Use 2^64-1 instead.
streams: make(map[uint32]*clientStream),
singleUse: singleUse,
wantSettingsAck: true,
pings: make(map[[8]byte]chan struct{}),
reqHeaderMu: make(chan struct{}, 1),
t: t,
tconn: c,
readerDone: make(chan struct{}),
nextStreamID: 1,
maxFrameSize: 16 << 10, // spec default
initialWindowSize: 65535, // spec default
maxConcurrentStreams: initialMaxConcurrentStreams, // "infinite", per spec. Use a smaller value until we have received server settings.
peerMaxHeaderListSize: 0xffffffffffffffff, // "infinite", per spec. Use 2^64-1 instead.
maxWriteFrameSize: t.maxWriteFrameSize(),
maxDownloadBufferPerStream: t.maxDownloadBufferPerStream(),
streams: make(map[uint32]*clientStream),
singleUse: singleUse,
wantSettingsAck: true,
pings: make(map[[8]byte]chan struct{}),
reqHeaderMu: make(chan struct{}, 1),
}
if d := t.idleConnTimeout(); d != 0 {
cc.idleTimeout = d
Expand Down Expand Up @@ -796,7 +847,7 @@ func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, erro

initialSettings := []Setting{
{ID: SettingEnablePush, Val: 0},
{ID: SettingInitialWindowSize, Val: transportDefaultStreamFlow},
{ID: SettingInitialWindowSize, Val: t.maxDownloadBufferPerStream()},
}
if max := t.maxFrameReadSize(); max != 0 {
initialSettings = append(initialSettings, Setting{ID: SettingMaxFrameSize, Val: max})
Expand All @@ -810,8 +861,8 @@ func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, erro

cc.bw.Write(clientPreface)
cc.fr.WriteSettings(initialSettings...)
cc.fr.WriteWindowUpdate(0, transportDefaultConnFlow)
cc.inflow.init(transportDefaultConnFlow + initialWindowSize)
cc.fr.WriteWindowUpdate(0, t.maxDownloadBufferPerConnection())
cc.inflow.init(int32(t.maxDownloadBufferPerConnection()) + initialWindowSize)
cc.bw.Flush()
if cc.werr != nil {
cc.Close()
Expand Down Expand Up @@ -1660,12 +1711,12 @@ var (
// outgoing request bodies to read/write to/from.
//
// It returns max(1, min(peer's advertised max frame size,
// Request.ContentLength+1, 512KB)).
// Request.ContentLength+1, Transport.MaxWriteFrameSize)).
func (cs *clientStream) frameScratchBufferLen(maxFrameSize int) int {
const max = 512 << 10
var maxSize = int64(cs.cc.maxWriteFrameSize)
n := int64(maxFrameSize)
if n > max {
n = max
if n > maxSize {
n = maxSize
}
if cl := cs.reqBodyContentLength; cl != -1 && cl+1 < n {
// Add an extra byte past the declared content-length to
Expand Down Expand Up @@ -2120,7 +2171,8 @@ type resAndError struct {
func (cc *ClientConn) addStreamLocked(cs *clientStream) {
cs.flow.add(int32(cc.initialWindowSize))
cs.flow.setConnFlow(&cc.flow)
cs.inflow.init(transportDefaultStreamFlow)
// no need to truncate since max is maxWriteFrameSize
cs.inflow.init(int32(cc.maxDownloadBufferPerStream))
cs.ID = cc.nextStreamID
cc.nextStreamID += 2
cc.streams[cs.ID] = cs
Expand Down
2 changes: 1 addition & 1 deletion http2/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2867,7 +2867,7 @@ func TestTransportFlowControl(t *testing.T) {
}
read += int64(n)

const max = transportDefaultStreamFlow
const max = defaultTransportDefaultStreamFlow
if w := atomic.LoadInt64(&wrote); -max > read-w || read-w > max {
t.Fatalf("Too much data inflight: server wrote %v bytes but client only received %v", w, read)
}
Expand Down

0 comments on commit 2aaa1ad

Please sign in to comment.