From d41127560fd29f7f393e4e1921cd30c6f677ea66 Mon Sep 17 00:00:00 2001 From: fudongbai <296179868@qq.com> Date: Thu, 30 May 2019 15:25:18 +0800 Subject: [PATCH] monitor: add more metrics about p2p --- consensus/metrics.go | 30 ++++++++++++++++++++++++++++++ consensus/reactor.go | 8 +++++--- mempool/mempool.go | 15 +++++++-------- mempool/metrics.go | 18 ++++++++++++++++++ mempool/reactor.go | 6 +++++- p2p/metrics.go | 9 --------- types/signed_msg_type.go | 13 +++++++++++++ 7 files changed, 78 insertions(+), 21 deletions(-) diff --git a/consensus/metrics.go b/consensus/metrics.go index b5207742c..0bd882f74 100644 --- a/consensus/metrics.go +++ b/consensus/metrics.go @@ -51,6 +51,15 @@ type Metrics struct { // Number of blockparts transmitted by peer. BlockParts metrics.Counter + + // Number of proposals transmitted by peer. + Proposals metrics.Counter + + // Number of votes transmitted by peer. + Votes metrics.Counter + + // Number of voteSetBits transmitted by peer. + VoteSetBits metrics.Counter } // PrometheusMetrics returns Metrics build using Prometheus client library. @@ -154,7 +163,25 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics { Subsystem: MetricsSubsystem, Name: "block_parts", Help: "Number of blockparts transmitted by peer.", + }, append(labels, "peer_id", "index")).With(labelsAndValues...), + Proposals: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "proposals", + Help: "Number of proposals transmitted by peer.", }, append(labels, "peer_id")).With(labelsAndValues...), + Votes: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "votes", + Help: "Number of votes transmitted by peer.", + }, append(labels, "peer_id", "type")).With(labelsAndValues...), + VoteSetBits: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "vote_set_bits", + Help: "Number of voteSetBits transmitted by peer.", + }, append(labels, "peer_id", "type")).With(labelsAndValues...), } } @@ -180,5 +207,8 @@ func NopMetrics() *Metrics { CommittedHeight: discard.NewGauge(), FastSyncing: discard.NewGauge(), BlockParts: discard.NewCounter(), + Proposals: discard.NewCounter(), + Votes: discard.NewCounter(), + VoteSetBits: discard.NewCounter(), } } diff --git a/consensus/reactor.go b/consensus/reactor.go index 52cdfaf21..035c8f413 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -3,6 +3,7 @@ package consensus import ( "fmt" "reflect" + "strconv" "sync" "time" @@ -285,12 +286,13 @@ func (conR *ConsensusReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) switch msg := msg.(type) { case *ProposalMessage: ps.SetHasProposal(msg.Proposal) + conR.metrics.Proposals.With("peer_id", string(src.ID())).Add(1) conR.conS.peerMsgQueue <- msgInfo{msg, src.ID()} case *ProposalPOLMessage: ps.ApplyProposalPOLMessage(msg) case *BlockPartMessage: ps.SetHasProposalBlockPart(msg.Height, msg.Round, msg.Part.Index) - conR.metrics.BlockParts.With("peer_id", string(src.ID())).Add(1) + conR.metrics.BlockParts.With("peer_id", string(src.ID())).With("index", strconv.Itoa(msg.Part.Index)).Add(1) conR.conS.peerMsgQueue <- msgInfo{msg, src.ID()} default: conR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg))) @@ -310,7 +312,7 @@ func (conR *ConsensusReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) ps.EnsureVoteBitArrays(height, valSize) ps.EnsureVoteBitArrays(height-1, lastCommitSize) ps.SetHasVote(msg.Vote) - + conR.metrics.Votes.With("peer_id", string(src.ID())).With("type", msg.Vote.Type.String()).Add(1) cs.peerMsgQueue <- msgInfo{msg, src.ID()} default: @@ -329,7 +331,7 @@ func (conR *ConsensusReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) cs.mtx.Lock() height, votes := cs.Height, cs.Votes cs.mtx.Unlock() - + conR.metrics.VoteSetBits.With("peer_id", string(src.ID())).With("type", msg.Type.String()).Add(1) if height == msg.Height { var ourVotes *cmn.BitArray switch msg.Type { diff --git a/mempool/mempool.go b/mempool/mempool.go index af035c5fb..03f086c6f 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -161,13 +161,13 @@ func txKey(tx types.Tx) [sha256.Size]byte { type Mempool struct { config *cfg.MempoolConfig - proxyLowMtx sync.Mutex - proxyNextMtx sync.Mutex - proxyBlockingMtx sync.Mutex - proxyAppConn proxy.AppConnMempool - txs *clist.CList // concurrent linked-list of good txs - preCheck PreCheckFunc - postCheck PostCheckFunc + proxyLowMtx sync.Mutex + proxyNextMtx sync.Mutex + proxyBlockingMtx sync.Mutex + proxyAppConn proxy.AppConnMempool + txs *clist.CList // concurrent linked-list of good txs + preCheck PreCheckFunc + postCheck PostCheckFunc // Track whether we're rechecking txs. // These are not protected by a mutex and are expected to be mutated @@ -417,7 +417,6 @@ func (mem *Mempool) CheckTxWithInfo(tx types.Tx, cb func(*abci.Response), txInfo // but they can spam the same tx with little cost to them atm. } } - return ErrTxInCache } // END CACHE diff --git a/mempool/metrics.go b/mempool/metrics.go index 5e4eaf5ed..52c0d5a77 100644 --- a/mempool/metrics.go +++ b/mempool/metrics.go @@ -24,6 +24,10 @@ type Metrics struct { FailedTxs metrics.Counter // Number of times transactions are rechecked in the mempool. RecheckTimes metrics.Counter + // Number of tx transmitted by peer. + ReceivedTx metrics.Counter + // Number of duplicated tx transmitted by peer. + DuplicateTx metrics.Counter } // PrometheusMetrics returns Metrics build using Prometheus client library. @@ -60,6 +64,18 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics { Name: "recheck_times", Help: "Number of times transactions are rechecked in the mempool.", }, labels).With(labelsAndValues...), + ReceivedTx: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "received_tx", + Help: "Number of tx transmitted by peer.", + }, append(labels, "peer_id")).With(labelsAndValues...), + DuplicateTx: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "duplicate_tx", + Help: "Number of duplicate tx transmitted by peer.", + }, append(labels, "peer_id")).With(labelsAndValues...), } } @@ -70,5 +86,7 @@ func NopMetrics() *Metrics { TxSizeBytes: discard.NewHistogram(), FailedTxs: discard.NewCounter(), RecheckTimes: discard.NewCounter(), + ReceivedTx: discard.NewCounter(), + DuplicateTx: discard.NewCounter(), } } diff --git a/mempool/reactor.go b/mempool/reactor.go index 20e18d8aa..bd170204b 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -23,7 +23,7 @@ const ( maxTxSize = maxMsgSize - 8 // account for amino overhead of TxMessage MempoolPacketChannelSize = 1024 * 200 // 200K messages can be queued - peerCatchupSleepIntervalMS = 100 // If peer is behind, sleep this amount + peerCatchupSleepIntervalMS = 100 // If peer is behind, sleep this amount // UnknownPeerID is the peer ID to use when running CheckTx when there is // no peer (e.g. RPC) @@ -193,9 +193,13 @@ func (memR *MempoolReactor) receiveImpl(chID byte, src p2p.Peer, msgBytes []byte switch msg := msg.(type) { case *TxMessage: + memR.Mempool.metrics.ReceivedTx.With("peer_id", string(src.ID())).Add(1) peerID := memR.ids.GetForPeer(src) err := memR.Mempool.CheckTxWithInfo(msg.Tx, nil, TxInfo{PeerID: peerID}) if err != nil { + if err == ErrTxInCache { + memR.Mempool.metrics.DuplicateTx.With("peer_id", string(src.ID())).Add(1) + } memR.Logger.Info("Could not check tx", "tx", TxID(msg.Tx), "err", err) } // broadcasting happens from go routines per peer diff --git a/p2p/metrics.go b/p2p/metrics.go index 3a6b9568a..1ef481ad1 100644 --- a/p2p/metrics.go +++ b/p2p/metrics.go @@ -23,8 +23,6 @@ type Metrics struct { PeerSendBytesTotal metrics.Counter // Pending bytes to be sent to a given peer. PeerPendingSendBytes metrics.Gauge - // Number of transactions submitted by each peer. - NumTxs metrics.Gauge } // PrometheusMetrics returns Metrics build using Prometheus client library. @@ -60,12 +58,6 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics { Name: "peer_pending_send_bytes", Help: "Number of pending bytes to be sent to a given peer.", }, append(labels, "peer_id")).With(labelsAndValues...), - NumTxs: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ - Namespace: namespace, - Subsystem: MetricsSubsystem, - Name: "num_txs", - Help: "Number of transactions submitted by each peer.", - }, append(labels, "peer_id")).With(labelsAndValues...), } } @@ -76,6 +68,5 @@ func NopMetrics() *Metrics { PeerReceiveBytesTotal: discard.NewCounter(), PeerSendBytesTotal: discard.NewCounter(), PeerPendingSendBytes: discard.NewGauge(), - NumTxs: discard.NewGauge(), } } diff --git a/types/signed_msg_type.go b/types/signed_msg_type.go index 6bd5f057e..b455fa431 100644 --- a/types/signed_msg_type.go +++ b/types/signed_msg_type.go @@ -21,3 +21,16 @@ func IsVoteTypeValid(t SignedMsgType) bool { return false } } + +func (t SignedMsgType) String() string { + switch t { + case PrevoteType: + return "prevote" + case PrecommitType: + return "precommit" + case ProposalType: + return "proposal" + default: + return "unknow" + } +}