Skip to content

Commit

Permalink
object/put: Send ready objects to remote nodes via new replication API
Browse files Browse the repository at this point in the history
Previously, storage nodes always sent "ready" objects after client data
slicing via NeoFS API `ObjectService.Put` RPC. Recently, NeoFS protocol
was extended with the `ObjectService.Replicate` RPC, which allows an
object to be replicated from one container node to another. The new RPC
is more efficient and is now used by the Policer+Replicator tandem to
replicate objects on shortage. When the `ObjectService.Put` server makes
the initial object save, it does essentially the same thing.

In total, when the `Put` server is itself a storage node of the
container, it can use the `Replicate` RPC for better performance. An additional
advantage is the one-time encoding of the protocol message, which is
reused for sending to different nodes. This reduces the memory cost of
processing each data stream.

Refs #2317.

Signed-off-by: Leonard Lyubich <[email protected]>
  • Loading branch information
cthulhu-rider committed Jan 30, 2024
1 parent f88af45 commit 152b8a6
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 19 deletions.
93 changes: 85 additions & 8 deletions pkg/services/object/put/distributed.go
Original file line number Diff line number Diff line change
@@ -1,25 +1,38 @@
package putsvc

import (
"bytes"
"context"
"fmt"
"io"
"sync"
"sync/atomic"

"github.com/nspcc-dev/neofs-node/pkg/core/object"
svcutil "github.com/nspcc-dev/neofs-node/pkg/services/object/util"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
"github.com/nspcc-dev/neofs-node/pkg/util"
"github.com/nspcc-dev/neofs-sdk-go/client"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
"github.com/nspcc-dev/neofs-sdk-go/netmap"
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"go.uber.org/zap"
)

// preparedObjectTarget is a destination for a fully prepared ("ready")
// object: the object is written together with its verified content meta,
// and Close finalizes the transfer returning the stored object's ID.
//
// NOTE(review): the scraped diff showed both the pre- and post-commit
// WriteObject signature on adjacent lines; only the post-commit,
// context-aware signature is kept here.
type preparedObjectTarget interface {
	WriteObject(context.Context, *objectSDK.Object, object.ContentMeta) error
	Close() (oid.ID, error)
}

// binReplicationContext wraps a context.Context to mark that the object
// being distributed has already been encoded once into a binary message
// (see client.DemuxReplicatedObject usage in iteratePlacement); b carries
// that reusable encoding so each remote send can reuse it instead of
// re-marshaling the object.
type binReplicationContext struct {
	context.Context
	// b is the shared, seekable view of the pre-encoded replication payload.
	b io.ReadSeeker
}

