Skip to content

Commit

Permalink
feat(http): single peer http retrieval unit test
Browse files Browse the repository at this point in the history
  • Loading branch information
rvagg committed Apr 28, 2023
1 parent 7030fa6 commit 5a55cd0
Show file tree
Hide file tree
Showing 8 changed files with 242 additions and 65 deletions.
2 changes: 1 addition & 1 deletion pkg/internal/itest/http_fetch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,7 +591,7 @@ func TestHttpFetch(t *testing.T) {
}},
},
{
name: "bitswap large sharded file",
name: "bitswap large sharded file, fixedPeer",
bitswapRemotes: 1,
generate: func(t *testing.T, rndReader io.Reader, remotes []testpeer.TestPeer) []unixfs.DirEntry {
fileEntry := unixfs.GenerateFile(t, &remotes[0].LinkSystem, rndReader, 4<<20)
Expand Down
14 changes: 12 additions & 2 deletions pkg/retriever/graphsyncretriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,23 @@ type ProtocolGraphsync struct {
// NewGraphsyncRetriever makes a new CandidateRetriever for Graphsync retrievals
// (transport-graphsync-filecoinv1).
func NewGraphsyncRetriever(getStorageProviderTimeout GetStorageProviderTimeout, client GraphsyncClient) types.CandidateRetriever {
return NewGraphsyncRetrieverWithConfig(getStorageProviderTimeout, client, clock.New(), 2*time.Millisecond)
}

func NewGraphsyncRetrieverWithConfig(
getStorageProviderTimeout GetStorageProviderTimeout,
client GraphsyncClient,
clock clock.Clock,
initialPause time.Duration,
) types.CandidateRetriever {

return &parallelPeerRetriever{
Protocol: &ProtocolGraphsync{
Client: client,
},
GetStorageProviderTimeout: getStorageProviderTimeout,
Clock: clock.New(),
QueueInitialPause: 2 * time.Millisecond,
Clock: clock,
QueueInitialPause: initialPause,
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package retriever
package retriever_test

import (
"context"
Expand All @@ -10,6 +10,7 @@ import (
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/lassie/pkg/events"
"github.com/filecoin-project/lassie/pkg/internal/testutil"
"github.com/filecoin-project/lassie/pkg/retriever"
"github.com/filecoin-project/lassie/pkg/types"
"github.com/google/uuid"
"github.com/ipfs/go-cid"
Expand Down Expand Up @@ -509,9 +510,7 @@ func TestRetrievalRacing(t *testing.T) {
}
candidates = append(candidates, types.NewRetrievalCandidate(peer.ID(p), cid.Undef, protocol))
}
cfg := NewGraphsyncRetriever(func(peer peer.ID) time.Duration { return time.Second }, mockClient)
cfg.(*parallelPeerRetriever).Clock = clock
cfg.(*parallelPeerRetriever).QueueInitialPause = initialPause
cfg := retriever.NewGraphsyncRetrieverWithConfig(func(peer peer.ID) time.Duration { return time.Second }, mockClient, clock, initialPause)

rv := testutil.RetrievalVerifier{
ExpectedSequence: tc.expectSequence,
Expand Down Expand Up @@ -576,9 +575,7 @@ func TestMultipleRetrievals(t *testing.T) {
clock,
)

cfg := NewGraphsyncRetriever(func(peer peer.ID) time.Duration { return time.Second }, mockClient)
cfg.(*parallelPeerRetriever).Clock = clock
cfg.(*parallelPeerRetriever).QueueInitialPause = initialPause
cfg := retriever.NewGraphsyncRetrieverWithConfig(func(peer peer.ID) time.Duration { return time.Second }, mockClient, clock, initialPause)

expectedSequence := []testutil.ExpectedActionsAtTime{
{
Expand Down Expand Up @@ -689,14 +686,13 @@ func TestRetrievalSelector(t *testing.T) {
defer cancel()
retrievalID := types.RetrievalID(uuid.New())
cid1 := cid.MustParse("bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi")
clock := clock.New()
mockClient := testutil.NewMockClient(
map[string]testutil.DelayedConnectReturn{"foo": {Err: nil, Delay: 0}},
map[string]testutil.DelayedClientReturn{"foo": {ResultStats: &types.RetrievalStats{StorageProviderId: peer.ID("bar"), Size: 2}, Delay: 0}},
clock,
clock.New(),
)

cfg := NewGraphsyncRetriever(func(peer peer.ID) time.Duration { return time.Second }, mockClient)
cfg := retriever.NewGraphsyncRetriever(func(peer peer.ID) time.Duration { return time.Second }, mockClient)

selector := selectorparse.CommonSelector_MatchPoint

Expand Down Expand Up @@ -741,9 +737,7 @@ func TestDuplicateRetreivals(t *testing.T) {
clock,
)

cfg := NewGraphsyncRetriever(func(peer peer.ID) time.Duration { return time.Second }, mockClient)
cfg.(*parallelPeerRetriever).Clock = clock
cfg.(*parallelPeerRetriever).QueueInitialPause = initialPause
cfg := retriever.NewGraphsyncRetrieverWithConfig(func(peer peer.ID) time.Duration { return time.Second }, mockClient, clock, initialPause)

expectedSequence := []testutil.ExpectedActionsAtTime{
{
Expand Down
28 changes: 10 additions & 18 deletions pkg/retriever/httpretriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,16 @@ type ProtocolHttp struct {
// NewHttpRetriever makes a new CandidateRetriever for verified CAR HTTP
// retrievals (transport-ipfs-gateway-http).
func NewHttpRetriever(getStorageProviderTimeout GetStorageProviderTimeout, client *http.Client) types.CandidateRetriever {
return NewHttpRetrieverWithClock(getStorageProviderTimeout, client, clock.New())
}

func NewHttpRetrieverWithClock(getStorageProviderTimeout GetStorageProviderTimeout, client *http.Client, clock clock.Clock) types.CandidateRetriever {
return &parallelPeerRetriever{
Protocol: &ProtocolHttp{
Client: client,
},
GetStorageProviderTimeout: getStorageProviderTimeout,
Clock: clock.New(),
QueueInitialPause: 2 * time.Millisecond,
Clock: clock,
}
}

Expand Down Expand Up @@ -92,7 +95,6 @@ func (ph *ProtocolHttp) Retrieve(

func makeRequest(ctx context.Context, request types.RetrievalRequest, candidate types.RetrievalCandidate) (*http.Request, error) {
candidateURL, err := candidate.ToURL()
fmt.Println("candidateURL", candidateURL, "err", err, "candidate", candidate)
if err != nil {
log.Warnf("Couldn't construct a url for miner %s: %v", candidate.MinerPeer.ID, err)
return nil, fmt.Errorf("%w: %v", ErrNoHttpForPeer, err)
Expand All @@ -117,8 +119,8 @@ func makeRequest(ctx context.Context, request types.RetrievalRequest, candidate

func readBody(rootCid cid.Cid, peerId peer.ID, body io.ReadCloser, writer linking.BlockWriteOpener) (*types.RetrievalStats, error) {
startTime := time.Now() // TODO: consider whether this should be at connection time rather than body read time
cr := &countingReader{Reader: body}
cbr, err := car.NewBlockReader(cr)
var blockBytes uint64
cbr, err := car.NewBlockReader(body)
if err != nil {
return nil, err
}
Expand All @@ -141,6 +143,7 @@ func readBody(rootCid cid.Cid, peerId peer.ID, body io.ReadCloser, writer linkin
if err != nil {
return nil, err
}
blockBytes += uint64(len(blk.RawData()))
err = d(cidlink.Link{Cid: blk.Cid()})
if err != nil {
return nil, err
Expand All @@ -149,12 +152,12 @@ func readBody(rootCid cid.Cid, peerId peer.ID, body io.ReadCloser, writer linkin
}

duration := time.Since(startTime)
speed := uint64(float64(cr.total) / duration.Seconds())
speed := uint64(float64(blockBytes) / duration.Seconds())

return &types.RetrievalStats{
RootCid: rootCid,
StorageProviderId: peerId,
Size: cr.total,
Size: blockBytes,
Blocks: blockCount,
Duration: duration,
AverageSpeed: speed,
Expand All @@ -164,14 +167,3 @@ func readBody(rootCid cid.Cid, peerId peer.ID, body io.ReadCloser, writer linkin
TimeToFirstByte: ttfb,
}, nil
}

type countingReader struct {
io.Reader
total uint64
}

func (cr *countingReader) Read(p []byte) (n int, err error) {
n, err = cr.Reader.Read(p)
cr.total += uint64(n)
return
}
Loading

0 comments on commit 5a55cd0

Please sign in to comment.