Skip to content

Commit

Permalink
node/policy: Cache up to 1000 container policy application results
Browse files Browse the repository at this point in the history
TBD

Signed-off-by: Leonard Lyubich <[email protected]>
  • Loading branch information
cthulhu-rider committed Jul 11, 2024
1 parent d68aa9f commit b7b6717
Show file tree
Hide file tree
Showing 2 changed files with 442 additions and 91 deletions.
200 changes: 168 additions & 32 deletions cmd/neofs-node/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,114 @@ package main

import (
"fmt"
"sync"

"github.com/hashicorp/golang-lru/v2/simplelru"
"github.com/nspcc-dev/neofs-node/pkg/core/container"
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
netmapsdk "github.com/nspcc-dev/neofs-sdk-go/netmap"
)

// storagePolicyRes structures persistent storage policy application result for
// particular container and network map incl. error.
type storagePolicyRes struct {
// nodeSets holds the node sets selected by applying the container's storage
// policy to the network map.
nodeSets [][]netmapsdk.NodeInfo
// err is the policy application failure, if any. It is kept next to nodeSets
// so that a failed application is cached just like a successful one.
err error
}

// containerNodesAtEpoch is a thread-safe LRU cache mapping containers into
// storage policy application results for particular network map.
type containerNodesAtEpoch struct {
// mtx guards lru: it is locked around every lookup and store.
mtx sync.RWMutex
// lru maps container IDs to the results calculated for them.
lru simplelru.LRUCache[cid.ID, storagePolicyRes]
}

// containerNodesCaches groups caches for all tracked epochs.
type containerNodesCaches struct {
	// cur caches results for the latest seen epoch.
	cur *containerNodesAtEpoch
	// prev caches results for the epoch preceding the latest seen one. It may
	// be nil: consumers must treat a nil cache as "caching disabled".
	prev *containerNodesAtEpoch
}

// cachedContainerNodesPerEpochNum is the max number of container storage policy
// application results cached for each epoch.
const cachedContainerNodesPerEpochNum = 1000

// containerNodes wraps NeoFS network state to apply container storage policies.
//
// Since policy application results are consistent for fixed container and
// network map, they could be cached. The containerNodes caches up to
// cachedContainerNodesPerEpochNum results for the latest and the previous
// epochs. The previous one is required to support data migration after the
// epoch tick. Older epochs are not cached.
type containerNodes struct {
// containers is the source of containers read by ID.
containers container.Source
// network is the source of the current epoch and of per-epoch network maps.
network netmap.Source

// lastSeenCurrentEpochMtx guards lastSeenCurrentEpoch and epochCaches.
lastSeenCurrentEpochMtx sync.Mutex
// lastSeenCurrentEpoch is the epoch epochCaches were last rotated for.
lastSeenCurrentEpoch uint64
// epochCaches holds the caches for lastSeenCurrentEpoch and the epoch before it.
epochCaches containerNodesCaches
}

func newContainerNodesAtEpochLRUCache() (simplelru.LRUCache[cid.ID, storagePolicyRes], error) {
lru, err := simplelru.NewLRU[cid.ID, storagePolicyRes](cachedContainerNodesPerEpochNum, nil)
if err != nil {
return nil, fmt.Errorf("create LRU container node cache for one epoch: %w", err)

Check warning on line 54 in cmd/neofs-node/policy.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/policy.go#L54

Added line #L54 was not covered by tests
}
return lru, nil
}

func newContainerNodes(containers container.Source, network netmap.Source) (*containerNodes, error) {
lru, err := newContainerNodesAtEpochLRUCache()
if err != nil {
return nil, err

Check warning on line 62 in cmd/neofs-node/policy.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/policy.go#L62

Added line #L62 was not covered by tests
}
return &containerNodes{
containers: containers,
network: network,
epochCaches: containerNodesCaches{
cur: &containerNodesAtEpoch{lru: lru},
},
}, nil
}

