Commit: Context fixes (#2930)

roman-khimov authored Sep 11, 2024
2 parents 1a5809e + d0199c1 commit cfc1f43
Showing 14 changed files with 46 additions and 62 deletions.
11 changes: 2 additions & 9 deletions pkg/local_object_storage/engine/inhume.go
@@ -359,17 +359,10 @@ func (e *StorageEngine) processExpiredLocks(ctx context.Context, lockers []oid.A
})
}

func (e *StorageEngine) processDeletedLocks(ctx context.Context, lockers []oid.Address) {
func (e *StorageEngine) processDeletedLocks(lockers []oid.Address) {
e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) {
sh.HandleDeletedLocks(lockers)

select {
case <-ctx.Done():
e.log.Info("interrupt processing the deleted locks by context")
return true
default:
return false
}
return false
})
}

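The engine change above follows a simple rule: a callback that never blocks has no use for a context, so the ctx.Done() check inside the shard iteration was dead weight. A minimal, self-contained sketch of the same simplification, with illustrative names rather than the actual neofs-node types:

```go
package main

import "fmt"

// lockHandler mimics a per-shard handler that completes synchronously:
// there is nothing to block on, so it takes no context.Context.
type lockHandler func(lockers []string)

// forEachShard calls every handler in turn, the way the engine iterates shards.
func forEachShard(handlers []lockHandler, lockers []string) {
	for _, h := range handlers {
		h(lockers)
		// A ctx.Done() check used to sit between iterations; because h never
		// blocks, cancellation could only be observed between instant calls,
		// so the check (and the ctx parameter) added nothing and was dropped.
	}
}

func main() {
	forEachShard([]lockHandler{
		func(lockers []string) { fmt.Println("deleted locks handled:", lockers) },
	}, []string{"container/object-1", "container/object-2"})
}
```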
2 changes: 1 addition & 1 deletion pkg/local_object_storage/shard/gc_test.go
@@ -58,7 +58,7 @@ func TestGC_ExpiredObjectWithExpiredLock(t *testing.T) {
meta.WithPath(filepath.Join(rootPath, "meta")),
meta.WithEpochState(epoch),
),
shard.WithDeletedLockCallback(func(_ context.Context, aa []oid.Address) {
shard.WithDeletedLockCallback(func(aa []oid.Address) {
sh.HandleDeletedLocks(aa)
}),
shard.WithExpiredLocksCallback(func(_ context.Context, aa []oid.Address) {
3 changes: 1 addition & 2 deletions pkg/local_object_storage/shard/inhume.go
@@ -1,7 +1,6 @@
package shard

import (
"context"
"errors"
"fmt"

@@ -122,7 +121,7 @@ func (s *Shard) Inhume(prm InhumePrm) (InhumeRes, error) {
s.decObjectCounterBy(logical, res.AvailableInhumed())

if deletedLockObjs := res.DeletedLockObjects(); len(deletedLockObjs) != 0 {
s.deletedLockCallBack(context.Background(), deletedLockObjs)
s.deletedLockCallBack(deletedLockObjs)
}

return InhumeRes{}, nil
3 changes: 1 addition & 2 deletions pkg/local_object_storage/shard/lock_test.go
@@ -1,7 +1,6 @@
package shard_test

import (
"context"
"path/filepath"
"testing"
"time"
@@ -45,7 +44,7 @@ func TestShard_Lock(t *testing.T) {
meta.WithPath(filepath.Join(rootPath, "meta")),
meta.WithEpochState(epochState{}),
),
shard.WithDeletedLockCallback(func(_ context.Context, addresses []oid.Address) {
shard.WithDeletedLockCallback(func(addresses []oid.Address) {
sh.HandleDeletedLocks(addresses)
}),
}
2 changes: 1 addition & 1 deletion pkg/local_object_storage/shard/shard.go
@@ -42,7 +42,7 @@ type ExpiredTombstonesCallback func(context.Context, []meta.TombstonedObject)
type ExpiredObjectsCallback func(context.Context, []oid.Address)

// DeletedLockCallback is a callback handling list of deleted LOCK objects.
type DeletedLockCallback func(context.Context, []oid.Address)
type DeletedLockCallback func([]oid.Address)

// MetricsWriter is an interface that must store shard's metrics.
type MetricsWriter interface {
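Because DeletedLockCallback is a plain function type, dropping its context.Context parameter is a breaking change for every caller that constructs a shard, which is why the test files above adjust their closures. A tiny sketch of the functional-option wiring such a type is used for; everything here except the callback shape is illustrative, not the real shard package API:

```go
package main

import "fmt"

// DeletedLockCallback mirrors the new signature: only the deleted lock addresses.
type DeletedLockCallback func(addresses []string)

type shardCfg struct{ deletedLockCallback DeletedLockCallback }

// Option follows the functional-option pattern the shard package uses.
type Option func(*shardCfg)

func WithDeletedLockCallback(cb DeletedLockCallback) Option {
	return func(c *shardCfg) { c.deletedLockCallback = cb }
}

func main() {
	cfg := &shardCfg{}
	WithDeletedLockCallback(func(addresses []string) {
		fmt.Println("handle deleted locks:", addresses)
	})(cfg)
	cfg.deletedLockCallback([]string{"container/object-1"})
}
```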
2 changes: 1 addition & 1 deletion pkg/services/object/get/get.go
@@ -88,7 +88,7 @@ func (s *Service) get(ctx context.Context, prm commonPrm, opts ...execOption) st
exec.setLogger(s.log)
}

exec.execute()
exec.execute() //nolint:contextcheck // It is in fact passed via execCtx

return exec.statusError
}
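The //nolint:contextcheck directive silences golangci-lint's contextcheck linter for that single call: the get service still carries the request context inside its execCtx, so execute() legitimately takes no ctx argument. A small sketch of how such a directive is scoped; the functions are hypothetical, only the directive syntax is the real one:

```go
package main

import (
	"context"
	"fmt"
)

// doWork takes no ctx parameter on purpose: the surrounding service keeps the
// request context inside its exec state, the way get's execCtx does.
func doWork() { fmt.Println("working") }

func handle(ctx context.Context) {
	_ = ctx // carried inside the exec state in the real code
	doWork() //nolint:contextcheck // ctx is available via the exec state, not as an argument
}

func main() { handle(context.Background()) }
```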
6 changes: 3 additions & 3 deletions pkg/services/object/search/container.go
@@ -10,15 +10,15 @@ import (
"go.uber.org/zap"
)

func (exec *execCtx) executeOnContainer() {
func (exec *execCtx) executeOnContainer(ectx context.Context) {
if exec.isLocal() {
exec.log.Debug("return result directly")
return
}

exec.log.Debug("trying to execute in container...")

ctx, cancel := context.WithCancel(exec.context())
ctx, cancel := context.WithCancel(ectx)
defer cancel()

mProcessedNodes := make(map[string]struct{})
@@ -85,7 +85,7 @@ func (exec *execCtx) executeOnContainer() {
return
}

ids, err := c.searchObjects(exec, info)
ids, err := c.searchObjects(ctx, exec, info)
if err != nil {
lg.Debug("remote operation failed",
zap.String("error", err.Error()))
8 changes: 0 additions & 8 deletions pkg/services/object/search/exec.go
@@ -1,8 +1,6 @@
package searchsvc

import (
"context"

cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
@@ -17,8 +15,6 @@ type statusError struct {
type execCtx struct {
svc *Service

ctx context.Context

prm Prm

statusError
@@ -47,10 +43,6 @@ func (exec *execCtx) setLogger(l *zap.Logger) {
)
}

func (exec execCtx) context() context.Context {
return exec.ctx
}

func (exec execCtx) isLocal() bool {
return exec.prm.common.LocalOnly()
}
13 changes: 6 additions & 7 deletions pkg/services/object/search/search.go
@@ -25,29 +25,28 @@ func (s *Service) Search(ctx context.Context, prm Prm) error {

exec := &execCtx{
svc: s,
ctx: ctx,
prm: prm,
}

exec.prepare()

exec.setLogger(s.log)

exec.execute()
exec.execute(ctx)

return exec.statusError.err
}

func (exec *execCtx) execute() {
func (exec *execCtx) execute(ctx context.Context) {
exec.log.Debug("serving request...")

// perform local operation
exec.executeLocal()

exec.analyzeStatus(true)
exec.analyzeStatus(ctx, true)
}

func (exec *execCtx) analyzeStatus(execCnr bool) {
func (exec *execCtx) analyzeStatus(ctx context.Context, execCnr bool) {
// analyze local result
switch exec.status {
default:
@@ -59,8 +58,8 @@ func (exec *execCtx) analyzeStatus(execCnr bool) {
}

if execCnr {
exec.executeOnContainer()
exec.analyzeStatus(false)
exec.executeOnContainer(ctx)
exec.analyzeStatus(ctx, false)
}
}

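The pattern applied throughout the search service is the standard Go guideline: do not store a context.Context in a struct, pass it explicitly as the first parameter down the call chain. A stripped-down sketch of the same shape; the type and method names mirror the diff but the bodies are illustrative:

```go
package main

import (
	"context"
	"fmt"
)

// execCtx holds request state only; the context is no longer a field.
type execCtx struct {
	query string
}

// execute receives ctx explicitly and forwards it to every step.
func (e *execCtx) execute(ctx context.Context) {
	e.executeLocal()
	e.analyzeStatus(ctx, true)
}

func (e *execCtx) executeLocal() { fmt.Println("local search for", e.query) }

func (e *execCtx) analyzeStatus(ctx context.Context, execCnr bool) {
	if execCnr {
		e.executeOnContainer(ctx)
		e.analyzeStatus(ctx, false)
	}
}

func (e *execCtx) executeOnContainer(ctx context.Context) {
	// Derived contexts come from the caller's ctx, not from a stored field.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	select {
	case <-ctx.Done():
	default:
		fmt.Println("container-wide search for", e.query)
	}
}

func main() {
	(&execCtx{query: "attr=value"}).execute(context.Background())
}
```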
2 changes: 1 addition & 1 deletion pkg/services/object/search/search_test.go
@@ -115,7 +115,7 @@ func (s *testStorage) search(exec *execCtx) ([]oid.ID, error) {
return v.ids, v.err
}

func (c *testStorage) searchObjects(exec *execCtx, _ clientcore.NodeInfo) ([]oid.ID, error) {
func (c *testStorage) searchObjects(_ context.Context, exec *execCtx, _ clientcore.NodeInfo) ([]oid.ID, error) {
v, ok := c.items[exec.containerID().EncodeToString()]
if !ok {
return nil, nil
4 changes: 3 additions & 1 deletion pkg/services/object/search/service.go
@@ -1,6 +1,8 @@
package searchsvc

import (
"context"

"github.com/nspcc-dev/neofs-node/pkg/core/client"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
@@ -23,7 +25,7 @@ type Option func(*cfg)
type searchClient interface {
// searchObjects searches objects on the specified node.
// MUST NOT modify execCtx as it can be accessed concurrently.
searchObjects(*execCtx, client.NodeInfo) ([]oid.ID, error)
searchObjects(context.Context, *execCtx, client.NodeInfo) ([]oid.ID, error)
}

// Containers provides information about NeoFS containers necessary for the
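Adding context.Context as the first parameter of searchObjects changes the interface contract, so every implementation, including test doubles, has to accept it even when it ignores it. A minimal sketch of that ripple effect with hypothetical names:

```go
package main

import (
	"context"
	"fmt"
)

// searcher mirrors the updated interface shape: ctx first, then request state.
type searcher interface {
	searchObjects(ctx context.Context, query string) ([]string, error)
}

// stubSearcher is a test double; it accepts ctx to satisfy the interface
// but has nothing to cancel, so it ignores it (as testStorage does).
type stubSearcher struct{ ids []string }

func (s stubSearcher) searchObjects(_ context.Context, _ string) ([]string, error) {
	return s.ids, nil
}

func main() {
	var c searcher = stubSearcher{ids: []string{"id-1"}}
	ids, _ := c.searchObjects(context.Background(), "attr=value")
	fmt.Println(ids)
}
```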
5 changes: 3 additions & 2 deletions pkg/services/object/search/util.go
@@ -1,6 +1,7 @@
package searchsvc

import (
"context"
"sync"

"github.com/nspcc-dev/neofs-node/pkg/core/client"
@@ -68,7 +69,7 @@ func (c *clientConstructorWrapper) get(info client.NodeInfo) (searchClient, erro
}, nil
}

func (c *clientWrapper) searchObjects(exec *execCtx, info client.NodeInfo) ([]oid.ID, error) {
func (c *clientWrapper) searchObjects(ctx context.Context, exec *execCtx, info client.NodeInfo) ([]oid.ID, error) {
if exec.prm.forwarder != nil {
return exec.prm.forwarder(info, c.client)
}
@@ -89,7 +90,7 @@ func (c *clientWrapper) searchObjects(exec *execCtx, info client.NodeInfo) ([]oi

var prm internalclient.SearchObjectsPrm

prm.SetContext(exec.context())
prm.SetContext(ctx)
prm.SetClient(c.client)
prm.SetPrivateKey(key)
prm.SetSessionToken(exec.prm.common.SessionToken())
37 changes: 17 additions & 20 deletions pkg/services/policer/check.go
@@ -117,7 +117,6 @@ func (p *Policer) processObject(ctx context.Context, addrWithType objectcore.Add
}

c := &processPlacementContext{
Context: ctx,
object: addrWithType,
checkedNodes: newNodeCache(),
}
@@ -129,7 +128,7 @@ func (p *Policer) processObject(ctx context.Context, addrWithType objectcore.Add
default:
}

p.processNodes(c, nn[i], policy.ReplicaNumberByIndex(i))
p.processNodes(ctx, c, nn[i], policy.ReplicaNumberByIndex(i))
}

// if context is done, needLocalCopy might not be able to calculate
@@ -182,8 +181,6 @@ func (p *Policer) processObject(ctx context.Context, addrWithType objectcore.Add
}

type processPlacementContext struct {
context.Context

// whether the local node is in the object container
localNodeInContainer bool

@@ -200,8 +197,8 @@ type processPlacementContext struct {
checkedNodes *nodeCache
}

func (p *Policer) processNodes(ctx *processPlacementContext, nodes []netmap.NodeInfo, shortage uint32) {
prm := new(headsvc.RemoteHeadPrm).WithObjectAddress(ctx.object.Address)
func (p *Policer) processNodes(ctx context.Context, plc *processPlacementContext, nodes []netmap.NodeInfo, shortage uint32) {
prm := new(headsvc.RemoteHeadPrm).WithObjectAddress(plc.object.Address)

p.cfg.RLock()
headTimeout := p.headTimeout
@@ -216,7 +213,7 @@ func (p *Policer) processNodes(ctx *processPlacementContext, nodes []netmap.Node
// prevent spam with new replicas.
// However, additional copies should not be removed in this case,
// because we can remove the only copy this way.
ctx.checkedNodes.submitReplicaHolder(node)
plc.checkedNodes.submitReplicaHolder(node)
shortage--
uncheckedCopies++

@@ -225,7 +222,7 @@ func (p *Policer) processNodes(ctx *processPlacementContext, nodes []netmap.Node
)
}

if ctx.object.Type == object.TypeLock || ctx.object.Type == object.TypeLink {
if plc.object.Type == object.TypeLock || plc.object.Type == object.TypeLink {
// all nodes of a container must store the `LOCK` and `LINK` objects
// for correct object relations handling:
// - `LINK` objects allows treating all children as root object;
@@ -234,7 +231,7 @@ func (p *Policer) processNodes(ctx *processPlacementContext, nodes []netmap.Node
shortage = uint32(len(nodes))
}

for i := 0; (!ctx.localNodeInContainer || shortage > 0) && i < len(nodes); i++ {
for i := 0; (!plc.localNodeInContainer || shortage > 0) && i < len(nodes); i++ {
select {
case <-ctx.Done():
return
@@ -243,20 +240,20 @@ func (p *Policer) processNodes(ctx *processPlacementContext, nodes []netmap.Node

isLocalNode := p.netmapKeys.IsLocalKey(nodes[i].PublicKey())

if !ctx.localNodeInContainer {
ctx.localNodeInContainer = isLocalNode
if !plc.localNodeInContainer {
plc.localNodeInContainer = isLocalNode
}

if shortage == 0 {
continue
} else if isLocalNode {
ctx.needLocalCopy = true
plc.needLocalCopy = true

shortage--
} else if nodes[i].IsMaintenance() {
handleMaintenance(nodes[i])
} else {
if status := ctx.checkedNodes.processStatus(nodes[i]); status >= 0 {
if status := plc.checkedNodes.processStatus(nodes[i]); status >= 0 {
if status == 0 {
// node already contains replica, no need to replicate
nodes = append(nodes[:i], nodes[i+1:]...)
@@ -274,20 +271,20 @@ func (p *Policer) processNodes(ctx *processPlacementContext, nodes []netmap.Node
cancel()

if errors.Is(err, apistatus.ErrObjectNotFound) {
ctx.checkedNodes.submitReplicaCandidate(nodes[i])
plc.checkedNodes.submitReplicaCandidate(nodes[i])
continue
}

if errors.Is(err, apistatus.ErrNodeUnderMaintenance) {
handleMaintenance(nodes[i])
} else if err != nil {
p.log.Error("receive object header to check policy compliance",
zap.Stringer("object", ctx.object.Address),
zap.Stringer("object", plc.object.Address),
zap.String("error", err.Error()),
)
} else {
shortage--
ctx.checkedNodes.submitReplicaHolder(nodes[i])
plc.checkedNodes.submitReplicaHolder(nodes[i])
}
}

@@ -297,20 +294,20 @@ func (p *Policer) processNodes(ctx *processPlacementContext, nodes []netmap.Node

if shortage > 0 {
p.log.Debug("shortage of object copies detected",
zap.Stringer("object", ctx.object.Address),
zap.Stringer("object", plc.object.Address),
zap.Uint32("shortage", shortage),
)

var task replicator.Task
task.SetObjectAddress(ctx.object.Address)
task.SetObjectAddress(plc.object.Address)
task.SetNodes(nodes)
task.SetCopiesNumber(shortage)

p.replicator.HandleTask(ctx, task, ctx.checkedNodes)
p.replicator.HandleTask(ctx, task, plc.checkedNodes)
} else if uncheckedCopies > 0 {
// If we have more copies than needed, but some of them are from the maintenance nodes,
// save the local copy.
ctx.needLocalCopy = true
plc.needLocalCopy = true
p.log.Debug("some of the copies are stored on nodes under maintenance, save local copy",
zap.Int("count", uncheckedCopies))
}
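The policer change tackles the same lint class from the other direction: processPlacementContext used to embed context.Context, which context-related linters flag and which made ctx.Done() and the placement bookkeeping look like parts of one object. A sketch of the split into an explicit ctx plus a plain state struct, with illustrative types rather than the real policer ones:

```go
package main

import (
	"context"
	"fmt"
)

// placementState carries only per-object bookkeeping; the request context
// is no longer embedded in it.
type placementState struct {
	object        string
	needLocalCopy bool
}

// processNodes takes the context and the mutable state as separate arguments,
// mirroring the processNodes(ctx, plc, ...) split in the diff.
func processNodes(ctx context.Context, plc *placementState, nodes []string) {
	for _, n := range nodes {
		select {
		case <-ctx.Done():
			return // cancellation comes from ctx, not from the state struct
		default:
		}
		if n == "local" {
			plc.needLocalCopy = true
		}
	}
}

func main() {
	plc := &placementState{object: "cid/oid"}
	processNodes(context.Background(), plc, []string{"remote-1", "local"})
	fmt.Println("need local copy:", plc.needLocalCopy)
}
```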