Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

http2: add support to configure transport flow control values #201

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 77 additions & 25 deletions http2/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,17 @@ import (
)

const (
// transportDefaultConnFlow is how many connection-level flow control
// maxWriteFrameSize is the maximum possible frame size used to write to server.
maxWriteFrameSize = 512 << 10

// defaultTransportDefaultConnFlow is how many connection-level flow control
// tokens we give the server at start-up, past the default 64k.
transportDefaultConnFlow = 1 << 30
defaultTransportDefaultConnFlow = 1 << 30

// transportDefaultStreamFlow is how many stream-level flow
// defaultTransportDefaultStreamFlow is how many stream-level flow
// control tokens we announce to the peer, and how many bytes
// we buffer per stream.
transportDefaultStreamFlow = 4 << 20
defaultTransportDefaultStreamFlow = 4 << 20

defaultUserAgent = "Go-http-client/2.0"

Expand Down Expand Up @@ -124,6 +127,21 @@ type Transport struct {
// Values are bounded in the range 16k to 16M.
MaxReadFrameSize uint32

// MaxWriteFrameSize is the maximum frame size that we can a client
// connection can send to a server, even if server advertises a higher value.
// If 0, then a default value is used.
MaxWriteFrameSize uint32

// MaxDownloadBufferPerConnection is the maximum per connection buffer size,
// used for receiving data from the server.
// If 0, then a default value is used.
MaxDownloadBufferPerConnection uint32

// MaxDownloadBufferPerStream is the maximum buffer to use for inflow data sent
// by the server.
// If 0, then a default value is used.
MaxDownloadBufferPerStream uint32

// MaxDecoderHeaderTableSize optionally specifies the http2
// SETTINGS_HEADER_TABLE_SIZE to send in the initial settings frame. It
// informs the remote endpoint of the maximum size of the header compression
Expand Down Expand Up @@ -304,6 +322,9 @@ type ClientConn struct {
idleTimeout time.Duration // or 0 for never
idleTimer *time.Timer

maxWriteFrameSize uint32
maxDownloadBufferPerStream uint32

mu sync.Mutex // guards following
cond *sync.Cond // hold mu; broadcast on flow/closed changes
flow outflow // our conn-level flow control quota (cs.outflow is per stream)
Expand Down Expand Up @@ -731,25 +752,55 @@ func (t *Transport) maxEncoderHeaderTableSize() uint32 {
return initialHeaderTableSize
}

func (t *Transport) maxDownloadBufferPerConnection() uint32 {
maxWindow := uint32((2 << 31) - 1 - initialWindowSize)

if v := t.MaxDownloadBufferPerConnection; v >= initialWindowSize {
if v > maxWindow {
return maxWindow
} else {
return v
}
}

return defaultTransportDefaultConnFlow
}

func (t *Transport) maxDownloadBufferPerStream() uint32 {
if v := t.MaxDownloadBufferPerStream; v > 0 {
return v
}
return defaultTransportDefaultStreamFlow
}

func (t *Transport) maxWriteFrameSize() uint32 {
if v := t.MaxWriteFrameSize; v > 0 && v <= maxWriteFrameSize {
return v
}
return maxWriteFrameSize
}

func (t *Transport) NewClientConn(c net.Conn) (*ClientConn, error) {
return t.newClientConn(c, t.disableKeepAlives())
}

func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, error) {
cc := &ClientConn{
t: t,
tconn: c,
readerDone: make(chan struct{}),
nextStreamID: 1,
maxFrameSize: 16 << 10, // spec default
initialWindowSize: 65535, // spec default
maxConcurrentStreams: initialMaxConcurrentStreams, // "infinite", per spec. Use a smaller value until we have received server settings.
peerMaxHeaderListSize: 0xffffffffffffffff, // "infinite", per spec. Use 2^64-1 instead.
streams: make(map[uint32]*clientStream),
singleUse: singleUse,
wantSettingsAck: true,
pings: make(map[[8]byte]chan struct{}),
reqHeaderMu: make(chan struct{}, 1),
t: t,
tconn: c,
readerDone: make(chan struct{}),
nextStreamID: 1,
maxFrameSize: 16 << 10, // spec default
initialWindowSize: 65535, // spec default
maxConcurrentStreams: initialMaxConcurrentStreams, // "infinite", per spec. Use a smaller value until we have received server settings.
peerMaxHeaderListSize: 0xffffffffffffffff, // "infinite", per spec. Use 2^64-1 instead.
maxWriteFrameSize: t.maxWriteFrameSize(),
maxDownloadBufferPerStream: t.maxDownloadBufferPerStream(),
streams: make(map[uint32]*clientStream),
singleUse: singleUse,
wantSettingsAck: true,
pings: make(map[[8]byte]chan struct{}),
reqHeaderMu: make(chan struct{}, 1),
}
if d := t.idleConnTimeout(); d != 0 {
cc.idleTimeout = d
Expand Down Expand Up @@ -796,7 +847,7 @@ func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, erro

initialSettings := []Setting{
{ID: SettingEnablePush, Val: 0},
{ID: SettingInitialWindowSize, Val: transportDefaultStreamFlow},
{ID: SettingInitialWindowSize, Val: t.maxDownloadBufferPerStream()},
}
if max := t.maxFrameReadSize(); max != 0 {
initialSettings = append(initialSettings, Setting{ID: SettingMaxFrameSize, Val: max})
Expand All @@ -810,8 +861,8 @@ func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, erro

cc.bw.Write(clientPreface)
cc.fr.WriteSettings(initialSettings...)
cc.fr.WriteWindowUpdate(0, transportDefaultConnFlow)
cc.inflow.init(transportDefaultConnFlow + initialWindowSize)
cc.fr.WriteWindowUpdate(0, t.maxDownloadBufferPerConnection())
cc.inflow.init(int32(t.maxDownloadBufferPerConnection()) + initialWindowSize)
cc.bw.Flush()
if cc.werr != nil {
cc.Close()
Expand Down Expand Up @@ -1660,12 +1711,12 @@ var (
// outgoing request bodies to read/write to/from.
//
// It returns max(1, min(peer's advertised max frame size,
// Request.ContentLength+1, 512KB)).
// Request.ContentLength+1, Transport.MaxWriteFrameSize)).
func (cs *clientStream) frameScratchBufferLen(maxFrameSize int) int {
const max = 512 << 10
var maxSize = int64(cs.cc.maxWriteFrameSize)
n := int64(maxFrameSize)
if n > max {
n = max
if n > maxSize {
n = maxSize
}
if cl := cs.reqBodyContentLength; cl != -1 && cl+1 < n {
// Add an extra byte past the declared content-length to
Expand Down Expand Up @@ -2120,7 +2171,8 @@ type resAndError struct {
func (cc *ClientConn) addStreamLocked(cs *clientStream) {
cs.flow.add(int32(cc.initialWindowSize))
cs.flow.setConnFlow(&cc.flow)
cs.inflow.init(transportDefaultStreamFlow)
// no need to truncate since max is maxWriteFrameSize
cs.inflow.init(int32(cc.maxDownloadBufferPerStream))
cs.ID = cc.nextStreamID
cc.nextStreamID += 2
cc.streams[cs.ID] = cs
Expand Down
2 changes: 1 addition & 1 deletion http2/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2867,7 +2867,7 @@ func TestTransportFlowControl(t *testing.T) {
}
read += int64(n)

const max = transportDefaultStreamFlow
const max = defaultTransportDefaultStreamFlow
if w := atomic.LoadInt64(&wrote); -max > read-w || read-w > max {
t.Fatalf("Too much data inflight: server wrote %v bytes but client only received %v", w, read)
}
Expand Down