diff --git a/pkg/client/client.go b/pkg/client/client.go index 5c1191872..2b3cd38a7 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -62,15 +62,17 @@ func initTmClient(cfg *Config) { }) } -// initRemoting init rpc client +// initRemoting init remoting func initRemoting(cfg *Config) { - getty.InitRpcClient(&cfg.GettyConfig, &remoteConfig.SeataConfig{ + seataConfig := remoteConfig.SeataConfig{ ApplicationID: cfg.ApplicationID, TxServiceGroup: cfg.TxServiceGroup, ServiceVgroupMapping: cfg.ServiceConfig.VgroupMapping, ServiceGrouplist: cfg.ServiceConfig.Grouplist, LoadBalanceType: cfg.GettyConfig.LoadBalanceType, - }) + } + + getty.InitGetty(&cfg.GettyConfig, &seataConfig) } // InitRmClient init client rm client diff --git a/pkg/remoting/config/config.go b/pkg/remoting/config/config.go index 5bb126677..823440c7b 100644 --- a/pkg/remoting/config/config.go +++ b/pkg/remoting/config/config.go @@ -84,7 +84,7 @@ type SeataConfig struct { LoadBalanceType string } -func IniConfig(seataConf *SeataConfig) { +func InitConfig(seataConf *SeataConfig) { seataConfig = seataConf } diff --git a/pkg/remoting/getty/getty_client.go b/pkg/remoting/getty/getty_client.go index 72874b0f9..470f2a4f4 100644 --- a/pkg/remoting/getty/getty_client.go +++ b/pkg/remoting/getty/getty_client.go @@ -35,14 +35,16 @@ var ( ) type GettyRemotingClient struct { - idGenerator *atomic.Uint32 + idGenerator *atomic.Uint32 + gettyRemoting *GettyRemoting } func GetGettyRemotingClient() *GettyRemotingClient { if gettyRemotingClient == nil { onceGettyRemotingClient.Do(func() { gettyRemotingClient = &GettyRemotingClient{ - idGenerator: &atomic.Uint32{}, + idGenerator: &atomic.Uint32{}, + gettyRemoting: newGettyRemoting(), } }) } @@ -63,7 +65,7 @@ func (client *GettyRemotingClient) SendAsyncRequest(msg interface{}) error { Compressor: 0, Body: msg, } - return GetGettyRemotingInstance().SendASync(rpcMessage, nil, client.asyncCallback) + return client.gettyRemoting.SendAsync(rpcMessage, nil, client.asyncCallback) } func (client *GettyRemotingClient) SendAsyncResponse(msgID int32, msg interface{}) error { @@ -74,7 +76,7 @@ func (client *GettyRemotingClient) SendAsyncResponse(msgID int32, msg interface{ Compressor: 0, Body: msg, } - return GetGettyRemotingInstance().SendASync(rpcMessage, nil, nil) + return client.gettyRemoting.SendAsync(rpcMessage, nil, nil) } func (client *GettyRemotingClient) SendSyncRequest(msg interface{}) (interface{}, error) { @@ -85,7 +87,7 @@ func (client *GettyRemotingClient) SendSyncRequest(msg interface{}) (interface{} Compressor: 0, Body: msg, } - return GetGettyRemotingInstance().SendSync(rpcMessage, nil, client.syncCallback) + return client.gettyRemoting.SendSync(rpcMessage, nil, client.syncCallback) } func (g *GettyRemotingClient) asyncCallback(reqMsg message.RpcMessage, respMsg *message.MessageFuture) (interface{}, error) { @@ -96,10 +98,30 @@ func (g *GettyRemotingClient) asyncCallback(reqMsg message.RpcMessage, respMsg * func (g *GettyRemotingClient) syncCallback(reqMsg message.RpcMessage, respMsg *message.MessageFuture) (interface{}, error) { select { case <-gxtime.GetDefaultTimerWheel().After(RpcRequestTimeout): - GetGettyRemotingInstance().RemoveMergedMessageFuture(reqMsg.ID) + g.gettyRemoting.RemoveMergedMessageFuture(reqMsg.ID) log.Errorf("wait resp timeout: %#v", reqMsg) return nil, fmt.Errorf("wait response timeout, request: %#v", reqMsg) case <-respMsg.Done: return respMsg.Response, respMsg.Err } } + +func (client *GettyRemotingClient) GetMergedMessage(msgID int32) *message.MergedWarpMessage { + return client.gettyRemoting.GetMergedMessage(msgID) +} + +func (client *GettyRemotingClient) GetMessageFuture(msgID int32) *message.MessageFuture { + return client.gettyRemoting.GetMessageFuture(msgID) +} + +func (client *GettyRemotingClient) RemoveMessageFuture(msgID int32) { + client.gettyRemoting.RemoveMessageFuture(msgID) +} + +func (client *GettyRemotingClient) RemoveMergedMessageFuture(msgID int32) { + client.gettyRemoting.RemoveMergedMessageFuture(msgID) +} + +func (client *GettyRemotingClient) NotifyRpcMessageResponse(msg message.RpcMessage) { + client.gettyRemoting.NotifyRpcMessageResponse(msg) +} diff --git a/pkg/remoting/getty/getty_client_test.go b/pkg/remoting/getty/getty_client_test.go index a5737283f..e3e5e48d5 100644 --- a/pkg/remoting/getty/getty_client_test.go +++ b/pkg/remoting/getty/getty_client_test.go @@ -40,7 +40,7 @@ func TestGettyRemotingClient_SendSyncRequest(t *testing.T) { }, }, } - gomonkey.ApplyMethod(reflect.TypeOf(GetGettyRemotingInstance()), "SendSync", + gomonkey.ApplyMethod(reflect.TypeOf(GetGettyRemotingClient().gettyRemoting), "SendSync", func(_ *GettyRemoting, msg message.RpcMessage, s getty.Session, callback callbackMethod) (interface{}, error) { return respMsg, nil @@ -52,7 +52,7 @@ func TestGettyRemotingClient_SendSyncRequest(t *testing.T) { // TestGettyRemotingClient_SendAsyncResponse unit test for SendAsyncResponse function func TestGettyRemotingClient_SendAsyncResponse(t *testing.T) { - gomonkey.ApplyMethod(reflect.TypeOf(GetGettyRemotingInstance()), "SendASync", + gomonkey.ApplyMethod(reflect.TypeOf(GetGettyRemotingClient().gettyRemoting), "SendAsync", func(_ *GettyRemoting, msg message.RpcMessage, s getty.Session, callback callbackMethod) error { return nil }) @@ -77,7 +77,7 @@ func TestGettyRemotingClient_SendAsyncRequest(t *testing.T) { } for _, test := range tests { t.Run(test.name, func(t *testing.T) { - gomonkey.ApplyMethod(reflect.TypeOf(GetGettyRemotingInstance()), "SendASync", + gomonkey.ApplyMethod(reflect.TypeOf(GetGettyRemotingClient().gettyRemoting), "SendAsync", func(_ *GettyRemoting, msg message.RpcMessage, s getty.Session, callback callbackMethod) error { return nil }) diff --git a/pkg/remoting/getty/getty_init.go b/pkg/remoting/getty/getty_init.go new file mode 100644 index 000000000..37f716010 --- /dev/null +++ b/pkg/remoting/getty/getty_init.go @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package getty + +import ( + "seata.apache.org/seata-go/pkg/protocol/codec" + "seata.apache.org/seata-go/pkg/remoting/config" +) + +func InitGetty(gettyConfig *config.Config, seataConfig *config.SeataConfig) { + config.InitConfig(seataConfig) + codec.Init() + initSessionManager(gettyConfig) +} diff --git a/pkg/remoting/getty/getty_remoting.go b/pkg/remoting/getty/getty_remoting.go index bd378f64d..3dec63c12 100644 --- a/pkg/remoting/getty/getty_remoting.go +++ b/pkg/remoting/getty/getty_remoting.go @@ -33,11 +33,6 @@ const ( RpcRequestTimeout = 20 * time.Second ) -var ( - gettyRemoting *GettyRemoting - onceGettyRemoting = &sync.Once{} -) - type ( callbackMethod func(reqMsg message.RpcMessage, respMsg *message.MessageFuture) (interface{}, error) GettyRemoting struct { @@ -46,16 +41,11 @@ type ( } ) -func GetGettyRemotingInstance() *GettyRemoting { - if gettyRemoting == nil { - onceGettyRemoting.Do(func() { - gettyRemoting = &GettyRemoting{ - futures: &sync.Map{}, - mergeMsgMap: &sync.Map{}, - } - }) +func newGettyRemoting() *GettyRemoting { + return &GettyRemoting{ + futures: &sync.Map{}, + mergeMsgMap: &sync.Map{}, } - return gettyRemoting } func (g *GettyRemoting) SendSync(msg message.RpcMessage, s getty.Session, callback callbackMethod) (interface{}, error) { @@ -72,7 +62,7 @@ func (g *GettyRemoting) SendSync(msg message.RpcMessage, s getty.Session, callba return result, err } -func (g *GettyRemoting) SendASync(msg message.RpcMessage, s getty.Session, callback callbackMethod) error { +func (g *GettyRemoting) SendAsync(msg message.RpcMessage, s getty.Session, callback callbackMethod) error { if s == nil { s = sessionManager.selectSession(msg) } diff --git a/pkg/remoting/getty/getty_remoting_test.go b/pkg/remoting/getty/getty_remoting_test.go index 8a3e9116e..ef18719f3 100644 --- a/pkg/remoting/getty/getty_remoting_test.go +++ b/pkg/remoting/getty/getty_remoting_test.go @@ -47,14 +47,15 @@ func TestGettyRemoting_GetMessageFuture(t *testing.T) { }, }, } + gettyRemotingClient := GetGettyRemotingClient() for _, test := range tests { t.Run(test.name, func(t *testing.T) { if test.messageFuture != nil { - GetGettyRemotingInstance().futures.Store(test.msgID, test.messageFuture) - messageFuture := GetGettyRemotingInstance().GetMessageFuture(test.msgID) + gettyRemotingClient.gettyRemoting.futures.Store(test.msgID, test.messageFuture) + messageFuture := gettyRemotingClient.gettyRemoting.GetMessageFuture(test.msgID) assert.Equal(t, *test.messageFuture, *messageFuture) } else { - messageFuture := GetGettyRemotingInstance().GetMessageFuture(test.msgID) + messageFuture := gettyRemotingClient.gettyRemoting.GetMessageFuture(test.msgID) assert.Empty(t, messageFuture) } }) @@ -78,13 +79,14 @@ func TestGettyRemoting_RemoveMessageFuture(t *testing.T) { }, }, } + gettyRemotingClient := GetGettyRemotingClient() for _, test := range tests { t.Run(test.name, func(t *testing.T) { - GetGettyRemotingInstance().futures.Store(test.msgID, test.messageFuture) - messageFuture := GetGettyRemotingInstance().GetMessageFuture(test.msgID) + gettyRemotingClient.gettyRemoting.futures.Store(test.msgID, test.messageFuture) + messageFuture := gettyRemotingClient.gettyRemoting.GetMessageFuture(test.msgID) assert.Equal(t, messageFuture, test.messageFuture) - GetGettyRemotingInstance().RemoveMessageFuture(test.msgID) - messageFuture = GetGettyRemotingInstance().GetMessageFuture(test.msgID) + gettyRemotingClient.gettyRemoting.RemoveMessageFuture(test.msgID) + messageFuture = gettyRemotingClient.gettyRemoting.GetMessageFuture(test.msgID) assert.Empty(t, messageFuture) }) } @@ -110,14 +112,15 @@ func TestGettyRemoting_GetMergedMessage(t *testing.T) { }, }, } + gettyRemotingClient := GetGettyRemotingClient() for _, test := range tests { t.Run(test.name, func(t *testing.T) { if test.mergedWarpMessage != nil { - GetGettyRemotingInstance().mergeMsgMap.Store(test.msgID, test.mergedWarpMessage) - mergedWarpMessage := GetGettyRemotingInstance().GetMergedMessage(test.msgID) + gettyRemotingClient.gettyRemoting.mergeMsgMap.Store(test.msgID, test.mergedWarpMessage) + mergedWarpMessage := gettyRemotingClient.gettyRemoting.GetMergedMessage(test.msgID) assert.Equal(t, *test.mergedWarpMessage, *mergedWarpMessage) } else { - mergedWarpMessage := GetGettyRemotingInstance().GetMessageFuture(test.msgID) + mergedWarpMessage := gettyRemotingClient.gettyRemoting.GetMessageFuture(test.msgID) assert.Empty(t, mergedWarpMessage) } }) @@ -144,18 +147,19 @@ func TestGettyRemoting_RemoveMergedMessageFuture(t *testing.T) { }, }, } + gettyRemotingClient := GetGettyRemotingClient() for _, test := range tests { t.Run(test.name, func(t *testing.T) { if test.mergedWarpMessage != nil { - GetGettyRemotingInstance().mergeMsgMap.Store(test.msgID, test.mergedWarpMessage) - mergedWarpMessage := GetGettyRemotingInstance().GetMergedMessage(test.msgID) + gettyRemotingClient.gettyRemoting.mergeMsgMap.Store(test.msgID, test.mergedWarpMessage) + mergedWarpMessage := gettyRemotingClient.gettyRemoting.GetMergedMessage(test.msgID) assert.NotEmpty(t, mergedWarpMessage) - GetGettyRemotingInstance().RemoveMergedMessageFuture(test.msgID) - mergedWarpMessage = GetGettyRemotingInstance().GetMergedMessage(test.msgID) + gettyRemotingClient.gettyRemoting.RemoveMergedMessageFuture(test.msgID) + mergedWarpMessage = gettyRemotingClient.gettyRemoting.GetMergedMessage(test.msgID) assert.Empty(t, mergedWarpMessage) } else { - GetGettyRemotingInstance().RemoveMergedMessageFuture(test.msgID) - mergedWarpMessage := GetGettyRemotingInstance().GetMergedMessage(test.msgID) + gettyRemotingClient.gettyRemoting.RemoveMergedMessageFuture(test.msgID) + mergedWarpMessage := gettyRemotingClient.gettyRemoting.GetMergedMessage(test.msgID) assert.Empty(t, mergedWarpMessage) } }) diff --git a/pkg/remoting/getty/listener.go b/pkg/remoting/getty/listener.go index 2f2e97489..09886c3a7 100644 --- a/pkg/remoting/getty/listener.go +++ b/pkg/remoting/getty/listener.go @@ -38,22 +38,16 @@ var ( ) type gettyClientHandler struct { - idGenerator *atomic.Uint32 - msgFutures *sync.Map - mergeMsgMap *sync.Map - sessionManager *SessionManager - processorMap map[message.MessageType]processor.RemotingProcessor + idGenerator *atomic.Uint32 + processorMap map[message.MessageType]processor.RemotingProcessor } func GetGettyClientHandlerInstance() *gettyClientHandler { if clientHandler == nil { onceClientHandler.Do(func() { clientHandler = &gettyClientHandler{ - idGenerator: &atomic.Uint32{}, - msgFutures: &sync.Map{}, - mergeMsgMap: &sync.Map{}, - sessionManager: sessionManager, - processorMap: make(map[message.MessageType]processor.RemotingProcessor, 0), + idGenerator: &atomic.Uint32{}, + processorMap: make(map[message.MessageType]processor.RemotingProcessor, 0), } }) } @@ -62,7 +56,7 @@ func GetGettyClientHandlerInstance() *gettyClientHandler { func (g *gettyClientHandler) OnOpen(session getty.Session) error { log.Infof("Open new getty session ") - g.sessionManager.registerSession(session) + sessionManager.registerSession(session) conf := config.GetSeataConfig() go func() { request := message.RegisterTMRequest{AbstractIdentifyRequest: message.AbstractIdentifyRequest{ @@ -73,7 +67,7 @@ func (g *gettyClientHandler) OnOpen(session getty.Session) error { err := GetGettyRemotingClient().SendAsyncRequest(request) if err != nil { log.Errorf("OnOpen error: {%#v}", err.Error()) - g.sessionManager.releaseSession(session) + sessionManager.releaseSession(session) return } }() @@ -83,12 +77,12 @@ func (g *gettyClientHandler) OnOpen(session getty.Session) error { func (g *gettyClientHandler) OnError(session getty.Session, err error) { log.Infof("session{%s} got error{%v}, will be closed.", session.Stat(), err) - g.sessionManager.releaseSession(session) + sessionManager.releaseSession(session) } func (g *gettyClientHandler) OnClose(session getty.Session) { log.Infof("session{%s} is closing......", session.Stat()) - g.sessionManager.releaseSession(session) + sessionManager.releaseSession(session) } func (g *gettyClientHandler) OnMessage(session getty.Session, pkg interface{}) { @@ -117,8 +111,19 @@ func (g *gettyClientHandler) OnCron(session getty.Session) { log.Debug("session{%s} Oncron executing", session.Stat()) err := g.transferHeartBeat(session, message.HeartBeatMessagePing) if err != nil { - log.Errorf("failed to send heart beat: {%#v}", err.Error()) - g.sessionManager.releaseSession(session) + log.Warnf("failed to send heart beat: {%#v}", err.Error()) + if session.GetAttribute(heartBeatRetryTimesKey) != nil { + retryTimes := session.GetAttribute(heartBeatRetryTimesKey).(int) + if retryTimes >= maxHeartBeatRetryTimes { + log.Warnf("heartbeat retry times exceed default max retry times{%d}, close the session{%s}", + maxHeartBeatRetryTimes, session.Stat()) + sessionManager.releaseSession(session) + return + } + session.SetAttribute(heartBeatRetryTimesKey, retryTimes+1) + } else { + session.SetAttribute(heartBeatRetryTimesKey, 1) + } } } @@ -130,7 +135,7 @@ func (g *gettyClientHandler) transferHeartBeat(session getty.Session, msg messag Compressor: 0, Body: msg, } - return GetGettyRemotingInstance().SendASync(rpcMessage, session, nil) + return GetGettyRemotingClient().gettyRemoting.SendAsync(rpcMessage, session, nil) } func (g *gettyClientHandler) RegisterProcessor(msgType message.MessageType, processor processor.RemotingProcessor) { diff --git a/pkg/remoting/getty/rpc_client.go b/pkg/remoting/getty/rpc_client.go deleted file mode 100644 index 17b6cec3e..000000000 --- a/pkg/remoting/getty/rpc_client.go +++ /dev/null @@ -1,138 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package getty - -import ( - "crypto/tls" - "fmt" - "net" - "sync" - - getty "github.com/apache/dubbo-getty" - gxsync "github.com/dubbogo/gost/sync" - - "seata.apache.org/seata-go/pkg/discovery" - "seata.apache.org/seata-go/pkg/protocol/codec" - "seata.apache.org/seata-go/pkg/remoting/config" - "seata.apache.org/seata-go/pkg/util/log" -) - -type RpcClient struct { - gettyConf *config.Config - seataConf *config.SeataConfig - gettyClients []getty.Client - futures *sync.Map -} - -func InitRpcClient(gettyConfig *config.Config, seataConfig *config.SeataConfig) { - config.IniConfig(seataConfig) - rpcClient := &RpcClient{ - gettyConf: gettyConfig, - seataConf: seataConfig, - gettyClients: make([]getty.Client, 0), - } - codec.Init() - rpcClient.init() -} - -func (c *RpcClient) init() { - addressList := c.getAvailServerList() - if len(addressList) == 0 { - log.Warn("no have valid seata server list") - } - for _, address := range addressList { - gettyClient := getty.NewTCPClient( - getty.WithServerAddress(fmt.Sprintf("%s:%d", address.Addr, address.Port)), - // todo if read c.gettyConf.ConnectionNum, will cause the connect to fail - getty.WithConnectionNumber(1), - getty.WithReconnectInterval(c.gettyConf.ReconnectInterval), - getty.WithClientTaskPool(gxsync.NewTaskPoolSimple(0)), - ) - go gettyClient.RunEventLoop(c.newSession) - // c.gettyClients = append(c.gettyClients, gettyClient) - } -} - -func (c *RpcClient) getAvailServerList() []*discovery.ServiceInstance { - registryService := discovery.GetRegistry() - instances, err := registryService.Lookup(c.seataConf.TxServiceGroup) - if err != nil { - return nil - } - return instances -} - -func (c *RpcClient) newSession(session getty.Session) error { - var ( - ok bool - tcpConn *net.TCPConn - err error - ) - - if c.gettyConf.SessionConfig.CompressEncoding { - session.SetCompressType(getty.CompressZip) - } - if _, ok = session.Conn().(*tls.Conn); ok { - c.setSessionConfig(session) - log.Debugf("server accepts new tls session:%s\n", session.Stat()) - return nil - } - if _, ok = session.Conn().(*net.TCPConn); !ok { - panic(fmt.Sprintf("%s, session.conn{%#v} is not a tcp connection\n", session.Stat(), session.Conn())) - } - - if _, ok = session.Conn().(*tls.Conn); !ok { - if tcpConn, ok = session.Conn().(*net.TCPConn); !ok { - return fmt.Errorf("%s, session.conn{%#v} is not tcp connection", session.Stat(), session.Conn()) - } - - if err = tcpConn.SetNoDelay(c.gettyConf.SessionConfig.TCPNoDelay); err != nil { - return err - } - if err = tcpConn.SetKeepAlive(c.gettyConf.SessionConfig.TCPKeepAlive); err != nil { - return err - } - if c.gettyConf.SessionConfig.TCPKeepAlive { - if err = tcpConn.SetKeepAlivePeriod(c.gettyConf.SessionConfig.KeepAlivePeriod); err != nil { - return err - } - } - if err = tcpConn.SetReadBuffer(c.gettyConf.SessionConfig.TCPRBufSize); err != nil { - return err - } - if err = tcpConn.SetWriteBuffer(c.gettyConf.SessionConfig.TCPWBufSize); err != nil { - return err - } - } - - c.setSessionConfig(session) - log.Debugf("rpc_client new session:%s\n", session.Stat()) - - return nil -} - -func (c *RpcClient) setSessionConfig(session getty.Session) { - session.SetName(c.gettyConf.SessionConfig.SessionName) - session.SetMaxMsgLen(c.gettyConf.SessionConfig.MaxMsgLen) - session.SetPkgHandler(rpcPkgHandler) - session.SetEventListener(GetGettyClientHandlerInstance()) - session.SetReadTimeout(c.gettyConf.SessionConfig.TCPReadTimeout) - session.SetWriteTimeout(c.gettyConf.SessionConfig.TCPWriteTimeout) - session.SetCronPeriod((int)(c.gettyConf.SessionConfig.CronPeriod.Milliseconds())) - session.SetWaitTime(c.gettyConf.SessionConfig.WaitTimeout) -} diff --git a/pkg/remoting/getty/session_manager.go b/pkg/remoting/getty/session_manager.go index 6a68cf1b1..602fffed0 100644 --- a/pkg/remoting/getty/session_manager.go +++ b/pkg/remoting/getty/session_manager.go @@ -18,38 +18,142 @@ package getty import ( + "crypto/tls" + "fmt" + "net" "reflect" "sync" "sync/atomic" "time" getty "github.com/apache/dubbo-getty" + gxsync "github.com/dubbogo/gost/sync" + "seata.apache.org/seata-go/pkg/discovery" "seata.apache.org/seata-go/pkg/protocol/message" "seata.apache.org/seata-go/pkg/remoting/config" "seata.apache.org/seata-go/pkg/remoting/loadbalance" + "seata.apache.org/seata-go/pkg/util/log" ) const ( - maxCheckAliveRetry = 600 - checkAliveInternal = 100 + maxCheckAliveRetry = 600 + checkAliveInternal = 100 + heartBeatRetryTimesKey = "heartbeat-retry-times" + maxHeartBeatRetryTimes = 3 ) -var sessionManager = newSessionManager() +var ( + sessionManager *SessionManager + onceSessionManager = &sync.Once{} +) type SessionManager struct { // serverAddress -> rpc_client.Session -> bool serverSessions sync.Map allSessions sync.Map sessionSize int32 + gettyConf *config.Config +} + +func initSessionManager(gettyConfig *config.Config) { + if sessionManager == nil { + onceSessionManager.Do(func() { + sessionManager = &SessionManager{ + allSessions: sync.Map{}, + serverSessions: sync.Map{}, + gettyConf: gettyConfig, + } + sessionManager.init() + }) + } } -func newSessionManager() *SessionManager { - return &SessionManager{ - allSessions: sync.Map{}, - // serverAddress -> rpc_client.Session -> bool - serverSessions: sync.Map{}, +func (g *SessionManager) init() { + addressList := g.getAvailServerList() + if len(addressList) == 0 { + log.Warn("no have valid seata server list") + } + for _, address := range addressList { + gettyClient := getty.NewTCPClient( + getty.WithServerAddress(fmt.Sprintf("%s:%d", address.Addr, address.Port)), + // todo if read c.gettyConf.ConnectionNum, will cause the connect to fail + getty.WithConnectionNumber(1), + getty.WithReconnectInterval(g.gettyConf.ReconnectInterval), + getty.WithClientTaskPool(gxsync.NewTaskPoolSimple(0)), + ) + go gettyClient.RunEventLoop(g.newSession) + } +} + +func (g *SessionManager) getAvailServerList() []*discovery.ServiceInstance { + registryService := discovery.GetRegistry() + instances, err := registryService.Lookup(config.GetSeataConfig().TxServiceGroup) + if err != nil { + return nil + } + return instances +} + +func (g *SessionManager) setSessionConfig(session getty.Session) { + session.SetName(g.gettyConf.SessionConfig.SessionName) + session.SetMaxMsgLen(g.gettyConf.SessionConfig.MaxMsgLen) + session.SetPkgHandler(rpcPkgHandler) + session.SetEventListener(GetGettyClientHandlerInstance()) + session.SetReadTimeout(g.gettyConf.SessionConfig.TCPReadTimeout) + session.SetWriteTimeout(g.gettyConf.SessionConfig.TCPWriteTimeout) + session.SetCronPeriod((int)(g.gettyConf.SessionConfig.CronPeriod.Milliseconds())) + session.SetWaitTime(g.gettyConf.SessionConfig.WaitTimeout) + session.SetAttribute(heartBeatRetryTimesKey, 0) +} + +func (g *SessionManager) newSession(session getty.Session) error { + var ( + ok bool + tcpConn *net.TCPConn + err error + ) + + if g.gettyConf.SessionConfig.CompressEncoding { + session.SetCompressType(getty.CompressZip) } + if _, ok = session.Conn().(*tls.Conn); ok { + g.setSessionConfig(session) + log.Debugf("server accepts new tls session:%s\n", session.Stat()) + return nil + } + if _, ok = session.Conn().(*net.TCPConn); !ok { + panic(fmt.Sprintf("%s, session.conn{%#v} is not a tcp connection\n", session.Stat(), session.Conn())) + } + + if _, ok = session.Conn().(*tls.Conn); !ok { + if tcpConn, ok = session.Conn().(*net.TCPConn); !ok { + return fmt.Errorf("%s, session.conn{%#v} is not tcp connection", session.Stat(), session.Conn()) + } + + if err = tcpConn.SetNoDelay(g.gettyConf.SessionConfig.TCPNoDelay); err != nil { + return err + } + if err = tcpConn.SetKeepAlive(g.gettyConf.SessionConfig.TCPKeepAlive); err != nil { + return err + } + if g.gettyConf.SessionConfig.TCPKeepAlive { + if err = tcpConn.SetKeepAlivePeriod(g.gettyConf.SessionConfig.KeepAlivePeriod); err != nil { + return err + } + } + if err = tcpConn.SetReadBuffer(g.gettyConf.SessionConfig.TCPRBufSize); err != nil { + return err + } + if err = tcpConn.SetWriteBuffer(g.gettyConf.SessionConfig.TCPWBufSize); err != nil { + return err + } + } + + g.setSessionConfig(session) + log.Debugf("rpc_client new session:%s\n", session.Stat()) + + return nil } func (g *SessionManager) selectSession(msg interface{}) getty.Session { diff --git a/pkg/remoting/processor/client/client_on_response_processor.go b/pkg/remoting/processor/client/client_on_response_processor.go index 55644b770..ada449614 100644 --- a/pkg/remoting/processor/client/client_on_response_processor.go +++ b/pkg/remoting/processor/client/client_on_response_processor.go @@ -46,27 +46,28 @@ type clientOnResponseProcessor struct{} func (f *clientOnResponseProcessor) Process(ctx context.Context, rpcMessage message.RpcMessage) error { log.Infof("the rm client received clientOnResponse msg %#v from tc server.", rpcMessage) + gettyRemotingClient := getty.GetGettyRemotingClient() if mergedResult, ok := rpcMessage.Body.(message.MergeResultMessage); ok { - mergedMessage := getty.GetGettyRemotingInstance().GetMergedMessage(rpcMessage.ID) + mergedMessage := gettyRemotingClient.GetMergedMessage(rpcMessage.ID) if mergedMessage != nil { for i := 0; i < len(mergedMessage.Msgs); i++ { msgID := mergedMessage.MsgIds[i] - response := getty.GetGettyRemotingInstance().GetMessageFuture(msgID) + response := gettyRemotingClient.GetMessageFuture(msgID) if response != nil { response.Response = mergedResult.Msgs[i] response.Done <- struct{}{} - getty.GetGettyRemotingInstance().RemoveMessageFuture(msgID) + gettyRemotingClient.RemoveMessageFuture(msgID) } } - getty.GetGettyRemotingInstance().RemoveMergedMessageFuture(rpcMessage.ID) + gettyRemotingClient.RemoveMergedMessageFuture(rpcMessage.ID) } return nil } else { // 如果是请求消息,做处理逻辑 - msgFuture := getty.GetGettyRemotingInstance().GetMessageFuture(rpcMessage.ID) + msgFuture := gettyRemotingClient.GetMessageFuture(rpcMessage.ID) if msgFuture != nil { - getty.GetGettyRemotingInstance().NotifyRpcMessageResponse(rpcMessage) - getty.GetGettyRemotingInstance().RemoveMessageFuture(rpcMessage.ID) + gettyRemotingClient.NotifyRpcMessageResponse(rpcMessage) + gettyRemotingClient.RemoveMessageFuture(rpcMessage.ID) } else { if _, ok := rpcMessage.Body.(message.AbstractResultMessage); ok { log.Infof("the rm client received response msg [{}] from tc server.", msgFuture)