From fb8a54a9f603d34efb7c19ecd57110c8adaaf3de Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Wed, 15 Sep 2021 11:08:44 +0300 Subject: [PATCH] feat: skip validation if peer is known (#2491) --- pkg/hive/hive.go | 45 +++++++++++++++++++++++++++++++++++++++---- pkg/hive/hive_test.go | 8 ++++---- pkg/node/node.go | 6 +++++- 3 files changed, 50 insertions(+), 9 deletions(-) diff --git a/pkg/hive/hive.go b/pkg/hive/hive.go index e4d0a51846b..02e27ba5336 100644 --- a/pkg/hive/hive.go +++ b/pkg/hive/hive.go @@ -28,6 +28,7 @@ import ( "github.com/ethersphere/bee/pkg/p2p/protobuf" "github.com/ethersphere/bee/pkg/ratelimit" "github.com/ethersphere/bee/pkg/swarm" + lru "github.com/hashicorp/golang-lru" ma "github.com/multiformats/go-multiaddr" ) @@ -39,6 +40,9 @@ const ( maxBatchSize = 30 pingTimeout = time.Second * 5 // time to wait for ping to succeed batchValidationTimeout = 5 * time.Minute // prevent lock contention on peer validation + cacheSize = 100000 + bitsPerByte = 8 + cachePrefix = swarm.MaxBins / bitsPerByte // enough bytes (32 bits) to uniquely identify a peer ) var ( @@ -62,9 +66,17 @@ type Service struct { wg sync.WaitGroup peersChan chan pb.Peers sem *semaphore.Weighted + lru *lru.Cache // cache for unreachable peers + bootnode bool } -func New(streamer p2p.StreamerPinger, addressbook addressbook.GetPutter, networkID uint64, logger logging.Logger) *Service { +func New(streamer p2p.StreamerPinger, addressbook addressbook.GetPutter, networkID uint64, bootnode bool, logger logging.Logger) (*Service, error) { + + lruCache, err := lru.New(cacheSize) + if err != nil { + return nil, err + } + svc := &Service{ streamer: streamer, logger: logger, @@ -76,9 +88,15 @@ func New(streamer p2p.StreamerPinger, addressbook addressbook.GetPutter, network quit: make(chan struct{}), peersChan: make(chan pb.Peers), sem: semaphore.NewWeighted(int64(31)), + lru: lruCache, + bootnode: bootnode, } - svc.startCheckPeersHandler() - return svc + + if !bootnode { + svc.startCheckPeersHandler() + } + + return svc, nil } func (s *Service) Protocol() p2p.ProtocolSpec { @@ -207,6 +225,10 @@ func (s *Service) peersHandler(ctx context.Context, peer p2p.Peer, stream p2p.St // but we still want to handle not closed stream from the other side to avoid zombie stream go stream.FullClose() + if s.bootnode { + return nil + } + select { case s.peersChan <- peersReq: case <-s.quit: @@ -263,6 +285,20 @@ func (s *Service) checkAndAddPeers(ctx context.Context, peers pb.Peers) { wg := sync.WaitGroup{} for _, p := range peers.Peers { + + overlay := swarm.NewAddress(p.Overlay) + cacheOverlay := overlay.ByteString()[:cachePrefix] + + // cached peer, skip + if _, ok := s.lru.Get(cacheOverlay); ok { + continue + } + + // if peer exists already in the addressBook, skip + if _, err := s.addressBook.Get(overlay); err == nil { + continue + } + err := s.sem.Acquire(ctx, 1) if err != nil { return @@ -275,6 +311,8 @@ func (s *Service) checkAndAddPeers(ctx context.Context, peers pb.Peers) { defer func() { s.sem.Release(1) + // mark peer as seen + _ = s.lru.Add(cacheOverlay, nil) wg.Done() }() @@ -320,7 +358,6 @@ func (s *Service) checkAndAddPeers(ctx context.Context, peers pb.Peers) { mtx.Unlock() }(p) } - wg.Wait() if s.addPeersHandler != nil && len(peersToAdd) > 0 { diff --git a/pkg/hive/hive_test.go b/pkg/hive/hive_test.go index 728febea4cb..f0147a0d260 100644 --- a/pkg/hive/hive_test.go +++ b/pkg/hive/hive_test.go @@ -49,7 +49,7 @@ func TestHandlerRateLimit(t *testing.T) { // new recorder for handling Ping streamer := streamtest.New() // create a hive server that handles the incoming stream - server := hive.New(streamer, addressbookclean, networkID, logger) + server, _ := hive.New(streamer, addressbookclean, networkID, false, logger) serverAddress := test.RandomAddress() @@ -88,7 +88,7 @@ func TestHandlerRateLimit(t *testing.T) { } // create a hive client that will do broadcast - client := hive.New(serverRecorder, addressbook, networkID, logger) + client, _ := hive.New(serverRecorder, addressbook, networkID, false, logger) err := client.BroadcastPeers(context.Background(), serverAddress, peers...) if err != nil { t.Fatal(err) @@ -229,7 +229,7 @@ func TestBroadcastPeers(t *testing.T) { streamer = streamtest.New() } // create a hive server that handles the incoming stream - server := hive.New(streamer, addressbookclean, networkID, logger) + server, _ := hive.New(streamer, addressbookclean, networkID, false, logger) // setup the stream recorder to record stream data recorder := streamtest.New( @@ -237,7 +237,7 @@ func TestBroadcastPeers(t *testing.T) { ) // create a hive client that will do broadcast - client := hive.New(recorder, addressbook, networkID, logger) + client, _ := hive.New(recorder, addressbook, networkID, false, logger) if err := client.BroadcastPeers(context.Background(), tc.addresee, tc.peers...); err != nil { t.Fatal(err) } diff --git a/pkg/node/node.go b/pkg/node/node.go index 9e983f55e2e..14a18d8bd6b 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -496,7 +496,11 @@ func NewBee(addr string, publicKey *ecdsa.PublicKey, signer crypto.Signer, netwo return nil, fmt.Errorf("pingpong service: %w", err) } - hive := hive.New(p2ps, addressbook, networkID, logger) + hive, err := hive.New(p2ps, addressbook, networkID, o.BootnodeMode, logger) + if err != nil { + return nil, fmt.Errorf("hive: %w", err) + } + if err = p2ps.AddProtocol(hive.Protocol()); err != nil { return nil, fmt.Errorf("hive service: %w", err) }