Skip to content

Commit

Permalink
Rebuild object service processing pipeline (#3092)
Browse files Browse the repository at this point in the history
This is yet another extraction from #3057, slightly adjusted. The most
controversial thing here is ACL change, but otherwise it tries to use
more of protobuf definitions and less of api-go definitions simplifying
the structure along the way.
  • Loading branch information
roman-khimov authored Jan 28, 2025
2 parents 8b65833 + 26716ed commit b3e19e2
Show file tree
Hide file tree
Showing 22 changed files with 873 additions and 1,266 deletions.
24 changes: 0 additions & 24 deletions cmd/neofs-node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ import (
trustcontroller "github.com/nspcc-dev/neofs-node/pkg/services/reputation/local/controller"
truststorage "github.com/nspcc-dev/neofs-node/pkg/services/reputation/local/storage"
"github.com/nspcc-dev/neofs-node/pkg/services/tree"
"github.com/nspcc-dev/neofs-node/pkg/services/util/response"
"github.com/nspcc-dev/neofs-node/pkg/util"
"github.com/nspcc-dev/neofs-node/pkg/util/state"
"github.com/nspcc-dev/neofs-sdk-go/netmap"
Expand All @@ -63,8 +62,6 @@ import (
"google.golang.org/grpc"
)

const addressSize = 72 // 32 bytes object ID, 32 bytes container ID, 8 bytes protobuf encoding

const maxMsgSize = 4 << 20 // transport msg limit 4 MiB

// capacity of the pools of the morph notification handlers
Expand Down Expand Up @@ -316,13 +313,6 @@ func (c *internals) stopMaintenance() {
c.log.Info("stopped local node's maintenance")
}

// IsMaintenance checks if storage node is under maintenance.
//
// Provides util.NodeState to Object service.
func (c *internals) IsMaintenance() bool {
return c.isMaintenance.Load()
}

type basics struct {
networkState *networkState
netMapSource netmapCore.Source
Expand Down Expand Up @@ -372,8 +362,6 @@ type shared struct {
// whether the local node is in the netMap
localNodeInNetmap atomic.Bool

respSvc *response.Service

policer *policer.Policer

replicator *replicator.Replicator
Expand Down Expand Up @@ -437,10 +425,6 @@ type cfgGRPC struct {
listeners []net.Listener

servers []*grpc.Server

maxChunkSize uint64

maxAddrAmount uint64
}

type cfgMorph struct {
Expand Down Expand Up @@ -553,9 +537,6 @@ func initCfg(appCfg *config.Config) *cfg {
netAddr = nodeconfig.BootstrapAddresses(appCfg)
}

maxChunkSize := uint64(maxMsgSize) * 3 / 4 // 25% to meta, 75% to payload
maxAddrAmount := uint64(maxChunkSize) / addressSize // each address is about 72 bytes

persistate, err := state.NewPersistentStorage(nodeconfig.PersistentState(appCfg).Path())
fatalOnErr(err)

Expand Down Expand Up @@ -614,7 +595,6 @@ func initCfg(appCfg *config.Config) *cfg {
c.shared = shared{
basics: basicSharedConfig,
localAddr: netAddr,
respSvc: response.NewService(response.WithNetworkState(basicSharedConfig.networkState)),
clientCache: cache.NewSDKClientCache(cacheOpts),
bgClientCache: cache.NewSDKClientCache(cacheOpts),
putClientCache: cache.NewSDKClientCache(cacheOpts),
Expand All @@ -628,10 +608,6 @@ func initCfg(appCfg *config.Config) *cfg {
workerPool: netmapWorkerPool,
needBootstrap: !relayOnly,
}
c.cfgGRPC = cfgGRPC{
maxChunkSize: maxChunkSize,
maxAddrAmount: maxAddrAmount,
}
c.cfgMorph = cfgMorph{
proxyScriptHash: contractsconfig.Proxy(appCfg),
}
Expand Down
94 changes: 40 additions & 54 deletions cmd/neofs-node/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"errors"
"fmt"
"sync/atomic"

lru "github.com/hashicorp/golang-lru/v2"
"github.com/nspcc-dev/neofs-api-go/v2/object"
Expand Down Expand Up @@ -292,56 +293,38 @@ func initObjectService(c *cfg) {
deletesvcV2.WithInternalService(sDelete),
)

// build service pipeline
// grpc | response | acl | split

splitSvc := objectService.NewTransportSplitter(
c.cfgGRPC.maxChunkSize,
c.cfgGRPC.maxAddrAmount,
&objectSvc{
put: sPutV2,
search: sSearchV2,
get: sGetV2,
delete: sDeleteV2,
},
)
objSvc := &objectSvc{
put: sPutV2,
search: sSearchV2,
get: sGetV2,
delete: sDeleteV2,
}

// cachedFirstObjectsNumber is a total cached objects number; the V2 split scheme
// expects the first part of the chain to hold a user-defined header of the original
// object which should be treated as a header to use for the eACL rules check; so
// every object part in every chain will try to refer to the first part, so caching
// should help a lot here
const cachedFirstObjectsNumber = 1000
objNode := newNodeForObjects(cnrNodes, sPut, c.IsLocalKey)
fsChain := newFSChainForObjects(cnrNodes, c.IsLocalKey, c.networkState, &c.isMaintenance)

aclSvc := v2.New(
v2.WithLogger(c.log),
v2.WithIRFetcher(newCachedIRFetcher(irFetcher)),
v2.WithNetmapper(netmapSourceWithNodes{Source: c.netMapSource, n: objNode}),
v2.WithNetmapper(netmapSourceWithNodes{Source: c.netMapSource, fsChain: fsChain}),
v2.WithContainerSource(
c.cfgObject.cnrSource,
),
v2.WithNextService(splitSvc),
v2.WithEACLChecker(
acl.NewChecker(new(acl.CheckerPrm).
SetNetmapState(c.cfgNetmap.state).
SetEACLSource(c.cfgObject.eaclSource).
SetValidator(eaclSDK.NewValidator()).
SetLocalStorage(ls).
SetHeaderSource(cachedHeaderSource(sGet, cachedFirstObjectsNumber, c.log)),
),
),
)

var commonSvc objectService.Common
commonSvc.Init(&c.internals, aclSvc)

respSvc := objectService.NewResponseService(
&commonSvc,
c.respSvc,
aclChecker := acl.NewChecker(new(acl.CheckerPrm).
SetNetmapState(c.cfgNetmap.state).
SetEACLSource(c.cfgObject.eaclSource).
SetValidator(eaclSDK.NewValidator()).
SetLocalStorage(ls).
SetHeaderSource(cachedHeaderSource(sGet, cachedFirstObjectsNumber, c.log)),
)

server := objectService.New(respSvc, mNumber, objNode, c.shared.basics.key.PrivateKey, c.cfgNetmap.state, c.metricsCollector)
server := objectService.New(objSvc, mNumber, fsChain, (*putObjectServiceWrapper)(sPut), c.shared.basics.key.PrivateKey, c.metricsCollector, aclChecker, aclSvc)

for _, srv := range c.cfgGRPC.servers {
objectGRPC.RegisterObjectServiceServer(srv, server)
Expand Down Expand Up @@ -613,18 +596,19 @@ func (x *remoteContainerNodes) ForEachRemoteContainerNode(cnr cid.ID, f func(net
})
}

// nodeForObjects represents NeoFS storage node for object storage.
type nodeForObjects struct {
putObjectService *putsvc.Service
containerNodes *containerNodes
isLocalPubKey func([]byte) bool
type fsChainForObjects struct {
netmap.StateDetailed
containerNodes *containerNodes
isLocalPubKey func([]byte) bool
isMaintenance *atomic.Bool
}

func newNodeForObjects(cnrNodes *containerNodes, putObjectService *putsvc.Service, isLocalPubKey func([]byte) bool) *nodeForObjects {
return &nodeForObjects{
putObjectService: putObjectService,
containerNodes: cnrNodes,
isLocalPubKey: isLocalPubKey,
func newFSChainForObjects(cnrNodes *containerNodes, isLocalPubKey func([]byte) bool, ns netmap.StateDetailed, isMaintenance *atomic.Bool) *fsChainForObjects {
return &fsChainForObjects{
StateDetailed: ns,
containerNodes: cnrNodes,
isLocalPubKey: isLocalPubKey,
isMaintenance: isMaintenance,
}
}

Expand All @@ -633,24 +617,26 @@ func newNodeForObjects(cnrNodes *containerNodes, putObjectService *putsvc.Servic
// epochs into f. When f returns false, nil is returned instantly.
//
// Implements [object.Node] interface.
func (x *nodeForObjects) ForEachContainerNodePublicKeyInLastTwoEpochs(id cid.ID, f func(pubKey []byte) bool) error {
func (x *fsChainForObjects) ForEachContainerNodePublicKeyInLastTwoEpochs(id cid.ID, f func(pubKey []byte) bool) error {
return x.containerNodes.forEachContainerNodePublicKeyInLastTwoEpochs(id, f)
}

// IsOwnPublicKey checks whether given binary-encoded public key is assigned to
// local storage node in the network map.
//
// Implements [object.Node] interface.
func (x *nodeForObjects) IsOwnPublicKey(pubKey []byte) bool {
func (x *fsChainForObjects) IsOwnPublicKey(pubKey []byte) bool {
return x.isLocalPubKey(pubKey)
}

// VerifyAndStoreObject checks given object's format and, if it is correct,
// saves the object in the node's local object storage.
//
// Implements [object.Node] interface.
func (x *nodeForObjects) VerifyAndStoreObject(obj objectSDK.Object) error {
return x.putObjectService.ValidateAndStoreObjectLocally(obj)
// LocalNodeUnderMaintenance checks whether local storage node is under
// maintenance now.
func (x *fsChainForObjects) LocalNodeUnderMaintenance() bool { return x.isMaintenance.Load() }

type putObjectServiceWrapper putsvc.Service

func (x *putObjectServiceWrapper) VerifyAndStoreObject(obj objectSDK.Object) error {
return (*putsvc.Service)(x).ValidateAndStoreObjectLocally(obj)
}

type objectSource struct {
Expand Down Expand Up @@ -715,13 +701,13 @@ func (c *cfg) GetNodesForObject(addr oid.Address) ([][]netmapsdk.NodeInfo, []uin

type netmapSourceWithNodes struct {
netmap.Source
n *nodeForObjects
fsChain *fsChainForObjects
}

func (n netmapSourceWithNodes) ServerInContainer(cID cid.ID) (bool, error) {
var serverInContainer bool
err := n.n.ForEachContainerNodePublicKeyInLastTwoEpochs(cID, func(pubKey []byte) bool {
if n.n.isLocalPubKey(pubKey) {
err := n.fsChain.ForEachContainerNodePublicKeyInLastTwoEpochs(cID, func(pubKey []byte) bool {
if n.fsChain.isLocalPubKey(pubKey) {
serverInContainer = true
return false
}
Expand Down
Loading

0 comments on commit b3e19e2

Please sign in to comment.