Skip to content

Commit

Permalink
updated config + refactoring factory
Browse files Browse the repository at this point in the history
  • Loading branch information
ssd04 committed Mar 20, 2024
1 parent a4f5236 commit 4f38d98
Show file tree
Hide file tree
Showing 12 changed files with 237 additions and 160 deletions.
24 changes: 24 additions & 0 deletions cmd/connector/config/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,43 @@
# IP with port used to recieve data via ws. Should be compatible with the one from node. See [HostDriverConfig].URL
# from https://github.com/multiversx/mx-chain-go/blob/master/cmd/node/config/external.toml.
url = "localhost:22111"

# Possible values: json, gogo protobuf. Should be compatible with [HostDriverConfig].MarshallerType
marshaller_type = "gogo protobuf"

# This flag describes the mode to start the WebSocket connector. Can be "client" or "server"
mode = "server"

# Retry duration (receive/send data/acknowledge) in seconds
retry_duration = 5

# This flag specifies if we should send an acknowledge signal upon recieving data
with_acknowledge = true

# The duration in seconds to wait for an acknowledgement message
acknowledge_timeout_in_sec = 5

# Signals if in case of data payload processing error, we should send the ack signal or not. If you want to block
# incoming data in case of a local error, this should be set to true.
blocking_ack_on_error = true

# This flag specifies if we should drop messages if there is no connection to the host
drop_messages_if_no_connection = false
version = 1

[DataPool]
NumberOfShards = 3
MaxDelta = 10

[OutportBlocksStorage]
[OutportBlocksStorage.Cache]
Name = "OutportBlocksStorage"
Capacity = 100
Type = "SizeLRU"
SizeInBytes = 209715200 # 200MB
[OutportBlocksStorage.DB]
FilePath = "OutportBlocks"
Type = "LvlDBSerial"
BatchDelaySeconds = 2
MaxBatchSize = 100
MaxOpenFiles = 10
7 changes: 6 additions & 1 deletion cmd/connector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,12 @@ func startConnector(ctx *cli.Context) error {
}
}

wsClient, err := factory.CreateWSConnector(cfg.WebSocketConfig)
dataProcessor, err := factory.CreateDataProcessor(cfg)
if err != nil {
return fmt.Errorf("cannot create ws firehose data processor, error: %w", err)
}

wsClient, err := factory.CreateWSConnector(cfg.WebSocketConfig, dataProcessor)
if err != nil {
return fmt.Errorf("cannot create ws firehose connector, error: %w", err)
}
Expand Down
33 changes: 32 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package config

// Config holds general configuration
type Config struct {
WebSocketConfig WebSocketConfig `toml:"web_socket"`
WebSocketConfig WebSocketConfig `toml:"web_socket"`
DataPoolConfig DataPoolConfig
OutportBlocksStorage StorageConfig
}

// WebSocketConfig holds web sockets config
Expand All @@ -17,3 +19,32 @@ type WebSocketConfig struct {
DropMessagesIfNoConnection bool `toml:"drop_messages_if_no_connection"` // Set to `true` to drop messages if there is no active WebSocket connection to send to.
Version uint32 `toml:"version"` // Defines the payload version.
}

// DataPoolConfig will map data poil configuration
type DataPoolConfig struct {
NumberOfShards uint32
MaxDelta uint64
}

// StorageConfig will map the storage unit configuration
type StorageConfig struct {
Cache CacheConfig
DB DBConfig
}

// CacheConfig will map the cache configuration
type CacheConfig struct {
Name string
Type string
Capacity uint32
SizeInBytes uint64
}

// DBConfig will map the database configuration
type DBConfig struct {
FilePath string
Type string
BatchDelaySeconds int
MaxBatchSize int
MaxOpenFiles int
}
90 changes: 90 additions & 0 deletions factory/dataProcessorFactory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package factory

