Skip to content

Commit

Permalink
node/meta: simplify big objects removal
Browse files Browse the repository at this point in the history
Children handling has been moved on the engine layer, it searches for links and
deletes all the children. There is no need to try it on a layer that even may
not have such objects at all (they can be moved, deleted and replicated, etc.).
Relates #2822.

Signed-off-by: Pavel Karpy <[email protected]>
  • Loading branch information
carpawell committed Jun 28, 2024
1 parent 0fe4353 commit f39c4b2
Showing 1 changed file with 10 additions and 55 deletions.
65 changes: 10 additions & 55 deletions pkg/local_object_storage/metabase/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,16 +53,6 @@ func (p *DeletePrm) SetAddresses(addrs ...oid.Address) {
p.addrs = addrs
}

type referenceNumber struct {
all, cur int

addr oid.Address

obj *objectSDK.Object
}

type referenceCounter map[string]*referenceNumber

// Delete removed object records from metabase indexes.
func (db *DB) Delete(prm DeletePrm) (DeleteRes, error) {
db.modeMtx.RLock()
Expand Down Expand Up @@ -105,14 +95,13 @@ func (db *DB) Delete(prm DeletePrm) (DeleteRes, error) {
// removed number: objects that were available (without Tombstones, GCMarks
// non-expired, etc.)
func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address, sizes []uint64) (uint64, uint64, error) {
refCounter := make(referenceCounter, len(addrs))
currEpoch := db.epochState.CurrentEpoch()

var rawDeleted uint64
var availableDeleted uint64

for i := range addrs {
removed, available, size, err := db.delete(tx, addrs[i], refCounter, currEpoch)
removed, available, size, err := db.delete(tx, addrs[i], currEpoch)
if err != nil {
return 0, 0, err // maybe log and continue?
}
Expand Down Expand Up @@ -141,25 +130,15 @@ func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address, sizes []uint64) (ui
}
}

for _, refNum := range refCounter {
if refNum.cur == refNum.all {
err := db.deleteObject(tx, refNum.obj, true)
if err != nil {
return rawDeleted, availableDeleted, err // maybe log and continue?
}
}
}

return rawDeleted, availableDeleted, nil
}

// delete removes object indexes from the metabase. Counts the references
// of the object that is being removed.
// delete removes object indexes from the metabase.
// The first return value indicates if an object has been removed. (removing a
// non-exist object is error-free). The second return value indicates if an
// object was available before the removal (for calculating the logical object
// counter). The third return value is removed object payload size.
func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter, currEpoch uint64) (bool, bool, uint64, error) {
func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, currEpoch uint64) (bool, bool, uint64, error) {
key := make([]byte, addressKeySize)
addrKey := addressKey(addr, key)
garbageObjectsBKT := tx.Bucket(garbageObjectsBucketName)
Expand Down Expand Up @@ -191,22 +170,15 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter

// if object is an only link to a parent, then remove parent
if parent := obj.Parent(); parent != nil {
parAddr := object.AddressOf(parent)
sParAddr := addressKey(parAddr, key)
k := string(sParAddr)

nRef, ok := refCounter[k]
if !ok {
nRef = &referenceNumber{
all: parentLength(tx, parAddr),
addr: parAddr,
obj: parent,
}

refCounter[k] = nRef
if _, fullParent := parent.ID(); !fullParent {
// unfinished header from the first part
return false, false, 0, nil
}

nRef.cur++
err = db.deleteObject(tx, obj, true)
if err != nil {
return false, false, 0, fmt.Errorf("could not remove parent object: %w", err)
}
}

// remove object
Expand Down Expand Up @@ -241,23 +213,6 @@ func (db *DB) deleteObject(
return nil
}

// parentLength returns amount of available children from parentid index.
func parentLength(tx *bbolt.Tx, addr oid.Address) int {
bucketName := make([]byte, bucketKeySize)

bkt := tx.Bucket(parentBucketName(addr.Container(), bucketName[:]))
if bkt == nil {
return 0
}

lst, err := decodeList(bkt.Get(objectKey(addr.Object(), bucketName[:])))
if err != nil {
return 0
}

return len(lst)
}

func delUniqueIndexItem(tx *bbolt.Tx, item namedBucketItem) {
bkt := tx.Bucket(item.name)
if bkt != nil {
Expand Down

0 comments on commit f39c4b2

Please sign in to comment.