From a2d41e57fdcef3022b4b95aef3b976556b7f35e9 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Wed, 3 Jun 2020 18:48:47 -0400 Subject: [PATCH 1/4] feat: added a simple dht crawler --- crawler/crawler.go | 224 +++++++++++++++++++++++++++++++++++++++++++++ crawler/options.go | 51 +++++++++++ 2 files changed, 275 insertions(+) create mode 100644 crawler/crawler.go create mode 100644 crawler/options.go diff --git a/crawler/crawler.go b/crawler/crawler.go new file mode 100644 index 000000000..c882f2827 --- /dev/null +++ b/crawler/crawler.go @@ -0,0 +1,224 @@ +package crawler + +import ( + "context" + "sync" + "time" + + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/protocol" + + logging "github.com/ipfs/go-log" + "github.com/libp2p/go-msgio/protoio" + + pb "github.com/libp2p/go-libp2p-kad-dht/pb" + kbucket "github.com/libp2p/go-libp2p-kbucket" +) + +var logger = logging.Logger("dht-crawler") + +type Crawler struct { + parallelism int + host host.Host + dhtRPC *pb.ProtocolMessenger +} + +func New(host host.Host, opts ...Option) (*Crawler, error) { + o := new(options) + if err := defaults(o); err != nil { + return nil, err + } + for _, opt := range opts { + if err := opt(o); err != nil { + return nil, err + } + } + + pm, err := pb.NewProtocolMessenger(&messageSender{h: host, protocols: o.protocols, timeout: o.perMsgTimeout}) + if err != nil { + return nil, err + } + + return &Crawler{ + parallelism: o.parallelism, + host: host, + dhtRPC: pm, + }, nil +} + +// MessageSender handles sending wire protocol messages to a given peer +type messageSender struct { + h host.Host + protocols []protocol.ID + timeout time.Duration +} + +// SendRequest sends a peer a message and waits for its response +func (ms *messageSender) SendRequest(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) { + s, err := ms.h.NewStream(ctx, p, ms.protocols...) + if err != nil { + return nil, err + } + + w := protoio.NewDelimitedWriter(s) + if err := w.WriteMsg(pmes); err != nil { + return nil, err + } + + r := protoio.NewDelimitedReader(s, network.MessageSizeMax) + tctx, cancel := context.WithTimeout(ctx, ms.timeout) + defer cancel() + defer func() { _ = s.Close() }() + + msg := new(pb.Message) + if err := ctxReadMsg(tctx, r, msg); err != nil { + _ = s.Reset() + return nil, err + } + + return msg, nil +} + +func ctxReadMsg(ctx context.Context, rc protoio.ReadCloser, mes *pb.Message) error { + errc := make(chan error, 1) + go func(r protoio.ReadCloser) { + defer close(errc) + err := r.ReadMsg(mes) + errc <- err + }(rc) + + select { + case err := <-errc: + return err + case <-ctx.Done(): + return ctx.Err() + } +} + +// SendMessage sends a peer a message without waiting on a response +func (ms *messageSender) SendMessage(ctx context.Context, p peer.ID, pmes *pb.Message) error { + s, err := ms.h.NewStream(ctx, p, ms.protocols...) 
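+	// An error here means the dial or protocol negotiation failed.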
+ if err != nil { + return err + } + defer func() { _ = s.Close() }() + + w := protoio.NewDelimitedWriter(s) + return w.WriteMsg(pmes) +} + +type HandleQueryResult func(p peer.ID, rtPeers []*peer.AddrInfo) +type HandleQueryFail func(p peer.ID, err error) + +func (c *Crawler) Run(ctx context.Context, startingPeers []*peer.AddrInfo, handleSuccess HandleQueryResult, handleFail HandleQueryFail) { + jobs := make(chan peer.ID, 1) + results := make(chan *queryResult, 1) + + // Start worker goroutines + var wg sync.WaitGroup + wg.Add(c.parallelism) + for i := 0; i < c.parallelism; i++ { + go func() { + defer wg.Done() + for p := range jobs { + res := queryPeer(ctx, c.host, c.dhtRPC, p) + results <- res + } + }() + } + + defer wg.Wait() + defer close(jobs) + + toDial := make([]*peer.AddrInfo, 0, 1000) + peersSeen := make(map[peer.ID]struct{}) + + for _, ai := range startingPeers { + toDial = append(toDial, ai) + peersSeen[ai.ID] = struct{}{} + } + + numQueried := 0 + outstanding := 0 + + for len(toDial) > 0 || outstanding > 0 { + var jobCh chan peer.ID + var nextPeerID peer.ID + if len(toDial) > 0 { + jobCh = jobs + nextPeerID = toDial[0].ID + } + + select { + case res := <-results: + if len(res.data) > 0 { + logger.Debugf("peer %v had %d peers", res.peer, len(res.data)) + rtPeers := make([]*peer.AddrInfo, 0, len(res.data)) + for p, ai := range res.data { + c.host.Peerstore().AddAddrs(p, ai.Addrs, time.Hour) + if _, ok := peersSeen[p]; !ok { + peersSeen[p] = struct{}{} + toDial = append(toDial, ai) + } + rtPeers = append(rtPeers, ai) + } + if handleSuccess != nil { + handleSuccess(res.peer, rtPeers) + } + } else if handleFail != nil { + handleFail(res.peer, res.err) + } + outstanding-- + case jobCh <- nextPeerID: + outstanding++ + numQueried++ + toDial = toDial[1:] + logger.Debugf("starting %d out of %d", numQueried, len(peersSeen)) + } + } +} + +type queryResult struct { + peer peer.ID + data map[peer.ID]*peer.AddrInfo + err error +} + +func queryPeer(ctx context.Context, host host.Host, dhtRPC *pb.ProtocolMessenger, nextPeer peer.ID) *queryResult { + tmpRT, err := kbucket.NewRoutingTable(20, kbucket.ConvertPeerID(nextPeer), time.Hour, host.Peerstore(), time.Hour, nil) + if err != nil { + logger.Errorf("error creating rt for peer %v : %v", nextPeer, err) + return &queryResult{nextPeer, nil, err} + } + + connCtx, cancel := context.WithTimeout(ctx, time.Second*5) + defer cancel() + err = host.Connect(connCtx, peer.AddrInfo{ID: nextPeer}) + if err != nil { + logger.Errorf("could not connect to peer %v: %v", nextPeer, err) + return &queryResult{nextPeer, nil, err} + } + + localPeers := make(map[peer.ID]*peer.AddrInfo) + var retErr error + for cpl := 0; cpl <= 15; cpl++ { + generatePeer, err := tmpRT.GenRandPeerID(uint(cpl)) + if err != nil { + panic(err) + } + peers, err := dhtRPC.GetClosestPeers(ctx, nextPeer, generatePeer) + if err != nil { + logger.Debugf("error finding data on peer %v with cpl %d : %v", nextPeer, cpl, err) + retErr = err + break + } + for _, ai := range peers { + if _, ok := localPeers[ai.ID]; !ok { + localPeers[ai.ID] = ai + } + } + } + return &queryResult{nextPeer, localPeers, retErr} +} diff --git a/crawler/options.go b/crawler/options.go new file mode 100644 index 000000000..92147f2bb --- /dev/null +++ b/crawler/options.go @@ -0,0 +1,51 @@ +package crawler + +import ( + "github.com/libp2p/go-libp2p-core/protocol" + "time" +) + +// Option DHT Crawler option type. 
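+// Options are applied in order after the defaults, so later options
+// can override values set by earlier ones.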
+type Option func(*options) error + +type options struct { + protocols []protocol.ID + parallelism int + crawlInterval time.Duration + perMsgTimeout time.Duration +} + +// defaults are the default crawler options. This option will be automatically +// prepended to any options you pass to the crawler constructor. +var defaults = func(o *options) error { + o.protocols = []protocol.ID{"/ipfs/kad/1.0.0"} + o.parallelism = 1000 + o.crawlInterval = time.Hour + o.perMsgTimeout = time.Second * 5 + + return nil +} + +// WithProtocols defines the ordered set of protocols the crawler will use to talk to other nodes +func WithProtocols(protocols []protocol.ID) Option { + return func(o *options) error { + o.protocols = append([]protocol.ID{}, protocols...) + return nil + } +} + +// WithParallelism defines the number of queries that can be issued in parallel +func WithParallelism(parallelism int) Option { + return func(o *options) error { + o.parallelism = parallelism + return nil + } +} + +// WithMsgTimeout defines the amount of time a single DHT message is allowed to take before it's deemed failed +func WithMsgTimeout(timeout time.Duration) Option { + return func(o *options) error { + o.perMsgTimeout = timeout + return nil + } +} From 9e9f654b0d7a7c19b02bd7fd826b4f935dddf443 Mon Sep 17 00:00:00 2001 From: Will Scott Date: Mon, 18 Jan 2021 12:35:02 -0800 Subject: [PATCH 2/4] Add option for peer connection timeout --- crawler/crawler.go | 32 ++++++++++++++++++++------------ crawler/options.go | 21 ++++++++++++++++----- 2 files changed, 36 insertions(+), 17 deletions(-) diff --git a/crawler/crawler.go b/crawler/crawler.go index c882f2827..63b7a4d43 100644 --- a/crawler/crawler.go +++ b/crawler/crawler.go @@ -19,12 +19,15 @@ import ( var logger = logging.Logger("dht-crawler") +// Crawler connects to hosts in the DHT to track routing tables of peers. 
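+// For each peer it reaches, the crawler walks the keyspace with
+// GetClosestPeers queries to enumerate that peer's routing table.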
type Crawler struct { - parallelism int - host host.Host - dhtRPC *pb.ProtocolMessenger + parallelism int + connectTimeout time.Duration + host host.Host + dhtRPC *pb.ProtocolMessenger } +// New creates a new Crawler func New(host host.Host, opts ...Option) (*Crawler, error) { o := new(options) if err := defaults(o); err != nil { @@ -42,9 +45,10 @@ func New(host host.Host, opts ...Option) (*Crawler, error) { } return &Crawler{ - parallelism: o.parallelism, - host: host, - dhtRPC: pm, + parallelism: o.parallelism, + connectTimeout: o.connectTimeout, + host: host, + dhtRPC: pm, }, nil } @@ -109,9 +113,13 @@ func (ms *messageSender) SendMessage(ctx context.Context, p peer.ID, pmes *pb.Me return w.WriteMsg(pmes) } +// HandleQueryResult is a callback on successful peer query type HandleQueryResult func(p peer.ID, rtPeers []*peer.AddrInfo) + +// HandleQueryFail is a callback on failed peer query type HandleQueryFail func(p peer.ID, err error) +// Run crawls dht peers from an initial seed of `startingPeers` func (c *Crawler) Run(ctx context.Context, startingPeers []*peer.AddrInfo, handleSuccess HandleQueryResult, handleFail HandleQueryFail) { jobs := make(chan peer.ID, 1) results := make(chan *queryResult, 1) @@ -123,7 +131,7 @@ func (c *Crawler) Run(ctx context.Context, startingPeers []*peer.AddrInfo, handl go func() { defer wg.Done() for p := range jobs { - res := queryPeer(ctx, c.host, c.dhtRPC, p) + res := c.queryPeer(ctx, p) results <- res } }() @@ -186,16 +194,16 @@ type queryResult struct { err error } -func queryPeer(ctx context.Context, host host.Host, dhtRPC *pb.ProtocolMessenger, nextPeer peer.ID) *queryResult { - tmpRT, err := kbucket.NewRoutingTable(20, kbucket.ConvertPeerID(nextPeer), time.Hour, host.Peerstore(), time.Hour, nil) +func (c *Crawler) queryPeer(ctx context.Context, nextPeer peer.ID) *queryResult { + tmpRT, err := kbucket.NewRoutingTable(20, kbucket.ConvertPeerID(nextPeer), time.Hour, c.host.Peerstore(), time.Hour, nil) if err != nil { logger.Errorf("error creating rt for peer %v : %v", nextPeer, err) return &queryResult{nextPeer, nil, err} } - connCtx, cancel := context.WithTimeout(ctx, time.Second*5) + connCtx, cancel := context.WithTimeout(ctx, c.connectTimeout) defer cancel() - err = host.Connect(connCtx, peer.AddrInfo{ID: nextPeer}) + err = c.host.Connect(connCtx, peer.AddrInfo{ID: nextPeer}) if err != nil { logger.Errorf("could not connect to peer %v: %v", nextPeer, err) return &queryResult{nextPeer, nil, err} @@ -208,7 +216,7 @@ func queryPeer(ctx context.Context, host host.Host, dhtRPC *pb.ProtocolMessenger if err != nil { panic(err) } - peers, err := dhtRPC.GetClosestPeers(ctx, nextPeer, generatePeer) + peers, err := c.dhtRPC.GetClosestPeers(ctx, nextPeer, generatePeer) if err != nil { logger.Debugf("error finding data on peer %v with cpl %d : %v", nextPeer, cpl, err) retErr = err diff --git a/crawler/options.go b/crawler/options.go index 92147f2bb..ba90d3dc4 100644 --- a/crawler/options.go +++ b/crawler/options.go @@ -1,18 +1,20 @@ package crawler import ( - "github.com/libp2p/go-libp2p-core/protocol" "time" + + "github.com/libp2p/go-libp2p-core/protocol" ) // Option DHT Crawler option type. type Option func(*options) error type options struct { - protocols []protocol.ID - parallelism int - crawlInterval time.Duration - perMsgTimeout time.Duration + protocols []protocol.ID + parallelism int + crawlInterval time.Duration + connectTimeout time.Duration + perMsgTimeout time.Duration } // defaults are the default crawler options. 
This option will be automatically @@ -21,6 +23,7 @@ var defaults = func(o *options) error { o.protocols = []protocol.ID{"/ipfs/kad/1.0.0"} o.parallelism = 1000 o.crawlInterval = time.Hour + o.connectTimeout = time.Second * 5 o.perMsgTimeout = time.Second * 5 return nil @@ -49,3 +52,11 @@ func WithMsgTimeout(timeout time.Duration) Option { return nil } } + +// WithConnectTimeout defines the time for peer connection before timing out +func WithConnectTimeout(timeout time.Duration) Option { + return func(o *options) error { + o.connectTimeout = timeout + return nil + } +} From 5c79e76ec0240f1e1d3825ac50c74648ef16d511 Mon Sep 17 00:00:00 2001 From: Will Scott Date: Wed, 27 Jan 2021 12:30:42 -0800 Subject: [PATCH 3/4] trim unused option --- crawler/crawler.go | 2 +- crawler/options.go | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/crawler/crawler.go b/crawler/crawler.go index 63b7a4d43..e86f9b9f7 100644 --- a/crawler/crawler.go +++ b/crawler/crawler.go @@ -140,7 +140,7 @@ func (c *Crawler) Run(ctx context.Context, startingPeers []*peer.AddrInfo, handl defer wg.Wait() defer close(jobs) - toDial := make([]*peer.AddrInfo, 0, 1000) + toDial := make([]*peer.AddrInfo, 0, len(startingPeers)) peersSeen := make(map[peer.ID]struct{}) for _, ai := range startingPeers { diff --git a/crawler/options.go b/crawler/options.go index ba90d3dc4..b33175009 100644 --- a/crawler/options.go +++ b/crawler/options.go @@ -12,7 +12,6 @@ type Option func(*options) error type options struct { protocols []protocol.ID parallelism int - crawlInterval time.Duration connectTimeout time.Duration perMsgTimeout time.Duration } @@ -22,7 +21,6 @@ type options struct { var defaults = func(o *options) error { o.protocols = []protocol.ID{"/ipfs/kad/1.0.0"} o.parallelism = 1000 - o.crawlInterval = time.Hour o.connectTimeout = time.Second * 5 o.perMsgTimeout = time.Second * 5 From bdc66b8a9f88b256b99d6802616dffd4bf573b4a Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Thu, 22 Apr 2021 17:28:37 -0400 Subject: [PATCH 4/4] fixes: switch failed to connect to peer errors from logging as Error to Info. Stop returning partial peersets if a peer cannot give us their full routing table. Keep starting peer addresses alive in the peerstore. --- crawler/crawler.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/crawler/crawler.go b/crawler/crawler.go index e86f9b9f7..5f4ec7732 100644 --- a/crawler/crawler.go +++ b/crawler/crawler.go @@ -146,6 +146,9 @@ func (c *Crawler) Run(ctx context.Context, startingPeers []*peer.AddrInfo, handl for _, ai := range startingPeers { toDial = append(toDial, ai) peersSeen[ai.ID] = struct{}{} + extendAddrs := c.host.Peerstore().Addrs(ai.ID) + extendAddrs = append(extendAddrs, ai.Addrs...) + c.host.Peerstore().AddAddrs(ai.ID, extendAddrs, time.Hour) } numQueried := 0 @@ -205,7 +208,7 @@ func (c *Crawler) queryPeer(ctx context.Context, nextPeer peer.ID) *queryResult defer cancel() err = c.host.Connect(connCtx, peer.AddrInfo{ID: nextPeer}) if err != nil { - logger.Errorf("could not connect to peer %v: %v", nextPeer, err) + logger.Infof("could not connect to peer %v: %v", nextPeer, err) return &queryResult{nextPeer, nil, err} } @@ -228,5 +231,10 @@ func (c *Crawler) queryPeer(ctx context.Context, nextPeer peer.ID) *queryResult } } } + + if retErr != nil { + return &queryResult{nextPeer, nil, retErr} + } + return &queryResult{nextPeer, localPeers, retErr} }
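For context, a minimal sketch of driving the crawler this series adds. The host construction via libp2p.New and the bootstrap seed list are illustrative assumptions, not part of the patches; the crawler API (New, Run, and the With* options) is as introduced above.

    package main

    import (
        "context"
        "fmt"
        "time"

        "github.com/libp2p/go-libp2p"
        "github.com/libp2p/go-libp2p-core/peer"

        "github.com/libp2p/go-libp2p-kad-dht/crawler"
    )

    func main() {
        // Bound the whole crawl; queries use this context.
        ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
        defer cancel()

        // Assumption: a default libp2p host is sufficient for crawling.
        h, err := libp2p.New(ctx)
        if err != nil {
            panic(err)
        }

        c, err := crawler.New(h,
            crawler.WithParallelism(200),
            crawler.WithConnectTimeout(3*time.Second),
            crawler.WithMsgTimeout(5*time.Second),
        )
        if err != nil {
            panic(err)
        }

        // seeds is a placeholder: fill it with bootstrap peer AddrInfos.
        var seeds []*peer.AddrInfo

        c.Run(ctx, seeds,
            func(p peer.ID, rtPeers []*peer.AddrInfo) {
                fmt.Printf("%s: %d routing table entries\n", p, len(rtPeers))
            },
            func(p peer.ID, err error) {
                fmt.Printf("%s: query failed: %v\n", p, err)
            })
    }

Run blocks until no undialed peers remain, so a caller that wants a bounded crawl can pass a context with a deadline, as above.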