Skip to content

Commit

Permalink
enable to set thrift http response buf limit
Browse files Browse the repository at this point in the history
  • Loading branch information
HarrisChu committed Nov 23, 2023
1 parent 427c44b commit 8427d73
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 18 deletions.
10 changes: 5 additions & 5 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func logoutAndClose(conn *connection, sessionID int64) {
func TestConnection(t *testing.T) {
hostAddress := HostAddress{Host: address, Port: port}
conn := newConnection(hostAddress)
err := conn.open(hostAddress, testPoolConfig.TimeOut, nil, false, nil)
err := conn.open(hostAddress, testPoolConfig.TimeOut, nil, false, nil, 0)
if err != nil {
t.Fatalf("fail to open connection, address: %s, port: %d, %s", address, port, err.Error())
}
Expand Down Expand Up @@ -122,7 +122,7 @@ func TestConnection(t *testing.T) {
func TestConnectionIPv6(t *testing.T) {
hostAddress := HostAddress{Host: addressIPv6, Port: port}
conn := newConnection(hostAddress)
err := conn.open(hostAddress, testPoolConfig.TimeOut, nil, false, nil)
err := conn.open(hostAddress, testPoolConfig.TimeOut, nil, false, nil, 0)
if err != nil {
t.Fatalf("fail to open connection, address: %s, port: %d, %s", address, port, err.Error())
}
Expand Down Expand Up @@ -254,7 +254,7 @@ func TestAuthentication(t *testing.T) {
hostAddress := HostAddress{Host: address, Port: port}

conn := newConnection(hostAddress)
err := conn.open(hostAddress, testPoolConfig.TimeOut, nil, false, nil)
err := conn.open(hostAddress, testPoolConfig.TimeOut, nil, false, nil, 0)
if err != nil {
t.Fatalf("fail to open connection, address: %s, port: %d, %s", address, port, err.Error())
}
Expand Down Expand Up @@ -1421,7 +1421,7 @@ func prepareSpace(spaceName string) error {
conn := newConnection(hostAddress)
testPoolConfig := GetDefaultConf()

err := conn.open(hostAddress, testPoolConfig.TimeOut, nil, false, nil)
err := conn.open(hostAddress, testPoolConfig.TimeOut, nil, false, nil, 0)
if err != nil {
return fmt.Errorf("fail to open connection, address: %s, port: %d, %s", address, port, err.Error())
}
Expand Down Expand Up @@ -1458,7 +1458,7 @@ func dropSpace(spaceName string) error {
conn := newConnection(hostAddress)
testPoolConfig := GetDefaultConf()

err := conn.open(hostAddress, testPoolConfig.TimeOut, nil, false, nil)
err := conn.open(hostAddress, testPoolConfig.TimeOut, nil, false, nil, 0)
if err != nil {
return fmt.Errorf("fail to open connection, address: %s, port: %d, %s", address, port, err.Error())
}
Expand Down
16 changes: 16 additions & 0 deletions configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ type PoolConfig struct {
UseHTTP2 bool
// HttpHeader is the http headers for the connection when using HTTP2
HttpHeader http.Header
// HttpResponseBufLimit is the response buffer limit.
// If the response is larger than this, the buffer will be renewed.
HttpResponseBufLimit int
}

// validateConf validates config
Expand All @@ -54,6 +57,10 @@ func (conf *PoolConfig) validateConf(log Logger) {
conf.MinConnPoolSize = 0
log.Warn("Invalid MinConnPoolSize value, the default value of 0 has been applied")
}
if conf.HttpResponseBufLimit < 0 {
conf.HttpResponseBufLimit = 0
log.Warn("Invalid HttpResponseBufLimit value, the default value of 0 has been applied")
}
}

// GetDefaultConf returns the default config
Expand Down Expand Up @@ -138,6 +145,9 @@ type SessionPoolConf struct {
useHTTP2 bool
// httpHeader is the http headers for the connection
httpHeader http.Header
// HttpResponseBufLimit is the response buffer limit.
// If the response is larger than this, the buffer will be renewed.
HttpResponseBufLimit int
}

type SessionPoolConfOption func(*SessionPoolConf)
Expand Down Expand Up @@ -214,6 +224,12 @@ func WithHttpHeader(header http.Header) SessionPoolConfOption {
}
}

func WithHttpResponseBufLimit(limit int) SessionPoolConfOption {
return func(conf *SessionPoolConf) {
conf.HttpResponseBufLimit = limit
}
}

func (conf *SessionPoolConf) checkMandatoryFields() error {
// Check mandatory fields
if conf.username == "" {
Expand Down
28 changes: 19 additions & 9 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@ import (
)

type connection struct {
severAddress HostAddress
timeout time.Duration
returnedAt time.Time // the connection was created or returned.
sslConfig *tls.Config
useHTTP2 bool
httpHeader http.Header
graph *graph.GraphServiceClient
severAddress HostAddress
timeout time.Duration
returnedAt time.Time // the connection was created or returned.
sslConfig *tls.Config
useHTTP2 bool
httpHeader http.Header
httpResponseBufLimit int
graph *graph.GraphServiceClient
}

func newConnection(severAddress HostAddress) *connection {
Expand All @@ -46,12 +47,14 @@ func newConnection(severAddress HostAddress) *connection {
// open opens a transport for the connection
// if sslConfig is not nil, an SSL transport will be created
func (cn *connection) open(hostAddress HostAddress, timeout time.Duration, sslConfig *tls.Config,
useHTTP2 bool, httpHeader http.Header) error {
useHTTP2 bool, httpHeader http.Header, httpRespBufferLimit int) error {
ip := hostAddress.Host
port := hostAddress.Port
newAdd := net.JoinHostPort(ip, strconv.Itoa(port))
cn.timeout = timeout
cn.useHTTP2 = useHTTP2
cn.httpHeader = httpHeader
cn.httpResponseBufLimit = cn.httpResponseBufLimit

var (
err error
Expand Down Expand Up @@ -103,6 +106,13 @@ func (cn *connection) open(hostAddress HostAddress, timeout time.Duration, sslCo
}
}
}
if httpRespBufferLimit != 0 {
client, ok := transport.(*thrift.HTTPClient)
if !ok {
return fmt.Errorf("failed to get thrift http client")
}
client.SetResponseBufferLimit(int64(httpRespBufferLimit))

Check failure on line 114 in connection.go

View workflow job for this annotation

GitHub Actions / go-client (1.16)

client.SetResponseBufferLimit undefined (type *thrift.HTTPClient has no field or method SetResponseBufferLimit)

Check failure on line 114 in connection.go

View workflow job for this annotation

GitHub Actions / go-client-example (1.16)

client.SetResponseBufferLimit undefined (type *thrift.HTTPClient has no field or method SetResponseBufferLimit)

Check failure on line 114 in connection.go

View workflow job for this annotation

GitHub Actions / go-client-ssl (1.16)

client.SetResponseBufferLimit undefined (type *thrift.HTTPClient has no field or method SetResponseBufferLimit)

Check failure on line 114 in connection.go

View workflow job for this annotation

GitHub Actions / go-client (1.17)

client.SetResponseBufferLimit undefined (type *thrift.HTTPClient has no field or method SetResponseBufferLimit)

Check failure on line 114 in connection.go

View workflow job for this annotation

GitHub Actions / go-client-example (1.17)

client.SetResponseBufferLimit undefined (type *thrift.HTTPClient has no field or method SetResponseBufferLimit)

Check failure on line 114 in connection.go

View workflow job for this annotation

GitHub Actions / go-client-ssl (1.17)

client.SetResponseBufferLimit undefined (type *thrift.HTTPClient has no field or method SetResponseBufferLimit)

Check failure on line 114 in connection.go

View workflow job for this annotation

GitHub Actions / go-client (1.18)

client.SetResponseBufferLimit undefined (type *thrift.HTTPClient has no field or method SetResponseBufferLimit)

Check failure on line 114 in connection.go

View workflow job for this annotation

GitHub Actions / go-client-example (1.18)

client.SetResponseBufferLimit undefined (type *thrift.HTTPClient has no field or method SetResponseBufferLimit)

Check failure on line 114 in connection.go

View workflow job for this annotation

GitHub Actions / go-client-ssl (1.18)

client.SetResponseBufferLimit undefined (type *thrift.HTTPClient has no field or method SetResponseBufferLimit)
}
} else {
bufferSize := 128 << 10

Expand Down Expand Up @@ -150,7 +160,7 @@ func (cn *connection) verifyClientVersion() error {
// When the timeout occurs, the connection will be reopened to avoid the impact of the message.
func (cn *connection) reopen() error {
cn.close()
return cn.open(cn.severAddress, cn.timeout, cn.sslConfig, cn.useHTTP2, cn.httpHeader)
return cn.open(cn.severAddress, cn.timeout, cn.sslConfig, cn.useHTTP2, cn.httpHeader, cn.httpResponseBufLimit)
}

// Authenticate
Expand Down
6 changes: 3 additions & 3 deletions connection_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (pool *ConnectionPool) initPool() error {

// Open connection to host
if err := newConn.open(newConn.severAddress, pool.conf.TimeOut, pool.sslConfig,
pool.conf.UseHTTP2, pool.conf.HttpHeader); err != nil {
pool.conf.UseHTTP2, pool.conf.HttpHeader, pool.conf.HttpResponseBufLimit); err != nil {
// If initialization failed, clean idle queue
idleLen := pool.idleConnectionQueue.Len()
for i := 0; i < idleLen; i++ {
Expand Down Expand Up @@ -246,7 +246,7 @@ func (pool *ConnectionPool) newConnToHost() (*connection, error) {
newConn := newConnection(host)
// Open connection to host
if err := newConn.open(newConn.severAddress, pool.conf.TimeOut, pool.sslConfig,
pool.conf.UseHTTP2, pool.conf.HttpHeader); err != nil {
pool.conf.UseHTTP2, pool.conf.HttpHeader, pool.conf.HttpResponseBufLimit); err != nil {
return nil, err
}
// Add connection to active queue
Expand Down Expand Up @@ -371,7 +371,7 @@ func pingAddress(address HostAddress, timeout time.Duration, sslConfig *tls.Conf
useHTTP2 bool, httpHeader http.Header) error {
newConn := newConnection(address)
// Open connection to host
if err := newConn.open(newConn.severAddress, timeout, sslConfig, useHTTP2, httpHeader); err != nil {
if err := newConn.open(newConn.severAddress, timeout, sslConfig, useHTTP2, httpHeader, 0); err != nil {
return err
}
defer newConn.close()
Expand Down
2 changes: 1 addition & 1 deletion session_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ func (pool *SessionPool) newSession() (*pureSession, error) {

// open a new connection
if err := cn.open(cn.severAddress, pool.conf.timeOut, pool.conf.sslConfig,
pool.conf.useHTTP2, pool.conf.httpHeader); err != nil {
pool.conf.useHTTP2, pool.conf.httpHeader, pool.conf.HttpResponseBufLimit); err != nil {
return nil, fmt.Errorf("failed to create a net.Conn-backed Transport,: %s", err.Error())
}

Expand Down

0 comments on commit 8427d73

Please sign in to comment.