diff --git a/CHANGELOG.md b/CHANGELOG.md index 16f95c8924..e143640b21 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,9 +7,11 @@ Changelog for NeoFS Node - More effective FSTree writer for HDDs, new configuration options for it (#2814) ### Fixed +- Do not search for tombstones when handling their expiration, use local indexes instead (#2929) ### Changed - `ObjectService`'s `Put` RPC handler caches up to 10K lists of per-object sorted container nodes (#2901) +- Metabase graveyard scheme (#2929) ### Removed @@ -17,6 +19,9 @@ Changelog for NeoFS Node - Go to 1.22 version (#2517, #2738) ### Updating from v0.43.0 +Metabase version has been increased, auto migrating will be performed once +a v0.44.0 Storage Node is started with a v0.43.0 metabase. This action can +not be undone. No additional work should be done. ## [0.43.0] - 2024-08-20 - Jukdo diff --git a/cmd/neofs-lens/internal/meta/list-graveyard.go b/cmd/neofs-lens/internal/meta/list-graveyard.go index 09457dfb3f..a73833e62e 100644 --- a/cmd/neofs-lens/internal/meta/list-graveyard.go +++ b/cmd/neofs-lens/internal/meta/list-graveyard.go @@ -26,9 +26,10 @@ func listGraveyardFunc(cmd *cobra.Command, _ []string) { gravePrm.SetHandler( func(tsObj meta.TombstonedObject) error { cmd.Printf( - "Object: %s\nTS: %s\n", + "Object: %s\nTS: %s (TS expiration: %d)\n", tsObj.Address().EncodeToString(), tsObj.Tombstone().EncodeToString(), + tsObj.TombstoneExpiration(), ) return nil diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go index 4fd8b12790..5bd29d9973 100644 --- a/cmd/neofs-node/object.go +++ b/cmd/neofs-node/object.go @@ -515,7 +515,7 @@ func (e storageEngine) IsLocked(address oid.Address) (bool, error) { return e.engine.IsLocked(address) } -func (e storageEngine) Delete(tombstone oid.Address, toDelete []oid.ID) error { +func (e storageEngine) Delete(tombstone oid.Address, tombExpiration uint64, toDelete []oid.ID) error { var prm engine.InhumePrm addrs := make([]oid.Address, len(toDelete)) @@ -524,7 +524,7 @@ func (e storageEngine) Delete(tombstone oid.Address, toDelete []oid.ID) error { addrs[i].SetObject(toDelete[i]) } - prm.WithTarget(tombstone, addrs...) + prm.WithTombstone(tombstone, tombExpiration, addrs...) _, err := e.engine.Inhume(prm) return err diff --git a/cmd/neofs-node/storage.go b/cmd/neofs-node/storage.go index 13714fbfbb..eef9aff2b4 100644 --- a/cmd/neofs-node/storage.go +++ b/cmd/neofs-node/storage.go @@ -17,8 +17,6 @@ import ( containerEvent "github.com/nspcc-dev/neofs-node/pkg/morph/event/container" "github.com/nspcc-dev/neofs-node/pkg/morph/event/netmap" getsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/get" - "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/tombstone" - tsourse "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/tombstone/source" "github.com/nspcc-dev/neofs-node/pkg/util" objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" "github.com/panjf2000/ants/v2" @@ -47,18 +45,9 @@ func initLocalStorage(c *cfg) { // service will be created later c.cfgObject.getSvc = new(getsvc.Service) - var tssPrm tsourse.TombstoneSourcePrm - tssPrm.SetGetService(c.cfgObject.getSvc) - tombstoneSrc := tsourse.NewSource(tssPrm) - - tombstoneSource := tombstone.NewChecker( - tombstone.WithLogger(c.log), - tombstone.WithTombstoneSource(tombstoneSrc), - ) - var shardsAttached int for _, optsWithMeta := range c.shardOpts() { - id, err := ls.AddShard(append(optsWithMeta.shOpts, shard.WithTombstoneSource(tombstoneSource))...) + id, err := ls.AddShard(optsWithMeta.shOpts...) if err != nil { c.log.Error("failed to attach shard to engine", zap.Error(err)) } else { diff --git a/pkg/local_object_storage/engine/inhume.go b/pkg/local_object_storage/engine/inhume.go index 5e4eba5e7c..770c6b97f4 100644 --- a/pkg/local_object_storage/engine/inhume.go +++ b/pkg/local_object_storage/engine/inhume.go @@ -16,8 +16,9 @@ import ( // InhumePrm encapsulates parameters for inhume operation. type InhumePrm struct { - tombstone *oid.Address - addrs []oid.Address + tombstone *oid.Address + tombExpiration uint64 + addrs []oid.Address forceRemoval bool } @@ -25,19 +26,20 @@ type InhumePrm struct { // InhumeRes encapsulates results of inhume operation. type InhumeRes struct{} -// WithTarget sets a list of objects that should be inhumed and tombstone address +// WithTombstone sets a list of objects that should be inhumed and tombstone address // as the reason for inhume operation. // -// tombstone should not be nil, addr should not be empty. +// addrs should not be empty. // Should not be called along with MarkAsGarbage. -func (p *InhumePrm) WithTarget(tombstone oid.Address, addrs ...oid.Address) { +func (p *InhumePrm) WithTombstone(tombstone oid.Address, tombExpiration uint64, addrs ...oid.Address) { p.addrs = addrs p.tombstone = &tombstone + p.tombExpiration = tombExpiration } // MarkAsGarbage marks an object to be physically removed from local storage. // -// Should not be called along with WithTarget. +// Should not be called along with WithTombstone. func (p *InhumePrm) MarkAsGarbage(addrs ...oid.Address) { p.addrs = addrs p.tombstone = nil @@ -95,7 +97,7 @@ func (e *StorageEngine) inhume(prm InhumePrm) (InhumeRes, error) { } if prm.tombstone != nil { - shPrm.InhumeByTomb(*prm.tombstone, prm.addrs[i]) + shPrm.InhumeByTomb(*prm.tombstone, prm.tombExpiration, prm.addrs[i]) } else { shPrm.MarkAsGarbage(prm.addrs[i]) } @@ -332,19 +334,6 @@ func (e *StorageEngine) processExpiredObjects(_ context.Context, addrs []oid.Add } } -func (e *StorageEngine) processExpiredTombstones(ctx context.Context, addrs []meta.TombstonedObject) { - e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) { - sh.HandleExpiredTombstones(addrs) - - select { - case <-ctx.Done(): - return true - default: - return false - } - }) -} - func (e *StorageEngine) processExpiredLocks(ctx context.Context, lockers []oid.Address) { e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) { sh.HandleExpiredLocks(lockers) diff --git a/pkg/local_object_storage/engine/inhume_test.go b/pkg/local_object_storage/engine/inhume_test.go index dc033ce743..b50e2a0d38 100644 --- a/pkg/local_object_storage/engine/inhume_test.go +++ b/pkg/local_object_storage/engine/inhume_test.go @@ -46,7 +46,7 @@ func TestStorageEngine_Inhume(t *testing.T) { require.NoError(t, err) var inhumePrm InhumePrm - inhumePrm.WithTarget(tombstoneID, object.AddressOf(parent)) + inhumePrm.WithTombstone(tombstoneID, 0, object.AddressOf(parent)) _, err = e.Inhume(inhumePrm) require.NoError(t, err) @@ -74,7 +74,7 @@ func TestStorageEngine_Inhume(t *testing.T) { require.NoError(t, err) var inhumePrm InhumePrm - inhumePrm.WithTarget(tombstoneID, object.AddressOf(parent)) + inhumePrm.WithTombstone(tombstoneID, 0, object.AddressOf(parent)) _, err = e.Inhume(inhumePrm) require.NoError(t, err) diff --git a/pkg/local_object_storage/engine/lock_test.go b/pkg/local_object_storage/engine/lock_test.go index a2cf509290..f4b42ec6d7 100644 --- a/pkg/local_object_storage/engine/lock_test.go +++ b/pkg/local_object_storage/engine/lock_test.go @@ -51,7 +51,6 @@ func TestLockUserScenario(t *testing.T) { return pool }), - shard.WithTombstoneSource(tss{lockerExpiresAfter}), }) t.Cleanup(func() { @@ -108,7 +107,7 @@ func TestLockUserScenario(t *testing.T) { // 3. var inhumePrm InhumePrm - inhumePrm.WithTarget(tombAddr, objAddr) + inhumePrm.WithTombstone(tombAddr, 0, objAddr) _, err = e.Inhume(inhumePrm) require.ErrorAs(t, err, new(apistatus.ObjectLocked)) @@ -121,7 +120,7 @@ func TestLockUserScenario(t *testing.T) { err = Put(e, tombObj) require.NoError(t, err) - inhumePrm.WithTarget(tombForLockAddr, lockerAddr) + inhumePrm.WithTombstone(tombForLockAddr, 0, lockerAddr) _, err = e.Inhume(inhumePrm) require.ErrorIs(t, err, meta.ErrLockObjectRemoval) @@ -132,7 +131,7 @@ func TestLockUserScenario(t *testing.T) { // delay for GC time.Sleep(time.Second) - inhumePrm.WithTarget(tombAddr, objAddr) + inhumePrm.WithTombstone(tombAddr, 0, objAddr) _, err = e.Inhume(inhumePrm) require.NoError(t, err) @@ -189,7 +188,7 @@ func TestLockExpiration(t *testing.T) { require.NoError(t, err) var inhumePrm InhumePrm - inhumePrm.WithTarget(objecttest.Address(), objectcore.AddressOf(obj)) + inhumePrm.WithTombstone(objecttest.Address(), 0, objectcore.AddressOf(obj)) _, err = e.Inhume(inhumePrm) require.ErrorAs(t, err, new(apistatus.ObjectLocked)) @@ -202,7 +201,7 @@ func TestLockExpiration(t *testing.T) { time.Sleep(time.Second) // 4. - inhumePrm.WithTarget(objecttest.Address(), objectcore.AddressOf(obj)) + inhumePrm.WithTombstone(objecttest.Address(), 0, objectcore.AddressOf(obj)) _, err = e.Inhume(inhumePrm) require.NoError(t, err) @@ -261,7 +260,7 @@ func TestLockForceRemoval(t *testing.T) { _, err = e.Inhume(inhumePrm) require.ErrorAs(t, err, new(apistatus.ObjectLocked)) - inhumePrm.WithTarget(objecttest.Address(), objectcore.AddressOf(obj)) + inhumePrm.WithTombstone(objecttest.Address(), 0, objectcore.AddressOf(obj)) _, err = e.Inhume(inhumePrm) require.ErrorAs(t, err, new(apistatus.ObjectLocked)) diff --git a/pkg/local_object_storage/engine/shards.go b/pkg/local_object_storage/engine/shards.go index 8d07157c17..42254c4bf9 100644 --- a/pkg/local_object_storage/engine/shards.go +++ b/pkg/local_object_storage/engine/shards.go @@ -101,7 +101,6 @@ func (e *StorageEngine) createShard(opts []shard.Option) (*shard.Shard, error) { sh := shard.New(append(opts, shard.WithID(id), shard.WithExpiredObjectsCallback(e.processExpiredObjects), - shard.WithExpiredTombstonesCallback(e.processExpiredTombstones), shard.WithExpiredLocksCallback(e.processExpiredLocks), shard.WithDeletedLockCallback(e.processDeletedLocks), shard.WithReportErrorFunc(e.reportShardErrorBackground), diff --git a/pkg/local_object_storage/metabase/VERSION.md b/pkg/local_object_storage/metabase/VERSION.md index b80a735470..62ad0dc08b 100644 --- a/pkg/local_object_storage/metabase/VERSION.md +++ b/pkg/local_object_storage/metabase/VERSION.md @@ -4,14 +4,14 @@ This file describes changes between the metabase versions. ## Current -Numbers stand for a single byte value. +Numbers stand for a single byte value unless otherwise stated. The lowest not used bucket index: 20. ### Primary buckets - Graveyard bucket - Name: `0` - Key: object address - - Value: tombstone address + - Value: tombstone address + little-endian uint64 tombstone expiration epoch - Garbage objects bucket - Name: `1` - Key: object address diff --git a/pkg/local_object_storage/metabase/control.go b/pkg/local_object_storage/metabase/control.go index 3600a820fe..e6fb3fb75e 100644 --- a/pkg/local_object_storage/metabase/control.go +++ b/pkg/local_object_storage/metabase/control.go @@ -113,7 +113,7 @@ func (db *DB) init(reset bool) error { var err error if !reset { // Normal open, check version and update if not initialized. - err := checkVersion(tx, db.initialized) + err := db.checkVersion(tx) if err != nil { return err } @@ -152,7 +152,7 @@ func (db *DB) init(reset bool) error { if err != nil { return err } - return updateVersion(tx, version) + return db.updateVersion(tx, version) }) } diff --git a/pkg/local_object_storage/metabase/counter_test.go b/pkg/local_object_storage/metabase/counter_test.go index 75c7ad0f34..85d9273483 100644 --- a/pkg/local_object_storage/metabase/counter_test.go +++ b/pkg/local_object_storage/metabase/counter_test.go @@ -85,7 +85,7 @@ func TestCounters(t *testing.T) { } var prm meta.InhumePrm - prm.SetTombstoneAddress(oidtest.Address()) + prm.SetTombstone(oidtest.Address(), 0) prm.SetAddresses(inhumedObjs...) res, err := db.Inhume(prm) @@ -155,7 +155,7 @@ func TestCounters(t *testing.T) { } var prm meta.InhumePrm - prm.SetTombstoneAddress(oidtest.Address()) + prm.SetTombstone(oidtest.Address(), 0) prm.SetAddresses(inhumedObjs...) _, err = db.Inhume(prm) diff --git a/pkg/local_object_storage/metabase/graveyard.go b/pkg/local_object_storage/metabase/graveyard.go index 8ef329fe33..685fd95eaa 100644 --- a/pkg/local_object_storage/metabase/graveyard.go +++ b/pkg/local_object_storage/metabase/graveyard.go @@ -2,6 +2,7 @@ package meta import ( "bytes" + "encoding/binary" "errors" "fmt" @@ -74,8 +75,9 @@ func (db *DB) IterateOverGarbage(p GarbageIterationPrm) error { // TombstonedObject represents descriptor of the // object that has been covered with tombstone. type TombstonedObject struct { - addr oid.Address - tomb oid.Address + addr oid.Address + tomb oid.Address + tombExpiration uint64 } // Address returns tombstoned object address. @@ -89,6 +91,13 @@ func (g TombstonedObject) Tombstone() oid.Address { return g.tomb } +// TombstoneExpiration returns tombstone's expiration. It can be zero if +// metabase version does not support expiration indexing or if TS does not +// expire. +func (g TombstonedObject) TombstoneExpiration() uint64 { + return g.tombExpiration +} + // TombstonedHandler is a TombstonedObject handling function. type TombstonedHandler func(object TombstonedObject) error @@ -219,46 +228,67 @@ func garbageFromKV(k []byte) (res GarbageObject, err error) { } func graveFromKV(k, v []byte) (res TombstonedObject, err error) { - if err = decodeAddressFromKey(&res.addr, k); err != nil { - err = fmt.Errorf("decode tombstone target from key: %w", err) - } else if err = decodeAddressFromKey(&res.tomb, v); err != nil { - err = fmt.Errorf("decode tombstone address from value: %w", err) + err = decodeAddressFromKey(&res.addr, k) + if err != nil { + return res, fmt.Errorf("decode tombstone target from key: %w", err) + } + + err = decodeAddressFromKey(&res.tomb, v[:addressKeySize]) + if err != nil { + return res, fmt.Errorf("decode tombstone address from value: %w", err) + } + + if len(v) == addressKeySize+8 { + res.tombExpiration = binary.LittleEndian.Uint64(v[addressKeySize:]) } return } -// DropGraves deletes tombstoned objects from the -// graveyard bucket. -// -// Returns any error appeared during deletion process. -func (db *DB) DropGraves(tss []TombstonedObject) error { +// DropExpiredTSMarks run through the graveyard and drops tombstone marks with +// tombstones whose expiration is _less_ than provided epoch. +// Returns number of marks dropped. +func (db *DB) DropExpiredTSMarks(epoch uint64) (int, error) { db.modeMtx.RLock() defer db.modeMtx.RUnlock() if db.mode.NoMetabase() { - return ErrDegradedMode + return -1, ErrDegradedMode } else if db.mode.ReadOnly() { - return ErrReadOnlyMode + return -1, ErrReadOnlyMode } - buf := make([]byte, addressKeySize) + var counter int - return db.boltDB.Update(func(tx *bbolt.Tx) error { + err := db.boltDB.Update(func(tx *bbolt.Tx) error { bkt := tx.Bucket(graveyardBucketName) - if bkt == nil { - return nil - } + c := bkt.Cursor() + k, v := c.First() - for _, ts := range tss { - err := bkt.Delete(addressKey(ts.Address(), buf)) - if err != nil { - return err + for k != nil { + if binary.LittleEndian.Uint64(v[addressKeySize:]) < epoch { + err := c.Delete() + if err != nil { + return fmt.Errorf("cleared %d TS marks in %d epoch and got error: %w", counter, epoch, err) + } + + counter++ + + // see https://github.com/etcd-io/bbolt/pull/614; there is not + // much we can do with such an unfixed behavior + k, v = c.Seek(k) + } else { + k, v = c.Next() } } return nil }) + if err != nil { + return -1, fmt.Errorf("db call: %w", err) + } + + return counter, nil } // GetGarbage returns garbage according to the metabase state. Garbage includes diff --git a/pkg/local_object_storage/metabase/graveyard_test.go b/pkg/local_object_storage/metabase/graveyard_test.go index 747e512ac2..50ca87849a 100644 --- a/pkg/local_object_storage/metabase/graveyard_test.go +++ b/pkg/local_object_storage/metabase/graveyard_test.go @@ -1,11 +1,13 @@ package meta_test import ( + "math" "strconv" "testing" "github.com/nspcc-dev/neofs-node/pkg/core/object" meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" + apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test" objectsdk "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" @@ -138,7 +140,7 @@ func TestDB_IterateDeletedObjects(t *testing.T) { addrTombstone := oidtest.Address() inhumePrm.SetAddresses(object.AddressOf(obj1), object.AddressOf(obj2)) - inhumePrm.SetTombstoneAddress(addrTombstone) + inhumePrm.SetTombstone(addrTombstone, 0) _, err = db.Inhume(inhumePrm) require.NoError(t, err) @@ -225,7 +227,7 @@ func TestDB_IterateOverGraveyard_Offset(t *testing.T) { inhumePrm.SetAddresses( object.AddressOf(obj1), object.AddressOf(obj2), object.AddressOf(obj3), object.AddressOf(obj4)) - inhumePrm.SetTombstoneAddress(addrTombstone) + inhumePrm.SetTombstone(addrTombstone, 0) _, err = db.Inhume(inhumePrm) require.NoError(t, err) @@ -384,59 +386,6 @@ func TestDB_IterateOverGarbage_Offset(t *testing.T) { require.False(t, iWasCalled) } -func TestDB_DropGraves(t *testing.T) { - db := newDB(t) - - // generate and put 2 objects - obj1 := generateObject(t) - obj2 := generateObject(t) - - var err error - - err = putBig(db, obj1) - require.NoError(t, err) - - err = putBig(db, obj2) - require.NoError(t, err) - - // inhume with tombstone - addrTombstone := oidtest.Address() - - var inhumePrm meta.InhumePrm - inhumePrm.SetAddresses(object.AddressOf(obj1), object.AddressOf(obj2)) - inhumePrm.SetTombstoneAddress(addrTombstone) - - _, err = db.Inhume(inhumePrm) - require.NoError(t, err) - - buriedTS := make([]meta.TombstonedObject, 0) - var iterGravePRM meta.GraveyardIterationPrm - var counter int - iterGravePRM.SetHandler(func(tomstoned meta.TombstonedObject) error { - buriedTS = append(buriedTS, tomstoned) - counter++ - - return nil - }) - - err = db.IterateOverGraveyard(iterGravePRM) - require.NoError(t, err) - require.Equal(t, 2, counter) - - err = db.DropGraves(buriedTS) - require.NoError(t, err) - - counter = 0 - iterGravePRM.SetHandler(func(_ meta.TombstonedObject) error { - counter++ - return nil - }) - - err = db.IterateOverGraveyard(iterGravePRM) - require.NoError(t, err) - require.Zero(t, counter) -} - func TestDB_GetGarbage(t *testing.T) { db := newDB(t) @@ -484,3 +433,55 @@ func TestDB_GetGarbage(t *testing.T) { require.Len(t, garbageContainers, 1) // but container can be deleted now require.Equal(t, garbageContainers[0], cID) } + +func TestDropExpiredTSMarks(t *testing.T) { + epoch := uint64(math.MaxUint64 / 2) + db := newDB(t) + droppedObjects := oidtest.Addresses(1024) + tombstone := oidtest.Address() + + var pInh meta.InhumePrm + pInh.SetTombstone(tombstone, epoch) + pInh.SetAddresses(droppedObjects[:len(droppedObjects)/2]...) + _, err := db.Inhume(pInh) + require.NoError(t, err) + + pInh.SetTombstone(tombstone, epoch+1) + pInh.SetAddresses(droppedObjects[len(droppedObjects)/2:]...) + _, err = db.Inhume(pInh) + require.NoError(t, err) + + for _, o := range droppedObjects { + var pGet meta.GetPrm + pGet.SetAddress(o) + _, err = db.Get(pGet) + require.ErrorIs(t, err, apistatus.ErrObjectAlreadyRemoved) + } + + res, err := db.DropExpiredTSMarks(epoch + 1) + require.NoError(t, err) + require.EqualValues(t, len(droppedObjects)/2, res) // first half with epoch + 1 expiration + + for i, o := range droppedObjects { + var pGet meta.GetPrm + pGet.SetAddress(o) + + _, err = db.Get(pGet) + if i < len(droppedObjects)/2 { + require.ErrorIs(t, err, apistatus.ErrObjectNotFound) + } else { + require.ErrorIs(t, err, apistatus.ErrObjectAlreadyRemoved) + } + } + + res, err = db.DropExpiredTSMarks(epoch + 2) + require.NoError(t, err) + require.EqualValues(t, len(droppedObjects)/2, res) // second half with epoch + 2 expiration + + for _, o := range droppedObjects { + var pGet meta.GetPrm + pGet.SetAddress(o) + _, err = db.Get(pGet) + require.ErrorIs(t, err, apistatus.ErrObjectNotFound) + } +} diff --git a/pkg/local_object_storage/metabase/inhume.go b/pkg/local_object_storage/metabase/inhume.go index 2f00e44079..fcb5f7bf13 100644 --- a/pkg/local_object_storage/metabase/inhume.go +++ b/pkg/local_object_storage/metabase/inhume.go @@ -2,6 +2,7 @@ package meta import ( "bytes" + "encoding/binary" "errors" "fmt" @@ -15,7 +16,8 @@ import ( // InhumePrm encapsulates parameters for Inhume operation. type InhumePrm struct { - tomb *oid.Address + tomb *oid.Address + tombExpiration uint64 target []oid.Address @@ -48,17 +50,19 @@ func (p *InhumePrm) SetAddresses(addrs ...oid.Address) { p.target = addrs } -// SetTombstoneAddress sets tombstone address as the reason for inhume operation. +// SetTombstone sets tombstone address as the reason for inhume operation +// and tombstone's expiration. // // addr should not be nil. // Should not be called along with SetGCMark. -func (p *InhumePrm) SetTombstoneAddress(addr oid.Address) { +func (p *InhumePrm) SetTombstone(addr oid.Address, epoch uint64) { p.tomb = &addr + p.tombExpiration = epoch } // SetGCMark marks the object to be physically removed. // -// Should not be called along with SetTombstoneAddress. +// Should not be called along with SetTombstone. func (p *InhumePrm) SetGCMark() { p.tomb = nil } @@ -114,15 +118,15 @@ func (db *DB) Inhume(prm InhumePrm) (res InhumeRes, err error) { // 2. Garbage if Inhume was called with a GC mark bkt *bbolt.Bucket // value that will be put in the bucket, one of the: - // 1. tombstone address if Inhume was called with - // a Tombstone + // 1. tombstone address + tomb expiration epoch if Inhume was called + // with a Tombstone // 2. zeroValue if Inhume was called with a GC mark value []byte ) if prm.tomb != nil { bkt = graveyardBKT - tombKey := addressKey(*prm.tomb, make([]byte, addressKeySize)) + tombKey := addressKey(*prm.tomb, make([]byte, addressKeySize+8)) // it is forbidden to have a tomb-on-tomb in NeoFS, // so graveyard keys must not be addresses of tombstones @@ -134,7 +138,7 @@ func (db *DB) Inhume(prm InhumePrm) (res InhumeRes, err error) { } } - value = tombKey + value = binary.LittleEndian.AppendUint64(tombKey, prm.tombExpiration) } else { bkt = garbageObjectsBKT value = zeroValue @@ -187,10 +191,10 @@ func (db *DB) Inhume(prm InhumePrm) (res InhumeRes, err error) { // iterate over graveyard and check if target address // is the address of tombstone in graveyard. - err = bkt.ForEach(func(k, v []byte) error { + err = graveyardBKT.ForEach(func(k, v []byte) error { // check if graveyard has record with key corresponding // to tombstone address (at least one) - targetIsTomb = bytes.Equal(v, targetKey) + targetIsTomb = bytes.Equal(v[:addressKeySize], targetKey) if targetIsTomb { // break bucket iterator diff --git a/pkg/local_object_storage/metabase/inhume_test.go b/pkg/local_object_storage/metabase/inhume_test.go index 3a38a21991..6c7f04da91 100644 --- a/pkg/local_object_storage/metabase/inhume_test.go +++ b/pkg/local_object_storage/metabase/inhume_test.go @@ -49,7 +49,7 @@ func TestInhumeTombOnTomb(t *testing.T) { ) inhumePrm.SetAddresses(addr1) - inhumePrm.SetTombstoneAddress(addr2) + inhumePrm.SetTombstone(addr2, 0) // inhume addr1 via addr2 _, err = db.Inhume(inhumePrm) @@ -62,7 +62,7 @@ func TestInhumeTombOnTomb(t *testing.T) { require.ErrorAs(t, err, new(apistatus.ObjectAlreadyRemoved)) inhumePrm.SetAddresses(addr3) - inhumePrm.SetTombstoneAddress(addr1) + inhumePrm.SetTombstone(addr1, 0) // try to inhume addr3 via addr1 _, err = db.Inhume(inhumePrm) @@ -82,7 +82,7 @@ func TestInhumeTombOnTomb(t *testing.T) { require.ErrorAs(t, err, new(apistatus.ObjectAlreadyRemoved)) inhumePrm.SetAddresses(addr1) - inhumePrm.SetTombstoneAddress(oidtest.Address()) + inhumePrm.SetTombstone(oidtest.Address(), 0) // try to inhume addr1 (which is already a tombstone in graveyard) _, err = db.Inhume(inhumePrm) @@ -162,7 +162,7 @@ func TestInhumeContainer(t *testing.T) { func metaInhume(db *meta.DB, target, tomb oid.Address) error { var inhumePrm meta.InhumePrm inhumePrm.SetAddresses(target) - inhumePrm.SetTombstoneAddress(tomb) + inhumePrm.SetTombstone(tomb, 0) _, err := db.Inhume(inhumePrm) return err diff --git a/pkg/local_object_storage/metabase/iterators.go b/pkg/local_object_storage/metabase/iterators.go index 120bef72a6..c158b00868 100644 --- a/pkg/local_object_storage/metabase/iterators.go +++ b/pkg/local_object_storage/metabase/iterators.go @@ -146,7 +146,7 @@ func (db *DB) iterateCoveredByTombstones(tx *bbolt.Tx, tss map[string]oid.Addres err := bktGraveyard.ForEach(func(k, v []byte) error { var addr oid.Address - if err := decodeAddressFromKey(&addr, v); err != nil { + if err := decodeAddressFromKey(&addr, v[:addressKeySize]); err != nil { return err } if _, ok := tss[addr.EncodeToString()]; ok { diff --git a/pkg/local_object_storage/metabase/iterators_test.go b/pkg/local_object_storage/metabase/iterators_test.go index 2f0f8eb203..8c604aeee0 100644 --- a/pkg/local_object_storage/metabase/iterators_test.go +++ b/pkg/local_object_storage/metabase/iterators_test.go @@ -77,7 +77,7 @@ func TestDB_IterateCoveredByTombstones(t *testing.T) { var err error prm.SetAddresses(protected1, protected2, protectedLocked) - prm.SetTombstoneAddress(ts) + prm.SetTombstone(ts, 0) _, err = db.Inhume(prm) require.NoError(t, err) diff --git a/pkg/local_object_storage/metabase/lock_test.go b/pkg/local_object_storage/metabase/lock_test.go index 2b09453afc..85d00ec3c5 100644 --- a/pkg/local_object_storage/metabase/lock_test.go +++ b/pkg/local_object_storage/metabase/lock_test.go @@ -67,7 +67,7 @@ func TestDB_Lock(t *testing.T) { _, err := db.Inhume(inhumePrm) require.ErrorAs(t, err, new(apistatus.ObjectLocked)) - inhumePrm.SetTombstoneAddress(oidtest.Address()) + inhumePrm.SetTombstone(oidtest.Address(), 0) _, err = db.Inhume(inhumePrm) require.ErrorAs(t, err, new(apistatus.ObjectLocked)) @@ -83,7 +83,7 @@ func TestDB_Lock(t *testing.T) { _, err = db.Inhume(inhumePrm) require.ErrorAs(t, err, new(apistatus.ObjectLocked)) - inhumePrm.SetTombstoneAddress(oidtest.Address()) + inhumePrm.SetTombstone(oidtest.Address(), 0) _, err = db.Inhume(inhumePrm) require.ErrorAs(t, err, new(apistatus.ObjectLocked)) }) diff --git a/pkg/local_object_storage/metabase/version.go b/pkg/local_object_storage/metabase/version.go index 23cd2c705a..4a64bbcb53 100644 --- a/pkg/local_object_storage/metabase/version.go +++ b/pkg/local_object_storage/metabase/version.go @@ -4,12 +4,13 @@ import ( "encoding/binary" "fmt" + objectconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/object" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util/logicerr" "go.etcd.io/bbolt" ) // version contains current metabase version. -const version = 2 +const version = 3 var versionKey = []byte("version") @@ -18,7 +19,7 @@ var versionKey = []byte("version") // the current code version. var ErrOutdatedVersion = logicerr.New("invalid version, resynchronization is required") -func checkVersion(tx *bbolt.Tx, initialized bool) error { +func (db *DB) checkVersion(tx *bbolt.Tx) error { var knownVersion bool b := tx.Bucket(shardInfoBucket) @@ -29,14 +30,22 @@ func checkVersion(tx *bbolt.Tx, initialized bool) error { stored := binary.LittleEndian.Uint64(data) if stored != version { - return fmt.Errorf("%w: expected=%d, stored=%d", ErrOutdatedVersion, version, stored) + migrate, ok := migrateFrom[stored] + if !ok { + return fmt.Errorf("%w: expected=%d, stored=%d", ErrOutdatedVersion, version, stored) + } + + err := migrate(db, tx) + if err != nil { + return fmt.Errorf("migrating from %d to %d version failed, consider database resync: %w", stored, version, err) + } } } } - if !initialized { + if !db.initialized { // new database, write version - return updateVersion(tx, version) + return db.updateVersion(tx, version) } else if !knownVersion { // db is initialized but no version // has been found; that could happen @@ -49,7 +58,7 @@ func checkVersion(tx *bbolt.Tx, initialized bool) error { return nil } -func updateVersion(tx *bbolt.Tx, version uint64) error { +func (db *DB) updateVersion(tx *bbolt.Tx, version uint64) error { data := make([]byte, 8) binary.LittleEndian.PutUint64(data, version) @@ -71,3 +80,26 @@ func getVersion(tx *bbolt.Tx) uint64 { return 0 } + +var migrateFrom = map[uint64]func(*DB, *bbolt.Tx) error{ + 2: migrateFrom2Version, +} + +func migrateFrom2Version(db *DB, tx *bbolt.Tx) error { + tsExpiration := db.epochState.CurrentEpoch() + objectconfig.DefaultTombstoneLifetime + bkt := tx.Bucket(graveyardBucketName) + c := bkt.Cursor() + + for k, v := c.First(); k != nil; k, v = c.Next() { + newVal := make([]byte, addressKeySize, addressKeySize+8) + copy(newVal, v) + newVal = binary.LittleEndian.AppendUint64(newVal, tsExpiration) + + err := bkt.Put(k, newVal) + if err != nil { + return err + } + } + + return nil +} diff --git a/pkg/local_object_storage/metabase/version_test.go b/pkg/local_object_storage/metabase/version_test.go index 39eaec58b1..70c67e33c7 100644 --- a/pkg/local_object_storage/metabase/version_test.go +++ b/pkg/local_object_storage/metabase/version_test.go @@ -1,12 +1,20 @@ package meta import ( + "bytes" "encoding/binary" "errors" "fmt" + "os" + "path" "path/filepath" "testing" + objectconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/object" + apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" + "github.com/nspcc-dev/neofs-sdk-go/object" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test" "github.com/stretchr/testify/require" "go.etcd.io/bbolt" ) @@ -69,7 +77,7 @@ func TestVersion(t *testing.T) { db := newDB(t) require.NoError(t, db.Open(false)) require.NoError(t, db.boltDB.Update(func(tx *bbolt.Tx) error { - return updateVersion(tx, version+1) + return db.updateVersion(tx, version+1) })) require.NoError(t, db.Close()) @@ -85,3 +93,236 @@ func TestVersion(t *testing.T) { }) }) } + +type inhumeV2Prm struct { + tomb *oid.Address + target []oid.Address + + lockObjectHandling bool + + forceRemoval bool +} + +func (db *DB) inhumeV2(prm inhumeV2Prm) (res InhumeRes, err error) { + db.modeMtx.RLock() + defer db.modeMtx.RUnlock() + + if db.mode.NoMetabase() { + return InhumeRes{}, ErrDegradedMode + } else if db.mode.ReadOnly() { + return InhumeRes{}, ErrReadOnlyMode + } + + currEpoch := db.epochState.CurrentEpoch() + var inhumed uint64 + + err = db.boltDB.Update(func(tx *bbolt.Tx) error { + garbageObjectsBKT := tx.Bucket(garbageObjectsBucketName) + garbageContainersBKT := tx.Bucket(garbageContainersBucketName) + graveyardBKT := tx.Bucket(graveyardBucketName) + + var ( + // target bucket of the operation, one of the: + // 1. Graveyard if Inhume was called with a Tombstone + // 2. Garbage if Inhume was called with a GC mark + bkt *bbolt.Bucket + // value that will be put in the bucket, one of the: + // 1. tombstone address if Inhume was called with + // a Tombstone + // 2. zeroValue if Inhume was called with a GC mark + value []byte + ) + + if prm.tomb != nil { + bkt = graveyardBKT + tombKey := addressKey(*prm.tomb, make([]byte, addressKeySize)) + + // it is forbidden to have a tomb-on-tomb in NeoFS, + // so graveyard keys must not be addresses of tombstones + data := bkt.Get(tombKey) + if data != nil { + err := bkt.Delete(tombKey) + if err != nil { + return fmt.Errorf("could not remove grave with tombstone key: %w", err) + } + } + + value = tombKey + } else { + bkt = garbageObjectsBKT + value = zeroValue + } + + buf := make([]byte, addressKeySize) + for i := range prm.target { + id := prm.target[i].Object() + cnr := prm.target[i].Container() + + // prevent locked objects to be inhumed + if !prm.forceRemoval && objectLocked(tx, cnr, id) { + return apistatus.ObjectLocked{} + } + + var lockWasChecked bool + + // prevent lock objects to be inhumed + // if `Inhume` was called not with the + // `WithForceGCMark` option + if !prm.forceRemoval { + if isLockObject(tx, cnr, id) { + return ErrLockObjectRemoval + } + + lockWasChecked = true + } + + obj, err := db.get(tx, prm.target[i], buf, false, true, currEpoch) + targetKey := addressKey(prm.target[i], buf) + if err == nil { + if inGraveyardWithKey(targetKey, graveyardBKT, garbageObjectsBKT, garbageContainersBKT) == 0 { + // object is available, decrement the + // logical counter + inhumed++ + } + + // if object is stored, and it is regular object then update bucket + // with container size estimations + if obj.Type() == object.TypeRegular { + err := changeContainerSize(tx, cnr, obj.PayloadSize(), false) + if err != nil { + return err + } + } + } + + if prm.tomb != nil { + targetIsTomb := false + + // iterate over graveyard and check if target address + // is the address of tombstone in graveyard. + err = bkt.ForEach(func(k, v []byte) error { + // check if graveyard has record with key corresponding + // to tombstone address (at least one) + targetIsTomb = bytes.Equal(v, targetKey) + + if targetIsTomb { + // break bucket iterator + return errBreakBucketForEach + } + + return nil + }) + if err != nil && !errors.Is(err, errBreakBucketForEach) { + return err + } + + // do not add grave if target is a tombstone + if targetIsTomb { + continue + } + + // if tombstone appears object must be + // additionally marked with GC + err = garbageObjectsBKT.Put(targetKey, zeroValue) + if err != nil { + return err + } + } + + // consider checking if target is already in graveyard? + err = bkt.Put(targetKey, value) + if err != nil { + return err + } + + if prm.lockObjectHandling { + // do not perform lock check if + // it was already called + if lockWasChecked { + // inhumed object is not of + // the LOCK type + continue + } + + if isLockObject(tx, cnr, id) { + res.deletedLockObj = append(res.deletedLockObj, prm.target[i]) + } + } + } + + return db.updateCounter(tx, logical, inhumed, false) + }) + + res.availableImhumed = inhumed + + return +} + +const testEpoch = 123 + +type epochState struct{} + +func (s epochState) CurrentEpoch() uint64 { + return testEpoch +} + +func newDB(t testing.TB, opts ...Option) *DB { + p := path.Join(t.TempDir(), "meta.db") + + bdb := New( + append([]Option{ + WithPath(p), + WithPermissions(0o600), + WithEpochState(epochState{}), + }, opts...)..., + ) + + require.NoError(t, bdb.Open(false)) + require.NoError(t, bdb.Init()) + + t.Cleanup(func() { + bdb.Close() + os.Remove(bdb.DumpInfo().Path) + }) + + return bdb +} + +func TestMigrate2to3(t *testing.T) { + expectedEpoch := uint64(testEpoch + objectconfig.DefaultTombstoneLifetime) + expectedEpochRaw := make([]byte, 8) + binary.LittleEndian.PutUint64(expectedEpochRaw, expectedEpoch) + + db := newDB(t) + + testObjs := oidtest.Addresses(1024) + tomb := oidtest.Address() + tombRaw := addressKey(tomb, make([]byte, addressKeySize)) + + _, err := db.inhumeV2(inhumeV2Prm{ + target: testObjs, + tomb: &tomb, + }) + require.NoError(t, err) + + err = db.boltDB.Update(func(tx *bbolt.Tx) error { + err = db.updateVersion(tx, 2) + if err != nil { + return err + } + + return migrateFrom2Version(db, tx) + }) + require.NoError(t, err) + + err = db.boltDB.View(func(tx *bbolt.Tx) error { + return tx.Bucket(graveyardBucketName).ForEach(func(k, v []byte) error { + require.Len(t, v, addressKeySize+8) + require.Equal(t, v[:addressKeySize], tombRaw) + require.Equal(t, v[addressKeySize:], expectedEpochRaw) + + return nil + }) + }) + require.NoError(t, err) +} diff --git a/pkg/local_object_storage/pilorama/boltdb.go b/pkg/local_object_storage/pilorama/boltdb.go index 70390b70fe..32047ba1c4 100644 --- a/pkg/local_object_storage/pilorama/boltdb.go +++ b/pkg/local_object_storage/pilorama/boltdb.go @@ -433,7 +433,7 @@ func (t *boltForest) applyOperation(logBucket, treeBucket *bbolt.Bucket, ms []*M key, value = c.Prev() } - for i := range len(ms) { + for i := range ms { // Loop invariant: key represents the next stored timestamp after ms[i].Time. // 2. Insert the operation. diff --git a/pkg/local_object_storage/shard/control.go b/pkg/local_object_storage/shard/control.go index c144b66b70..e38d7c3d25 100644 --- a/pkg/local_object_storage/shard/control.go +++ b/pkg/local_object_storage/shard/control.go @@ -176,6 +176,10 @@ func (s *Shard) refillMetabase() error { switch obj.Type() { case objectSDK.TypeTombstone: tombstone := objectSDK.NewTombstone() + exp, err := object.Expiration(*obj) + if err != nil && !errors.Is(err, object.ErrNoExpiration) { + return fmt.Errorf("tombstone's expiration: %w", err) + } if err := tombstone.Unmarshal(obj.Payload()); err != nil { return fmt.Errorf("could not unmarshal tombstone content: %w", err) @@ -194,7 +198,7 @@ func (s *Shard) refillMetabase() error { var inhumePrm meta.InhumePrm - inhumePrm.SetTombstoneAddress(tombAddr) + inhumePrm.SetTombstone(tombAddr, exp) inhumePrm.SetAddresses(tombMembers...) _, err = s.metaBase.Inhume(inhumePrm) diff --git a/pkg/local_object_storage/shard/control_test.go b/pkg/local_object_storage/shard/control_test.go index 9e0490c58b..ed2979f5c9 100644 --- a/pkg/local_object_storage/shard/control_test.go +++ b/pkg/local_object_storage/shard/control_test.go @@ -244,7 +244,7 @@ func TestRefillMetabase(t *testing.T) { require.NoError(t, sh.Lock(cnrLocked, lockID, locked)) var inhumePrm InhumePrm - inhumePrm.InhumeByTomb(object.AddressOf(&tombObj), tombMembers...) + inhumePrm.InhumeByTomb(object.AddressOf(&tombObj), 0, tombMembers...) _, err = sh.Inhume(inhumePrm) require.NoError(t, err) diff --git a/pkg/local_object_storage/shard/gc.go b/pkg/local_object_storage/shard/gc.go index 81b0daae43..e013bbd0ff 100644 --- a/pkg/local_object_storage/shard/gc.go +++ b/pkg/local_object_storage/shard/gc.go @@ -13,15 +13,6 @@ import ( "go.uber.org/zap" ) -// TombstoneSource is an interface that checks -// tombstone status in the NeoFS network. -type TombstoneSource interface { - // IsTombstoneAvailable must return boolean value that means - // provided tombstone's presence in the NeoFS network at the - // time of the passed epoch. - IsTombstoneAvailable(ctx context.Context, addr oid.Address, epoch uint64) bool -} - // Event represents class of external events. type Event interface { typ() eventType @@ -236,7 +227,7 @@ func (s *Shard) collectExpiredObjects(ctx context.Context, e Event) { log.Debug("started expired objects handling") expired, err := s.getExpiredObjects(ctx, e.(newEpoch).epoch, func(typ object.Type) bool { - return typ != object.TypeTombstone && typ != object.TypeLock + return typ != object.TypeLock }) if err != nil || len(expired) == 0 { if err != nil { @@ -252,69 +243,19 @@ func (s *Shard) collectExpiredObjects(ctx context.Context, e Event) { log.Debug("finished expired objects handling") } -func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) { +func (s *Shard) collectExpiredTombstones(_ context.Context, e Event) { epoch := e.(newEpoch).epoch log := s.log.With(zap.Uint64("epoch", epoch)) log.Debug("started expired tombstones handling") - const tssDeleteBatch = 50 - tss := make([]meta.TombstonedObject, 0, tssDeleteBatch) - tssExp := make([]meta.TombstonedObject, 0, tssDeleteBatch) - - var iterPrm meta.GraveyardIterationPrm - iterPrm.SetHandler(func(deletedObject meta.TombstonedObject) error { - tss = append(tss, deletedObject) - - if len(tss) == tssDeleteBatch { - return meta.ErrInterruptIterator - } - - return nil - }) - - for { - log.Debug("iterating tombstones") - - s.m.RLock() - - if s.info.Mode.NoMetabase() { - s.log.Debug("shard is in a degraded mode, skip collecting expired tombstones") - s.m.RUnlock() - - return - } - - err := s.metaBase.IterateOverGraveyard(iterPrm) - if err != nil { - log.Error("iterator over graveyard failed", zap.Error(err)) - s.m.RUnlock() - - return - } - - s.m.RUnlock() - - tssLen := len(tss) - if tssLen == 0 { - break - } - - for _, ts := range tss { - if !s.tsSource.IsTombstoneAvailable(ctx, ts.Tombstone(), epoch) { - tssExp = append(tssExp, ts) - } - } - - log.Debug("handling expired tombstones batch", zap.Int("number", len(tssExp))) - s.expiredTombstonesCallback(ctx, tssExp) - - iterPrm.SetOffset(tss[tssLen-1].Address()) - tss = tss[:0] - tssExp = tssExp[:0] + dropped, err := s.metaBase.DropExpiredTSMarks(epoch) + if err != nil { + log.Error("cleaning graveyard up failed", zap.Error(err)) + return } - log.Debug("finished expired tombstones handling") + log.Debug("finished expired tombstones handling", zap.Int("dropped marks", dropped)) } func (s *Shard) collectExpiredLocks(ctx context.Context, e Event) { @@ -358,46 +299,6 @@ func (s *Shard) getExpiredObjects(ctx context.Context, epoch uint64, typeCond fu return expired, ctx.Err() } -// HandleExpiredTombstones marks tombstones themselves as garbage -// and clears up corresponding graveyard records. -// -// Does not modify tss. -func (s *Shard) HandleExpiredTombstones(tss []meta.TombstonedObject) { - if s.GetMode().NoMetabase() { - return - } - - // Mark tombstones as garbage. - var pInhume meta.InhumePrm - - tsAddrs := make([]oid.Address, 0, len(tss)) - for _, ts := range tss { - tsAddrs = append(tsAddrs, ts.Tombstone()) - } - - pInhume.SetGCMark() - pInhume.SetAddresses(tsAddrs...) - - // inhume tombstones - res, err := s.metaBase.Inhume(pInhume) - if err != nil { - s.log.Warn("could not mark tombstones as garbage", - zap.String("error", err.Error()), - ) - - return - } - - s.decObjectCounterBy(logical, res.AvailableInhumed()) - - // drop just processed expired tombstones - // from graveyard - err = s.metaBase.DropGraves(tss) - if err != nil { - s.log.Warn("could not drop expired grave records", zap.Error(err)) - } -} - // HandleExpiredLocks unlocks all objects which were locked by lockers. // If successful, marks lockers themselves as garbage. Also, marks as // garbage every object that becomes free-to-remove and just removed diff --git a/pkg/local_object_storage/shard/gc_test.go b/pkg/local_object_storage/shard/gc_test.go index 798fb0bc63..8ff64e1a93 100644 --- a/pkg/local_object_storage/shard/gc_test.go +++ b/pkg/local_object_storage/shard/gc_test.go @@ -4,7 +4,9 @@ import ( "context" "errors" "fmt" + "math" "path/filepath" + "strconv" "testing" "time" @@ -18,6 +20,7 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/util" apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test" + neofscryptotest "github.com/nspcc-dev/neofs-sdk-go/crypto/test" "github.com/nspcc-dev/neofs-sdk-go/object" objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" @@ -181,3 +184,83 @@ func TestGC_ContainerCleanup(t *testing.T) { return len(res.Containers()) == 0 }, time.Second, 100*time.Millisecond) } + +func TestExpiration(t *testing.T) { + rootPath := t.TempDir() + var sh *shard.Shard + + opts := []shard.Option{ + shard.WithLogger(zap.NewNop()), + shard.WithBlobStorOptions( + blobstor.WithStorages([]blobstor.SubStorage{ + { + Storage: fstree.New( + fstree.WithPath(filepath.Join(rootPath, "blob"))), + Policy: func(_ *objectSDK.Object, _ []byte) bool { return true }, + }, + }), + ), + shard.WithMetaBaseOptions( + meta.WithPath(filepath.Join(rootPath, "meta")), + meta.WithEpochState(epochState{Value: math.MaxUint64 / 2}), + ), + shard.WithExpiredObjectsCallback( + func(_ context.Context, addresses []oid.Address) { + var p shard.InhumePrm + p.MarkAsGarbage(addresses...) + _, err := sh.Inhume(p) + require.NoError(t, err) + }, + ), + shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool { + pool, err := ants.NewPool(sz) + require.NoError(t, err) + + return pool + }), + } + + sh = shard.New(opts...) + require.NoError(t, sh.Open()) + require.NoError(t, sh.Init()) + + t.Cleanup(func() { + releaseShard(sh, t) + }) + ch := sh.NotificationChannel() + + var expAttr objectSDK.Attribute + expAttr.SetKey(objectV2.SysAttributeExpEpoch) + + obj := generateObject(t) + + for i, typ := range []object.Type{object.TypeRegular, object.TypeTombstone, object.TypeLink, objectSDK.TypeStorageGroup} { + t.Run(fmt.Sprintf("type: %s", typ), func(t *testing.T) { + exp := uint64(i * 10) + + expAttr.SetValue(strconv.FormatUint(exp, 10)) + obj.SetAttributes(expAttr) + obj.SetType(typ) + require.NoError(t, obj.SetIDWithSignature(neofscryptotest.Signer())) + + var putPrm shard.PutPrm + putPrm.SetObject(obj) + + _, err := sh.Put(putPrm) + require.NoError(t, err) + + var getPrm shard.GetPrm + getPrm.SetAddress(objectCore.AddressOf(obj)) + + _, err = sh.Get(getPrm) + require.NoError(t, err) + + ch <- shard.EventNewEpoch(exp + 1) + + require.Eventually(t, func() bool { + _, err = sh.Get(getPrm) + return shard.IsErrNotFound(err) + }, 3*time.Second, 100*time.Millisecond, "lock expiration should free object removal") + }) + } +} diff --git a/pkg/local_object_storage/shard/inhume.go b/pkg/local_object_storage/shard/inhume.go index 1c272cc0e9..9890e83338 100644 --- a/pkg/local_object_storage/shard/inhume.go +++ b/pkg/local_object_storage/shard/inhume.go @@ -12,9 +12,10 @@ import ( // InhumePrm encapsulates parameters for inhume operation. type InhumePrm struct { - target []oid.Address - tombstone *oid.Address - forceRemoval bool + target []oid.Address + tombstone *oid.Address + tombstoneExpiration uint64 + forceRemoval bool } // InhumeRes encapsulates results of inhume operation. @@ -25,10 +26,11 @@ type InhumeRes struct{} // // tombstone should not be nil, addr should not be empty. // Should not be called along with MarkAsGarbage. -func (p *InhumePrm) InhumeByTomb(tombstone oid.Address, addrs ...oid.Address) { +func (p *InhumePrm) InhumeByTomb(tombstone oid.Address, tombExpiration uint64, addrs ...oid.Address) { if p != nil { p.target = addrs p.tombstone = &tombstone + p.tombstoneExpiration = tombExpiration } } @@ -39,6 +41,7 @@ func (p *InhumePrm) MarkAsGarbage(addr ...oid.Address) { if p != nil { p.target = addr p.tombstone = nil + p.tombstoneExpiration = 0 } } @@ -91,7 +94,7 @@ func (s *Shard) Inhume(prm InhumePrm) (InhumeRes, error) { metaPrm.SetLockObjectHandling() if prm.tombstone != nil { - metaPrm.SetTombstoneAddress(*prm.tombstone) + metaPrm.SetTombstone(*prm.tombstone, prm.tombstoneExpiration) } else { metaPrm.SetGCMark() } diff --git a/pkg/local_object_storage/shard/inhume_test.go b/pkg/local_object_storage/shard/inhume_test.go index 011b0266fe..ab81c6355c 100644 --- a/pkg/local_object_storage/shard/inhume_test.go +++ b/pkg/local_object_storage/shard/inhume_test.go @@ -35,7 +35,7 @@ func testShardInhume(t *testing.T, hasWriteCache bool) { putPrm.SetObject(obj) var inhPrm shard.InhumePrm - inhPrm.InhumeByTomb(object.AddressOf(ts), object.AddressOf(obj)) + inhPrm.InhumeByTomb(object.AddressOf(ts), 0, object.AddressOf(obj)) var getPrm shard.GetPrm getPrm.SetAddress(object.AddressOf(obj)) diff --git a/pkg/local_object_storage/shard/lock_test.go b/pkg/local_object_storage/shard/lock_test.go index bc5826a89b..abf92d4226 100644 --- a/pkg/local_object_storage/shard/lock_test.go +++ b/pkg/local_object_storage/shard/lock_test.go @@ -86,7 +86,7 @@ func TestShard_Lock(t *testing.T) { ts := generateObjectWithCID(t, cnr) var inhumePrm shard.InhumePrm - inhumePrm.InhumeByTomb(objectcore.AddressOf(ts), objectcore.AddressOf(obj)) + inhumePrm.InhumeByTomb(objectcore.AddressOf(ts), 0, objectcore.AddressOf(obj)) _, err = sh.Inhume(inhumePrm) require.ErrorAs(t, err, new(apistatus.ObjectLocked)) @@ -100,7 +100,7 @@ func TestShard_Lock(t *testing.T) { ts := generateObjectWithCID(t, cnr) var inhumePrm shard.InhumePrm - inhumePrm.InhumeByTomb(objectcore.AddressOf(ts), objectcore.AddressOf(lock)) + inhumePrm.InhumeByTomb(objectcore.AddressOf(ts), 0, objectcore.AddressOf(lock)) _, err = sh.Inhume(inhumePrm) require.Error(t, err) diff --git a/pkg/local_object_storage/shard/metrics_test.go b/pkg/local_object_storage/shard/metrics_test.go index 7c4574b9a9..9b4b808046 100644 --- a/pkg/local_object_storage/shard/metrics_test.go +++ b/pkg/local_object_storage/shard/metrics_test.go @@ -145,7 +145,7 @@ func TestCounters(t *testing.T) { logic := mm.objectCounters[logical] inhumedNumber := int(phy / 4) - prm.InhumeByTomb(ts, addrFromObjs(oo[:inhumedNumber])...) + prm.InhumeByTomb(ts, 0, addrFromObjs(oo[:inhumedNumber])...) _, err := sh.Inhume(prm) require.NoError(t, err) diff --git a/pkg/local_object_storage/shard/shard.go b/pkg/local_object_storage/shard/shard.go index ae6d27ab2a..7817947106 100644 --- a/pkg/local_object_storage/shard/shard.go +++ b/pkg/local_object_storage/shard/shard.go @@ -28,8 +28,6 @@ type Shard struct { pilorama pilorama.ForestStorage metaBase *meta.DB - - tsSource TombstoneSource } // Option represents Shard's constructor option. @@ -96,14 +94,10 @@ type cfg struct { expiredObjectsCallback ExpiredObjectsCallback - expiredTombstonesCallback ExpiredTombstonesCallback - expiredLocksCallback ExpiredObjectsCallback deletedLockCallBack DeletedLockCallback - tsSource TombstoneSource - metricsWriter MetricsWriter reportErrorFunc func(selfID string, message string, err error) @@ -133,7 +127,6 @@ func New(opts ...Option) *Shard { cfg: c, blobStor: bs, metaBase: mb, - tsSource: c.tsSource, } reportFunc := func(msg string, err error) { @@ -251,14 +244,6 @@ func WithExpiredObjectsCallback(cb ExpiredObjectsCallback) Option { } } -// WithExpiredTombstonesCallback returns option to specify callback -// of the expired tombstones handler. -func WithExpiredTombstonesCallback(cb ExpiredTombstonesCallback) Option { - return func(c *cfg) { - c.expiredTombstonesCallback = cb - } -} - // WithExpiredLocksCallback returns option to specify callback // of the expired LOCK objects handler. func WithExpiredLocksCallback(cb ExpiredObjectsCallback) Option { @@ -283,13 +268,6 @@ func WithMode(v mode.Mode) Option { } } -// WithTombstoneSource returns option to set TombstoneSource. -func WithTombstoneSource(v TombstoneSource) Option { - return func(c *cfg) { - c.tsSource = v - } -} - // WithDeletedLockCallback returns option to specify callback // of the deleted LOCK objects handler. func WithDeletedLockCallback(v DeletedLockCallback) Option { diff --git a/pkg/local_object_storage/writecache/flush_test.go b/pkg/local_object_storage/writecache/flush_test.go index 79c53a4786..f45962e67b 100644 --- a/pkg/local_object_storage/writecache/flush_test.go +++ b/pkg/local_object_storage/writecache/flush_test.go @@ -224,7 +224,7 @@ func TestFlush(t *testing.T) { var inhumePrm meta.InhumePrm inhumePrm.SetAddresses(objects[0].addr, objects[1].addr) - inhumePrm.SetTombstoneAddress(oidtest.Address()) + inhumePrm.SetTombstone(oidtest.Address(), 0) _, err := mb.Inhume(inhumePrm) require.NoError(t, err) diff --git a/pkg/services/object/put/local.go b/pkg/services/object/put/local.go index 755274e738..cb65806930 100644 --- a/pkg/services/object/put/local.go +++ b/pkg/services/object/put/local.go @@ -24,7 +24,7 @@ type ObjectStorage interface { Put(obj *object.Object, objBin []byte, hdrLen int) error // Delete must delete passed objects // and return any appeared error. - Delete(tombstone oid.Address, toDelete []oid.ID) error + Delete(tombstone oid.Address, tombExpiration uint64, toDelete []oid.ID) error // Lock must lock passed objects // and return any appeared error. Lock(locker oid.Address, toLock []oid.ID) error @@ -62,7 +62,12 @@ func (t *localTarget) Close() (oid.ID, *neofscrypto.Signature, error) { func putObjectLocally(storage ObjectStorage, obj *object.Object, meta objectCore.ContentMeta, enc *encodedObject) error { switch meta.Type() { case object.TypeTombstone: - err := storage.Delete(objectCore.AddressOf(obj), meta.Objects()) + exp, err := objectCore.Expiration(*obj) + if err != nil && !errors.Is(err, objectCore.ErrNoExpiration) { + return fmt.Errorf("reading tombstone expiration: %w", err) + } + + err = storage.Delete(objectCore.AddressOf(obj), exp, meta.Objects()) if err != nil { return fmt.Errorf("could not delete objects from tombstone locally: %w", err) } diff --git a/pkg/services/object_manager/tombstone/checker.go b/pkg/services/object_manager/tombstone/checker.go deleted file mode 100644 index 91b1478b2d..0000000000 --- a/pkg/services/object_manager/tombstone/checker.go +++ /dev/null @@ -1,92 +0,0 @@ -package tombstone - -import ( - "context" - "strconv" - - lru "github.com/hashicorp/golang-lru/v2" - "github.com/nspcc-dev/neofs-sdk-go/object" - oid "github.com/nspcc-dev/neofs-sdk-go/object/id" - "go.uber.org/zap" -) - -// Source is a tombstone source interface. -type Source interface { - // Tombstone must return tombstone from the source it was - // configured to fetch from and any error that appeared during - // fetching process. - // - // Tombstone MUST return (nil, nil) if requested tombstone is - // missing in the storage for the provided epoch. - Tombstone(ctx context.Context, a oid.Address, epoch uint64) (*object.Object, error) -} - -// ExpirationChecker is a tombstone source wrapper. -// It checks tombstones presence via tombstone -// source, caches it checks its expiration. -// -// Must be created via NewChecker function. `var` and -// `ExpirationChecker{}` declarations leads to undefined behaviour -// and may lead to panics. -type ExpirationChecker struct { - cache *lru.Cache[oid.Address, uint64] - - log *zap.Logger - - tsSource Source -} - -// IsTombstoneAvailable checks the tombstone presence in the system in the -// following order: -// - 1. Local LRU cache; -// - 2. Tombstone source. -// -// If a tombstone was successfully fetched (regardless of its expiration) -// it is cached in the LRU cache. -func (g *ExpirationChecker) IsTombstoneAvailable(ctx context.Context, a oid.Address, epoch uint64) bool { - addrStr := a.EncodeToString() - log := g.log.With(zap.String("address", addrStr)) - - expEpoch, ok := g.cache.Get(a) - if ok { - return expEpoch > epoch - } - - ts, err := g.tsSource.Tombstone(ctx, a, epoch) - if err != nil { - log.Warn( - "tombstone getter: could not get the tombstone the source", - zap.Error(err), - ) - } else { - if ts != nil { - return g.handleTS(a, ts, epoch) - } - } - - // requested tombstone not - // found in the NeoFS network - return false -} - -func (g *ExpirationChecker) handleTS(addr oid.Address, ts *object.Object, reqEpoch uint64) bool { - for _, atr := range ts.Attributes() { - if atr.Key() == object.AttributeExpirationEpoch { - epoch, err := strconv.ParseUint(atr.Value(), 10, 64) - if err != nil { - g.log.Warn( - "tombstone getter: could not parse tombstone expiration epoch", - zap.Error(err), - ) - - return false - } - - g.cache.Add(addr, epoch) - return epoch >= reqEpoch - } - } - - // unexpected tombstone without expiration epoch - return false -} diff --git a/pkg/services/object_manager/tombstone/constructor.go b/pkg/services/object_manager/tombstone/constructor.go deleted file mode 100644 index 6f876cf0af..0000000000 --- a/pkg/services/object_manager/tombstone/constructor.go +++ /dev/null @@ -1,85 +0,0 @@ -package tombstone - -import ( - "fmt" - - lru "github.com/hashicorp/golang-lru/v2" - oid "github.com/nspcc-dev/neofs-sdk-go/object/id" - "go.uber.org/zap" -) - -const defaultLRUCacheSize = 100 - -type cfg struct { - log *zap.Logger - - cacheSize int - - tsSource Source -} - -// Option is an option of ExpirationChecker's constructor. -type Option func(*cfg) - -func defaultCfg() *cfg { - return &cfg{ - log: zap.NewNop(), - cacheSize: defaultLRUCacheSize, - } -} - -// NewChecker creates, initializes and returns tombstone ExpirationChecker. -// The returned structure is ready to use. -// -// Panics if any of the provided options does not allow -// constructing a valid tombstone ExpirationChecker. -func NewChecker(oo ...Option) *ExpirationChecker { - cfg := defaultCfg() - - for _, o := range oo { - o(cfg) - } - - panicOnNil := func(v any, name string) { - if v == nil { - panic(fmt.Sprintf("tombstone getter constructor: %s is nil", name)) - } - } - - panicOnNil(cfg.tsSource, "Tombstone source") - - cache, err := lru.New[oid.Address, uint64](cfg.cacheSize) - if err != nil { - panic(fmt.Errorf("could not create LRU cache with %d size: %w", cfg.cacheSize, err)) - } - - return &ExpirationChecker{ - cache: cache, - log: cfg.log, - tsSource: cfg.tsSource, - } -} - -// WithLogger returns an option to specify -// logger. -func WithLogger(v *zap.Logger) Option { - return func(c *cfg) { - c.log = v - } -} - -// WithCacheSize returns an option to specify -// LRU cache size. -func WithCacheSize(v int) Option { - return func(c *cfg) { - c.cacheSize = v - } -} - -// WithTombstoneSource returns an option -// to specify tombstone source. -func WithTombstoneSource(v Source) Option { - return func(c *cfg) { - c.tsSource = v - } -} diff --git a/pkg/services/object_manager/tombstone/source/source.go b/pkg/services/object_manager/tombstone/source/source.go deleted file mode 100644 index 8444e6b60d..0000000000 --- a/pkg/services/object_manager/tombstone/source/source.go +++ /dev/null @@ -1,83 +0,0 @@ -package tsourse - -import ( - "context" - "errors" - "fmt" - - getsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/get" - "github.com/nspcc-dev/neofs-node/pkg/services/object/util" - apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" - "github.com/nspcc-dev/neofs-sdk-go/object" - oid "github.com/nspcc-dev/neofs-sdk-go/object/id" -) - -// Source represents wrapper over the object service that -// allows checking if a tombstone is available in NeoFS -// network. -// -// Must be created via NewSource function. `var` and `Source{}` -// declarations leads to undefined behaviour and may lead -// to panics. -type Source struct { - s *getsvc.Service -} - -// TombstoneSourcePrm groups required parameters for Source creation. -type TombstoneSourcePrm struct { - s *getsvc.Service -} - -// SetGetService sets object service. -func (s *TombstoneSourcePrm) SetGetService(v *getsvc.Service) { - s.s = v -} - -// NewSource creates, initialize and returns local tombstone Source. -// The returned structure is ready to use. -// -// Panics if any of the provided options does not allow -// constructing a valid tombstone local Source. -func NewSource(p TombstoneSourcePrm) Source { - if p.s == nil { - panic("Tombstone source: nil object service") - } - - return Source(p) -} - -type headerWriter struct { - o *object.Object -} - -func (h *headerWriter) WriteHeader(o *object.Object) error { - h.o = o - return nil -} - -// Tombstone checks if the engine stores tombstone. -// Returns nil, nil if the tombstone has been removed -// or marked for removal. -func (s Source) Tombstone(ctx context.Context, a oid.Address, _ uint64) (*object.Object, error) { - var hr headerWriter - - var headPrm getsvc.HeadPrm - headPrm.WithAddress(a) - headPrm.SetHeaderWriter(&hr) - headPrm.SetCommonParameters(&util.CommonPrm{}) // default values are ok for that operation - - err := s.s.Head(ctx, headPrm) - switch { - case errors.As(err, new(apistatus.ObjectNotFound)) || errors.As(err, new(apistatus.ObjectAlreadyRemoved)): - return nil, nil - case err != nil: - return nil, fmt.Errorf("could not get tombstone from the source: %w", err) - default: - } - - if hr.o.Type() != object.TypeTombstone { - return nil, fmt.Errorf("returned %s object is not a tombstone", a) - } - - return hr.o, nil -}