Skip to content

Commit

Permalink
add more debug info in p2p (#521)
Browse files Browse the repository at this point in the history
  • Loading branch information
Xiang Fu authored and laizy committed Aug 9, 2018
1 parent 6178df7 commit 4e3ff98
Show file tree
Hide file tree
Showing 16 changed files with 240 additions and 267 deletions.
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ func initP2PNode(ctx *cli.Context, txpoolSvr *proc.TXPoolServer) (*p2pserver.P2P
txpoolSvr.RegisterActor(tc.NetActor, p2pPID)
hserver.SetNetServerPID(p2pPID)
p2p.WaitForPeersStart()
log.Infof("P2P node init success")
log.Infof("P2P init success")
return p2p, p2pPID, nil
}

Expand Down
8 changes: 4 additions & 4 deletions p2pserver/actor/req/txnpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func SetTxnPoolPid(txnPid *actor.PID) {
//add txn to txnpool
func AddTransaction(transaction *types.Transaction) {
if txnPoolPid == nil {
log.Error("net_server AddTransaction(): txnpool pid is nil")
log.Error("[p2p]net_server AddTransaction(): txnpool pid is nil")
return
}
txReq := &tc.TxReq{
Expand All @@ -55,13 +55,13 @@ func AddTransaction(transaction *types.Transaction) {
//get txn according to hash
func GetTransaction(hash common.Uint256) (*types.Transaction, error) {
if txnPoolPid == nil {
log.Error("net_server tx pool pid is nil")
return nil, errors.NewErr("net_server tx pool pid is nil")
log.Warn("[p2p]net_server tx pool pid is nil")
return nil, errors.NewErr("[p2p]net_server tx pool pid is nil")
}
future := txnPoolPid.RequestFuture(&tc.GetTxnReq{Hash: hash}, txnPoolReqTimeout)
result, err := future.Result()
if err != nil {
log.Errorf("net_server GetTransaction error: %v\n", err)
log.Warnf("[p2p]net_server GetTransaction error: %v\n", err)
return nil, err
}
return result.(tc.GetTxnRsp).Txn, nil
Expand Down
14 changes: 7 additions & 7 deletions p2pserver/actor/server/actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,15 @@ func (this *P2PActor) Start() (*actor.PID, error) {
func (this *P2PActor) Receive(ctx actor.Context) {
switch msg := ctx.Message().(type) {
case *actor.Restarting:
log.Info("p2p actor restarting")
log.Warn("[p2p]actor restarting")
case *actor.Stopping:
log.Info("p2p actor stopping")
log.Warn("[p2p]actor stopping")
case *actor.Stopped:
log.Info("p2p actor stopped")
log.Warn("[p2p]actor stopped")
case *actor.Started:
log.Info("p2p actor started")
log.Debug("[p2p]actor started")
case *actor.Restart:
log.Info("p2p actor restart")
log.Warn("[p2p]actor restart")
case *StopServerReq:
this.handleStopServerReq(ctx, msg)
case *GetPortReq:
Expand Down Expand Up @@ -97,7 +97,7 @@ func (this *P2PActor) Receive(ctx actor.Context) {
default:
err := this.server.Xmit(ctx.Message())
if nil != err {
log.Error("error xmit message ", err.Error(), reflect.TypeOf(ctx.Message()))
log.Warn("[p2p]error xmit message ", err.Error(), reflect.TypeOf(ctx.Message()))
}
}
}
Expand Down Expand Up @@ -240,6 +240,6 @@ func (this *P2PActor) handleTransmitConsensusMsgReq(ctx actor.Context, req *Tran
if peer != nil {
this.server.Send(peer, req.Msg, true)
} else {
log.Errorf("handleTransmit consensus msg failed:%d", req.Target)
log.Warnf("[p2p]can`t transmit consensus msg:no valid neighbor peer: %d\n", req.Target)
}
}
33 changes: 15 additions & 18 deletions p2pserver/block_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ func (this *BlockSyncMgr) checkTimeout() {
}
flightInfo.ResetStartTime()
flightInfo.MarkFailedNode()
log.Infof("checkTimeout sync headers:%d timeout after:%d s Times:%d", height, SYNC_HEADER_REQUEST_TIMEOUT, flightInfo.GetTotalFailedTimes())
log.Debugf("[p2p]checkTimeout sync headers:%d timeout after:%d s Times:%d", height, SYNC_HEADER_REQUEST_TIMEOUT, flightInfo.GetTotalFailedTimes())
reqNode := this.getNodeWithMinFailedTimes(flightInfo, curBlockHeight)
if reqNode == nil {
break
Expand All @@ -303,7 +303,7 @@ func (this *BlockSyncMgr) checkTimeout() {
msg := msgpack.NewHeadersReq(headerHash)
err := this.server.Send(reqNode, msg, false)
if err != nil {
log.Error("checkTimeout failed build a new headersReq")
log.Warn("[p2p]checkTimeout failed to send a new headersReq:s", err)
} else {
this.appendReqTime(reqNode.GetID())
}
Expand All @@ -317,7 +317,7 @@ func (this *BlockSyncMgr) checkTimeout() {
}
flightInfo.ResetStartTime()
flightInfo.MarkFailedNode()
log.Debugf("checkTimeout sync height:%d block:0x%x timeout after:%d s times:%d", flightInfo.Height, blockHash, SYNC_BLOCK_REQUEST_TIMEOUT, flightInfo.GetTotalFailedTimes())
log.Debugf("[p2p]checkTimeout sync height:%d block:0x%x timeout after:%d s times:%d", flightInfo.Height, blockHash, SYNC_BLOCK_REQUEST_TIMEOUT, flightInfo.GetTotalFailedTimes())
reqNode := this.getNodeWithMinFailedTimes(flightInfo, curBlockHeight)
if reqNode == nil {
break
Expand All @@ -327,15 +327,11 @@ func (this *BlockSyncMgr) checkTimeout() {
msg := msgpack.NewBlkDataReq(blockHash)
err := this.server.Send(reqNode, msg, false)
if err != nil {
log.Error("checkTimeout NewBlkDataReq error:", err)
log.Warnf("[p2p]checkTimeout reqNode ID:%d Send error:%s", reqNode.GetID(), err)
continue
} else {
this.appendReqTime(reqNode.GetID())
}

if err != nil {
log.Errorf("checkTimeout reqNode ID:%d Send error:%s", reqNode.GetID(), err)
continue
}
}
}
}
Expand Down Expand Up @@ -375,12 +371,12 @@ func (this *BlockSyncMgr) syncHeader() {
msg := msgpack.NewHeadersReq(headerHash)
err := this.server.Send(reqNode, msg, false)
if err != nil {
log.Error("syncHeader failed build a new headersReq")
log.Warn("[p2p]syncHeader failed to send a new headersReq")
} else {
this.appendReqTime(reqNode.GetID())
}

log.Infof("syncHeader request Height:%d", NextHeaderId)
log.Infof("Header sync request height:%d", NextHeaderId)
}

func (this *BlockSyncMgr) syncBlock() {
Expand Down Expand Up @@ -443,7 +439,7 @@ func (this *BlockSyncMgr) syncBlock() {
msg := msgpack.NewBlkDataReq(nextBlockHash)
err := this.server.Send(reqNode, msg, false)
if err != nil {
log.Errorf("syncBlock Height:%d ReqBlkData error:%s", nextBlockHeight, err)
log.Warnf("[p2p]syncBlock Height:%d ReqBlkData error:%s", nextBlockHeight, err)
return
} else {
this.appendReqTime(reqNode.GetID())
Expand All @@ -459,7 +455,7 @@ func (this *BlockSyncMgr) OnHeaderReceive(fromID uint64, headers []*types.Header
if len(headers) == 0 {
return
}
log.Infof("OnHeaderReceive Height:%d - %d", headers[0].Height, headers[len(headers)-1].Height)
log.Infof("Header receive height:%d - %d", headers[0].Height, headers[len(headers)-1].Height)
height := headers[0].Height
curHeaderHeight := this.ledger.GetCurrentHeaderHeight()

Expand All @@ -478,7 +474,7 @@ func (this *BlockSyncMgr) OnHeaderReceive(fromID uint64, headers []*types.Header
if n != nil && n.GetErrorRespCnt() >= SYNC_MAX_ERROR_RESP_TIMES {
this.delNode(fromID)
}
log.Errorf("OnHeaderReceive AddHeaders error:%s", err)
log.Warnf("[p2p]OnHeaderReceive AddHeaders error:%s", err)
return
}
this.syncHeader()
Expand All @@ -488,7 +484,7 @@ func (this *BlockSyncMgr) OnHeaderReceive(fromID uint64, headers []*types.Header
func (this *BlockSyncMgr) OnBlockReceive(fromID uint64, blockSize uint32, block *types.Block) {
height := block.Header.Height
blockHash := block.Hash()
log.Debugf("OnBlockReceive Height:%d", height)
log.Debugf("[p2p]OnBlockReceive Height:%d", height)
flightInfos := this.flightBlocks[blockHash]
for _, flightInfo := range flightInfos {
if flightInfo.GetNodeId() == fromID {
Expand Down Expand Up @@ -517,7 +513,7 @@ func (this *BlockSyncMgr) OnBlockReceive(fromID uint64, blockSize uint32, block

//OnAddNode to node list when a new node added
func (this *BlockSyncMgr) OnAddNode(nodeId uint64) {
log.Infof("OnAddNode:%d", nodeId)
log.Debugf("[p2p]OnAddNode:%d", nodeId)
this.lock.Lock()
defer this.lock.Unlock()
w := NewNodeWeight(nodeId)
Expand All @@ -539,6 +535,7 @@ func (this *BlockSyncMgr) delNode(nodeId uint64) {
if len(this.nodeWeights) == 0 {
log.Warnf("no sync nodes")
}
log.Infof("OnDelNode:%d", nodeId)
}

func (this *BlockSyncMgr) tryGetSyncHeaderLock() bool {
Expand Down Expand Up @@ -643,7 +640,7 @@ func (this *BlockSyncMgr) saveBlock() {
if n != nil && n.GetErrorRespCnt() >= SYNC_MAX_ERROR_RESP_TIMES {
this.delNode(fromID)
}
log.Warnf("saveBlock Height:%d AddBlock error:%s", nextBlockHeight, err)
log.Warnf("[p2p]saveBlock Height:%d AddBlock error:%s", nextBlockHeight, err)
reqNode := this.getNextNode(nextBlockHeight)
if reqNode == nil {
return
Expand All @@ -652,7 +649,7 @@ func (this *BlockSyncMgr) saveBlock() {
msg := msgpack.NewBlkDataReq(nextBlock.Hash())
err := this.server.Send(reqNode, msg, false)
if err != nil {
log.Error("syncBlock error:", err)
log.Warn("[p2p]require new block error:", err)
return
} else {
this.appendReqTime(reqNode.GetID())
Expand Down
12 changes: 10 additions & 2 deletions p2pserver/common/p2p_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package common

import (
"errors"
"strconv"
"strings"

"github.com/ontio/ontology/core/types"
Expand Down Expand Up @@ -153,7 +154,7 @@ type AppendBlock struct {
func ParseIPAddr(s string) (string, error) {
i := strings.Index(s, ":")
if i < 0 {
return s, errors.New("split ip address error")
return "", errors.New("[p2p]split ip address error")
}
return s[:i], nil
}
Expand All @@ -162,7 +163,14 @@ func ParseIPAddr(s string) (string, error) {
func ParseIPPort(s string) (string, error) {
i := strings.Index(s, ":")
if i < 0 {
return s, errors.New("split ip port error")
return "", errors.New("[p2p]split ip port error")
}
port, err := strconv.Atoi(s[i+1:])
if err != nil {
return "", errors.New("[p2p]parse port error")
}
if port <= 0 || port >= 65535 {
return "", errors.New("[p2p]port out of bound")
}
return s[i:], nil
}
11 changes: 6 additions & 5 deletions p2pserver/link/link.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (this *Link) Rx() {
for {
msg, payloadSize, err := types.ReadMessage(reader)
if err != nil {
log.Error("read connection error ", err)
log.Infof("[p2p]error read from %s :%s", this.GetAddr(), err.Error())
break
}

Expand All @@ -141,6 +141,7 @@ func (this *Link) Rx() {

//disconnectNotify push disconnect msg to channel
func (this *Link) disconnectNotify() {
log.Debugf("[p2p]call disconnectNotify for %s\n", this.GetAddr())
this.CloseConn()

msg, _ := types.MakeEmptyMessage(common.DISCONNECT_TYPE)
Expand All @@ -163,18 +164,18 @@ func (this *Link) CloseConn() {
func (this *Link) Tx(msg types.Message) error {
conn := this.conn
if conn == nil {
return errors.New("tx link invalid")
return errors.New("[p2p]tx link invalid")
}
buf := bytes.NewBuffer(nil)
err := types.WriteMessage(buf, msg)
if err != nil {
log.Error("error serialize messge ", err.Error())
log.Debugf("[p2p]error serialize messge ", err.Error())
return err
}

payload := buf.Bytes()
nByteCnt := len(payload)
log.Debugf("TX buf length: %d\n", nByteCnt)
log.Debugf("[p2p]TX buf length: %d\n", nByteCnt)

nCount := nByteCnt / common.PER_SEND_LEN
if nCount == 0 {
Expand All @@ -183,7 +184,7 @@ func (this *Link) Tx(msg types.Message) error {
conn.SetWriteDeadline(time.Now().Add(time.Duration(nCount*common.WRITE_DEADLINE) * time.Second))
_, err = conn.Write(payload)
if err != nil {
log.Error("error sending messge to peer node ", err.Error())
log.Infof("[p2p]error sending messge to %s :%s", this.GetAddr(), err.Error())
this.disconnectNotify()
return err
}
Expand Down
11 changes: 11 additions & 0 deletions p2pserver/message/msg_pack/msg_pack.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (

//Peer address package
func NewAddrs(nodeAddrs []msgCommon.PeerAddr) mt.Message {
log.Debug()
var addr mt.Addr
addr.NodeAddrs = nodeAddrs

Expand All @@ -40,6 +41,7 @@ func NewAddrs(nodeAddrs []msgCommon.PeerAddr) mt.Message {

//Peer address request package
func NewAddrReq() mt.Message {
log.Debug()
var msg mt.AddrReq
return &msg
}
Expand All @@ -55,6 +57,7 @@ func NewBlock(bk *ct.Block) mt.Message {

//blk hdr package
func NewHeaders(headers []*ct.Header) mt.Message {
log.Debug()
var blkHdr mt.BlkHeader
blkHdr.BlkHdr = headers

Expand All @@ -63,6 +66,7 @@ func NewHeaders(headers []*ct.Header) mt.Message {

//blk hdr req package
func NewHeadersReq(curHdrHash common.Uint256) mt.Message {
log.Debug()
var h mt.HeadersReq
h.Len = 1
h.HashEnd = curHdrHash
Expand All @@ -81,6 +85,7 @@ func NewConsensus(cp *mt.ConsensusPayload) mt.Message {

//InvPayload
func NewInvPayload(invType common.InventoryType, msg []common.Uint256) *mt.InvPayload {
log.Debug()
return &mt.InvPayload{
InvType: invType,
Blk: msg,
Expand All @@ -89,6 +94,7 @@ func NewInvPayload(invType common.InventoryType, msg []common.Uint256) *mt.InvPa

//Inv request package
func NewInv(invPayload *mt.InvPayload) mt.Message {
log.Debug()
var inv mt.Inv
inv.P.Blk = invPayload.Blk
inv.P.InvType = invPayload.InvType
Expand Down Expand Up @@ -134,6 +140,7 @@ func NewTxn(txn *ct.Transaction) mt.Message {

//version ack package
func NewVerAck(isConsensus bool) mt.Message {
log.Debug()
var verAck mt.VerACK
verAck.IsConsensus = isConsensus

Expand All @@ -142,6 +149,7 @@ func NewVerAck(isConsensus bool) mt.Message {

//Version package
func NewVersion(n p2pnet.P2P, isCons bool, height uint32) mt.Message {
log.Debug()
var version mt.Version
version.P = mt.VersionPayload{
Version: n.GetVersion(),
Expand Down Expand Up @@ -170,6 +178,7 @@ func NewVersion(n p2pnet.P2P, isCons bool, height uint32) mt.Message {

//transaction request package
func NewTxnDataReq(hash common.Uint256) mt.Message {
log.Debug()
var dataReq mt.DataReq
dataReq.DataType = common.TRANSACTION
dataReq.Hash = hash
Expand All @@ -179,6 +188,7 @@ func NewTxnDataReq(hash common.Uint256) mt.Message {

//block request package
func NewBlkDataReq(hash common.Uint256) mt.Message {
log.Debug()
var dataReq mt.DataReq
dataReq.DataType = common.BLOCK
dataReq.Hash = hash
Expand All @@ -188,6 +198,7 @@ func NewBlkDataReq(hash common.Uint256) mt.Message {

//consensus request package
func NewConsensusDataReq(hash common.Uint256) mt.Message {
log.Debug()
var dataReq mt.DataReq
dataReq.DataType = common.CONSENSUS
dataReq.Hash = hash
Expand Down
Loading

0 comments on commit 4e3ff98

Please sign in to comment.