From edc447f530cd040ab4c2e79e06235b8271680183 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Sat, 7 Sep 2024 11:21:52 +0300 Subject: [PATCH 1/5] shard: drop context from DeletedLockCallback 1. There are a number of related contextcheck warnings: pkg/services/control/server/gc.go:50:29 contextcheck Function `Delete->Delete$1->delete->inhumeAddr->Inhume` should pass the context parameter pkg/local_object_storage/engine/inhume.go:329:20 contextcheck Function `Inhume->Inhume$1->inhume->inhumeAddr->Inhume` should pass the context parameter pkg/services/policer/check.go:95:44 contextcheck Function `Inhume->Inhume$1->inhume->inhumeAddr->Inhume` should pass the context parameter which tell us that the context chain is not preserved correctly: we're resorting to some background context in all cases. 2. context.Background() means that this context is totally useless and the relevant context-related logic is never triggered. 3. Consider we've passed a proper context here and iteration can really be stopped. This would mean we can end up in an inconsistent state since the lock object will be gone, but some shards may not have performed this cleanup yet. It's safer to just drop this context; this function must run to completion. It's local, no networking involved. 
Signed-off-by: Roman Khimov --- pkg/local_object_storage/engine/inhume.go | 11 ++--------- pkg/local_object_storage/shard/gc_test.go | 2 +- pkg/local_object_storage/shard/inhume.go | 3 +-- pkg/local_object_storage/shard/lock_test.go | 3 +-- pkg/local_object_storage/shard/shard.go | 2 +- 5 files changed, 6 insertions(+), 15 deletions(-) diff --git a/pkg/local_object_storage/engine/inhume.go b/pkg/local_object_storage/engine/inhume.go index 54576af569..5e4eba5e7c 100644 --- a/pkg/local_object_storage/engine/inhume.go +++ b/pkg/local_object_storage/engine/inhume.go @@ -359,17 +359,10 @@ func (e *StorageEngine) processExpiredLocks(ctx context.Context, lockers []oid.A }) } -func (e *StorageEngine) processDeletedLocks(ctx context.Context, lockers []oid.Address) { +func (e *StorageEngine) processDeletedLocks(lockers []oid.Address) { e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) { sh.HandleDeletedLocks(lockers) - - select { - case <-ctx.Done(): - e.log.Info("interrupt processing the deleted locks by context") - return true - default: - return false - } + return false }) } diff --git a/pkg/local_object_storage/shard/gc_test.go b/pkg/local_object_storage/shard/gc_test.go index 1892272404..798fb0bc63 100644 --- a/pkg/local_object_storage/shard/gc_test.go +++ b/pkg/local_object_storage/shard/gc_test.go @@ -58,7 +58,7 @@ func TestGC_ExpiredObjectWithExpiredLock(t *testing.T) { meta.WithPath(filepath.Join(rootPath, "meta")), meta.WithEpochState(epoch), ), - shard.WithDeletedLockCallback(func(_ context.Context, aa []oid.Address) { + shard.WithDeletedLockCallback(func(aa []oid.Address) { sh.HandleDeletedLocks(aa) }), shard.WithExpiredLocksCallback(func(_ context.Context, aa []oid.Address) { diff --git a/pkg/local_object_storage/shard/inhume.go b/pkg/local_object_storage/shard/inhume.go index 47ab912690..1c272cc0e9 100644 --- a/pkg/local_object_storage/shard/inhume.go +++ b/pkg/local_object_storage/shard/inhume.go @@ -1,7 +1,6 @@ package shard import ( - "context" 
"errors" "fmt" @@ -122,7 +121,7 @@ func (s *Shard) Inhume(prm InhumePrm) (InhumeRes, error) { s.decObjectCounterBy(logical, res.AvailableInhumed()) if deletedLockObjs := res.DeletedLockObjects(); len(deletedLockObjs) != 0 { - s.deletedLockCallBack(context.Background(), deletedLockObjs) + s.deletedLockCallBack(deletedLockObjs) } return InhumeRes{}, nil diff --git a/pkg/local_object_storage/shard/lock_test.go b/pkg/local_object_storage/shard/lock_test.go index 9491a56e64..bc5826a89b 100644 --- a/pkg/local_object_storage/shard/lock_test.go +++ b/pkg/local_object_storage/shard/lock_test.go @@ -1,7 +1,6 @@ package shard_test import ( - "context" "path/filepath" "testing" "time" @@ -45,7 +44,7 @@ func TestShard_Lock(t *testing.T) { meta.WithPath(filepath.Join(rootPath, "meta")), meta.WithEpochState(epochState{}), ), - shard.WithDeletedLockCallback(func(_ context.Context, addresses []oid.Address) { + shard.WithDeletedLockCallback(func(addresses []oid.Address) { sh.HandleDeletedLocks(addresses) }), } diff --git a/pkg/local_object_storage/shard/shard.go b/pkg/local_object_storage/shard/shard.go index 9d5e75dccf..ae6d27ab2a 100644 --- a/pkg/local_object_storage/shard/shard.go +++ b/pkg/local_object_storage/shard/shard.go @@ -42,7 +42,7 @@ type ExpiredTombstonesCallback func(context.Context, []meta.TombstonedObject) type ExpiredObjectsCallback func(context.Context, []oid.Address) // DeletedLockCallback is a callback handling list of deleted LOCK objects. -type DeletedLockCallback func(context.Context, []oid.Address) +type DeletedLockCallback func([]oid.Address) // MetricsWriter is an interface that must store shard's metrics. 
type MetricsWriter interface { From 7393dbe7c208b98c6831df02e2fabb3039d1fb59 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Sat, 7 Sep 2024 11:35:55 +0300 Subject: [PATCH 2/5] tree: pass context to replication worker pkg/services/tree/replicator.go:97:3 contextcheck Function `replicationWorker->replicationWorker$1` should pass the context parameter Signed-off-by: Roman Khimov --- pkg/services/tree/replicator.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/pkg/services/tree/replicator.go b/pkg/services/tree/replicator.go index 3a70413f09..2bfffaae72 100644 --- a/pkg/services/tree/replicator.go +++ b/pkg/services/tree/replicator.go @@ -52,11 +52,13 @@ func (s *Service) localReplicationWorker() { } } -func (s *Service) replicationWorker() { +func (s *Service) replicationWorker(ctx context.Context) { for { select { case <-s.closeCh: return + case <-ctx.Done(): + return case task := <-s.replicationTasks: var lastErr error var lastAddr string @@ -64,13 +66,13 @@ func (s *Service) replicationWorker() { task.n.IterateNetworkEndpoints(func(addr string) bool { lastAddr = addr - c, err := s.cache.get(context.Background(), addr) + c, err := s.cache.get(ctx, addr) if err != nil { lastErr = fmt.Errorf("can't create client: %w", err) return false } - ctx, cancel := context.WithTimeout(context.Background(), s.replicatorTimeout) + ctx, cancel := context.WithTimeout(ctx, s.replicatorTimeout) _, lastErr = c.Apply(ctx, task.req) cancel() @@ -94,7 +96,7 @@ func (s *Service) replicationWorker() { func (s *Service) replicateLoop(ctx context.Context) { for range s.replicatorWorkerCount { - go s.replicationWorker() + go s.replicationWorker(ctx) go s.localReplicationWorker() } defer func() { From b27f260132bd1860284433d3f396d3299ae75ec0 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Sat, 7 Sep 2024 11:53:24 +0300 Subject: [PATCH 3/5] policer: pass context explicitly Fixes contextcheck: pkg/services/policer/check.go:132:17 contextcheck Function 
`processNodes` should pass the context parameter Signed-off-by: Roman Khimov --- pkg/services/policer/check.go | 37 ++++++++++++++++------------------- 1 file changed, 17 insertions(+), 20 deletions(-) diff --git a/pkg/services/policer/check.go b/pkg/services/policer/check.go index 8d5f19d7af..e2eddc1845 100644 --- a/pkg/services/policer/check.go +++ b/pkg/services/policer/check.go @@ -117,7 +117,6 @@ func (p *Policer) processObject(ctx context.Context, addrWithType objectcore.Add } c := &processPlacementContext{ - Context: ctx, object: addrWithType, checkedNodes: newNodeCache(), } @@ -129,7 +128,7 @@ func (p *Policer) processObject(ctx context.Context, addrWithType objectcore.Add default: } - p.processNodes(c, nn[i], policy.ReplicaNumberByIndex(i)) + p.processNodes(ctx, c, nn[i], policy.ReplicaNumberByIndex(i)) } // if context is done, needLocalCopy might not be able to calculate @@ -182,8 +181,6 @@ func (p *Policer) processObject(ctx context.Context, addrWithType objectcore.Add } type processPlacementContext struct { - context.Context - // whether the local node is in the object container localNodeInContainer bool @@ -200,8 +197,8 @@ type processPlacementContext struct { checkedNodes *nodeCache } -func (p *Policer) processNodes(ctx *processPlacementContext, nodes []netmap.NodeInfo, shortage uint32) { - prm := new(headsvc.RemoteHeadPrm).WithObjectAddress(ctx.object.Address) +func (p *Policer) processNodes(ctx context.Context, plc *processPlacementContext, nodes []netmap.NodeInfo, shortage uint32) { + prm := new(headsvc.RemoteHeadPrm).WithObjectAddress(plc.object.Address) p.cfg.RLock() headTimeout := p.headTimeout @@ -216,7 +213,7 @@ func (p *Policer) processNodes(ctx *processPlacementContext, nodes []netmap.Node // prevent spam with new replicas. // However, additional copies should not be removed in this case, // because we can remove the only copy this way. 
- ctx.checkedNodes.submitReplicaHolder(node) + plc.checkedNodes.submitReplicaHolder(node) shortage-- uncheckedCopies++ @@ -225,7 +222,7 @@ func (p *Policer) processNodes(ctx *processPlacementContext, nodes []netmap.Node ) } - if ctx.object.Type == object.TypeLock || ctx.object.Type == object.TypeLink { + if plc.object.Type == object.TypeLock || plc.object.Type == object.TypeLink { // all nodes of a container must store the `LOCK` and `LINK` objects // for correct object relations handling: // - `LINK` objects allows treating all children as root object; @@ -234,7 +231,7 @@ func (p *Policer) processNodes(ctx *processPlacementContext, nodes []netmap.Node shortage = uint32(len(nodes)) } - for i := 0; (!ctx.localNodeInContainer || shortage > 0) && i < len(nodes); i++ { + for i := 0; (!plc.localNodeInContainer || shortage > 0) && i < len(nodes); i++ { select { case <-ctx.Done(): return @@ -243,20 +240,20 @@ func (p *Policer) processNodes(ctx *processPlacementContext, nodes []netmap.Node isLocalNode := p.netmapKeys.IsLocalKey(nodes[i].PublicKey()) - if !ctx.localNodeInContainer { - ctx.localNodeInContainer = isLocalNode + if !plc.localNodeInContainer { + plc.localNodeInContainer = isLocalNode } if shortage == 0 { continue } else if isLocalNode { - ctx.needLocalCopy = true + plc.needLocalCopy = true shortage-- } else if nodes[i].IsMaintenance() { handleMaintenance(nodes[i]) } else { - if status := ctx.checkedNodes.processStatus(nodes[i]); status >= 0 { + if status := plc.checkedNodes.processStatus(nodes[i]); status >= 0 { if status == 0 { // node already contains replica, no need to replicate nodes = append(nodes[:i], nodes[i+1:]...) 
@@ -274,7 +271,7 @@ func (p *Policer) processNodes(ctx *processPlacementContext, nodes []netmap.Node cancel() if errors.Is(err, apistatus.ErrObjectNotFound) { - ctx.checkedNodes.submitReplicaCandidate(nodes[i]) + plc.checkedNodes.submitReplicaCandidate(nodes[i]) continue } @@ -282,12 +279,12 @@ func (p *Policer) processNodes(ctx *processPlacementContext, nodes []netmap.Node handleMaintenance(nodes[i]) } else if err != nil { p.log.Error("receive object header to check policy compliance", - zap.Stringer("object", ctx.object.Address), + zap.Stringer("object", plc.object.Address), zap.String("error", err.Error()), ) } else { shortage-- - ctx.checkedNodes.submitReplicaHolder(nodes[i]) + plc.checkedNodes.submitReplicaHolder(nodes[i]) } } @@ -297,20 +294,20 @@ func (p *Policer) processNodes(ctx *processPlacementContext, nodes []netmap.Node if shortage > 0 { p.log.Debug("shortage of object copies detected", - zap.Stringer("object", ctx.object.Address), + zap.Stringer("object", plc.object.Address), zap.Uint32("shortage", shortage), ) var task replicator.Task - task.SetObjectAddress(ctx.object.Address) + task.SetObjectAddress(plc.object.Address) task.SetNodes(nodes) task.SetCopiesNumber(shortage) - p.replicator.HandleTask(ctx, task, ctx.checkedNodes) + p.replicator.HandleTask(ctx, task, plc.checkedNodes) } else if uncheckedCopies > 0 { // If we have more copies than needed, but some of them are from the maintenance nodes, // save the local copy. 
- ctx.needLocalCopy = true + plc.needLocalCopy = true p.log.Debug("some of the copies are stored on nodes under maintenance, save local copy", zap.Int("count", uncheckedCopies)) } From 114f9d9b7afabb495187776fcba36944efc00512 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Sat, 7 Sep 2024 19:37:03 +0300 Subject: [PATCH 4/5] search: do not store context in struct, pass it over Linter: pkg/services/object/search/search.go:36:14 contextcheck Function `execute->analyzeStatus->executeOnContainer` should pass the context parameter This also fixes a bug in executeOnContainer() since internal calls to c.searchObjects should use the inner context, not outer one. Signed-off-by: Roman Khimov --- pkg/services/object/search/container.go | 6 +++--- pkg/services/object/search/exec.go | 8 -------- pkg/services/object/search/search.go | 13 ++++++------- pkg/services/object/search/search_test.go | 2 +- pkg/services/object/search/service.go | 4 +++- pkg/services/object/search/util.go | 5 +++-- 6 files changed, 16 insertions(+), 22 deletions(-) diff --git a/pkg/services/object/search/container.go b/pkg/services/object/search/container.go index f719a8fab7..cead4fe789 100644 --- a/pkg/services/object/search/container.go +++ b/pkg/services/object/search/container.go @@ -10,7 +10,7 @@ import ( "go.uber.org/zap" ) -func (exec *execCtx) executeOnContainer() { +func (exec *execCtx) executeOnContainer(ectx context.Context) { if exec.isLocal() { exec.log.Debug("return result directly") return @@ -18,7 +18,7 @@ func (exec *execCtx) executeOnContainer() { exec.log.Debug("trying to execute in container...") - ctx, cancel := context.WithCancel(exec.context()) + ctx, cancel := context.WithCancel(ectx) defer cancel() mProcessedNodes := make(map[string]struct{}) @@ -85,7 +85,7 @@ func (exec *execCtx) executeOnContainer() { return } - ids, err := c.searchObjects(exec, info) + ids, err := c.searchObjects(ctx, exec, info) if err != nil { lg.Debug("remote operation failed", zap.String("error", 
err.Error())) diff --git a/pkg/services/object/search/exec.go b/pkg/services/object/search/exec.go index 91f46dc878..2669458655 100644 --- a/pkg/services/object/search/exec.go +++ b/pkg/services/object/search/exec.go @@ -1,8 +1,6 @@ package searchsvc import ( - "context" - cid "github.com/nspcc-dev/neofs-sdk-go/container/id" "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" @@ -17,8 +15,6 @@ type statusError struct { type execCtx struct { svc *Service - ctx context.Context - prm Prm statusError @@ -47,10 +43,6 @@ func (exec *execCtx) setLogger(l *zap.Logger) { ) } -func (exec execCtx) context() context.Context { - return exec.ctx -} - func (exec execCtx) isLocal() bool { return exec.prm.common.LocalOnly() } diff --git a/pkg/services/object/search/search.go b/pkg/services/object/search/search.go index 8990104401..51742e0fb8 100644 --- a/pkg/services/object/search/search.go +++ b/pkg/services/object/search/search.go @@ -25,7 +25,6 @@ func (s *Service) Search(ctx context.Context, prm Prm) error { exec := &execCtx{ svc: s, - ctx: ctx, prm: prm, } @@ -33,21 +32,21 @@ func (s *Service) Search(ctx context.Context, prm Prm) error { exec.setLogger(s.log) - exec.execute() + exec.execute(ctx) return exec.statusError.err } -func (exec *execCtx) execute() { +func (exec *execCtx) execute(ctx context.Context) { exec.log.Debug("serving request...") // perform local operation exec.executeLocal() - exec.analyzeStatus(true) + exec.analyzeStatus(ctx, true) } -func (exec *execCtx) analyzeStatus(execCnr bool) { +func (exec *execCtx) analyzeStatus(ctx context.Context, execCnr bool) { // analyze local result switch exec.status { default: @@ -59,8 +58,8 @@ func (exec *execCtx) analyzeStatus(execCnr bool) { } if execCnr { - exec.executeOnContainer() - exec.analyzeStatus(false) + exec.executeOnContainer(ctx) + exec.analyzeStatus(ctx, false) } } diff --git a/pkg/services/object/search/search_test.go b/pkg/services/object/search/search_test.go index 
33dd74100c..14c42a19c3 100644 --- a/pkg/services/object/search/search_test.go +++ b/pkg/services/object/search/search_test.go @@ -115,7 +115,7 @@ func (s *testStorage) search(exec *execCtx) ([]oid.ID, error) { return v.ids, v.err } -func (c *testStorage) searchObjects(exec *execCtx, _ clientcore.NodeInfo) ([]oid.ID, error) { +func (c *testStorage) searchObjects(_ context.Context, exec *execCtx, _ clientcore.NodeInfo) ([]oid.ID, error) { v, ok := c.items[exec.containerID().EncodeToString()] if !ok { return nil, nil diff --git a/pkg/services/object/search/service.go b/pkg/services/object/search/service.go index 6f4c5eb528..646bb70712 100644 --- a/pkg/services/object/search/service.go +++ b/pkg/services/object/search/service.go @@ -1,6 +1,8 @@ package searchsvc import ( + "context" + "github.com/nspcc-dev/neofs-node/pkg/core/client" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" "github.com/nspcc-dev/neofs-node/pkg/services/object/util" @@ -23,7 +25,7 @@ type Option func(*cfg) type searchClient interface { // searchObjects searches objects on the specified node. // MUST NOT modify execCtx as it can be accessed concurrently. 
- searchObjects(*execCtx, client.NodeInfo) ([]oid.ID, error) + searchObjects(context.Context, *execCtx, client.NodeInfo) ([]oid.ID, error) } // Containers provides information about NeoFS containers necessary for the diff --git a/pkg/services/object/search/util.go b/pkg/services/object/search/util.go index f98570894c..b83526a846 100644 --- a/pkg/services/object/search/util.go +++ b/pkg/services/object/search/util.go @@ -1,6 +1,7 @@ package searchsvc import ( + "context" "sync" "github.com/nspcc-dev/neofs-node/pkg/core/client" @@ -68,7 +69,7 @@ func (c *clientConstructorWrapper) get(info client.NodeInfo) (searchClient, erro }, nil } -func (c *clientWrapper) searchObjects(exec *execCtx, info client.NodeInfo) ([]oid.ID, error) { +func (c *clientWrapper) searchObjects(ctx context.Context, exec *execCtx, info client.NodeInfo) ([]oid.ID, error) { if exec.prm.forwarder != nil { return exec.prm.forwarder(info, c.client) } @@ -89,7 +90,7 @@ func (c *clientWrapper) searchObjects(exec *execCtx, info client.NodeInfo) ([]oi var prm internalclient.SearchObjectsPrm - prm.SetContext(exec.context()) + prm.SetContext(ctx) prm.SetClient(c.client) prm.SetPrivateKey(key) prm.SetSessionToken(exec.prm.common.SessionToken()) From d0199c15d67fd3072f858e818a4a2ab64a1fd36f Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Sat, 7 Sep 2024 22:03:26 +0300 Subject: [PATCH 5/5] get: suppress contextcheck pkg/services/object/get/get.go:91:14 contextcheck Function `execute->analyzeStatus->assemble->processV2Split->processV2Link->getChild` should pass the context parameter Can be untangled, but we have a lot of calls here joined by the same context, so I'd keep it as is for now. 
Signed-off-by: Roman Khimov --- pkg/services/object/get/get.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/services/object/get/get.go b/pkg/services/object/get/get.go index 43a1e9b6c1..ba7a27329b 100644 --- a/pkg/services/object/get/get.go +++ b/pkg/services/object/get/get.go @@ -88,7 +88,7 @@ func (s *Service) get(ctx context.Context, prm commonPrm, opts ...execOption) st exec.setLogger(s.log) } - exec.execute() + exec.execute() //nolint:contextcheck // It is in fact passed via execCtx return exec.statusError }