Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
bitswap,bitswap/network,bitswap/client: move content routing responsa…
Browse files Browse the repository at this point in the history
…bilities to an option of the client

Given that the previous commit remove the content advertising from the server, it did not made sense to share these paths on the network.

The code has been reworked:
- addresses aren't magically added to the peerstore as a side-effect of calling `Network.FindProvidersAsync`. Instead they are passed as hints to ConnectTo which copies libp2p `host.ConnectTo` API.
- the providerquerymanager is completely shutdown when not using `WithContentSearch` option, this helps usecase where `routinghelpers.Null` is used for content routing and the consumer exclusively rely on broadcast, like networks where most peoples have all the content (Filecoin, Celestia, ...).
Jorropo committed Dec 28, 2023
1 parent c6cf2bf commit b5aa23d
Showing 21 changed files with 219 additions and 233 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -16,8 +16,14 @@ The following emojis are used to highlight certain changes:

### Added

- `bitswap` & `bitswap/client` now have a `WithContentSearch` option, this pickup the content routing job from `bitswap/network`.
It used to be a commun pattern for consumers which do not need external content routing to pass a [`routinghelpers.Null`](https://pkg.go.dev/github.com/libp2p/go-libp2p-routing-helpers#Null), now this can be ommited completely which is more efficient.

### Changed

- 🛠 `bitswap/network` no longer manages content routing, related Methods and function Arguments have been removed.
- `Network.ConnectTo` method has been changed from [`peer.ID`](https://pkg.go.dev/github.com/libp2p/go-libp2p/core/peer#ID) to [`peer.AddrInfo`](https://pkg.go.dev/github.com/libp2p/go-libp2p/core/peer#AddrInfo), given adding addresses hints used to be a side effect of the network. Theses now need to be passed in as values.

### Removed

- 🛠 `bitswap` & `bitswap/server` no longer provide to content routers, instead you can use the `provider` package because it uses a datastore queue and batches calls to ProvideMany.
9 changes: 4 additions & 5 deletions bitswap/benchmarks_test.go
Original file line number Diff line number Diff line change
@@ -20,7 +20,6 @@ import (
bsnet "github.com/ipfs/boxo/bitswap/network"
testinstance "github.com/ipfs/boxo/bitswap/testinstance"
tn "github.com/ipfs/boxo/bitswap/testnet"
mockrouting "github.com/ipfs/boxo/routing/mock"
cid "github.com/ipfs/go-cid"
delay "github.com/ipfs/go-ipfs-delay"
)
@@ -142,7 +141,7 @@ func BenchmarkFetchFromOldBitswap(b *testing.B) {
oldSeedCount := bch.oldSeedCount
newSeedCount := bch.nodeCount - (fetcherCount + oldSeedCount)

net := tn.VirtualNetwork(mockrouting.NewServer(), fixedDelay)
net := tn.VirtualNetwork(fixedDelay)

// Simulate an older Bitswap node (old protocol ID) that doesn't
// send DONT_HAVE responses
@@ -294,7 +293,7 @@ func BenchmarkDatacenterMultiLeechMultiSeed(b *testing.B) {
numblks := 1000

for i := 0; i < b.N; i++ {
net := tn.RateLimitedVirtualNetwork(mockrouting.NewServer(), d, rateLimitGenerator)
net := tn.RateLimitedVirtualNetwork(d, rateLimitGenerator)

ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
defer ig.Close()
@@ -312,7 +311,7 @@ func BenchmarkDatacenterMultiLeechMultiSeed(b *testing.B) {

func subtestDistributeAndFetch(b *testing.B, numnodes, numblks int, d delay.D, bstoreLatency time.Duration, df distFunc, ff fetchFunc) {
for i := 0; i < b.N; i++ {
net := tn.VirtualNetwork(mockrouting.NewServer(), d)
net := tn.VirtualNetwork(d)

ig := testinstance.NewTestInstanceGenerator(net, nil, nil)

@@ -327,7 +326,7 @@ func subtestDistributeAndFetch(b *testing.B, numnodes, numblks int, d delay.D, b

func subtestDistributeAndFetchRateLimited(b *testing.B, numnodes, numblks int, d delay.D, rateLimitGenerator tn.RateLimitGenerator, blockSize int64, bstoreLatency time.Duration, df distFunc, ff fetchFunc) {
for i := 0; i < b.N; i++ {
net := tn.RateLimitedVirtualNetwork(mockrouting.NewServer(), d, rateLimitGenerator)
net := tn.RateLimitedVirtualNetwork(d, rateLimitGenerator)

ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
defer ig.Close()
30 changes: 15 additions & 15 deletions bitswap/bitswap_test.go
Original file line number Diff line number Diff line change
@@ -49,7 +49,7 @@ func addBlock(t *testing.T, ctx context.Context, inst testinstance.Instance, blk
const kNetworkDelay = 0 * time.Millisecond

func TestClose(t *testing.T) {
vnet := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
vnet := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
ig := testinstance.NewTestInstanceGenerator(vnet, nil, nil)
defer ig.Close()
bgen := blocksutil.NewBlockGenerator()
@@ -66,7 +66,7 @@ func TestClose(t *testing.T) {

func TestProviderForKeyButNetworkCannotFind(t *testing.T) { // TODO revisit this
rs := mockrouting.NewServer()
net := tn.VirtualNetwork(rs, delay.Fixed(kNetworkDelay))
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
defer ig.Close()

@@ -90,7 +90,7 @@ func TestProviderForKeyButNetworkCannotFind(t *testing.T) { // TODO revisit this
}

func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
block := blocks.NewBlock([]byte("block"))
ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
defer ig.Close()
@@ -118,7 +118,7 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {
}

func TestDoesNotProvideWhenConfiguredNotTo(t *testing.T) {
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
block := blocks.NewBlock([]byte("block"))
bsOpts := []bitswap.Option{bitswap.ProviderSearchDelay(50 * time.Millisecond)}
ig := testinstance.NewTestInstanceGenerator(net, nil, bsOpts)
@@ -150,7 +150,7 @@ func TestDoesNotProvideWhenConfiguredNotTo(t *testing.T) {
// Tests that a received block is not stored in the blockstore if the block was
// not requested by the client
func TestUnwantedBlockNotAdded(t *testing.T) {
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
block := blocks.NewBlock([]byte("block"))
bsMessage := bsmsg.New(true)
bsMessage.AddBlock(block)
@@ -186,7 +186,7 @@ func TestUnwantedBlockNotAdded(t *testing.T) {
// (because the live request queue is full)
func TestPendingBlockAdded(t *testing.T) {
ctx := context.Background()
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
bg := blocksutil.NewBlockGenerator()
sessionBroadcastWantCapacity := 4

@@ -278,7 +278,7 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
if testing.Short() {
t.SkipNow()
}
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
ig := testinstance.NewTestInstanceGenerator(net, nil, []bitswap.Option{
bitswap.TaskWorkerCount(5),
bitswap.EngineTaskWorkerCount(5),
@@ -335,7 +335,7 @@ func TestSendToWantingPeer(t *testing.T) {
t.SkipNow()
}

net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
defer ig.Close()
bg := blocksutil.NewBlockGenerator()
@@ -373,7 +373,7 @@ func TestSendToWantingPeer(t *testing.T) {
}

func TestEmptyKey(t *testing.T) {
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
defer ig.Close()
bs := ig.Instances(1)[0].Exchange
@@ -406,7 +406,7 @@ func assertStat(t *testing.T, st *bitswap.Stat, sblks, rblks, sdata, rdata uint6
}

func TestBasicBitswap(t *testing.T) {
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
defer ig.Close()
bg := blocksutil.NewBlockGenerator()
@@ -478,7 +478,7 @@ func TestBasicBitswap(t *testing.T) {
}

func TestDoubleGet(t *testing.T) {
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
defer ig.Close()
bg := blocksutil.NewBlockGenerator()
@@ -543,7 +543,7 @@ func TestDoubleGet(t *testing.T) {
}

func TestWantlistCleanup(t *testing.T) {
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
defer ig.Close()
bg := blocksutil.NewBlockGenerator()
@@ -665,7 +665,7 @@ func newReceipt(sent, recv, exchanged uint64) *server.Receipt {
}

func TestBitswapLedgerOneWay(t *testing.T) {
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
defer ig.Close()
bg := blocksutil.NewBlockGenerator()
@@ -714,7 +714,7 @@ func TestBitswapLedgerOneWay(t *testing.T) {
}

func TestBitswapLedgerTwoWay(t *testing.T) {
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
defer ig.Close()
bg := blocksutil.NewBlockGenerator()
@@ -803,7 +803,7 @@ func (tsl *testingScoreLedger) Stop() {
// Tests start and stop of a custom decision logic
func TestWithScoreLedger(t *testing.T) {
tsl := newTestingScoreLedger()
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
bsOpts := []bitswap.Option{bitswap.WithScoreLedger(tsl)}
ig := testinstance.NewTestInstanceGenerator(net, nil, bsOpts)
defer ig.Close()
40 changes: 29 additions & 11 deletions bitswap/client/bitswap_with_sessions_test.go
Original file line number Diff line number Diff line change
@@ -10,6 +10,7 @@ import (
"github.com/ipfs/boxo/bitswap"
"github.com/ipfs/boxo/bitswap/client/internal/session"
"github.com/ipfs/boxo/bitswap/client/traceability"
bsnet "github.com/ipfs/boxo/bitswap/network"
testinstance "github.com/ipfs/boxo/bitswap/testinstance"
tn "github.com/ipfs/boxo/bitswap/testnet"
mockrouting "github.com/ipfs/boxo/routing/mock"
@@ -18,13 +19,15 @@ import (
blocksutil "github.com/ipfs/go-ipfs-blocksutil"
delay "github.com/ipfs/go-ipfs-delay"
tu "github.com/libp2p/go-libp2p-testing/etc"
tnet "github.com/libp2p/go-libp2p-testing/net"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/stretchr/testify/require"
)

func getVirtualNetwork() tn.Network {
// FIXME: the tests are really sensitive to the network delay. fix them to work
// well under varying conditions
return tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0))
return tn.VirtualNetwork(delay.Fixed(0))
}

func addBlock(t *testing.T, ctx context.Context, inst testinstance.Instance, blk blocks.Block) {
@@ -37,10 +40,6 @@ func addBlock(t *testing.T, ctx context.Context, inst testinstance.Instance, blk
if err != nil {
t.Fatal(err)
}
err = inst.Adapter.Provide(ctx, blk.Cid())
if err != nil {
t.Fatal(err)
}
}

func TestBasicSessions(t *testing.T) {
@@ -114,7 +113,7 @@ func TestSessionBetweenPeers(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

vnet := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(time.Millisecond))
vnet := tn.VirtualNetwork(delay.Fixed(time.Millisecond))
ig := testinstance.NewTestInstanceGenerator(vnet, nil, []bitswap.Option{bitswap.SetSimulateDontHavesOnTimeout(false)})
defer ig.Close()
bgen := blocksutil.NewBlockGenerator()
@@ -219,16 +218,23 @@ func TestFetchNotConnected(t *testing.T) {
defer cancel()

vnet := getVirtualNetwork()
rs := mockrouting.NewServer()
ig := testinstance.NewTestInstanceGenerator(vnet, nil, []bitswap.Option{bitswap.ProviderSearchDelay(10 * time.Millisecond)})
defer ig.Close()
bgen := blocksutil.NewBlockGenerator()

other := ig.Next()
var otherClient mockrouting.Client
other := ig.NextWithExtraOptions(func(id tnet.Identity) ([]bsnet.NetOpt, []bitswap.Option) {
otherClient = rs.Client(id)
return nil, nil // don't add content search, only the client needs it
})

// Provide 10 blocks on Peer A
blks := bgen.Blocks(10)
for _, block := range blks {
addBlock(t, ctx, other, block)
err := otherClient.Provide(ctx, block.Cid(), true)
require.NoError(t, err)
}

var cids []cid.Cid
@@ -239,7 +245,9 @@ func TestFetchNotConnected(t *testing.T) {
// Request blocks with Peer B
// Note: Peer A and Peer B are not initially connected, so this tests
// that Peer B will search for and find Peer A
thisNode := ig.Next()
thisNode := ig.NextWithExtraOptions(func(id tnet.Identity) ([]bsnet.NetOpt, []bitswap.Option) {
return nil, []bitswap.Option{bitswap.WithContentSearch(rs.Client(id))}
})
ses := thisNode.Exchange.NewSession(ctx).(*session.Session)
ses.SetBaseTickDelay(time.Millisecond * 10)

@@ -262,16 +270,19 @@ func TestFetchAfterDisconnect(t *testing.T) {
defer cancel()

vnet := getVirtualNetwork()
rs := mockrouting.NewServer()
ig := testinstance.NewTestInstanceGenerator(vnet, nil, []bitswap.Option{
bitswap.ProviderSearchDelay(10 * time.Millisecond),
bitswap.RebroadcastDelay(delay.Fixed(15 * time.Millisecond)),
})
defer ig.Close()
bgen := blocksutil.NewBlockGenerator()

inst := ig.Instances(2)
peerA := inst[0]
peerB := inst[1]
var aClient mockrouting.Client
peerA := ig.NextWithExtraOptions(func(id tnet.Identity) ([]bsnet.NetOpt, []bitswap.Option) {
aClient = rs.Client(id)
return nil, nil // don't add content search, only the client needs it
})

// Provide 5 blocks on Peer A
blks := bgen.Blocks(10)
@@ -283,9 +294,14 @@ func TestFetchAfterDisconnect(t *testing.T) {
firstBlks := blks[:5]
for _, block := range firstBlks {
addBlock(t, ctx, peerA, block)
err := aClient.Provide(ctx, block.Cid(), true)
require.NoError(t, err)
}

// Request all blocks with Peer B
peerB := ig.NextWithExtraOptions(func(id tnet.Identity) ([]bsnet.NetOpt, []bitswap.Option) {
return nil, []bitswap.Option{bitswap.WithContentSearch(rs.Client(id))}
})
ses := peerB.Exchange.NewSession(ctx).(*session.Session)
ses.SetBaseTickDelay(time.Millisecond * 10)

@@ -317,6 +333,8 @@ func TestFetchAfterDisconnect(t *testing.T) {
lastBlks := blks[5:]
for _, block := range lastBlks {
addBlock(t, ctx, peerA, block)
err := aClient.Provide(ctx, block.Cid(), true)
require.NoError(t, err)
}

// Peer B should call FindProviders() and find Peer A
Loading

0 comments on commit b5aa23d

Please sign in to comment.