Skip to content
This repository has been archived by the owner on Oct 15, 2024. It is now read-only.

Commit

Permalink
Merge pull request #96 from binance-chain/metrics
Browse files Browse the repository at this point in the history
[R4R]monitor: add more metrics about p2p
  • Loading branch information
unclezoro authored Jul 18, 2019
2 parents 9625a44 + d411275 commit 93865b9
Show file tree
Hide file tree
Showing 7 changed files with 78 additions and 21 deletions.
30 changes: 30 additions & 0 deletions consensus/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,15 @@ type Metrics struct {

// Number of blockparts transmitted by peer.
BlockParts metrics.Counter

// Number of proposals transmitted by peer.
Proposals metrics.Counter

// Number of votes transmitted by peer.
Votes metrics.Counter

// Number of voteSetBits transmitted by peer.
VoteSetBits metrics.Counter
}

// PrometheusMetrics returns Metrics build using Prometheus client library.
Expand Down Expand Up @@ -154,7 +163,25 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics {
Subsystem: MetricsSubsystem,
Name: "block_parts",
Help: "Number of blockparts transmitted by peer.",
}, append(labels, "peer_id", "index")).With(labelsAndValues...),
Proposals: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "proposals",
Help: "Number of proposals transmitted by peer.",
}, append(labels, "peer_id")).With(labelsAndValues...),
Votes: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "votes",
Help: "Number of votes transmitted by peer.",
}, append(labels, "peer_id", "type")).With(labelsAndValues...),
VoteSetBits: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "vote_set_bits",
Help: "Number of voteSetBits transmitted by peer.",
}, append(labels, "peer_id", "type")).With(labelsAndValues...),
}
}

Expand All @@ -180,5 +207,8 @@ func NopMetrics() *Metrics {
CommittedHeight: discard.NewGauge(),
FastSyncing: discard.NewGauge(),
BlockParts: discard.NewCounter(),
Proposals: discard.NewCounter(),
Votes: discard.NewCounter(),
VoteSetBits: discard.NewCounter(),
}
}
8 changes: 5 additions & 3 deletions consensus/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package consensus
import (
"fmt"
"reflect"
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -285,12 +286,13 @@ func (conR *ConsensusReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte)
switch msg := msg.(type) {
case *ProposalMessage:
ps.SetHasProposal(msg.Proposal)
conR.metrics.Proposals.With("peer_id", string(src.ID())).Add(1)
conR.conS.peerMsgQueue <- msgInfo{msg, src.ID()}
case *ProposalPOLMessage:
ps.ApplyProposalPOLMessage(msg)
case *BlockPartMessage:
ps.SetHasProposalBlockPart(msg.Height, msg.Round, msg.Part.Index)
conR.metrics.BlockParts.With("peer_id", string(src.ID())).Add(1)
conR.metrics.BlockParts.With("peer_id", string(src.ID())).With("index", strconv.Itoa(msg.Part.Index)).Add(1)
conR.conS.peerMsgQueue <- msgInfo{msg, src.ID()}
default:
conR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
Expand All @@ -310,7 +312,7 @@ func (conR *ConsensusReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte)
ps.EnsureVoteBitArrays(height, valSize)
ps.EnsureVoteBitArrays(height-1, lastCommitSize)
ps.SetHasVote(msg.Vote)

conR.metrics.Votes.With("peer_id", string(src.ID())).With("type", msg.Vote.Type.String()).Add(1)
cs.peerMsgQueue <- msgInfo{msg, src.ID()}

default:
Expand All @@ -329,7 +331,7 @@ func (conR *ConsensusReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte)
cs.mtx.Lock()
height, votes := cs.Height, cs.Votes
cs.mtx.Unlock()

conR.metrics.VoteSetBits.With("peer_id", string(src.ID())).With("type", msg.Type.String()).Add(1)
if height == msg.Height {
var ourVotes *cmn.BitArray
switch msg.Type {
Expand Down
15 changes: 7 additions & 8 deletions mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,13 +161,13 @@ func txKey(tx types.Tx) [sha256.Size]byte {
type Mempool struct {
config *cfg.MempoolConfig

proxyLowMtx sync.Mutex
proxyNextMtx sync.Mutex
proxyBlockingMtx sync.Mutex
proxyAppConn proxy.AppConnMempool
txs *clist.CList // concurrent linked-list of good txs
preCheck PreCheckFunc
postCheck PostCheckFunc
proxyLowMtx sync.Mutex
proxyNextMtx sync.Mutex
proxyBlockingMtx sync.Mutex
proxyAppConn proxy.AppConnMempool
txs *clist.CList // concurrent linked-list of good txs
preCheck PreCheckFunc
postCheck PostCheckFunc

// Track whether we're rechecking txs.
// These are not protected by a mutex and are expected to be mutated
Expand Down Expand Up @@ -417,7 +417,6 @@ func (mem *Mempool) CheckTxWithInfo(tx types.Tx, cb func(*abci.Response), txInfo
// but they can spam the same tx with little cost to them atm.
}
}

return ErrTxInCache
}
// END CACHE
Expand Down
18 changes: 18 additions & 0 deletions mempool/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ type Metrics struct {
FailedTxs metrics.Counter
// Number of times transactions are rechecked in the mempool.
RecheckTimes metrics.Counter
// Number of tx transmitted by peer.
ReceivedTx metrics.Counter
// Number of duplicated tx transmitted by peer.
DuplicateTx metrics.Counter
}

// PrometheusMetrics returns Metrics build using Prometheus client library.
Expand Down Expand Up @@ -60,6 +64,18 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics {
Name: "recheck_times",
Help: "Number of times transactions are rechecked in the mempool.",
}, labels).With(labelsAndValues...),
ReceivedTx: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "received_tx",
Help: "Number of tx transmitted by peer.",
}, append(labels, "peer_id")).With(labelsAndValues...),
DuplicateTx: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "duplicate_tx",
Help: "Number of duplicate tx transmitted by peer.",
}, append(labels, "peer_id")).With(labelsAndValues...),
}
}

