Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor version to handshakeKey #302

Merged
merged 4 commits into from
Jan 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ func TestVersionVerify(t *testing.T) {
conn := newConnection(hostAddress)
err := conn.open(hostAddress, testPoolConfig.TimeOut, nil, false, nil, "INVALID_VERSION")
if err != nil {
assert.Contains(t, err.Error(), "incompatible version between client and server")
assert.Contains(t, err.Error(), "incompatible handshakeKey between client and server")
}
defer conn.close()
}
Expand Down
14 changes: 7 additions & 7 deletions configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ type PoolConfig struct {
UseHTTP2 bool
// HttpHeader is the http headers for the connection when using HTTP2
HttpHeader http.Header
// client version, make sure the client version is in the white list of NebulaGraph server 'client_white_list'
Version string
// client handshakeKey, make sure the client handshakeKey is in the white list of NebulaGraph server 'client_white_list'
HandshakeKey string
}

// validateConf validates config
Expand Down Expand Up @@ -66,7 +66,7 @@ func GetDefaultConf() PoolConfig {
MaxConnPoolSize: 10,
MinConnPoolSize: 0,
UseHTTP2: false,
Version: "",
HandshakeKey: "",
}
}

Expand Down Expand Up @@ -141,8 +141,8 @@ type SessionPoolConf struct {
useHTTP2 bool
// httpHeader is the http headers for the connection
httpHeader http.Header
// client version, make sure the client version is in the white list of NebulaGraph server 'client_white_list'
version string
// client handshakeKey, make sure the client handshakeKey is in the white list of NebulaGraph server 'client_white_list'
handshakeKey string
}

type SessionPoolConfOption func(*SessionPoolConf)
Expand Down Expand Up @@ -219,9 +219,9 @@ func WithHttpHeader(header http.Header) SessionPoolConfOption {
}
}

func WithVersion(version string) SessionPoolConfOption {
func WithHandshakeKey(handshakeKey string) SessionPoolConfOption {
return func(conf *SessionPoolConf) {
conf.version = version
conf.handshakeKey = handshakeKey
}
}

Expand Down
18 changes: 9 additions & 9 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type connection struct {
sslConfig *tls.Config
useHTTP2 bool
httpHeader http.Header
version string
handshakeKey string
graph *graph.GraphServiceClient
}

Expand All @@ -40,21 +40,21 @@ func newConnection(severAddress HostAddress) *connection {
timeout: 0 * time.Millisecond,
returnedAt: time.Now(),
sslConfig: nil,
version: "",
handshakeKey: "",
graph: nil,
}
}

