From 29f76c4051dd48b4d977fc7b4afe84dd40b1b025 Mon Sep 17 00:00:00 2001
From: Leonard Lyubich
Date: Sat, 29 Jul 2023 19:58:31 +0400
Subject: [PATCH] Introduce new storage component Peapod

Currently, storage node saves relatively small NeoFS objects in
Blobovnicza tree component: a group of BoltDB wrappers managed as a tree.
This component has a pretty complex data structure and code
implementation, and shows dubious performance results.

Peapod is a new storage component introduced to replace the Blobovnicza
one as a simpler and more effective solution. It is also based on a
single BoltDB instance, but organizes batch writes in a specific way. In
the future, Peapod is going to be used as a storage of small objects by
the BlobStor.

Signed-off-by: Leonard Lyubich
---
 .../peapod/common_test.go                     |  18 +++
 pkg/local_object_storage/peapod/get.go        |  37 +++++
 pkg/local_object_storage/peapod/get_test.go   |  26 +++
 pkg/local_object_storage/peapod/peapod.go     | 163 ++++++++++++++++++
 pkg/local_object_storage/peapod/put.go        |  47 ++++++
 pkg/local_object_storage/peapod/put_test.go   |  70 ++++++++
 6 files changed, 361 insertions(+)
 create mode 100644 pkg/local_object_storage/peapod/common_test.go
 create mode 100644 pkg/local_object_storage/peapod/get.go
 create mode 100644 pkg/local_object_storage/peapod/get_test.go
 create mode 100644 pkg/local_object_storage/peapod/peapod.go
 create mode 100644 pkg/local_object_storage/peapod/put.go
 create mode 100644 pkg/local_object_storage/peapod/put_test.go

