diff --git a/pkg/local_object_storage/blobstor/peapod/peapod.go b/pkg/local_object_storage/blobstor/peapod/peapod.go new file mode 100644 index 00000000000..18d6569db86 --- /dev/null +++ b/pkg/local_object_storage/blobstor/peapod/peapod.go @@ -0,0 +1,484 @@ +package peapod + +import ( + "context" + "crypto/sha256" + "errors" + "fmt" + "io/fs" + "path/filepath" + "sync" + "time" + + "github.com/nspcc-dev/neo-go/pkg/util/slice" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/compression" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util/logicerr" + "github.com/nspcc-dev/neofs-node/pkg/util" + apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" + cid "github.com/nspcc-dev/neofs-sdk-go/container/id" + objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + "go.etcd.io/bbolt" +) + +type batch struct { + initErr error + + tx *bbolt.Tx + + nonIdle bool + + commitErr error + chCommitted chan struct{} + + bktRootMtx sync.Mutex + bktRoot *bbolt.Bucket +} + +// Peapod provides storage for relatively small NeoFS binary object (peas). +// Peapod is a single low-level key/value database optimized to work with big +// number of stored units. +type Peapod struct { + path string + perm fs.FileMode + + compress *compression.Config + + readOnly bool + + bolt *bbolt.DB + + currentBatchMtx sync.RWMutex + currentBatch *batch + + chClose chan struct{} + chFlushDone chan struct{} +} + +var rootBucket = []byte("root") + +// returned when BoltDB rootBucket is inaccessible within particular transaction. +var errMissingRootBucket = errors.New("missing root bucket") + +// New creates new Peapod instance to be located at the given path with +// specified permissions. +func New(path string, perm fs.FileMode) *Peapod { + return &Peapod{ + path: path, + perm: perm, + } +} + +func (x *Peapod) flushLoop() { + defer close(x.chFlushDone) + + const flushInterval = 10 * time.Millisecond + t := time.NewTimer(flushInterval) + defer t.Stop() + + for { + select { + case <-x.chClose: + // commit current transaction to prevent bbolt.DB.Close blocking + x.flushCurrentBatch(false) + return + case <-t.C: + st := time.Now() + + x.flushCurrentBatch(true) + + interval := flushInterval - time.Since(st) + if interval <= 0 { + interval = time.Microsecond + } + + t.Reset(interval) + } + } +} + +func (x *Peapod) flushCurrentBatch(beginNew bool) { + x.currentBatchMtx.Lock() + + if !x.currentBatch.nonIdle { + if !beginNew && x.currentBatch.tx != nil { + _ = x.currentBatch.tx.Commit() + } + x.currentBatchMtx.Unlock() + return + } + + err := x.currentBatch.tx.Commit() + if err != nil { + err = fmt.Errorf("commit BoltDB batch transaction: %w", err) + } + + x.currentBatch.commitErr = err + close(x.currentBatch.chCommitted) + + if beginNew { + x.beginNewBatch() + } + + x.currentBatchMtx.Unlock() +} + +func (x *Peapod) beginNewBatch() { + x.currentBatch = new(batch) + + x.currentBatch.tx, x.currentBatch.initErr = x.bolt.Begin(true) + if x.currentBatch.initErr != nil { + x.currentBatch.initErr = fmt.Errorf("begin new BoltDB writable transaction: %w", x.currentBatch.initErr) + return + } + + x.currentBatch.bktRoot = x.currentBatch.tx.Bucket(rootBucket) + if x.currentBatch.bktRoot == nil { + x.currentBatch.initErr = errMissingRootBucket + return + } + + x.currentBatch.chCommitted = make(chan struct{}) +} + +const objectAddressKeySize = 2 * sha256.Size + +func keyForObject(addr oid.Address) []byte { + b := make([]byte, objectAddressKeySize) + addr.Container().Encode(b) + addr.Object().Encode(b[sha256.Size:]) + return b +} + +func decodeKeyForObject(addr *oid.Address, key []byte) error { + if len(key) != objectAddressKeySize { + return fmt.Errorf("invalid object address key size: %d instead of %d", len(key), objectAddressKeySize) + } + + var cnr cid.ID + var obj oid.ID + + err := cnr.Decode(key[:sha256.Size]) + if err != nil { + return fmt.Errorf("decode container ID: %w", err) + } + + err = obj.Decode(key[sha256.Size:]) + if err != nil { + return fmt.Errorf("decode object ID: %w", err) + } + + addr.SetContainer(cnr) + addr.SetObject(obj) + + return nil +} + +// Open opens underlying database in the specified mode. +func (x *Peapod) Open(readOnly bool) error { + err := util.MkdirAllX(filepath.Dir(x.path), x.perm) + if err != nil { + return fmt.Errorf("create directory '%s' for database: %w", x.path, err) + } + + x.bolt, err = bbolt.Open(x.path, x.perm, &bbolt.Options{ + ReadOnly: readOnly, + Timeout: 100 * time.Millisecond, // to handle flock + }) + if err != nil { + return fmt.Errorf("open BoltDB instance: %w", err) + } + + if readOnly { + err = x.bolt.View(func(tx *bbolt.Tx) error { + if tx.Bucket(rootBucket) == nil { + return errMissingRootBucket + } + return nil + }) + if err != nil { + return fmt.Errorf("check root bucket presence in BoltDB instance: %w", err) + } + } + + x.readOnly = readOnly + + return nil +} + +// Init initializes internal structure of the underlying database. +func (x *Peapod) Init() error { + if x.readOnly { + // no extra actions needed in read-only mode + return nil + } + + err := x.bolt.Update(func(tx *bbolt.Tx) error { + _, err := tx.CreateBucketIfNotExists(rootBucket) + return err + }) + if err != nil { + return fmt.Errorf("create root bucket in BoltDB instance: %w", err) + } + + x.chClose = make(chan struct{}) + x.chFlushDone = make(chan struct{}) + + x.beginNewBatch() + + go x.flushLoop() + + return nil +} + +// Close syncs data and closes the database. +func (x *Peapod) Close() error { + if !x.readOnly { + close(x.chClose) + <-x.chFlushDone + } + return x.bolt.Close() +} + +// Type is peapod storage type used in logs and configuration. +const Type = "peapod" + +func (x *Peapod) Type() string { + return Type +} + +func (x *Peapod) Path() string { + return x.path +} + +func (x *Peapod) SetCompressor(cc *compression.Config) { + x.compress = cc +} + +func (x *Peapod) SetReportErrorFunc(func(string, error)) { + // no-op like FSTree +} + +// Get reads data from the underlying database by the given object address. +// Returns apistatus.ErrObjectNotFound if object is missing in the Peapod. +func (x *Peapod) Get(prm common.GetPrm) (common.GetRes, error) { + var data []byte + + err := x.bolt.View(func(tx *bbolt.Tx) error { + bktRoot := tx.Bucket(rootBucket) + if bktRoot == nil { + return errMissingRootBucket + } + + val := bktRoot.Get(keyForObject(prm.Address)) + if val == nil { + return apistatus.ErrObjectNotFound + } + + data = slice.Copy(val) + + return nil + }) + if err != nil { + if errors.Is(err, apistatus.ErrObjectNotFound) { + return common.GetRes{}, logicerr.Wrap(err) + } + return common.GetRes{}, fmt.Errorf("exec read-only BoltDB transaction: %w", err) + } + + // copy-paste from FSTree + data, err = x.compress.Decompress(data) + if err != nil { + return common.GetRes{}, fmt.Errorf("decompress data: %w", err) + } + + obj := objectSDK.New() + if err := obj.Unmarshal(data); err != nil { + return common.GetRes{}, fmt.Errorf("decode object from binary: %w", err) + } + + return common.GetRes{Object: obj, RawData: data}, err +} + +// GetRange works like Get but reads specific payload range. +func (x *Peapod) GetRange(prm common.GetRangePrm) (common.GetRangeRes, error) { + // copy-paste from FSTree + res, err := x.Get(common.GetPrm{Address: prm.Address}) + if err != nil { + return common.GetRangeRes{}, err + } + + payload := res.Object.Payload() + from := prm.Range.GetOffset() + to := from + prm.Range.GetLength() + + if pLen := uint64(len(payload)); to < from || pLen < from || pLen < to { + return common.GetRangeRes{}, logicerr.Wrap(apistatus.ObjectOutOfRange{}) + } + + return common.GetRangeRes{ + Data: payload[from:to], + }, nil +} + +// Exists checks presence of the object in the underlying database by the given +// address. +func (x *Peapod) Exists(prm common.ExistsPrm) (common.ExistsRes, error) { + var res common.ExistsRes + + err := x.bolt.View(func(tx *bbolt.Tx) error { + bktRoot := tx.Bucket(rootBucket) + if bktRoot == nil { + return errMissingRootBucket + } + + res.Exists = bktRoot.Get(keyForObject(prm.Address)) != nil + + return nil + }) + if err != nil { + return common.ExistsRes{}, fmt.Errorf("exec read-only BoltDB transaction: %w", err) + } + + return res, nil +} + +var storageID = []byte("peapod") + +// Put saves given data in the underlying database by specified object address. +// The data can be anything, but in practice a binary NeoFS object is expected. +// Operation is executed within provided context: if the context is done, Put +// returns its error (in this case data may be saved). +// +// Put returns common.ErrReadOnly if Peadpod is read-only. +func (x *Peapod) Put(prm common.PutPrm) (common.PutRes, error) { + if !prm.DontCompress { + prm.RawData = x.compress.Compress(prm.RawData) + } + + // Track https://github.com/nspcc-dev/neofs-node/issues/2480 + err := x.batch(context.TODO(), func(bktRoot *bbolt.Bucket) error { + return bktRoot.Put(keyForObject(prm.Address), prm.RawData) + }) + + return common.PutRes{ + StorageID: storageID, + }, err +} + +// Delete removes data associated with the given object address from the +// underlying database. Delete returns apistatus.ErrObjectNotFound if object is +// missing. +// +// Put returns common.ErrReadOnly if Peadpod is read-only. +func (x *Peapod) Delete(prm common.DeletePrm) (common.DeleteRes, error) { + // Track https://github.com/nspcc-dev/neofs-node/issues/2480 + err := x.batch(context.TODO(), func(bktRoot *bbolt.Bucket) error { + key := keyForObject(prm.Address) + if bktRoot.Get(key) == nil { + return apistatus.ErrObjectNotFound + } + + return bktRoot.Delete(key) + }) + if errors.Is(err, apistatus.ErrObjectNotFound) { + return common.DeleteRes{}, logicerr.Wrap(err) + } + + return common.DeleteRes{}, err +} + +func (x *Peapod) batch(ctx context.Context, fBktRoot func(bktRoot *bbolt.Bucket) error) error { + if x.readOnly { + return common.ErrReadOnly + } + + x.currentBatchMtx.RLock() + + currentBatch := x.currentBatch + + if currentBatch.initErr != nil { + x.currentBatchMtx.RUnlock() + return currentBatch.initErr + } + + // bbolt.Bucket.Put MUST NOT be called concurrently. This is not obvious from + // the docs, but panic occurs in practice + currentBatch.bktRootMtx.Lock() + err := fBktRoot(currentBatch.bktRoot) + currentBatch.bktRootMtx.Unlock() + if err != nil { + x.currentBatchMtx.RUnlock() + return fmt.Errorf("put object into BoltDB bucket for container: %w", err) + } + + currentBatch.nonIdle = true + + x.currentBatchMtx.RUnlock() + + select { + case <-ctx.Done(): + return ctx.Err() + case <-currentBatch.chCommitted: + return currentBatch.commitErr + } +} + +// Iterate iterates over all objects stored in the underlying database and +// passes them into LazyHandler or Handler. Break on f's false return. +func (x *Peapod) Iterate(prm common.IteratePrm) (common.IterateRes, error) { + var addr oid.Address + + err := x.bolt.View(func(tx *bbolt.Tx) error { + bktRoot := tx.Bucket(rootBucket) + if bktRoot == nil { + return errMissingRootBucket + } + + return bktRoot.ForEach(func(k, v []byte) error { + err := decodeKeyForObject(&addr, k) + if err != nil { + if prm.IgnoreErrors { + if prm.ErrorHandler != nil { + return prm.ErrorHandler(addr, err) + } + + return nil + } + + return fmt.Errorf("decode object address from bucket key: %w", err) + } + + v, err = x.compress.Decompress(v) + if err != nil { + if prm.IgnoreErrors { + if prm.ErrorHandler != nil { + return prm.ErrorHandler(addr, err) + } + + return nil + } + + return fmt.Errorf("decompress value for object '%s': %w", addr, err) + } + + if prm.LazyHandler != nil { + return prm.LazyHandler(addr, func() ([]byte, error) { + return v, nil + }) + } + + return prm.Handler(common.IterationElement{ + ObjectData: v, + Address: addr, + StorageID: storageID, + }) + }) + }) + if err != nil { + return common.IterateRes{}, fmt.Errorf("exec read-only BoltDB transaction: %w", err) + } + + return common.IterateRes{}, nil +} diff --git a/pkg/local_object_storage/blobstor/peapod/peapod_test.go b/pkg/local_object_storage/blobstor/peapod/peapod_test.go new file mode 100644 index 00000000000..199825e2054 --- /dev/null +++ b/pkg/local_object_storage/blobstor/peapod/peapod_test.go @@ -0,0 +1,299 @@ +package peapod_test + +import ( + "crypto/rand" + "fmt" + "path/filepath" + "sync" + "testing" + + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/internal/blobstortest" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/peapod" + apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test" + objecttest "github.com/nspcc-dev/neofs-sdk-go/object/test" + "github.com/stretchr/testify/require" +) + +func TestGeneric(t *testing.T) { + newPath := func() string { + return filepath.Join(t.TempDir(), "peapod.db") + } + + blobstortest.TestAll(t, func(t *testing.T) common.Storage { + return peapod.New(newPath(), 0600) + }, 2048, 16*1024) + + t.Run("info", func(t *testing.T) { + path := newPath() + blobstortest.TestInfo(t, func(t *testing.T) common.Storage { + return peapod.New(path, 0600) + }, peapod.Type, path) + }) +} + +func TestControl(t *testing.T) { + blobstortest.TestControl(t, func(t *testing.T) common.Storage { + return peapod.New(filepath.Join(t.TempDir(), "peapod.db"), 0600) + }, 2048, 2048) +} + +func testPeapodPath(tb testing.TB) string { + return filepath.Join(tb.TempDir(), "peapod.db") +} + +func newTestPeapod(tb testing.TB) *peapod.Peapod { + ppd := _newTestPeapod(tb, testPeapodPath(tb), false) + tb.Cleanup(func() { _ = ppd.Close() }) + return ppd +} + +// creates new read-only peapod.Peapod with one stored object. +func newTestPeapodReadOnly(tb testing.TB) (*peapod.Peapod, oid.Address) { + path := testPeapodPath(tb) + + ppd := _newTestPeapod(tb, path, false) + addr := oidtest.Address() + + _, err := ppd.Put(common.PutPrm{ + Address: addr, + RawData: []byte("Hello, world!"), + DontCompress: false, + }) + require.NoError(tb, err) + require.NoError(tb, ppd.Close()) + + ppd = _newTestPeapod(tb, path, true) + + tb.Cleanup(func() { _ = ppd.Close() }) + + return ppd, addr +} + +func _newTestPeapod(tb testing.TB, path string, readOnly bool) *peapod.Peapod { + ppd := peapod.New(path, 0600) + require.NoError(tb, ppd.Open(readOnly)) + require.NoError(tb, ppd.Init()) + + return ppd +} + +func TestPeapod_Get(t *testing.T) { + ppd := newTestPeapod(t) + addr := oidtest.Address() + obj := objecttest.Object(t) + + data, err := obj.Marshal() + require.NoError(t, err) + + getPrm := common.GetPrm{Address: addr} + + _, err = ppd.Get(getPrm) + require.ErrorIs(t, err, apistatus.ErrObjectNotFound) + + _, err = ppd.Put(common.PutPrm{ + Address: addr, + RawData: data, + }) + require.NoError(t, err) + + res, err := ppd.Get(getPrm) + require.NoError(t, err) + require.Equal(t, data, res.RawData) + require.Equal(t, obj, res.Object) +} + +func TestPeapod_Exists(t *testing.T) { + ppd := newTestPeapod(t) + addr := oidtest.Address() + data := []byte("Hello, world!") + + existsPrm := common.ExistsPrm{ + Address: addr, + } + + res, err := ppd.Exists(existsPrm) + require.NoError(t, err) + require.False(t, res.Exists) + + _, err = ppd.Put(common.PutPrm{ + Address: addr, + RawData: data, + }) + require.NoError(t, err) + + res, err = ppd.Exists(existsPrm) + require.NoError(t, err) + require.True(t, res.Exists) +} + +func TestPeapod_Iterate(t *testing.T) { + ppd := newTestPeapod(t) + + mSrc := map[oid.Address][]byte{ + oidtest.Address(): {1, 2, 3}, + oidtest.Address(): {4, 5, 6}, + oidtest.Address(): {7, 8, 9}, + } + + mDst := make(map[oid.Address][]byte) + + f := func(el common.IterationElement) error { + mDst[el.Address] = el.ObjectData + return nil + } + + iterPrm := common.IteratePrm{ + Handler: f, + } + + _, err := ppd.Iterate(iterPrm) + require.NoError(t, err) + require.Empty(t, mDst) + + for addr, data := range mSrc { + _, err = ppd.Put(common.PutPrm{ + Address: addr, + RawData: data, + }) + require.NoError(t, err) + } + + _, err = ppd.Iterate(iterPrm) + require.NoError(t, err) + require.Equal(t, mSrc, mDst) +} + +func TestPeapod_Put(t *testing.T) { + ppd := newTestPeapod(t) + addr := oidtest.Address() + obj := objecttest.Object(t) + + data, err := obj.Marshal() + require.NoError(t, err) + + _, err = ppd.Put(common.PutPrm{ + Address: addr, + RawData: data, + }) + require.NoError(t, err) + + res, err := ppd.Get(common.GetPrm{ + Address: addr, + }) + require.NoError(t, err) + require.Equal(t, data, res.RawData) + require.Equal(t, obj, res.Object) + + t.Run("read-only", func(t *testing.T) { + ppd, _ := newTestPeapodReadOnly(t) + + _, err := ppd.Put(common.PutPrm{ + Address: addr, + RawData: data, + }) + require.ErrorIs(t, err, common.ErrReadOnly) + }) +} + +func TestPeapod_Delete(t *testing.T) { + ppd := newTestPeapod(t) + addr := oidtest.Address() + obj := objecttest.Object(t) + + data, err := obj.Marshal() + require.NoError(t, err) + + _, err = ppd.Delete(common.DeletePrm{ + Address: addr, + }) + require.ErrorIs(t, err, apistatus.ErrObjectNotFound) + + _, err = ppd.Put(common.PutPrm{ + Address: addr, + RawData: data, + }) + require.NoError(t, err) + + getPrm := common.GetPrm{ + Address: addr, + } + + res, err := ppd.Get(getPrm) + require.NoError(t, err) + require.Equal(t, data, res.RawData) + require.Equal(t, obj, res.Object) + + _, err = ppd.Delete(common.DeletePrm{ + Address: addr, + }) + require.NoError(t, err) + + res, err = ppd.Get(getPrm) + require.ErrorIs(t, err, apistatus.ErrObjectNotFound) + + t.Run("read-only", func(t *testing.T) { + ppd, addr := newTestPeapodReadOnly(t) + + _, err := ppd.Delete(common.DeletePrm{ + Address: addr, + }) + require.ErrorIs(t, err, common.ErrReadOnly) + }) +} + +func benchmark(b *testing.B, ppd *peapod.Peapod, objSize uint64, nThreads int) { + data := make([]byte, objSize) + rand.Read(data) + + prm := common.PutPrm{ + RawData: data, + } + + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + var wg sync.WaitGroup + + for i := 0; i < nThreads; i++ { + wg.Add(1) + go func() { + defer wg.Done() + + prm := prm + prm.Address = oidtest.Address() + + _, err := ppd.Put(prm) + require.NoError(b, err) + }() + } + + wg.Wait() + } +} + +func BenchmarkPeapod_Put(b *testing.B) { + ppd := newTestPeapod(b) + + for _, tc := range []struct { + objSize uint64 + nThreads int + }{ + {1, 1}, + {1, 20}, + {1, 100}, + {1 << 10, 1}, + {1 << 10, 20}, + {1 << 10, 100}, + {100 << 10, 1}, + {100 << 10, 20}, + {100 << 10, 100}, + } { + b.Run(fmt.Sprintf("size=%d,thread=%d", tc.objSize, tc.nThreads), func(b *testing.B) { + benchmark(b, ppd, tc.objSize, tc.nThreads) + }) + } +}