-
Notifications
You must be signed in to change notification settings - Fork 38
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Introduce new storage component Peapod
Currently, the storage node saves relatively small NeoFS objects in the Blobovnicza tree component: a group of BoltDB wrappers managed as a tree. This component has a rather complex data structure and code implementation, and shows dubious performance results. Peapod is a new storage component introduced to replace the Blobovnicza one as a simpler and more effective solution. It is also based on a single BoltDB instance, but organizes batch writes in a specific way. In the future, Peapod is going to be used by the BlobStor as a storage of small objects. Signed-off-by: Leonard Lyubich <[email protected]>
- Loading branch information
1 parent
950ee7c
commit a7318e7
Showing
6 changed files
with
354 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
package peapod_test | ||
|
||
import ( | ||
"path/filepath" | ||
"testing" | ||
|
||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/peapod" | ||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
func newTestPeapod(tb testing.TB) *peapod.Peapod { | ||
ppd, err := peapod.New(filepath.Join(tb.TempDir(), "peapod.db")) | ||
require.NoError(tb, err) | ||
|
||
tb.Cleanup(func() { _ = ppd.Close() }) | ||
|
||
return ppd | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
package peapod | ||
|
||
import ( | ||
"fmt" | ||
|
||
"github.com/nspcc-dev/neo-go/pkg/util/slice" | ||
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" | ||
oid "github.com/nspcc-dev/neofs-sdk-go/object/id" | ||
"go.etcd.io/bbolt" | ||
) | ||
|
||
// Get reads data from the underlying database by the given object address. | ||
// Returns apistatus.ErrObjectNotFound if object is missing in the Peapod. | ||
func (x *Peapod) Get(addr oid.Address) ([]byte, error) { | ||
var data []byte | ||
|
||
err := x.bolt.View(func(tx *bbolt.Tx) error { | ||
bktRoot := tx.Bucket(rootBucket) | ||
if bktRoot == nil { | ||
return fmt.Errorf("%w: missing root bucket", apistatus.ErrObjectNotFound) | ||
} | ||
|
||
bktContainer := bktRoot.Bucket(keyForContainer(addr.Container())) | ||
if bktContainer == nil { | ||
return fmt.Errorf("%w: missing container bucket", apistatus.ErrObjectNotFound) | ||
} | ||
|
||
val := bktContainer.Get(keyForObject(addr.Object())) | ||
if val == nil { | ||
return apistatus.ErrObjectNotFound | ||
} | ||
|
||
data = slice.Copy(val) | ||
|
||
return nil | ||
}) | ||
if err != nil { | ||
return nil, fmt.Errorf("exec read-only BoltDB transaction: %w", err) | ||
} | ||
|
||
return data, nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
package peapod_test | ||
|
||
import ( | ||
"context" | ||
"testing" | ||
|
||
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" | ||
oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test" | ||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
func TestPeapod_Get(t *testing.T) { | ||
ppd := newTestPeapod(t) | ||
addr := oidtest.Address() | ||
data := []byte("Hello, world!") | ||
|
||
_, err := ppd.Get(addr) | ||
require.ErrorIs(t, err, apistatus.ErrObjectNotFound) | ||
|
||
err = ppd.Put(context.Background(), addr, data) | ||
require.NoError(t, err) | ||
|
||
res, err := ppd.Get(addr) | ||
require.NoError(t, err) | ||
require.Equal(t, data, res) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,144 @@ | ||
package peapod | ||
|
||
import ( | ||
"crypto/sha256" | ||
"fmt" | ||
"sync" | ||
"time" | ||
|
||
cid "github.com/nspcc-dev/neofs-sdk-go/container/id" | ||
oid "github.com/nspcc-dev/neofs-sdk-go/object/id" | ||
"go.etcd.io/bbolt" | ||
) | ||
|
||
type batch struct { | ||
initErr error | ||
|
||
tx *bbolt.Tx | ||
|
||
commitErr error | ||
chCommitted chan struct{} | ||
|
||
bktRootMtx sync.Mutex | ||
bktRoot *bbolt.Bucket | ||
} | ||
|
||
// Peapod provides storage for relatively small NeoFS binary object (peas). | ||
// Peapod is a single low-level key/value database optimized to work with big | ||
// number of stored units. | ||
type Peapod struct { | ||
bolt *bbolt.DB | ||
|
||
currentBatchMtx sync.RWMutex | ||
currentBatch *batch | ||
|
||
chClose chan struct{} | ||
chFlushDone chan struct{} | ||
} | ||
|
||
// rootBucket is the name of the top-level BoltDB bucket which holds
// per-container buckets.
var rootBucket = []byte("root")
|
||
// New returns initialized Peapod instance located at the given path. Resulting Peapod | ||
// must be finally closed. | ||
func New(path string) (*Peapod, error) { | ||
db, err := bbolt.Open(path, 0600, &bbolt.Options{ | ||
Timeout: 100 * time.Millisecond, // to handle flock | ||
}) | ||
if err != nil { | ||
return nil, fmt.Errorf("open BoltDB instance: %w", err) | ||
} | ||
|
||
err = db.Update(func(tx *bbolt.Tx) error { | ||
_, err = tx.CreateBucketIfNotExists(rootBucket) | ||
return err | ||
}) | ||
if err != nil { | ||
return nil, fmt.Errorf("create root bucket in BoltDB instance: %w", err) | ||
} | ||
|
||
res := &Peapod{ | ||
bolt: db, | ||
chClose: make(chan struct{}), | ||
chFlushDone: make(chan struct{}), | ||
} | ||
|
||
res.beginNewBatch() | ||
|
||
go res.flushLoop() | ||
|
||
return res, nil | ||
} | ||
|
||
// Close syncs data and closes the database. | ||
func (x *Peapod) Close() error { | ||
close(x.chClose) | ||
<-x.chFlushDone | ||
return x.bolt.Close() | ||
} | ||
|
||
func (x *Peapod) flushLoop() { | ||
defer close(x.chFlushDone) | ||
|
||
ticker := time.NewTicker(10 * time.Millisecond) | ||
defer ticker.Stop() | ||
|
||
for { | ||
select { | ||
case <-x.chClose: | ||
// commit current transaction to prevent bbolt.DB.Close blocking | ||
x.flushCurrentBatch(false) | ||
return | ||
case <-ticker.C: | ||
x.flushCurrentBatch(true) | ||
} | ||
} | ||
} | ||
|
||
func (x *Peapod) flushCurrentBatch(beginNew bool) { | ||
x.currentBatchMtx.Lock() | ||
|
||
err := x.currentBatch.tx.Commit() | ||
if err != nil { | ||
err = fmt.Errorf("commit BoltDB batch transaction: %w", err) | ||
} | ||
|
||
x.currentBatch.commitErr = err | ||
close(x.currentBatch.chCommitted) | ||
|
||
if beginNew { | ||
x.beginNewBatch() | ||
} | ||
|
||
x.currentBatchMtx.Unlock() | ||
} | ||
|
||
func (x *Peapod) beginNewBatch() { | ||
var err error | ||
x.currentBatch = new(batch) | ||
|
||
x.currentBatch.tx, x.currentBatch.initErr = x.bolt.Begin(true) | ||
if x.currentBatch.initErr != nil { | ||
x.currentBatch.initErr = fmt.Errorf("begin new BoltDB writable transaction: %w", x.currentBatch.initErr) | ||
return | ||
} | ||
|
||
x.currentBatch.bktRoot, x.currentBatch.initErr = x.currentBatch.tx.CreateBucketIfNotExists(rootBucket) | ||
if err != nil { | ||
x.currentBatch.initErr = fmt.Errorf("create BoltDB bucket for containers: %w", x.currentBatch.initErr) | ||
return | ||
} | ||
|
||
x.currentBatch.chCommitted = make(chan struct{}) | ||
} | ||
|
||
func keyForContainer(cnr cid.ID) []byte { | ||
b := make([]byte, sha256.Size) | ||
cnr.Encode(b) | ||
return b | ||
} | ||
|
||
func keyForObject(obj oid.ID) []byte { | ||
b := make([]byte, sha256.Size) | ||
obj.Encode(b) | ||
return b | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
package peapod | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
|
||
oid "github.com/nspcc-dev/neofs-sdk-go/object/id" | ||
) | ||
|
||
// Put saves given data in the underlying database by specified object address. | ||
// The data can be anything, but in practice a binary NeoFS object is expected. | ||
// Operation is executed within provided context: if the context is done, Put | ||
// returns its error (in this case data may be saved). | ||
func (x *Peapod) Put(ctx context.Context, addr oid.Address, data []byte) error { | ||
x.currentBatchMtx.RLock() | ||
|
||
currentBatch := x.currentBatch | ||
|
||
if currentBatch.initErr != nil { | ||
x.currentBatchMtx.RUnlock() | ||
return currentBatch.initErr | ||
} | ||
|
||
// bbolt.Bucket.Put MUST NOT be called concurrently. This is not obvious from | ||
// the docs, but panic occurs in practice | ||
currentBatch.bktRootMtx.Lock() | ||
|
||
bktContainer, err := currentBatch.bktRoot.CreateBucketIfNotExists(keyForContainer(addr.Container())) | ||
if err != nil { | ||
currentBatch.bktRootMtx.Unlock() | ||
x.currentBatchMtx.RUnlock() | ||
return fmt.Errorf("get bucket for the container: %w", err) | ||
} | ||
|
||
// currently single routine writes object into the root bucket at the moment. In | ||
// theory, we may make root bucket mutex r-w and have per-container locks, but | ||
// number of containers may be huge. To reduce number of per-container mutexes | ||
// we can add bucket levels and group containers (e.g. with similar prefix). | ||
// This also changes data structure and may affect performance/space usage. | ||
// | ||
// For simplicity, now we acquire root lock. Described optimization worth future | ||
// research. | ||
|
||
// TODO: should pre-check existence? Btw Blobovnicza don't do that | ||
err = bktContainer.Put(keyForObject(addr.Object()), data) | ||
currentBatch.bktRootMtx.Unlock() | ||
if err != nil { | ||
x.currentBatchMtx.RUnlock() | ||
return fmt.Errorf("put object into BoltDB bucket for container: %w", err) | ||
} | ||
|
||
x.currentBatchMtx.RUnlock() | ||
|
||
select { | ||
case <-ctx.Done(): | ||
return ctx.Err() | ||
case <-currentBatch.chCommitted: | ||
return currentBatch.commitErr | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
package peapod_test | ||
|
||
import ( | ||
"context" | ||
"crypto/rand" | ||
"fmt" | ||
"sync" | ||
"testing" | ||
|
||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/peapod" | ||
oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test" | ||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
func benchmark(b *testing.B, ppd *peapod.Peapod, objSize uint64, nThreads int) { | ||
data := make([]byte, objSize) | ||
rand.Read(data) | ||
|
||
ctx := context.Background() | ||
|
||
b.ReportAllocs() | ||
b.ResetTimer() | ||
|
||
for i := 0; i < b.N; i++ { | ||
var wg sync.WaitGroup | ||
|
||
for i := 0; i < nThreads; i++ { | ||
wg.Add(1) | ||
go func() { | ||
defer wg.Done() | ||
|
||
addr := oidtest.Address() | ||
|
||
err := ppd.Put(ctx, addr, data) | ||
require.NoError(b, err) | ||
}() | ||
} | ||
|
||
wg.Wait() | ||
} | ||
} | ||
|
||
func BenchmarkPeapod_Put(b *testing.B) { | ||
ppd := newTestPeapod(b) | ||
|
||
for _, tc := range []struct { | ||
objSize uint64 | ||
nThreads int | ||
}{ | ||
{1, 1}, | ||
{1, 20}, | ||
{1, 100}, | ||
{1 << 10, 1}, | ||
{1 << 10, 20}, | ||
{1 << 10, 100}, | ||
{100 << 10, 1}, | ||
{100 << 10, 20}, | ||
{100 << 10, 100}, | ||
} { | ||
b.Run(fmt.Sprintf("size=%d,thread=%d", tc.objSize, tc.nThreads), func(b *testing.B) { | ||
benchmark(b, ppd, tc.objSize, tc.nThreads) | ||
}) | ||
} | ||
} |