// open opens a transport for the connection
// if sslConfig is not nil, an SSL transport will be created
func (cn *connection) open(hostAddress HostAddress, timeout time.Duration, sslConfig *tls.Config,
useHTTP2 bool, httpHeader http.Header, version string) error {
useHTTP2 bool, httpHeader http.Header, handshakeKey string) error {
ip := hostAddress.Host
port := hostAddress.Port
newAdd := net.JoinHostPort(ip, strconv.Itoa(port))
cn.timeout = timeout
cn.useHTTP2 = useHTTP2
cn.version = version
cn.handshakeKey = handshakeKey

var (
err error
Expand Down Expand Up @@ -136,16 +136,16 @@ func (cn *connection) open(hostAddress HostAddress, timeout time.Duration, sslCo

func (cn *connection) verifyClientVersion() error {
req := graph.NewVerifyClientVersionReq()
if cn.version != "" {
req.SetVersion([]byte(cn.version))
if cn.handshakeKey != "" {
req.SetVersion([]byte(cn.handshakeKey))
}
resp, err := cn.graph.VerifyClientVersion(req)
if err != nil {
cn.close()
return fmt.Errorf("failed to verify client version: %s", err.Error())
return fmt.Errorf("failed to verify client handshakeKey: %s", err.Error())
}
if resp.GetErrorCode() != nebula.ErrorCode_SUCCEEDED {
return fmt.Errorf("incompatible version between client and server: %s", string(resp.GetErrorMsg()))
return fmt.Errorf("incompatible handshakeKey between client and server: %s", string(resp.GetErrorMsg()))
}
return nil
}
Expand All @@ -156,7 +156,7 @@ func (cn *connection) verifyClientVersion() error {
// When the timeout occurs, the connection will be reopened to avoid the impact of the message.
func (cn *connection) reopen() error {
cn.close()
return cn.open(cn.severAddress, cn.timeout, cn.sslConfig, cn.useHTTP2, cn.httpHeader, cn.version)
return cn.open(cn.severAddress, cn.timeout, cn.sslConfig, cn.useHTTP2, cn.httpHeader, cn.handshakeKey)
}

// Authenticate
Expand Down
16 changes: 8 additions & 8 deletions connection_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func NewSslConnectionPool(addresses []HostAddress, conf PoolConfig, sslConfig *t
// initPool initializes the connection pool
func (pool *ConnectionPool) initPool() error {
if err := checkAddresses(pool.conf.TimeOut, pool.addresses, pool.sslConfig,
pool.conf.UseHTTP2, pool.conf.HttpHeader, pool.conf.Version); err != nil {
pool.conf.UseHTTP2, pool.conf.HttpHeader, pool.conf.HandshakeKey); err != nil {
return fmt.Errorf("failed to open connection, error: %s ", err.Error())
}

Expand All @@ -76,7 +76,7 @@ func (pool *ConnectionPool) initPool() error {

// Open connection to host
if err := newConn.open(newConn.severAddress, pool.conf.TimeOut, pool.sslConfig,
pool.conf.UseHTTP2, pool.conf.HttpHeader, pool.conf.Version); err != nil {
pool.conf.UseHTTP2, pool.conf.HttpHeader, pool.conf.HandshakeKey); err != nil {
// If initialization failed, clean idle queue
idleLen := pool.idleConnectionQueue.Len()
for i := 0; i < idleLen; i++ {
Expand Down Expand Up @@ -194,7 +194,7 @@ func (pool *ConnectionPool) releaseAndBack(conn *connection, pushBack bool) {

// Ping checks availability of host
func (pool *ConnectionPool) Ping(host HostAddress, timeout time.Duration) error {
return pingAddress(host, timeout, pool.sslConfig, pool.conf.UseHTTP2, pool.conf.HttpHeader, pool.conf.Version)
return pingAddress(host, timeout, pool.sslConfig, pool.conf.UseHTTP2, pool.conf.HttpHeader, pool.conf.HandshakeKey)
}

// Close closes all connection
Expand Down Expand Up @@ -246,7 +246,7 @@ func (pool *ConnectionPool) newConnToHost() (*connection, error) {
newConn := newConnection(host)
// Open connection to host
if err := newConn.open(newConn.severAddress, pool.conf.TimeOut, pool.sslConfig,
pool.conf.UseHTTP2, pool.conf.HttpHeader, pool.conf.Version); err != nil {
pool.conf.UseHTTP2, pool.conf.HttpHeader, pool.conf.HandshakeKey); err != nil {
return nil, err
}
// Add connection to active queue
Expand Down Expand Up @@ -354,24 +354,24 @@ func (pool *ConnectionPool) timeoutConnectionList() (closing []*connection) {
// It opens a temporary connection to each address and closes it immediately.
// If no error is returned, the addresses are available.
func checkAddresses(confTimeout time.Duration, addresses []HostAddress, sslConfig *tls.Config,
useHTTP2 bool, httpHeader http.Header, version string) error {
useHTTP2 bool, httpHeader http.Header, handshakeKey string) error {
var timeout = 3 * time.Second
if confTimeout != 0 && confTimeout < timeout {
timeout = confTimeout
}
for _, address := range addresses {
if err := pingAddress(address, timeout, sslConfig, useHTTP2, httpHeader, version); err != nil {
if err := pingAddress(address, timeout, sslConfig, useHTTP2, httpHeader, handshakeKey); err != nil {
return err
}
}
return nil
}

func pingAddress(address HostAddress, timeout time.Duration, sslConfig *tls.Config,
useHTTP2 bool, httpHeader http.Header, version string) error {
useHTTP2 bool, httpHeader http.Header, handshakeKey string) error {
newConn := newConnection(address)
// Open connection to host
if err := newConn.open(newConn.severAddress, timeout, sslConfig, useHTTP2, httpHeader, version); err != nil {
if err := newConn.open(newConn.severAddress, timeout, sslConfig, useHTTP2, httpHeader, handshakeKey); err != nil {
return err
}
defer newConn.close()
Expand Down
2 changes: 1 addition & 1 deletion examples/basic_example/graph_client_basic_example.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func main() {
// Create configs for connection pool using default values
testPoolConfig := nebula.GetDefaultConf()
testPoolConfig.UseHTTP2 = useHTTP2
testPoolConfig.Version = "3.0.0"
testPoolConfig.HandshakeKey = "3.0.0"

// Initialize connection pool
pool, err := nebula.NewConnectionPool(hostList, testPoolConfig, log)
Expand Down
4 changes: 2 additions & 2 deletions session_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func NewSessionPool(conf SessionPoolConf, log Logger) (*SessionPool, error) {
func (pool *SessionPool) init() error {
// check the hosts status
if err := checkAddresses(pool.conf.timeOut, pool.conf.serviceAddrs, pool.conf.sslConfig,
pool.conf.useHTTP2, pool.conf.httpHeader, pool.conf.version); err != nil {
pool.conf.useHTTP2, pool.conf.httpHeader, pool.conf.handshakeKey); err != nil {
return fmt.Errorf("failed to initialize the session pool, %s", err.Error())
}

Expand Down Expand Up @@ -287,7 +287,7 @@ func (pool *SessionPool) newSession() (*pureSession, error) {

// open a new connection
if err := cn.open(cn.severAddress, pool.conf.timeOut, pool.conf.sslConfig,
pool.conf.useHTTP2, pool.conf.httpHeader, pool.conf.version); err != nil {
pool.conf.useHTTP2, pool.conf.httpHeader, pool.conf.handshakeKey); err != nil {
return nil, fmt.Errorf("failed to create a net.Conn-backed Transport,: %s", err.Error())
}

Expand Down
8 changes: 4 additions & 4 deletions session_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,25 +125,25 @@ func TestSessionPoolServerCheck(t *testing.T) {
}
}

func TestSessionPoolInvalidVersion(t *testing.T) {
func TestSessionPoolInvalidHandshakeKey(t *testing.T) {
prepareSpace("client_test")
defer dropSpace("client_test")
hostAddress := HostAddress{Host: address, Port: port}

// wrong version info
// wrong handshakeKey info
versionConfig, err := NewSessionPoolConf(
"root",
"nebula",
[]HostAddress{hostAddress},
"client_test",
)
versionConfig.version = "INVALID_VERSION"
versionConfig.handshakeKey = "INVALID_VERSION"
versionConfig.minSize = 1

// create session pool
_, err = NewSessionPool(*versionConfig, DefaultLogger{})
if err != nil {
assert.Contains(t, err.Error(), "incompatible version between client and server")
assert.Contains(t, err.Error(), "incompatible handshakeKey between client and server")
}
}
Copy link
Contributor

@veezhang veezhang Jan 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There also many version words in the test func including the func name.


Expand Down
Loading