diff --git a/CHANGELOG.md b/CHANGELOG.md index 155d3d2959..d6dc70781d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,9 @@ Changelog for NeoFS Node ## [Unreleased] +This is the last release to support the Blobovnicza tree. Starting with the next +minor release, the component will be purged, so be prepared (see `Updating` section). + ### Added - Embedded Neo contracts in `contracts` dir (#2391) - `dump-names` command for adm @@ -11,6 +14,8 @@ Changelog for NeoFS Node - Stored payload metric per shard (#2023) - Histogram metrics for RPC and engine operations (#2351) - SN's version is announced via the attributes automatically but can be overwritten explicitly (#2455) +- New storage component for small objects named Peapod (#2453) +- New `blobovnicza-to-peapod` tool providing blobovnicza-to-peapod data migration (#2453) ### Fixed - `neo-go` RPC connection loss handling (#1337) @@ -71,6 +76,22 @@ Docker images now contain a single executable file and SSL certificates only. `neofs-cli control healthcheck` exit code is `0` only for "READY" state. +To migrate data from Blobovnicza trees to Peapods: +```shell +$ blobovnicza-to-peapod -config +``` +For any shard, the data from the configured Blobovnicza tree is copied into +a created Peapod file named `peapod.db` in the directory where the tree is +located. For example, `/neofs/data/blobovcniza/*` -> `/neofs/data/peapod.db`. +Notice that existing Blobovnicza trees are untouched. Configuration is also +updated, for example, `/etc/neofs/config.yaml` -> `/etc/neofs/config_peapod.yaml`. +WARN: carefully review the updated config before using it in the application! + +To store small objects in more effective and simple sub-storage Peapod, replace +`blobovnicza` sub-storage with the `peapod` one in `blobstor` config section. +If storage node already stores some data, don't forget to make data migration +described above. + ## [0.37.0] - 2023-06-15 - Sogado ### Added diff --git a/cmd/blobovnicza-to-peapod/main.go b/cmd/blobovnicza-to-peapod/main.go new file mode 100644 index 0000000000..80b8ba5d33 --- /dev/null +++ b/cmd/blobovnicza-to-peapod/main.go @@ -0,0 +1,262 @@ +package main + +import ( + "errors" + "flag" + "fmt" + "io/fs" + "log" + "os" + "path/filepath" + "strings" + "time" + + "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config" + engineconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine" + shardconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard" + blobovniczaconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard/blobstor/blobovnicza" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/blobovniczatree" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/compression" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/peapod" + "gopkg.in/yaml.v3" +) + +func main() { + nodeCfgPath := flag.String("config", "", "Path to storage node's YAML configuration file") + + flag.Parse() + + if *nodeCfgPath == "" { + log.Fatal("missing storage node config flag") + } + + appCfg := config.New(config.Prm{}, config.WithConfigFile(*nodeCfgPath)) + + err := engineconfig.IterateShards(appCfg, false, func(sc *shardconfig.Config) error { + log.Println("processing shard...") + + var bbcz common.Storage + var perm fs.FileMode + storagesCfg := sc.BlobStor().Storages() + + for i := range storagesCfg { + if storagesCfg[i].Type() == blobovniczatree.Type { + bbczCfg := blobovniczaconfig.From((*config.Config)(storagesCfg[i])) + + perm = storagesCfg[i].Perm() + bbcz = blobovniczatree.NewBlobovniczaTree( + blobovniczatree.WithRootPath(storagesCfg[i].Path()), + blobovniczatree.WithPermissions(storagesCfg[i].Perm()), + blobovniczatree.WithBlobovniczaSize(bbczCfg.Size()), + blobovniczatree.WithBlobovniczaShallowDepth(bbczCfg.ShallowDepth()), + blobovniczatree.WithBlobovniczaShallowWidth(bbczCfg.ShallowWidth()), + blobovniczatree.WithOpenedCacheSize(bbczCfg.OpenedCacheSize())) + + break + } + } + + if bbcz == nil { + log.Println("Blobovnicza is not configured for the current shard, going to next one...") + return nil + } + + bbczPath := bbcz.Path() + if !filepath.IsAbs(bbczPath) { + log.Fatalf("Blobobvnicza tree path '%s' is not absolute, make it like this in the config file first\n", bbczPath) + } + + ppdPath := filepath.Join(filepath.Dir(bbcz.Path()), "peapod.db") + ppd := peapod.New(ppdPath, perm, 10*time.Millisecond) + + var compressCfg compression.Config + compressCfg.Enabled = sc.Compress() + compressCfg.UncompressableContentTypes = sc.UncompressableContentTypes() + + err := compressCfg.Init() + if err != nil { + log.Fatal("init compression config for the current shard: ", err) + } + + bbcz.SetCompressor(&compressCfg) + ppd.SetCompressor(&compressCfg) + + log.Printf("migrating data from Blobovnicza tree '%s' to Peapod '%s'...\n", bbcz.Path(), ppd.Path()) + + err = common.Copy(ppd, bbcz) + if err != nil { + log.Fatal("migration failed: ", err) + } + + log.Println("data successfully migrated in the current shard, going to the next one...") + + return nil + }) + if err != nil { + log.Fatal(err) + } + + srcPath := *nodeCfgPath + ss := strings.Split(srcPath, ".") + ss[0] += "_peapod" + + dstPath := strings.Join(ss, ".") + + log.Printf("data successfully migrated in all shards, migrating configuration to '%s' file...\n", dstPath) + + err = migrateConfigToPeapod(dstPath, srcPath) + if err != nil { + log.Fatal(err) + } +} + +func migrateConfigToPeapod(dstPath, srcPath string) error { + fData, err := os.ReadFile(srcPath) + if err != nil { + return fmt.Errorf("read source config config file: %w", err) + } + + var mConfig map[any]any + + err = yaml.Unmarshal(fData, &mConfig) + if err != nil { + return fmt.Errorf("decode config from YAML: %w", err) + } + + v, ok := mConfig["storage"] + if !ok { + return errors.New("missing 'storage' section") + } + + mStorage, ok := v.(map[any]any) + if !ok { + return fmt.Errorf("unexpected 'storage' section type: %T instead of %T", v, mStorage) + } + + v, ok = mStorage["shard"] + if !ok { + return errors.New("missing 'storage.shard' section") + } + + mShards, ok := v.(map[any]any) + if !ok { + return fmt.Errorf("unexpected 'storage.shard' section type: %T instead of %T", v, mShards) + } + + replaceBlobovniczaWithPeapod := func(mShard map[any]any, shardDesc any) error { + v, ok := mShard["blobstor"] + if !ok { + return fmt.Errorf("missing 'blobstor' section in shard '%v' config", shardDesc) + } + + sBlobStor, ok := v.([]any) + if !ok { + return fmt.Errorf("unexpected 'blobstor' section type in shard '%v': %T instead of %T", shardDesc, v, sBlobStor) + } + + var bbczSubStorage map[any]any + + for i := range sBlobStor { + mSubStorage, ok := sBlobStor[i].(map[any]any) + if !ok { + return fmt.Errorf("unexpected sub-storage #%d type in shard '%v': %T instead of %T", i, shardDesc, v, mStorage) + } + + v, ok := mSubStorage["type"] + if ok { + typ, ok := v.(string) + if !ok { + return fmt.Errorf("unexpected type of sub-storage name: %T instead of %T", v, typ) + } + + if typ == blobovniczatree.Type { + bbczSubStorage = mSubStorage + } + + continue + } + + // in 'default' section 'type' may be missing + + _, withDepth := mSubStorage["depth"] + _, withWidth := mSubStorage["width"] + + if withWidth && withDepth { + bbczSubStorage = mSubStorage + } + } + + if bbczSubStorage == nil { + log.Printf("blobovnicza tree is not configured for the shard '%s', skip\n", shardDesc) + return nil + } + + for k := range bbczSubStorage { + switch k { + default: + delete(bbczSubStorage, k) + case "type", "path", "perm": + } + } + + bbczSubStorage["type"] = peapod.Type + + v, ok = bbczSubStorage["path"] + if ok { + path, ok := v.(string) + if !ok { + return fmt.Errorf("unexpected sub-storage path type: %T instead of %T", v, path) + } + + bbczSubStorage["path"] = filepath.Join(filepath.Dir(path), "peapod.db") + } + + return nil + } + + v, ok = mShards["default"] + if ok { + mShard, ok := v.(map[any]any) + if !ok { + return fmt.Errorf("unexpected 'storage.shard.default' section type: %T instead of %T", v, mShard) + } + + err = replaceBlobovniczaWithPeapod(mShard, "default") + if err != nil { + return err + } + } + + for i := 0; ; i++ { + v, ok = mShards[i] + if !ok { + if i == 0 { + return errors.New("missing numbered shards") + } + break + } + + mShard, ok := v.(map[any]any) + if !ok { + return fmt.Errorf("unexpected 'storage.shard.%d' section type: %T instead of %T", i, v, mStorage) + } + + err = replaceBlobovniczaWithPeapod(mShard, i) + if err != nil { + return err + } + } + + data, err := yaml.Marshal(mConfig) + if err != nil { + return fmt.Errorf("encode modified config into YAML: %w", err) + } + + err = os.WriteFile(dstPath, data, 0640) + if err != nil { + return fmt.Errorf("write resulting config to the destination file: %w", err) + } + + return nil +} diff --git a/cmd/neofs-node/config.go b/cmd/neofs-node/config.go index 5644ef7b20..74d143f6f7 100644 --- a/cmd/neofs-node/config.go +++ b/cmd/neofs-node/config.go @@ -25,6 +25,7 @@ import ( shardconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard" blobovniczaconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard/blobstor/blobovnicza" fstreeconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard/blobstor/fstree" + peapodconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard/blobstor/peapod" loggerconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/logger" metricsconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/metrics" nodeconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/node" @@ -36,6 +37,7 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/blobovniczatree" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/peapod" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/pilorama" @@ -158,9 +160,11 @@ func (c *shardCfg) id() string { type subStorageCfg struct { // common for all storages - typ string - path string - perm fs.FileMode + typ string + path string + perm fs.FileMode + + // tree-specific (FS and blobovnicza) depth uint64 noSync bool @@ -168,6 +172,9 @@ type subStorageCfg struct { size uint64 width uint64 openedCacheSize int + + // Peapod-specific + flushInterval time.Duration } // readConfig fills applicationConfiguration with raw configuration values @@ -264,6 +271,9 @@ func (a *applicationConfiguration) readConfig(c *config.Config) error { sub := fstreeconfig.From((*config.Config)(storagesCfg[i])) sCfg.depth = sub.Depth() sCfg.noSync = sub.NoSync() + case peapod.Type: + peapodCfg := peapodconfig.From((*config.Config)(storagesCfg[i])) + sCfg.flushInterval = peapodCfg.FlushInterval() default: return fmt.Errorf("invalid storage type: %s", storagesCfg[i].Type()) } @@ -732,6 +742,13 @@ func (c *cfg) shardOpts() []shardOptsWithID { return true }, }) + case peapod.Type: + ss = append(ss, blobstor.SubStorage{ + Storage: peapod.New(sRead.path, sRead.perm, sRead.flushInterval), + Policy: func(_ *objectSDK.Object, data []byte) bool { + return uint64(len(data)) < shCfg.smallSizeObjectLimit + }, + }) default: // should never happen, that has already // been handled: when the config was read diff --git a/cmd/neofs-node/config/engine/config_test.go b/cmd/neofs-node/config/engine/config_test.go index c57017fb58..dc49f9a946 100644 --- a/cmd/neofs-node/config/engine/config_test.go +++ b/cmd/neofs-node/config/engine/config_test.go @@ -10,8 +10,10 @@ import ( shardconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard" blobovniczaconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard/blobstor/blobovnicza" fstreeconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard/blobstor/fstree" + peapodconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard/blobstor/peapod" piloramaconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard/pilorama" configtest "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/test" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/peapod" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode" "github.com/stretchr/testify/require" ) @@ -132,13 +134,11 @@ func TestEngineSection(t *testing.T) { require.EqualValues(t, 102400, sc.SmallSizeLimit()) require.Equal(t, 2, len(ss)) - - blz := blobovniczaconfig.From((*config.Config)(ss[0])) - require.Equal(t, "tmp/1/blob/blobovnicza", ss[0].Path()) - require.EqualValues(t, 4194304, blz.Size()) - require.EqualValues(t, 1, blz.ShallowDepth()) - require.EqualValues(t, 4, blz.ShallowWidth()) - require.EqualValues(t, 50, blz.OpenedCacheSize()) + ppd := peapodconfig.From((*config.Config)(ss[0])) + require.Equal(t, "tmp/1/blob/peapod.db", ss[0].Path()) + require.EqualValues(t, 0644, ss[0].Perm()) + require.EqualValues(t, peapod.Type, ss[0].Type()) + require.EqualValues(t, 30*time.Millisecond, ppd.FlushInterval()) require.Equal(t, "tmp/1/blob", ss[1].Path()) require.EqualValues(t, 0644, ss[1].Perm()) diff --git a/cmd/neofs-node/config/engine/shard/blobstor/config.go b/cmd/neofs-node/config/engine/shard/blobstor/config.go index 4e6dc86c2c..fe9ccdc8cc 100644 --- a/cmd/neofs-node/config/engine/shard/blobstor/config.go +++ b/cmd/neofs-node/config/engine/shard/blobstor/config.go @@ -7,6 +7,7 @@ import ( "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard/blobstor/storage" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/blobovniczatree" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/peapod" ) // Config is a wrapper over the config section @@ -28,7 +29,7 @@ func (x *Config) Storages() []*storage.Config { switch typ { case "": return ss - case fstree.Type, blobovniczatree.Type: + case fstree.Type, blobovniczatree.Type, peapod.Type: sub := storage.From((*config.Config)(x).Sub(strconv.Itoa(i))) ss = append(ss, sub) default: diff --git a/cmd/neofs-node/config/engine/shard/blobstor/fstree/config.go b/cmd/neofs-node/config/engine/shard/blobstor/fstree/config.go index cf9df9020a..6595e9375d 100644 --- a/cmd/neofs-node/config/engine/shard/blobstor/fstree/config.go +++ b/cmd/neofs-node/config/engine/shard/blobstor/fstree/config.go @@ -6,7 +6,7 @@ import ( ) // Config is a wrapper over the config section -// which provides access to Blobovnicza configurations. +// which provides access to FSTree configurations. type Config config.Config // DepthDefault is a default shallow dir depth. diff --git a/cmd/neofs-node/config/engine/shard/blobstor/peapod/config.go b/cmd/neofs-node/config/engine/shard/blobstor/peapod/config.go new file mode 100644 index 0000000000..cd01d21ca9 --- /dev/null +++ b/cmd/neofs-node/config/engine/shard/blobstor/peapod/config.go @@ -0,0 +1,34 @@ +package peapodconfig + +import ( + "time" + + "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config" +) + +// Config is a wrapper over the config section +// which provides access to Peapod configurations. +type Config config.Config + +// Various Peapod config defaults. +const ( + // DefaultFlushInterval is a default time interval between Peapod's batch writes + // to disk. + DefaultFlushInterval = 10 * time.Millisecond +) + +// From wraps config section into Config. +func From(c *config.Config) *Config { + return (*Config)(c) +} + +// FlushInterval returns the value of "flush_interval" config parameter. +// +// Returns DefaultFlushInterval if the value is not a positive duration. +func (x *Config) FlushInterval() time.Duration { + d := config.DurationSafe((*config.Config)(x), "flush_interval") + if d > 0 { + return d + } + return DefaultFlushInterval +} diff --git a/cmd/neofs-node/validate.go b/cmd/neofs-node/validate.go index 5f337731d8..573e8acf58 100644 --- a/cmd/neofs-node/validate.go +++ b/cmd/neofs-node/validate.go @@ -11,6 +11,7 @@ import ( treeconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/tree" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/blobovniczatree" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/peapod" "go.uber.org/zap/zapcore" ) @@ -54,7 +55,7 @@ func validateConfig(c *config.Config) error { } for i := range blobstor { switch blobstor[i].Type() { - case fstree.Type, blobovniczatree.Type: + case fstree.Type, blobovniczatree.Type, peapod.Type: default: // FIXME #1764 (@fyrchik): this line is currently unreachable, // because we panic in `sc.BlobStor().Storages()`. diff --git a/config/example/node.env b/config/example/node.env index 2e81750530..5449b6567a 100644 --- a/config/example/node.env +++ b/config/example/node.env @@ -152,12 +152,10 @@ NEOFS_STORAGE_SHARD_1_METABASE_MAX_BATCH_DELAY=20ms NEOFS_STORAGE_SHARD_1_COMPRESS=false NEOFS_STORAGE_SHARD_1_SMALL_OBJECT_SIZE=102400 ### Blobovnicza config -NEOFS_STORAGE_SHARD_1_BLOBSTOR_0_TYPE=blobovnicza -NEOFS_STORAGE_SHARD_1_BLOBSTOR_0_PATH=tmp/1/blob/blobovnicza -NEOFS_STORAGE_SHARD_1_BLOBSTOR_0_SIZE=4194304 -NEOFS_STORAGE_SHARD_1_BLOBSTOR_0_DEPTH=1 -NEOFS_STORAGE_SHARD_1_BLOBSTOR_0_WIDTH=4 -NEOFS_STORAGE_SHARD_1_BLOBSTOR_0_OPENED_CACHE_CAPACITY=50 +NEOFS_STORAGE_SHARD_1_BLOBSTOR_0_TYPE=peapod +NEOFS_STORAGE_SHARD_1_BLOBSTOR_0_PATH=tmp/1/blob/peapod.db +NEOFS_STORAGE_SHARD_1_BLOBSTOR_0_PERM=0644 +NEOFS_STORAGE_SHARD_1_BLOBSTOR_0_FLUSH_INTERVAL=30ms ### FSTree config NEOFS_STORAGE_SHARD_1_BLOBSTOR_1_TYPE=fstree NEOFS_STORAGE_SHARD_1_BLOBSTOR_1_PATH=tmp/1/blob diff --git a/config/example/node.json b/config/example/node.json index 908662ba1a..fa11730f57 100644 --- a/config/example/node.json +++ b/config/example/node.json @@ -198,13 +198,10 @@ "small_object_size": 102400, "blobstor": [ { - "type": "blobovnicza", - "path": "tmp/1/blob/blobovnicza", + "type": "peapod", + "path": "tmp/1/blob/peapod.db", "perm": "0644", - "size": 4194304, - "depth": 1, - "width": 4, - "opened_cache_capacity": 50 + "flush_interval": "30ms" }, { "type": "fstree", diff --git a/config/example/node.yaml b/config/example/node.yaml index cfd3a6a956..fde52a969f 100644 --- a/config/example/node.yaml +++ b/config/example/node.yaml @@ -194,8 +194,9 @@ storage: path: tmp/1/meta # metabase path blobstor: - - type: blobovnicza - path: tmp/1/blob/blobovnicza + - type: peapod + path: tmp/1/blob/peapod.db # path to Peapod database + flush_interval: 30ms # time interval between batch writes to disk (defaults to 10ms) - type: fstree path: tmp/1/blob # blobstor path no_sync: true diff --git a/docs/storage-node-configuration.md b/docs/storage-node-configuration.md index 3ba935a3f3..6eccf54569 100644 --- a/docs/storage-node-configuration.md +++ b/docs/storage-node-configuration.md @@ -175,7 +175,7 @@ The following table describes configuration for each shard. ### `blobstor` subsection Contains a list of substorages each with it's own type. -Currently only 2 types are supported: `fstree` and `blobovnicza`. +Currently only 3 types are supported: `fstree`, `blobovnicza` and `peapod`. ```yaml blobstor: @@ -215,6 +215,13 @@ blobstor: | `width` | `int` | `16` | Blobovnicza tree width. | | `opened_cache_capacity` | `int` | `16` | Maximum number of simultaneously opened blobovniczas. | +#### `peapod` type options +| Parameter | Type | Default value | Description | +|---------------------|-----------|---------------|-------------------------------------------------------| +| `path` | `string` | | Path to the Peapod database file. | +| `perm` | file mode | `0660` | Default permission for created files and directories. | +| `flush_interval` | `duration`| `10ms` | Time interval between batch writes to disk. | + ### `gc` subsection Contains garbage-collection service configuration. It iterates over the blobstor and removes object the node no longer needs. diff --git a/pkg/local_object_storage/blobstor/common/storage.go b/pkg/local_object_storage/blobstor/common/storage.go index b66b412005..6a6ecd9a91 100644 --- a/pkg/local_object_storage/blobstor/common/storage.go +++ b/pkg/local_object_storage/blobstor/common/storage.go @@ -1,6 +1,10 @@ package common -import "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/compression" +import ( + "fmt" + + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/compression" +) // Storage represents key-value object storage. // It is used as a building block for a blobstor of a shard. @@ -23,3 +27,58 @@ type Storage interface { Delete(DeletePrm) (DeleteRes, error) Iterate(IteratePrm) (IterateRes, error) } + +// Copy copies all objects from source Storage into the destination one. If any +// object cannot be stored, Copy immediately fails. +func Copy(dst, src Storage) error { + err := src.Open(true) + if err != nil { + return fmt.Errorf("open source sub-storage: %w", err) + } + + defer func() { _ = src.Close() }() + + err = src.Init() + if err != nil { + return fmt.Errorf("initialize source sub-storage: %w", err) + } + + err = dst.Open(false) + if err != nil { + return fmt.Errorf("open destination sub-storage: %w", err) + } + + defer func() { _ = dst.Close() }() + + err = dst.Init() + if err != nil { + return fmt.Errorf("initialize destination sub-storage: %w", err) + } + + _, err = src.Iterate(IteratePrm{ + Handler: func(el IterationElement) error { + exRes, err := dst.Exists(ExistsPrm{ + Address: el.Address, + }) + if err != nil { + return fmt.Errorf("check presence of object %s in the destination sub-storage: %w", el.Address, err) + } else if exRes.Exists { + return nil + } + + _, err = dst.Put(PutPrm{ + Address: el.Address, + RawData: el.ObjectData, + }) + if err != nil { + return fmt.Errorf("put object %s into destination sub-storage: %w", el.Address, err) + } + return nil + }, + }) + if err != nil { + return fmt.Errorf("iterate over source sub-storage: %w", err) + } + + return nil +} diff --git a/pkg/local_object_storage/blobstor/common/storage_test.go b/pkg/local_object_storage/blobstor/common/storage_test.go new file mode 100644 index 0000000000..33f494f913 --- /dev/null +++ b/pkg/local_object_storage/blobstor/common/storage_test.go @@ -0,0 +1,64 @@ +package common_test + +import ( + "crypto/rand" + "path/filepath" + "testing" + "time" + + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/blobovniczatree" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/peapod" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test" + "github.com/stretchr/testify/require" +) + +func TestCopy(t *testing.T) { + dir := t.TempDir() + const nObjects = 100 + + src := blobovniczatree.NewBlobovniczaTree( + blobovniczatree.WithBlobovniczaShallowWidth(2), + blobovniczatree.WithBlobovniczaShallowDepth(3), + blobovniczatree.WithRootPath(filepath.Join(dir, "blobovnicza")), + ) + + require.NoError(t, src.Open(false)) + require.NoError(t, src.Init()) + + mObjs := make(map[oid.Address][]byte, nObjects) + + for i := 0; i < nObjects; i++ { + addr := oidtest.Address() + data := make([]byte, 32) + rand.Read(data) + mObjs[addr] = data + + _, err := src.Put(common.PutPrm{ + Address: addr, + RawData: data, + }) + require.NoError(t, err) + } + + require.NoError(t, src.Close()) + + dst := peapod.New(filepath.Join(dir, "peapod.db"), 0600, 10*time.Millisecond) + + err := common.Copy(dst, src) + require.NoError(t, err) + + require.NoError(t, dst.Open(true)) + t.Cleanup(func() { _ = dst.Close() }) + + _, err = dst.Iterate(common.IteratePrm{ + Handler: func(el common.IterationElement) error { + data, ok := mObjs[el.Address] + require.True(t, ok) + require.Equal(t, data, el.ObjectData) + return nil + }, + }) + require.NoError(t, err) +} diff --git a/pkg/local_object_storage/blobstor/internal/blobstortest/iterate.go b/pkg/local_object_storage/blobstor/internal/blobstortest/iterate.go index f54c255b97..e06c95a539 100644 --- a/pkg/local_object_storage/blobstor/internal/blobstortest/iterate.go +++ b/pkg/local_object_storage/blobstor/internal/blobstortest/iterate.go @@ -95,7 +95,7 @@ func TestIterate(t *testing.T, cons Constructor, min, max uint64) { } _, err := s.Iterate(iterPrm) - require.Equal(t, logicErr, err) + require.ErrorIs(t, err, logicErr) require.Equal(t, len(objects)/2, len(seen)) for i := range objects { d, ok := seen[objects[i].addr.String()] diff --git a/pkg/local_object_storage/blobstor/peapod/peapod.go b/pkg/local_object_storage/blobstor/peapod/peapod.go new file mode 100644 index 0000000000..20aa8c595e --- /dev/null +++ b/pkg/local_object_storage/blobstor/peapod/peapod.go @@ -0,0 +1,494 @@ +package peapod + +import ( + "context" + "crypto/sha256" + "errors" + "fmt" + "io/fs" + "path/filepath" + "sync" + "time" + + "github.com/nspcc-dev/neo-go/pkg/util/slice" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/compression" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util/logicerr" + "github.com/nspcc-dev/neofs-node/pkg/util" + apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" + cid "github.com/nspcc-dev/neofs-sdk-go/container/id" + objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + "go.etcd.io/bbolt" +) + +type batch struct { + initErr error + + tx *bbolt.Tx + + nonIdle bool + + commitErr error + chCommitted chan struct{} + + bktRootMtx sync.Mutex + bktRoot *bbolt.Bucket +} + +// Peapod provides storage for relatively small NeoFS binary object (peas). +// Peapod is a single low-level key/value database optimized to work with big +// number of stored units. +type Peapod struct { + path string + perm fs.FileMode + + flushInterval time.Duration + + compress *compression.Config + + readOnly bool + + bolt *bbolt.DB + + currentBatchMtx sync.RWMutex + currentBatch *batch + + chClose chan struct{} + chFlushDone chan struct{} +} + +var rootBucket = []byte("root") + +// returned when BoltDB rootBucket is inaccessible within particular transaction. +var errMissingRootBucket = errors.New("missing root bucket") + +// New creates new Peapod instance to be located at the given path with +// specified permissions. +// +// Specified flush interval MUST be positive (see Init). +func New(path string, perm fs.FileMode, flushInterval time.Duration) *Peapod { + if flushInterval <= 0 { + panic(fmt.Sprintf("non-positive flush interval %v", flushInterval)) + } + return &Peapod{ + path: path, + perm: perm, + + flushInterval: flushInterval, + } +} + +func (x *Peapod) flushLoop() { + defer close(x.chFlushDone) + + t := time.NewTimer(x.flushInterval) + defer t.Stop() + + for { + select { + case <-x.chClose: + // commit current transaction to prevent bbolt.DB.Close blocking + x.flushCurrentBatch(false) + return + case <-t.C: + st := time.Now() + + x.flushCurrentBatch(true) + + interval := x.flushInterval - time.Since(st) + if interval <= 0 { + interval = time.Microsecond + } + + t.Reset(interval) + } + } +} + +func (x *Peapod) flushCurrentBatch(beginNew bool) { + x.currentBatchMtx.Lock() + + if !x.currentBatch.nonIdle { + if !beginNew && x.currentBatch.tx != nil { + _ = x.currentBatch.tx.Commit() + } + x.currentBatchMtx.Unlock() + return + } + + err := x.currentBatch.tx.Commit() + if err != nil { + err = fmt.Errorf("commit BoltDB batch transaction: %w", err) + } + + x.currentBatch.commitErr = err + close(x.currentBatch.chCommitted) + + if beginNew { + x.beginNewBatch() + } + + x.currentBatchMtx.Unlock() +} + +func (x *Peapod) beginNewBatch() { + x.currentBatch = new(batch) + + x.currentBatch.tx, x.currentBatch.initErr = x.bolt.Begin(true) + if x.currentBatch.initErr != nil { + x.currentBatch.initErr = fmt.Errorf("begin new BoltDB writable transaction: %w", x.currentBatch.initErr) + return + } + + x.currentBatch.bktRoot = x.currentBatch.tx.Bucket(rootBucket) + if x.currentBatch.bktRoot == nil { + x.currentBatch.initErr = errMissingRootBucket + return + } + + x.currentBatch.chCommitted = make(chan struct{}) +} + +const objectAddressKeySize = 2 * sha256.Size + +func keyForObject(addr oid.Address) []byte { + b := make([]byte, objectAddressKeySize) + addr.Container().Encode(b) + addr.Object().Encode(b[sha256.Size:]) + return b +} + +func decodeKeyForObject(addr *oid.Address, key []byte) error { + if len(key) != objectAddressKeySize { + return fmt.Errorf("invalid object address key size: %d instead of %d", len(key), objectAddressKeySize) + } + + var cnr cid.ID + var obj oid.ID + + err := cnr.Decode(key[:sha256.Size]) + if err != nil { + return fmt.Errorf("decode container ID: %w", err) + } + + err = obj.Decode(key[sha256.Size:]) + if err != nil { + return fmt.Errorf("decode object ID: %w", err) + } + + addr.SetContainer(cnr) + addr.SetObject(obj) + + return nil +} + +// Open opens underlying database in the specified mode. +func (x *Peapod) Open(readOnly bool) error { + err := util.MkdirAllX(filepath.Dir(x.path), x.perm) + if err != nil { + return fmt.Errorf("create directory '%s' for database: %w", x.path, err) + } + + x.bolt, err = bbolt.Open(x.path, x.perm, &bbolt.Options{ + ReadOnly: readOnly, + Timeout: 100 * time.Millisecond, // to handle flock + }) + if err != nil { + return fmt.Errorf("open BoltDB instance: %w", err) + } + + if readOnly { + err = x.bolt.View(func(tx *bbolt.Tx) error { + if tx.Bucket(rootBucket) == nil { + return errMissingRootBucket + } + return nil + }) + if err != nil { + return fmt.Errorf("check root bucket presence in BoltDB instance: %w", err) + } + } + + x.readOnly = readOnly + + return nil +} + +// Init initializes internal structure of the underlying database and runs +// flushing routine. The routine writes data batches into disk once per time +// interval configured in New. +func (x *Peapod) Init() error { + if x.readOnly { + // no extra actions needed in read-only mode + return nil + } + + err := x.bolt.Update(func(tx *bbolt.Tx) error { + _, err := tx.CreateBucketIfNotExists(rootBucket) + return err + }) + if err != nil { + return fmt.Errorf("create root bucket in BoltDB instance: %w", err) + } + + x.chClose = make(chan struct{}) + x.chFlushDone = make(chan struct{}) + + x.beginNewBatch() + + go x.flushLoop() + + return nil +} + +// Close syncs data and closes the database. +func (x *Peapod) Close() error { + if !x.readOnly { + close(x.chClose) + <-x.chFlushDone + } + return x.bolt.Close() +} + +// Type is peapod storage type used in logs and configuration. +const Type = "peapod" + +func (x *Peapod) Type() string { + return Type +} + +func (x *Peapod) Path() string { + return x.path +} + +func (x *Peapod) SetCompressor(cc *compression.Config) { + x.compress = cc +} + +func (x *Peapod) SetReportErrorFunc(func(string, error)) { + // no-op like FSTree +} + +// Get reads data from the underlying database by the given object address. +// Returns apistatus.ErrObjectNotFound if object is missing in the Peapod. +func (x *Peapod) Get(prm common.GetPrm) (common.GetRes, error) { + var data []byte + + err := x.bolt.View(func(tx *bbolt.Tx) error { + bktRoot := tx.Bucket(rootBucket) + if bktRoot == nil { + return errMissingRootBucket + } + + val := bktRoot.Get(keyForObject(prm.Address)) + if val == nil { + return apistatus.ErrObjectNotFound + } + + data = slice.Copy(val) + + return nil + }) + if err != nil { + if errors.Is(err, apistatus.ErrObjectNotFound) { + return common.GetRes{}, logicerr.Wrap(err) + } + return common.GetRes{}, fmt.Errorf("exec read-only BoltDB transaction: %w", err) + } + + // copy-paste from FSTree + data, err = x.compress.Decompress(data) + if err != nil { + return common.GetRes{}, fmt.Errorf("decompress data: %w", err) + } + + obj := objectSDK.New() + if err := obj.Unmarshal(data); err != nil { + return common.GetRes{}, fmt.Errorf("decode object from binary: %w", err) + } + + return common.GetRes{Object: obj, RawData: data}, err +} + +// GetRange works like Get but reads specific payload range. +func (x *Peapod) GetRange(prm common.GetRangePrm) (common.GetRangeRes, error) { + // copy-paste from FSTree + res, err := x.Get(common.GetPrm{Address: prm.Address}) + if err != nil { + return common.GetRangeRes{}, err + } + + payload := res.Object.Payload() + from := prm.Range.GetOffset() + to := from + prm.Range.GetLength() + + if pLen := uint64(len(payload)); to < from || pLen < from || pLen < to { + return common.GetRangeRes{}, logicerr.Wrap(apistatus.ObjectOutOfRange{}) + } + + return common.GetRangeRes{ + Data: payload[from:to], + }, nil +} + +// Exists checks presence of the object in the underlying database by the given +// address. +func (x *Peapod) Exists(prm common.ExistsPrm) (common.ExistsRes, error) { + var res common.ExistsRes + + err := x.bolt.View(func(tx *bbolt.Tx) error { + bktRoot := tx.Bucket(rootBucket) + if bktRoot == nil { + return errMissingRootBucket + } + + res.Exists = bktRoot.Get(keyForObject(prm.Address)) != nil + + return nil + }) + if err != nil { + return common.ExistsRes{}, fmt.Errorf("exec read-only BoltDB transaction: %w", err) + } + + return res, nil +} + +var storageID = []byte("peapod") + +// Put saves given data in the underlying database by specified object address. +// The data can be anything, but in practice a binary NeoFS object is expected. +// Operation is executed within provided context: if the context is done, Put +// returns its error (in this case data may be saved). +// +// Put returns common.ErrReadOnly if Peadpod is read-only. +func (x *Peapod) Put(prm common.PutPrm) (common.PutRes, error) { + if !prm.DontCompress { + prm.RawData = x.compress.Compress(prm.RawData) + } + + // Track https://github.com/nspcc-dev/neofs-node/issues/2480 + err := x.batch(context.TODO(), func(bktRoot *bbolt.Bucket) error { + return bktRoot.Put(keyForObject(prm.Address), prm.RawData) + }) + + return common.PutRes{ + StorageID: storageID, + }, err +} + +// Delete removes data associated with the given object address from the +// underlying database. Delete returns apistatus.ErrObjectNotFound if object is +// missing. +// +// Put returns common.ErrReadOnly if Peadpod is read-only. +func (x *Peapod) Delete(prm common.DeletePrm) (common.DeleteRes, error) { + // Track https://github.com/nspcc-dev/neofs-node/issues/2480 + err := x.batch(context.TODO(), func(bktRoot *bbolt.Bucket) error { + key := keyForObject(prm.Address) + if bktRoot.Get(key) == nil { + return apistatus.ErrObjectNotFound + } + + return bktRoot.Delete(key) + }) + if errors.Is(err, apistatus.ErrObjectNotFound) { + return common.DeleteRes{}, logicerr.Wrap(err) + } + + return common.DeleteRes{}, err +} + +func (x *Peapod) batch(ctx context.Context, fBktRoot func(bktRoot *bbolt.Bucket) error) error { + if x.readOnly { + return common.ErrReadOnly + } + + x.currentBatchMtx.RLock() + + currentBatch := x.currentBatch + + if currentBatch.initErr != nil { + x.currentBatchMtx.RUnlock() + return currentBatch.initErr + } + + // bbolt.Bucket.Put MUST NOT be called concurrently. This is not obvious from + // the docs, but panic occurs in practice + currentBatch.bktRootMtx.Lock() + err := fBktRoot(currentBatch.bktRoot) + currentBatch.bktRootMtx.Unlock() + if err != nil { + x.currentBatchMtx.RUnlock() + return fmt.Errorf("put object into BoltDB bucket for container: %w", err) + } + + currentBatch.nonIdle = true + + x.currentBatchMtx.RUnlock() + + select { + case <-ctx.Done(): + return ctx.Err() + case <-currentBatch.chCommitted: + return currentBatch.commitErr + } +} + +// Iterate iterates over all objects stored in the underlying database and +// passes them into LazyHandler or Handler. Break on f's false return. +func (x *Peapod) Iterate(prm common.IteratePrm) (common.IterateRes, error) { + var addr oid.Address + + err := x.bolt.View(func(tx *bbolt.Tx) error { + bktRoot := tx.Bucket(rootBucket) + if bktRoot == nil { + return errMissingRootBucket + } + + return bktRoot.ForEach(func(k, v []byte) error { + err := decodeKeyForObject(&addr, k) + if err != nil { + if prm.IgnoreErrors { + if prm.ErrorHandler != nil { + return prm.ErrorHandler(addr, err) + } + + return nil + } + + return fmt.Errorf("decode object address from bucket key: %w", err) + } + + v, err = x.compress.Decompress(v) + if err != nil { + if prm.IgnoreErrors { + if prm.ErrorHandler != nil { + return prm.ErrorHandler(addr, err) + } + + return nil + } + + return fmt.Errorf("decompress value for object '%s': %w", addr, err) + } + + if prm.LazyHandler != nil { + return prm.LazyHandler(addr, func() ([]byte, error) { + return v, nil + }) + } + + return prm.Handler(common.IterationElement{ + ObjectData: v, + Address: addr, + StorageID: storageID, + }) + }) + }) + if err != nil { + return common.IterateRes{}, fmt.Errorf("exec read-only BoltDB transaction: %w", err) + } + + return common.IterateRes{}, nil +} diff --git a/pkg/local_object_storage/blobstor/peapod/peapod_test.go b/pkg/local_object_storage/blobstor/peapod/peapod_test.go new file mode 100644 index 0000000000..f428a72c41 --- /dev/null +++ b/pkg/local_object_storage/blobstor/peapod/peapod_test.go @@ -0,0 +1,300 @@ +package peapod_test + +import ( + "crypto/rand" + "fmt" + "path/filepath" + "sync" + "testing" + "time" + + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/internal/blobstortest" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/peapod" + apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test" + objecttest "github.com/nspcc-dev/neofs-sdk-go/object/test" + "github.com/stretchr/testify/require" +) + +func TestGeneric(t *testing.T) { + newPath := func() string { + return filepath.Join(t.TempDir(), "peapod.db") + } + + blobstortest.TestAll(t, func(t *testing.T) common.Storage { + return peapod.New(newPath(), 0600, 10*time.Millisecond) + }, 2048, 16*1024) + + t.Run("info", func(t *testing.T) { + path := newPath() + blobstortest.TestInfo(t, func(t *testing.T) common.Storage { + return peapod.New(path, 0600, 10*time.Millisecond) + }, peapod.Type, path) + }) +} + +func TestControl(t *testing.T) { + blobstortest.TestControl(t, func(t *testing.T) common.Storage { + return peapod.New(filepath.Join(t.TempDir(), "peapod.db"), 0600, 10*time.Millisecond) + }, 2048, 2048) +} + +func testPeapodPath(tb testing.TB) string { + return filepath.Join(tb.TempDir(), "peapod.db") +} + +func newTestPeapod(tb testing.TB) *peapod.Peapod { + ppd := _newTestPeapod(tb, testPeapodPath(tb), false) + tb.Cleanup(func() { _ = ppd.Close() }) + return ppd +} + +// creates new read-only peapod.Peapod with one stored object. +func newTestPeapodReadOnly(tb testing.TB) (*peapod.Peapod, oid.Address) { + path := testPeapodPath(tb) + + ppd := _newTestPeapod(tb, path, false) + addr := oidtest.Address() + + _, err := ppd.Put(common.PutPrm{ + Address: addr, + RawData: []byte("Hello, world!"), + DontCompress: false, + }) + require.NoError(tb, err) + require.NoError(tb, ppd.Close()) + + ppd = _newTestPeapod(tb, path, true) + + tb.Cleanup(func() { _ = ppd.Close() }) + + return ppd, addr +} + +func _newTestPeapod(tb testing.TB, path string, readOnly bool) *peapod.Peapod { + ppd := peapod.New(path, 0600, 10*time.Millisecond) + require.NoError(tb, ppd.Open(readOnly)) + require.NoError(tb, ppd.Init()) + + return ppd +} + +func TestPeapod_Get(t *testing.T) { + ppd := newTestPeapod(t) + addr := oidtest.Address() + obj := objecttest.Object(t) + + data, err := obj.Marshal() + require.NoError(t, err) + + getPrm := common.GetPrm{Address: addr} + + _, err = ppd.Get(getPrm) + require.ErrorIs(t, err, apistatus.ErrObjectNotFound) + + _, err = ppd.Put(common.PutPrm{ + Address: addr, + RawData: data, + }) + require.NoError(t, err) + + res, err := ppd.Get(getPrm) + require.NoError(t, err) + require.Equal(t, data, res.RawData) + require.Equal(t, obj, res.Object) +} + +func TestPeapod_Exists(t *testing.T) { + ppd := newTestPeapod(t) + addr := oidtest.Address() + data := []byte("Hello, world!") + + existsPrm := common.ExistsPrm{ + Address: addr, + } + + res, err := ppd.Exists(existsPrm) + require.NoError(t, err) + require.False(t, res.Exists) + + _, err = ppd.Put(common.PutPrm{ + Address: addr, + RawData: data, + }) + require.NoError(t, err) + + res, err = ppd.Exists(existsPrm) + require.NoError(t, err) + require.True(t, res.Exists) +} + +func TestPeapod_Iterate(t *testing.T) { + ppd := newTestPeapod(t) + + mSrc := map[oid.Address][]byte{ + oidtest.Address(): {1, 2, 3}, + oidtest.Address(): {4, 5, 6}, + oidtest.Address(): {7, 8, 9}, + } + + mDst := make(map[oid.Address][]byte) + + f := func(el common.IterationElement) error { + mDst[el.Address] = el.ObjectData + return nil + } + + iterPrm := common.IteratePrm{ + Handler: f, + } + + _, err := ppd.Iterate(iterPrm) + require.NoError(t, err) + require.Empty(t, mDst) + + for addr, data := range mSrc { + _, err = ppd.Put(common.PutPrm{ + Address: addr, + RawData: data, + }) + require.NoError(t, err) + } + + _, err = ppd.Iterate(iterPrm) + require.NoError(t, err) + require.Equal(t, mSrc, mDst) +} + +func TestPeapod_Put(t *testing.T) { + ppd := newTestPeapod(t) + addr := oidtest.Address() + obj := objecttest.Object(t) + + data, err := obj.Marshal() + require.NoError(t, err) + + _, err = ppd.Put(common.PutPrm{ + Address: addr, + RawData: data, + }) + require.NoError(t, err) + + res, err := ppd.Get(common.GetPrm{ + Address: addr, + }) + require.NoError(t, err) + require.Equal(t, data, res.RawData) + require.Equal(t, obj, res.Object) + + t.Run("read-only", func(t *testing.T) { + ppd, _ := newTestPeapodReadOnly(t) + + _, err := ppd.Put(common.PutPrm{ + Address: addr, + RawData: data, + }) + require.ErrorIs(t, err, common.ErrReadOnly) + }) +} + +func TestPeapod_Delete(t *testing.T) { + ppd := newTestPeapod(t) + addr := oidtest.Address() + obj := objecttest.Object(t) + + data, err := obj.Marshal() + require.NoError(t, err) + + _, err = ppd.Delete(common.DeletePrm{ + Address: addr, + }) + require.ErrorIs(t, err, apistatus.ErrObjectNotFound) + + _, err = ppd.Put(common.PutPrm{ + Address: addr, + RawData: data, + }) + require.NoError(t, err) + + getPrm := common.GetPrm{ + Address: addr, + } + + res, err := ppd.Get(getPrm) + require.NoError(t, err) + require.Equal(t, data, res.RawData) + require.Equal(t, obj, res.Object) + + _, err = ppd.Delete(common.DeletePrm{ + Address: addr, + }) + require.NoError(t, err) + + res, err = ppd.Get(getPrm) + require.ErrorIs(t, err, apistatus.ErrObjectNotFound) + + t.Run("read-only", func(t *testing.T) { + ppd, addr := newTestPeapodReadOnly(t) + + _, err := ppd.Delete(common.DeletePrm{ + Address: addr, + }) + require.ErrorIs(t, err, common.ErrReadOnly) + }) +} + +func benchmark(b *testing.B, ppd *peapod.Peapod, objSize uint64, nThreads int) { + data := make([]byte, objSize) + rand.Read(data) + + prm := common.PutPrm{ + RawData: data, + } + + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + var wg sync.WaitGroup + + for i := 0; i < nThreads; i++ { + wg.Add(1) + go func() { + defer wg.Done() + + prm := prm + prm.Address = oidtest.Address() + + _, err := ppd.Put(prm) + require.NoError(b, err) + }() + } + + wg.Wait() + } +} + +func BenchmarkPeapod_Put(b *testing.B) { + ppd := newTestPeapod(b) + + for _, tc := range []struct { + objSize uint64 + nThreads int + }{ + {1, 1}, + {1, 20}, + {1, 100}, + {1 << 10, 1}, + {1 << 10, 20}, + {1 << 10, 100}, + {100 << 10, 1}, + {100 << 10, 20}, + {100 << 10, 100}, + } { + b.Run(fmt.Sprintf("size=%d,thread=%d", tc.objSize, tc.nThreads), func(b *testing.B) { + benchmark(b, ppd, tc.objSize, tc.nThreads) + }) + } +}