Skip to content

Commit

Permalink
feat: peer message counts (#334)
Browse files Browse the repository at this point in the history
* feat: peer message counts

* add remove peer messages
  • Loading branch information
minhd-vu authored Aug 6, 2024
1 parent b3795cd commit 896553a
Show file tree
Hide file tree
Showing 4 changed files with 158 additions and 68 deletions.
102 changes: 96 additions & 6 deletions cmd/p2p/sensor/sensor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"os"
"os/signal"
"slices"
"sync"
"syscall"
"time"
Expand All @@ -17,13 +18,15 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/forkid"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/eth/protocols/eth"
ethp2p "github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/nat"
"github.com/ethereum/go-ethereum/rpc"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
dto "github.com/prometheus/client_model/go"
"github.com/rs/zerolog/log"
"github.com/spf13/cobra"

Expand Down Expand Up @@ -180,7 +183,7 @@ var SensorCmd = &cobra.Command{
Namespace: "sensor",
Name: "messages",
Help: "The number and type of messages the sensor has received",
}, []string{"code", "message"})
}, []string{"message", "url"})

opts := p2p.EthProtocolOptions{
Context: cmd.Context(),
Expand Down Expand Up @@ -247,12 +250,15 @@ var SensorCmd = &cobra.Command{
peers[node.ID()] = node.URLv4()
}

go handleAPI(&server)
go handleAPI(&server, msgCounter)

for {
select {
case <-ticker.C:
peersGauge.Set(float64(server.PeerCount()))
if err := removePeerMessages(msgCounter, server.Peers()); err != nil {
log.Error().Err(err).Msg("Failed to clean up peer messages")
}
case peer := <-opts.Peers:
// Update the peer list and the nodes file.
if _, ok := peers[peer.ID()]; !ok {
Expand All @@ -276,13 +282,22 @@ var SensorCmd = &cobra.Command{
},
}

// handlePprof starts a server for performance profiling using pprof on the
// specified port. This allows for real-time monitoring and analysis of the
// sensor's performance. The port number is configured through
// inputSensorParams.PprofPort. An error is logged if the server fails to start.
func handlePprof() {
addr := fmt.Sprintf(":%d", inputSensorParams.PprofPort)
if err := http.ListenAndServe(addr, nil); err != nil {
log.Error().Err(err).Msg("Failed to start pprof")
}
}

// handlePrometheus starts a server to expose Prometheus metrics at the /metrics
// endpoint. This enables Prometheus to scrape and collect metrics data for
// monitoring purposes. The port number is configured through
// inputSensorParams.PrometheusPort. An error is logged if the server fails to
// start.
func handlePrometheus() {
http.Handle("/metrics", promhttp.Handler())
addr := fmt.Sprintf(":%d", inputSensorParams.PrometheusPort)
Expand All @@ -291,7 +306,10 @@ func handlePrometheus() {
}
}

func handleAPI(server *ethp2p.Server) {
// handleAPI sets up the API for interacting with the sensor. The `/peers`
// endpoint returns a list of all peers connected to the sensor, including the
// types and counts of eth packets sent by each peer.
func handleAPI(server *ethp2p.Server, counter *prometheus.CounterVec) {
http.HandleFunc("/peers", func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
Expand All @@ -301,12 +319,13 @@ func handleAPI(server *ethp2p.Server) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)

urls := []string{}
peers := make(map[string]p2p.MessageCount)
for _, peer := range server.Peers() {
urls = append(urls, peer.Node().URLv4())
url := peer.Node().URLv4()
peers[url] = getPeerMessages(url, counter)
}

err := json.NewEncoder(w).Encode(urls)
err := json.NewEncoder(w).Encode(peers)
if err != nil {
log.Error().Err(err).Msg("Failed to encode peers")
}
Expand All @@ -318,6 +337,77 @@ func handleAPI(server *ethp2p.Server) {
}
}

// getPeerMessages retrieves the count of various types of eth packets sent by a
// peer.
func getPeerMessages(url string, counter *prometheus.CounterVec) p2p.MessageCount {
return p2p.MessageCount{
BlockHeaders: getCounterValue(new(eth.BlockHeadersPacket), url, counter),
BlockBodies: getCounterValue(new(eth.BlockBodiesPacket), url, counter),
Blocks: getCounterValue(new(eth.NewBlockPacket), url, counter),
BlockHashes: getCounterValue(new(eth.NewBlockHashesPacket), url, counter),
BlockHeaderRequests: getCounterValue(new(eth.GetBlockHeadersPacket), url, counter),
BlockBodiesRequests: getCounterValue(new(eth.GetBlockBodiesPacket), url, counter),
Transactions: getCounterValue(new(eth.TransactionsPacket), url, counter) +
getCounterValue(new(eth.PooledTransactionsPacket), url, counter),
TransactionHashes: getCounterValue(new(eth.NewPooledTransactionHashesPacket67), url, counter) +
getCounterValue(new(eth.NewPooledTransactionHashesPacket68), url, counter),
TransactionRequests: getCounterValue(new(eth.GetPooledTransactionsRequest), url, counter),
}
}

// getCounterValue retrieves the count of packets for a specific type from the
// Prometheus counter.
func getCounterValue(packet eth.Packet, url string, counter *prometheus.CounterVec) int64 {
metric := &dto.Metric{}

err := counter.WithLabelValues(packet.Name(), url).Write(metric)
if err != nil {
log.Error().Err(err).Send()
return 0
}

return int64(metric.GetCounter().GetValue())
}

// removePeerMessages removes all the counters of peers that disconnected from
// the sensor. This prevents the metrics list from infinitely growing.
func removePeerMessages(counter *prometheus.CounterVec, peers []*ethp2p.Peer) error {
urls := []string{}
for _, peer := range peers {
urls = append(urls, peer.Node().URLv4())
}

families, err := prometheus.DefaultGatherer.Gather()
if err != nil {
return err
}

var family *dto.MetricFamily
for _, f := range families {
if f.GetName() == "sensor_messages" {
family = f
break
}
}

if family == nil {
return errors.New("could not find sensor_messages metric family")
}

for _, metric := range family.GetMetric() {
for _, label := range metric.GetLabel() {
url := label.GetValue()
if label.GetName() != "url" || slices.Contains(urls, url) {
continue
}

counter.DeletePartialMatch(prometheus.Labels{"url": url})
}
}

return nil
}

// getLatestBlock will get the latest block from an RPC provider.
func getLatestBlock(url string) (*rpctypes.RawBlockResponse, error) {
client, err := rpc.Dial(url)
Expand Down
76 changes: 38 additions & 38 deletions p2p/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,53 +9,53 @@ import (
// logging. It can be used to count the different types of messages received
// across all peer connections to provide a summary.
type MessageCount struct {
BlockHeaders int32 `json:",omitempty"`
BlockBodies int32 `json:",omitempty"`
Blocks int32 `json:",omitempty"`
BlockHashes int32 `json:",omitempty"`
BlockHeaderRequests int32 `json:",omitempty"`
BlockBodiesRequests int32 `json:",omitempty"`
Transactions int32 `json:",omitempty"`
TransactionHashes int32 `json:",omitempty"`
TransactionRequests int32 `json:",omitempty"`
Pings int32 `json:",omitempty"`
Errors int32 `json:",omitempty"`
Disconnects int32 `json:",omitempty"`
BlockHeaders int64 `json:"block_headers,omitempty"`
BlockBodies int64 `json:"block_bodies,omitempty"`
Blocks int64 `json:"blocks,omitempty"`
BlockHashes int64 `json:"block_hashes,omitempty"`
BlockHeaderRequests int64 `json:"block_header_requests,omitempty"`
BlockBodiesRequests int64 `json:"block_bodies_requests,omitempty"`
Transactions int64 `json:"transactions,omitempty"`
TransactionHashes int64 `json:"transaction_hashes,omitempty"`
TransactionRequests int64 `json:"transaction_requests,omitempty"`
Pings int64 `json:"pings,omitempty"`
Errors int64 `json:"errors,omitempty"`
Disconnects int64 `json:"disconnects,omitempty"`
}

// Load takes a snapshot of all the counts in a thread-safe manner. Make sure
// you call this and read from the returned object.
func (count *MessageCount) Load() MessageCount {
return MessageCount{
BlockHeaders: atomic.LoadInt32(&count.BlockHeaders),
BlockBodies: atomic.LoadInt32(&count.BlockBodies),
Blocks: atomic.LoadInt32(&count.Blocks),
BlockHashes: atomic.LoadInt32(&count.BlockHashes),
BlockHeaderRequests: atomic.LoadInt32(&count.BlockHeaderRequests),
BlockBodiesRequests: atomic.LoadInt32(&count.BlockBodiesRequests),
Transactions: atomic.LoadInt32(&count.Transactions),
TransactionHashes: atomic.LoadInt32(&count.TransactionHashes),
TransactionRequests: atomic.LoadInt32(&count.TransactionRequests),
Pings: atomic.LoadInt32(&count.Pings),
Errors: atomic.LoadInt32(&count.Errors),
Disconnects: atomic.LoadInt32(&count.Disconnects),
BlockHeaders: atomic.LoadInt64(&count.BlockHeaders),
BlockBodies: atomic.LoadInt64(&count.BlockBodies),
Blocks: atomic.LoadInt64(&count.Blocks),
BlockHashes: atomic.LoadInt64(&count.BlockHashes),
BlockHeaderRequests: atomic.LoadInt64(&count.BlockHeaderRequests),
BlockBodiesRequests: atomic.LoadInt64(&count.BlockBodiesRequests),
Transactions: atomic.LoadInt64(&count.Transactions),
TransactionHashes: atomic.LoadInt64(&count.TransactionHashes),
TransactionRequests: atomic.LoadInt64(&count.TransactionRequests),
Pings: atomic.LoadInt64(&count.Pings),
Errors: atomic.LoadInt64(&count.Errors),
Disconnects: atomic.LoadInt64(&count.Disconnects),
}
}

// Clear clears all of the counts from the message counter.
func (count *MessageCount) Clear() {
atomic.StoreInt32(&count.BlockHeaders, 0)
atomic.StoreInt32(&count.BlockBodies, 0)
atomic.StoreInt32(&count.Blocks, 0)
atomic.StoreInt32(&count.BlockHashes, 0)
atomic.StoreInt32(&count.BlockHeaderRequests, 0)
atomic.StoreInt32(&count.BlockBodiesRequests, 0)
atomic.StoreInt32(&count.Transactions, 0)
atomic.StoreInt32(&count.TransactionHashes, 0)
atomic.StoreInt32(&count.TransactionRequests, 0)
atomic.StoreInt32(&count.Pings, 0)
atomic.StoreInt32(&count.Errors, 0)
atomic.StoreInt32(&count.Disconnects, 0)
atomic.StoreInt64(&count.BlockHeaders, 0)
atomic.StoreInt64(&count.BlockBodies, 0)
atomic.StoreInt64(&count.Blocks, 0)
atomic.StoreInt64(&count.BlockHashes, 0)
atomic.StoreInt64(&count.BlockHeaderRequests, 0)
atomic.StoreInt64(&count.BlockBodiesRequests, 0)
atomic.StoreInt64(&count.Transactions, 0)
atomic.StoreInt64(&count.TransactionHashes, 0)
atomic.StoreInt64(&count.TransactionRequests, 0)
atomic.StoreInt64(&count.Pings, 0)
atomic.StoreInt64(&count.Errors, 0)
atomic.StoreInt64(&count.Disconnects, 0)
}

// IsEmpty checks whether the sum of all the counts is empty. Make sure to call
Expand All @@ -76,8 +76,8 @@ func (c *MessageCount) IsEmpty() bool {
) == 0
}

func sum(ints ...int32) int32 {
var sum int32 = 0
func sum(ints ...int64) int64 {
var sum int64 = 0
for _, i := range ints {
sum += i
}
Expand Down
20 changes: 10 additions & 10 deletions p2p/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ func (c *conn) handleNewBlockHashes(ctx context.Context, msg ethp2p.Msg) error {
return err
}

c.counter.WithLabelValues(fmt.Sprint(msg.Code), packet.Name()).Add(float64(len(packet)))
c.counter.WithLabelValues(packet.Name(), c.node.URLv4()).Add(float64(len(packet)))

hashes := make([]common.Hash, 0, len(packet))
for _, hash := range packet {
Expand All @@ -315,7 +315,7 @@ func (c *conn) handleTransactions(ctx context.Context, msg ethp2p.Msg) error {
return err
}

c.counter.WithLabelValues(fmt.Sprint(msg.Code), txs.Name()).Add(float64(len(txs)))
c.counter.WithLabelValues(txs.Name(), c.node.URLv4()).Add(float64(len(txs)))

c.db.WriteTransactions(ctx, c.node, txs)

Expand All @@ -328,7 +328,7 @@ func (c *conn) handleGetBlockHeaders(msg ethp2p.Msg) error {
return err
}

c.counter.WithLabelValues(fmt.Sprint(msg.Code), request.Name()).Inc()
c.counter.WithLabelValues(request.Name(), c.node.URLv4()).Inc()

return ethp2p.Send(
c.rw,
Expand All @@ -344,7 +344,7 @@ func (c *conn) handleBlockHeaders(ctx context.Context, msg ethp2p.Msg) error {
}

headers := packet.BlockHeadersRequest
c.counter.WithLabelValues(fmt.Sprint(msg.Code), packet.Name()).Add(float64(len(headers)))
c.counter.WithLabelValues(packet.Name(), c.node.URLv4()).Add(float64(len(headers)))

for _, header := range headers {
if err := c.getParentBlock(ctx, header); err != nil {
Expand All @@ -363,7 +363,7 @@ func (c *conn) handleGetBlockBodies(msg ethp2p.Msg) error {
return err
}

c.counter.WithLabelValues(fmt.Sprint(msg.Code), request.Name()).Add(float64(len(request.GetBlockBodiesRequest)))
c.counter.WithLabelValues(request.Name(), c.node.URLv4()).Add(float64(len(request.GetBlockBodiesRequest)))

return ethp2p.Send(
c.rw,
Expand All @@ -382,7 +382,7 @@ func (c *conn) handleBlockBodies(ctx context.Context, msg ethp2p.Msg) error {
return nil
}

c.counter.WithLabelValues(fmt.Sprint(msg.Code), packet.Name()).Add(float64(len(packet.BlockBodiesResponse)))
c.counter.WithLabelValues(packet.Name(), c.node.URLv4()).Add(float64(len(packet.BlockBodiesResponse)))

var hash *common.Hash
for e := c.requests.Front(); e != nil; e = e.Next() {
Expand Down Expand Up @@ -411,7 +411,7 @@ func (c *conn) handleNewBlock(ctx context.Context, msg ethp2p.Msg) error {
return err
}

c.counter.WithLabelValues(fmt.Sprint(msg.Code), block.Name()).Inc()
c.counter.WithLabelValues(block.Name(), c.node.URLv4()).Inc()

// Set the head block if newer.
c.headMutex.Lock()
Expand Down Expand Up @@ -442,7 +442,7 @@ func (c *conn) handleGetPooledTransactions(msg ethp2p.Msg) error {
return err
}

c.counter.WithLabelValues(fmt.Sprint(msg.Code), request.Name()).Add(float64(len(request.GetPooledTransactionsRequest)))
c.counter.WithLabelValues(request.Name(), c.node.URLv4()).Add(float64(len(request.GetPooledTransactionsRequest)))

return ethp2p.Send(
c.rw,
Expand Down Expand Up @@ -473,7 +473,7 @@ func (c *conn) handleNewPooledTransactionHashes(version uint, msg ethp2p.Msg) er
return errors.New("protocol version not found")
}

c.counter.WithLabelValues(fmt.Sprint(msg.Code), name).Add(float64(len(hashes)))
c.counter.WithLabelValues(name, c.node.URLv4()).Add(float64(len(hashes)))

if !c.db.ShouldWriteTransactions() || !c.db.ShouldWriteTransactionEvents() {
return nil
Expand All @@ -492,7 +492,7 @@ func (c *conn) handlePooledTransactions(ctx context.Context, msg ethp2p.Msg) err
return err
}

c.counter.WithLabelValues(fmt.Sprint(msg.Code), packet.Name()).Add(float64(len(packet.PooledTransactionsResponse)))
c.counter.WithLabelValues(packet.Name(), c.node.URLv4()).Add(float64(len(packet.PooledTransactionsResponse)))

c.db.WriteTransactions(ctx, c.node, packet.PooledTransactionsResponse)

Expand Down
Loading

0 comments on commit 896553a

Please sign in to comment.