Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/feat/hyperblock' into feat/hyper…
Browse files Browse the repository at this point in the history
…block
  • Loading branch information
cristure committed Apr 11, 2024
2 parents 74d4625 + 4b95d6d commit 16186fd
Show file tree
Hide file tree
Showing 39 changed files with 9,985 additions and 126 deletions.
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,7 @@ run: build
debug: build
cd ${cmd_dir} && \
${debugger} exec ./${binary}

test:
@echo " > Running unit tests"
go test -cover -race -coverprofile=coverage.txt -covermode=atomic -v ./...
52 changes: 42 additions & 10 deletions cmd/connector/config/config.toml
Original file line number Diff line number Diff line change
@@ -1,20 +1,52 @@
[web_socket]
[WebSocket]
# IP with port used to recieve data via ws. Should be compatible with the one from node. See [HostDriverConfig].URL
# from https://github.com/multiversx/mx-chain-go/blob/master/cmd/node/config/external.toml.
url = "localhost:22111"
URL = "localhost:22111"

# Possible values: json, gogo protobuf. Should be compatible with [HostDriverConfig].MarshallerType
marshaller_type = "gogo protobuf"
MarshallerType = "gogo protobuf"

# This flag describes the mode to start the WebSocket connector. Can be "client" or "server"
mode = "server"
Mode = "server"

# Retry duration (receive/send data/acknowledge) in seconds
retry_duration = 5
RetryDurationInSec = 5

# This flag specifies if we should send an acknowledge signal upon recieving data
with_acknowledge = true
WithAcknowledge = true

# The duration in seconds to wait for an acknowledgement message
acknowledge_timeout_in_sec = 5
AcknowledgeTimeoutInSec = 5

# Signals if in case of data payload processing error, we should send the ack signal or not. If you want to block
# incoming data in case of a local error, this should be set to true.
blocking_ack_on_error = true
BlockingAckOnError = true

# This flag specifies if we should drop messages if there is no connection to the host
drop_messages_if_no_connection = false
version = 1
DropMessagesIfNoConnection = false

# Version specifies payload version (default = 1)
Version = 1

[DataPool]
NumberOfShards = 3

# Should be smaller then PruningWindow
MaxDelta = 10
PruningWindow = 1000

# Defines the number of active persisters to keep open
NumPersistersToKeep = 2

[OutportBlocksStorage]
[OutportBlocksStorage.Cache]
Name = "OutportBlocksStorage"
Capacity = 100
Type = "SizeLRU"
SizeInBytes = 209715200 # 200MB
[OutportBlocksStorage.DB]
FilePath = "OutportBlocks"
Type = "LvlDBSerial"
BatchDelaySeconds = 2
MaxBatchSize = 100
MaxOpenFiles = 10
5 changes: 5 additions & 0 deletions cmd/connector/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,9 @@ var (
Name: "disable-ansi-color",
Usage: "Boolean option for disabling ANSI colors in the logging system.",
}

importDBMode = cli.BoolFlag{
Name: "import-db-mode",
Usage: "Boolean option for enabling import db mode.",
}
)
34 changes: 15 additions & 19 deletions cmd/connector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,18 @@ package main
import (
"fmt"
"os"
"os/signal"
"syscall"
"time"

"github.com/multiversx/mx-chain-core-go/core"
"github.com/multiversx/mx-chain-core-go/core/closing"
logger "github.com/multiversx/mx-chain-logger-go"
"github.com/multiversx/mx-chain-logger-go/file"
"github.com/multiversx/mx-chain-ws-connector-template-go/config"
"github.com/multiversx/mx-chain-ws-connector-template-go/factory"
"github.com/multiversx/mx-chain-ws-connector-template-go/connector"
"github.com/urfave/cli"
)

var log = logger.GetOrCreate("mx-chain-ws-connector-template-go")
var log = logger.GetOrCreate("main")

const (
configPath = "config/config.toml"
Expand All @@ -36,6 +34,7 @@ func main() {
logLevel,
logSaveFile,
disableAnsiColor,
importDBMode,
}
app.Authors = []cli.Author{
{
Expand Down Expand Up @@ -73,32 +72,29 @@ func startConnector(ctx *cli.Context) error {
}
}

wsClient, err := factory.CreateWSConnector(cfg.WebSocketConfig)
importDBMode := ctx.GlobalBool(importDBMode.Name)

connectorRunner, err := connector.NewConnectorRunner(cfg, importDBMode)
if err != nil {
return fmt.Errorf("cannot create ws firehose connector, error: %w", err)
return fmt.Errorf("cannot create connector runner, error: %w", err)
}

interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, syscall.SIGINT, syscall.SIGTERM)

log.Info("starting ws client...")

<-interrupt
log.Info("closing app at user's signal")

err = wsClient.Close()
log.LogIfError(err)
connectorRunner.Start()
if err != nil {
return err
}

if withLogFile {
err = logFile.Close()
log.LogIfError(err)
}

return nil
}

