From b28c1f29a07ec10083a46fe114eac275b9cdd599 Mon Sep 17 00:00:00 2001
From: Leonard Lyubich
Date: Wed, 2 Aug 2023 16:07:55 +0400
Subject: [PATCH 1/6] Introduce new storage component Peapod

Currently, the storage node saves relatively small NeoFS objects in the
Blobovnicza tree component: a group of BoltDB wrappers managed as a tree.
This component has a pretty complex data structure and code implementation,
and dubious performance results.

Peapod is a new storage component introduced to replace the Blobovnicza one
with something simpler and more effective. It is also based on a single
BoltDB instance, but organizes batch writes in a specific way. In the future,
Peapod is going to be used by the BlobStor as the storage for small objects.

Refs #2453.

Signed-off-by: Leonard Lyubich
---
 CHANGELOG.md                       |   1 +
 .../blobstor/peapod/peapod.go      | 484 ++++++++++++++++++
 .../blobstor/peapod/peapod_test.go | 299 +++++++++++
 3 files changed, 784 insertions(+)
 create mode 100644 pkg/local_object_storage/blobstor/peapod/peapod.go
 create mode 100644 pkg/local_object_storage/blobstor/peapod/peapod_test.go

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 155d3d2959..c9e648fd58 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -11,6 +11,7 @@ Changelog for NeoFS Node
 - Stored payload metric per shard (#2023)
 - Histogram metrics for RPC and engine operations (#2351)
 - SN's version is announced via the attributes automatically but can be overwritten explicitly (#2455)
+- New storage component for small objects named Peapod (#2453)
 
 ### Fixed
 - `neo-go` RPC connection loss handling (#1337)
diff --git a/pkg/local_object_storage/blobstor/peapod/peapod.go b/pkg/local_object_storage/blobstor/peapod/peapod.go
new file mode 100644
index 0000000000..18d6569db8
--- /dev/null
+++ b/pkg/local_object_storage/blobstor/peapod/peapod.go
@@ -0,0 +1,484 @@
+package peapod
+
+import (
+	"context"
+	"crypto/sha256"
+	"errors"
+	"fmt"
+	"io/fs"
+	"path/filepath"
+	"sync"
+	"time"
+
+	"github.com/nspcc-dev/neo-go/pkg/util/slice"
+	"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common"
+	"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/compression"
+	"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util/logicerr"
+	"github.com/nspcc-dev/neofs-node/pkg/util"
+	apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
+	cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
+	objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
+	oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
+	"go.etcd.io/bbolt"
+)
+
+type batch struct {
+	initErr error
+
+	tx *bbolt.Tx
+
+	nonIdle bool
+
+	commitErr   error
+	chCommitted chan struct{}
+
+	bktRootMtx sync.Mutex
+	bktRoot    *bbolt.Bucket
+}
+
+// Peapod provides storage for relatively small NeoFS binary objects (peas).
+// Peapod is a single low-level key/value database optimized to work with a big
+// number of stored units.
+type Peapod struct {
+	path string
+	perm fs.FileMode
+
+	compress *compression.Config
+
+	readOnly bool
+
+	bolt *bbolt.DB
+
+	currentBatchMtx sync.RWMutex
+	currentBatch    *batch
+
+	chClose     chan struct{}
+	chFlushDone chan struct{}
+}
+
+var rootBucket = []byte("root")
+
+// returned when BoltDB rootBucket is inaccessible within a particular transaction.
+var errMissingRootBucket = errors.New("missing root bucket")
+
+// New creates new Peapod instance to be located at the given path with
+// specified permissions.
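For orientation, here is a minimal lifecycle sketch of the component introduced by this patch, written in the style of the tests added in `peapod_test.go` further below (illustrative only, not part of the patch; the test name is hypothetical):

```go
package peapod_test

import (
	"path/filepath"
	"testing"

	"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common"
	"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/peapod"
	oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test"
	"github.com/stretchr/testify/require"
)

// Lifecycle sketch: New -> Open -> Init -> Put/Exists -> Close.
func TestPeapodLifecycleSketch(t *testing.T) {
	ppd := peapod.New(filepath.Join(t.TempDir(), "peapod.db"), 0600)
	require.NoError(t, ppd.Open(false))
	require.NoError(t, ppd.Init()) // creates the root bucket and starts the flush loop
	t.Cleanup(func() { _ = ppd.Close() })

	addr := oidtest.Address()

	_, err := ppd.Put(common.PutPrm{Address: addr, RawData: []byte("pea")})
	require.NoError(t, err)

	res, err := ppd.Exists(common.ExistsPrm{Address: addr})
	require.NoError(t, err)
	require.True(t, res.Exists)
}
```

Note that no explicit flush call appears here: Put returns only after the flush loop has committed the shared batch transaction, as shown in the implementation below.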
+func New(path string, perm fs.FileMode) *Peapod { + return &Peapod{ + path: path, + perm: perm, + } +} + +func (x *Peapod) flushLoop() { + defer close(x.chFlushDone) + + const flushInterval = 10 * time.Millisecond + t := time.NewTimer(flushInterval) + defer t.Stop() + + for { + select { + case <-x.chClose: + // commit current transaction to prevent bbolt.DB.Close blocking + x.flushCurrentBatch(false) + return + case <-t.C: + st := time.Now() + + x.flushCurrentBatch(true) + + interval := flushInterval - time.Since(st) + if interval <= 0 { + interval = time.Microsecond + } + + t.Reset(interval) + } + } +} + +func (x *Peapod) flushCurrentBatch(beginNew bool) { + x.currentBatchMtx.Lock() + + if !x.currentBatch.nonIdle { + if !beginNew && x.currentBatch.tx != nil { + _ = x.currentBatch.tx.Commit() + } + x.currentBatchMtx.Unlock() + return + } + + err := x.currentBatch.tx.Commit() + if err != nil { + err = fmt.Errorf("commit BoltDB batch transaction: %w", err) + } + + x.currentBatch.commitErr = err + close(x.currentBatch.chCommitted) + + if beginNew { + x.beginNewBatch() + } + + x.currentBatchMtx.Unlock() +} + +func (x *Peapod) beginNewBatch() { + x.currentBatch = new(batch) + + x.currentBatch.tx, x.currentBatch.initErr = x.bolt.Begin(true) + if x.currentBatch.initErr != nil { + x.currentBatch.initErr = fmt.Errorf("begin new BoltDB writable transaction: %w", x.currentBatch.initErr) + return + } + + x.currentBatch.bktRoot = x.currentBatch.tx.Bucket(rootBucket) + if x.currentBatch.bktRoot == nil { + x.currentBatch.initErr = errMissingRootBucket + return + } + + x.currentBatch.chCommitted = make(chan struct{}) +} + +const objectAddressKeySize = 2 * sha256.Size + +func keyForObject(addr oid.Address) []byte { + b := make([]byte, objectAddressKeySize) + addr.Container().Encode(b) + addr.Object().Encode(b[sha256.Size:]) + return b +} + +func decodeKeyForObject(addr *oid.Address, key []byte) error { + if len(key) != objectAddressKeySize { + return fmt.Errorf("invalid object address key size: %d instead of %d", len(key), objectAddressKeySize) + } + + var cnr cid.ID + var obj oid.ID + + err := cnr.Decode(key[:sha256.Size]) + if err != nil { + return fmt.Errorf("decode container ID: %w", err) + } + + err = obj.Decode(key[sha256.Size:]) + if err != nil { + return fmt.Errorf("decode object ID: %w", err) + } + + addr.SetContainer(cnr) + addr.SetObject(obj) + + return nil +} + +// Open opens underlying database in the specified mode. +func (x *Peapod) Open(readOnly bool) error { + err := util.MkdirAllX(filepath.Dir(x.path), x.perm) + if err != nil { + return fmt.Errorf("create directory '%s' for database: %w", x.path, err) + } + + x.bolt, err = bbolt.Open(x.path, x.perm, &bbolt.Options{ + ReadOnly: readOnly, + Timeout: 100 * time.Millisecond, // to handle flock + }) + if err != nil { + return fmt.Errorf("open BoltDB instance: %w", err) + } + + if readOnly { + err = x.bolt.View(func(tx *bbolt.Tx) error { + if tx.Bucket(rootBucket) == nil { + return errMissingRootBucket + } + return nil + }) + if err != nil { + return fmt.Errorf("check root bucket presence in BoltDB instance: %w", err) + } + } + + x.readOnly = readOnly + + return nil +} + +// Init initializes internal structure of the underlying database. 
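The key codec above packs the container and object IDs into a fixed 64-byte bucket key. A quick round-trip check would have to live inside the `peapod` package itself, since both helpers are unexported; an illustrative sketch (not part of the patch):

```go
package peapod

import (
	"testing"

	oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
	oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test"
	"github.com/stretchr/testify/require"
)

// Round-trip check for the key codec: an address encoded by keyForObject is
// recovered intact by decodeKeyForObject.
func TestKeyForObjectRoundTrip(t *testing.T) {
	src := oidtest.Address()

	key := keyForObject(src)
	require.Len(t, key, objectAddressKeySize) // 2 * sha256.Size = 64 bytes

	var dst oid.Address
	require.NoError(t, decodeKeyForObject(&dst, key))
	require.Equal(t, src, dst)
}
```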
+func (x *Peapod) Init() error { + if x.readOnly { + // no extra actions needed in read-only mode + return nil + } + + err := x.bolt.Update(func(tx *bbolt.Tx) error { + _, err := tx.CreateBucketIfNotExists(rootBucket) + return err + }) + if err != nil { + return fmt.Errorf("create root bucket in BoltDB instance: %w", err) + } + + x.chClose = make(chan struct{}) + x.chFlushDone = make(chan struct{}) + + x.beginNewBatch() + + go x.flushLoop() + + return nil +} + +// Close syncs data and closes the database. +func (x *Peapod) Close() error { + if !x.readOnly { + close(x.chClose) + <-x.chFlushDone + } + return x.bolt.Close() +} + +// Type is peapod storage type used in logs and configuration. +const Type = "peapod" + +func (x *Peapod) Type() string { + return Type +} + +func (x *Peapod) Path() string { + return x.path +} + +func (x *Peapod) SetCompressor(cc *compression.Config) { + x.compress = cc +} + +func (x *Peapod) SetReportErrorFunc(func(string, error)) { + // no-op like FSTree +} + +// Get reads data from the underlying database by the given object address. +// Returns apistatus.ErrObjectNotFound if object is missing in the Peapod. +func (x *Peapod) Get(prm common.GetPrm) (common.GetRes, error) { + var data []byte + + err := x.bolt.View(func(tx *bbolt.Tx) error { + bktRoot := tx.Bucket(rootBucket) + if bktRoot == nil { + return errMissingRootBucket + } + + val := bktRoot.Get(keyForObject(prm.Address)) + if val == nil { + return apistatus.ErrObjectNotFound + } + + data = slice.Copy(val) + + return nil + }) + if err != nil { + if errors.Is(err, apistatus.ErrObjectNotFound) { + return common.GetRes{}, logicerr.Wrap(err) + } + return common.GetRes{}, fmt.Errorf("exec read-only BoltDB transaction: %w", err) + } + + // copy-paste from FSTree + data, err = x.compress.Decompress(data) + if err != nil { + return common.GetRes{}, fmt.Errorf("decompress data: %w", err) + } + + obj := objectSDK.New() + if err := obj.Unmarshal(data); err != nil { + return common.GetRes{}, fmt.Errorf("decode object from binary: %w", err) + } + + return common.GetRes{Object: obj, RawData: data}, err +} + +// GetRange works like Get but reads specific payload range. +func (x *Peapod) GetRange(prm common.GetRangePrm) (common.GetRangeRes, error) { + // copy-paste from FSTree + res, err := x.Get(common.GetPrm{Address: prm.Address}) + if err != nil { + return common.GetRangeRes{}, err + } + + payload := res.Object.Payload() + from := prm.Range.GetOffset() + to := from + prm.Range.GetLength() + + if pLen := uint64(len(payload)); to < from || pLen < from || pLen < to { + return common.GetRangeRes{}, logicerr.Wrap(apistatus.ObjectOutOfRange{}) + } + + return common.GetRangeRes{ + Data: payload[from:to], + }, nil +} + +// Exists checks presence of the object in the underlying database by the given +// address. +func (x *Peapod) Exists(prm common.ExistsPrm) (common.ExistsRes, error) { + var res common.ExistsRes + + err := x.bolt.View(func(tx *bbolt.Tx) error { + bktRoot := tx.Bucket(rootBucket) + if bktRoot == nil { + return errMissingRootBucket + } + + res.Exists = bktRoot.Get(keyForObject(prm.Address)) != nil + + return nil + }) + if err != nil { + return common.ExistsRes{}, fmt.Errorf("exec read-only BoltDB transaction: %w", err) + } + + return res, nil +} + +var storageID = []byte("peapod") + +// Put saves given data in the underlying database by specified object address. +// The data can be anything, but in practice a binary NeoFS object is expected. 
+// Operation is executed within provided context: if the context is done, Put
+// returns its error (in this case the data may still have been saved).
+//
+// Put returns common.ErrReadOnly if the Peapod is read-only.
+func (x *Peapod) Put(prm common.PutPrm) (common.PutRes, error) {
+	if !prm.DontCompress {
+		prm.RawData = x.compress.Compress(prm.RawData)
+	}
+
+	// Track https://github.com/nspcc-dev/neofs-node/issues/2480
+	err := x.batch(context.TODO(), func(bktRoot *bbolt.Bucket) error {
+		return bktRoot.Put(keyForObject(prm.Address), prm.RawData)
+	})
+
+	return common.PutRes{
+		StorageID: storageID,
+	}, err
+}
+
+// Delete removes data associated with the given object address from the
+// underlying database. Delete returns apistatus.ErrObjectNotFound if the
+// object is missing.
+//
+// Delete returns common.ErrReadOnly if the Peapod is read-only.
+func (x *Peapod) Delete(prm common.DeletePrm) (common.DeleteRes, error) {
+	// Track https://github.com/nspcc-dev/neofs-node/issues/2480
+	err := x.batch(context.TODO(), func(bktRoot *bbolt.Bucket) error {
+		key := keyForObject(prm.Address)
+		if bktRoot.Get(key) == nil {
+			return apistatus.ErrObjectNotFound
+		}
+
+		return bktRoot.Delete(key)
+	})
+	if errors.Is(err, apistatus.ErrObjectNotFound) {
+		return common.DeleteRes{}, logicerr.Wrap(err)
+	}
+
+	return common.DeleteRes{}, err
+}
+
+func (x *Peapod) batch(ctx context.Context, fBktRoot func(bktRoot *bbolt.Bucket) error) error {
+	if x.readOnly {
+		return common.ErrReadOnly
+	}
+
+	x.currentBatchMtx.RLock()
+
+	currentBatch := x.currentBatch
+
+	if currentBatch.initErr != nil {
+		x.currentBatchMtx.RUnlock()
+		return currentBatch.initErr
+	}
+
+	// bbolt.Bucket.Put MUST NOT be called concurrently. This is not obvious from
+	// the docs, but a panic occurs in practice.
+	currentBatch.bktRootMtx.Lock()
+	err := fBktRoot(currentBatch.bktRoot)
+	currentBatch.bktRootMtx.Unlock()
+	if err != nil {
+		x.currentBatchMtx.RUnlock()
+		return fmt.Errorf("put object into BoltDB bucket for container: %w", err)
+	}
+
+	currentBatch.nonIdle = true
+
+	x.currentBatchMtx.RUnlock()
+
+	select {
+	case <-ctx.Done():
+		return ctx.Err()
+	case <-currentBatch.chCommitted:
+		return currentBatch.commitErr
+	}
+}
+
+// Iterate iterates over all objects stored in the underlying database and
+// passes them to LazyHandler or Handler. Iteration stops when a handler
+// returns an error.
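A caller-side sketch of the iteration API documented above: it collects all stored addresses while skipping entries whose key or value cannot be decoded. The field names follow the `common.IteratePrm` usage in Iterate below; the `listAddresses` helper itself is hypothetical and imports (`log`, `common`, `peapod`, `oid`) are elided:

```go
// listAddresses returns the addresses of all objects stored in the Peapod,
// logging and skipping entries whose key or value cannot be decoded.
func listAddresses(ppd *peapod.Peapod) ([]oid.Address, error) {
	var addrs []oid.Address

	_, err := ppd.Iterate(common.IteratePrm{
		IgnoreErrors: true,
		ErrorHandler: func(addr oid.Address, err error) error {
			log.Printf("skip broken entry: %v", err)
			return nil // a nil result keeps the iteration going
		},
		Handler: func(el common.IterationElement) error {
			addrs = append(addrs, el.Address)
			return nil
		},
	})

	return addrs, err
}
```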
+func (x *Peapod) Iterate(prm common.IteratePrm) (common.IterateRes, error) { + var addr oid.Address + + err := x.bolt.View(func(tx *bbolt.Tx) error { + bktRoot := tx.Bucket(rootBucket) + if bktRoot == nil { + return errMissingRootBucket + } + + return bktRoot.ForEach(func(k, v []byte) error { + err := decodeKeyForObject(&addr, k) + if err != nil { + if prm.IgnoreErrors { + if prm.ErrorHandler != nil { + return prm.ErrorHandler(addr, err) + } + + return nil + } + + return fmt.Errorf("decode object address from bucket key: %w", err) + } + + v, err = x.compress.Decompress(v) + if err != nil { + if prm.IgnoreErrors { + if prm.ErrorHandler != nil { + return prm.ErrorHandler(addr, err) + } + + return nil + } + + return fmt.Errorf("decompress value for object '%s': %w", addr, err) + } + + if prm.LazyHandler != nil { + return prm.LazyHandler(addr, func() ([]byte, error) { + return v, nil + }) + } + + return prm.Handler(common.IterationElement{ + ObjectData: v, + Address: addr, + StorageID: storageID, + }) + }) + }) + if err != nil { + return common.IterateRes{}, fmt.Errorf("exec read-only BoltDB transaction: %w", err) + } + + return common.IterateRes{}, nil +} diff --git a/pkg/local_object_storage/blobstor/peapod/peapod_test.go b/pkg/local_object_storage/blobstor/peapod/peapod_test.go new file mode 100644 index 0000000000..199825e205 --- /dev/null +++ b/pkg/local_object_storage/blobstor/peapod/peapod_test.go @@ -0,0 +1,299 @@ +package peapod_test + +import ( + "crypto/rand" + "fmt" + "path/filepath" + "sync" + "testing" + + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/internal/blobstortest" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/peapod" + apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test" + objecttest "github.com/nspcc-dev/neofs-sdk-go/object/test" + "github.com/stretchr/testify/require" +) + +func TestGeneric(t *testing.T) { + newPath := func() string { + return filepath.Join(t.TempDir(), "peapod.db") + } + + blobstortest.TestAll(t, func(t *testing.T) common.Storage { + return peapod.New(newPath(), 0600) + }, 2048, 16*1024) + + t.Run("info", func(t *testing.T) { + path := newPath() + blobstortest.TestInfo(t, func(t *testing.T) common.Storage { + return peapod.New(path, 0600) + }, peapod.Type, path) + }) +} + +func TestControl(t *testing.T) { + blobstortest.TestControl(t, func(t *testing.T) common.Storage { + return peapod.New(filepath.Join(t.TempDir(), "peapod.db"), 0600) + }, 2048, 2048) +} + +func testPeapodPath(tb testing.TB) string { + return filepath.Join(tb.TempDir(), "peapod.db") +} + +func newTestPeapod(tb testing.TB) *peapod.Peapod { + ppd := _newTestPeapod(tb, testPeapodPath(tb), false) + tb.Cleanup(func() { _ = ppd.Close() }) + return ppd +} + +// creates new read-only peapod.Peapod with one stored object. 
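Because every writer joins the shared batch transaction and returns only after the flush loop commits it, Put can be called from many goroutines at once. The benchmark at the end of this file does exactly that; a standalone sketch of the same pattern (hypothetical helper, same imports as this test file plus `sync`):

```go
// putConcurrently stores every payload under a fresh random address, one
// goroutine per payload; each Put returns only after its batch is committed.
func putConcurrently(ppd *peapod.Peapod, payloads [][]byte) error {
	var (
		wg    sync.WaitGroup
		errCh = make(chan error, len(payloads))
	)

	for _, data := range payloads {
		data := data
		wg.Add(1)
		go func() {
			defer wg.Done()
			_, err := ppd.Put(common.PutPrm{
				Address: oidtest.Address(),
				RawData: data,
			})
			errCh <- err
		}()
	}

	wg.Wait()
	close(errCh)

	for err := range errCh {
		if err != nil {
			return err
		}
	}
	return nil
}
```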
+func newTestPeapodReadOnly(tb testing.TB) (*peapod.Peapod, oid.Address) { + path := testPeapodPath(tb) + + ppd := _newTestPeapod(tb, path, false) + addr := oidtest.Address() + + _, err := ppd.Put(common.PutPrm{ + Address: addr, + RawData: []byte("Hello, world!"), + DontCompress: false, + }) + require.NoError(tb, err) + require.NoError(tb, ppd.Close()) + + ppd = _newTestPeapod(tb, path, true) + + tb.Cleanup(func() { _ = ppd.Close() }) + + return ppd, addr +} + +func _newTestPeapod(tb testing.TB, path string, readOnly bool) *peapod.Peapod { + ppd := peapod.New(path, 0600) + require.NoError(tb, ppd.Open(readOnly)) + require.NoError(tb, ppd.Init()) + + return ppd +} + +func TestPeapod_Get(t *testing.T) { + ppd := newTestPeapod(t) + addr := oidtest.Address() + obj := objecttest.Object(t) + + data, err := obj.Marshal() + require.NoError(t, err) + + getPrm := common.GetPrm{Address: addr} + + _, err = ppd.Get(getPrm) + require.ErrorIs(t, err, apistatus.ErrObjectNotFound) + + _, err = ppd.Put(common.PutPrm{ + Address: addr, + RawData: data, + }) + require.NoError(t, err) + + res, err := ppd.Get(getPrm) + require.NoError(t, err) + require.Equal(t, data, res.RawData) + require.Equal(t, obj, res.Object) +} + +func TestPeapod_Exists(t *testing.T) { + ppd := newTestPeapod(t) + addr := oidtest.Address() + data := []byte("Hello, world!") + + existsPrm := common.ExistsPrm{ + Address: addr, + } + + res, err := ppd.Exists(existsPrm) + require.NoError(t, err) + require.False(t, res.Exists) + + _, err = ppd.Put(common.PutPrm{ + Address: addr, + RawData: data, + }) + require.NoError(t, err) + + res, err = ppd.Exists(existsPrm) + require.NoError(t, err) + require.True(t, res.Exists) +} + +func TestPeapod_Iterate(t *testing.T) { + ppd := newTestPeapod(t) + + mSrc := map[oid.Address][]byte{ + oidtest.Address(): {1, 2, 3}, + oidtest.Address(): {4, 5, 6}, + oidtest.Address(): {7, 8, 9}, + } + + mDst := make(map[oid.Address][]byte) + + f := func(el common.IterationElement) error { + mDst[el.Address] = el.ObjectData + return nil + } + + iterPrm := common.IteratePrm{ + Handler: f, + } + + _, err := ppd.Iterate(iterPrm) + require.NoError(t, err) + require.Empty(t, mDst) + + for addr, data := range mSrc { + _, err = ppd.Put(common.PutPrm{ + Address: addr, + RawData: data, + }) + require.NoError(t, err) + } + + _, err = ppd.Iterate(iterPrm) + require.NoError(t, err) + require.Equal(t, mSrc, mDst) +} + +func TestPeapod_Put(t *testing.T) { + ppd := newTestPeapod(t) + addr := oidtest.Address() + obj := objecttest.Object(t) + + data, err := obj.Marshal() + require.NoError(t, err) + + _, err = ppd.Put(common.PutPrm{ + Address: addr, + RawData: data, + }) + require.NoError(t, err) + + res, err := ppd.Get(common.GetPrm{ + Address: addr, + }) + require.NoError(t, err) + require.Equal(t, data, res.RawData) + require.Equal(t, obj, res.Object) + + t.Run("read-only", func(t *testing.T) { + ppd, _ := newTestPeapodReadOnly(t) + + _, err := ppd.Put(common.PutPrm{ + Address: addr, + RawData: data, + }) + require.ErrorIs(t, err, common.ErrReadOnly) + }) +} + +func TestPeapod_Delete(t *testing.T) { + ppd := newTestPeapod(t) + addr := oidtest.Address() + obj := objecttest.Object(t) + + data, err := obj.Marshal() + require.NoError(t, err) + + _, err = ppd.Delete(common.DeletePrm{ + Address: addr, + }) + require.ErrorIs(t, err, apistatus.ErrObjectNotFound) + + _, err = ppd.Put(common.PutPrm{ + Address: addr, + RawData: data, + }) + require.NoError(t, err) + + getPrm := common.GetPrm{ + Address: addr, + } + + res, err := ppd.Get(getPrm) + 
require.NoError(t, err) + require.Equal(t, data, res.RawData) + require.Equal(t, obj, res.Object) + + _, err = ppd.Delete(common.DeletePrm{ + Address: addr, + }) + require.NoError(t, err) + + res, err = ppd.Get(getPrm) + require.ErrorIs(t, err, apistatus.ErrObjectNotFound) + + t.Run("read-only", func(t *testing.T) { + ppd, addr := newTestPeapodReadOnly(t) + + _, err := ppd.Delete(common.DeletePrm{ + Address: addr, + }) + require.ErrorIs(t, err, common.ErrReadOnly) + }) +} + +func benchmark(b *testing.B, ppd *peapod.Peapod, objSize uint64, nThreads int) { + data := make([]byte, objSize) + rand.Read(data) + + prm := common.PutPrm{ + RawData: data, + } + + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + var wg sync.WaitGroup + + for i := 0; i < nThreads; i++ { + wg.Add(1) + go func() { + defer wg.Done() + + prm := prm + prm.Address = oidtest.Address() + + _, err := ppd.Put(prm) + require.NoError(b, err) + }() + } + + wg.Wait() + } +} + +func BenchmarkPeapod_Put(b *testing.B) { + ppd := newTestPeapod(b) + + for _, tc := range []struct { + objSize uint64 + nThreads int + }{ + {1, 1}, + {1, 20}, + {1, 100}, + {1 << 10, 1}, + {1 << 10, 20}, + {1 << 10, 100}, + {100 << 10, 1}, + {100 << 10, 20}, + {100 << 10, 100}, + } { + b.Run(fmt.Sprintf("size=%d,thread=%d", tc.objSize, tc.nThreads), func(b *testing.B) { + benchmark(b, ppd, tc.objSize, tc.nThreads) + }) + } +} From ec8022901333a44e3fb48caf0b2a7ef69bf179a5 Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Tue, 1 Aug 2023 12:11:09 +0400 Subject: [PATCH 2/6] node/config: Fix type in FSTree config docs Signed-off-by: Leonard Lyubich --- cmd/neofs-node/config/engine/shard/blobstor/fstree/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/neofs-node/config/engine/shard/blobstor/fstree/config.go b/cmd/neofs-node/config/engine/shard/blobstor/fstree/config.go index cf9df9020a..6595e9375d 100644 --- a/cmd/neofs-node/config/engine/shard/blobstor/fstree/config.go +++ b/cmd/neofs-node/config/engine/shard/blobstor/fstree/config.go @@ -6,7 +6,7 @@ import ( ) // Config is a wrapper over the config section -// which provides access to Blobovnicza configurations. +// which provides access to FSTree configurations. type Config config.Config // DepthDefault is a default shallow dir depth. From e36ad52372e5e7a3274cfe0d15bd19b3790ca07e Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Wed, 2 Aug 2023 15:05:26 +0400 Subject: [PATCH 3/6] Introduce tool to migrate objects from Blobovnicza tree to Peapod Add `cmd/blobovnicza-to-peapod` application which accepts YAML configuration file of the storage node and, for each configured shard, overtakes data from Blobovnicza tree to Peapod created in the parent directory. The tool is going to be used for phased and safe rejection of the Blobovnicza trees and the transition to Peapods. Refs #2453. 
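Per shard, the tool boils down to building a Blobovnicza tree source from the shard configuration, creating a Peapod next to it and handing both to the new `common.Copy` helper. A condensed, illustrative sketch of that step (configuration and compression wiring omitted; `migrateShard` is a hypothetical helper, imports elided — see `main.go` below for the real flow):

```go
// migrateShard copies all objects from the Blobovnicza tree rooted at bbczPath
// into a new Peapod file created next to it.
func migrateShard(bbczPath string, perm fs.FileMode) error {
	src := blobovniczatree.NewBlobovniczaTree(
		blobovniczatree.WithRootPath(bbczPath),
		blobovniczatree.WithPermissions(perm),
	)

	dst := peapod.New(filepath.Join(filepath.Dir(bbczPath), "peapod.db"), perm)

	// common.Copy opens and initializes both sub-storages itself.
	return common.Copy(dst, src)
}
```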
Signed-off-by: Leonard Lyubich --- CHANGELOG.md | 12 + cmd/blobovnicza-to-peapod/main.go | 261 ++++++++++++++++++ .../blobstor/common/storage.go | 61 +++- .../blobstor/common/storage_test.go | 63 +++++ 4 files changed, 396 insertions(+), 1 deletion(-) create mode 100644 cmd/blobovnicza-to-peapod/main.go create mode 100644 pkg/local_object_storage/blobstor/common/storage_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index c9e648fd58..739385e9c3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ Changelog for NeoFS Node - Histogram metrics for RPC and engine operations (#2351) - SN's version is announced via the attributes automatically but can be overwritten explicitly (#2455) - New storage component for small objects named Peapod (#2453) +- New `blobovnicza-to-peapod` tool providing blobovnicza-to-peapod data migration (#2453) ### Fixed - `neo-go` RPC connection loss handling (#1337) @@ -72,6 +73,17 @@ Docker images now contain a single executable file and SSL certificates only. `neofs-cli control healthcheck` exit code is `0` only for "READY" state. +To migrate data from Blobovnicza trees to Peapods: +```shell +$ blobovnicza-to-peapod -config +``` +For any shard, the data from the configured Blobovnicza tree is copied into +a created Peapod file named `peapod.db` in the directory where the tree is +located. For example, `/neofs/data/blobovcniza/*` -> `/neofs/data/peapod.db`. +Notice that existing Blobovnicza trees are untouched. Configuration is also +updated, for example, `/etc/neofs/config.yaml` -> `/etc/neofs/config_peapod.yaml`. +WARN: carefully review the updated config before using it in the application! + ## [0.37.0] - 2023-06-15 - Sogado ### Added diff --git a/cmd/blobovnicza-to-peapod/main.go b/cmd/blobovnicza-to-peapod/main.go new file mode 100644 index 0000000000..2380da2f4d --- /dev/null +++ b/cmd/blobovnicza-to-peapod/main.go @@ -0,0 +1,261 @@ +package main + +import ( + "errors" + "flag" + "fmt" + "io/fs" + "log" + "os" + "path/filepath" + "strings" + + "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config" + engineconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine" + shardconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard" + blobovniczaconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard/blobstor/blobovnicza" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/blobovniczatree" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/compression" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/peapod" + "gopkg.in/yaml.v3" +) + +func main() { + nodeCfgPath := flag.String("config", "", "Path to storage node's YAML configuration file") + + flag.Parse() + + if *nodeCfgPath == "" { + log.Fatal("missing storage node config flag") + } + + appCfg := config.New(config.Prm{}, config.WithConfigFile(*nodeCfgPath)) + + err := engineconfig.IterateShards(appCfg, false, func(sc *shardconfig.Config) error { + log.Println("processing shard...") + + var bbcz common.Storage + var perm fs.FileMode + storagesCfg := sc.BlobStor().Storages() + + for i := range storagesCfg { + if storagesCfg[i].Type() == blobovniczatree.Type { + bbczCfg := blobovniczaconfig.From((*config.Config)(storagesCfg[i])) + + perm = storagesCfg[i].Perm() + bbcz = blobovniczatree.NewBlobovniczaTree( + blobovniczatree.WithRootPath(storagesCfg[i].Path()), + blobovniczatree.WithPermissions(storagesCfg[i].Perm()), + 
blobovniczatree.WithBlobovniczaSize(bbczCfg.Size()), + blobovniczatree.WithBlobovniczaShallowDepth(bbczCfg.ShallowDepth()), + blobovniczatree.WithBlobovniczaShallowWidth(bbczCfg.ShallowWidth()), + blobovniczatree.WithOpenedCacheSize(bbczCfg.OpenedCacheSize())) + + break + } + } + + if bbcz == nil { + log.Println("Blobovnicza is not configured for the current shard, going to next one...") + return nil + } + + bbczPath := bbcz.Path() + if !filepath.IsAbs(bbczPath) { + log.Fatalf("Blobobvnicza tree path '%s' is not absolute, make it like this in the config file first\n", bbczPath) + } + + ppdPath := filepath.Join(filepath.Dir(bbcz.Path()), "peapod.db") + ppd := peapod.New(ppdPath, perm) + + var compressCfg compression.Config + compressCfg.Enabled = sc.Compress() + compressCfg.UncompressableContentTypes = sc.UncompressableContentTypes() + + err := compressCfg.Init() + if err != nil { + log.Fatal("init compression config for the current shard: ", err) + } + + bbcz.SetCompressor(&compressCfg) + ppd.SetCompressor(&compressCfg) + + log.Printf("migrating data from Blobovnicza tree '%s' to Peapod '%s'...\n", bbcz.Path(), ppd.Path()) + + err = common.Copy(ppd, bbcz) + if err != nil { + log.Fatal("migration failed: ", err) + } + + log.Println("data successfully migrated in the current shard, going to the next one...") + + return nil + }) + if err != nil { + log.Fatal(err) + } + + srcPath := *nodeCfgPath + ss := strings.Split(srcPath, ".") + ss[0] += "_peapod" + + dstPath := strings.Join(ss, ".") + + log.Printf("data successfully migrated in all shards, migrating configuration to '%s' file...\n", dstPath) + + err = migrateConfigToPeapod(dstPath, srcPath) + if err != nil { + log.Fatal(err) + } +} + +func migrateConfigToPeapod(dstPath, srcPath string) error { + fData, err := os.ReadFile(srcPath) + if err != nil { + return fmt.Errorf("read source config config file: %w", err) + } + + var mConfig map[any]any + + err = yaml.Unmarshal(fData, &mConfig) + if err != nil { + return fmt.Errorf("decode config from YAML: %w", err) + } + + v, ok := mConfig["storage"] + if !ok { + return errors.New("missing 'storage' section") + } + + mStorage, ok := v.(map[any]any) + if !ok { + return fmt.Errorf("unexpected 'storage' section type: %T instead of %T", v, mStorage) + } + + v, ok = mStorage["shard"] + if !ok { + return errors.New("missing 'storage.shard' section") + } + + mShards, ok := v.(map[any]any) + if !ok { + return fmt.Errorf("unexpected 'storage.shard' section type: %T instead of %T", v, mShards) + } + + replaceBlobovniczaWithPeapod := func(mShard map[any]any, shardDesc any) error { + v, ok := mShard["blobstor"] + if !ok { + return fmt.Errorf("missing 'blobstor' section in shard '%v' config", shardDesc) + } + + sBlobStor, ok := v.([]any) + if !ok { + return fmt.Errorf("unexpected 'blobstor' section type in shard '%v': %T instead of %T", shardDesc, v, sBlobStor) + } + + var bbczSubStorage map[any]any + + for i := range sBlobStor { + mSubStorage, ok := sBlobStor[i].(map[any]any) + if !ok { + return fmt.Errorf("unexpected sub-storage #%d type in shard '%v': %T instead of %T", i, shardDesc, v, mStorage) + } + + v, ok := mSubStorage["type"] + if ok { + typ, ok := v.(string) + if !ok { + return fmt.Errorf("unexpected type of sub-storage name: %T instead of %T", v, typ) + } + + if typ == blobovniczatree.Type { + bbczSubStorage = mSubStorage + } + + continue + } + + // in 'default' section 'type' may be missing + + _, withDepth := mSubStorage["depth"] + _, withWidth := mSubStorage["width"] + + if withWidth && withDepth 
{ + bbczSubStorage = mSubStorage + } + } + + if bbczSubStorage == nil { + log.Printf("blobovnicza tree is not configured for the shard '%s', skip\n", shardDesc) + return nil + } + + for k := range bbczSubStorage { + switch k { + default: + delete(bbczSubStorage, k) + case "type", "path", "perm": + } + } + + bbczSubStorage["type"] = peapod.Type + + v, ok = bbczSubStorage["path"] + if ok { + path, ok := v.(string) + if !ok { + return fmt.Errorf("unexpected sub-storage path type: %T instead of %T", v, path) + } + + bbczSubStorage["path"] = filepath.Join(filepath.Dir(path), "peapod.db") + } + + return nil + } + + v, ok = mShards["default"] + if ok { + mShard, ok := v.(map[any]any) + if !ok { + return fmt.Errorf("unexpected 'storage.shard.default' section type: %T instead of %T", v, mShard) + } + + err = replaceBlobovniczaWithPeapod(mShard, "default") + if err != nil { + return err + } + } + + for i := 0; ; i++ { + v, ok = mShards[i] + if !ok { + if i == 0 { + return errors.New("missing numbered shards") + } + break + } + + mShard, ok := v.(map[any]any) + if !ok { + return fmt.Errorf("unexpected 'storage.shard.%d' section type: %T instead of %T", i, v, mStorage) + } + + err = replaceBlobovniczaWithPeapod(mShard, i) + if err != nil { + return err + } + } + + data, err := yaml.Marshal(mConfig) + if err != nil { + return fmt.Errorf("encode modified config into YAML: %w", err) + } + + err = os.WriteFile(dstPath, data, 0640) + if err != nil { + return fmt.Errorf("write resulting config to the destination file: %w", err) + } + + return nil +} diff --git a/pkg/local_object_storage/blobstor/common/storage.go b/pkg/local_object_storage/blobstor/common/storage.go index b66b412005..6a6ecd9a91 100644 --- a/pkg/local_object_storage/blobstor/common/storage.go +++ b/pkg/local_object_storage/blobstor/common/storage.go @@ -1,6 +1,10 @@ package common -import "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/compression" +import ( + "fmt" + + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/compression" +) // Storage represents key-value object storage. // It is used as a building block for a blobstor of a shard. @@ -23,3 +27,58 @@ type Storage interface { Delete(DeletePrm) (DeleteRes, error) Iterate(IteratePrm) (IterateRes, error) } + +// Copy copies all objects from source Storage into the destination one. If any +// object cannot be stored, Copy immediately fails. 
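One prerequisite worth spelling out before the implementation: the migration tool above sets the same `compression.Config` on both sub-storages, so values can be decompressed while iterating the source and compressed again when put into the destination. A hedged sketch of that wiring (`copyWithCompression` is a hypothetical helper, imports elided):

```go
// copyWithCompression shares one compression.Config between both sub-storages
// and then runs the migration, mirroring what the tool above does per shard.
func copyWithCompression(dst, src common.Storage, enabled bool) error {
	var compressCfg compression.Config
	compressCfg.Enabled = enabled

	if err := compressCfg.Init(); err != nil {
		return fmt.Errorf("init compression config: %w", err)
	}

	src.SetCompressor(&compressCfg)
	dst.SetCompressor(&compressCfg)

	return common.Copy(dst, src)
}
```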
+func Copy(dst, src Storage) error { + err := src.Open(true) + if err != nil { + return fmt.Errorf("open source sub-storage: %w", err) + } + + defer func() { _ = src.Close() }() + + err = src.Init() + if err != nil { + return fmt.Errorf("initialize source sub-storage: %w", err) + } + + err = dst.Open(false) + if err != nil { + return fmt.Errorf("open destination sub-storage: %w", err) + } + + defer func() { _ = dst.Close() }() + + err = dst.Init() + if err != nil { + return fmt.Errorf("initialize destination sub-storage: %w", err) + } + + _, err = src.Iterate(IteratePrm{ + Handler: func(el IterationElement) error { + exRes, err := dst.Exists(ExistsPrm{ + Address: el.Address, + }) + if err != nil { + return fmt.Errorf("check presence of object %s in the destination sub-storage: %w", el.Address, err) + } else if exRes.Exists { + return nil + } + + _, err = dst.Put(PutPrm{ + Address: el.Address, + RawData: el.ObjectData, + }) + if err != nil { + return fmt.Errorf("put object %s into destination sub-storage: %w", el.Address, err) + } + return nil + }, + }) + if err != nil { + return fmt.Errorf("iterate over source sub-storage: %w", err) + } + + return nil +} diff --git a/pkg/local_object_storage/blobstor/common/storage_test.go b/pkg/local_object_storage/blobstor/common/storage_test.go new file mode 100644 index 0000000000..793862cbb3 --- /dev/null +++ b/pkg/local_object_storage/blobstor/common/storage_test.go @@ -0,0 +1,63 @@ +package common_test + +import ( + "crypto/rand" + "path/filepath" + "testing" + + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/blobovniczatree" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/peapod" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test" + "github.com/stretchr/testify/require" +) + +func TestCopy(t *testing.T) { + dir := t.TempDir() + const nObjects = 100 + + src := blobovniczatree.NewBlobovniczaTree( + blobovniczatree.WithBlobovniczaShallowWidth(2), + blobovniczatree.WithBlobovniczaShallowDepth(3), + blobovniczatree.WithRootPath(filepath.Join(dir, "blobovnicza")), + ) + + require.NoError(t, src.Open(false)) + require.NoError(t, src.Init()) + + mObjs := make(map[oid.Address][]byte, nObjects) + + for i := 0; i < nObjects; i++ { + addr := oidtest.Address() + data := make([]byte, 32) + rand.Read(data) + mObjs[addr] = data + + _, err := src.Put(common.PutPrm{ + Address: addr, + RawData: data, + }) + require.NoError(t, err) + } + + require.NoError(t, src.Close()) + + dst := peapod.New(filepath.Join(dir, "peapod.db"), 0600) + + err := common.Copy(dst, src) + require.NoError(t, err) + + require.NoError(t, dst.Open(true)) + t.Cleanup(func() { _ = dst.Close() }) + + _, err = dst.Iterate(common.IteratePrm{ + Handler: func(el common.IterationElement) error { + data, ok := mObjs[el.Address] + require.True(t, ok) + require.Equal(t, data, el.ObjectData) + return nil + }, + }) + require.NoError(t, err) +} From c5112c435fdeb748719155f8787d9626a799b437 Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Fri, 11 Aug 2023 16:50:54 +0400 Subject: [PATCH 4/6] blobstortest: Use `errors.Is` to asset iteration error Error wrapping is normal, so we should always be ready to it. 
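For context, the behavior difference the new assertion covers, shown with plain standard-library errors:

```go
package main

import (
	"errors"
	"fmt"
)

func main() {
	sentinel := errors.New("logic error")
	wrapped := fmt.Errorf("iterate objects: %w", sentinel)

	fmt.Println(wrapped == sentinel)          // false: wrapping produces a new error value
	fmt.Println(errors.Is(wrapped, sentinel)) // true: errors.Is unwraps the chain
}
```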
Signed-off-by: Leonard Lyubich --- .../blobstor/internal/blobstortest/iterate.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/local_object_storage/blobstor/internal/blobstortest/iterate.go b/pkg/local_object_storage/blobstor/internal/blobstortest/iterate.go index f54c255b97..e06c95a539 100644 --- a/pkg/local_object_storage/blobstor/internal/blobstortest/iterate.go +++ b/pkg/local_object_storage/blobstor/internal/blobstortest/iterate.go @@ -95,7 +95,7 @@ func TestIterate(t *testing.T, cons Constructor, min, max uint64) { } _, err := s.Iterate(iterPrm) - require.Equal(t, logicErr, err) + require.ErrorIs(t, err, logicErr) require.Equal(t, len(objects)/2, len(seen)) for i := range objects { d, ok := seen[objects[i].addr.String()] From c060b16fca8c29728f99747b27b91ec1e75989d1 Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Fri, 11 Aug 2023 19:14:06 +0400 Subject: [PATCH 5/6] node: Support Peapod as BlobStor's sub-storage Support `peapod` sub-storage type in BlobStor configuration. Refs #2453. Signed-off-by: Leonard Lyubich --- CHANGELOG.md | 8 ++++++++ cmd/neofs-node/config.go | 9 +++++++++ cmd/neofs-node/config/engine/config_test.go | 10 ++++------ cmd/neofs-node/config/engine/shard/blobstor/config.go | 3 ++- cmd/neofs-node/validate.go | 3 ++- config/example/node.env | 9 +++------ config/example/node.json | 4 ++-- config/example/node.yaml | 4 ++-- docs/storage-node-configuration.md | 8 +++++++- 9 files changed, 39 insertions(+), 19 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 739385e9c3..d6dc70781d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,9 @@ Changelog for NeoFS Node ## [Unreleased] +This is the last release to support the Blobovnicza tree. Starting with the next +minor release, the component will be purged, so be prepared (see `Updating` section). + ### Added - Embedded Neo contracts in `contracts` dir (#2391) - `dump-names` command for adm @@ -84,6 +87,11 @@ Notice that existing Blobovnicza trees are untouched. Configuration is also updated, for example, `/etc/neofs/config.yaml` -> `/etc/neofs/config_peapod.yaml`. WARN: carefully review the updated config before using it in the application! +To store small objects in more effective and simple sub-storage Peapod, replace +`blobovnicza` sub-storage with the `peapod` one in `blobstor` config section. +If storage node already stores some data, don't forget to make data migration +described above. 
+ ## [0.37.0] - 2023-06-15 - Sogado ### Added diff --git a/cmd/neofs-node/config.go b/cmd/neofs-node/config.go index 5644ef7b20..3d419ead7f 100644 --- a/cmd/neofs-node/config.go +++ b/cmd/neofs-node/config.go @@ -36,6 +36,7 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/blobovniczatree" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/peapod" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/pilorama" @@ -264,6 +265,7 @@ func (a *applicationConfiguration) readConfig(c *config.Config) error { sub := fstreeconfig.From((*config.Config)(storagesCfg[i])) sCfg.depth = sub.Depth() sCfg.noSync = sub.NoSync() + case peapod.Type: default: return fmt.Errorf("invalid storage type: %s", storagesCfg[i].Type()) } @@ -732,6 +734,13 @@ func (c *cfg) shardOpts() []shardOptsWithID { return true }, }) + case peapod.Type: + ss = append(ss, blobstor.SubStorage{ + Storage: peapod.New(sRead.path, sRead.perm), + Policy: func(_ *objectSDK.Object, data []byte) bool { + return uint64(len(data)) < shCfg.smallSizeObjectLimit + }, + }) default: // should never happen, that has already // been handled: when the config was read diff --git a/cmd/neofs-node/config/engine/config_test.go b/cmd/neofs-node/config/engine/config_test.go index c57017fb58..4080458370 100644 --- a/cmd/neofs-node/config/engine/config_test.go +++ b/cmd/neofs-node/config/engine/config_test.go @@ -12,6 +12,7 @@ import ( fstreeconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard/blobstor/fstree" piloramaconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard/pilorama" configtest "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/test" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/peapod" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode" "github.com/stretchr/testify/require" ) @@ -133,12 +134,9 @@ func TestEngineSection(t *testing.T) { require.Equal(t, 2, len(ss)) - blz := blobovniczaconfig.From((*config.Config)(ss[0])) - require.Equal(t, "tmp/1/blob/blobovnicza", ss[0].Path()) - require.EqualValues(t, 4194304, blz.Size()) - require.EqualValues(t, 1, blz.ShallowDepth()) - require.EqualValues(t, 4, blz.ShallowWidth()) - require.EqualValues(t, 50, blz.OpenedCacheSize()) + require.Equal(t, "tmp/1/blob/peapod.db", ss[0].Path()) + require.EqualValues(t, 0644, ss[0].Perm()) + require.EqualValues(t, peapod.Type, ss[0].Type()) require.Equal(t, "tmp/1/blob", ss[1].Path()) require.EqualValues(t, 0644, ss[1].Perm()) diff --git a/cmd/neofs-node/config/engine/shard/blobstor/config.go b/cmd/neofs-node/config/engine/shard/blobstor/config.go index 4e6dc86c2c..fe9ccdc8cc 100644 --- a/cmd/neofs-node/config/engine/shard/blobstor/config.go +++ b/cmd/neofs-node/config/engine/shard/blobstor/config.go @@ -7,6 +7,7 @@ import ( "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard/blobstor/storage" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/blobovniczatree" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/peapod" ) // Config is a wrapper over the config section @@ -28,7 +29,7 @@ func (x *Config) Storages() []*storage.Config { 
switch typ { case "": return ss - case fstree.Type, blobovniczatree.Type: + case fstree.Type, blobovniczatree.Type, peapod.Type: sub := storage.From((*config.Config)(x).Sub(strconv.Itoa(i))) ss = append(ss, sub) default: diff --git a/cmd/neofs-node/validate.go b/cmd/neofs-node/validate.go index 5f337731d8..573e8acf58 100644 --- a/cmd/neofs-node/validate.go +++ b/cmd/neofs-node/validate.go @@ -11,6 +11,7 @@ import ( treeconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/tree" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/blobovniczatree" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/peapod" "go.uber.org/zap/zapcore" ) @@ -54,7 +55,7 @@ func validateConfig(c *config.Config) error { } for i := range blobstor { switch blobstor[i].Type() { - case fstree.Type, blobovniczatree.Type: + case fstree.Type, blobovniczatree.Type, peapod.Type: default: // FIXME #1764 (@fyrchik): this line is currently unreachable, // because we panic in `sc.BlobStor().Storages()`. diff --git a/config/example/node.env b/config/example/node.env index 2e81750530..f9e1d49f84 100644 --- a/config/example/node.env +++ b/config/example/node.env @@ -152,12 +152,9 @@ NEOFS_STORAGE_SHARD_1_METABASE_MAX_BATCH_DELAY=20ms NEOFS_STORAGE_SHARD_1_COMPRESS=false NEOFS_STORAGE_SHARD_1_SMALL_OBJECT_SIZE=102400 ### Blobovnicza config -NEOFS_STORAGE_SHARD_1_BLOBSTOR_0_TYPE=blobovnicza -NEOFS_STORAGE_SHARD_1_BLOBSTOR_0_PATH=tmp/1/blob/blobovnicza -NEOFS_STORAGE_SHARD_1_BLOBSTOR_0_SIZE=4194304 -NEOFS_STORAGE_SHARD_1_BLOBSTOR_0_DEPTH=1 -NEOFS_STORAGE_SHARD_1_BLOBSTOR_0_WIDTH=4 -NEOFS_STORAGE_SHARD_1_BLOBSTOR_0_OPENED_CACHE_CAPACITY=50 +NEOFS_STORAGE_SHARD_1_BLOBSTOR_0_TYPE=peapod +NEOFS_STORAGE_SHARD_1_BLOBSTOR_0_PATH=tmp/1/blob/peapod.db +NEOFS_STORAGE_SHARD_1_BLOBSTOR_0_PERM=0644 ### FSTree config NEOFS_STORAGE_SHARD_1_BLOBSTOR_1_TYPE=fstree NEOFS_STORAGE_SHARD_1_BLOBSTOR_1_PATH=tmp/1/blob diff --git a/config/example/node.json b/config/example/node.json index 908662ba1a..1e33668c90 100644 --- a/config/example/node.json +++ b/config/example/node.json @@ -198,8 +198,8 @@ "small_object_size": 102400, "blobstor": [ { - "type": "blobovnicza", - "path": "tmp/1/blob/blobovnicza", + "type": "peapod", + "path": "tmp/1/blob/peapod.db", "perm": "0644", "size": 4194304, "depth": 1, diff --git a/config/example/node.yaml b/config/example/node.yaml index cfd3a6a956..f0823a172d 100644 --- a/config/example/node.yaml +++ b/config/example/node.yaml @@ -194,8 +194,8 @@ storage: path: tmp/1/meta # metabase path blobstor: - - type: blobovnicza - path: tmp/1/blob/blobovnicza + - type: peapod + path: tmp/1/blob/peapod.db # path to Peapod database - type: fstree path: tmp/1/blob # blobstor path no_sync: true diff --git a/docs/storage-node-configuration.md b/docs/storage-node-configuration.md index 3ba935a3f3..ec30143bb0 100644 --- a/docs/storage-node-configuration.md +++ b/docs/storage-node-configuration.md @@ -175,7 +175,7 @@ The following table describes configuration for each shard. ### `blobstor` subsection Contains a list of substorages each with it's own type. -Currently only 2 types are supported: `fstree` and `blobovnicza`. +Currently only 3 types are supported: `fstree`, `blobovnicza` and `peapod`. ```yaml blobstor: @@ -215,6 +215,12 @@ blobstor: | `width` | `int` | `16` | Blobovnicza tree width. | | `opened_cache_capacity` | `int` | `16` | Maximum number of simultaneously opened blobovniczas. 
| +#### `peapod` type options +| Parameter | Type | Default value | Description | +|---------------------|-----------|---------------|-------------------------------------------------------| +| `path` | `string` | | Path to the Peapod database file. | +| `perm` | file mode | `0660` | Default permission for created files and directories. | + ### `gc` subsection Contains garbage-collection service configuration. It iterates over the blobstor and removes object the node no longer needs. From b01baf6552e55e513fc1de5791de7d209614038a Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Tue, 15 Aug 2023 11:53:05 +0400 Subject: [PATCH 6/6] node: Make Peapod's flush interval configurable There may be a need to tune time interval b/w batch writes to disk in Peapod component. Add storage node's config with `flush_interval` key of type duration defaulting to 10ms. Signed-off-by: Leonard Lyubich --- cmd/blobovnicza-to-peapod/main.go | 3 +- cmd/neofs-node/config.go | 16 ++++++--- cmd/neofs-node/config/engine/config_test.go | 4 ++- .../engine/shard/blobstor/peapod/config.go | 34 +++++++++++++++++++ config/example/node.env | 1 + config/example/node.json | 5 +-- config/example/node.yaml | 1 + docs/storage-node-configuration.md | 1 + .../blobstor/common/storage_test.go | 3 +- .../blobstor/peapod/peapod.go | 20 ++++++++--- .../blobstor/peapod/peapod_test.go | 9 ++--- 11 files changed, 77 insertions(+), 20 deletions(-) create mode 100644 cmd/neofs-node/config/engine/shard/blobstor/peapod/config.go diff --git a/cmd/blobovnicza-to-peapod/main.go b/cmd/blobovnicza-to-peapod/main.go index 2380da2f4d..80b8ba5d33 100644 --- a/cmd/blobovnicza-to-peapod/main.go +++ b/cmd/blobovnicza-to-peapod/main.go @@ -9,6 +9,7 @@ import ( "os" "path/filepath" "strings" + "time" "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config" engineconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine" @@ -67,7 +68,7 @@ func main() { } ppdPath := filepath.Join(filepath.Dir(bbcz.Path()), "peapod.db") - ppd := peapod.New(ppdPath, perm) + ppd := peapod.New(ppdPath, perm, 10*time.Millisecond) var compressCfg compression.Config compressCfg.Enabled = sc.Compress() diff --git a/cmd/neofs-node/config.go b/cmd/neofs-node/config.go index 3d419ead7f..74d143f6f7 100644 --- a/cmd/neofs-node/config.go +++ b/cmd/neofs-node/config.go @@ -25,6 +25,7 @@ import ( shardconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard" blobovniczaconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard/blobstor/blobovnicza" fstreeconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard/blobstor/fstree" + peapodconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard/blobstor/peapod" loggerconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/logger" metricsconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/metrics" nodeconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/node" @@ -159,9 +160,11 @@ func (c *shardCfg) id() string { type subStorageCfg struct { // common for all storages - typ string - path string - perm fs.FileMode + typ string + path string + perm fs.FileMode + + // tree-specific (FS and blobovnicza) depth uint64 noSync bool @@ -169,6 +172,9 @@ type subStorageCfg struct { size uint64 width uint64 openedCacheSize int + + // Peapod-specific + flushInterval time.Duration } // readConfig fills applicationConfiguration with raw configuration values @@ -266,6 +272,8 @@ func (a *applicationConfiguration) readConfig(c *config.Config) error { 
sCfg.depth = sub.Depth() sCfg.noSync = sub.NoSync() case peapod.Type: + peapodCfg := peapodconfig.From((*config.Config)(storagesCfg[i])) + sCfg.flushInterval = peapodCfg.FlushInterval() default: return fmt.Errorf("invalid storage type: %s", storagesCfg[i].Type()) } @@ -736,7 +744,7 @@ func (c *cfg) shardOpts() []shardOptsWithID { }) case peapod.Type: ss = append(ss, blobstor.SubStorage{ - Storage: peapod.New(sRead.path, sRead.perm), + Storage: peapod.New(sRead.path, sRead.perm, sRead.flushInterval), Policy: func(_ *objectSDK.Object, data []byte) bool { return uint64(len(data)) < shCfg.smallSizeObjectLimit }, diff --git a/cmd/neofs-node/config/engine/config_test.go b/cmd/neofs-node/config/engine/config_test.go index 4080458370..dc49f9a946 100644 --- a/cmd/neofs-node/config/engine/config_test.go +++ b/cmd/neofs-node/config/engine/config_test.go @@ -10,6 +10,7 @@ import ( shardconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard" blobovniczaconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard/blobstor/blobovnicza" fstreeconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard/blobstor/fstree" + peapodconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard/blobstor/peapod" piloramaconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard/pilorama" configtest "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/test" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/peapod" @@ -133,10 +134,11 @@ func TestEngineSection(t *testing.T) { require.EqualValues(t, 102400, sc.SmallSizeLimit()) require.Equal(t, 2, len(ss)) - + ppd := peapodconfig.From((*config.Config)(ss[0])) require.Equal(t, "tmp/1/blob/peapod.db", ss[0].Path()) require.EqualValues(t, 0644, ss[0].Perm()) require.EqualValues(t, peapod.Type, ss[0].Type()) + require.EqualValues(t, 30*time.Millisecond, ppd.FlushInterval()) require.Equal(t, "tmp/1/blob", ss[1].Path()) require.EqualValues(t, 0644, ss[1].Perm()) diff --git a/cmd/neofs-node/config/engine/shard/blobstor/peapod/config.go b/cmd/neofs-node/config/engine/shard/blobstor/peapod/config.go new file mode 100644 index 0000000000..cd01d21ca9 --- /dev/null +++ b/cmd/neofs-node/config/engine/shard/blobstor/peapod/config.go @@ -0,0 +1,34 @@ +package peapodconfig + +import ( + "time" + + "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config" +) + +// Config is a wrapper over the config section +// which provides access to Peapod configurations. +type Config config.Config + +// Various Peapod config defaults. +const ( + // DefaultFlushInterval is a default time interval between Peapod's batch writes + // to disk. + DefaultFlushInterval = 10 * time.Millisecond +) + +// From wraps config section into Config. +func From(c *config.Config) *Config { + return (*Config)(c) +} + +// FlushInterval returns the value of "flush_interval" config parameter. +// +// Returns DefaultFlushInterval if the value is not a positive duration. 
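The accessor below is what the node consumes when constructing the sub-storage; the wiring added to `cmd/neofs-node/config.go` in this patch reduces to the following snippet (`storageCfg`, `path` and `perm` are placeholders):

```go
// Read the peapod-specific section and hand the interval to the constructor.
ppdCfg := peapodconfig.From((*config.Config)(storageCfg))

// FlushInterval falls back to DefaultFlushInterval (10ms) when the
// `flush_interval` key is absent or non-positive.
ppd := peapod.New(path, perm, ppdCfg.FlushInterval())
```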
+func (x *Config) FlushInterval() time.Duration { + d := config.DurationSafe((*config.Config)(x), "flush_interval") + if d > 0 { + return d + } + return DefaultFlushInterval +} diff --git a/config/example/node.env b/config/example/node.env index f9e1d49f84..5449b6567a 100644 --- a/config/example/node.env +++ b/config/example/node.env @@ -155,6 +155,7 @@ NEOFS_STORAGE_SHARD_1_SMALL_OBJECT_SIZE=102400 NEOFS_STORAGE_SHARD_1_BLOBSTOR_0_TYPE=peapod NEOFS_STORAGE_SHARD_1_BLOBSTOR_0_PATH=tmp/1/blob/peapod.db NEOFS_STORAGE_SHARD_1_BLOBSTOR_0_PERM=0644 +NEOFS_STORAGE_SHARD_1_BLOBSTOR_0_FLUSH_INTERVAL=30ms ### FSTree config NEOFS_STORAGE_SHARD_1_BLOBSTOR_1_TYPE=fstree NEOFS_STORAGE_SHARD_1_BLOBSTOR_1_PATH=tmp/1/blob diff --git a/config/example/node.json b/config/example/node.json index 1e33668c90..fa11730f57 100644 --- a/config/example/node.json +++ b/config/example/node.json @@ -201,10 +201,7 @@ "type": "peapod", "path": "tmp/1/blob/peapod.db", "perm": "0644", - "size": 4194304, - "depth": 1, - "width": 4, - "opened_cache_capacity": 50 + "flush_interval": "30ms" }, { "type": "fstree", diff --git a/config/example/node.yaml b/config/example/node.yaml index f0823a172d..fde52a969f 100644 --- a/config/example/node.yaml +++ b/config/example/node.yaml @@ -196,6 +196,7 @@ storage: blobstor: - type: peapod path: tmp/1/blob/peapod.db # path to Peapod database + flush_interval: 30ms # time interval between batch writes to disk (defaults to 10ms) - type: fstree path: tmp/1/blob # blobstor path no_sync: true diff --git a/docs/storage-node-configuration.md b/docs/storage-node-configuration.md index ec30143bb0..6eccf54569 100644 --- a/docs/storage-node-configuration.md +++ b/docs/storage-node-configuration.md @@ -220,6 +220,7 @@ blobstor: |---------------------|-----------|---------------|-------------------------------------------------------| | `path` | `string` | | Path to the Peapod database file. | | `perm` | file mode | `0660` | Default permission for created files and directories. | +| `flush_interval` | `duration`| `10ms` | Time interval between batch writes to disk. | ### `gc` subsection diff --git a/pkg/local_object_storage/blobstor/common/storage_test.go b/pkg/local_object_storage/blobstor/common/storage_test.go index 793862cbb3..33f494f913 100644 --- a/pkg/local_object_storage/blobstor/common/storage_test.go +++ b/pkg/local_object_storage/blobstor/common/storage_test.go @@ -4,6 +4,7 @@ import ( "crypto/rand" "path/filepath" "testing" + "time" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/blobovniczatree" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" @@ -43,7 +44,7 @@ func TestCopy(t *testing.T) { require.NoError(t, src.Close()) - dst := peapod.New(filepath.Join(dir, "peapod.db"), 0600) + dst := peapod.New(filepath.Join(dir, "peapod.db"), 0600, 10*time.Millisecond) err := common.Copy(dst, src) require.NoError(t, err) diff --git a/pkg/local_object_storage/blobstor/peapod/peapod.go b/pkg/local_object_storage/blobstor/peapod/peapod.go index 18d6569db8..20aa8c595e 100644 --- a/pkg/local_object_storage/blobstor/peapod/peapod.go +++ b/pkg/local_object_storage/blobstor/peapod/peapod.go @@ -43,6 +43,8 @@ type Peapod struct { path string perm fs.FileMode + flushInterval time.Duration + compress *compression.Config readOnly bool @@ -63,18 +65,24 @@ var errMissingRootBucket = errors.New("missing root bucket") // New creates new Peapod instance to be located at the given path with // specified permissions. 
-func New(path string, perm fs.FileMode) *Peapod { +// +// Specified flush interval MUST be positive (see Init). +func New(path string, perm fs.FileMode, flushInterval time.Duration) *Peapod { + if flushInterval <= 0 { + panic(fmt.Sprintf("non-positive flush interval %v", flushInterval)) + } return &Peapod{ path: path, perm: perm, + + flushInterval: flushInterval, } } func (x *Peapod) flushLoop() { defer close(x.chFlushDone) - const flushInterval = 10 * time.Millisecond - t := time.NewTimer(flushInterval) + t := time.NewTimer(x.flushInterval) defer t.Stop() for { @@ -88,7 +96,7 @@ func (x *Peapod) flushLoop() { x.flushCurrentBatch(true) - interval := flushInterval - time.Since(st) + interval := x.flushInterval - time.Since(st) if interval <= 0 { interval = time.Microsecond } @@ -207,7 +215,9 @@ func (x *Peapod) Open(readOnly bool) error { return nil } -// Init initializes internal structure of the underlying database. +// Init initializes internal structure of the underlying database and runs +// flushing routine. The routine writes data batches into disk once per time +// interval configured in New. func (x *Peapod) Init() error { if x.readOnly { // no extra actions needed in read-only mode diff --git a/pkg/local_object_storage/blobstor/peapod/peapod_test.go b/pkg/local_object_storage/blobstor/peapod/peapod_test.go index 199825e205..f428a72c41 100644 --- a/pkg/local_object_storage/blobstor/peapod/peapod_test.go +++ b/pkg/local_object_storage/blobstor/peapod/peapod_test.go @@ -6,6 +6,7 @@ import ( "path/filepath" "sync" "testing" + "time" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/internal/blobstortest" @@ -23,20 +24,20 @@ func TestGeneric(t *testing.T) { } blobstortest.TestAll(t, func(t *testing.T) common.Storage { - return peapod.New(newPath(), 0600) + return peapod.New(newPath(), 0600, 10*time.Millisecond) }, 2048, 16*1024) t.Run("info", func(t *testing.T) { path := newPath() blobstortest.TestInfo(t, func(t *testing.T) common.Storage { - return peapod.New(path, 0600) + return peapod.New(path, 0600, 10*time.Millisecond) }, peapod.Type, path) }) } func TestControl(t *testing.T) { blobstortest.TestControl(t, func(t *testing.T) common.Storage { - return peapod.New(filepath.Join(t.TempDir(), "peapod.db"), 0600) + return peapod.New(filepath.Join(t.TempDir(), "peapod.db"), 0600, 10*time.Millisecond) }, 2048, 2048) } @@ -73,7 +74,7 @@ func newTestPeapodReadOnly(tb testing.TB) (*peapod.Peapod, oid.Address) { } func _newTestPeapod(tb testing.TB, path string, readOnly bool) *peapod.Peapod { - ppd := peapod.New(path, 0600) + ppd := peapod.New(path, 0600, 10*time.Millisecond) require.NoError(tb, ppd.Open(readOnly)) require.NoError(tb, ppd.Init())
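To sum up the constructor change: the flush interval is now an explicit third argument and a non-positive value panics, so every caller passes a concrete duration (the tests and the migration tool in this series keep the former 10ms default). For example:

```go
// Old signature (removed by this patch):
//   ppd := peapod.New("tmp/1/blob/peapod.db", 0600)
// New signature:
ppd := peapod.New("tmp/1/blob/peapod.db", 0600, 10*time.Millisecond)
```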