diff --git a/pkg/local_object_storage/peapod/common_test.go b/pkg/local_object_storage/peapod/common_test.go
new file mode 100644
index 00000000000..2a718eb70e9
--- /dev/null
+++ b/pkg/local_object_storage/peapod/common_test.go
@@ -0,0 +1,18 @@
+package peapod_test
+
+import (
+	"path/filepath"
+	"testing"
+
+	"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/peapod"
+	"github.com/stretchr/testify/require"
+)
+
+func newTestPeapod(tb testing.TB) *peapod.Peapod {
+	ppd, err := peapod.New(filepath.Join(tb.TempDir(), "peapod.db"))
+	require.NoError(tb, err)
+
+	tb.Cleanup(func() { _ = ppd.Close() })
+
+	return ppd
+}
diff --git a/pkg/local_object_storage/peapod/get.go b/pkg/local_object_storage/peapod/get.go
new file mode 100644
index 00000000000..04b0e693dba
--- /dev/null
+++ b/pkg/local_object_storage/peapod/get.go
@@ -0,0 +1,37 @@
+package peapod
+
+import (
+	"fmt"
+
+	"github.com/nspcc-dev/neo-go/pkg/util/slice"
+	apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
+	oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
+	"go.etcd.io/bbolt"
+)
+
+// Get reads data from the underlying database by the given object address.
+// Returns apistatus.ErrObjectNotFound if object is missing in the Peapod.
+func (x *Peapod) Get(addr oid.Address) ([]byte, error) {
+	var data []byte
+
+	err := x.bolt.View(func(tx *bbolt.Tx) error {
+		bktRoot := tx.Bucket(rootBucket)
+		if bktRoot == nil {
+			return fmt.Errorf("%w: missing root bucket", apistatus.ErrObjectNotFound)
+		}
+
+		val := bktRoot.Get(keyForObject(addr))
+		if val == nil {
+			return apistatus.ErrObjectNotFound
+		}
+
+		data = slice.Copy(val)
+
+		return nil
+	})
+	if err != nil {
+		return nil, fmt.Errorf("exec read-only BoltDB transaction: %w", err)
+	}
+
+	return data, nil
+}
diff --git a/pkg/local_object_storage/peapod/get_test.go b/pkg/local_object_storage/peapod/get_test.go
new file mode 100644
index 00000000000..6006893dd67
--- /dev/null
+++ b/pkg/local_object_storage/peapod/get_test.go
@@ -0,0 +1,26 @@
+package peapod_test
+
+import (
+	"context"
+	"testing"
+
+	apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
+	oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test"
+	"github.com/stretchr/testify/require"
+)
+
+func TestPeapod_Get(t *testing.T) {
+	ppd := newTestPeapod(t)
+	addr := oidtest.Address()
+	data := []byte("Hello, world!")
+
+	_, err := ppd.Get(addr)
+	require.ErrorIs(t, err, apistatus.ErrObjectNotFound)
+
+	err = ppd.Put(context.Background(), addr, data)
+	require.NoError(t, err)
+
+	res, err := ppd.Get(addr)
+	require.NoError(t, err)
+	require.Equal(t, data, res)
+}
diff --git a/pkg/local_object_storage/peapod/peapod.go b/pkg/local_object_storage/peapod/peapod.go
new file mode 100644
index 00000000000..355efc582ec
--- /dev/null
+++ b/pkg/local_object_storage/peapod/peapod.go
@@ -0,0 +1,163 @@
+package peapod
+
+import (
+	"crypto/sha256"
+	"fmt"
+	"sync"
+	"time"
+
+	oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
+	"go.etcd.io/bbolt"
+)
+
+// batch is a group of Put operations sharing a single writable BoltDB
+// transaction which is committed by the flush routine.
+type batch struct {
+	// initErr is non-nil if the batch failed to start. tx is nil if Begin failed.
+	initErr error
+
+	tx *bbolt.Tx
+
+	// nonIdle is set once at least one value is written within the batch.
+	// It is written under bktRootMtx and read under exclusive currentBatchMtx.
+	nonIdle bool
+
+	// commitErr is the tx commit result, readable after chCommitted is closed.
+	commitErr   error
+	chCommitted chan struct{}
+
+	bktRootMtx sync.Mutex
+	bktRoot    *bbolt.Bucket
+}
+
+// Peapod provides storage for relatively small NeoFS binary object (peas).
+// Peapod is a single low-level key/value database optimized to work with big
+// number of stored units.
+type Peapod struct {
+	bolt *bbolt.DB
+
+	currentBatchMtx sync.RWMutex
+	currentBatch    *batch
+
+	chClose     chan struct{}
+	chFlushDone chan struct{}
+}
+
+var rootBucket = []byte("root")
+
+// New returns initialized Peapod instance located at the given path. Resulting Peapod
+// must be finally closed.
+func New(path string) (*Peapod, error) {
+	db, err := bbolt.Open(path, 0600, &bbolt.Options{
+		Timeout: 100 * time.Millisecond, // to handle flock
+	})
+	if err != nil {
+		return nil, fmt.Errorf("open BoltDB instance: %w", err)
+	}
+
+	err = db.Update(func(tx *bbolt.Tx) error {
+		_, err := tx.CreateBucketIfNotExists(rootBucket)
+		return err
+	})
+	if err != nil {
+		// the instance is not returned, so release the opened database here
+		_ = db.Close()
+		return nil, fmt.Errorf("create root bucket in BoltDB instance: %w", err)
+	}
+
+	res := &Peapod{
+		bolt:        db,
+		chClose:     make(chan struct{}),
+		chFlushDone: make(chan struct{}),
+	}
+
+	res.beginNewBatch()
+
+	go res.flushLoop()
+
+	return res, nil
+}
+
+// Close syncs data and closes the database.
+func (x *Peapod) Close() error {
+	close(x.chClose)
+	<-x.chFlushDone
+	return x.bolt.Close()
+}
+
+// flushLoop periodically flushes the current batch until Peapod is closed.
+// The last batch is finalized on close.
+func (x *Peapod) flushLoop() {
+	defer close(x.chFlushDone)
+
+	ticker := time.NewTicker(10 * time.Millisecond)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-x.chClose:
+			// commit current transaction to prevent bbolt.DB.Close blocking
+			x.flushCurrentBatch(false)
+			return
+		case <-ticker.C:
+			x.flushCurrentBatch(true)
+		}
+	}
+}
+
+// flushCurrentBatch commits the current batch if anything was written to it
+// and notifies waiting Put callers. Idle batch is kept as is or, when beginNew
+// is false, rolled back. If beginNew is set, new batch is started after commit.
+func (x *Peapod) flushCurrentBatch(beginNew bool) {
+	x.currentBatchMtx.Lock()
+	defer x.currentBatchMtx.Unlock()
+
+	if !x.currentBatch.nonIdle {
+		// tx is nil if the batch failed to begin the transaction
+		if !beginNew && x.currentBatch.tx != nil {
+			_ = x.currentBatch.tx.Rollback()
+		}
+		return
+	}
+
+	err := x.currentBatch.tx.Commit()
+	if err != nil {
+		err = fmt.Errorf("commit BoltDB batch transaction: %w", err)
+	}
+
+	x.currentBatch.commitErr = err
+	close(x.currentBatch.chCommitted)
+
+	if beginNew {
+		x.beginNewBatch()
+	}
+}
+
+// beginNewBatch replaces the current batch with a new one backed by a fresh
+// writable transaction. Failures are stored in the batch's initErr field.
+func (x *Peapod) beginNewBatch() {
+	x.currentBatch = new(batch)
+
+	x.currentBatch.tx, x.currentBatch.initErr = x.bolt.Begin(true)
+	if x.currentBatch.initErr != nil {
+		x.currentBatch.initErr = fmt.Errorf("begin new BoltDB writable transaction: %w", x.currentBatch.initErr)
+		return
+	}
+
+	x.currentBatch.bktRoot, x.currentBatch.initErr = x.currentBatch.tx.CreateBucketIfNotExists(rootBucket)
+	if x.currentBatch.initErr != nil {
+		x.currentBatch.initErr = fmt.Errorf("create BoltDB bucket for containers: %w", x.currentBatch.initErr)
+		return
+	}
+
+	x.currentBatch.chCommitted = make(chan struct{})
+}
+
+// keyForObject returns the database key for the given address: binary
+// container ID followed by binary object ID.
+func keyForObject(addr oid.Address) []byte {
+	b := make([]byte, 2*sha256.Size)
+	addr.Container().Encode(b)
+	addr.Object().Encode(b[sha256.Size:])
+	return b
+}
diff --git a/pkg/local_object_storage/peapod/put.go b/pkg/local_object_storage/peapod/put.go
new file mode 100644
index 00000000000..14c80da7229
--- /dev/null
+++ b/pkg/local_object_storage/peapod/put.go
@@ -0,0 +1,47 @@
+package peapod
+
+import (
+	"context"
+	"fmt"
+
+	oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
+)
+
+// Put saves given data in the underlying database by specified object address.
+// The data can be anything, but in practice a binary NeoFS object is expected.
+// Operation is executed within provided context: if the context is done, Put
+// returns its error (in this case data may be saved).
+func (x *Peapod) Put(ctx context.Context, addr oid.Address, data []byte) error {
+	x.currentBatchMtx.RLock()
+
+	currentBatch := x.currentBatch
+
+	if currentBatch.initErr != nil {
+		x.currentBatchMtx.RUnlock()
+		return currentBatch.initErr
+	}
+
+	// bbolt.Bucket.Put MUST NOT be called concurrently. This is not obvious from
+	// the docs, but panic occurs in practice
+	currentBatch.bktRootMtx.Lock()
+	err := currentBatch.bktRoot.Put(keyForObject(addr), data)
+	if err == nil {
+		// concurrent Put calls hold only the read lock of the batch, so the flag
+		// is set under the bucket mutex to avoid a data race
+		currentBatch.nonIdle = true
+	}
+	currentBatch.bktRootMtx.Unlock()
+
+	x.currentBatchMtx.RUnlock()
+
+	if err != nil {
+		return fmt.Errorf("put object into BoltDB bucket for container: %w", err)
+	}
+
+	select {
+	case <-ctx.Done():
+		return ctx.Err()
+	case <-currentBatch.chCommitted:
+		return currentBatch.commitErr
+	}
+}
diff --git a/pkg/local_object_storage/peapod/put_test.go b/pkg/local_object_storage/peapod/put_test.go
new file mode 100644
index 00000000000..f404150e909
--- /dev/null
+++ b/pkg/local_object_storage/peapod/put_test.go
@@ -0,0 +1,70 @@
+package peapod_test
+
+import (
+	"context"
+	"crypto/rand"
+	"fmt"
+	"sync"
+	"testing"
+
+	"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/peapod"
+	oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test"
+	"github.com/stretchr/testify/require"
+)
+
+func benchmark(b *testing.B, ppd *peapod.Peapod, objSize uint64, nThreads int) {
+	data := make([]byte, objSize)
+	rand.Read(data)
+
+	ctx := context.Background()
+	errs := make([]error, nThreads)
+
+	b.ReportAllocs()
+	b.ResetTimer()
+
+	for i := 0; i < b.N; i++ {
+		var wg sync.WaitGroup
+
+		for j := 0; j < nThreads; j++ {
+			wg.Add(1)
+			go func(ind int) {
+				defer wg.Done()
+
+				addr := oidtest.Address()
+
+				errs[ind] = ppd.Put(ctx, addr, data)
+			}(j)
+		}
+
+		wg.Wait()
+
+		// FailNow (called by require on failure) must run in the benchmark
+		// goroutine, so results are checked here rather than in the workers
+		for j := range errs {
+			require.NoError(b, errs[j])
+		}
+	}
+}
+
+func BenchmarkPeapod_Put(b *testing.B) {
+	ppd := newTestPeapod(b)
+
+	for _, tc := range []struct {
+		objSize  uint64
+		nThreads int
+	}{
+		{1, 1},
+		{1, 20},
+		{1, 100},
+		{1 << 10, 1},
+		{1 << 10, 20},
+		{1 << 10, 100},
+		{100 << 10, 1},
+		{100 << 10, 20},
+		{100 << 10, 100},
+	} {
+		b.Run(fmt.Sprintf("size=%d,thread=%d", tc.objSize, tc.nThreads), func(b *testing.B) {
+			benchmark(b, ppd, tc.objSize, tc.nThreads)
+		})
+	}
+}