Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cache storage policy application results (Get/Head/GetRange) #2896

Merged
merged 3 commits into from
Aug 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ Changelog for NeoFS Node
- `ObjectService.Put` server of in-container node places objects using new `ObjectService.Replicate` RPC (#2802)
- `ObjectService`'s `Put` and `Replicate` RPC handlers cache up to 1000 lists of container nodes (#2892)
- Default max_traceable_blocks Morph setting lowered to 17280 from 2102400 (#2897)
- `ObjectService`'s `Get`/`Head`/`GetRange` RPC handlers cache up to 10K lists of per-object sorted container nodes (#2896)

### Removed

Expand Down
2 changes: 2 additions & 0 deletions cmd/neofs-node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,8 @@ type cfgObject struct {
cfgLocalStorage cfgLocalStorage

tombstoneLifetime uint64

containerNodes *containerNodes
}

type cfgLocalStorage struct {
Expand Down
28 changes: 19 additions & 9 deletions cmd/neofs-node/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,20 +224,12 @@
policer.WithObjectBatchSize(c.applicationConfiguration.policer.objectBatchSize),
)

traverseGen := util.NewTraverserGenerator(c.netMapSource, c.cfgObject.cnrSource, c)

c.workers = append(c.workers, c.shared.policer)

sGet := getsvc.New(
sGet := getsvc.New(c,

Check warning on line 229 in cmd/neofs-node/object.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/object.go#L229

Added line #L229 was not covered by tests
getsvc.WithLogger(c.log),
getsvc.WithLocalStorageEngine(ls),
getsvc.WithClientConstructor(coreConstructor),
getsvc.WithTraverserGenerator(
traverseGen.WithTraverseOptions(
placement.SuccessAfter(1),
),
),
getsvc.WithNetMapSource(c.netMapSource),
getsvc.WithKeyStorage(keyStorage),
)

Expand All @@ -250,6 +242,7 @@

cnrNodes, err := newContainerNodes(c.cfgObject.cnrSource, c.netMapSource)
fatalOnErr(err)
c.cfgObject.containerNodes = cnrNodes

Check warning on line 245 in cmd/neofs-node/object.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/object.go#L245

Added line #L245 was not covered by tests

sSearch := searchsvc.New(newRemoteContainerNodes(cnrNodes, c.IsLocalKey),
searchsvc.WithLogger(c.log),
Expand Down Expand Up @@ -719,3 +712,20 @@

return sw.ids, nil
}

// IsLocalNodePublicKey checks whether given binary-encoded public key is
// assigned in the network map to a local storage node.
//
// IsLocalNodePublicKey implements [getsvc.NeoFSNetwork].
func (c *cfg) IsLocalNodePublicKey(b []byte) bool {
	// Delegates directly to the shared key check of the node configuration.
	return c.IsLocalKey(b)
}

Check warning on line 720 in cmd/neofs-node/object.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/object.go#L720

Added line #L720 was not covered by tests

// GetNodesForObject reads storage policy of the referenced container from the
// underlying container storage, reads network map at the specified epoch from
// the underlying storage, applies the storage policy to it and returns sorted
// lists of selected storage nodes along with the per-list numbers of primary
// object holders. Resulting slices must not be changed.
//
// GetNodesForObject implements [getsvc.NeoFSNetwork].
func (c *cfg) GetNodesForObject(addr oid.Address) ([][]netmapsdk.NodeInfo, []uint, error) {
return c.cfgObject.containerNodes.getNodesForObject(addr)

Check warning on line 730 in cmd/neofs-node/object.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/object.go#L729-L730

Added lines #L729 - L730 were not covered by tests
}
145 changes: 123 additions & 22 deletions cmd/neofs-node/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,41 @@
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
netmapsdk "github.com/nspcc-dev/neofs-sdk-go/netmap"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
)

// storagePolicyRes structures persistent storage policy application result for
// particular container and network map incl. error.
type storagePolicyRes struct {
nodeSets [][]netmapsdk.NodeInfo
err error
nodeSets [][]netmapsdk.NodeInfo
repCounts []uint
err error
}

type containerNodesCacheKey struct {
epoch uint64
cnr cid.ID
}
type (
containerNodesCacheKey struct {
epoch uint64
cnr cid.ID
}
objectNodesCacheKey struct {
epoch uint64
addr oid.Address
}
)

// max number of container storage policy applications results cached by
// containerNodes.
const cachedContainerNodesNum = 1000
const (
// max number of container storage policy applications results cached by
// containerNodes.
cachedContainerNodesNum = 1000
// max number of object storage policy applications results cached by
// containerNodes.
cachedObjectNodesNum = 10000
)

type (
getContainerNodesFunc = func(netmapsdk.NetMap, netmapsdk.PlacementPolicy, cid.ID) ([][]netmapsdk.NodeInfo, error)
sortContainerNodesFunc = func(netmapsdk.NetMap, [][]netmapsdk.NodeInfo, oid.ID) ([][]netmapsdk.NodeInfo, error)
)

// containerNodes wraps NeoFS network state to apply container storage policies.
//
Expand All @@ -35,18 +53,30 @@
containers container.Source
network netmap.Source

cache *lru.Cache[containerNodesCacheKey, storagePolicyRes]
cache *lru.Cache[containerNodesCacheKey, storagePolicyRes]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename too?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need, it contains container nodes

objCache *lru.Cache[objectNodesCacheKey, storagePolicyRes]

// for testing
getContainerNodesFunc getContainerNodesFunc
sortContainerNodesFunc sortContainerNodesFunc
}

func newContainerNodes(containers container.Source, network netmap.Source) (*containerNodes, error) {
l, err := lru.New[containerNodesCacheKey, storagePolicyRes](cachedContainerNodesNum)
if err != nil {
return nil, fmt.Errorf("create LRU container node cache for one epoch: %w", err)
}
lo, err := lru.New[objectNodesCacheKey, storagePolicyRes](cachedObjectNodesNum)
if err != nil {
return nil, fmt.Errorf("create LRU container node cache for objects: %w", err)

Check warning on line 71 in cmd/neofs-node/policy.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/policy.go#L71

Added line #L71 was not covered by tests
}
return &containerNodes{
containers: containers,
network: network,
cache: l,
containers: containers,
network: network,
cache: l,
objCache: lo,
getContainerNodesFunc: netmapsdk.NetMap.ContainerNodes,
sortContainerNodesFunc: netmapsdk.NetMap.PlacementVectors,
}, nil
}

Expand All @@ -65,7 +95,7 @@
return fmt.Errorf("read current NeoFS epoch: %w", err)
}

