From 051e0eecc3bc1b31157c6fcb9b05db881e572cb2 Mon Sep 17 00:00:00 2001
From: Adin Schmahmann
Date: Mon, 4 Jan 2021 02:22:57 -0500
Subject: [PATCH 1/7] refactor: remove extraneous quorum check in dht.GetValue

---
 routing.go | 7 -------
 1 file changed, 7 deletions(-)

diff --git a/routing.go b/routing.go
index d14e3845f..de9a7de3c 100644
--- a/routing.go
+++ b/routing.go
@@ -104,13 +104,6 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key string, opts ...routing.Op
 		return nil, routing.ErrNotSupported
 	}
 
-	// apply defaultQuorum if relevant
-	var cfg routing.Options
-	if err := cfg.Apply(opts...); err != nil {
-		return nil, err
-	}
-	opts = append(opts, Quorum(getQuorum(&cfg, defaultQuorum)))
-
 	responses, err := dht.SearchValue(ctx, key, opts...)
 	if err != nil {
 		return nil, err

From 44375c33111eabbe261890075b10f5cca0fdb8ec Mon Sep 17 00:00:00 2001
From: Adin Schmahmann
Date: Mon, 4 Jan 2021 02:25:33 -0500
Subject: [PATCH 2/7] refactor: move quorum default value to be defined with quorum option

---
 routing.go         | 2 +-
 routing_options.go | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/routing.go b/routing.go
index de9a7de3c..85b1022bd 100644
--- a/routing.go
+++ b/routing.go
@@ -138,7 +138,7 @@ func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...routing
 
 	responsesNeeded := 0
 	if !cfg.Offline {
-		responsesNeeded = getQuorum(&cfg, defaultQuorum)
+		responsesNeeded = getQuorum(&cfg)
 	}
 
 	stopCh := make(chan struct{})
diff --git a/routing_options.go b/routing_options.go
index a1e5935b9..48df3746b 100644
--- a/routing_options.go
+++ b/routing_options.go
@@ -21,10 +21,10 @@ func Quorum(n int) routing.Option {
 	}
 }
 
-func getQuorum(opts *routing.Options, ndefault int) int {
+func getQuorum(opts *routing.Options) int {
 	responsesNeeded, ok := opts.Other[quorumOptionKey{}].(int)
 	if !ok {
-		responsesNeeded = ndefault
+		responsesNeeded = defaultQuorum
 	}
 	return responsesNeeded
 }

From 757500b1f6d7af92d24e50efeeec046517b9b0a0 Mon Sep 17 00:00:00 2001
From: Adin Schmahmann
Date: Mon, 4 Jan 2021 02:29:35 -0500
Subject: [PATCH 3/7] refactor: move quorum option into a separate routing package

---
 routing.go         |  3 ++-
 routing/options.go | 32 ++++++++++++++++++++++++++++++++
 routing_options.go | 27 +++++++--------------------
 3 files changed, 41 insertions(+), 21 deletions(-)
 create mode 100644 routing/options.go

diff --git a/routing.go b/routing.go
index 85b1022bd..6539266dc 100644
--- a/routing.go
+++ b/routing.go
@@ -16,6 +16,7 @@ import (
 	u "github.com/ipfs/go-ipfs-util"
 	"github.com/libp2p/go-libp2p-kad-dht/internal"
 	"github.com/libp2p/go-libp2p-kad-dht/qpeerset"
+	dhtrouting "github.com/libp2p/go-libp2p-kad-dht/routing"
 	kb "github.com/libp2p/go-libp2p-kbucket"
 	record "github.com/libp2p/go-libp2p-record"
 	"github.com/multiformats/go-multihash"
@@ -138,7 +139,7 @@ func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...routing
 
 	responsesNeeded := 0
 	if !cfg.Offline {
-		responsesNeeded = getQuorum(&cfg)
+		responsesNeeded = dhtrouting.GetQuorum(&cfg)
 	}
 
 	stopCh := make(chan struct{})
diff --git a/routing/options.go b/routing/options.go
new file mode 100644
index 000000000..146773022
--- /dev/null
+++ b/routing/options.go
@@ -0,0 +1,32 @@
+package routing
+
+import (
+	"github.com/libp2p/go-libp2p-core/routing"
+)
+
+type quorumOptionKey struct{}
+
+const defaultQuorum = 0
+
+// Quorum is a DHT option that tells the DHT how many peers it needs to get
+// values from before returning the best one. Zero means the DHT query
+// should complete instead of returning early.
+//
+// Default: 0
+func Quorum(n int) routing.Option {
+	return func(opts *routing.Options) error {
+		if opts.Other == nil {
+			opts.Other = make(map[interface{}]interface{}, 1)
+		}
+		opts.Other[quorumOptionKey{}] = n
+		return nil
+	}
+}
+
+func GetQuorum(opts *routing.Options) int {
+	responsesNeeded, ok := opts.Other[quorumOptionKey{}].(int)
+	if !ok {
+		responsesNeeded = defaultQuorum
+	}
+	return responsesNeeded
+}
diff --git a/routing_options.go b/routing_options.go
index 48df3746b..0a4a911d8 100644
--- a/routing_options.go
+++ b/routing_options.go
@@ -1,30 +1,17 @@
 package dht
 
-import "github.com/libp2p/go-libp2p-core/routing"
-
-type quorumOptionKey struct{}
-
-const defaultQuorum = 0
+import (
+	"github.com/libp2p/go-libp2p-core/routing"
+	dhtrouting "github.com/libp2p/go-libp2p-kad-dht/routing"
+)
 
 // Quorum is a DHT option that tells the DHT how many peers it needs to get
 // values from before returning the best one. Zero means the DHT query
 // should complete instead of returning early.
 //
 // Default: 0
+//
+// Deprecated: use github.com/libp2p/go-libp2p-kad-dht/routing.Quorum
 func Quorum(n int) routing.Option {
-	return func(opts *routing.Options) error {
-		if opts.Other == nil {
-			opts.Other = make(map[interface{}]interface{}, 1)
-		}
-		opts.Other[quorumOptionKey{}] = n
-		return nil
-	}
-}
-
-func getQuorum(opts *routing.Options, ndefault int) int {
-	responsesNeeded, ok := opts.Other[quorumOptionKey{}].(int)
-	if !ok {
-		responsesNeeded = defaultQuorum
-	}
-	return responsesNeeded
+	return dhtrouting.Quorum(n)
 }

From 1cdfa390a772bb2dcc8b2782fbd2810a19ed1762 Mon Sep 17 00:00:00 2001
From: Adin Schmahmann
Date: Mon, 4 Jan 2021 04:12:10 -0500
Subject: [PATCH 4/7] refactor: switch to non-deprecated version of Quorum option in dual dht test

---
 dual/dual_test.go | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/dual/dual_test.go b/dual/dual_test.go
index 323ff1169..e6d892a23 100644
--- a/dual/dual_test.go
+++ b/dual/dual_test.go
@@ -15,6 +15,8 @@ import (
 	record "github.com/libp2p/go-libp2p-record"
 	swarmt "github.com/libp2p/go-libp2p-swarm/testing"
 	bhost "github.com/libp2p/go-libp2p/p2p/host/basic"
+
+	dhtrouting "github.com/libp2p/go-libp2p-kad-dht/routing"
 )
 
 var wancid, lancid cid.Cid
@@ -263,7 +265,7 @@ func TestSearchValue(t *testing.T) {
 
 	_ = wan.PutValue(ctx, "/v/hello", []byte("valid"))
 
-	valCh, err := d.SearchValue(ctx, "/v/hello", dht.Quorum(0))
+	valCh, err := d.SearchValue(ctx, "/v/hello", dhtrouting.Quorum(0))
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -291,7 +293,7 @@ func TestSearchValue(t *testing.T) {
 		t.Error(err)
 	}
 
-	valCh, err = d.SearchValue(ctx, "/v/hello", dht.Quorum(0))
+	valCh, err = d.SearchValue(ctx, "/v/hello", dhtrouting.Quorum(0))
 	if err != nil {
 		t.Fatal(err)
 	}

From 6a546cf192358ff01a8152f1e6d0c5fef0f8dd56 Mon Sep 17 00:00:00 2001
From: Adin Schmahmann
Date: Mon, 4 Jan 2021 03:47:29 -0500
Subject: [PATCH 5/7] feat: add extended routing functions that take functional options and return the closest peers used in the query

---
 routing.go         | 167 ++++++++++++++++++++++++++++++---------------
 routing/options.go |  28 ++++++++
 2 files changed, 139 insertions(+), 56 deletions(-)

diff --git a/routing.go b/routing.go
index 6539266dc..92d1d0e7c 100644
--- a/routing.go
+++ b/routing.go
@@ -26,24 +26,23 @@ import (
 
 // Basic Put/Get
 
-// PutValue adds value corresponding to given Key.
-// This is the top level "Store" operation of the DHT -func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts ...routing.Option) (err error) { +// PutValueExtended adds value corresponding to given Key. +func (dht *IpfsDHT) PutValueExtended(ctx context.Context, key string, value []byte, opts ...routing.Option) ([]peer.ID, error) { if !dht.enableValues { - return routing.ErrNotSupported + return nil, routing.ErrNotSupported } logger.Debugw("putting value", "key", internal.LoggableRecordKeyString(key)) // don't even allow local users to put bad values. if err := dht.Validator.Validate(key, value); err != nil { - return err + return nil, err } old, err := dht.getLocal(key) if err != nil { // Means something is wrong with the datastore. - return err + return nil, err } // Check if we have an old value that's not the same as the new one. @@ -51,10 +50,10 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts // Check to see if the new one is better. i, err := dht.Validator.Select(key, [][]byte{value, old.GetValue()}) if err != nil { - return err + return nil, err } if i != 0 { - return fmt.Errorf("can't replace a newer value with an older value") + return nil, fmt.Errorf("can't replace a newer value with an older value") } } @@ -62,16 +61,18 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts rec.TimeReceived = u.FormatRFC3339(time.Now()) err = dht.putLocal(key, rec) if err != nil { - return err + return nil, err } pchan, err := dht.GetClosestPeers(ctx, key) if err != nil { - return err + return nil, err } + closestPeers := make([]peer.ID, 0, dht.bucketSize) wg := sync.WaitGroup{} for p := range pchan { + closestPeers = append(closestPeers, p) wg.Add(1) go func(p peer.ID) { ctx, cancel := context.WithCancel(ctx) @@ -90,7 +91,15 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts } wg.Wait() - return nil + return closestPeers, nil +} + +// PutValue adds value corresponding to given Key. +// This is the top level "Store" operation of the DHT + +func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts ...routing.Option) (err error) { + _, err = dht.PutValueExtended(ctx, key, value, opts...) + return err } // RecvdVal stores a value and the peer from which we got the value. @@ -126,15 +135,15 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key string, opts ...routing.Op return best, nil } -// SearchValue searches for the value corresponding to given Key and streams the results. -func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...routing.Option) (<-chan []byte, error) { +// SearchValueExtended searches for the value corresponding to given Key and streams the results. 
+func (dht *IpfsDHT) SearchValueExtended(ctx context.Context, key string, opts ...routing.Option) (<-chan []byte, <-chan []peer.ID, error) { if !dht.enableValues { - return nil, routing.ErrNotSupported + return nil, nil, routing.ErrNotSupported } var cfg routing.Options if err := cfg.Apply(opts...); err != nil { - return nil, err + return nil, nil, err } responsesNeeded := 0 @@ -146,33 +155,45 @@ func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...routing valCh, lookupRes := dht.getValues(ctx, key, stopCh) out := make(chan []byte) + peers := make(chan []peer.ID, 1) go func() { defer close(out) + defer close(peers) best, peersWithBest, aborted := dht.searchValueQuorum(ctx, key, valCh, stopCh, out, responsesNeeded) + + var l *lookupWithFollowupResult + select { + case l = <-lookupRes: + case <-ctx.Done(): + return + } + + if l == nil { + return + } + if best == nil || aborted { + peers <- l.peers return } updatePeers := make([]peer.ID, 0, dht.bucketSize) - select { - case l := <-lookupRes: - if l == nil { - return - } - - for _, p := range l.peers { - if _, ok := peersWithBest[p]; !ok { - updatePeers = append(updatePeers, p) - } + for _, p := range l.peers { + if _, ok := peersWithBest[p]; !ok { + updatePeers = append(updatePeers, p) } - case <-ctx.Done(): - return } dht.updatePeerValues(dht.Context(), key, best, updatePeers) }() - return out, nil + return out, peers, nil +} + +// SearchValue searches for the value corresponding to given Key and streams the results. +func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...routing.Option) (<-chan []byte, error) { + out, _, err := dht.SearchValueExtended(ctx, key, opts...) + return out, err } func (dht *IpfsDHT) searchValueQuorum(ctx context.Context, key string, valCh <-chan RecvdVal, stopCh chan struct{}, @@ -385,12 +406,12 @@ func (dht *IpfsDHT) refreshRTIfNoShortcut(key kb.ID, lookupRes *lookupWithFollow // Some DHTs store values directly, while an indirect store stores pointers to // locations of the value, similarly to Coral and Mainline DHT. -// Provide makes this node announce that it can provide a value for the given key -func (dht *IpfsDHT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err error) { +// ProvideExtended makes this node announce that it can provide a value for the given key +func (dht *IpfsDHT) ProvideExtended(ctx context.Context, key cid.Cid, brdcst bool, opts ...routing.Option) ([]peer.ID, error) { if !dht.enableProviders { - return routing.ErrNotSupported + return nil, routing.ErrNotSupported } else if !key.Defined() { - return fmt.Errorf("invalid cid: undefined") + return nil, fmt.Errorf("invalid cid: undefined") } keyMH := key.Hash() logger.Debugw("providing", "cid", key, "mh", internal.LoggableProviderRecordBytes(keyMH)) @@ -398,7 +419,7 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err // add self locally dht.ProviderManager.AddProvider(ctx, keyMH, dht.self) if !brdcst { - return nil + return nil, nil } closerCtx := ctx @@ -408,7 +429,7 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err if timeout < 0 { // timed out - return context.DeadlineExceeded + return nil, context.DeadlineExceeded } else if timeout < 10*time.Second { // Reserve 10% for the final put. 
deadline = deadline.Add(-timeout / 10) @@ -430,16 +451,18 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err // context is still fine, provide the value to the closest peers // we managed to find, even if they're not the _actual_ closest peers. if ctx.Err() != nil { - return ctx.Err() + return nil, ctx.Err() } exceededDeadline = true case nil: default: - return err + return nil, err } + closestPeers := make([]peer.ID, 0, dht.bucketSize) wg := sync.WaitGroup{} for p := range peers { + closestPeers = append(closestPeers, p) wg.Add(1) go func(p peer.ID) { defer wg.Done() @@ -452,9 +475,15 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err } wg.Wait() if exceededDeadline { - return context.DeadlineExceeded + return nil, context.DeadlineExceeded } - return ctx.Err() + return closestPeers, ctx.Err() +} + +// Provide makes this node announce that it can provide a value for the given key +func (dht *IpfsDHT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err error) { + _, err = dht.ProvideExtended(ctx, key, brdcst) + return err } // FindProviders searches until the context expires. @@ -472,33 +501,49 @@ func (dht *IpfsDHT) FindProviders(ctx context.Context, c cid.Cid) ([]peer.AddrIn return providers, nil } -// FindProvidersAsync is the same thing as FindProviders, but returns a channel. -// Peers will be returned on the channel as soon as they are found, even before -// the search query completes. If count is zero then the query will run until it -// completes. Note: not reading from the returned channel may block the query -// from progressing. -func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int) <-chan peer.AddrInfo { +// FindProvidersAsyncExtended searches until the context expires. +func (dht *IpfsDHT) FindProvidersAsyncExtended(ctx context.Context, key cid.Cid, opts ...routing.Option) (<-chan peer.AddrInfo, <-chan []peer.ID, error) { if !dht.enableProviders || !key.Defined() { - peerOut := make(chan peer.AddrInfo) + peerOut, closestPeers := make(chan peer.AddrInfo), make(chan []peer.ID) close(peerOut) - return peerOut + close(closestPeers) + return peerOut, closestPeers, routing.ErrNotSupported + } + + var cfg routing.Options + if err := cfg.Apply(opts...); err != nil { + return nil, nil, err } + count := dhtrouting.GetQuorum(&cfg) + chSize := count if count == 0 { chSize = 1 } peerOut := make(chan peer.AddrInfo, chSize) + closestPeersOut := make(chan []peer.ID, 1) keyMH := key.Hash() logger.Debugw("finding providers", "cid", key, "mh", internal.LoggableProviderRecordBytes(keyMH)) - go dht.findProvidersAsyncRoutine(ctx, keyMH, count, peerOut) - return peerOut + go dht.findProvidersAsyncRoutine(ctx, keyMH, count, peerOut, closestPeersOut) + return peerOut, closestPeersOut, nil +} + +// FindProvidersAsync is the same thing as FindProviders, but returns a channel. +// Peers will be returned on the channel as soon as they are found, even before +// the search query completes. If count is zero then the query will run until it +// completes. Note: not reading from the returned channel may block the query +// from progressing. 
+func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int) <-chan peer.AddrInfo { + providers, _, _ := dht.FindProvidersAsyncExtended(ctx, key, dhtrouting.Quorum(count)) + return providers } -func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash.Multihash, count int, peerOut chan peer.AddrInfo) { +func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash.Multihash, count int, peerOut chan peer.AddrInfo, closestPeersOut chan []peer.ID) { defer close(peerOut) + defer close(closestPeersOut) findAll := count == 0 var ps *peer.Set @@ -577,22 +622,26 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash }, ) + if lookupRes != nil { + closestPeersOut <- lookupRes.peers + } + if err == nil && ctx.Err() == nil { dht.refreshRTIfNoShortcut(kb.ConvertKey(string(key)), lookupRes) } } -// FindPeer searches for a peer with given ID. -func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo, err error) { +// FindPeerExtended searches for a peer with given ID. +func (dht *IpfsDHT) FindPeerExtended(ctx context.Context, id peer.ID, opts ...routing.Option) (peer.AddrInfo, []peer.ID, error) { if err := id.Validate(); err != nil { - return peer.AddrInfo{}, err + return peer.AddrInfo{}, nil, err } logger.Debugw("finding peer", "peer", id) // Check if were already connected to them if pi := dht.FindLocal(id); pi.ID != "" { - return pi, nil + return pi, nil, nil } lookupRes, err := dht.runLookupWithFollowup(ctx, string(id), @@ -624,7 +673,7 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo, ) if err != nil { - return peer.AddrInfo{}, err + return peer.AddrInfo{}, nil, err } dialedPeerDuringQuery := false @@ -642,8 +691,14 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo, // to the peer. connectedness := dht.host.Network().Connectedness(id) if dialedPeerDuringQuery || connectedness == network.Connected || connectedness == network.CanConnect { - return dht.peerstore.PeerInfo(id), nil + return dht.peerstore.PeerInfo(id), lookupRes.peers, nil } - return peer.AddrInfo{}, routing.ErrNotFound + return peer.AddrInfo{}, lookupRes.peers, routing.ErrNotFound +} + +// FindPeer searches for a peer with given ID. +func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (peer.AddrInfo, error) { + pid, _, err := dht.FindPeerExtended(ctx, id) + return pid, err } diff --git a/routing/options.go b/routing/options.go index 146773022..7f3d1615d 100644 --- a/routing/options.go +++ b/routing/options.go @@ -1,6 +1,7 @@ package routing import ( + "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/routing" ) @@ -30,3 +31,30 @@ func GetQuorum(opts *routing.Options) int { } return responsesNeeded } + +type seedPeersOptionKey struct{} +type SeedPeersOptions struct { + SeedPeers []peer.ID + UseRTPeers bool +} + +// SeedPeers is a DHT option that tells the DHT which peers it should use to seed a DHT query. 
+// +// Default: Use BucketSize closest peers to the target that are in the routing table +func SeedPeers(seedPeers []peer.ID, useRoutingTablePeers bool) routing.Option { + return func(opts *routing.Options) error { + if opts.Other == nil { + opts.Other = make(map[interface{}]interface{}, 1) + } + opts.Other[seedPeersOptionKey{}] = SeedPeersOptions{SeedPeers: seedPeers, UseRTPeers: useRoutingTablePeers} + return nil + } +} + +func GetSeedPeers(opts *routing.Options) SeedPeersOptions { + seedPeersOpts, ok := opts.Other[seedPeersOptionKey{}].(SeedPeersOptions) + if !ok { + seedPeersOpts = SeedPeersOptions{UseRTPeers: true} + } + return seedPeersOpts +} From 9b4bfb91c180c6170121624b0cab52705e426ca5 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Mon, 4 Jan 2021 04:01:57 -0500 Subject: [PATCH 6/7] feat: add support for controlling the seed peers used in queries --- lookup.go | 6 ++++++ query.go | 21 +++++++++++++++++---- routing.go | 38 +++++++++++++++++++++++++++++++------- 3 files changed, 54 insertions(+), 11 deletions(-) diff --git a/lookup.go b/lookup.go index dff8bb244..badd8cb93 100644 --- a/lookup.go +++ b/lookup.go @@ -17,6 +17,11 @@ import ( // If the context is canceled, this function will return the context error along // with the closest K peers it has found so far. func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan peer.ID, error) { + return dht.GetClosestPeersSeeded(ctx, key, nil, true) +} + +// GetClosestPeersSeeded is the Kademlia 'node lookup' operation +func (dht *IpfsDHT) GetClosestPeersSeeded(ctx context.Context, key string, seedPeers []peer.ID, useRTPeers bool) (<-chan peer.ID, error) { if key == "" { return nil, fmt.Errorf("can't lookup empty key") } @@ -45,6 +50,7 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan pee return peers, err }, func() bool { return false }, + seedPeers, useRTPeers, ) if err != nil { diff --git a/query.go b/query.go index 47d87df05..a1d12b9b9 100644 --- a/query.go +++ b/query.go @@ -76,9 +76,9 @@ type lookupWithFollowupResult struct { // // After the lookup is complete the query function is run (unless stopped) against all of the top K peers from the // lookup that have not already been successfully queried. -func (dht *IpfsDHT) runLookupWithFollowup(ctx context.Context, target string, queryFn queryFn, stopFn stopFn) (*lookupWithFollowupResult, error) { +func (dht *IpfsDHT) runLookupWithFollowup(ctx context.Context, target string, queryFn queryFn, stopFn stopFn, seedPeers []peer.ID, useRTPeers bool) (*lookupWithFollowupResult, error) { // run the query - lookupRes, err := dht.runQuery(ctx, target, queryFn, stopFn) + lookupRes, err := dht.runQuery(ctx, target, queryFn, stopFn, seedPeers, useRTPeers) if err != nil { return nil, err } @@ -145,10 +145,23 @@ processFollowUp: return lookupRes, nil } -func (dht *IpfsDHT) runQuery(ctx context.Context, target string, queryFn queryFn, stopFn stopFn) (*lookupWithFollowupResult, error) { +func (dht *IpfsDHT) runQuery(ctx context.Context, target string, queryFn queryFn, stopFn stopFn, manuallySeededPeers []peer.ID, useRTPeers bool) (*lookupWithFollowupResult, error) { // pick the K closest peers to the key in our Routing table. 
targetKadID := kb.ConvertKey(target) - seedPeers := dht.routingTable.NearestPeers(targetKadID, dht.bucketSize) + + seedPeerSet := peer.NewSet() + for _, p := range manuallySeededPeers { + seedPeerSet.Add(p) + } + + if manuallySeededPeers == nil || useRTPeers { + RTSeedPeers := dht.routingTable.NearestPeers(targetKadID, dht.bucketSize) + for _, p := range RTSeedPeers { + seedPeerSet.Add(p) + } + } + + seedPeers := seedPeerSet.Peers() if len(seedPeers) == 0 { routing.PublishQueryEvent(ctx, &routing.QueryEvent{ Type: routing.QueryError, diff --git a/routing.go b/routing.go index 92d1d0e7c..af1710aab 100644 --- a/routing.go +++ b/routing.go @@ -34,6 +34,12 @@ func (dht *IpfsDHT) PutValueExtended(ctx context.Context, key string, value []by logger.Debugw("putting value", "key", internal.LoggableRecordKeyString(key)) + var cfg routing.Options + if err := cfg.Apply(opts...); err != nil { + return nil, err + } + seedPeerOpts := dhtrouting.GetSeedPeers(&cfg) + // don't even allow local users to put bad values. if err := dht.Validator.Validate(key, value); err != nil { return nil, err @@ -64,7 +70,7 @@ func (dht *IpfsDHT) PutValueExtended(ctx context.Context, key string, value []by return nil, err } - pchan, err := dht.GetClosestPeers(ctx, key) + pchan, err := dht.GetClosestPeersSeeded(ctx, key, seedPeerOpts.SeedPeers, seedPeerOpts.UseRTPeers) if err != nil { return nil, err } @@ -151,8 +157,10 @@ func (dht *IpfsDHT) SearchValueExtended(ctx context.Context, key string, opts .. responsesNeeded = dhtrouting.GetQuorum(&cfg) } + seedPeerOpts := dhtrouting.GetSeedPeers(&cfg) + stopCh := make(chan struct{}) - valCh, lookupRes := dht.getValues(ctx, key, stopCh) + valCh, lookupRes := dht.getValues(ctx, key, seedPeerOpts, stopCh) out := make(chan []byte) peers := make(chan []peer.ID, 1) @@ -226,7 +234,7 @@ func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) (_ []R queryCtx, cancel := context.WithCancel(ctx) defer cancel() - valCh, _ := dht.getValues(queryCtx, key, nil) + valCh, _ := dht.getValues(queryCtx, key, dhtrouting.SeedPeersOptions{UseRTPeers: true}, nil) out := make([]RecvdVal, 0, nvals) for val := range valCh { @@ -304,7 +312,7 @@ func (dht *IpfsDHT) updatePeerValues(ctx context.Context, key string, val []byte } } -func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopQuery chan struct{}) (<-chan RecvdVal, <-chan *lookupWithFollowupResult) { +func (dht *IpfsDHT) getValues(ctx context.Context, key string, seedPeerOpts dhtrouting.SeedPeersOptions, stopQuery chan struct{}) (<-chan RecvdVal, <-chan *lookupWithFollowupResult) { valCh := make(chan RecvdVal, 1) lookupResCh := make(chan *lookupWithFollowupResult, 1) @@ -380,6 +388,7 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopQuery chan st return false } }, + seedPeerOpts.SeedPeers, seedPeerOpts.UseRTPeers, ) if err != nil { @@ -416,6 +425,12 @@ func (dht *IpfsDHT) ProvideExtended(ctx context.Context, key cid.Cid, brdcst boo keyMH := key.Hash() logger.Debugw("providing", "cid", key, "mh", internal.LoggableProviderRecordBytes(keyMH)) + var cfg routing.Options + if err := cfg.Apply(opts...); err != nil { + return nil, err + } + seedPeerOpts := dhtrouting.GetSeedPeers(&cfg) + // add self locally dht.ProviderManager.AddProvider(ctx, keyMH, dht.self) if !brdcst { @@ -444,7 +459,7 @@ func (dht *IpfsDHT) ProvideExtended(ctx context.Context, key cid.Cid, brdcst boo } var exceededDeadline bool - peers, err := dht.GetClosestPeers(closerCtx, string(keyMH)) + peers, err := dht.GetClosestPeersSeeded(closerCtx, 
string(keyMH), seedPeerOpts.SeedPeers, seedPeerOpts.UseRTPeers) switch err { case context.DeadlineExceeded: // If the _inner_ deadline has been exceeded but the _outer_ @@ -516,6 +531,7 @@ func (dht *IpfsDHT) FindProvidersAsyncExtended(ctx context.Context, key cid.Cid, } count := dhtrouting.GetQuorum(&cfg) + seedPeerOpts := dhtrouting.GetSeedPeers(&cfg) chSize := count if count == 0 { @@ -527,7 +543,7 @@ func (dht *IpfsDHT) FindProvidersAsyncExtended(ctx context.Context, key cid.Cid, keyMH := key.Hash() logger.Debugw("finding providers", "cid", key, "mh", internal.LoggableProviderRecordBytes(keyMH)) - go dht.findProvidersAsyncRoutine(ctx, keyMH, count, peerOut, closestPeersOut) + go dht.findProvidersAsyncRoutine(ctx, keyMH, count, seedPeerOpts, peerOut, closestPeersOut) return peerOut, closestPeersOut, nil } @@ -541,7 +557,7 @@ func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count i return providers } -func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash.Multihash, count int, peerOut chan peer.AddrInfo, closestPeersOut chan []peer.ID) { +func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash.Multihash, count int, seedPeerOpts dhtrouting.SeedPeersOptions, peerOut chan peer.AddrInfo, closestPeersOut chan []peer.ID) { defer close(peerOut) defer close(closestPeersOut) @@ -620,6 +636,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash func() bool { return !findAll && ps.Size() >= count }, + seedPeerOpts.SeedPeers, seedPeerOpts.UseRTPeers, ) if lookupRes != nil { @@ -639,6 +656,12 @@ func (dht *IpfsDHT) FindPeerExtended(ctx context.Context, id peer.ID, opts ...ro logger.Debugw("finding peer", "peer", id) + var cfg routing.Options + if err := cfg.Apply(opts...); err != nil { + return peer.AddrInfo{}, nil, err + } + seedPeerOpts := dhtrouting.GetSeedPeers(&cfg) + // Check if were already connected to them if pi := dht.FindLocal(id); pi.ID != "" { return pi, nil, nil @@ -670,6 +693,7 @@ func (dht *IpfsDHT) FindPeerExtended(ctx context.Context, id peer.ID, opts ...ro func() bool { return dht.host.Network().Connectedness(id) == network.Connected }, + seedPeerOpts.SeedPeers, seedPeerOpts.UseRTPeers, ) if err != nil { From 961c097b2d54c632b5b3cb128369cc2ce118be25 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Mon, 4 Jan 2021 04:08:40 -0500 Subject: [PATCH 7/7] feat: rework routing to be more configurable --- routing.go | 364 ++++++++++++++++---------------------------- routing/options.go | 46 ++++++ routing/pipeline.go | 93 +++++++++++ routing/pipes.go | 97 ++++++++++++ routing/wrapper.go | 91 +++++++++++ 5 files changed, 461 insertions(+), 230 deletions(-) create mode 100644 routing/pipeline.go create mode 100644 routing/pipes.go create mode 100644 routing/wrapper.go diff --git a/routing.go b/routing.go index af1710aab..5fabc4b04 100644 --- a/routing.go +++ b/routing.go @@ -102,20 +102,16 @@ func (dht *IpfsDHT) PutValueExtended(ctx context.Context, key string, value []by // PutValue adds value corresponding to given Key. // This is the top level "Store" operation of the DHT - -func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts ...routing.Option) (err error) { - _, err = dht.PutValueExtended(ctx, key, value, opts...) +func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts ...routing.Option) error { + _, err := dht.PutValueExtended(ctx, key, value, opts...) 
return err } // RecvdVal stores a value and the peer from which we got the value. -type RecvdVal struct { - Val []byte - From peer.ID -} +type RecvdVal dhtrouting.RecvdVal // GetValue searches for the value corresponding to given Key. -func (dht *IpfsDHT) GetValue(ctx context.Context, key string, opts ...routing.Option) (_ []byte, err error) { +func (dht *IpfsDHT) GetValue(ctx context.Context, key string, opts ...routing.Option) ([]byte, error) { if !dht.enableValues { return nil, routing.ErrNotSupported } @@ -152,50 +148,49 @@ func (dht *IpfsDHT) SearchValueExtended(ctx context.Context, key string, opts .. return nil, nil, err } - responsesNeeded := 0 - if !cfg.Offline { - responsesNeeded = dhtrouting.GetQuorum(&cfg) - } - - seedPeerOpts := dhtrouting.GetSeedPeers(&cfg) - - stopCh := make(chan struct{}) - valCh, lookupRes := dht.getValues(ctx, key, seedPeerOpts, stopCh) - - out := make(chan []byte) - peers := make(chan []peer.ID, 1) - go func() { - defer close(out) - defer close(peers) - best, peersWithBest, aborted := dht.searchValueQuorum(ctx, key, valCh, stopCh, out, responsesNeeded) - - var l *lookupWithFollowupResult - select { - case l = <-lookupRes: - case <-ctx.Done(): - return + processors := dhtrouting.GetProcessors(&cfg) + if processors == nil { + validation := &dhtrouting.ValidationFilter{ + Key: key, + Validator: dht.Validator, } - if l == nil { - return + quorum := &dhtrouting.CountStopper{ + MaxCount: dhtrouting.GetQuorum(&cfg), } - if best == nil || aborted { - peers <- l.peers - return + bestValue := &dhtrouting.BestValueFilterRecorder{ + Key: key, + Validator: dht.Validator, + PeersWithBest: make(map[peer.ID]struct{}), } - updatePeers := make([]peer.ID, 0, dht.bucketSize) - for _, p := range l.peers { - if _, ok := peersWithBest[p]; !ok { - updatePeers = append(updatePeers, p) + processors = []dhtrouting.Processor{validation, quorum, bestValue} + } + + return dhtrouting.SearchValue(ctx, key, dht.getValues, processors, + func(ctx context.Context, best []byte, closestPeers []peer.ID) { + fixupRec := record.MakePutRecord(key, best) + for _, p := range closestPeers { + go func(p peer.ID) { + //TODO: Is this possible? + if p == dht.self { + err := dht.putLocal(key, fixupRec) + if err != nil { + logger.Error("Error correcting local dht entry:", err) + } + return + } + ctx, cancel := context.WithTimeout(ctx, time.Second*30) + defer cancel() + err := dht.protoMessenger.PutValue(ctx, p, fixupRec) + if err != nil { + logger.Debug("Error correcting DHT entry: ", err) + } + }(p) } - } - - dht.updatePeerValues(dht.Context(), key, best, updatePeers) - }() - - return out, peers, nil + }, + &cfg) } // SearchValue searches for the value corresponding to given Key and streams the results. @@ -204,28 +199,6 @@ func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...routing return out, err } -func (dht *IpfsDHT) searchValueQuorum(ctx context.Context, key string, valCh <-chan RecvdVal, stopCh chan struct{}, - out chan<- []byte, nvals int) ([]byte, map[peer.ID]struct{}, bool) { - numResponses := 0 - return dht.processValues(ctx, key, valCh, - func(ctx context.Context, v RecvdVal, better bool) bool { - numResponses++ - if better { - select { - case out <- v.Val: - case <-ctx.Done(): - return false - } - } - - if nvals > 0 && numResponses > nvals { - close(stopCh) - return true - } - return false - }) -} - // GetValues gets nvals values corresponding to the given key. 
func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) (_ []RecvdVal, err error) { if !dht.enableValues { @@ -234,11 +207,14 @@ func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) (_ []R queryCtx, cancel := context.WithCancel(ctx) defer cancel() - valCh, _ := dht.getValues(queryCtx, key, dhtrouting.SeedPeersOptions{UseRTPeers: true}, nil) + valCh, _ := dht.getValues(queryCtx, context.Background(), key, dhtrouting.SeedPeersOptions{ + SeedPeers: nil, + UseRTPeers: true, + }) out := make([]RecvdVal, 0, nvals) for val := range valCh { - out = append(out, val) + out = append(out, RecvdVal(val)) if len(out) == nvals { cancel() } @@ -247,80 +223,15 @@ func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) (_ []R return out, ctx.Err() } -func (dht *IpfsDHT) processValues(ctx context.Context, key string, vals <-chan RecvdVal, - newVal func(ctx context.Context, v RecvdVal, better bool) bool) (best []byte, peersWithBest map[peer.ID]struct{}, aborted bool) { -loop: - for { - if aborted { - return - } - - select { - case v, ok := <-vals: - if !ok { - break loop - } - - // Select best value - if best != nil { - if bytes.Equal(best, v.Val) { - peersWithBest[v.From] = struct{}{} - aborted = newVal(ctx, v, false) - continue - } - sel, err := dht.Validator.Select(key, [][]byte{best, v.Val}) - if err != nil { - logger.Warnw("failed to select best value", "key", internal.LoggableRecordKeyString(key), "error", err) - continue - } - if sel != 1 { - aborted = newVal(ctx, v, false) - continue - } - } - peersWithBest = make(map[peer.ID]struct{}) - peersWithBest[v.From] = struct{}{} - best = v.Val - aborted = newVal(ctx, v, true) - case <-ctx.Done(): - return - } - } - - return -} - -func (dht *IpfsDHT) updatePeerValues(ctx context.Context, key string, val []byte, peers []peer.ID) { - fixupRec := record.MakePutRecord(key, val) - for _, p := range peers { - go func(p peer.ID) { - //TODO: Is this possible? 
- if p == dht.self { - err := dht.putLocal(key, fixupRec) - if err != nil { - logger.Error("Error correcting local dht entry:", err) - } - return - } - ctx, cancel := context.WithTimeout(ctx, time.Second*30) - defer cancel() - err := dht.protoMessenger.PutValue(ctx, p, fixupRec) - if err != nil { - logger.Debug("Error correcting DHT entry: ", err) - } - }(p) - } -} - -func (dht *IpfsDHT) getValues(ctx context.Context, key string, seedPeerOpts dhtrouting.SeedPeersOptions, stopQuery chan struct{}) (<-chan RecvdVal, <-chan *lookupWithFollowupResult) { - valCh := make(chan RecvdVal, 1) - lookupResCh := make(chan *lookupWithFollowupResult, 1) +func (dht *IpfsDHT) getValues(ctx, queryAbortedCtx context.Context, key string, seedPeerOpts dhtrouting.SeedPeersOptions) (<-chan dhtrouting.RecvdVal, <-chan []peer.ID) { + valCh := make(chan dhtrouting.RecvdVal, 1) + closestPeersCh := make(chan []peer.ID, 1) logger.Debugw("finding value", "key", internal.LoggableRecordKeyString(key)) if rec, err := dht.getLocal(key); rec != nil && err == nil { select { - case valCh <- RecvdVal{ + case valCh <- dhtrouting.RecvdVal{ Val: rec.GetValue(), From: dht.self, }: @@ -330,7 +241,7 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, seedPeerOpts dhtr go func() { defer close(valCh) - defer close(lookupResCh) + defer close(closestPeersCh) lookupRes, err := dht.runLookupWithFollowup(ctx, key, func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) { // For DHT query command @@ -359,7 +270,7 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, seedPeerOpts dhtr // TODO: What should happen if the record is invalid? // Pre-existing code counted it towards the quorum, but should it? if rec != nil && rec.GetValue() != nil { - rv := RecvdVal{ + rv := dhtrouting.RecvdVal{ Val: rec.GetValue(), From: p, } @@ -382,7 +293,7 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, seedPeerOpts dhtr }, func() bool { select { - case <-stopQuery: + case <-queryAbortedCtx.Done(): return true default: return false @@ -394,14 +305,14 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, seedPeerOpts dhtr if err != nil { return } - lookupResCh <- lookupRes + closestPeersCh <- lookupRes.peers - if ctx.Err() == nil { + if ctx.Err() == nil && seedPeerOpts.SeedPeers == nil { dht.refreshRTIfNoShortcut(kb.ConvertKey(key), lookupRes) } }() - return valCh, lookupResCh + return valCh, closestPeersCh } func (dht *IpfsDHT) refreshRTIfNoShortcut(key kb.ID, lookupRes *lookupWithFollowupResult) { @@ -496,8 +407,8 @@ func (dht *IpfsDHT) ProvideExtended(ctx context.Context, key cid.Cid, brdcst boo } // Provide makes this node announce that it can provide a value for the given key -func (dht *IpfsDHT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err error) { - _, err = dht.ProvideExtended(ctx, key, brdcst) +func (dht *IpfsDHT) Provide(ctx context.Context, key cid.Cid, brdcst bool) error { + _, err := dht.ProvideExtended(ctx, key, brdcst) return err } @@ -530,21 +441,23 @@ func (dht *IpfsDHT) FindProvidersAsyncExtended(ctx context.Context, key cid.Cid, return nil, nil, err } - count := dhtrouting.GetQuorum(&cfg) - seedPeerOpts := dhtrouting.GetSeedPeers(&cfg) + keyMH := key.Hash() - chSize := count - if count == 0 { - chSize = 1 - } - peerOut := make(chan peer.AddrInfo, chSize) - closestPeersOut := make(chan []peer.ID, 1) + processors := dhtrouting.GetProcessors(&cfg) + if processors == nil { + newValuesOnly := &dhtrouting.NewPeerIDFilter{ + Key: string(keyMH), + Peers: peer.NewSet(), + } - 
keyMH := key.Hash() + quorum := &dhtrouting.CountStopper{ + MaxCount: dhtrouting.GetQuorum(&cfg), + } - logger.Debugw("finding providers", "cid", key, "mh", internal.LoggableProviderRecordBytes(keyMH)) - go dht.findProvidersAsyncRoutine(ctx, keyMH, count, seedPeerOpts, peerOut, closestPeersOut) - return peerOut, closestPeersOut, nil + processors = []dhtrouting.Processor{newValuesOnly, quorum} + } + + return dhtrouting.FindProviders(ctx, keyMH, dht.findProvidersAsyncRoutine, processors, &cfg) } // FindProvidersAsync is the same thing as FindProviders, but returns a channel. @@ -557,95 +470,92 @@ func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count i return providers } -func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash.Multihash, count int, seedPeerOpts dhtrouting.SeedPeersOptions, peerOut chan peer.AddrInfo, closestPeersOut chan []peer.ID) { - defer close(peerOut) - defer close(closestPeersOut) +func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx, queryAbortedCtx context.Context, key multihash.Multihash, seedPeerOpts dhtrouting.SeedPeersOptions) (<-chan peer.AddrInfo, <-chan []peer.ID) { + logger.Debugw("finding providers", "key", key) - findAll := count == 0 - var ps *peer.Set - if findAll { - ps = peer.NewSet() - } else { - ps = peer.NewLimitedSet(count) - } + provsCh := make(chan peer.AddrInfo, 1) + closestPeersCh := make(chan []peer.ID, 1) - provs := dht.ProviderManager.GetProviders(ctx, key) - for _, p := range provs { - // NOTE: Assuming that this list of peers is unique - if ps.TryAdd(p) { + go func() { + defer close(provsCh) + defer close(closestPeersCh) + + provs := dht.ProviderManager.GetProviders(ctx, key) + for _, p := range provs { pi := dht.peerstore.PeerInfo(p) select { - case peerOut <- pi: + case provsCh <- pi: case <-ctx.Done(): return } } - // If we have enough peers locally, don't bother with remote RPC - // TODO: is this a DOS vector? 
- if !findAll && ps.Size() >= count { - return - } - } - - lookupRes, err := dht.runLookupWithFollowup(ctx, string(key), - func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) { - // For DHT query command - routing.PublishQueryEvent(ctx, &routing.QueryEvent{ - Type: routing.SendingQuery, - ID: p, - }) + lookupRes, err := dht.runLookupWithFollowup(ctx, string(key), + func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) { + // For DHT query command + routing.PublishQueryEvent(ctx, &routing.QueryEvent{ + Type: routing.SendingQuery, + ID: p, + }) - provs, closest, err := dht.protoMessenger.GetProviders(ctx, p, key) - if err != nil { - return nil, err - } + provs, closest, err := dht.protoMessenger.GetProviders(ctx, p, key) + if err != nil { + return nil, err + } - logger.Debugf("%d provider entries", len(provs)) + logger.Debugf("%d provider entries", len(provs)) - // Add unique providers from request, up to 'count' - for _, prov := range provs { - dht.maybeAddAddrs(prov.ID, prov.Addrs, peerstore.TempAddrTTL) - logger.Debugf("got provider: %s", prov) - if ps.TryAdd(prov.ID) { - logger.Debugf("using provider: %s", prov) + // Add unique providers from request, up to 'count' + for _, prov := range provs { + dht.maybeAddAddrs(prov.ID, prov.Addrs, peerstore.TempAddrTTL) + logger.Debugf("got provider: %s", prov) select { - case peerOut <- *prov: + case provsCh <- *prov: case <-ctx.Done(): logger.Debug("context timed out sending more providers") return nil, ctx.Err() } } - if !findAll && ps.Size() >= count { - logger.Debugf("got enough providers (%d/%d)", ps.Size(), count) - return nil, nil - } - } - // Give closer peers back to the query to be queried - logger.Debugf("got closer peers: %d %s", len(closest), closest) + // Give closer peers back to the query to be queried + logger.Debugf("got closer peers: %d %s", len(closest), closest) - routing.PublishQueryEvent(ctx, &routing.QueryEvent{ - Type: routing.PeerResponse, - ID: p, - Responses: closest, - }) + routing.PublishQueryEvent(ctx, &routing.QueryEvent{ + Type: routing.PeerResponse, + ID: p, + Responses: closest, + }) - return closest, nil - }, - func() bool { - return !findAll && ps.Size() >= count - }, - seedPeerOpts.SeedPeers, seedPeerOpts.UseRTPeers, - ) + return closest, nil + }, + func() bool { + select { + case <-queryAbortedCtx.Done(): + return true + default: + return false + } + }, + seedPeerOpts.SeedPeers, seedPeerOpts.UseRTPeers, + ) - if lookupRes != nil { - closestPeersOut <- lookupRes.peers - } + if err != nil { + return + } + closestPeersCh <- lookupRes.peers - if err == nil && ctx.Err() == nil { - dht.refreshRTIfNoShortcut(kb.ConvertKey(string(key)), lookupRes) - } + if ctx.Err() == nil && seedPeerOpts.SeedPeers == nil { + dht.refreshRTIfNoShortcut(kb.ConvertKey(string(key)), lookupRes) + } + }() + + return provsCh, closestPeersCh +} + +// FindPeer searches for a peer with given ID. +func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (peer.AddrInfo, error) { + pid, _, err := dht.FindPeerExtended(ctx, id) + return pid, err } // FindPeerExtended searches for a peer with given ID. @@ -720,9 +630,3 @@ func (dht *IpfsDHT) FindPeerExtended(ctx context.Context, id peer.ID, opts ...ro return peer.AddrInfo{}, lookupRes.peers, routing.ErrNotFound } - -// FindPeer searches for a peer with given ID. 
-func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (peer.AddrInfo, error) {
-	pid, _, err := dht.FindPeerExtended(ctx, id)
-	return pid, err
-}
diff --git a/routing/options.go b/routing/options.go
index 7f3d1615d..b009e0122 100644
--- a/routing/options.go
+++ b/routing/options.go
@@ -58,3 +58,49 @@ func GetSeedPeers(opts *routing.Options) SeedPeersOptions {
 	}
 	return seedPeersOpts
 }
+
+type updateDuringGetOptionKey struct{}
+
+// UpdateDuringGet is a DHT option that tells the DHT if it should update peers with
+// old data while doing a Get
+//
+// Default: true for Get/SearchValue, and false otherwise
+func UpdateDuringGet(updateDuringGet bool) routing.Option {
+	return func(opts *routing.Options) error {
+		if opts.Other == nil {
+			opts.Other = make(map[interface{}]interface{}, 1)
+		}
+		opts.Other[updateDuringGetOptionKey{}] = updateDuringGet
+		return nil
+	}
+}
+
+func getUpdateDuringGet(opts *routing.Options, defaultValue bool) bool {
+	updateDuringGet, ok := opts.Other[updateDuringGetOptionKey{}].(bool)
+	if !ok {
+		updateDuringGet = defaultValue
+	}
+	return updateDuringGet
+}
+
+type processorsOptionKey struct{}
+
+// WithProcessors is a DHT option that tells the DHT which processors it should use
+// (and the order to apply them) on lookup results.
+func WithProcessors(processors ...Processor) routing.Option {
+	return func(opts *routing.Options) error {
+		if opts.Other == nil {
+			opts.Other = make(map[interface{}]interface{}, 1)
+		}
+		opts.Other[processorsOptionKey{}] = processors
+		return nil
+	}
+}
+
+func GetProcessors(opts *routing.Options) []Processor {
+	processors, ok := opts.Other[processorsOptionKey{}].([]Processor)
+	if !ok {
+		processors = nil
+	}
+	return processors
+}
diff --git a/routing/pipeline.go b/routing/pipeline.go
new file mode 100644
index 000000000..39d800c8a
--- /dev/null
+++ b/routing/pipeline.go
@@ -0,0 +1,93 @@
+package routing
+
+import (
+	"context"
+
+	logging "github.com/ipfs/go-log"
+	"github.com/libp2p/go-libp2p-core/peer"
+)
+
+var (
+	logger = logging.Logger("dht.routing")
+)
+
+// RecvdVal stores a value and the peer from which we got the value.
+type RecvdVal struct {
+	Val  []byte
+	From peer.ID
+}
+
+func RunRecordPipeline(ctx context.Context, processors []Processor, abortQuery func(), out chan<- []byte, in <-chan RecvdVal) <-chan error {
+	errCh := make(chan error, 1)
+	go func() {
+		defer close(errCh)
+	processInput:
+		for {
+			select {
+			case i, more := <-in:
+				if !more {
+					return
+				}
+				var state interface{} = i
+				var err error
+				for _, t := range processors {
+					state, err = t.Process(state, abortQuery)
+					if err != nil {
+						continue processInput
+					}
+				}
+
+				finalState := state.(RecvdVal)
+				select {
+				case out <- finalState.Val:
+				case <-ctx.Done():
+					errCh <- ctx.Err()
+					return
+				}
+			case <-ctx.Done():
+				errCh <- ctx.Err()
+				return
+			}
+		}
+	}()
+	return errCh
+}
+
+func RunProvidersPipeline(ctx context.Context, processors []Processor, abortQuery func(), out chan<- peer.AddrInfo, in <-chan peer.AddrInfo) <-chan error {
+	errCh := make(chan error, 1)
+	go func() {
+		defer close(errCh)
+	processInput:
+		for {
+			select {
+			case i, more := <-in:
+				if !more {
+					return
+				}
+				var state interface{} = i
+				var err error
+				for _, t := range processors {
+					state, err = t.Process(state, abortQuery)
+					if err != nil {
+						continue processInput
+					}
+				}
+
+				finalState := state.(peer.AddrInfo)
+				select {
+				case out <- finalState:
+				case <-ctx.Done():
+					errCh <- ctx.Err()
+					return
+				}
+			case <-ctx.Done():
+				errCh <- ctx.Err()
+				return
+			}
+		}
+	}()
+	return errCh
+}
diff --git a/routing/pipes.go b/routing/pipes.go
new file mode 100644
index 000000000..7377eca47
--- /dev/null
+++ b/routing/pipes.go
@@ -0,0 +1,97 @@
+package routing
+
+import (
+	"bytes"
+	"errors"
+	"github.com/libp2p/go-libp2p-core/peer"
+	record "github.com/libp2p/go-libp2p-record"
+)
+
+type Processor interface {
+	Process(interface{}, func()) (interface{}, error)
+}
+
+var skipErr = errors.New("skip value")
+
+type CountStopper struct {
+	Count    int
+	MaxCount int
+}
+
+func (f *CountStopper) Process(val interface{}, abortQuery func()) (interface{}, error) {
+	f.Count++
+	if f.MaxCount > 0 && f.Count >= f.MaxCount {
+		abortQuery()
+	}
+	return val, nil
+}
+
+type ValidationFilter struct {
+	Key       string
+	Validator record.Validator
+}
+
+func (f *ValidationFilter) Process(val interface{}, _ func()) (interface{}, error) {
+	v := val.(RecvdVal)
+	err := f.Validator.Validate(f.Key, v.Val)
+	if err != nil {
+		return nil, err
+	}
+	return v, nil
+}
+
+type BestValueFilterRecorder struct {
+	Key           string
+	Best          []byte
+	Validator     record.Validator
+	PeersWithBest map[peer.ID]struct{}
+}
+
+func (p *BestValueFilterRecorder) Process(val interface{}, _ func()) (interface{}, error) {
+	v := val.(RecvdVal)
+
+	// Select best value
+	if p.Best != nil {
+		if bytes.Equal(p.Best, v.Val) {
+			p.PeersWithBest[v.From] = struct{}{}
+			return nil, skipErr
+		}
+		newIsBetter, err := p.getBetterRecord(p.Key, p.Best, v.Val)
+		if err != nil {
+			logger.Warnw("failed to select best value", "key", p.Key, "error", err)
+			return nil, skipErr
+		}
+		if !newIsBetter {
+			return nil, skipErr
+		}
+	}
+	p.PeersWithBest = make(map[peer.ID]struct{})
+	p.PeersWithBest[v.From] = struct{}{}
+	p.Best = v.Val
+	return v, nil
+}
+
+func (p *BestValueFilterRecorder) getBetterRecord(key string, current, new []byte) (bool, error) {
+	sel, err := p.Validator.Select(key, [][]byte{current, new})
+	if err != nil {
+		return false, err
+	}
+	return sel == 1, nil
+}
+
+type NewPeerIDFilter struct {
+	Key   string
+	Peers *peer.Set
+}
+
+func (p *NewPeerIDFilter) Process(val interface{}, _ func()) (interface{}, error) {
+	prov := val.(peer.AddrInfo)
+
+	logger.Debugf("got provider: %s", prov)
+	if p.Peers.TryAdd(prov.ID) {
+		logger.Debugf("using provider: %s", prov)
+		return prov, nil
+	}
+
+	return nil, skipErr
+}
diff --git a/routing/wrapper.go b/routing/wrapper.go
new file mode 100644
index 000000000..62e999ea2
--- /dev/null
+++ b/routing/wrapper.go
@@ -0,0 +1,91 @@
+package routing
+
+import (
+	"context"
+
+	"github.com/multiformats/go-multihash"
+
+	"github.com/libp2p/go-libp2p-core/peer"
+	"github.com/libp2p/go-libp2p-core/routing"
+)
+
+type getValuesFn func(ctx, queryAbortedCtx context.Context, key string, seedPeerOpts SeedPeersOptions) (<-chan RecvdVal, <-chan []peer.ID)
+type updatePeerValuesFn func(ctx context.Context, best []byte, closestPeers []peer.ID)
+
+// SearchValue searches for the value corresponding to given Key and streams the results.
+func SearchValue(ctx context.Context, key string, getVals getValuesFn, processors []Processor, updatePeerValues updatePeerValuesFn, cfg *routing.Options) (<-chan []byte, <-chan []peer.ID, error) {
+	seedPeerOpts := GetSeedPeers(cfg)
+	updateDuringGet := getUpdateDuringGet(cfg, true)
+
+	queryAbortCtx, abortQuery := context.WithCancel(context.Background())
+	valCh, closestPeersCh := getVals(ctx, queryAbortCtx, key, seedPeerOpts)
+
+	out := make(chan []byte)
+	returnClosestPeersCh := make(chan []peer.ID, 1)
+
+	var closestPeers []peer.ID
+
+	go func() {
+		defer close(out)
+		defer close(returnClosestPeersCh)
+		defer abortQuery()
+		errCh := RunRecordPipeline(ctx, processors, abortQuery, out, valCh)
+		if err := <-errCh; err != nil {
+			return
+		}
+
+		closestPeers = <-closestPeersCh
+		if updateDuringGet && updatePeerValues != nil {
+			for _, f := range processors {
+				if bv, ok := f.(*BestValueFilterRecorder); ok {
+					if bv.Best == nil {
+						break
+					}
+
+					updatePeers := make([]peer.ID, 0, len(closestPeers))
+					for _, p := range closestPeers {
+						if _, ok := bv.PeersWithBest[p]; !ok {
+							updatePeers = append(updatePeers, p)
+						}
+					}
+					updatePeerValues(ctx, bv.Best, updatePeers)
+					break
+				}
+			}
+		}
+		returnClosestPeersCh <- closestPeers
+	}()
+
+	return out, returnClosestPeersCh, nil
+}
+
+type findProvsFn func(ctx, queryAbortedCtx context.Context, key multihash.Multihash, seedPeerOpts SeedPeersOptions) (<-chan peer.AddrInfo, <-chan []peer.ID)
+
+// FindProviders searches for the providers corresponding to given Key and streams the results.
+func FindProviders(ctx context.Context, key multihash.Multihash, findProvsFn findProvsFn, processors []Processor, cfg *routing.Options) (<-chan peer.AddrInfo, <-chan []peer.ID, error) {
+	seedPeerOpts := GetSeedPeers(cfg)
+	maxRequestedRecords := GetQuorum(cfg)
+
+	queryAbortCtx, abortQuery := context.WithCancel(context.Background())
+	valCh, closestPeersCh := findProvsFn(ctx, queryAbortCtx, key, seedPeerOpts)
+
+	outChSize := maxRequestedRecords
+	if outChSize == 0 {
+		outChSize = 1
+	}
+	out := make(chan peer.AddrInfo, outChSize)
+	returnClosestPeersCh := make(chan []peer.ID, 1)
+
+	go func() {
+		defer close(out)
+		defer close(returnClosestPeersCh)
+		defer abortQuery()
+		errCh := RunProvidersPipeline(ctx, processors, abortQuery, out, valCh)
+		if err := <-errCh; err != nil {
+			return
+		}
+
+		closestPeers := <-closestPeersCh
+		returnClosestPeersCh <- closestPeers
+	}()
+
+	return out, returnClosestPeersCh, nil
+}
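
Usage note: the series keeps the existing routing.Routing entry points intact while adding functional options and the *Extended lookups. The following is a minimal illustrative sketch only, not part of the patches; the helper name and the surrounding host/DHT setup are assumed.

package main

import (
	"context"
	"fmt"

	"github.com/libp2p/go-libp2p-core/peer"

	dht "github.com/libp2p/go-libp2p-kad-dht"
	dhtrouting "github.com/libp2p/go-libp2p-kad-dht/routing"
)

// searchWithSeeds streams values for key, stops the lookup after five
// responses, and seeds the query with known peers in addition to the
// routing table, then reports the closest peers the query touched.
func searchWithSeeds(ctx context.Context, d *dht.IpfsDHT, key string, seeds []peer.ID) error {
	vals, closest, err := d.SearchValueExtended(ctx, key,
		dhtrouting.Quorum(5),
		dhtrouting.SeedPeers(seeds, true),
	)
	if err != nil {
		return err
	}
	for v := range vals {
		fmt.Printf("candidate value: %d bytes\n", len(v))
	}
	// The closest peers seen by the lookup are delivered once the value
	// channel has been closed.
	fmt.Printf("lookup finished; %d closest peers reported\n", len(<-closest))
	return nil
}

Because Quorum, SeedPeers and WithProcessors are ordinary routing.Option values, the same options can also be passed to the unchanged option-taking methods such as SearchValue; the Extended variants are only needed when the caller wants the closest peers back.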