Skip to content

Commit

Permalink
Fix/drop control call (#2873)
Browse files Browse the repository at this point in the history
  • Loading branch information
cthulhu-rider authored Jul 1, 2024
2 parents 0e4f7c9 + 932e7b0 commit 9a1c5e2
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 148 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ Changelog for NeoFS Node
### Added

### Fixed
- Control service's Drop call does not clean metabase (#2822)

### Changed

Expand Down
104 changes: 12 additions & 92 deletions pkg/local_object_storage/engine/delete.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
package engine

import (
"errors"

"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -58,102 +55,25 @@ func (e *StorageEngine) delete(prm DeletePrm) (DeleteRes, error) {
defer elapsed(e.metrics.AddDeleteDuration)()
}

var locked struct {
is bool
err apistatus.ObjectLocked
}
var splitInfo *objectSDK.SplitInfo

// Removal of a big object is done in multiple stages:
// 1. Remove the parent object. If it is locked or already removed, return immediately.
// 2. Otherwise, search for all objects with a particular SplitID and delete them too.
e.iterateOverSortedShards(prm.addr, func(_ int, sh hashedShard) (stop bool) {
var existsPrm shard.ExistsPrm
existsPrm.SetAddress(prm.addr)

resExists, err := sh.Exists(existsPrm)
if err != nil {
if shard.IsErrRemoved(err) || shard.IsErrObjectExpired(err) {
return true
}

var splitErr *objectSDK.SplitInfoError
if !errors.As(err, &splitErr) {
if !shard.IsErrNotFound(err) {
e.reportShardError(sh, "could not check object existence", err)
}
return false
}
splitInfo = splitErr.SplitInfo()
} else if !resExists.Exists() {
return false
}

var shPrm shard.InhumePrm
shPrm.MarkAsGarbage(prm.addr)
if prm.forceRemoval {
shPrm.ForceRemoval()
}

_, err = sh.Inhume(shPrm)
if !prm.forceRemoval {
locked, err := e.isLocked(prm.addr)
if err != nil {
e.reportShardError(sh, "could not inhume object in shard", err)

locked.is = errors.As(err, &locked.err)

return locked.is
}

// If a parent object is removed we should set GC mark on each shard.
return splitInfo == nil
})

if locked.is {
return DeleteRes{}, locked.err
}

if splitInfo != nil {
if splitID := splitInfo.SplitID(); splitID != nil {
e.deleteChildren(prm.addr, prm.forceRemoval, *splitID)
e.log.Warn("deleting an object without full locking check",
zap.Error(err),
zap.Stringer("addr", prm.addr))
} else if locked {
var lockedErr apistatus.ObjectLocked
return DeleteRes{}, lockedErr
}
}

return DeleteRes{}, nil
}

func (e *StorageEngine) deleteChildren(addr oid.Address, force bool, splitID objectSDK.SplitID) {
var fs objectSDK.SearchFilters
fs.AddSplitIDFilter(objectSDK.MatchStringEqual, splitID)

var selectPrm shard.SelectPrm
selectPrm.SetFilters(fs)
selectPrm.SetContainerID(addr.Container())

var inhumePrm shard.InhumePrm
if force {
inhumePrm.MarkAsGarbage(prm.addr)
if prm.forceRemoval {
inhumePrm.ForceRemoval()
}

e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) {
res, err := sh.Select(selectPrm)
if err != nil {
e.log.Warn("error during searching for object children",
zap.Stringer("addr", addr),
zap.String("error", err.Error()))
return false
}

for _, addr := range res.AddressList() {
inhumePrm.MarkAsGarbage(addr)
_, err := e.inhumeAddr(prm.addr, inhumePrm)

_, err = sh.Inhume(inhumePrm)
if err != nil {
e.log.Debug("could not inhume object in shard",
zap.Stringer("addr", addr),
zap.String("err", err.Error()))
continue
}
}
return false
})
return DeleteRes{}, err
}
84 changes: 28 additions & 56 deletions pkg/local_object_storage/metabase/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"go.etcd.io/bbolt"
"go.uber.org/zap"
)

// DeletePrm groups the parameters of Delete operation.
Expand Down Expand Up @@ -53,17 +54,9 @@ func (p *DeletePrm) SetAddresses(addrs ...oid.Address) {
p.addrs = addrs
}

