From 14951bf02e3d1c67807bba9d99b1a1bd215008f0 Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Tue, 24 Dec 2024 18:14:14 +0300 Subject: [PATCH 1/7] services/object: Refactor dependencies Split the node abstraction into FS chain and local storage ones. The service works only with these two root subsystems, so increasing the number of abstractions is not expected, while the structure of the real system is reflected better. Signed-off-by: Leonard Lyubich --- cmd/neofs-node/object.go | 45 ++++++------ pkg/services/object/server.go | 35 +++++---- pkg/services/object/server_test.go | 109 ++++++++++++++++------------- 3 files changed, 102 insertions(+), 87 deletions(-) diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go index 59acd698a6..c13cd96eb5 100644 --- a/cmd/neofs-node/object.go +++ b/cmd/neofs-node/object.go @@ -312,12 +312,12 @@ func initObjectService(c *cfg) { // every object part in every chain will try to refer to the first part, so caching // should help a lot here const cachedFirstObjectsNumber = 1000 - objNode := newNodeForObjects(cnrNodes, sPut, c.IsLocalKey) + fsChain := newFSChainForObjects(cnrNodes, c.IsLocalKey, c.networkState) aclSvc := v2.New( v2.WithLogger(c.log), v2.WithIRFetcher(newCachedIRFetcher(irFetcher)), - v2.WithNetmapper(netmapSourceWithNodes{Source: c.netMapSource, n: objNode}), + v2.WithNetmapper(netmapSourceWithNodes{Source: c.netMapSource, fsChain: fsChain}), v2.WithContainerSource( c.cfgObject.cnrSource, ), @@ -341,7 +341,7 @@ func initObjectService(c *cfg) { c.respSvc, ) - server := objectService.New(respSvc, mNumber, objNode, c.shared.basics.key.PrivateKey, c.cfgNetmap.state, c.metricsCollector) + server := objectService.New(respSvc, mNumber, fsChain, (*putObjectServiceWrapper)(sPut), c.shared.basics.key.PrivateKey, c.metricsCollector) for _, srv := range c.cfgGRPC.servers { objectGRPC.RegisterObjectServiceServer(srv, server) @@ -613,18 +613,17 @@ func (x *remoteContainerNodes) ForEachRemoteContainerNode(cnr cid.ID, f func(net }) } -// nodeForObjects represents NeoFS storage node for object storage. -type nodeForObjects struct { - putObjectService *putsvc.Service - containerNodes *containerNodes - isLocalPubKey func([]byte) bool +type fsChainForObjects struct { + netmap.StateDetailed + containerNodes *containerNodes + isLocalPubKey func([]byte) bool } -func newNodeForObjects(cnrNodes *containerNodes, putObjectService *putsvc.Service, isLocalPubKey func([]byte) bool) *nodeForObjects { - return &nodeForObjects{ - putObjectService: putObjectService, - containerNodes: cnrNodes, - isLocalPubKey: isLocalPubKey, +func newFSChainForObjects(cnrNodes *containerNodes, isLocalPubKey func([]byte) bool, ns netmap.StateDetailed) *fsChainForObjects { + return &fsChainForObjects{ + StateDetailed: ns, + containerNodes: cnrNodes, + isLocalPubKey: isLocalPubKey, } } @@ -633,7 +632,7 @@ func newNodeForObjects(cnrNodes *containerNodes, putObjectService *putsvc.Servic // epochs into f. When f returns false, nil is returned instantly. // // Implements [object.Node] interface. -func (x *nodeForObjects) ForEachContainerNodePublicKeyInLastTwoEpochs(id cid.ID, f func(pubKey []byte) bool) error { +func (x *fsChainForObjects) ForEachContainerNodePublicKeyInLastTwoEpochs(id cid.ID, f func(pubKey []byte) bool) error { return x.containerNodes.forEachContainerNodePublicKeyInLastTwoEpochs(id, f) } @@ -641,16 +640,14 @@ func (x *nodeForObjects) ForEachContainerNodePublicKeyInLastTwoEpochs(id cid.ID, // local storage node in the network map. // // Implements [object.Node] interface. -func (x *nodeForObjects) IsOwnPublicKey(pubKey []byte) bool { +func (x *fsChainForObjects) IsOwnPublicKey(pubKey []byte) bool { return x.isLocalPubKey(pubKey) } -// VerifyAndStoreObject checks given object's format and, if it is correct, -// saves the object in the node's local object storage. -// -// Implements [object.Node] interface. -func (x *nodeForObjects) VerifyAndStoreObject(obj objectSDK.Object) error { - return x.putObjectService.ValidateAndStoreObjectLocally(obj) +type putObjectServiceWrapper putsvc.Service + +func (x *putObjectServiceWrapper) VerifyAndStoreObject(obj objectSDK.Object) error { + return (*putsvc.Service)(x).ValidateAndStoreObjectLocally(obj) } type objectSource struct { @@ -715,13 +712,13 @@ func (c *cfg) GetNodesForObject(addr oid.Address) ([][]netmapsdk.NodeInfo, []uin type netmapSourceWithNodes struct { netmap.Source - n *nodeForObjects + fsChain *fsChainForObjects } func (n netmapSourceWithNodes) ServerInContainer(cID cid.ID) (bool, error) { var serverInContainer bool - err := n.n.ForEachContainerNodePublicKeyInLastTwoEpochs(cID, func(pubKey []byte) bool { - if n.n.isLocalPubKey(pubKey) { + err := n.fsChain.ForEachContainerNodePublicKeyInLastTwoEpochs(cID, func(pubKey []byte) bool { + if n.fsChain.isLocalPubKey(pubKey) { serverInContainer = true return false } diff --git a/pkg/services/object/server.go b/pkg/services/object/server.go index 4347bc6c90..3018605994 100644 --- a/pkg/services/object/server.go +++ b/pkg/services/object/server.go @@ -88,8 +88,11 @@ type MetricCollector interface { AddGetPayload(int) } -// Node represents NeoFS storage node that is served by [Server]. -type Node interface { +// FSChain provides access to the FS chain required to serve NeoFS API Object +// service. +type FSChain interface { + netmap.StateDetailed + // ForEachContainerNodePublicKeyInLastTwoEpochs iterates over all nodes matching // the referenced container's storage policy at the current and the previous // NeoFS epochs, and passes their public keys into f. IterateContainerNodeKeys @@ -102,31 +105,35 @@ type Node interface { // IsOwnPublicKey checks whether given pubKey assigned to Node in the NeoFS // network map. IsOwnPublicKey(pubKey []byte) bool +} +// Storage groups ops of the node's object storage required to serve NeoFS API +// Object service. +type Storage interface { // VerifyAndStoreObject checks whether given object has correct format and, if - // so, saves it into local object storage of the Node. StoreObject is called - // only when the Node complies with the container's storage policy. + // so, saves it into the Storage. StoreObject is called only when local node + // complies with the container's storage policy. VerifyAndStoreObject(object.Object) error } type server struct { srv ServiceServer - node Node + fsChain FSChain + storage Storage signer ecdsa.PrivateKey mNumber uint32 - nmState netmap.StateDetailed metrics MetricCollector } // New provides protoobject.ObjectServiceServer for the given parameters. -func New(c ServiceServer, magicNumber uint32, node Node, signer ecdsa.PrivateKey, nmState netmap.StateDetailed, m MetricCollector) protoobject.ObjectServiceServer { +func New(c ServiceServer, magicNumber uint32, fsChain FSChain, st Storage, signer ecdsa.PrivateKey, m MetricCollector) protoobject.ObjectServiceServer { return &server{ srv: c, - node: node, + fsChain: fsChain, + storage: st, signer: signer, mNumber: magicNumber, - nmState: nmState, metrics: m, } } @@ -540,9 +547,9 @@ func (s *server) Replicate(_ context.Context, req *protoobject.ReplicateRequest) } var clientInCnr, serverInCnr bool - err = s.node.ForEachContainerNodePublicKeyInLastTwoEpochs(cnr, func(pubKey []byte) bool { + err = s.fsChain.ForEachContainerNodePublicKeyInLastTwoEpochs(cnr, func(pubKey []byte) bool { if !serverInCnr { - serverInCnr = s.node.IsOwnPublicKey(pubKey) + serverInCnr = s.fsChain.IsOwnPublicKey(pubKey) } if !clientInCnr { clientInCnr = bytes.Equal(pubKey, req.Signature.Key) @@ -580,7 +587,7 @@ func (s *server) Replicate(_ context.Context, req *protoobject.ReplicateRequest) }}, nil } - err = s.node.VerifyAndStoreObject(*obj) + err = s.storage.VerifyAndStoreObject(*obj) if err != nil { return &protoobject.ReplicateResponse{Status: &protostatus.Status{ Code: codeInternal, @@ -644,8 +651,8 @@ func (s *server) metaInfoSignature(o object.Object) ([]byte, error) { default: } - currentBlock := s.nmState.CurrentBlock() - currentEpochDuration := s.nmState.CurrentEpochDuration() + currentBlock := s.fsChain.CurrentBlock() + currentEpochDuration := s.fsChain.CurrentEpochDuration() firstBlock := (uint64(currentBlock)/currentEpochDuration + 1) * currentEpochDuration secondBlock := firstBlock + currentEpochDuration thirdBlock := secondBlock + currentEpochDuration diff --git a/pkg/services/object/server_test.go b/pkg/services/object/server_test.go index fc6f2a6740..a79addc90f 100644 --- a/pkg/services/object/server_test.go +++ b/pkg/services/object/server_test.go @@ -67,19 +67,19 @@ func (x noCallObjectService) GetRangeHash(context.Context, *objectV2.GetRangeHas panic("must not be called") } -type noCallTestNode struct{} +type noCallTestFSChain struct{} -func (x *noCallTestNode) ForEachContainerNodePublicKeyInLastTwoEpochs(cid.ID, func([]byte) bool) error { +func (x *noCallTestFSChain) ForEachContainerNodePublicKeyInLastTwoEpochs(cid.ID, func([]byte) bool) error { panic("must not be called") } +func (x *noCallTestFSChain) IsOwnPublicKey([]byte) bool { panic("must not be called") } +func (x *noCallTestFSChain) CurrentEpoch() uint64 { panic("must not be called") } +func (x *noCallTestFSChain) CurrentBlock() uint32 { panic("must not be called") } +func (x *noCallTestFSChain) CurrentEpochDuration() uint64 { panic("must not be called") } -func (x *noCallTestNode) IsOwnPublicKey([]byte) bool { - panic("must not be called") -} +type noCallTestStorage struct{} -func (x *noCallTestNode) VerifyAndStoreObject(object.Object) error { - panic("must not be called") -} +func (noCallTestStorage) VerifyAndStoreObject(object.Object) error { panic("must not be called") } type nopMetrics struct{} @@ -87,7 +87,8 @@ func (nopMetrics) HandleOpExecResult(stat.Method, bool, time.Duration) {} func (nopMetrics) AddPutPayload(int) {} func (nopMetrics) AddGetPayload(int) {} -type testNode struct { +type testFSChain struct { + nopFSChain tb testing.TB // server state @@ -96,27 +97,23 @@ type testNode struct { // request data clientPubKey []byte cnr cid.ID - obj *objectgrpc.Object // return cnrErr error clientOutsideCnr bool serverOutsideCnr bool - - storeErr error } -func newTestNode(tb testing.TB, serverPubKey, clientPubKey []byte, cnr cid.ID, obj *objectgrpc.Object) *testNode { - return &testNode{ +func newTestFSChain(tb testing.TB, serverPubKey, clientPubKey []byte, cnr cid.ID) *testFSChain { + return &testFSChain{ tb: tb, serverPubKey: serverPubKey, clientPubKey: clientPubKey, cnr: cnr, - obj: obj, } } -func (x *testNode) ForEachContainerNodePublicKeyInLastTwoEpochs(cnr cid.ID, f func(pubKey []byte) bool) error { +func (x *testFSChain) ForEachContainerNodePublicKeyInLastTwoEpochs(cnr cid.ID, f func(pubKey []byte) bool) error { require.True(x.tb, cnr == x.cnr) require.NotNil(x.tb, f) if x.cnrErr != nil { @@ -131,10 +128,22 @@ func (x *testNode) ForEachContainerNodePublicKeyInLastTwoEpochs(cnr cid.ID, f fu return nil } -func (x *testNode) IsOwnPublicKey(pubKey []byte) bool { return bytes.Equal(x.serverPubKey, pubKey) } +func (x *testFSChain) IsOwnPublicKey(pubKey []byte) bool { return bytes.Equal(x.serverPubKey, pubKey) } -func (x *testNode) VerifyAndStoreObject(obj object.Object) error { - require.Equal(x.tb, x.obj, obj.ToV2().ToGRPCMessage().(*objectgrpc.Object)) +type testStorage struct { + t testing.TB + // request data + obj *objectgrpc.Object + // return + storeErr error +} + +func newTestStorage(t testing.TB, obj *objectgrpc.Object) *testStorage { + return &testStorage{t: t, obj: obj} +} + +func (x *testStorage) VerifyAndStoreObject(obj object.Object) error { + require.Equal(x.t, x.obj, obj.ToV2().ToGRPCMessage().(*objectgrpc.Object)) return x.storeErr } @@ -173,9 +182,10 @@ func anyValidRequest(tb testing.TB, signer neofscrypto.Signer, cnr cid.ID, objID } func TestServer_Replicate(t *testing.T) { - var noCallNode noCallTestNode + var noCallFSChain noCallTestFSChain var noCallObjSvc noCallObjectService - noCallSrv := objectSvc.New(noCallObjSvc, 0, &noCallNode, neofscryptotest.Signer().ECDSAPrivateKey, netmapStateDetailed{}, nopMetrics{}) + var noCallStorage noCallTestStorage + noCallSrv := objectSvc.New(noCallObjSvc, 0, &noCallFSChain, noCallStorage, neofscryptotest.Signer().ECDSAPrivateKey, nopMetrics{}) clientSigner := neofscryptotest.Signer() clientPubKey := neofscrypto.PublicKeyBytes(clientSigner.Public()) serverPubKey := neofscrypto.PublicKeyBytes(neofscryptotest.Signer().Public()) @@ -338,10 +348,10 @@ func TestServer_Replicate(t *testing.T) { }) t.Run("apply storage policy failure", func(t *testing.T) { - node := newTestNode(t, serverPubKey, clientPubKey, cnr, req.Object) - srv := objectSvc.New(noCallObjSvc, 0, node, neofscryptotest.Signer().ECDSAPrivateKey, netmapStateDetailed{}, nopMetrics{}) + fsChain := newTestFSChain(t, serverPubKey, clientPubKey, cnr) + srv := objectSvc.New(noCallObjSvc, 0, fsChain, noCallStorage, neofscryptotest.Signer().ECDSAPrivateKey, nopMetrics{}) - node.cnrErr = errors.New("any error") + fsChain.cnrErr = errors.New("any error") resp, err := srv.Replicate(context.Background(), req) require.NoError(t, err) @@ -350,18 +360,18 @@ func TestServer_Replicate(t *testing.T) { }) t.Run("client or server mismatches object's storage policy", func(t *testing.T) { - node := newTestNode(t, serverPubKey, clientPubKey, cnr, req.Object) - srv := objectSvc.New(noCallObjSvc, 0, node, neofscryptotest.Signer().ECDSAPrivateKey, netmapStateDetailed{}, nopMetrics{}) + fsChain := newTestFSChain(t, serverPubKey, clientPubKey, cnr) + srv := objectSvc.New(noCallObjSvc, 0, fsChain, noCallStorage, neofscryptotest.Signer().ECDSAPrivateKey, nopMetrics{}) - node.serverOutsideCnr = true - node.clientOutsideCnr = true + fsChain.serverOutsideCnr = true + fsChain.clientOutsideCnr = true resp, err := srv.Replicate(context.Background(), req) require.NoError(t, err) require.EqualValues(t, 2048, resp.GetStatus().GetCode()) require.Equal(t, "server does not match the object's storage policy", resp.GetStatus().GetMessage()) - node.serverOutsideCnr = false + fsChain.serverOutsideCnr = false resp, err = srv.Replicate(context.Background(), req) require.NoError(t, err) @@ -370,10 +380,11 @@ func TestServer_Replicate(t *testing.T) { }) t.Run("local storage failure", func(t *testing.T) { - node := newTestNode(t, serverPubKey, clientPubKey, cnr, req.Object) - srv := objectSvc.New(noCallObjSvc, 0, node, neofscryptotest.Signer().ECDSAPrivateKey, netmapStateDetailed{}, nopMetrics{}) + fsChain := newTestFSChain(t, serverPubKey, clientPubKey, cnr) + s := newTestStorage(t, req.Object) + srv := objectSvc.New(noCallObjSvc, 0, fsChain, s, neofscryptotest.Signer().ECDSAPrivateKey, nopMetrics{}) - node.storeErr = errors.New("any error") + s.storeErr = errors.New("any error") resp, err := srv.Replicate(context.Background(), req) require.NoError(t, err) @@ -385,8 +396,9 @@ func TestServer_Replicate(t *testing.T) { var mNumber uint32 = 123 signer := neofscryptotest.Signer() reqForSignature, o := anyValidRequest(t, clientSigner, cnr, objID) - node := newTestNode(t, serverPubKey, clientPubKey, cnr, reqForSignature.Object) - srv := objectSvc.New(noCallObjSvc, mNumber, node, signer.ECDSAPrivateKey, netmapStateDetailed{}, nopMetrics{}) + fsChain := newTestFSChain(t, serverPubKey, clientPubKey, cnr) + s := newTestStorage(t, reqForSignature.Object) + srv := objectSvc.New(noCallObjSvc, mNumber, fsChain, s, signer.ECDSAPrivateKey, nopMetrics{}) t.Run("signature not requested", func(t *testing.T) { resp, err := srv.Replicate(context.Background(), reqForSignature) @@ -427,8 +439,9 @@ func TestServer_Replicate(t *testing.T) { }) t.Run("OK", func(t *testing.T) { - node := newTestNode(t, serverPubKey, clientPubKey, cnr, req.Object) - srv := objectSvc.New(noCallObjSvc, 0, node, neofscryptotest.Signer().ECDSAPrivateKey, netmapStateDetailed{}, nopMetrics{}) + fsChain := newTestFSChain(t, serverPubKey, clientPubKey, cnr) + s := newTestStorage(t, req.Object) + srv := objectSvc.New(noCallObjSvc, 0, fsChain, s, neofscryptotest.Signer().ECDSAPrivateKey, nopMetrics{}) resp, err := srv.Replicate(context.Background(), req) require.NoError(t, err) @@ -437,39 +450,37 @@ func TestServer_Replicate(t *testing.T) { }) } -type netmapStateDetailed struct{} +type nopFSChain struct{} -func (n netmapStateDetailed) CurrentEpoch() uint64 { +func (nopFSChain) CurrentEpoch() uint64 { return 123 } -func (n netmapStateDetailed) CurrentBlock() uint32 { +func (nopFSChain) CurrentBlock() uint32 { return 123 * 240 } -func (n netmapStateDetailed) CurrentEpochDuration() uint64 { +func (nopFSChain) CurrentEpochDuration() uint64 { return 240 } -type nopNode struct{} - -func (x nopNode) ForEachContainerNodePublicKeyInLastTwoEpochs(cid.ID, func(pubKey []byte) bool) error { +func (x nopFSChain) ForEachContainerNodePublicKeyInLastTwoEpochs(cid.ID, func(pubKey []byte) bool) error { return nil } -func (x nopNode) IsOwnPublicKey([]byte) bool { +func (x nopFSChain) IsOwnPublicKey([]byte) bool { return false } -func (x nopNode) VerifyAndStoreObject(object.Object) error { - return nil -} +type nopStorage struct{} + +func (nopStorage) VerifyAndStoreObject(object.Object) error { return nil } func BenchmarkServer_Replicate(b *testing.B) { ctx := context.Background() - var node nopNode + var fsChain nopFSChain - srv := objectSvc.New(nil, 0, node, neofscryptotest.Signer().ECDSAPrivateKey, netmapStateDetailed{}, nopMetrics{}) + srv := objectSvc.New(nil, 0, fsChain, nopStorage{}, neofscryptotest.Signer().ECDSAPrivateKey, nopMetrics{}) for _, tc := range []struct { name string From d29bbd0e64068ed761e0909db1ccc71c5998e62a Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Thu, 26 Dec 2024 17:25:12 +0300 Subject: [PATCH 2/7] services/object: Inline service setting response meta headers Continues 4bbe31e029421a14cef940c12952cf135241a52b. Now the service is gone. Signed-off-by: Leonard Lyubich --- cmd/neofs-node/config.go | 4 - cmd/neofs-node/object.go | 9 +- pkg/services/object/response.go | 156 -------------------- pkg/services/object/server.go | 10 +- pkg/services/object/server_test.go | 2 + pkg/services/util/response/client_stream.go | 47 ------ pkg/services/util/response/server_stream.go | 37 ----- pkg/services/util/response/service.go | 66 --------- pkg/services/util/response/unary.go | 21 --- pkg/services/util/sign.go | 18 --- 10 files changed, 13 insertions(+), 357 deletions(-) delete mode 100644 pkg/services/object/response.go delete mode 100644 pkg/services/util/response/client_stream.go delete mode 100644 pkg/services/util/response/server_stream.go delete mode 100644 pkg/services/util/response/service.go delete mode 100644 pkg/services/util/response/unary.go diff --git a/cmd/neofs-node/config.go b/cmd/neofs-node/config.go index f92f79bdcd..b0e300af84 100644 --- a/cmd/neofs-node/config.go +++ b/cmd/neofs-node/config.go @@ -50,7 +50,6 @@ import ( trustcontroller "github.com/nspcc-dev/neofs-node/pkg/services/reputation/local/controller" truststorage "github.com/nspcc-dev/neofs-node/pkg/services/reputation/local/storage" "github.com/nspcc-dev/neofs-node/pkg/services/tree" - "github.com/nspcc-dev/neofs-node/pkg/services/util/response" "github.com/nspcc-dev/neofs-node/pkg/util" "github.com/nspcc-dev/neofs-node/pkg/util/state" "github.com/nspcc-dev/neofs-sdk-go/netmap" @@ -372,8 +371,6 @@ type shared struct { // whether the local node is in the netMap localNodeInNetmap atomic.Bool - respSvc *response.Service - policer *policer.Policer replicator *replicator.Replicator @@ -614,7 +611,6 @@ func initCfg(appCfg *config.Config) *cfg { c.shared = shared{ basics: basicSharedConfig, localAddr: netAddr, - respSvc: response.NewService(response.WithNetworkState(basicSharedConfig.networkState)), clientCache: cache.NewSDKClientCache(cacheOpts), bgClientCache: cache.NewSDKClientCache(cacheOpts), putClientCache: cache.NewSDKClientCache(cacheOpts), diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go index c13cd96eb5..9aea05fa24 100644 --- a/cmd/neofs-node/object.go +++ b/cmd/neofs-node/object.go @@ -293,7 +293,7 @@ func initObjectService(c *cfg) { ) // build service pipeline - // grpc | response | acl | split + // grpc | acl | split splitSvc := objectService.NewTransportSplitter( c.cfgGRPC.maxChunkSize, @@ -336,12 +336,7 @@ func initObjectService(c *cfg) { var commonSvc objectService.Common commonSvc.Init(&c.internals, aclSvc) - respSvc := objectService.NewResponseService( - &commonSvc, - c.respSvc, - ) - - server := objectService.New(respSvc, mNumber, fsChain, (*putObjectServiceWrapper)(sPut), c.shared.basics.key.PrivateKey, c.metricsCollector) + server := objectService.New(&commonSvc, mNumber, fsChain, (*putObjectServiceWrapper)(sPut), c.shared.basics.key.PrivateKey, c.metricsCollector) for _, srv := range c.cfgGRPC.servers { objectGRPC.RegisterObjectServiceServer(srv, server) diff --git a/pkg/services/object/response.go b/pkg/services/object/response.go deleted file mode 100644 index c56facc45e..0000000000 --- a/pkg/services/object/response.go +++ /dev/null @@ -1,156 +0,0 @@ -package object - -import ( - "context" - "fmt" - - "github.com/nspcc-dev/neofs-api-go/v2/object" - "github.com/nspcc-dev/neofs-node/pkg/services/util" - "github.com/nspcc-dev/neofs-node/pkg/services/util/response" -) - -type ResponseService struct { - respSvc *response.Service - - svc ServiceServer -} - -type searchStreamResponser struct { - util.ServerStream - - respWriter util.ResponseMessageWriter -} - -type getStreamResponser struct { - util.ServerStream - - respWriter util.ResponseMessageWriter -} - -type getRangeStreamResponser struct { - util.ServerStream - - respWriter util.ResponseMessageWriter -} - -type putStreamResponser struct { - stream *response.ClientMessageStreamer -} - -// NewResponseService returns object service instance that passes internal service -// call to response service. -func NewResponseService(objSvc ServiceServer, respSvc *response.Service) *ResponseService { - return &ResponseService{ - respSvc: respSvc, - svc: objSvc, - } -} - -func (s *getStreamResponser) Send(resp *object.GetResponse) error { - return s.respWriter(resp) -} - -func (s *ResponseService) Get(req *object.GetRequest, stream GetObjectStream) error { - return s.svc.Get(req, &getStreamResponser{ - ServerStream: stream, - respWriter: s.respSvc.HandleServerStreamRequest(func(resp util.ResponseMessage) error { - return stream.Send(resp.(*object.GetResponse)) - }), - }) -} - -func (s *putStreamResponser) Send(req *object.PutRequest) error { - return s.stream.Send(req) -} - -func (s *putStreamResponser) CloseAndRecv() (*object.PutResponse, error) { - r, err := s.stream.CloseAndRecv() - if err != nil { - return nil, fmt.Errorf("(%T) could not receive response: %w", s, err) - } - - return r.(*object.PutResponse), nil -} - -func (s *ResponseService) Put(ctx context.Context) (PutObjectStream, error) { - stream, err := s.svc.Put(ctx) - if err != nil { - return nil, fmt.Errorf("could not create Put object streamer: %w", err) - } - - return &putStreamResponser{ - stream: s.respSvc.CreateRequestStreamer( - func(req any) error { - return stream.Send(req.(*object.PutRequest)) - }, - func() (util.ResponseMessage, error) { - return stream.CloseAndRecv() - }, - ), - }, nil -} - -func (s *ResponseService) Head(ctx context.Context, req *object.HeadRequest) (*object.HeadResponse, error) { - resp, err := s.respSvc.HandleUnaryRequest(ctx, req, - func(ctx context.Context, req any) (util.ResponseMessage, error) { - return s.svc.Head(ctx, req.(*object.HeadRequest)) - }, - ) - if err != nil { - return nil, err - } - - return resp.(*object.HeadResponse), nil -} - -func (s *searchStreamResponser) Send(resp *object.SearchResponse) error { - return s.respWriter(resp) -} - -func (s *ResponseService) Search(req *object.SearchRequest, stream SearchStream) error { - return s.svc.Search(req, &searchStreamResponser{ - ServerStream: stream, - respWriter: s.respSvc.HandleServerStreamRequest(func(resp util.ResponseMessage) error { - return stream.Send(resp.(*object.SearchResponse)) - }), - }) -} - -func (s *ResponseService) Delete(ctx context.Context, req *object.DeleteRequest) (*object.DeleteResponse, error) { - resp, err := s.respSvc.HandleUnaryRequest(ctx, req, - func(ctx context.Context, req any) (util.ResponseMessage, error) { - return s.svc.Delete(ctx, req.(*object.DeleteRequest)) - }, - ) - if err != nil { - return nil, err - } - - return resp.(*object.DeleteResponse), nil -} - -func (s *getRangeStreamResponser) Send(resp *object.GetRangeResponse) error { - return s.respWriter(resp) -} - -func (s *ResponseService) GetRange(req *object.GetRangeRequest, stream GetObjectRangeStream) error { - return s.svc.GetRange(req, &getRangeStreamResponser{ - ServerStream: stream, - respWriter: s.respSvc.HandleServerStreamRequest(func(resp util.ResponseMessage) error { - return stream.Send(resp.(*object.GetRangeResponse)) - }), - }) -} - -func (s *ResponseService) GetRangeHash(ctx context.Context, req *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) { - resp, err := s.respSvc.HandleUnaryRequest(ctx, req, - func(ctx context.Context, req any) (util.ResponseMessage, error) { - return s.svc.GetRangeHash(ctx, req.(*object.GetRangeHashRequest)) - }, - ) - if err != nil { - return nil, err - } - - return resp.(*object.GetRangeHashResponse), nil -} diff --git a/pkg/services/object/server.go b/pkg/services/object/server.go index 3018605994..0e972190bf 100644 --- a/pkg/services/object/server.go +++ b/pkg/services/object/server.go @@ -27,6 +27,7 @@ import ( "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "github.com/nspcc-dev/neofs-sdk-go/stat" + "github.com/nspcc-dev/neofs-sdk-go/version" ) // GetObjectStream is an interface of NeoFS API v2 compatible object streamer. @@ -143,7 +144,14 @@ func (s *server) pushOpExecResult(op stat.Method, err error, startedAt time.Time } func (s *server) makeResponseMetaHeader(st *protostatus.Status) *protosession.ResponseMetaHeader { - return &protosession.ResponseMetaHeader{Status: st} + v := version.Current() + var v2 refsv2.Version + v.WriteToV2(&v2) + return &protosession.ResponseMetaHeader{ + Version: v2.ToGRPCMessage().(*refs.Version), + Epoch: s.fsChain.CurrentEpoch(), + Status: st, + } } func (s *server) sendPutResponse(stream protoobject.ObjectService_PutServer, resp *protoobject.PutResponse) error { diff --git a/pkg/services/object/server_test.go b/pkg/services/object/server_test.go index a79addc90f..be31cacd35 100644 --- a/pkg/services/object/server_test.go +++ b/pkg/services/object/server_test.go @@ -130,6 +130,8 @@ func (x *testFSChain) ForEachContainerNodePublicKeyInLastTwoEpochs(cnr cid.ID, f func (x *testFSChain) IsOwnPublicKey(pubKey []byte) bool { return bytes.Equal(x.serverPubKey, pubKey) } +func (x *testFSChain) CurrentEpoch() uint64 { return 0 } + type testStorage struct { t testing.TB // request data diff --git a/pkg/services/util/response/client_stream.go b/pkg/services/util/response/client_stream.go deleted file mode 100644 index d0c6a9fac9..0000000000 --- a/pkg/services/util/response/client_stream.go +++ /dev/null @@ -1,47 +0,0 @@ -package response - -import ( - "fmt" - - "github.com/nspcc-dev/neofs-node/pkg/services/util" -) - -// ClientMessageStreamer represents client-side message streamer -// that sets meta values to the response. -type ClientMessageStreamer struct { - cfg *cfg - - send util.RequestMessageWriter - - close util.ClientStreamCloser -} - -// Send calls send method of internal streamer. -func (s *ClientMessageStreamer) Send(req any) error { - if err := s.send(req); err != nil { - return fmt.Errorf("(%T) could not send the request: %w", s, err) - } - return nil -} - -// CloseAndRecv closes internal stream, receivers the response, -// sets meta values and returns the result. -func (s *ClientMessageStreamer) CloseAndRecv() (util.ResponseMessage, error) { - resp, err := s.close() - if err != nil { - return nil, fmt.Errorf("(%T) could not close stream and receive response: %w", s, err) - } - - setMeta(resp, s.cfg) - - return resp, nil -} - -// CreateRequestStreamer wraps stream methods and returns ClientMessageStreamer instance. -func (s *Service) CreateRequestStreamer(sender util.RequestMessageWriter, closer util.ClientStreamCloser) *ClientMessageStreamer { - return &ClientMessageStreamer{ - cfg: s.cfg, - send: sender, - close: closer, - } -} diff --git a/pkg/services/util/response/server_stream.go b/pkg/services/util/response/server_stream.go deleted file mode 100644 index 33534f42fd..0000000000 --- a/pkg/services/util/response/server_stream.go +++ /dev/null @@ -1,37 +0,0 @@ -package response - -import ( - "fmt" - - "github.com/nspcc-dev/neofs-node/pkg/services/util" -) - -// ServerMessageStreamer represents server-side message streamer -// that sets meta values to all response messages. -type ServerMessageStreamer struct { - cfg *cfg - - recv util.ResponseMessageReader -} - -// Recv calls Recv method of internal streamer, sets response meta -// values and returns the response. -func (s *ServerMessageStreamer) Recv() (util.ResponseMessage, error) { - m, err := s.recv() - if err != nil { - return nil, fmt.Errorf("could not receive response message for signing: %w", err) - } - - setMeta(m, s.cfg) - - return m, nil -} - -// HandleServerStreamRequest builds internal streamer via handlers, wraps it to ServerMessageStreamer and returns the result. -func (s *Service) HandleServerStreamRequest(respWriter util.ResponseMessageWriter) util.ResponseMessageWriter { - return func(resp util.ResponseMessage) error { - setMeta(resp, s.cfg) - - return respWriter(resp) - } -} diff --git a/pkg/services/util/response/service.go b/pkg/services/util/response/service.go deleted file mode 100644 index 6e7df925c6..0000000000 --- a/pkg/services/util/response/service.go +++ /dev/null @@ -1,66 +0,0 @@ -package response - -import ( - "github.com/nspcc-dev/neofs-api-go/v2/refs" - "github.com/nspcc-dev/neofs-api-go/v2/session" - "github.com/nspcc-dev/neofs-node/pkg/core/netmap" - "github.com/nspcc-dev/neofs-node/pkg/services/util" - "github.com/nspcc-dev/neofs-sdk-go/version" -) - -// Service represents universal v2 service -// that sets response meta header values. -type Service struct { - cfg *cfg -} - -// Option is an option of Service constructor. -type Option func(*cfg) - -type cfg struct { - version refs.Version - - state netmap.State -} - -func defaultCfg() *cfg { - var c cfg - - version.Current().WriteToV2(&c.version) - - return &c -} - -// NewService creates, initializes and returns Service instance. -func NewService(opts ...Option) *Service { - c := defaultCfg() - - for i := range opts { - opts[i](c) - } - - return &Service{ - cfg: c, - } -} - -func setMeta(resp util.ResponseMessage, cfg *cfg) { - meta := new(session.ResponseMetaHeader) - meta.SetVersion(&cfg.version) - meta.SetTTL(1) // FIXME: #1160 TTL must be calculated - meta.SetEpoch(cfg.state.CurrentEpoch()) - - if origin := resp.GetMetaHeader(); origin != nil { - // FIXME: #1160 what if origin is set by local server? - meta.SetOrigin(origin) - } - - resp.SetMetaHeader(meta) -} - -// WithNetworkState returns option to set network state of Service. -func WithNetworkState(v netmap.State) Option { - return func(c *cfg) { - c.state = v - } -} diff --git a/pkg/services/util/response/unary.go b/pkg/services/util/response/unary.go deleted file mode 100644 index 604eed46d9..0000000000 --- a/pkg/services/util/response/unary.go +++ /dev/null @@ -1,21 +0,0 @@ -package response - -import ( - "context" - "fmt" - - "github.com/nspcc-dev/neofs-node/pkg/services/util" -) - -// HandleUnaryRequest call passes request to handler, sets response meta header values and returns it. -func (s *Service) HandleUnaryRequest(ctx context.Context, req any, handler util.UnaryHandler) (util.ResponseMessage, error) { - // process request - resp, err := handler(ctx, req) - if err != nil { - return nil, fmt.Errorf("could not handle request: %w", err) - } - - setMeta(resp, s.cfg) - - return resp, nil -} diff --git a/pkg/services/util/sign.go b/pkg/services/util/sign.go index afdbd5f6f4..a25c04ed23 100644 --- a/pkg/services/util/sign.go +++ b/pkg/services/util/sign.go @@ -1,34 +1,16 @@ package util import ( - "context" "crypto/ecdsa" "errors" "github.com/nspcc-dev/neofs-api-go/v2/rpc/grpc" - "github.com/nspcc-dev/neofs-api-go/v2/session" "github.com/nspcc-dev/neofs-api-go/v2/signature" protostatus "github.com/nspcc-dev/neofs-api-go/v2/status/grpc" apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" "google.golang.org/protobuf/proto" ) -// ResponseMessage is an interface of NeoFS response message. -type ResponseMessage interface { - GetMetaHeader() *session.ResponseMetaHeader - SetMetaHeader(*session.ResponseMetaHeader) -} - -type UnaryHandler func(context.Context, any) (ResponseMessage, error) - -type ResponseMessageWriter func(ResponseMessage) error - -type ResponseMessageReader func() (ResponseMessage, error) - -type RequestMessageWriter func(any) error - -type ClientStreamCloser func() (ResponseMessage, error) - func SignResponse[R proto.Message, RV2 any, RV2PTR interface { *RV2 ToGRPCMessage() grpc.Message From c7126ac245594fbee9326659fc95693be303e8e2 Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Sun, 22 Dec 2024 13:34:34 +0300 Subject: [PATCH 3/7] services/object: Inline service checking node maintenance Continues d29bbd0e64068ed761e0909db1ccc71c5998e62a. Signed-off-by: Leonard Lyubich --- cmd/neofs-node/config.go | 7 --- cmd/neofs-node/object.go | 16 ++++-- pkg/services/object/common.go | 91 ------------------------------ pkg/services/object/server.go | 38 +++++++++++++ pkg/services/object/server_test.go | 13 +++-- 5 files changed, 57 insertions(+), 108 deletions(-) delete mode 100644 pkg/services/object/common.go diff --git a/cmd/neofs-node/config.go b/cmd/neofs-node/config.go index b0e300af84..214946d610 100644 --- a/cmd/neofs-node/config.go +++ b/cmd/neofs-node/config.go @@ -315,13 +315,6 @@ func (c *internals) stopMaintenance() { c.log.Info("stopped local node's maintenance") } -// IsMaintenance checks if storage node is under maintenance. -// -// Provides util.NodeState to Object service. -func (c *internals) IsMaintenance() bool { - return c.isMaintenance.Load() -} - type basics struct { networkState *networkState netMapSource netmapCore.Source diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go index 9aea05fa24..8f3ebd7c61 100644 --- a/cmd/neofs-node/object.go +++ b/cmd/neofs-node/object.go @@ -5,6 +5,7 @@ import ( "context" "errors" "fmt" + "sync/atomic" lru "github.com/hashicorp/golang-lru/v2" "github.com/nspcc-dev/neofs-api-go/v2/object" @@ -312,7 +313,7 @@ func initObjectService(c *cfg) { // every object part in every chain will try to refer to the first part, so caching // should help a lot here const cachedFirstObjectsNumber = 1000 - fsChain := newFSChainForObjects(cnrNodes, c.IsLocalKey, c.networkState) + fsChain := newFSChainForObjects(cnrNodes, c.IsLocalKey, c.networkState, &c.isMaintenance) aclSvc := v2.New( v2.WithLogger(c.log), @@ -333,10 +334,7 @@ func initObjectService(c *cfg) { ), ) - var commonSvc objectService.Common - commonSvc.Init(&c.internals, aclSvc) - - server := objectService.New(&commonSvc, mNumber, fsChain, (*putObjectServiceWrapper)(sPut), c.shared.basics.key.PrivateKey, c.metricsCollector) + server := objectService.New(aclSvc, mNumber, fsChain, (*putObjectServiceWrapper)(sPut), c.shared.basics.key.PrivateKey, c.metricsCollector) for _, srv := range c.cfgGRPC.servers { objectGRPC.RegisterObjectServiceServer(srv, server) @@ -612,13 +610,15 @@ type fsChainForObjects struct { netmap.StateDetailed containerNodes *containerNodes isLocalPubKey func([]byte) bool + isMaintenance *atomic.Bool } -func newFSChainForObjects(cnrNodes *containerNodes, isLocalPubKey func([]byte) bool, ns netmap.StateDetailed) *fsChainForObjects { +func newFSChainForObjects(cnrNodes *containerNodes, isLocalPubKey func([]byte) bool, ns netmap.StateDetailed, isMaintenance *atomic.Bool) *fsChainForObjects { return &fsChainForObjects{ StateDetailed: ns, containerNodes: cnrNodes, isLocalPubKey: isLocalPubKey, + isMaintenance: isMaintenance, } } @@ -639,6 +639,10 @@ func (x *fsChainForObjects) IsOwnPublicKey(pubKey []byte) bool { return x.isLocalPubKey(pubKey) } +// LocalNodeUnderMaintenance checks whether local storage node is under +// maintenance now. +func (x *fsChainForObjects) LocalNodeUnderMaintenance() bool { return x.isMaintenance.Load() } + type putObjectServiceWrapper putsvc.Service func (x *putObjectServiceWrapper) VerifyAndStoreObject(obj objectSDK.Object) error { diff --git a/pkg/services/object/common.go b/pkg/services/object/common.go deleted file mode 100644 index babce3c4df..0000000000 --- a/pkg/services/object/common.go +++ /dev/null @@ -1,91 +0,0 @@ -package object - -import ( - "context" - - objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object" - apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" -) - -// NodeState is storage node state processed by Object service. -type NodeState interface { - // IsMaintenance checks if node is under maintenance. Node MUST NOT serve - // local object operations. Node MUST respond with apistatus.NodeUnderMaintenance - // error if IsMaintenance returns true. - IsMaintenance() bool -} - -// Common is an Object API ServiceServer which encapsulates logic spread to all -// object operations. -// -// If underlying NodeState.IsMaintenance returns true, all operations are -// immediately failed with apistatus.NodeUnderMaintenance. -type Common struct { - state NodeState - - nextHandler ServiceServer -} - -// Init initializes the Common instance. -func (x *Common) Init(state NodeState, nextHandler ServiceServer) { - x.state = state - x.nextHandler = nextHandler -} - -var errMaintenance apistatus.NodeUnderMaintenance - -func (x *Common) Get(req *objectV2.GetRequest, stream GetObjectStream) error { - if x.state.IsMaintenance() { - return errMaintenance - } - - return x.nextHandler.Get(req, stream) -} - -func (x *Common) Put(ctx context.Context) (PutObjectStream, error) { - if x.state.IsMaintenance() { - return nil, errMaintenance - } - - return x.nextHandler.Put(ctx) -} - -func (x *Common) Head(ctx context.Context, req *objectV2.HeadRequest) (*objectV2.HeadResponse, error) { - if x.state.IsMaintenance() { - return nil, errMaintenance - } - - return x.nextHandler.Head(ctx, req) -} - -func (x *Common) Search(req *objectV2.SearchRequest, stream SearchStream) error { - if x.state.IsMaintenance() { - return errMaintenance - } - - return x.nextHandler.Search(req, stream) -} - -func (x *Common) Delete(ctx context.Context, req *objectV2.DeleteRequest) (*objectV2.DeleteResponse, error) { - if x.state.IsMaintenance() { - return nil, errMaintenance - } - - return x.nextHandler.Delete(ctx, req) -} - -func (x *Common) GetRange(req *objectV2.GetRangeRequest, stream GetObjectRangeStream) error { - if x.state.IsMaintenance() { - return errMaintenance - } - - return x.nextHandler.GetRange(req, stream) -} - -func (x *Common) GetRangeHash(ctx context.Context, req *objectV2.GetRangeHashRequest) (*objectV2.GetRangeHashResponse, error) { - if x.state.IsMaintenance() { - return nil, errMaintenance - } - - return x.nextHandler.GetRangeHash(ctx, req) -} diff --git a/pkg/services/object/server.go b/pkg/services/object/server.go index 0e972190bf..afc3baef3b 100644 --- a/pkg/services/object/server.go +++ b/pkg/services/object/server.go @@ -106,6 +106,10 @@ type FSChain interface { // IsOwnPublicKey checks whether given pubKey assigned to Node in the NeoFS // network map. IsOwnPublicKey(pubKey []byte) bool + + // LocalNodeUnderMaintenance checks whether local node is under maintenance + // according to the network map from FSChain. + LocalNodeUnderMaintenance() bool } // Storage groups ops of the node's object storage required to serve NeoFS API @@ -202,6 +206,10 @@ func (s *server) Put(gStream protoobject.ObjectService_PutServer) error { return err } + if s.fsChain.LocalNodeUnderMaintenance() { + return s.sendStatusPutResponse(gStream, apistatus.ErrNodeUnderMaintenance) + } + if err = stream.Send(putReq); err != nil { err = s.sendStatusPutResponse(gStream, err) // assign for defer return err @@ -233,6 +241,11 @@ func (s *server) Delete(ctx context.Context, req *protoobject.DeleteRequest) (*p if err = signature.VerifyServiceMessage(delReq); err != nil { return s.makeStatusDeleteResponse(err), nil } + + if s.fsChain.LocalNodeUnderMaintenance() { + return s.makeStatusDeleteResponse(apistatus.ErrNodeUnderMaintenance), nil + } + resp, err := s.srv.Delete(ctx, delReq) if err != nil { return s.makeStatusDeleteResponse(err), nil @@ -265,6 +278,11 @@ func (s *server) Head(ctx context.Context, req *protoobject.HeadRequest) (*proto if err = signature.VerifyServiceMessage(searchReq); err != nil { return s.makeStatusHeadResponse(err), nil } + + if s.fsChain.LocalNodeUnderMaintenance() { + return s.makeStatusHeadResponse(apistatus.ErrNodeUnderMaintenance), nil + } + resp, err := s.srv.Head(ctx, searchReq) if err != nil { return s.makeStatusHeadResponse(err), nil @@ -296,6 +314,11 @@ func (s *server) GetRangeHash(ctx context.Context, req *protoobject.GetRangeHash if err = signature.VerifyServiceMessage(hashRngReq); err != nil { return s.makeStatusHashResponse(err), nil } + + if s.fsChain.LocalNodeUnderMaintenance() { + return s.makeStatusHashResponse(apistatus.ErrNodeUnderMaintenance), nil + } + resp, err := s.srv.GetRangeHash(ctx, hashRngReq) if err != nil { return s.makeStatusHashResponse(err), nil @@ -340,6 +363,11 @@ func (s *server) Get(req *protoobject.GetRequest, gStream protoobject.ObjectServ if err = signature.VerifyServiceMessage(getReq); err != nil { return s.sendStatusGetResponse(gStream, err) } + + if s.fsChain.LocalNodeUnderMaintenance() { + return s.sendStatusGetResponse(gStream, apistatus.ErrNodeUnderMaintenance) + } + err = s.srv.Get( getReq, &getStreamerV2{ @@ -385,6 +413,11 @@ func (s *server) GetRange(req *protoobject.GetRangeRequest, gStream protoobject. if err = signature.VerifyServiceMessage(getRngReq); err != nil { return s.sendStatusRangeResponse(gStream, err) } + + if s.fsChain.LocalNodeUnderMaintenance() { + return s.sendStatusRangeResponse(gStream, apistatus.ErrNodeUnderMaintenance) + } + err = s.srv.GetRange( getRngReq, &getRangeStreamerV2{ @@ -430,6 +463,11 @@ func (s *server) Search(req *protoobject.SearchRequest, gStream protoobject.Obje if err = signature.VerifyServiceMessage(searchReq); err != nil { return s.sendStatusSearchResponse(gStream, err) } + + if s.fsChain.LocalNodeUnderMaintenance() { + return s.sendStatusSearchResponse(gStream, apistatus.ErrNodeUnderMaintenance) + } + err = s.srv.Search( searchReq, &searchStreamerV2{ diff --git a/pkg/services/object/server_test.go b/pkg/services/object/server_test.go index be31cacd35..0f9bab229b 100644 --- a/pkg/services/object/server_test.go +++ b/pkg/services/object/server_test.go @@ -72,10 +72,11 @@ type noCallTestFSChain struct{} func (x *noCallTestFSChain) ForEachContainerNodePublicKeyInLastTwoEpochs(cid.ID, func([]byte) bool) error { panic("must not be called") } -func (x *noCallTestFSChain) IsOwnPublicKey([]byte) bool { panic("must not be called") } -func (x *noCallTestFSChain) CurrentEpoch() uint64 { panic("must not be called") } -func (x *noCallTestFSChain) CurrentBlock() uint32 { panic("must not be called") } -func (x *noCallTestFSChain) CurrentEpochDuration() uint64 { panic("must not be called") } +func (*noCallTestFSChain) IsOwnPublicKey([]byte) bool { panic("must not be called") } +func (*noCallTestFSChain) CurrentEpoch() uint64 { panic("must not be called") } +func (*noCallTestFSChain) CurrentBlock() uint32 { panic("must not be called") } +func (*noCallTestFSChain) CurrentEpochDuration() uint64 { panic("must not be called") } +func (*noCallTestFSChain) LocalNodeUnderMaintenance() bool { panic("must not be called") } type noCallTestStorage struct{} @@ -132,6 +133,8 @@ func (x *testFSChain) IsOwnPublicKey(pubKey []byte) bool { return bytes.Equal(x. func (x *testFSChain) CurrentEpoch() uint64 { return 0 } +func (*testFSChain) LocalNodeUnderMaintenance() bool { return false } + type testStorage struct { t testing.TB // request data @@ -474,6 +477,8 @@ func (x nopFSChain) IsOwnPublicKey([]byte) bool { return false } +func (nopFSChain) LocalNodeUnderMaintenance() bool { return false } + type nopStorage struct{} func (nopStorage) VerifyAndStoreObject(object.Object) error { return nil } From 0d86522f0873a9377d2228b7bc36bdbfdde5a34a Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Tue, 24 Dec 2024 19:13:21 +0300 Subject: [PATCH 4/7] service/object: Inline ACL service Continues c7126ac245594fbee9326659fc95693be303e8e2. Stylistically adjusted by Roman Khimov Signed-off-by: Leonard Lyubich --- cmd/neofs-node/object.go | 21 +- pkg/services/object/acl/eacl/v2/eacl_test.go | 109 ++--- pkg/services/object/acl/eacl/v2/headers.go | 131 +++-- pkg/services/object/acl/eacl/v2/xheader.go | 8 +- pkg/services/object/acl/v2/errors.go | 19 - pkg/services/object/acl/v2/opts.go | 15 - pkg/services/object/acl/v2/request.go | 4 +- pkg/services/object/acl/v2/service.go | 474 +++++++------------ pkg/services/object/acl/v2/util.go | 89 ++-- pkg/services/object/acl/v2/util_test.go | 15 +- pkg/services/object/server.go | 196 +++++++- pkg/services/object/server_test.go | 78 ++- 12 files changed, 641 insertions(+), 518 deletions(-) diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go index 8f3ebd7c61..e6391d2f61 100644 --- a/cmd/neofs-node/object.go +++ b/cmd/neofs-node/object.go @@ -294,7 +294,7 @@ func initObjectService(c *cfg) { ) // build service pipeline - // grpc | acl | split + // grpc | split splitSvc := objectService.NewTransportSplitter( c.cfgGRPC.maxChunkSize, @@ -322,19 +322,16 @@ func initObjectService(c *cfg) { v2.WithContainerSource( c.cfgObject.cnrSource, ), - v2.WithNextService(splitSvc), - v2.WithEACLChecker( - acl.NewChecker(new(acl.CheckerPrm). - SetNetmapState(c.cfgNetmap.state). - SetEACLSource(c.cfgObject.eaclSource). - SetValidator(eaclSDK.NewValidator()). - SetLocalStorage(ls). - SetHeaderSource(cachedHeaderSource(sGet, cachedFirstObjectsNumber, c.log)), - ), - ), + ) + aclChecker := acl.NewChecker(new(acl.CheckerPrm). + SetNetmapState(c.cfgNetmap.state). + SetEACLSource(c.cfgObject.eaclSource). + SetValidator(eaclSDK.NewValidator()). + SetLocalStorage(ls). + SetHeaderSource(cachedHeaderSource(sGet, cachedFirstObjectsNumber, c.log)), ) - server := objectService.New(aclSvc, mNumber, fsChain, (*putObjectServiceWrapper)(sPut), c.shared.basics.key.PrivateKey, c.metricsCollector) + server := objectService.New(splitSvc, mNumber, fsChain, (*putObjectServiceWrapper)(sPut), c.shared.basics.key.PrivateKey, c.metricsCollector, aclChecker, aclSvc) for _, srv := range c.cfgGRPC.servers { objectGRPC.RegisterObjectServiceServer(srv, server) diff --git a/pkg/services/object/acl/eacl/v2/eacl_test.go b/pkg/services/object/acl/eacl/v2/eacl_test.go index 4c5e1e1e7d..81aa882ef9 100644 --- a/pkg/services/object/acl/eacl/v2/eacl_test.go +++ b/pkg/services/object/acl/eacl/v2/eacl_test.go @@ -6,9 +6,11 @@ import ( "testing" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" - objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object" - "github.com/nspcc-dev/neofs-api-go/v2/refs" - "github.com/nspcc-dev/neofs-api-go/v2/session" + protoobject "github.com/nspcc-dev/neofs-api-go/v2/object/grpc" + apirefs "github.com/nspcc-dev/neofs-api-go/v2/refs" + refs "github.com/nspcc-dev/neofs-api-go/v2/refs/grpc" + protosession "github.com/nspcc-dev/neofs-api-go/v2/session/grpc" + cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test" neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto" eaclSDK "github.com/nspcc-dev/neofs-sdk-go/eacl" "github.com/nspcc-dev/neofs-sdk-go/object" @@ -44,40 +46,29 @@ func (s *testLocalStorage) Head(addr oid.Address) (*object.Object, error) { return s.obj, s.err } -func testXHeaders(strs ...string) []session.XHeader { - res := make([]session.XHeader, len(strs)/2) - - for i := 0; i < len(strs); i += 2 { - res[i/2].SetKey(strs[i]) - res[i/2].SetValue(strs[i+1]) - } - - return res -} - func TestHeadRequest(t *testing.T) { - req := new(objectV2.HeadRequest) - - meta := new(session.RequestMetaHeader) - req.SetMetaHeader(meta) - - body := new(objectV2.HeadRequestBody) - req.SetBody(body) - - addr := oidtest.Address() - - var addrV2 refs.Address - addr.WriteToV2(&addrV2) - - body.SetAddress(&addrV2) + cnr := cidtest.ID() + var cnr2 apirefs.ContainerID + cnr.WriteToV2(&cnr2) + id := oidtest.ID() + var id2 apirefs.ObjectID + id.WriteToV2(&id2) xKey := "x-key" xVal := "x-val" - xHdrs := testXHeaders( - xKey, xVal, - ) - - meta.SetXHeaders(xHdrs) + xHdrs := []*protosession.XHeader{{Key: xKey, Value: xVal}} + + req := &protoobject.HeadRequest{ + Body: &protoobject.HeadRequest_Body{ + Address: &refs.Address{ + ContainerId: cnr2.ToGRPCMessage().(*refs.ContainerID), + ObjectId: id2.ToGRPCMessage().(*refs.ObjectID), + }, + }, + MetaHeader: &protosession.RequestMetaHeader{ + XHeaders: xHdrs, + }, + } obj := object.New() @@ -105,24 +96,20 @@ func TestHeadRequest(t *testing.T) { lStorage := &testLocalStorage{ t: t, - expAddr: addr, + expAddr: oid.NewAddress(cnr, id), obj: obj, } - id := addr.Object() - newSource := func(t *testing.T) eaclSDK.TypedHeaderSource { hdrSrc, err := NewMessageHeaderSource( WithObjectStorage(lStorage), WithServiceRequest(req), - WithCID(addr.Container()), + WithCID(cnr), WithOID(&id)) require.NoError(t, err) return hdrSrc } - cnr := addr.Container() - unit := new(eaclSDK.ValidationUnit). WithContainerID(&cnr). WithOperation(eaclSDK.OperationHead). @@ -133,11 +120,11 @@ func TestHeadRequest(t *testing.T) { checkAction(t, eaclSDK.ActionDeny, validator, unit.WithHeaderSource(newSource(t))) - meta.SetXHeaders(nil) + req.MetaHeader.XHeaders = nil checkDefaultAction(t, validator, unit.WithHeaderSource(newSource(t))) - meta.SetXHeaders(xHdrs) + req.MetaHeader.XHeaders = xHdrs obj.SetAttributes() @@ -153,7 +140,7 @@ func TestHeadRequest(t *testing.T) { eaclSDK.ActionDeny, eaclSDK.OperationHead, []eaclSDK.Target{tgt}, - eaclSDK.NewFilterObjectWithID(addr.Object()), + eaclSDK.NewFilterObjectWithID(id), ) table = eaclSDK.ConstructTable([]eaclSDK.Record{r, rID}) @@ -187,34 +174,29 @@ func TestV2Split(t *testing.T) { originalObject.SetID(oid.ID{}) // no object ID for an original object in the first object originalObject.SetSignature(&neofscrypto.Signature{}) - originalObjectV2 := originalObject.ToV2() - firstObject := objecttest.Object() firstObject.SetSplitID(nil) // not V1 split firstObject.SetParent(&originalObject) require.NoError(t, firstObject.CalculateAndSetID()) - var firstIDV2 refs.ObjectID + var firstIDV2 apirefs.ObjectID firstID := firstObject.GetID() firstID.WriteToV2(&firstIDV2) - splitV2 := new(objectV2.SplitHeader) - splitV2.SetFirst(&firstIDV2) - splitV2.SetParentHeader(originalObjectV2.GetHeader()) - headerV2 := new(objectV2.Header) - headerV2.SetSplit(splitV2) - - objPart := new(objectV2.PutObjectPartInit) - objPart.SetHeader(headerV2) - - body := new(objectV2.PutRequestBody) - body.SetObjectPart(objPart) - - meta := new(session.RequestMetaHeader) - - req := new(objectV2.PutRequest) - req.SetMetaHeader(meta) - req.SetBody(body) + hs := &protoobject.Header_Split{ + ParentHeader: originalObject.ToV2().GetHeader().ToGRPCMessage().(*protoobject.Header), + First: firstIDV2.ToGRPCMessage().(*refs.ObjectID), + } + hdr := &protoobject.Header{ + Split: hs, + } + req := &protoobject.PutRequest{ + Body: &protoobject.PutRequest_Body{ + ObjectPart: &protoobject.PutRequest_Body_Init_{Init: &protoobject.PutRequest_Body_Init{ + Header: hdr, + }}, + }, + } priv, err := keys.NewPrivateKey() require.NoError(t, err) @@ -260,7 +242,6 @@ func TestV2Split(t *testing.T) { t.Run("denied by parent's attribute; non-first object", func(t *testing.T) { // get the first object from the "network" hdrSrc.header = &firstObject - headerV2.GetSplit().SetParent(nil) checkAction(t, eaclSDK.ActionDeny, validator, unit.WithHeaderSource(newSource(t))) }) @@ -270,7 +251,7 @@ func TestV2Split(t *testing.T) { originalObjectNoRestrictedAttr.SetID(oid.ID{}) // no object ID for an original object in the first object originalObjectNoRestrictedAttr.SetSignature(&neofscrypto.Signature{}) - splitV2.SetParentHeader(originalObjectNoRestrictedAttr.ToV2().GetHeader()) + hs.ParentHeader = originalObjectNoRestrictedAttr.ToV2().GetHeader().ToGRPCMessage().(*protoobject.Header) // allow an object whose first obj does not have the restricted attribute checkDefaultAction(t, validator, unit.WithHeaderSource(newSource(t))) diff --git a/pkg/services/object/acl/eacl/v2/headers.go b/pkg/services/object/acl/eacl/v2/headers.go index 5e1e82fed3..70ebce6ed6 100644 --- a/pkg/services/object/acl/eacl/v2/headers.go +++ b/pkg/services/object/acl/eacl/v2/headers.go @@ -6,8 +6,10 @@ import ( "github.com/nspcc-dev/neofs-api-go/v2/acl" objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object" + protoobject "github.com/nspcc-dev/neofs-api-go/v2/object/grpc" refsV2 "github.com/nspcc-dev/neofs-api-go/v2/refs" - "github.com/nspcc-dev/neofs-api-go/v2/session" + refs "github.com/nspcc-dev/neofs-api-go/v2/refs/grpc" + session "github.com/nspcc-dev/neofs-api-go/v2/session/grpc" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" eaclSDK "github.com/nspcc-dev/neofs-sdk-go/eacl" "github.com/nspcc-dev/neofs-sdk-go/object" @@ -92,14 +94,14 @@ func (h headerSource) HeadersOfType(typ eaclSDK.FilterHeaderType) ([]eaclSDK.Hea } } -type xHeader session.XHeader +type xHeader [2]string func (x xHeader) Key() string { - return (*session.XHeader)(&x).GetKey() + return x[0] } func (x xHeader) Value() string { - return (*session.XHeader)(&x).GetValue() + return x[1] } func requestHeaders(msg xHeaderSource) []eaclSDK.Header { @@ -115,8 +117,8 @@ func (h *cfg) readObjectHeaders(dst *headerSource) error { case requestXHeaderSource: switch req := m.req.(type) { case - *objectV2.GetRequest, - *objectV2.HeadRequest: + *protoobject.GetRequest, + *protoobject.HeadRequest: if h.obj == nil { return errMissingOID } @@ -126,23 +128,32 @@ func (h *cfg) readObjectHeaders(dst *headerSource) error { dst.objectHeaders = objHeaders dst.incompleteObjectHeaders = !completed case - *objectV2.GetRangeRequest, - *objectV2.GetRangeHashRequest, - *objectV2.DeleteRequest: + *protoobject.GetRangeRequest, + *protoobject.GetRangeHashRequest, + *protoobject.DeleteRequest: if h.obj == nil { return errMissingOID } dst.objectHeaders = addressHeaders(h.cnr, h.obj) - case *objectV2.PutRequest: - if v, ok := req.GetBody().GetObjectPart().(*objectV2.PutObjectPartInit); ok { - splitHeader := v.GetHeader().GetSplit() - if splitHeader == nil || splitHeader.GetSplitID() != nil { + case *protoobject.PutRequest: + if v, ok := req.GetBody().GetObjectPart().(*protoobject.PutRequest_Body_Init_); ok { + if v == nil || v.Init == nil { + return errors.New("nil oneof field with heading part") + } + in := v.Init + splitHeader := in.Header.GetSplit() + if splitHeader == nil || splitHeader.SplitId != nil { // V1 split scheme or small object, only the received // object's header can be checked + mo := &protoobject.Object{ + ObjectId: in.ObjectId, + Header: in.Header, + } oV2 := new(objectV2.Object) - oV2.SetObjectID(v.GetObjectID()) - oV2.SetHeader(v.GetHeader()) + if err := oV2.FromGRPCMessage(mo); err != nil { + panic(err) + } dst.objectHeaders = headersFromObject(object.NewFromV2(oV2), h.cnr, h.obj) @@ -153,18 +164,27 @@ func (h *cfg) readObjectHeaders(dst *headerSource) error { parentHeader := splitHeader.GetParentHeader() if parentHeader != nil { + mo := &protoobject.Object{ + ObjectId: splitHeader.Parent, + Signature: splitHeader.ParentSignature, + Header: parentHeader, + } var parentObjectV2 objectV2.Object - parentObjectV2.SetObjectID(splitHeader.GetParent()) - parentObjectV2.SetSignature(splitHeader.GetParentSignature()) - parentObjectV2.SetHeader(parentHeader) + if err := parentObjectV2.FromGRPCMessage(mo); err != nil { + panic(err) + } dst.objectHeaders = headersFromObject(object.NewFromV2(&parentObjectV2), h.cnr, h.obj) } else { // middle object, parent header should // be received via the first object - if first := v.GetHeader().GetSplit().GetFirst(); first != nil { + if mf := in.Header.GetSplit().GetFirst(); mf != nil { var firstID oid.ID - err := firstID.ReadFromV2(*first) + var first refsV2.ObjectID + if err := first.FromGRPCMessage(mf); err != nil { + panic(err) + } + err := firstID.ReadFromV2(first) if err != nil { return fmt.Errorf("converting first object ID: %w", err) } @@ -184,12 +204,15 @@ func (h *cfg) readObjectHeaders(dst *headerSource) error { // first object not defined, unexpected, do not attach any header } } - case *objectV2.SearchRequest: - cnrV2 := req.GetBody().GetContainerID() + case *protoobject.SearchRequest: var cnr cid.ID - if cnrV2 != nil { - if err := cnr.ReadFromV2(*cnrV2); err != nil { + if mc := req.GetBody().GetContainerId(); mc != nil { + var cnrV2 refsV2.ContainerID + if err := cnrV2.FromGRPCMessage(mc); err != nil { + panic(err) + } + if err := cnr.ReadFromV2(cnrV2); err != nil { return fmt.Errorf("can't parse container ID: %w", err) } } @@ -203,37 +226,57 @@ func (h *cfg) readObjectHeaders(dst *headerSource) error { dst.objectHeaders = objectHeaders dst.incompleteObjectHeaders = !completed - case *objectV2.GetResponse: - if v, ok := resp.GetBody().GetObjectPart().(*objectV2.GetObjectPartInit); ok { + case *protoobject.GetResponse: + if v, ok := resp.GetBody().GetObjectPart().(*protoobject.GetResponse_Body_Init_); ok { + if v == nil || v.Init == nil { + return errors.New("nil oneof field with heading part") + } + mo := &protoobject.Object{ + ObjectId: v.Init.ObjectId, + Header: v.Init.Header, + } oV2 := new(objectV2.Object) - oV2.SetObjectID(v.GetObjectID()) - oV2.SetHeader(v.GetHeader()) + if err := oV2.FromGRPCMessage(mo); err != nil { + panic(err) + } dst.objectHeaders = headersFromObject(object.NewFromV2(oV2), h.cnr, h.obj) } - case *objectV2.HeadResponse: - oV2 := new(objectV2.Object) - - var hdr *objectV2.Header + case *protoobject.HeadResponse: + var hdr *protoobject.Header - switch v := resp.GetBody().GetHeaderPart().(type) { - case *objectV2.ShortHeader: - hdr = new(objectV2.Header) + switch v := resp.GetBody().GetHead().(type) { + case *protoobject.HeadResponse_Body_ShortHeader: + if v == nil || v.ShortHeader == nil { + return errors.New("nil oneof field with short header") + } var idV2 refsV2.ContainerID h.cnr.WriteToV2(&idV2) - hdr.SetContainerID(&idV2) - hdr.SetVersion(v.GetVersion()) - hdr.SetCreationEpoch(v.GetCreationEpoch()) - hdr.SetOwnerID(v.GetOwnerID()) - hdr.SetObjectType(v.GetObjectType()) - hdr.SetPayloadLength(v.GetPayloadLength()) - case *objectV2.HeaderWithSignature: - hdr = v.GetHeader() + h := v.ShortHeader + hdr = &protoobject.Header{ + Version: h.Version, + ContainerId: idV2.ToGRPCMessage().(*refs.ContainerID), + OwnerId: h.OwnerId, + CreationEpoch: h.CreationEpoch, + PayloadLength: h.PayloadLength, + ObjectType: h.ObjectType, + } + case *protoobject.HeadResponse_Body_Header: + if v == nil || v.Header == nil { + return errors.New("nil oneof field carrying header with signature") + } + hdr = v.Header.Header } - oV2.SetHeader(hdr) + mo := &protoobject.Object{ + Header: hdr, + } + oV2 := new(objectV2.Object) + if err := oV2.FromGRPCMessage(mo); err != nil { + panic(err) + } dst.objectHeaders = headersFromObject(object.NewFromV2(oV2), h.cnr, h.obj) } diff --git a/pkg/services/object/acl/eacl/v2/xheader.go b/pkg/services/object/acl/eacl/v2/xheader.go index aa6f5f9da8..dad68076db 100644 --- a/pkg/services/object/acl/eacl/v2/xheader.go +++ b/pkg/services/object/acl/eacl/v2/xheader.go @@ -1,7 +1,7 @@ package v2 import ( - "github.com/nspcc-dev/neofs-api-go/v2/session" + protosession "github.com/nspcc-dev/neofs-api-go/v2/session/grpc" eaclSDK "github.com/nspcc-dev/neofs-sdk-go/eacl" ) @@ -30,7 +30,7 @@ func (s requestXHeaderSource) GetXHeaders() []eaclSDK.Header { for meta := s.req.GetMetaHeader(); meta != nil; meta = meta.GetOrigin() { x := meta.GetXHeaders() for i := range x { - res = append(res, (xHeader)(x[i])) + res = append(res, xHeader{x[i].GetKey(), x[i].GetValue()}) } } @@ -39,7 +39,7 @@ func (s requestXHeaderSource) GetXHeaders() []eaclSDK.Header { func (s responseXHeaderSource) GetXHeaders() []eaclSDK.Header { ln := 0 - xHdrs := make([][]session.XHeader, 0) + xHdrs := make([][]*protosession.XHeader, 0) for meta := s.req.GetMetaHeader(); meta != nil; meta = meta.GetOrigin() { x := meta.GetXHeaders() @@ -53,7 +53,7 @@ func (s responseXHeaderSource) GetXHeaders() []eaclSDK.Header { for i := range xHdrs { for j := range xHdrs[i] { - res = append(res, xHeader(xHdrs[i][j])) + res = append(res, xHeader{xHdrs[i][j].GetKey(), xHdrs[i][j].GetValue()}) } } diff --git a/pkg/services/object/acl/v2/errors.go b/pkg/services/object/acl/v2/errors.go index 432894df89..cd2de174ae 100644 --- a/pkg/services/object/acl/v2/errors.go +++ b/pkg/services/object/acl/v2/errors.go @@ -2,8 +2,6 @@ package v2 import ( "fmt" - - apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" ) const invalidRequestMessage = "malformed request" @@ -20,20 +18,3 @@ var ( errInvalidSessionOwner = malformedRequestError("invalid session token owner") errInvalidVerb = malformedRequestError("session token verb is invalid") ) - -const accessDeniedACLReasonFmt = "access to operation %s is denied by basic ACL check" -const accessDeniedEACLReasonFmt = "access to operation %s is denied by extended ACL check: %v" - -func basicACLErr(info RequestInfo) error { - var errAccessDenied apistatus.ObjectAccessDenied - errAccessDenied.WriteReason(fmt.Sprintf(accessDeniedACLReasonFmt, info.operation)) - - return errAccessDenied -} - -func eACLErr(info RequestInfo, err error) error { - var errAccessDenied apistatus.ObjectAccessDenied - errAccessDenied.WriteReason(fmt.Sprintf(accessDeniedEACLReasonFmt, info.operation, err)) - - return errAccessDenied -} diff --git a/pkg/services/object/acl/v2/opts.go b/pkg/services/object/acl/v2/opts.go index 34fd66ce46..24bed9f246 100644 --- a/pkg/services/object/acl/v2/opts.go +++ b/pkg/services/object/acl/v2/opts.go @@ -2,7 +2,6 @@ package v2 import ( "github.com/nspcc-dev/neofs-node/pkg/core/container" - objectSvc "github.com/nspcc-dev/neofs-node/pkg/services/object" "go.uber.org/zap" ) @@ -28,20 +27,6 @@ func WithContainerSource(v container.Source) Option { } } -// WithNextService returns option to set next object service. -func WithNextService(v objectSvc.ServiceServer) Option { - return func(c *cfg) { - c.next = v - } -} - -// WithEACLChecker returns option to set eACL checker. -func WithEACLChecker(v ACLChecker) Option { - return func(c *cfg) { - c.checker = v - } -} - // WithIRFetcher returns option to set inner ring fetcher. func WithIRFetcher(v InnerRingFetcher) Option { return func(c *cfg) { diff --git a/pkg/services/object/acl/v2/request.go b/pkg/services/object/acl/v2/request.go index b2faca6a5a..0f9ae5b9f9 100644 --- a/pkg/services/object/acl/v2/request.go +++ b/pkg/services/object/acl/v2/request.go @@ -6,7 +6,7 @@ import ( "fmt" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" - sessionV2 "github.com/nspcc-dev/neofs-api-go/v2/session" + protosession "github.com/nspcc-dev/neofs-api-go/v2/session/grpc" "github.com/nspcc-dev/neofs-sdk-go/bearer" "github.com/nspcc-dev/neofs-sdk-go/container/acl" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" @@ -107,7 +107,7 @@ func (r RequestInfo) RequestRole() acl.Role { // MetaWithToken groups session and bearer tokens, // verification header and raw API request. type MetaWithToken struct { - vheader *sessionV2.RequestVerificationHeader + vheader *protosession.RequestVerificationHeader token *sessionSDK.Object bearer *bearer.Token src any diff --git a/pkg/services/object/acl/v2/service.go b/pkg/services/object/acl/v2/service.go index bdf4e910c8..d27aeee920 100644 --- a/pkg/services/object/acl/v2/service.go +++ b/pkg/services/object/acl/v2/service.go @@ -1,14 +1,14 @@ package v2 import ( - "context" "errors" "fmt" objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object" + protoobject "github.com/nspcc-dev/neofs-api-go/v2/object/grpc" + apirefs "github.com/nspcc-dev/neofs-api-go/v2/refs" "github.com/nspcc-dev/neofs-node/pkg/core/container" "github.com/nspcc-dev/neofs-node/pkg/core/netmap" - "github.com/nspcc-dev/neofs-node/pkg/services/object" apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" "github.com/nspcc-dev/neofs-sdk-go/container/acl" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" @@ -26,35 +26,6 @@ type Service struct { c senderClassifier } -type putStreamBasicChecker struct { - source *Service - next object.PutObjectStream -} - -type getStreamBasicChecker struct { - checker ACLChecker - - object.GetObjectStream - - info RequestInfo -} - -type rangeStreamBasicChecker struct { - checker ACLChecker - - object.GetObjectRangeStream - - info RequestInfo -} - -type searchStreamBasicChecker struct { - checker ACLChecker - - object.SearchStream - - info RequestInfo -} - // Option represents Service constructor option. type Option func(*cfg) @@ -72,13 +43,9 @@ type cfg struct { containers container.Source - checker ACLChecker - irFetcher InnerRingFetcher nm Netmapper - - next object.ServiceServer } func defaultCfg() *cfg { @@ -101,10 +68,8 @@ func New(opts ...Option) Service { } } - panicOnNil(cfg.next, "next Service") panicOnNil(cfg.nm, "netmap client") panicOnNil(cfg.irFetcher, "inner Ring fetcher") - panicOnNil(cfg.checker, "acl checker") panicOnNil(cfg.containers, "container source") return Service{ @@ -117,38 +82,38 @@ func New(opts ...Option) Service { } } -// Get implements ServiceServer interface, makes ACL checks and calls -// next Get method in the ServiceServer pipeline. -func (b Service) Get(request *objectV2.GetRequest, stream object.GetObjectStream) error { +// ProcessGetRequest resolves RequestInfo from the request to check it using +// [ACLChecker]. +func (b Service) ProcessGetRequest(request *protoobject.GetRequest) (RequestInfo, error) { cnr, err := getContainerIDFromRequest(request) if err != nil { - return err + return RequestInfo{}, err } obj, err := getObjectIDFromRequestBody(request.GetBody()) if err != nil { - return err + return RequestInfo{}, err } sTok, err := originalSessionToken(request.GetMetaHeader()) if err != nil { - return err + return RequestInfo{}, err } if sTok != nil { err = assertSessionRelation(*sTok, cnr, obj) if err != nil { - return err + return RequestInfo{}, err } } bTok, err := originalBearerToken(request.GetMetaHeader()) if err != nil { - return err + return RequestInfo{}, err } req := MetaWithToken{ - vheader: request.GetVerificationHeader(), + vheader: request.GetVerifyHeader(), token: sTok, bearer: bTok, src: request, @@ -156,65 +121,46 @@ func (b Service) Get(request *objectV2.GetRequest, stream object.GetObjectStream reqInfo, err := b.findRequestInfo(req, cnr, acl.OpObjectGet) if err != nil { - return err + return RequestInfo{}, err } reqInfo.obj = obj - if !b.checker.CheckBasicACL(reqInfo) { - return basicACLErr(reqInfo) - } else if err := b.checker.CheckEACL(request, reqInfo); err != nil { - return eACLErr(reqInfo, err) - } - - return b.next.Get(request, &getStreamBasicChecker{ - GetObjectStream: stream, - info: reqInfo, - checker: b.checker, - }) -} - -func (b Service) Put(ctx context.Context) (object.PutObjectStream, error) { - streamer, err := b.next.Put(ctx) - - return putStreamBasicChecker{ - source: &b, - next: streamer, - }, err + return reqInfo, nil } -func (b Service) Head( - ctx context.Context, - request *objectV2.HeadRequest) (*objectV2.HeadResponse, error) { +// ProcessHeadRequest resolves RequestInfo from the request to check it using +// [ACLChecker]. +func (b Service) ProcessHeadRequest(request *protoobject.HeadRequest) (RequestInfo, error) { cnr, err := getContainerIDFromRequest(request) if err != nil { - return nil, err + return RequestInfo{}, err } obj, err := getObjectIDFromRequestBody(request.GetBody()) if err != nil { - return nil, err + return RequestInfo{}, err } sTok, err := originalSessionToken(request.GetMetaHeader()) if err != nil { - return nil, err + return RequestInfo{}, err } if sTok != nil { err = assertSessionRelation(*sTok, cnr, obj) if err != nil { - return nil, err + return RequestInfo{}, err } } bTok, err := originalBearerToken(request.GetMetaHeader()) if err != nil { - return nil, err + return RequestInfo{}, err } req := MetaWithToken{ - vheader: request.GetVerificationHeader(), + vheader: request.GetVerifyHeader(), token: sTok, bearer: bTok, src: request, @@ -222,52 +168,41 @@ func (b Service) Head( reqInfo, err := b.findRequestInfo(req, cnr, acl.OpObjectHead) if err != nil { - return nil, err + return RequestInfo{}, err } reqInfo.obj = obj - if !b.checker.CheckBasicACL(reqInfo) { - return nil, basicACLErr(reqInfo) - } else if err := b.checker.CheckEACL(request, reqInfo); err != nil { - return nil, eACLErr(reqInfo, err) - } - - resp, err := b.next.Head(ctx, request) - if err == nil { - if err = b.checker.CheckEACL(resp, reqInfo); err != nil { - err = eACLErr(reqInfo, err) - } - } - - return resp, err + return reqInfo, err } -func (b Service) Search(request *objectV2.SearchRequest, stream object.SearchStream) error { +// ProcessSearchRequest resolves RequestInfo from the request to check it using +// [ACLChecker]. +func (b Service) ProcessSearchRequest(request *protoobject.SearchRequest) (RequestInfo, error) { id, err := getContainerIDFromRequest(request) if err != nil { - return err + return RequestInfo{}, err } sTok, err := originalSessionToken(request.GetMetaHeader()) if err != nil { - return err + return RequestInfo{}, err } if sTok != nil { err = assertSessionRelation(*sTok, id, nil) if err != nil { - return err + return RequestInfo{}, err } } bTok, err := originalBearerToken(request.GetMetaHeader()) if err != nil { - return err + return RequestInfo{}, err } req := MetaWithToken{ - vheader: request.GetVerificationHeader(), + vheader: request.GetVerifyHeader(), token: sTok, bearer: bTok, src: request, @@ -275,54 +210,44 @@ func (b Service) Search(request *objectV2.SearchRequest, stream object.SearchStr reqInfo, err := b.findRequestInfo(req, id, acl.OpObjectSearch) if err != nil { - return err - } - - if !b.checker.CheckBasicACL(reqInfo) { - return basicACLErr(reqInfo) - } else if err := b.checker.CheckEACL(request, reqInfo); err != nil { - return eACLErr(reqInfo, err) + return RequestInfo{}, err } - return b.next.Search(request, &searchStreamBasicChecker{ - checker: b.checker, - SearchStream: stream, - info: reqInfo, - }) + return reqInfo, nil } -func (b Service) Delete( - ctx context.Context, - request *objectV2.DeleteRequest) (*objectV2.DeleteResponse, error) { +// ProcessDeleteRequest resolves RequestInfo from the request to check it using +// [ACLChecker]. +func (b Service) ProcessDeleteRequest(request *protoobject.DeleteRequest) (RequestInfo, error) { cnr, err := getContainerIDFromRequest(request) if err != nil { - return nil, err + return RequestInfo{}, err } obj, err := getObjectIDFromRequestBody(request.GetBody()) if err != nil { - return nil, err + return RequestInfo{}, err } sTok, err := originalSessionToken(request.GetMetaHeader()) if err != nil { - return nil, err + return RequestInfo{}, err } if sTok != nil { err = assertSessionRelation(*sTok, cnr, obj) if err != nil { - return nil, err + return RequestInfo{}, err } } bTok, err := originalBearerToken(request.GetMetaHeader()) if err != nil { - return nil, err + return RequestInfo{}, err } req := MetaWithToken{ - vheader: request.GetVerificationHeader(), + vheader: request.GetVerifyHeader(), token: sTok, bearer: bTok, src: request, @@ -330,50 +255,46 @@ func (b Service) Delete( reqInfo, err := b.findRequestInfo(req, cnr, acl.OpObjectDelete) if err != nil { - return nil, err + return RequestInfo{}, err } reqInfo.obj = obj - if !b.checker.CheckBasicACL(reqInfo) { - return nil, basicACLErr(reqInfo) - } else if err := b.checker.CheckEACL(request, reqInfo); err != nil { - return nil, eACLErr(reqInfo, err) - } - - return b.next.Delete(ctx, request) + return reqInfo, nil } -func (b Service) GetRange(request *objectV2.GetRangeRequest, stream object.GetObjectRangeStream) error { +// ProcessRangeRequest resolves RequestInfo from the request to check it using +// [ACLChecker]. +func (b Service) ProcessRangeRequest(request *protoobject.GetRangeRequest) (RequestInfo, error) { cnr, err := getContainerIDFromRequest(request) if err != nil { - return err + return RequestInfo{}, err } obj, err := getObjectIDFromRequestBody(request.GetBody()) if err != nil { - return err + return RequestInfo{}, err } sTok, err := originalSessionToken(request.GetMetaHeader()) if err != nil { - return err + return RequestInfo{}, err } if sTok != nil { err = assertSessionRelation(*sTok, cnr, obj) if err != nil { - return err + return RequestInfo{}, err } } bTok, err := originalBearerToken(request.GetMetaHeader()) if err != nil { - return err + return RequestInfo{}, err } req := MetaWithToken{ - vheader: request.GetVerificationHeader(), + vheader: request.GetVerifyHeader(), token: sTok, bearer: bTok, src: request, @@ -381,56 +302,46 @@ func (b Service) GetRange(request *objectV2.GetRangeRequest, stream object.GetOb reqInfo, err := b.findRequestInfo(req, cnr, acl.OpObjectRange) if err != nil { - return err + return RequestInfo{}, err } reqInfo.obj = obj - if !b.checker.CheckBasicACL(reqInfo) { - return basicACLErr(reqInfo) - } else if err := b.checker.CheckEACL(request, reqInfo); err != nil { - return eACLErr(reqInfo, err) - } - - return b.next.GetRange(request, &rangeStreamBasicChecker{ - checker: b.checker, - GetObjectRangeStream: stream, - info: reqInfo, - }) + return reqInfo, nil } -func (b Service) GetRangeHash( - ctx context.Context, - request *objectV2.GetRangeHashRequest) (*objectV2.GetRangeHashResponse, error) { +// ProcessHashRequest resolves RequestInfo from the request to check it using +// [ACLChecker]. +func (b Service) ProcessHashRequest(request *protoobject.GetRangeHashRequest) (RequestInfo, error) { cnr, err := getContainerIDFromRequest(request) if err != nil { - return nil, err + return RequestInfo{}, err } obj, err := getObjectIDFromRequestBody(request.GetBody()) if err != nil { - return nil, err + return RequestInfo{}, err } sTok, err := originalSessionToken(request.GetMetaHeader()) if err != nil { - return nil, err + return RequestInfo{}, err } if sTok != nil { err = assertSessionRelation(*sTok, cnr, obj) if err != nil { - return nil, err + return RequestInfo{}, err } } bTok, err := originalBearerToken(request.GetMetaHeader()) if err != nil { - return nil, err + return RequestInfo{}, err } req := MetaWithToken{ - vheader: request.GetVerificationHeader(), + vheader: request.GetVerifyHeader(), token: sTok, bearer: bTok, src: request, @@ -438,185 +349,160 @@ func (b Service) GetRangeHash( reqInfo, err := b.findRequestInfo(req, cnr, acl.OpObjectHash) if err != nil { - return nil, err + return RequestInfo{}, err } reqInfo.obj = obj - if !b.checker.CheckBasicACL(reqInfo) { - return nil, basicACLErr(reqInfo) - } else if err := b.checker.CheckEACL(request, reqInfo); err != nil { - return nil, eACLErr(reqInfo, err) - } - - return b.next.GetRangeHash(ctx, request) + return reqInfo, nil } -func (p putStreamBasicChecker) Send(request *objectV2.PutRequest) error { +var ErrSkipRequest = errors.New("skip request") + +// ProcessPutRequest resolves RequestInfo from the request to check it using +// [ACLChecker]. Returns [ErrSkipRequest] if check should not be performed. +func (b Service) ProcessPutRequest(request *protoobject.PutRequest) (RequestInfo, user.ID, error) { body := request.GetBody() if body == nil { - return errEmptyBody + return RequestInfo{}, user.ID{}, errEmptyBody } - part := body.GetObjectPart() - if part, ok := part.(*objectV2.PutObjectPartInit); ok { - cnr, err := getContainerIDFromRequest(request) - if err != nil { - return err - } - - header := part.GetHeader() - - cIDV2 := header.GetContainerID().GetValue() - var cID cid.ID - err = cID.Decode(cIDV2) - if err != nil { - return fmt.Errorf("invalid container ID: %w", err) - } - - inContainer, err := p.source.nm.ServerInContainer(cID) - if err != nil { - return fmt.Errorf("checking if node in container: %w", err) - } - - if header.GetSplit() != nil && !inContainer { - // skip ACL checks for split objects if it is not a container - // node, since it requires additional object operations (e.g., - // requesting the other split parts) that may (or may not) be - // prohibited by ACL rules; this node is not going to store such - // objects anyway - return p.next.Send(request) - } + part, ok := body.GetObjectPart().(*protoobject.PutRequest_Body_Init_) + if !ok { + return RequestInfo{}, user.ID{}, ErrSkipRequest + } + if part == nil || part.Init == nil { + return RequestInfo{}, user.ID{}, errors.New("nil oneof field with heading part") + } + cnr, err := getContainerIDFromRequest(request) + if err != nil { + return RequestInfo{}, user.ID{}, err + } - idV2 := header.GetOwnerID() - if idV2 == nil { - return errors.New("missing object owner") - } + header := part.Init.Header - var idOwner user.ID + cIDV2 := header.GetContainerId().GetValue() + var cID cid.ID + err = cID.Decode(cIDV2) + if err != nil { + return RequestInfo{}, user.ID{}, fmt.Errorf("invalid container ID: %w", err) + } - err = idOwner.ReadFromV2(*idV2) - if err != nil { - return fmt.Errorf("invalid object owner: %w", err) - } + inContainer, err := b.nm.ServerInContainer(cID) + if err != nil { + return RequestInfo{}, user.ID{}, fmt.Errorf("checking if node in container: %w", err) + } - objV2 := part.GetObjectID() - var obj *oid.ID + if header.GetSplit() != nil && !inContainer { + // skip ACL checks for split objects if it is not a container + // node, since it requires additional object operations (e.g., + // requesting the other split parts) that may (or may not) be + // prohibited by ACL rules; this node is not going to store such + // objects anyway + return RequestInfo{}, user.ID{}, ErrSkipRequest + } - if objV2 != nil { - obj = new(oid.ID) + mOwner := header.GetOwnerId() + if mOwner == nil { + return RequestInfo{}, user.ID{}, errors.New("missing object owner") + } - err = obj.ReadFromV2(*objV2) - if err != nil { - return err - } - } + var idOwner user.ID + var idV2 apirefs.OwnerID + if err := idV2.FromGRPCMessage(mOwner); err != nil { + panic(err) + } + err = idOwner.ReadFromV2(idV2) + if err != nil { + return RequestInfo{}, user.ID{}, fmt.Errorf("invalid object owner: %w", err) + } - sTok, err := originalSessionToken(request.GetMetaHeader()) - if err != nil { - return err - } + var obj *oid.ID - if sTok != nil { - if sTok.AssertVerb(sessionSDK.VerbObjectDelete) { - // if session relates to object's removal, we don't check - // relation of the tombstone to the session here since user - // can't predict tomb's ID. - err = assertSessionRelation(*sTok, cnr, nil) - } else { - err = assertSessionRelation(*sTok, cnr, obj) - } - - if err != nil { - return err - } + if part.Init.ObjectId != nil { + obj = new(oid.ID) + var objV2 apirefs.ObjectID + if err := objV2.FromGRPCMessage(part.Init.ObjectId); err != nil { + panic(err) } - - bTok, err := originalBearerToken(request.GetMetaHeader()) + err = obj.ReadFromV2(objV2) if err != nil { - return err + return RequestInfo{}, user.ID{}, err } + } - req := MetaWithToken{ - vheader: request.GetVerificationHeader(), - token: sTok, - bearer: bTok, - src: request, - } + sTok, err := originalSessionToken(request.GetMetaHeader()) + if err != nil { + return RequestInfo{}, user.ID{}, err + } - verb := acl.OpObjectPut - tombstone := part.GetHeader().GetObjectType() == objectV2.TypeTombstone - if tombstone { - // such objects are specific - saving them is essentially the removal of other - // objects - verb = acl.OpObjectDelete + if sTok != nil { + if sTok.AssertVerb(sessionSDK.VerbObjectDelete) { + // if session relates to object's removal, we don't check + // relation of the tombstone to the session here since user + // can't predict tomb's ID. + err = assertSessionRelation(*sTok, cnr, nil) + } else { + err = assertSessionRelation(*sTok, cnr, obj) } - reqInfo, err := p.source.findRequestInfo(req, cnr, verb) if err != nil { - return err - } - - replication := reqInfo.requestRole == acl.RoleContainer && request.GetMetaHeader().GetTTL() == 1 - if tombstone { - // the only exception when writing tombstone should not be treated as deletion - // is intra-container replication: container nodes must be able to replicate - // such objects while deleting is prohibited - if replication { - reqInfo.operation = acl.OpObjectPut - } - } - - if !replication { - // header length is unchecked for replication because introducing a restriction - // should not prevent the replication of objects created before. - // See also https://github.com/nspcc-dev/neofs-api/issues/293 - hdrLen := part.GetHeader().StableSize() - if hdrLen > objectsdk.MaxHeaderLen { - return fmt.Errorf("object header length exceeds the limit: %d>%d", hdrLen, objectsdk.MaxHeaderLen) - } + return RequestInfo{}, user.ID{}, err } + } - reqInfo.obj = obj + bTok, err := originalBearerToken(request.GetMetaHeader()) + if err != nil { + return RequestInfo{}, user.ID{}, err + } - if !p.source.checker.CheckBasicACL(reqInfo) || !p.source.checker.StickyBitCheck(reqInfo, idOwner) { - return basicACLErr(reqInfo) - } else if err := p.source.checker.CheckEACL(request, reqInfo); err != nil { - return eACLErr(reqInfo, err) - } + req := MetaWithToken{ + vheader: request.GetVerifyHeader(), + token: sTok, + bearer: bTok, + src: request, } - return p.next.Send(request) -} + verb := acl.OpObjectPut + tombstone := header.GetObjectType() == protoobject.ObjectType_TOMBSTONE + if tombstone { + // such objects are specific - saving them is essentially the removal of other + // objects + verb = acl.OpObjectDelete + } -func (p putStreamBasicChecker) CloseAndRecv() (*objectV2.PutResponse, error) { - return p.next.CloseAndRecv() -} + reqInfo, err := b.findRequestInfo(req, cnr, verb) + if err != nil { + return RequestInfo{}, user.ID{}, err + } -func (g *getStreamBasicChecker) Send(resp *objectV2.GetResponse) error { - if _, ok := resp.GetBody().GetObjectPart().(*objectV2.GetObjectPartInit); ok { - if err := g.checker.CheckEACL(resp, g.info); err != nil { - return eACLErr(g.info, err) + replication := reqInfo.requestRole == acl.RoleContainer && request.GetMetaHeader().GetTtl() == 1 + if tombstone { + // the only exception when writing tombstone should not be treated as deletion + // is intra-container replication: container nodes must be able to replicate + // such objects while deleting is prohibited + if replication { + reqInfo.operation = acl.OpObjectPut } } - return g.GetObjectStream.Send(resp) -} - -func (g *rangeStreamBasicChecker) Send(resp *objectV2.GetRangeResponse) error { - if err := g.checker.CheckEACL(resp, g.info); err != nil { - return eACLErr(g.info, err) + if !replication { + // header length is unchecked for replication because introducing a restriction + // should not prevent the replication of objects created before. + // See also https://github.com/nspcc-dev/neofs-api/issues/293 + var h2 objectV2.Header + if err := h2.FromGRPCMessage(header); err != nil { + panic(err) + } + hdrLen := h2.StableSize() + if hdrLen > objectsdk.MaxHeaderLen { + return RequestInfo{}, user.ID{}, fmt.Errorf("object header length exceeds the limit: %d>%d", hdrLen, objectsdk.MaxHeaderLen) + } } - return g.GetObjectRangeStream.Send(resp) -} - -func (g *searchStreamBasicChecker) Send(resp *objectV2.SearchResponse) error { - if err := g.checker.CheckEACL(resp, g.info); err != nil { - return eACLErr(g.info, err) - } + reqInfo.obj = obj - return g.SearchStream.Send(resp) + return reqInfo, idOwner, nil } func (b Service) findRequestInfo(req MetaWithToken, idCnr cid.ID, op acl.Op) (info RequestInfo, err error) { diff --git a/pkg/services/object/acl/v2/util.go b/pkg/services/object/acl/v2/util.go index fc77b97990..9c82f17f60 100644 --- a/pkg/services/object/acl/v2/util.go +++ b/pkg/services/object/acl/v2/util.go @@ -7,9 +7,12 @@ import ( "fmt" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" - objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object" + apiacl "github.com/nspcc-dev/neofs-api-go/v2/acl" + protoobject "github.com/nspcc-dev/neofs-api-go/v2/object/grpc" refsV2 "github.com/nspcc-dev/neofs-api-go/v2/refs" + refs "github.com/nspcc-dev/neofs-api-go/v2/refs/grpc" sessionV2 "github.com/nspcc-dev/neofs-api-go/v2/session" + protosession "github.com/nspcc-dev/neofs-api-go/v2/session/grpc" "github.com/nspcc-dev/neofs-sdk-go/bearer" "github.com/nspcc-dev/neofs-sdk-go/container/acl" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" @@ -21,71 +24,84 @@ import ( var errMissingContainerID = errors.New("missing container ID") func getContainerIDFromRequest(req any) (cid.ID, error) { - var idV2 *refsV2.ContainerID + var mID *refs.ContainerID var id cid.ID switch v := req.(type) { - case *objectV2.GetRequest: - idV2 = v.GetBody().GetAddress().GetContainerID() - case *objectV2.PutRequest: - part, ok := v.GetBody().GetObjectPart().(*objectV2.PutObjectPartInit) + case *protoobject.GetRequest: + mID = v.GetBody().GetAddress().GetContainerId() + case *protoobject.PutRequest: + part, ok := v.GetBody().GetObjectPart().(*protoobject.PutRequest_Body_Init_) if !ok { return cid.ID{}, errors.New("can't get container ID in chunk") } - - idV2 = part.GetHeader().GetContainerID() - case *objectV2.HeadRequest: - idV2 = v.GetBody().GetAddress().GetContainerID() - case *objectV2.SearchRequest: - idV2 = v.GetBody().GetContainerID() - case *objectV2.DeleteRequest: - idV2 = v.GetBody().GetAddress().GetContainerID() - case *objectV2.GetRangeRequest: - idV2 = v.GetBody().GetAddress().GetContainerID() - case *objectV2.GetRangeHashRequest: - idV2 = v.GetBody().GetAddress().GetContainerID() + if part == nil || part.Init == nil { + return cid.ID{}, errors.New("nil oneof heading part") + } + mID = part.Init.GetHeader().GetContainerId() + case *protoobject.HeadRequest: + mID = v.GetBody().GetAddress().GetContainerId() + case *protoobject.SearchRequest: + mID = v.GetBody().GetContainerId() + case *protoobject.DeleteRequest: + mID = v.GetBody().GetAddress().GetContainerId() + case *protoobject.GetRangeRequest: + mID = v.GetBody().GetAddress().GetContainerId() + case *protoobject.GetRangeHashRequest: + mID = v.GetBody().GetAddress().GetContainerId() default: return cid.ID{}, errors.New("unknown request type") } - if idV2 == nil { + if mID == nil { return cid.ID{}, errMissingContainerID } - return id, id.ReadFromV2(*idV2) + var idV2 refsV2.ContainerID + if err := idV2.FromGRPCMessage(mID); err != nil { + panic(err) + } + return id, id.ReadFromV2(idV2) } // originalBearerToken goes down to original request meta header and fetches // bearer token from there. -func originalBearerToken(header *sessionV2.RequestMetaHeader) (*bearer.Token, error) { +func originalBearerToken(header *protosession.RequestMetaHeader) (*bearer.Token, error) { for header.GetOrigin() != nil { header = header.GetOrigin() } - tokV2 := header.GetBearerToken() - if tokV2 == nil { + mt := header.GetBearerToken() + if mt == nil { return nil, nil } var tok bearer.Token - return &tok, tok.ReadFromV2(*tokV2) + var tokV2 apiacl.BearerToken + if err := tokV2.FromGRPCMessage(mt); err != nil { + panic(err) + } + return &tok, tok.ReadFromV2(tokV2) } // originalSessionToken goes down to original request meta header and fetches // session token from there. -func originalSessionToken(header *sessionV2.RequestMetaHeader) (*sessionSDK.Object, error) { +func originalSessionToken(header *protosession.RequestMetaHeader) (*sessionSDK.Object, error) { for header.GetOrigin() != nil { header = header.GetOrigin() } - tokV2 := header.GetSessionToken() - if tokV2 == nil { + mt := header.GetSessionToken() + if mt == nil { return nil, nil } var tok sessionSDK.Object - - err := tok.ReadFromV2(*tokV2) + var tokV2 sessionV2.Token + if err := tokV2.FromGRPCMessage(mt); err != nil { + panic(err) + } + err := tok.ReadFromV2(tokV2) if err != nil { return nil, fmt.Errorf("invalid session token: %w", err) } @@ -95,15 +111,18 @@ func originalSessionToken(header *sessionV2.RequestMetaHeader) (*sessionSDK.Obje // getObjectIDFromRequestBody decodes oid.ID from the common interface of the // object reference's holders. Returns an error if object ID is missing in the request. -func getObjectIDFromRequestBody(body interface{ GetAddress() *refsV2.Address }) (*oid.ID, error) { - idV2 := body.GetAddress().GetObjectID() - if idV2 == nil { +func getObjectIDFromRequestBody(body interface{ GetAddress() *refs.Address }) (*oid.ID, error) { + mID := body.GetAddress().GetObjectId() + if mID == nil { return nil, errors.New("missing object ID") } var id oid.ID - - err := id.ReadFromV2(*idV2) + var idV2 refsV2.ObjectID + if err := idV2.FromGRPCMessage(mID); err != nil { + panic(err) + } + err := id.ReadFromV2(idV2) if err != nil { return nil, err } @@ -129,7 +148,7 @@ func ownerFromToken(token *sessionSDK.Object) (*user.ID, []byte, error) { return &tokenIssuer, key, nil } -func originalBodySignature(v *sessionV2.RequestVerificationHeader) *refsV2.Signature { +func originalBodySignature(v *protosession.RequestVerificationHeader) *refs.Signature { if v == nil { return nil } diff --git a/pkg/services/object/acl/v2/util_test.go b/pkg/services/object/acl/v2/util_test.go index dcc6d72e2d..e7fa444f36 100644 --- a/pkg/services/object/acl/v2/util_test.go +++ b/pkg/services/object/acl/v2/util_test.go @@ -7,7 +7,9 @@ import ( "testing" "github.com/nspcc-dev/neofs-api-go/v2/acl" + protoacl "github.com/nspcc-dev/neofs-api-go/v2/acl/grpc" "github.com/nspcc-dev/neofs-api-go/v2/session" + protosession "github.com/nspcc-dev/neofs-api-go/v2/session/grpc" bearertest "github.com/nspcc-dev/neofs-sdk-go/bearer/test" aclsdk "github.com/nspcc-dev/neofs-sdk-go/container/acl" cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test" @@ -36,26 +38,29 @@ func TestOriginalTokens(t *testing.T) { var sTokenV2 session.Token sToken.WriteToV2(&sTokenV2) + mbt := bTokenV2.ToGRPCMessage().(*protoacl.BearerToken) + mst := sTokenV2.ToGRPCMessage().(*protosession.SessionToken) for i := range 10 { - metaHeaders := testGenerateMetaHeader(uint32(i), &bTokenV2, &sTokenV2) + metaHeaders := testGenerateMetaHeader(uint32(i), mbt, mst) res, err := originalSessionToken(metaHeaders) require.NoError(t, err) require.Equal(t, sToken, *res, i) - bTok, err := originalBearerToken(metaHeaders) + bTok, err := originalBearerToken(metaHeaders) //nolint:staticcheck // uncomment on unskip require.NoError(t, err) + t.Skip("https://github.com/nspcc-dev/neofs-sdk-go/issues/606") require.Equal(t, &bToken, bTok, i) } } -func testGenerateMetaHeader(depth uint32, b *acl.BearerToken, s *session.Token) *session.RequestMetaHeader { - metaHeader := new(session.RequestMetaHeader) +func testGenerateMetaHeader(depth uint32, b *protoacl.BearerToken, s *protosession.SessionToken) *protosession.RequestMetaHeader { + metaHeader := new(protosession.RequestMetaHeader) metaHeader.SetBearerToken(b) metaHeader.SetSessionToken(s) for range depth { link := metaHeader - metaHeader = new(session.RequestMetaHeader) + metaHeader = new(protosession.RequestMetaHeader) metaHeader.SetOrigin(link) } diff --git a/pkg/services/object/server.go b/pkg/services/object/server.go index afc3baef3b..68602671c4 100644 --- a/pkg/services/object/server.go +++ b/pkg/services/object/server.go @@ -19,6 +19,7 @@ import ( protostatus "github.com/nspcc-dev/neofs-api-go/v2/status/grpc" "github.com/nspcc-dev/neofs-node/pkg/core/netmap" objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object" + aclsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/acl/v2" "github.com/nspcc-dev/neofs-node/pkg/services/util" apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" @@ -27,6 +28,7 @@ import ( "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "github.com/nspcc-dev/neofs-sdk-go/stat" + "github.com/nspcc-dev/neofs-sdk-go/user" "github.com/nspcc-dev/neofs-sdk-go/version" ) @@ -121,25 +123,56 @@ type Storage interface { VerifyAndStoreObject(object.Object) error } +type RequestInfoProcessor interface { + ProcessPutRequest(*protoobject.PutRequest) (aclsvc.RequestInfo, user.ID, error) + ProcessDeleteRequest(*protoobject.DeleteRequest) (aclsvc.RequestInfo, error) + ProcessHeadRequest(*protoobject.HeadRequest) (aclsvc.RequestInfo, error) + ProcessHashRequest(*protoobject.GetRangeHashRequest) (aclsvc.RequestInfo, error) + ProcessGetRequest(*protoobject.GetRequest) (aclsvc.RequestInfo, error) + ProcessRangeRequest(*protoobject.GetRangeRequest) (aclsvc.RequestInfo, error) + ProcessSearchRequest(*protoobject.SearchRequest) (aclsvc.RequestInfo, error) +} + +const accessDeniedACLReasonFmt = "access to operation %s is denied by basic ACL check" +const accessDeniedEACLReasonFmt = "access to operation %s is denied by extended ACL check: %v" + +func basicACLErr(info aclsvc.RequestInfo) error { + var errAccessDenied apistatus.ObjectAccessDenied + errAccessDenied.WriteReason(fmt.Sprintf(accessDeniedACLReasonFmt, info.Operation())) + + return errAccessDenied +} + +func eACLErr(info aclsvc.RequestInfo, err error) error { + var errAccessDenied apistatus.ObjectAccessDenied + errAccessDenied.WriteReason(fmt.Sprintf(accessDeniedEACLReasonFmt, info.Operation(), err)) + + return errAccessDenied +} + type server struct { srv ServiceServer - fsChain FSChain - storage Storage - signer ecdsa.PrivateKey - mNumber uint32 - metrics MetricCollector + fsChain FSChain + storage Storage + signer ecdsa.PrivateKey + mNumber uint32 + metrics MetricCollector + aclChecker aclsvc.ACLChecker + reqInfoProc RequestInfoProcessor } // New provides protoobject.ObjectServiceServer for the given parameters. -func New(c ServiceServer, magicNumber uint32, fsChain FSChain, st Storage, signer ecdsa.PrivateKey, m MetricCollector) protoobject.ObjectServiceServer { +func New(c ServiceServer, magicNumber uint32, fsChain FSChain, st Storage, signer ecdsa.PrivateKey, m MetricCollector, ac aclsvc.ACLChecker, rp RequestInfoProcessor) protoobject.ObjectServiceServer { return &server{ - srv: c, - fsChain: fsChain, - storage: st, - signer: signer, - mNumber: magicNumber, - metrics: m, + srv: c, + fsChain: fsChain, + storage: st, + signer: signer, + mNumber: magicNumber, + metrics: m, + aclChecker: ac, + reqInfoProc: rp, } } @@ -210,6 +243,26 @@ func (s *server) Put(gStream protoobject.ObjectService_PutServer) error { return s.sendStatusPutResponse(gStream, apistatus.ErrNodeUnderMaintenance) } + if req.Body == nil { + return errors.New("malformed request: empty body") + } + + if reqInfo, objOwner, err := s.reqInfoProc.ProcessPutRequest(req); err != nil { + if !errors.Is(err, aclsvc.ErrSkipRequest) { + return s.sendStatusPutResponse(gStream, err) + } + } else { + if !s.aclChecker.CheckBasicACL(reqInfo) || !s.aclChecker.StickyBitCheck(reqInfo, objOwner) { + err = basicACLErr(reqInfo) // needed for defer + return s.sendStatusPutResponse(gStream, err) + } + err = s.aclChecker.CheckEACL(req, reqInfo) + if err != nil { + err = eACLErr(reqInfo, err) // needed for defer + return s.sendStatusPutResponse(gStream, err) + } + } + if err = stream.Send(putReq); err != nil { err = s.sendStatusPutResponse(gStream, err) // assign for defer return err @@ -246,6 +299,20 @@ func (s *server) Delete(ctx context.Context, req *protoobject.DeleteRequest) (*p return s.makeStatusDeleteResponse(apistatus.ErrNodeUnderMaintenance), nil } + reqInfo, err := s.reqInfoProc.ProcessDeleteRequest(req) + if err != nil { + return s.makeStatusDeleteResponse(err), nil + } + if !s.aclChecker.CheckBasicACL(reqInfo) { + err = basicACLErr(reqInfo) // needed for defer + return s.makeStatusDeleteResponse(err), nil + } + err = s.aclChecker.CheckEACL(req, reqInfo) + if err != nil { + err = eACLErr(reqInfo, err) // needed for defer + return s.makeStatusDeleteResponse(err), nil + } + resp, err := s.srv.Delete(ctx, delReq) if err != nil { return s.makeStatusDeleteResponse(err), nil @@ -283,11 +350,31 @@ func (s *server) Head(ctx context.Context, req *protoobject.HeadRequest) (*proto return s.makeStatusHeadResponse(apistatus.ErrNodeUnderMaintenance), nil } + reqInfo, err := s.reqInfoProc.ProcessHeadRequest(req) + if err != nil { + return s.makeStatusHeadResponse(err), nil + } + if !s.aclChecker.CheckBasicACL(reqInfo) { + err = basicACLErr(reqInfo) // needed for defer + return s.makeStatusHeadResponse(err), nil + } + err = s.aclChecker.CheckEACL(req, reqInfo) + if err != nil { + err = eACLErr(reqInfo, err) // needed for defer + return s.makeStatusHeadResponse(err), nil + } + resp, err := s.srv.Head(ctx, searchReq) if err != nil { return s.makeStatusHeadResponse(err), nil } + err = s.aclChecker.CheckEACL(resp.ToGRPCMessage().(*protoobject.HeadResponse), reqInfo) + if err != nil { + err = eACLErr(reqInfo, err) // defer + return s.makeStatusHeadResponse(err), nil + } + return s.signHeadResponse(resp.ToGRPCMessage().(*protoobject.HeadResponse)), nil } @@ -319,6 +406,20 @@ func (s *server) GetRangeHash(ctx context.Context, req *protoobject.GetRangeHash return s.makeStatusHashResponse(apistatus.ErrNodeUnderMaintenance), nil } + reqInfo, err := s.reqInfoProc.ProcessHashRequest(req) + if err != nil { + return s.makeStatusHashResponse(err), nil + } + if !s.aclChecker.CheckBasicACL(reqInfo) { + err = basicACLErr(reqInfo) // needed for defer + return s.makeStatusHashResponse(err), nil + } + err = s.aclChecker.CheckEACL(req, reqInfo) + if err != nil { + err = eACLErr(reqInfo, err) // needed for defer + return s.makeStatusHashResponse(err), nil + } + resp, err := s.srv.GetRangeHash(ctx, hashRngReq) if err != nil { return s.makeStatusHashResponse(err), nil @@ -339,11 +440,17 @@ func (s *server) sendStatusGetResponse(stream protoobject.ObjectService_GetServe type getStreamerV2 struct { protoobject.ObjectService_GetServer - srv *server + srv *server + reqInfo aclsvc.RequestInfo } func (s *getStreamerV2) Send(resp *v2object.GetResponse) error { r := resp.ToGRPCMessage().(*protoobject.GetResponse) + if _, ok := r.GetBody().GetObjectPart().(*protoobject.GetResponse_Body_Init_); ok { + if err := s.srv.aclChecker.CheckEACL(r, s.reqInfo); err != nil { + return eACLErr(s.reqInfo, err) + } + } if c := r.GetBody().GetChunk(); c != nil { s.srv.metrics.AddGetPayload(len(c)) } @@ -368,11 +475,26 @@ func (s *server) Get(req *protoobject.GetRequest, gStream protoobject.ObjectServ return s.sendStatusGetResponse(gStream, apistatus.ErrNodeUnderMaintenance) } + reqInfo, err := s.reqInfoProc.ProcessGetRequest(req) + if err != nil { + return s.sendStatusGetResponse(gStream, err) + } + if !s.aclChecker.CheckBasicACL(reqInfo) { + err = basicACLErr(reqInfo) // needed for defer + return s.sendStatusGetResponse(gStream, err) + } + err = s.aclChecker.CheckEACL(req, reqInfo) + if err != nil { + err = eACLErr(reqInfo, err) // needed for defer + return s.sendStatusGetResponse(gStream, err) + } + err = s.srv.Get( getReq, &getStreamerV2{ ObjectService_GetServer: gStream, srv: s, + reqInfo: reqInfo, }, ) if err != nil { @@ -393,11 +515,16 @@ func (s *server) sendStatusRangeResponse(stream protoobject.ObjectService_GetRan type getRangeStreamerV2 struct { protoobject.ObjectService_GetRangeServer - srv *server + srv *server + reqInfo aclsvc.RequestInfo } func (s *getRangeStreamerV2) Send(resp *v2object.GetRangeResponse) error { - return s.srv.sendRangeResponse(s.ObjectService_GetRangeServer, resp.ToGRPCMessage().(*protoobject.GetRangeResponse)) + r := resp.ToGRPCMessage().(*protoobject.GetRangeResponse) + if err := s.srv.aclChecker.CheckEACL(r, s.reqInfo); err != nil { + return eACLErr(s.reqInfo, err) + } + return s.srv.sendRangeResponse(s.ObjectService_GetRangeServer, r) } // GetRange converts gRPC GetRangeRequest message and server-side stream and overtakes its data @@ -418,11 +545,26 @@ func (s *server) GetRange(req *protoobject.GetRangeRequest, gStream protoobject. return s.sendStatusRangeResponse(gStream, apistatus.ErrNodeUnderMaintenance) } + reqInfo, err := s.reqInfoProc.ProcessRangeRequest(req) + if err != nil { + return s.sendStatusRangeResponse(gStream, err) + } + if !s.aclChecker.CheckBasicACL(reqInfo) { + err = basicACLErr(reqInfo) // needed for defer + return s.sendStatusRangeResponse(gStream, err) + } + err = s.aclChecker.CheckEACL(req, reqInfo) + if err != nil { + err = eACLErr(reqInfo, err) // needed for defer + return s.sendStatusRangeResponse(gStream, err) + } + err = s.srv.GetRange( getRngReq, &getRangeStreamerV2{ ObjectService_GetRangeServer: gStream, srv: s, + reqInfo: reqInfo, }, ) if err != nil { @@ -443,11 +585,16 @@ func (s *server) sendStatusSearchResponse(stream protoobject.ObjectService_Searc type searchStreamerV2 struct { protoobject.ObjectService_SearchServer - srv *server + srv *server + reqInfo aclsvc.RequestInfo } func (s *searchStreamerV2) Send(resp *v2object.SearchResponse) error { - return s.srv.sendSearchResponse(s.ObjectService_SearchServer, resp.ToGRPCMessage().(*protoobject.SearchResponse)) + r := resp.ToGRPCMessage().(*protoobject.SearchResponse) + if err := s.srv.aclChecker.CheckEACL(r, s.reqInfo); err != nil { + return eACLErr(s.reqInfo, err) + } + return s.srv.sendSearchResponse(s.ObjectService_SearchServer, r) } // Search converts gRPC SearchRequest message and server-side stream and overtakes its data @@ -468,11 +615,26 @@ func (s *server) Search(req *protoobject.SearchRequest, gStream protoobject.Obje return s.sendStatusSearchResponse(gStream, apistatus.ErrNodeUnderMaintenance) } + reqInfo, err := s.reqInfoProc.ProcessSearchRequest(req) + if err != nil { + return s.sendStatusSearchResponse(gStream, err) + } + if !s.aclChecker.CheckBasicACL(reqInfo) { + err = basicACLErr(reqInfo) // needed for defer + return s.sendStatusSearchResponse(gStream, err) + } + err = s.aclChecker.CheckEACL(req, reqInfo) + if err != nil { + err = eACLErr(reqInfo, err) + return s.sendStatusSearchResponse(gStream, err) + } + err = s.srv.Search( searchReq, &searchStreamerV2{ ObjectService_SearchServer: gStream, srv: s, + reqInfo: reqInfo, }, ) if err != nil { diff --git a/pkg/services/object/server_test.go b/pkg/services/object/server_test.go index 0f9bab229b..d379675438 100644 --- a/pkg/services/object/server_test.go +++ b/pkg/services/object/server_test.go @@ -18,6 +18,7 @@ import ( refs "github.com/nspcc-dev/neofs-api-go/v2/refs/grpc" objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object" objectSvc "github.com/nspcc-dev/neofs-node/pkg/services/object" + v2 "github.com/nspcc-dev/neofs-node/pkg/services/object/acl/v2" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test" neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto" @@ -28,6 +29,7 @@ import ( oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test" objecttest "github.com/nspcc-dev/neofs-sdk-go/object/test" "github.com/nspcc-dev/neofs-sdk-go/stat" + "github.com/nspcc-dev/neofs-sdk-go/user" "github.com/stretchr/testify/require" ) @@ -82,6 +84,66 @@ type noCallTestStorage struct{} func (noCallTestStorage) VerifyAndStoreObject(object.Object) error { panic("must not be called") } +type noCallTestACLChecker struct{} + +func (noCallTestACLChecker) CheckBasicACL(v2.RequestInfo) bool { panic("must not be called") } +func (noCallTestACLChecker) CheckEACL(any, v2.RequestInfo) error { panic("must not be called") } +func (noCallTestACLChecker) StickyBitCheck(v2.RequestInfo, user.ID) bool { panic("must not be called") } + +type noCallTestReqInfoProcessor struct{} + +func (noCallTestReqInfoProcessor) ProcessPutRequest(*objectgrpc.PutRequest) (v2.RequestInfo, user.ID, error) { + panic("must not be called") +} +func (noCallTestReqInfoProcessor) ProcessDeleteRequest(*objectgrpc.DeleteRequest) (v2.RequestInfo, error) { + panic("must not be called") +} +func (noCallTestReqInfoProcessor) ProcessHeadRequest(*objectgrpc.HeadRequest) (v2.RequestInfo, error) { + panic("must not be called") +} +func (noCallTestReqInfoProcessor) ProcessHashRequest(*objectgrpc.GetRangeHashRequest) (v2.RequestInfo, error) { + panic("must not be called") +} +func (noCallTestReqInfoProcessor) ProcessGetRequest(*objectgrpc.GetRequest) (v2.RequestInfo, error) { + panic("must not be called") +} +func (noCallTestReqInfoProcessor) ProcessRangeRequest(*objectgrpc.GetRangeRequest) (v2.RequestInfo, error) { + panic("must not be called") +} +func (noCallTestReqInfoProcessor) ProcessSearchRequest(*objectgrpc.SearchRequest) (v2.RequestInfo, error) { + panic("must not be called") +} + +type nopACLChecker struct{} + +func (nopACLChecker) CheckBasicACL(v2.RequestInfo) bool { return true } +func (nopACLChecker) CheckEACL(any, v2.RequestInfo) error { return nil } +func (nopACLChecker) StickyBitCheck(v2.RequestInfo, user.ID) bool { return true } + +type nopReqInfoProcessor struct{} + +func (nopReqInfoProcessor) ProcessPutRequest(*objectgrpc.PutRequest) (v2.RequestInfo, user.ID, error) { + return v2.RequestInfo{}, user.ID{}, nil +} +func (nopReqInfoProcessor) ProcessDeleteRequest(*objectgrpc.DeleteRequest) (v2.RequestInfo, error) { + return v2.RequestInfo{}, nil +} +func (nopReqInfoProcessor) ProcessHeadRequest(*objectgrpc.HeadRequest) (v2.RequestInfo, error) { + return v2.RequestInfo{}, nil +} +func (nopReqInfoProcessor) ProcessHashRequest(*objectgrpc.GetRangeHashRequest) (v2.RequestInfo, error) { + return v2.RequestInfo{}, nil +} +func (nopReqInfoProcessor) ProcessGetRequest(*objectgrpc.GetRequest) (v2.RequestInfo, error) { + return v2.RequestInfo{}, nil +} +func (nopReqInfoProcessor) ProcessRangeRequest(*objectgrpc.GetRangeRequest) (v2.RequestInfo, error) { + return v2.RequestInfo{}, nil +} +func (nopReqInfoProcessor) ProcessSearchRequest(*objectgrpc.SearchRequest) (v2.RequestInfo, error) { + return v2.RequestInfo{}, nil +} + type nopMetrics struct{} func (nopMetrics) HandleOpExecResult(stat.Method, bool, time.Duration) {} @@ -190,7 +252,9 @@ func TestServer_Replicate(t *testing.T) { var noCallFSChain noCallTestFSChain var noCallObjSvc noCallObjectService var noCallStorage noCallTestStorage - noCallSrv := objectSvc.New(noCallObjSvc, 0, &noCallFSChain, noCallStorage, neofscryptotest.Signer().ECDSAPrivateKey, nopMetrics{}) + var noCallACLChecker noCallTestACLChecker + var noCallReqProc noCallTestReqInfoProcessor + noCallSrv := objectSvc.New(noCallObjSvc, 0, &noCallFSChain, noCallStorage, neofscryptotest.Signer().ECDSAPrivateKey, nopMetrics{}, noCallACLChecker, noCallReqProc) clientSigner := neofscryptotest.Signer() clientPubKey := neofscrypto.PublicKeyBytes(clientSigner.Public()) serverPubKey := neofscrypto.PublicKeyBytes(neofscryptotest.Signer().Public()) @@ -354,7 +418,7 @@ func TestServer_Replicate(t *testing.T) { t.Run("apply storage policy failure", func(t *testing.T) { fsChain := newTestFSChain(t, serverPubKey, clientPubKey, cnr) - srv := objectSvc.New(noCallObjSvc, 0, fsChain, noCallStorage, neofscryptotest.Signer().ECDSAPrivateKey, nopMetrics{}) + srv := objectSvc.New(noCallObjSvc, 0, fsChain, noCallStorage, neofscryptotest.Signer().ECDSAPrivateKey, nopMetrics{}, noCallACLChecker, noCallReqProc) fsChain.cnrErr = errors.New("any error") @@ -366,7 +430,7 @@ func TestServer_Replicate(t *testing.T) { t.Run("client or server mismatches object's storage policy", func(t *testing.T) { fsChain := newTestFSChain(t, serverPubKey, clientPubKey, cnr) - srv := objectSvc.New(noCallObjSvc, 0, fsChain, noCallStorage, neofscryptotest.Signer().ECDSAPrivateKey, nopMetrics{}) + srv := objectSvc.New(noCallObjSvc, 0, fsChain, noCallStorage, neofscryptotest.Signer().ECDSAPrivateKey, nopMetrics{}, noCallACLChecker, noCallReqProc) fsChain.serverOutsideCnr = true fsChain.clientOutsideCnr = true @@ -387,7 +451,7 @@ func TestServer_Replicate(t *testing.T) { t.Run("local storage failure", func(t *testing.T) { fsChain := newTestFSChain(t, serverPubKey, clientPubKey, cnr) s := newTestStorage(t, req.Object) - srv := objectSvc.New(noCallObjSvc, 0, fsChain, s, neofscryptotest.Signer().ECDSAPrivateKey, nopMetrics{}) + srv := objectSvc.New(noCallObjSvc, 0, fsChain, s, neofscryptotest.Signer().ECDSAPrivateKey, nopMetrics{}, noCallACLChecker, noCallReqProc) s.storeErr = errors.New("any error") @@ -403,7 +467,7 @@ func TestServer_Replicate(t *testing.T) { reqForSignature, o := anyValidRequest(t, clientSigner, cnr, objID) fsChain := newTestFSChain(t, serverPubKey, clientPubKey, cnr) s := newTestStorage(t, reqForSignature.Object) - srv := objectSvc.New(noCallObjSvc, mNumber, fsChain, s, signer.ECDSAPrivateKey, nopMetrics{}) + srv := objectSvc.New(noCallObjSvc, mNumber, fsChain, s, signer.ECDSAPrivateKey, nopMetrics{}, noCallACLChecker, noCallReqProc) t.Run("signature not requested", func(t *testing.T) { resp, err := srv.Replicate(context.Background(), reqForSignature) @@ -446,7 +510,7 @@ func TestServer_Replicate(t *testing.T) { t.Run("OK", func(t *testing.T) { fsChain := newTestFSChain(t, serverPubKey, clientPubKey, cnr) s := newTestStorage(t, req.Object) - srv := objectSvc.New(noCallObjSvc, 0, fsChain, s, neofscryptotest.Signer().ECDSAPrivateKey, nopMetrics{}) + srv := objectSvc.New(noCallObjSvc, 0, fsChain, s, neofscryptotest.Signer().ECDSAPrivateKey, nopMetrics{}, noCallACLChecker, noCallReqProc) resp, err := srv.Replicate(context.Background(), req) require.NoError(t, err) @@ -487,7 +551,7 @@ func BenchmarkServer_Replicate(b *testing.B) { ctx := context.Background() var fsChain nopFSChain - srv := objectSvc.New(nil, 0, fsChain, nopStorage{}, neofscryptotest.Signer().ECDSAPrivateKey, nopMetrics{}) + srv := objectSvc.New(nil, 0, fsChain, nopStorage{}, neofscryptotest.Signer().ECDSAPrivateKey, nopMetrics{}, nopACLChecker{}, nopReqInfoProcessor{}) for _, tc := range []struct { name string From 0b817479f7baa45949ed99d12d4c0b6435b382ce Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Mon, 23 Dec 2024 13:59:12 +0300 Subject: [PATCH 5/7] pkg/services/acl: Use constants from SDK This will simplify neofs-api-go module deprecation. Signed-off-by: Leonard Lyubich --- pkg/services/object/acl/eacl/v2/headers.go | 7 +++---- pkg/services/object/acl/eacl/v2/object.go | 13 ++++++------- 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/pkg/services/object/acl/eacl/v2/headers.go b/pkg/services/object/acl/eacl/v2/headers.go index 70ebce6ed6..d8a563baee 100644 --- a/pkg/services/object/acl/eacl/v2/headers.go +++ b/pkg/services/object/acl/eacl/v2/headers.go @@ -4,7 +4,6 @@ import ( "errors" "fmt" - "github.com/nspcc-dev/neofs-api-go/v2/acl" objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object" protoobject "github.com/nspcc-dev/neofs-api-go/v2/object/grpc" refsV2 "github.com/nspcc-dev/neofs-api-go/v2/refs" @@ -302,21 +301,21 @@ func (h *cfg) localObjectHeaders(cnr cid.ID, idObj *oid.ID) ([]eaclSDK.Header, b func cidHeader(idCnr cid.ID) sysObjHdr { return sysObjHdr{ - k: acl.FilterObjectContainerID, + k: eaclSDK.FilterObjectContainerID, v: idCnr.EncodeToString(), } } func oidHeader(obj oid.ID) sysObjHdr { return sysObjHdr{ - k: acl.FilterObjectID, + k: eaclSDK.FilterObjectID, v: obj.EncodeToString(), } } func ownerIDHeader(ownerID user.ID) sysObjHdr { return sysObjHdr{ - k: acl.FilterObjectOwnerID, + k: eaclSDK.FilterObjectOwnerID, v: ownerID.EncodeToString(), } } diff --git a/pkg/services/object/acl/eacl/v2/object.go b/pkg/services/object/acl/eacl/v2/object.go index 4a1e043eff..d7017e8be0 100644 --- a/pkg/services/object/acl/eacl/v2/object.go +++ b/pkg/services/object/acl/eacl/v2/object.go @@ -3,7 +3,6 @@ package v2 import ( "strconv" - "github.com/nspcc-dev/neofs-api-go/v2/acl" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" eaclSDK "github.com/nspcc-dev/neofs-sdk-go/eacl" "github.com/nspcc-dev/neofs-sdk-go/object" @@ -38,22 +37,22 @@ func headersFromObject(obj *object.Object, cnr cid.ID, oid *oid.ID) []eaclSDK.He cidHeader(cnr), // creation epoch sysObjHdr{ - k: acl.FilterObjectCreationEpoch, + k: eaclSDK.FilterObjectCreationEpoch, v: u64Value(obj.CreationEpoch()), }, // payload size sysObjHdr{ - k: acl.FilterObjectPayloadLength, + k: eaclSDK.FilterObjectPayloadSize, v: u64Value(obj.PayloadSize()), }, // object version sysObjHdr{ - k: acl.FilterObjectVersion, + k: eaclSDK.FilterObjectVersion, v: obj.Version().String(), }, // object type sysObjHdr{ - k: acl.FilterObjectType, + k: eaclSDK.FilterObjectType, v: obj.Type().String(), }, ) @@ -69,7 +68,7 @@ func headersFromObject(obj *object.Object, cnr cid.ID, oid *oid.ID) []eaclSDK.He cs, ok := obj.PayloadChecksum() if ok { res = append(res, sysObjHdr{ - k: acl.FilterObjectPayloadHash, + k: eaclSDK.FilterObjectPayloadChecksum, v: cs.String(), }) } @@ -77,7 +76,7 @@ func headersFromObject(obj *object.Object, cnr cid.ID, oid *oid.ID) []eaclSDK.He cs, ok = obj.PayloadHomomorphicHash() if ok { res = append(res, sysObjHdr{ - k: acl.FilterObjectHomomorphicHash, + k: eaclSDK.FilterObjectPayloadHomomorphicChecksum, v: cs.String(), }) } From d3b4419e7c071da080fd7c1cce103a1dc0f1f40e Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Tue, 28 Jan 2025 15:00:42 +0300 Subject: [PATCH 6/7] object: rename RequestInfoProcessor to ACLInfoExtractor I think it better fits semantically, there is no processing there, just fetching some data from request. Signed-off-by: Roman Khimov --- pkg/services/object/acl/v2/service.go | 28 ++++++++++----------- pkg/services/object/server.go | 36 ++++++++++++++------------- pkg/services/object/server_test.go | 36 +++++++++++++-------------- 3 files changed, 51 insertions(+), 49 deletions(-) diff --git a/pkg/services/object/acl/v2/service.go b/pkg/services/object/acl/v2/service.go index d27aeee920..23351c1b20 100644 --- a/pkg/services/object/acl/v2/service.go +++ b/pkg/services/object/acl/v2/service.go @@ -82,9 +82,9 @@ func New(opts ...Option) Service { } } -// ProcessGetRequest resolves RequestInfo from the request to check it using +// GetRequestToInfo resolves RequestInfo from the request to check it using // [ACLChecker]. -func (b Service) ProcessGetRequest(request *protoobject.GetRequest) (RequestInfo, error) { +func (b Service) GetRequestToInfo(request *protoobject.GetRequest) (RequestInfo, error) { cnr, err := getContainerIDFromRequest(request) if err != nil { return RequestInfo{}, err @@ -129,9 +129,9 @@ func (b Service) ProcessGetRequest(request *protoobject.GetRequest) (RequestInfo return reqInfo, nil } -// ProcessHeadRequest resolves RequestInfo from the request to check it using +// HeadRequestToInfo resolves RequestInfo from the request to check it using // [ACLChecker]. -func (b Service) ProcessHeadRequest(request *protoobject.HeadRequest) (RequestInfo, error) { +func (b Service) HeadRequestToInfo(request *protoobject.HeadRequest) (RequestInfo, error) { cnr, err := getContainerIDFromRequest(request) if err != nil { return RequestInfo{}, err @@ -176,9 +176,9 @@ func (b Service) ProcessHeadRequest(request *protoobject.HeadRequest) (RequestIn return reqInfo, err } -// ProcessSearchRequest resolves RequestInfo from the request to check it using +// SearchRequestToInfo resolves RequestInfo from the request to check it using // [ACLChecker]. -func (b Service) ProcessSearchRequest(request *protoobject.SearchRequest) (RequestInfo, error) { +func (b Service) SearchRequestToInfo(request *protoobject.SearchRequest) (RequestInfo, error) { id, err := getContainerIDFromRequest(request) if err != nil { return RequestInfo{}, err @@ -216,9 +216,9 @@ func (b Service) ProcessSearchRequest(request *protoobject.SearchRequest) (Reque return reqInfo, nil } -// ProcessDeleteRequest resolves RequestInfo from the request to check it using +// DeleteRequestToInfo resolves RequestInfo from the request to check it using // [ACLChecker]. -func (b Service) ProcessDeleteRequest(request *protoobject.DeleteRequest) (RequestInfo, error) { +func (b Service) DeleteRequestToInfo(request *protoobject.DeleteRequest) (RequestInfo, error) { cnr, err := getContainerIDFromRequest(request) if err != nil { return RequestInfo{}, err @@ -263,9 +263,9 @@ func (b Service) ProcessDeleteRequest(request *protoobject.DeleteRequest) (Reque return reqInfo, nil } -// ProcessRangeRequest resolves RequestInfo from the request to check it using +// RangeRequestToInfo resolves RequestInfo from the request to check it using // [ACLChecker]. -func (b Service) ProcessRangeRequest(request *protoobject.GetRangeRequest) (RequestInfo, error) { +func (b Service) RangeRequestToInfo(request *protoobject.GetRangeRequest) (RequestInfo, error) { cnr, err := getContainerIDFromRequest(request) if err != nil { return RequestInfo{}, err @@ -310,9 +310,9 @@ func (b Service) ProcessRangeRequest(request *protoobject.GetRangeRequest) (Requ return reqInfo, nil } -// ProcessHashRequest resolves RequestInfo from the request to check it using +// HashRequestToInfo resolves RequestInfo from the request to check it using // [ACLChecker]. -func (b Service) ProcessHashRequest(request *protoobject.GetRangeHashRequest) (RequestInfo, error) { +func (b Service) HashRequestToInfo(request *protoobject.GetRangeHashRequest) (RequestInfo, error) { cnr, err := getContainerIDFromRequest(request) if err != nil { return RequestInfo{}, err @@ -359,9 +359,9 @@ func (b Service) ProcessHashRequest(request *protoobject.GetRangeHashRequest) (R var ErrSkipRequest = errors.New("skip request") -// ProcessPutRequest resolves RequestInfo from the request to check it using +// PutRequestToInfo resolves RequestInfo from the request to check it using // [ACLChecker]. Returns [ErrSkipRequest] if check should not be performed. -func (b Service) ProcessPutRequest(request *protoobject.PutRequest) (RequestInfo, user.ID, error) { +func (b Service) PutRequestToInfo(request *protoobject.PutRequest) (RequestInfo, user.ID, error) { body := request.GetBody() if body == nil { return RequestInfo{}, user.ID{}, errEmptyBody diff --git a/pkg/services/object/server.go b/pkg/services/object/server.go index 68602671c4..5aac58ba8b 100644 --- a/pkg/services/object/server.go +++ b/pkg/services/object/server.go @@ -123,14 +123,16 @@ type Storage interface { VerifyAndStoreObject(object.Object) error } -type RequestInfoProcessor interface { - ProcessPutRequest(*protoobject.PutRequest) (aclsvc.RequestInfo, user.ID, error) - ProcessDeleteRequest(*protoobject.DeleteRequest) (aclsvc.RequestInfo, error) - ProcessHeadRequest(*protoobject.HeadRequest) (aclsvc.RequestInfo, error) - ProcessHashRequest(*protoobject.GetRangeHashRequest) (aclsvc.RequestInfo, error) - ProcessGetRequest(*protoobject.GetRequest) (aclsvc.RequestInfo, error) - ProcessRangeRequest(*protoobject.GetRangeRequest) (aclsvc.RequestInfo, error) - ProcessSearchRequest(*protoobject.SearchRequest) (aclsvc.RequestInfo, error) +// ACLInfoExtractor is the interface that allows to fetch data required for ACL +// checks from various types of grpc requests. +type ACLInfoExtractor interface { + PutRequestToInfo(*protoobject.PutRequest) (aclsvc.RequestInfo, user.ID, error) + DeleteRequestToInfo(*protoobject.DeleteRequest) (aclsvc.RequestInfo, error) + HeadRequestToInfo(*protoobject.HeadRequest) (aclsvc.RequestInfo, error) + HashRequestToInfo(*protoobject.GetRangeHashRequest) (aclsvc.RequestInfo, error) + GetRequestToInfo(*protoobject.GetRequest) (aclsvc.RequestInfo, error) + RangeRequestToInfo(*protoobject.GetRangeRequest) (aclsvc.RequestInfo, error) + SearchRequestToInfo(*protoobject.SearchRequest) (aclsvc.RequestInfo, error) } const accessDeniedACLReasonFmt = "access to operation %s is denied by basic ACL check" @@ -159,11 +161,11 @@ type server struct { mNumber uint32 metrics MetricCollector aclChecker aclsvc.ACLChecker - reqInfoProc RequestInfoProcessor + reqInfoProc ACLInfoExtractor } // New provides protoobject.ObjectServiceServer for the given parameters. -func New(c ServiceServer, magicNumber uint32, fsChain FSChain, st Storage, signer ecdsa.PrivateKey, m MetricCollector, ac aclsvc.ACLChecker, rp RequestInfoProcessor) protoobject.ObjectServiceServer { +func New(c ServiceServer, magicNumber uint32, fsChain FSChain, st Storage, signer ecdsa.PrivateKey, m MetricCollector, ac aclsvc.ACLChecker, rp ACLInfoExtractor) protoobject.ObjectServiceServer { return &server{ srv: c, fsChain: fsChain, @@ -247,7 +249,7 @@ func (s *server) Put(gStream protoobject.ObjectService_PutServer) error { return errors.New("malformed request: empty body") } - if reqInfo, objOwner, err := s.reqInfoProc.ProcessPutRequest(req); err != nil { + if reqInfo, objOwner, err := s.reqInfoProc.PutRequestToInfo(req); err != nil { if !errors.Is(err, aclsvc.ErrSkipRequest) { return s.sendStatusPutResponse(gStream, err) } @@ -299,7 +301,7 @@ func (s *server) Delete(ctx context.Context, req *protoobject.DeleteRequest) (*p return s.makeStatusDeleteResponse(apistatus.ErrNodeUnderMaintenance), nil } - reqInfo, err := s.reqInfoProc.ProcessDeleteRequest(req) + reqInfo, err := s.reqInfoProc.DeleteRequestToInfo(req) if err != nil { return s.makeStatusDeleteResponse(err), nil } @@ -350,7 +352,7 @@ func (s *server) Head(ctx context.Context, req *protoobject.HeadRequest) (*proto return s.makeStatusHeadResponse(apistatus.ErrNodeUnderMaintenance), nil } - reqInfo, err := s.reqInfoProc.ProcessHeadRequest(req) + reqInfo, err := s.reqInfoProc.HeadRequestToInfo(req) if err != nil { return s.makeStatusHeadResponse(err), nil } @@ -406,7 +408,7 @@ func (s *server) GetRangeHash(ctx context.Context, req *protoobject.GetRangeHash return s.makeStatusHashResponse(apistatus.ErrNodeUnderMaintenance), nil } - reqInfo, err := s.reqInfoProc.ProcessHashRequest(req) + reqInfo, err := s.reqInfoProc.HashRequestToInfo(req) if err != nil { return s.makeStatusHashResponse(err), nil } @@ -475,7 +477,7 @@ func (s *server) Get(req *protoobject.GetRequest, gStream protoobject.ObjectServ return s.sendStatusGetResponse(gStream, apistatus.ErrNodeUnderMaintenance) } - reqInfo, err := s.reqInfoProc.ProcessGetRequest(req) + reqInfo, err := s.reqInfoProc.GetRequestToInfo(req) if err != nil { return s.sendStatusGetResponse(gStream, err) } @@ -545,7 +547,7 @@ func (s *server) GetRange(req *protoobject.GetRangeRequest, gStream protoobject. return s.sendStatusRangeResponse(gStream, apistatus.ErrNodeUnderMaintenance) } - reqInfo, err := s.reqInfoProc.ProcessRangeRequest(req) + reqInfo, err := s.reqInfoProc.RangeRequestToInfo(req) if err != nil { return s.sendStatusRangeResponse(gStream, err) } @@ -615,7 +617,7 @@ func (s *server) Search(req *protoobject.SearchRequest, gStream protoobject.Obje return s.sendStatusSearchResponse(gStream, apistatus.ErrNodeUnderMaintenance) } - reqInfo, err := s.reqInfoProc.ProcessSearchRequest(req) + reqInfo, err := s.reqInfoProc.SearchRequestToInfo(req) if err != nil { return s.sendStatusSearchResponse(gStream, err) } diff --git a/pkg/services/object/server_test.go b/pkg/services/object/server_test.go index d379675438..5d581a6ac0 100644 --- a/pkg/services/object/server_test.go +++ b/pkg/services/object/server_test.go @@ -90,27 +90,27 @@ func (noCallTestACLChecker) CheckBasicACL(v2.RequestInfo) bool { panic func (noCallTestACLChecker) CheckEACL(any, v2.RequestInfo) error { panic("must not be called") } func (noCallTestACLChecker) StickyBitCheck(v2.RequestInfo, user.ID) bool { panic("must not be called") } -type noCallTestReqInfoProcessor struct{} +type noCallTestReqInfoExtractor struct{} -func (noCallTestReqInfoProcessor) ProcessPutRequest(*objectgrpc.PutRequest) (v2.RequestInfo, user.ID, error) { +func (noCallTestReqInfoExtractor) PutRequestToInfo(*objectgrpc.PutRequest) (v2.RequestInfo, user.ID, error) { panic("must not be called") } -func (noCallTestReqInfoProcessor) ProcessDeleteRequest(*objectgrpc.DeleteRequest) (v2.RequestInfo, error) { +func (noCallTestReqInfoExtractor) DeleteRequestToInfo(*objectgrpc.DeleteRequest) (v2.RequestInfo, error) { panic("must not be called") } -func (noCallTestReqInfoProcessor) ProcessHeadRequest(*objectgrpc.HeadRequest) (v2.RequestInfo, error) { +func (noCallTestReqInfoExtractor) HeadRequestToInfo(*objectgrpc.HeadRequest) (v2.RequestInfo, error) { panic("must not be called") } -func (noCallTestReqInfoProcessor) ProcessHashRequest(*objectgrpc.GetRangeHashRequest) (v2.RequestInfo, error) { +func (noCallTestReqInfoExtractor) HashRequestToInfo(*objectgrpc.GetRangeHashRequest) (v2.RequestInfo, error) { panic("must not be called") } -func (noCallTestReqInfoProcessor) ProcessGetRequest(*objectgrpc.GetRequest) (v2.RequestInfo, error) { +func (noCallTestReqInfoExtractor) GetRequestToInfo(*objectgrpc.GetRequest) (v2.RequestInfo, error) { panic("must not be called") } -func (noCallTestReqInfoProcessor) ProcessRangeRequest(*objectgrpc.GetRangeRequest) (v2.RequestInfo, error) { +func (noCallTestReqInfoExtractor) RangeRequestToInfo(*objectgrpc.GetRangeRequest) (v2.RequestInfo, error) { panic("must not be called") } -func (noCallTestReqInfoProcessor) ProcessSearchRequest(*objectgrpc.SearchRequest) (v2.RequestInfo, error) { +func (noCallTestReqInfoExtractor) SearchRequestToInfo(*objectgrpc.SearchRequest) (v2.RequestInfo, error) { panic("must not be called") } @@ -120,27 +120,27 @@ func (nopACLChecker) CheckBasicACL(v2.RequestInfo) bool { return true func (nopACLChecker) CheckEACL(any, v2.RequestInfo) error { return nil } func (nopACLChecker) StickyBitCheck(v2.RequestInfo, user.ID) bool { return true } -type nopReqInfoProcessor struct{} +type nopReqInfoExtractor struct{} -func (nopReqInfoProcessor) ProcessPutRequest(*objectgrpc.PutRequest) (v2.RequestInfo, user.ID, error) { +func (nopReqInfoExtractor) PutRequestToInfo(*objectgrpc.PutRequest) (v2.RequestInfo, user.ID, error) { return v2.RequestInfo{}, user.ID{}, nil } -func (nopReqInfoProcessor) ProcessDeleteRequest(*objectgrpc.DeleteRequest) (v2.RequestInfo, error) { +func (nopReqInfoExtractor) DeleteRequestToInfo(*objectgrpc.DeleteRequest) (v2.RequestInfo, error) { return v2.RequestInfo{}, nil } -func (nopReqInfoProcessor) ProcessHeadRequest(*objectgrpc.HeadRequest) (v2.RequestInfo, error) { +func (nopReqInfoExtractor) HeadRequestToInfo(*objectgrpc.HeadRequest) (v2.RequestInfo, error) { return v2.RequestInfo{}, nil } -func (nopReqInfoProcessor) ProcessHashRequest(*objectgrpc.GetRangeHashRequest) (v2.RequestInfo, error) { +func (nopReqInfoExtractor) HashRequestToInfo(*objectgrpc.GetRangeHashRequest) (v2.RequestInfo, error) { return v2.RequestInfo{}, nil } -func (nopReqInfoProcessor) ProcessGetRequest(*objectgrpc.GetRequest) (v2.RequestInfo, error) { +func (nopReqInfoExtractor) GetRequestToInfo(*objectgrpc.GetRequest) (v2.RequestInfo, error) { return v2.RequestInfo{}, nil } -func (nopReqInfoProcessor) ProcessRangeRequest(*objectgrpc.GetRangeRequest) (v2.RequestInfo, error) { +func (nopReqInfoExtractor) RangeRequestToInfo(*objectgrpc.GetRangeRequest) (v2.RequestInfo, error) { return v2.RequestInfo{}, nil } -func (nopReqInfoProcessor) ProcessSearchRequest(*objectgrpc.SearchRequest) (v2.RequestInfo, error) { +func (nopReqInfoExtractor) SearchRequestToInfo(*objectgrpc.SearchRequest) (v2.RequestInfo, error) { return v2.RequestInfo{}, nil } @@ -253,7 +253,7 @@ func TestServer_Replicate(t *testing.T) { var noCallObjSvc noCallObjectService var noCallStorage noCallTestStorage var noCallACLChecker noCallTestACLChecker - var noCallReqProc noCallTestReqInfoProcessor + var noCallReqProc noCallTestReqInfoExtractor noCallSrv := objectSvc.New(noCallObjSvc, 0, &noCallFSChain, noCallStorage, neofscryptotest.Signer().ECDSAPrivateKey, nopMetrics{}, noCallACLChecker, noCallReqProc) clientSigner := neofscryptotest.Signer() clientPubKey := neofscrypto.PublicKeyBytes(clientSigner.Public()) @@ -551,7 +551,7 @@ func BenchmarkServer_Replicate(b *testing.B) { ctx := context.Background() var fsChain nopFSChain - srv := objectSvc.New(nil, 0, fsChain, nopStorage{}, neofscryptotest.Signer().ECDSAPrivateKey, nopMetrics{}, nopACLChecker{}, nopReqInfoProcessor{}) + srv := objectSvc.New(nil, 0, fsChain, nopStorage{}, neofscryptotest.Signer().ECDSAPrivateKey, nopMetrics{}, nopACLChecker{}, nopReqInfoExtractor{}) for _, tc := range []struct { name string From 26716ed5ef7987e1332db491cc0520abf59cf945 Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Mon, 23 Dec 2024 14:37:29 +0300 Subject: [PATCH 7/7] service/object: Inline transport splitting service Continues 0d86522f0873a9377d2228b7bc36bdbfdde5a34a. Signed-off-by: Leonard Lyubich --- cmd/neofs-node/config.go | 13 -- cmd/neofs-node/object.go | 21 +-- pkg/services/object/server.go | 88 +++++++++-- pkg/services/object/transport_splitter.go | 184 ---------------------- 4 files changed, 85 insertions(+), 221 deletions(-) delete mode 100644 pkg/services/object/transport_splitter.go diff --git a/cmd/neofs-node/config.go b/cmd/neofs-node/config.go index 214946d610..a44fb0eb42 100644 --- a/cmd/neofs-node/config.go +++ b/cmd/neofs-node/config.go @@ -62,8 +62,6 @@ import ( "google.golang.org/grpc" ) -const addressSize = 72 // 32 bytes object ID, 32 bytes container ID, 8 bytes protobuf encoding - const maxMsgSize = 4 << 20 // transport msg limit 4 MiB // capacity of the pools of the morph notification handlers @@ -427,10 +425,6 @@ type cfgGRPC struct { listeners []net.Listener servers []*grpc.Server - - maxChunkSize uint64 - - maxAddrAmount uint64 } type cfgMorph struct { @@ -543,9 +537,6 @@ func initCfg(appCfg *config.Config) *cfg { netAddr = nodeconfig.BootstrapAddresses(appCfg) } - maxChunkSize := uint64(maxMsgSize) * 3 / 4 // 25% to meta, 75% to payload - maxAddrAmount := uint64(maxChunkSize) / addressSize // each address is about 72 bytes - persistate, err := state.NewPersistentStorage(nodeconfig.PersistentState(appCfg).Path()) fatalOnErr(err) @@ -617,10 +608,6 @@ func initCfg(appCfg *config.Config) *cfg { workerPool: netmapWorkerPool, needBootstrap: !relayOnly, } - c.cfgGRPC = cfgGRPC{ - maxChunkSize: maxChunkSize, - maxAddrAmount: maxAddrAmount, - } c.cfgMorph = cfgMorph{ proxyScriptHash: contractsconfig.Proxy(appCfg), } diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go index e6391d2f61..e1869a99f4 100644 --- a/cmd/neofs-node/object.go +++ b/cmd/neofs-node/object.go @@ -293,19 +293,12 @@ func initObjectService(c *cfg) { deletesvcV2.WithInternalService(sDelete), ) - // build service pipeline - // grpc | split - - splitSvc := objectService.NewTransportSplitter( - c.cfgGRPC.maxChunkSize, - c.cfgGRPC.maxAddrAmount, - &objectSvc{ - put: sPutV2, - search: sSearchV2, - get: sGetV2, - delete: sDeleteV2, - }, - ) + objSvc := &objectSvc{ + put: sPutV2, + search: sSearchV2, + get: sGetV2, + delete: sDeleteV2, + } // cachedFirstObjectsNumber is a total cached objects number; the V2 split scheme // expects the first part of the chain to hold a user-defined header of the original @@ -331,7 +324,7 @@ func initObjectService(c *cfg) { SetHeaderSource(cachedHeaderSource(sGet, cachedFirstObjectsNumber, c.log)), ) - server := objectService.New(splitSvc, mNumber, fsChain, (*putObjectServiceWrapper)(sPut), c.shared.basics.key.PrivateKey, c.metricsCollector, aclChecker, aclSvc) + server := objectService.New(objSvc, mNumber, fsChain, (*putObjectServiceWrapper)(sPut), c.shared.basics.key.PrivateKey, c.metricsCollector, aclChecker, aclSvc) for _, srv := range c.cfgGRPC.servers { objectGRPC.RegisterObjectServiceServer(srv, server) diff --git a/pkg/services/object/server.go b/pkg/services/object/server.go index 5aac58ba8b..7e21d178b5 100644 --- a/pkg/services/object/server.go +++ b/pkg/services/object/server.go @@ -30,6 +30,7 @@ import ( "github.com/nspcc-dev/neofs-sdk-go/stat" "github.com/nspcc-dev/neofs-sdk-go/user" "github.com/nspcc-dev/neofs-sdk-go/version" + "google.golang.org/protobuf/proto" ) // GetObjectStream is an interface of NeoFS API v2 compatible object streamer. @@ -152,6 +153,13 @@ func eACLErr(info aclsvc.RequestInfo, err error) error { return errAccessDenied } +const ( + maxRespMsgSize = 4 << 20 // default gRPC limit + maxRespDataChunkSize = maxRespMsgSize * 3 / 4 // 25% to meta, 75% to payload + addrMsgSize = 72 // 32 bytes object ID, 32 bytes container ID, 8 bytes protobuf encoding + maxObjAddrRespAmount = maxRespDataChunkSize / addrMsgSize // each address is about 72 bytes +) + type server struct { srv ServiceServer @@ -448,13 +456,28 @@ type getStreamerV2 struct { func (s *getStreamerV2) Send(resp *v2object.GetResponse) error { r := resp.ToGRPCMessage().(*protoobject.GetResponse) - if _, ok := r.GetBody().GetObjectPart().(*protoobject.GetResponse_Body_Init_); ok { + switch v := r.GetBody().GetObjectPart().(type) { + case *protoobject.GetResponse_Body_Init_: if err := s.srv.aclChecker.CheckEACL(r, s.reqInfo); err != nil { return eACLErr(s.reqInfo, err) } - } - if c := r.GetBody().GetChunk(); c != nil { - s.srv.metrics.AddGetPayload(len(c)) + case *protoobject.GetResponse_Body_Chunk: + for buf := bytes.NewBuffer(v.GetChunk()); buf.Len() > 0; { + newResp := &protoobject.GetResponse{ + Body: &protoobject.GetResponse_Body{ + ObjectPart: &protoobject.GetResponse_Body_Chunk{ + Chunk: buf.Next(maxRespDataChunkSize), + }, + }, + MetaHeader: proto.Clone(r.GetMetaHeader()).(*protosession.ResponseMetaHeader), // TODO: can go w/o cloning? + VerifyHeader: proto.Clone(r.GetVerifyHeader()).(*protosession.ResponseVerificationHeader), + } + if err := s.srv.sendGetResponse(s.ObjectService_GetServer, newResp); err != nil { + return err + } + } + s.srv.metrics.AddGetPayload(len(v.Chunk)) + return nil } return s.srv.sendGetResponse(s.ObjectService_GetServer, r) } @@ -523,10 +546,33 @@ type getRangeStreamerV2 struct { func (s *getRangeStreamerV2) Send(resp *v2object.GetRangeResponse) error { r := resp.ToGRPCMessage().(*protoobject.GetRangeResponse) - if err := s.srv.aclChecker.CheckEACL(r, s.reqInfo); err != nil { - return eACLErr(s.reqInfo, err) + v, ok := r.GetBody().GetRangePart().(*protoobject.GetRangeResponse_Body_Chunk) + if !ok { + if err := s.srv.aclChecker.CheckEACL(r, s.reqInfo); err != nil { + return eACLErr(s.reqInfo, err) + } + return s.srv.sendRangeResponse(s.ObjectService_GetRangeServer, r) + } + for buf := bytes.NewBuffer(v.GetChunk()); buf.Len() > 0; { + newResp := &protoobject.GetRangeResponse{ + Body: &protoobject.GetRangeResponse_Body{ + RangePart: &protoobject.GetRangeResponse_Body_Chunk{ + Chunk: buf.Next(maxRespDataChunkSize), + }, + }, + MetaHeader: proto.Clone(r.GetMetaHeader()).(*protosession.ResponseMetaHeader), // TODO: can go w/o cloning? + VerifyHeader: proto.Clone(r.GetVerifyHeader()).(*protosession.ResponseVerificationHeader), + } + // TODO: do not check response multiple times + // TODO: why check it at all? + if err := s.srv.aclChecker.CheckEACL(newResp, s.reqInfo); err != nil { + return eACLErr(s.reqInfo, err) + } + if err := s.srv.sendRangeResponse(s.ObjectService_GetRangeServer, newResp); err != nil { + return err + } } - return s.srv.sendRangeResponse(s.ObjectService_GetRangeServer, r) + return nil } // GetRange converts gRPC GetRangeRequest message and server-side stream and overtakes its data @@ -593,10 +639,32 @@ type searchStreamerV2 struct { func (s *searchStreamerV2) Send(resp *v2object.SearchResponse) error { r := resp.ToGRPCMessage().(*protoobject.SearchResponse) - if err := s.srv.aclChecker.CheckEACL(r, s.reqInfo); err != nil { - return eACLErr(s.reqInfo, err) + ids := r.GetBody().GetIdList() + for len(ids) > 0 { + newResp := &protoobject.SearchResponse{ + Body: &protoobject.SearchResponse_Body{}, + MetaHeader: proto.Clone(r.GetMetaHeader()).(*protosession.ResponseMetaHeader), // TODO: can go w/o cloning? + VerifyHeader: proto.Clone(r.GetVerifyHeader()).(*protosession.ResponseVerificationHeader), + } + + cut := maxObjAddrRespAmount + if cut > len(ids) { + cut = len(ids) + } + + newResp.Body.IdList = ids[:cut] + // TODO: do not check response multiple times + // TODO: why check it at all? + if err := s.srv.aclChecker.CheckEACL(r, s.reqInfo); err != nil { + return eACLErr(s.reqInfo, err) + } + if err := s.srv.sendSearchResponse(s.ObjectService_SearchServer, r); err != nil { + return err + } + + ids = ids[cut:] } - return s.srv.sendSearchResponse(s.ObjectService_SearchServer, r) + return nil } // Search converts gRPC SearchRequest message and server-side stream and overtakes its data diff --git a/pkg/services/object/transport_splitter.go b/pkg/services/object/transport_splitter.go deleted file mode 100644 index 2ef4dcb4d6..0000000000 --- a/pkg/services/object/transport_splitter.go +++ /dev/null @@ -1,184 +0,0 @@ -package object - -import ( - "bytes" - "context" - - "github.com/nspcc-dev/neofs-api-go/v2/object" - "github.com/nspcc-dev/neofs-node/pkg/services/util" -) - -type ( - TransportSplitter struct { - next ServiceServer - - chunkSize uint64 - addrAmount uint64 - } - - getStreamMsgSizeCtrl struct { - util.ServerStream - - stream GetObjectStream - - chunkSize int - } - - searchStreamMsgSizeCtrl struct { - util.ServerStream - - stream SearchStream - - addrAmount uint64 - } - - rangeStreamMsgSizeCtrl struct { - util.ServerStream - - stream GetObjectRangeStream - - chunkSize int - } -) - -func (s *getStreamMsgSizeCtrl) Send(resp *object.GetResponse) error { - body := resp.GetBody() - - part := body.GetObjectPart() - - chunkPart, ok := part.(*object.GetObjectPartChunk) - if !ok { - return s.stream.Send(resp) - } - - var newResp *object.GetResponse - - for buf := bytes.NewBuffer(chunkPart.GetChunk()); buf.Len() > 0; { - if newResp == nil { - newResp = new(object.GetResponse) - newResp.SetBody(body) - } - - chunkPart.SetChunk(buf.Next(s.chunkSize)) - newResp.SetMetaHeader(resp.GetMetaHeader()) - newResp.SetVerificationHeader(resp.GetVerificationHeader()) - - if err := s.stream.Send(newResp); err != nil { - return err - } - } - - return nil -} - -func NewTransportSplitter(size, amount uint64, next ServiceServer) *TransportSplitter { - return &TransportSplitter{ - next: next, - chunkSize: size, - addrAmount: amount, - } -} - -func (c *TransportSplitter) Get(req *object.GetRequest, stream GetObjectStream) error { - return c.next.Get(req, &getStreamMsgSizeCtrl{ - ServerStream: stream, - stream: stream, - chunkSize: int(c.chunkSize), - }) -} - -func (c TransportSplitter) Put(ctx context.Context) (PutObjectStream, error) { - return c.next.Put(ctx) -} - -func (c TransportSplitter) Head(ctx context.Context, request *object.HeadRequest) (*object.HeadResponse, error) { - return c.next.Head(ctx, request) -} - -func (c TransportSplitter) Search(req *object.SearchRequest, stream SearchStream) error { - return c.next.Search(req, &searchStreamMsgSizeCtrl{ - ServerStream: stream, - stream: stream, - addrAmount: c.addrAmount, - }) -} - -func (c TransportSplitter) Delete(ctx context.Context, request *object.DeleteRequest) (*object.DeleteResponse, error) { - return c.next.Delete(ctx, request) -} - -func (s *rangeStreamMsgSizeCtrl) Send(resp *object.GetRangeResponse) error { - body := resp.GetBody() - - chunkPart, ok := body.GetRangePart().(*object.GetRangePartChunk) - if !ok { - return s.stream.Send(resp) - } - - var newResp *object.GetRangeResponse - - for buf := bytes.NewBuffer(chunkPart.GetChunk()); buf.Len() > 0; { - if newResp == nil { - newResp = new(object.GetRangeResponse) - newResp.SetBody(body) - } - - chunkPart.SetChunk(buf.Next(s.chunkSize)) - body.SetRangePart(chunkPart) - newResp.SetMetaHeader(resp.GetMetaHeader()) - newResp.SetVerificationHeader(resp.GetVerificationHeader()) - - if err := s.stream.Send(newResp); err != nil { - return err - } - } - - return nil -} - -func (c TransportSplitter) GetRange(req *object.GetRangeRequest, stream GetObjectRangeStream) error { - return c.next.GetRange(req, &rangeStreamMsgSizeCtrl{ - ServerStream: stream, - stream: stream, - chunkSize: int(c.chunkSize), - }) -} - -func (c TransportSplitter) GetRangeHash(ctx context.Context, request *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) { - return c.next.GetRangeHash(ctx, request) -} - -func (s *searchStreamMsgSizeCtrl) Send(resp *object.SearchResponse) error { - body := resp.GetBody() - ids := body.GetIDList() - - var newResp *object.SearchResponse - - for ln := uint64(len(ids)); ; { - if newResp == nil { - newResp = new(object.SearchResponse) - newResp.SetBody(body) - } - - cut := s.addrAmount - if cut > ln { - cut = ln - } - - body.SetIDList(ids[:cut]) - newResp.SetMetaHeader(resp.GetMetaHeader()) - newResp.SetVerificationHeader(resp.GetVerificationHeader()) - - if err := s.stream.Send(newResp); err != nil { - return err - } - - ids = ids[cut:] - - if len(ids) == 0 { - break - } - } - - return nil -}