Skip to content

Commit

Permalink
Move providing responsabilities from bitswap to blockservice
Browse files Browse the repository at this point in the history
- bitswap/server: remove provide
- blockservice: add session workaround to work with wrapped blockservices
- blockservice: add WithProvider option
- blockservice: remove session embeding in context

Replaces #534 by @Jorropo which was outdated enough to make merging difficult.
  • Loading branch information
gammazero committed Sep 30, 2024
1 parent 19a402b commit 29faea9
Show file tree
Hide file tree
Showing 11 changed files with 204 additions and 293 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ The following emojis are used to highlight certain changes:
* `boxo/bitswap/server`:
* A new [`WithWantHaveReplaceSize(n)`](https://pkg.go.dev/github.com/ipfs/boxo/bitswap/server/#WithWantHaveReplaceSize) option can be used with `bitswap.New` to fine-tune cost-vs-performance. It sets the maximum size of a block in bytes up to which the bitswap server will replace a WantHave with a WantBlock response. Setting this to 0 disables this WantHave replacement and means that block sizes are not read when processing WantHave requests. [#672](https://github.com/ipfs/boxo/pull/672)
- `routing/http`: added support for address and protocol filtering to the delegated routing server ([IPIP-484](https://github.com/ipfs/specs/pull/484)) [#671](https://github.com/ipfs/boxo/pull/671)
- `blockservice` now have a `WithProvider` option, this allows to recreate the behavior of advertising added blocks the bitswap server used to do.

### Changed

Expand Down Expand Up @@ -61,6 +62,7 @@ The following emojis are used to highlight certain changes:
- `bitswap/client` fix memory leak in BlockPresenceManager due to unlimited map growth. [#636](https://github.com/ipfs/boxo/pull/636)
- `bitswap/network` fixed race condition when a timeout occurred before hole punching completed while establishing a first-time stream to a peer behind a NAT [#651](https://github.com/ipfs/boxo/pull/651)
- `bitswap`: wantlist overflow handling now cancels existing entries to make room for newer entries. This fix prevents the wantlist from filling up with CIDs that the server does not have. [#629](https://github.com/ipfs/boxo/pull/629)
- 🛠 `bitswap` & `bitswap/server` no longer provide to content routers, instead you can use the `provider` package because it uses a datastore queue and batches calls to ProvideMany.

## [v0.21.0]

Expand Down
12 changes: 2 additions & 10 deletions bitswap/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"

"github.com/ipfs/boxo/bitswap/client"
"github.com/ipfs/boxo/bitswap/internal/defaults"
"github.com/ipfs/boxo/bitswap/message"
"github.com/ipfs/boxo/bitswap/network"
"github.com/ipfs/boxo/bitswap/server"
Expand Down Expand Up @@ -45,9 +44,8 @@ type bitswap interface {
}

var (
_ exchange.SessionExchange = (*Bitswap)(nil)
_ bitswap = (*Bitswap)(nil)
HasBlockBufferSize = defaults.HasBlockBufferSize
_ exchange.SessionExchange = (*Bitswap)(nil)
_ bitswap = (*Bitswap)(nil)
)

type Bitswap struct {
Expand Down Expand Up @@ -85,10 +83,6 @@ func New(ctx context.Context, net network.BitSwapNetwork, bstore blockstore.Bloc
serverOptions = append(serverOptions, server.WithTracer(tracer))
}

if HasBlockBufferSize != defaults.HasBlockBufferSize {
serverOptions = append(serverOptions, server.HasBlockBufferSize(HasBlockBufferSize))
}

ctx = metrics.CtxSubScope(ctx, "bitswap")

bs.Server = server.New(ctx, net, bstore, serverOptions...)
Expand All @@ -115,7 +109,6 @@ type Stat struct {
MessagesReceived uint64
BlocksSent uint64
DataSent uint64
ProvideBufLen int
}

func (bs *Bitswap) Stat() (*Stat, error) {
Expand All @@ -138,7 +131,6 @@ func (bs *Bitswap) Stat() (*Stat, error) {
Peers: ss.Peers,
BlocksSent: ss.BlocksSent,
DataSent: ss.DataSent,
ProvideBufLen: ss.ProvideBufLen,
}, nil
}

Expand Down
2 changes: 1 addition & 1 deletion bitswap/bitswap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {
func TestDoesNotProvideWhenConfiguredNotTo(t *testing.T) {
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
block := blocks.NewBlock([]byte("block"))
bsOpts := []bitswap.Option{bitswap.ProvideEnabled(false), bitswap.ProviderSearchDelay(50 * time.Millisecond)}
bsOpts := []bitswap.Option{bitswap.ProviderSearchDelay(50 * time.Millisecond)}
ig := testinstance.NewTestInstanceGenerator(net, nil, bsOpts)
defer ig.Close()

Expand Down
4 changes: 4 additions & 0 deletions bitswap/client/bitswap_with_sessions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ func addBlock(t *testing.T, ctx context.Context, inst testinstance.Instance, blk
if err != nil {
t.Fatal(err)
}
err = inst.Adapter.Provide(ctx, blk.Cid())
if err != nil {
t.Fatal(err)
}
}

func TestBasicSessions(t *testing.T) {
Expand Down
5 changes: 0 additions & 5 deletions bitswap/internal/defaults/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,6 @@ const (
BitswapMaxOutstandingBytesPerPeer = 1 << 20
// the number of bytes we attempt to make each outgoing bitswap message
BitswapEngineTargetMessageSize = 16 * 1024
// HasBlockBufferSize is the buffer size of the channel for new blocks
// that need to be provided. They should get pulled over by the
// provideCollector even before they are actually provided.
// TODO: Does this need to be this large givent that?
HasBlockBufferSize = 256

// Maximum size of the wantlist we are willing to keep in memory.
MaxQueuedWantlistEntiresPerPeer = 1024
Expand Down
4 changes: 0 additions & 4 deletions bitswap/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,6 @@ func TaskWorkerCount(count int) Option {
return Option{server.TaskWorkerCount(count)}
}

func ProvideEnabled(enabled bool) Option {
return Option{server.ProvideEnabled(enabled)}
}

func SetSendDontHaves(send bool) Option {
return Option{server.SetSendDontHaves(send)}
}
Expand Down
166 changes: 8 additions & 158 deletions bitswap/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,15 @@ import (
logging "github.com/ipfs/go-log/v2"
"github.com/ipfs/go-metrics-interface"
process "github.com/jbenet/goprocess"
procctx "github.com/jbenet/goprocess/context"
"github.com/libp2p/go-libp2p/core/peer"
"go.uber.org/zap"
)

var provideKeysBufferSize = 2048

var (
log = logging.Logger("bitswap/server")
sflog = log.Desugar()
)

const provideWorkerMax = 6

type Option func(*Server)

type Server struct {
Expand All @@ -59,20 +54,8 @@ type Server struct {

process process.Process

// newBlocks is a channel for newly added blocks to be provided to the
// network. blocks pushed down this channel get buffered and fed to the
// provideKeys channel later on to avoid too much network activity
newBlocks chan cid.Cid
// provideKeys directly feeds provide workers
provideKeys chan cid.Cid

// Extra options to pass to the decision manager
engineOptions []decision.Option

// the size of channel buffer to use
hasBlockBufferSize int
// whether or not to make provide announcements
provideEnabled bool
}

func New(ctx context.Context, network bsnet.BitSwapNetwork, bstore blockstore.Blockstore, options ...Option) *Server {
Expand All @@ -87,16 +70,12 @@ func New(ctx context.Context, network bsnet.BitSwapNetwork, bstore blockstore.Bl
}()

s := &Server{
sentHistogram: bmetrics.SentHist(ctx),
sendTimeHistogram: bmetrics.SendTimeHist(ctx),
taskWorkerCount: defaults.BitswapTaskWorkerCount,
network: network,
process: px,
provideEnabled: true,
hasBlockBufferSize: defaults.HasBlockBufferSize,
provideKeys: make(chan cid.Cid, provideKeysBufferSize),
sentHistogram: bmetrics.SentHist(ctx),
sendTimeHistogram: bmetrics.SendTimeHist(ctx),
taskWorkerCount: defaults.BitswapTaskWorkerCount,
network: network,
process: px,
}
s.newBlocks = make(chan cid.Cid, s.hasBlockBufferSize)

for _, o := range options {
o(s)
Expand Down Expand Up @@ -131,13 +110,6 @@ func WithTracer(tap tracer.Tracer) Option {
}
}

// ProvideEnabled is an option for enabling/disabling provide announcements
func ProvideEnabled(enabled bool) Option {
return func(bs *Server) {
bs.provideEnabled = enabled
}
}

func WithPeerBlockRequestFilter(pbrf decision.PeerBlockRequestFilter) Option {
o := decision.WithPeerBlockRequestFilter(pbrf)
return func(bs *Server) {
Expand Down Expand Up @@ -241,16 +213,6 @@ func MaxCidSize(n uint) Option {
}
}

// HasBlockBufferSize configure how big the new blocks buffer should be.
func HasBlockBufferSize(count int) Option {
if count < 0 {
panic("cannot have negative buffer size")
}
return func(bs *Server) {
bs.hasBlockBufferSize = count
}
}

// WithWantHaveReplaceSize sets the maximum size of a block in bytes up to
// which the bitswap server will replace a WantHave with a WantBlock response.
//
Expand Down Expand Up @@ -303,18 +265,6 @@ func (bs *Server) startWorkers(ctx context.Context, px process.Process) {
bs.taskWorker(ctx, i)
})
}

if bs.provideEnabled {
// Start up a worker to manage sending out provides messages
px.Go(func(px process.Process) {
bs.provideCollector(ctx)
})

// Spawn up multiple workers to handle incoming blocks
// consider increasing number if providing blocks bottlenecks
// file transfers
px.Go(bs.provideWorker)
}
}

func (bs *Server) taskWorker(ctx context.Context, id int) {
Expand Down Expand Up @@ -422,18 +372,16 @@ func (bs *Server) sendBlocks(ctx context.Context, env *decision.Envelope) {
}

type Stat struct {
Peers []string
ProvideBufLen int
BlocksSent uint64
DataSent uint64
Peers []string
BlocksSent uint64
DataSent uint64
}

// Stat returns aggregated statistics about bitswap operations
func (bs *Server) Stat() (Stat, error) {
bs.counterLk.Lock()
s := bs.counters
bs.counterLk.Unlock()
s.ProvideBufLen = len(bs.newBlocks)

peers := bs.engine.Peers()
peersStr := make([]string, len(peers))
Expand All @@ -460,107 +408,9 @@ func (bs *Server) NotifyNewBlocks(ctx context.Context, blks ...blocks.Block) err
// Send wanted blocks to decision engine
bs.engine.NotifyNewBlocks(blks)

// If the reprovider is enabled, send block to reprovider
if bs.provideEnabled {
for _, blk := range blks {
select {
case bs.newBlocks <- blk.Cid():
// send block off to be reprovided
case <-bs.process.Closing():
return bs.process.Close()
}
}
}

return nil
}

func (bs *Server) provideCollector(ctx context.Context) {
defer close(bs.provideKeys)
var toProvide []cid.Cid
var nextKey cid.Cid
var keysOut chan cid.Cid

for {
select {
case blkey, ok := <-bs.newBlocks:
if !ok {
log.Debug("newBlocks channel closed")
return
}

if keysOut == nil {
nextKey = blkey
keysOut = bs.provideKeys
} else {
toProvide = append(toProvide, blkey)
}
case keysOut <- nextKey:
if len(toProvide) > 0 {
nextKey = toProvide[0]
toProvide = toProvide[1:]
} else {
keysOut = nil
}
case <-ctx.Done():
return
}
}
}

func (bs *Server) provideWorker(px process.Process) {
// FIXME: OnClosingContext returns a _custom_ context type.
// Unfortunately, deriving a new cancelable context from this custom
// type fires off a goroutine. To work around this, we create a single
// cancelable context up-front and derive all sub-contexts from that.
//
// See: https://github.com/ipfs/go-ipfs/issues/5810
ctx := procctx.OnClosingContext(px)
ctx, cancel := context.WithCancel(ctx)
defer cancel()

limit := make(chan struct{}, provideWorkerMax)

limitedGoProvide := func(k cid.Cid, wid int) {
defer func() {
// replace token when done
<-limit
}()

log.Debugw("Bitswap.ProvideWorker.Start", "ID", wid, "cid", k)
defer log.Debugw("Bitswap.ProvideWorker.End", "ID", wid, "cid", k)

ctx, cancel := context.WithTimeout(ctx, defaults.ProvideTimeout) // timeout ctx
defer cancel()

if err := bs.network.Provide(ctx, k); err != nil {
log.Warn(err)
}
}

// worker spawner, reads from bs.provideKeys until it closes, spawning a
// _ratelimited_ number of workers to handle each key.
for wid := 2; ; wid++ {
log.Debug("Bitswap.ProvideWorker.Loop")

select {
case <-px.Closing():
return
case k, ok := <-bs.provideKeys:
if !ok {
log.Debug("provideKeys channel closed")
return
}
select {
case <-px.Closing():
return
case limit <- struct{}{}:
go limitedGoProvide(k, wid)
}
}
}
}

func (bs *Server) ReceiveMessage(ctx context.Context, p peer.ID, incoming message.BitSwapMessage) {
// This call records changes to wantlists, blocks received,
// and number of bytes transfered.
Expand Down
Loading

0 comments on commit 29faea9

Please sign in to comment.