Skip to content

Commit

Permalink
fix!: move block event reporting to bitswap's blockstore
Browse files Browse the repository at this point in the history
BREAKING CHANGE because the signature of NewByteCountingLinkSystem has changed
to include a peer.ID.

We need to report BlockReceived events from the immediate blockstore that
bitswap writes to so we don't incur a delay and have our preload cache race
to report it's got the block before our "loader" has time to register it has
it.
  • Loading branch information
rvagg committed Oct 12, 2023
1 parent 9613e24 commit 98486ac
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 28 deletions.
9 changes: 7 additions & 2 deletions pkg/retriever/bitswaphelpers/countinglinksystem.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/ipld/go-ipld-prime/datamodel"
"github.com/ipld/go-ipld-prime/linking"
"github.com/libp2p/go-libp2p/core/peer"
)

type cumulativeCountWriter struct {
Expand Down Expand Up @@ -32,15 +33,19 @@ func (ccw *cumulativeCountWriter) Commit(link datamodel.Link) error {
return nil
}

func NewByteCountingLinkSystem(lsys *linking.LinkSystem, bytesWritten func(count uint64)) *linking.LinkSystem {
func NewByteCountingLinkSystem(lsys *linking.LinkSystem, blockWritten func(from *peer.ID, count uint64)) *linking.LinkSystem {
newLsys := *lsys // copy all values from old system
oldWriteOpener := lsys.StorageWriteOpener
newLsys.StorageWriteOpener = func(lctx linking.LinkContext) (io.Writer, linking.BlockWriteCommitter, error) {
var from *peer.ID = nil
if p, ok := lctx.Ctx.Value(peerIdContextKey).(peer.ID); ok {
from = &p
}
w, committer, err := oldWriteOpener(lctx)
if err != nil {
return w, committer, err
}
ccw := &cumulativeCountWriter{w, 0, committer, bytesWritten}
ccw := &cumulativeCountWriter{w, 0, committer, func(count uint64) { blockWritten(from, count) }}
return ccw, ccw.Commit, err
}
return &newLsys
Expand Down
13 changes: 12 additions & 1 deletion pkg/retriever/bitswaphelpers/multiblockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sync"

"github.com/filecoin-project/lassie/pkg/types"
"github.com/ipfs/boxo/bitswap/client/traceability"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
format "github.com/ipfs/go-ipld-format"
Expand All @@ -23,6 +24,10 @@ var ErrAlreadyRegisterd = errors.New("already registered")
// ErrAlreadyRegistered means there is nothing registered for a retrieval id
var ErrNotRegistered = errors.New("not registered")

type contextKey string

const peerIdContextKey = contextKey("traceableBlock.peerId")

// MultiBlockstore creates a blockstore based on one or more linkystems, extracting the target linksystem for each request
// from the retrieval id context key
type MultiBlockstore struct {
Expand Down Expand Up @@ -123,7 +128,13 @@ func (mbs *MultiBlockstore) PutMany(ctx context.Context, blks []blocks.Block) er
return ErrNotRegistered
}
for _, blk := range blks {
w, commit, err := lsys.StorageWriteOpener(linking.LinkContext{Ctx: ctx})
lctx := ctx
if traceableBlock, ok := blk.(traceability.Block); ok {
lctx = context.WithValue(lctx, peerIdContextKey, traceableBlock.From)
} else {
logger.Warn("Got untraceable block from bitswap")
}
w, commit, err := lsys.StorageWriteOpener(linking.LinkContext{Ctx: lctx})
if err != nil {
return err
}
Expand Down
27 changes: 17 additions & 10 deletions pkg/retriever/bitswaphelpers/preloadcachingstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,18 +362,25 @@ func (cs *PreloadCachingStorage) preloadLink(pl *preloadingLink, linkCtx linking
}
pl.err = err
} else {
w, c, err := cs.cacheLinkSystem.StorageWriteOpener(linkCtx)
if err != nil {
pl.err = err
return
}
if _, err := io.Copy(w, reader); err != nil {
pl.err = err
return
}
if err := c(link); err != nil {
// the user of PreloadCachingStorage may have already wired up the
// cacheLinkSystem to receive the blocks from fetcher()
if has, err := linkSystemHas(cs.cacheLinkSystem, linkCtx, link); err != nil {
pl.err = err
return
} else if !has {
w, c, err := cs.cacheLinkSystem.StorageWriteOpener(linkCtx)
if err != nil {
pl.err = err
return
}
if _, err := io.Copy(w, reader); err != nil {
pl.err = err
return
}
if err := c(link); err != nil {
pl.err = err
return
}
}
}
})
Expand Down
27 changes: 12 additions & 15 deletions pkg/retriever/bitswapretriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/filecoin-project/lassie/pkg/retriever/bitswaphelpers/groupworkpool"
"github.com/filecoin-project/lassie/pkg/types"
"github.com/ipfs/boxo/bitswap/client"
"github.com/ipfs/boxo/bitswap/client/traceability"
"github.com/ipfs/boxo/bitswap/network"
"github.com/ipfs/boxo/blockservice"
"github.com/ipfs/go-cid"
Expand Down Expand Up @@ -184,13 +183,22 @@ func (br *bitswapRetrieval) runRetrieval(ctx context.Context, ayncCandidates typ

totalWritten := atomic.Uint64{}
blockCount := atomic.Uint64{}
bytesWrittenCb := func(bytesWritten uint64) {
blockWrittenCb := func(from *peer.ID, bytesWritten uint64) {
// record first byte received
if totalWritten.Load() == 0 {
shared.sendEvent(ctx, events.FirstByte(br.clock.Now(), br.request.RetrievalID, bitswapCandidate, br.clock.Since(startTime), multicodec.TransportBitswap))
}
totalWritten.Add(bytesWritten)
blockCount.Add(1)
if from != nil {
shared.sendEvent(ctx, events.BlockReceived(
br.clock.Now(),
br.request.RetrievalID,
types.RetrievalCandidate{RootCid: br.request.Root, MinerPeer: peer.AddrInfo{ID: *from}},
multicodec.TransportBitswap,
bytesWritten,
))
}
// reset the timer
if bytesWritten > 0 && lastBytesReceivedTimer != nil {
lastBytesReceivedTimer.Reset(br.cfg.BlockTimeout)
Expand Down Expand Up @@ -253,12 +261,12 @@ func (br *bitswapRetrieval) runRetrieval(ctx context.Context, ayncCandidates typ

br.bstore.AddLinkSystem(
br.request.RetrievalID,
bitswaphelpers.NewByteCountingLinkSystem(storage.BitswapLinkSystem, bytesWrittenCb),
bitswaphelpers.NewByteCountingLinkSystem(storage.BitswapLinkSystem, blockWrittenCb),
)
} else {
br.bstore.AddLinkSystem(
br.request.RetrievalID,
bitswaphelpers.NewByteCountingLinkSystem(&br.request.LinkSystem, bytesWrittenCb),
bitswaphelpers.NewByteCountingLinkSystem(&br.request.LinkSystem, blockWrittenCb),
)
traversalLinkSys.StorageReadOpener = loader
}
Expand Down Expand Up @@ -347,17 +355,6 @@ func (br *bitswapRetrieval) loader(ctx context.Context, shared *retrievalShared)
return nil, err
}
logger.Debugw("Got block from bitswap", "retrievalID", br.request.RetrievalID, "root", br.request.Root, "block", cidLink.Cid, "size", len(blk.RawData()))
if traceableBlock, ok := blk.(traceability.Block); ok {
shared.sendEvent(ctx, events.BlockReceived(
br.clock.Now(),
br.request.RetrievalID,
types.RetrievalCandidate{RootCid: br.request.Root, MinerPeer: peer.AddrInfo{ID: traceableBlock.From}},
multicodec.TransportBitswap,
uint64(len(blk.RawData()))),
)
} else {
logger.Warn("Got untraceable block from bitswap")
}
return bytes.NewReader(blk.RawData()), nil
}
}

0 comments on commit 98486ac

Please sign in to comment.