Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/drop control call #2873

Merged
merged 3 commits into from
Jul 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ Changelog for NeoFS Node
### Added

### Fixed
- Control service's Drop call does not clean metabase (#2822)

### Changed

Expand Down
104 changes: 12 additions & 92 deletions pkg/local_object_storage/engine/delete.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
package engine

import (
"errors"

"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -58,102 +55,25 @@
defer elapsed(e.metrics.AddDeleteDuration)()
}

var locked struct {
is bool
err apistatus.ObjectLocked
}
var splitInfo *objectSDK.SplitInfo

// Removal of a big object is done in multiple stages:
// 1. Remove the parent object. If it is locked or already removed, return immediately.
// 2. Otherwise, search for all objects with a particular SplitID and delete them too.
e.iterateOverSortedShards(prm.addr, func(_ int, sh hashedShard) (stop bool) {
var existsPrm shard.ExistsPrm
existsPrm.SetAddress(prm.addr)

resExists, err := sh.Exists(existsPrm)
if err != nil {
if shard.IsErrRemoved(err) || shard.IsErrObjectExpired(err) {
return true
}

var splitErr *objectSDK.SplitInfoError
if !errors.As(err, &splitErr) {
if !shard.IsErrNotFound(err) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one differs from the two implementations.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, that is the key of my suggestion: why do we differ this logic? we do the same things (and one may always have more or less num of bugs) when delete/inhume objects, why some of them should be collected (if an object is a big one) one way while the other should be handled differently? about this PR: i have improved and fixed a little Inhume some time ago and could not understand why i had do it again one more time when looking at Delete

e.reportShardError(sh, "could not check object existence", err)
}
return false
}
splitInfo = splitErr.SplitInfo()
} else if !resExists.Exists() {
return false
}

var shPrm shard.InhumePrm
shPrm.MarkAsGarbage(prm.addr)
if prm.forceRemoval {
shPrm.ForceRemoval()
}

_, err = sh.Inhume(shPrm)
if !prm.forceRemoval {
locked, err := e.isLocked(prm.addr)

Check warning on line 59 in pkg/local_object_storage/engine/delete.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/engine/delete.go#L59

Added line #L59 was not covered by tests
if err != nil {
e.reportShardError(sh, "could not inhume object in shard", err)

locked.is = errors.As(err, &locked.err)

return locked.is
}

// If a parent object is removed we should set GC mark on each shard.
return splitInfo == nil
})

if locked.is {
return DeleteRes{}, locked.err
}

if splitInfo != nil {
if splitID := splitInfo.SplitID(); splitID != nil {
e.deleteChildren(prm.addr, prm.forceRemoval, *splitID)
e.log.Warn("deleting an object without full locking check",
zap.Error(err),
zap.Stringer("addr", prm.addr))
} else if locked {
var lockedErr apistatus.ObjectLocked
return DeleteRes{}, lockedErr

Check warning on line 66 in pkg/local_object_storage/engine/delete.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/engine/delete.go#L61-L66

Added lines #L61 - L66 were not covered by tests
}
}

return DeleteRes{}, nil
}

func (e *StorageEngine) deleteChildren(addr oid.Address, force bool, splitID objectSDK.SplitID) {
var fs objectSDK.SearchFilters
fs.AddSplitIDFilter(objectSDK.MatchStringEqual, splitID)

var selectPrm shard.SelectPrm
selectPrm.SetFilters(fs)
selectPrm.SetContainerID(addr.Container())

var inhumePrm shard.InhumePrm
if force {
inhumePrm.MarkAsGarbage(prm.addr)
if prm.forceRemoval {
inhumePrm.ForceRemoval()
}

e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) {
res, err := sh.Select(selectPrm)
if err != nil {
e.log.Warn("error during searching for object children",
zap.Stringer("addr", addr),
zap.String("error", err.Error()))
return false
}

for _, addr := range res.AddressList() {
inhumePrm.MarkAsGarbage(addr)
_, err := e.inhumeAddr(prm.addr, inhumePrm)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like it was a bit more efficient previously with sorted shards.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do you think it is unsorted now?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

        e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) {

in inhumeAddr

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you mean this? it is root object handling, there is no information about link and last objects that can be acquired via root object's ID

cthulhu-rider marked this conversation as resolved.
Show resolved Hide resolved

_, err = sh.Inhume(inhumePrm)
if err != nil {
e.log.Debug("could not inhume object in shard",
zap.Stringer("addr", addr),
zap.String("err", err.Error()))
continue
}
}
return false
})
return DeleteRes{}, err
}
84 changes: 28 additions & 56 deletions pkg/local_object_storage/metabase/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"go.etcd.io/bbolt"
"go.uber.org/zap"
)

// DeletePrm groups the parameters of Delete operation.
Expand Down Expand Up @@ -53,17 +54,9 @@
p.addrs = addrs
}

type referenceNumber struct {
all, cur int

addr oid.Address

obj *objectSDK.Object
}

