From 1d92bf12e2c2d32d14043a78ea3aa9a783c27150 Mon Sep 17 00:00:00 2001 From: Murali Reddy Date: Mon, 4 Feb 2019 00:52:15 +0530 Subject: [PATCH 1/2] optimize routes calculation for fully connected mesh of node --- peer.go | 4 +++- router.go | 8 ++++++++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/peer.go b/peer.go index 22b4138..1287f02 100644 --- a/peer.go +++ b/peer.go @@ -100,7 +100,9 @@ func (peer *Peer) routes(stopAt *Peer, establishedAndSymmetric bool) (bool, map[ } curPeer.forEachConnectedPeer(establishedAndSymmetric, routes, func(remotePeer *Peer) { - nextWorklist = append(nextWorklist, remotePeer) + if !SingleHopTopolgy { + nextWorklist = append(nextWorklist, remotePeer) + } remoteName := remotePeer.Name // We now know how to get to remoteName: the same // way we get to curPeer. Except, if curPeer is diff --git a/router.go b/router.go index 27be495..927cc9c 100644 --- a/router.go +++ b/router.go @@ -17,6 +17,10 @@ var ( // ChannelSize is the buffer size used by so-called actor goroutines // throughout mesh. ChannelSize = 16 + + // SingleHopTopolgy is used to indicate a topology of nodes participating + // in the mesh where each node is fully connected to other nodes + SingleHopTopolgy = false ) const ( @@ -37,6 +41,7 @@ type Config struct { ProtocolMinVersion byte PeerDiscovery bool TrustedSubnets []*net.IPNet + SingleHopTopolgy *bool } // Router manages communication between this peer and the rest of the mesh. @@ -78,6 +83,9 @@ func NewRouter(config Config, name PeerName, nickName string, overlay Overlay, l } router.topologyGossip = gossip router.acceptLimiter = newTokenBucket(acceptMaxTokens, acceptTokenDelay) + if config.SingleHopTopolgy != nil { + SingleHopTopolgy = *config.SingleHopTopolgy + } return router, nil } From 391dacdef65362191dca5352b417c3eafb5a9242 Mon Sep 17 00:00:00 2001 From: Murali Reddy Date: Tue, 5 Mar 2019 15:58:42 +0530 Subject: [PATCH 2/2] optimize route calculation and topology updates for full-connected topology --- local_peer.go | 20 ++++++++++++++++---- peer.go | 34 ++++++++++++++++++++++++++++++---- peers.go | 6 +++++- router.go | 11 +++-------- routes.go | 12 ++++++++++-- 5 files changed, 64 insertions(+), 19 deletions(-) diff --git a/local_peer.go b/local_peer.go index 7e633c9..93f64f0 100644 --- a/local_peer.go +++ b/local_peer.go @@ -192,8 +192,9 @@ func (peer *localPeer) handleAddConnection(conn ourConnection, isRestartedPeer b } peer.router.Routes.recalculate() - peer.broadcastPeerUpdate(conn.Remote()) - + if !peer.isFullyConnectedTopology() { + peer.broadcastPeerUpdate(conn.Remote()) + } return nil } @@ -209,7 +210,9 @@ func (peer *localPeer) handleConnectionEstablished(conn ourConnection) { conn.logf("connection fully established") peer.router.Routes.recalculate() - peer.broadcastPeerUpdate() + if !peer.isFullyConnectedTopology() { + peer.broadcastPeerUpdate(conn.Remote()) + } } func (peer *localPeer) handleDeleteConnection(conn ourConnection) { @@ -229,7 +232,9 @@ func (peer *localPeer) handleDeleteConnection(conn ourConnection) { // update with unreachable peers (can cause looping) peer.router.Peers.GarbageCollect() peer.router.Routes.recalculate() - peer.broadcastPeerUpdate() + if !peer.isFullyConnectedTopology() { + peer.broadcastPeerUpdate(conn.Remote()) + } } // helpers @@ -294,3 +299,10 @@ func (peer *localPeer) setVersionBeyond(version uint64) bool { } return false } + +func (peer *localPeer) isFullyConnectedTopology() bool { + if peer.router != nil { + return peer.router.Config.SingleHopTopolgy + } + return false +} diff --git a/peer.go b/peer.go index 1287f02..4d0a68a 100644 --- a/peer.go +++ b/peer.go @@ -86,7 +86,35 @@ func (peer *Peer) String() string { // // NB: This function should generally be invoked while holding a read lock on // Peers and LocalPeer. -func (peer *Peer) routes(stopAt *Peer, establishedAndSymmetric bool) (bool, map[PeerName]PeerName) { +func (peer *Peer) routes(stopAt *Peer, establishedAndSymmetric, fullyConnectedTopology bool) (bool, map[PeerName]PeerName) { + if fullyConnectedTopology { + return peer.routesForFullyConnectedTopology(stopAt, establishedAndSymmetric) + } else { + return peer.routesForGenericTopology(stopAt, establishedAndSymmetric) + } +} + +// routesForFullyConnectedTopology calculates routes for a special case of network topology where each peer in the +// mesh is fully connected to rest of the peers. So each peer is single hop and directly reachable from the peer. +func (peer *Peer) routesForFullyConnectedTopology(stopAt *Peer, establishedAndSymmetric bool) (bool, map[PeerName]PeerName) { + routes := make(unicastRoutes) + + for remoteName, conn := range peer.connections { + if establishedAndSymmetric && !conn.isEstablished() { + continue + } + remotePeer := conn.Remote() + if remotePeer == stopAt { + return true, routes + } + if remoteConn, found := remotePeer.connections[peer.Name]; !establishedAndSymmetric || (found && remoteConn.isEstablished()) { + routes[remoteName] = remoteName + } + } + return false, routes +} + +func (peer *Peer) routesForGenericTopology(stopAt *Peer, establishedAndSymmetric bool) (bool, map[PeerName]PeerName) { routes := make(unicastRoutes) routes[peer.Name] = UnknownPeerName nextWorklist := []*Peer{peer} @@ -100,9 +128,7 @@ func (peer *Peer) routes(stopAt *Peer, establishedAndSymmetric bool) (bool, map[ } curPeer.forEachConnectedPeer(establishedAndSymmetric, routes, func(remotePeer *Peer) { - if !SingleHopTopolgy { - nextWorklist = append(nextWorklist, remotePeer) - } + nextWorklist = append(nextWorklist, remotePeer) remoteName := remotePeer.Name // We now know how to get to remoteName: the same // way we get to curPeer. Except, if curPeer is diff --git a/peers.go b/peers.go index 1190450..46482ad 100644 --- a/peers.go +++ b/peers.go @@ -416,7 +416,11 @@ func (peers *Peers) GarbageCollect() { func (peers *Peers) garbageCollect(pending *peersPendingNotifications) { peers.ourself.RLock() - _, reached := peers.ourself.routes(nil, false) + singleHopTopology := false + if peers.ourself.router != nil { + singleHopTopology = peers.ourself.router.Config.SingleHopTopolgy + } + _, reached := peers.ourself.routes(nil, false, singleHopTopology) peers.ourself.RUnlock() for name, peer := range peers.byName { diff --git a/router.go b/router.go index 927cc9c..82a7313 100644 --- a/router.go +++ b/router.go @@ -17,10 +17,6 @@ var ( // ChannelSize is the buffer size used by so-called actor goroutines // throughout mesh. ChannelSize = 16 - - // SingleHopTopolgy is used to indicate a topology of nodes participating - // in the mesh where each node is fully connected to other nodes - SingleHopTopolgy = false ) const ( @@ -41,7 +37,9 @@ type Config struct { ProtocolMinVersion byte PeerDiscovery bool TrustedSubnets []*net.IPNet - SingleHopTopolgy *bool + // SingleHopTopolgy is used to indicate a topology of nodes participating + // in the mesh where each node is fully connected to other nodes + SingleHopTopolgy bool } // Router manages communication between this peer and the rest of the mesh. @@ -83,9 +81,6 @@ func NewRouter(config Config, name PeerName, nickName string, overlay Overlay, l } router.topologyGossip = gossip router.acceptLimiter = newTokenBucket(acceptMaxTokens, acceptTokenDelay) - if config.SingleHopTopolgy != nil { - SingleHopTopolgy = *config.SingleHopTopolgy - } return router, nil } diff --git a/routes.go b/routes.go index 10ff315..ed88233 100644 --- a/routes.go +++ b/routes.go @@ -227,7 +227,11 @@ func (r *routes) calculate() { // to exchange knowledge of MAC addresses, nor any constraints on // the routes that we construct. func (r *routes) calculateUnicast(establishedAndSymmetric bool) unicastRoutes { - _, unicast := r.ourself.routes(nil, establishedAndSymmetric) + singleHopTopology := false + if r.ourself.router != nil { + singleHopTopology = r.ourself.router.Config.SingleHopTopolgy + } + _, unicast := r.ourself.routes(nil, establishedAndSymmetric, singleHopTopology) return unicast } @@ -255,7 +259,11 @@ func (r *routes) calculateBroadcast(name PeerName, establishedAndSymmetric bool) if !found { return hops } - if found, reached := peer.routes(r.ourself.Peer, establishedAndSymmetric); found { + singleHopTopology := false + if r.ourself.router != nil { + singleHopTopology = r.ourself.router.Config.SingleHopTopolgy + } + if found, reached := peer.routes(r.ourself.Peer, establishedAndSymmetric, singleHopTopology); found { r.ourself.forEachConnectedPeer(establishedAndSymmetric, reached, func(remotePeer *Peer) { hops = append(hops, remotePeer.Name) }) }