Skip to content

Commit

Permalink
send graph sync messages to both validators and relayers
Browse files Browse the repository at this point in the history
  • Loading branch information
cedricfung committed May 21, 2024
1 parent 49dd630 commit 99b6111
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 11 deletions.
2 changes: 1 addition & 1 deletion kernel/boot.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func (node *Node) Loop() error {
}

go node.listenConsumers()
go node.sendGraphToConcensusNodes()
go node.sendGraphToConcensusNodesAndPeers()
go node.loopCacheQueue()
go node.MintLoop()
node.ElectionLoop()
Expand Down
14 changes: 11 additions & 3 deletions kernel/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,14 +489,22 @@ func (node *Node) ReadSnapshotsForNodeRound(nodeIdWithNetwork crypto.Hash, round
return node.persistStore.ReadSnapshotsForNodeRound(nodeIdWithNetwork, round)
}

func (node *Node) sendGraphToConcensusNodes() {
graphTicker := time.NewTicker(time.Duration(config.SnapshotRoundGap / 2))
func (node *Node) sendGraphToConcensusNodesAndPeers() {
graphTicker := time.NewTicker(time.Duration(config.SnapshotRoundGap))
defer graphTicker.Stop()

for {
nodes := node.NodesListWithoutState(uint64(clock.Now().UnixNano()), true)
neighbors := node.Peer.Neighbors()
peers := make(map[crypto.Hash]bool)
for _, cn := range nodes {
node.Peer.SendGraphMessage(cn.IdForNetwork)
peers[cn.IdForNetwork] = true
}
for _, p := range neighbors {
peers[p.IdForNetwork] = true
}
for id := range peers {
node.Peer.SendGraphMessage(id)
}
<-graphTicker.C
}
Expand Down
2 changes: 1 addition & 1 deletion p2p/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
)

const (
PeerMessageTypePing = 1
PeerMessageTypePing = 1 // not used because too more than enough graph sync messages
PeerMessageTypeAuthentication = 3
PeerMessageTypeGraph = 4
PeerMessageTypeSnapshotConfirm = 5
Expand Down
12 changes: 8 additions & 4 deletions p2p/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,14 +133,15 @@ func (me *Peer) Metric() map[string]*MetricPool {
}

func NewPeer(handle SyncHandle, idForNetwork crypto.Hash, addr string, isRelayer bool) *Peer {
ringSize := uint64(MaxIncomingStreams * 16)
peer := &Peer{
IdForNetwork: idForNetwork,
Address: addr,
relayers: &neighborMap{m: make(map[crypto.Hash]*Peer)},
consumers: &neighborMap{m: make(map[crypto.Hash]*Peer)},
highRing: util.NewRingBuffer(1024),
normalRing: util.NewRingBuffer(1024),
syncRing: util.NewRingBuffer(1024),
highRing: util.NewRingBuffer(ringSize),
normalRing: util.NewRingBuffer(ringSize),
syncRing: util.NewRingBuffer(ringSize),
handle: handle,
sentMetric: &MetricPool{enabled: false},
receivedMetric: &MetricPool{enabled: false},
Expand Down Expand Up @@ -286,7 +287,7 @@ func (me *Peer) loopSendingStream(p *Peer, consumer Client) (*ChanMsg, error) {
}

for _, m := range msgs {
if m.key != nil && me.snapshotsCaches.contains(m.key, time.Minute) {
if me.snapshotsCaches.contains(m.key, time.Minute) {
continue
}
err := consumer.Send(m.data)
Expand Down Expand Up @@ -460,6 +461,9 @@ type confirmMap struct {
}

func (m *confirmMap) contains(key []byte, duration time.Duration) bool {
if key == nil {
return false
}
val, found := m.cache.Get(key)
if found {
ts := time.Unix(0, int64(binary.BigEndian.Uint64(val.([]byte))))
Expand Down
4 changes: 2 additions & 2 deletions p2p/quic.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ const (
MaxIncomingStreams = 1024
HandshakeTimeout = 10 * time.Second
IdleTimeout = 60 * time.Second
ReadDeadline = 60 * time.Second
WriteDeadline = 10 * time.Second
ReadDeadline = 2 * WriteDeadline
)

type QuicClient struct {
Expand All @@ -46,7 +46,7 @@ func NewQuicRelayer(listenAddr string) (*QuicRelayer, error) {
MaxIncomingStreams: MaxIncomingStreams,
HandshakeIdleTimeout: HandshakeTimeout,
MaxIdleTimeout: IdleTimeout,
KeepAlivePeriod: IdleTimeout / 2,
KeepAlivePeriod: 0,
})
if err != nil {
return nil, err
Expand Down

0 comments on commit 99b6111

Please sign in to comment.