import (
"os"

"github.com/multiversx/mx-chain-communication-go/websocket"
"github.com/multiversx/mx-chain-core-go/core"
"github.com/multiversx/mx-chain-core-go/data/block"
"github.com/multiversx/mx-chain-core-go/marshal"
"github.com/multiversx/mx-chain-storage-go/leveldb"
"github.com/multiversx/mx-chain-storage-go/storageUnit"
"github.com/multiversx/mx-chain-ws-connector-template-go/config"
"github.com/multiversx/mx-chain-ws-connector-template-go/process"
)

// CreateDataProcessor will create a new instance of data processor
func CreateDataProcessor(cfg config.Config) (websocket.PayloadHandler, error) {
protoMarshaller := &marshal.GogoProtoMarshalizer{}

blockContainer, err := createBlockContainer()
if err != nil {
return nil, err
}

firehosePublisher, err := process.NewFirehosePublisher(
os.Stdout,
blockContainer,
protoMarshaller,
)
if err != nil {
return nil, err
}

cacheConfig := storageUnit.CacheConfig{
Type: storageUnit.CacheType(cfg.OutportBlocksStorage.Cache.Type),
SizeInBytes: cfg.OutportBlocksStorage.Cache.SizeInBytes,
Capacity: cfg.OutportBlocksStorage.Cache.Capacity,
}

cacher, err := storageUnit.NewCache(cacheConfig)
if err != nil {
return nil, err
}

persister, err := leveldb.NewDB(
cfg.OutportBlocksStorage.DB.FilePath,
cfg.OutportBlocksStorage.DB.BatchDelaySeconds,
cfg.OutportBlocksStorage.DB.MaxBatchSize,
cfg.OutportBlocksStorage.DB.MaxOpenFiles,
)
if err != nil {
return nil, err
}

storageUnit, err := storageUnit.NewStorageUnit(cacher, persister)
if err != nil {
return nil, err
}

blocksPool, err := process.NewBlocksPool(storageUnit, protoMarshaller, cfg.DataPoolConfig.NumberOfShards, cfg.DataPoolConfig.MaxDelta)
if err != nil {
return nil, err
}

dataAggregator, err := process.NewDataAggregator(blocksPool)
if err != nil {
return nil, err
}

return process.NewDataProcessor(firehosePublisher, protoMarshaller, blocksPool, dataAggregator, blockContainer)
}

func createBlockContainer() (process.BlockContainerHandler, error) {
container := block.NewEmptyBlockCreatorsContainer()

err := container.Add(core.ShardHeaderV1, block.NewEmptyHeaderCreator())
if err != nil {
return nil, err
}
err = container.Add(core.ShardHeaderV2, block.NewEmptyHeaderV2Creator())
if err != nil {
return nil, err
}
err = container.Add(core.MetaHeader, block.NewEmptyMetaBlockCreator())
if err != nil {
return nil, err
}

return container, nil
}
83 changes: 2 additions & 81 deletions factory/wsConnectorFactory.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,12 @@
package factory

