Skip to content

Commit

Permalink
gc: fix that expired objects are not deleted after locks expire
Browse files Browse the repository at this point in the history
The bug was that when processing expired locks, the objects that were
spited into pieces were not taken into account in any way.

Fix #3026.

Signed-off-by: Andrey Butusov <[email protected]>
  • Loading branch information
End-rey committed Jan 28, 2025
1 parent 1b04361 commit d725f92
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 43 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ Changelog for NeoFS Node
- Panic during shutdown if N3 client connection is lost (#3073)
- The first object of the split object is not deleted (#3089)
- The parent of the split object is not removed from the metabase (#3089)
- A split expired object is not deleted after the lock is removed or expired (#3089)

### Changed
- Number of cuncurrenly handled notifications from the chain was increased from 10 to 300 for IR (#3068)
Expand Down
43 changes: 41 additions & 2 deletions pkg/local_object_storage/engine/inhume.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,14 +264,40 @@ func (e *StorageEngine) processExpiredObjects(addrs []oid.Address) {
}

func (e *StorageEngine) processExpiredLocks(lockers []oid.Address) {
var unlocked, expired []oid.Address
for _, sh := range e.unsortedShards() {
sh.HandleExpiredLocks(lockers)
unlocked = sh.HandleExpiredLocks(lockers)
for _, sh2 := range e.unsortedShards() {
expired = append(expired, sh2.FilterExpired(unlocked)...)
}
}
expired = removeDuplicateAddresses(expired)
e.log.Debug("expired objects after locks expired",
zap.Stringers("addrs", expired),
zap.Stringers("locks", lockers))

err := e.inhume(expired, false, nil, 0)
if err != nil {
e.log.Warn("handling expired locks", zap.Error(err))

Check warning on line 281 in pkg/local_object_storage/engine/inhume.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/engine/inhume.go#L281

Added line #L281 was not covered by tests
}
}

func (e *StorageEngine) processDeletedLocks(lockers []oid.Address) {
var unlocked, expired []oid.Address
for _, sh := range e.unsortedShards() {
sh.HandleDeletedLocks(lockers)
unlocked = sh.HandleDeletedLocks(lockers)
for _, sh2 := range e.unsortedShards() {
expired = append(expired, sh2.FilterExpired(unlocked)...)
}
}
expired = removeDuplicateAddresses(expired)
e.log.Debug("expired objects after locks are deleted",
zap.Stringers("addrs", expired),
zap.Stringers("locks", lockers))

err := e.inhume(expired, false, nil, 0)
if err != nil {
e.log.Warn("handling deleted locks", zap.Error(err))

Check warning on line 300 in pkg/local_object_storage/engine/inhume.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/engine/inhume.go#L300

Added line #L300 was not covered by tests
}
}

Expand Down Expand Up @@ -300,3 +326,16 @@ func oIDsToAddresses(cID cid.ID, oo []oid.ID) []oid.Address {

return res
}

func removeDuplicateAddresses(addrs []oid.Address) []oid.Address {
uniqueMap := make(map[oid.Address]struct{})
res := make([]oid.Address, 0, len(addrs))

for _, addr := range addrs {
if _, ok := uniqueMap[addr]; !ok {
uniqueMap[addr] = struct{}{}
res = append(res, addr)
}

Check warning on line 338 in pkg/local_object_storage/engine/inhume.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/engine/inhume.go#L335-L338

Added lines #L335 - L338 were not covered by tests
}
return res
}
63 changes: 24 additions & 39 deletions pkg/local_object_storage/shard/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,13 +284,11 @@ func (s *Shard) getExpiredObjects(epoch uint64, typeCond func(object.Type) bool)
}

// HandleExpiredLocks unlocks all objects which were locked by lockers.
// If successful, marks lockers themselves as garbage. Also, marks as
// garbage every object that becomes free-to-remove and just removed
// lock object is the only reason for that object to be alive (e.g.
// expired but locked objects).
func (s *Shard) HandleExpiredLocks(lockers []oid.Address) {
// If successful, marks lockers themselves as garbage.
// Returns every object that is unlocked.
func (s *Shard) HandleExpiredLocks(lockers []oid.Address) []oid.Address {
if s.GetMode().NoMetabase() {
return
return nil

Check warning on line 291 in pkg/local_object_storage/shard/gc.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/shard/gc.go#L291

Added line #L291 was not covered by tests
}

unlocked, err := s.metaBase.FreeLockedBy(lockers)
Expand All @@ -299,37 +297,42 @@ func (s *Shard) HandleExpiredLocks(lockers []oid.Address) {
zap.Error(err),
)

return
return nil

Check warning on line 300 in pkg/local_object_storage/shard/gc.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/shard/gc.go#L300

Added line #L300 was not covered by tests
}

expired, err := s.metaBase.FilterExpired(unlocked)
inhumed, _, err := s.metaBase.MarkGarbage(true, false, lockers...)
if err != nil {
s.log.Warn("expired object filtering",
s.log.Warn("failure to mark lockers as garbage",

Check warning on line 305 in pkg/local_object_storage/shard/gc.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/shard/gc.go#L305

Added line #L305 was not covered by tests
zap.Error(err),
)

return
return nil

Check warning on line 309 in pkg/local_object_storage/shard/gc.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/shard/gc.go#L309

Added line #L309 was not covered by tests
}

inhumed, _, err := s.metaBase.MarkGarbage(true, false, append(lockers, expired...)...)
s.decObjectCounterBy(logical, inhumed)

return unlocked
}

// FilterExpired filters expired objects by address through the metabase and returns them.
func (s *Shard) FilterExpired(addrs []oid.Address) []oid.Address {
expired, err := s.metaBase.FilterExpired(addrs)
if err != nil {
s.log.Warn("failure to mark lockers as garbage",
s.log.Warn("expired object filtering",

Check warning on line 321 in pkg/local_object_storage/shard/gc.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/shard/gc.go#L321

Added line #L321 was not covered by tests
zap.Error(err),
)

return
return nil

Check warning on line 325 in pkg/local_object_storage/shard/gc.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/shard/gc.go#L325

Added line #L325 was not covered by tests
}

s.decObjectCounterBy(logical, inhumed)
return expired
}

// HandleDeletedLocks unlocks all objects which were locked by lockers.
// Also, marks as garbage every object that becomes free-to-remove and
// just removed lock object is the only reason for that object to be
// alive (e.g. expired but locked objects).
func (s *Shard) HandleDeletedLocks(lockers []oid.Address) {
// Returns every object that is unlocked.
func (s *Shard) HandleDeletedLocks(lockers []oid.Address) []oid.Address {
if s.GetMode().NoMetabase() {
return
return nil

Check warning on line 335 in pkg/local_object_storage/shard/gc.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/shard/gc.go#L335

Added line #L335 was not covered by tests
}

unlocked, err := s.metaBase.FreeLockedBy(lockers)
Expand All @@ -338,28 +341,10 @@ func (s *Shard) HandleDeletedLocks(lockers []oid.Address) {
zap.Error(err),
)

return
}

expired, err := s.metaBase.FilterExpired(unlocked)
if err != nil {
s.log.Warn("expired object filtering",
zap.Error(err),
)

return
}

inhumed, _, err := s.metaBase.MarkGarbage(false, false, expired...)
if err != nil {
s.log.Warn("failure to mark unlocked objects as garbage",
zap.Error(err),
)

return
return nil

Check warning on line 344 in pkg/local_object_storage/shard/gc.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/shard/gc.go#L344

Added line #L344 was not covered by tests
}

s.decObjectCounterBy(logical, inhumed)
return unlocked
}

// NotificationChannel returns channel for shard events.
Expand Down
8 changes: 6 additions & 2 deletions pkg/local_object_storage/shard/gc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,14 @@ func TestGC_ExpiredObjectWithExpiredLock(t *testing.T) {
meta.WithEpochState(epoch),
),
shard.WithDeletedLockCallback(func(aa []oid.Address) {
sh.HandleDeletedLocks(aa)
unlocked := sh.HandleDeletedLocks(aa)
expired := sh.FilterExpired(unlocked)
require.NoError(t, sh.MarkGarbage(false, expired...))
}),
shard.WithExpiredLocksCallback(func(aa []oid.Address) {
sh.HandleExpiredLocks(aa)
unlocked := sh.HandleExpiredLocks(aa)
expired := sh.FilterExpired(unlocked)
require.NoError(t, sh.MarkGarbage(false, expired...))
}),
shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
pool, err := ants.NewPool(sz)
Expand Down

0 comments on commit d725f92

Please sign in to comment.