Skip to content

Commit

Permalink
relations: Sync relations interface with client and pool
Browse files Browse the repository at this point in the history
close #416

Signed-off-by: Evgenii Baidakov <[email protected]>
  • Loading branch information
smallhive committed Jul 6, 2023
1 parent 7edf97f commit 9ec17c9
Show file tree
Hide file tree
Showing 4 changed files with 189 additions and 54 deletions.
23 changes: 13 additions & 10 deletions audit/collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package audit

import (
"context"
"errors"
"fmt"

"github.com/nspcc-dev/neofs-sdk-go/checksum"
"github.com/nspcc-dev/neofs-sdk-go/client"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
Expand All @@ -13,16 +15,11 @@ import (
"github.com/nspcc-dev/tzhash/tz"
)

type Collector interface {
Head(ctx context.Context, addr oid.Address) (*object.Object, error)
relations.Relations
}

// CollectMembers creates new storage group structure and fills it
// with information about members collected via HeadReceiver.
//
// Resulting storage group consists of physically stored objects only.
func CollectMembers(ctx context.Context, collector Collector, cnr cid.ID, members []oid.ID, tokens relations.Tokens, calcHomoHash bool) (*storagegroup.StorageGroup, error) {
func CollectMembers(ctx context.Context, collector relations.Executor, cnr cid.ID, members []oid.ID, tokens relations.Tokens, calcHomoHash bool) (*storagegroup.StorageGroup, error) {
var (
err error
sumPhySize uint64
Expand All @@ -35,19 +32,25 @@ func CollectMembers(ctx context.Context, collector Collector, cnr cid.ID, member
addr.SetContainer(cnr)

for i := range members {
if phyMembers, err = relations.ListRelations(ctx, collector, cnr, members[i], tokens, false); err != nil {
if phyMembers, _, err = relations.Get(ctx, collector, cnr, members[i], tokens); err != nil {
return nil, err
}

var prmHead client.PrmObjectHead
for _, phyMember := range phyMembers {
addr.SetObject(phyMember)
leaf, err := collector.Head(ctx, addr)
leaf, err := collector.ObjectHead(ctx, addr.Container(), addr.Object(), prmHead)
if err != nil {
return nil, fmt.Errorf("head phy member '%s': %w", phyMember.EncodeToString(), err)
}

sumPhySize += leaf.PayloadSize()
cs, _ := leaf.PayloadHomomorphicHash()
var hdr object.Object
if !leaf.ReadHeader(&hdr) {
return nil, errors.New("header err")
}

sumPhySize += hdr.PayloadSize()
cs, _ := hdr.PayloadHomomorphicHash()

if calcHomoHash {
phyHashes = append(phyHashes, cs.Value())
Expand Down
202 changes: 160 additions & 42 deletions object/relations/relations.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"

"github.com/nspcc-dev/neofs-sdk-go/bearer"
"github.com/nspcc-dev/neofs-sdk-go/client"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
Expand All @@ -18,24 +19,6 @@ type Tokens struct {
Bearer *bearer.Token
}

type Relations interface {
// GetSplitInfo tries to get split info by some object id.
// This method must return split info on any object from split chain as well as on parent/linking object.
// If object doesn't have any split information returns ErrNoSplitInfo.
GetSplitInfo(ctx context.Context, cnrID cid.ID, rootID oid.ID, tokens Tokens) (*object.SplitInfo, error)

// ListChildrenByLinker returns list of children for link object.
// Result doesn't include link object itself.
ListChildrenByLinker(ctx context.Context, cnrID cid.ID, linkerID oid.ID, tokens Tokens) ([]oid.ID, error)

// GetLeftSibling return previous object id in object chain.
// If no previous object it returns ErrNoLeftSibling.
GetLeftSibling(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens Tokens) (oid.ID, error)

// FindSiblingByParentID returns all object that relates to the provided parent id.
FindSiblingByParentID(ctx context.Context, cnrID cid.ID, parentID oid.ID, tokens Tokens) ([]oid.ID, error)
}

var (
// ErrNoLeftSibling is an error that must be returned if object doesn't have left sibling in objects chain.
ErrNoLeftSibling = errors.New("no left siblings")
Expand All @@ -44,42 +27,56 @@ var (
ErrNoSplitInfo = errors.New("no split info")
)

// ListAllRelations return all related phy objects for provided root object ID in split-chain order.
// Result doesn't include root object ID itself. If linking object is found its id will be the last one.
func ListAllRelations(ctx context.Context, rels Relations, cnrID cid.ID, rootObjID oid.ID, tokens Tokens) ([]oid.ID, error) {
return ListRelations(ctx, rels, cnrID, rootObjID, tokens, true)
// HeadExecutor describes methods to get object head.
type HeadExecutor interface {
ObjectHead(ctx context.Context, containerID cid.ID, objectID oid.ID, prm client.PrmObjectHead) (*client.ResObjectHead, error)
}

// SearchExecutor describes methods to search objects.
type SearchExecutor interface {
ObjectSearchInit(ctx context.Context, containerID cid.ID, prm client.PrmObjectSearch) (*client.ObjectListReader, error)
}

// Executor describes all methods required to find all siblings for object.
type Executor interface {
HeadExecutor
SearchExecutor
}

// ListRelations return all related phy objects for provided root object ID in split-chain order.
// Get returns all related phy objects for provided root object ID in split-chain order, without linking object id.
// If linking object is found its id will be returned in the second result variable.
//
// Result doesn't include root object ID itself.
func ListRelations(ctx context.Context, rels Relations, cnrID cid.ID, rootObjID oid.ID, tokens Tokens, includeLinking bool) ([]oid.ID, error) {
splitInfo, err := rels.GetSplitInfo(ctx, cnrID, rootObjID, tokens)
func Get(ctx context.Context, executor Executor, containerID cid.ID, rootObjectID oid.ID, tokens Tokens) ([]oid.ID, *oid.ID, error) {
splitInfo, err := getSplitInfo(ctx, executor, containerID, rootObjectID, tokens)
if err != nil {
if errors.Is(err, ErrNoSplitInfo) {
return []oid.ID{}, nil
return []oid.ID{}, nil, nil
}
return nil, err

return nil, nil, err
}

// collect split chain by the descending ease of operations (ease is evaluated heuristically).
// If any approach fails, we don't try the next since we assume that it will fail too.
if _, ok := splitInfo.Link(); !ok {
// the list is expected to contain last part and (probably) split info
list, err := rels.FindSiblingByParentID(ctx, cnrID, rootObjID, tokens)
list, err := findSiblingByParentID(ctx, executor, containerID, rootObjectID, tokens)
if err != nil {
return nil, fmt.Errorf("failed to find object children: %w", err)
return nil, nil, fmt.Errorf("children: %w", err)
}

for _, id := range list {
split, err := rels.GetSplitInfo(ctx, cnrID, id, tokens)
split, err := getSplitInfo(ctx, executor, containerID, id, tokens)
if err != nil {
if errors.Is(err, ErrNoSplitInfo) {
continue
}
return nil, fmt.Errorf("failed to get split info: %w", err)
return nil, nil, fmt.Errorf("split info: %w", err)
}
if link, ok := split.Link(); ok {
splitInfo.SetLink(link)
break
}
if last, ok := split.LastPart(); ok {
splitInfo.SetLastPart(last)
Expand All @@ -88,41 +85,162 @@ func ListRelations(ctx context.Context, rels Relations, cnrID cid.ID, rootObjID
}

if idLinking, ok := splitInfo.Link(); ok {
children, err := rels.ListChildrenByLinker(ctx, cnrID, idLinking, tokens)
children, err := listChildrenByLinker(ctx, executor, containerID, idLinking, tokens)
if err != nil {
return nil, fmt.Errorf("failed to get linking object's header: %w", err)
return nil, nil, fmt.Errorf("linking object's header: %w", err)
}

if includeLinking {
children = append(children, idLinking)
}
return children, nil
return children, &idLinking, nil
}

idMember, ok := splitInfo.LastPart()
if !ok {
return nil, errors.New("missing any data in received object split information")
return nil, nil, errors.New("missing any data in received object split information")
}

chain := []oid.ID{idMember}
chainSet := map[oid.ID]struct{}{idMember: {}}

for {
idMember, err = rels.GetLeftSibling(ctx, cnrID, idMember, tokens)
idMember, err = getLeftSibling(ctx, executor, containerID, idMember, tokens)
if err != nil {
if errors.Is(err, ErrNoLeftSibling) {
break
}
return nil, fmt.Errorf("failed to read split chain member's header: %w", err)
return nil, nil, fmt.Errorf("split chain member's header: %w", err)
}

if _, ok = chainSet[idMember]; ok {
return nil, fmt.Errorf("duplicated member in the split chain %s", idMember)
return nil, nil, fmt.Errorf("duplicated member in the split chain %s", idMember)
}

chain = append([]oid.ID{idMember}, chain...)
chainSet[idMember] = struct{}{}
}

return chain, nil
return chain, nil, nil
}

func getSplitInfo(ctx context.Context, header HeadExecutor, cnrID cid.ID, objID oid.ID, tokens Tokens) (*object.SplitInfo, error) {
var prmHead client.PrmObjectHead
if tokens.Bearer != nil {
prmHead.WithBearerToken(*tokens.Bearer)
}
if tokens.Session != nil {
prmHead.WithinSession(*tokens.Session)
}
prmHead.MarkRaw()
res, err := header.ObjectHead(ctx, cnrID, objID, prmHead)

if err != nil {
var errSplit *object.SplitInfoError
if errors.As(err, &errSplit) {
return errSplit.SplitInfo(), nil
}

return nil, fmt.Errorf("raw object header: %w", err)
}

var hdr object.Object
if !res.ReadHeader(&hdr) {
return nil, errors.New("header")
}

if hdr.SplitID() == nil {
return nil, ErrNoSplitInfo
}

si := object.NewSplitInfo()
si.SetSplitID(hdr.SplitID())

if hdr.HasParent() {
if len(hdr.Children()) > 0 {
si.SetLink(objID)
} else {
si.SetLastPart(objID)
}
}

return si, nil
}

func findSiblingByParentID(ctx context.Context, searcher SearchExecutor, cnrID cid.ID, objID oid.ID, tokens Tokens) ([]oid.ID, error) {
var query object.SearchFilters
var prm client.PrmObjectSearch

query.AddParentIDFilter(object.MatchStringEqual, objID)
prm.SetFilters(query)

if tokens.Bearer != nil {
prm.WithBearerToken(*tokens.Bearer)
}
if tokens.Session != nil {
prm.WithinSession(*tokens.Session)
}

resSearch, err := searcher.ObjectSearchInit(ctx, cnrID, prm)
if err != nil {
return nil, fmt.Errorf("search: %w", err)
}

var res []oid.ID
err = resSearch.Iterate(func(id oid.ID) bool {
res = append(res, id)
return false
})

if err != nil {
return nil, fmt.Errorf("iterate: %w", err)
}

return res, nil
}

func listChildrenByLinker(ctx context.Context, header HeadExecutor, cnrID cid.ID, objID oid.ID, tokens Tokens) ([]oid.ID, error) {
var prm client.PrmObjectHead
if tokens.Bearer != nil {
prm.WithBearerToken(*tokens.Bearer)
}
if tokens.Session != nil {
prm.WithinSession(*tokens.Session)
}

res, err := header.ObjectHead(ctx, cnrID, objID, prm)
if err != nil {
return nil, fmt.Errorf("linking object's header: %w", err)
}

var hdr object.Object
if !res.ReadHeader(&hdr) {
return nil, errors.New("header")
}

return hdr.Children(), nil
}

func getLeftSibling(ctx context.Context, header HeadExecutor, cnrID cid.ID, objID oid.ID, tokens Tokens) (oid.ID, error) {
var prm client.PrmObjectHead
if tokens.Bearer != nil {
prm.WithBearerToken(*tokens.Bearer)
}
if tokens.Session != nil {
prm.WithinSession(*tokens.Session)
}

res, err := header.ObjectHead(ctx, cnrID, objID, prm)
if err != nil {
return oid.ID{}, fmt.Errorf("split chain member's header: %w", err)
}

var hdr object.Object
if !res.ReadHeader(&hdr) {
return oid.ID{}, errors.New("header")
}

idMember, ok := hdr.PreviousID()
if !ok {
return oid.ID{}, ErrNoLeftSibling
}

return idMember, nil
}
13 changes: 11 additions & 2 deletions pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ import (
"go.uber.org/zap"
)

var (
relationsGet = relations.Get
)

// client represents virtual connection to the single NeoFS network endpoint from which Pool is formed.
// This interface is expected to have exactly one production implementation - clientWrapper.
// Others are expected to be for test purposes only.
Expand Down Expand Up @@ -1834,14 +1838,19 @@ func (p *Pool) DeleteObject(ctx context.Context, containerID cid.ID, objectID oi
var tokens relations.Tokens
tokens.Bearer = prm.btoken

relatives, err := relations.ListAllRelations(ctx, p, containerID, objectID, tokens)
relatives, linkerID, err := relationsGet(ctx, p, containerID, objectID, tokens)
if err != nil {
return fmt.Errorf("failed to collect relatives: %w", err)
}

if len(relatives) != 0 {
prmCtx.useContainer(containerID)
prmCtx.useObjects(append(relatives, objectID))
objList := append(relatives, objectID)
if linkerID != nil {
objList = append(objList, *linkerID)
}

prmCtx.useObjects(objList)
}
}

Expand Down
5 changes: 5 additions & 0 deletions pool/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/nspcc-dev/neofs-sdk-go/crypto/test"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"github.com/nspcc-dev/neofs-sdk-go/object/relations"
"github.com/nspcc-dev/neofs-sdk-go/session"
"github.com/nspcc-dev/neofs-sdk-go/user"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -429,6 +430,10 @@ func TestSessionCacheWithKey(t *testing.T) {
anonKey := test.RandomSignerRFC6979(t)
prm.UseSigner(anonKey)

relationsGet = func(ctx context.Context, executor relations.Executor, containerID cid.ID, rootObjectID oid.ID, tokens relations.Tokens) ([]oid.ID, *oid.ID, error) {
return nil, nil, nil
}

err = pool.DeleteObject(ctx, cid.ID{}, oid.ID{}, prm)
require.NoError(t, err)
st, _ = pool.cache.Get(formCacheKey(cp.address(), anonKey))
Expand Down

0 comments on commit 9ec17c9

Please sign in to comment.