Skip to content
This repository has been archived by the owner on Mar 11, 2024. It is now read-only.

Commit

Permalink
Merge pull request #69 from weichang-bianjie/dev
Browse files Browse the repository at this point in the history
R4R: raise err and add sync logic about error resolve
  • Loading branch information
weichang-x authored Feb 8, 2021
2 parents bd0c26d + 1342191 commit 2ecaab6
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 81 deletions.
3 changes: 0 additions & 3 deletions block/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,6 @@ var (
Bech32PrefixConsAddr = conf.SvrConf.Bech32ChainPrefix + PrefixConsensus + PrefixAddress
// Bech32PrefixConsPub defines the Bech32 prefix of a consensus node public key
Bech32PrefixConsPub = conf.SvrConf.Bech32ChainPrefix + PrefixConsensus + PrefixPublic

TxStatusSuccess = "success"
TxStatusFail = "fail"
)

func init() {
Expand Down
68 changes: 29 additions & 39 deletions block/parse_tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,34 +80,14 @@ func SaveDocsWithTxn(blockDoc *model.Block, txs []*model.Tx, txMsgs []model.TxMs
return nil
}

func ParseBlock(b int64, client *pool.Client) (resBlock *model.Block, resTxs []*model.Tx, resTxMsgs []model.TxMsg, resErr error) {
func ParseBlock(b int64, client *pool.Client) (*model.Block, []*model.Tx, []model.TxMsg, error) {

defer func() {
if err := recover(); err != nil {
logger.Error("parse block fail", logger.Int64("height", b),
logger.Any("err", err))
resErr = fmt.Errorf("%v", err)
}
}()

resBlock = &model.Block{
Height: b,
CreateTime: time.Now().Unix(),
}

txs, msgs, err := ParseTxs(b, client)
if err != nil {
resErr = err
return
}

resTxs = txs
resTxMsgs = msgs

return
}

func ParseTxs(b int64, client *pool.Client) ([]*model.Tx, []model.TxMsg, error) {
ctx := context.Background()
resblock, err := client.Block(ctx, &b)
if err != nil {
Expand All @@ -118,40 +98,49 @@ func ParseTxs(b int64, client *pool.Client) ([]*model.Tx, []model.TxMsg, error)
resblock, err2 = client2.Block(ctx, &b)
client2.Release()
if err2 != nil {
return nil, nil, err2
return nil, nil, nil, utils.ConvertErr(b, "", "ParseBlock", err2)
}
}
blockDoc := model.Block{
Height: b,
CreateTime: time.Now().Unix(),
}
txs := make([]*model.Tx, 0, len(resblock.Block.Txs))
var docMsgs []model.TxMsg
for _, tx := range resblock.Block.Txs {
tx, msgs := ParseTx(tx, resblock.Block, client)
tx, msgs, err := ParseTx(tx, resblock.Block, client)
if err != nil {
return &blockDoc, txs, docMsgs, err
}
if tx.Height > 0 {
txs = append(txs, &tx)
docMsgs = append(docMsgs, msgs...)
}
}
return txs, docMsgs, nil
return &blockDoc, txs, docMsgs, nil
}

// parse iris tx from iris block result tx
func ParseTx(txBytes types.Tx, block *types.Block, client *pool.Client) (model.Tx, []model.TxMsg) {
func ParseTx(txBytes types.Tx, block *types.Block, client *pool.Client) (model.Tx, []model.TxMsg, error) {

var (
docMsgs []model.TxMsg
docTxMsgs []msgsdktypes.TxMsg
docTx model.Tx
actualFee msgsdktypes.Coin
)
height := block.Height
txHash := utils.BuildHex(txBytes.Hash())
authTx, err := codec.GetSigningTx(txBytes)
if err != nil {
logger.Error("TxDecoder have error", logger.String("err", err.Error()),
logger.Warn(err.Error(),
logger.String("errTag", "TxDecoder"),
logger.String("txhash", txHash),
logger.Int64("height", block.Height))
return docTx, docMsgs
return docTx, docMsgs, nil
}
fee := msgsdktypes.BuildFee(authTx.GetFee(), authTx.GetGas())
memo := authTx.GetMemo()
height := block.Height
txHash := utils.BuildHex(txBytes.Hash())
ctx := context.Background()
res, err := client.Tx(ctx, txBytes.Hash(), false)
if err != nil {
Expand All @@ -161,10 +150,7 @@ func ParseTx(txBytes types.Tx, block *types.Block, client *pool.Client) (model.T
res, err1 = client2.Tx(ctx, txBytes.Hash(), false)
client2.Release()
if err1 != nil {
logger.Error("get txResult err",
logger.String("txHash", txHash),
logger.String("err", err1.Error()))
return docTx, docMsgs
return docTx, docMsgs, utils.ConvertErr(block.Height, txHash, "TxResult", err1)
}
}

Expand All @@ -181,9 +167,9 @@ func ParseTx(txBytes types.Tx, block *types.Block, client *pool.Client) (model.T
Memo: memo,
TxIndex: res.Index,
}
docTx.Status = TxStatusSuccess
docTx.Status = utils.TxStatusSuccess
if res.TxResult.Code != 0 {
docTx.Status = TxStatusFail
docTx.Status = utils.TxStatusFail
docTx.Log = res.TxResult.Log

}
Expand All @@ -195,7 +181,7 @@ func ParseTx(txBytes types.Tx, block *types.Block, client *pool.Client) (model.T

msgs := authTx.GetMsgs()
if len(msgs) == 0 {
return docTx, docMsgs
return docTx, docMsgs, nil
}
for i, v := range msgs {
msgDocInfo := HandleTxMsg(v)
Expand Down Expand Up @@ -237,15 +223,19 @@ func ParseTx(txBytes types.Tx, block *types.Block, client *pool.Client) (model.T
docTx.Msgs = docTxMsgs

// don't save txs which have not parsed
if docTx.TxHash == "" {
return model.Tx{}, docMsgs
if len(docTx.Addrs) == 0 {
logger.Warn(utils.NoSupportMsgTypeTag,
logger.String("errTag", "TxMsg"),
logger.String("txhash", txHash),
logger.Int64("height", height))
return docTx, docMsgs, nil
}

for i, _ := range docMsgs {
docMsgs[i].TxAddrs = docTx.Addrs
docMsgs[i].TxSigners = docTx.Signers
}
return docTx, docMsgs
return docTx, docMsgs, nil

}

Expand Down
8 changes: 5 additions & 3 deletions block/parse_tx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,20 @@ func TestIris_Block_ParseIrisTx(t *testing.T) {
{
name: "test parse iris tx",
args: args{
b: 4435,
b: 559216,
client: client,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
res, msg, err := ParseTxs(tt.args.b, tt.args.client)
block, res, msg, err := ParseBlock(tt.args.b, tt.args.client)
if err != nil {
t.Fatal(err)
}
resBytes, _ := json.Marshal(res)
resBytes, _ := json.Marshal(block)
t.Log(string(resBytes))
resBytes, _ = json.Marshal(res)
t.Log(string(resBytes))
resBytes, _ = json.Marshal(msg)
t.Log(string(resBytes))
Expand Down
34 changes: 0 additions & 34 deletions script/mongodb.js

This file was deleted.

17 changes: 15 additions & 2 deletions task/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"gopkg.in/mgo.v2"
"gopkg.in/mgo.v2/bson"
"os"
"strings"
"time"
)

Expand Down Expand Up @@ -171,8 +172,13 @@ func (s *TaskIrisService) executeTask(blockNumPerWorkerHandle, maxWorkerSleepTim
// parse data from block
blockDoc, txDocs, txMsgs, err := block.ParseBlock(inProcessBlock, client)
if err != nil {
logger.Error("Parse block fail", logger.Int64("block", inProcessBlock),
logger.Error("Parse block fail",
logger.Int64("height", inProcessBlock),
logger.String("errTag", utils.GetErrTag(err)),
logger.String("err", err.Error()))
//continue to assert task is valid
blockChainLatestHeight, isValid = assertTaskValid(task, blockNumPerWorkerHandle)
continue
}

// check task owner
Expand All @@ -192,7 +198,14 @@ func (s *TaskIrisService) executeTask(blockNumPerWorkerHandle, maxWorkerSleepTim

err := block.SaveDocsWithTxn(blockDoc, txDocs, txMsgs, taskDoc)
if err != nil {
logger.Error("save docs fail", logger.String("err", err.Error()))
if !strings.Contains(err.Error(), utils.ErrDbNotFindTransaction) {
logger.Error("save docs fail",
logger.Int64("height", inProcessBlock),
logger.String("err", err.Error()))
//continue to assert task is valid
blockChainLatestHeight, isValid = assertTaskValid(task, blockNumPerWorkerHandle)
continue
}
} else {
task.CurrentHeight = inProcessBlock
}
Expand Down
10 changes: 10 additions & 0 deletions utils/constant.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package utils

const (
TxStatusSuccess = "success"
TxStatusFail = "fail"
NoSupportMsgTypeTag = "no support msg parse"

//cannot find transaction 601bf70ccdee4dde1c8be0d2_f018677a in queue for document {sync_task ObjectIdHex(\"601bdb0ccdee4dd7c214d167\")}
ErrDbNotFindTransaction = "cannot find transaction"
)
12 changes: 12 additions & 0 deletions utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,18 @@ func BuildHex(bytes []byte) string {
return strings.ToUpper(hex.EncodeToString(bytes))
}

func ConvertErr(height int64, txHash, errTag string, err error) error {
return fmt.Errorf("%v-%v-%v-%v", err.Error(), errTag, height, txHash)
}

func GetErrTag(err error) string {
slice := strings.Split(err.Error(), "-")
if len(slice) == 4 {
return slice[2]
}
return ""
}

func Min(x, y int64) int64 {
if x < y {
return x
Expand Down

0 comments on commit 2ecaab6

Please sign in to comment.