diff --git a/cmd/neofs-node/policy.go b/cmd/neofs-node/policy.go index 603681a1e1..a98d0a43d0 100644 --- a/cmd/neofs-node/policy.go +++ b/cmd/neofs-node/policy.go @@ -39,6 +39,11 @@ const ( cachedObjectNodesNum = 10000 ) +type ( + getContainerNodesFunc = func(netmapsdk.NetMap, netmapsdk.PlacementPolicy, cid.ID) ([][]netmapsdk.NodeInfo, error) + sortContainerNodesFunc = func(netmapsdk.NetMap, [][]netmapsdk.NodeInfo, oid.ID) ([][]netmapsdk.NodeInfo, error) +) + // containerNodes wraps NeoFS network state to apply container storage policies. // // Since policy application results are consistent for fixed container and @@ -50,6 +55,10 @@ type containerNodes struct { cache *lru.Cache[containerNodesCacheKey, storagePolicyRes] objCache *lru.Cache[objectNodesCacheKey, storagePolicyRes] + + // for testing + getContainerNodesFunc getContainerNodesFunc + sortContainerNodesFunc sortContainerNodesFunc } func newContainerNodes(containers container.Source, network netmap.Source) (*containerNodes, error) { @@ -62,10 +71,12 @@ func newContainerNodes(containers container.Source, network netmap.Source) (*con return nil, fmt.Errorf("create LRU container node cache for objects: %w", err) } return &containerNodes{ - containers: containers, - network: network, - cache: l, - objCache: lo, + containers: containers, + network: network, + cache: l, + objCache: lo, + getContainerNodesFunc: netmapsdk.NetMap.ContainerNodes, + sortContainerNodesFunc: netmapsdk.NetMap.PlacementVectors, }, nil } @@ -84,7 +95,7 @@ func (x *containerNodes) forEachContainerNode(cnrID cid.ID, withPrevEpoch bool, return fmt.Errorf("read current NeoFS epoch: %w", err) } - cnrCtx := containerPolicyContext{id: cnrID, containers: x.containers, network: x.network} + cnrCtx := containerPolicyContext{id: cnrID, containers: x.containers, network: x.network, getNodesFunc: x.getContainerNodesFunc} resCur, err := cnrCtx.applyAtEpoch(curEpoch, x.cache) if err != nil { @@ -151,9 +162,10 @@ func (x *containerNodes) getNodesForObject(addr oid.Address) ([][]netmapsdk.Node return res.nodeSets, res.repCounts, res.err } cnrRes, networkMap, err := (&containerPolicyContext{ - id: addr.Container(), - containers: x.containers, - network: x.network, + id: addr.Container(), + containers: x.containers, + network: x.network, + getNodesFunc: x.getContainerNodesFunc, }).applyToNetmap(curEpoch, x.cache) if err != nil || cnrRes.err != nil { if err == nil { @@ -168,7 +180,7 @@ func (x *containerNodes) getNodesForObject(addr oid.Address) ([][]netmapsdk.Node } } res.repCounts = cnrRes.repCounts - res.nodeSets, res.err = networkMap.PlacementVectors(cnrRes.nodeSets, addr.Object()) + res.nodeSets, res.err = x.sortContainerNodesFunc(*networkMap, cnrRes.nodeSets, addr.Object()) if res.err != nil { res.err = fmt.Errorf("sort container nodes for object: %w", res.err) } @@ -179,9 +191,10 @@ func (x *containerNodes) getNodesForObject(addr oid.Address) ([][]netmapsdk.Node // preserves context of storage policy processing for the particular container. type containerPolicyContext struct { // static - id cid.ID - containers container.Source - network netmap.Source + id cid.ID + containers container.Source + network netmap.Source + getNodesFunc getContainerNodesFunc // dynamic cnr *container.Container } @@ -218,7 +231,7 @@ func (x *containerPolicyContext) applyToNetmap(epoch uint64, cache *lru.Cache[co return result, nil, fmt.Errorf("read network map by epoch: %w", err) } policy := x.cnr.Value.PlacementPolicy() - result.nodeSets, result.err = networkMap.ContainerNodes(policy, x.id) + result.nodeSets, result.err = x.getNodesFunc(*networkMap, policy, x.id) if result.err == nil { // ContainerNodes should control following, but still better to double-check if policyNum := policy.NumberOfReplicas(); len(result.nodeSets) != policyNum { diff --git a/cmd/neofs-node/policy_test.go b/cmd/neofs-node/policy_test.go index 150348e6b1..43cb425528 100644 --- a/cmd/neofs-node/policy_test.go +++ b/cmd/neofs-node/policy_test.go @@ -13,6 +13,7 @@ import ( cid "github.com/nspcc-dev/neofs-sdk-go/container/id" cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test" "github.com/nspcc-dev/neofs-sdk-go/netmap" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test" "github.com/stretchr/testify/require" ) @@ -580,6 +581,115 @@ func TestContainerNodes_GetNodesForObject(t *testing.T) { require.EqualValues(t, network.epoch, network.callsNetmap[0]) } }) + t.Run("diff num of node lists and replica descriptors", func(t *testing.T) { + _, networkMap, cnr := newNetmapWithContainer(t, 5, []int{1, 3}, []int{3, 4}) + cnrs := &testContainer{id: anyAddr.Container(), val: cnr} + network := &testNetwork{epoch: anyEpoch, curNetmap: networkMap} + ns, err := newContainerNodes(cnrs, network) + require.NoError(t, err) + ns.getContainerNodesFunc = func(nm netmap.NetMap, policy netmap.PlacementPolicy, cnrID cid.ID) ([][]netmap.NodeInfo, error) { + require.Equal(t, *networkMap, nm) + require.Equal(t, cnr.PlacementPolicy(), policy) + require.Equal(t, anyAddr.Container(), cnrID) + return make([][]netmap.NodeInfo, 4), nil + } + + for n := 1; n <= 10; n++ { + _, _, err = ns.getNodesForObject(anyAddr) + require.EqualError(t, err, "select container nodes for current epoch #42: "+ + "invalid result of container's storage policy application to the network map: "+ + "diff number of storage node sets (4) and required replica descriptors (2)") + network.assertEpochCallCount(t, n) + // assert results are cached + cnrs.assertCalledNTimesWith(t, 1, anyAddr.Container()) + require.Len(t, network.callsNetmap, 1) + require.EqualValues(t, network.epoch, network.callsNetmap[0]) + } + }) + + t.Run("not enough nodes in some list", func(t *testing.T) { + _, networkMap, cnr := newNetmapWithContainer(t, 5, []int{1, 3}, []int{3, 4}) + cnrs := &testContainer{id: anyAddr.Container(), val: cnr} + network := &testNetwork{epoch: anyEpoch, curNetmap: networkMap} + ns, err := newContainerNodes(cnrs, network) + require.NoError(t, err) + ns.getContainerNodesFunc = func(nm netmap.NetMap, policy netmap.PlacementPolicy, cnrID cid.ID) ([][]netmap.NodeInfo, error) { + require.Equal(t, *networkMap, nm) + require.Equal(t, cnr.PlacementPolicy(), policy) + require.Equal(t, anyAddr.Container(), cnrID) + nodeLists, err := nm.ContainerNodes(policy, cnrID) + require.NoError(t, err) + res := make([][]netmap.NodeInfo, len(nodeLists)) + copy(res, nodeLists) + res[1] = res[1][:len(res[1])-1] + return res, nil + } + + for n := 1; n <= 10; n++ { + _, _, err = ns.getNodesForObject(anyAddr) + require.EqualError(t, err, "select container nodes for current epoch #42: "+ + "invalid result of container's storage policy application to the network map: "+ + "invalid storage node set #1: number of nodes (1) is less than minimum required by the container policy (2)") + network.assertEpochCallCount(t, n) + // assert results are cached + cnrs.assertCalledNTimesWith(t, 1, anyAddr.Container()) + require.Len(t, network.callsNetmap, 1) + require.EqualValues(t, network.epoch, network.callsNetmap[0]) + } + }) + t.Run("diff num of node lists and replica descriptors", func(t *testing.T) { + _, networkMap, cnr := newNetmapWithContainer(t, 5, []int{1, 3}, []int{3, 4}) + cnrs := &testContainer{id: anyAddr.Container(), val: cnr} + network := &testNetwork{epoch: anyEpoch, curNetmap: networkMap} + ns, err := newContainerNodes(cnrs, network) + require.NoError(t, err) + ns.getContainerNodesFunc = func(nm netmap.NetMap, policy netmap.PlacementPolicy, cnrID cid.ID) ([][]netmap.NodeInfo, error) { + require.Equal(t, *networkMap, nm) + require.Equal(t, cnr.PlacementPolicy(), policy) + require.Equal(t, anyAddr.Container(), cnrID) + return make([][]netmap.NodeInfo, 4), nil + } + + for n := 1; n <= 10; n++ { + _, _, err = ns.getNodesForObject(anyAddr) + require.EqualError(t, err, "select container nodes for current epoch #42: "+ + "invalid result of container's storage policy application to the network map: "+ + "diff number of storage node sets (4) and required replica descriptors (2)") + network.assertEpochCallCount(t, n) + // assert results are cached + cnrs.assertCalledNTimesWith(t, 1, anyAddr.Container()) + require.Len(t, network.callsNetmap, 1) + require.EqualValues(t, network.epoch, network.callsNetmap[0]) + } + }) + + t.Run("sort nodes failure", func(t *testing.T) { + nodes, networkMap, cnr := newNetmapWithContainer(t, 5, []int{1, 3}) + cnrs := &testContainer{id: anyAddr.Container(), val: cnr} + network := &testNetwork{epoch: anyEpoch, curNetmap: networkMap} + ns, err := newContainerNodes(cnrs, network) + require.NoError(t, err) + ns.sortContainerNodesFunc = func(nm netmap.NetMap, ns [][]netmap.NodeInfo, id oid.ID) ([][]netmap.NodeInfo, error) { + require.Equal(t, *networkMap, nm) + require.Equal(t, anyAddr.Object(), id) + for i := range ns { + for j := range ns[i] { + require.Contains(t, nodes, ns[i][j], [2]int{i, j}) + } + } + return nil, errors.New("any sort error") + } + + for n := 1; n <= 10; n++ { + _, _, err = ns.getNodesForObject(anyAddr) + require.EqualError(t, err, "sort container nodes for object: any sort error") + network.assertEpochCallCount(t, n) + // assert results are cached + cnrs.assertCalledNTimesWith(t, 1, anyAddr.Container()) + require.Len(t, network.callsNetmap, 1) + require.EqualValues(t, network.epoch, network.callsNetmap[0]) + } + }) }) t.Run("OK", func(t *testing.T) { nodes, networkMap, cnr := newNetmapWithContainer(t, 10, [][]int{