Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enable to set thrift http response buf limit #294

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
enable to set thrift http response buf limit
HarrisChu committed Nov 23, 2023
commit 8427d739ce7d5f4d766f71276763006b8a23fbad
10 changes: 5 additions & 5 deletions client_test.go
Original file line number Diff line number Diff line change
@@ -65,7 +65,7 @@ func logoutAndClose(conn *connection, sessionID int64) {
func TestConnection(t *testing.T) {
hostAddress := HostAddress{Host: address, Port: port}
conn := newConnection(hostAddress)
err := conn.open(hostAddress, testPoolConfig.TimeOut, nil, false, nil)
err := conn.open(hostAddress, testPoolConfig.TimeOut, nil, false, nil, 0)
if err != nil {
t.Fatalf("fail to open connection, address: %s, port: %d, %s", address, port, err.Error())
}
@@ -122,7 +122,7 @@ func TestConnection(t *testing.T) {
func TestConnectionIPv6(t *testing.T) {
hostAddress := HostAddress{Host: addressIPv6, Port: port}
conn := newConnection(hostAddress)
err := conn.open(hostAddress, testPoolConfig.TimeOut, nil, false, nil)
err := conn.open(hostAddress, testPoolConfig.TimeOut, nil, false, nil, 0)
if err != nil {
t.Fatalf("fail to open connection, address: %s, port: %d, %s", address, port, err.Error())
}
@@ -254,7 +254,7 @@ func TestAuthentication(t *testing.T) {
hostAddress := HostAddress{Host: address, Port: port}

conn := newConnection(hostAddress)
err := conn.open(hostAddress, testPoolConfig.TimeOut, nil, false, nil)
err := conn.open(hostAddress, testPoolConfig.TimeOut, nil, false, nil, 0)
if err != nil {
t.Fatalf("fail to open connection, address: %s, port: %d, %s", address, port, err.Error())
}
@@ -1421,7 +1421,7 @@ func prepareSpace(spaceName string) error {
conn := newConnection(hostAddress)
testPoolConfig := GetDefaultConf()

err := conn.open(hostAddress, testPoolConfig.TimeOut, nil, false, nil)
err := conn.open(hostAddress, testPoolConfig.TimeOut, nil, false, nil, 0)
if err != nil {
return fmt.Errorf("fail to open connection, address: %s, port: %d, %s", address, port, err.Error())
}
@@ -1458,7 +1458,7 @@ func dropSpace(spaceName string) error {
conn := newConnection(hostAddress)
testPoolConfig := GetDefaultConf()

err := conn.open(hostAddress, testPoolConfig.TimeOut, nil, false, nil)
err := conn.open(hostAddress, testPoolConfig.TimeOut, nil, false, nil, 0)
if err != nil {
return fmt.Errorf("fail to open connection, address: %s, port: %d, %s", address, port, err.Error())
}
16 changes: 16 additions & 0 deletions configs.go
Original file line number Diff line number Diff line change
@@ -34,6 +34,9 @@ type PoolConfig struct {
UseHTTP2 bool
// HttpHeader is the http headers for the connection when using HTTP2
HttpHeader http.Header
// HttpResponseBufLimit is the response buffer limit.
// If the response is larger than this, the buffer will be renewed.
HttpResponseBufLimit int
}

// validateConf validates config
@@ -54,6 +57,10 @@ func (conf *PoolConfig) validateConf(log Logger) {
conf.MinConnPoolSize = 0
log.Warn("Invalid MinConnPoolSize value, the default value of 0 has been applied")
}
if conf.HttpResponseBufLimit < 0 {
conf.HttpResponseBufLimit = 0
log.Warn("Invalid HttpResponseBufLimit value, the default value of 0 has been applied")
}
}

// GetDefaultConf returns the default config
@@ -138,6 +145,9 @@ type SessionPoolConf struct {
useHTTP2 bool
// httpHeader is the http headers for the connection
httpHeader http.Header
// HttpResponseBufLimit is the response buffer limit.
// If the response is larger than this, the buffer will be renewed.
HttpResponseBufLimit int
}

type SessionPoolConfOption func(*SessionPoolConf)
@@ -214,6 +224,12 @@ func WithHttpHeader(header http.Header) SessionPoolConfOption {
}
}

func WithHttpResponseBufLimit(limit int) SessionPoolConfOption {
return func(conf *SessionPoolConf) {
conf.HttpResponseBufLimit = limit
}
}

func (conf *SessionPoolConf) checkMandatoryFields() error {
// Check mandatory fields
if conf.username == "" {
28 changes: 19 additions & 9 deletions connection.go
Original file line number Diff line number Diff line change
@@ -24,13 +24,14 @@ import (
)

type connection struct {
severAddress HostAddress
timeout time.Duration
returnedAt time.Time // the connection was created or returned.
sslConfig *tls.Config
useHTTP2 bool
httpHeader http.Header
graph *graph.GraphServiceClient
severAddress HostAddress
timeout time.Duration
returnedAt time.Time // the connection was created or returned.
sslConfig *tls.Config
useHTTP2 bool
httpHeader http.Header
httpResponseBufLimit int
graph *graph.GraphServiceClient
}

func newConnection(severAddress HostAddress) *connection {
@@ -46,12 +47,14 @@ func newConnection(severAddress HostAddress) *connection {
// open opens a transport for the connection
// if sslConfig is not nil, an SSL transport will be created
func (cn *connection) open(hostAddress HostAddress, timeout time.Duration, sslConfig *tls.Config,
useHTTP2 bool, httpHeader http.Header) error {
useHTTP2 bool, httpHeader http.Header, httpRespBufferLimit int) error {
ip := hostAddress.Host
port := hostAddress.Port
newAdd := net.JoinHostPort(ip, strconv.Itoa(port))
cn.timeout = timeout
cn.useHTTP2 = useHTTP2
cn.httpHeader = httpHeader
cn.httpResponseBufLimit = cn.httpResponseBufLimit

var (
err error
@@ -103,6 +106,13 @@ func (cn *connection) open(hostAddress HostAddress, timeout time.Duration, sslCo
}
}
}
if httpRespBufferLimit != 0 {
client, ok := transport.(*thrift.HTTPClient)
if !ok {
return fmt.Errorf("failed to get thrift http client")
}
client.SetResponseBufferLimit(int64(httpRespBufferLimit))
}
} else {
bufferSize := 128 << 10

@@ -150,7 +160,7 @@ func (cn *connection) verifyClientVersion() error {
// When the timeout occurs, the connection will be reopened to avoid the impact of the message.
func (cn *connection) reopen() error {
cn.close()
return cn.open(cn.severAddress, cn.timeout, cn.sslConfig, cn.useHTTP2, cn.httpHeader)
return cn.open(cn.severAddress, cn.timeout, cn.sslConfig, cn.useHTTP2, cn.httpHeader, cn.httpResponseBufLimit)
}

// Authenticate
6 changes: 3 additions & 3 deletions connection_pool.go
Original file line number Diff line number Diff line change
@@ -76,7 +76,7 @@ func (pool *ConnectionPool) initPool() error {

// Open connection to host
if err := newConn.open(newConn.severAddress, pool.conf.TimeOut, pool.sslConfig,
pool.conf.UseHTTP2, pool.conf.HttpHeader); err != nil {
pool.conf.UseHTTP2, pool.conf.HttpHeader, pool.conf.HttpResponseBufLimit); err != nil {
// If initialization failed, clean idle queue
idleLen := pool.idleConnectionQueue.Len()
for i := 0; i < idleLen; i++ {
@@ -246,7 +246,7 @@ func (pool *ConnectionPool) newConnToHost() (*connection, error) {
newConn := newConnection(host)
// Open connection to host
if err := newConn.open(newConn.severAddress, pool.conf.TimeOut, pool.sslConfig,
pool.conf.UseHTTP2, pool.conf.HttpHeader); err != nil {
pool.conf.UseHTTP2, pool.conf.HttpHeader, pool.conf.HttpResponseBufLimit); err != nil {
return nil, err
}
// Add connection to active queue
@@ -371,7 +371,7 @@ func pingAddress(address HostAddress, timeout time.Duration, sslConfig *tls.Conf
useHTTP2 bool, httpHeader http.Header) error {
newConn := newConnection(address)
// Open connection to host
if err := newConn.open(newConn.severAddress, timeout, sslConfig, useHTTP2, httpHeader); err != nil {
if err := newConn.open(newConn.severAddress, timeout, sslConfig, useHTTP2, httpHeader, 0); err != nil {
return err
}
defer newConn.close()
2 changes: 1 addition & 1 deletion session_pool.go
Original file line number Diff line number Diff line change
@@ -287,7 +287,7 @@ func (pool *SessionPool) newSession() (*pureSession, error) {

// open a new connection
if err := cn.open(cn.severAddress, pool.conf.timeOut, pool.conf.sslConfig,
pool.conf.useHTTP2, pool.conf.httpHeader); err != nil {
pool.conf.useHTTP2, pool.conf.httpHeader, pool.conf.HttpResponseBufLimit); err != nil {
return nil, fmt.Errorf("failed to create a net.Conn-backed Transport,: %s", err.Error())
}