cnrCtx := containerPolicyContext{id: cnrID, containers: x.containers, network: x.network}
cnrCtx := containerPolicyContext{id: cnrID, containers: x.containers, network: x.network, getNodesFunc: x.getContainerNodesFunc}

resCur, err := cnrCtx.applyAtEpoch(curEpoch, x.cache)
if err != nil {
Expand Down Expand Up @@ -116,12 +146,55 @@
return nil
}

// getNodesForObject reads storage policy of the referenced container from the
// underlying container storage, reads network map at the specified epoch from
// the underlying storage, applies the storage policy to it and returns sorted
// lists of selected storage nodes along with the per-list numbers of primary
// object holders. Resulting slices must not be changed.
func (x *containerNodes) getNodesForObject(addr oid.Address) ([][]netmapsdk.NodeInfo, []uint, error) {
curEpoch, err := x.network.Epoch()
if err != nil {
return nil, nil, fmt.Errorf("read current NeoFS epoch: %w", err)
}
cacheKey := objectNodesCacheKey{curEpoch, addr}
res, ok := x.objCache.Get(cacheKey)
if ok {
return res.nodeSets, res.repCounts, res.err
}
cnrRes, networkMap, err := (&containerPolicyContext{
id: addr.Container(),
containers: x.containers,
network: x.network,
getNodesFunc: x.getContainerNodesFunc,
}).applyToNetmap(curEpoch, x.cache)
if err != nil || cnrRes.err != nil {
if err == nil {
err = cnrRes.err // cached in x.cache, no need to store in x.objCache
}
return nil, nil, fmt.Errorf("select container nodes for current epoch #%d: %w", curEpoch, err)
}
if networkMap == nil {
if networkMap, err = x.network.GetNetMapByEpoch(curEpoch); err != nil {

Check warning on line 177 in cmd/neofs-node/policy.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/policy.go#L177

Added line #L177 was not covered by tests
// non-persistent error => do not cache
return nil, nil, fmt.Errorf("read network map by epoch: %w", err)

Check warning on line 179 in cmd/neofs-node/policy.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/policy.go#L179

Added line #L179 was not covered by tests
}
}
res.repCounts = cnrRes.repCounts
res.nodeSets, res.err = x.sortContainerNodesFunc(*networkMap, cnrRes.nodeSets, addr.Object())
if res.err != nil {
res.err = fmt.Errorf("sort container nodes for object: %w", res.err)
}
x.objCache.Add(cacheKey, res)
return res.nodeSets, res.repCounts, res.err
}