func loadConfig(filepath string) (config.Config, error) {
cfg := config.Config{}
err := core.LoadTomlFile(&cfg, filepath)
func loadConfig(filepath string) (*config.Config, error) {
cfg := &config.Config{}
err := core.LoadTomlFile(cfg, filepath)

log.Info("loaded config", "path", configPath)

Expand Down
53 changes: 43 additions & 10 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,51 @@ package config

// Config holds general configuration
type Config struct {
WebSocketConfig WebSocketConfig `toml:"web_socket"`
WebSocket WebSocketConfig
DataPool DataPoolConfig
OutportBlocksStorage StorageConfig
}

// WebSocketConfig holds web sockets config
type WebSocketConfig struct {
Url string `toml:"url"`
MarshallerType string `toml:"marshaller_type"`
Mode string `toml:"mode"`
RetryDuration uint32 `toml:"retry_duration"`
WithAcknowledge bool `toml:"with_acknowledge"`
AcknowledgeTimeoutInSec int `toml:"acknowledge_timeout_in_sec"`
BlockingAckOnError bool `toml:"blocking_ack_on_error"`
DropMessagesIfNoConnection bool `toml:"drop_messages_if_no_connection"` // Set to `true` to drop messages if there is no active WebSocket connection to send to.
Version uint32 `toml:"version"` // Defines the payload version.
URL string
MarshallerType string
Mode string
RetryDurationInSec uint32
WithAcknowledge bool
AcknowledgeTimeoutInSec int
BlockingAckOnError bool
DropMessagesIfNoConnection bool
Version uint32
}

// DataPoolConfig will map data poil configuration
type DataPoolConfig struct {
NumberOfShards uint32
MaxDelta uint64
PruningWindow uint64
NumPersistersToKeep int
}

// StorageConfig will map the storage unit configuration
type StorageConfig struct {
Cache CacheConfig
DB DBConfig
}

// CacheConfig will map the cache configuration
type CacheConfig struct {
Name string
Type string
Capacity uint32
SizeInBytes uint64
}

// DBConfig will map the database configuration
type DBConfig struct {
FilePath string
Type string
BatchDelaySeconds int
MaxBatchSize int
MaxOpenFiles int
}
74 changes: 74 additions & 0 deletions connector/connectorRunner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package connector

import (
"errors"
"fmt"
"os"
"os/signal"
"syscall"

logger "github.com/multiversx/mx-chain-logger-go"
"github.com/multiversx/mx-chain-ws-connector-template-go/config"
"github.com/multiversx/mx-chain-ws-connector-template-go/factory"
)

var log = logger.GetOrCreate("connectorRunner")

// ErrNilConfig signals that a nil config structure was provided
var ErrNilConfig = errors.New("nil configs provided")

type connectorRunner struct {
config *config.Config
importDBMode bool
}

// NewConnectorRunner will create a new connector runner instance
func NewConnectorRunner(cfg *config.Config, importDBMode bool) (*connectorRunner, error) {
if cfg == nil {
return nil, ErrNilConfig
}

return &connectorRunner{
config: cfg,
importDBMode: importDBMode,
}, nil
}

// Start will trigger connector service
func (cr *connectorRunner) Start() error {
storer, err := factory.CreateStorer(*cr.config, cr.importDBMode)
if err != nil {
return err
}

dataProcessor, err := factory.CreateDataProcessor(*cr.config, storer)
if err != nil {
return fmt.Errorf("cannot create ws firehose data processor, error: %w", err)
}

wsClient, err := factory.CreateWSConnector(cr.config.WebSocket, dataProcessor)
if err != nil {
return fmt.Errorf("cannot create ws firehose connector, error: %w", err)
}

interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, syscall.SIGINT, syscall.SIGTERM)

log.Info("starting ws client...")

<-interrupt

log.Info("application closing, calling Close on all subcomponents...")

err = storer.Close()
if err != nil {
return err
}

err = wsClient.Close()
if err != nil {
return err
}

return err
}
Loading

0 comments on commit 16186fd

Please sign in to comment.