diff --git a/CHANGELOG.md b/CHANGELOG.md index 110951cddc..d051f26727 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ Changelog for NeoFS Node - CLI now allows to create and print eACL with numeric filters (#2742) - gRPC connection limits per endpoint (#1240) - `neofs-lens object link` command for the new link object inspection (#2799) +- Storage nodes serve new `ObjectService.Replicate` RPC (#2674) ### Fixed - Access to `PUT` objects no longer grants `DELETE` rights (#2261) @@ -24,6 +25,7 @@ Changelog for NeoFS Node - Storage nodes no longer accept objects with header larger than 16KB (#2749) - IR sends NeoFS chain GAS to netmap nodes every epoch, not per a configurable blocks number (#2777) - Big objects are split with the new split scheme (#2667) +- Background replicator transfers objects using new `ObjectService.Replicate` RPC (#2317) ### Removed - Object notifications incl. NATS (#2750) diff --git a/cmd/neofs-node/grpc.go b/cmd/neofs-node/grpc.go index 8fdc36af01..c40cdf7938 100644 --- a/cmd/neofs-node/grpc.go +++ b/cmd/neofs-node/grpc.go @@ -4,10 +4,12 @@ import ( "crypto/tls" "errors" "fmt" + "math" "net" "time" grpcconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/grpc" + "github.com/nspcc-dev/neofs-sdk-go/object" "go.uber.org/zap" "golang.org/x/net/netutil" "google.golang.org/grpc" @@ -15,11 +17,50 @@ import ( ) func initGRPC(c *cfg) { + if c.cfgMorph.client == nil { + initMorphComponents(c) + } + + // limit max size of single messages received by the gRPC servers up to max + // object size setting of the NeoFS network: this is needed to serve + // ObjectService.Replicate RPC transmitting the entire stored object in one + // message + maxObjSize, err := c.nCli.MaxObjectSize() + fatalOnErrDetails("read max object size network setting to determine gRPC recv message limit", err) + + maxRecvSize := maxObjSize + // don't forget about meta fields + if maxRecvSize < uint64(math.MaxUint64-object.MaxHeaderLen) { // just in case, always true in practice + maxRecvSize += object.MaxHeaderLen + } else { + maxRecvSize = math.MaxUint64 + } + + var maxRecvMsgSizeOpt grpc.ServerOption + if maxRecvSize > maxMsgSize { // do not decrease default value + if maxRecvSize > math.MaxInt { + // ^2GB for 32-bit systems which is currently enough in practice. If at some + // point this is not enough, we'll need to expand the option + fatalOnErr(fmt.Errorf("cannot serve NeoFS API over gRPC: object of max size is bigger than gRPC server is able to support %d>%d", + maxRecvSize, math.MaxInt)) + } + maxRecvMsgSizeOpt = grpc.MaxRecvMsgSize(int(maxRecvSize)) + c.log.Debug("limit max recv gRPC message size to fit max stored objects", + zap.Uint64("max object size", maxObjSize), zap.Uint64("max recv msg", maxRecvSize)) + } + var successCount int grpcconfig.IterateEndpoints(c.cfgReader, func(sc *grpcconfig.Config) { serverOpts := []grpc.ServerOption{ grpc.MaxSendMsgSize(maxMsgSize), } + if maxRecvMsgSizeOpt != nil { + // TODO(@cthulhu-rider): the setting can be server-global only now, support + // per-RPC limits + // TODO(@cthulhu-rider): max object size setting may change in general, + // but server configuration is static now + serverOpts = append(serverOpts, maxRecvMsgSizeOpt) + } tlsCfg := sc.TLS() diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go index 990f86b95b..14edb01445 100644 --- a/cmd/neofs-node/object.go +++ b/cmd/neofs-node/object.go @@ -359,7 +359,10 @@ func initObjectService(c *cfg) { firstSvc = objectService.NewMetricCollector(signSvc, c.metricsCollector) } - server := objectTransportGRPC.New(firstSvc) + objNode, err := newNodeForObjects(c.cfgObject.cnrSource, c.netMapSource, sPut, c.IsLocalKey) + fatalOnErr(err) + + server := objectTransportGRPC.New(firstSvc, objNode) for _, srv := range c.cfgGRPC.servers { objectGRPC.RegisterObjectServiceServer(srv, server) @@ -601,3 +604,47 @@ func (h headerSource) Head(address oid.Address) (*objectSDK.Object, error) { return hw.h, nil } + +// nodeForObjects represents NeoFS storage node for object storage. +type nodeForObjects struct { + putObjectService *putsvc.Service + containerNodes *containerNodes + isLocalPubKey func([]byte) bool +} + +func newNodeForObjects(containers containercore.Source, network netmap.Source, putObjectService *putsvc.Service, isLocalPubKey func([]byte) bool) (*nodeForObjects, error) { + cnrNodes, err := newContainerNodes(containers, network) + if err != nil { + return nil, err + } + return &nodeForObjects{ + putObjectService: putObjectService, + containerNodes: cnrNodes, + isLocalPubKey: isLocalPubKey, + }, nil +} + +// ForEachContainerNodePublicKeyInLastTwoEpochs passes binary-encoded public key +// of each node match the referenced container's storage policy at two latest +// epochs into f. When f returns false, nil is returned instantly. +// +// Implements [object.Node] interface. +func (x *nodeForObjects) ForEachContainerNodePublicKeyInLastTwoEpochs(id cid.ID, f func(pubKey []byte) bool) error { + return x.containerNodes.forEachContainerNodePublicKeyInLastTwoEpochs(id, f) +} + +// IsOwnPublicKey checks whether given binary-encoded public key is assigned to +// local storage node in the network map. +// +// Implements [object.Node] interface. +func (x *nodeForObjects) IsOwnPublicKey(pubKey []byte) bool { + return x.isLocalPubKey(pubKey) +} + +// VerifyAndStoreObject checks given object's format and, if it is correct, +// saves the object in the node's local object storage. +// +// Implements [object.Node] interface. +func (x *nodeForObjects) VerifyAndStoreObject(obj objectSDK.Object) error { + return x.putObjectService.ValidateAndStoreObjectLocally(obj) +} diff --git a/cmd/neofs-node/policy.go b/cmd/neofs-node/policy.go new file mode 100644 index 0000000000..ef39b2b9e1 --- /dev/null +++ b/cmd/neofs-node/policy.go @@ -0,0 +1,83 @@ +package main + +import ( + "fmt" + + "github.com/nspcc-dev/neofs-node/pkg/core/container" + "github.com/nspcc-dev/neofs-node/pkg/core/netmap" + cid "github.com/nspcc-dev/neofs-sdk-go/container/id" + netmapsdk "github.com/nspcc-dev/neofs-sdk-go/netmap" +) + +// containerNodes wraps NeoFS network state to apply container storage policies. +type containerNodes struct { + containers container.Source + network netmap.Source +} + +func newContainerNodes(containers container.Source, network netmap.Source) (*containerNodes, error) { + return &containerNodes{ + containers: containers, + network: network, + }, nil +} + +// forEachNodePubKeyInSets passes binary-encoded public key of each node into f. +// When f returns false, forEachNodePubKeyInSets returns false instantly. +// Otherwise, true is returned. +func forEachNodePubKeyInSets(nodeSets [][]netmapsdk.NodeInfo, f func(pubKey []byte) bool) bool { + for i := range nodeSets { + for j := range nodeSets[i] { + if !f(nodeSets[i][j].PublicKey()) { + return false + } + } + } + return true +} + +// forEachContainerNodePublicKeyInLastTwoEpochs passes binary-encoded public key +// of each node match the referenced container's storage policy at two latest +// epochs into f. When f returns false, nil is returned instantly. +func (x *containerNodes) forEachContainerNodePublicKeyInLastTwoEpochs(cnrID cid.ID, f func(pubKey []byte) bool) error { + epoch, err := x.network.Epoch() + if err != nil { + return fmt.Errorf("read current NeoFS epoch: %w", err) + } + + cnr, err := x.containers.Get(cnrID) + if err != nil { + return fmt.Errorf("read container by ID: %w", err) + } + + networkMap, err := x.network.GetNetMapByEpoch(epoch) + if err != nil { + return fmt.Errorf("read network map at epoch #%d: %w", epoch, err) + } + // TODO(#2692): node sets remain unchanged for fixed container and network map, + // so recently calculated results worth caching + ns, err := networkMap.ContainerNodes(cnr.Value.PlacementPolicy(), cnrID) + if err != nil { + return fmt.Errorf("apply container's storage policy to the network map at epoch #%d: %w", epoch, err) + } + + if !forEachNodePubKeyInSets(ns, f) || epoch == 0 { + return nil + } + + epoch-- + + networkMap, err = x.network.GetNetMapByEpoch(epoch) + if err != nil { + return fmt.Errorf("read network map at epoch #%d: %w", epoch, err) + } + + ns, err = networkMap.ContainerNodes(cnr.Value.PlacementPolicy(), cnrID) + if err != nil { + return fmt.Errorf("apply container's storage policy to the network map at epoch #%d: %w", epoch, err) + } + + forEachNodePubKeyInSets(ns, f) + + return nil +} diff --git a/cmd/neofs-node/policy_test.go b/cmd/neofs-node/policy_test.go new file mode 100644 index 0000000000..93eba1bece --- /dev/null +++ b/cmd/neofs-node/policy_test.go @@ -0,0 +1,229 @@ +package main + +import ( + "crypto/rand" + "errors" + "fmt" + "testing" + + containercore "github.com/nspcc-dev/neofs-node/pkg/core/container" + "github.com/nspcc-dev/neofs-sdk-go/container" + cid "github.com/nspcc-dev/neofs-sdk-go/container/id" + cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test" + "github.com/nspcc-dev/neofs-sdk-go/netmap" + "github.com/stretchr/testify/require" +) + +type testContainer struct { + id cid.ID + val container.Container + err error +} + +func (x *testContainer) Get(id cid.ID) (*containercore.Container, error) { + if !id.Equals(x.id) { + return nil, fmt.Errorf("unexpected container requested %s!=%s", id, x.id) + } + if x.err != nil { + return nil, x.err + } + return &containercore.Container{Value: x.val}, nil +} + +type testNetwork struct { + epoch uint64 + epochErr error + + curNetmap *netmap.NetMap + curNetmapErr error + prevNetmap *netmap.NetMap + prevNetmapErr error +} + +func (x *testNetwork) GetNetMap(diff uint64) (*netmap.NetMap, error) { + panic("unexpected call") +} + +func (x *testNetwork) GetNetMapByEpoch(epoch uint64) (*netmap.NetMap, error) { + if epoch == x.epoch { + return x.curNetmap, x.curNetmapErr + } + if x.epoch > 0 && epoch == x.epoch-1 { + return x.prevNetmap, x.prevNetmapErr + } + return nil, fmt.Errorf("unexpected epoch #%d requested", epoch) +} + +func (x *testNetwork) Epoch() (uint64, error) { + return x.epoch, x.epochErr +} + +func newNetmapWithContainer(tb testing.TB, nodeNum int, selected []int) ([]netmap.NodeInfo, *netmap.NetMap, container.Container) { + nodes := make([]netmap.NodeInfo, nodeNum) +nextNode: + for i := range nodes { + key := make([]byte, 33) + _, err := rand.Read(key) + require.NoError(tb, err) + nodes[i].SetPublicKey(key) + + for j := range selected { + if i == selected[j] { + nodes[i].SetAttribute("attr", "true") + continue nextNode + } + } + + nodes[i].SetAttribute("attr", "false") + } + + var networkMap netmap.NetMap + networkMap.SetNodes(nodes) + + var policy netmap.PlacementPolicy + strPolicy := fmt.Sprintf("REP %d CBF 1 SELECT %d FROM F FILTER attr EQ true AS F", len(selected), len(selected)) + require.NoError(tb, policy.DecodeString(strPolicy)) + + nodeSets, err := networkMap.ContainerNodes(policy, cidtest.ID()) + require.NoError(tb, err) + require.Len(tb, nodeSets, 1) + require.Len(tb, nodeSets[0], len(selected)) + for i := range selected { + require.Contains(tb, nodeSets[0], nodes[selected[i]], i) + } + + var cnr container.Container + cnr.SetPlacementPolicy(policy) + + return nodes, &networkMap, cnr +} + +func TestContainerNodes_ForEachContainerNodePublicKeyInLastTwoEpochs(t *testing.T) { + const anyEpoch = 42 + anyCnr := cidtest.ID() + failOnCall := func(tb testing.TB) func([]byte) bool { + return func([]byte) bool { + tb.Fatal("must not be called") + return false + } + } + + t.Run("read current epoch", func(t *testing.T) { + epochErr := errors.New("any epoch error") + ns, err := newContainerNodes(new(testContainer), &testNetwork{epochErr: epochErr}) + require.NoError(t, err) + + err = ns.forEachContainerNodePublicKeyInLastTwoEpochs(anyCnr, failOnCall(t)) + require.ErrorIs(t, err, epochErr) + }) + + t.Run("read container failure", func(t *testing.T) { + cnrErr := errors.New("any container error") + ns, err := newContainerNodes(&testContainer{ + id: anyCnr, + err: cnrErr, + }, &testNetwork{epoch: anyEpoch}) + require.NoError(t, err) + + err = ns.forEachContainerNodePublicKeyInLastTwoEpochs(anyCnr, failOnCall(t)) + require.ErrorIs(t, err, cnrErr) + }) + + t.Run("read current netmap failure", func(t *testing.T) { + curNetmapErr := errors.New("any current netmap error") + ns, err := newContainerNodes(&testContainer{id: anyCnr}, &testNetwork{ + epoch: anyEpoch, + curNetmapErr: curNetmapErr, + }) + require.NoError(t, err) + + err = ns.forEachContainerNodePublicKeyInLastTwoEpochs(anyCnr, failOnCall(t)) + require.ErrorIs(t, err, curNetmapErr) + }) + + t.Run("zero current epoch", func(t *testing.T) { + nodes, curNetmap, cnr := newNetmapWithContainer(t, 5, []int{1, 3}) + + ns, err := newContainerNodes(&testContainer{id: anyCnr, val: cnr}, &testNetwork{ + epoch: 0, + curNetmap: curNetmap, + }) + require.NoError(t, err) + + var calledKeys [][]byte + err = ns.forEachContainerNodePublicKeyInLastTwoEpochs(anyCnr, func(pubKey []byte) bool { + calledKeys = append(calledKeys, pubKey) + return true + }) + require.NoError(t, err) + require.Len(t, calledKeys, 2) + require.Contains(t, calledKeys, nodes[1].PublicKey()) + require.Contains(t, calledKeys, nodes[3].PublicKey()) + }) + + t.Run("zero current epoch", func(t *testing.T) { + nodes, curNetmap, cnr := newNetmapWithContainer(t, 5, []int{1, 3}) + + ns, err := newContainerNodes(&testContainer{id: anyCnr, val: cnr}, &testNetwork{ + epoch: 0, + curNetmap: curNetmap, + }) + require.NoError(t, err) + + var calledKeys [][]byte + err = ns.forEachContainerNodePublicKeyInLastTwoEpochs(anyCnr, func(pubKey []byte) bool { + calledKeys = append(calledKeys, pubKey) + return true + }) + require.NoError(t, err) + require.Len(t, calledKeys, 2) + require.Contains(t, calledKeys, nodes[1].PublicKey()) + require.Contains(t, calledKeys, nodes[3].PublicKey()) + }) + + t.Run("read previous network map failure", func(t *testing.T) { + nodes, curNetmap, cnr := newNetmapWithContainer(t, 5, []int{1, 3}) + prevNetmapErr := errors.New("any previous netmap error") + + ns, err := newContainerNodes(&testContainer{id: anyCnr, val: cnr}, &testNetwork{ + epoch: anyEpoch, + curNetmap: curNetmap, + prevNetmapErr: prevNetmapErr, + }) + require.NoError(t, err) + + var calledKeys [][]byte + err = ns.forEachContainerNodePublicKeyInLastTwoEpochs(anyCnr, func(pubKey []byte) bool { + calledKeys = append(calledKeys, pubKey) + return true + }) + require.ErrorIs(t, err, prevNetmapErr) + require.Len(t, calledKeys, 2) + require.Contains(t, calledKeys, nodes[1].PublicKey()) + require.Contains(t, calledKeys, nodes[3].PublicKey()) + }) + + t.Run("both epochs OK", func(t *testing.T) { + curNodes, curNetmap, cnr := newNetmapWithContainer(t, 5, []int{1, 3}) + prevNodes, prevNetmap, _ := newNetmapWithContainer(t, 5, []int{0, 4}) + + ns, err := newContainerNodes(&testContainer{id: anyCnr, val: cnr}, &testNetwork{ + epoch: anyEpoch, + curNetmap: curNetmap, + prevNetmap: prevNetmap, + }) + require.NoError(t, err) + + var calledKeys [][]byte + err = ns.forEachContainerNodePublicKeyInLastTwoEpochs(anyCnr, func(pubKey []byte) bool { + calledKeys = append(calledKeys, pubKey) + return true + }) + require.NoError(t, err) + require.Len(t, calledKeys, 4) + require.Contains(t, calledKeys, curNodes[1].PublicKey()) + require.Contains(t, calledKeys, curNodes[3].PublicKey()) + require.Contains(t, calledKeys, prevNodes[0].PublicKey()) + require.Contains(t, calledKeys, prevNodes[4].PublicKey()) + }) +} diff --git a/pkg/core/client/client.go b/pkg/core/client/client.go index 12ab3c868b..7e42f31006 100644 --- a/pkg/core/client/client.go +++ b/pkg/core/client/client.go @@ -2,12 +2,14 @@ package client import ( "context" + "io" rawclient "github.com/nspcc-dev/neofs-api-go/v2/rpc/client" "github.com/nspcc-dev/neofs-node/pkg/network" "github.com/nspcc-dev/neofs-sdk-go/client" "github.com/nspcc-dev/neofs-sdk-go/container" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" + neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto" "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" reputationSDK "github.com/nspcc-dev/neofs-sdk-go/reputation" @@ -19,6 +21,7 @@ import ( type Client interface { ContainerAnnounceUsedSpace(ctx context.Context, announcements []container.SizeEstimation, prm client.PrmAnnounceSpace) error ObjectPutInit(ctx context.Context, header object.Object, signer user.Signer, prm client.PrmObjectPutInit) (client.ObjectWriter, error) + ReplicateObject(ctx context.Context, id oid.ID, src io.ReadSeeker, signer neofscrypto.Signer) error ObjectDelete(ctx context.Context, containerID cid.ID, objectID oid.ID, signer user.Signer, prm client.PrmObjectDelete) (oid.ID, error) ObjectGetInit(ctx context.Context, containerID cid.ID, objectID oid.ID, signer user.Signer, prm client.PrmObjectGet) (object.Object, *client.PayloadReader, error) ObjectHead(ctx context.Context, containerID cid.ID, objectID oid.ID, signer user.Signer, prm client.PrmObjectHead) (*object.Object, error) diff --git a/pkg/network/cache/multi.go b/pkg/network/cache/multi.go index 056b83b175..bee0faaf24 100644 --- a/pkg/network/cache/multi.go +++ b/pkg/network/cache/multi.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "io" "sync" "time" @@ -14,6 +15,7 @@ import ( apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" "github.com/nspcc-dev/neofs-sdk-go/container" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" + neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto" objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" reputationSDK "github.com/nspcc-dev/neofs-sdk-go/reputation" @@ -237,6 +239,24 @@ func (x *multiClient) ObjectPutInit(ctx context.Context, header objectSDK.Object return } +func (x *multiClient) ReplicateObject(ctx context.Context, id oid.ID, src io.ReadSeeker, signer neofscrypto.Signer) error { + var errSeek error + err := x.iterateClients(ctx, func(c clientcore.Client) error { + err := c.ReplicateObject(ctx, id, src, signer) + if err != nil { + _, errSeek = src.Seek(0, io.SeekStart) + if errSeek != nil { + return nil // to break the iterator + } + } + return err + }) + if err != nil { + return err + } + return errSeek +} + func (x *multiClient) ContainerAnnounceUsedSpace(ctx context.Context, announcements []container.SizeEstimation, prm client.PrmAnnounceSpace) error { return x.iterateClients(ctx, func(c clientcore.Client) error { return c.ContainerAnnounceUsedSpace(ctx, announcements, prm) diff --git a/pkg/network/transport/object/grpc/replication.go b/pkg/network/transport/object/grpc/replication.go new file mode 100644 index 0000000000..0945acf1e0 --- /dev/null +++ b/pkg/network/transport/object/grpc/replication.go @@ -0,0 +1,192 @@ +package object + +import ( + "bytes" + "context" + "errors" + "fmt" + + objectv2 "github.com/nspcc-dev/neofs-api-go/v2/object" + objectGRPC "github.com/nspcc-dev/neofs-api-go/v2/object/grpc" + refsv2 "github.com/nspcc-dev/neofs-api-go/v2/refs" + refs "github.com/nspcc-dev/neofs-api-go/v2/refs/grpc" + status "github.com/nspcc-dev/neofs-api-go/v2/status/grpc" + apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" + cid "github.com/nspcc-dev/neofs-sdk-go/container/id" + neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto" + neofsecdsa "github.com/nspcc-dev/neofs-sdk-go/crypto/ecdsa" + "github.com/nspcc-dev/neofs-sdk-go/object" +) + +// Replicate serves neo.fs.v2.object.ObjectService/Replicate RPC. +func (s *Server) Replicate(_ context.Context, req *objectGRPC.ReplicateRequest) (*objectGRPC.ReplicateResponse, error) { + if req.Object == nil { + return &objectGRPC.ReplicateResponse{Status: &status.Status{ + Code: codeInternal, Message: "binary object field is missing/empty", + }}, nil + } + + if req.Object.ObjectId == nil || len(req.Object.ObjectId.Value) == 0 { + return &objectGRPC.ReplicateResponse{Status: &status.Status{ + Code: codeInternal, Message: "ID field is missing/empty in the object field", + }}, nil + } + + if req.Signature == nil { + return &objectGRPC.ReplicateResponse{Status: &status.Status{ + Code: codeInternal, Message: "missing object signature field", + }}, nil + } + + if len(req.Signature.Key) == 0 { + return &objectGRPC.ReplicateResponse{Status: &status.Status{ + Code: codeInternal, Message: "public key field is missing/empty in the object signature field", + }}, nil + } + + if len(req.Signature.Sign) == 0 { + return &objectGRPC.ReplicateResponse{Status: &status.Status{ + Code: codeInternal, Message: "signature value is missing/empty in the object signature field", + }}, nil + } + + switch scheme := req.Signature.Scheme; scheme { + default: + return &objectGRPC.ReplicateResponse{Status: &status.Status{ + Code: codeInternal, + Message: "unsupported scheme in the object signature field", + }}, nil + case + refs.SignatureScheme_ECDSA_SHA512, + refs.SignatureScheme_ECDSA_RFC6979_SHA256, + refs.SignatureScheme_ECDSA_RFC6979_SHA256_WALLET_CONNECT: + } + + hdr := req.Object.GetHeader() + if hdr == nil { + return &objectGRPC.ReplicateResponse{Status: &status.Status{ + Code: codeInternal, + Message: "missing header field in the object field", + }}, nil + } + + gCnrMsg := hdr.GetContainerId() + if gCnrMsg == nil { + return &objectGRPC.ReplicateResponse{Status: &status.Status{ + Code: codeInternal, + Message: "missing container ID field in the object header field", + }}, nil + } + + var cnr cid.ID + var cnrMsg refsv2.ContainerID + err := cnrMsg.FromGRPCMessage(gCnrMsg) + if err == nil { + err = cnr.ReadFromV2(cnrMsg) + } + if err != nil { + return &objectGRPC.ReplicateResponse{Status: &status.Status{ + Code: codeInternal, + Message: fmt.Sprintf("invalid container ID in the object header field: %v", err), + }}, nil + } + + var pubKey neofscrypto.PublicKey + switch req.Signature.Scheme { + // other cases already checked above + case refs.SignatureScheme_ECDSA_SHA512: + pubKey = new(neofsecdsa.PublicKey) + err = pubKey.Decode(req.Signature.Key) + if err != nil { + return &objectGRPC.ReplicateResponse{Status: &status.Status{ + Code: codeInternal, + Message: "invalid ECDSA public key in the object signature field", + }}, nil + } + case refs.SignatureScheme_ECDSA_RFC6979_SHA256: + pubKey = new(neofsecdsa.PublicKeyRFC6979) + err = pubKey.Decode(req.Signature.Key) + if err != nil { + return &objectGRPC.ReplicateResponse{Status: &status.Status{ + Code: codeInternal, + Message: "invalid ECDSA public key in the object signature field", + }}, nil + } + case refs.SignatureScheme_ECDSA_RFC6979_SHA256_WALLET_CONNECT: + pubKey = new(neofsecdsa.PublicKeyWalletConnect) + err = pubKey.Decode(req.Signature.Key) + if err != nil { + return &objectGRPC.ReplicateResponse{Status: &status.Status{ + Code: codeInternal, + Message: "invalid ECDSA public key in the object signature field", + }}, nil + } + } + if !pubKey.Verify(req.Object.ObjectId.Value, req.Signature.Sign) { + return &objectGRPC.ReplicateResponse{Status: &status.Status{ + Code: codeInternal, + Message: "signature mismatch in the object signature field", + }}, nil + } + + var clientInCnr, serverInCnr bool + err = s.node.ForEachContainerNodePublicKeyInLastTwoEpochs(cnr, func(pubKey []byte) bool { + if !serverInCnr { + serverInCnr = s.node.IsOwnPublicKey(pubKey) + } + if !clientInCnr { + clientInCnr = bytes.Equal(pubKey, req.Signature.Key) + } + return !clientInCnr || !serverInCnr + }) + if err != nil { + if errors.Is(err, apistatus.ErrContainerNotFound) { + return &objectGRPC.ReplicateResponse{Status: &status.Status{ + Code: codeContainerNotFound, + Message: "failed to check server's compliance to object's storage policy: object's container not found", + }}, nil + } + + return &objectGRPC.ReplicateResponse{Status: &status.Status{ + Code: codeInternal, + Message: fmt.Sprintf("failed to apply object's storage policy: %v", err), + }}, nil + } else if !serverInCnr { + return &objectGRPC.ReplicateResponse{Status: &status.Status{ + Code: codeAccessDenied, Message: "server does not match the object's storage policy", + }}, nil + } else if !clientInCnr { + return &objectGRPC.ReplicateResponse{Status: &status.Status{ + Code: codeAccessDenied, Message: "client does not match the object's storage policy", + }}, nil + } + + // TODO(@cthulhu-rider): avoid decoding the object completely + obj, err := objectFromMessage(req.Object) + if err != nil { + return &objectGRPC.ReplicateResponse{Status: &status.Status{ + Code: codeInternal, + Message: fmt.Sprintf("invalid object field: %v", err), + }}, nil + } + + err = s.node.VerifyAndStoreObject(*obj) + if err != nil { + return &objectGRPC.ReplicateResponse{Status: &status.Status{ + Code: codeInternal, + Message: fmt.Sprintf("failed to verify and store object locally: %v", err), + }}, nil + } + + return new(objectGRPC.ReplicateResponse), nil +} + +func objectFromMessage(gMsg *objectGRPC.Object) (*object.Object, error) { + var msg objectv2.Object + err := msg.FromGRPCMessage(gMsg) + if err != nil { + return nil, err + } + + return object.NewFromV2(&msg), nil +} diff --git a/pkg/network/transport/object/grpc/replication_test.go b/pkg/network/transport/object/grpc/replication_test.go new file mode 100644 index 0000000000..2045f45160 --- /dev/null +++ b/pkg/network/transport/object/grpc/replication_test.go @@ -0,0 +1,437 @@ +package object_test + +import ( + "bytes" + "context" + "crypto/ecdsa" + "crypto/elliptic" + "crypto/rand" + "errors" + "testing" + + objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object" + objectgrpc "github.com/nspcc-dev/neofs-api-go/v2/object/grpc" + refs "github.com/nspcc-dev/neofs-api-go/v2/refs/grpc" + . "github.com/nspcc-dev/neofs-node/pkg/network/transport/object/grpc" + objectSvc "github.com/nspcc-dev/neofs-node/pkg/services/object" + cid "github.com/nspcc-dev/neofs-sdk-go/container/id" + cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test" + neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto" + neofsecdsa "github.com/nspcc-dev/neofs-sdk-go/crypto/ecdsa" + "github.com/nspcc-dev/neofs-sdk-go/crypto/test" + "github.com/nspcc-dev/neofs-sdk-go/object" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test" + objecttest "github.com/nspcc-dev/neofs-sdk-go/object/test" + "github.com/stretchr/testify/require" +) + +func randECDSAPrivateKey(tb testing.TB) *ecdsa.PrivateKey { + k, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader) + require.NoError(tb, err) + return k +} + +type noCallObjectService struct{} + +func (x noCallObjectService) Get(*objectV2.GetRequest, objectSvc.GetObjectStream) error { + panic("must not be called") +} + +func (x noCallObjectService) Put(context.Context) (objectSvc.PutObjectStream, error) { + panic("must not be called") +} + +func (x noCallObjectService) Head(context.Context, *objectV2.HeadRequest) (*objectV2.HeadResponse, error) { + panic("must not be called") +} + +func (x noCallObjectService) Search(*objectV2.SearchRequest, objectSvc.SearchStream) error { + panic("must not be called") +} + +func (x noCallObjectService) Delete(context.Context, *objectV2.DeleteRequest) (*objectV2.DeleteResponse, error) { + panic("must not be called") +} + +func (x noCallObjectService) GetRange(*objectV2.GetRangeRequest, objectSvc.GetObjectRangeStream) error { + panic("must not be called") +} + +func (x noCallObjectService) GetRangeHash(context.Context, *objectV2.GetRangeHashRequest) (*objectV2.GetRangeHashResponse, error) { + panic("must not be called") +} + +type noCallTestNode struct{} + +func (x *noCallTestNode) ForEachContainerNodePublicKeyInLastTwoEpochs(cid.ID, func([]byte) bool) error { + panic("must not be called") +} + +func (x *noCallTestNode) IsOwnPublicKey([]byte) bool { + panic("must not be called") +} + +func (x *noCallTestNode) VerifyAndStoreObject(object.Object) error { + panic("must not be called") +} + +type testNode struct { + tb testing.TB + + // server state + serverPubKey []byte + + // request data + clientPubKey []byte + cnr cid.ID + obj *objectgrpc.Object + + // return + cnrErr error + clientOutsideCnr bool + serverOutsideCnr bool + + storeErr error +} + +func newTestNode(tb testing.TB, serverPubKey, clientPubKey []byte, cnr cid.ID, obj *objectgrpc.Object) *testNode { + return &testNode{ + tb: tb, + serverPubKey: serverPubKey, + clientPubKey: clientPubKey, + cnr: cnr, + obj: obj, + } +} + +func (x *testNode) ForEachContainerNodePublicKeyInLastTwoEpochs(cnr cid.ID, f func(pubKey []byte) bool) error { + require.True(x.tb, cnr.Equals(x.cnr)) + require.NotNil(x.tb, f) + if x.cnrErr != nil { + return x.cnrErr + } + if !x.clientOutsideCnr && !f(x.clientPubKey) { + return nil + } + if !x.serverOutsideCnr && !f(x.serverPubKey) { + return nil + } + return nil +} + +func (x *testNode) IsOwnPublicKey(pubKey []byte) bool { return bytes.Equal(x.serverPubKey, pubKey) } + +func (x *testNode) VerifyAndStoreObject(obj object.Object) error { + require.Equal(x.tb, x.obj, obj.ToV2().ToGRPCMessage().(*objectgrpc.Object)) + return x.storeErr +} + +func anyValidRequest(tb testing.TB, signer neofscrypto.Signer, cnr cid.ID, objID oid.ID) *objectgrpc.ReplicateRequest { + obj := objecttest.Object(tb) + obj.SetContainerID(cnr) + obj.SetID(objID) + + sig, err := signer.Sign(objID[:]) + require.NoError(tb, err) + + req := &objectgrpc.ReplicateRequest{ + Object: obj.ToV2().ToGRPCMessage().(*objectgrpc.Object), + Signature: &refs.Signature{ + Key: neofscrypto.PublicKeyBytes(signer.Public()), + Sign: sig, + }, + } + + switch signer.Scheme() { + default: + tb.Fatalf("unsupported scheme %v", signer.Scheme()) + case neofscrypto.ECDSA_SHA512: + req.Signature.Scheme = refs.SignatureScheme_ECDSA_SHA512 + case neofscrypto.ECDSA_DETERMINISTIC_SHA256: + req.Signature.Scheme = refs.SignatureScheme_ECDSA_RFC6979_SHA256 + case neofscrypto.ECDSA_WALLETCONNECT: + req.Signature.Scheme = refs.SignatureScheme_ECDSA_RFC6979_SHA256_WALLET_CONNECT + } + + return req +} + +func TestServer_Replicate(t *testing.T) { + var noCallNode noCallTestNode + var noCallObjSvc noCallObjectService + noCallSrv := New(noCallObjSvc, &noCallNode) + clientSigner := test.RandomSigner(t) + clientPubKey := neofscrypto.PublicKeyBytes(clientSigner.Public()) + serverPubKey := neofscrypto.PublicKeyBytes(test.RandomSigner(t).Public()) + cnr := cidtest.ID() + objID := oidtest.ID() + req := anyValidRequest(t, clientSigner, cnr, objID) + + t.Run("invalid/unsupported signature format", func(t *testing.T) { + // note: verification is tested separately + for _, tc := range []struct { + name string + fSig func() *refs.Signature + expectedCode uint32 + expectedMsg string + }{ + { + name: "missing object signature field", + fSig: func() *refs.Signature { return nil }, + expectedCode: 1024, + expectedMsg: "missing object signature field", + }, + { + name: "missing public key field in the signature field", + fSig: func() *refs.Signature { + return &refs.Signature{ + Key: nil, + Sign: []byte("any non-empty"), + Scheme: refs.SignatureScheme_ECDSA_SHA512, // any supported + } + }, + expectedCode: 1024, + expectedMsg: "public key field is missing/empty in the object signature field", + }, + { + name: "missing value field in the signature field", + fSig: func() *refs.Signature { + return &refs.Signature{ + Key: []byte("any non-empty"), + Sign: []byte{}, + Scheme: refs.SignatureScheme_ECDSA_SHA512, // any supported + } + }, + expectedCode: 1024, + expectedMsg: "signature value is missing/empty in the object signature field", + }, + { + name: "unsupported scheme in the signature field", + fSig: func() *refs.Signature { + return &refs.Signature{ + Key: []byte("any non-empty"), + Sign: []byte("any non-empty"), + Scheme: 3, + } + }, + expectedCode: 1024, + expectedMsg: "unsupported scheme in the object signature field", + }, + } { + req := anyValidRequest(t, test.RandomSigner(t), cidtest.ID(), oidtest.ID()) + req.Signature = tc.fSig() + resp, err := noCallSrv.Replicate(context.Background(), req) + require.NoError(t, err, tc.name) + require.EqualValues(t, tc.expectedCode, resp.GetStatus().GetCode(), tc.name) + require.Equal(t, tc.expectedMsg, resp.GetStatus().GetMessage(), tc.name) + } + }) + + t.Run("signature verification failure", func(t *testing.T) { + // note: common format is tested separately + for _, tc := range []struct { + name string + fSig func(bObj []byte) *refs.Signature + expectedCode uint32 + expectedMsg string + }{ + { + name: "ECDSA SHA-512: invalid public key", + fSig: func(_ []byte) *refs.Signature { + return &refs.Signature{ + Key: []byte("not ECDSA key"), + Sign: []byte("any non-empty"), + Scheme: refs.SignatureScheme_ECDSA_SHA512, + } + }, + expectedCode: 1024, + expectedMsg: "invalid ECDSA public key in the object signature field", + }, + { + name: "ECDSA SHA-512: signature mismatch", + fSig: func(bObj []byte) *refs.Signature { + return &refs.Signature{ + Key: neofscrypto.PublicKeyBytes((*neofsecdsa.Signer)(randECDSAPrivateKey(t)).Public()), + Sign: []byte("definitely invalid"), + Scheme: refs.SignatureScheme_ECDSA_SHA512, + } + }, + expectedCode: 1024, + expectedMsg: "signature mismatch in the object signature field", + }, + { + name: "ECDSA SHA-256 deterministic: invalid public key", + fSig: func(_ []byte) *refs.Signature { + return &refs.Signature{ + Key: []byte("not ECDSA key"), + Sign: []byte("any non-empty"), + Scheme: refs.SignatureScheme_ECDSA_RFC6979_SHA256, + } + }, + expectedCode: 1024, + expectedMsg: "invalid ECDSA public key in the object signature field", + }, + { + name: "ECDSA SHA-256 deterministic: signature mismatch", + fSig: func(bObj []byte) *refs.Signature { + return &refs.Signature{ + Key: neofscrypto.PublicKeyBytes((*neofsecdsa.SignerRFC6979)(randECDSAPrivateKey(t)).Public()), + Sign: []byte("definitely invalid"), + Scheme: refs.SignatureScheme_ECDSA_RFC6979_SHA256, + } + }, + expectedCode: 1024, + expectedMsg: "signature mismatch in the object signature field", + }, + { + name: "ECDSA SHA-256 WalletConnect: invalid public key", + fSig: func(_ []byte) *refs.Signature { + return &refs.Signature{ + Key: []byte("not ECDSA key"), + Sign: []byte("any non-empty"), + Scheme: refs.SignatureScheme_ECDSA_RFC6979_SHA256_WALLET_CONNECT, + } + }, + expectedCode: 1024, + expectedMsg: "invalid ECDSA public key in the object signature field", + }, + { + name: "ECDSA SHA-256 WalletConnect: signature mismatch", + fSig: func(bObj []byte) *refs.Signature { + return &refs.Signature{ + Key: neofscrypto.PublicKeyBytes((*neofsecdsa.SignerWalletConnect)(randECDSAPrivateKey(t)).Public()), + Sign: []byte("definitely invalid"), + Scheme: refs.SignatureScheme_ECDSA_RFC6979_SHA256_WALLET_CONNECT, + } + }, + expectedCode: 1024, + expectedMsg: "signature mismatch in the object signature field", + }, + } { + obj := objecttest.Object(t) + bObj, err := obj.Marshal() + require.NoError(t, err) + + resp, err := noCallSrv.Replicate(context.Background(), &objectgrpc.ReplicateRequest{ + Object: obj.ToV2().ToGRPCMessage().(*objectgrpc.Object), + Signature: tc.fSig(bObj), + }) + require.NoError(t, err, tc.name) + require.EqualValues(t, tc.expectedCode, resp.GetStatus().GetCode(), tc.name) + require.Equal(t, tc.expectedMsg, resp.GetStatus().GetMessage(), tc.name) + } + }) + + t.Run("apply storage policy failure", func(t *testing.T) { + node := newTestNode(t, serverPubKey, clientPubKey, cnr, req.Object) + srv := New(noCallObjSvc, node) + + node.cnrErr = errors.New("any error") + + resp, err := srv.Replicate(context.Background(), req) + require.NoError(t, err) + require.EqualValues(t, 1024, resp.GetStatus().GetCode()) + require.Equal(t, "failed to apply object's storage policy: any error", resp.GetStatus().GetMessage()) + }) + + t.Run("client or server mismatches object's storage policy", func(t *testing.T) { + node := newTestNode(t, serverPubKey, clientPubKey, cnr, req.Object) + srv := New(noCallObjSvc, node) + + node.serverOutsideCnr = true + node.clientOutsideCnr = true + + resp, err := srv.Replicate(context.Background(), req) + require.NoError(t, err) + require.EqualValues(t, 2048, resp.GetStatus().GetCode()) + require.Equal(t, "server does not match the object's storage policy", resp.GetStatus().GetMessage()) + + node.serverOutsideCnr = false + + resp, err = srv.Replicate(context.Background(), req) + require.NoError(t, err) + require.EqualValues(t, 2048, resp.GetStatus().GetCode()) + require.Equal(t, "client does not match the object's storage policy", resp.GetStatus().GetMessage()) + }) + + t.Run("local storage failure", func(t *testing.T) { + node := newTestNode(t, serverPubKey, clientPubKey, cnr, req.Object) + srv := New(noCallObjSvc, node) + + node.storeErr = errors.New("any error") + + resp, err := srv.Replicate(context.Background(), req) + require.NoError(t, err) + require.EqualValues(t, 1024, resp.GetStatus().GetCode()) + require.Equal(t, "failed to verify and store object locally: any error", resp.GetStatus().GetMessage()) + }) + + t.Run("OK", func(t *testing.T) { + node := newTestNode(t, serverPubKey, clientPubKey, cnr, req.Object) + srv := New(noCallObjSvc, node) + + resp, err := srv.Replicate(context.Background(), req) + require.NoError(t, err) + require.EqualValues(t, 0, resp.GetStatus().GetCode()) + require.Empty(t, resp.GetStatus().GetMessage()) + }) +} + +type nopNode struct{} + +func (x nopNode) ForEachContainerNodePublicKeyInLastTwoEpochs(cid.ID, func(pubKey []byte) bool) error { + return nil +} + +func (x nopNode) IsOwnPublicKey([]byte) bool { + return false +} + +func (x nopNode) VerifyAndStoreObject(object.Object) error { + return nil +} + +func BenchmarkServer_Replicate(b *testing.B) { + ctx := context.Background() + var node nopNode + + srv := New(nil, node) + + for _, tc := range []struct { + name string + newSigner func(tb testing.TB) neofscrypto.Signer + }{ + { + name: "ECDSA SHA-512", + newSigner: func(tb testing.TB) neofscrypto.Signer { + return (*neofsecdsa.Signer)(randECDSAPrivateKey(tb)) + }, + }, + { + name: "ECDSA SHA-256 deterministic", + newSigner: func(tb testing.TB) neofscrypto.Signer { + return (*neofsecdsa.SignerRFC6979)(randECDSAPrivateKey(tb)) + }, + }, + { + name: "ECDSA SHA-256 WalletConnect", + newSigner: func(tb testing.TB) neofscrypto.Signer { + return (*neofsecdsa.SignerWalletConnect)(randECDSAPrivateKey(tb)) + }, + }, + } { + b.Run(tc.name, func(b *testing.B) { + req := anyValidRequest(b, tc.newSigner(b), cidtest.ID(), oidtest.ID()) + + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + resp, err := srv.Replicate(ctx, req) + require.NoError(b, err) + require.Zero(b, resp.GetStatus().GetCode()) + } + }) + } +} diff --git a/pkg/network/transport/object/grpc/service.go b/pkg/network/transport/object/grpc/service.go index 29765f6126..511d5413fa 100644 --- a/pkg/network/transport/object/grpc/service.go +++ b/pkg/network/transport/object/grpc/service.go @@ -7,21 +7,54 @@ import ( "github.com/nspcc-dev/neofs-api-go/v2/object" objectGRPC "github.com/nspcc-dev/neofs-api-go/v2/object/grpc" + status "github.com/nspcc-dev/neofs-api-go/v2/status/grpc" objectSvc "github.com/nspcc-dev/neofs-node/pkg/services/object" "github.com/nspcc-dev/neofs-node/pkg/services/util" + cid "github.com/nspcc-dev/neofs-sdk-go/container/id" + objectsdk "github.com/nspcc-dev/neofs-sdk-go/object" ) +// Various NeoFS protocol status codes. +const ( + codeInternal = uint32(1024*status.Section_SECTION_FAILURE_COMMON) + uint32(status.CommonFail_INTERNAL) + codeAccessDenied = uint32(1024*status.Section_SECTION_OBJECT) + uint32(status.Object_ACCESS_DENIED) + codeContainerNotFound = uint32(1024*status.Section_SECTION_CONTAINER) + uint32(status.Container_CONTAINER_NOT_FOUND) +) + +// Node represents NeoFS storage node that is served by [Server]. +type Node interface { + // ForEachContainerNodePublicKeyInLastTwoEpochs iterates over all nodes matching + // the referenced container's storage policy at the current and the previous + // NeoFS epochs, and passes their public keys into f. IterateContainerNodeKeys + // breaks without an error when f returns false. Keys may be repeated. + // + // Returns [apistatus.ErrContainerNotFound] if referenced container was not + // found. + ForEachContainerNodePublicKeyInLastTwoEpochs(cid.ID, func(pubKey []byte) bool) error + + // IsOwnPublicKey checks whether given pubKey assigned to Node in the NeoFS + // network map. + IsOwnPublicKey(pubKey []byte) bool + + // VerifyAndStoreObject checks whether given object has correct format and, if + // so, saves it into local object storage of the Node. StoreObject is called + // only when the Node complies with the container's storage policy. + VerifyAndStoreObject(objectsdk.Object) error +} + // Server wraps NeoFS API Object service and // provides gRPC Object service server interface. type Server struct { - objectGRPC.UnimplementedObjectServiceServer srv objectSvc.ServiceServer + + node Node } // New creates, initializes and returns Server instance. -func New(c objectSvc.ServiceServer) *Server { +func New(c objectSvc.ServiceServer, node Node) *Server { return &Server{ - srv: c, + srv: c, + node: node, } } diff --git a/pkg/services/object/put/local.go b/pkg/services/object/put/local.go index 77a0d614c9..db8ea03aff 100644 --- a/pkg/services/object/put/local.go +++ b/pkg/services/object/put/local.go @@ -1,11 +1,16 @@ package putsvc import ( + "bytes" + "crypto/sha256" + "errors" "fmt" objectCore "github.com/nspcc-dev/neofs-node/pkg/core/object" + "github.com/nspcc-dev/neofs-sdk-go/checksum" "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + "github.com/nspcc-dev/tzhash/tz" ) // ObjectStorage is an object storage interface. @@ -38,26 +43,118 @@ func (t *localTarget) WriteObject(obj *object.Object, meta objectCore.ContentMet } func (t *localTarget) Close() (oid.ID, error) { - switch t.meta.Type() { + err := putObjectLocally(t.storage, t.obj, t.meta) + if err != nil { + return oid.ID{}, err + } + + id, _ := t.obj.ID() + + return id, nil +} + +func putObjectLocally(storage ObjectStorage, obj *object.Object, meta objectCore.ContentMeta) error { + switch meta.Type() { case object.TypeTombstone: - err := t.storage.Delete(objectCore.AddressOf(t.obj), t.meta.Objects()) + err := storage.Delete(objectCore.AddressOf(obj), meta.Objects()) if err != nil { - return oid.ID{}, fmt.Errorf("could not delete objects from tombstone locally: %w", err) + return fmt.Errorf("could not delete objects from tombstone locally: %w", err) } case object.TypeLock: - err := t.storage.Lock(objectCore.AddressOf(t.obj), t.meta.Objects()) + err := storage.Lock(objectCore.AddressOf(obj), meta.Objects()) if err != nil { - return oid.ID{}, fmt.Errorf("could not lock object from lock objects locally: %w", err) + return fmt.Errorf("could not lock object from lock objects locally: %w", err) } default: // objects that do not change meta storage } - if err := t.storage.Put(t.obj); err != nil { - return oid.ID{}, fmt.Errorf("(%T) could not put object to local storage: %w", t, err) + if err := storage.Put(obj); err != nil { + return fmt.Errorf("could not put object to local storage: %w", err) } - id, _ := t.obj.ID() + return nil +} - return id, nil +// ValidateAndStoreObjectLocally checks format of given object and, if it's +// correct, stores it in the underlying local object storage. Serves operation +// similar to local-only [Service.Put] one. +func (p *Service) ValidateAndStoreObjectLocally(obj object.Object) error { + cnrID, ok := obj.ContainerID() + if !ok { + return errors.New("missing container ID") + } + + cs, csSet := obj.PayloadChecksum() + if !csSet { + return errors.New("missing payload checksum") + } + + csType := cs.Type() + switch csType { + default: + return errors.New("unsupported payload checksum type") + case + checksum.SHA256, + checksum.TZ: + } + + maxPayloadSz := p.maxSizeSrc.MaxObjectSize() + if maxPayloadSz == 0 { + return errors.New("failed to obtain max payload size setting") + } + + payload := obj.Payload() + payloadSz := obj.PayloadSize() + if payloadSz != uint64(len(payload)) { + return ErrWrongPayloadSize + } + + if payloadSz > maxPayloadSz { + return ErrExceedingMaxSize + } + + cnr, err := p.cnrSrc.Get(cnrID) + if err != nil { + return fmt.Errorf("read container by ID: %w", err) + } + + if !cnr.Value.IsHomomorphicHashingDisabled() { + csHomo, csHomoSet := obj.PayloadHomomorphicHash() + switch { + case !csHomoSet: + return errors.New("missing homomorphic payload checksum") + case csHomo.Type() != checksum.TZ: + return fmt.Errorf("wrong/unsupported type of homomorphic payload checksum, expected %s", checksum.TZ) + case len(csHomo.Value()) != tz.Size: + return fmt.Errorf("invalid/unsupported length of %s homomorphic payload checksum, expected %d", + csHomo.Type(), tz.Size) + } + } + + if err := p.fmtValidator.Validate(&obj, false); err != nil { + return fmt.Errorf("validate object format: %w", err) + } + + objMeta, err := p.fmtValidator.ValidateContent(&obj) + if err != nil { + return fmt.Errorf("validate payload content: %w", err) + } + + switch csType { + default: + return errors.New("unsupported payload checksum type") + case checksum.SHA256: + h := sha256.Sum256(payload) + if !bytes.Equal(h[:], cs.Value()) { + return errors.New("payload SHA-256 checksum mismatch") + } + case checksum.TZ: + h := tz.Sum(payload) + if !bytes.Equal(h[:], cs.Value()) { + return errors.New("payload Tillich-Zemor checksum mismatch") + } + } + + return putObjectLocally(p.localStore, &obj, objMeta) } diff --git a/pkg/services/object/put/remote.go b/pkg/services/object/put/remote.go index 75d64ed09c..acbc95e000 100644 --- a/pkg/services/object/put/remote.go +++ b/pkg/services/object/put/remote.go @@ -3,12 +3,14 @@ package putsvc import ( "context" "fmt" + "io" clientcore "github.com/nspcc-dev/neofs-node/pkg/core/client" netmapCore "github.com/nspcc-dev/neofs-node/pkg/core/netmap" objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object" internalclient "github.com/nspcc-dev/neofs-node/pkg/services/object/internal/client" "github.com/nspcc-dev/neofs-node/pkg/services/object/util" + neofsecdsa "github.com/nspcc-dev/neofs-sdk-go/crypto/ecdsa" "github.com/nspcc-dev/neofs-sdk-go/netmap" "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" @@ -134,3 +136,32 @@ func (s *RemoteSender) PutObject(ctx context.Context, p *RemotePutPrm) error { return nil } + +// ReplicateObjectToNode copies binary-encoded NeoFS object from the given +// [io.ReadSeeker] into local storage of the node described by specified +// [netmap.NodeInfo]. +func (s *RemoteSender) ReplicateObjectToNode(ctx context.Context, id oid.ID, src io.ReadSeeker, nodeInfo netmap.NodeInfo) error { + var nodeInfoForCons clientcore.NodeInfo + + err := clientcore.NodeInfoFromRawNetmapElement(&nodeInfoForCons, netmapCore.Node(nodeInfo)) + if err != nil { + return fmt.Errorf("parse remote node info: %w", err) + } + + key, err := s.keyStorage.GetKey(nil) + if err != nil { + return fmt.Errorf("fetch local node's private key: %w", err) + } + + c, err := s.clientConstructor.Get(nodeInfoForCons) + if err != nil { + return fmt.Errorf("init NeoFS API client of the remote node: %w", err) + } + + err = c.ReplicateObject(ctx, id, src, (*neofsecdsa.Signer)(key)) + if err != nil { + return fmt.Errorf("copy object using NeoFS API client of the remote node: %w", err) + } + + return nil +} diff --git a/pkg/services/replicator/process.go b/pkg/services/replicator/process.go index 96606070dd..661ba5c4c8 100644 --- a/pkg/services/replicator/process.go +++ b/pkg/services/replicator/process.go @@ -1,10 +1,12 @@ package replicator import ( + "bytes" "context" + "io" - "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" putsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/put" + "github.com/nspcc-dev/neofs-sdk-go/client" "github.com/nspcc-dev/neofs-sdk-go/netmap" "go.uber.org/zap" ) @@ -25,9 +27,12 @@ func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult) ) }() - if task.obj == nil { - var err error - task.obj, err = engine.Get(p.localStorage, task.addr) + var err error + var prm *putsvc.RemotePutPrm + var stream io.ReadSeeker + binReplication := task.obj == nil + if binReplication { + b, err := p.localStorage.GetBytes(task.addr) if err != nil { p.log.Error("could not get object from local storage", zap.Stringer("object", task.addr), @@ -35,11 +40,14 @@ func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult) return } + stream = bytes.NewReader(b) + if len(task.nodes) > 1 { + stream = client.DemuxReplicatedObject(stream) + } + } else { + prm = new(putsvc.RemotePutPrm).WithObject(task.obj) } - prm := new(putsvc.RemotePutPrm). - WithObject(task.obj) - for i := 0; task.quantity > 0 && i < len(task.nodes); i++ { select { case <-ctx.Done(): @@ -54,7 +62,13 @@ func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult) callCtx, cancel := context.WithTimeout(ctx, p.putTimeout) - err := p.remoteSender.PutObject(callCtx, prm.WithNodeInfo(task.nodes[i])) + if binReplication { + err = p.remoteSender.ReplicateObjectToNode(callCtx, task.addr.Object(), stream, task.nodes[i]) + // note that we don't need to reset stream because it is used exactly once + // according to the client.DemuxReplicatedObject above + } else { + err = p.remoteSender.PutObject(callCtx, prm.WithNodeInfo(task.nodes[i])) + } cancel()