Skip to content

Commit

Permalink
refactor: refact getty module (#746)
Browse files Browse the repository at this point in the history
* feat:add more linter

* feat:change golangclilint version to 1.57.x to support more linter

* feat:adjust lint conf and adjust the code to pass the check

* style: format some code; fix: some sql statement or rows was not been closed

* feat:close session when send heart beat message failed

* refactor: change GettyRemoting from singleton to GettyRemotingClient member

* remove rpc client

* refactor: complete GettyRemoting refactoring

* refactor: change getter name, improve variable naming and remove unused fields

* update something

* feat:add heart-beat failed retry times

* Refactor getty.
>
>
Co-authored-by: marsevilspirit <[email protected]>
Co-authored-by: solisamicus <[email protected]>
Co-authored-by: No-SilverBullet  <[email protected]>

* Refactor getty.

Co-authored-by: marsevilspirit <[email protected]>
Co-authored-by: solisamicus <[email protected]>
Co-authored-by: No-SilverBullet  <[email protected]>

---------

Co-authored-by: JayLiu <[email protected]>
Co-authored-by: SolisAmicus <[email protected]>
Co-authored-by: marsevilspirit <[email protected]>
  • Loading branch information
4 people authored Dec 21, 2024
1 parent d7b6a4c commit 8ce1006
Show file tree
Hide file tree
Showing 11 changed files with 233 additions and 214 deletions.
8 changes: 5 additions & 3 deletions pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,17 @@ func initTmClient(cfg *Config) {
})
}

// initRemoting init rpc client
// initRemoting init remoting
func initRemoting(cfg *Config) {
getty.InitRpcClient(&cfg.GettyConfig, &remoteConfig.SeataConfig{
seataConfig := remoteConfig.SeataConfig{
ApplicationID: cfg.ApplicationID,
TxServiceGroup: cfg.TxServiceGroup,
ServiceVgroupMapping: cfg.ServiceConfig.VgroupMapping,
ServiceGrouplist: cfg.ServiceConfig.Grouplist,
LoadBalanceType: cfg.GettyConfig.LoadBalanceType,
})
}

getty.InitGetty(&cfg.GettyConfig, &seataConfig)
}

// InitRmClient init client rm client
Expand Down
2 changes: 1 addition & 1 deletion pkg/remoting/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ type SeataConfig struct {
LoadBalanceType string
}

