Skip to content

Commit

Permalink
client/object: extend Replicate with object signature request
Browse files Browse the repository at this point in the history
Signed-off-by: Pavel Karpy <[email protected]>
  • Loading branch information
carpawell committed Aug 28, 2024
1 parent 3e7f660 commit d2fcec1
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 20 deletions.
65 changes: 51 additions & 14 deletions client/object_replicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"sync"

objectgrpc "github.com/nspcc-dev/neofs-api-go/v2/object/grpc"
"github.com/nspcc-dev/neofs-api-go/v2/refs"
refsgrpc "github.com/nspcc-dev/neofs-api-go/v2/refs/grpc"
"github.com/nspcc-dev/neofs-api-go/v2/rpc/client"
"github.com/nspcc-dev/neofs-api-go/v2/rpc/common"
"github.com/nspcc-dev/neofs-api-go/v2/rpc/grpc"
Expand All @@ -19,6 +21,7 @@ import (
neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"google.golang.org/protobuf/encoding/protowire"
"google.golang.org/protobuf/proto"
)

// ReplicateObject copies binary-encoded NeoFS object from the given
Expand All @@ -32,6 +35,10 @@ import (
// ReplicateObject is intended for maintaining data storage by NeoFS system
// nodes only, not for regular use.
//
// If signedReplication, client requests server to sign replicated object
// information to ensure replication was successful. Signature is returned
// (nil if not requested).
//
// Object must be encoded in compliance with Protocol Buffers v3 format in
// ascending order of fields.
//
Expand All @@ -48,38 +55,38 @@ import (
// replicated object;
// - [apistatus.ErrContainerNotFound]: the container to which the replicated
// object is associated was not found.
func (c *Client) ReplicateObject(ctx context.Context, id oid.ID, src io.ReadSeeker, signer neofscrypto.Signer) error {
func (c *Client) ReplicateObject(ctx context.Context, id oid.ID, src io.ReadSeeker, signer neofscrypto.Signer, signedReplication bool) (*neofscrypto.Signature, error) {
const svcName = "neo.fs.v2.object.ObjectService"
const opName = "Replicate"
stream, err := c.c.Init(common.CallMethodInfoUnary(svcName, opName),
client.WithContext(ctx), client.AllowBinarySendingOnly())
if err != nil {
return fmt.Errorf("init service=%s/op=%s RPC: %w", svcName, opName, err)
return nil, fmt.Errorf("init service=%s/op=%s RPC: %w", svcName, opName, err)

Check warning on line 64 in client/object_replicate.go

View check run for this annotation

Codecov / codecov/patch

client/object_replicate.go#L64

Added line #L64 was not covered by tests
}

msg, err := prepareReplicateMessage(id, src, signer)
msg, err := prepareReplicateMessage(id, src, signer, signedReplication)
if err != nil {
return err
return nil, err

Check warning on line 69 in client/object_replicate.go

View check run for this annotation

Codecov / codecov/patch

client/object_replicate.go#L69

Added line #L69 was not covered by tests
}

err = stream.WriteMessage(client.BinaryMessage(msg))
if err != nil && !errors.Is(err, io.EOF) { // io.EOF means the server closed the stream on its side
return fmt.Errorf("send request: %w", err)
return nil, fmt.Errorf("send request: %w", err)

Check warning on line 74 in client/object_replicate.go

View check run for this annotation

Codecov / codecov/patch

client/object_replicate.go#L74

Added line #L74 was not covered by tests
}

var resp replicateResponse
resp := replicateResponse{_sigRequested: signedReplication}
err = stream.ReadMessage(&resp)
if err != nil {
if errors.Is(err, io.EOF) {
err = io.ErrUnexpectedEOF
}

return fmt.Errorf("recv response: %w", err)
return nil, fmt.Errorf("recv response: %w", err)
}

_ = stream.Close()

return resp.err
return &resp.objSig, resp.err
}

// DemuxReplicatedObject allows to share same argument between multiple
Expand Down Expand Up @@ -108,23 +115,23 @@ func (x *demuxReplicationMessage) Seek(offset int64, whence int) (int64, error)
return x.rs.Seek(offset, whence)
}

func prepareReplicateMessage(id oid.ID, src io.ReadSeeker, signer neofscrypto.Signer) ([]byte, error) {
func prepareReplicateMessage(id oid.ID, src io.ReadSeeker, signer neofscrypto.Signer, signedReplication bool) ([]byte, error) {
srm, ok := src.(*demuxReplicationMessage)
if !ok {
return newReplicateMessage(id, src, signer)
return newReplicateMessage(id, src, signer, signedReplication)
}

srm.mtx.Lock()
defer srm.mtx.Unlock()

if srm.msg == nil && srm.err == nil {
srm.msg, srm.err = newReplicateMessage(id, src, signer)
srm.msg, srm.err = newReplicateMessage(id, src, signer, signedReplication)
}

return srm.msg, srm.err
}

func newReplicateMessage(id oid.ID, src io.ReadSeeker, signer neofscrypto.Signer) ([]byte, error) {
func newReplicateMessage(id oid.ID, src io.ReadSeeker, signer neofscrypto.Signer, requireObjectSignature bool) ([]byte, error) {
var objSize uint64
switch v := src.(type) {
default:
Expand Down Expand Up @@ -169,13 +176,15 @@ func newReplicateMessage(id oid.ID, src io.ReadSeeker, signer neofscrypto.Signer

const fieldNumObject = 1
const fieldNumSignature = 2
const fieldNumSignObjectFlag = 3

sigSize := protowire.SizeTag(fieldNumSigPubKey) + protowire.SizeBytes(len(bPubKey)) +
protowire.SizeTag(fieldNumSigVal) + protowire.SizeBytes(len(idSig)) +
protowire.SizeTag(fieldNumSigScheme) + protowire.SizeVarint(sigScheme)

msgSize := protowire.SizeTag(fieldNumObject) + protowire.SizeVarint(objSize) +
protowire.SizeTag(fieldNumSignature) + protowire.SizeBytes(sigSize)
protowire.SizeTag(fieldNumSignature) + protowire.SizeBytes(sigSize) +
protowire.SizeTag(fieldNumSignObjectFlag) + protowire.SizeVarint(protowire.EncodeBool(requireObjectSignature))

// TODO(#544): support external buffers
msg := make([]byte, 0, uint64(msgSize)+objSize)
Expand All @@ -198,12 +207,17 @@ func newReplicateMessage(id oid.ID, src io.ReadSeeker, signer neofscrypto.Signer
msg = protowire.AppendBytes(msg, idSig)
msg = protowire.AppendTag(msg, fieldNumSigScheme, protowire.VarintType)
msg = protowire.AppendVarint(msg, sigScheme)
msg = protowire.AppendTag(msg, fieldNumSignObjectFlag, protowire.VarintType)
msg = protowire.AppendVarint(msg, protowire.EncodeBool(requireObjectSignature))

return msg, nil
}

type replicateResponse struct {
err error
_sigRequested bool

objSig neofscrypto.Signature
err error
}

func (x replicateResponse) ToGRPCMessage() grpc.Message {
Expand All @@ -226,6 +240,29 @@ func (x *replicateResponse) FromGRPCMessage(gm grpc.Message) error {
}

x.err = apistatus.ErrorFromV2(st)
if x.err != nil {
return nil
}

if !x._sigRequested {
return nil
}

sigGrpc := new(refsgrpc.Signature)
err := proto.Unmarshal(m.GetObjectSignature(), sigGrpc)
if err != nil {
return fmt.Errorf("decoding signature from proto message: %w", err)

Check warning on line 254 in client/object_replicate.go

View check run for this annotation

Codecov / codecov/patch

client/object_replicate.go#L254

Added line #L254 was not covered by tests
}

sigV2 := new(refs.Signature)
sigV2.SetKey(sigGrpc.GetKey())
sigV2.SetSign(sigGrpc.GetSign())
sigV2.SetScheme(refs.SignatureScheme(sigGrpc.GetScheme()))

err = x.objSig.ReadFromV2(*sigV2)
if err != nil {
return fmt.Errorf("invalid signature: %w", err)

Check warning on line 264 in client/object_replicate.go

View check run for this annotation

Codecov / codecov/patch

client/object_replicate.go#L264

Added line #L264 was not covered by tests
}

return nil
}
41 changes: 35 additions & 6 deletions client/object_replicate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"testing"

objectgrpc "github.com/nspcc-dev/neofs-api-go/v2/object/grpc"
"github.com/nspcc-dev/neofs-api-go/v2/refs"
"github.com/nspcc-dev/neofs-api-go/v2/rpc/client"
status "github.com/nspcc-dev/neofs-api-go/v2/status/grpc"
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
Expand Down Expand Up @@ -37,11 +38,13 @@ func BenchmarkPrepareReplicationMessage(b *testing.B) {
b.ResetTimer()

for i := 0; i < b.N; i++ {
_, err = prepareReplicateMessage(id, bytes.NewReader(bObj), signer)
_, err = prepareReplicateMessage(id, bytes.NewReader(bObj), signer, true)
require.NoError(b, err)
}
}

var testDataToSign = []byte("requested data to sign")

type testReplicationServer struct {
objectgrpc.UnimplementedObjectServiceServer

Expand Down Expand Up @@ -121,6 +124,23 @@ func (x *testReplicationServer) Replicate(_ context.Context, req *objectgrpc.Rep
return &resp, nil
}

if req.GetSignObject() {
var sig neofscrypto.Signature
err = sig.Calculate(x.clientSigner, testDataToSign)
if err != nil {
st.Code = 1024
st.Message = fmt.Sprintf("signing object information: %s", err)
resp.Status = &st

return &resp, nil
}

var sigV2 refs.Signature
sig.WriteToV2(&sigV2)

resp.ObjectSignature = sigV2.StableMarshal(nil)
}

resp.Status = &status.Status{Code: x.respStatusCode}
return &resp, nil
}
Expand Down Expand Up @@ -162,15 +182,15 @@ func TestClient_ReplicateObject(t *testing.T) {
srv, cli := serveObjectReplication(t, signer, obj)
srv.respStatusCode = 0

err := cli.ReplicateObject(ctx, id, bytes.NewReader(bObj), signer)
_, err := cli.ReplicateObject(ctx, id, bytes.NewReader(bObj), signer, false)
require.NoError(t, err)
})

t.Run("invalid binary object", func(t *testing.T) {
bObj := []byte("Hello, world!") // definitely incorrect binary object
_, cli := serveObjectReplication(t, signer, obj)

err := cli.ReplicateObject(ctx, id, bytes.NewReader(bObj), signer)
_, err := cli.ReplicateObject(ctx, id, bytes.NewReader(bObj), signer, false)
require.Error(t, err)
})

Expand All @@ -187,16 +207,25 @@ func TestClient_ReplicateObject(t *testing.T) {
srv, cli := serveObjectReplication(t, signer, obj)
srv.respStatusCode = tc.code

err := cli.ReplicateObject(ctx, id, bytes.NewReader(bObj), signer)
_, err := cli.ReplicateObject(ctx, id, bytes.NewReader(bObj), signer, false)
require.ErrorIs(t, err, tc.expErr, tc.desc)
}
})

t.Run("sign object data", func(t *testing.T) {
srv, cli := serveObjectReplication(t, signer, obj)
srv.respStatusCode = 0

sig, err := cli.ReplicateObject(ctx, id, bytes.NewReader(bObj), signer, true)
require.NoError(t, err)
require.True(t, sig.Verify(testDataToSign))
})

t.Run("demux", func(t *testing.T) {
demuxObj := DemuxReplicatedObject(bytes.NewReader(bObj))
_, cli := serveObjectReplication(t, signer, obj)

err := cli.ReplicateObject(ctx, id, demuxObj, signer)
_, err := cli.ReplicateObject(ctx, id, demuxObj, signer, false)
require.NoError(t, err)

msgCp := bytes.Clone(demuxObj.(*demuxReplicationMessage).msg)
Expand All @@ -208,7 +237,7 @@ func TestClient_ReplicateObject(t *testing.T) {
go func() {
defer wg.Done()

err := cli.ReplicateObject(ctx, id, demuxObj, signer)
_, err := cli.ReplicateObject(ctx, id, demuxObj, signer, false)
fmt.Println(err)
require.NoError(t, err)
}()
Expand Down

0 comments on commit d2fcec1

Please sign in to comment.