diff --git a/CHANGELOG.md b/CHANGELOG.md index ee78a8b19..4a4c5d43a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ The following emojis are used to highlight certain changes: ## [Unreleased] ### Added +🛠 - New non variadic `NotifyNewBlock` function. This changes the `blockservice.Interface`. The new function avoids allocating a slice on each call when called with one block. * `boxo/bitswap/server`: * A new [`WithWantHaveReplaceSize(n)`](https://pkg.go.dev/github.com/ipfs/boxo/bitswap/server/#WithWantHaveReplaceSize) option can be used with `bitswap.New` to fine-tune cost-vs-performance. It sets the maximum size of a block in bytes up to which the bitswap server will replace a WantHave with a WantBlock response. Setting this to 0 disables this WantHave replacement and means that block sizes are not read when processing WantHave requests. [#672](https://github.com/ipfs/boxo/pull/672) diff --git a/bitswap/bitswap.go b/bitswap/bitswap.go index 393ab96ad..74ef79014 100644 --- a/bitswap/bitswap.go +++ b/bitswap/bitswap.go @@ -98,6 +98,13 @@ func New(ctx context.Context, net network.BitSwapNetwork, bstore blockstore.Bloc return bs } +func (bs *Bitswap) NotifyNewBlock(ctx context.Context, blk blocks.Block) error { + return multierr.Combine( + bs.Client.NotifyNewBlock(ctx, blk), + bs.Server.NotifyNewBlock(ctx, blk), + ) +} + func (bs *Bitswap) NotifyNewBlocks(ctx context.Context, blks ...blocks.Block) error { return multierr.Combine( bs.Client.NotifyNewBlocks(ctx, blks...), diff --git a/bitswap/client/client.go b/bitswap/client/client.go index e0f952d82..060d6c7f8 100644 --- a/bitswap/client/client.go +++ b/bitswap/client/client.go @@ -283,6 +283,16 @@ func (bs *Client) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks. return session.GetBlocks(ctx, keys) } +// NotifyNewBlock announces the existence of blocks to this bitswap service. +// Bitswap itself doesn't store new blocks. It's the caller responsibility to ensure +// that those blocks are available in the blockstore before calling this function. +func (bs *Client) NotifyNewBlock(ctx context.Context, blk blocks.Block) error { + // Call to the variadic to avoid code duplication. + // This is actually fine to do because no calls is virtual the compiler is able + // to see that the slice does not leak and the slice is stack allocated. + return bs.NotifyNewBlocks(ctx, blk) +} + // NotifyNewBlocks announces the existence of blocks to this bitswap service. // Bitswap itself doesn't store new blocks. It's the caller responsibility to ensure // that those blocks are available in the blockstore before calling this function. diff --git a/bitswap/server/internal/decision/engine.go b/bitswap/server/internal/decision/engine.go index 5e4463e33..e67c15fda 100644 --- a/bitswap/server/internal/decision/engine.go +++ b/bitswap/server/internal/decision/engine.go @@ -977,8 +977,9 @@ func (e *Engine) ReceivedBlocks(from peer.ID, blks []blocks.Block) { } } -// NotifyNewBlocks is called when new blocks becomes available locally, and in particular when the caller of bitswap -// decide to store those blocks and make them available on the network. +// NotifyNewBlocks is called when new blocks become available locally, and in +// particular when the caller of bitswap decides to store those blocks and make +// them available on the network. func (e *Engine) NotifyNewBlocks(blks []blocks.Block) { if len(blks) == 0 { return diff --git a/bitswap/server/server.go b/bitswap/server/server.go index 46d29a8fc..f3c5786a9 100644 --- a/bitswap/server/server.go +++ b/bitswap/server/server.go @@ -446,6 +446,17 @@ func (bs *Server) Stat() (Stat, error) { return s, nil } +// NotifyNewBlock announces the existence of block to this bitswap service. The +// service will potentially notify its peers. +// Bitswap itself doesn't store new blocks. It's the caller responsibility to ensure +// that those blocks are available in the blockstore before calling this function. +func (bs *Server) NotifyNewBlock(ctx context.Context, blk blocks.Block) error { + // Call to the variadic to avoid code duplication. + // This is actually fine to do because no calls is virtual the compiler is able + // to see that the slice does not leak and the slice is stack allocated. + return bs.NotifyNewBlocks(ctx, blk) +} + // NotifyNewBlocks announces the existence of blocks to this bitswap service. The // service will potentially notify its peers. // Bitswap itself doesn't store new blocks. It's the caller responsibility to ensure diff --git a/blockservice/blockservice.go b/blockservice/blockservice.go index 353be00f8..43e474cb4 100644 --- a/blockservice/blockservice.go +++ b/blockservice/blockservice.go @@ -176,8 +176,8 @@ func (s *blockService) AddBlock(ctx context.Context, o blocks.Block) error { logger.Debugf("BlockService.BlockAdded %s", c) if s.exchange != nil { - if err := s.exchange.NotifyNewBlocks(ctx, o); err != nil { - logger.Errorf("NotifyNewBlocks: %s", err.Error()) + if err := s.exchange.NotifyNewBlock(ctx, o); err != nil { + logger.Errorf("NotifyNewBlock: %s", err.Error()) } } @@ -282,7 +282,7 @@ func getBlock(ctx context.Context, c cid.Cid, bs BlockService, fetchFactory func return nil, err } if ex := bs.Exchange(); ex != nil { - err = ex.NotifyNewBlocks(ctx, blk) + err = ex.NotifyNewBlock(ctx, blk) if err != nil { return nil, err } @@ -364,7 +364,6 @@ func getBlocks(ctx context.Context, ks []cid.Cid, blockservice BlockService, fet } ex := blockservice.Exchange() - var cache [1]blocks.Block // preallocate once for all iterations for { var b blocks.Block select { @@ -386,13 +385,11 @@ func getBlocks(ctx context.Context, ks []cid.Cid, blockservice BlockService, fet if ex != nil { // inform the exchange that the blocks are available - cache[0] = b - err = ex.NotifyNewBlocks(ctx, cache[:]...) + err = ex.NotifyNewBlock(ctx, b) if err != nil { logger.Errorf("could not tell the exchange about new blocks: %s", err) return } - cache[0] = nil // early gc } select { diff --git a/blockservice/blockservice_test.go b/blockservice/blockservice_test.go index 29350ff37..f1a64de80 100644 --- a/blockservice/blockservice_test.go +++ b/blockservice/blockservice_test.go @@ -205,6 +205,11 @@ type notifyCountingExchange struct { notifyCount int } +func (n *notifyCountingExchange) NotifyNewBlock(ctx context.Context, blocks blocks.Block) error { + n.notifyCount++ + return n.Interface.NotifyNewBlock(ctx, blocks) +} + func (n *notifyCountingExchange) NotifyNewBlocks(ctx context.Context, blocks ...blocks.Block) error { n.notifyCount += len(blocks) return n.Interface.NotifyNewBlocks(ctx, blocks...) @@ -312,6 +317,10 @@ func (f *fakeIsNewSessionCreateExchange) NewSession(context.Context) exchange.Fe return f.ses } +func (*fakeIsNewSessionCreateExchange) NotifyNewBlock(context.Context, blocks.Block) error { + return nil +} + func (*fakeIsNewSessionCreateExchange) NotifyNewBlocks(context.Context, ...blocks.Block) error { return nil } diff --git a/exchange/interface.go b/exchange/interface.go index 3ae174d5c..7d32960a5 100644 --- a/exchange/interface.go +++ b/exchange/interface.go @@ -13,6 +13,8 @@ import ( type Interface interface { // type Exchanger interface Fetcher + // NotifyNewBlock tells the exchange that a new block is available and can be served. + NotifyNewBlock(ctx context.Context, blocks blocks.Block) error // NotifyNewBlocks tells the exchange that new blocks are available and can be served. NotifyNewBlocks(ctx context.Context, blocks ...blocks.Block) error diff --git a/exchange/offline/offline.go b/exchange/offline/offline.go index f3590893e..1e8cdd2fe 100644 --- a/exchange/offline/offline.go +++ b/exchange/offline/offline.go @@ -34,6 +34,12 @@ func (e *offlineExchange) GetBlock(ctx context.Context, k cid.Cid) (blocks.Block return blk, err } +// NotifyNewBlock tells the exchange that a new block is available and can be served. +func (e *offlineExchange) NotifyNewBlock(ctx context.Context, block blocks.Block) error { + // as an offline exchange we have nothing to do + return nil +} + // NotifyNewBlocks tells the exchange that new blocks are available and can be served. func (e *offlineExchange) NotifyNewBlocks(ctx context.Context, blocks ...blocks.Block) error { // as an offline exchange we have nothing to do