Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Serve ObjectService.SearchV2 RPC #3111

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 98 additions & 10 deletions cmd/neofs-node/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@ import (
containercore "github.com/nspcc-dev/neofs-node/pkg/core/container"
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
morphClient "github.com/nspcc-dev/neofs-node/pkg/morph/client"
cntClient "github.com/nspcc-dev/neofs-node/pkg/morph/client/container"
"github.com/nspcc-dev/neofs-node/pkg/network"
objectService "github.com/nspcc-dev/neofs-node/pkg/services/object"
"github.com/nspcc-dev/neofs-node/pkg/services/object/acl"
v2 "github.com/nspcc-dev/neofs-node/pkg/services/object/acl/v2"
Expand All @@ -33,7 +35,9 @@ import (
"github.com/nspcc-dev/neofs-node/pkg/services/replicator"
truststorage "github.com/nspcc-dev/neofs-node/pkg/services/reputation/local/storage"
"github.com/nspcc-dev/neofs-sdk-go/client"
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto"
eaclSDK "github.com/nspcc-dev/neofs-sdk-go/eacl"
netmapsdk "github.com/nspcc-dev/neofs-sdk-go/netmap"
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
Expand All @@ -42,6 +46,7 @@ import (
apireputation "github.com/nspcc-dev/neofs-sdk-go/reputation"
"github.com/nspcc-dev/neofs-sdk-go/user"
"go.uber.org/zap"
"google.golang.org/grpc"
)

type objectSvc struct {
Expand Down Expand Up @@ -131,6 +136,86 @@ func (fn *innerRingFetcherWithNotary) InnerRingKeys() ([][]byte, error) {

type coreClientConstructor reputationClientConstructor

func (x *coreClientConstructor) SendSearchRequest(ctx context.Context, node netmapsdk.NodeInfo, req *protoobject.SearchV2Request) ([]client.SearchResultItem, error) {
// TODO: copy-pasted from old search implementation, consider deduplicating in
// the client constructor
var endpoints network.AddressGroup
if err := endpoints.FromIterator(network.NodeEndpointsIterator(node)); err != nil {
// critical error that may ultimately block the storage service. Normally it
// should not appear because entry into the network map under strict control.
return nil, fmt.Errorf("failed to decode network endpoints of the storage node from the network map: %w", err)
}
var info coreclient.NodeInfo
info.SetAddressGroup(endpoints)
info.SetPublicKey(node.PublicKey())
c, err := x.Get(info)
if err != nil {
return nil, fmt.Errorf("get node client: %w", err)
}

var resp *protoobject.SearchV2Response
var firstErr error
for i := range endpoints {
if err = c.RawForAddress(endpoints[i], func(conn *grpc.ClientConn) error {
var err error
resp, err = protoobject.NewObjectServiceClient(conn).SearchV2(ctx, req)
return err
}); err == nil {
break
}
if firstErr == nil {
firstErr = fmt.Errorf("send request over gRPC: %w", err)
}
}
if firstErr != nil {
return nil, firstErr
}

if !bytes.Equal(resp.GetVerifyHeader().GetBodySignature().GetKey(), node.PublicKey()) {
return nil, coreclient.ErrWrongPublicKey
}
if err := neofscrypto.VerifyResponseWithBuffer(resp, nil); err != nil {
return nil, fmt.Errorf("response verification failed: %w", err)
}
if err := apistatus.ToError(resp.GetMetaHeader().GetStatus()); err != nil {
return nil, err
}
// TODO: copy-pasted from SDK client code, consider sharing
if resp.Body == nil {
return nil, nil
}
n := uint32(len(resp.Body.Result))
if n == 0 {
if resp.Body.Cursor != "" {
return nil, errors.New("invalid response body: cursor is set with empty result")
}
return nil, nil
}
if reqCursor := req.GetBody().GetCursor(); reqCursor != "" && resp.Body.Cursor == reqCursor {
return nil, errors.New("invalid response body: cursor repeats the initial one")
}
if n > req.GetBody().GetCount() {
return nil, errors.New("invalid response body: more items than requested")
}

res := make([]client.SearchResultItem, n)
for i, r := range resp.Body.Result {
switch {
case r == nil:
return nil, fmt.Errorf("invalid response body: nil element #%d", i)
case r.Id == nil:
return nil, fmt.Errorf("invalid response body: invalid element #%d: missing ID", i)
case len(r.Attributes) != len(req.GetBody().GetAttributes()):
return nil, fmt.Errorf("invalid response body: invalid element #%d: wrong attribute count %d", i, len(r.Attributes))
}
if err = res[i].ID.FromProtoMessage(r.Id); err != nil {
return nil, fmt.Errorf("invalid response body: invalid element #%d: invalid ID: %w", i, err)
}
res[i].Attributes = r.Attributes
}
return res, nil
}

func (x *coreClientConstructor) Get(info coreclient.NodeInfo) (coreclient.MultiAddressClient, error) {
c, err := (*reputationClientConstructor)(x).Get(info)
if err != nil {
Expand Down Expand Up @@ -303,10 +388,11 @@ func initObjectService(c *cfg) {
)

storage := storageForObjectService{
local: ls,
putSvc: sPut,
keys: keyStorage,
}
server := objectService.New(objSvc, mNumber, fsChain, storage, c.shared.basics.key.PrivateKey, c.metricsCollector, aclChecker, aclSvc)
server := objectService.New(objSvc, mNumber, fsChain, storage, c.shared.basics.key.PrivateKey, c.metricsCollector, aclChecker, aclSvc, coreConstructor)

for _, srv := range c.cfgGRPC.servers {
protoobject.RegisterObjectServiceServer(srv, server)
Expand Down Expand Up @@ -594,13 +680,9 @@ func newFSChainForObjects(cnrNodes *containerNodes, isLocalPubKey func([]byte) b
}
}

// ForEachContainerNodePublicKeyInLastTwoEpochs passes binary-encoded public key
// of each node match the referenced container's storage policy at two latest
// epochs into f. When f returns false, nil is returned instantly.
//
// Implements [object.Node] interface.
func (x *fsChainForObjects) ForEachContainerNodePublicKeyInLastTwoEpochs(id cid.ID, f func(pubKey []byte) bool) error {
return x.containerNodes.forEachContainerNodePublicKeyInLastTwoEpochs(id, f)
// ForEachContainerNodeInLastTwoEpochs implements [objectService.FSChain] interface.
func (x *fsChainForObjects) ForEachContainerNodeInLastTwoEpochs(id cid.ID, f func(netmapsdk.NodeInfo) bool) error {
return x.containerNodes.forEachContainerNode(id, true, f)
}

// IsOwnPublicKey checks whether given binary-encoded public key is assigned to
Expand All @@ -616,10 +698,16 @@ func (x *fsChainForObjects) IsOwnPublicKey(pubKey []byte) bool {
func (x *fsChainForObjects) LocalNodeUnderMaintenance() bool { return x.isMaintenance.Load() }

type storageForObjectService struct {
local *engine.StorageEngine
putSvc *putsvc.Service
keys *util.KeyStorage
}

// SearchObjects implements [objectService.Storage] interface.
func (x storageForObjectService) SearchObjects(cnr cid.ID, fs objectSDK.SearchFilters, attrs []string, cursor *meta.SearchCursor, count uint16) ([]client.SearchResultItem, *meta.SearchCursor, error) {
return x.local.Search(cnr, fs, attrs, cursor, count)
}

func (x storageForObjectService) VerifyAndStoreObjectLocally(obj objectSDK.Object) error {
return x.putSvc.ValidateAndStoreObjectLocally(obj)
}
Expand Down Expand Up @@ -699,8 +787,8 @@ type netmapSourceWithNodes struct {

func (n netmapSourceWithNodes) ServerInContainer(cID cid.ID) (bool, error) {
var serverInContainer bool
err := n.fsChain.ForEachContainerNodePublicKeyInLastTwoEpochs(cID, func(pubKey []byte) bool {
if n.fsChain.isLocalPubKey(pubKey) {
err := n.fsChain.ForEachContainerNodeInLastTwoEpochs(cID, func(node netmapsdk.NodeInfo) bool {
if n.fsChain.isLocalPubKey(node.PublicKey()) {
serverInContainer = true
return false
}
Expand Down
9 changes: 0 additions & 9 deletions cmd/neofs-node/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,15 +80,6 @@ func newContainerNodes(containers container.Source, network netmap.Source) (*con
}, nil
}

// forEachContainerNodePublicKeyInLastTwoEpochs passes binary-encoded public key
// of each node match the referenced container's storage policy at two latest
// epochs into f. When f returns false, nil is returned instantly.
func (x *containerNodes) forEachContainerNodePublicKeyInLastTwoEpochs(cnrID cid.ID, f func(pubKey []byte) bool) error {
return x.forEachContainerNode(cnrID, true, func(node netmapsdk.NodeInfo) bool {
return f(node.PublicKey())
})
}

func (x *containerNodes) forEachContainerNode(cnrID cid.ID, withPrevEpoch bool, f func(netmapsdk.NodeInfo) bool) error {
curEpoch, err := x.network.Epoch()
if err != nil {
Expand Down
76 changes: 76 additions & 0 deletions pkg/core/object/metadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package object

import (
"bytes"
"math/big"
"strings"

"github.com/nspcc-dev/neofs-sdk-go/client"
)

// MergeSearchResults inserts next set of search result items into pre-allocated
// buffer with n set items. Returns the number of items added.
func MergeSearchResults(n uint16, buf, next []client.SearchResultItem) uint16 {
switch {
case len(buf) == 0:
panic("empty buffer")
case n > uint16(len(buf)):
panic("n overflows buffer len")
}
if n == 0 {
return uint16(copy(buf, next))
}
withAttr := len(buf[0].Attributes) > 0
var iN, iB *big.Int
if withAttr {
iN, iB = new(big.Int), new(big.Int)
}
var added uint16
next:
for len(next) > 0 {
var isInt bool
if withAttr {
_, isInt = iN.SetString(next[0].Attributes[0], 10)
}
var i uint16
for i = 0; i < n; i++ { // do not use range: if next[0] is the biggest, i=n is desired, following condition catches
if withAttr {
if isInt {
_, isInt = iB.SetString(buf[i].Attributes[0], 10)
}
if isInt {
if c := iN.Cmp(iB); c < 0 {
break
} else if c > 0 {
continue
}
} else {
if c := strings.Compare(next[0].Attributes[0], buf[i].Attributes[0]); c < 0 {
break
} else if c > 0 {
continue
}
}
}
if c := bytes.Compare(next[0].ID[:], buf[i].ID[:]); c == 0 {
next = next[1:]
continue next
} else if c < 0 {
break
}
}
if i == uint16(len(buf)) {
break
}
copy(buf[i+1:], buf[i:])
buf[i] = next[0]
added++
next, buf = next[1:], buf[i+1:]
if i < n {
n -= i + 1
} else {
n = 0
}
}
return added
}
Loading
Loading