Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: traverser should skip over identity CIDs #437

Merged
merged 2 commits into from
Sep 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions impl/graphsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/ipld/go-ipld-prime/node/basicnode"
"github.com/ipld/go-ipld-prime/traversal/selector"
"github.com/ipld/go-ipld-prime/traversal/selector/builder"
selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
Expand Down Expand Up @@ -355,6 +356,47 @@ func TestGraphsyncRoundTrip(t *testing.T) {
}
}

func TestGraphsyncIdentityCIDRoundTrip(t *testing.T) {
// create network
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
td := newGsTestData(ctx, t)

// initialize graphsync on first node to make requests
requestor := td.GraphSyncHost1()
identityDag := testutil.SetupIdentityDAG(ctx, t, td.persistence2)

// initialize graphsync on second node to respond to requests
responder := td.GraphSyncHost2()
assertComplete := assertCompletionFunction(responder, 1)

responder.RegisterIncomingRequestHook(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
hookActions.ValidateRequest()
})

finalResponseStatusChan := make(chan graphsync.ResponseStatusCode, 1)
responder.RegisterCompletedResponseListener(func(p peer.ID, request graphsync.RequestData, status graphsync.ResponseStatusCode) {
select {
case finalResponseStatusChan <- status:
default:
}
})
progressChan, errChan := requestor.Request(ctx, td.host2.ID(), identityDag.RootLink, selectorparse.CommonSelector_ExploreAllRecursively)

identityDag.VerifyWholeDAG(ctx, progressChan)
testutil.VerifyEmptyErrors(ctx, t, errChan)
require.Len(t, td.blockStore1, len(identityDag.AllLinks), "did not store all blocks")

// verify listener
var finalResponseStatus graphsync.ResponseStatusCode
testutil.AssertReceive(ctx, t, finalResponseStatusChan, &finalResponseStatus, "should receive status")
require.Equal(t, graphsync.RequestCompletedFull, finalResponseStatus)

drain(requestor)
drain(responder)
assertComplete(ctx, t)
}

func TestGraphsyncRoundTripHooksOrder(t *testing.T) {

// create network
Expand Down
19 changes: 19 additions & 0 deletions ipldutil/traverser.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ipldutil

import (
"bytes"
"context"
"errors"
"io"
Expand All @@ -12,7 +13,9 @@
"github.com/ipld/go-ipld-prime/node/basicnode"
"github.com/ipld/go-ipld-prime/traversal"
"github.com/ipld/go-ipld-prime/traversal/selector"
"github.com/multiformats/go-multihash"

"github.com/ipfs/go-cid"
"github.com/ipfs/go-graphsync/panics"
)

Expand Down Expand Up @@ -165,7 +168,23 @@
return t.blocksCount
}

func asIdentity(c cid.Cid) (digest []byte, ok bool, err error) {
dmh, err := multihash.Decode(c.Hash())
if err != nil {
return nil, false, err
}

Check warning on line 175 in ipldutil/traverser.go

View check run for this annotation

Codecov / codecov/patch

ipldutil/traverser.go#L174-L175

Added lines #L174 - L175 were not covered by tests
ok = dmh.Code == multihash.IDENTITY
digest = dmh.Digest
return digest, ok, nil
}

