Skip to content

Commit

Permalink
node/object/replicate: add network magic object's meta information
Browse files Browse the repository at this point in the history
It saves from replay attacks and makes replication operation (and meta
information in particular) more explicit.

Signed-off-by: Pavel Karpy <[email protected]>
  • Loading branch information
carpawell committed Sep 6, 2024
1 parent 8163db8 commit 96f271f
Show file tree
Hide file tree
Showing 9 changed files with 68 additions and 41 deletions.
6 changes: 5 additions & 1 deletion cmd/neofs-node/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,11 @@ func initObjectService(c *cfg) {
searchsvcV2.WithKeyStorage(keyStorage),
)

mNumber, err := c.shared.basics.cli.MagicNumber()
fatalOnErr(err)

Check warning on line 261 in cmd/neofs-node/object.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/object.go#L260-L261

Added lines #L260 - L261 were not covered by tests

sPut := putsvc.NewService(&transport{clients: putConstructor}, c,
putsvc.WithNetworkMagic(mNumber),

Check warning on line 264 in cmd/neofs-node/object.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/object.go#L264

Added line #L264 was not covered by tests
putsvc.WithKeyStorage(keyStorage),
putsvc.WithClientConstructor(putConstructor),
putsvc.WithMaxSizeSource(newCachedMaxObjectSizeSource(c)),
Expand Down Expand Up @@ -351,7 +355,7 @@ func initObjectService(c *cfg) {
firstSvc = objectService.NewMetricCollector(signSvc, c.metricsCollector)
}

server := objectTransportGRPC.New(firstSvc, objNode, neofsecdsa.SignerRFC6979(c.shared.basics.key.PrivateKey))
server := objectTransportGRPC.New(firstSvc, mNumber, objNode, neofsecdsa.SignerRFC6979(c.shared.basics.key.PrivateKey))

Check warning on line 358 in cmd/neofs-node/object.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/object.go#L358

Added line #L358 was not covered by tests

for _, srv := range c.cfgGRPC.servers {
objectGRPC.RegisterObjectServiceServer(srv, server)
Expand Down
20 changes: 12 additions & 8 deletions pkg/core/object/replicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,25 @@ import (

const (
validInterval = 10 // in epochs
currentVersion = 6 // it is also a number of fields
currentVersion = 7 // it is also a number of fields
)

const (
cidKey = "cid"
oidKey = "oid"
sizeKey = "size"
deletedKey = "deleted"
lockedKey = "locked"
validUntilKey = "validuntil"
networkMagicKey = "network"
cidKey = "cid"
oidKey = "oid"
sizeKey = "size"
deletedKey = "deleted"
lockedKey = "locked"
validUntilKey = "validuntil"
)

// EncodeReplicationMetaInfo uses NEO's map (strict order) serialized format as a raw
// representation of object's meta information.
//
// This (ordered) format is used (keys are strings):
//
// "network": network magic
// "cid": _raw_ container ID (32 bytes)
// "oid": _raw_ object ID (32 bytes)
// "size": payload size
Expand All @@ -35,8 +37,10 @@ const (
// "validuntil": last valid epoch number for meta information
//
// Last valid epoch is object's creation epoch + 10.
func EncodeReplicationMetaInfo(cID cid.ID, oID oid.ID, pSize uint64, deleted, locked []oid.ID, createdAt uint64) []byte {
func EncodeReplicationMetaInfo(cID cid.ID, oID oid.ID, pSize uint64,
deleted, locked []oid.ID, createdAt uint64, magicNumber uint32) []byte {
kvs := []stackitem.MapElement{
kv(networkMagicKey, magicNumber),
kv(cidKey, cID[:]),
kv(oidKey, oID[:]),
kv(sizeKey, pSize),
Expand Down
30 changes: 17 additions & 13 deletions pkg/core/object/replicate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@ import (
)

func TestMetaInfo(t *testing.T) {
network := rand.Uint32()
oID := oidtest.ID()
cID := cidtest.ID()
size := rand.Uint64()
deleted := oidtest.IDs(10)
locked := oidtest.IDs(10)
validUntil := rand.Uint64()

raw := EncodeReplicationMetaInfo(cID, oID, size, deleted, locked, validUntil)
raw := EncodeReplicationMetaInfo(cID, oID, size, deleted, locked, validUntil, network)
item, err := stackitem.Deserialize(raw)
require.NoError(t, err)

Expand All @@ -30,23 +31,26 @@ func TestMetaInfo(t *testing.T) {

require.Len(t, mm, currentVersion)

require.Equal(t, cidKey, string(mm[0].Key.Value().([]byte)))
require.Equal(t, cID[:], mm[0].Value.Value().([]byte))
require.Equal(t, networkMagicKey, string(mm[0].Key.Value().([]byte)))
require.Equal(t, network, uint32(mm[0].Value.Value().(*big.Int).Uint64()))

require.Equal(t, oidKey, string(mm[1].Key.Value().([]byte)))
require.Equal(t, oID[:], mm[1].Value.Value().([]byte))
require.Equal(t, cidKey, string(mm[1].Key.Value().([]byte)))
require.Equal(t, cID[:], mm[1].Value.Value().([]byte))

require.Equal(t, sizeKey, string(mm[2].Key.Value().([]byte)))
require.Equal(t, size, mm[2].Value.Value().(*big.Int).Uint64())
require.Equal(t, oidKey, string(mm[2].Key.Value().([]byte)))
require.Equal(t, oID[:], mm[2].Value.Value().([]byte))

require.Equal(t, deletedKey, string(mm[3].Key.Value().([]byte)))
require.Equal(t, deleted, stackItemToOIDs(t, mm[3].Value))
require.Equal(t, sizeKey, string(mm[3].Key.Value().([]byte)))
require.Equal(t, size, mm[3].Value.Value().(*big.Int).Uint64())

require.Equal(t, lockedKey, string(mm[4].Key.Value().([]byte)))
require.Equal(t, locked, stackItemToOIDs(t, mm[4].Value))
require.Equal(t, deletedKey, string(mm[4].Key.Value().([]byte)))
require.Equal(t, deleted, stackItemToOIDs(t, mm[4].Value))

require.Equal(t, validUntilKey, string(mm[5].Key.Value().([]byte)))
require.Equal(t, validUntil+validInterval, mm[5].Value.Value().(*big.Int).Uint64())
require.Equal(t, lockedKey, string(mm[5].Key.Value().([]byte)))
require.Equal(t, locked, stackItemToOIDs(t, mm[5].Value))

require.Equal(t, validUntilKey, string(mm[6].Key.Value().([]byte)))
require.Equal(t, validUntil+validInterval, mm[6].Value.Value().(*big.Int).Uint64())
}

func stackItemToOIDs(t *testing.T, value stackitem.Item) []oid.ID {
Expand Down
3 changes: 2 additions & 1 deletion pkg/network/transport/object/grpc/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,8 @@ func (s *Server) metaInfoSignature(o object.Object) ([]byte, error) {
default:
}

metaInfo := objectcore.EncodeReplicationMetaInfo(o.GetContainerID(), o.GetID(), o.PayloadSize(), deleted, locked, o.CreationEpoch())
metaInfo := objectcore.EncodeReplicationMetaInfo(o.GetContainerID(), o.GetID(), o.PayloadSize(), deleted, locked,
o.CreationEpoch(), s.mNumber)

var sig neofscrypto.Signature
err := sig.Calculate(s.signer, metaInfo)
Expand Down
17 changes: 9 additions & 8 deletions pkg/network/transport/object/grpc/replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func anyValidRequest(tb testing.TB, signer neofscrypto.Signer, cnr cid.ID, objID
func TestServer_Replicate(t *testing.T) {
var noCallNode noCallTestNode
var noCallObjSvc noCallObjectService
noCallSrv := New(noCallObjSvc, &noCallNode, neofscryptotest.Signer())
noCallSrv := New(noCallObjSvc, 0, &noCallNode, neofscryptotest.Signer())
clientSigner := neofscryptotest.Signer()
clientPubKey := neofscrypto.PublicKeyBytes(clientSigner.Public())
serverPubKey := neofscrypto.PublicKeyBytes(neofscryptotest.Signer().Public())
Expand Down Expand Up @@ -328,7 +328,7 @@ func TestServer_Replicate(t *testing.T) {

t.Run("apply storage policy failure", func(t *testing.T) {
node := newTestNode(t, serverPubKey, clientPubKey, cnr, req.Object)
srv := New(noCallObjSvc, node, neofscryptotest.Signer())
srv := New(noCallObjSvc, 0, node, neofscryptotest.Signer())

node.cnrErr = errors.New("any error")

Expand All @@ -340,7 +340,7 @@ func TestServer_Replicate(t *testing.T) {

t.Run("client or server mismatches object's storage policy", func(t *testing.T) {
node := newTestNode(t, serverPubKey, clientPubKey, cnr, req.Object)
srv := New(noCallObjSvc, node, neofscryptotest.Signer())
srv := New(noCallObjSvc, 0, node, neofscryptotest.Signer())

node.serverOutsideCnr = true
node.clientOutsideCnr = true
Expand All @@ -360,7 +360,7 @@ func TestServer_Replicate(t *testing.T) {

t.Run("local storage failure", func(t *testing.T) {
node := newTestNode(t, serverPubKey, clientPubKey, cnr, req.Object)
srv := New(noCallObjSvc, node, neofscryptotest.Signer())
srv := New(noCallObjSvc, 0, node, neofscryptotest.Signer())

node.storeErr = errors.New("any error")

Expand All @@ -371,10 +371,11 @@ func TestServer_Replicate(t *testing.T) {
})

t.Run("meta information signature", func(t *testing.T) {
var mNumber uint32 = 123
signer := neofscryptotest.Signer()
reqForSignature, o := anyValidRequest(t, clientSigner, cnr, objID)
node := newTestNode(t, serverPubKey, clientPubKey, cnr, reqForSignature.Object)
srv := New(noCallObjSvc, node, signer)
srv := New(noCallObjSvc, mNumber, node, signer)

t.Run("signature not requested", func(t *testing.T) {
resp, err := srv.Replicate(context.Background(), reqForSignature)
Expand All @@ -400,13 +401,13 @@ func TestServer_Replicate(t *testing.T) {
require.NoError(t, sig.ReadFromV2(sigV2))

require.Equal(t, signer.PublicKeyBytes, sig.PublicKeyBytes())
require.True(t, sig.Verify(objectcore.EncodeReplicationMetaInfo(o.GetContainerID(), o.GetID(), o.PayloadSize(), nil, nil, o.CreationEpoch())))
require.True(t, sig.Verify(objectcore.EncodeReplicationMetaInfo(o.GetContainerID(), o.GetID(), o.PayloadSize(), nil, nil, o.CreationEpoch(), mNumber)))
})
})

t.Run("OK", func(t *testing.T) {
node := newTestNode(t, serverPubKey, clientPubKey, cnr, req.Object)
srv := New(noCallObjSvc, node, neofscryptotest.Signer())
srv := New(noCallObjSvc, 0, node, neofscryptotest.Signer())

resp, err := srv.Replicate(context.Background(), req)
require.NoError(t, err)
Expand All @@ -433,7 +434,7 @@ func BenchmarkServer_Replicate(b *testing.B) {
ctx := context.Background()
var node nopNode

srv := New(nil, node, neofscryptotest.Signer())
srv := New(nil, 0, node, neofscryptotest.Signer())

for _, tc := range []struct {
name string
Expand Down
14 changes: 8 additions & 6 deletions pkg/network/transport/object/grpc/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,18 @@ type Node interface {
type Server struct {
srv objectSvc.ServiceServer

node Node
signer neofscrypto.Signer
node Node
signer neofscrypto.Signer
mNumber uint32
}

// New creates, initializes and returns Server instance.
func New(c objectSvc.ServiceServer, node Node, signer neofscrypto.Signer) *Server {
func New(c objectSvc.ServiceServer, magicNumber uint32, node Node, signer neofscrypto.Signer) *Server {
return &Server{
srv: c,
node: node,
signer: signer,
srv: c,
node: node,
signer: signer,
mNumber: magicNumber,
}
}

Expand Down
10 changes: 6 additions & 4 deletions pkg/services/object/put/distributed.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@ type preparedObjectTarget interface {
type distributedTarget struct {
placementIterator placementIterator

obj *objectSDK.Object
objMeta object.ContentMeta
objSharedMeta []byte
obj *objectSDK.Object
objMeta object.ContentMeta
networkMagicNumber uint32
objSharedMeta []byte

localNodeInContainer bool
localNodeSigner neofscrypto.Signer
Expand Down Expand Up @@ -140,7 +141,8 @@ func (t *distributedTarget) Close() (oid.ID, error) {
default:

Check warning on line 141 in pkg/services/object/put/distributed.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/put/distributed.go#L134-L141

Added lines #L134 - L141 were not covered by tests
}

t.objSharedMeta = object.EncodeReplicationMetaInfo(t.obj.GetContainerID(), t.obj.GetID(), t.obj.PayloadSize(), deletedObjs, lockedObjs, t.obj.CreationEpoch())
t.objSharedMeta = object.EncodeReplicationMetaInfo(t.obj.GetContainerID(), t.obj.GetID(), t.obj.PayloadSize(), deletedObjs,
lockedObjs, t.obj.CreationEpoch(), t.networkMagicNumber)

Check warning on line 145 in pkg/services/object/put/distributed.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/put/distributed.go#L144-L145

Added lines #L144 - L145 were not covered by tests
id, _ := t.obj.ID()
return id, t.placementIterator.iterateNodesForObject(id, t.sendObject)
}
Expand Down
8 changes: 8 additions & 0 deletions pkg/services/object/put/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ type cfg struct {
clientConstructor ClientConstructor

log *zap.Logger

networkMagic uint32
}

func defaultCfg() *cfg {
Expand Down Expand Up @@ -203,3 +205,9 @@ func WithLogger(l *zap.Logger) Option {
c.log = l
}
}

func WithNetworkMagic(m uint32) Option {
return func(c *cfg) {
c.networkMagic = m

Check warning on line 211 in pkg/services/object/put/service.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/put/service.go#L209-L211

Added lines #L209 - L211 were not covered by tests
}
}
1 change: 1 addition & 0 deletions pkg/services/object/put/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ func (p *Streamer) newCommonTarget(prm *PutInitPrm) internal.Target {
withBroadcast := !localOnly && (typ == object.TypeTombstone || typ == object.TypeLock)

return &distributedTarget{
networkMagicNumber: p.networkMagic,

Check warning on line 210 in pkg/services/object/put/streamer.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/put/streamer.go#L210

Added line #L210 was not covered by tests
placementIterator: placementIterator{
log: p.log,
neoFSNet: p.neoFSNet,
Expand Down

0 comments on commit 96f271f

Please sign in to comment.