Cache storage policy application results (Put) (#2901)
roman-khimov authored Aug 30, 2024
2 parents 9975aa4 + 2bb75dd commit 35d3c67
Showing 8 changed files with 896 additions and 235 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -9,6 +9,7 @@ Changelog for NeoFS Node
### Fixed

### Changed
- `ObjectService`'s `Put` RPC handler caches up to 10K lists of per-object sorted container nodes (#2901)

### Removed

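The changelog entry above captures the core of this commit: the Put handler keeps a bounded LRU cache of per-object sorted node lists. A minimal sketch of such a cache, assuming hashicorp/golang-lru/v2; the key and value shapes mirror objectNodesCacheKey and storagePolicyRes as they appear in the diffs below, while the actual cache construction is not visible in this commit view:

```go
package main

import (
	"fmt"

	lru "github.com/hashicorp/golang-lru/v2"

	netmapsdk "github.com/nspcc-dev/neofs-sdk-go/netmap"
	oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
)

// objectNodesCacheKey mirrors the cache key used by SortForObject below:
// a cached result is only valid within one epoch and for one object address.
type objectNodesCacheKey struct {
	epoch uint64
	addr  oid.Address
}

// storagePolicyRes mirrors the cached value: sorted node sets, primary
// replica counts, and a persistent policy-application error, if any.
type storagePolicyRes struct {
	nodeSets  [][]netmapsdk.NodeInfo
	repCounts []uint
	err       error
}

func main() {
	// 10_000 matches the 10K limit stated in the changelog entry.
	cache, err := lru.New[objectNodesCacheKey, storagePolicyRes](10_000)
	if err != nil {
		panic(err)
	}

	var key objectNodesCacheKey
	key.epoch = 42
	cache.Add(key, storagePolicyRes{})
	res, ok := cache.Get(key)
	fmt.Println(ok, len(res.nodeSets)) // true 0
}
```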
78 changes: 76 additions & 2 deletions cmd/neofs-node/object.go
@@ -256,14 +256,13 @@ func initObjectService(c *cfg) {
searchsvcV2.WithKeyStorage(keyStorage),
)

sPut := putsvc.NewService(&transport{clients: putConstructor},
sPut := putsvc.NewService(&transport{clients: putConstructor}, c,
putsvc.WithKeyStorage(keyStorage),
putsvc.WithClientConstructor(putConstructor),
putsvc.WithMaxSizeSource(newCachedMaxObjectSizeSource(c)),
putsvc.WithObjectStorage(storageEngine{engine: ls}),
putsvc.WithContainerSource(c.cfgObject.cnrSource),
putsvc.WithNetworkMapSource(c.netMapSource),
putsvc.WithNetmapKeys(c),
putsvc.WithNetworkState(c.cfgNetmap.state),
putsvc.WithWorkerPools(c.cfgObject.pool.putRemote, c.cfgObject.pool.putLocal),
putsvc.WithLogger(c.log),
@@ -758,3 +757,78 @@ func (n netmapSourceWithNodes) ServerInContainer(cID cid.ID) (bool, error) {

return serverInContainer, nil
}

// GetContainerNodes reads the storage policy of the referenced container from
// the underlying container storage, reads the current network map from the
// underlying network storage, applies the policy to the map, gathers the
// storage nodes matching the policy and returns them behind a sorting
// interface.
//
// GetContainerNodes implements [putsvc.NeoFSNetwork].
func (c *cfg) GetContainerNodes(cnrID cid.ID) (putsvc.ContainerNodes, error) {
cnr, err := c.cfgObject.containerNodes.containers.Get(cnrID)
if err != nil {
return nil, fmt.Errorf("read container by ID: %w", err)
}
curEpoch, err := c.cfgObject.containerNodes.network.Epoch()
if err != nil {
return nil, fmt.Errorf("read current NeoFS epoch: %w", err)
}
networkMap, err := c.cfgObject.containerNodes.network.GetNetMapByEpoch(curEpoch)
if err != nil {
return nil, fmt.Errorf("read network map at the current epoch #%d: %w", curEpoch, err)
}
policy := cnr.Value.PlacementPolicy()
nodeSets, err := networkMap.ContainerNodes(policy, cnrID)
if err != nil {
return nil, fmt.Errorf("apply container storage policy to the network map at current epoch #%d: %w", curEpoch, err)
}
repCounts := make([]uint, policy.NumberOfReplicas())
for i := range repCounts {
repCounts[i] = uint(policy.ReplicaNumberByIndex(i))
}
return &containerNodesSorter{
policy: storagePolicyRes{
nodeSets: nodeSets,
repCounts: repCounts,
},
networkMap: networkMap,
cnrID: cnrID,
curEpoch: curEpoch,
containerNodes: c.cfgObject.containerNodes,
}, nil
}
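
The type below satisfies the second half of this contract. For orientation, here is the [putsvc.NeoFSNetwork] API as it can be inferred from the calls in this diff; the authoritative declarations live in the Put service package and may carry extra methods:

```go
// Inferred sketch of the Put service contracts, not the authoritative source.
type NeoFSNetwork interface {
	// GetContainerNodes selects storage nodes for the container by applying
	// its placement policy to the current network map.
	GetContainerNodes(cnrID cid.ID) (ContainerNodes, error)
}

type ContainerNodes interface {
	// Unsorted returns the policy-selected node sets without per-object sorting.
	Unsorted() [][]netmapsdk.NodeInfo
	// PrimaryCounts returns the number of primary object holders in each node set.
	PrimaryCounts() []uint
	// SortForObject sorts the node sets for the given object; results are
	// cached per (epoch, object address) pair.
	SortForObject(obj oid.ID) ([][]netmapsdk.NodeInfo, error)
}
```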

// containerNodesSorter implements [putsvc.ContainerNodes].
type containerNodesSorter struct {
policy storagePolicyRes
networkMap *netmapsdk.NetMap
cnrID cid.ID
curEpoch uint64
containerNodes *containerNodes
}

func (x *containerNodesSorter) Unsorted() [][]netmapsdk.NodeInfo { return x.policy.nodeSets }
func (x *containerNodesSorter) PrimaryCounts() []uint { return x.policy.repCounts }
func (x *containerNodesSorter) SortForObject(obj oid.ID) ([][]netmapsdk.NodeInfo, error) {
cacheKey := objectNodesCacheKey{epoch: x.curEpoch}
cacheKey.addr.SetContainer(x.cnrID)
cacheKey.addr.SetObject(obj)
res, ok := x.containerNodes.objCache.Get(cacheKey)
if ok {
return res.nodeSets, res.err
}
if x.networkMap == nil {
var err error
if x.networkMap, err = x.containerNodes.network.GetNetMapByEpoch(x.curEpoch); err != nil {
// non-persistent error => do not cache
return nil, fmt.Errorf("read network map by epoch: %w", err)
}
}
res.repCounts = x.policy.repCounts
res.nodeSets, res.err = x.containerNodes.sortContainerNodesFunc(*x.networkMap, x.policy.nodeSets, obj)
if res.err != nil {
res.err = fmt.Errorf("sort container nodes for object: %w", res.err)
}
x.containerNodes.objCache.Add(cacheKey, res)
return res.nodeSets, res.err
}
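
To show how these pieces compose on the Put path, a hypothetical caller; placeObject and its signature are illustrative, not part of this commit, and only GetContainerNodes and SortForObject come from the code above:

```go
// placeObject resolves placement vectors for a single object being put.
// Policy application happens once per container; per-object sorting hits
// the LRU cache on repeated calls within the same epoch.
func placeObject(network putsvc.NeoFSNetwork, cnrID cid.ID, objID oid.ID) ([][]netmapsdk.NodeInfo, error) {
	cnrNodes, err := network.GetContainerNodes(cnrID)
	if err != nil {
		return nil, fmt.Errorf("select container nodes: %w", err)
	}
	return cnrNodes.SortForObject(objID)
}
```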
30 changes: 19 additions & 11 deletions cmd/neofs-node/policy.go
@@ -161,17 +161,9 @@ func (x *containerNodes) getNodesForObject(addr oid.Address) ([][]netmapsdk.NodeInfo, []uint, error) {
if ok {
return res.nodeSets, res.repCounts, res.err
}
cnrRes, networkMap, err := (&containerPolicyContext{
id: addr.Container(),
containers: x.containers,
network: x.network,
getNodesFunc: x.getContainerNodesFunc,
}).applyToNetmap(curEpoch, x.cache)
if err != nil || cnrRes.err != nil {
if err == nil {
err = cnrRes.err // cached in x.cache, no need to store in x.objCache
}
return nil, nil, fmt.Errorf("select container nodes for current epoch #%d: %w", curEpoch, err)
cnrRes, networkMap, err := x.getForCurrentEpoch(curEpoch, addr.Container())
if err != nil {
return nil, nil, err
}
if networkMap == nil {
if networkMap, err = x.network.GetNetMapByEpoch(curEpoch); err != nil {
@@ -188,6 +180,22 @@
return res.nodeSets, res.repCounts, res.err
}

func (x *containerNodes) getForCurrentEpoch(curEpoch uint64, cnr cid.ID) (storagePolicyRes, *netmapsdk.NetMap, error) {
policy, networkMap, err := (&containerPolicyContext{
id: cnr,
containers: x.containers,
network: x.network,
getNodesFunc: x.getContainerNodesFunc,
}).applyToNetmap(curEpoch, x.cache)
if err != nil || policy.err != nil {
if err == nil {
err = policy.err // cached in x.cache, no need to store in x.objCache
}
return storagePolicyRes{}, nil, fmt.Errorf("select container nodes for current epoch #%d: %w", curEpoch, err)
}
return policy, networkMap, nil
}

// containerPolicyContext preserves the context of storage policy processing for a particular container.
type containerPolicyContext struct {
// static
