Skip to content

Commit

Permalink
fix: further development
Browse files Browse the repository at this point in the history
  • Loading branch information
atanmarko committed Nov 30, 2022
1 parent 2448d75 commit c39a6b6
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 77 deletions.
29 changes: 0 additions & 29 deletions .github/workflows/cla.yml

This file was deleted.

15 changes: 11 additions & 4 deletions network/frost/frost.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package frost
import (
"errors"
"fmt"
"time"

"github.com/0xPolygon/polygon-edge/network/event"
"github.com/hashicorp/go-hclog"
Expand All @@ -26,7 +27,7 @@ type networkingServer interface {
NewFrostClient(peerID peer.ID) (network.Stream, error)

// DisconnectFromPeer attempts to disconnect from the specified peer
DisconnectFromPeer(peerID peer.ID, reason string)
DisconnectFromFrostPeer(peerID peer.ID, reason string)

// AddPeer adds a peer to the networking server's peer store
AddFrostPeer(id peer.ID, direction network.Direction)
Expand Down Expand Up @@ -150,13 +151,12 @@ func (f *FrostService) GetNotifyBundle() *network.NotifyBundle {

// disconnectFromPeer disconnects from the specified peer
func (f *FrostService) disconnectFromPeer(peerID peer.ID, reason string) {
f.baseServer.DisconnectFromPeer(peerID, reason)
f.baseServer.DisconnectFromFrostPeer(peerID, reason)
}

// handleConnected handles new network connections (handshakes)
func (f *FrostService) handleConnected(peerID peer.ID, direction network.Direction) error {
fmt.Println(">>>>>>>>>>>>> FROST handle connected peer id: ", peerID)
_, clientErr := f.baseServer.NewFrostClient(peerID)
stream, clientErr := f.baseServer.NewFrostClient(peerID)
if clientErr != nil {
return fmt.Errorf(
"unable to create new frost client connection, %w",
Expand All @@ -165,13 +165,20 @@ func (f *FrostService) handleConnected(peerID peer.ID, direction network.Directi
}

fmt.Println(">>>>>>>>>>>>> FROST Here we should communicate and check frost info of the peer: ", peerID)
stream.Write([]byte("HelloToposNode o yee"))
time.Sleep(time.Second * 5)

rd := []byte{}
num, _ := stream.Read(rd)
fmt.Println(">>>>>>>>>>>>> FROST Read response: ", rd, " number of bytes:", num)

// Validate that the peers are working on the same chain
// if status.Chain != resp.Chain {
// return ErrInvalidChainID
// }

// If this is a NOT temporary connection, save it

f.baseServer.AddFrostPeer(peerID, direction)

return nil
Expand Down
18 changes: 11 additions & 7 deletions network/frost/frost_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,20 @@ func (s *FrostStream) Client(stream network.Stream) *network.Stream {
}

func (s *FrostStream) Handler() func(network.Stream) {
fmt.Println(">>>>>>>>>>>>>>> FROST Handler called:")

return func(stream network.Stream) {
go func() {
for {
b := <-s.streamCh
fmt.Println(">>>>>>>>>>>>>>> FROST RECEIVED BYTES >> :", b)
}
}()

select {
case <-s.ctx.Done():
return
case s.streamCh <- stream:
fmt.Println(">>>>>>>>>>>>>>> RECEIVED BYTES:")
//b := make([]byte, len(p))
b := <-s.streamCh
fmt.Println(">>>>>>>>>>>>>>> RECEIVED BYTES >> :", b)

}
}
}
Expand All @@ -59,11 +63,11 @@ func (s *FrostStream) Accept() (net.Conn, error) {
}

// Addr implements the net.Listener interface
func (g *FrostStream) Addr() net.Addr {
func (s *FrostStream) Addr() net.Addr {
return fakeLocalAddr()
}

func (g *FrostStream) Close() error {
func (s *FrostStream) Close() error {
return nil
}

Expand Down
28 changes: 0 additions & 28 deletions network/identity/identity.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,34 +133,6 @@ func (i *IdentityService) GetNotifyBundle() *network.NotifyBundle {
}
}

// hasPendingStatus checks if a peer is pending handshake [Thread safe]
// func (i *IdentityService) hasPendingStatus(id peer.ID) bool {
// _, ok := i.pendingPeerConnections.Load(id)

// return ok
// }

// removePendingStatus removes the pending status from a peer,
// and updates adequate counter information [Thread safe]
// func (i *IdentityService) removePendingStatus(peerID peer.ID) {
// if value, loaded := i.pendingPeerConnections.LoadAndDelete(peerID); loaded {
// direction, ok := value.(network.Direction)
// if !ok {
// return
// }

// i.baseServer.UpdatePendingConnCount(-1, direction)
// }
// }

// addPendingStatus adds the pending status to a peer,
// and updates adequate counter information [Thread safe]
// func (i *IdentityService) addPendingStatus(id peer.ID, direction network.Direction) {
// if _, loaded := i.pendingPeerConnections.LoadOrStore(id, direction); !loaded {
// i.baseServer.UpdatePendingConnCount(1, direction)
// }
// }

// disconnectFromPeer disconnects from the specified peer
func (i *IdentityService) disconnectFromPeer(peerID peer.ID, reason string) {
i.baseServer.DisconnectFromPeer(peerID, reason)
Expand Down
33 changes: 25 additions & 8 deletions network/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ type Server struct {

frostPeers map[peer.ID]*FrostPeerConnInfo // map of all topos node connections
frostPeersLock sync.Mutex // lock for the peer map of topos nodes
frostPendingPeerConnections sync.Map // Map that keeps track of the pending status of peers; peerID -> bool
frostPendingPeerConnections sync.Map // Map that keeps track of the frost pending status
frostConnectionCounts *ConnectionInfo
}

Expand Down Expand Up @@ -626,7 +626,8 @@ func (s *Server) updateBootnodeConnCount(peerID peer.ID, delta int64) {

// DisconnectFromPeer disconnects the networking server from the specified peer
func (s *Server) DisconnectFromPeer(peer peer.ID, reason string) {
if s.host.Network().Connectedness(peer) == network.Connected {
_, frostPeerConection := s.frostPeers[peer]
if (s.host.Network().Connectedness(peer) == network.Connected) && (!frostPeerConection) {
s.logger.Info(fmt.Sprintf("Closing connection to peer [%s] for reason [%s]", peer.String(), reason))

if closeErr := s.host.Network().ClosePeer(peer); closeErr != nil {
Expand All @@ -635,6 +636,17 @@ func (s *Server) DisconnectFromPeer(peer peer.ID, reason string) {
}
}

func (s *Server) DisconnectFromFrostPeer(peer peer.ID, reason string) {
_, peerConection := s.peers[peer]
if (s.host.Network().Connectedness(peer) == network.Connected) && (!peerConection) {
s.logger.Info(fmt.Sprintf("Closing frost connection to peer [%s] for reason [%s]", peer.String(), reason))

if closeErr := s.host.Network().ClosePeer(peer); closeErr != nil {
s.logger.Error(fmt.Sprintf("Unable to gracefully close frost peer connection, %v", closeErr))
}
}
}

var (
// Anything below 35s is prone to false timeouts, as seen from empirical test data
DefaultJoinTimeout = 100 * time.Second
Expand Down Expand Up @@ -898,10 +910,12 @@ func (s *Server) updatePendingConnCountMetrics(direction network.Direction) {
func (s *Server) updateFrostConnCountMetrics(direction network.Direction) {
switch direction {
case network.DirInbound:
metrics.SetGauge([]string{"inbound_frost_connections_count"}, float32(s.frostConnectionCounts.GetInboundConnCount()))
metrics.SetGauge([]string{"inbound_frost_connections_count"},
float32(s.frostConnectionCounts.GetInboundConnCount()))

case network.DirOutbound:
metrics.SetGauge([]string{"outbound_frost_connections_count"}, float32(s.frostConnectionCounts.GetOutboundConnCount()))
metrics.SetGauge([]string{"outbound_frost_connections_count"},
float32(s.frostConnectionCounts.GetOutboundConnCount()))
}
}

Expand All @@ -922,53 +936,56 @@ func (s *Server) updateFrostPendingConnCountMetrics(direction network.Direction)
func (s *Server) HasPendingStatus(peerID peer.ID) bool {
_, pendingPeerConection := s.pendingPeerConnections.Load(peerID)
_, pendingFrostPeerConection := s.frostPendingPeerConnections.Load(peerID)

return pendingPeerConection || pendingFrostPeerConection
}

func (s *Server) HasIdentityPendingStatus(peerID peer.ID) bool {
_, pendingPeerConection := s.pendingPeerConnections.Load(peerID)

return pendingPeerConection
}

func (s *Server) HasFrostPendingStatus(peerID peer.ID) bool {
_, pendingFrostPeerConection := s.frostPendingPeerConnections.Load(peerID)

return pendingFrostPeerConection
}

func (s *Server) AddPendingPeer(peerID peer.ID, direction network.Direction) {
s.logger.Debug("Add pending peer", "id", peerID)
if _, loaded := s.pendingPeerConnections.LoadOrStore(peerID, direction); !loaded {
s.UpdatePendingConnCount(1, direction)
s.logger.Debug("Add pending peer", "id", peerID)
}
}

func (s *Server) RemovePendingPeer(peerID peer.ID) {
s.logger.Debug("Remove pending peer", "id", peerID)
if value, loaded := s.pendingPeerConnections.LoadAndDelete(peerID); loaded {
direction, ok := value.(network.Direction)
if !ok {
return
}

s.UpdatePendingConnCount(-1, direction)
s.logger.Debug("Remove pending peer", "id", peerID)
}
}

func (s *Server) AddFrostPendingPeer(peerID peer.ID, direction network.Direction) {
s.logger.Debug("Add frost pending peer", "id", peerID)
if _, loaded := s.frostPendingPeerConnections.LoadOrStore(peerID, direction); !loaded {
s.UpdateFrostPendingConnCount(1, direction)
s.logger.Debug("Added frost pending peer", "id", peerID)
}
}

func (s *Server) RemoveFrostPendingPeer(peerID peer.ID) {
s.logger.Debug("Remove frost pending peer", "id", peerID)
if value, loaded := s.frostPendingPeerConnections.LoadAndDelete(peerID); loaded {
direction, ok := value.(network.Direction)
if !ok {
return
}

s.UpdateFrostPendingConnCount(-1, direction)
s.logger.Debug("Removed frost pending peer", "id", peerID)
}
}
1 change: 0 additions & 1 deletion network/server_frost.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (

// NewFrostClient returns a new frost stream client connection
func (s *Server) NewFrostClient(peerID peer.ID) (network.Stream, error) {

// Create a new stream connection and return it
stream, err := s.NewStream(common.Frost, peerID)
if err != nil {
Expand Down

0 comments on commit c39a6b6

Please sign in to comment.