object/get: Refactor storage policy processing
Continues 2f29338 for `ObjectService.Get`
server handler.

Signed-off-by: Leonard Lyubich <[email protected]>
cthulhu-rider committed Jul 17, 2024
1 parent 22259ce commit 314f87a
Showing 6 changed files with 143 additions and 103 deletions.
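In short: the Get service no longer builds placement traversers itself. Its constructor now takes the policy-processing dependency as its first argument (`getsvc.New(c, ...)`), continuing the refactoring started in 2f29338. The interface declaration is not part of this diff; the sketch below is inferred from the `[getsvc.NeoFS]` doc references, the new `cfg` methods, and the `testNeoFS` stub, so the exact name and method set are assumptions, not the authoritative source.

// Inferred sketch of the Get service's new dependency (assumed to be declared
// in pkg/services/object/get); signatures follow this commit's doc comments
// and the testNeoFS stub.
package getsvc

import (
	netmapsdk "github.com/nspcc-dev/neofs-sdk-go/netmap"
	oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
)

// NeoFS provides the storage-policy processing required by the Get handler.
type NeoFS interface {
	// IsLocalNodePublicKey checks whether the given binary-encoded public key
	// is assigned in the network map to the local storage node.
	IsLocalNodePublicKey([]byte) bool
	// GetNodesForObject applies the container's storage policy to the current
	// network map and returns sorted lists of candidate storage nodes along
	// with the number of primary object holders in each list.
	GetNodesForObject(oid.Address) ([][]netmapsdk.NodeInfo, []uint, error)
}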
2 changes: 2 additions & 0 deletions cmd/neofs-node/config.go
@@ -472,6 +472,8 @@ type cfgObject struct {
 	cfgLocalStorage cfgLocalStorage
 
 	tombstoneLifetime uint64
+
+	containerNodes *containerNodes
 }
 
 type cfgLocalStorage struct {
63 changes: 54 additions & 9 deletions cmd/neofs-node/object.go
@@ -224,20 +224,12 @@ func initObjectService(c *cfg) {
 		policer.WithObjectBatchSize(c.applicationConfiguration.policer.objectBatchSize),
 	)
 
-	traverseGen := util.NewTraverserGenerator(c.netMapSource, c.cfgObject.cnrSource, c)
-
 	c.workers = append(c.workers, c.shared.policer)
 
-	sGet := getsvc.New(
+	sGet := getsvc.New(c,
 		getsvc.WithLogger(c.log),
 		getsvc.WithLocalStorageEngine(ls),
 		getsvc.WithClientConstructor(coreConstructor),
-		getsvc.WithTraverserGenerator(
-			traverseGen.WithTraverseOptions(
-				placement.SuccessAfter(1),
-			),
-		),
-		getsvc.WithNetMapSource(c.netMapSource),
 		getsvc.WithKeyStorage(keyStorage),
 	)
 
@@ -250,6 +242,7 @@
 
 	cnrNodes, err := newContainerNodes(c.cfgObject.cnrSource, c.netMapSource)
 	fatalOnErr(err)
+	c.cfgObject.containerNodes = cnrNodes
 
 	sSearch := searchsvc.New(newRemoteContainerNodes(cnrNodes, c.IsLocalKey),
 		searchsvc.WithLogger(c.log),
@@ -677,6 +670,58 @@ func (x *nodeForObjects) VerifyAndStoreObject(obj objectSDK.Object) error {
 	return x.putObjectService.ValidateAndStoreObjectLocally(obj)
 }
 
+// IsLocalNodePublicKey checks whether the given binary-encoded public key is
+// assigned in the network map to the local storage node.
+//
+// IsLocalNodePublicKey implements [getsvc.NeoFS].
+func (c *cfg) IsLocalNodePublicKey(b []byte) bool { return c.IsLocalKey(b) }
+
+// GetNodesForObject reads the storage policy of the referenced container from
+// the underlying container storage, reads the network map at the current
+// epoch from the underlying storage, applies the storage policy to it, and
+// returns sorted lists of selected storage nodes along with the numbers of
+// primary object holders. The resulting slices must not be modified.
+//
+// GetNodesForObject does not cache results.
+//
+// GetNodesForObject implements [getsvc.NeoFS].
+func (c *cfg) GetNodesForObject(addr oid.Address) ([][]netmapsdk.NodeInfo, []uint, error) {
+	epoch, err := c.cfgObject.containerNodes.network.Epoch()
+	if err != nil {
+		return nil, nil, fmt.Errorf("read current NeoFS epoch: %w", err)
+	}
+	cnrID := addr.Container()
+	cnr, err := c.cfgObject.containerNodes.containers.Get(cnrID)
+	if err != nil {
+		return nil, nil, fmt.Errorf("read container by ID: %w", err)
+	}
+	networkMap, err := c.cfgObject.containerNodes.network.GetNetMapByEpoch(epoch)
+	if err != nil {
+		return nil, nil, fmt.Errorf("read network map at epoch #%d: %w", epoch, err)
+	}
+
+	policy := cnr.Value.PlacementPolicy()
+	nodeLists, err := networkMap.ContainerNodes(policy, cnrID)
+	if err != nil {
+		return nil, nil, fmt.Errorf("apply container's storage policy to the network map at epoch #%d: %w", epoch, err)
+	} else if nodeLists, err = networkMap.PlacementVectors(nodeLists, addr.Object()); err != nil {
+		return nil, nil, fmt.Errorf("sort container nodes from the network map at epoch #%d: %w", epoch, err)
+	} else if len(nodeLists) != policy.NumberOfReplicas() {
+		return nil, nil, fmt.Errorf("invalid result of container's storage policy application to the network map at epoch #%d: "+
+			"mismatched number of storage node lists (%d) and required replica descriptors (%d)", epoch, len(nodeLists), policy.NumberOfReplicas())
+	}
+
+	primaryCounts := make([]uint, len(nodeLists))
+	for i := range nodeLists {
+		if primaryCounts[i] = uint(policy.ReplicaNumberByIndex(i)); primaryCounts[i] > uint(len(nodeLists[i])) {
+			return nil, nil, fmt.Errorf("invalid result of container's storage policy application to the network map at epoch #%d: "+
+				"invalid storage node list #%d: number of nodes (%d) is less than the minimum required by the container policy (%d)",
+				epoch, i, len(nodeLists[i]), primaryCounts[i])
+		}
+	}
+	return nodeLists, primaryCounts, nil
+}
+
 type objectSource struct {
 	get    *getsvc.Service
 	search *searchsvc.Service
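To make the contract of `GetNodesForObject` concrete: each returned list is sorted for the requested object, and the matching `primaryCounts` entry says how many nodes at the head of that list are primary holders; the tail of each list is the reserve. A runnable toy illustration follows (plain strings stand in for the SDK's netmap.NodeInfo, and the policy behind the numbers is invented for the example):

package main

import "fmt"

func main() {
	// E.g. a policy with two replica descriptors yields two sorted lists;
	// primaryCounts[i] corresponds to policy.ReplicaNumberByIndex(i).
	nodeLists := [][]string{
		{"n1", "n2", "n3", "n4"}, // descriptor #0: 2 primary holders, 2 reserve
		{"n5", "n6"},             // descriptor #1: 1 primary holder, 1 reserve
	}
	primaryCounts := []uint{2, 1}

	for i := range nodeLists {
		p := primaryCounts[i]
		fmt.Printf("list #%d: primary=%v reserve=%v\n", i, nodeLists[i][:p], nodeLists[i][p:])
	}
}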
75 changes: 45 additions & 30 deletions pkg/services/object/get/container.go
@@ -4,6 +4,8 @@ import (
 	"context"
 
 	"github.com/nspcc-dev/neofs-node/pkg/core/client"
+	"github.com/nspcc-dev/neofs-node/pkg/network"
+	"github.com/nspcc-dev/neofs-sdk-go/netmap"
 	"go.uber.org/zap"
 )

@@ -15,71 +17,84 @@ func (exec *execCtx) executeOnContainer() {
 
 	exec.log.Debug("trying to execute in container...")
 
-	// initialize epoch number
-	epoch, err := exec.svc.currentEpochReceiver.currentEpoch()
+	nodeLists, primaryCounts, err := exec.svc.neoFS.GetNodesForObject(exec.address())
 	if err != nil {
 		exec.status = statusUndefined
 		exec.err = err
-		exec.log.Debug("could not get current epoch number", zap.Error(err))
+		exec.log.Debug("failed to list storage nodes for the object", zap.Error(err))
 		return
 	}
 
-	exec.processEpoch(epoch)
-}
-
-func (exec *execCtx) processEpoch(epoch uint64) bool {
-	exec.log.Debug("process epoch",
-		zap.Uint64("number", epoch),
-	)
-
-	traverser, ok := exec.generateTraverser(exec.address(), epoch)
-	if !ok {
-		return true
-	}
 
 	ctx, cancel := context.WithCancel(exec.context())
 	defer cancel()
 
 	exec.status = statusUndefined
 	mProcessedNodes := make(map[string]struct{})
 
-	for {
-		addrs := traverser.Next()
-		if len(addrs) == 0 {
-			exec.log.Debug("no more nodes, abort placement iteration")
-
-			return false
+	var endpoints, externalEndpoints network.AddressGroup
+	var j, jLim uint
+	primary := true
+
+	for i := 0; i < len(nodeLists); i++ { // do not use for-range!
+		if primary {
+			j, jLim = 0, primaryCounts[i]
+		} else {
+			j, jLim = primaryCounts[i], uint(len(nodeLists[i]))
 		}
 
-		for i := range addrs {
+		for ; j < jLim; j++ {
 			select {
 			case <-ctx.Done():
 				exec.log.Debug("interrupt placement iteration by context",
 					zap.String("error", ctx.Err().Error()),
 				)
 
-				return true
+				return
 			default:
 			}
 
-			strKey := string(addrs[i].PublicKey())
-			if _, ok = mProcessedNodes[strKey]; ok {
+			bKey := nodeLists[i][j].PublicKey()
+			strKey := string(bKey)
+			if _, ok := mProcessedNodes[strKey]; ok || exec.svc.neoFS.IsLocalNodePublicKey(bKey) {
 				continue
 			}
 
 			mProcessedNodes[strKey] = struct{}{}
 
+			if err = endpoints.FromIterator(network.NodeEndpointsIterator(nodeLists[i][j])); err != nil {
+				// Critical error that may ultimately block the storage service. Normally it
+				// should not appear because entry into the network map is under strict control.
+				exec.log.Error("failed to decode network endpoints of the storage node from the network map, skip the node",
+					zap.String("public key", netmap.StringifyPublicKey(nodeLists[i][j])), zap.Error(err))
+				continue
+			}
+
 			// TODO: #1142 consider parallel execution
 			// TODO: #1142 consider optimization: if status == SPLIT we can continue until
 			// we reach the best result - split info with linking object ID.
 			var info client.NodeInfo
 
-			client.NodeInfoFromNetmapElement(&info, addrs[i])
+			info.SetAddressGroup(endpoints)
+			info.SetPublicKey(bKey)
+			if ext := nodeLists[i][j].ExternalAddresses(); len(ext) > 0 {
+				if err = externalEndpoints.FromStringSlice(ext); err != nil {
+					// Less critical since the main endpoints must work, but still important.
+					exec.log.Warn("failed to decode external network endpoints of the storage node, ignore them",
+						zap.String("public key", netmap.StringifyPublicKey(nodeLists[i][j])),
+						zap.Strings("endpoints", ext), zap.Error(err))
+				} else {
+					info.SetExternalAddressGroup(externalEndpoints)
+				}
+			}
 
 			if exec.processNode(info) {
 				exec.log.Debug("completing the operation")
-				return true
+				return
 			}
 		}
+
+		if primary && i == len(nodeLists)-1 {
+			// switch to reserve nodes
+			primary = false
+			i = -1
+		}
 	}
 }
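The rewritten loop visits the primary window of every list first, and only then returns for the reserve tails; the `i = -1` assignment relies on the subsequent `i++` to restart the outer loop exactly once, which is why the comment warns against for-range. A minimal standalone model of that traversal order, with invented node labels:

package main

import "fmt"

func main() {
	nodeLists := [][]string{{"a", "b", "c"}, {"d", "e"}}
	primaryCounts := []uint{1, 1}

	var j, jLim uint
	primary := true
	for i := 0; i < len(nodeLists); i++ { // do not use for-range: i is reset below
		if primary {
			j, jLim = 0, primaryCounts[i] // primary window: head of the list
		} else {
			j, jLim = primaryCounts[i], uint(len(nodeLists[i])) // reserve tail
		}
		for ; j < jLim; j++ {
			fmt.Printf("visit %s (list #%d, primary pass: %t)\n", nodeLists[i][j], i, primary)
		}
		if primary && i == len(nodeLists)-1 {
			// switch to reserve nodes, as in the real loop
			primary = false
			i = -1 // i++ brings it back to 0
		}
	}
}

Running it prints a, d (primary holders of both lists) before b, c, e (the reserves), matching the intended fallback order.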
21 changes: 0 additions & 21 deletions pkg/services/object/get/exec.go
@@ -8,7 +8,6 @@ import (
 	clientcore "github.com/nspcc-dev/neofs-node/pkg/core/client"
 	"github.com/nspcc-dev/neofs-node/pkg/core/object"
 	"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
-	"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
 	cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
 	objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
 	oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
@@ -162,26 +161,6 @@ func (exec *execCtx) headOnly() bool {
 	return exec.head
 }
 
-func (exec *execCtx) generateTraverser(addr oid.Address, epoch uint64) (*placement.Traverser, bool) {
-	obj := addr.Object()
-
-	t, err := exec.svc.traverserGenerator.GenerateTraverser(addr.Container(), &obj, epoch)
-
-	switch {
-	default:
-		exec.status = statusUndefined
-		exec.err = err
-
-		exec.log.Debug("could not generate container traverser",
-			zap.String("error", err.Error()),
-		)
-
-		return nil, false
-	case err == nil:
-		return t, true
-	}
-}
-
 func (exec *execCtx) getChild(id oid.ID, rng *objectSDK.Range, withHdr bool) (*objectSDK.Object, bool) {
 	log := exec.log
 	if rng != nil {
32 changes: 16 additions & 16 deletions pkg/services/object/get/get_test.go
@@ -34,9 +34,9 @@ type testStorage struct {
 	phy map[string]*objectSDK.Object
 }
 
-type testTraverserGenerator struct {
+type testNeoFS struct {
 	c container.Container
-	b map[uint64]placement.Builder
+	b placement.Builder
 }
 
 type testPlacementBuilder struct {
@@ -68,19 +68,21 @@ func newTestStorage() *testStorage {
 	}
 }
 
-func (g *testTraverserGenerator) GenerateTraverser(cnr cid.ID, obj *oid.ID, e uint64) (*placement.Traverser, error) {
-	opts := make([]placement.Option, 0, 4)
-	opts = append(opts,
-		placement.ForContainer(g.c),
-		placement.UseBuilder(g.b[e]),
-		placement.SuccessAfter(1),
-	)
+func (g *testNeoFS) IsLocalNodePublicKey([]byte) bool { return false }
 
-	if obj != nil {
-		opts = append(opts, placement.ForObject(*obj))
+func (g *testNeoFS) GetNodesForObject(addr oid.Address) ([][]netmap.NodeInfo, []uint, error) {
+	obj := addr.Object()
+	nodeLists, err := g.b.BuildPlacement(addr.Container(), &obj, netmap.PlacementPolicy{}) // policy is ignored in this test
+	if err != nil {
+		return nil, nil, err
 	}
 
-	return placement.NewTraverser(opts...)
+	primaryNums := make([]uint, len(nodeLists))
+	for i := range primaryNums {
+		primaryNums[i] = 1
+	}
+
+	return nodeLists, primaryNums, nil
 }
 
 func (p *testPlacementBuilder) BuildPlacement(cnr cid.ID, obj *oid.ID, _ netmap.PlacementPolicy) ([][]netmap.NodeInfo, error) {
@@ -502,11 +504,9 @@ func TestGetRemoteSmall(t *testing.T) {
 
 	const curEpoch = 13
 
-	svc.traverserGenerator = &testTraverserGenerator{
+	svc.neoFS = &testNeoFS{
 		c: cnr,
-		b: map[uint64]placement.Builder{
-			curEpoch: b,
-		},
+		b: b,
 	}
 	svc.clientCache = c
 	svc.currentEpochReceiver = testEpochReceiver(curEpoch)
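One optional hardening for the test file, not part of this commit: a compile-time assertion pinning the stub to the service's dependency. This assumes get_test.go is a white-box test living in the same package that declares the NeoFS interface:

// Hypothetical guard: compilation fails if testNeoFS stops satisfying NeoFS.
var _ NeoFS = (*testNeoFS)(nil)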
(1 more changed file was not rendered on this page.)