type distributedTarget struct {
ctx context.Context

traversal traversal

remotePool, localPool util.WorkerPool
Expand All @@ -42,6 +55,8 @@ type distributedTarget struct {

// parameters and state of container traversal.
type traversal struct {
placementBuilder placement.Builder

opts []placement.Option

// need of additional broadcast after the object is saved
Expand Down Expand Up @@ -145,34 +160,58 @@ func (t *distributedTarget) Close() (oid.ID, error) {
t.traversal.extraBroadcastEnabled = true
}

return t.iteratePlacement(t.sendObject)
return t.iteratePlacement(t.ctx, t.sendObject)
}

func (t *distributedTarget) sendObject(node nodeDesc) error {
func (t *distributedTarget) sendObject(ctx context.Context, node nodeDesc) error {
if !node.local && t.relay != nil {
return t.relay(node)
}

target := t.nodeTargetInitializer(node)

if err := target.WriteObject(t.obj, t.objMeta); err != nil {
if err := target.WriteObject(ctx, t.obj, t.objMeta); err != nil {
return fmt.Errorf("could not write header: %w", err)
} else if _, err := target.Close(); err != nil {
return fmt.Errorf("could not close object stream: %w", err)
}
return nil
}

func (t *distributedTarget) iteratePlacement(f func(nodeDesc) error) (oid.ID, error) {
func (t *distributedTarget) iteratePlacement(ctx context.Context, f func(context.Context, nodeDesc) error) (oid.ID, error) {
id, _ := t.obj.ID()

placementBuilder := t.traversal.placementBuilder
var placementIntrcpt *placementInterceptor

if _, ok := ctx.(*binReplicationContext); !ok {
placementIntrcpt = &placementInterceptor{
base: t.traversal.placementBuilder,
isLocalKey: t.isLocalKey,
}

placementBuilder = placementIntrcpt
}

traverser, err := placement.NewTraverser(
append(t.traversal.opts, placement.ForObject(id))...,
append(t.traversal.opts, placement.UseBuilder(placementBuilder), placement.ForObject(id))...,
)
if err != nil {
return oid.ID{}, fmt.Errorf("(%T) could not create object placement traverser: %w", t, err)
}

if placementIntrcpt != nil && placementIntrcpt.withLocalAndRemoteNodes {
bObj, err := t.obj.Marshal()
if err != nil {
return oid.ID{}, fmt.Errorf("encode object into binary: %w", err)
}

ctx = &binReplicationContext{
Context: ctx,
b: client.DemuxReplicatedObject(bytes.NewReader(bObj)),
}
}

var resErr atomic.Value

loop:
Expand Down Expand Up @@ -207,7 +246,7 @@ loop:
if err := workerPool.Submit(func() {
defer wg.Done()

err := f(nodeDesc{local: isLocal, info: addr})
err := f(ctx, nodeDesc{local: isLocal, info: addr})

// mark the container node as processed in order to exclude it
// in subsequent container broadcast. Note that we don't
Expand Down Expand Up @@ -244,7 +283,7 @@ loop:

// perform additional container broadcast if needed
if t.traversal.submitPrimaryPlacementFinish() {
_, err = t.iteratePlacement(f)
_, err = t.iteratePlacement(ctx, f)
if err != nil {
t.log.Error("additional container broadcast failure",
zap.Error(err),
Expand All @@ -258,3 +297,41 @@ loop:

return id, nil
}

// placementInterceptor is a placement.Builder decorator: it forwards
// placement construction to the base builder and, as a side effect,
// records whether the built placement contains both the local node and
// at least one remote node.
type placementInterceptor struct {
	// base is the underlying builder whose result is inspected.
	base placement.Builder

	// isLocalKey reports whether the given node public key belongs to the
	// local node.
	isLocalKey func([]byte) bool

	// withLocalAndRemoteNodes is set by BuildPlacement once both a local
	// and a remote node have been seen in the placement result.
	withLocalAndRemoteNodes bool
}

// BuildPlacement implements placement.Builder. It delegates to the base
// builder and scans the resulting node sets, raising the
// withLocalAndRemoteNodes flag as soon as both the local node and some
// remote node are found; the scan stops early at that point.
func (x *placementInterceptor) BuildPlacement(cnr cid.ID, obj *oid.ID, storagepolicy netmap.PlacementPolicy) ([][]netmap.NodeInfo, error) {
	nodeSets, err := x.base.BuildPlacement(cnr, obj, storagepolicy)
	if err != nil {
		return nil, err
	}

	var sawLocal, sawRemote bool

	for i := range nodeSets {
		for j := range nodeSets[i] {
			if x.isLocalKey(nodeSets[i][j].PublicKey()) {
				sawLocal = true
			} else {
				sawRemote = true
			}

			// no point in scanning further once both kinds are present
			if sawLocal && sawRemote {
				x.withLocalAndRemoteNodes = true
				return nodeSets, nil
			}
		}
	}

	return nodeSets, nil
}
3 changes: 2 additions & 1 deletion pkg/services/object/put/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package putsvc

import (
"bytes"
"context"
"crypto/sha256"
"errors"
"fmt"
Expand Down Expand Up @@ -36,7 +37,7 @@ type localTarget struct {
meta objectCore.ContentMeta
}

func (t *localTarget) WriteObject(obj *object.Object, meta objectCore.ContentMeta) error {
func (t *localTarget) WriteObject(_ context.Context, obj *object.Object, meta objectCore.ContentMeta) error {
t.obj = obj
t.meta = meta

Expand Down
3 changes: 2 additions & 1 deletion pkg/services/object/put/prm.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ type PutInitPrm struct {

cnr containerSDK.Container

traverseOpts []placement.Option
placementBuilder placement.Builder
traverseOpts []placement.Option

copiesNumber uint32

Expand Down
45 changes: 38 additions & 7 deletions pkg/services/object/put/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,17 @@ import (
"github.com/nspcc-dev/neofs-sdk-go/netmap"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

type remoteTarget struct {
log *zap.Logger

// if *binReplicationContext, server attempts to send objects via NeoFS API
// ObjectService.Replicate RPC. If particular remote node does not support
// Replicate op, the object is sent via default Put to the node.
ctx context.Context

keyStorage *util.KeyStorage
Expand Down Expand Up @@ -45,19 +53,25 @@ type RemotePutPrm struct {
obj *object.Object
}

func (t *remoteTarget) WriteObject(obj *object.Object, _ objectcore.ContentMeta) error {
func (t *remoteTarget) WriteObject(ctx context.Context, obj *object.Object, _ objectcore.ContentMeta) error {
t.ctx = ctx
t.obj = obj

return nil
}

func (t *remoteTarget) Close() (oid.ID, error) {
binReplicationCtx, isBinReplication := t.ctx.(*binReplicationContext)
var sessionInfo *util.SessionInfo

if tok := t.commonPrm.SessionToken(); tok != nil {
sessionInfo = &util.SessionInfo{
ID: tok.ID(),
Owner: tok.Issuer(),
if !isBinReplication {
// ObjectService.Replicate request must be signed by the container node,
// so we must not use session private key
if tok := t.commonPrm.SessionToken(); tok != nil {
sessionInfo = &util.SessionInfo{
ID: tok.ID(),
Owner: tok.Issuer(),
}
}
}

Expand All @@ -71,6 +85,24 @@ func (t *remoteTarget) Close() (oid.ID, error) {
return oid.ID{}, fmt.Errorf("(%T) could not create SDK client %s: %w", t, t.nodeInfo, err)
}

if isBinReplication {
err = c.ReplicateObject(t.ctx, binReplicationCtx.b, (*neofsecdsa.Signer)(key))
if err == nil {
id, _ := t.obj.ID()
return id, nil
}

// FIXME: temporary workaround, see also
// https://github.com/nspcc-dev/neofs-api/issues/201#issuecomment-1891383454
if st, ok := status.FromError(err); !ok || st.Code() != codes.Unimplemented {
return oid.ID{}, err
}

if t.log != nil {
t.log.Debug("node does not support 'Replicate' RPC, fallback to 'Put'")
}
}

var prm internalclient.PutObjectPrm

prm.SetContext(t.ctx)
Expand Down Expand Up @@ -118,7 +150,6 @@ func (p *RemotePutPrm) WithObject(v *object.Object) *RemotePutPrm {
// PutObject sends object to remote node.
func (s *RemoteSender) PutObject(ctx context.Context, p *RemotePutPrm) error {
t := &remoteTarget{
ctx: ctx,
keyStorage: s.keyStorage,
clientConstructor: s.clientConstructor,
}
Expand All @@ -128,7 +159,7 @@ func (s *RemoteSender) PutObject(ctx context.Context, p *RemotePutPrm) error {
return fmt.Errorf("parse client node info: %w", err)
}

if err := t.WriteObject(p.obj, objectcore.ContentMeta{}); err != nil {
if err := t.WriteObject(ctx, p.obj, objectcore.ContentMeta{}); err != nil {
return fmt.Errorf("(%T) could not send object header: %w", s, err)
} else if _, err := t.Close(); err != nil {
return fmt.Errorf("(%T) could not send object: %w", s, err)
Expand Down
7 changes: 5 additions & 2 deletions pkg/services/object/put/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func (p *Streamer) preparePrm(prm *PutInitPrm) error {
}

// set placement builder
prm.traverseOpts = append(prm.traverseOpts, placement.UseBuilder(builder))
prm.placementBuilder = builder

return nil
}
Expand Down Expand Up @@ -215,7 +215,10 @@ func (p *Streamer) newCommonTarget(prm *PutInitPrm) internal.Target {
withBroadcast := !prm.common.LocalOnly() && (typ == object.TypeTombstone || typ == object.TypeLock)

return &distributedTarget{
ctx: p.ctx,
traversal: traversal{
placementBuilder: prm.placementBuilder,

opts: prm.traverseOpts,

extraBroadcastEnabled: withBroadcast,
Expand All @@ -231,10 +234,10 @@ func (p *Streamer) newCommonTarget(prm *PutInitPrm) internal.Target {
}

rt := &remoteTarget{
ctx: p.ctx,
keyStorage: p.keyStorage,
commonPrm: prm.common,
clientConstructor: p.clientConstructor,
log: p.log,
}

client.NodeInfoFromNetmapElement(&rt.nodeInfo, node.info)
Expand Down

0 comments on commit 152b8a6

Please sign in to comment.