From c39a6b6e3ff4ad53bf6467ded3aeb1693565d74a Mon Sep 17 00:00:00 2001 From: Marko Atanasievski Date: Wed, 30 Nov 2022 15:55:48 +0100 Subject: [PATCH] fix: further development --- .github/workflows/cla.yml | 29 ----------------------------- network/frost/frost.go | 15 +++++++++++---- network/frost/frost_stream.go | 18 +++++++++++------- network/identity/identity.go | 28 ---------------------------- network/server.go | 33 +++++++++++++++++++++++++-------- network/server_frost.go | 1 - 6 files changed, 47 insertions(+), 77 deletions(-) delete mode 100644 .github/workflows/cla.yml diff --git a/.github/workflows/cla.yml b/.github/workflows/cla.yml deleted file mode 100644 index 1a3e412870..0000000000 --- a/.github/workflows/cla.yml +++ /dev/null @@ -1,29 +0,0 @@ ---- -name: "CLA Assistant" -on: # yamllint disable-line rule:truthy - issue_comment: - types: - - created - pull_request_target: - types: - - opened - - closed - - synchronize - -jobs: - CLAssistant: - runs-on: ubuntu-latest - steps: - - name: "CLA Assistant" - if: (github.event.comment.body == 'recheck' || github.event.comment.body == 'I have read the CLA Document and I hereby sign the CLA') || github.event_name == 'pull_request_target' - # Beta Release - uses: cla-assistant/github-action@v2.1.3-beta - env: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - # the below token should have repo scope and must be manually added by you in the repository's secret - PERSONAL_ACCESS_TOKEN: ${{ secrets.PERSONAL_ACCESS_TOKEN }} - with: - path-to-signatures: 'cla.json' - path-to-document: 'https://github.com/0xPolygon/polygon-edge/blob/develop/CLA.md' - branch: 'cla-signatures' - allowlist: dependabot[bot],dependabot-preview[bot] diff --git a/network/frost/frost.go b/network/frost/frost.go index f0cb99a77c..08d1518378 100644 --- a/network/frost/frost.go +++ b/network/frost/frost.go @@ -3,6 +3,7 @@ package frost import ( "errors" "fmt" + "time" "github.com/0xPolygon/polygon-edge/network/event" "github.com/hashicorp/go-hclog" @@ -26,7 +27,7 @@ type networkingServer interface { NewFrostClient(peerID peer.ID) (network.Stream, error) // DisconnectFromPeer attempts to disconnect from the specified peer - DisconnectFromPeer(peerID peer.ID, reason string) + DisconnectFromFrostPeer(peerID peer.ID, reason string) // AddPeer adds a peer to the networking server's peer store AddFrostPeer(id peer.ID, direction network.Direction) @@ -150,13 +151,12 @@ func (f *FrostService) GetNotifyBundle() *network.NotifyBundle { // disconnectFromPeer disconnects from the specified peer func (f *FrostService) disconnectFromPeer(peerID peer.ID, reason string) { - f.baseServer.DisconnectFromPeer(peerID, reason) + f.baseServer.DisconnectFromFrostPeer(peerID, reason) } // handleConnected handles new network connections (handshakes) func (f *FrostService) handleConnected(peerID peer.ID, direction network.Direction) error { - fmt.Println(">>>>>>>>>>>>> FROST handle connected peer id: ", peerID) - _, clientErr := f.baseServer.NewFrostClient(peerID) + stream, clientErr := f.baseServer.NewFrostClient(peerID) if clientErr != nil { return fmt.Errorf( "unable to create new frost client connection, %w", @@ -165,6 +165,12 @@ func (f *FrostService) handleConnected(peerID peer.ID, direction network.Directi } fmt.Println(">>>>>>>>>>>>> FROST Here we should communicate and check frost info of the peer: ", peerID) + stream.Write([]byte("HelloToposNode o yee")) + time.Sleep(time.Second * 5) + + rd := []byte{} + num, _ := stream.Read(rd) + fmt.Println(">>>>>>>>>>>>> FROST Read response: ", rd, " number of bytes:", num) // Validate that the peers are working on the same chain // if status.Chain != resp.Chain { @@ -172,6 +178,7 @@ func (f *FrostService) handleConnected(peerID peer.ID, direction network.Directi // } // If this is a NOT temporary connection, save it + f.baseServer.AddFrostPeer(peerID, direction) return nil diff --git a/network/frost/frost_stream.go b/network/frost/frost_stream.go index df5c16d173..6f3e3bb1c9 100644 --- a/network/frost/frost_stream.go +++ b/network/frost/frost_stream.go @@ -33,16 +33,20 @@ func (s *FrostStream) Client(stream network.Stream) *network.Stream { } func (s *FrostStream) Handler() func(network.Stream) { + fmt.Println(">>>>>>>>>>>>>>> FROST Handler called:") + return func(stream network.Stream) { + go func() { + for { + b := <-s.streamCh + fmt.Println(">>>>>>>>>>>>>>> FROST RECEIVED BYTES >> :", b) + } + }() + select { case <-s.ctx.Done(): return case s.streamCh <- stream: - fmt.Println(">>>>>>>>>>>>>>> RECEIVED BYTES:") - //b := make([]byte, len(p)) - b := <-s.streamCh - fmt.Println(">>>>>>>>>>>>>>> RECEIVED BYTES >> :", b) - } } } @@ -59,11 +63,11 @@ func (s *FrostStream) Accept() (net.Conn, error) { } // Addr implements the net.Listener interface -func (g *FrostStream) Addr() net.Addr { +func (s *FrostStream) Addr() net.Addr { return fakeLocalAddr() } -func (g *FrostStream) Close() error { +func (s *FrostStream) Close() error { return nil } diff --git a/network/identity/identity.go b/network/identity/identity.go index a09e09250d..3ebb022906 100644 --- a/network/identity/identity.go +++ b/network/identity/identity.go @@ -133,34 +133,6 @@ func (i *IdentityService) GetNotifyBundle() *network.NotifyBundle { } } -// hasPendingStatus checks if a peer is pending handshake [Thread safe] -// func (i *IdentityService) hasPendingStatus(id peer.ID) bool { -// _, ok := i.pendingPeerConnections.Load(id) - -// return ok -// } - -// removePendingStatus removes the pending status from a peer, -// and updates adequate counter information [Thread safe] -// func (i *IdentityService) removePendingStatus(peerID peer.ID) { -// if value, loaded := i.pendingPeerConnections.LoadAndDelete(peerID); loaded { -// direction, ok := value.(network.Direction) -// if !ok { -// return -// } - -// i.baseServer.UpdatePendingConnCount(-1, direction) -// } -// } - -// addPendingStatus adds the pending status to a peer, -// and updates adequate counter information [Thread safe] -// func (i *IdentityService) addPendingStatus(id peer.ID, direction network.Direction) { -// if _, loaded := i.pendingPeerConnections.LoadOrStore(id, direction); !loaded { -// i.baseServer.UpdatePendingConnCount(1, direction) -// } -// } - // disconnectFromPeer disconnects from the specified peer func (i *IdentityService) disconnectFromPeer(peerID peer.ID, reason string) { i.baseServer.DisconnectFromPeer(peerID, reason) diff --git a/network/server.go b/network/server.go index 95e8948af9..add20fc53e 100644 --- a/network/server.go +++ b/network/server.go @@ -90,7 +90,7 @@ type Server struct { frostPeers map[peer.ID]*FrostPeerConnInfo // map of all topos node connections frostPeersLock sync.Mutex // lock for the peer map of topos nodes - frostPendingPeerConnections sync.Map // Map that keeps track of the pending status of peers; peerID -> bool + frostPendingPeerConnections sync.Map // Map that keeps track of the frost pending status frostConnectionCounts *ConnectionInfo } @@ -626,7 +626,8 @@ func (s *Server) updateBootnodeConnCount(peerID peer.ID, delta int64) { // DisconnectFromPeer disconnects the networking server from the specified peer func (s *Server) DisconnectFromPeer(peer peer.ID, reason string) { - if s.host.Network().Connectedness(peer) == network.Connected { + _, frostPeerConection := s.frostPeers[peer] + if (s.host.Network().Connectedness(peer) == network.Connected) && (!frostPeerConection) { s.logger.Info(fmt.Sprintf("Closing connection to peer [%s] for reason [%s]", peer.String(), reason)) if closeErr := s.host.Network().ClosePeer(peer); closeErr != nil { @@ -635,6 +636,17 @@ func (s *Server) DisconnectFromPeer(peer peer.ID, reason string) { } } +func (s *Server) DisconnectFromFrostPeer(peer peer.ID, reason string) { + _, peerConection := s.peers[peer] + if (s.host.Network().Connectedness(peer) == network.Connected) && (!peerConection) { + s.logger.Info(fmt.Sprintf("Closing frost connection to peer [%s] for reason [%s]", peer.String(), reason)) + + if closeErr := s.host.Network().ClosePeer(peer); closeErr != nil { + s.logger.Error(fmt.Sprintf("Unable to gracefully close frost peer connection, %v", closeErr)) + } + } +} + var ( // Anything below 35s is prone to false timeouts, as seen from empirical test data DefaultJoinTimeout = 100 * time.Second @@ -898,10 +910,12 @@ func (s *Server) updatePendingConnCountMetrics(direction network.Direction) { func (s *Server) updateFrostConnCountMetrics(direction network.Direction) { switch direction { case network.DirInbound: - metrics.SetGauge([]string{"inbound_frost_connections_count"}, float32(s.frostConnectionCounts.GetInboundConnCount())) + metrics.SetGauge([]string{"inbound_frost_connections_count"}, + float32(s.frostConnectionCounts.GetInboundConnCount())) case network.DirOutbound: - metrics.SetGauge([]string{"outbound_frost_connections_count"}, float32(s.frostConnectionCounts.GetOutboundConnCount())) + metrics.SetGauge([]string{"outbound_frost_connections_count"}, + float32(s.frostConnectionCounts.GetOutboundConnCount())) } } @@ -922,28 +936,30 @@ func (s *Server) updateFrostPendingConnCountMetrics(direction network.Direction) func (s *Server) HasPendingStatus(peerID peer.ID) bool { _, pendingPeerConection := s.pendingPeerConnections.Load(peerID) _, pendingFrostPeerConection := s.frostPendingPeerConnections.Load(peerID) + return pendingPeerConection || pendingFrostPeerConection } func (s *Server) HasIdentityPendingStatus(peerID peer.ID) bool { _, pendingPeerConection := s.pendingPeerConnections.Load(peerID) + return pendingPeerConection } func (s *Server) HasFrostPendingStatus(peerID peer.ID) bool { _, pendingFrostPeerConection := s.frostPendingPeerConnections.Load(peerID) + return pendingFrostPeerConection } func (s *Server) AddPendingPeer(peerID peer.ID, direction network.Direction) { - s.logger.Debug("Add pending peer", "id", peerID) if _, loaded := s.pendingPeerConnections.LoadOrStore(peerID, direction); !loaded { s.UpdatePendingConnCount(1, direction) + s.logger.Debug("Add pending peer", "id", peerID) } } func (s *Server) RemovePendingPeer(peerID peer.ID) { - s.logger.Debug("Remove pending peer", "id", peerID) if value, loaded := s.pendingPeerConnections.LoadAndDelete(peerID); loaded { direction, ok := value.(network.Direction) if !ok { @@ -951,18 +967,18 @@ func (s *Server) RemovePendingPeer(peerID peer.ID) { } s.UpdatePendingConnCount(-1, direction) + s.logger.Debug("Remove pending peer", "id", peerID) } } func (s *Server) AddFrostPendingPeer(peerID peer.ID, direction network.Direction) { - s.logger.Debug("Add frost pending peer", "id", peerID) if _, loaded := s.frostPendingPeerConnections.LoadOrStore(peerID, direction); !loaded { s.UpdateFrostPendingConnCount(1, direction) + s.logger.Debug("Added frost pending peer", "id", peerID) } } func (s *Server) RemoveFrostPendingPeer(peerID peer.ID) { - s.logger.Debug("Remove frost pending peer", "id", peerID) if value, loaded := s.frostPendingPeerConnections.LoadAndDelete(peerID); loaded { direction, ok := value.(network.Direction) if !ok { @@ -970,5 +986,6 @@ func (s *Server) RemoveFrostPendingPeer(peerID peer.ID) { } s.UpdateFrostPendingConnCount(-1, direction) + s.logger.Debug("Removed frost pending peer", "id", peerID) } } diff --git a/network/server_frost.go b/network/server_frost.go index 128ec1c0de..944d34ec60 100644 --- a/network/server_frost.go +++ b/network/server_frost.go @@ -13,7 +13,6 @@ import ( // NewFrostClient returns a new frost stream client connection func (s *Server) NewFrostClient(peerID peer.ID) (network.Stream, error) { - // Create a new stream connection and return it stream, err := s.NewStream(common.Frost, peerID) if err != nil {