From 322933b31e798d5a37c21679922a96c96158755a Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Wed, 28 Aug 2024 16:09:31 +0300 Subject: [PATCH] client/object: extend `Replicate` with object signature request Signed-off-by: Pavel Karpy --- client/object_replicate.go | 64 +++++++++++++++++++++++++-------- client/object_replicate_test.go | 41 +++++++++++++++++---- 2 files changed, 85 insertions(+), 20 deletions(-) diff --git a/client/object_replicate.go b/client/object_replicate.go index 5b37f27d..689813c4 100644 --- a/client/object_replicate.go +++ b/client/object_replicate.go @@ -10,6 +10,7 @@ import ( "sync" objectgrpc "github.com/nspcc-dev/neofs-api-go/v2/object/grpc" + "github.com/nspcc-dev/neofs-api-go/v2/refs" "github.com/nspcc-dev/neofs-api-go/v2/rpc/client" "github.com/nspcc-dev/neofs-api-go/v2/rpc/common" "github.com/nspcc-dev/neofs-api-go/v2/rpc/grpc" @@ -32,6 +33,10 @@ import ( // ReplicateObject is intended for maintaining data storage by NeoFS system // nodes only, not for regular use. // +// If signedReplication, client requests server to sign replicated object +// information to ensure replication was successful. Signature is returned +// (nil if not requested). +// // Object must be encoded in compliance with Protocol Buffers v3 format in // ascending order of fields. // @@ -48,38 +53,38 @@ import ( // replicated object; // - [apistatus.ErrContainerNotFound]: the container to which the replicated // object is associated was not found. -func (c *Client) ReplicateObject(ctx context.Context, id oid.ID, src io.ReadSeeker, signer neofscrypto.Signer) error { +func (c *Client) ReplicateObject(ctx context.Context, id oid.ID, src io.ReadSeeker, signer neofscrypto.Signer, signedReplication bool) (*neofscrypto.Signature, error) { const svcName = "neo.fs.v2.object.ObjectService" const opName = "Replicate" stream, err := c.c.Init(common.CallMethodInfoUnary(svcName, opName), client.WithContext(ctx), client.AllowBinarySendingOnly()) if err != nil { - return fmt.Errorf("init service=%s/op=%s RPC: %w", svcName, opName, err) + return nil, fmt.Errorf("init service=%s/op=%s RPC: %w", svcName, opName, err) } - msg, err := prepareReplicateMessage(id, src, signer) + msg, err := prepareReplicateMessage(id, src, signer, signedReplication) if err != nil { - return err + return nil, err } err = stream.WriteMessage(client.BinaryMessage(msg)) if err != nil && !errors.Is(err, io.EOF) { // io.EOF means the server closed the stream on its side - return fmt.Errorf("send request: %w", err) + return nil, fmt.Errorf("send request: %w", err) } - var resp replicateResponse + resp := replicateResponse{_sigRequested: signedReplication} err = stream.ReadMessage(&resp) if err != nil { if errors.Is(err, io.EOF) { err = io.ErrUnexpectedEOF } - return fmt.Errorf("recv response: %w", err) + return nil, fmt.Errorf("recv response: %w", err) } _ = stream.Close() - return resp.err + return resp.objSig, resp.err } // DemuxReplicatedObject allows to share same argument between multiple @@ -108,23 +113,23 @@ func (x *demuxReplicationMessage) Seek(offset int64, whence int) (int64, error) return x.rs.Seek(offset, whence) } -func prepareReplicateMessage(id oid.ID, src io.ReadSeeker, signer neofscrypto.Signer) ([]byte, error) { +func prepareReplicateMessage(id oid.ID, src io.ReadSeeker, signer neofscrypto.Signer, signedReplication bool) ([]byte, error) { srm, ok := src.(*demuxReplicationMessage) if !ok { - return newReplicateMessage(id, src, signer) + return newReplicateMessage(id, src, signer, signedReplication) } srm.mtx.Lock() defer srm.mtx.Unlock() if srm.msg == nil && srm.err == nil { - srm.msg, srm.err = newReplicateMessage(id, src, signer) + srm.msg, srm.err = newReplicateMessage(id, src, signer, signedReplication) } return srm.msg, srm.err } -func newReplicateMessage(id oid.ID, src io.ReadSeeker, signer neofscrypto.Signer) ([]byte, error) { +func newReplicateMessage(id oid.ID, src io.ReadSeeker, signer neofscrypto.Signer, requireObjectSignature bool) ([]byte, error) { var objSize uint64 switch v := src.(type) { default: @@ -169,13 +174,15 @@ func newReplicateMessage(id oid.ID, src io.ReadSeeker, signer neofscrypto.Signer const fieldNumObject = 1 const fieldNumSignature = 2 + const fieldNumSignObjectFlag = 3 sigSize := protowire.SizeTag(fieldNumSigPubKey) + protowire.SizeBytes(len(bPubKey)) + protowire.SizeTag(fieldNumSigVal) + protowire.SizeBytes(len(idSig)) + protowire.SizeTag(fieldNumSigScheme) + protowire.SizeVarint(sigScheme) msgSize := protowire.SizeTag(fieldNumObject) + protowire.SizeVarint(objSize) + - protowire.SizeTag(fieldNumSignature) + protowire.SizeBytes(sigSize) + protowire.SizeTag(fieldNumSignature) + protowire.SizeBytes(sigSize) + + protowire.SizeTag(fieldNumSignObjectFlag) + protowire.SizeVarint(protowire.EncodeBool(requireObjectSignature)) // TODO(#544): support external buffers msg := make([]byte, 0, uint64(msgSize)+objSize) @@ -198,12 +205,17 @@ func newReplicateMessage(id oid.ID, src io.ReadSeeker, signer neofscrypto.Signer msg = protowire.AppendBytes(msg, idSig) msg = protowire.AppendTag(msg, fieldNumSigScheme, protowire.VarintType) msg = protowire.AppendVarint(msg, sigScheme) + msg = protowire.AppendTag(msg, fieldNumSignObjectFlag, protowire.VarintType) + msg = protowire.AppendVarint(msg, protowire.EncodeBool(requireObjectSignature)) return msg, nil } type replicateResponse struct { - err error + _sigRequested bool + + objSig *neofscrypto.Signature + err error } func (x replicateResponse) ToGRPCMessage() grpc.Message { @@ -226,6 +238,30 @@ func (x *replicateResponse) FromGRPCMessage(gm grpc.Message) error { } x.err = apistatus.ErrorFromV2(st) + if x.err != nil { + return nil + } + + if !x._sigRequested { + return nil + } + + sig := m.GetObjectSignature() + if sig == nil { + return errors.New("requested but missing signature") + } + + sigV2 := new(refs.Signature) + err := sigV2.Unmarshal(sig) + if err != nil { + return fmt.Errorf("decoding signature from proto message: %w", err) + } + + x.objSig = new(neofscrypto.Signature) + err = x.objSig.ReadFromV2(*sigV2) + if err != nil { + return fmt.Errorf("invalid signature: %w", err) + } return nil } diff --git a/client/object_replicate_test.go b/client/object_replicate_test.go index 1790c529..12255429 100644 --- a/client/object_replicate_test.go +++ b/client/object_replicate_test.go @@ -10,6 +10,7 @@ import ( "testing" objectgrpc "github.com/nspcc-dev/neofs-api-go/v2/object/grpc" + "github.com/nspcc-dev/neofs-api-go/v2/refs" "github.com/nspcc-dev/neofs-api-go/v2/rpc/client" status "github.com/nspcc-dev/neofs-api-go/v2/status/grpc" apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" @@ -37,11 +38,13 @@ func BenchmarkPrepareReplicationMessage(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - _, err = prepareReplicateMessage(id, bytes.NewReader(bObj), signer) + _, err = prepareReplicateMessage(id, bytes.NewReader(bObj), signer, true) require.NoError(b, err) } } +var testDataToSign = []byte("requested data to sign") + type testReplicationServer struct { objectgrpc.UnimplementedObjectServiceServer @@ -121,6 +124,23 @@ func (x *testReplicationServer) Replicate(_ context.Context, req *objectgrpc.Rep return &resp, nil } + if req.GetSignObject() { + var sig neofscrypto.Signature + err = sig.Calculate(x.clientSigner, testDataToSign) + if err != nil { + st.Code = 1024 + st.Message = fmt.Sprintf("signing object information: %s", err) + resp.Status = &st + + return &resp, nil + } + + var sigV2 refs.Signature + sig.WriteToV2(&sigV2) + + resp.ObjectSignature = sigV2.StableMarshal(nil) + } + resp.Status = &status.Status{Code: x.respStatusCode} return &resp, nil } @@ -162,7 +182,7 @@ func TestClient_ReplicateObject(t *testing.T) { srv, cli := serveObjectReplication(t, signer, obj) srv.respStatusCode = 0 - err := cli.ReplicateObject(ctx, id, bytes.NewReader(bObj), signer) + _, err := cli.ReplicateObject(ctx, id, bytes.NewReader(bObj), signer, false) require.NoError(t, err) }) @@ -170,7 +190,7 @@ func TestClient_ReplicateObject(t *testing.T) { bObj := []byte("Hello, world!") // definitely incorrect binary object _, cli := serveObjectReplication(t, signer, obj) - err := cli.ReplicateObject(ctx, id, bytes.NewReader(bObj), signer) + _, err := cli.ReplicateObject(ctx, id, bytes.NewReader(bObj), signer, false) require.Error(t, err) }) @@ -187,16 +207,25 @@ func TestClient_ReplicateObject(t *testing.T) { srv, cli := serveObjectReplication(t, signer, obj) srv.respStatusCode = tc.code - err := cli.ReplicateObject(ctx, id, bytes.NewReader(bObj), signer) + _, err := cli.ReplicateObject(ctx, id, bytes.NewReader(bObj), signer, false) require.ErrorIs(t, err, tc.expErr, tc.desc) } }) + t.Run("sign object data", func(t *testing.T) { + srv, cli := serveObjectReplication(t, signer, obj) + srv.respStatusCode = 0 + + sig, err := cli.ReplicateObject(ctx, id, bytes.NewReader(bObj), signer, true) + require.NoError(t, err) + require.True(t, sig.Verify(testDataToSign)) + }) + t.Run("demux", func(t *testing.T) { demuxObj := DemuxReplicatedObject(bytes.NewReader(bObj)) _, cli := serveObjectReplication(t, signer, obj) - err := cli.ReplicateObject(ctx, id, demuxObj, signer) + _, err := cli.ReplicateObject(ctx, id, demuxObj, signer, false) require.NoError(t, err) msgCp := bytes.Clone(demuxObj.(*demuxReplicationMessage).msg) @@ -208,7 +237,7 @@ func TestClient_ReplicateObject(t *testing.T) { go func() { defer wg.Done() - err := cli.ReplicateObject(ctx, id, demuxObj, signer) + _, err := cli.ReplicateObject(ctx, id, demuxObj, signer, false) fmt.Println(err) require.NoError(t, err) }()