node/policy: Cache object policy application results
Continues 10d05a4 for sorting container
nodes for objects.

Since each container may include plenty of objects, the cache size limit is
chosen - again heuristically - to be 10 times bigger, i.e. 10K.

Signed-off-by: Leonard Lyubich <[email protected]>
cthulhu-rider committed Jul 18, 2024
1 parent 3ed1ae7 commit 383e341
Showing 3 changed files with 225 additions and 68 deletions.
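
As a rough illustration of the caching scheme described in the commit message, the sketch below builds an LRU cache keyed by (epoch, object address) with the 10K limit mentioned above. It assumes the generic github.com/hashicorp/golang-lru/v2 package (the diff below uses the same lru.Cache[K, V] API); the key and value types are simplified stand-ins (strings instead of oid.Address and netmapsdk.NodeInfo), not the actual node code.

    package main

    import (
        "fmt"

        lru "github.com/hashicorp/golang-lru/v2"
    )

    // Simplified stand-ins for the types added in cmd/neofs-node/policy.go.
    type objectNodesCacheKey struct {
        epoch uint64
        addr  string // oid.Address in the real code
    }

    type storagePolicyRes struct {
        nodeSets  [][]string // [][]netmapsdk.NodeInfo in the real code
        repCounts []uint
        err       error
    }

    func main() {
        // Object-level results: 10 times the 1K container-level limit.
        objCache, err := lru.New[objectNodesCacheKey, storagePolicyRes](10000)
        if err != nil {
            panic(err)
        }

        key := objectNodesCacheKey{epoch: 42, addr: "cnr/obj"}
        objCache.Add(key, storagePolicyRes{repCounts: []uint{3}})

        if res, ok := objCache.Get(key); ok {
            fmt.Println(res.repCounts) // cached per-set primary holder counts
        }
    }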
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -14,6 +14,7 @@ Changelog for NeoFS Node
- neofs-cli allows several objects deletion at a time (#2774)
- `ObjectService.Put` server of in-container node places objects using new `ObjectService.Replicate` RPC (#2802)
- `ObjectService`'s `Put` and `Replicate` RPC handlers cache up to 1000 lists of container nodes (#2892)
- `ObjectService`'s `Get`/`Head`/`GetRange` RPC handlers cache up to 10K lists of per-object sorted container nodes (#2896)

### Removed

156 changes: 100 additions & 56 deletions cmd/neofs-node/policy.go
@@ -14,18 +14,30 @@ import (
// storagePolicyRes structures persistent storage policy application result for
// particular container and network map incl. error.
type storagePolicyRes struct {
nodeSets [][]netmapsdk.NodeInfo
err error
nodeSets [][]netmapsdk.NodeInfo
repCounts []uint
err error
}

type containerNodesCacheKey struct {
epoch uint64
cnr cid.ID
}
type (
containerNodesCacheKey struct {
epoch uint64
cnr cid.ID
}
objectNodesCacheKey struct {
epoch uint64
addr oid.Address
}
)

// max number of container storage policy applications results cached by
// containerNodes.
const cachedContainerNodesNum = 1000
const (
// max number of container storage policy applications results cached by
// containerNodes.
cachedContainerNodesNum = 1000
// max number of object storage policy applications results cached by
// containerNodes.
cachedObjectNodesNum = 10000
)

// containerNodes wraps NeoFS network state to apply container storage policies.
//
@@ -36,18 +48,24 @@ type containerNodes struct {
containers container.Source
network netmap.Source

cache *lru.Cache[containerNodesCacheKey, storagePolicyRes]
cache *lru.Cache[containerNodesCacheKey, storagePolicyRes]
objCache *lru.Cache[objectNodesCacheKey, storagePolicyRes]
}

func newContainerNodes(containers container.Source, network netmap.Source) (*containerNodes, error) {
l, err := lru.New[containerNodesCacheKey, storagePolicyRes](cachedContainerNodesNum)
if err != nil {
return nil, fmt.Errorf("create LRU container node cache for one epoch: %w", err)
}
lo, err := lru.New[objectNodesCacheKey, storagePolicyRes](cachedObjectNodesNum)
if err != nil {
return nil, fmt.Errorf("create LRU container node cache for objects: %w", err)
}
return &containerNodes{
containers: containers,
network: network,
cache: l,
objCache: lo,
}, nil
}

@@ -117,6 +135,47 @@ func (x *containerNodes) forEachContainerNode(cnrID cid.ID, withPrevEpoch bool,
return nil
}

// getNodesForObject reads storage policy of the referenced container from the
// underlying container storage, reads network map at the specified epoch from
// the underlying storage, applies the storage policy to it and returns sorted
// lists of selected storage nodes along with the per-list numbers of primary
// object holders. Resulting slices must not be changed.
func (x *containerNodes) getNodesForObject(addr oid.Address) ([][]netmapsdk.NodeInfo, []uint, error) {
curEpoch, err := x.network.Epoch()
if err != nil {
return nil, nil, fmt.Errorf("read current NeoFS epoch: %w", err)
}
cacheKey := objectNodesCacheKey{curEpoch, addr}
res, ok := x.objCache.Get(cacheKey)
if ok {
return res.nodeSets, res.repCounts, res.err
}
cnrRes, networkMap, err := (&containerPolicyContext{
id: addr.Container(),
containers: x.containers,
network: x.network,
}).applyToNetmap(curEpoch, x.cache)
if err != nil || cnrRes.err != nil {
if err == nil {
err = cnrRes.err // cached in x.cache, no need to store in x.objCache
}
return nil, nil, fmt.Errorf("select container nodes for current epoch #%d: %w", curEpoch, err)
}
if networkMap == nil {
if networkMap, err = x.network.GetNetMapByEpoch(curEpoch); err != nil {
// non-persistent error => do not cache
return nil, nil, fmt.Errorf("read network map by epoch: %w", err)
}
}
res.repCounts = cnrRes.repCounts
res.nodeSets, res.err = networkMap.PlacementVectors(cnrRes.nodeSets, addr.Object())
if res.err != nil {
res.err = fmt.Errorf("sort container nodes for object: %w", res.err)
}
x.objCache.Add(cacheKey, res)
return res.nodeSets, res.repCounts, res.err
}

// preserves context of storage policy processing for the particular container.
type containerPolicyContext struct {
// static
@@ -131,67 +190,52 @@ type containerPolicyContext struct {
// ID to the network map at the specified epoch. applyAtEpoch checks existing
// results in the cache and stores new results in it.
func (x *containerPolicyContext) applyAtEpoch(epoch uint64, cache *lru.Cache[containerNodesCacheKey, storagePolicyRes]) (storagePolicyRes, error) {
res, _, err := x.applyToNetmap(epoch, cache)
return res, err
}

// applyToNetmap applies storage policy of container referenced by parameterized
// ID to the network map at the specified epoch. applyAtEpoch checks existing
// results in the cache and stores new results in it. Network map is returned if
// it was requested, i.e. on cache miss only.
func (x *containerPolicyContext) applyToNetmap(epoch uint64, cache *lru.Cache[containerNodesCacheKey, storagePolicyRes]) (storagePolicyRes, *netmapsdk.NetMap, error) {
cacheKey := containerNodesCacheKey{epoch, x.id}
if result, ok := cache.Get(cacheKey); ok {
return result, nil
return result, nil, nil
}
var result storagePolicyRes
var err error
if x.cnr == nil {
x.cnr, err = x.containers.Get(x.id)
if err != nil {
// non-persistent error => do not cache
return result, fmt.Errorf("read container by ID: %w", err)
return result, nil, fmt.Errorf("read container by ID: %w", err)
}
}
networkMap, err := x.network.GetNetMapByEpoch(epoch)
if err != nil {
// non-persistent error => do not cache
return result, fmt.Errorf("read network map by epoch: %w", err)
return result, nil, fmt.Errorf("read network map by epoch: %w", err)
}
result.nodeSets, result.err = networkMap.ContainerNodes(x.cnr.Value.PlacementPolicy(), x.id)
cache.Add(cacheKey, result)
return result, nil
}

// getNodesForObject reads storage policy of the referenced container from the
// underlying container storage, reads network map at the specified epoch from
// the underlying storage, applies the storage policy to it and returns sorted
// lists of selected storage nodes along with the per-list numbers of primary
// object holders. Resulting slices must not be changed.
func (x *containerNodes) getNodesForObject(addr oid.Address) ([][]netmapsdk.NodeInfo, []uint, error) {
epoch, err := x.network.Epoch()
if err != nil {
return nil, nil, fmt.Errorf("read current NeoFS epoch: %w", err)
}
cnrID := addr.Container()
cnr, err := x.containers.Get(cnrID)
if err != nil {
return nil, nil, fmt.Errorf("read container by ID: %w", err)
}
networkMap, err := x.network.GetNetMapByEpoch(epoch)
if err != nil {
return nil, nil, fmt.Errorf("read network map at epoch #%d: %w", epoch, err)
}

policy := cnr.Value.PlacementPolicy()
nodeLists, err := networkMap.ContainerNodes(policy, cnrID)
if err != nil {
return nil, nil, fmt.Errorf("apply container's storage policy to the network map at epoch #%d: %w", epoch, err)
} else if nodeLists, err = networkMap.PlacementVectors(nodeLists, addr.Object()); err != nil {
return nil, nil, fmt.Errorf("sort container nodes from the network map at epoch #%d: %w", epoch, err)
} else if len(nodeLists) != policy.NumberOfReplicas() {
return nil, nil, fmt.Errorf("invalid result of container's storage policy application to the network map at epoch #%d: "+
"diff number of storage node lists (%d) and required replica descriptors (%d)", epoch, len(nodeLists), policy.NumberOfReplicas())
}

primaryCounts := make([]uint, len(nodeLists))
for i := range nodeLists {
if primaryCounts[i] = uint(policy.ReplicaNumberByIndex(i)); primaryCounts[i] > uint(len(nodeLists[i])) {
return nil, nil, fmt.Errorf("invalid result of container's storage policy application to the network map at epoch #%d: "+
"invalid storage node list #%d: number of nodes (%d) is less than minimum required by the container policy (%d)",
epoch, i, len(nodeLists), policy.NumberOfReplicas())
policy := x.cnr.Value.PlacementPolicy()
result.nodeSets, result.err = networkMap.ContainerNodes(policy, x.id)
if result.err == nil {
// ContainerNodes should control following, but still better to double-check
if policyNum := policy.NumberOfReplicas(); len(result.nodeSets) != policyNum {
result.err = fmt.Errorf("invalid result of container's storage policy application to the network map at epoch #%d: "+
"diff number of storage node sets (%d) and required replica descriptors (%d)",
epoch, len(result.nodeSets), policyNum)
}
result.repCounts = make([]uint, len(result.nodeSets))
for i := range result.nodeSets {
if result.repCounts[i] = uint(policy.ReplicaNumberByIndex(i)); result.repCounts[i] > uint(len(result.nodeSets[i])) {
result.err = fmt.Errorf("invalid result of container's storage policy application to the network map at epoch #%d: "+
"invalid storage node set #%d: number of nodes (%d) is less than minimum required by the container policy (%d)",
epoch, i, len(result.nodeSets), result.repCounts[i])
break
}
}
}
return nodeLists, primaryCounts, nil
cache.Add(cacheKey, result)
return result, networkMap, nil
}
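
For readers unfamiliar with how the (nodeSets, repCounts) pair returned by getNodesForObject is meant to be read, here is a hypothetical, self-contained illustration that is not part of the commit: within each sorted node set, the first repCounts[i] nodes are the primary object holders required by the container policy, and the remaining nodes in the set are lower-priority candidates. Node identities and the policy shape are stubbed.

    package main

    import "fmt"

    func main() {
        // Shaped like getNodesForObject's results, with nodes stubbed as strings.
        nodeSets := [][]string{
            {"nodeA", "nodeB", "nodeC"},
            {"nodeD", "nodeE"},
        }
        repCounts := []uint{2, 1} // e.g. two replica descriptors: REP 2 and REP 1

        for i, set := range nodeSets {
            for j, node := range set {
                role := "lower-priority candidate"
                if uint(j) < repCounts[i] {
                    role = "primary holder"
                }
                fmt.Printf("set #%d: %s -> %s\n", i, node, role)
            }
        }
    }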