func (x *containerNodes) updateAndGetCachesForCurrentEpoch(curEpoch uint64) (containerNodesCaches, error) {
x.lastSeenCurrentEpochMtx.Lock()
defer x.lastSeenCurrentEpochMtx.Unlock()

if curEpoch == x.lastSeenCurrentEpoch {
return x.epochCaches, nil
}

lru, err := newContainerNodesAtEpochLRUCache()
if err != nil {
return containerNodesCaches{}, err

Check warning on line 83 in cmd/neofs-node/policy.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/policy.go#L83

Added line #L83 was not covered by tests
}
if curEpoch > x.lastSeenCurrentEpoch {
if curEpoch == x.lastSeenCurrentEpoch+1 {
fmt.Println("INCREMENT")
x.epochCaches.prev = x.epochCaches.cur
} else {
lruPrev, err := newContainerNodesAtEpochLRUCache()
if err != nil {
return containerNodesCaches{}, err

Check warning on line 92 in cmd/neofs-node/policy.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/policy.go#L92

Added line #L92 was not covered by tests
}
x.epochCaches.prev = &containerNodesAtEpoch{lru: lruPrev}
}
x.epochCaches.cur = &containerNodesAtEpoch{lru: lru}
} else if curEpoch < x.lastSeenCurrentEpoch { // not really expected in practice
if curEpoch == x.lastSeenCurrentEpoch-1 {
x.epochCaches.cur = x.epochCaches.prev
} else {
lruCur, err := newContainerNodesAtEpochLRUCache()
if err != nil {
return containerNodesCaches{}, err

Check warning on line 103 in cmd/neofs-node/policy.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/policy.go#L103

Added line #L103 was not covered by tests
}
x.epochCaches.cur = &containerNodesAtEpoch{lru: lruCur}
}
x.epochCaches.prev = &containerNodesAtEpoch{lru: lru}
}
x.lastSeenCurrentEpoch = curEpoch
return x.epochCaches, nil
}

// forEachContainerNodePublicKeyInLastTwoEpochs passes binary-encoded public key
// of each node matching the referenced container's storage policy at the two
// latest epochs into f. When f returns false, nil is returned instantly.
Expand All @@ -32,58 +120,106 @@ func (x *containerNodes) forEachContainerNodePublicKeyInLastTwoEpochs(cnrID cid.
}

