Skip to content

Commit

Permalink
neofs-lens: add new command object get
Browse files Browse the repository at this point in the history
This command pulls out the object from the StorageEngine physical
snapshot.

Closes #1336.

Signed-off-by: Ekaterina Pavlova <[email protected]>
  • Loading branch information
AliceInHunterland committed Sep 5, 2023
1 parent 9871712 commit 1ac23c8
Show file tree
Hide file tree
Showing 7 changed files with 505 additions and 169 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ minor release, the component will be purged, so be prepared (see `Updating` sect
- `neofs-cli object nodes` command to get SNs for an object (#2512)
- Fetching container estimations via iterators to prevent NeoVM stack overflow (#2173)
- `neofs-adm morph netmap-candidates` CLI command (#1889)
- `neofs-lens storage inspect` CLI command (#1336)

### Fixed
- `neo-go` RPC connection loss handling (#1337)
Expand Down
9 changes: 9 additions & 0 deletions cmd/neofs-lens/internal/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ const (
flagAddress = "address"
flagEnginePath = "path"
flagOutFile = "out"
flagConfigFile = "config"
)

// AddAddressFlag adds the address flag to the passed cobra command.
Expand All @@ -33,3 +34,11 @@ func AddOutputFileFlag(cmd *cobra.Command, v *string) {
"File to save object payload")
_ = cmd.MarkFlagFilename(flagOutFile)
}

// AddConfigFileFlag adds the config file flag to the passed cobra command.
func AddConfigFileFlag(cmd *cobra.Command, v *string) {
cmd.Flags().StringVar(v, flagConfigFile, "",
"Path to file with storage engine config")
_ = cmd.MarkFlagFilename(flagConfigFile)
_ = cmd.MarkFlagRequired(flagEnginePath)
}
40 changes: 40 additions & 0 deletions cmd/neofs-lens/internal/storage/inspect.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package storage

import (
common "github.com/nspcc-dev/neofs-node/cmd/neofs-lens/internal"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"github.com/spf13/cobra"
)

var getObjectCMD = &cobra.Command{
Use: "inspect",
Short: "Get object from the NeoFS node's storage snapshot",
Long: "Get object from the NeoFS node's storage snapshot",
Args: cobra.NoArgs,
Run: getObject,
}

func init() {
common.AddAddressFlag(getObjectCMD, &vAddress)
common.AddOutputFileFlag(getObjectCMD, &vOut)
common.AddConfigFileFlag(getObjectCMD, &vConfig)
}

func getObject(cmd *cobra.Command, _ []string) {
var addr oid.Address

err := addr.DecodeString(vAddress)
common.ExitOnErr(cmd, common.Errf("invalid address argument: %w", err))

storage := openEngine(cmd)
defer storage.Close()

obj, err := engine.Get(storage, addr)
common.ExitOnErr(cmd, common.Errf("could not fetch object: %w", err))

common.PrintObjectHeader(cmd, *obj)
data, err := obj.Marshal()
common.ExitOnErr(cmd, common.Errf("could not marshal object: %w", err))
common.WriteObjectToFile(cmd, vOut, data)
}
278 changes: 278 additions & 0 deletions cmd/neofs-lens/internal/storage/root.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,278 @@
package storage

import (
"fmt"
"time"

common "github.com/nspcc-dev/neofs-node/cmd/neofs-lens/internal"
"github.com/nspcc-dev/neofs-node/cmd/neofs-node/config"
engineconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine"
shardconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard"
blobovniczaconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard/blobstor/blobovnicza"
fstreeconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard/blobstor/fstree"
peapodconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard/blobstor/peapod"
"github.com/nspcc-dev/neofs-node/cmd/neofs-node/storage"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/blobovniczatree"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/peapod"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/pilorama"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/writecache"
"github.com/nspcc-dev/neofs-node/pkg/util"
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
"github.com/panjf2000/ants/v2"
"github.com/spf13/cobra"
"go.etcd.io/bbolt"
)

var (
vAddress string
vOut string
vConfig string
)

var Root = &cobra.Command{
Use: "storage",
Short: "Operations with an object",
Args: cobra.NoArgs,
}

func init() {
Root.AddCommand(
getObjectCMD,
)
}

type shardOptsWithID struct {
configID string
shOpts []shard.Option
}

type epochState struct {
}

func (e epochState) CurrentEpoch() uint64 {
return 0
}

func openEngine(cmd *cobra.Command) *engine.StorageEngine {
appCfg := config.New(config.Prm{}, config.WithConfigFile(vConfig))

ls := engine.New()

var shards []storage.ShardCfg
err := engineconfig.IterateShards(appCfg, false, func(sc *shardconfig.Config) error {
var sh storage.ShardCfg

sh.RefillMetabase = sc.RefillMetabase()
sh.Mode = sc.Mode()
sh.Compress = sc.Compress()
sh.UncompressableContentType = sc.UncompressableContentTypes()
sh.SmallSizeObjectLimit = sc.SmallSizeLimit()

// write-cache

writeCacheCfg := sc.WriteCache()
if writeCacheCfg.Enabled() {
wc := &sh.WritecacheCfg

wc.Enabled = true
wc.Path = writeCacheCfg.Path()
wc.MaxBatchSize = writeCacheCfg.BoltDB().MaxBatchSize()
wc.MaxBatchDelay = writeCacheCfg.BoltDB().MaxBatchDelay()
wc.MaxObjSize = writeCacheCfg.MaxObjectSize()
wc.SmallObjectSize = writeCacheCfg.SmallObjectSize()
wc.FlushWorkerCount = writeCacheCfg.WorkersNumber()
wc.SizeLimit = writeCacheCfg.SizeLimit()
wc.NoSync = writeCacheCfg.NoSync()
}

// blobstor with substorages

blobStorCfg := sc.BlobStor()
storagesCfg := blobStorCfg.Storages()
metabaseCfg := sc.Metabase()
gcCfg := sc.GC()

if config.BoolSafe(appCfg.Sub("tree"), "enabled") {
piloramaCfg := sc.Pilorama()
pr := &sh.PiloramaCfg

pr.Enabled = true
pr.Path = piloramaCfg.Path()
pr.Perm = piloramaCfg.Perm()
pr.NoSync = piloramaCfg.NoSync()
pr.MaxBatchSize = piloramaCfg.MaxBatchSize()
pr.MaxBatchDelay = piloramaCfg.MaxBatchDelay()
}

ss := make([]storage.SubStorageCfg, 0, len(storagesCfg))
for i := range storagesCfg {
var sCfg storage.SubStorageCfg

sCfg.Typ = storagesCfg[i].Type()
sCfg.Path = storagesCfg[i].Path()
sCfg.Perm = storagesCfg[i].Perm()

switch storagesCfg[i].Type() {
case blobovniczatree.Type:
sub := blobovniczaconfig.From((*config.Config)(storagesCfg[i]))

sCfg.Size = sub.Size()
sCfg.Depth = sub.ShallowDepth()
sCfg.Width = sub.ShallowWidth()
sCfg.OpenedCacheSize = sub.OpenedCacheSize()
case fstree.Type:
sub := fstreeconfig.From((*config.Config)(storagesCfg[i]))
sCfg.Depth = sub.Depth()
sCfg.NoSync = sub.NoSync()
case peapod.Type:
peapodCfg := peapodconfig.From((*config.Config)(storagesCfg[i]))
sCfg.FlushInterval = peapodCfg.FlushInterval()
default:
return fmt.Errorf("can't initiate storage. invalid storage type: %s", storagesCfg[i].Type())
}

ss = append(ss, sCfg)
}

sh.SubStorages = ss

// meta

m := &sh.MetaCfg

m.Path = metabaseCfg.Path()
m.Perm = metabaseCfg.BoltDB().Perm()
m.MaxBatchDelay = metabaseCfg.BoltDB().MaxBatchDelay()
m.MaxBatchSize = metabaseCfg.BoltDB().MaxBatchSize()

// GC

sh.GcCfg.RemoverBatchSize = gcCfg.RemoverBatchSize()
sh.GcCfg.RemoverSleepInterval = gcCfg.RemoverSleepInterval()

shards = append(shards, sh)

return nil
})
common.ExitOnErr(cmd, err)

var shardsWithMeta []shardOptsWithID
for _, shCfg := range shards {
var writeCacheOpts []writecache.Option
if wcRead := shCfg.WritecacheCfg; wcRead.Enabled {
writeCacheOpts = append(writeCacheOpts,
writecache.WithPath(wcRead.Path),
writecache.WithMaxBatchSize(wcRead.MaxBatchSize),
writecache.WithMaxBatchDelay(wcRead.MaxBatchDelay),
writecache.WithMaxObjectSize(wcRead.MaxObjSize),
writecache.WithSmallObjectSize(wcRead.SmallObjectSize),
writecache.WithFlushWorkersCount(wcRead.FlushWorkerCount),
writecache.WithMaxCacheSize(wcRead.SizeLimit),
writecache.WithNoSync(wcRead.NoSync),
)
}

var piloramaOpts []pilorama.Option
if prRead := shCfg.PiloramaCfg; prRead.Enabled {
piloramaOpts = append(piloramaOpts,
pilorama.WithPath(prRead.Path),
pilorama.WithPerm(prRead.Perm),
pilorama.WithNoSync(prRead.NoSync),
pilorama.WithMaxBatchSize(prRead.MaxBatchSize),
pilorama.WithMaxBatchDelay(prRead.MaxBatchDelay),
)
}

var ss []blobstor.SubStorage
for _, sRead := range shCfg.SubStorages {
switch sRead.Typ {
case blobovniczatree.Type:
ss = append(ss, blobstor.SubStorage{
Storage: blobovniczatree.NewBlobovniczaTree(
blobovniczatree.WithRootPath(sRead.Path),
blobovniczatree.WithPermissions(sRead.Perm),
blobovniczatree.WithBlobovniczaSize(sRead.Size),
blobovniczatree.WithBlobovniczaShallowDepth(sRead.Depth),
blobovniczatree.WithBlobovniczaShallowWidth(sRead.Width),
blobovniczatree.WithOpenedCacheSize(sRead.OpenedCacheSize)),
Policy: func(_ *objectSDK.Object, data []byte) bool {
return uint64(len(data)) < shCfg.SmallSizeObjectLimit
},
})
case fstree.Type:
ss = append(ss, blobstor.SubStorage{
Storage: fstree.New(
fstree.WithPath(sRead.Path),
fstree.WithPerm(sRead.Perm),
fstree.WithDepth(sRead.Depth),
fstree.WithNoSync(sRead.NoSync)),
Policy: func(_ *objectSDK.Object, data []byte) bool {
return true
},
})
case peapod.Type:
ss = append(ss, blobstor.SubStorage{
Storage: peapod.New(sRead.Path, sRead.Perm, sRead.FlushInterval),
Policy: func(_ *objectSDK.Object, data []byte) bool {
return uint64(len(data)) < shCfg.SmallSizeObjectLimit
},
})
default:
// should never happen, that has already
// been handled: when the config was read
}
}

var sh shardOptsWithID
sh.configID = shCfg.ID()
sh.shOpts = []shard.Option{
shard.WithRefillMetabase(shCfg.RefillMetabase),
shard.WithMode(shCfg.Mode),
shard.WithBlobStorOptions(
blobstor.WithCompressObjects(shCfg.Compress),
blobstor.WithUncompressableContentTypes(shCfg.UncompressableContentType),
blobstor.WithStorages(ss),
),
shard.WithMetaBaseOptions(
meta.WithPath(shCfg.MetaCfg.Path),
meta.WithPermissions(shCfg.MetaCfg.Perm),
meta.WithMaxBatchSize(shCfg.MetaCfg.MaxBatchSize),
meta.WithMaxBatchDelay(shCfg.MetaCfg.MaxBatchDelay),
meta.WithBoltDBOptions(&bbolt.Options{
Timeout: 100 * time.Millisecond,
}),

meta.WithEpochState(epochState{}),
),
shard.WithPiloramaOptions(piloramaOpts...),
shard.WithWriteCache(shCfg.WritecacheCfg.Enabled),
shard.WithWriteCacheOptions(writeCacheOpts...),
shard.WithRemoverBatchSize(shCfg.GcCfg.RemoverBatchSize),
shard.WithGCRemoverSleepInterval(shCfg.GcCfg.RemoverSleepInterval),
shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
pool, err := ants.NewPool(sz)
common.ExitOnErr(cmd, err)

return pool
}),
}

shardsWithMeta = append(shardsWithMeta, sh)
}

for _, optsWithMeta := range shardsWithMeta {
_, err := ls.AddShard(append(optsWithMeta.shOpts, shard.WithMode(mode.ReadOnly))...)
common.ExitOnErr(cmd, err)
}

common.ExitOnErr(cmd, ls.Open())
common.ExitOnErr(cmd, ls.Init())

return ls
}
2 changes: 2 additions & 0 deletions cmd/neofs-lens/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/nspcc-dev/neofs-node/cmd/neofs-lens/internal/blobovnicza"
"github.com/nspcc-dev/neofs-node/cmd/neofs-lens/internal/meta"
"github.com/nspcc-dev/neofs-node/cmd/neofs-lens/internal/peapod"
"github.com/nspcc-dev/neofs-node/cmd/neofs-lens/internal/storage"
"github.com/nspcc-dev/neofs-node/cmd/neofs-lens/internal/writecache"
"github.com/nspcc-dev/neofs-node/misc"
"github.com/nspcc-dev/neofs-node/pkg/util/gendoc"
Expand Down Expand Up @@ -40,6 +41,7 @@ func init() {
blobovnicza.Root,
meta.Root,
writecache.Root,
storage.Root,
gendoc.Command(command),
)
}
Expand Down
Loading

0 comments on commit 1ac23c8

Please sign in to comment.