Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
bitswap/client: remove providerQueryManager and move logic in session
Browse files Browse the repository at this point in the history
Closes: #172
See #172 (comment) for rational.
Jorropo committed Dec 29, 2023
1 parent fda54dc commit d705aa3
Showing 5 changed files with 106 additions and 865 deletions.
20 changes: 2 additions & 18 deletions bitswap/client/client.go
Original file line number Diff line number Diff line change
@@ -13,7 +13,6 @@ import (
bsmq "github.com/ipfs/boxo/bitswap/client/internal/messagequeue"
"github.com/ipfs/boxo/bitswap/client/internal/notifications"
bspm "github.com/ipfs/boxo/bitswap/client/internal/peermanager"
bspqm "github.com/ipfs/boxo/bitswap/client/internal/providerquerymanager"
bssession "github.com/ipfs/boxo/bitswap/client/internal/session"
bssim "github.com/ipfs/boxo/bitswap/client/internal/sessioninterestmanager"
bssm "github.com/ipfs/boxo/bitswap/client/internal/sessionmanager"
@@ -99,7 +98,7 @@ func WithoutDuplicatedBlockStats() Option {
}
}

type ContentSearcher = bspqm.ContentRouter
type ContentSearcher = bssession.ProviderFinder

// WithContentSearch allows the client to search for providers when it is not
// able to find the content itself.
@@ -155,11 +154,6 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, bstore blockstore
option(bs)
}

if bs.router != nil {
bs.pqm = bspqm.New(ctx, network, bs.router)
bs.pqm.Startup()
}