func (t *traverser) loader(lnkCtx ipld.LinkContext, lnk ipld.Link) (io.Reader, error) {
if digest, ok, err := asIdentity(lnk.(cidlink.Link).Cid); ok {
return io.NopCloser(bytes.NewReader(digest)), nil
} else if err != nil {
return nil, err
}

Check warning on line 186 in ipldutil/traverser.go

View check run for this annotation

Codecov / codecov/patch

ipldutil/traverser.go#L185-L186

Added lines #L185 - L186 were not covered by tests

// A StorageReadOpener call came in; update the state and release the lock.
// We can't simply unlock the mutex inside the <-t.responses case,
// as then we'd deadlock with the other side trying to send.
Expand Down
41 changes: 39 additions & 2 deletions ipldutil/traverser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,14 @@ import (

blocks "github.com/ipfs/go-block-format"
"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/datamodel"
"github.com/ipld/go-ipld-prime/linking"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipld/go-ipld-prime/node/basicnode"
"github.com/ipld/go-ipld-prime/traversal"
"github.com/ipld/go-ipld-prime/traversal/selector"
"github.com/ipld/go-ipld-prime/traversal/selector/builder"
selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse"
"github.com/stretchr/testify/require"

"github.com/ipfs/go-graphsync"
Expand Down Expand Up @@ -165,16 +168,50 @@ func TestTraverser(t *testing.T) {
_, _ = traverser.CurrentRequest()
}
})

t.Run("traverses correctly, DAG with identity CID in the middle", func(t *testing.T) {
store := make(map[ipld.Link][]byte)
persistence := testutil.NewTestStore(store)
identityDag := testutil.SetupIdentityDAG(ctx, t, persistence)
inProgressChan := make(chan graphsync.ResponseProgress)
done := make(chan struct{})
traverser := TraversalBuilder{
Root: identityDag.RootLink,
Selector: selectorparse.CommonSelector_ExploreAllRecursively,
Chooser: func(l datamodel.Link, lc linking.LinkContext) (datamodel.NodePrototype, error) {
return basicnode.Prototype.Any, nil
},
LinkSystem: persistence,
Visitor: func(tp traversal.Progress, node ipld.Node, r traversal.VisitReason) error {
select {
case <-ctx.Done():
case inProgressChan <- graphsync.ResponseProgress{
Node: node,
Path: tp.Path,
LastBlock: tp.LastBlock,
}:
}
return nil
},
}.Start(ctx)
go func() {
identityDag.VerifyWholeDAG(ctx, inProgressChan)
close(done)
}()
checkTraverseSequence(ctx, t, traverser, identityDag.AllBlocks(), nil)
close(inProgressChan)
testutil.AssertDoesReceive(ctx, t, done, "should have completed verification but did not")
})
}