type referenceCounter map[string]*referenceNumber

// Delete removed object records from metabase indexes.
// Does not stop on an error if there are more objects to handle requested;
// returns the first error appeared with a number of deleted objects wrapped.
func (db *DB) Delete(prm DeletePrm) (DeleteRes, error) {
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()
Expand Down Expand Up @@ -105,16 +98,23 @@
// removed number: objects that were available (without Tombstones, GCMarks
// non-expired, etc.)
func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address, sizes []uint64) (uint64, uint64, error) {
refCounter := make(referenceCounter, len(addrs))
currEpoch := db.epochState.CurrentEpoch()

var rawDeleted uint64
var availableDeleted uint64
var errorCount int
var firstErr error

for i := range addrs {
removed, available, size, err := db.delete(tx, addrs[i], refCounter, currEpoch)
removed, available, size, err := db.delete(tx, addrs[i], currEpoch)
if err != nil {
return 0, 0, err // maybe log and continue?
errorCount++
db.log.Warn("failed to delete object", zap.Stringer("addr", addrs[i]), zap.Error(err))
if firstErr == nil {
firstErr = fmt.Errorf("%s object delete fail: %w", addrs[i], err)

Check warning on line 114 in pkg/local_object_storage/metabase/delete.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/metabase/delete.go#L111-L114

Added lines #L111 - L114 were not covered by tests
}

continue

Check warning on line 117 in pkg/local_object_storage/metabase/delete.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/metabase/delete.go#L117

Added line #L117 was not covered by tests
}

if removed {
Expand All @@ -127,6 +127,12 @@
}
}

if firstErr != nil {
all := len(addrs)
success := all - errorCount
return 0, 0, fmt.Errorf("deleted %d out of %d objects, first error: %w", success, all, firstErr)

Check warning on line 133 in pkg/local_object_storage/metabase/delete.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/metabase/delete.go#L131-L133

Added lines #L131 - L133 were not covered by tests
cthulhu-rider marked this conversation as resolved.
Show resolved Hide resolved
}

if rawDeleted > 0 {
err := db.updateCounter(tx, phy, rawDeleted, false)
if err != nil {
Expand All @@ -141,25 +147,15 @@
}
}

for _, refNum := range refCounter {
if refNum.cur == refNum.all {
err := db.deleteObject(tx, refNum.obj, true)
if err != nil {
return rawDeleted, availableDeleted, err // maybe log and continue?
}
}
}

return rawDeleted, availableDeleted, nil
}

// delete removes object indexes from the metabase. Counts the references
// of the object that is being removed.
// delete removes object indexes from the metabase.
roman-khimov marked this conversation as resolved.
Show resolved Hide resolved
// The first return value indicates if an object has been removed. (removing a
// non-exist object is error-free). The second return value indicates if an
// object was available before the removal (for calculating the logical object
// counter). The third return value is removed object payload size.
func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter, currEpoch uint64) (bool, bool, uint64, error) {
func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, currEpoch uint64) (bool, bool, uint64, error) {
key := make([]byte, addressKeySize)
addrKey := addressKey(addr, key)
garbageObjectsBKT := tx.Bucket(garbageObjectsBucketName)
Expand Down Expand Up @@ -191,22 +187,15 @@

// if object is an only link to a parent, then remove parent
if parent := obj.Parent(); parent != nil {
parAddr := object.AddressOf(parent)
sParAddr := addressKey(parAddr, key)
k := string(sParAddr)

nRef, ok := refCounter[k]
if !ok {
nRef = &referenceNumber{
all: parentLength(tx, parAddr),
addr: parAddr,
obj: parent,
}

refCounter[k] = nRef
if _, fullParent := parent.ID(); !fullParent {
// unfinished header from the first part
return false, false, 0, nil

Check warning on line 192 in pkg/local_object_storage/metabase/delete.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/metabase/delete.go#L192

Added line #L192 was not covered by tests
}

nRef.cur++
err = db.deleteObject(tx, obj, true)
if err != nil {
return false, false, 0, fmt.Errorf("could not remove parent object: %w", err)

Check warning on line 197 in pkg/local_object_storage/metabase/delete.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/metabase/delete.go#L197

Added line #L197 was not covered by tests
}
}

// remove object
Expand Down Expand Up @@ -241,23 +230,6 @@
return nil
}

// parentLength returns amount of available children from parentid index.
func parentLength(tx *bbolt.Tx, addr oid.Address) int {
bucketName := make([]byte, bucketKeySize)

bkt := tx.Bucket(parentBucketName(addr.Container(), bucketName[:]))
if bkt == nil {
return 0
}

lst, err := decodeList(bkt.Get(objectKey(addr.Object(), bucketName[:])))
if err != nil {
return 0
}

return len(lst)
}

func delUniqueIndexItem(tx *bbolt.Tx, item namedBucketItem) {
bkt := tx.Bucket(item.name)
if bkt != nil {
Expand Down
Loading