func (x *containerNodes) forEachContainerNode(cnrID cid.ID, withPrevEpoch bool, f func(netmapsdk.NodeInfo) bool) error {
epoch, err := x.network.Epoch()
curEpoch, err := x.network.Epoch()
if err != nil {
return fmt.Errorf("read current NeoFS epoch: %w", err)
}

cnr, err := x.containers.Get(cnrID)
caches, err := x.updateAndGetCachesForCurrentEpoch(curEpoch)
if err != nil {
return fmt.Errorf("read container by ID: %w", err)
return err

Check warning on line 130 in cmd/neofs-node/policy.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/policy.go#L130

Added line #L130 was not covered by tests
}

networkMap, err := x.network.GetNetMapByEpoch(epoch)
if err != nil {
return fmt.Errorf("read network map at epoch #%d: %w", epoch, err)
}
// TODO(#2692): node sets remain unchanged for fixed container and network map,
// so recently calculated results worth caching
ns, err := networkMap.ContainerNodes(cnr.Value.PlacementPolicy(), cnrID)
if err != nil {
return fmt.Errorf("apply container's storage policy to the network map at epoch #%d: %w", epoch, err)
}
cnrCtx := containerPolicyContext{id: cnrID, containers: x.containers, network: x.network}

for i := range ns {
for j := range ns[i] {
if !f(ns[i][j]) {
return nil
resCur, err := cnrCtx.applyAtEpoch(curEpoch, caches.cur)
if err != nil {
return fmt.Errorf("select container nodes for current epoch #%d: %w", curEpoch, err)
} else if resCur.err == nil { // error case handled below
for i := range resCur.nodeSets {
for j := range resCur.nodeSets[i] {
if !f(resCur.nodeSets[i][j]) {
return nil

Check warning on line 142 in cmd/neofs-node/policy.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/policy.go#L142

Added line #L142 was not covered by tests
}
}
}
}

if !withPrevEpoch || epoch == 0 {
if !withPrevEpoch || curEpoch == 0 {
if resCur.err != nil {
return fmt.Errorf("select container nodes for current epoch #%d: %w", curEpoch, resCur.err)

Check warning on line 150 in cmd/neofs-node/policy.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/policy.go#L150

Added line #L150 was not covered by tests
}
return nil
}

epoch--

networkMap, err = x.network.GetNetMapByEpoch(epoch)
resPrev, err := cnrCtx.applyAtEpoch(curEpoch-1, caches.prev)
if err != nil {
return fmt.Errorf("read network map at epoch #%d: %w", epoch, err)
if resCur.err != nil {
return fmt.Errorf("select container nodes for both epochs: (current#%d) %w; (previous#%d) %w",
curEpoch, resCur.err, curEpoch-1, err)

Check warning on line 159 in cmd/neofs-node/policy.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/policy.go#L158-L159

Added lines #L158 - L159 were not covered by tests
}
return fmt.Errorf("select container nodes for previous epoch #%d: %w", curEpoch-1, err)
} else if resPrev.err == nil { // error case handled below
for i := range resPrev.nodeSets {
for j := range resPrev.nodeSets[i] {
if !f(resPrev.nodeSets[i][j]) {
return nil

Check warning on line 166 in cmd/neofs-node/policy.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/policy.go#L166

Added line #L166 was not covered by tests
}
}
}
}

ns, err = networkMap.ContainerNodes(cnr.Value.PlacementPolicy(), cnrID)
if err != nil {
return fmt.Errorf("apply container's storage policy to the network map at epoch #%d: %w", epoch, err)
if resCur.err != nil {
if resPrev.err != nil {
return fmt.Errorf("select container nodes for both epochs: (current#%d) %w; (previous#%d) %w",
curEpoch, resCur.err, curEpoch-1, resPrev.err)
}
return fmt.Errorf("select container nodes for current epoch #%d: %w", curEpoch, resCur.err)
} else if resPrev.err != nil {
return fmt.Errorf("select container nodes for previous epoch #%d: %w", curEpoch-1, resPrev.err)
}
return nil
}

for i := range ns {
for j := range ns[i] {
if !f(ns[i][j]) {
return nil
}
// containerPolicyContext preserves context of storage policy processing for the
// particular container.
type containerPolicyContext struct {
// static
// id references the container whose storage policy is applied.
id cid.ID
// containers is the source the container is read from.
containers container.Source
// network is the source of network maps the policy is applied to.
network netmap.Source
// dynamic
// cnr is the container read by id. It is nil until the first read and is
// reused by later policy applications within this context.
cnr *container.Container
}

// applyAtEpoch applies storage policy of container referenced by parameterized
// ID to the network map at the specified epoch. If cache is non-nil,
// applyAtEpoch reads existing results from it and stores new results in it.
//
// The returned error reports a failure to read the container or the network
// map. Such failures are considered non-persistent and are never cached. A
// failure of the policy application itself is reported through the err field
// of the returned result and is cached together with it.
func (x *containerPolicyContext) applyAtEpoch(epoch uint64, cache *containerNodesAtEpoch) (storagePolicyRes, error) {
	if cache != nil {
		// the lock is held until return so the lookup here and the store at
		// the end form a single critical section
		cache.mtx.Lock()
		defer cache.mtx.Unlock()
		if result, ok := cache.lru.Get(x.id); ok {
			return result, nil
		}
	}

	var result storagePolicyRes
	var err error
	if x.cnr == nil {
		// read the container once per context and reuse it for other epochs
		x.cnr, err = x.containers.Get(x.id)
		if err != nil {
			// non-persistent error => do not cache
			return result, fmt.Errorf("read container by ID: %w", err)
		}
	}
	networkMap, err := x.network.GetNetMapByEpoch(epoch)
	if err != nil {
		// non-persistent error => do not cache
		return result, fmt.Errorf("read network map by epoch: %w", err)
	}
	result.nodeSets, result.err = networkMap.ContainerNodes(x.cnr.Value.PlacementPolicy(), x.id)
	if cache != nil {
		// lock already acquired above!
		cache.lru.Add(x.id, result)
	}
	return result, nil
}
Loading

0 comments on commit b7b6717

Please sign in to comment.