Skip to content

Commit

Permalink
Introduce tool to migrate objects from Blobovnicza tree to Peapod
Browse files Browse the repository at this point in the history
Add `cmd/blobovnicza-to-peapod` application which accepts YAML
configuration file of the storage node and, for each configured shard,
overtakes data from Blobovnicza tree to Peapod created in the parent
directory.

The tool is going to be used for phased and safe rejection of the
Blobovnicza trees and the transition to Peapods.

Refs nspcc-dev#2453.

Signed-off-by: Leonard Lyubich <[email protected]>
  • Loading branch information
cthulhu-rider committed Aug 15, 2023
1 parent ec80229 commit e36ad52
Show file tree
Hide file tree
Showing 4 changed files with 396 additions and 1 deletion.
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ Changelog for NeoFS Node
- Histogram metrics for RPC and engine operations (#2351)
- SN's version is announced via the attributes automatically but can be overwritten explicitly (#2455)
- New storage component for small objects named Peapod (#2453)
- New `blobovnicza-to-peapod` tool providing blobovnicza-to-peapod data migration (#2453)

### Fixed
- `neo-go` RPC connection loss handling (#1337)
Expand Down Expand Up @@ -72,6 +73,17 @@ Docker images now contain a single executable file and SSL certificates only.

`neofs-cli control healthcheck` exit code is `0` only for "READY" state.

To migrate data from Blobovnicza trees to Peapods:
```shell
$ blobovnicza-to-peapod -config </path/to/storage/node/config>
```
For any shard, the data from the configured Blobovnicza tree is copied into
a created Peapod file named `peapod.db` in the directory where the tree is
located. For example, `/neofs/data/blobovcniza/*` -> `/neofs/data/peapod.db`.
Notice that existing Blobovnicza trees are untouched. Configuration is also
updated, for example, `/etc/neofs/config.yaml` -> `/etc/neofs/config_peapod.yaml`.
WARN: carefully review the updated config before using it in the application!

## [0.37.0] - 2023-06-15 - Sogado

### Added
Expand Down
261 changes: 261 additions & 0 deletions cmd/blobovnicza-to-peapod/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,261 @@
package main

import (
"errors"
"flag"
"fmt"
"io/fs"
"log"
"os"
"path/filepath"
"strings"

"github.com/nspcc-dev/neofs-node/cmd/neofs-node/config"
engineconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine"
shardconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard"
blobovniczaconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard/blobstor/blobovnicza"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/blobovniczatree"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/compression"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/peapod"
"gopkg.in/yaml.v3"
)

func main() {
nodeCfgPath := flag.String("config", "", "Path to storage node's YAML configuration file")

flag.Parse()

if *nodeCfgPath == "" {
log.Fatal("missing storage node config flag")
}

appCfg := config.New(config.Prm{}, config.WithConfigFile(*nodeCfgPath))

err := engineconfig.IterateShards(appCfg, false, func(sc *shardconfig.Config) error {
log.Println("processing shard...")

var bbcz common.Storage
var perm fs.FileMode
storagesCfg := sc.BlobStor().Storages()

for i := range storagesCfg {
if storagesCfg[i].Type() == blobovniczatree.Type {
bbczCfg := blobovniczaconfig.From((*config.Config)(storagesCfg[i]))

perm = storagesCfg[i].Perm()
bbcz = blobovniczatree.NewBlobovniczaTree(
blobovniczatree.WithRootPath(storagesCfg[i].Path()),
blobovniczatree.WithPermissions(storagesCfg[i].Perm()),
blobovniczatree.WithBlobovniczaSize(bbczCfg.Size()),
blobovniczatree.WithBlobovniczaShallowDepth(bbczCfg.ShallowDepth()),
blobovniczatree.WithBlobovniczaShallowWidth(bbczCfg.ShallowWidth()),
blobovniczatree.WithOpenedCacheSize(bbczCfg.OpenedCacheSize()))

break
}
}

if bbcz == nil {
log.Println("Blobovnicza is not configured for the current shard, going to next one...")
return nil
}

bbczPath := bbcz.Path()
if !filepath.IsAbs(bbczPath) {
log.Fatalf("Blobobvnicza tree path '%s' is not absolute, make it like this in the config file first\n", bbczPath)
}

ppdPath := filepath.Join(filepath.Dir(bbcz.Path()), "peapod.db")
ppd := peapod.New(ppdPath, perm)

var compressCfg compression.Config
compressCfg.Enabled = sc.Compress()
compressCfg.UncompressableContentTypes = sc.UncompressableContentTypes()

err := compressCfg.Init()
if err != nil {
log.Fatal("init compression config for the current shard: ", err)
}

bbcz.SetCompressor(&compressCfg)
ppd.SetCompressor(&compressCfg)

log.Printf("migrating data from Blobovnicza tree '%s' to Peapod '%s'...\n", bbcz.Path(), ppd.Path())

err = common.Copy(ppd, bbcz)
if err != nil {
log.Fatal("migration failed: ", err)
}

log.Println("data successfully migrated in the current shard, going to the next one...")

return nil
})
if err != nil {
log.Fatal(err)
}

srcPath := *nodeCfgPath
ss := strings.Split(srcPath, ".")
ss[0] += "_peapod"

dstPath := strings.Join(ss, ".")

log.Printf("data successfully migrated in all shards, migrating configuration to '%s' file...\n", dstPath)

err = migrateConfigToPeapod(dstPath, srcPath)
if err != nil {
log.Fatal(err)
}
}

func migrateConfigToPeapod(dstPath, srcPath string) error {
fData, err := os.ReadFile(srcPath)
if err != nil {
return fmt.Errorf("read source config config file: %w", err)
}

var mConfig map[any]any

err = yaml.Unmarshal(fData, &mConfig)
if err != nil {
return fmt.Errorf("decode config from YAML: %w", err)
}

v, ok := mConfig["storage"]
if !ok {
return errors.New("missing 'storage' section")
}

mStorage, ok := v.(map[any]any)
if !ok {
return fmt.Errorf("unexpected 'storage' section type: %T instead of %T", v, mStorage)
}

v, ok = mStorage["shard"]
if !ok {
return errors.New("missing 'storage.shard' section")
}

mShards, ok := v.(map[any]any)
if !ok {
return fmt.Errorf("unexpected 'storage.shard' section type: %T instead of %T", v, mShards)
}

replaceBlobovniczaWithPeapod := func(mShard map[any]any, shardDesc any) error {
v, ok := mShard["blobstor"]
if !ok {
return fmt.Errorf("missing 'blobstor' section in shard '%v' config", shardDesc)
}

sBlobStor, ok := v.([]any)
if !ok {
return fmt.Errorf("unexpected 'blobstor' section type in shard '%v': %T instead of %T", shardDesc, v, sBlobStor)
}

var bbczSubStorage map[any]any

for i := range sBlobStor {
mSubStorage, ok := sBlobStor[i].(map[any]any)
if !ok {
return fmt.Errorf("unexpected sub-storage #%d type in shard '%v': %T instead of %T", i, shardDesc, v, mStorage)
}

v, ok := mSubStorage["type"]
if ok {
typ, ok := v.(string)
if !ok {
return fmt.Errorf("unexpected type of sub-storage name: %T instead of %T", v, typ)
}

if typ == blobovniczatree.Type {
bbczSubStorage = mSubStorage
}

continue
}

// in 'default' section 'type' may be missing

_, withDepth := mSubStorage["depth"]
_, withWidth := mSubStorage["width"]

if withWidth && withDepth {
bbczSubStorage = mSubStorage
}
}

if bbczSubStorage == nil {
log.Printf("blobovnicza tree is not configured for the shard '%s', skip\n", shardDesc)
return nil
}

for k := range bbczSubStorage {
switch k {
default:
delete(bbczSubStorage, k)
case "type", "path", "perm":
}
}

bbczSubStorage["type"] = peapod.Type

v, ok = bbczSubStorage["path"]
if ok {
path, ok := v.(string)
if !ok {
return fmt.Errorf("unexpected sub-storage path type: %T instead of %T", v, path)
}

bbczSubStorage["path"] = filepath.Join(filepath.Dir(path), "peapod.db")
}

return nil
}

v, ok = mShards["default"]
if ok {
mShard, ok := v.(map[any]any)
if !ok {
return fmt.Errorf("unexpected 'storage.shard.default' section type: %T instead of %T", v, mShard)
}

err = replaceBlobovniczaWithPeapod(mShard, "default")
if err != nil {
return err
}
}

for i := 0; ; i++ {
v, ok = mShards[i]
if !ok {
if i == 0 {
return errors.New("missing numbered shards")
}
break
}

mShard, ok := v.(map[any]any)
if !ok {
return fmt.Errorf("unexpected 'storage.shard.%d' section type: %T instead of %T", i, v, mStorage)
}

err = replaceBlobovniczaWithPeapod(mShard, i)
if err != nil {
return err
}
}

data, err := yaml.Marshal(mConfig)
if err != nil {
return fmt.Errorf("encode modified config into YAML: %w", err)
}

err = os.WriteFile(dstPath, data, 0640)
if err != nil {
return fmt.Errorf("write resulting config to the destination file: %w", err)
}

return nil
}
61 changes: 60 additions & 1 deletion pkg/local_object_storage/blobstor/common/storage.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package common

import "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/compression"
import (
"fmt"

"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/compression"
)

// Storage represents key-value object storage.
// It is used as a building block for a blobstor of a shard.
Expand All @@ -23,3 +27,58 @@ type Storage interface {
Delete(DeletePrm) (DeleteRes, error)
Iterate(IteratePrm) (IterateRes, error)
}

// Copy copies all objects from source Storage into the destination one. If any
// object cannot be stored, Copy immediately fails.
func Copy(dst, src Storage) error {
err := src.Open(true)
if err != nil {
return fmt.Errorf("open source sub-storage: %w", err)
}

defer func() { _ = src.Close() }()

err = src.Init()
if err != nil {
return fmt.Errorf("initialize source sub-storage: %w", err)
}

err = dst.Open(false)
if err != nil {
return fmt.Errorf("open destination sub-storage: %w", err)
}

defer func() { _ = dst.Close() }()

err = dst.Init()
if err != nil {
return fmt.Errorf("initialize destination sub-storage: %w", err)
}

_, err = src.Iterate(IteratePrm{
Handler: func(el IterationElement) error {
exRes, err := dst.Exists(ExistsPrm{
Address: el.Address,
})
if err != nil {
return fmt.Errorf("check presence of object %s in the destination sub-storage: %w", el.Address, err)
} else if exRes.Exists {
return nil
}

_, err = dst.Put(PutPrm{
Address: el.Address,
RawData: el.ObjectData,
})
if err != nil {
return fmt.Errorf("put object %s into destination sub-storage: %w", el.Address, err)
}
return nil
},
})
if err != nil {
return fmt.Errorf("iterate over source sub-storage: %w", err)
}

return nil
}
Loading

0 comments on commit e36ad52

Please sign in to comment.