Add basic crawler #663

Merged (4 commits) on Apr 22, 2021
224 changes: 224 additions & 0 deletions crawler/crawler.go
@@ -0,0 +1,224 @@
package crawler

import (
	"context"
	"sync"
	"time"

	"github.com/libp2p/go-libp2p-core/host"
	"github.com/libp2p/go-libp2p-core/network"
	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/libp2p/go-libp2p-core/protocol"

	logging "github.com/ipfs/go-log"
	"github.com/libp2p/go-msgio/protoio"

	pb "github.com/libp2p/go-libp2p-kad-dht/pb"
	kbucket "github.com/libp2p/go-libp2p-kbucket"
)

var logger = logging.Logger("dht-crawler")

// Crawler connects to peers across the DHT and queries them for the contents of their routing tables.
type Crawler struct {
	parallelism int
	host        host.Host
	dhtRPC      *pb.ProtocolMessenger
}

// New creates a Crawler for the given host. Options are applied on top of the defaults.
func New(host host.Host, opts ...Option) (*Crawler, error) {
	o := new(options)
	if err := defaults(o); err != nil {
		return nil, err
	}
	for _, opt := range opts {
		if err := opt(o); err != nil {
			return nil, err
		}
	}

	pm, err := pb.NewProtocolMessenger(&messageSender{h: host, protocols: o.protocols, timeout: o.perMsgTimeout})
	if err != nil {
		return nil, err
	}

	return &Crawler{
		parallelism: o.parallelism,
		host:        host,
		dhtRPC:      pm,
	}, nil
}

// messageSender handles sending wire protocol messages to a given peer
type messageSender struct {
	h         host.Host
	protocols []protocol.ID
	timeout   time.Duration
}

// SendRequest sends a peer a message and waits for its response
func (ms *messageSender) SendRequest(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
	s, err := ms.h.NewStream(ctx, p, ms.protocols...)
	if err != nil {
		return nil, err
	}

	w := protoio.NewDelimitedWriter(s)
	if err := w.WriteMsg(pmes); err != nil {
		return nil, err
	}

	r := protoio.NewDelimitedReader(s, network.MessageSizeMax)
	tctx, cancel := context.WithTimeout(ctx, ms.timeout)
	defer cancel()
	defer func() { _ = s.Close() }()

	msg := new(pb.Message)
	if err := ctxReadMsg(tctx, r, msg); err != nil {
		_ = s.Reset()
		return nil, err
	}

	return msg, nil
}

func ctxReadMsg(ctx context.Context, rc protoio.ReadCloser, mes *pb.Message) error {
	errc := make(chan error, 1)
	go func(r protoio.ReadCloser) {
		defer close(errc)
		err := r.ReadMsg(mes)
		errc <- err
	}(rc)

	select {
	case err := <-errc:
		return err
	case <-ctx.Done():
		return ctx.Err()
	}
}

// SendMessage sends a peer a message without waiting on a response
func (ms *messageSender) SendMessage(ctx context.Context, p peer.ID, pmes *pb.Message) error {
	s, err := ms.h.NewStream(ctx, p, ms.protocols...)
	if err != nil {
		return err
	}
	defer func() { _ = s.Close() }()

	w := protoio.NewDelimitedWriter(s)
	return w.WriteMsg(pmes)
}

// HandleQueryResult is a callback invoked for every peer queried successfully, together with the peers it reported.
type HandleQueryResult func(p peer.ID, rtPeers []*peer.AddrInfo)

// HandleQueryFail is a callback invoked for every peer whose query failed.
type HandleQueryFail func(p peer.ID, err error)

// Run crawls the DHT starting from startingPeers, invoking handleSuccess or handleFail as each queried peer completes.
func (c *Crawler) Run(ctx context.Context, startingPeers []*peer.AddrInfo, handleSuccess HandleQueryResult, handleFail HandleQueryFail) {
	jobs := make(chan peer.ID, 1)
	results := make(chan *queryResult, 1)

	// Start worker goroutines
	var wg sync.WaitGroup
	wg.Add(c.parallelism)
	for i := 0; i < c.parallelism; i++ {
		go func() {
			defer wg.Done()
			for p := range jobs {
				res := queryPeer(ctx, c.host, c.dhtRPC, p)
				results <- res
			}
		}()
	}

	defer wg.Wait()
	defer close(jobs)

	toDial := make([]*peer.AddrInfo, 0, 1000)
	peersSeen := make(map[peer.ID]struct{})

	for _, ai := range startingPeers {
		toDial = append(toDial, ai)
		peersSeen[ai.ID] = struct{}{}
	}

	numQueried := 0
	outstanding := 0

	for len(toDial) > 0 || outstanding > 0 {
		var jobCh chan peer.ID
		var nextPeerID peer.ID
		if len(toDial) > 0 {
			jobCh = jobs
			nextPeerID = toDial[0].ID
		}

		select {
		case res := <-results:
			if len(res.data) > 0 {
				logger.Debugf("peer %v had %d peers", res.peer, len(res.data))
				rtPeers := make([]*peer.AddrInfo, 0, len(res.data))
				for p, ai := range res.data {
					c.host.Peerstore().AddAddrs(p, ai.Addrs, time.Hour)
Review comment (Contributor): An hour seems potentially racy; could imagine this lasting longer than that.

					if _, ok := peersSeen[p]; !ok {
						peersSeen[p] = struct{}{}
						toDial = append(toDial, ai)
					}
					rtPeers = append(rtPeers, ai)
				}
				if handleSuccess != nil {
					handleSuccess(res.peer, rtPeers)
				}
			} else if handleFail != nil {
				handleFail(res.peer, res.err)
Review comment (Contributor): A bit more scaffolding categorizing errors would be useful. How many attempted connections are timing out? How many are failing to connect? (One way to start on this is sketched after this file.)

			}
			outstanding--
		case jobCh <- nextPeerID:
			outstanding++
			numQueried++
			toDial = toDial[1:]
			logger.Debugf("starting %d out of %d", numQueried, len(peersSeen))
		}
	}
}

type queryResult struct {
	peer peer.ID
	data map[peer.ID]*peer.AddrInfo
	err  error
}

// queryPeer connects to nextPeer and asks it for the closest peers to a random key at every
// common prefix length up to 15, collecting all of the peers it reports.
func queryPeer(ctx context.Context, host host.Host, dhtRPC *pb.ProtocolMessenger, nextPeer peer.ID) *queryResult {
	tmpRT, err := kbucket.NewRoutingTable(20, kbucket.ConvertPeerID(nextPeer), time.Hour, host.Peerstore(), time.Hour, nil)
	if err != nil {
		logger.Errorf("error creating rt for peer %v : %v", nextPeer, err)
		return &queryResult{nextPeer, nil, err}
	}

	connCtx, cancel := context.WithTimeout(ctx, time.Second*5)
	defer cancel()
	err = host.Connect(connCtx, peer.AddrInfo{ID: nextPeer})
	if err != nil {
		logger.Errorf("could not connect to peer %v: %v", nextPeer, err)
		return &queryResult{nextPeer, nil, err}
	}

	localPeers := make(map[peer.ID]*peer.AddrInfo)
	var retErr error
	for cpl := 0; cpl <= 15; cpl++ {
		generatePeer, err := tmpRT.GenRandPeerID(uint(cpl))
		if err != nil {
			panic(err)
		}
		peers, err := dhtRPC.GetClosestPeers(ctx, nextPeer, generatePeer)
		if err != nil {
			logger.Debugf("error finding data on peer %v with cpl %d : %v", nextPeer, cpl, err)
			retErr = err
			break
		}
		for _, ai := range peers {
			if _, ok := localPeers[ai.ID]; !ok {
				localPeers[ai.ID] = ai
Review comment (Contributor): How many peers have a valid dial-able address in their address info? (The usage sketch at the end of this page counts these.)

			}
		}
	}
	return &queryResult{nextPeer, localPeers, retErr}
}
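
Following up on the reviewer's note above about categorizing errors, here is a minimal sketch of a HandleQueryFail callback that tallies timeouts separately from other failures. The errTally type and its fields are illustrative and not part of this PR; it assumes the standard library context and errors packages plus the peer import used above, and since whether a libp2p dial error wraps context.DeadlineExceeded depends on the transport, the split should be treated as approximate.

// errTally is a hypothetical failure counter for one crawl run. Run invokes the
// fail handler sequentially from the goroutine that called it, so no locking is
// needed here.
type errTally struct {
	timeouts int // failures that report an expired context deadline
	other    int // every other failure (connection refused, stream reset, ...)
}

// handleFail matches the HandleQueryFail signature and can be passed to Run.
func (t *errTally) handleFail(p peer.ID, err error) {
	if errors.Is(err, context.DeadlineExceeded) {
		t.timeouts++
	} else {
		t.other++
	}
}
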
51 changes: 51 additions & 0 deletions crawler/options.go
@@ -0,0 +1,51 @@
package crawler

import (
	"github.com/libp2p/go-libp2p-core/protocol"
	"time"
)

// Option DHT Crawler option type.
type Option func(*options) error

type options struct {
	protocols     []protocol.ID
	parallelism   int
	crawlInterval time.Duration
	perMsgTimeout time.Duration
}

// defaults are the default crawler options. This option will be automatically
// prepended to any options you pass to the crawler constructor.
var defaults = func(o *options) error {
	o.protocols = []protocol.ID{"/ipfs/kad/1.0.0"}
	o.parallelism = 1000
	o.crawlInterval = time.Hour
	o.perMsgTimeout = time.Second * 5

	return nil
}

// WithProtocols defines the ordered set of protocols the crawler will use to talk to other nodes
func WithProtocols(protocols []protocol.ID) Option {
	return func(o *options) error {
		o.protocols = append([]protocol.ID{}, protocols...)
		return nil
	}
}

// WithParallelism defines the number of queries that can be issued in parallel
func WithParallelism(parallelism int) Option {
	return func(o *options) error {
		o.parallelism = parallelism
		return nil
	}
}

// WithMsgTimeout defines the amount of time a single DHT message is allowed to take before it's deemed failed
func WithMsgTimeout(timeout time.Duration) Option {
	return func(o *options) error {
		o.perMsgTimeout = timeout
		return nil
	}
}
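
To show how the pieces fit together, here is a minimal usage sketch. It assumes a libp2p host h and a slice of bootstrap peers bootstrapPeers already exist, that the context, fmt, time, peer, and crawler packages are imported, and that the parallelism and timeout values are arbitrary. The success handler also counts how many reported peers carried at least one address, along the lines of the reviewer's question above.

c, err := crawler.New(h,
	crawler.WithParallelism(200),
	crawler.WithMsgTimeout(10*time.Second),
)
if err != nil {
	panic(err)
}

// Run blocks until the crawl has no more work; cancelling ctx causes the
// remaining queries to fail quickly and the crawl to wind down.
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Minute)
defer cancel()

withAddrs := 0
c.Run(ctx, bootstrapPeers,
	func(p peer.ID, rtPeers []*peer.AddrInfo) {
		for _, ai := range rtPeers {
			if len(ai.Addrs) > 0 {
				withAddrs++ // reported peers that came with at least one address
			}
		}
	},
	func(p peer.ID, err error) {
		fmt.Printf("failed to query %v: %v\n", p, err)
	},
)
fmt.Printf("crawl finished; %d reported peers had addresses\n", withAddrs)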