func checkTraverseSequence(ctx context.Context, t *testing.T, traverser Traverser, expectedBlks []blocks.Block, finalErr error) {
t.Helper()
for _, blk := range expectedBlks {
for ii, blk := range expectedBlks {
isComplete, err := traverser.IsComplete()
require.False(t, isComplete)
require.NoError(t, err)
lnk, _ := traverser.CurrentRequest()
require.Equal(t, lnk.(cidlink.Link).Cid, blk.Cid())
require.Equal(t, lnk.(cidlink.Link).Cid.String(), blk.Cid().String(), fmt.Sprintf("unexpected CID @ block %d", ii))
err = traverser.Advance(bytes.NewBuffer(blk.RawData()))
require.NoError(t, err)
}
Expand Down
197 changes: 197 additions & 0 deletions testutil/identity.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
package testutil

import (
"context"
"fmt"
"io"
"testing"

_ "github.com/ipld/go-ipld-prime/codec/raw"

blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-graphsync"
"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/codec/dagjson"
"github.com/ipld/go-ipld-prime/datamodel"
"github.com/ipld/go-ipld-prime/fluent/qp"
"github.com/ipld/go-ipld-prime/linking"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipld/go-ipld-prime/node/basicnode"
"github.com/multiformats/go-multihash"
"github.com/stretchr/testify/require"
)

// roughly duplicated in github.com/ipld/go-trustless-utils/testutil

type TestIdentityDAG struct {
t testing.TB
loader ipld.BlockReadOpener

RootLink ipld.Link
AllLinks []ipld.Link
}

/* ugly, but it makes a DAG with paths that look like this but doesn't involved dag-pb or unixfs */
var identityDagPaths = []string{
"",
"a/!foo",
"a/b/!bar",
"a/b/c/!baz/identity jump",
"a/b/c/!baz/identity jump/these are my children/blip",
"a/b/c/!baz/identity jump/these are my children/bloop",
"a/b/c/!baz/identity jump/these are my children/bloop/ leaf ",
"a/b/c/!baz/identity jump/these are my children/blop",
"a/b/c/!baz/identity jump/these are my children/leaf",
"a/b/c/d/!leaf",
}

func SetupIdentityDAG(
ctx context.Context,
t testing.TB,
lsys ipld.LinkSystem) *TestIdentityDAG {

allLinks := make([]ipld.Link, 0)
store := func(lp datamodel.LinkPrototype, n datamodel.Node) datamodel.Link {
l, err := lsys.Store(linking.LinkContext{}, lp, n)
require.NoError(t, err)
allLinks = append(allLinks, l)
return l
}

rootLeaf := store(rawlp, basicnode.NewBytes([]byte("leaf node in the root")))
bazLeaf := store(rawlp, basicnode.NewBytes([]byte("leaf node in baz")))
blop := store(djlp, must(qp.BuildList(basicnode.Prototype.Any, -1, func(la datamodel.ListAssembler) {
qp.ListEntry(la, qp.Int(100))
qp.ListEntry(la, qp.Int(200))
qp.ListEntry(la, qp.Int(300))
}))(t))
bloopLeaf := store(rawlp, basicnode.NewBytes([]byte("leaf node in bloop")))
bloop := store(djlp, must(qp.BuildMap(basicnode.Prototype.Any, -1, func(ma datamodel.MapAssembler) {
qp.MapEntry(ma, "desc", qp.List(-1, func(la datamodel.ListAssembler) {
qp.ListEntry(la, qp.String("this"))
qp.ListEntry(la, qp.String("is"))
qp.ListEntry(la, qp.String("bloop"))
}))
qp.MapEntry(ma, " leaf ", qp.Link(bloopLeaf))
}))(t))
blip := store(djlp, basicnode.NewString("blip!"))
baz := store(djlp, must(qp.BuildMap(basicnode.Prototype.Any, -1, func(ma datamodel.MapAssembler) {
qp.MapEntry(ma, "desc", qp.List(-1, func(la datamodel.ListAssembler) {
qp.ListEntry(la, qp.String("this"))
qp.ListEntry(la, qp.String("is"))
qp.ListEntry(la, qp.String("baz"))
}))
qp.MapEntry(ma, "these are my children", qp.Map(-1, func(ma datamodel.MapAssembler) {
qp.MapEntry(ma, "blip", qp.Link(blip))
qp.MapEntry(ma, "bloop", qp.Link(bloop))
qp.MapEntry(ma, "blop", qp.Link(blop))
qp.MapEntry(ma, "leaf", qp.Link(bazLeaf))
}))
}))(t))
var bazIdent datamodel.Link
{
// not stored, shouldn't count as a block
ident := must(qp.BuildMap(basicnode.Prototype.Any, -1, func(ma datamodel.MapAssembler) {
qp.MapEntry(ma, "identity jump", qp.Link(baz))
}))(t)
identBytes := must(ipld.Encode(ident, dagjson.Encode))(t)
mh := must(multihash.Sum(identBytes, multihash.IDENTITY, len(identBytes)))(t)
bazIdent = cidlink.Link{Cid: cid.NewCidV1(cid.DagJSON, mh)}
}
bar := store(djlp, basicnode.NewInt(2020202020202020))
foo := store(djlp, basicnode.NewInt(1010101010101010))
root := store(djlp, must(qp.BuildMap(basicnode.Prototype.Any, -1, func(ma datamodel.MapAssembler) {
qp.MapEntry(ma, "a", qp.Map(-1, func(ma datamodel.MapAssembler) {
qp.MapEntry(ma, "b", qp.Map(-1, func(ma datamodel.MapAssembler) {
qp.MapEntry(ma, "c", qp.Map(-1, func(ma datamodel.MapAssembler) {
qp.MapEntry(ma, "d", qp.Map(-1, func(ma datamodel.MapAssembler) {
qp.MapEntry(ma, "!leaf", qp.Link(rootLeaf))
}))
qp.MapEntry(ma, "!baz", qp.Link(bazIdent))
}))
qp.MapEntry(ma, "!bar", qp.Link(bar))
}))
qp.MapEntry(ma, "!foo", qp.Link(foo))
}))
}))(t))

return &TestIdentityDAG{
t: t,
loader: lsys.StorageReadOpener,
RootLink: root,
AllLinks: reverse(allLinks), // TODO: slices.Reverse post 1.21
}
}

func (tid *TestIdentityDAG) AllBlocks() []blocks.Block {
blks := make([]blocks.Block, len(tid.AllLinks))
for ii, link := range tid.AllLinks {
reader, err := tid.loader(ipld.LinkContext{}, link)
require.NoError(tid.t, err)
data, err := io.ReadAll(reader)
require.NoError(tid.t, err)
blk, err := blocks.NewBlockWithCid(data, link.(cidlink.Link).Cid)
require.NoError(tid.t, err)
blks[ii] = blk
}
return blks
}

// VerifyWholeDAGWithTypes verifies the given response channel returns the expected responses for the whole DAG
// and that the types in the response are the expected types for the DAG
func (tid *TestIdentityDAG) VerifyWholeDAG(ctx context.Context, responseChan <-chan graphsync.ResponseProgress) {
responses := CollectResponses(ctx, tid.t, responseChan)
tid.checkResponses(responses)
}

func (tid *TestIdentityDAG) checkResponses(responses []graphsync.ResponseProgress) {
var pathIndex int
for ii, response := range responses {
// only check the paths that have links, assume the rest are just describing
// the non-link nodes of the DAG
if response.Path.String() == identityDagPaths[pathIndex] {
if response.LastBlock.Link != nil {
expectedLink := tid.AllLinks[pathIndex]
require.Equal(tid.t, expectedLink.String(), response.LastBlock.Link.String(), fmt.Sprintf("response %d has correct link (%d)", ii, pathIndex))
}
pathIndex++
}
}
require.Equal(tid.t, len(identityDagPaths), pathIndex, "traverses all nodes")
}

func must[T any](v T, err error) func(t testing.TB) T {
return func(t testing.TB) T {
t.Helper()
if err != nil {
t.Fatal(err)
}

Check warning on line 169 in testutil/identity.go

View check run for this annotation

Codecov / codecov/patch

testutil/identity.go#L168-L169

Added lines #L168 - L169 were not covered by tests
return v
}
}

var djlp = cidlink.LinkPrototype{
Prefix: cid.Prefix{
Version: 1,
Codec: cid.DagJSON,
MhType: multihash.SHA2_256,
MhLength: 32,
},
}

var rawlp = cidlink.LinkPrototype{
Prefix: cid.Prefix{
Version: 1,
Codec: cid.Raw,
MhType: multihash.SHA2_256,
MhLength: 32,
},
}

func reverse[S ~[]E, E any](s S) S {
for i, j := 0, len(s)-1; i < j; i, j = i+1, j-1 {
s[i], s[j] = s[j], s[i]
}
return s
}
4 changes: 2 additions & 2 deletions testutil/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,11 +209,11 @@
t.Helper()
for {
select {
case _, ok := <-errChan:
case err, ok := <-errChan:
if !ok {
return
}
t.Fatal("errors were sent but shouldn't have been")
t.Fatalf("errors were sent but shouldn't have been: %s", err)

Check warning on line 216 in testutil/testutil.go

View check run for this annotation

Codecov / codecov/patch

testutil/testutil.go#L216

Added line #L216 was not covered by tests
case <-ctx.Done():
t.Fatal("errors channel never closed")
}
Expand Down