import (
"os"

"github.com/multiversx/mx-chain-communication-go/websocket"
"github.com/multiversx/mx-chain-communication-go/websocket/data"
factoryHost "github.com/multiversx/mx-chain-communication-go/websocket/factory"
"github.com/multiversx/mx-chain-core-go/core"
"github.com/multiversx/mx-chain-core-go/data/block"
"github.com/multiversx/mx-chain-core-go/marshal"
"github.com/multiversx/mx-chain-core-go/marshal/factory"
logger "github.com/multiversx/mx-chain-logger-go"
"github.com/multiversx/mx-chain-storage-go/leveldb"
"github.com/multiversx/mx-chain-storage-go/storageUnit"

"github.com/multiversx/mx-chain-ws-connector-template-go/config"
"github.com/multiversx/mx-chain-ws-connector-template-go/process"
Expand All @@ -21,67 +16,12 @@ var log = logger.GetOrCreate("ws-connector")

// CreateWSConnector will create a ws connector able to receive and process incoming data
// from a multiversx node
func CreateWSConnector(cfg config.WebSocketConfig) (process.WSConnector, error) {
func CreateWSConnector(cfg config.WebSocketConfig, dataProcessor websocket.PayloadHandler) (process.WSConnector, error) {
marshaller, err := factory.NewMarshalizer(cfg.MarshallerType)
if err != nil {
return nil, err
}

blockContainer, err := createBlockContainer()
if err != nil {
return nil, err
}

protoMarshaller := &marshal.GogoProtoMarshalizer{}

firehosePublisher, err := process.NewFirehosePublisher(
os.Stdout, // DO NOT CHANGE
blockContainer,
protoMarshaller,
)
if err != nil {
return nil, err
}

// TODO: move cache to config
cacheConfig := storageUnit.CacheConfig{
Type: storageUnit.SizeLRUCache,
SizeInBytes: 209715200, // 200MB
Capacity: 100,
}

cacher, err := storageUnit.NewCache(cacheConfig)
if err != nil {
return nil, err
}

path := "shardBlocks"
persister, err := leveldb.NewDB(path, 2, 100, 10)
if err != nil {
return nil, err
}

storageUnit, err := storageUnit.NewStorageUnit(cacher, persister)
if err != nil {
return nil, err
}

blocksPool, err := process.NewBlocksPool(storageUnit, blockContainer, protoMarshaller)
if err != nil {
return nil, err
}

dataAggregator, err := process.NewDataAggregator(blocksPool)
if err != nil {
return nil, err
}

// TODO: move to separate factory
dataProcessor, err := process.NewDataProcessor(firehosePublisher, protoMarshaller, blocksPool, dataAggregator, blockContainer)
if err != nil {
return nil, err
}

wsHost, err := createWsHost(marshaller, cfg)
if err != nil {
return nil, err
Expand All @@ -95,25 +35,6 @@ func CreateWSConnector(cfg config.WebSocketConfig) (process.WSConnector, error)
return wsHost, nil
}

func createBlockContainer() (process.BlockContainerHandler, error) {
container := block.NewEmptyBlockCreatorsContainer()

err := container.Add(core.ShardHeaderV1, block.NewEmptyHeaderCreator())
if err != nil {
return nil, err
}
err = container.Add(core.ShardHeaderV2, block.NewEmptyHeaderV2Creator())
if err != nil {
return nil, err
}
err = container.Add(core.MetaHeader, block.NewEmptyMetaBlockCreator())
if err != nil {
return nil, err
}

return container, nil
}

func createWsHost(wsMarshaller marshal.Marshalizer, cfg config.WebSocketConfig) (factoryHost.FullDuplexHost, error) {
return factoryHost.CreateWebSocketHost(factoryHost.ArgsWebSocketHost{
WebSocketConfig: data.WebSocketConfig{
Expand Down
7 changes: 4 additions & 3 deletions factory/wsConnectorFactory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package factory
import (
"testing"

"github.com/multiversx/mx-chain-communication-go/websocket"
"github.com/multiversx/mx-chain-communication-go/websocket/data"
"github.com/multiversx/mx-chain-ws-connector-template-go/config"
"github.com/stretchr/testify/require"
Expand All @@ -27,7 +28,7 @@ func TestCreateWSConnector(t *testing.T) {

cfg := createConfig()
cfg.MarshallerType = "invalid"
ws, err := CreateWSConnector(cfg)
ws, err := CreateWSConnector(cfg, websocket.NewNilPayloadHandler())
require.NotNil(t, err)
require.Nil(t, ws)
})
Expand All @@ -37,7 +38,7 @@ func TestCreateWSConnector(t *testing.T) {

cfg := createConfig()
cfg.RetryDuration = 0
ws, err := CreateWSConnector(cfg)
ws, err := CreateWSConnector(cfg, websocket.NewNilPayloadHandler())
require.NotNil(t, err)
require.Nil(t, ws)
})
Expand All @@ -46,7 +47,7 @@ func TestCreateWSConnector(t *testing.T) {
t.Parallel()

cfg := createConfig()
ws, err := CreateWSConnector(cfg)
ws, err := CreateWSConnector(cfg, websocket.NewNilPayloadHandler())
require.Nil(t, err)
require.NotNil(t, ws)

Expand Down
Loading

0 comments on commit 4f38d98

Please sign in to comment.