Skip to content

Commit

Permalink
node/policy: Test functions selecting nodes for container and objects
Browse files Browse the repository at this point in the history
They are vendored by NeoFS SDK, but the reaction to their unexpected
behavior also needs to be tested.

Signed-off-by: Leonard Lyubich <[email protected]>
  • Loading branch information
cthulhu-rider committed Jul 18, 2024
1 parent 001ded4 commit 380a6b4
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 13 deletions.
39 changes: 26 additions & 13 deletions cmd/neofs-node/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ const (
cachedObjectNodesNum = 10000
)

type (
getContainerNodesFunc = func(netmapsdk.NetMap, netmapsdk.PlacementPolicy, cid.ID) ([][]netmapsdk.NodeInfo, error)
sortContainerNodesFunc = func(netmapsdk.NetMap, [][]netmapsdk.NodeInfo, oid.ID) ([][]netmapsdk.NodeInfo, error)
)

// containerNodes wraps NeoFS network state to apply container storage policies.
//
// Since policy application results are consistent for fixed container and
Expand All @@ -50,6 +55,10 @@ type containerNodes struct {

cache *lru.Cache[containerNodesCacheKey, storagePolicyRes]
objCache *lru.Cache[objectNodesCacheKey, storagePolicyRes]

// for testing
getContainerNodesFunc getContainerNodesFunc
sortContainerNodesFunc sortContainerNodesFunc
}

func newContainerNodes(containers container.Source, network netmap.Source) (*containerNodes, error) {
Expand All @@ -62,10 +71,12 @@ func newContainerNodes(containers container.Source, network netmap.Source) (*con
return nil, fmt.Errorf("create LRU container node cache for objects: %w", err)

Check warning on line 71 in cmd/neofs-node/policy.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/policy.go#L71

Added line #L71 was not covered by tests
}
return &containerNodes{
containers: containers,
network: network,
cache: l,
objCache: lo,
containers: containers,
network: network,
cache: l,
objCache: lo,
getContainerNodesFunc: netmapsdk.NetMap.ContainerNodes,
sortContainerNodesFunc: netmapsdk.NetMap.PlacementVectors,
}, nil
}

Expand All @@ -84,7 +95,7 @@ func (x *containerNodes) forEachContainerNode(cnrID cid.ID, withPrevEpoch bool,
return fmt.Errorf("read current NeoFS epoch: %w", err)
}

cnrCtx := containerPolicyContext{id: cnrID, containers: x.containers, network: x.network}
cnrCtx := containerPolicyContext{id: cnrID, containers: x.containers, network: x.network, getNodesFunc: x.getContainerNodesFunc}

