Skip to content

Commit

Permalink
node/object: Cache object policy application results for Put service
Browse files Browse the repository at this point in the history
Continues 5389a1e for `ObjectService`'s
`Put` server handler. It shares the cache with other object processors.

Refs #1803.

Signed-off-by: Leonard Lyubich <[email protected]>
  • Loading branch information
cthulhu-rider committed Aug 21, 2024
1 parent 5076fe2 commit cd8a33c
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 21 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ Changelog for NeoFS Node
- `ObjectService.Put` server of in-container node places objects using new `ObjectService.Replicate` RPC (#2802)
- `ObjectService`'s `Search` and `Replicate` RPC handlers cache up to 1000 lists of container nodes (#2892)
- Default max_traceable_blocks Morph setting lowered to 17280 from 2102400 (#2897)
- `ObjectService`'s `Get`/`Head`/`GetRange` RPC handlers cache up to 10K lists of per-object sorted container nodes (#2896)
- `ObjectService`'s `Get`/`Head`/`GetRange`/`Put` RPC handlers cache up to 10K lists of per-object sorted container nodes (#2896, #2901)

### Updated
- neofs-contract dependency to 0.20.0 (#2872)
Expand Down
41 changes: 32 additions & 9 deletions cmd/neofs-node/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -786,7 +786,7 @@ func (c *cfg) GetContainerNodes(cnrID cid.ID) (putsvc.ContainerNodes, error) {
for i := range repCounts {
repCounts[i] = uint(policy.ReplicaNumberByIndex(i))

Check warning on line 787 in cmd/neofs-node/object.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/object.go#L785-L787

Added lines #L785 - L787 were not covered by tests
}
return containerNodesSorter{
return &containerNodesSorter{
policy: storagePolicyRes{
nodeSets: nodeSets,
repCounts: repCounts,
Expand All @@ -797,12 +797,35 @@ func (c *cfg) GetContainerNodes(cnrID cid.ID) (putsvc.ContainerNodes, error) {

// implements [putsvc.ContainerNodes].
type containerNodesSorter struct {
policy storagePolicyRes
networkMap *netmapsdk.NetMap
}

func (x containerNodesSorter) Unsorted() [][]netmapsdk.NodeInfo { return x.policy.nodeSets }
func (x containerNodesSorter) PrimaryCounts() []uint { return x.policy.repCounts }
func (x containerNodesSorter) SortForObject(obj oid.ID) ([][]netmapsdk.NodeInfo, error) {
return x.networkMap.PlacementVectors(x.policy.nodeSets, obj)
policy storagePolicyRes
networkMap *netmapsdk.NetMap
cnrID cid.ID
curEpoch uint64
containerNodes *containerNodes
}

func (x *containerNodesSorter) Unsorted() [][]netmapsdk.NodeInfo { return x.policy.nodeSets }
func (x *containerNodesSorter) PrimaryCounts() []uint { return x.policy.repCounts }
func (x *containerNodesSorter) SortForObject(obj oid.ID) ([][]netmapsdk.NodeInfo, error) {
cacheKey := objectNodesCacheKey{epoch: x.curEpoch}
cacheKey.addr.SetContainer(x.cnrID)
cacheKey.addr.SetObject(obj)
res, ok := x.containerNodes.objCache.Get(cacheKey)
if ok {
return res.nodeSets, res.err

Check warning on line 815 in cmd/neofs-node/object.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/object.go#L807-L815

Added lines #L807 - L815 were not covered by tests
}
if x.networkMap == nil {
var err error
if x.networkMap, err = x.containerNodes.network.GetNetMapByEpoch(x.curEpoch); err != nil {

Check warning on line 819 in cmd/neofs-node/object.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/object.go#L817-L819

Added lines #L817 - L819 were not covered by tests
// non-persistent error => do not cache
return nil, fmt.Errorf("read network map by epoch: %w", err)

Check warning on line 821 in cmd/neofs-node/object.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/object.go#L821

Added line #L821 was not covered by tests
}
}
res.repCounts = x.policy.repCounts
res.nodeSets, res.err = x.containerNodes.sortContainerNodesFunc(*x.networkMap, x.policy.nodeSets, obj)
if res.err != nil {
res.err = fmt.Errorf("sort container nodes for object: %w", res.err)

Check warning on line 827 in cmd/neofs-node/object.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/object.go#L824-L827

Added lines #L824 - L827 were not covered by tests
}
x.containerNodes.objCache.Add(cacheKey, res)
return res.nodeSets, res.err

Check warning on line 830 in cmd/neofs-node/object.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/object.go#L829-L830

Added lines #L829 - L830 were not covered by tests
}
30 changes: 19 additions & 11 deletions cmd/neofs-node/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,17 +161,9 @@ func (x *containerNodes) getNodesForObject(addr oid.Address) ([][]netmapsdk.Node
if ok {
return res.nodeSets, res.repCounts, res.err
}
cnrRes, networkMap, err := (&containerPolicyContext{
id: addr.Container(),
containers: x.containers,
network: x.network,
getNodesFunc: x.getContainerNodesFunc,
}).applyToNetmap(curEpoch, x.cache)
if err != nil || cnrRes.err != nil {
if err == nil {
err = cnrRes.err // cached in x.cache, no need to store in x.objCache
}
return nil, nil, fmt.Errorf("select container nodes for current epoch #%d: %w", curEpoch, err)
cnrRes, networkMap, err := x.getForCurrentEpoch(curEpoch, addr.Container())
if err != nil {
return nil, nil, err
}
if networkMap == nil {
if networkMap, err = x.network.GetNetMapByEpoch(curEpoch); err != nil {
Expand All @@ -188,6 +180,22 @@ func (x *containerNodes) getNodesForObject(addr oid.Address) ([][]netmapsdk.Node
return res.nodeSets, res.repCounts, res.err
}

func (x *containerNodes) getForCurrentEpoch(curEpoch uint64, cnr cid.ID) (storagePolicyRes, *netmapsdk.NetMap, error) {
policy, networkMap, err := (&containerPolicyContext{
id: cnr,
containers: x.containers,
network: x.network,
getNodesFunc: x.getContainerNodesFunc,
}).applyToNetmap(curEpoch, x.cache)
if err != nil || policy.err != nil {
if err == nil {
err = policy.err // cached in x.cache, no need to store in x.objCache
}
return storagePolicyRes{}, nil, fmt.Errorf("select container nodes for current epoch #%d: %w", curEpoch, err)
}
return policy, networkMap, nil
}

// preserves context of storage policy processing for the particular container.
type containerPolicyContext struct {
// static
Expand Down

0 comments on commit cd8a33c

Please sign in to comment.