diff --git a/dual/dual_test.go b/dual/dual_test.go index 323ff1169..e6d892a23 100644 --- a/dual/dual_test.go +++ b/dual/dual_test.go @@ -15,6 +15,8 @@ import ( record "github.com/libp2p/go-libp2p-record" swarmt "github.com/libp2p/go-libp2p-swarm/testing" bhost "github.com/libp2p/go-libp2p/p2p/host/basic" + + dhtrouting "github.com/libp2p/go-libp2p-kad-dht/routing" ) var wancid, lancid cid.Cid @@ -263,7 +265,7 @@ func TestSearchValue(t *testing.T) { _ = wan.PutValue(ctx, "/v/hello", []byte("valid")) - valCh, err := d.SearchValue(ctx, "/v/hello", dht.Quorum(0)) + valCh, err := d.SearchValue(ctx, "/v/hello", dhtrouting.Quorum(0)) if err != nil { t.Fatal(err) } @@ -291,7 +293,7 @@ func TestSearchValue(t *testing.T) { t.Error(err) } - valCh, err = d.SearchValue(ctx, "/v/hello", dht.Quorum(0)) + valCh, err = d.SearchValue(ctx, "/v/hello", dhtrouting.Quorum(0)) if err != nil { t.Fatal(err) } diff --git a/lookup.go b/lookup.go index dff8bb244..badd8cb93 100644 --- a/lookup.go +++ b/lookup.go @@ -17,6 +17,11 @@ import ( // If the context is canceled, this function will return the context error along // with the closest K peers it has found so far. func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan peer.ID, error) { + return dht.GetClosestPeersSeeded(ctx, key, nil, true) +} + +// GetClosestPeersSeeded is the Kademlia 'node lookup' operation +func (dht *IpfsDHT) GetClosestPeersSeeded(ctx context.Context, key string, seedPeers []peer.ID, useRTPeers bool) (<-chan peer.ID, error) { if key == "" { return nil, fmt.Errorf("can't lookup empty key") } @@ -45,6 +50,7 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan pee return peers, err }, func() bool { return false }, + seedPeers, useRTPeers, ) if err != nil { diff --git a/query.go b/query.go index 47d87df05..a1d12b9b9 100644 --- a/query.go +++ b/query.go @@ -76,9 +76,9 @@ type lookupWithFollowupResult struct { // // After the lookup is complete the query function is run (unless stopped) against all of the top K peers from the // lookup that have not already been successfully queried. -func (dht *IpfsDHT) runLookupWithFollowup(ctx context.Context, target string, queryFn queryFn, stopFn stopFn) (*lookupWithFollowupResult, error) { +func (dht *IpfsDHT) runLookupWithFollowup(ctx context.Context, target string, queryFn queryFn, stopFn stopFn, seedPeers []peer.ID, useRTPeers bool) (*lookupWithFollowupResult, error) { // run the query - lookupRes, err := dht.runQuery(ctx, target, queryFn, stopFn) + lookupRes, err := dht.runQuery(ctx, target, queryFn, stopFn, seedPeers, useRTPeers) if err != nil { return nil, err } @@ -145,10 +145,23 @@ processFollowUp: return lookupRes, nil } -func (dht *IpfsDHT) runQuery(ctx context.Context, target string, queryFn queryFn, stopFn stopFn) (*lookupWithFollowupResult, error) { +func (dht *IpfsDHT) runQuery(ctx context.Context, target string, queryFn queryFn, stopFn stopFn, manuallySeededPeers []peer.ID, useRTPeers bool) (*lookupWithFollowupResult, error) { // pick the K closest peers to the key in our Routing table. 
targetKadID := kb.ConvertKey(target) - seedPeers := dht.routingTable.NearestPeers(targetKadID, dht.bucketSize) + + seedPeerSet := peer.NewSet() + for _, p := range manuallySeededPeers { + seedPeerSet.Add(p) + } + + if manuallySeededPeers == nil || useRTPeers { + RTSeedPeers := dht.routingTable.NearestPeers(targetKadID, dht.bucketSize) + for _, p := range RTSeedPeers { + seedPeerSet.Add(p) + } + } + + seedPeers := seedPeerSet.Peers() if len(seedPeers) == 0 { routing.PublishQueryEvent(ctx, &routing.QueryEvent{ Type: routing.QueryError, diff --git a/routing.go b/routing.go index d14e3845f..5fabc4b04 100644 --- a/routing.go +++ b/routing.go @@ -16,6 +16,7 @@ import ( u "github.com/ipfs/go-ipfs-util" "github.com/libp2p/go-libp2p-kad-dht/internal" "github.com/libp2p/go-libp2p-kad-dht/qpeerset" + dhtrouting "github.com/libp2p/go-libp2p-kad-dht/routing" kb "github.com/libp2p/go-libp2p-kbucket" record "github.com/libp2p/go-libp2p-record" "github.com/multiformats/go-multihash" @@ -25,24 +26,29 @@ import ( // Basic Put/Get -// PutValue adds value corresponding to given Key. -// This is the top level "Store" operation of the DHT -func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts ...routing.Option) (err error) { +// PutValueExtended adds value corresponding to given Key. +func (dht *IpfsDHT) PutValueExtended(ctx context.Context, key string, value []byte, opts ...routing.Option) ([]peer.ID, error) { if !dht.enableValues { - return routing.ErrNotSupported + return nil, routing.ErrNotSupported } logger.Debugw("putting value", "key", internal.LoggableRecordKeyString(key)) + var cfg routing.Options + if err := cfg.Apply(opts...); err != nil { + return nil, err + } + seedPeerOpts := dhtrouting.GetSeedPeers(&cfg) + // don't even allow local users to put bad values. if err := dht.Validator.Validate(key, value); err != nil { - return err + return nil, err } old, err := dht.getLocal(key) if err != nil { // Means something is wrong with the datastore. - return err + return nil, err } // Check if we have an old value that's not the same as the new one. @@ -50,10 +56,10 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts // Check to see if the new one is better. i, err := dht.Validator.Select(key, [][]byte{value, old.GetValue()}) if err != nil { - return err + return nil, err } if i != 0 { - return fmt.Errorf("can't replace a newer value with an older value") + return nil, fmt.Errorf("can't replace a newer value with an older value") } } @@ -61,16 +67,18 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts rec.TimeReceived = u.FormatRFC3339(time.Now()) err = dht.putLocal(key, rec) if err != nil { - return err + return nil, err } - pchan, err := dht.GetClosestPeers(ctx, key) + pchan, err := dht.GetClosestPeersSeeded(ctx, key, seedPeerOpts.SeedPeers, seedPeerOpts.UseRTPeers) if err != nil { - return err + return nil, err } + closestPeers := make([]peer.ID, 0, dht.bucketSize) wg := sync.WaitGroup{} for p := range pchan { + closestPeers = append(closestPeers, p) wg.Add(1) go func(p peer.ID) { ctx, cancel := context.WithCancel(ctx) @@ -89,28 +97,25 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts } wg.Wait() - return nil + return closestPeers, nil } -// RecvdVal stores a value and the peer from which we got the value. -type RecvdVal struct { - Val []byte - From peer.ID +// PutValue adds value corresponding to given Key. 
+// This is the top level "Store" operation of the DHT +func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts ...routing.Option) error { + _, err := dht.PutValueExtended(ctx, key, value, opts...) + return err } +// RecvdVal stores a value and the peer from which we got the value. +type RecvdVal dhtrouting.RecvdVal + // GetValue searches for the value corresponding to given Key. -func (dht *IpfsDHT) GetValue(ctx context.Context, key string, opts ...routing.Option) (_ []byte, err error) { +func (dht *IpfsDHT) GetValue(ctx context.Context, key string, opts ...routing.Option) ([]byte, error) { if !dht.enableValues { return nil, routing.ErrNotSupported } - // apply defaultQuorum if relevant - var cfg routing.Options - if err := cfg.Apply(opts...); err != nil { - return nil, err - } - opts = append(opts, Quorum(getQuorum(&cfg, defaultQuorum))) - responses, err := dht.SearchValue(ctx, key, opts...) if err != nil { return nil, err @@ -132,75 +137,66 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key string, opts ...routing.Op return best, nil } -// SearchValue searches for the value corresponding to given Key and streams the results. -func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...routing.Option) (<-chan []byte, error) { +// SearchValueExtended searches for the value corresponding to given Key and streams the results. +func (dht *IpfsDHT) SearchValueExtended(ctx context.Context, key string, opts ...routing.Option) (<-chan []byte, <-chan []peer.ID, error) { if !dht.enableValues { - return nil, routing.ErrNotSupported + return nil, nil, routing.ErrNotSupported } var cfg routing.Options if err := cfg.Apply(opts...); err != nil { - return nil, err + return nil, nil, err } - responsesNeeded := 0 - if !cfg.Offline { - responsesNeeded = getQuorum(&cfg, defaultQuorum) - } - - stopCh := make(chan struct{}) - valCh, lookupRes := dht.getValues(ctx, key, stopCh) - - out := make(chan []byte) - go func() { - defer close(out) - best, peersWithBest, aborted := dht.searchValueQuorum(ctx, key, valCh, stopCh, out, responsesNeeded) - if best == nil || aborted { - return + processors := dhtrouting.GetProcessors(&cfg) + if processors == nil { + validation := &dhtrouting.ValidationFilter{ + Key: key, + Validator: dht.Validator, } - updatePeers := make([]peer.ID, 0, dht.bucketSize) - select { - case l := <-lookupRes: - if l == nil { - return - } - - for _, p := range l.peers { - if _, ok := peersWithBest[p]; !ok { - updatePeers = append(updatePeers, p) - } - } - case <-ctx.Done(): - return + quorum := &dhtrouting.CountStopper{ + MaxCount: dhtrouting.GetQuorum(&cfg), } - dht.updatePeerValues(dht.Context(), key, best, updatePeers) - }() - - return out, nil -} + bestValue := &dhtrouting.BestValueFilterRecorder{ + Key: key, + Validator: dht.Validator, + PeersWithBest: make(map[peer.ID]struct{}), + } -func (dht *IpfsDHT) searchValueQuorum(ctx context.Context, key string, valCh <-chan RecvdVal, stopCh chan struct{}, - out chan<- []byte, nvals int) ([]byte, map[peer.ID]struct{}, bool) { - numResponses := 0 - return dht.processValues(ctx, key, valCh, - func(ctx context.Context, v RecvdVal, better bool) bool { - numResponses++ - if better { - select { - case out <- v.Val: - case <-ctx.Done(): - return false - } + processors = []dhtrouting.Processor{validation, quorum, bestValue} + } + + return dhtrouting.SearchValue(ctx, key, dht.getValues, processors, + func(ctx context.Context, best []byte, closestPeers []peer.ID) { + fixupRec := record.MakePutRecord(key, best) + for _, 
p := range closestPeers { + go func(p peer.ID) { + //TODO: Is this possible? + if p == dht.self { + err := dht.putLocal(key, fixupRec) + if err != nil { + logger.Error("Error correcting local dht entry:", err) + } + return + } + ctx, cancel := context.WithTimeout(ctx, time.Second*30) + defer cancel() + err := dht.protoMessenger.PutValue(ctx, p, fixupRec) + if err != nil { + logger.Debug("Error correcting DHT entry: ", err) + } + }(p) } + }, + &cfg) +} - if nvals > 0 && numResponses > nvals { - close(stopCh) - return true - } - return false - }) +// SearchValue searches for the value corresponding to given Key and streams the results. +func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...routing.Option) (<-chan []byte, error) { + out, _, err := dht.SearchValueExtended(ctx, key, opts...) + return out, err } // GetValues gets nvals values corresponding to the given key. @@ -211,11 +207,14 @@ func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) (_ []R queryCtx, cancel := context.WithCancel(ctx) defer cancel() - valCh, _ := dht.getValues(queryCtx, key, nil) + valCh, _ := dht.getValues(queryCtx, context.Background(), key, dhtrouting.SeedPeersOptions{ + SeedPeers: nil, + UseRTPeers: true, + }) out := make([]RecvdVal, 0, nvals) for val := range valCh { - out = append(out, val) + out = append(out, RecvdVal(val)) if len(out) == nvals { cancel() } @@ -224,80 +223,15 @@ func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) (_ []R return out, ctx.Err() } -func (dht *IpfsDHT) processValues(ctx context.Context, key string, vals <-chan RecvdVal, - newVal func(ctx context.Context, v RecvdVal, better bool) bool) (best []byte, peersWithBest map[peer.ID]struct{}, aborted bool) { -loop: - for { - if aborted { - return - } - - select { - case v, ok := <-vals: - if !ok { - break loop - } - - // Select best value - if best != nil { - if bytes.Equal(best, v.Val) { - peersWithBest[v.From] = struct{}{} - aborted = newVal(ctx, v, false) - continue - } - sel, err := dht.Validator.Select(key, [][]byte{best, v.Val}) - if err != nil { - logger.Warnw("failed to select best value", "key", internal.LoggableRecordKeyString(key), "error", err) - continue - } - if sel != 1 { - aborted = newVal(ctx, v, false) - continue - } - } - peersWithBest = make(map[peer.ID]struct{}) - peersWithBest[v.From] = struct{}{} - best = v.Val - aborted = newVal(ctx, v, true) - case <-ctx.Done(): - return - } - } - - return -} - -func (dht *IpfsDHT) updatePeerValues(ctx context.Context, key string, val []byte, peers []peer.ID) { - fixupRec := record.MakePutRecord(key, val) - for _, p := range peers { - go func(p peer.ID) { - //TODO: Is this possible? 
- if p == dht.self { - err := dht.putLocal(key, fixupRec) - if err != nil { - logger.Error("Error correcting local dht entry:", err) - } - return - } - ctx, cancel := context.WithTimeout(ctx, time.Second*30) - defer cancel() - err := dht.protoMessenger.PutValue(ctx, p, fixupRec) - if err != nil { - logger.Debug("Error correcting DHT entry: ", err) - } - }(p) - } -} - -func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopQuery chan struct{}) (<-chan RecvdVal, <-chan *lookupWithFollowupResult) { - valCh := make(chan RecvdVal, 1) - lookupResCh := make(chan *lookupWithFollowupResult, 1) +func (dht *IpfsDHT) getValues(ctx, queryAbortedCtx context.Context, key string, seedPeerOpts dhtrouting.SeedPeersOptions) (<-chan dhtrouting.RecvdVal, <-chan []peer.ID) { + valCh := make(chan dhtrouting.RecvdVal, 1) + closestPeersCh := make(chan []peer.ID, 1) logger.Debugw("finding value", "key", internal.LoggableRecordKeyString(key)) if rec, err := dht.getLocal(key); rec != nil && err == nil { select { - case valCh <- RecvdVal{ + case valCh <- dhtrouting.RecvdVal{ Val: rec.GetValue(), From: dht.self, }: @@ -307,7 +241,7 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopQuery chan st go func() { defer close(valCh) - defer close(lookupResCh) + defer close(closestPeersCh) lookupRes, err := dht.runLookupWithFollowup(ctx, key, func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) { // For DHT query command @@ -336,7 +270,7 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopQuery chan st // TODO: What should happen if the record is invalid? // Pre-existing code counted it towards the quorum, but should it? if rec != nil && rec.GetValue() != nil { - rv := RecvdVal{ + rv := dhtrouting.RecvdVal{ Val: rec.GetValue(), From: p, } @@ -359,25 +293,26 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopQuery chan st }, func() bool { select { - case <-stopQuery: + case <-queryAbortedCtx.Done(): return true default: return false } }, + seedPeerOpts.SeedPeers, seedPeerOpts.UseRTPeers, ) if err != nil { return } - lookupResCh <- lookupRes + closestPeersCh <- lookupRes.peers - if ctx.Err() == nil { + if ctx.Err() == nil && seedPeerOpts.SeedPeers == nil { dht.refreshRTIfNoShortcut(kb.ConvertKey(key), lookupRes) } }() - return valCh, lookupResCh + return valCh, closestPeersCh } func (dht *IpfsDHT) refreshRTIfNoShortcut(key kb.ID, lookupRes *lookupWithFollowupResult) { @@ -391,20 +326,26 @@ func (dht *IpfsDHT) refreshRTIfNoShortcut(key kb.ID, lookupRes *lookupWithFollow // Some DHTs store values directly, while an indirect store stores pointers to // locations of the value, similarly to Coral and Mainline DHT. 
-// Provide makes this node announce that it can provide a value for the given key -func (dht *IpfsDHT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err error) { +// ProvideExtended makes this node announce that it can provide a value for the given key +func (dht *IpfsDHT) ProvideExtended(ctx context.Context, key cid.Cid, brdcst bool, opts ...routing.Option) ([]peer.ID, error) { if !dht.enableProviders { - return routing.ErrNotSupported + return nil, routing.ErrNotSupported } else if !key.Defined() { - return fmt.Errorf("invalid cid: undefined") + return nil, fmt.Errorf("invalid cid: undefined") } keyMH := key.Hash() logger.Debugw("providing", "cid", key, "mh", internal.LoggableProviderRecordBytes(keyMH)) + var cfg routing.Options + if err := cfg.Apply(opts...); err != nil { + return nil, err + } + seedPeerOpts := dhtrouting.GetSeedPeers(&cfg) + // add self locally dht.ProviderManager.AddProvider(ctx, keyMH, dht.self) if !brdcst { - return nil + return nil, nil } closerCtx := ctx @@ -414,7 +355,7 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err if timeout < 0 { // timed out - return context.DeadlineExceeded + return nil, context.DeadlineExceeded } else if timeout < 10*time.Second { // Reserve 10% for the final put. deadline = deadline.Add(-timeout / 10) @@ -429,23 +370,25 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err } var exceededDeadline bool - peers, err := dht.GetClosestPeers(closerCtx, string(keyMH)) + peers, err := dht.GetClosestPeersSeeded(closerCtx, string(keyMH), seedPeerOpts.SeedPeers, seedPeerOpts.UseRTPeers) switch err { case context.DeadlineExceeded: // If the _inner_ deadline has been exceeded but the _outer_ // context is still fine, provide the value to the closest peers // we managed to find, even if they're not the _actual_ closest peers. if ctx.Err() != nil { - return ctx.Err() + return nil, ctx.Err() } exceededDeadline = true case nil: default: - return err + return nil, err } + closestPeers := make([]peer.ID, 0, dht.bucketSize) wg := sync.WaitGroup{} for p := range peers { + closestPeers = append(closestPeers, p) wg.Add(1) go func(p peer.ID) { defer wg.Done() @@ -458,9 +401,15 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err } wg.Wait() if exceededDeadline { - return context.DeadlineExceeded + return nil, context.DeadlineExceeded } - return ctx.Err() + return closestPeers, ctx.Err() +} + +// Provide makes this node announce that it can provide a value for the given key +func (dht *IpfsDHT) Provide(ctx context.Context, key cid.Cid, brdcst bool) error { + _, err := dht.ProvideExtended(ctx, key, brdcst) + return err } // FindProviders searches until the context expires. @@ -478,127 +427,154 @@ func (dht *IpfsDHT) FindProviders(ctx context.Context, c cid.Cid) ([]peer.AddrIn return providers, nil } -// FindProvidersAsync is the same thing as FindProviders, but returns a channel. -// Peers will be returned on the channel as soon as they are found, even before -// the search query completes. If count is zero then the query will run until it -// completes. Note: not reading from the returned channel may block the query -// from progressing. -func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int) <-chan peer.AddrInfo { +// FindProvidersAsyncExtended searches until the context expires. 
+func (dht *IpfsDHT) FindProvidersAsyncExtended(ctx context.Context, key cid.Cid, opts ...routing.Option) (<-chan peer.AddrInfo, <-chan []peer.ID, error) { if !dht.enableProviders || !key.Defined() { - peerOut := make(chan peer.AddrInfo) + peerOut, closestPeers := make(chan peer.AddrInfo), make(chan []peer.ID) close(peerOut) - return peerOut + close(closestPeers) + return peerOut, closestPeers, routing.ErrNotSupported } - chSize := count - if count == 0 { - chSize = 1 + var cfg routing.Options + if err := cfg.Apply(opts...); err != nil { + return nil, nil, err } - peerOut := make(chan peer.AddrInfo, chSize) keyMH := key.Hash() - logger.Debugw("finding providers", "cid", key, "mh", internal.LoggableProviderRecordBytes(keyMH)) - go dht.findProvidersAsyncRoutine(ctx, keyMH, count, peerOut) - return peerOut -} + processors := dhtrouting.GetProcessors(&cfg) + if processors == nil { + newValuesOnly := &dhtrouting.NewPeerIDFilter{ + Key: string(keyMH), + Peers: peer.NewSet(), + } -func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash.Multihash, count int, peerOut chan peer.AddrInfo) { - defer close(peerOut) + quorum := &dhtrouting.CountStopper{ + MaxCount: dhtrouting.GetQuorum(&cfg), + } - findAll := count == 0 - var ps *peer.Set - if findAll { - ps = peer.NewSet() - } else { - ps = peer.NewLimitedSet(count) + processors = []dhtrouting.Processor{newValuesOnly, quorum} } - provs := dht.ProviderManager.GetProviders(ctx, key) - for _, p := range provs { - // NOTE: Assuming that this list of peers is unique - if ps.TryAdd(p) { + return dhtrouting.FindProviders(ctx, keyMH, dht.findProvidersAsyncRoutine, processors, &cfg) +} + +// FindProvidersAsync is the same thing as FindProviders, but returns a channel. +// Peers will be returned on the channel as soon as they are found, even before +// the search query completes. If count is zero then the query will run until it +// completes. Note: not reading from the returned channel may block the query +// from progressing. +func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int) <-chan peer.AddrInfo { + providers, _, _ := dht.FindProvidersAsyncExtended(ctx, key, dhtrouting.Quorum(count)) + return providers +} + +func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx, queryAbortedCtx context.Context, key multihash.Multihash, seedPeerOpts dhtrouting.SeedPeersOptions) (<-chan peer.AddrInfo, <-chan []peer.ID) { + logger.Debugw("finding providers", "key", key) + + provsCh := make(chan peer.AddrInfo, 1) + closestPeersCh := make(chan []peer.ID, 1) + + go func() { + defer close(provsCh) + defer close(closestPeersCh) + + provs := dht.ProviderManager.GetProviders(ctx, key) + for _, p := range provs { pi := dht.peerstore.PeerInfo(p) select { - case peerOut <- pi: + case provsCh <- pi: case <-ctx.Done(): return } } - // If we have enough peers locally, don't bother with remote RPC - // TODO: is this a DOS vector? 
- if !findAll && ps.Size() >= count { - return - } - } - - lookupRes, err := dht.runLookupWithFollowup(ctx, string(key), - func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) { - // For DHT query command - routing.PublishQueryEvent(ctx, &routing.QueryEvent{ - Type: routing.SendingQuery, - ID: p, - }) + lookupRes, err := dht.runLookupWithFollowup(ctx, string(key), + func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) { + // For DHT query command + routing.PublishQueryEvent(ctx, &routing.QueryEvent{ + Type: routing.SendingQuery, + ID: p, + }) - provs, closest, err := dht.protoMessenger.GetProviders(ctx, p, key) - if err != nil { - return nil, err - } + provs, closest, err := dht.protoMessenger.GetProviders(ctx, p, key) + if err != nil { + return nil, err + } - logger.Debugf("%d provider entries", len(provs)) + logger.Debugf("%d provider entries", len(provs)) - // Add unique providers from request, up to 'count' - for _, prov := range provs { - dht.maybeAddAddrs(prov.ID, prov.Addrs, peerstore.TempAddrTTL) - logger.Debugf("got provider: %s", prov) - if ps.TryAdd(prov.ID) { - logger.Debugf("using provider: %s", prov) + // Add unique providers from request, up to 'count' + for _, prov := range provs { + dht.maybeAddAddrs(prov.ID, prov.Addrs, peerstore.TempAddrTTL) + logger.Debugf("got provider: %s", prov) select { - case peerOut <- *prov: + case provsCh <- *prov: case <-ctx.Done(): logger.Debug("context timed out sending more providers") return nil, ctx.Err() } } - if !findAll && ps.Size() >= count { - logger.Debugf("got enough providers (%d/%d)", ps.Size(), count) - return nil, nil - } - } - // Give closer peers back to the query to be queried - logger.Debugf("got closer peers: %d %s", len(closest), closest) + // Give closer peers back to the query to be queried + logger.Debugf("got closer peers: %d %s", len(closest), closest) - routing.PublishQueryEvent(ctx, &routing.QueryEvent{ - Type: routing.PeerResponse, - ID: p, - Responses: closest, - }) + routing.PublishQueryEvent(ctx, &routing.QueryEvent{ + Type: routing.PeerResponse, + ID: p, + Responses: closest, + }) - return closest, nil - }, - func() bool { - return !findAll && ps.Size() >= count - }, - ) + return closest, nil + }, + func() bool { + select { + case <-queryAbortedCtx.Done(): + return true + default: + return false + } + }, + seedPeerOpts.SeedPeers, seedPeerOpts.UseRTPeers, + ) - if err == nil && ctx.Err() == nil { - dht.refreshRTIfNoShortcut(kb.ConvertKey(string(key)), lookupRes) - } + if err != nil { + return + } + closestPeersCh <- lookupRes.peers + + if ctx.Err() == nil && seedPeerOpts.SeedPeers == nil { + dht.refreshRTIfNoShortcut(kb.ConvertKey(string(key)), lookupRes) + } + }() + + return provsCh, closestPeersCh } // FindPeer searches for a peer with given ID. -func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo, err error) { +func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (peer.AddrInfo, error) { + pid, _, err := dht.FindPeerExtended(ctx, id) + return pid, err +} + +// FindPeerExtended searches for a peer with given ID. 
+func (dht *IpfsDHT) FindPeerExtended(ctx context.Context, id peer.ID, opts ...routing.Option) (peer.AddrInfo, []peer.ID, error) { if err := id.Validate(); err != nil { - return peer.AddrInfo{}, err + return peer.AddrInfo{}, nil, err } logger.Debugw("finding peer", "peer", id) + var cfg routing.Options + if err := cfg.Apply(opts...); err != nil { + return peer.AddrInfo{}, nil, err + } + seedPeerOpts := dhtrouting.GetSeedPeers(&cfg) + // Check if were already connected to them if pi := dht.FindLocal(id); pi.ID != "" { - return pi, nil + return pi, nil, nil } lookupRes, err := dht.runLookupWithFollowup(ctx, string(id), @@ -627,10 +603,11 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo, func() bool { return dht.host.Network().Connectedness(id) == network.Connected }, + seedPeerOpts.SeedPeers, seedPeerOpts.UseRTPeers, ) if err != nil { - return peer.AddrInfo{}, err + return peer.AddrInfo{}, nil, err } dialedPeerDuringQuery := false @@ -648,8 +625,8 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo, // to the peer. connectedness := dht.host.Network().Connectedness(id) if dialedPeerDuringQuery || connectedness == network.Connected || connectedness == network.CanConnect { - return dht.peerstore.PeerInfo(id), nil + return dht.peerstore.PeerInfo(id), lookupRes.peers, nil } - return peer.AddrInfo{}, routing.ErrNotFound + return peer.AddrInfo{}, lookupRes.peers, routing.ErrNotFound } diff --git a/routing/options.go b/routing/options.go new file mode 100644 index 000000000..b009e0122 --- /dev/null +++ b/routing/options.go @@ -0,0 +1,106 @@ +package routing + +import ( + "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/routing" +) + +type quorumOptionKey struct{} + +const defaultQuorum = 0 + +// Quorum is a DHT option that tells the DHT how many peers it needs to get +// values from before returning the best one. Zero means the DHT query +// should complete instead of returning early. +// +// Default: 0 +func Quorum(n int) routing.Option { + return func(opts *routing.Options) error { + if opts.Other == nil { + opts.Other = make(map[interface{}]interface{}, 1) + } + opts.Other[quorumOptionKey{}] = n + return nil + } +} + +func GetQuorum(opts *routing.Options) int { + responsesNeeded, ok := opts.Other[quorumOptionKey{}].(int) + if !ok { + responsesNeeded = defaultQuorum + } + return responsesNeeded +} + +type seedPeersOptionKey struct{} +type SeedPeersOptions struct { + SeedPeers []peer.ID + UseRTPeers bool +} + +// SeedPeers is a DHT option that tells the DHT which peers it should use to seed a DHT query. 
+//
+// Default: Use BucketSize closest peers to the target that are in the routing table
+func SeedPeers(seedPeers []peer.ID, useRoutingTablePeers bool) routing.Option {
+	return func(opts *routing.Options) error {
+		if opts.Other == nil {
+			opts.Other = make(map[interface{}]interface{}, 1)
+		}
+		opts.Other[seedPeersOptionKey{}] = SeedPeersOptions{SeedPeers: seedPeers, UseRTPeers: useRoutingTablePeers}
+		return nil
+	}
+}
+
+func GetSeedPeers(opts *routing.Options) SeedPeersOptions {
+	seedPeersOpts, ok := opts.Other[seedPeersOptionKey{}].(SeedPeersOptions)
+	if !ok {
+		seedPeersOpts = SeedPeersOptions{UseRTPeers: true}
+	}
+	return seedPeersOpts
+}
+
+type updateDuringGetOptionKey struct{}
+
+// UpdateDuringGet is a DHT option that tells the DHT if it should update peers with
+// old data while doing a Get
+//
+// Default: true for Get/SearchValue, and false otherwise
+func UpdateDuringGet(updateDuringGet bool) routing.Option {
+	return func(opts *routing.Options) error {
+		if opts.Other == nil {
+			opts.Other = make(map[interface{}]interface{}, 1)
+		}
+		opts.Other[updateDuringGetOptionKey{}] = updateDuringGet
+		return nil
+	}
+}
+
+func getUpdateDuringGet(opts *routing.Options, defaultValue bool) bool {
+	updateDuringGet, ok := opts.Other[updateDuringGetOptionKey{}].(bool)
+	if !ok {
+		updateDuringGet = defaultValue
+	}
+	return updateDuringGet
+}
+
+type processorsOptionKey struct{}
+
+// WithProcessors is a DHT option that tells the DHT which processors it should use
+// (and the order to apply them) on lookup results.
+func WithProcessors(processors ...Processor) routing.Option {
+	return func(opts *routing.Options) error {
+		if opts.Other == nil {
+			opts.Other = make(map[interface{}]interface{}, 1)
+		}
+		opts.Other[processorsOptionKey{}] = processors
+		return nil
+	}
+}
+
+func GetProcessors(opts *routing.Options) []Processor {
+	processors, ok := opts.Other[processorsOptionKey{}].([]Processor)
+	if !ok {
+		processors = nil
+	}
+	return processors
+}
diff --git a/routing/pipeline.go b/routing/pipeline.go
new file mode 100644
index 000000000..39d800c8a
--- /dev/null
+++ b/routing/pipeline.go
@@ -0,0 +1,93 @@
+package routing
+
+import (
+	"context"
+	logging "github.com/ipfs/go-log"
+	"github.com/libp2p/go-libp2p-core/peer"
+)
+
+var (
+	logger = logging.Logger("dht.routing")
+)
+
+// RecvdVal stores a value and the peer from which we got the value.
+type RecvdVal struct { + Val []byte + From peer.ID +} + +func RunRecordPipeline(ctx context.Context, processors []Processor, abortQuery func(), out chan<- []byte, in <-chan RecvdVal) <-chan error { + errCh := make(chan error, 1) + go func() { + defer close(errCh) + processInput: + for { + select { + case i, more := <-in: + if !more { + return + } + var state interface{} = i + var err error + for _, t := range processors { + state, err = t.Process(state, abortQuery) + if err != nil { + continue processInput + + } + } + + finalState := state.(RecvdVal) + select { + case out <- finalState.Val: + case <-ctx.Done(): + errCh <- ctx.Err() + return + } + + case <-ctx.Done(): + errCh <- ctx.Err() + return + } + } + }() + return errCh +} + +func RunProvidersPipeline(ctx context.Context, processors []Processor, abortQuery func(), out chan<- peer.AddrInfo, in <-chan peer.AddrInfo) <-chan error { + errCh := make(chan error, 1) + go func() { + defer close(errCh) + processInput: + for { + select { + case i, more := <-in: + if !more { + return + } + var state interface{} = i + var err error + for _, t := range processors { + state, err = t.Process(state, abortQuery) + if err != nil { + continue processInput + + } + } + + finalState := state.(peer.AddrInfo) + select { + case out <- finalState: + case <-ctx.Done(): + errCh <- ctx.Err() + return + } + + case <-ctx.Done(): + errCh <- ctx.Err() + return + } + } + }() + return errCh +} diff --git a/routing/pipes.go b/routing/pipes.go new file mode 100644 index 000000000..7377eca47 --- /dev/null +++ b/routing/pipes.go @@ -0,0 +1,97 @@ +package routing + +import ( + "bytes" + "errors" + "github.com/libp2p/go-libp2p-core/peer" + record "github.com/libp2p/go-libp2p-record" +) + +type Processor interface { + Process(interface{}, func()) (interface{}, error) +} + +var skipErr = errors.New("skip value") + +type CountStopper struct { + Count int + MaxCount int +} + +func (f *CountStopper) Process(val interface{}, abortQuery func()) (interface{}, error) { + f.Count++ + if f.MaxCount > 0 && f.Count >= f.MaxCount { + abortQuery() + } + return val, nil +} + +type ValidationFilter struct { + Key string + Validator record.Validator +} + +func (f *ValidationFilter) Process(val interface{}, _ func()) (interface{}, error) { + v := val.(RecvdVal) + err := f.Validator.Validate(f.Key, v.Val) + if err != nil { + return nil, err + } + return v, nil +} + +type BestValueFilterRecorder struct { + Key string + Best []byte + Validator record.Validator + PeersWithBest map[peer.ID]struct{} +} + +func (p *BestValueFilterRecorder) Process(val interface{}, _ func()) (interface{}, error) { + v := val.(RecvdVal) + + // Select best value + if p.Best != nil { + if bytes.Equal(p.Best, v.Val) { + p.PeersWithBest[v.From] = struct{}{} + return nil, skipErr + } + newIsBetter, err := p.getBetterRecord(p.Key, p.Best, v.Val) + if err != nil { + logger.Warnw("failed to select best value", "key", p.Key, "error", err) + return nil, skipErr + } + if !newIsBetter { + return nil, skipErr + } + } + p.PeersWithBest = make(map[peer.ID]struct{}) + p.PeersWithBest[v.From] = struct{}{} + p.Best = v.Val + return v, nil +} + +func (p *BestValueFilterRecorder) getBetterRecord(key string, current, new []byte) (bool, error) { + sel, err := p.Validator.Select(key, [][]byte{current, new}) + if err != nil { + return false, err + } + return sel == 1, nil +} + +type NewPeerIDFilter struct { + Key string + Peers *peer.Set +} + +func (p *NewPeerIDFilter) Process(val interface{}, _ func()) (interface{}, error) { + prov := 
val.(peer.AddrInfo) + + logger.Debugf("got provider: %s", prov) + if p.Peers.TryAdd(prov.ID) { + logger.Debugf("using provider: %s", prov) + return prov, nil + } + + return nil, skipErr +} diff --git a/routing/wrapper.go b/routing/wrapper.go new file mode 100644 index 000000000..62e999ea2 --- /dev/null +++ b/routing/wrapper.go @@ -0,0 +1,91 @@ +package routing + +import ( + "context" + "github.com/multiformats/go-multihash" + + "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/routing" +) + +type getValuesFn func(ctx, queryAbortedCtx context.Context, key string, seedPeerOpts SeedPeersOptions) (<-chan RecvdVal, <-chan []peer.ID) +type updatePeerValuesFn func(ctx context.Context, best []byte, closestPeers []peer.ID) + +// SearchValue searches for the value corresponding to given Key and streams the results. +func SearchValue(ctx context.Context, key string, getVals getValuesFn, processors []Processor, updatePeerValues updatePeerValuesFn, cfg *routing.Options) (<-chan []byte, <-chan []peer.ID, error) { + seedPeerOpts := GetSeedPeers(cfg) + updateDuringGet := getUpdateDuringGet(cfg, true) + queryAbortCtx, abortQuery := context.WithCancel(context.Background()) + valCh, closestPeersCh := getVals(ctx, queryAbortCtx, key, seedPeerOpts) + + out := make(chan []byte) + returnClosestPeersCh := make(chan []peer.ID, 1) + + var closestPeers []peer.ID + + go func() { + defer close(out) + defer close(returnClosestPeersCh) + defer abortQuery() + errCh := RunRecordPipeline(ctx, processors, abortQuery, out, valCh) + if err := <-errCh; err != nil { + return + } + + closestPeers = <-closestPeersCh + if updateDuringGet && updatePeerValues != nil { + for _, f := range processors { + if bv, ok := f.(*BestValueFilterRecorder); ok { + if bv.Best == nil { + break + } + + updatePeers := make([]peer.ID, 0, len(closestPeers)) + for _, p := range closestPeers { + if _, ok := bv.PeersWithBest[p]; !ok { + updatePeers = append(updatePeers, p) + } + } + updatePeerValues(ctx, bv.Best, updatePeers) + break + } + } + } + returnClosestPeersCh <- closestPeers + }() + + return out, returnClosestPeersCh, nil +} + +type findProvsFn func(ctx, queryAbortedCtx context.Context, key multihash.Multihash, seedPeerOpts SeedPeersOptions) (<-chan peer.AddrInfo, <-chan []peer.ID) + +// FindProviders searches for the providers corresponding to given Key and streams the results. 
+func FindProviders(ctx context.Context, key multihash.Multihash, findProvsFn findProvsFn, processors []Processor, cfg *routing.Options) (<-chan peer.AddrInfo, <-chan []peer.ID, error) { + seedPeerOpts := GetSeedPeers(cfg) + maxRequestedRecords := GetQuorum(cfg) + + queryAbortCtx, abortQuery := context.WithCancel(context.Background()) + valCh, closestPeersCh := findProvsFn(ctx, queryAbortCtx, key, seedPeerOpts) + + outChSize := maxRequestedRecords + if outChSize == 0 { + outChSize = 1 + } + out := make(chan peer.AddrInfo, outChSize) + returnClosestPeersCh := make(chan []peer.ID, 1) + + go func() { + defer close(out) + defer close(returnClosestPeersCh) + defer abortQuery() + errCh := RunProvidersPipeline(ctx, processors, abortQuery, out, valCh) + if err := <-errCh; err != nil { + return + } + + closestPeers := <-closestPeersCh + returnClosestPeersCh <- closestPeers + }() + + return out, returnClosestPeersCh, nil +} diff --git a/routing_options.go b/routing_options.go index a1e5935b9..0a4a911d8 100644 --- a/routing_options.go +++ b/routing_options.go @@ -1,30 +1,17 @@ package dht -import "github.com/libp2p/go-libp2p-core/routing" - -type quorumOptionKey struct{} - -const defaultQuorum = 0 +import ( + "github.com/libp2p/go-libp2p-core/routing" + dhtrouting "github.com/libp2p/go-libp2p-kad-dht/routing" +) // Quorum is a DHT option that tells the DHT how many peers it needs to get // values from before returning the best one. Zero means the DHT query // should complete instead of returning early. // // Default: 0 +// +// Deprecated: use github.com/libp2p/go-libp2p-kad-dht/routing.Quorum func Quorum(n int) routing.Option { - return func(opts *routing.Options) error { - if opts.Other == nil { - opts.Other = make(map[interface{}]interface{}, 1) - } - opts.Other[quorumOptionKey{}] = n - return nil - } -} - -func getQuorum(opts *routing.Options, ndefault int) int { - responsesNeeded, ok := opts.Other[quorumOptionKey{}].(int) - if !ok { - responsesNeeded = ndefault - } - return responsesNeeded + return dhtrouting.Quorum(n) }
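
A minimal sketch of how a caller might use the new GetClosestPeersSeeded entry point, assuming an already-constructed and bootstrapped *dht.IpfsDHT; the lookupFrom helper name is illustrative, not part of this patch. Note that runQuery falls back to routing-table peers whenever the seed list is nil, even if useRTPeers is false.

package example

import (
	"context"

	"github.com/libp2p/go-libp2p-core/peer"

	dht "github.com/libp2p/go-libp2p-kad-dht"
)

// lookupFrom runs the Kademlia node lookup for key seeded only with the given
// peers (useRTPeers=false) and drains the resulting channel of closest peers.
// A nil seed list makes the query fall back to the routing table regardless.
func lookupFrom(ctx context.Context, d *dht.IpfsDHT, key string, seeds []peer.ID) ([]peer.ID, error) {
	ch, err := d.GetClosestPeersSeeded(ctx, key, seeds, false)
	if err != nil {
		return nil, err
	}
	var closest []peer.ID
	for p := range ch {
		closest = append(closest, p)
	}
	return closest, nil
}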
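
PutValueExtended composes with the routing-level SeedPeers option; a sketch under the same assumptions (putWithSeeds is an illustrative name).

package example

import (
	"context"

	"github.com/libp2p/go-libp2p-core/peer"

	dht "github.com/libp2p/go-libp2p-kad-dht"
	dhtrouting "github.com/libp2p/go-libp2p-kad-dht/routing"
)

// putWithSeeds stores a record, seeding the lookup with known peers in addition
// to the routing table, and returns the peers the record was pushed to.
func putWithSeeds(ctx context.Context, d *dht.IpfsDHT, key string, value []byte, seeds []peer.ID) ([]peer.ID, error) {
	return d.PutValueExtended(ctx, key, value,
		dhtrouting.SeedPeers(seeds, true), // true: also include routing-table peers
	)
}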
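
Result processing is pluggable through WithProcessors. When the option is supplied, SearchValueExtended runs exactly the given pipeline and does not add the default ValidationFilter/CountStopper/BestValueFilterRecorder chain, so validation must be included explicitly. A sketch with a hypothetical sourceRecorder processor and searchWithPipeline helper; the validator argument is assumed to be the same one the DHT was configured with.

package example

import (
	"context"

	"github.com/libp2p/go-libp2p-core/peer"
	record "github.com/libp2p/go-libp2p-record"

	dht "github.com/libp2p/go-libp2p-kad-dht"
	dhtrouting "github.com/libp2p/go-libp2p-kad-dht/routing"
)

// sourceRecorder is a hypothetical Processor that remembers which peer each
// validated record came from and passes every value through unchanged.
type sourceRecorder struct {
	from []peer.ID
}

func (r *sourceRecorder) Process(val interface{}, _ func()) (interface{}, error) {
	if v, ok := val.(dhtrouting.RecvdVal); ok {
		r.from = append(r.from, v.From)
	}
	return val, nil
}

// searchWithPipeline streams values for key through an explicit pipeline:
// validate, record the source peer, then abort the query after three records.
func searchWithPipeline(ctx context.Context, d *dht.IpfsDHT, key string, validator record.Validator) ([][]byte, []peer.ID, error) {
	rec := &sourceRecorder{}
	valCh, closestCh, err := d.SearchValueExtended(ctx, key, dhtrouting.WithProcessors(
		&dhtrouting.ValidationFilter{Key: key, Validator: validator},
		rec,
		&dhtrouting.CountStopper{MaxCount: 3},
	))
	if err != nil {
		return nil, nil, err
	}

	var vals [][]byte
	for v := range valCh {
		vals = append(vals, v)
	}
	<-closestCh // closest peers from the lookup; empty if the query errored

	return vals, rec.from, nil
}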
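
FindProvidersAsyncExtended drops the count argument in favor of the Quorum option (count == 0 still means run the query to completion). A sketch; findSomeProviders is an illustrative name.

package example

import (
	"context"

	"github.com/ipfs/go-cid"
	"github.com/libp2p/go-libp2p-core/peer"

	dht "github.com/libp2p/go-libp2p-kad-dht"
	dhtrouting "github.com/libp2p/go-libp2p-kad-dht/routing"
)

// findSomeProviders collects up to count providers for c and also returns the
// closest peers seen by the underlying lookup.
func findSomeProviders(ctx context.Context, d *dht.IpfsDHT, c cid.Cid, count int) ([]peer.AddrInfo, []peer.ID, error) {
	provCh, closestCh, err := d.FindProvidersAsyncExtended(ctx, c, dhtrouting.Quorum(count))
	if err != nil {
		return nil, nil, err
	}
	var provs []peer.AddrInfo
	for p := range provCh {
		provs = append(provs, p)
	}
	return provs, <-closestCh, nil
}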
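
ProvideExtended likewise accepts routing options and reports which peers received the provider record; a minimal sketch (announceAndReport is an illustrative name).

package example

import (
	"context"

	"github.com/ipfs/go-cid"
	"github.com/libp2p/go-libp2p-core/peer"

	dht "github.com/libp2p/go-libp2p-kad-dht"
	dhtrouting "github.com/libp2p/go-libp2p-kad-dht/routing"
)

// announceAndReport broadcasts a provider record for c, seeding the lookup with
// the given peers plus the routing table, and returns the peers that were notified.
func announceAndReport(ctx context.Context, d *dht.IpfsDHT, c cid.Cid, seeds []peer.ID) ([]peer.ID, error) {
	return d.ProvideExtended(ctx, c, true, dhtrouting.SeedPeers(seeds, true))
}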