Skip to content

Commit

Permalink
Merge pull request #73 from migalabs/feat/speedup-attestation-streamings
Browse files Browse the repository at this point in the history
Swap DB call to get HostInfo to local peerstore calls
  • Loading branch information
cortze authored Mar 22, 2024
2 parents 0607a58 + 5045cea commit e74c3aa
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 17 deletions.
2 changes: 1 addition & 1 deletion pkg/crawler/ethereum.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func NewEthereumCrawler(mainCtx *cli.Context, conf config.EthereumCrawlerConfig)
}

// Build the event forwarder
eventHandler := events.NewForwarder(conf.SSEIP, conf.SSEPort, dbClient, ethMsgHandler)
eventHandler := events.NewForwarder(conf.SSEIP, conf.SSEPort, host, ethMsgHandler)

// generate the CrawlerBase
crawler := &EthereumCrawler{
Expand Down
30 changes: 14 additions & 16 deletions pkg/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"net/http"
"sync"

"github.com/migalabs/armiarma/pkg/db/postgresql"
"github.com/migalabs/armiarma/pkg/hosts"
"github.com/migalabs/armiarma/pkg/networks/ethereum"
"github.com/r3labs/sse/v2"
log "github.com/sirupsen/logrus"
Expand All @@ -20,7 +20,7 @@ type Forwarder struct {
port int

server *sse.Server
db *postgresql.DBClient
h *hosts.BasicLibp2pHost
ethMsgHandler *ethereum.EthMessageHandler

// Store downstream attestation events in a channel so
Expand All @@ -31,7 +31,7 @@ type Forwarder struct {
}

// NewForwarder creates a new Forwarder
func NewForwarder(ip string, port int, db *postgresql.DBClient, ethMsgHandler *ethereum.EthMessageHandler) *Forwarder {
func NewForwarder(ip string, port int, h *hosts.BasicLibp2pHost, ethMsgHandler *ethereum.EthMessageHandler) *Forwarder {
server := sse.New()

// Disable auto replay. If a consumer is not connected, it will never receive the event.
Expand All @@ -41,7 +41,7 @@ func NewForwarder(ip string, port int, db *postgresql.DBClient, ethMsgHandler *e
ip: ip,
port: port,
server: server,
db: db,
h: h,
ethMsgHandler: ethMsgHandler,
attestationCh: make(chan *ethereum.AttestationReceievedEvent, 10000),
}
Expand Down Expand Up @@ -137,12 +137,10 @@ func (f *Forwarder) processAttestationEvent(e *ethereum.AttestationReceievedEven
log.WithError(err).Error("error publishing raw attestation to SSE server")
}

// Build the timed attestation event
info, err := f.db.GetFullHostInfo(e.PeerID)
// compose the info of the remote peer sending the attestation
hostInfo, err := f.h.GetHostInfo(e.PeerID)
if err != nil {
log.WithError(err).Error("error getting host info from DB when handling a new attestation")

return
log.WithError(err).Error("unable to extract info from peer that shared attestation")
}

// Publish the timed event
Expand All @@ -155,13 +153,13 @@ func (f *Forwarder) processAttestationEvent(e *ethereum.AttestationReceievedEven
TimeInSlot: e.TrackedAttestation.TimeInSlot,
},
PeerInfo: &PeerInfo{
ID: fmt.Sprintf("%s", info.ID),
IP: info.IP,
Port: info.Port,
UserAgent: info.PeerInfo.UserAgent,
Latency: info.PeerInfo.Latency,
Protocols: info.PeerInfo.Protocols,
ProtocolVersion: info.PeerInfo.ProtocolVersion,
ID: e.PeerID.String(),
IP: hostInfo.IP,
Port: hostInfo.Port,
UserAgent: hostInfo.PeerInfo.UserAgent,
Latency: hostInfo.PeerInfo.Latency,
Protocols: hostInfo.PeerInfo.Protocols,
ProtocolVersion: hostInfo.PeerInfo.ProtocolVersion,
},
}); err != nil {
log.WithError(err).Error("error publishing timed attestation to SSE server")
Expand Down
30 changes: 30 additions & 0 deletions pkg/hosts/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,3 +220,33 @@ func (b *BasicLibp2pHost) RecIdentEvent(identEvent IdentificationEvent) {
func (b *BasicLibp2pHost) IdentEventNotChannel() chan IdentificationEvent {
return b.identNotChannel
}

func (b *BasicLibp2pHost) GetHostInfo(peerID peer.ID) (models.HostInfo, error) {
var hostInfo models.HostInfo

// Connectivity of the node
hostInfo.MAddrs = b.host.Peerstore().Addrs(peerID)
addr := utils.GetPublicAddrsFromAddrArray(hostInfo.MAddrs)
hostInfo.IP = utils.ExtractIPFromMAddr(addr).String()
hostInfo.Port = utils.GetPortFromMaddrs(addr)

// Protocols
pv, err := b.host.Peerstore().Get(peerID, "ProtocolVersion")
if err == nil {
hostInfo.PeerInfo.ProtocolVersion = pv.(string)
}
prots, err := b.host.Peerstore().GetProtocols(peerID)
if err == nil {
supportedProtocols := make([]string, len(prots))
for i, prot := range prots {
supportedProtocols[i] = string(prot)
}
hostInfo.PeerInfo.Protocols = supportedProtocols
}
// Get user agent
ua, err := b.host.Peerstore().Get(peerID, "AgentVersion")
if err == nil {
hostInfo.PeerInfo.UserAgent = ua.(string)
}
return hostInfo, nil
}

0 comments on commit e74c3aa

Please sign in to comment.