func IniConfig(seataConf *SeataConfig) {
func InitConfig(seataConf *SeataConfig) {
seataConfig = seataConf
}

Expand Down
34 changes: 28 additions & 6 deletions pkg/remoting/getty/getty_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,16 @@ var (
)

type GettyRemotingClient struct {
idGenerator *atomic.Uint32
idGenerator *atomic.Uint32
gettyRemoting *GettyRemoting
}

func GetGettyRemotingClient() *GettyRemotingClient {
if gettyRemotingClient == nil {
onceGettyRemotingClient.Do(func() {
gettyRemotingClient = &GettyRemotingClient{
idGenerator: &atomic.Uint32{},
idGenerator: &atomic.Uint32{},
gettyRemoting: newGettyRemoting(),
}
})
}
Expand All @@ -63,7 +65,7 @@ func (client *GettyRemotingClient) SendAsyncRequest(msg interface{}) error {
Compressor: 0,
Body: msg,
}
return GetGettyRemotingInstance().SendASync(rpcMessage, nil, client.asyncCallback)
return client.gettyRemoting.SendAsync(rpcMessage, nil, client.asyncCallback)
}

func (client *GettyRemotingClient) SendAsyncResponse(msgID int32, msg interface{}) error {
Expand All @@ -74,7 +76,7 @@ func (client *GettyRemotingClient) SendAsyncResponse(msgID int32, msg interface{
Compressor: 0,
Body: msg,
}
return GetGettyRemotingInstance().SendASync(rpcMessage, nil, nil)
return client.gettyRemoting.SendAsync(rpcMessage, nil, nil)
}

func (client *GettyRemotingClient) SendSyncRequest(msg interface{}) (interface{}, error) {
Expand All @@ -85,7 +87,7 @@ func (client *GettyRemotingClient) SendSyncRequest(msg interface{}) (interface{}
Compressor: 0,
Body: msg,
}
return GetGettyRemotingInstance().SendSync(rpcMessage, nil, client.syncCallback)
return client.gettyRemoting.SendSync(rpcMessage, nil, client.syncCallback)
}

func (g *GettyRemotingClient) asyncCallback(reqMsg message.RpcMessage, respMsg *message.MessageFuture) (interface{}, error) {
Expand All @@ -96,10 +98,30 @@ func (g *GettyRemotingClient) asyncCallback(reqMsg message.RpcMessage, respMsg *
func (g *GettyRemotingClient) syncCallback(reqMsg message.RpcMessage, respMsg *message.MessageFuture) (interface{}, error) {
select {
case <-gxtime.GetDefaultTimerWheel().After(RpcRequestTimeout):
GetGettyRemotingInstance().RemoveMergedMessageFuture(reqMsg.ID)
g.gettyRemoting.RemoveMergedMessageFuture(reqMsg.ID)
log.Errorf("wait resp timeout: %#v", reqMsg)
return nil, fmt.Errorf("wait response timeout, request: %#v", reqMsg)
case <-respMsg.Done:
return respMsg.Response, respMsg.Err
}
}

func (client *GettyRemotingClient) GetMergedMessage(msgID int32) *message.MergedWarpMessage {
return client.gettyRemoting.GetMergedMessage(msgID)
}

func (client *GettyRemotingClient) GetMessageFuture(msgID int32) *message.MessageFuture {
return client.gettyRemoting.GetMessageFuture(msgID)
}

func (client *GettyRemotingClient) RemoveMessageFuture(msgID int32) {
client.gettyRemoting.RemoveMessageFuture(msgID)
}

func (client *GettyRemotingClient) RemoveMergedMessageFuture(msgID int32) {
client.gettyRemoting.RemoveMergedMessageFuture(msgID)
}

func (client *GettyRemotingClient) NotifyRpcMessageResponse(msg message.RpcMessage) {
client.gettyRemoting.NotifyRpcMessageResponse(msg)
}
6 changes: 3 additions & 3 deletions pkg/remoting/getty/getty_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestGettyRemotingClient_SendSyncRequest(t *testing.T) {
},
},
}
gomonkey.ApplyMethod(reflect.TypeOf(GetGettyRemotingInstance()), "SendSync",
gomonkey.ApplyMethod(reflect.TypeOf(GetGettyRemotingClient().gettyRemoting), "SendSync",
func(_ *GettyRemoting, msg message.RpcMessage, s getty.Session, callback callbackMethod) (interface{},
error) {
return respMsg, nil
Expand All @@ -52,7 +52,7 @@ func TestGettyRemotingClient_SendSyncRequest(t *testing.T) {

// TestGettyRemotingClient_SendAsyncResponse unit test for SendAsyncResponse function
func TestGettyRemotingClient_SendAsyncResponse(t *testing.T) {
gomonkey.ApplyMethod(reflect.TypeOf(GetGettyRemotingInstance()), "SendASync",
gomonkey.ApplyMethod(reflect.TypeOf(GetGettyRemotingClient().gettyRemoting), "SendAsync",
func(_ *GettyRemoting, msg message.RpcMessage, s getty.Session, callback callbackMethod) error {
return nil
})
Expand All @@ -77,7 +77,7 @@ func TestGettyRemotingClient_SendAsyncRequest(t *testing.T) {
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
gomonkey.ApplyMethod(reflect.TypeOf(GetGettyRemotingInstance()), "SendASync",
gomonkey.ApplyMethod(reflect.TypeOf(GetGettyRemotingClient().gettyRemoting), "SendAsync",
func(_ *GettyRemoting, msg message.RpcMessage, s getty.Session, callback callbackMethod) error {
return nil
})
Expand Down
29 changes: 29 additions & 0 deletions pkg/remoting/getty/getty_init.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package getty

import (
"seata.apache.org/seata-go/pkg/protocol/codec"
"seata.apache.org/seata-go/pkg/remoting/config"
)

func InitGetty(gettyConfig *config.Config, seataConfig *config.SeataConfig) {
config.InitConfig(seataConfig)
codec.Init()
initSessionManager(gettyConfig)
}
20 changes: 5 additions & 15 deletions pkg/remoting/getty/getty_remoting.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,6 @@ const (
RpcRequestTimeout = 20 * time.Second
)

var (
gettyRemoting *GettyRemoting
onceGettyRemoting = &sync.Once{}
)

type (
callbackMethod func(reqMsg message.RpcMessage, respMsg *message.MessageFuture) (interface{}, error)
GettyRemoting struct {
Expand All @@ -46,16 +41,11 @@ type (
}
)

func GetGettyRemotingInstance() *GettyRemoting {
if gettyRemoting == nil {
onceGettyRemoting.Do(func() {
gettyRemoting = &GettyRemoting{
futures: &sync.Map{},
mergeMsgMap: &sync.Map{},
}
})
func newGettyRemoting() *GettyRemoting {
return &GettyRemoting{
futures: &sync.Map{},
mergeMsgMap: &sync.Map{},
}
return gettyRemoting
}

func (g *GettyRemoting) SendSync(msg message.RpcMessage, s getty.Session, callback callbackMethod) (interface{}, error) {
Expand All @@ -72,7 +62,7 @@ func (g *GettyRemoting) SendSync(msg message.RpcMessage, s getty.Session, callba
return result, err
}

func (g *GettyRemoting) SendASync(msg message.RpcMessage, s getty.Session, callback callbackMethod) error {
func (g *GettyRemoting) SendAsync(msg message.RpcMessage, s getty.Session, callback callbackMethod) error {
if s == nil {
s = sessionManager.selectSession(msg)
}
Expand Down
36 changes: 20 additions & 16 deletions pkg/remoting/getty/getty_remoting_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,15 @@ func TestGettyRemoting_GetMessageFuture(t *testing.T) {
},
},
}
gettyRemotingClient := GetGettyRemotingClient()
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
if test.messageFuture != nil {
GetGettyRemotingInstance().futures.Store(test.msgID, test.messageFuture)
messageFuture := GetGettyRemotingInstance().GetMessageFuture(test.msgID)
gettyRemotingClient.gettyRemoting.futures.Store(test.msgID, test.messageFuture)
messageFuture := gettyRemotingClient.gettyRemoting.GetMessageFuture(test.msgID)
assert.Equal(t, *test.messageFuture, *messageFuture)
} else {
messageFuture := GetGettyRemotingInstance().GetMessageFuture(test.msgID)
messageFuture := gettyRemotingClient.gettyRemoting.GetMessageFuture(test.msgID)
assert.Empty(t, messageFuture)
}
})
Expand All @@ -78,13 +79,14 @@ func TestGettyRemoting_RemoveMessageFuture(t *testing.T) {
},
},
}
gettyRemotingClient := GetGettyRemotingClient()
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
GetGettyRemotingInstance().futures.Store(test.msgID, test.messageFuture)
messageFuture := GetGettyRemotingInstance().GetMessageFuture(test.msgID)
gettyRemotingClient.gettyRemoting.futures.Store(test.msgID, test.messageFuture)
messageFuture := gettyRemotingClient.gettyRemoting.GetMessageFuture(test.msgID)
assert.Equal(t, messageFuture, test.messageFuture)
GetGettyRemotingInstance().RemoveMessageFuture(test.msgID)
messageFuture = GetGettyRemotingInstance().GetMessageFuture(test.msgID)
gettyRemotingClient.gettyRemoting.RemoveMessageFuture(test.msgID)
messageFuture = gettyRemotingClient.gettyRemoting.GetMessageFuture(test.msgID)
assert.Empty(t, messageFuture)
})
}
Expand All @@ -110,14 +112,15 @@ func TestGettyRemoting_GetMergedMessage(t *testing.T) {
},
},
}
gettyRemotingClient := GetGettyRemotingClient()
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
if test.mergedWarpMessage != nil {
GetGettyRemotingInstance().mergeMsgMap.Store(test.msgID, test.mergedWarpMessage)
mergedWarpMessage := GetGettyRemotingInstance().GetMergedMessage(test.msgID)
gettyRemotingClient.gettyRemoting.mergeMsgMap.Store(test.msgID, test.mergedWarpMessage)
mergedWarpMessage := gettyRemotingClient.gettyRemoting.GetMergedMessage(test.msgID)
assert.Equal(t, *test.mergedWarpMessage, *mergedWarpMessage)
} else {
mergedWarpMessage := GetGettyRemotingInstance().GetMessageFuture(test.msgID)
mergedWarpMessage := gettyRemotingClient.gettyRemoting.GetMessageFuture(test.msgID)
assert.Empty(t, mergedWarpMessage)
}
})
Expand All @@ -144,18 +147,19 @@ func TestGettyRemoting_RemoveMergedMessageFuture(t *testing.T) {
},
},
}
gettyRemotingClient := GetGettyRemotingClient()
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
if test.mergedWarpMessage != nil {
GetGettyRemotingInstance().mergeMsgMap.Store(test.msgID, test.mergedWarpMessage)
mergedWarpMessage := GetGettyRemotingInstance().GetMergedMessage(test.msgID)
gettyRemotingClient.gettyRemoting.mergeMsgMap.Store(test.msgID, test.mergedWarpMessage)
mergedWarpMessage := gettyRemotingClient.gettyRemoting.GetMergedMessage(test.msgID)
assert.NotEmpty(t, mergedWarpMessage)
GetGettyRemotingInstance().RemoveMergedMessageFuture(test.msgID)
mergedWarpMessage = GetGettyRemotingInstance().GetMergedMessage(test.msgID)
gettyRemotingClient.gettyRemoting.RemoveMergedMessageFuture(test.msgID)
mergedWarpMessage = gettyRemotingClient.gettyRemoting.GetMergedMessage(test.msgID)
assert.Empty(t, mergedWarpMessage)
} else {
GetGettyRemotingInstance().RemoveMergedMessageFuture(test.msgID)
mergedWarpMessage := GetGettyRemotingInstance().GetMergedMessage(test.msgID)
gettyRemotingClient.gettyRemoting.RemoveMergedMessageFuture(test.msgID)
mergedWarpMessage := gettyRemotingClient.gettyRemoting.GetMergedMessage(test.msgID)
assert.Empty(t, mergedWarpMessage)
}
})
Expand Down
39 changes: 22 additions & 17 deletions pkg/remoting/getty/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,22 +38,16 @@ var (
)