// bind the context and process.
// do it over here to avoid closing before all setup is done.
go func() {
@@ -179,10 +173,6 @@ type Client struct {

pm *bspm.PeerManager

// the provider query manager manages requests to find providers
// is nil if content routing is disabled
pqm *bspqm.ProviderQueryManager

// network delivers messages on behalf of the session
network bsnet.BitSwapNetwork

@@ -244,13 +234,7 @@ func (bs *Client) sessionFactory(
rebroadcastDelay delay.D,
self peer.ID,
) bssm.Session {
// avoid typed nils
var pqm bssession.ProviderFinder
if bs.pqm != nil {
pqm = bs.pqm
}

return bssession.New(sessctx, sessmgr, id, spm, pqm, sim, pm, bpm, notif, provSearchDelay, rebroadcastDelay, self)
return bssession.New(sessctx, sessmgr, id, spm, sim, pm, bpm, notif, provSearchDelay, rebroadcastDelay, bs.router, bs.network)
}

// onDontHaveTimeout is called when a want-block is sent to a peer that
440 changes: 0 additions & 440 deletions bitswap/client/internal/providerquerymanager/providerquerymanager.go

This file was deleted.

This file was deleted.

111 changes: 90 additions & 21 deletions bitswap/client/internal/session/session.go
Original file line number Diff line number Diff line change
@@ -2,6 +2,7 @@ package session

import (
"context"
"sync"
"time"

"github.com/ipfs/boxo/bitswap/client/internal"
@@ -15,6 +16,8 @@ import (
delay "github.com/ipfs/go-ipfs-delay"
logging "github.com/ipfs/go-log/v2"
peer "github.com/libp2p/go-libp2p/core/peer"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.uber.org/zap"
)

@@ -25,6 +28,9 @@ var (

const (
broadcastLiveWantsLimit = 64
// MAGIC: why ten ? we should keep searching until we find a couple of online peers
maxProviders = 10
findProvidersTimeout = 10 * time.Second
)

// PeerManager keeps track of which sessions are interested in which peers
@@ -74,7 +80,7 @@ type SessionPeerManager interface {
// ProviderFinder is used to find providers for a given key
type ProviderFinder interface {
// FindProvidersAsync searches for peers that provide the given CID
FindProvidersAsync(ctx context.Context, k cid.Cid) <-chan peer.ID
FindProvidersAsync(ctx context.Context, k cid.Cid, max int) <-chan peer.AddrInfo
}

// opType is the kind of operation that is being processed by the event loop
@@ -103,13 +109,12 @@ type op struct {
// info to, and who to request blocks from.
type Session struct {
// dependencies
ctx context.Context
shutdown func()
sm SessionManager
pm PeerManager
sprm SessionPeerManager
providerFinder ProviderFinder // optional, nil when missing
sim *bssim.SessionInterestManager
ctx context.Context
shutdown func()
sm SessionManager
pm PeerManager
sprm SessionPeerManager
sim *bssim.SessionInterestManager

sw sessionWants
sws sessionWantSender
@@ -131,7 +136,15 @@ type Session struct {
notif notifications.PubSub
id uint64

self peer.ID
providerFinder ProviderFinder // optional, nil when missing
network Connector
}

type Connector interface {
Self() peer.ID

// ConnectTo attempts to connect to the peer, using the passed addresses as a hint, they can be empty.
ConnectTo(context.Context, peer.AddrInfo) error
}

// New creates a new bitswap session whose lifetime is bounded by the
@@ -141,15 +154,15 @@ func New(
sm SessionManager,
id uint64,
sprm SessionPeerManager,
// providerFinder might be nil
providerFinder ProviderFinder,
sim *bssim.SessionInterestManager,
pm PeerManager,
bpm *bsbpm.BlockPresenceManager,
notif notifications.PubSub,
initialSearchDelay time.Duration,
periodicSearchDelay delay.D,
self peer.ID,
// providerFinder might be nil
providerFinder ProviderFinder,
network Connector,
) *Session {
ctx, cancel := context.WithCancel(ctx)
s := &Session{
@@ -160,7 +173,6 @@ func New(
sm: sm,
pm: pm,
sprm: sprm,
providerFinder: providerFinder,
sim: sim,
incoming: make(chan op, 128),
latencyTrkr: latencyTracker{},
@@ -169,7 +181,8 @@ func New(
id: id,
initialSearchDelay: initialSearchDelay,
periodicSearchDelay: periodicSearchDelay,
self: self,
providerFinder: providerFinder,
network: network,
}
s.sws = newSessionWantSender(id, pm, sprm, sm, bpm, s.onWantsSent, s.onPeersExhausted)

@@ -218,13 +231,13 @@ func (s *Session) logReceiveFrom(from peer.ID, interestedKs []cid.Cid, haves []c
}

for _, c := range interestedKs {
log.Debugw("Bitswap <- block", "local", s.self, "from", from, "cid", c, "session", s.id)
log.Debugw("Bitswap <- block", "local", s.network.Self(), "from", from, "cid", c, "session", s.id)
}
for _, c := range haves {
log.Debugw("Bitswap <- HAVE", "local", s.self, "from", from, "cid", c, "session", s.id)
log.Debugw("Bitswap <- HAVE", "local", s.network.Self(), "from", from, "cid", c, "session", s.id)
}
for _, c := range dontHaves {
log.Debugw("Bitswap <- DONT_HAVE", "local", s.self, "from", from, "cid", c, "session", s.id)
log.Debugw("Bitswap <- DONT_HAVE", "local", s.network.Self(), "from", from, "cid", c, "session", s.id)
}
}

@@ -396,12 +409,68 @@ func (s *Session) findMorePeers(ctx context.Context, c cid.Cid) {
// ¯\_(ツ)_/¯
return
}

go func(k cid.Cid) {
for p := range s.providerFinder.FindProvidersAsync(ctx, k) {
// When a provider indicates that it has a cid, it's equivalent to
// the providing peer sending a HAVE
s.sws.Update(p, nil, []cid.Cid{c}, nil)
ctx, span := internal.StartSpan(ctx, "Session.findMorePeers")
defer span.End()
if span.IsRecording() {
span.SetAttributes(attribute.Stringer("cid", c))
}

ctx, cancel := context.WithTimeout(ctx, findProvidersTimeout)
defer cancel()

providers := s.providerFinder.FindProvidersAsync(ctx, k, maxProviders)
var wg sync.WaitGroup
providerLoop:
for {
select {
case p, ok := <-providers:
if !ok {
break providerLoop
}

if p.ID == s.network.Self() {
continue // ignore self as provider
}

wg.Add(1)
go func(p peer.AddrInfo) {
defer wg.Done()

ctx, span := internal.StartSpan(ctx, "Session.findMorePeers.ConnectTo")
defer span.End()
recording := span.IsRecording()
if recording {
maddrs := make([]string, len(p.Addrs))
for i, a := range p.Addrs {
maddrs[i] = a.String()
}

span.SetAttributes(
attribute.Stringer("peer", p.ID),
attribute.StringSlice("addrs", maddrs),
)
}

err := s.network.ConnectTo(ctx, p)
if err != nil {
if recording {
span.SetStatus(codes.Error, err.Error())
}
log.Debugf("failed to connect to provider %s: %s", p, err)
return
}

// When a provider indicates that it has a cid, it's equivalent to
// the providing peer sending a HAVE
s.sws.Update(p.ID, nil, []cid.Cid{c}, nil)
}(p)
case <-ctx.Done():
return
}
}
wg.Wait()
}(c)
}

28 changes: 14 additions & 14 deletions bitswap/client/internal/session/session_test.go
Original file line number Diff line number Diff line change
@@ -112,17 +112,22 @@ func newFakeProviderFinder() *fakeProviderFinder {
}
}

func (fpf *fakeProviderFinder) FindProvidersAsync(ctx context.Context, k cid.Cid) <-chan peer.ID {
func (fpf *fakeProviderFinder) FindProvidersAsync(ctx context.Context, k cid.Cid, max int) <-chan peer.AddrInfo {
go func() {
select {
case fpf.findMorePeersRequested <- k:
case <-ctx.Done():
}
}()

return make(chan peer.ID)
return make(chan peer.AddrInfo)
}

type fakeNetwork struct{}

func (fakeNetwork) Self() peer.ID { return "" }
func (fakeNetwork) ConnectTo(context.Context, peer.AddrInfo) error { return nil }

type wantReq struct {
cids []cid.Cid
}
@@ -154,14 +159,13 @@ func TestSessionGetBlocks(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
fpm := newFakePeerManager()
fspm := newFakeSessionPeerManager()
fpf := newFakeProviderFinder()
sim := bssim.New()
bpm := bsbpm.New()
notif := notifications.New()
defer notif.Shutdown()
id := testutil.GenerateSessionID()
sm := newMockSessionMgr()
session := New(ctx, sm, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
session := New(ctx, sm, id, fspm, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), nil, fakeNetwork{})
blockGenerator := blocksutil.NewBlockGenerator()
blks := blockGenerator.Blocks(broadcastLiveWantsLimit * 2)
var cids []cid.Cid
@@ -256,7 +260,7 @@ func TestSessionFindMorePeers(t *testing.T) {
defer notif.Shutdown()
id := testutil.GenerateSessionID()
sm := newMockSessionMgr()
session := New(ctx, sm, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
session := New(ctx, sm, id, fspm, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), fpf, fakeNetwork{})
session.SetBaseTickDelay(200 * time.Microsecond)
blockGenerator := blocksutil.NewBlockGenerator()
blks := blockGenerator.Blocks(broadcastLiveWantsLimit * 2)
@@ -323,15 +327,14 @@ func TestSessionOnPeersExhausted(t *testing.T) {
defer cancel()
fpm := newFakePeerManager()
fspm := newFakeSessionPeerManager()
fpf := newFakeProviderFinder()

sim := bssim.New()
bpm := bsbpm.New()
notif := notifications.New()
defer notif.Shutdown()
id := testutil.GenerateSessionID()
sm := newMockSessionMgr()
session := New(ctx, sm, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
session := New(ctx, sm, id, fspm, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), nil, fakeNetwork{})
blockGenerator := blocksutil.NewBlockGenerator()
blks := blockGenerator.Blocks(broadcastLiveWantsLimit + 5)
var cids []cid.Cid
@@ -377,7 +380,7 @@ func TestSessionFailingToGetFirstBlock(t *testing.T) {
defer notif.Shutdown()
id := testutil.GenerateSessionID()
sm := newMockSessionMgr()
session := New(ctx, sm, id, fspm, fpf, sim, fpm, bpm, notif, 10*time.Millisecond, delay.Fixed(100*time.Millisecond), "")
session := New(ctx, sm, id, fspm, sim, fpm, bpm, notif, 10*time.Millisecond, delay.Fixed(100*time.Millisecond), fpf, fakeNetwork{})
blockGenerator := blocksutil.NewBlockGenerator()
blks := blockGenerator.Blocks(4)
var cids []cid.Cid
@@ -483,7 +486,6 @@ func TestSessionFailingToGetFirstBlock(t *testing.T) {
func TestSessionCtxCancelClosesGetBlocksChannel(t *testing.T) {
fpm := newFakePeerManager()
fspm := newFakeSessionPeerManager()
fpf := newFakeProviderFinder()
sim := bssim.New()
bpm := bsbpm.New()
notif := notifications.New()
@@ -493,7 +495,7 @@ func TestSessionCtxCancelClosesGetBlocksChannel(t *testing.T) {

// Create a new session with its own context
sessctx, sesscancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
session := New(sessctx, sm, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
session := New(sessctx, sm, id, fspm, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), nil, fakeNetwork{})

timerCtx, timerCancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
defer timerCancel()
@@ -533,7 +535,6 @@ func TestSessionCtxCancelClosesGetBlocksChannel(t *testing.T) {
func TestSessionOnShutdownCalled(t *testing.T) {
fpm := newFakePeerManager()
fspm := newFakeSessionPeerManager()
fpf := newFakeProviderFinder()
sim := bssim.New()
bpm := bsbpm.New()
notif := notifications.New()
@@ -544,7 +545,7 @@ func TestSessionOnShutdownCalled(t *testing.T) {
// Create a new session with its own context
sessctx, sesscancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer sesscancel()
session := New(sessctx, sm, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
session := New(sessctx, sm, id, fspm, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), nil, fakeNetwork{})

// Shutdown the session
session.Shutdown()
@@ -561,15 +562,14 @@ func TestSessionReceiveMessageAfterCtxCancel(t *testing.T) {
ctx, cancelCtx := context.WithTimeout(context.Background(), 20*time.Millisecond)
fpm := newFakePeerManager()
fspm := newFakeSessionPeerManager()
fpf := newFakeProviderFinder()

sim := bssim.New()
bpm := bsbpm.New()
notif := notifications.New()
defer notif.Shutdown()
id := testutil.GenerateSessionID()
sm := newMockSessionMgr()
session := New(ctx, sm, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
session := New(ctx, sm, id, fspm, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), nil, fakeNetwork{})
blockGenerator := blocksutil.NewBlockGenerator()
blks := blockGenerator.Blocks(2)
cids := []cid.Cid{blks[0].Cid(), blks[1].Cid()}

0 comments on commit d705aa3

Please sign in to comment.