// containerPolicyContext preserves context of storage policy processing for the
// particular container.
type containerPolicyContext struct {
	// static
	id           cid.ID
	containers   container.Source
	network      netmap.Source
	getNodesFunc getContainerNodesFunc
	// dynamic: lazily fetched container, reused across repeated applications
	cnr *container.Container
}
Expand All @@ -130,25 +203,53 @@
// applyAtEpoch applies storage policy of container referenced by parameterized
// ID to the network map at the specified epoch. applyAtEpoch checks existing
// results in the cache and stores new results in it.
func (x *containerPolicyContext) applyAtEpoch(epoch uint64, cache *lru.Cache[containerNodesCacheKey, storagePolicyRes]) (storagePolicyRes, error) {
	res, _, err := x.applyToNetmap(epoch, cache)
	return res, err
}

// applyToNetmap applies storage policy of container referenced by parameterized
// ID to the network map at the specified epoch. applyAtEpoch checks existing
// results in the cache and stores new results in it. Network map is returned if
// it was requested, i.e. on cache miss only.
func (x *containerPolicyContext) applyToNetmap(epoch uint64, cache *lru.Cache[containerNodesCacheKey, storagePolicyRes]) (storagePolicyRes, *netmapsdk.NetMap, error) {
	cacheKey := containerNodesCacheKey{epoch, x.id}
	if result, ok := cache.Get(cacheKey); ok {
		// Cache hit: no network map was fetched, so none is returned.
		return result, nil, nil
	}
	var (
		result storagePolicyRes
		err    error
	)
	if x.cnr == nil {
		x.cnr, err = x.containers.Get(x.id)
		if err != nil {
			// non-persistent error => do not cache
			return result, nil, fmt.Errorf("read container by ID: %w", err)
		}
	}
	networkMap, err := x.network.GetNetMapByEpoch(epoch)
	if err != nil {
		// non-persistent error => do not cache
		return result, nil, fmt.Errorf("read network map by epoch: %w", err)
	}
	policy := x.cnr.Value.PlacementPolicy()
	result.nodeSets, result.err = x.getNodesFunc(*networkMap, policy, x.id)
	if result.err == nil {
		// ContainerNodes should control following, but still better to double-check
		if policyNum := policy.NumberOfReplicas(); len(result.nodeSets) != policyNum {
			result.err = fmt.Errorf("invalid result of container's storage policy application to the network map: "+
				"diff number of storage node sets (%d) and required replica descriptors (%d)",
				len(result.nodeSets), policyNum)
		} else {
			result.repCounts = make([]uint, len(result.nodeSets))
			for i := range result.nodeSets {
				if result.repCounts[i] = uint(policy.ReplicaNumberByIndex(i)); result.repCounts[i] > uint(len(result.nodeSets[i])) {
					result.err = fmt.Errorf("invalid result of container's storage policy application to the network map: "+
						"invalid storage node set #%d: number of nodes (%d) is less than minimum required by the container policy (%d)",
						i, len(result.nodeSets[i]), result.repCounts[i])
					break
				}
			}
		}
	}
	// Policy application outcome (incl. its error) is persistent => cache it.
	cache.Add(cacheKey, result)
	return result, networkMap, nil
}
Loading
Loading