resCur, err := cnrCtx.applyAtEpoch(curEpoch, x.cache)
if err != nil {
Expand Down Expand Up @@ -151,9 +162,10 @@ func (x *containerNodes) getNodesForObject(addr oid.Address) ([][]netmapsdk.Node
return res.nodeSets, res.repCounts, res.err
}
cnrRes, networkMap, err := (&containerPolicyContext{
id: addr.Container(),
containers: x.containers,
network: x.network,
id: addr.Container(),
containers: x.containers,
network: x.network,
getNodesFunc: x.getContainerNodesFunc,
}).applyToNetmap(curEpoch, x.cache)
if err != nil || cnrRes.err != nil {
if err == nil {
Expand All @@ -168,7 +180,7 @@ func (x *containerNodes) getNodesForObject(addr oid.Address) ([][]netmapsdk.Node
}
}
res.repCounts = cnrRes.repCounts
res.nodeSets, res.err = networkMap.PlacementVectors(cnrRes.nodeSets, addr.Object())
res.nodeSets, res.err = x.sortContainerNodesFunc(*networkMap, cnrRes.nodeSets, addr.Object())
if res.err != nil {
res.err = fmt.Errorf("sort container nodes for object: %w", res.err)
}
Expand All @@ -179,9 +191,10 @@ func (x *containerNodes) getNodesForObject(addr oid.Address) ([][]netmapsdk.Node
// preserves context of storage policy processing for the particular container.
type containerPolicyContext struct {
// static
id cid.ID
containers container.Source
network netmap.Source
id cid.ID
containers container.Source
network netmap.Source
getNodesFunc getContainerNodesFunc
// dynamic
cnr *container.Container
}
Expand Down Expand Up @@ -218,7 +231,7 @@ func (x *containerPolicyContext) applyToNetmap(epoch uint64, cache *lru.Cache[co
return result, nil, fmt.Errorf("read network map by epoch: %w", err)
}
policy := x.cnr.Value.PlacementPolicy()
result.nodeSets, result.err = networkMap.ContainerNodes(policy, x.id)
result.nodeSets, result.err = x.getNodesFunc(*networkMap, policy, x.id)
if result.err == nil {
// ContainerNodes should control following, but still better to double-check
if policyNum := policy.NumberOfReplicas(); len(result.nodeSets) != policyNum {
Expand Down
110 changes: 110 additions & 0 deletions cmd/neofs-node/policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test"
"github.com/nspcc-dev/neofs-sdk-go/netmap"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -580,6 +581,115 @@ func TestContainerNodes_GetNodesForObject(t *testing.T) {
require.EqualValues(t, network.epoch, network.callsNetmap[0])
}
})
t.Run("diff num of node lists and replica descriptors", func(t *testing.T) {
_, networkMap, cnr := newNetmapWithContainer(t, 5, []int{1, 3}, []int{3, 4})
cnrs := &testContainer{id: anyAddr.Container(), val: cnr}
network := &testNetwork{epoch: anyEpoch, curNetmap: networkMap}
ns, err := newContainerNodes(cnrs, network)
require.NoError(t, err)
ns.getContainerNodesFunc = func(nm netmap.NetMap, policy netmap.PlacementPolicy, cnrID cid.ID) ([][]netmap.NodeInfo, error) {
require.Equal(t, *networkMap, nm)
require.Equal(t, cnr.PlacementPolicy(), policy)
require.Equal(t, anyAddr.Container(), cnrID)
return make([][]netmap.NodeInfo, 4), nil
}

for n := 1; n <= 10; n++ {
_, _, err = ns.getNodesForObject(anyAddr)
require.EqualError(t, err, "select container nodes for current epoch #42: "+
"invalid result of container's storage policy application to the network map: "+
"diff number of storage node sets (4) and required replica descriptors (2)")
network.assertEpochCallCount(t, n)
// assert results are cached
cnrs.assertCalledNTimesWith(t, 1, anyAddr.Container())
require.Len(t, network.callsNetmap, 1)
require.EqualValues(t, network.epoch, network.callsNetmap[0])
}
})

t.Run("not enough nodes in some list", func(t *testing.T) {
_, networkMap, cnr := newNetmapWithContainer(t, 5, []int{1, 3}, []int{3, 4})
cnrs := &testContainer{id: anyAddr.Container(), val: cnr}
network := &testNetwork{epoch: anyEpoch, curNetmap: networkMap}
ns, err := newContainerNodes(cnrs, network)
require.NoError(t, err)
ns.getContainerNodesFunc = func(nm netmap.NetMap, policy netmap.PlacementPolicy, cnrID cid.ID) ([][]netmap.NodeInfo, error) {
require.Equal(t, *networkMap, nm)
require.Equal(t, cnr.PlacementPolicy(), policy)
require.Equal(t, anyAddr.Container(), cnrID)
nodeLists, err := nm.ContainerNodes(policy, cnrID)
require.NoError(t, err)
res := make([][]netmap.NodeInfo, len(nodeLists))
copy(res, nodeLists)
res[1] = res[1][:len(res[1])-1]
return res, nil
}

for n := 1; n <= 10; n++ {
_, _, err = ns.getNodesForObject(anyAddr)
require.EqualError(t, err, "select container nodes for current epoch #42: "+
"invalid result of container's storage policy application to the network map: "+
"invalid storage node set #1: number of nodes (1) is less than minimum required by the container policy (2)")
network.assertEpochCallCount(t, n)
// assert results are cached
cnrs.assertCalledNTimesWith(t, 1, anyAddr.Container())
require.Len(t, network.callsNetmap, 1)
require.EqualValues(t, network.epoch, network.callsNetmap[0])
}
})
t.Run("diff num of node lists and replica descriptors", func(t *testing.T) {
_, networkMap, cnr := newNetmapWithContainer(t, 5, []int{1, 3}, []int{3, 4})
cnrs := &testContainer{id: anyAddr.Container(), val: cnr}
network := &testNetwork{epoch: anyEpoch, curNetmap: networkMap}
ns, err := newContainerNodes(cnrs, network)
require.NoError(t, err)
ns.getContainerNodesFunc = func(nm netmap.NetMap, policy netmap.PlacementPolicy, cnrID cid.ID) ([][]netmap.NodeInfo, error) {
require.Equal(t, *networkMap, nm)
require.Equal(t, cnr.PlacementPolicy(), policy)
require.Equal(t, anyAddr.Container(), cnrID)
return make([][]netmap.NodeInfo, 4), nil
}

for n := 1; n <= 10; n++ {
_, _, err = ns.getNodesForObject(anyAddr)
require.EqualError(t, err, "select container nodes for current epoch #42: "+
"invalid result of container's storage policy application to the network map: "+
"diff number of storage node sets (4) and required replica descriptors (2)")
network.assertEpochCallCount(t, n)
// assert results are cached
cnrs.assertCalledNTimesWith(t, 1, anyAddr.Container())
require.Len(t, network.callsNetmap, 1)
require.EqualValues(t, network.epoch, network.callsNetmap[0])
}
})

t.Run("sort nodes failure", func(t *testing.T) {
nodes, networkMap, cnr := newNetmapWithContainer(t, 5, []int{1, 3})
cnrs := &testContainer{id: anyAddr.Container(), val: cnr}
network := &testNetwork{epoch: anyEpoch, curNetmap: networkMap}
ns, err := newContainerNodes(cnrs, network)
require.NoError(t, err)
ns.sortContainerNodesFunc = func(nm netmap.NetMap, ns [][]netmap.NodeInfo, id oid.ID) ([][]netmap.NodeInfo, error) {
require.Equal(t, *networkMap, nm)
require.Equal(t, anyAddr.Object(), id)
for i := range ns {
for j := range ns[i] {
require.Contains(t, nodes, ns[i][j], [2]int{i, j})
}
}
return nil, errors.New("any sort error")
}

for n := 1; n <= 10; n++ {
_, _, err = ns.getNodesForObject(anyAddr)
require.EqualError(t, err, "sort container nodes for object: any sort error")
network.assertEpochCallCount(t, n)
// assert results are cached
cnrs.assertCalledNTimesWith(t, 1, anyAddr.Container())
require.Len(t, network.callsNetmap, 1)
require.EqualValues(t, network.epoch, network.callsNetmap[0])
}
})
})
t.Run("OK", func(t *testing.T) {
nodes, networkMap, cnr := newNetmapWithContainer(t, 10, [][]int{
Expand Down

0 comments on commit 380a6b4

Please sign in to comment.