Skip to content

Commit

Permalink
object/get: Refactor storage policy processing
Browse files Browse the repository at this point in the history
Continues 2f29338 for `ObjectService`'s
`Get`/`Head`/`GetRange` server handlers.

Signed-off-by: Leonard Lyubich <[email protected]>
  • Loading branch information
cthulhu-rider committed Jul 24, 2024
1 parent 8448ff3 commit 803e9a4
Show file tree
Hide file tree
Showing 8 changed files with 153 additions and 119 deletions.
2 changes: 2 additions & 0 deletions cmd/neofs-node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,8 @@ type cfgObject struct {
cfgLocalStorage cfgLocalStorage

tombstoneLifetime uint64

containerNodes *containerNodes
}

type cfgLocalStorage struct {
Expand Down
28 changes: 19 additions & 9 deletions cmd/neofs-node/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,20 +224,12 @@ func initObjectService(c *cfg) {
policer.WithObjectBatchSize(c.applicationConfiguration.policer.objectBatchSize),
)

traverseGen := util.NewTraverserGenerator(c.netMapSource, c.cfgObject.cnrSource, c)

c.workers = append(c.workers, c.shared.policer)

sGet := getsvc.New(
sGet := getsvc.New(c,
getsvc.WithLogger(c.log),
getsvc.WithLocalStorageEngine(ls),
getsvc.WithClientConstructor(coreConstructor),
getsvc.WithTraverserGenerator(
traverseGen.WithTraverseOptions(
placement.SuccessAfter(1),
),
),
getsvc.WithNetMapSource(c.netMapSource),
getsvc.WithKeyStorage(keyStorage),
)

Expand All @@ -250,6 +242,7 @@ func initObjectService(c *cfg) {

cnrNodes, err := newContainerNodes(c.cfgObject.cnrSource, c.netMapSource)
fatalOnErr(err)
c.cfgObject.containerNodes = cnrNodes

sSearch := searchsvc.New(newRemoteContainerNodes(cnrNodes, c.IsLocalKey),
searchsvc.WithLogger(c.log),
Expand Down Expand Up @@ -719,3 +712,20 @@ func (o objectSource) Search(ctx context.Context, cnr cid.ID, filters objectSDK.

return sw.ids, nil
}

// IsLocalNodePublicKey reports whether the given binary-encoded public key
// belongs to the local storage node according to the current network map.
// Delegates to the node's IsLocalKey check.
//
// IsLocalNodePublicKey implements [getsvc.NeoFSNetwork].
func (c *cfg) IsLocalNodePublicKey(b []byte) bool {
	return c.IsLocalKey(b)
}

// GetNodesForObject resolves the storage nodes for the addressed object: it
// reads the container's storage policy and the network map at the current
// epoch from the underlying storages, applies the policy and returns sorted
// node lists together with the per-list numbers of primary object holders.
// Callers must not modify the returned slices.
//
// GetNodesForObject implements [getsvc.NeoFSNetwork].
func (c *cfg) GetNodesForObject(addr oid.Address) ([][]netmapsdk.NodeInfo, []uint, error) {
	nodeLists, primaryCounts, err := c.cfgObject.containerNodes.getNodesForObject(addr)
	return nodeLists, primaryCounts, err
}
45 changes: 45 additions & 0 deletions cmd/neofs-node/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
netmapsdk "github.com/nspcc-dev/neofs-sdk-go/netmap"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
)

// storagePolicyRes structures persistent storage policy application result for
Expand Down Expand Up @@ -152,3 +153,47 @@ func (x *containerPolicyContext) applyAtEpoch(epoch uint64, cache *lru.Cache[con
cache.Add(cacheKey, result)
return result, nil
}

// getNodesForObject reads storage policy of the referenced container from the
// underlying container storage, reads network map at the current epoch from
// the underlying storage, applies the storage policy to it and returns sorted
// lists of selected storage nodes along with the per-list numbers of primary
// object holders. Resulting slices must not be changed.
func (x *containerNodes) getNodesForObject(addr oid.Address) ([][]netmapsdk.NodeInfo, []uint, error) {
	epoch, err := x.network.Epoch()
	if err != nil {
		return nil, nil, fmt.Errorf("read current NeoFS epoch: %w", err)
	}
	cnrID := addr.Container()
	cnr, err := x.containers.Get(cnrID)
	if err != nil {
		return nil, nil, fmt.Errorf("read container by ID: %w", err)
	}
	networkMap, err := x.network.GetNetMapByEpoch(epoch)
	if err != nil {
		return nil, nil, fmt.Errorf("read network map at epoch #%d: %w", epoch, err)
	}

	policy := cnr.Value.PlacementPolicy()
	nodeLists, err := networkMap.ContainerNodes(policy, cnrID)
	if err != nil {
		return nil, nil, fmt.Errorf("apply container's storage policy to the network map at epoch #%d: %w", epoch, err)
	}
	// sort each node list relative to the object ID so all nodes agree on order
	if nodeLists, err = networkMap.PlacementVectors(nodeLists, addr.Object()); err != nil {
		return nil, nil, fmt.Errorf("sort container nodes from the network map at epoch #%d: %w", epoch, err)
	}
	if len(nodeLists) != policy.NumberOfReplicas() {
		return nil, nil, fmt.Errorf("invalid result of container's storage policy application to the network map at epoch #%d: "+
			"diff number of storage node lists (%d) and required replica descriptors (%d)", epoch, len(nodeLists), policy.NumberOfReplicas())
	}

	primaryCounts := make([]uint, len(nodeLists))
	for i := range nodeLists {
		// each list must hold at least as many nodes as the policy requires
		// replicas for it, otherwise the placement result is unusable
		if primaryCounts[i] = uint(policy.ReplicaNumberByIndex(i)); primaryCounts[i] > uint(len(nodeLists[i])) {
			// fix: report the i-th list's length and its required replica
			// number, not the number of lists / descriptor count as before
			return nil, nil, fmt.Errorf("invalid result of container's storage policy application to the network map at epoch #%d: "+
				"invalid storage node list #%d: number of nodes (%d) is less than minimum required by the container policy (%d)",
				epoch, i, len(nodeLists[i]), primaryCounts[i])
		}
	}
	return nodeLists, primaryCounts, nil
}
75 changes: 45 additions & 30 deletions pkg/services/object/get/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"

"github.com/nspcc-dev/neofs-node/pkg/core/client"
"github.com/nspcc-dev/neofs-node/pkg/network"
"github.com/nspcc-dev/neofs-sdk-go/netmap"
"go.uber.org/zap"
)

Expand All @@ -15,71 +17,84 @@ func (exec *execCtx) executeOnContainer() {

exec.log.Debug("trying to execute in container...")

// initialize epoch number
epoch, err := exec.svc.currentEpochReceiver.currentEpoch()
nodeLists, primaryCounts, err := exec.svc.neoFSNet.GetNodesForObject(exec.address())
if err != nil {
exec.status = statusUndefined
exec.err = err
exec.log.Debug("could not get current epoch number", zap.Error(err))
exec.log.Debug("failed to list storage nodes for the object", zap.Error(err))
return
}

exec.processEpoch(epoch)
}

func (exec *execCtx) processEpoch(epoch uint64) bool {
exec.log.Debug("process epoch",
zap.Uint64("number", epoch),
)

traverser, ok := exec.generateTraverser(exec.address(), epoch)
if !ok {
return true
}

ctx, cancel := context.WithCancel(exec.context())
defer cancel()

exec.status = statusUndefined
mProcessedNodes := make(map[string]struct{})

for {
addrs := traverser.Next()
if len(addrs) == 0 {
exec.log.Debug("no more nodes, abort placement iteration")

return false
var endpoints, externalEndpoints network.AddressGroup
var j, jLim uint
primary := true

for i := 0; i < len(nodeLists); i++ { // do not use for-range!
if primary {
j, jLim = 0, primaryCounts[i]
} else {
j, jLim = primaryCounts[i], uint(len(nodeLists[i]))
}

for i := range addrs {
for ; j < jLim; j++ {
select {
case <-ctx.Done():
exec.log.Debug("interrupt placement iteration by context",
zap.String("error", ctx.Err().Error()),
)

return true
return
default:
}

strKey := string(addrs[i].PublicKey())
if _, ok = mProcessedNodes[strKey]; ok {
bKey := nodeLists[i][j].PublicKey()
strKey := string(bKey)
if _, ok := mProcessedNodes[strKey]; ok || exec.svc.neoFSNet.IsLocalNodePublicKey(bKey) {
continue
}

mProcessedNodes[strKey] = struct{}{}

if err = endpoints.FromIterator(network.NodeEndpointsIterator(nodeLists[i][j])); err != nil {
// critical error that may ultimately block the storage service. Normally it
// should not appear because entry into the network map under strict control
exec.log.Error("failed to decode network endpoints of the storage node from the network map, skip the node",
zap.String("public key", netmap.StringifyPublicKey(nodeLists[i][j])), zap.Error(err))
continue
}

// TODO: #1142 consider parallel execution
// TODO: #1142 consider optimization: if status == SPLIT we can continue until
// we reach the best result - split info with linking object ID.
var info client.NodeInfo

client.NodeInfoFromNetmapElement(&info, addrs[i])
info.SetAddressGroup(endpoints)
info.SetPublicKey(bKey)
if ext := nodeLists[i][j].ExternalAddresses(); len(ext) > 0 {
if err = externalEndpoints.FromStringSlice(ext); err != nil {
// less critical since the main ones must work, but also important
exec.log.Warn("failed to decode external network endpoints of the storage node, ignore them",
zap.String("public key", netmap.StringifyPublicKey(nodeLists[i][j])),
zap.Strings("endpoints", ext), zap.Error(err))
} else {
info.SetExternalAddressGroup(externalEndpoints)
}
}

if exec.processNode(info) {
exec.log.Debug("completing the operation")
return true
return
}
}

if primary && i == len(nodeLists)-1 {
// switch to reserve nodes
primary = false
i = -1
}
}
}
21 changes: 0 additions & 21 deletions pkg/services/object/get/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
clientcore "github.com/nspcc-dev/neofs-node/pkg/core/client"
"github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
Expand Down Expand Up @@ -162,26 +161,6 @@ func (exec *execCtx) headOnly() bool {
return exec.head
}

// generateTraverser builds a placement traverser for the addressed object at
// the given epoch. On failure it records the error in the execution context,
// logs it, and reports false; on success it returns the traverser and true.
func (exec *execCtx) generateTraverser(addr oid.Address, epoch uint64) (*placement.Traverser, bool) {
	obj := addr.Object()

	t, err := exec.svc.traverserGenerator.GenerateTraverser(addr.Container(), &obj, epoch)
	if err != nil {
		exec.status = statusUndefined
		exec.err = err

		exec.log.Debug("could not generate container traverser",
			zap.String("error", err.Error()),
		)

		return nil, false
	}

	return t, true
}

func (exec *execCtx) getChild(id oid.ID, rng *objectSDK.Range, withHdr bool) (*objectSDK.Object, bool) {
log := exec.log
if rng != nil {
Expand Down
35 changes: 16 additions & 19 deletions pkg/services/object/get/get_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ type testStorage struct {
phy map[string]*objectSDK.Object
}

type testTraverserGenerator struct {
type testNeoFS struct {
c container.Container
b map[uint64]placement.Builder
b placement.Builder
}

type testPlacementBuilder struct {
Expand Down Expand Up @@ -68,19 +68,21 @@ func newTestStorage() *testStorage {
}
}

func (g *testTraverserGenerator) GenerateTraverser(cnr cid.ID, obj *oid.ID, e uint64) (*placement.Traverser, error) {
opts := make([]placement.Option, 0, 4)
opts = append(opts,
placement.ForContainer(g.c),
placement.UseBuilder(g.b[e]),
placement.SuccessAfter(1),
)
func (g *testNeoFS) IsLocalNodePublicKey([]byte) bool { return false }

if obj != nil {
opts = append(opts, placement.ForObject(*obj))
func (g *testNeoFS) GetNodesForObject(addr oid.Address) ([][]netmap.NodeInfo, []uint, error) {
obj := addr.Object()
nodeLists, err := g.b.BuildPlacement(addr.Container(), &obj, netmap.PlacementPolicy{}) // policy is ignored in this test
if err != nil {
return nil, nil, err
}

primaryNums := make([]uint, len(nodeLists))
for i := range primaryNums {
primaryNums[i] = 1
}

return placement.NewTraverser(opts...)
return nodeLists, primaryNums, nil
}

func (p *testPlacementBuilder) BuildPlacement(cnr cid.ID, obj *oid.ID, _ netmap.PlacementPolicy) ([][]netmap.NodeInfo, error) {
Expand Down Expand Up @@ -500,16 +502,11 @@ func TestGetRemoteSmall(t *testing.T) {
svc.localStorage = newTestStorage()
svc.assembly = true

const curEpoch = 13

svc.traverserGenerator = &testTraverserGenerator{
svc.neoFSNet = &testNeoFS{
c: cnr,
b: map[uint64]placement.Builder{
curEpoch: b,
},
b: b,
}
svc.clientCache = c
svc.currentEpochReceiver = testEpochReceiver(curEpoch)

return svc
}
Expand Down
Loading

0 comments on commit 803e9a4

Please sign in to comment.