Skip to content

Commit

Permalink
relations: Sync relations interface with client and pool
Browse files Browse the repository at this point in the history
close #416

Signed-off-by: Evgenii Baidakov <[email protected]>
  • Loading branch information
smallhive committed Jul 4, 2023
1 parent 4f36bc4 commit d2cf50f
Showing 1 changed file with 233 additions and 0 deletions.
233 changes: 233 additions & 0 deletions object/relations/relations.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"

"github.com/nspcc-dev/neofs-sdk-go/bearer"
"github.com/nspcc-dev/neofs-sdk-go/client"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
Expand Down Expand Up @@ -46,12 +47,14 @@ var (

// ListAllRelations return all related phy objects for provided root object ID in split-chain order.
// Result doesn't include root object ID itself. If linking object is found its id will be the last one.
// Deprecated: use All instead.
func ListAllRelations(ctx context.Context, rels Relations, cnrID cid.ID, rootObjID oid.ID, tokens Tokens) ([]oid.ID, error) {
return ListRelations(ctx, rels, cnrID, rootObjID, tokens, true)
}

// ListRelations return all related phy objects for provided root object ID in split-chain order.
// Result doesn't include root object ID itself.
// Deprecated: use WithLinking instead.
func ListRelations(ctx context.Context, rels Relations, cnrID cid.ID, rootObjID oid.ID, tokens Tokens, includeLinking bool) ([]oid.ID, error) {
splitInfo, err := rels.GetSplitInfo(ctx, cnrID, rootObjID, tokens)
if err != nil {
Expand Down Expand Up @@ -126,3 +129,233 @@ func ListRelations(ctx context.Context, rels Relations, cnrID cid.ID, rootObjID

return chain, nil
}

// HeadExecutor describes methods to get object head.
type HeadExecutor interface {
ObjectHead(ctx context.Context, containerID cid.ID, objectID oid.ID, prm client.PrmObjectHead) (*client.ResObjectHead, error)
}

// SearchExecutor describes methods to search objects.
type SearchExecutor interface {
ObjectSearchInit(ctx context.Context, containerID cid.ID, prm client.PrmObjectSearch) (*client.ObjectListReader, error)
}

// Executor describes all methods required to find all siblings for object.
type Executor interface {
HeadExecutor
SearchExecutor
}

// All returns all related phy objects for provided root object ID in split-chain order.
// If linking object is found its id will be the last one.
//
// Result doesn't include root object ID itself.
func All(ctx context.Context, executor Executor, cnrID cid.ID, rootObjID oid.ID, tokens Tokens) ([]oid.ID, error) {
return WithLinking(ctx, executor, cnrID, rootObjID, tokens, true)
}

// WithLinking returns all related phy objects for provided root object ID in split-chain order.
//
// In case of object small enough and wasn't divided/split to few objects, the function returns ([]oid.ID, nil).
// This means there is no another objects related to requested one.
//
// Result doesn't include root object ID itself.
func WithLinking(ctx context.Context, executor Executor, containerID cid.ID, rootObjectID oid.ID, tokens Tokens, includeLinking bool) ([]oid.ID, error) {
splitInfo, err := getSplitInfo(ctx, executor, containerID, rootObjectID, tokens)
if err != nil {
if errors.Is(err, ErrNoSplitInfo) {
return []oid.ID{}, nil
}

return nil, err
}

// collect split chain by the descending ease of operations (ease is evaluated heuristically).
// If any approach fails, we don't try the next since we assume that it will fail too.
if _, ok := splitInfo.Link(); !ok {
// the list is expected to contain last part and (probably) split info
list, err := findSiblingByParentID(ctx, executor, containerID, rootObjectID, tokens)
if err != nil {
return nil, fmt.Errorf("children: %w", err)
}

for _, id := range list {
split, err := getSplitInfo(ctx, executor, containerID, id, tokens)
if err != nil {
if errors.Is(err, ErrNoSplitInfo) {
continue
}
return nil, fmt.Errorf("split info: %w", err)
}
if link, ok := split.Link(); ok {
splitInfo.SetLink(link)
}
if last, ok := split.LastPart(); ok {
splitInfo.SetLastPart(last)
}
}
}

if idLinking, ok := splitInfo.Link(); ok {
children, err := listChildrenByLinker(ctx, executor, containerID, idLinking, tokens)
if err != nil {
return nil, fmt.Errorf("linking object's header: %w", err)
}

if includeLinking {
children = append(children, idLinking)
}
return children, nil
}

idMember, ok := splitInfo.LastPart()
if !ok {
return nil, errors.New("missing any data in received object split information")
}

chain := []oid.ID{idMember}
chainSet := map[oid.ID]struct{}{idMember: {}}

for {
idMember, err = getLeftSibling(ctx, executor, containerID, idMember, tokens)
if err != nil {
if errors.Is(err, ErrNoLeftSibling) {
break
}
return nil, fmt.Errorf("split chain member's header: %w", err)
}

if _, ok = chainSet[idMember]; ok {
return nil, fmt.Errorf("duplicated member in the split chain %s", idMember)
}

chain = append([]oid.ID{idMember}, chain...)
chainSet[idMember] = struct{}{}
}

return chain, nil
}

func getSplitInfo(ctx context.Context, header HeadExecutor, cnrID cid.ID, objID oid.ID, tokens Tokens) (*object.SplitInfo, error) {
var prmHead client.PrmObjectHead
if tokens.Bearer != nil {
prmHead.WithBearerToken(*tokens.Bearer)
}
if tokens.Session != nil {
prmHead.WithinSession(*tokens.Session)
}
prmHead.MarkRaw()
res, err := header.ObjectHead(ctx, cnrID, objID, prmHead)

var hdr object.Object
if !res.ReadHeader(&hdr) {
return nil, errors.New("header")
}

var errSplit *object.SplitInfoError
if err != nil {
if errors.As(err, &errSplit) {
return errSplit.SplitInfo(), nil
}

return nil, fmt.Errorf("raw object header: %w", err)
}

if hdr.SplitID() == nil {
return nil, ErrNoSplitInfo
}

si := object.NewSplitInfo()
si.SetSplitID(hdr.SplitID())

if hdr.HasParent() {
if len(hdr.Children()) > 0 {
si.SetLink(objID)
} else {
si.SetLastPart(objID)
}
}

return si, nil
}

func findSiblingByParentID(ctx context.Context, searcher SearchExecutor, cnrID cid.ID, objID oid.ID, tokens Tokens) ([]oid.ID, error) {
var query object.SearchFilters
var prm client.PrmObjectSearch

query.AddParentIDFilter(object.MatchStringEqual, objID)
prm.SetFilters(query)

if tokens.Bearer != nil {
prm.WithBearerToken(*tokens.Bearer)
}
if tokens.Session != nil {
prm.WithinSession(*tokens.Session)
}

resSearch, err := searcher.ObjectSearchInit(ctx, cnrID, prm)
if err != nil {
return nil, fmt.Errorf("search: %w", err)
}

var res []oid.ID
err = resSearch.Iterate(func(id oid.ID) bool {
res = append(res, id)
return false
})

if err != nil {
return nil, fmt.Errorf("iterate: %w", err)
}

return res, nil
}

func listChildrenByLinker(ctx context.Context, header HeadExecutor, cnrID cid.ID, objID oid.ID, tokens Tokens) ([]oid.ID, error) {
var prm client.PrmObjectHead
if tokens.Bearer != nil {
prm.WithBearerToken(*tokens.Bearer)
}
if tokens.Session != nil {
prm.WithinSession(*tokens.Session)
}

res, err := header.ObjectHead(ctx, cnrID, objID, prm)
if err != nil {
return nil, fmt.Errorf("linking object's header: %w", err)
}

var hdr object.Object
if !res.ReadHeader(&hdr) {
return nil, errors.New("header")
}

return hdr.Children(), nil
}

func getLeftSibling(ctx context.Context, header HeadExecutor, cnrID cid.ID, objID oid.ID, tokens Tokens) (oid.ID, error) {
var prm client.PrmObjectHead
if tokens.Bearer != nil {
prm.WithBearerToken(*tokens.Bearer)
}
if tokens.Session != nil {
prm.WithinSession(*tokens.Session)
}

res, err := header.ObjectHead(ctx, cnrID, objID, prm)
if err != nil {
return oid.ID{}, fmt.Errorf("split chain member's header: %w", err)
}

var hdr object.Object
if !res.ReadHeader(&hdr) {
return oid.ID{}, errors.New("header")
}

idMember, ok := hdr.PreviousID()
if !ok {
return oid.ID{}, ErrNoLeftSibling
}

return idMember, nil
}

0 comments on commit d2cf50f

Please sign in to comment.