Expand All @@ -70,5 +86,7 @@ func NopMetrics() *Metrics {
TxSizeBytes: discard.NewHistogram(),
FailedTxs: discard.NewCounter(),
RecheckTimes: discard.NewCounter(),
ReceivedTx: discard.NewCounter(),
DuplicateTx: discard.NewCounter(),
}
}
6 changes: 5 additions & 1 deletion mempool/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ const (
maxTxSize = maxMsgSize - 8 // account for amino overhead of TxMessage

MempoolPacketChannelSize = 1024 * 200 // 200K messages can be queued
peerCatchupSleepIntervalMS = 100 // If peer is behind, sleep this amount
peerCatchupSleepIntervalMS = 100 // If peer is behind, sleep this amount

// UnknownPeerID is the peer ID to use when running CheckTx when there is
// no peer (e.g. RPC)
Expand Down Expand Up @@ -193,9 +193,13 @@ func (memR *MempoolReactor) receiveImpl(chID byte, src p2p.Peer, msgBytes []byte

switch msg := msg.(type) {
case *TxMessage:
memR.Mempool.metrics.ReceivedTx.With("peer_id", string(src.ID())).Add(1)
peerID := memR.ids.GetForPeer(src)
err := memR.Mempool.CheckTxWithInfo(msg.Tx, nil, TxInfo{PeerID: peerID})
if err != nil {
if err == ErrTxInCache {
memR.Mempool.metrics.DuplicateTx.With("peer_id", string(src.ID())).Add(1)
}
memR.Logger.Info("Could not check tx", "tx", TxID(msg.Tx), "err", err)
}
// broadcasting happens from go routines per peer
Expand Down
9 changes: 0 additions & 9 deletions p2p/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ type Metrics struct {
PeerSendBytesTotal metrics.Counter
// Pending bytes to be sent to a given peer.
PeerPendingSendBytes metrics.Gauge
// Number of transactions submitted by each peer.
NumTxs metrics.Gauge
}

// PrometheusMetrics returns Metrics build using Prometheus client library.
Expand Down Expand Up @@ -60,12 +58,6 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics {
Name: "peer_pending_send_bytes",
Help: "Number of pending bytes to be sent to a given peer.",
}, append(labels, "peer_id")).With(labelsAndValues...),
NumTxs: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "num_txs",
Help: "Number of transactions submitted by each peer.",
}, append(labels, "peer_id")).With(labelsAndValues...),
}
}

Expand All @@ -76,6 +68,5 @@ func NopMetrics() *Metrics {
PeerReceiveBytesTotal: discard.NewCounter(),
PeerSendBytesTotal: discard.NewCounter(),
PeerPendingSendBytes: discard.NewGauge(),
NumTxs: discard.NewGauge(),
}
}
13 changes: 13 additions & 0 deletions types/signed_msg_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,16 @@ func IsVoteTypeValid(t SignedMsgType) bool {
return false
}
}

func (t SignedMsgType) String() string {
switch t {
case PrevoteType:
return "prevote"
case PrecommitType:
return "precommit"
case ProposalType:
return "proposal"
default:
return "unknow"
}
}

0 comments on commit 93865b9

Please sign in to comment.