Skip to content

Commit

Permalink
added import db mode
Browse files Browse the repository at this point in the history
  • Loading branch information
ssd04 committed Mar 22, 2024
1 parent 205614c commit 4967f79
Show file tree
Hide file tree
Showing 7 changed files with 135 additions and 14 deletions.
5 changes: 5 additions & 0 deletions cmd/connector/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,9 @@ var (
Name: "disable-ansi-color",
Usage: "Boolean option for disabling ANSI colors in the logging system.",
}

importDBMode = cli.BoolFlag{
Name: "import-db-mode",
Usage: "Boolean option for enabling import db mode.",
}
)
5 changes: 4 additions & 1 deletion cmd/connector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func main() {
logLevel,
logSaveFile,
disableAnsiColor,
importDBMode,
}
app.Authors = []cli.Author{
{
Expand Down Expand Up @@ -73,7 +74,9 @@ func startConnector(ctx *cli.Context) error {
}
}

dataProcessor, err := factory.CreateDataProcessor(cfg)
importDBMode := ctx.GlobalBool(importDBMode.Name)

dataProcessor, err := factory.CreateDataProcessor(cfg, importDBMode)
if err != nil {
return fmt.Errorf("cannot create ws firehose data processor, error: %w", err)
}
Expand Down
34 changes: 21 additions & 13 deletions factory/dataProcessorFactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
)

// CreateDataProcessor will create a new instance of data processor
func CreateDataProcessor(cfg config.Config) (websocket.PayloadHandler, error) {
func CreateDataProcessor(cfg config.Config, importDBMode bool) (websocket.PayloadHandler, error) {
protoMarshaller := &marshal.GogoProtoMarshalizer{}

blockContainer, err := createBlockContainer()
Expand All @@ -30,18 +30,7 @@ func CreateDataProcessor(cfg config.Config) (websocket.PayloadHandler, error) {
return nil, err
}

cacheConfig := storageUnit.CacheConfig{
Type: storageUnit.CacheType(cfg.OutportBlocksStorage.Cache.Type),
SizeInBytes: cfg.OutportBlocksStorage.Cache.SizeInBytes,
Capacity: cfg.OutportBlocksStorage.Cache.Capacity,
}

cacher, err := storageUnit.NewCache(cacheConfig)
if err != nil {
return nil, err
}

storer, err := process.NewPruningStorer(cfg.OutportBlocksStorage.DB, cacher, cfg.DataPool.NumPersistersToKeep)
storer, err := createStorer(cfg, importDBMode)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -77,3 +66,22 @@ func createBlockContainer() (process.BlockContainerHandler, error) {

return container, nil
}

func createStorer(cfg config.Config, importDBMode bool) (process.PruningStorer, error) {
cacheConfig := storageUnit.CacheConfig{
Type: storageUnit.CacheType(cfg.OutportBlocksStorage.Cache.Type),
SizeInBytes: cfg.OutportBlocksStorage.Cache.SizeInBytes,
Capacity: cfg.OutportBlocksStorage.Cache.Capacity,
}

cacher, err := storageUnit.NewCache(cacheConfig)
if err != nil {
return nil, err
}

if importDBMode {
return process.NewImportDBStorer(cacher)
}

return process.NewPruningStorer(cfg.OutportBlocksStorage.DB, cacher, cfg.DataPool.NumPersistersToKeep)
}
3 changes: 3 additions & 0 deletions process/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,6 @@ var ErrInvalidFilePath = errors.New("invalid file path provided")

// ErrNilPruningStorer signals that a nil pruning storer was provide
var ErrNilPruningStorer = errors.New("nil pruning storer")

// ErrNilCacher signals that a nil cacher was provided
var ErrNilCacher = errors.New("nil cacher")
50 changes: 50 additions & 0 deletions process/importDBStorer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package process

import (
"encoding/hex"
"fmt"

"github.com/multiversx/mx-chain-core-go/core/check"
"github.com/multiversx/mx-chain-storage-go/types"
)

type importDBStorer struct {
cacher types.Cacher
}

// NewImportDBStorer creates a new import db storer instancer
func NewImportDBStorer(cacher types.Cacher) (*importDBStorer, error) {
if check.IfNil(cacher) {
return nil, ErrNilCacher
}

return &importDBStorer{
cacher: cacher,
}, nil
}

// Get will get value from cache
func (is *importDBStorer) Get(key []byte) ([]byte, error) {
v, ok := is.cacher.Get(key)
if ok {
return v.([]byte), nil
}

return nil, fmt.Errorf("key %s not found", hex.EncodeToString(key))
}

// Put will put data into cacher
func (is *importDBStorer) Put(key []byte, data []byte) error {
is.cacher.Put(key, data, len(data))
return nil
}

// Prune returns nil
func (is *importDBStorer) Prune(_ uint64) error {
return nil
}

// IsInterfaceNil returns nil if there is no value under the interface
func (is *importDBStorer) IsInterfaceNil() bool {
return is == nil
}
45 changes: 45 additions & 0 deletions process/importDBStorer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package process_test

import (
"testing"

"github.com/multiversx/mx-chain-ws-connector-template-go/process"
"github.com/multiversx/mx-chain-ws-connector-template-go/testscommon"
"github.com/stretchr/testify/require"
)

func TestNewImportDBStorer(t *testing.T) {
t.Parallel()

t.Run("nil cacher", func(t *testing.T) {
t.Parallel()

is, err := process.NewImportDBStorer(nil)
require.Nil(t, is)
require.Equal(t, process.ErrNilCacher, err)
})

t.Run("should work", func(t *testing.T) {
t.Parallel()

is, err := process.NewImportDBStorer(testscommon.NewCacherMock())
require.Nil(t, err)
require.False(t, is.IsInterfaceNil())

key := []byte("key1")
val := []byte("val1")

err = is.Put(key, val)
require.Nil(t, err)

ret, err := is.Get(key)
require.Nil(t, err)
require.Equal(t, val, ret)

ret, err = is.Get([]byte("not existing"))
require.Nil(t, ret)
require.Error(t, err)

require.Nil(t, is.Prune(2))
})
}
7 changes: 7 additions & 0 deletions process/pruningStorer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"path/filepath"
"sync"

"github.com/multiversx/mx-chain-core-go/core/check"
"github.com/multiversx/mx-chain-storage-go/leveldb"
"github.com/multiversx/mx-chain-storage-go/storageUnit"
"github.com/multiversx/mx-chain-storage-go/types"
Expand All @@ -23,7 +24,11 @@ type pruningStorer struct {
numPersistersToKeep int
}

// NewPruningStorer will create a new instance of pruning storer
func NewPruningStorer(cfg config.DBConfig, cacher types.Cacher, numPersistersToKeep int) (*pruningStorer, error) {
if check.IfNil(cacher) {
return nil, ErrNilCacher
}
if cfg.FilePath == "" {
return nil, ErrInvalidFilePath
}
Expand Down Expand Up @@ -121,6 +126,7 @@ func reverseSlice(s []string) []string {
return s
}

// Get will get data from cacher or storer
func (ps *pruningStorer) Get(key []byte) ([]byte, error) {
v, ok := ps.cacher.Get(key)
if ok {
Expand All @@ -146,6 +152,7 @@ func (ps *pruningStorer) Get(key []byte) ([]byte, error) {
return nil, fmt.Errorf("key %s not found", hex.EncodeToString(key))
}

// Put will try to put data to cacher and storer
func (ps *pruningStorer) Put(key, data []byte) error {
ps.cacher.Put(key, data, len(data))

Expand Down

0 comments on commit 4967f79

Please sign in to comment.