Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/exchange object meta signatures #2936

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,25 @@ Changelog for NeoFS Node
- It was impossible to specify memory amount as "1b" (one byte) in config, default was used instead (#2899)
- Container session token's lifetime was not checked (#2898)
- ACL checks for split objects could be forced by a node than might lack access (#2909)
- Do not search for tombstones when handling their expiration, use local indexes instead (#2929)

### Changed
- neofs-cli allows several objects deletion at a time (#2774)
- `ObjectService.Put` server of in-container node places objects using new `ObjectService.Replicate` RPC (#2802)
- `ObjectService`'s `Search` and `Replicate` RPC handlers cache up to 1000 lists of container nodes (#2892)
- Default max_traceable_blocks Morph setting lowered to 17280 from 2102400 (#2897)
- `ObjectService`'s `Get`/`Head`/`GetRange` RPC handlers cache up to 10K lists of per-object sorted container nodes (#2896)
- Metabase graveyard scheme (#2929)

### Updated
- neofs-contract dependency to 0.20.0 (#2872)
- NeoGo dependency to 0.106.3 (#2872)

### Updating from v0.42.1
It is required to resynchronize metabases due to changed metabase scheme; any
starts with old metabases will fail. See storage node's config documentation
for details.

## [0.42.1] - 2024-06-13

A tiny update that adds compatibility with the Neo N3 Domovoi hardfork.
Expand Down
3 changes: 2 additions & 1 deletion cmd/neofs-lens/internal/meta/list-graveyard.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@
gravePrm.SetHandler(
func(tsObj meta.TombstonedObject) error {
cmd.Printf(
"Object: %s\nTS: %s\n",
"Object: %s\nTS: %s (TS expiration: %d)\n",

Check warning on line 29 in cmd/neofs-lens/internal/meta/list-graveyard.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-lens/internal/meta/list-graveyard.go#L29

Added line #L29 was not covered by tests
tsObj.Address().EncodeToString(),
tsObj.Tombstone().EncodeToString(),
tsObj.TombstoneExpiration(),

Check warning on line 32 in cmd/neofs-lens/internal/meta/list-graveyard.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-lens/internal/meta/list-graveyard.go#L32

Added line #L32 was not covered by tests
)

return nil
Expand Down
4 changes: 2 additions & 2 deletions cmd/neofs-node/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,7 @@
return e.engine.IsLocked(address)
}

func (e storageEngine) Delete(tombstone oid.Address, toDelete []oid.ID) error {
func (e storageEngine) Delete(tombstone oid.Address, tombExpiration uint64, toDelete []oid.ID) error {

Check warning on line 518 in cmd/neofs-node/object.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/object.go#L518

Added line #L518 was not covered by tests
var prm engine.InhumePrm

addrs := make([]oid.Address, len(toDelete))
Expand All @@ -524,7 +524,7 @@
addrs[i].SetObject(toDelete[i])
}

prm.WithTarget(tombstone, addrs...)
prm.WithTombstone(tombstone, tombExpiration, addrs...)

Check warning on line 527 in cmd/neofs-node/object.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/object.go#L527

Added line #L527 was not covered by tests

_, err := e.engine.Inhume(prm)
return err
Expand Down
13 changes: 1 addition & 12 deletions cmd/neofs-node/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
containerEvent "github.com/nspcc-dev/neofs-node/pkg/morph/event/container"
"github.com/nspcc-dev/neofs-node/pkg/morph/event/netmap"
getsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/get"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/tombstone"
tsourse "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/tombstone/source"
"github.com/nspcc-dev/neofs-node/pkg/util"
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
"github.com/panjf2000/ants/v2"
Expand Down Expand Up @@ -47,18 +45,9 @@
// service will be created later
c.cfgObject.getSvc = new(getsvc.Service)

var tssPrm tsourse.TombstoneSourcePrm
tssPrm.SetGetService(c.cfgObject.getSvc)
tombstoneSrc := tsourse.NewSource(tssPrm)

tombstoneSource := tombstone.NewChecker(
tombstone.WithLogger(c.log),
tombstone.WithTombstoneSource(tombstoneSrc),
)

var shardsAttached int
for _, optsWithMeta := range c.shardOpts() {
id, err := ls.AddShard(append(optsWithMeta.shOpts, shard.WithTombstoneSource(tombstoneSource))...)
id, err := ls.AddShard(optsWithMeta.shOpts...)

Check warning on line 50 in cmd/neofs-node/storage.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/storage.go#L50

Added line #L50 was not covered by tests
if err != nil {
c.log.Error("failed to attach shard to engine", zap.Error(err))
} else {
Expand Down
29 changes: 9 additions & 20 deletions pkg/local_object_storage/engine/inhume.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,30 @@ import (

// InhumePrm encapsulates parameters for inhume operation.
type InhumePrm struct {
tombstone *oid.Address
addrs []oid.Address
tombstone *oid.Address
tombExpiration uint64
addrs []oid.Address

forceRemoval bool
}

// InhumeRes encapsulates results of inhume operation.
type InhumeRes struct{}

// WithTarget sets a list of objects that should be inhumed and tombstone address
// WithTombstone sets a list of objects that should be inhumed and tombstone address
// as the reason for inhume operation.
//
// tombstone should not be nil, addr should not be empty.
// addrs should not be empty.
// Should not be called along with MarkAsGarbage.
func (p *InhumePrm) WithTarget(tombstone oid.Address, addrs ...oid.Address) {
func (p *InhumePrm) WithTombstone(tombstone oid.Address, tombExpiration uint64, addrs ...oid.Address) {
p.addrs = addrs
p.tombstone = &tombstone
p.tombExpiration = tombExpiration
}

// MarkAsGarbage marks an object to be physically removed from local storage.
//
// Should not be called along with WithTarget.
// Should not be called along with WithTombstone.
func (p *InhumePrm) MarkAsGarbage(addrs ...oid.Address) {
p.addrs = addrs
p.tombstone = nil
Expand Down Expand Up @@ -95,7 +97,7 @@ func (e *StorageEngine) inhume(prm InhumePrm) (InhumeRes, error) {
}

if prm.tombstone != nil {
shPrm.InhumeByTomb(*prm.tombstone, prm.addrs[i])
shPrm.InhumeByTomb(*prm.tombstone, prm.tombExpiration, prm.addrs[i])
} else {
shPrm.MarkAsGarbage(prm.addrs[i])
}
Expand Down Expand Up @@ -332,19 +334,6 @@ func (e *StorageEngine) processExpiredObjects(_ context.Context, addrs []oid.Add
}
}

func (e *StorageEngine) processExpiredTombstones(ctx context.Context, addrs []meta.TombstonedObject) {
e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) {
sh.HandleExpiredTombstones(addrs)

select {
case <-ctx.Done():
return true
default:
return false
}
})
}

func (e *StorageEngine) processExpiredLocks(ctx context.Context, lockers []oid.Address) {
e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) {
sh.HandleExpiredLocks(lockers)
Expand Down
4 changes: 2 additions & 2 deletions pkg/local_object_storage/engine/inhume_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func TestStorageEngine_Inhume(t *testing.T) {
require.NoError(t, err)

var inhumePrm InhumePrm
inhumePrm.WithTarget(tombstoneID, object.AddressOf(parent))
inhumePrm.WithTombstone(tombstoneID, 0, object.AddressOf(parent))

_, err = e.Inhume(inhumePrm)
require.NoError(t, err)
Expand Down Expand Up @@ -74,7 +74,7 @@ func TestStorageEngine_Inhume(t *testing.T) {
require.NoError(t, err)

var inhumePrm InhumePrm
inhumePrm.WithTarget(tombstoneID, object.AddressOf(parent))
inhumePrm.WithTombstone(tombstoneID, 0, object.AddressOf(parent))

_, err = e.Inhume(inhumePrm)
require.NoError(t, err)
Expand Down
13 changes: 6 additions & 7 deletions pkg/local_object_storage/engine/lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ func TestLockUserScenario(t *testing.T) {

return pool
}),
shard.WithTombstoneSource(tss{lockerExpiresAfter}),
})

t.Cleanup(func() {
Expand Down Expand Up @@ -108,7 +107,7 @@ func TestLockUserScenario(t *testing.T) {

// 3.
var inhumePrm InhumePrm
inhumePrm.WithTarget(tombAddr, objAddr)
inhumePrm.WithTombstone(tombAddr, 0, objAddr)

_, err = e.Inhume(inhumePrm)
require.ErrorAs(t, err, new(apistatus.ObjectLocked))
Expand All @@ -121,7 +120,7 @@ func TestLockUserScenario(t *testing.T) {
err = Put(e, tombObj)
require.NoError(t, err)

inhumePrm.WithTarget(tombForLockAddr, lockerAddr)
inhumePrm.WithTombstone(tombForLockAddr, 0, lockerAddr)

_, err = e.Inhume(inhumePrm)
require.ErrorIs(t, err, meta.ErrLockObjectRemoval)
Expand All @@ -132,7 +131,7 @@ func TestLockUserScenario(t *testing.T) {
// delay for GC
time.Sleep(time.Second)

inhumePrm.WithTarget(tombAddr, objAddr)
inhumePrm.WithTombstone(tombAddr, 0, objAddr)

_, err = e.Inhume(inhumePrm)
require.NoError(t, err)
Expand Down Expand Up @@ -189,7 +188,7 @@ func TestLockExpiration(t *testing.T) {
require.NoError(t, err)

var inhumePrm InhumePrm
inhumePrm.WithTarget(objecttest.Address(), objectcore.AddressOf(obj))
inhumePrm.WithTombstone(objecttest.Address(), 0, objectcore.AddressOf(obj))

_, err = e.Inhume(inhumePrm)
require.ErrorAs(t, err, new(apistatus.ObjectLocked))
Expand All @@ -202,7 +201,7 @@ func TestLockExpiration(t *testing.T) {
time.Sleep(time.Second)

// 4.
inhumePrm.WithTarget(objecttest.Address(), objectcore.AddressOf(obj))
inhumePrm.WithTombstone(objecttest.Address(), 0, objectcore.AddressOf(obj))

_, err = e.Inhume(inhumePrm)
require.NoError(t, err)
Expand Down Expand Up @@ -261,7 +260,7 @@ func TestLockForceRemoval(t *testing.T) {
_, err = e.Inhume(inhumePrm)
require.ErrorAs(t, err, new(apistatus.ObjectLocked))

inhumePrm.WithTarget(objecttest.Address(), objectcore.AddressOf(obj))
inhumePrm.WithTombstone(objecttest.Address(), 0, objectcore.AddressOf(obj))

_, err = e.Inhume(inhumePrm)
require.ErrorAs(t, err, new(apistatus.ObjectLocked))
Expand Down
1 change: 0 additions & 1 deletion pkg/local_object_storage/engine/shards.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ func (e *StorageEngine) createShard(opts []shard.Option) (*shard.Shard, error) {
sh := shard.New(append(opts,
shard.WithID(id),
shard.WithExpiredObjectsCallback(e.processExpiredObjects),
shard.WithExpiredTombstonesCallback(e.processExpiredTombstones),
shard.WithExpiredLocksCallback(e.processExpiredLocks),
shard.WithDeletedLockCallback(e.processDeletedLocks),
shard.WithReportErrorFunc(e.reportShardErrorBackground),
Expand Down
4 changes: 2 additions & 2 deletions pkg/local_object_storage/metabase/VERSION.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ This file describes changes between the metabase versions.

## Current

Numbers stand for a single byte value.
Numbers stand for a single byte value unless otherwise stated.
The lowest not used bucket index: 20.

### Primary buckets
- Graveyard bucket
- Name: `0`
- Key: object address
- Value: tombstone address
- Value: tombstone address + little-endian uint64 tombstone expiration epoch
- Garbage objects bucket
- Name: `1`
- Key: object address
Expand Down
4 changes: 2 additions & 2 deletions pkg/local_object_storage/metabase/counter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func TestCounters(t *testing.T) {
}

var prm meta.InhumePrm
prm.SetTombstoneAddress(oidtest.Address())
prm.SetTombstone(oidtest.Address(), 0)
prm.SetAddresses(inhumedObjs...)

res, err := db.Inhume(prm)
Expand Down Expand Up @@ -155,7 +155,7 @@ func TestCounters(t *testing.T) {
}

var prm meta.InhumePrm
prm.SetTombstoneAddress(oidtest.Address())
prm.SetTombstone(oidtest.Address(), 0)
prm.SetAddresses(inhumedObjs...)

_, err = db.Inhume(prm)
Expand Down
74 changes: 52 additions & 22 deletions pkg/local_object_storage/metabase/graveyard.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import (
"bytes"
"encoding/binary"
"errors"
"fmt"

Expand Down Expand Up @@ -74,8 +75,9 @@
// TombstonedObject represents descriptor of the
// object that has been covered with tombstone.
type TombstonedObject struct {
addr oid.Address
tomb oid.Address
addr oid.Address
tomb oid.Address
tombExpiration uint64
}

// Address returns tombstoned object address.
Expand All @@ -89,6 +91,13 @@
return g.tomb
}

// TombstoneExpiration returns tombstone's expiration. It can be zero if
// metabase version does not support expiration indexing or if TS does not
// expire.
func (g TombstonedObject) TombstoneExpiration() uint64 {
return g.tombExpiration

Check warning on line 98 in pkg/local_object_storage/metabase/graveyard.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/metabase/graveyard.go#L97-L98

Added lines #L97 - L98 were not covered by tests
}

// TombstonedHandler is a TombstonedObject handling function.
type TombstonedHandler func(object TombstonedObject) error

Expand Down Expand Up @@ -219,46 +228,67 @@
}

func graveFromKV(k, v []byte) (res TombstonedObject, err error) {
if err = decodeAddressFromKey(&res.addr, k); err != nil {
err = fmt.Errorf("decode tombstone target from key: %w", err)
} else if err = decodeAddressFromKey(&res.tomb, v); err != nil {
err = fmt.Errorf("decode tombstone address from value: %w", err)
err = decodeAddressFromKey(&res.addr, k)
if err != nil {
return res, fmt.Errorf("decode tombstone target from key: %w", err)

Check warning on line 233 in pkg/local_object_storage/metabase/graveyard.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/metabase/graveyard.go#L233

Added line #L233 was not covered by tests
}

err = decodeAddressFromKey(&res.tomb, v[:addressKeySize])
if err != nil {
return res, fmt.Errorf("decode tombstone address from value: %w", err)

Check warning on line 238 in pkg/local_object_storage/metabase/graveyard.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/metabase/graveyard.go#L238

Added line #L238 was not covered by tests
}

if len(v) == addressKeySize+8 {
res.tombExpiration = binary.LittleEndian.Uint64(v[addressKeySize:])
}

return
}

// DropGraves deletes tombstoned objects from the
// graveyard bucket.
//
// Returns any error appeared during deletion process.
func (db *DB) DropGraves(tss []TombstonedObject) error {
// DropExpiredTSMarks run through the graveyard and drops tombstone marks with
// tombstones whose expiration is _less_ than provided epoch.
// Returns number of marks dropped.
func (db *DB) DropExpiredTSMarks(epoch uint64) (int, error) {
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()

if db.mode.NoMetabase() {
return ErrDegradedMode
return -1, ErrDegradedMode

Check warning on line 256 in pkg/local_object_storage/metabase/graveyard.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/metabase/graveyard.go#L256

Added line #L256 was not covered by tests
} else if db.mode.ReadOnly() {
return ErrReadOnlyMode
return -1, ErrReadOnlyMode

Check warning on line 258 in pkg/local_object_storage/metabase/graveyard.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/metabase/graveyard.go#L258

Added line #L258 was not covered by tests
}

buf := make([]byte, addressKeySize)
var counter int

return db.boltDB.Update(func(tx *bbolt.Tx) error {
err := db.boltDB.Update(func(tx *bbolt.Tx) error {
bkt := tx.Bucket(graveyardBucketName)
if bkt == nil {
return nil
}
c := bkt.Cursor()
k, v := c.First()

for _, ts := range tss {
err := bkt.Delete(addressKey(ts.Address(), buf))
if err != nil {
return err
for k != nil {
if binary.LittleEndian.Uint64(v[addressKeySize:]) < epoch {
err := c.Delete()
if err != nil {
return fmt.Errorf("cleared %d TS marks in %d epoch and got error: %w", counter, epoch, err)

Check warning on line 272 in pkg/local_object_storage/metabase/graveyard.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/metabase/graveyard.go#L272

Added line #L272 was not covered by tests
}

counter++

// see https://github.com/etcd-io/bbolt/pull/614; there is not
// much we can do with such an unfixed behavior
k, v = c.Seek(k)
} else {
k, v = c.Next()
}
}

return nil
})
if err != nil {
return -1, fmt.Errorf("db call: %w", err)

Check warning on line 288 in pkg/local_object_storage/metabase/graveyard.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/metabase/graveyard.go#L288

Added line #L288 was not covered by tests
}

return counter, nil
}

// GetGarbage returns garbage according to the metabase state. Garbage includes
Expand Down
Loading
Loading