Skip to content

Commit

Permalink
feat: skip validation if peer is known (#2491)
Browse files Browse the repository at this point in the history
  • Loading branch information
istae authored Sep 15, 2021
1 parent 1d6fde6 commit fb8a54a
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 9 deletions.
45 changes: 41 additions & 4 deletions pkg/hive/hive.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/ethersphere/bee/pkg/p2p/protobuf"
"github.com/ethersphere/bee/pkg/ratelimit"
"github.com/ethersphere/bee/pkg/swarm"
lru "github.com/hashicorp/golang-lru"
ma "github.com/multiformats/go-multiaddr"
)

Expand All @@ -39,6 +40,9 @@ const (
maxBatchSize = 30
pingTimeout = time.Second * 5 // time to wait for ping to succeed
batchValidationTimeout = 5 * time.Minute // prevent lock contention on peer validation
cacheSize = 100000
bitsPerByte = 8
cachePrefix = swarm.MaxBins / bitsPerByte // enough bytes (32 bits) to uniquely identify a peer
)

var (
Expand All @@ -62,9 +66,17 @@ type Service struct {
wg sync.WaitGroup
peersChan chan pb.Peers
sem *semaphore.Weighted
lru *lru.Cache // cache for unreachable peers
bootnode bool
}

func New(streamer p2p.StreamerPinger, addressbook addressbook.GetPutter, networkID uint64, logger logging.Logger) *Service {
func New(streamer p2p.StreamerPinger, addressbook addressbook.GetPutter, networkID uint64, bootnode bool, logger logging.Logger) (*Service, error) {

lruCache, err := lru.New(cacheSize)
if err != nil {
return nil, err
}

svc := &Service{
streamer: streamer,
logger: logger,
Expand All @@ -76,9 +88,15 @@ func New(streamer p2p.StreamerPinger, addressbook addressbook.GetPutter, network
quit: make(chan struct{}),
peersChan: make(chan pb.Peers),
sem: semaphore.NewWeighted(int64(31)),
lru: lruCache,
bootnode: bootnode,
}
svc.startCheckPeersHandler()
return svc

if !bootnode {
svc.startCheckPeersHandler()
}

return svc, nil
}

func (s *Service) Protocol() p2p.ProtocolSpec {
Expand Down Expand Up @@ -207,6 +225,10 @@ func (s *Service) peersHandler(ctx context.Context, peer p2p.Peer, stream p2p.St
// but we still want to handle not closed stream from the other side to avoid zombie stream
go stream.FullClose()

if s.bootnode {
return nil
}

select {
case s.peersChan <- peersReq:
case <-s.quit:
Expand Down Expand Up @@ -263,6 +285,20 @@ func (s *Service) checkAndAddPeers(ctx context.Context, peers pb.Peers) {
wg := sync.WaitGroup{}

for _, p := range peers.Peers {

overlay := swarm.NewAddress(p.Overlay)
cacheOverlay := overlay.ByteString()[:cachePrefix]

// cached peer, skip
if _, ok := s.lru.Get(cacheOverlay); ok {
continue
}

// if peer exists already in the addressBook, skip
if _, err := s.addressBook.Get(overlay); err == nil {
continue
}

err := s.sem.Acquire(ctx, 1)
if err != nil {
return
Expand All @@ -275,6 +311,8 @@ func (s *Service) checkAndAddPeers(ctx context.Context, peers pb.Peers) {

defer func() {
s.sem.Release(1)
// mark peer as seen
_ = s.lru.Add(cacheOverlay, nil)
wg.Done()
}()

Expand Down Expand Up @@ -320,7 +358,6 @@ func (s *Service) checkAndAddPeers(ctx context.Context, peers pb.Peers) {
mtx.Unlock()
}(p)
}

wg.Wait()

if s.addPeersHandler != nil && len(peersToAdd) > 0 {
Expand Down
8 changes: 4 additions & 4 deletions pkg/hive/hive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestHandlerRateLimit(t *testing.T) {
// new recorder for handling Ping
streamer := streamtest.New()
// create a hive server that handles the incoming stream
server := hive.New(streamer, addressbookclean, networkID, logger)
server, _ := hive.New(streamer, addressbookclean, networkID, false, logger)

serverAddress := test.RandomAddress()

Expand Down Expand Up @@ -88,7 +88,7 @@ func TestHandlerRateLimit(t *testing.T) {
}

// create a hive client that will do broadcast
client := hive.New(serverRecorder, addressbook, networkID, logger)
client, _ := hive.New(serverRecorder, addressbook, networkID, false, logger)
err := client.BroadcastPeers(context.Background(), serverAddress, peers...)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -229,15 +229,15 @@ func TestBroadcastPeers(t *testing.T) {
streamer = streamtest.New()
}
// create a hive server that handles the incoming stream
server := hive.New(streamer, addressbookclean, networkID, logger)
server, _ := hive.New(streamer, addressbookclean, networkID, false, logger)

// setup the stream recorder to record stream data
recorder := streamtest.New(
streamtest.WithProtocols(server.Protocol()),
)

// create a hive client that will do broadcast
client := hive.New(recorder, addressbook, networkID, logger)
client, _ := hive.New(recorder, addressbook, networkID, false, logger)
if err := client.BroadcastPeers(context.Background(), tc.addresee, tc.peers...); err != nil {
t.Fatal(err)
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,11 @@ func NewBee(addr string, publicKey *ecdsa.PublicKey, signer crypto.Signer, netwo
return nil, fmt.Errorf("pingpong service: %w", err)
}

hive := hive.New(p2ps, addressbook, networkID, logger)
hive, err := hive.New(p2ps, addressbook, networkID, o.BootnodeMode, logger)
if err != nil {
return nil, fmt.Errorf("hive: %w", err)
}

if err = p2ps.AddProtocol(hive.Protocol()); err != nil {
return nil, fmt.Errorf("hive service: %w", err)
}
Expand Down

0 comments on commit fb8a54a

Please sign in to comment.