node/policy: Cache container policy application results
The result of applying a container (C) storage policy to the network map
(N) does not change for fixed C and N. Previously, the `Search` and
`Replicate` object server handlers always calculated the list of
container nodes from scratch. This resulted in excessive node resource
consumption under a dense flow of requests for a small number of
containers per epoch. The obvious solution is to cache the latest
results.

A similar attempt was already made earlier in
9269ed3, but it turned out to be
incorrect and changed nothing. As can be seen from the code, the cache
was checked only if the pointer of the received network map matched that
of the last processed one. The latter was never set, so there were no
cache hits.

This adds a caching component for up to 1000 recently requested lists of
container nodes. At the cost of retaining some extra memory, the
component mitigates load spikes on a small number of containers. The
limit of 1000 was chosen heuristically as a first approximation.

Tests in the development environment showed a solid improvement, but
results from real load tests are yet to be obtained. Based on those,
similar optimizations for other layers and queries will be done later.

Refs #2692.

Signed-off-by: Leonard Lyubich <[email protected]>
cthulhu-rider committed Jul 17, 2024
1 parent d68aa9f commit 265dc5e
Showing 3 changed files with 461 additions and 92 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -13,6 +13,7 @@ Changelog for NeoFS Node
 ### Changed
 - neofs-cli allows several objects deletion at a time (#2774)
 - `ObjectService.Put` server of in-container node places objects using new `ObjectService.Replicate` RPC (#2802)
+- `ObjectService`'s `Put` and `Replicate` RPC handlers cache up to 1000 lists of container nodes (#2892)
 
 ### Removed
 
143 changes: 109 additions & 34 deletions cmd/neofs-node/policy.go
@@ -2,23 +2,59 @@ package main

 import (
 	"fmt"
+	"sync"
 
+	"github.com/hashicorp/golang-lru/v2/simplelru"
 	"github.com/nspcc-dev/neofs-node/pkg/core/container"
 	"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
 	cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
 	netmapsdk "github.com/nspcc-dev/neofs-sdk-go/netmap"
 )
 
+// storagePolicyRes structures the persistent storage policy application result
+// for a particular container and network map, incl. error.
+type storagePolicyRes struct {
+	nodeSets [][]netmapsdk.NodeInfo
+	err      error
+}
+
+type containerNodesCacheKey struct {
+	epoch uint64
+	cnr   cid.ID
+}
+
+// containerNodesCache is a thread-safe LRU cache mapping (container, epoch)
+// pairs into corresponding storage policy application results.
+type containerNodesCache struct {
+	mtx sync.RWMutex
+	lru simplelru.LRUCache[containerNodesCacheKey, storagePolicyRes]
+}
+
+// max number of container storage policy application results cached by
+// containerNodes.
+const cachedContainerNodesNum = 1000
+
+// containerNodes wraps NeoFS network state to apply container storage policies.
+//
+// Since policy application results are consistent for a fixed container and
+// network map, they can be cached. containerNodes caches up to
+// cachedContainerNodesNum results with LRU eviction.
 type containerNodes struct {
 	containers container.Source
 	network    netmap.Source
+
+	cache *containerNodesCache
 }
 
 func newContainerNodes(containers container.Source, network netmap.Source) (*containerNodes, error) {
+	lru, err := simplelru.NewLRU[containerNodesCacheKey, storagePolicyRes](cachedContainerNodesNum, nil)
+	if err != nil {
+		return nil, fmt.Errorf("create LRU container node cache for one epoch: %w", err)
+	}
 	return &containerNodes{
 		containers: containers,
 		network:    network,
+		cache:      &containerNodesCache{lru: lru},
 	}, nil
 }

@@ -32,58 +68,97 @@ func (x *containerNodes) forEachContainerNodePublicKeyInLastTwoEpochs(cnrID cid.
 }
 
 func (x *containerNodes) forEachContainerNode(cnrID cid.ID, withPrevEpoch bool, f func(netmapsdk.NodeInfo) bool) error {
-	epoch, err := x.network.Epoch()
+	curEpoch, err := x.network.Epoch()
 	if err != nil {
 		return fmt.Errorf("read current NeoFS epoch: %w", err)
 	}
 
-	cnr, err := x.containers.Get(cnrID)
-	if err != nil {
-		return fmt.Errorf("read container by ID: %w", err)
-	}
-
-	networkMap, err := x.network.GetNetMapByEpoch(epoch)
-	if err != nil {
-		return fmt.Errorf("read network map at epoch #%d: %w", epoch, err)
-	}
-	// TODO(#2692): node sets remain unchanged for fixed container and network map,
-	// so recently calculated results worth caching
-	ns, err := networkMap.ContainerNodes(cnr.Value.PlacementPolicy(), cnrID)
-	if err != nil {
-		return fmt.Errorf("apply container's storage policy to the network map at epoch #%d: %w", epoch, err)
-	}
-
-	for i := range ns {
-		for j := range ns[i] {
-			if !f(ns[i][j]) {
-				return nil
-			}
-		}
-	}
+	cnrCtx := containerPolicyContext{id: cnrID, containers: x.containers, network: x.network}
+
+	resCur, err := cnrCtx.applyAtEpoch(curEpoch, x.cache)
+	if err != nil {
+		return fmt.Errorf("select container nodes for current epoch #%d: %w", curEpoch, err)
+	} else if resCur.err == nil { // error case handled below
+		for i := range resCur.nodeSets {
+			for j := range resCur.nodeSets[i] {
+				if !f(resCur.nodeSets[i][j]) {
+					return nil
+				}
+			}
+		}
+	}
 
-	if !withPrevEpoch || epoch == 0 {
+	if !withPrevEpoch || curEpoch == 0 {
+		if resCur.err != nil {
+			return fmt.Errorf("select container nodes for current epoch #%d: %w", curEpoch, resCur.err)
+		}
 		return nil
 	}
 
-	epoch--
-
-	networkMap, err = x.network.GetNetMapByEpoch(epoch)
-	if err != nil {
-		return fmt.Errorf("read network map at epoch #%d: %w", epoch, err)
-	}
-
-	ns, err = networkMap.ContainerNodes(cnr.Value.PlacementPolicy(), cnrID)
-	if err != nil {
-		return fmt.Errorf("apply container's storage policy to the network map at epoch #%d: %w", epoch, err)
-	}
-
-	for i := range ns {
-		for j := range ns[i] {
-			if !f(ns[i][j]) {
-				return nil
-			}
-		}
-	}
-
-	return nil
+	resPrev, err := cnrCtx.applyAtEpoch(curEpoch-1, x.cache)
+	if err != nil {
+		if resCur.err != nil {
+			return fmt.Errorf("select container nodes for both epochs: (current#%d) %w; (previous#%d) %w",
+				curEpoch, resCur.err, curEpoch-1, err)
+		}
+		return fmt.Errorf("select container nodes for previous epoch #%d: %w", curEpoch-1, err)
+	} else if resPrev.err == nil { // error case handled below
+		for i := range resPrev.nodeSets {
+			for j := range resPrev.nodeSets[i] {
+				if !f(resPrev.nodeSets[i][j]) {
+					return nil
+				}
+			}
+		}
+	}
+
+	if resCur.err != nil {
+		if resPrev.err != nil {
+			return fmt.Errorf("select container nodes for both epochs: (current#%d) %w; (previous#%d) %w",
+				curEpoch, resCur.err, curEpoch-1, resPrev.err)
+		}
+		return fmt.Errorf("select container nodes for current epoch #%d: %w", curEpoch, resCur.err)
+	} else if resPrev.err != nil {
+		return fmt.Errorf("select container nodes for previous epoch #%d: %w", curEpoch-1, resPrev.err)
+	}
+	return nil
+}
+
+// containerPolicyContext preserves the context of storage policy processing
+// for a particular container.
+type containerPolicyContext struct {
+	// static
+	id         cid.ID
+	containers container.Source
+	network    netmap.Source
+	// dynamic
+	cnr *container.Container
+}
+
+// applyAtEpoch applies the storage policy of the container referenced by the
+// parameterized ID to the network map at the specified epoch, reading existing
+// results from the given cache and storing new results in it.
+func (x *containerPolicyContext) applyAtEpoch(epoch uint64, cache *containerNodesCache) (storagePolicyRes, error) {
+	cache.mtx.Lock()
+	defer cache.mtx.Unlock()
+	cacheKey := containerNodesCacheKey{epoch, x.id}
+	if result, ok := cache.lru.Get(cacheKey); ok {
+		return result, nil
+	}
+	var result storagePolicyRes
+	var err error
+	if x.cnr == nil {
+		x.cnr, err = x.containers.Get(x.id)
+		if err != nil {
+			// non-persistent error => do not cache
+			return result, fmt.Errorf("read container by ID: %w", err)
+		}
+	}
+	networkMap, err := x.network.GetNetMapByEpoch(epoch)
+	if err != nil {
+		// non-persistent error => do not cache
+		return result, fmt.Errorf("read network map by epoch: %w", err)
+	}
+	result.nodeSets, result.err = networkMap.ContainerNodes(x.cnr.Value.PlacementPolicy(), x.id)
+	cache.lru.Add(cacheKey, result)
+	return result, nil
 }
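
One design detail in the diff above worth noting: `simplelru` is the
non-thread-safe LRU variant, and even `Get` mutates the internal recency
order, which is presumably why `applyAtEpoch` takes the exclusive `Lock`
rather than `RLock` despite the field being a `sync.RWMutex`. The following
is a sketch of that guarded lookup-or-compute pattern in isolation; the
`key`, `guardedCache`, and `compute` names are illustrative, not part of the
commit:

```go
package policycache

import (
	"sync"

	"github.com/hashicorp/golang-lru/v2/simplelru"
)

type key struct {
	epoch uint64
	cnr   string // stands in for cid.ID
}

// guardedCache pairs the non-thread-safe simplelru.LRU with a mutex. An
// exclusive lock is required even for lookups because Get promotes the
// entry in the recency list.
type guardedCache struct {
	mtx sync.Mutex
	lru *simplelru.LRU[key, []string]
}

func newGuardedCache(size int) (*guardedCache, error) {
	lru, err := simplelru.NewLRU[key, []string](size, nil)
	if err != nil {
		return nil, err
	}
	return &guardedCache{lru: lru}, nil
}

// lookupOrCompute returns the cached value for k, computing and caching it
// on a miss; compute stands in for applying the container storage policy.
func (c *guardedCache) lookupOrCompute(k key, compute func() []string) []string {
	c.mtx.Lock()
	defer c.mtx.Unlock()
	if v, ok := c.lru.Get(k); ok {
		return v
	}
	v := compute()
	c.lru.Add(k, v)
	return v
}
```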
