From 314f87a4719ab69101a006d3985c4f8f3f921fc5 Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Thu, 18 Jan 2024 10:31:12 +0400 Subject: [PATCH] object/get: Refactor storage policy processing Continues 2f2933817b45b776cca76d8d32e91c8c3d1139d1 for `ObjectService.Get` server handler. Signed-off-by: Leonard Lyubich --- cmd/neofs-node/config.go | 2 + cmd/neofs-node/object.go | 63 +++++++++++++++++++---- pkg/services/object/get/container.go | 75 +++++++++++++++++----------- pkg/services/object/get/exec.go | 21 -------- pkg/services/object/get/get_test.go | 32 ++++++------ pkg/services/object/get/service.go | 53 ++++++++++---------- 6 files changed, 143 insertions(+), 103 deletions(-) diff --git a/cmd/neofs-node/config.go b/cmd/neofs-node/config.go index 37d8ec46e1..05171db716 100644 --- a/cmd/neofs-node/config.go +++ b/cmd/neofs-node/config.go @@ -472,6 +472,8 @@ type cfgObject struct { cfgLocalStorage cfgLocalStorage tombstoneLifetime uint64 + + containerNodes *containerNodes } type cfgLocalStorage struct { diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go index e63cee3c28..0de625a36b 100644 --- a/cmd/neofs-node/object.go +++ b/cmd/neofs-node/object.go @@ -224,20 +224,12 @@ func initObjectService(c *cfg) { policer.WithObjectBatchSize(c.applicationConfiguration.policer.objectBatchSize), ) - traverseGen := util.NewTraverserGenerator(c.netMapSource, c.cfgObject.cnrSource, c) - c.workers = append(c.workers, c.shared.policer) - sGet := getsvc.New( + sGet := getsvc.New(c, getsvc.WithLogger(c.log), getsvc.WithLocalStorageEngine(ls), getsvc.WithClientConstructor(coreConstructor), - getsvc.WithTraverserGenerator( - traverseGen.WithTraverseOptions( - placement.SuccessAfter(1), - ), - ), - getsvc.WithNetMapSource(c.netMapSource), getsvc.WithKeyStorage(keyStorage), ) @@ -250,6 +242,7 @@ func initObjectService(c *cfg) { cnrNodes, err := newContainerNodes(c.cfgObject.cnrSource, c.netMapSource) fatalOnErr(err) + c.cfgObject.containerNodes = cnrNodes sSearch := 
searchsvc.New(newRemoteContainerNodes(cnrNodes, c.IsLocalKey), searchsvc.WithLogger(c.log), @@ -677,6 +670,58 @@ func (x *nodeForObjects) VerifyAndStoreObject(obj objectSDK.Object) error { return x.putObjectService.ValidateAndStoreObjectLocally(obj) } +// IsLocalNodePublicKey checks whether given binary-encoded public key is +// assigned in the network map to a local storage node. +// +// IsLocalNodePublicKey implements [getsvc.NeoFS]. +func (c *cfg) IsLocalNodePublicKey(b []byte) bool { return c.IsLocalKey(b) } + +// GetNodesForObject reads storage policy of the referenced container from the +// underlying container storage, reads network map at the specified epoch from +// the underlying storage, applies the storage policy to it and returns sorted +// lists of selected storage nodes along with the numbers of primary object +// holders. Resulting slices must not be changed. +// +// GetNodesForObject does not cache results. +// +// GetNodesForObject implements [getsvc.NeoFS]. +func (c *cfg) GetNodesForObject(addr oid.Address) ([][]netmapsdk.NodeInfo, []uint, error) { + epoch, err := c.cfgObject.containerNodes.network.Epoch() + if err != nil { + return nil, nil, fmt.Errorf("read current NeoFS epoch: %w", err) + } + cnrID := addr.Container() + cnr, err := c.cfgObject.containerNodes.containers.Get(cnrID) + if err != nil { + return nil, nil, fmt.Errorf("read container by ID: %w", err) + } + networkMap, err := c.cfgObject.containerNodes.network.GetNetMapByEpoch(epoch) + if err != nil { + return nil, nil, fmt.Errorf("read network map at epoch #%d: %w", epoch, err) + } + + policy := cnr.Value.PlacementPolicy() + nodeLists, err := networkMap.ContainerNodes(policy, cnrID) + if err != nil { + return nil, nil, fmt.Errorf("apply container's storage policy to the network map at epoch #%d: %w", epoch, err) + } else if nodeLists, err = networkMap.PlacementVectors(nodeLists, addr.Object()); err != nil { + return nil, nil, fmt.Errorf("sort container nodes from the network map at 
epoch #%d: %w", epoch, err) + } else if len(nodeLists) != policy.NumberOfReplicas() { + return nil, nil, fmt.Errorf("invalid result of container's storage policy application to the network map at epoch #%d: "+ + "diff number of storage node lists (%d) and required replica descriptors (%d)", epoch, len(nodeLists), policy.NumberOfReplicas()) + } + + primaryCounts := make([]uint, len(nodeLists)) + for i := range nodeLists { + if primaryCounts[i] = uint(policy.ReplicaNumberByIndex(i)); primaryCounts[i] > uint(len(nodeLists[i])) { + return nil, nil, fmt.Errorf("invalid result of container's storage policy application to the network map at epoch #%d: "+ + "invalid storage node list #%d: number of nodes (%d) is less than minimum required by the container policy (%d)", + epoch, i, len(nodeLists[i]), primaryCounts[i]) + } + } + return nodeLists, primaryCounts, nil +} + type objectSource struct { get *getsvc.Service search *searchsvc.Service diff --git a/pkg/services/object/get/container.go b/pkg/services/object/get/container.go index 5a902c2eff..895350e897 100644 --- a/pkg/services/object/get/container.go +++ b/pkg/services/object/get/container.go @@ -4,6 +4,8 @@ import ( "context" "github.com/nspcc-dev/neofs-node/pkg/core/client" + "github.com/nspcc-dev/neofs-node/pkg/network" + "github.com/nspcc-dev/neofs-sdk-go/netmap" "go.uber.org/zap" ) @@ -15,71 +17,84 @@ func (exec *execCtx) executeOnContainer() { exec.log.Debug("trying to execute in container...") - // initialize epoch number - epoch, err := exec.svc.currentEpochReceiver.currentEpoch() + nodeLists, primaryCounts, err := exec.svc.neoFS.GetNodesForObject(exec.address()) if err != nil { exec.status = statusUndefined exec.err = err - exec.log.Debug("could not get current epoch number", zap.Error(err)) + exec.log.Debug("failed to list storage nodes for the object", zap.Error(err)) return } - exec.processEpoch(epoch) -} - -func (exec *execCtx) processEpoch(epoch uint64) bool { - exec.log.Debug("process epoch", - 
zap.Uint64("number", epoch), - ) - - traverser, ok := exec.generateTraverser(exec.address(), epoch) - if !ok { - return true - } - ctx, cancel := context.WithCancel(exec.context()) defer cancel() exec.status = statusUndefined mProcessedNodes := make(map[string]struct{}) - - for { - addrs := traverser.Next() - if len(addrs) == 0 { - exec.log.Debug("no more nodes, abort placement iteration") - - return false + var endpoints, externalEndpoints network.AddressGroup + var j, jLim uint + primary := true + + for i := 0; i < len(nodeLists); i++ { // do not use for-range! + if primary { + j, jLim = 0, primaryCounts[i] + } else { + j, jLim = primaryCounts[i], uint(len(nodeLists[i])) } - for i := range addrs { + for ; j < jLim; j++ { select { case <-ctx.Done(): exec.log.Debug("interrupt placement iteration by context", zap.String("error", ctx.Err().Error()), ) - return true + return default: } - strKey := string(addrs[i].PublicKey()) - if _, ok = mProcessedNodes[strKey]; ok { + bKey := nodeLists[i][j].PublicKey() + strKey := string(bKey) + if _, ok := mProcessedNodes[strKey]; ok || exec.svc.neoFS.IsLocalNodePublicKey(bKey) { continue } mProcessedNodes[strKey] = struct{}{} + if err = endpoints.FromIterator(network.NodeEndpointsIterator(nodeLists[i][j])); err != nil { + // critical error that may ultimately block the storage service. Normally it + // should not appear because entry into the network map under strict control + exec.log.Error("failed to decode network endpoints of the storage node from the network map, skip the node", + zap.String("public key", netmap.StringifyPublicKey(nodeLists[i][j])), zap.Error(err)) + continue + } + // TODO: #1142 consider parallel execution // TODO: #1142 consider optimization: if status == SPLIT we can continue until // we reach the best result - split info with linking object ID. 
var info client.NodeInfo - - client.NodeInfoFromNetmapElement(&info, addrs[i]) + info.SetAddressGroup(endpoints) + info.SetPublicKey(bKey) + if ext := nodeLists[i][j].ExternalAddresses(); len(ext) > 0 { + if err = externalEndpoints.FromStringSlice(ext); err != nil { + // less critical since the main ones must work, but also important + exec.log.Warn("failed to decode external network endpoints of the storage node, ignore them", + zap.String("public key", netmap.StringifyPublicKey(nodeLists[i][j])), + zap.Strings("endpoints", ext), zap.Error(err)) + } else { + info.SetExternalAddressGroup(externalEndpoints) + } + } if exec.processNode(info) { exec.log.Debug("completing the operation") - return true + return } } + + if primary && i == len(nodeLists)-1 { + // switch to reserve nodes + primary = false + i = -1 + } } } diff --git a/pkg/services/object/get/exec.go b/pkg/services/object/get/exec.go index f11b3be265..ada3e44017 100644 --- a/pkg/services/object/get/exec.go +++ b/pkg/services/object/get/exec.go @@ -8,7 +8,6 @@ import ( clientcore "github.com/nspcc-dev/neofs-node/pkg/core/client" "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/services/object/util" - "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" @@ -162,26 +161,6 @@ func (exec *execCtx) headOnly() bool { return exec.head } -func (exec *execCtx) generateTraverser(addr oid.Address, epoch uint64) (*placement.Traverser, bool) { - obj := addr.Object() - - t, err := exec.svc.traverserGenerator.GenerateTraverser(addr.Container(), &obj, epoch) - - switch { - default: - exec.status = statusUndefined - exec.err = err - - exec.log.Debug("could not generate container traverser", - zap.String("error", err.Error()), - ) - - return nil, false - case err == nil: - return t, true - } -} - func (exec *execCtx) 
getChild(id oid.ID, rng *objectSDK.Range, withHdr bool) (*objectSDK.Object, bool) { log := exec.log if rng != nil { diff --git a/pkg/services/object/get/get_test.go b/pkg/services/object/get/get_test.go index cb3facdf7c..44d1e5db1a 100644 --- a/pkg/services/object/get/get_test.go +++ b/pkg/services/object/get/get_test.go @@ -34,9 +34,9 @@ type testStorage struct { phy map[string]*objectSDK.Object } -type testTraverserGenerator struct { +type testNeoFS struct { c container.Container - b map[uint64]placement.Builder + b placement.Builder } type testPlacementBuilder struct { @@ -68,19 +68,21 @@ func newTestStorage() *testStorage { } } -func (g *testTraverserGenerator) GenerateTraverser(cnr cid.ID, obj *oid.ID, e uint64) (*placement.Traverser, error) { - opts := make([]placement.Option, 0, 4) - opts = append(opts, - placement.ForContainer(g.c), - placement.UseBuilder(g.b[e]), - placement.SuccessAfter(1), - ) +func (g *testNeoFS) IsLocalNodePublicKey([]byte) bool { return false } - if obj != nil { - opts = append(opts, placement.ForObject(*obj)) +func (g *testNeoFS) GetNodesForObject(addr oid.Address) ([][]netmap.NodeInfo, []uint, error) { + obj := addr.Object() + nodeLists, err := g.b.BuildPlacement(addr.Container(), &obj, netmap.PlacementPolicy{}) // policy is ignored in this test + if err != nil { + return nil, nil, err } - return placement.NewTraverser(opts...) 
+ primaryNums := make([]uint, len(nodeLists)) + for i := range primaryNums { + primaryNums[i] = 1 + } + + return nodeLists, primaryNums, nil } func (p *testPlacementBuilder) BuildPlacement(cnr cid.ID, obj *oid.ID, _ netmap.PlacementPolicy) ([][]netmap.NodeInfo, error) { @@ -502,11 +504,9 @@ func TestGetRemoteSmall(t *testing.T) { const curEpoch = 13 - svc.traverserGenerator = &testTraverserGenerator{ + svc.neoFS = &testNeoFS{ c: cnr, - b: map[uint64]placement.Builder{ - curEpoch: b, - }, + b: b, } svc.clientCache = c svc.currentEpochReceiver = testEpochReceiver(curEpoch) diff --git a/pkg/services/object/get/service.go b/pkg/services/object/get/service.go index a53eba0d55..9ecbd2f5b3 100644 --- a/pkg/services/object/get/service.go +++ b/pkg/services/object/get/service.go @@ -2,19 +2,39 @@ package getsvc import ( "github.com/nspcc-dev/neofs-node/pkg/core/client" - "github.com/nspcc-dev/neofs-node/pkg/core/netmap" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" "github.com/nspcc-dev/neofs-node/pkg/services/object/util" - "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" - cid "github.com/nspcc-dev/neofs-sdk-go/container/id" + netmapsdk "github.com/nspcc-dev/neofs-sdk-go/netmap" "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "go.uber.org/zap" ) +// NeoFS provides access to the NeoFS network to get information necessary for +// the [Service] to work. +type NeoFS interface { + // GetNodesForObject returns descriptors of storage nodes matching storage + // policy of the referenced object for now. Nodes are identified by their public + // keys and can be repeated in different lists. The second value specifies the + // number (N) of primary object holders for each list (L) so: + // - size of each L >= N; + // - first N nodes of each L are primary data holders while others (if any) + // are backup. + // + // GetNodesForObject does not change resulting slices and their elements. 
+ // + // Returns [apistatus.ContainerNotFound] if requested container is missing in + // the network. + GetNodesForObject(oid.Address) ([][]netmapsdk.NodeInfo, []uint, error) + // IsLocalNodePublicKey checks whether given binary-encoded public key is + // assigned in the network map to a local storage node providing [Service]. + IsLocalNodePublicKey([]byte) bool +} + // Service utility serving requests of Object.Get service. type Service struct { *cfg + neoFS NeoFS } // Option is a Service's constructor option. @@ -37,10 +57,6 @@ type cfg struct { get(client.NodeInfo) (getClient, error) } - traverserGenerator interface { - GenerateTraverser(cid.ID, *oid.ID, uint64) (*placement.Traverser, error) - } - currentEpochReceiver interface { currentEpoch() (uint64, error) } @@ -59,7 +75,7 @@ func defaultCfg() *cfg { // New creates, initializes and returns utility serving // Object.Get service requests. -func New(opts ...Option) *Service { +func New(neoFS NeoFS, opts ...Option) *Service { c := defaultCfg() for i := range opts { @@ -67,7 +83,8 @@ func New(opts ...Option) *Service { } return &Service{ - cfg: c, + cfg: c, + neoFS: neoFS, } } @@ -104,24 +121,6 @@ func WithClientConstructor(v ClientConstructor) Option { } } -// WithTraverserGenerator returns option to set generator of -// placement traverser to get the objects from containers. -func WithTraverserGenerator(t *util.TraverserGenerator) Option { - return func(c *cfg) { - c.traverserGenerator = t - } -} - -// WithNetMapSource returns option to set network -// map storage to receive current network state. -func WithNetMapSource(nmSrc netmap.Source) Option { - return func(c *cfg) { - c.currentEpochReceiver = &nmSrcWrapper{ - nmSrc: nmSrc, - } - } -} - // WithKeyStorage returns option to set private // key storage for session tokens and node key. func WithKeyStorage(store *util.KeyStorage) Option {