From 0134d12292266914f6a37fb24e005cbbd40dfce4 Mon Sep 17 00:00:00 2001 From: huangweichang Date: Mon, 5 Jul 2021 18:07:47 +0800 Subject: [PATCH 1/3] refactor module parse replace if with route --- block/handler.go | 163 ----------------------- block/parse_tx.go | 41 ++++-- block/types.go | 8 -- block/util.go | 32 ----- lib/msgparser/msgparser.go | 263 ++++++++++++++++++++++++++++++++++++- lib/msgparser/router.go | 94 +++++++++++++ lib/msgparser/types.go | 48 +++++++ utils/utils.go | 14 ++ 8 files changed, 449 insertions(+), 214 deletions(-) delete mode 100644 block/handler.go delete mode 100644 block/types.go delete mode 100644 block/util.go create mode 100644 lib/msgparser/router.go create mode 100644 lib/msgparser/types.go diff --git a/block/handler.go b/block/handler.go deleted file mode 100644 index a907104..0000000 --- a/block/handler.go +++ /dev/null @@ -1,163 +0,0 @@ -package block - -import ( - "github.com/irisnet/rainbow-sync/lib/msgparser" - . "github.com/kaifei-bianjie/msg-parser/modules" - "github.com/kaifei-bianjie/msg-parser/modules/bank" - "github.com/kaifei-bianjie/msg-parser/modules/coinswap" - "github.com/kaifei-bianjie/msg-parser/modules/distribution" - "github.com/kaifei-bianjie/msg-parser/modules/ibc" - "github.com/kaifei-bianjie/msg-parser/modules/staking" - "github.com/kaifei-bianjie/msg-parser/types" - "strings" -) - -func HandleTxMsg(v types.SdkMsg) CustomMsgDocInfo { - var ( - msgDoc CustomMsgDocInfo - denoms []string - ) - if bankDocInfo, ok := msgparser.MsgClient.Bank.HandleTxMsg(v); ok { - msgDoc.MsgDocInfo = bankDocInfo - switch bankDocInfo.DocTxMsg.Type { - case MsgTypeSend: - doc := bankDocInfo.DocTxMsg.Msg.(*bank.DocMsgSend) - denoms = parseDenoms(doc.Amount) - case MsgTypeMultiSend: - doc := bankDocInfo.DocTxMsg.Msg.(*bank.DocMsgMultiSend) - if len(doc.Inputs) > 0 { - for _, v := range doc.Inputs { - denoms = append(denoms, parseDenoms(v.Coins)...) - } - } - if len(doc.Outputs) > 0 { - for _, v := range doc.Outputs { - denoms = append(denoms, parseDenoms(v.Coins)...) - } - } - } - msgDoc.Denoms = removeDuplicatesFromSlice(denoms) - return msgDoc - } - if iServiceDocInfo, ok := msgparser.MsgClient.Service.HandleTxMsg(v); ok { - msgDoc.MsgDocInfo = iServiceDocInfo - return msgDoc - } - if nftDocInfo, ok := msgparser.MsgClient.Nft.HandleTxMsg(v); ok { - msgDoc.MsgDocInfo = nftDocInfo - return msgDoc - } - if recordDocInfo, ok := msgparser.MsgClient.Record.HandleTxMsg(v); ok { - msgDoc.MsgDocInfo = recordDocInfo - return msgDoc - } - if tokenDocInfo, ok := msgparser.MsgClient.Token.HandleTxMsg(v); ok { - msgDoc.MsgDocInfo = tokenDocInfo - return msgDoc - } - if coinswapDocInfo, ok := msgparser.MsgClient.Coinswap.HandleTxMsg(v); ok { - msgDoc.MsgDocInfo = coinswapDocInfo - switch coinswapDocInfo.DocTxMsg.Type { - case MsgTypeSwapOrder: - doc := coinswapDocInfo.DocTxMsg.Msg.(*coinswap.DocTxMsgSwapOrder) - denoms = append(denoms, parseDenoms([]types.Coin{doc.Input.Coin})...) - denoms = append(denoms, parseDenoms([]types.Coin{doc.Output.Coin})...) - case MsgTypeAddLiquidity: - doc := coinswapDocInfo.DocTxMsg.Msg.(*coinswap.DocTxMsgAddLiquidity) - denoms = append(denoms, parseDenoms([]types.Coin{doc.MaxToken})...) - case MsgTypeRemoveLiquidity: - doc := coinswapDocInfo.DocTxMsg.Msg.(*coinswap.DocTxMsgRemoveLiquidity) - denoms = append(denoms, parseDenoms([]types.Coin{doc.WithdrawLiquidity})...) - } - msgDoc.Denoms = removeDuplicatesFromSlice(denoms) - return msgDoc - } - if crisisDocInfo, ok := msgparser.MsgClient.Crisis.HandleTxMsg(v); ok { - msgDoc.MsgDocInfo = crisisDocInfo - return msgDoc - } - if distrubutionDocInfo, ok := msgparser.MsgClient.Distribution.HandleTxMsg(v); ok { - msgDoc.MsgDocInfo = distrubutionDocInfo - switch distrubutionDocInfo.DocTxMsg.Type { - case MsgTypeMsgFundCommunityPool: - doc := distrubutionDocInfo.DocTxMsg.Msg.(*distribution.DocTxMsgFundCommunityPool) - denoms = append(denoms, parseDenoms(doc.Amount)...) - case MsgTypeWithdrawDelegatorReward: - case MsgTypeMsgWithdrawValidatorCommission: - break - } - msgDoc.Denoms = removeDuplicatesFromSlice(denoms) - return msgDoc - } - if slashingDocInfo, ok := msgparser.MsgClient.Slashing.HandleTxMsg(v); ok { - msgDoc.MsgDocInfo = slashingDocInfo - return msgDoc - } - if evidenceDocInfo, ok := msgparser.MsgClient.Evidence.HandleTxMsg(v); ok { - msgDoc.MsgDocInfo = evidenceDocInfo - return msgDoc - } - if htlcDocInfo, ok := msgparser.MsgClient.Htlc.HandleTxMsg(v); ok { - msgDoc.MsgDocInfo = htlcDocInfo - return msgDoc - } - if stakingDocInfo, ok := msgparser.MsgClient.Staking.HandleTxMsg(v); ok { - msgDoc.MsgDocInfo = stakingDocInfo - switch stakingDocInfo.DocTxMsg.Type { - case MsgTypeStakeDelegate: - doc := stakingDocInfo.DocTxMsg.Msg.(*staking.DocTxMsgDelegate) - denoms = append(denoms, parseDenoms(convertCoins([]Coin{doc.Amount}))...) - case MsgTypeStakeBeginUnbonding: - doc := stakingDocInfo.DocTxMsg.Msg.(*staking.DocTxMsgBeginUnbonding) - denoms = append(denoms, parseDenoms([]types.Coin{doc.Amount})...) - case MsgTypeBeginRedelegate: - doc := stakingDocInfo.DocTxMsg.Msg.(*staking.DocTxMsgBeginRedelegate) - denoms = append(denoms, parseDenoms([]types.Coin{doc.Amount})...) - } - msgDoc.Denoms = removeDuplicatesFromSlice(denoms) - return msgDoc - } - if govDocInfo, ok := msgparser.MsgClient.Gov.HandleTxMsg(v); ok { - msgDoc.MsgDocInfo = govDocInfo - return msgDoc - } - if ibcDocInfo, ok := msgparser.MsgClient.Ibc.HandleTxMsg(v); ok { - msgDoc.MsgDocInfo = ibcDocInfo - switch ibcDocInfo.DocTxMsg.Type { - case MsgTypeIBCTransfer: - doc := ibcDocInfo.DocTxMsg.Msg.(*ibc.DocMsgTransfer) - denoms = append(denoms, doc.Token.Denom) - case MsgTypeTimeout: - doc := ibcDocInfo.DocTxMsg.Msg.(*ibc.DocMsgTimeout) - denom := doc.Packet.Data.Denom - if strings.Contains(denom, "/") { - denom = ibc.GetIbcPacketDenom(doc.Packet, doc.Packet.Data.Denom) - } - denoms = append(denoms, denom) - case MsgTypeRecvPacket: - doc := ibcDocInfo.DocTxMsg.Msg.(*ibc.DocMsgRecvPacket) - denom := doc.Packet.Data.Denom - if strings.Contains(denom, "/") { - denom = ibc.GetIbcPacketDenom(doc.Packet, doc.Packet.Data.Denom) - } - denoms = append(denoms, denom) - } - msgDoc.Denoms = removeDuplicatesFromSlice(denoms) - return msgDoc - } - return msgDoc -} - -func removeDuplicatesFromSlice(data []string) (result []string) { - tempSet := make(map[string]string, len(data)) - for _, val := range data { - if _, ok := tempSet[val]; ok || val == "" { - continue - } - tempSet[val] = val - } - for one := range tempSet { - result = append(result, one) - } - return -} diff --git a/block/parse_tx.go b/block/parse_tx.go index a43b2c2..90c2d8e 100644 --- a/block/parse_tx.go +++ b/block/parse_tx.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/irisnet/rainbow-sync/db" "github.com/irisnet/rainbow-sync/lib/logger" + "github.com/irisnet/rainbow-sync/lib/msgparser" "github.com/irisnet/rainbow-sync/lib/pool" "github.com/irisnet/rainbow-sync/model" "github.com/irisnet/rainbow-sync/utils" @@ -19,6 +20,30 @@ import ( "time" ) +var _parser msgparser.MsgParser + +func init() { + router := msgparser.RegisteRouter() + //if conf.Server.OnlySupportModule != "" { + // modules := strings.Split(conf.Server.OnlySupportModule, ",") + // msgRoute := msgparser.NewRouter() + // for _, one := range modules { + // fn, exist := msgparser.RouteHandlerMap[one] + // if !exist { + // logger.Fatal("no support module: " + one) + // } + // msgRoute = msgRoute.AddRoute(one, fn) + // if one == msgparser.IbcRouteKey { + // msgRoute = msgRoute.AddRoute(msgparser.IbcTransferRouteKey, msgparser.RouteHandlerMap[one]) + // } + // } + // if msgRoute.GetRoutesLen() > 0 { + // router = msgRoute + // } + // + //} + _parser = msgparser.NewMsgParser(router) +} func SaveDocsWithTxn(blockDoc *model.Block, txs []*model.Tx, txMsgs []model.TxMsg, taskDoc model.SyncTask) error { var ( ops, insertOps []txn.Op @@ -187,7 +212,7 @@ func ParseTx(txBytes types.Tx, block *types.Block, client *pool.Client) (model.T return docTx, docMsgs, nil } for i, v := range msgs { - msgDocInfo := HandleTxMsg(v) + msgDocInfo := _parser.HandleTxMsg(v) if len(msgDocInfo.Addrs) == 0 { continue } @@ -208,8 +233,8 @@ func ParseTx(txBytes types.Tx, block *types.Block, client *pool.Client) (model.T } } - docTx.Signers = append(docTx.Signers, removeDuplicatesFromSlice(msgDocInfo.Signers)...) - docTx.Addrs = append(docTx.Addrs, removeDuplicatesFromSlice(msgDocInfo.Addrs)...) + docTx.Signers = append(docTx.Signers, utils.RemoveDuplicatesFromSlice(msgDocInfo.Signers)...) + docTx.Addrs = append(docTx.Addrs, utils.RemoveDuplicatesFromSlice(msgDocInfo.Addrs)...) docTxMsgs = append(docTxMsgs, msgDocInfo.DocTxMsg) docTx.Types = append(docTx.Types, msgDocInfo.DocTxMsg.Type) @@ -231,15 +256,15 @@ func ParseTx(txBytes types.Tx, block *types.Block, client *pool.Client) (model.T if val, ok := eventsIndexMap[i]; ok { docMsg.Events = val.Events } - docMsg.Addrs = removeDuplicatesFromSlice(msgDocInfo.Addrs) - docMsg.Signers = removeDuplicatesFromSlice(msgDocInfo.Signers) + docMsg.Addrs = utils.RemoveDuplicatesFromSlice(msgDocInfo.Addrs) + docMsg.Signers = utils.RemoveDuplicatesFromSlice(msgDocInfo.Signers) docMsg.Denoms = msgDocInfo.Denoms docMsgs = append(docMsgs, docMsg) } - docTx.Addrs = removeDuplicatesFromSlice(docTx.Addrs) - docTx.Types = removeDuplicatesFromSlice(docTx.Types) - docTx.Signers = removeDuplicatesFromSlice(docTx.Signers) + docTx.Addrs = utils.RemoveDuplicatesFromSlice(docTx.Addrs) + docTx.Types = utils.RemoveDuplicatesFromSlice(docTx.Types) + docTx.Signers = utils.RemoveDuplicatesFromSlice(docTx.Signers) docTx.Msgs = docTxMsgs // don't save txs which have not parsed diff --git a/block/types.go b/block/types.go deleted file mode 100644 index 59586fd..0000000 --- a/block/types.go +++ /dev/null @@ -1,8 +0,0 @@ -package block - -import m "github.com/kaifei-bianjie/msg-parser/modules" - -type CustomMsgDocInfo struct { - m.MsgDocInfo - Denoms []string -} diff --git a/block/util.go b/block/util.go deleted file mode 100644 index 536060b..0000000 --- a/block/util.go +++ /dev/null @@ -1,32 +0,0 @@ -package block - -import "github.com/kaifei-bianjie/msg-parser/types" -import m "github.com/kaifei-bianjie/msg-parser/modules" - -func parseDenoms(coins []types.Coin) []string { - if len(coins) == 0 { - return nil - } - var denoms []string - for _, v := range coins { - denoms = append(denoms, v.Denom) - } - - return denoms -} - -// convert coins defined in modules to coins defined in types -func convertCoins(mCoins []m.Coin) types.Coins { - var coins types.Coins - if len(mCoins) == 0 { - return coins - } - for _, v := range mCoins { - coins = append(coins, types.Coin{ - Denom: v.Denom, - Amount: v.Amount, - }) - } - - return coins -} diff --git a/lib/msgparser/msgparser.go b/lib/msgparser/msgparser.go index ad3cd68..7d64761 100644 --- a/lib/msgparser/msgparser.go +++ b/lib/msgparser/msgparser.go @@ -1,11 +1,268 @@ package msgparser -import "github.com/kaifei-bianjie/msg-parser" +import ( + "github.com/irisnet/rainbow-sync/lib/logger" + "github.com/irisnet/rainbow-sync/utils" + "github.com/kaifei-bianjie/msg-parser" + . "github.com/kaifei-bianjie/msg-parser/modules" + "github.com/kaifei-bianjie/msg-parser/modules/bank" + "github.com/kaifei-bianjie/msg-parser/modules/coinswap" + "github.com/kaifei-bianjie/msg-parser/modules/distribution" + "github.com/kaifei-bianjie/msg-parser/modules/ibc" + "github.com/kaifei-bianjie/msg-parser/modules/staking" + "github.com/kaifei-bianjie/msg-parser/types" + "strings" +) + +type MsgParser interface { + HandleTxMsg(v types.SdkMsg) CustomMsgDocInfo +} var ( - MsgClient msg_parser.MsgClient + _client msg_parser.MsgClient ) +func NewMsgParser(router Router) MsgParser { + return &msgParser{ + rh: router, + } +} + +type msgParser struct { + rh Router +} + +func (parser *msgParser) HandleTxMsg(v types.SdkMsg) CustomMsgDocInfo { + handleFunc, err := parser.rh.GetRoute(v.Route()) + if err != nil { + logger.Error(err.Error(), + logger.String("route", v.Route()), + logger.String("type", v.Type())) + return CustomMsgDocInfo{} + } + return handleFunc(v) +} + func init() { - MsgClient = msg_parser.NewMsgClient() + _client = msg_parser.NewMsgClient() +} + +func handleBank(v types.SdkMsg) CustomMsgDocInfo { + var ( + msgDoc CustomMsgDocInfo + denoms []string + ) + bankDocInfo, _ := _client.Bank.HandleTxMsg(v) + msgDoc.MsgDocInfo = bankDocInfo + switch bankDocInfo.DocTxMsg.Type { + case MsgTypeSend: + doc := bankDocInfo.DocTxMsg.Msg.(*bank.DocMsgSend) + denoms = parseDenoms(doc.Amount) + case MsgTypeMultiSend: + doc := bankDocInfo.DocTxMsg.Msg.(*bank.DocMsgMultiSend) + if len(doc.Inputs) > 0 { + for _, v := range doc.Inputs { + denoms = append(denoms, parseDenoms(v.Coins)...) + } + } + if len(doc.Outputs) > 0 { + for _, v := range doc.Outputs { + denoms = append(denoms, parseDenoms(v.Coins)...) + } + } + } + msgDoc.Denoms = utils.RemoveDuplicatesFromSlice(denoms) + return msgDoc +} +func handleCrisis(v types.SdkMsg) CustomMsgDocInfo { + docInfo, _ := _client.Crisis.HandleTxMsg(v) + var msgDoc CustomMsgDocInfo + msgDoc.MsgDocInfo = docInfo + return msgDoc +} +func handleDistribution(v types.SdkMsg) CustomMsgDocInfo { + var ( + msgDoc CustomMsgDocInfo + denoms []string + ) + distrubutionDocInfo, _ := _client.Distribution.HandleTxMsg(v) + msgDoc.MsgDocInfo = distrubutionDocInfo + switch distrubutionDocInfo.DocTxMsg.Type { + case MsgTypeMsgFundCommunityPool: + doc := distrubutionDocInfo.DocTxMsg.Msg.(*distribution.DocTxMsgFundCommunityPool) + denoms = append(denoms, parseDenoms(doc.Amount)...) + case MsgTypeWithdrawDelegatorReward: + case MsgTypeMsgWithdrawValidatorCommission: + break + } + msgDoc.Denoms = utils.RemoveDuplicatesFromSlice(denoms) + return msgDoc +} +func handleSlashing(v types.SdkMsg) CustomMsgDocInfo { + docInfo, _ := _client.Slashing.HandleTxMsg(v) + var msgDoc CustomMsgDocInfo + msgDoc.MsgDocInfo = docInfo + return msgDoc +} +func handleStaking(v types.SdkMsg) CustomMsgDocInfo { + var ( + msgDoc CustomMsgDocInfo + denoms []string + ) + stakingDocInfo, _ := _client.Staking.HandleTxMsg(v) + msgDoc.MsgDocInfo = stakingDocInfo + switch stakingDocInfo.DocTxMsg.Type { + case MsgTypeStakeDelegate: + doc := stakingDocInfo.DocTxMsg.Msg.(*staking.DocTxMsgDelegate) + denoms = append(denoms, parseDenoms(convertCoins([]Coin{doc.Amount}))...) + case MsgTypeStakeBeginUnbonding: + doc := stakingDocInfo.DocTxMsg.Msg.(*staking.DocTxMsgBeginUnbonding) + denoms = append(denoms, parseDenoms([]types.Coin{doc.Amount})...) + case MsgTypeBeginRedelegate: + doc := stakingDocInfo.DocTxMsg.Msg.(*staking.DocTxMsgBeginRedelegate) + denoms = append(denoms, parseDenoms([]types.Coin{doc.Amount})...) + } + msgDoc.Denoms = utils.RemoveDuplicatesFromSlice(denoms) + return msgDoc +} +func handleEvidence(v types.SdkMsg) CustomMsgDocInfo { + docInfo, _ := _client.Evidence.HandleTxMsg(v) + var msgDoc CustomMsgDocInfo + msgDoc.MsgDocInfo = docInfo + return msgDoc +} +func handleGov(v types.SdkMsg) CustomMsgDocInfo { + docInfo, _ := _client.Gov.HandleTxMsg(v) + var msgDoc CustomMsgDocInfo + msgDoc.MsgDocInfo = docInfo + return msgDoc +} +func handleIbc(v types.SdkMsg) CustomMsgDocInfo { + var ( + msgDoc CustomMsgDocInfo + denoms []string + ) + ibcDocInfo, _ := _client.Ibc.HandleTxMsg(v) + msgDoc.MsgDocInfo = ibcDocInfo + switch ibcDocInfo.DocTxMsg.Type { + case MsgTypeIBCTransfer: + doc := ibcDocInfo.DocTxMsg.Msg.(*ibc.DocMsgTransfer) + denoms = append(denoms, doc.Token.Denom) + case MsgTypeTimeout: + doc := ibcDocInfo.DocTxMsg.Msg.(*ibc.DocMsgTimeout) + denom := doc.Packet.Data.Denom + if strings.Contains(denom, "/") { + denom = ibc.GetIbcPacketDenom(doc.Packet, doc.Packet.Data.Denom) + } + denoms = append(denoms, denom) + case MsgTypeRecvPacket: + doc := ibcDocInfo.DocTxMsg.Msg.(*ibc.DocMsgRecvPacket) + denom := doc.Packet.Data.Denom + if strings.Contains(denom, "/") { + denom = ibc.GetIbcPacketDenom(doc.Packet, doc.Packet.Data.Denom) + } + denoms = append(denoms, denom) + } + msgDoc.Denoms = utils.RemoveDuplicatesFromSlice(denoms) + return msgDoc +} + +func handleNft(v types.SdkMsg) CustomMsgDocInfo { + docInfo, _ := _client.Nft.HandleTxMsg(v) + var msgDoc CustomMsgDocInfo + msgDoc.MsgDocInfo = docInfo + return msgDoc +} + +func handleService(v types.SdkMsg) CustomMsgDocInfo { + docInfo, _ := _client.Service.HandleTxMsg(v) + var msgDoc CustomMsgDocInfo + msgDoc.MsgDocInfo = docInfo + return msgDoc +} + +func handleToken(v types.SdkMsg) CustomMsgDocInfo { + docInfo, _ := _client.Token.HandleTxMsg(v) + var msgDoc CustomMsgDocInfo + msgDoc.MsgDocInfo = docInfo + return msgDoc +} + +func handleOracle(v types.SdkMsg) CustomMsgDocInfo { + docInfo, _ := _client.Oracle.HandleTxMsg(v) + var msgDoc CustomMsgDocInfo + msgDoc.MsgDocInfo = docInfo + return msgDoc +} + +func handleRecord(v types.SdkMsg) CustomMsgDocInfo { + docInfo, _ := _client.Record.HandleTxMsg(v) + var msgDoc CustomMsgDocInfo + msgDoc.MsgDocInfo = docInfo + return msgDoc +} + +func handleRandom(v types.SdkMsg) CustomMsgDocInfo { + docInfo, _ := _client.Random.HandleTxMsg(v) + var msgDoc CustomMsgDocInfo + msgDoc.MsgDocInfo = docInfo + return msgDoc +} + +func handleHtlc(v types.SdkMsg) CustomMsgDocInfo { + docInfo, _ := _client.Htlc.HandleTxMsg(v) + var msgDoc CustomMsgDocInfo + msgDoc.MsgDocInfo = docInfo + return msgDoc +} + +func handleCoinswap(v types.SdkMsg) CustomMsgDocInfo { + var ( + msgDoc CustomMsgDocInfo + denoms []string + ) + coinswapDocInfo, _ := _client.Coinswap.HandleTxMsg(v) + msgDoc.MsgDocInfo = coinswapDocInfo + switch coinswapDocInfo.DocTxMsg.Type { + case MsgTypeSwapOrder: + doc := coinswapDocInfo.DocTxMsg.Msg.(*coinswap.DocTxMsgSwapOrder) + denoms = append(denoms, parseDenoms([]types.Coin{doc.Input.Coin})...) + denoms = append(denoms, parseDenoms([]types.Coin{doc.Output.Coin})...) + case MsgTypeAddLiquidity: + doc := coinswapDocInfo.DocTxMsg.Msg.(*coinswap.DocTxMsgAddLiquidity) + denoms = append(denoms, parseDenoms([]types.Coin{doc.MaxToken})...) + case MsgTypeRemoveLiquidity: + doc := coinswapDocInfo.DocTxMsg.Msg.(*coinswap.DocTxMsgRemoveLiquidity) + denoms = append(denoms, parseDenoms([]types.Coin{doc.WithdrawLiquidity})...) + } + msgDoc.Denoms = utils.RemoveDuplicatesFromSlice(denoms) + return msgDoc +} + +func parseDenoms(coins []types.Coin) []string { + if len(coins) == 0 { + return nil + } + var denoms []string + for _, v := range coins { + denoms = append(denoms, v.Denom) + } + + return denoms +} + +// convert coins defined in modules to coins defined in types +func convertCoins(mCoins []Coin) types.Coins { + var coins types.Coins + if len(mCoins) == 0 { + return coins + } + for _, v := range mCoins { + coins = append(coins, types.Coin{ + Denom: v.Denom, + Amount: v.Amount, + }) + } + + return coins } diff --git a/lib/msgparser/router.go b/lib/msgparser/router.go new file mode 100644 index 0000000..c6feb12 --- /dev/null +++ b/lib/msgparser/router.go @@ -0,0 +1,94 @@ +package msgparser + +import ( + "fmt" + "github.com/irisnet/rainbow-sync/lib/logger" + "github.com/kaifei-bianjie/msg-parser/types" + "regexp" +) + +var ( + // IsAlphaNumeric defines a regular expression for matching against alpha-numeric + // values. + IsAlphaNumeric = regexp.MustCompile(`^[a-zA-Z0-9]+$`).MatchString +) + +type Handler func(v types.SdkMsg) CustomMsgDocInfo + +var _ Router = (*router)(nil) + +// Router implements a msg-parser Handler router. +type Router interface { + AddRoute(r string, h Handler) (rtr Router) + HasRoute(r string) bool + GetRoute(path string) (h Handler, err error) + GetRoutesLen() int +} + +type router struct { + routes map[string]Handler +} + +// NewRouter creates a new Router interface instance +func NewRouter() Router { + return &router{ + routes: make(map[string]Handler), + } +} + +// AddRoute adds a governance handler for a given path. It returns the Router +// so AddRoute calls can be linked. It will panic if the router is sealed. +func (rtr *router) AddRoute(path string, h Handler) Router { + + if !IsAlphaNumeric(path) { + logger.Warn("addroute failed for route expressions can only contain alphanumeric characters") + return rtr + } + if rtr.HasRoute(path) { + logger.Warn(fmt.Sprintf("route %s has already been initialized", path)) + return rtr + } + + rtr.routes[path] = h + return rtr +} + +// HasRoute returns true if the router has a path registered or false otherwise. +func (rtr *router) HasRoute(path string) bool { + return rtr.routes[path] != nil +} + +// GetRoute returns a Handler for a given path. +func (rtr *router) GetRoute(path string) (Handler, error) { + if !rtr.HasRoute(path) { + return nil, fmt.Errorf("route \"%s\" does not exist", path) + } + + return rtr.routes[path], nil +} + +func (rtr *router) GetRoutesLen() int { + return len(rtr.routes) +} + +func RegisteRouter() Router { + msgRoute := NewRouter() + msgRoute.AddRoute(BankRouteKey, handleBank). + AddRoute(StakingRouteKey, handleStaking). + AddRoute(DistributionRouteKey, handleDistribution). + AddRoute(CrisisRouteKey, handleCrisis). + AddRoute(EvidenceRouteKey, handleEvidence). + AddRoute(GovRouteKey, handleGov). + AddRoute(SlashingRouteKey, handleSlashing). + AddRoute(IbcRouteKey, handleIbc). + AddRoute(IbcTransferRouteKey, handleIbc). + AddRoute(NftRouteKey, handleNft). + AddRoute(ServiceRouteKey, handleService). + AddRoute(TokenRouteKey, handleToken). + AddRoute(HtlcRouteKey, handleHtlc). + AddRoute(CoinswapRouteKey, handleCoinswap). + AddRoute(RandomRouteKey, handleRandom). + AddRoute(OracleRouteKey, handleOracle). + AddRoute(RecordRouteKey, handleRecord) + return msgRoute +} diff --git a/lib/msgparser/types.go b/lib/msgparser/types.go new file mode 100644 index 0000000..35ddb55 --- /dev/null +++ b/lib/msgparser/types.go @@ -0,0 +1,48 @@ +package msgparser + +import m "github.com/kaifei-bianjie/msg-parser/modules" + +const ( + BankRouteKey string = "bank" + StakingRouteKey string = "staking" + DistributionRouteKey string = "distribution" + CrisisRouteKey string = "crisis" + EvidenceRouteKey string = "evidence" + GovRouteKey string = "gov" + SlashingRouteKey string = "slashing" + IbcRouteKey string = "ibc" + IbcTransferRouteKey string = "transfer" + NftRouteKey string = "nft" + ServiceRouteKey string = "service" + TokenRouteKey string = "token" + HtlcRouteKey string = "htlc" + CoinswapRouteKey string = "coinswap" + RandomRouteKey string = "random" + OracleRouteKey string = "oracle" + RecordRouteKey string = "record" +) + +var RouteHandlerMap = map[string]Handler{ + BankRouteKey: handleBank, + StakingRouteKey: handleStaking, + DistributionRouteKey: handleDistribution, + CrisisRouteKey: handleCrisis, + EvidenceRouteKey: handleEvidence, + GovRouteKey: handleGov, + SlashingRouteKey: handleSlashing, + IbcRouteKey: handleIbc, + IbcTransferRouteKey: handleIbc, + NftRouteKey: handleNft, + ServiceRouteKey: handleService, + TokenRouteKey: handleToken, + HtlcRouteKey: handleHtlc, + CoinswapRouteKey: handleCoinswap, + RandomRouteKey: handleRandom, + OracleRouteKey: handleOracle, + RecordRouteKey: handleRecord, +} + +type CustomMsgDocInfo struct { + m.MsgDocInfo + Denoms []string +} diff --git a/utils/utils.go b/utils/utils.go index 5e43418..af7ce00 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -74,3 +74,17 @@ func MarshalJsonIgnoreErr(v interface{}) string { func UnMarshalJsonIgnoreErr(data string, v interface{}) { json.Unmarshal([]byte(data), &v) } + +func RemoveDuplicatesFromSlice(data []string) (result []string) { + tempSet := make(map[string]string, len(data)) + for _, val := range data { + if _, ok := tempSet[val]; ok || val == "" { + continue + } + tempSet[val] = val + } + for one := range tempSet { + result = append(result, one) + } + return +} From 874814e3732a761a7f0eee059e1014631337db7f Mon Sep 17 00:00:00 2001 From: huangweichang Date: Tue, 6 Jul 2021 17:40:42 +0800 Subject: [PATCH 2/3] add sync task woking status metric and fix nodetimeGap --- model/sync_task.go | 30 +++++++++++++++++++++++++++++- monitor/monitor.go | 34 ++++++++++++++++++++++++++++++---- 2 files changed, 59 insertions(+), 5 deletions(-) diff --git a/model/sync_task.go b/model/sync_task.go index 4bf6391..cfc8d2a 100644 --- a/model/sync_task.go +++ b/model/sync_task.go @@ -1,9 +1,9 @@ package model import ( + "github.com/irisnet/rainbow-sync/db" "gopkg.in/mgo.v2" "gopkg.in/mgo.v2/bson" - "github.com/irisnet/rainbow-sync/db" "time" ) @@ -223,3 +223,31 @@ func (d SyncTask) UpdateLastUpdateTime(task SyncTask) error { return db.ExecCollection(d.Name(), fn) } + +// query valid follow way +func (d SyncTask) QueryValidFollowTasks() (bool, error) { + var syncTasks []SyncTask + q := bson.M{} + + q["status"] = db.SyncTaskStatusUnderway + + q["end_height"] = bson.M{ + "$eq": 0, + } + + fn := func(c *mgo.Collection) error { + return c.Find(q).All(&syncTasks) + } + + err := db.ExecCollection(d.Name(), fn) + + if err != nil { + return false, err + } + + if len(syncTasks) == 1 { + return true, nil + } + + return false, nil +} diff --git a/monitor/monitor.go b/monitor/monitor.go index 768ccd8..f46e5e1 100644 --- a/monitor/monitor.go +++ b/monitor/monitor.go @@ -17,6 +17,9 @@ const ( NodeStatusNotReachable = 0 NodeStatusSyncing = 1 NodeStatusCatchingUp = 2 + + SyncTaskFollowing = 1 + SyncTaskCatchingUp = 0 ) type clientNode struct { @@ -24,6 +27,7 @@ type clientNode struct { nodeHeight metrics.Guage dbHeight metrics.Guage nodeTimeGap metrics.Guage + syncWorkWay metrics.Guage } func NewMetricNode(server metrics.Monitor) clientNode { @@ -55,22 +59,31 @@ func NewMetricNode(server metrics.Monitor) clientNode { "the seconds gap between node block time with sync db block time", nil, ) - server.RegisterMetrics(nodeHeightMetric, dbHeightMetric, nodeStatusMetric, nodeTimeGapMetric) + syncWorkwayMetric := metrics.NewGuage( + "sync", + "", + "task_working_status", + "sync task working status(0:CatchingUp 1:Following)", + nil, + ) + server.RegisterMetrics(nodeHeightMetric, dbHeightMetric, nodeStatusMetric, nodeTimeGapMetric, syncWorkwayMetric) nodeHeight, _ := metrics.CovertGuage(nodeHeightMetric) dbHeight, _ := metrics.CovertGuage(dbHeightMetric) nodeStatus, _ := metrics.CovertGuage(nodeStatusMetric) nodeTimeGap, _ := metrics.CovertGuage(nodeTimeGapMetric) + syncWorkway, _ := metrics.CovertGuage(syncWorkwayMetric) return clientNode{ nodeStatus: nodeStatus, nodeHeight: nodeHeight, dbHeight: dbHeight, nodeTimeGap: nodeTimeGap, + syncWorkWay: syncWorkway, } } func (node *clientNode) Report() { for { - t := time.NewTimer(time.Duration(5) * time.Second) + t := time.NewTimer(time.Duration(10) * time.Second) select { case <-t.C: node.nodeStatusReport() @@ -106,8 +119,21 @@ func (node *clientNode) nodeStatusReport() { node.nodeStatus.Set(float64(NodeStatusSyncing)) } node.nodeHeight.Set(float64(status.SyncInfo.LatestBlockHeight)) - timeGap := status.SyncInfo.LatestBlockTime.Unix() - block.CreateTime - node.nodeTimeGap.Set(float64(timeGap)) + follow, err := new(model.SyncTask).QueryValidFollowTasks() + if err != nil { + logger.Error("query valid follow task exception", logger.String("error", err.Error())) + return + } + if follow && block.CreateTime > 0 { + timeGap := time.Now().Unix() - block.CreateTime + node.nodeTimeGap.Set(float64(timeGap)) + } + + if !follow { + node.syncWorkWay.Set(float64(SyncTaskFollowing)) + } else { + node.syncWorkWay.Set(float64(SyncTaskCatchingUp)) + } return } From 84d6aea1239f1eecd4fa0f27d64a82280a2132bd Mon Sep 17 00:00:00 2001 From: huangweichang Date: Tue, 6 Jul 2021 17:43:27 +0800 Subject: [PATCH 3/3] fix node_seconds_gap metric help content --- monitor/monitor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/monitor/monitor.go b/monitor/monitor.go index f46e5e1..72a10d0 100644 --- a/monitor/monitor.go +++ b/monitor/monitor.go @@ -56,7 +56,7 @@ func NewMetricNode(server metrics.Monitor) clientNode { "sync", "status", "node_seconds_gap", - "the seconds gap between node block time with sync db block time", + "the seconds gap between running env current time with sync db block time", nil, ) syncWorkwayMetric := metrics.NewGuage(