diff --git a/impl/graphsync_test.go b/impl/graphsync_test.go index 3be25d7b..9e647ff3 100644 --- a/impl/graphsync_test.go +++ b/impl/graphsync_test.go @@ -36,6 +36,7 @@ import ( "github.com/ipld/go-ipld-prime/node/basicnode" "github.com/ipld/go-ipld-prime/traversal/selector" "github.com/ipld/go-ipld-prime/traversal/selector/builder" + selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/protocol" @@ -324,6 +325,47 @@ func TestGraphsyncRoundTrip(t *testing.T) { } } +func TestGraphsyncIdentityCIDRoundTrip(t *testing.T) { + // create network + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + td := newGsTestData(ctx, t) + + // initialize graphsync on first node to make requests + requestor := td.GraphSyncHost1() + identityDag := testutil.SetupIdentityDAG(ctx, t, td.persistence2) + + // initialize graphsync on second node to respond to requests + responder := td.GraphSyncHost2() + assertComplete := assertCompletionFunction(responder, 1) + + responder.RegisterIncomingRequestHook(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) { + hookActions.ValidateRequest() + }) + + finalResponseStatusChan := make(chan graphsync.ResponseStatusCode, 1) + responder.RegisterCompletedResponseListener(func(p peer.ID, request graphsync.RequestData, status graphsync.ResponseStatusCode) { + select { + case finalResponseStatusChan <- status: + default: + } + }) + progressChan, errChan := requestor.Request(ctx, td.host2.ID(), identityDag.RootLink, selectorparse.CommonSelector_ExploreAllRecursively) + + identityDag.VerifyWholeDAG(ctx, progressChan) + testutil.VerifyEmptyErrors(ctx, t, errChan) + require.Len(t, td.blockStore1, len(identityDag.AllLinks), "did not store all blocks") + + // verify listener + var finalResponseStatus graphsync.ResponseStatusCode + testutil.AssertReceive(ctx, t, finalResponseStatusChan, &finalResponseStatus, "should receive status") + require.Equal(t, graphsync.RequestCompletedFull, finalResponseStatus) + + drain(requestor) + drain(responder) + assertComplete(ctx, t) +} + func TestGraphsyncRoundTripPartial(t *testing.T) { // create network diff --git a/ipldutil/traverser_test.go b/ipldutil/traverser_test.go index 6c10805e..10dd9696 100644 --- a/ipldutil/traverser_test.go +++ b/ipldutil/traverser_test.go @@ -11,11 +11,14 @@ import ( blocks "github.com/ipfs/go-block-format" "github.com/ipld/go-ipld-prime" + "github.com/ipld/go-ipld-prime/datamodel" + "github.com/ipld/go-ipld-prime/linking" cidlink "github.com/ipld/go-ipld-prime/linking/cid" "github.com/ipld/go-ipld-prime/node/basicnode" "github.com/ipld/go-ipld-prime/traversal" "github.com/ipld/go-ipld-prime/traversal/selector" "github.com/ipld/go-ipld-prime/traversal/selector/builder" + selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse" "github.com/stretchr/testify/require" graphsync "github.com/filecoin-project/boost-graphsync" @@ -165,16 +168,50 @@ func TestTraverser(t *testing.T) { _, _ = traverser.CurrentRequest() } }) + + t.Run("traverses correctly, DAG with identity CID in the middle", func(t *testing.T) { + store := make(map[ipld.Link][]byte) + persistence := testutil.NewTestStore(store) + identityDag := testutil.SetupIdentityDAG(ctx, t, persistence) + inProgressChan := make(chan graphsync.ResponseProgress) + done := make(chan struct{}) + traverser := TraversalBuilder{ + Root: identityDag.RootLink, + Selector: selectorparse.CommonSelector_ExploreAllRecursively, + Chooser: func(l datamodel.Link, lc linking.LinkContext) (datamodel.NodePrototype, error) { + return basicnode.Prototype.Any, nil + }, + LinkSystem: persistence, + Visitor: func(tp traversal.Progress, node ipld.Node, r traversal.VisitReason) error { + select { + case <-ctx.Done(): + case inProgressChan <- graphsync.ResponseProgress{ + Node: node, + Path: tp.Path, + LastBlock: tp.LastBlock, + }: + } + return nil + }, + }.Start(ctx) + go func() { + identityDag.VerifyWholeDAG(ctx, inProgressChan) + close(done) + }() + checkTraverseSequence(ctx, t, traverser, identityDag.AllBlocks(), nil) + close(inProgressChan) + testutil.AssertDoesReceive(ctx, t, done, "should have completed verification but did not") + }) } func checkTraverseSequence(ctx context.Context, t *testing.T, traverser Traverser, expectedBlks []blocks.Block, finalErr error) { t.Helper() - for _, blk := range expectedBlks { + for ii, blk := range expectedBlks { isComplete, err := traverser.IsComplete() require.False(t, isComplete) require.NoError(t, err) lnk, _ := traverser.CurrentRequest() - require.Equal(t, lnk.(cidlink.Link).Cid, blk.Cid()) + require.Equal(t, lnk.(cidlink.Link).Cid.String(), blk.Cid().String(), fmt.Sprintf("unexpected CID @ block %d", ii)) err = traverser.Advance(bytes.NewBuffer(blk.RawData())) require.NoError(t, err) } diff --git a/testutil/identity.go b/testutil/identity.go new file mode 100644 index 00000000..d83b55f0 --- /dev/null +++ b/testutil/identity.go @@ -0,0 +1,197 @@ +package testutil + +import ( + "context" + "fmt" + "io" + "testing" + + graphsync "github.com/filecoin-project/boost-graphsync" + _ "github.com/ipld/go-ipld-prime/codec/raw" + + blocks "github.com/ipfs/go-block-format" + "github.com/ipfs/go-cid" + "github.com/ipld/go-ipld-prime" + "github.com/ipld/go-ipld-prime/codec/dagjson" + "github.com/ipld/go-ipld-prime/datamodel" + "github.com/ipld/go-ipld-prime/fluent/qp" + "github.com/ipld/go-ipld-prime/linking" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" + "github.com/ipld/go-ipld-prime/node/basicnode" + "github.com/multiformats/go-multihash" + "github.com/stretchr/testify/require" +) + +// roughly duplicated in github.com/ipld/go-trustless-utils/testutil + +type TestIdentityDAG struct { + t testing.TB + loader ipld.BlockReadOpener + + RootLink ipld.Link + AllLinks []ipld.Link +} + +/* ugly, but it makes a DAG with paths that look like this but doesn't involved dag-pb or unixfs */ +var identityDagPaths = []string{ + "", + "a/!foo", + "a/b/!bar", + "a/b/c/!baz/identity jump", + "a/b/c/!baz/identity jump/these are my children/blip", + "a/b/c/!baz/identity jump/these are my children/bloop", + "a/b/c/!baz/identity jump/these are my children/bloop/ leaf ", + "a/b/c/!baz/identity jump/these are my children/blop", + "a/b/c/!baz/identity jump/these are my children/leaf", + "a/b/c/d/!leaf", +} + +func SetupIdentityDAG( + ctx context.Context, + t testing.TB, + lsys ipld.LinkSystem) *TestIdentityDAG { + + allLinks := make([]ipld.Link, 0) + store := func(lp datamodel.LinkPrototype, n datamodel.Node) datamodel.Link { + l, err := lsys.Store(linking.LinkContext{}, lp, n) + require.NoError(t, err) + allLinks = append(allLinks, l) + return l + } + + rootLeaf := store(rawlp, basicnode.NewBytes([]byte("leaf node in the root"))) + bazLeaf := store(rawlp, basicnode.NewBytes([]byte("leaf node in baz"))) + blop := store(djlp, must(qp.BuildList(basicnode.Prototype.Any, -1, func(la datamodel.ListAssembler) { + qp.ListEntry(la, qp.Int(100)) + qp.ListEntry(la, qp.Int(200)) + qp.ListEntry(la, qp.Int(300)) + }))(t)) + bloopLeaf := store(rawlp, basicnode.NewBytes([]byte("leaf node in bloop"))) + bloop := store(djlp, must(qp.BuildMap(basicnode.Prototype.Any, -1, func(ma datamodel.MapAssembler) { + qp.MapEntry(ma, "desc", qp.List(-1, func(la datamodel.ListAssembler) { + qp.ListEntry(la, qp.String("this")) + qp.ListEntry(la, qp.String("is")) + qp.ListEntry(la, qp.String("bloop")) + })) + qp.MapEntry(ma, " leaf ", qp.Link(bloopLeaf)) + }))(t)) + blip := store(djlp, basicnode.NewString("blip!")) + baz := store(djlp, must(qp.BuildMap(basicnode.Prototype.Any, -1, func(ma datamodel.MapAssembler) { + qp.MapEntry(ma, "desc", qp.List(-1, func(la datamodel.ListAssembler) { + qp.ListEntry(la, qp.String("this")) + qp.ListEntry(la, qp.String("is")) + qp.ListEntry(la, qp.String("baz")) + })) + qp.MapEntry(ma, "these are my children", qp.Map(-1, func(ma datamodel.MapAssembler) { + qp.MapEntry(ma, "blip", qp.Link(blip)) + qp.MapEntry(ma, "bloop", qp.Link(bloop)) + qp.MapEntry(ma, "blop", qp.Link(blop)) + qp.MapEntry(ma, "leaf", qp.Link(bazLeaf)) + })) + }))(t)) + var bazIdent datamodel.Link + { + // not stored, shouldn't count as a block + ident := must(qp.BuildMap(basicnode.Prototype.Any, -1, func(ma datamodel.MapAssembler) { + qp.MapEntry(ma, "identity jump", qp.Link(baz)) + }))(t) + identBytes := must(ipld.Encode(ident, dagjson.Encode))(t) + mh := must(multihash.Sum(identBytes, multihash.IDENTITY, len(identBytes)))(t) + bazIdent = cidlink.Link{Cid: cid.NewCidV1(cid.DagJSON, mh)} + } + bar := store(djlp, basicnode.NewInt(2020202020202020)) + foo := store(djlp, basicnode.NewInt(1010101010101010)) + root := store(djlp, must(qp.BuildMap(basicnode.Prototype.Any, -1, func(ma datamodel.MapAssembler) { + qp.MapEntry(ma, "a", qp.Map(-1, func(ma datamodel.MapAssembler) { + qp.MapEntry(ma, "b", qp.Map(-1, func(ma datamodel.MapAssembler) { + qp.MapEntry(ma, "c", qp.Map(-1, func(ma datamodel.MapAssembler) { + qp.MapEntry(ma, "d", qp.Map(-1, func(ma datamodel.MapAssembler) { + qp.MapEntry(ma, "!leaf", qp.Link(rootLeaf)) + })) + qp.MapEntry(ma, "!baz", qp.Link(bazIdent)) + })) + qp.MapEntry(ma, "!bar", qp.Link(bar)) + })) + qp.MapEntry(ma, "!foo", qp.Link(foo)) + })) + }))(t)) + + return &TestIdentityDAG{ + t: t, + loader: lsys.StorageReadOpener, + RootLink: root, + AllLinks: reverse(allLinks), // TODO: slices.Reverse post 1.21 + } +} + +func (tid *TestIdentityDAG) AllBlocks() []blocks.Block { + blks := make([]blocks.Block, len(tid.AllLinks)) + for ii, link := range tid.AllLinks { + reader, err := tid.loader(ipld.LinkContext{}, link) + require.NoError(tid.t, err) + data, err := io.ReadAll(reader) + require.NoError(tid.t, err) + blk, err := blocks.NewBlockWithCid(data, link.(cidlink.Link).Cid) + require.NoError(tid.t, err) + blks[ii] = blk + } + return blks +} + +// VerifyWholeDAGWithTypes verifies the given response channel returns the expected responses for the whole DAG +// and that the types in the response are the expected types for the DAG +func (tid *TestIdentityDAG) VerifyWholeDAG(ctx context.Context, responseChan <-chan graphsync.ResponseProgress) { + responses := CollectResponses(ctx, tid.t, responseChan) + tid.checkResponses(responses) +} + +func (tid *TestIdentityDAG) checkResponses(responses []graphsync.ResponseProgress) { + var pathIndex int + for ii, response := range responses { + // only check the paths that have links, assume the rest are just describing + // the non-link nodes of the DAG + if response.Path.String() == identityDagPaths[pathIndex] { + if response.LastBlock.Link != nil { + expectedLink := tid.AllLinks[pathIndex] + require.Equal(tid.t, expectedLink.String(), response.LastBlock.Link.String(), fmt.Sprintf("response %d has correct link (%d)", ii, pathIndex)) + } + pathIndex++ + } + } + require.Equal(tid.t, len(identityDagPaths), pathIndex, "traverses all nodes") +} + +func must[T any](v T, err error) func(t testing.TB) T { + return func(t testing.TB) T { + t.Helper() + if err != nil { + t.Fatal(err) + } + return v + } +} + +var djlp = cidlink.LinkPrototype{ + Prefix: cid.Prefix{ + Version: 1, + Codec: cid.DagJSON, + MhType: multihash.SHA2_256, + MhLength: 32, + }, +} + +var rawlp = cidlink.LinkPrototype{ + Prefix: cid.Prefix{ + Version: 1, + Codec: cid.Raw, + MhType: multihash.SHA2_256, + MhLength: 32, + }, +} + +func reverse[S ~[]E, E any](s S) S { + for i, j := 0, len(s)-1; i < j; i, j = i+1, j-1 { + s[i], s[j] = s[j], s[i] + } + return s +} diff --git a/testutil/testutil.go b/testutil/testutil.go index e4de8a2c..bf8fe8ad 100644 --- a/testutil/testutil.go +++ b/testutil/testutil.go @@ -209,11 +209,11 @@ func VerifyEmptyErrors(ctx context.Context, t testing.TB, errChan <-chan error) t.Helper() for { select { - case _, ok := <-errChan: + case err, ok := <-errChan: if !ok { return } - t.Fatal("errors were sent but shouldn't have been") + t.Fatalf("errors were sent but shouldn't have been: %s", err) case <-ctx.Done(): t.Fatal("errors channel never closed") }