type gettyClientHandler struct {
idGenerator *atomic.Uint32
msgFutures *sync.Map
mergeMsgMap *sync.Map
sessionManager *SessionManager
processorMap map[message.MessageType]processor.RemotingProcessor
idGenerator *atomic.Uint32
processorMap map[message.MessageType]processor.RemotingProcessor
}

func GetGettyClientHandlerInstance() *gettyClientHandler {
if clientHandler == nil {
onceClientHandler.Do(func() {
clientHandler = &gettyClientHandler{
idGenerator: &atomic.Uint32{},
msgFutures: &sync.Map{},
mergeMsgMap: &sync.Map{},
sessionManager: sessionManager,
processorMap: make(map[message.MessageType]processor.RemotingProcessor, 0),
idGenerator: &atomic.Uint32{},
processorMap: make(map[message.MessageType]processor.RemotingProcessor, 0),
}
})
}
Expand All @@ -62,7 +56,7 @@ func GetGettyClientHandlerInstance() *gettyClientHandler {

func (g *gettyClientHandler) OnOpen(session getty.Session) error {
log.Infof("Open new getty session ")
g.sessionManager.registerSession(session)
sessionManager.registerSession(session)
conf := config.GetSeataConfig()
go func() {
request := message.RegisterTMRequest{AbstractIdentifyRequest: message.AbstractIdentifyRequest{
Expand All @@ -73,7 +67,7 @@ func (g *gettyClientHandler) OnOpen(session getty.Session) error {
err := GetGettyRemotingClient().SendAsyncRequest(request)
if err != nil {
log.Errorf("OnOpen error: {%#v}", err.Error())
g.sessionManager.releaseSession(session)
sessionManager.releaseSession(session)
return
}
}()
Expand All @@ -83,12 +77,12 @@ func (g *gettyClientHandler) OnOpen(session getty.Session) error {

func (g *gettyClientHandler) OnError(session getty.Session, err error) {
log.Infof("session{%s} got error{%v}, will be closed.", session.Stat(), err)
g.sessionManager.releaseSession(session)
sessionManager.releaseSession(session)
}

func (g *gettyClientHandler) OnClose(session getty.Session) {
log.Infof("session{%s} is closing......", session.Stat())
g.sessionManager.releaseSession(session)
sessionManager.releaseSession(session)
}

func (g *gettyClientHandler) OnMessage(session getty.Session, pkg interface{}) {
Expand Down Expand Up @@ -117,8 +111,19 @@ func (g *gettyClientHandler) OnCron(session getty.Session) {
log.Debug("session{%s} Oncron executing", session.Stat())
err := g.transferHeartBeat(session, message.HeartBeatMessagePing)
if err != nil {
log.Errorf("failed to send heart beat: {%#v}", err.Error())
g.sessionManager.releaseSession(session)
log.Warnf("failed to send heart beat: {%#v}", err.Error())
if session.GetAttribute(heartBeatRetryTimesKey) != nil {
retryTimes := session.GetAttribute(heartBeatRetryTimesKey).(int)
if retryTimes >= maxHeartBeatRetryTimes {
log.Warnf("heartbeat retry times exceed default max retry times{%d}, close the session{%s}",
maxHeartBeatRetryTimes, session.Stat())
sessionManager.releaseSession(session)
return
}
session.SetAttribute(heartBeatRetryTimesKey, retryTimes+1)
} else {
session.SetAttribute(heartBeatRetryTimesKey, 1)
}
}
}

Expand All @@ -130,7 +135,7 @@ func (g *gettyClientHandler) transferHeartBeat(session getty.Session, msg messag
Compressor: 0,
Body: msg,
}
return GetGettyRemotingInstance().SendASync(rpcMessage, session, nil)
return GetGettyRemotingClient().gettyRemoting.SendAsync(rpcMessage, session, nil)
}

func (g *gettyClientHandler) RegisterProcessor(msgType message.MessageType, processor processor.RemotingProcessor) {
Expand Down
Loading

0 comments on commit 8ce1006

Please sign in to comment.