type referenceNumber struct {
all, cur int

addr oid.Address

obj *objectSDK.Object
}

type referenceCounter map[string]*referenceNumber

// Delete removed object records from metabase indexes.
// Does not stop on an error if there are more objects to handle requested;
// returns the first error appeared with a number of deleted objects wrapped.
func (db *DB) Delete(prm DeletePrm) (DeleteRes, error) {
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()
Expand Down Expand Up @@ -105,16 +98,23 @@ func (db *DB) Delete(prm DeletePrm) (DeleteRes, error) {
// removed number: objects that were available (without Tombstones, GCMarks
// non-expired, etc.)
func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address, sizes []uint64) (uint64, uint64, error) {
refCounter := make(referenceCounter, len(addrs))
currEpoch := db.epochState.CurrentEpoch()

var rawDeleted uint64
var availableDeleted uint64
var errorCount int
var firstErr error

for i := range addrs {
removed, available, size, err := db.delete(tx, addrs[i], refCounter, currEpoch)
removed, available, size, err := db.delete(tx, addrs[i], currEpoch)
if err != nil {
return 0, 0, err // maybe log and continue?
errorCount++
db.log.Warn("failed to delete object", zap.Stringer("addr", addrs[i]), zap.Error(err))
if firstErr == nil {
firstErr = fmt.Errorf("%s object delete fail: %w", addrs[i], err)
}

continue
}

if removed {
Expand All @@ -127,6 +127,12 @@ func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address, sizes []uint64) (ui
}
}

if firstErr != nil {
all := len(addrs)
success := all - errorCount
return 0, 0, fmt.Errorf("deleted %d out of %d objects, first error: %w", success, all, firstErr)
}

if rawDeleted > 0 {
err := db.updateCounter(tx, phy, rawDeleted, false)
if err != nil {
Expand All @@ -141,25 +147,15 @@ func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address, sizes []uint64) (ui
}
}

for _, refNum := range refCounter {
if refNum.cur == refNum.all {
err := db.deleteObject(tx, refNum.obj, true)
if err != nil {
return rawDeleted, availableDeleted, err // maybe log and continue?
}
}
}

return rawDeleted, availableDeleted, nil
}

// delete removes object indexes from the metabase. Counts the references
// of the object that is being removed.
// delete removes object indexes from the metabase.
// The first return value indicates if an object has been removed. (removing a
// non-exist object is error-free). The second return value indicates if an
// object was available before the removal (for calculating the logical object
// counter). The third return value is removed object payload size.
func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter, currEpoch uint64) (bool, bool, uint64, error) {
func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, currEpoch uint64) (bool, bool, uint64, error) {
key := make([]byte, addressKeySize)
addrKey := addressKey(addr, key)
garbageObjectsBKT := tx.Bucket(garbageObjectsBucketName)
Expand Down Expand Up @@ -191,22 +187,15 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter

// if object is an only link to a parent, then remove parent
if parent := obj.Parent(); parent != nil {
parAddr := object.AddressOf(parent)
sParAddr := addressKey(parAddr, key)
k := string(sParAddr)

nRef, ok := refCounter[k]
if !ok {
nRef = &referenceNumber{
all: parentLength(tx, parAddr),
addr: parAddr,
obj: parent,
}

refCounter[k] = nRef
if _, fullParent := parent.ID(); !fullParent {
// unfinished header from the first part
return false, false, 0, nil
}

nRef.cur++
err = db.deleteObject(tx, obj, true)
if err != nil {
return false, false, 0, fmt.Errorf("could not remove parent object: %w", err)
}
}

// remove object
Expand Down Expand Up @@ -241,23 +230,6 @@ func (db *DB) deleteObject(
return nil
}

// parentLength returns amount of available children from parentid index.
func parentLength(tx *bbolt.Tx, addr oid.Address) int {
bucketName := make([]byte, bucketKeySize)

bkt := tx.Bucket(parentBucketName(addr.Container(), bucketName[:]))
if bkt == nil {
return 0
}

lst, err := decodeList(bkt.Get(objectKey(addr.Object(), bucketName[:])))
if err != nil {
return 0
}

return len(lst)
}

func delUniqueIndexItem(tx *bbolt.Tx, item namedBucketItem) {
bkt := tx.Bucket(item.name)
if bkt != nil {
Expand Down

0 comments on commit 9a1c5e2

Please sign in to comment.