diff --git a/pkg/retriever/bitswapretriever.go b/pkg/retriever/bitswapretriever.go index 768ad190..a94055c0 100644 --- a/pkg/retriever/bitswapretriever.go +++ b/pkg/retriever/bitswapretriever.go @@ -166,21 +166,6 @@ func (br *bitswapRetrieval) RetrieveFromAsyncCandidates(ayncCandidates types.Inb }) } - totalWritten := atomic.Uint64{} - blockCount := atomic.Uint64{} - bytesWrittenCb := func(bytesWritten uint64) { - // record first byte received - if totalWritten.Load() == 0 { - br.events(events.FirstByte(br.clock.Now(), br.request.RetrievalID, bitswapCandidate, br.clock.Since(startTime), multicodec.TransportBitswap)) - } - totalWritten.Add(bytesWritten) - blockCount.Add(1) - // reset the timer - if bytesWritten > 0 && lastBytesReceivedTimer != nil { - lastBytesReceivedTimer.Reset(br.cfg.BlockTimeout) - } - } - // setup providers for this retrieval hasCandidates, nextCandidates, err := ayncCandidates.Next(retrievalCtx) if !hasCandidates || err != nil { @@ -189,7 +174,8 @@ func (br *bitswapRetrieval) RetrieveFromAsyncCandidates(ayncCandidates types.Inb return nil, nil } - br.events(events.StartedRetrieval(br.clock.Now(), br.request.RetrievalID, bitswapCandidate, multicodec.TransportBitswap)) + firstCandidatesTime := br.clock.Now() + br.events(events.StartedRetrieval(firstCandidatesTime, br.request.RetrievalID, bitswapCandidate, multicodec.TransportBitswap)) // set initial providers, then start a goroutine to add more as they come in br.routing.AddProviders(br.request.RetrievalID, nextCandidates) @@ -209,6 +195,35 @@ func (br *bitswapRetrieval) RetrieveFromAsyncCandidates(ayncCandidates types.Inb } }() + totalWritten := atomic.Uint64{} + blockCount := atomic.Uint64{} + ttfb := atomic.Int64{} + bytesWrittenCb := func(bytesWritten uint64) { + // Record first byte received, this is on a per-protocol basis for Bitswap, + // individual providers (currently) don't get one of these so we take the + // duration from the time we started collecting candidates. + // If we end up taking responsibility for dialing peers, and end up with + // first-byte events per peer, we could move the first-byte duration up to + // the post-connect time to match http and graphsync (alternatively) + if totalWritten.Load() == 0 { + ttfbD := br.clock.Since(firstCandidatesTime) + ttfb.Store(int64(ttfbD)) + br.events(events.FirstByte( + br.clock.Now(), + br.request.RetrievalID, + bitswapCandidate, + ttfbD, + multicodec.TransportBitswap, + )) + } + totalWritten.Add(bytesWritten) + blockCount.Add(1) + // reset the timer + if bytesWritten > 0 && lastBytesReceivedTimer != nil { + lastBytesReceivedTimer.Reset(br.cfg.BlockTimeout) + } + } + // set up the storage system, including the preloader if configured var preloader preload.Loader traversalLinkSys := br.request.LinkSystem @@ -300,6 +315,7 @@ func (br *bitswapRetrieval) RetrieveFromAsyncCandidates(ayncCandidates types.Inb TotalPayment: big.Zero(), NumPayments: 0, AskPrice: big.Zero(), + TimeToFirstByte: time.Duration(ttfb.Load()), }, nil } diff --git a/pkg/retriever/bitswapretriever_test.go b/pkg/retriever/bitswapretriever_test.go index 49ba1301..e96cc6cf 100644 --- a/pkg/retriever/bitswapretriever_test.go +++ b/pkg/retriever/bitswapretriever_test.go @@ -95,22 +95,24 @@ func TestBitswapRetriever(t *testing.T) { }, expectedStats: map[cid.Cid]*types.RetrievalStats{ cid1: { - RootCid: cid1, - Size: sizeOf(tbc1.AllBlocks()), - Blocks: 100, - Duration: remoteBlockDuration * 100, - AverageSpeed: uint64(float64(sizeOf(tbc1.AllBlocks())) / (remoteBlockDuration * 100).Seconds()), - TotalPayment: big.Zero(), - AskPrice: big.Zero(), + RootCid: cid1, + Size: sizeOf(tbc1.AllBlocks()), + Blocks: 100, + Duration: remoteBlockDuration * 100, + AverageSpeed: uint64(float64(sizeOf(tbc1.AllBlocks())) / (remoteBlockDuration * 100).Seconds()), + TotalPayment: big.Zero(), + AskPrice: big.Zero(), + TimeToFirstByte: remoteBlockDuration, }, cid2: { - RootCid: cid2, - Size: sizeOf(tbc2.AllBlocks()), - Blocks: 100, - Duration: remoteBlockDuration * 100, - AverageSpeed: uint64(float64(sizeOf(tbc2.AllBlocks())) / (remoteBlockDuration * 100).Seconds()), - TotalPayment: big.Zero(), - AskPrice: big.Zero(), + RootCid: cid2, + Size: sizeOf(tbc2.AllBlocks()), + Blocks: 100, + Duration: remoteBlockDuration * 100, + AverageSpeed: uint64(float64(sizeOf(tbc2.AllBlocks())) / (remoteBlockDuration * 100).Seconds()), + TotalPayment: big.Zero(), + AskPrice: big.Zero(), + TimeToFirstByte: remoteBlockDuration, }, }, }, @@ -141,22 +143,24 @@ func TestBitswapRetriever(t *testing.T) { }, expectedStats: map[cid.Cid]*types.RetrievalStats{ cid1: { - RootCid: cid1, - Size: sizeOf(tbc1.Blocks(50, 100)), - Blocks: 50, - Duration: remoteBlockDuration * 50, - AverageSpeed: uint64(float64(sizeOf(tbc1.Blocks(50, 100))) / (remoteBlockDuration * 50).Seconds()), - TotalPayment: big.Zero(), - AskPrice: big.Zero(), + RootCid: cid1, + Size: sizeOf(tbc1.Blocks(50, 100)), + Blocks: 50, + Duration: remoteBlockDuration * 50, + AverageSpeed: uint64(float64(sizeOf(tbc1.Blocks(50, 100))) / (remoteBlockDuration * 50).Seconds()), + TotalPayment: big.Zero(), + AskPrice: big.Zero(), + TimeToFirstByte: remoteBlockDuration, }, cid2: { - RootCid: cid2, - Size: sizeOf(append(tbc2.Blocks(25, 45), tbc2.Blocks(75, 100)...)), - Blocks: 45, - Duration: remoteBlockDuration * 45, - AverageSpeed: uint64(float64(sizeOf(append(tbc2.Blocks(25, 45), tbc2.Blocks(75, 100)...))) / (remoteBlockDuration * 45).Seconds()), - TotalPayment: big.Zero(), - AskPrice: big.Zero(), + RootCid: cid2, + Size: sizeOf(append(tbc2.Blocks(25, 45), tbc2.Blocks(75, 100)...)), + Blocks: 45, + Duration: remoteBlockDuration * 45, + AverageSpeed: uint64(float64(sizeOf(append(tbc2.Blocks(25, 45), tbc2.Blocks(75, 100)...))) / (remoteBlockDuration * 45).Seconds()), + TotalPayment: big.Zero(), + AskPrice: big.Zero(), + TimeToFirstByte: remoteBlockDuration, }, }, }, @@ -185,22 +189,24 @@ func TestBitswapRetriever(t *testing.T) { }, expectedStats: map[cid.Cid]*types.RetrievalStats{ cid1: { - RootCid: cid1, - Size: sizeOf(tbc1.AllBlocks()[:5]), - Blocks: 5, - Duration: remoteBlockDuration * 5, - AverageSpeed: uint64(float64(sizeOf(tbc1.AllBlocks()[:5])) / (remoteBlockDuration * 5).Seconds()), - TotalPayment: big.Zero(), - AskPrice: big.Zero(), + RootCid: cid1, + Size: sizeOf(tbc1.AllBlocks()[:5]), + Blocks: 5, + Duration: remoteBlockDuration * 5, + AverageSpeed: uint64(float64(sizeOf(tbc1.AllBlocks()[:5])) / (remoteBlockDuration * 5).Seconds()), + TotalPayment: big.Zero(), + AskPrice: big.Zero(), + TimeToFirstByte: remoteBlockDuration, }, cid2: { - RootCid: cid2, - Size: sizeOf(tbc2.AllBlocks()[:5]), - Blocks: 5, - Duration: remoteBlockDuration * 5, - AverageSpeed: uint64(float64(sizeOf(tbc2.AllBlocks()[:5])) / (remoteBlockDuration * 5).Seconds()), - TotalPayment: big.Zero(), - AskPrice: big.Zero(), + RootCid: cid2, + Size: sizeOf(tbc2.AllBlocks()[:5]), + Blocks: 5, + Duration: remoteBlockDuration * 5, + AverageSpeed: uint64(float64(sizeOf(tbc2.AllBlocks()[:5])) / (remoteBlockDuration * 5).Seconds()), + TotalPayment: big.Zero(), + AskPrice: big.Zero(), + TimeToFirstByte: remoteBlockDuration, }, }, },