Skip to content

Commit

Permalink
shard: drop more of context from local stores (#2946)
Browse files Browse the repository at this point in the history
Similar to edc447f, we can't have any
meaninful context here and current context checks are broken. Fixes
#1911, the last case of

Error in `cmd/neofs-node/config.go`: `Function
`Reload->Init->init->listenEvents` should pass the context parameter`.

goes away with this.
  • Loading branch information
carpawell authored Sep 23, 2024
2 parents a0447ce + 54494ac commit 8340d68
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 44 deletions.
14 changes: 3 additions & 11 deletions pkg/local_object_storage/engine/inhume.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package engine

import (
"context"
"errors"

meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
Expand Down Expand Up @@ -324,7 +323,7 @@ func (e *StorageEngine) isLocked(addr oid.Address) (bool, error) {
return locked, outErr
}

func (e *StorageEngine) processExpiredObjects(_ context.Context, addrs []oid.Address) {
func (e *StorageEngine) processExpiredObjects(addrs []oid.Address) {
var prm InhumePrm
prm.MarkAsGarbage(addrs...)

Expand All @@ -334,17 +333,10 @@ func (e *StorageEngine) processExpiredObjects(_ context.Context, addrs []oid.Add
}
}

func (e *StorageEngine) processExpiredLocks(ctx context.Context, lockers []oid.Address) {
func (e *StorageEngine) processExpiredLocks(lockers []oid.Address) {
e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) {
sh.HandleExpiredLocks(lockers)

select {
case <-ctx.Done():
e.log.Info("interrupt processing the expired locks by context")
return true
default:
return false
}
return false
})
}

Expand Down
1 change: 0 additions & 1 deletion pkg/local_object_storage/shard/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,6 @@ func (s *Shard) Init() error {
eventChan: make(chan Event),
mEventHandler: map[eventType]*eventHandlers{
eventNewEpoch: {
cancelFunc: func() {},
handlers: []eventHandler{
s.collectExpiredObjects,
s.collectExpiredTombstones,
Expand Down
40 changes: 14 additions & 26 deletions pkg/local_object_storage/shard/gc.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package shard

import (
"context"
"sync"
"time"

Expand Down Expand Up @@ -40,13 +39,11 @@ func EventNewEpoch(e uint64) Event {
}
}

type eventHandler func(context.Context, Event)
type eventHandler func(Event)

type eventHandlers struct {
prevGroup sync.WaitGroup

cancelFunc context.CancelFunc

handlers []eventHandler
}

Expand Down Expand Up @@ -114,19 +111,15 @@ func (gc *gc) listenEvents() {
continue
}

v.cancelFunc()
v.prevGroup.Wait()

var ctx context.Context
ctx, v.cancelFunc = context.WithCancel(context.Background())

v.prevGroup.Add(len(v.handlers))

for i := range v.handlers {
h := v.handlers[i]

err := gc.workerPool.Submit(func() {
h(ctx, event)
h(event)
v.prevGroup.Done()
})
if err != nil {
Expand Down Expand Up @@ -220,13 +213,13 @@ func (s *Shard) removeGarbage() {
}
}

func (s *Shard) collectExpiredObjects(ctx context.Context, e Event) {
func (s *Shard) collectExpiredObjects(e Event) {
epoch := e.(newEpoch).epoch
log := s.log.With(zap.Uint64("epoch", epoch))

log.Debug("started expired objects handling")

expired, err := s.getExpiredObjects(ctx, e.(newEpoch).epoch, func(typ object.Type) bool {
expired, err := s.getExpiredObjects(e.(newEpoch).epoch, func(typ object.Type) bool {
return typ != object.TypeLock
})
if err != nil || len(expired) == 0 {
Expand All @@ -238,12 +231,12 @@ func (s *Shard) collectExpiredObjects(ctx context.Context, e Event) {

log.Debug("collected expired objects", zap.Int("num", len(expired)))

s.expiredObjectsCallback(ctx, expired)
s.expiredObjectsCallback(expired)

log.Debug("finished expired objects handling")
}

func (s *Shard) collectExpiredTombstones(_ context.Context, e Event) {
func (s *Shard) collectExpiredTombstones(e Event) {
epoch := e.(newEpoch).epoch
log := s.log.With(zap.Uint64("epoch", epoch))

Expand All @@ -258,8 +251,8 @@ func (s *Shard) collectExpiredTombstones(_ context.Context, e Event) {
log.Debug("finished expired tombstones handling", zap.Int("dropped marks", dropped))
}

func (s *Shard) collectExpiredLocks(ctx context.Context, e Event) {
expired, err := s.getExpiredObjects(ctx, e.(newEpoch).epoch, func(typ object.Type) bool {
func (s *Shard) collectExpiredLocks(e Event) {
expired, err := s.getExpiredObjects(e.(newEpoch).epoch, func(typ object.Type) bool {
return typ == object.TypeLock
})
if err != nil || len(expired) == 0 {
Expand All @@ -269,10 +262,10 @@ func (s *Shard) collectExpiredLocks(ctx context.Context, e Event) {
return
}

s.expiredLocksCallback(ctx, expired)
s.expiredLocksCallback(expired)
}

func (s *Shard) getExpiredObjects(ctx context.Context, epoch uint64, typeCond func(object.Type) bool) ([]oid.Address, error) {
func (s *Shard) getExpiredObjects(epoch uint64, typeCond func(object.Type) bool) ([]oid.Address, error) {
s.m.RLock()
defer s.m.RUnlock()

Expand All @@ -283,20 +276,15 @@ func (s *Shard) getExpiredObjects(ctx context.Context, epoch uint64, typeCond fu
var expired []oid.Address

err := s.metaBase.IterateExpired(epoch, func(expiredObject *meta.ExpiredObject) error {
select {
case <-ctx.Done():
return meta.ErrInterruptIterator
default:
if typeCond(expiredObject.Type()) {
expired = append(expired, expiredObject.Address())
}
return nil
if typeCond(expiredObject.Type()) {
expired = append(expired, expiredObject.Address())
}
return nil
})
if err != nil {
return nil, err
}
return expired, ctx.Err()
return expired, nil
}

// HandleExpiredLocks unlocks all objects which were locked by lockers.
Expand Down
5 changes: 2 additions & 3 deletions pkg/local_object_storage/shard/gc_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package shard_test

import (
"context"
"errors"
"fmt"
"math"
Expand Down Expand Up @@ -64,7 +63,7 @@ func TestGC_ExpiredObjectWithExpiredLock(t *testing.T) {
shard.WithDeletedLockCallback(func(aa []oid.Address) {
sh.HandleDeletedLocks(aa)
}),
shard.WithExpiredLocksCallback(func(_ context.Context, aa []oid.Address) {
shard.WithExpiredLocksCallback(func(aa []oid.Address) {
sh.HandleExpiredLocks(aa)
}),
shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
Expand Down Expand Up @@ -205,7 +204,7 @@ func TestExpiration(t *testing.T) {
meta.WithEpochState(epochState{Value: math.MaxUint64 / 2}),
),
shard.WithExpiredObjectsCallback(
func(_ context.Context, addresses []oid.Address) {
func(addresses []oid.Address) {
var p shard.InhumePrm
p.MarkAsGarbage(addresses...)
_, err := sh.Inhume(p)
Expand Down
5 changes: 2 additions & 3 deletions pkg/local_object_storage/shard/shard.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package shard

import (
"context"
"sync"
"time"

Expand Down Expand Up @@ -34,10 +33,10 @@ type Shard struct {
type Option func(*cfg)

// ExpiredTombstonesCallback is a callback handling list of expired tombstones.
type ExpiredTombstonesCallback func(context.Context, []meta.TombstonedObject)
type ExpiredTombstonesCallback func([]meta.TombstonedObject)

// ExpiredObjectsCallback is a callback handling list of expired objects.
type ExpiredObjectsCallback func(context.Context, []oid.Address)
type ExpiredObjectsCallback func([]oid.Address)

// DeletedLockCallback is a callback handling list of deleted LOCK objects.
type DeletedLockCallback func([]oid.Address)
Expand Down

0 comments on commit 8340d68

Please sign in to comment.