diff --git a/CHANGELOG.md b/CHANGELOG.md index 749517baea..a19f0e29e4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ Changelog for NeoFS Node ### Added ### Fixed +- Control service's Drop call does not clean metabase (#2822) ### Changed diff --git a/pkg/local_object_storage/engine/delete.go b/pkg/local_object_storage/engine/delete.go index b7250eb737..37c18f89d5 100644 --- a/pkg/local_object_storage/engine/delete.go +++ b/pkg/local_object_storage/engine/delete.go @@ -1,11 +1,8 @@ package engine import ( - "errors" - "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" - objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "go.uber.org/zap" ) @@ -58,102 +55,25 @@ func (e *StorageEngine) delete(prm DeletePrm) (DeleteRes, error) { defer elapsed(e.metrics.AddDeleteDuration)() } - var locked struct { - is bool - err apistatus.ObjectLocked - } - var splitInfo *objectSDK.SplitInfo - - // Removal of a big object is done in multiple stages: - // 1. Remove the parent object. If it is locked or already removed, return immediately. - // 2. Otherwise, search for all objects with a particular SplitID and delete them too. - e.iterateOverSortedShards(prm.addr, func(_ int, sh hashedShard) (stop bool) { - var existsPrm shard.ExistsPrm - existsPrm.SetAddress(prm.addr) - - resExists, err := sh.Exists(existsPrm) - if err != nil { - if shard.IsErrRemoved(err) || shard.IsErrObjectExpired(err) { - return true - } - - var splitErr *objectSDK.SplitInfoError - if !errors.As(err, &splitErr) { - if !shard.IsErrNotFound(err) { - e.reportShardError(sh, "could not check object existence", err) - } - return false - } - splitInfo = splitErr.SplitInfo() - } else if !resExists.Exists() { - return false - } - - var shPrm shard.InhumePrm - shPrm.MarkAsGarbage(prm.addr) - if prm.forceRemoval { - shPrm.ForceRemoval() - } - - _, err = sh.Inhume(shPrm) + if !prm.forceRemoval { + locked, err := e.isLocked(prm.addr) if err != nil { - e.reportShardError(sh, "could not inhume object in shard", err) - - locked.is = errors.As(err, &locked.err) - - return locked.is - } - - // If a parent object is removed we should set GC mark on each shard. - return splitInfo == nil - }) - - if locked.is { - return DeleteRes{}, locked.err - } - - if splitInfo != nil { - if splitID := splitInfo.SplitID(); splitID != nil { - e.deleteChildren(prm.addr, prm.forceRemoval, *splitID) + e.log.Warn("deleting an object without full locking check", + zap.Error(err), + zap.Stringer("addr", prm.addr)) + } else if locked { + var lockedErr apistatus.ObjectLocked + return DeleteRes{}, lockedErr } } - return DeleteRes{}, nil -} - -func (e *StorageEngine) deleteChildren(addr oid.Address, force bool, splitID objectSDK.SplitID) { - var fs objectSDK.SearchFilters - fs.AddSplitIDFilter(objectSDK.MatchStringEqual, splitID) - - var selectPrm shard.SelectPrm - selectPrm.SetFilters(fs) - selectPrm.SetContainerID(addr.Container()) - var inhumePrm shard.InhumePrm - if force { + inhumePrm.MarkAsGarbage(prm.addr) + if prm.forceRemoval { inhumePrm.ForceRemoval() } - e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) { - res, err := sh.Select(selectPrm) - if err != nil { - e.log.Warn("error during searching for object children", - zap.Stringer("addr", addr), - zap.String("error", err.Error())) - return false - } - - for _, addr := range res.AddressList() { - inhumePrm.MarkAsGarbage(addr) + _, err := e.inhumeAddr(prm.addr, inhumePrm) - _, err = sh.Inhume(inhumePrm) - if err != nil { - e.log.Debug("could not inhume object in shard", - zap.Stringer("addr", addr), - zap.String("err", err.Error())) - continue - } - } - return false - }) + return DeleteRes{}, err } diff --git a/pkg/local_object_storage/metabase/delete.go b/pkg/local_object_storage/metabase/delete.go index fed46acab3..24d350c7e4 100644 --- a/pkg/local_object_storage/metabase/delete.go +++ b/pkg/local_object_storage/metabase/delete.go @@ -11,6 +11,7 @@ import ( objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "go.etcd.io/bbolt" + "go.uber.org/zap" ) // DeletePrm groups the parameters of Delete operation. @@ -53,17 +54,9 @@ func (p *DeletePrm) SetAddresses(addrs ...oid.Address) { p.addrs = addrs } -type referenceNumber struct { - all, cur int - - addr oid.Address - - obj *objectSDK.Object -} - -type referenceCounter map[string]*referenceNumber - // Delete removed object records from metabase indexes. +// Does not stop on an error if there are more objects to handle requested; +// returns the first error appeared with a number of deleted objects wrapped. func (db *DB) Delete(prm DeletePrm) (DeleteRes, error) { db.modeMtx.RLock() defer db.modeMtx.RUnlock() @@ -105,16 +98,23 @@ func (db *DB) Delete(prm DeletePrm) (DeleteRes, error) { // removed number: objects that were available (without Tombstones, GCMarks // non-expired, etc.) func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address, sizes []uint64) (uint64, uint64, error) { - refCounter := make(referenceCounter, len(addrs)) currEpoch := db.epochState.CurrentEpoch() var rawDeleted uint64 var availableDeleted uint64 + var errorCount int + var firstErr error for i := range addrs { - removed, available, size, err := db.delete(tx, addrs[i], refCounter, currEpoch) + removed, available, size, err := db.delete(tx, addrs[i], currEpoch) if err != nil { - return 0, 0, err // maybe log and continue? + errorCount++ + db.log.Warn("failed to delete object", zap.Stringer("addr", addrs[i]), zap.Error(err)) + if firstErr == nil { + firstErr = fmt.Errorf("%s object delete fail: %w", addrs[i], err) + } + + continue } if removed { @@ -127,6 +127,12 @@ func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address, sizes []uint64) (ui } } + if firstErr != nil { + all := len(addrs) + success := all - errorCount + return 0, 0, fmt.Errorf("deleted %d out of %d objects, first error: %w", success, all, firstErr) + } + if rawDeleted > 0 { err := db.updateCounter(tx, phy, rawDeleted, false) if err != nil { @@ -141,25 +147,15 @@ func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address, sizes []uint64) (ui } } - for _, refNum := range refCounter { - if refNum.cur == refNum.all { - err := db.deleteObject(tx, refNum.obj, true) - if err != nil { - return rawDeleted, availableDeleted, err // maybe log and continue? - } - } - } - return rawDeleted, availableDeleted, nil } -// delete removes object indexes from the metabase. Counts the references -// of the object that is being removed. +// delete removes object indexes from the metabase. // The first return value indicates if an object has been removed. (removing a // non-exist object is error-free). The second return value indicates if an // object was available before the removal (for calculating the logical object // counter). The third return value is removed object payload size. -func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter, currEpoch uint64) (bool, bool, uint64, error) { +func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, currEpoch uint64) (bool, bool, uint64, error) { key := make([]byte, addressKeySize) addrKey := addressKey(addr, key) garbageObjectsBKT := tx.Bucket(garbageObjectsBucketName) @@ -191,22 +187,15 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter // if object is an only link to a parent, then remove parent if parent := obj.Parent(); parent != nil { - parAddr := object.AddressOf(parent) - sParAddr := addressKey(parAddr, key) - k := string(sParAddr) - - nRef, ok := refCounter[k] - if !ok { - nRef = &referenceNumber{ - all: parentLength(tx, parAddr), - addr: parAddr, - obj: parent, - } - - refCounter[k] = nRef + if _, fullParent := parent.ID(); !fullParent { + // unfinished header from the first part + return false, false, 0, nil } - nRef.cur++ + err = db.deleteObject(tx, obj, true) + if err != nil { + return false, false, 0, fmt.Errorf("could not remove parent object: %w", err) + } } // remove object @@ -241,23 +230,6 @@ func (db *DB) deleteObject( return nil } -// parentLength returns amount of available children from parentid index. -func parentLength(tx *bbolt.Tx, addr oid.Address) int { - bucketName := make([]byte, bucketKeySize) - - bkt := tx.Bucket(parentBucketName(addr.Container(), bucketName[:])) - if bkt == nil { - return 0 - } - - lst, err := decodeList(bkt.Get(objectKey(addr.Object(), bucketName[:]))) - if err != nil { - return 0 - } - - return len(lst) -} - func delUniqueIndexItem(tx *bbolt.Tx, item namedBucketItem) { bkt := tx.Bucket(item.name) if bkt != nil {