-
Notifications
You must be signed in to change notification settings - Fork 38
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Introduce new storage component Peapod
Currently, storage node saves relatively small NeoFS objects in Blobovnicza tree component: group of BoltDB wrappers managed as a tree. This component has pretty complex data structure, code implementation and dubious performance results. Peapod is a new storage component introduced to replace Blobovnicza one as more simple and effective. It also bases on single BoltDB instance, but organizes batch writes in a specific way. In future, Peapod is going to be used as a storage of small objects by the BlobStor. Signed-off-by: Leonard Lyubich <[email protected]>
- Loading branch information
1 parent
950ee7c
commit 6c50e19
Showing
8 changed files
with
730 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,184 @@ | ||
package peapod | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"fmt" | ||
"time" | ||
|
||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" | ||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/compression" | ||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/peapod" | ||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util/logicerr" | ||
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" | ||
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" | ||
oid "github.com/nspcc-dev/neofs-sdk-go/object/id" | ||
"go.etcd.io/bbolt" | ||
) | ||
|
||
type storage struct { | ||
path string | ||
|
||
ppd *peapod.Peapod | ||
|
||
compress compression.Config | ||
} | ||
|
||
func New(path string) common.Storage { | ||
return &storage{ | ||
path: path, | ||
} | ||
} | ||
|
||
func (x *storage) Open(readOnly bool) error { | ||
var err error | ||
|
||
if x.ppd != nil { | ||
err := x.ppd.Close() | ||
if err != nil { | ||
return fmt.Errorf("close Peapod: %w", err) | ||
} | ||
} | ||
|
||
x.ppd, err = peapod.New(x.path, readOnly) | ||
return err | ||
} | ||
|
||
func (x *storage) Init() error { | ||
// no-op because peapod.New initializes everything | ||
return nil | ||
} | ||
|
||
func (x *storage) Close() error { | ||
err := x.ppd.Close() | ||
x.ppd = nil | ||
return err | ||
} | ||
|
||
// Type is peapod storage type used in logs and configuration. | ||
const Type = "peapod" | ||
|
||
func (x *storage) Type() string { | ||
return Type | ||
} | ||
|
||
func (x *storage) Path() string { | ||
return x.path | ||
} | ||
|
||
func (x *storage) SetCompressor(cc *compression.Config) { | ||
x.compress = *cc | ||
} | ||
|
||
func (x *storage) SetReportErrorFunc(func(string, error)) { | ||
// no-op like FSTree | ||
} | ||
|
||
func (x *storage) Get(prm common.GetPrm) (common.GetRes, error) { | ||
data, err := x.ppd.Get(prm.Address) | ||
if err != nil { | ||
if errors.Is(err, apistatus.ErrObjectNotFound) { | ||
return common.GetRes{}, logicerr.Wrap(err) | ||
} | ||
return common.GetRes{}, err | ||
} | ||
|
||
// copy-paste from FSTree | ||
data, err = x.compress.Decompress(data) | ||
if err != nil { | ||
return common.GetRes{}, fmt.Errorf("decompress data: %w", err) | ||
} | ||
|
||
obj := objectSDK.New() | ||
if err := obj.Unmarshal(data); err != nil { | ||
return common.GetRes{}, fmt.Errorf("decode object from binary: %w", err) | ||
} | ||
|
||
return common.GetRes{Object: obj, RawData: data}, err | ||
} | ||
|
||
func (x *storage) GetRange(prm common.GetRangePrm) (common.GetRangeRes, error) { | ||
// copy-paste from FSTree | ||
res, err := x.Get(common.GetPrm{Address: prm.Address}) | ||
if err != nil { | ||
return common.GetRangeRes{}, err | ||
} | ||
|
||
payload := res.Object.Payload() | ||
from := prm.Range.GetOffset() | ||
to := from + prm.Range.GetLength() | ||
|
||
if pLen := uint64(len(payload)); to < from || pLen < from || pLen < to { | ||
return common.GetRangeRes{}, logicerr.Wrap(apistatus.ObjectOutOfRange{}) | ||
} | ||
|
||
return common.GetRangeRes{ | ||
Data: payload[from:to], | ||
}, nil | ||
} | ||
|
||
func (x *storage) Exists(prm common.ExistsPrm) (common.ExistsRes, error) { | ||
var res common.ExistsRes | ||
var err error | ||
res.Exists, err = x.ppd.Exists(prm.Address) | ||
return res, err | ||
} | ||
|
||
func (x *storage) Put(prm common.PutPrm) (common.PutRes, error) { | ||
if !prm.DontCompress { | ||
prm.RawData = x.compress.Compress(prm.RawData) | ||
} | ||
|
||
// TODO: create issue to support Put op context | ||
ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) | ||
defer cancel() | ||
|
||
err := x.ppd.Put(ctx, prm.Address, prm.RawData) | ||
if err != nil && errors.Is(err, bbolt.ErrDatabaseReadOnly) { | ||
return common.PutRes{}, common.ErrReadOnly | ||
} | ||
|
||
return common.PutRes{}, err | ||
} | ||
|
||
func (x *storage) Delete(prm common.DeletePrm) (common.DeleteRes, error) { | ||
// TODO: create issue to support Put op context | ||
ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) | ||
defer cancel() | ||
|
||
err := x.ppd.Delete(ctx, prm.Address) | ||
if err != nil { | ||
if errors.Is(err, bbolt.ErrDatabaseReadOnly) { | ||
return common.DeleteRes{}, common.ErrReadOnly | ||
} | ||
if errors.Is(err, apistatus.ErrObjectNotFound) { | ||
return common.DeleteRes{}, logicerr.Wrap(err) | ||
} | ||
return common.DeleteRes{}, err | ||
} | ||
|
||
return common.DeleteRes{}, err | ||
} | ||
|
||
func (x *storage) Iterate(prm common.IteratePrm) (common.IterateRes, error) { | ||
var e error | ||
err := x.ppd.Iterate(func(addr oid.Address, data []byte) bool { | ||
if prm.LazyHandler != nil { | ||
e = prm.LazyHandler(addr, func() ([]byte, error) { | ||
return data, nil | ||
}) | ||
return e == nil | ||
} | ||
|
||
e = prm.Handler(common.IterationElement{ | ||
ObjectData: data, | ||
Address: addr, | ||
}) | ||
return e == nil | ||
}) | ||
if err != nil { | ||
return common.IterateRes{}, err | ||
} | ||
|
||
return common.IterateRes{}, e | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
package peapod_test | ||
|
||
import ( | ||
"path/filepath" | ||
"testing" | ||
|
||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" | ||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/internal/blobstortest" | ||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/peapod" | ||
) | ||
|
||
func TestGeneric(t *testing.T) { | ||
newPath := func() string { | ||
return filepath.Join(t.TempDir(), "peapod.db") | ||
} | ||
|
||
blobstortest.TestAll(t, func(t *testing.T) common.Storage { | ||
return peapod.New(newPath()) | ||
}, 2048, 16*1024) | ||
|
||
t.Run("info", func(t *testing.T) { | ||
path := newPath() | ||
blobstortest.TestInfo(t, func(t *testing.T) common.Storage { | ||
return peapod.New(path) | ||
}, peapod.Type, path) | ||
}) | ||
} | ||
|
||
func TestControl(t *testing.T) { | ||
blobstortest.TestControl(t, func(t *testing.T) common.Storage { | ||
return peapod.New(filepath.Join(t.TempDir(), "peapod.db")) | ||
}, 2048, 2048) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
package peapod_test | ||
|
||
import ( | ||
"path/filepath" | ||
"testing" | ||
|
||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/peapod" | ||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
func newTestPeapod(tb testing.TB) *peapod.Peapod { | ||
ppd, err := peapod.New(filepath.Join(tb.TempDir(), "peapod.db"), false) | ||
require.NoError(tb, err) | ||
|
||
tb.Cleanup(func() { _ = ppd.Close() }) | ||
|
||
return ppd | ||
} |
Oops, something went wrong.