From b7b67172506dd205c58c9953a4271dee7a244a41 Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Thu, 11 Jul 2024 17:25:27 +0400 Subject: [PATCH] node/policy: Cache up to 1000 container policy application results Storage policy application results remain unchanged for a fixed container and network map, so recalculating them on every request is wasted work. Cache up to 1000 recently used results for the current epoch and up to 1000 for the previous one: the previous epoch is still needed to support data migration after the epoch tick. Older epochs are not cached. This implements the caching suggested in the TODO (#2692) removed by this change. Signed-off-by: Leonard Lyubich --- cmd/neofs-node/policy.go | 200 ++++++++++++++++---- cmd/neofs-node/policy_test.go | 333 ++++++++++++++++++++++++++++------ 2 files changed, 442 insertions(+), 91 deletions(-) diff --git a/cmd/neofs-node/policy.go b/cmd/neofs-node/policy.go index 8ac25d5f04..341852f694 100644 --- a/cmd/neofs-node/policy.go +++ b/cmd/neofs-node/policy.go @@ -2,26 +2,114 @@ package main import ( "fmt" + "sync" + "github.com/hashicorp/golang-lru/v2/simplelru" "github.com/nspcc-dev/neofs-node/pkg/core/container" "github.com/nspcc-dev/neofs-node/pkg/core/netmap" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" netmapsdk "github.com/nspcc-dev/neofs-sdk-go/netmap" ) +// storagePolicyRes groups the storage policy application result for a +// particular container and network map, including the error. +type storagePolicyRes struct { + nodeSets [][]netmapsdk.NodeInfo + err error +} + +// containerNodesAtEpoch is a thread-safe LRU cache mapping containers into +// storage policy application results for particular network map. +type containerNodesAtEpoch struct { + mtx sync.RWMutex + lru simplelru.LRUCache[cid.ID, storagePolicyRes] +} + +// containerNodesCaches groups caches for all tracked epochs. +type containerNodesCaches struct{ cur, prev *containerNodesAtEpoch } + +// max number of container storage policy application results cached for +// each epoch. +const cachedContainerNodesPerEpochNum = 1000 + // containerNodes wraps NeoFS network state to apply container storage policies. +// +// Since policy application results are consistent for fixed container and +// network map, they could be cached. The containerNodes caches up to +// cachedContainerNodesPerEpochNum results for the latest and the previous +// epochs. The previous one is required to support data migration after the +// epoch tick. Older epochs are not cached.
type containerNodes struct { containers container.Source network netmap.Source + + lastSeenCurrentEpochMtx sync.Mutex + lastSeenCurrentEpoch uint64 + epochCaches containerNodesCaches +} + +func newContainerNodesAtEpochLRUCache() (simplelru.LRUCache[cid.ID, storagePolicyRes], error) { + lru, err := simplelru.NewLRU[cid.ID, storagePolicyRes](cachedContainerNodesPerEpochNum, nil) + if err != nil { + return nil, fmt.Errorf("create LRU container node cache for one epoch: %w", err) + } + return lru, nil } func newContainerNodes(containers container.Source, network netmap.Source) (*containerNodes, error) { + lru, err := newContainerNodesAtEpochLRUCache() + if err != nil { + return nil, err + } return &containerNodes{ containers: containers, network: network, + epochCaches: containerNodesCaches{ + cur: &containerNodesAtEpoch{lru: lru}, + }, }, nil } +func (x *containerNodes) updateAndGetCachesForCurrentEpoch(curEpoch uint64) (containerNodesCaches, error) { + x.lastSeenCurrentEpochMtx.Lock() + defer x.lastSeenCurrentEpochMtx.Unlock() + + if curEpoch == x.lastSeenCurrentEpoch { + return x.epochCaches, nil + } + + lru, err := newContainerNodesAtEpochLRUCache() + if err != nil { + return containerNodesCaches{}, err + } + if curEpoch > x.lastSeenCurrentEpoch { + if curEpoch == x.lastSeenCurrentEpoch+1 { + // single epoch tick: the old current-epoch cache still holds valid results for the new previous epoch + x.epochCaches.prev = x.epochCaches.cur + } else { + lruPrev, err := newContainerNodesAtEpochLRUCache() + if err != nil { + return containerNodesCaches{}, err + } + x.epochCaches.prev = &containerNodesAtEpoch{lru: lruPrev} + } + x.epochCaches.cur = &containerNodesAtEpoch{lru: lru} + } else if curEpoch < x.lastSeenCurrentEpoch { // not really expected in practice + if curEpoch == x.lastSeenCurrentEpoch-1 { + x.epochCaches.cur = x.epochCaches.prev + } else { + lruCur, err := newContainerNodesAtEpochLRUCache() + if err != nil { + return containerNodesCaches{}, err + } + x.epochCaches.cur = &containerNodesAtEpoch{lru: lruCur} + } + x.epochCaches.prev = &containerNodesAtEpoch{lru: lru} + } + x.lastSeenCurrentEpoch = curEpoch + return x.epochCaches, nil +} + // forEachContainerNodePublicKeyInLastTwoEpochs passes binary-encoded public key // of each node match the referenced container's storage policy at two latest // epochs into f. When f returns false, nil is returned instantly. @@ -32,58 +120,106 @@ func (x *containerNodes) forEachContainerNodePublicKeyInLastTwoEpochs(cnrID cid.
} func (x *containerNodes) forEachContainerNode(cnrID cid.ID, withPrevEpoch bool, f func(netmapsdk.NodeInfo) bool) error { - epoch, err := x.network.Epoch() + curEpoch, err := x.network.Epoch() if err != nil { return fmt.Errorf("read current NeoFS epoch: %w", err) } - cnr, err := x.containers.Get(cnrID) + caches, err := x.updateAndGetCachesForCurrentEpoch(curEpoch) if err != nil { - return fmt.Errorf("read container by ID: %w", err) + return err } - networkMap, err := x.network.GetNetMapByEpoch(epoch) - if err != nil { - return fmt.Errorf("read network map at epoch #%d: %w", epoch, err) - } - // TODO(#2692): node sets remain unchanged for fixed container and network map, - // so recently calculated results worth caching - ns, err := networkMap.ContainerNodes(cnr.Value.PlacementPolicy(), cnrID) - if err != nil { - return fmt.Errorf("apply container's storage policy to the network map at epoch #%d: %w", epoch, err) - } + cnrCtx := containerPolicyContext{id: cnrID, containers: x.containers, network: x.network} - for i := range ns { - for j := range ns[i] { - if !f(ns[i][j]) { - return nil + resCur, err := cnrCtx.applyAtEpoch(curEpoch, caches.cur) + if err != nil { + return fmt.Errorf("select container nodes for current epoch #%d: %w", curEpoch, err) + } else if resCur.err == nil { // error case handled below + for i := range resCur.nodeSets { + for j := range resCur.nodeSets[i] { + if !f(resCur.nodeSets[i][j]) { + return nil + } } } } - if !withPrevEpoch || epoch == 0 { + if !withPrevEpoch || curEpoch == 0 { + if resCur.err != nil { + return fmt.Errorf("select container nodes for current epoch #%d: %w", curEpoch, resCur.err) + } return nil } - epoch-- - - networkMap, err = x.network.GetNetMapByEpoch(epoch) + resPrev, err := cnrCtx.applyAtEpoch(curEpoch-1, caches.prev) if err != nil { - return fmt.Errorf("read network map at epoch #%d: %w", epoch, err) + if resCur.err != nil { + return fmt.Errorf("select container nodes for both epochs: (current#%d) %w; (previous#%d) %w", + curEpoch, resCur.err, curEpoch-1, err) + } + return fmt.Errorf("select container nodes for previous epoch #%d: %w", curEpoch-1, err) + } else if resPrev.err == nil { // error case handled below + for i := range resPrev.nodeSets { + for j := range resPrev.nodeSets[i] { + if !f(resPrev.nodeSets[i][j]) { + return nil + } + } + } } - ns, err = networkMap.ContainerNodes(cnr.Value.PlacementPolicy(), cnrID) - if err != nil { - return fmt.Errorf("apply container's storage policy to the network map at epoch #%d: %w", epoch, err) + if resCur.err != nil { + if resPrev.err != nil { + return fmt.Errorf("select container nodes for both epochs: (current#%d) %w; (previous#%d) %w", + curEpoch, resCur.err, curEpoch-1, resPrev.err) + } + return fmt.Errorf("select container nodes for current epoch #%d: %w", curEpoch, resCur.err) + } else if resPrev.err != nil { + return fmt.Errorf("select container nodes for previous epoch #%d: %w", curEpoch-1, resPrev.err) + } + return nil +} - for i := range ns { - for j := range ns[i] { - if !f(ns[i][j]) { - return nil - } +// containerPolicyContext preserves context of storage policy processing for the particular container. +type containerPolicyContext struct { + // static + id cid.ID + containers container.Source + network netmap.Source + // dynamic + cnr *container.Container +} + +// applyAtEpoch applies storage policy of container referenced by parameterized +// ID to the network map at the specified epoch. If cache is non-nil, +// applyAtEpoch reads existing results from it and stores new results in it.
+func (x *containerPolicyContext) applyAtEpoch(epoch uint64, cache *containerNodesAtEpoch) (storagePolicyRes, error) { + if cache != nil { + cache.mtx.Lock() + defer cache.mtx.Unlock() + if result, ok := cache.lru.Get(x.id); ok { + return result, nil } } - - return nil + var result storagePolicyRes + var err error + if x.cnr == nil { + x.cnr, err = x.containers.Get(x.id) + if err != nil { + // non-persistent error => do not cache + return result, fmt.Errorf("read container by ID: %w", err) + } + } + networkMap, err := x.network.GetNetMapByEpoch(epoch) + if err != nil { + // non-persistent error => do not cache + return result, fmt.Errorf("read network map by epoch: %w", err) + } + result.nodeSets, result.err = networkMap.ContainerNodes(x.cnr.Value.PlacementPolicy(), x.id) + if cache != nil { + // lock already acquired above! + cache.lru.Add(x.id, result) + } + return result, nil } diff --git a/cmd/neofs-node/policy_test.go b/cmd/neofs-node/policy_test.go index 772589cedd..baeaad07ef 100644 --- a/cmd/neofs-node/policy_test.go +++ b/cmd/neofs-node/policy_test.go @@ -18,9 +18,19 @@ type testContainer struct { id cid.ID val container.Container err error + + calls []cid.ID +} + +func (x testContainer) assertCalledNTimesWith(t testing.TB, n int, id cid.ID) { + require.Len(t, x.calls, n) + for i := range x.calls { + require.Equal(t, id, x.calls[i]) + } } func (x *testContainer) Get(id cid.ID) (*containercore.Container, error) { + x.calls = append(x.calls, id) if !id.Equals(x.id) { return nil, fmt.Errorf("unexpected container requested %s!=%s", id, x.id) } @@ -38,13 +48,24 @@ type testNetwork struct { curNetmapErr error prevNetmap *netmap.NetMap prevNetmapErr error + + callEpochCount int + callsNetmap []uint64 } func (x *testNetwork) GetNetMap(diff uint64) (*netmap.NetMap, error) { panic("unexpected call") } +func (x testNetwork) assertNetmapCalledNTimes(t testing.TB, n int, epoch uint64) { + require.Len(t, x.callsNetmap, n) + for i := range x.callsNetmap { + require.EqualValues(t, epoch, x.callsNetmap[i]) + } +} + func (x *testNetwork) GetNetMapByEpoch(epoch uint64) (*netmap.NetMap, error) { + x.callsNetmap = append(x.callsNetmap, epoch) if epoch == x.epoch { return x.curNetmap, x.curNetmapErr } @@ -54,7 +75,12 @@ func (x *testNetwork) GetNetMapByEpoch(epoch uint64) (*netmap.NetMap, error) { return nil, fmt.Errorf("unexpected epoch #%d requested", epoch) } +func (x testNetwork) assertEpochCallCount(t testing.TB, n int) { + require.EqualValues(t, x.callEpochCount, n) +} + func (x *testNetwork) Epoch() (uint64, error) { + x.callEpochCount++ return x.epoch, x.epochErr } @@ -110,100 +136,289 @@ func TestContainerNodes_ForEachContainerNodePublicKeyInLastTwoEpochs(t *testing. 
t.Run("read current epoch", func(t *testing.T) { epochErr := errors.New("any epoch error") - ns, err := newContainerNodes(new(testContainer), &testNetwork{epochErr: epochErr}) + network := &testNetwork{epochErr: epochErr} + ns, err := newContainerNodes(new(testContainer), network) require.NoError(t, err) - err = ns.forEachContainerNodePublicKeyInLastTwoEpochs(anyCnr, failOnCall(t)) - require.ErrorIs(t, err, epochErr) + for n := 1; n < 10; n++ { + err = ns.forEachContainerNodePublicKeyInLastTwoEpochs(anyCnr, failOnCall(t)) + require.ErrorIs(t, err, epochErr) + require.EqualError(t, err, "read current NeoFS epoch: any epoch error") + // such error must not be cached + network.assertEpochCallCount(t, n) + } }) t.Run("read container failure", func(t *testing.T) { cnrErr := errors.New("any container error") - ns, err := newContainerNodes(&testContainer{ - id: anyCnr, - err: cnrErr, - }, &testNetwork{epoch: anyEpoch}) + cnrs := &testContainer{id: anyCnr, err: cnrErr} + ns, err := newContainerNodes(cnrs, &testNetwork{epoch: anyEpoch}) require.NoError(t, err) - err = ns.forEachContainerNodePublicKeyInLastTwoEpochs(anyCnr, failOnCall(t)) - require.ErrorIs(t, err, cnrErr) + for n := 1; n < 10; n++ { + err = ns.forEachContainerNodePublicKeyInLastTwoEpochs(anyCnr, failOnCall(t)) + require.ErrorIs(t, err, cnrErr) + require.EqualError(t, err, "select container nodes for current epoch #42: read container by ID: any container error") + // such error must not be cached + cnrs.assertCalledNTimesWith(t, n, anyCnr) + } }) t.Run("read current netmap failure", func(t *testing.T) { curNetmapErr := errors.New("any current netmap error") - ns, err := newContainerNodes(&testContainer{id: anyCnr}, &testNetwork{ - epoch: anyEpoch, - curNetmapErr: curNetmapErr, - }) + network := &testNetwork{epoch: anyEpoch, curNetmapErr: curNetmapErr} + ns, err := newContainerNodes(&testContainer{id: anyCnr}, network) require.NoError(t, err) - err = ns.forEachContainerNodePublicKeyInLastTwoEpochs(anyCnr, failOnCall(t)) - require.ErrorIs(t, err, curNetmapErr) + for n := 1; n <= 10; n++ { + err = ns.forEachContainerNodePublicKeyInLastTwoEpochs(anyCnr, failOnCall(t)) + require.ErrorIs(t, err, curNetmapErr) + require.EqualError(t, err, "select container nodes for current epoch #42: read network map by epoch: any current netmap error") + network.assertEpochCallCount(t, n) + // such error must not be cached + network.assertNetmapCalledNTimes(t, n, network.epoch) + } }) t.Run("zero current epoch", func(t *testing.T) { nodes, curNetmap, cnr := newNetmapWithContainer(t, 5, []int{1, 3}) - - ns, err := newContainerNodes(&testContainer{id: anyCnr, val: cnr}, &testNetwork{ - epoch: 0, - curNetmap: curNetmap, - }) + cnrs := &testContainer{id: anyCnr, val: cnr} + network := &testNetwork{epoch: 0, curNetmap: curNetmap} + ns, err := newContainerNodes(cnrs, network) require.NoError(t, err) - var calledKeys [][]byte - err = ns.forEachContainerNodePublicKeyInLastTwoEpochs(anyCnr, func(pubKey []byte) bool { - calledKeys = append(calledKeys, pubKey) - return true - }) - require.NoError(t, err) - require.Len(t, calledKeys, 2) - require.Contains(t, calledKeys, nodes[1].PublicKey()) - require.Contains(t, calledKeys, nodes[3].PublicKey()) + for n := 1; n < 10; n++ { + var calledKeys [][]byte + err = ns.forEachContainerNodePublicKeyInLastTwoEpochs(anyCnr, func(pubKey []byte) bool { + calledKeys = append(calledKeys, pubKey) + return true + }) + require.NoError(t, err) + require.Len(t, calledKeys, 2) + require.Contains(t, calledKeys, nodes[1].PublicKey()) + 
require.Contains(t, calledKeys, nodes[3].PublicKey()) + network.assertEpochCallCount(t, n) + // result is cached, no longer disturb the components + cnrs.assertCalledNTimesWith(t, 1, anyCnr) + network.assertNetmapCalledNTimes(t, 1, 0) + } }) t.Run("read previous network map failure", func(t *testing.T) { nodes, curNetmap, cnr := newNetmapWithContainer(t, 5, []int{1, 3}) prevNetmapErr := errors.New("any previous netmap error") + cnrs := &testContainer{id: anyCnr, val: cnr} + network := &testNetwork{epoch: anyEpoch, curNetmap: curNetmap, prevNetmapErr: prevNetmapErr} - ns, err := newContainerNodes(&testContainer{id: anyCnr, val: cnr}, &testNetwork{ - epoch: anyEpoch, - curNetmap: curNetmap, - prevNetmapErr: prevNetmapErr, - }) + ns, err := newContainerNodes(cnrs, network) require.NoError(t, err) - var calledKeys [][]byte - err = ns.forEachContainerNodePublicKeyInLastTwoEpochs(anyCnr, func(pubKey []byte) bool { - calledKeys = append(calledKeys, pubKey) - return true + for n := 1; n <= 10; n++ { + var calledKeys [][]byte + err = ns.forEachContainerNodePublicKeyInLastTwoEpochs(anyCnr, func(pubKey []byte) bool { + calledKeys = append(calledKeys, pubKey) + return true + }) + require.ErrorIs(t, err, prevNetmapErr) + require.EqualError(t, err, "select container nodes for previous epoch #41: read network map by epoch: any previous netmap error") + require.Len(t, calledKeys, 2) + require.Contains(t, calledKeys, nodes[1].PublicKey()) + require.Contains(t, calledKeys, nodes[3].PublicKey()) + network.assertEpochCallCount(t, n) + // previous epoch result not cached, so container requested each time + cnrs.assertCalledNTimesWith(t, n, anyCnr) + require.Len(t, network.callsNetmap, 1+n) // 1st time succeeds for current epoch + require.EqualValues(t, network.epoch, network.callsNetmap[0]) + for _, e := range network.callsNetmap[1:] { + require.EqualValues(t, network.epoch-1, e) + } + } + }) + + t.Run("apply policy failures", func(t *testing.T) { + curNodes, curNetmap, cnr := newNetmapWithContainer(t, 5, []int{1, 3}) + prevNodes, prevNetmap, _ := newNetmapWithContainer(t, 5, []int{0, 4}) + failNetmap := new(netmap.NetMap) + _, policyErr := failNetmap.ContainerNodes(cnr.PlacementPolicy(), anyCnr) + require.Error(t, policyErr) + + t.Run("current epoch succeeds, previous fails", func(t *testing.T) { + cnrs := &testContainer{id: anyCnr, val: cnr} + network := &testNetwork{epoch: anyEpoch, curNetmap: curNetmap, prevNetmap: failNetmap} + ns, err := newContainerNodes(cnrs, network) + require.NoError(t, err) + + for n := 1; n <= 10; n++ { + var calledKeys [][]byte + err = ns.forEachContainerNodePublicKeyInLastTwoEpochs(anyCnr, func(pubKey []byte) bool { + calledKeys = append(calledKeys, pubKey) + return true + }) + require.EqualError(t, err, fmt.Sprintf("select container nodes for previous epoch #41: %v", policyErr)) + require.Len(t, calledKeys, 2) + require.Contains(t, calledKeys, curNodes[1].PublicKey()) + require.Contains(t, calledKeys, curNodes[3].PublicKey()) + network.assertEpochCallCount(t, n) + // assert results are cached + cnrs.assertCalledNTimesWith(t, 1, anyCnr) + require.Len(t, network.callsNetmap, 2) + require.EqualValues(t, network.epoch, network.callsNetmap[0]) + require.EqualValues(t, network.epoch-1, network.callsNetmap[1]) + } + }) + t.Run("current epoch fails, previous succeeds", func(t *testing.T) { + cnrs := &testContainer{id: anyCnr, val: cnr} + network := &testNetwork{epoch: anyEpoch, curNetmap: failNetmap, prevNetmap: prevNetmap} + ns, err := newContainerNodes(cnrs, network) + 
require.NoError(t, err) + + for n := 1; n <= 10; n++ { + var calledKeys [][]byte + err = ns.forEachContainerNodePublicKeyInLastTwoEpochs(anyCnr, func(pubKey []byte) bool { + calledKeys = append(calledKeys, pubKey) + return true + }) + require.EqualError(t, err, fmt.Sprintf("select container nodes for current epoch #42: %v", policyErr)) + require.Len(t, calledKeys, 2) + require.Contains(t, calledKeys, prevNodes[0].PublicKey()) + require.Contains(t, calledKeys, prevNodes[4].PublicKey()) + // assert results are cached + cnrs.assertCalledNTimesWith(t, 1, anyCnr) + require.Len(t, network.callsNetmap, 2) + require.EqualValues(t, network.epoch, network.callsNetmap[0]) + require.EqualValues(t, network.epoch-1, network.callsNetmap[1]) + } + }) + t.Run("fail for both epochs", func(t *testing.T) { + cnrs := &testContainer{id: anyCnr, val: cnr} + network := &testNetwork{epoch: anyEpoch, curNetmap: failNetmap, prevNetmap: failNetmap} + ns, err := newContainerNodes(cnrs, network) + require.NoError(t, err) + + for n := 1; n <= 10; n++ { + var calledKeys [][]byte + err = ns.forEachContainerNodePublicKeyInLastTwoEpochs(anyCnr, func(pubKey []byte) bool { + calledKeys = append(calledKeys, pubKey) + return true + }) + require.EqualError(t, err, + fmt.Sprintf("select container nodes for both epochs: (current#42) %v; (previous#41) %v", + policyErr, policyErr)) + require.Empty(t, calledKeys) + // assert results are cached + cnrs.assertCalledNTimesWith(t, 1, anyCnr) + require.Len(t, network.callsNetmap, 2) + require.EqualValues(t, network.epoch, network.callsNetmap[0]) + require.EqualValues(t, network.epoch-1, network.callsNetmap[1]) + } }) - require.ErrorIs(t, err, prevNetmapErr) - require.Len(t, calledKeys, 2) - require.Contains(t, calledKeys, nodes[1].PublicKey()) - require.Contains(t, calledKeys, nodes[3].PublicKey()) }) t.Run("both epochs OK", func(t *testing.T) { curNodes, curNetmap, cnr := newNetmapWithContainer(t, 5, []int{1, 3}) prevNodes, prevNetmap, _ := newNetmapWithContainer(t, 5, []int{0, 4}) - - ns, err := newContainerNodes(&testContainer{id: anyCnr, val: cnr}, &testNetwork{ - epoch: anyEpoch, - curNetmap: curNetmap, - prevNetmap: prevNetmap, - }) + cnrs := &testContainer{id: anyCnr, val: cnr} + network := &testNetwork{epoch: anyEpoch, curNetmap: curNetmap, prevNetmap: prevNetmap} + ns, err := newContainerNodes(cnrs, network) require.NoError(t, err) - var calledKeys [][]byte - err = ns.forEachContainerNodePublicKeyInLastTwoEpochs(anyCnr, func(pubKey []byte) bool { - calledKeys = append(calledKeys, pubKey) - return true - }) - require.NoError(t, err) - require.Len(t, calledKeys, 4) - require.Contains(t, calledKeys, curNodes[1].PublicKey()) - require.Contains(t, calledKeys, curNodes[3].PublicKey()) - require.Contains(t, calledKeys, prevNodes[0].PublicKey()) - require.Contains(t, calledKeys, prevNodes[4].PublicKey()) + for n := 1; n <= 10; n++ { + var calledKeys [][]byte + err = ns.forEachContainerNodePublicKeyInLastTwoEpochs(anyCnr, func(pubKey []byte) bool { + calledKeys = append(calledKeys, pubKey) + return true + }) + require.NoError(t, err) + require.Len(t, calledKeys, 4) + require.Contains(t, calledKeys, curNodes[1].PublicKey()) + require.Contains(t, calledKeys, curNodes[3].PublicKey()) + require.Contains(t, calledKeys, prevNodes[0].PublicKey()) + require.Contains(t, calledKeys, prevNodes[4].PublicKey()) + cnrs.assertCalledNTimesWith(t, 1, anyCnr) + require.Len(t, network.callsNetmap, 2) + require.EqualValues(t, network.epoch, network.callsNetmap[0]) + require.EqualValues(t, network.epoch-1, 
network.callsNetmap[1]) + } + }) + + t.Run("epoch switches", func(t *testing.T) { + curNodes, curNetmap, cnr := newNetmapWithContainer(t, 5, []int{1, 3}) + prevNodes, prevNetmap, _ := newNetmapWithContainer(t, 5, []int{0, 4}) + newNodes1, newNetmap1, _ := newNetmapWithContainer(t, 6, []int{2, 5}) + newNodes2, newNetmap2, _ := newNetmapWithContainer(t, 6, []int{3, 4}) + call := func(ns *containerNodes) (res [][]byte) { + err := ns.forEachContainerNodePublicKeyInLastTwoEpochs(anyCnr, func(pubKey []byte) bool { + res = append(res, pubKey) + return true + }) + require.NoError(t, err) + return res + } + assertCall := func(cns *containerNodes, ns ...netmap.NodeInfo) { + res := call(cns) + require.Len(t, res, len(ns)) + for i := range ns { + require.Contains(t, res, ns[i].PublicKey()) + } + } + for _, tc := range []struct { + name string + changeEpoch func(*uint64) + newNetmaps [2]*netmap.NetMap // current, previous + selectedNodes []netmap.NodeInfo + extraReadNetmap []uint64 // current, previous + }{ + { + name: "increment", + changeEpoch: func(e *uint64) { *e++ }, + newNetmaps: [2]*netmap.NetMap{newNetmap1, curNetmap}, + selectedNodes: []netmap.NodeInfo{newNodes1[2], newNodes1[5], curNodes[1], curNodes[3]}, + extraReadNetmap: []uint64{anyEpoch + 1}, + }, + { + name: "long jump forward", + changeEpoch: func(e *uint64) { *e += 10 }, + newNetmaps: [2]*netmap.NetMap{newNetmap1, newNetmap2}, + selectedNodes: []netmap.NodeInfo{newNodes1[2], newNodes1[5], newNodes2[3], newNodes2[4]}, + extraReadNetmap: []uint64{anyEpoch + 10, anyEpoch + 9}, + }, + { + name: "decrement", + changeEpoch: func(e *uint64) { *e-- }, + newNetmaps: [2]*netmap.NetMap{prevNetmap, newNetmap1}, + selectedNodes: []netmap.NodeInfo{prevNodes[0], prevNodes[4], newNodes1[2], newNodes1[5]}, + extraReadNetmap: []uint64{anyEpoch - 2}, + }, + { + name: "long jump backward", + changeEpoch: func(e *uint64) { *e -= 10 }, + newNetmaps: [2]*netmap.NetMap{newNetmap1, newNetmap2}, + selectedNodes: []netmap.NodeInfo{newNodes1[2], newNodes1[5], newNodes2[3], newNodes2[4]}, + extraReadNetmap: []uint64{anyEpoch - 10, anyEpoch - 11}, + }, + } { + t.Run(tc.name, func(t *testing.T) { + cnrs := &testContainer{id: anyCnr, val: cnr} + network := &testNetwork{epoch: anyEpoch, curNetmap: curNetmap, prevNetmap: prevNetmap} + ns, err := newContainerNodes(cnrs, network) + require.NoError(t, err) + + assertCall(ns, curNodes[1], curNodes[3], prevNodes[0], prevNodes[4]) + cnrs.assertCalledNTimesWith(t, 1, anyCnr) + require.Len(t, network.callsNetmap, 2) + require.EqualValues(t, network.epoch, network.callsNetmap[0]) + require.EqualValues(t, network.epoch-1, network.callsNetmap[1]) + + // update epoch + tc.changeEpoch(&network.epoch) + network.curNetmap, network.prevNetmap = tc.newNetmaps[0], tc.newNetmaps[1] + + assertCall(ns, tc.selectedNodes...) + // one more container and netmap calls + cnrs.assertCalledNTimesWith(t, 2, anyCnr) + require.Len(t, network.callsNetmap, 2+len(tc.extraReadNetmap)) + require.Equal(t, tc.extraReadNetmap, network.callsNetmap[2:]) + }) + } }) }
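
Note on the per-epoch LRU caches: the patch relies on github.com/hashicorp/golang-lru/v2/simplelru, whose LRU is bounded in size but not safe for concurrent use, hence the mutex in containerNodesAtEpoch. Below is a minimal, self-contained sketch of how that library behaves; the capacity of 3 and the string keys are chosen only for illustration, the patch itself uses 1000 entries keyed by container ID per epoch.

package main

import (
	"fmt"

	"github.com/hashicorp/golang-lru/v2/simplelru"
)

func main() {
	// Same constructor as newContainerNodesAtEpochLRUCache uses, just with toy
	// types and a tiny capacity so eviction becomes visible.
	lru, err := simplelru.NewLRU[string, int](3, nil)
	if err != nil {
		panic(err)
	}

	lru.Add("cnr-a", 1)
	lru.Add("cnr-b", 2)
	lru.Add("cnr-c", 3)

	lru.Get("cnr-a") // touching an entry makes it the most recently used one
	lru.Add("cnr-d", 4) // capacity exceeded: evicts "cnr-b", the least recently used

	fmt.Println(lru.Contains("cnr-a")) // true: recently used, survived eviction
	fmt.Println(lru.Contains("cnr-b")) // false: evicted
	fmt.Println(lru.Len())             // 3: never grows past the capacity
}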
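
Note on updateAndGetCachesForCurrentEpoch: the only transitions that reuse an existing cache are a single epoch step forward (the old current-epoch cache becomes the previous-epoch one) and, as a not-really-expected fallback, a single step backward (the old previous-epoch cache becomes the current one); any longer jump starts from empty caches. The following simplified model makes those transitions explicit; the names epochCaches and advance and the string labels standing in for real LRU caches are illustrative only, not part of the patch.

package main

import "fmt"

// epochCaches is a toy stand-in for containerNodesCaches: strings label which
// epoch's results each slot holds ("fresh" means an empty, newly created cache).
type epochCaches struct{ cur, prev string }

// advance mirrors the branching of updateAndGetCachesForCurrentEpoch.
func advance(c epochCaches, lastSeen, newEpoch uint64) epochCaches {
	switch {
	case newEpoch == lastSeen:
		return c // same epoch: both caches stay valid
	case newEpoch == lastSeen+1:
		// one step forward: yesterday's "current" results are exactly the
		// "previous" epoch results needed now
		return epochCaches{cur: "fresh", prev: c.cur}
	case newEpoch+1 == lastSeen:
		// one step backward (not really expected): the old "previous" cache
		// already holds the results for the new current epoch
		return epochCaches{cur: c.prev, prev: "fresh"}
	default:
		// longer jump in either direction: nothing cached is reusable
		return epochCaches{cur: "fresh", prev: "fresh"}
	}
}

func main() {
	c := epochCaches{cur: "results@42", prev: "results@41"}
	fmt.Println(advance(c, 42, 43)) // {fresh results@42}
	fmt.Println(advance(c, 42, 52)) // {fresh fresh}
	fmt.Println(advance(c, 42, 41)) // {results@41 fresh}
}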
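
Note on the combined error for both epochs: forEachContainerNode wraps two distinct errors into one message with two %w verbs, which fmt.Errorf supports since Go 1.20, so errors.Is still matches either of them (the tests rely on the rendered message only). A small sketch of that behaviour; the sentinel errors here are made up for illustration.

package main

import (
	"errors"
	"fmt"
)

var (
	errCur  = errors.New("current epoch policy failure")
	errPrev = errors.New("previous epoch policy failure")
)

func main() {
	// Two %w verbs wrap both errors at once (Go 1.20+), mirroring the
	// "select container nodes for both epochs" message in the patch.
	err := fmt.Errorf("select container nodes for both epochs: (current#42) %w; (previous#41) %w", errCur, errPrev)

	fmt.Println(errors.Is(err, errCur))  // true
	fmt.Println(errors.Is(err, errPrev)) // true
}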