Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/hyperblock #18

Open
wants to merge 248 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 45 commits
Commits
Show all changes
248 commits
Select commit Hold shift + click to select a range
61a89af
added base components structure
ssd04 Mar 14, 2024
5188b70
switch default ws connector mode to server
ssd04 Mar 14, 2024
3e09445
added testdata and ws connector tool
ssd04 Mar 14, 2024
8811cf2
added hyper outport block proto struct
ssd04 Mar 14, 2024
8990a7c
update data aggregator
ssd04 Mar 14, 2024
eca2bba
refactor hyper outport block propagation
ssd04 Mar 14, 2024
5859a2a
fix notarized refence + updated test data
ssd04 Mar 14, 2024
08fd7d4
fix storage cacher reference
ssd04 Mar 14, 2024
590d499
added blocks caches
ssd04 Mar 15, 2024
0878889
blocks pool with rounds map
ssd04 Mar 15, 2024
bdca33f
added persistent storage for events
ssd04 Mar 18, 2024
7ee4a1d
added pruning for persister
ssd04 Mar 18, 2024
cd715f5
update ws connector tool
ssd04 Mar 19, 2024
58540fc
added missing comments
ssd04 Mar 19, 2024
9a136dd
added unit tests
ssd04 Mar 19, 2024
c485bd7
remove ws connector tool
ssd04 Mar 19, 2024
d48bb17
remove test data
ssd04 Mar 19, 2024
0b011cf
fix linter issue
ssd04 Mar 19, 2024
e7362e7
fix merge conflicts
ssd04 Mar 19, 2024
934e2a3
fix merge conflicts
ssd04 Mar 19, 2024
fb80947
Merge branch 'hyperblock-processor' into implement-blocks-pool
ssd04 Mar 19, 2024
a4f5236
rm test data and test tools
ssd04 Mar 19, 2024
4f38d98
updated config + refactoring factory
ssd04 Mar 20, 2024
b6915da
use camel case format for websocket config
ssd04 Mar 20, 2024
0fd8ca9
renamings + unit test fix
ssd04 Mar 20, 2024
15aa4b9
added pruning storer
ssd04 Mar 20, 2024
615f939
pruning storer unit tests
ssd04 Mar 21, 2024
abff3c0
added cacher mock and stub
ssd04 Mar 21, 2024
0afb4cc
cleanup old persisters
ssd04 Mar 21, 2024
5c9b702
use pruning storer in data pool
ssd04 Mar 21, 2024
bb4c05f
add test make command
ssd04 Mar 21, 2024
205614c
added unit test blocks pool
ssd04 Mar 22, 2024
4967f79
added import db mode
ssd04 Mar 22, 2024
fefbcab
fix linter issue
ssd04 Mar 22, 2024
d526525
create dir of not existing
ssd04 Mar 22, 2024
bb0a6ba
Added Dockerfile.
cristure Mar 25, 2024
cf7b8b6
dump cache data to persister
ssd04 Mar 26, 2024
a9f1a5c
pruning storer close
ssd04 Mar 26, 2024
c289b13
refactor to use connector runner component
ssd04 Mar 26, 2024
a722602
fix tests - update storer stub
ssd04 Mar 26, 2024
a9d4926
added config flag and Dockerfile.
cristure Mar 26, 2024
e34b630
updated default value for config file.
cristure Mar 27, 2024
5bdb010
fixes after review - update storage version + small checks
ssd04 Mar 27, 2024
c2c1da4
cosmetic changes.
cristure Mar 28, 2024
d480d5a
Merge pull request #4 from multiversx/hyperblock-processor
ssd04 Mar 29, 2024
500d14c
Merge branch 'feat/hyperblock' into implement-blocks-pool
ssd04 Apr 1, 2024
f5707e5
set checkpoint
ssd04 Apr 3, 2024
1754bcf
added converter from gogo protobuf to google protobuf.
cristure Apr 5, 2024
18f44a9
fix after review.
cristure Apr 11, 2024
2e28e61
added proto marshalizer and tests.
cristure Apr 11, 2024
1b67dfc
integrated protoMarshalizer in converter.
cristure Apr 11, 2024
5dc249f
fix linter and cosmetic changes.
cristure Apr 11, 2024
06c9374
add t.Parallel()
cristure Apr 11, 2024
820a0f7
fix linter.
cristure Apr 11, 2024
0f96e5f
update test stubs
ssd04 Apr 12, 2024
e2e8481
added separate outport and hyper outport blocks pool
ssd04 Apr 12, 2024
9472b98
refactor base blocks pool to use marshalled data
ssd04 Apr 12, 2024
f665e4d
added disabled blocks pool components
ssd04 Apr 12, 2024
dbccd44
added grpc publisher
ssd04 Apr 12, 2024
79a6731
added publisher factory
ssd04 Apr 12, 2024
ba9aae3
remove disabled components
ssd04 Apr 15, 2024
af89d47
fix blocks pool get test
ssd04 Apr 15, 2024
b792ea1
refactor data components factory
ssd04 Apr 15, 2024
de57511
added disabled hyper blocks pool
ssd04 Apr 15, 2024
e419464
added failed to put block error
ssd04 Apr 16, 2024
5744bf8
added persister data into prunning storer
ssd04 Apr 16, 2024
af7c083
added db modes
ssd04 Apr 16, 2024
9dff85c
added storer destroy + fix mutex deadlock
ssd04 Apr 16, 2024
2658d64
update storer stub
ssd04 Apr 16, 2024
276bffc
pruning storer dump unit test
ssd04 Apr 16, 2024
e16d307
data pool unit tests
ssd04 Apr 16, 2024
bcfa48c
blocks pool integration tests
ssd04 Apr 16, 2024
5578a9c
fix integration test
ssd04 Apr 16, 2024
ba1ce92
added grpc blocks handler
ssd04 Apr 17, 2024
72f84e1
remove separated outport blocks pools
ssd04 Apr 17, 2024
6216b8b
fixes after review.
cristure Apr 18, 2024
c93f2a3
cosmetic changes.
cristure Apr 18, 2024
a91eb76
moved checkpoint functionality to blocks pool
ssd04 Apr 19, 2024
d44c435
removed number of shards field
ssd04 Apr 19, 2024
dcd19b8
Merge branch 'implement-blocks-pool' into blocks-pool-improvements
ssd04 Apr 19, 2024
9ff2ebf
removed not necessary tests
ssd04 Apr 19, 2024
90afa46
Merge pull request #11 from multiversx/pruning-storer-tests
ssd04 Apr 19, 2024
e6e6512
Merge pull request #5 from multiversx/implement-blocks-pool
ssd04 Apr 19, 2024
6527f62
Merge pull request #10 from multiversx/converterV1
ssd04 Apr 19, 2024
7e0d337
remove init from start
ssd04 Apr 19, 2024
d34b43a
added gRPC server and slight modifications.
cristure Apr 19, 2024
7b3cac1
moved marshaler from gogo to standard proto.
cristure Apr 19, 2024
a05329b
bump go version in github actions.
cristure Apr 19, 2024
0aac083
modify linter job.
cristure Apr 19, 2024
bfe6aa4
fix linter.yml
cristure Apr 19, 2024
25a6483
replace deprecated functions.
cristure Apr 19, 2024
5b5b126
added first commitable block config option
ssd04 Apr 21, 2024
2dbaf6e
reverted config file override.
cristure Apr 22, 2024
cabb4a7
fixes after review.
cristure Apr 22, 2024
abd010d
add previous block succesive check
ssd04 Apr 22, 2024
ac64822
improved gRPC server and factory.
cristure Apr 22, 2024
f43b5a9
Merge branch 'feat/hyperblock' into dockerfile
cristure Apr 22, 2024
b7e3bed
cosmetic changes.
cristure Apr 22, 2024
b26c4b8
fixes after review.
cristure Apr 22, 2024
4ddee09
Merge pull request #6 from multiversx/dockerfile
AdoAdoAdo Apr 22, 2024
0df8a40
fix tests.
cristure Apr 22, 2024
0ab621d
fixed blocksPool expression.
cristure Apr 22, 2024
017c0d8
Merge remote-tracking branch 'refs/remotes/origin/feat/hyperblock' in…
cristure Apr 22, 2024
46b0f95
Update service/hyperOutportBlock/blockService.go
cristure Apr 23, 2024
f9d87c4
fix imports.
cristure Apr 23, 2024
ab37eb9
added missing comments.
cristure Apr 23, 2024
9e78fa1
improve slice allocation by specifying the capacity.
cristure Apr 23, 2024
27188c4
add integration test for optimized persister mode
ssd04 Apr 23, 2024
143e3c0
more slice allocation fixes.
cristure Apr 23, 2024
cdb69fe
avoid setting meta index twice
ssd04 Apr 23, 2024
6fa0e69
reverted disabling data processor tests.
cristure Apr 23, 2024
3ee852c
cosmetic changes.
cristure Apr 23, 2024
38ae825
added tmp dir file path
ssd04 Apr 23, 2024
99447a3
added stream endpoint.
cristure Apr 23, 2024
f32cfbc
remove client.
cristure Apr 23, 2024
a408d37
fixes after review.
cristure Apr 23, 2024
36bdd0d
fixes after review.
cristure Apr 23, 2024
b7b93fc
remove breaking test.
cristure Apr 23, 2024
a990c00
added common package
ssd04 Apr 24, 2024
e24e931
added db modes comments
ssd04 Apr 24, 2024
baf50c0
Merge branch 'grpcServer' into grpc-streaming
cristure Apr 24, 2024
99c597a
Merge pull request #15 from multiversx/grpcServer
ssd04 Apr 24, 2024
7f36b93
Merge branch 'feat/hyperblock' into blocks-pool-improvements
ssd04 Apr 24, 2024
f1da267
fix blocks pool tests - use gogo proto marshaller
ssd04 Apr 24, 2024
692f909
added hyper blocks pool
ssd04 Apr 24, 2024
0078764
change block checkpoint to use standard proto
ssd04 Apr 24, 2024
d38a478
added error check for updating meta state
ssd04 Apr 24, 2024
eacfdf5
removed queue and added buffered channel.
cristure Apr 24, 2024
a58a31d
fix linter.
cristure Apr 24, 2024
105c3d9
fix unit tests
ssd04 Apr 24, 2024
f914bb9
fix integration tests
ssd04 Apr 24, 2024
8932d2d
fix linter issue - added err check in test
ssd04 Apr 25, 2024
f3dfb83
rename go module to match repo name
ssd04 Apr 25, 2024
e61cb62
rename also proto struct
ssd04 Apr 25, 2024
f619fee
update firehose message type
ssd04 Apr 25, 2024
621dd65
Merge remote-tracking branch 'refs/remotes/origin/feat/hyperblock' in…
cristure Apr 25, 2024
e29adbc
fix conflicts.
cristure Apr 25, 2024
899d714
improved the graceful shutdown mechanism.
cristure Apr 25, 2024
0dd37ec
fix broken tests.
cristure Apr 25, 2024
88b0fb4
added db mode type
ssd04 Apr 25, 2024
11de24b
update error formats
ssd04 Apr 26, 2024
95d497d
Merge pull request #19 from multiversx/rename-module-name
AdoAdoAdo Apr 26, 2024
2a7a1a0
revamped the streaming mechanism.
cristure Apr 26, 2024
e26404d
comsetic changes.
cristure Apr 26, 2024
5e6d1fa
comsetic changes.
cristure Apr 26, 2024
4752172
fix tests and linter.
cristure Apr 26, 2024
c642304
Merge branch 'refs/heads/feat/hyperblock' into grpc-streaming
cristure Apr 26, 2024
4c0fdfa
Merge remote-tracking branch 'refs/remotes/origin/feat/hyperblock' in…
cristure Apr 26, 2024
0424845
fix after review
ssd04 Apr 26, 2024
15c28dd
Merge branch 'feat/hyperblock' into blocks-pool-improvements
ssd04 Apr 26, 2024
6fac50e
fix broken imports.
cristure Apr 26, 2024
72e5040
Update server/grpcServer.go
cristure Apr 26, 2024
7ad43a4
added TODOS and cosmetic changes.
cristure Apr 26, 2024
648a401
move succesive check to hyper blocks pool
ssd04 Apr 26, 2024
254357a
added common publisher
ssd04 Apr 29, 2024
af2df4b
adapt to use common publisher
ssd04 Apr 29, 2024
10bdaa2
handle revert event
ssd04 Apr 29, 2024
b193fc4
fix data processor unit tests
ssd04 Apr 30, 2024
2066e6a
added publisher handler unit tests
ssd04 Apr 30, 2024
2c71761
added varibles to config
ssd04 Apr 30, 2024
5477209
more unit tests for first commitable block check
ssd04 Apr 30, 2024
41e287d
fix linter checks
ssd04 Apr 30, 2024
146ea69
fix concurrency tests issue
ssd04 Apr 30, 2024
9ea7570
Merge pull request #17 from multiversx/grpc-streaming
AdoAdoAdo Apr 30, 2024
39a0e00
Merge branch 'feat/hyperblock' into blocks-pool-improvements
ssd04 Apr 30, 2024
0dcc0a6
fix after merge
ssd04 Apr 30, 2024
79e3df8
update proto to time duration
ssd04 Apr 30, 2024
3b59b8a
add protogen command
ssd04 Apr 30, 2024
f79a750
added separate grpc proto file
ssd04 Apr 30, 2024
e63cfa8
update gprc service names
ssd04 Apr 30, 2024
879c289
fix after review
ssd04 Apr 30, 2024
09ec534
added polling unit tests
ssd04 May 2, 2024
6e06b53
added grpc server wrapper
ssd04 May 2, 2024
afe37fd
fix renaming
ssd04 May 7, 2024
fdc1a8d
fix linter issue
ssd04 May 7, 2024
eb2111b
added unit test for stream requests
ssd04 May 7, 2024
94d4d8f
fix linter issue in unit test
ssd04 May 7, 2024
47128b9
Update process/blocksPool.go
ssd04 May 7, 2024
851e289
fixes after review
ssd04 May 7, 2024
b374c06
moved custom outport block handling to separate files
ssd04 May 7, 2024
da30cd6
Merge pull request #20 from multiversx/update-firehose-message-type
ssd04 May 7, 2024
2f65057
fixes after review: rename mock files
ssd04 May 7, 2024
09d54b7
updated to use nonce instead of round for index reference
ssd04 May 8, 2024
8abaaca
rename hyper blocks pool
ssd04 May 8, 2024
46f4e74
rename blocks pool files
ssd04 May 8, 2024
32810dc
fixes after review
ssd04 May 8, 2024
042f242
update changes in connector
ssd04 May 8, 2024
b369d17
added first commitable blocks per shard
ssd04 May 9, 2024
5edf104
aupdate first commitable blocks per shard config
ssd04 May 9, 2024
4acca74
fixes after review
ssd04 May 9, 2024
fa73294
Merge pull request #13 from multiversx/blocks-pool-improvements
AdoAdoAdo May 9, 2024
c668fed
Merge branch 'feat/hyperblock' into add-meta-blocks-optimistically
ssd04 May 10, 2024
2fcf6be
fixes after review
ssd04 May 13, 2024
f0be16a
fixes after review
ssd04 May 13, 2024
e89a44d
fix after review - renaming
ssd04 May 14, 2024
4b6d7dd
added publisher checkpoint
ssd04 May 14, 2024
42ff2a5
added nil check
ssd04 May 14, 2024
e944cb7
change publish and update state order
ssd04 May 14, 2024
4b56518
handle checkpoint from init on goroutine
ssd04 May 14, 2024
d5e6f0b
fix unit test
ssd04 May 14, 2024
32b889a
added concurrency test
ssd04 May 15, 2024
282e663
fix variable typo
ssd04 May 17, 2024
925c0e8
Merge pull request #21 from multiversx/add-meta-blocks-optimistically
ssd04 May 20, 2024
5ab5c15
Merge branch 'feat/hyperblock' into grpc-streaming-updates
ssd04 May 20, 2024
6f04819
rename mock file
ssd04 May 20, 2024
1ef30bc
update grpc server mock
ssd04 May 20, 2024
78dec20
fix concurrency test issue
ssd04 May 20, 2024
1544e6c
Merge pull request #22 from multiversx/grpc-streaming-updates
AdoAdoAdo May 24, 2024
e7293a6
save last soft checkpoint
ssd04 Jun 11, 2024
8d7930c
moved first commitable blocks check to data processor
ssd04 Jun 12, 2024
da37707
fix unit tests
ssd04 Jun 13, 2024
8f78bfe
added unit test for data processor first commitable blocks
ssd04 Jun 13, 2024
cbdc4f0
changed proto definition along with converting system.
cristure Jul 12, 2024
774931a
added v2 for shard conversion.
cristure Jul 15, 2024
83b04d6
added first commitable blocks check to publisher handler
ssd04 Jul 16, 2024
7cc6f22
fix header hash reference
ssd04 Jul 16, 2024
46d8ea8
refactored proto definitions.
cristure Jul 18, 2024
141d57e
added reset checkpoints flag
ssd04 Jul 18, 2024
23ee1d5
added interfaces for v2.
cristure Jul 19, 2024
b523e87
improved copying mechanism.
cristure Jul 25, 2024
81456a2
fixed nil pointer.
cristure Jul 25, 2024
88c11dc
fix linter.
cristure Jul 25, 2024
3379e9a
remove debug part.
cristure Jul 26, 2024
afd89ba
update config
ssd04 Jul 26, 2024
c8539ed
restore config
ssd04 Jul 26, 2024
865512d
remove revent events handling
ssd04 Aug 5, 2024
7fbca4c
get last checkpoint fix in publisher handler
ssd04 Aug 6, 2024
8169fac
fixes after review.
cristure Aug 6, 2024
2126233
revert to v1.
cristure Aug 6, 2024
a8bd56e
cosmetic changes.
cristure Aug 7, 2024
d6de3c3
fixes after review
ssd04 Aug 7, 2024
40e9bfc
use deep copy for map
ssd04 Aug 7, 2024
527f494
cosmetic changes.
cristure Aug 7, 2024
7564475
Merge pull request #23 from multiversx/data-pool-fixes
ssd04 Aug 7, 2024
8f76917
Merge pull request #24 from multiversx/proto-v2
cristure Aug 7, 2024
73fcc71
improve slice allocation by specifying the capacity.
cristure Apr 23, 2024
0601975
more slice allocation fixes.
cristure Apr 23, 2024
4a6335e
fixes after review.
cristure Apr 23, 2024
d41e16b
fix conflicts.
cristure Apr 25, 2024
22f5a7c
fix after rebase.
cristure Aug 26, 2024
e146985
Merge remote-tracking branch 'origin/improve-slice-allocation' into i…
cristure Aug 26, 2024
5978baf
Merge remote-tracking branch 'refs/remotes/origin/feat/hyperblock' in…
cristure Aug 26, 2024
1f29ce0
Merge remote-tracking branch 'origin/improve-slice-allocation' into i…
cristure Aug 26, 2024
ab42226
Merge pull request #16 from multiversx/improve-slice-allocation
AdoAdoAdo Aug 26, 2024
c1e7b86
added state changes to convertor.
cristure Sep 23, 2024
d7bc540
fixes after review.
cristure Sep 25, 2024
93c93a9
cut down content from test file to decrease size.
cristure Sep 26, 2024
ffe985a
Merge pull request #25 from multiversx/convert_state_changes
cristure Oct 23, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,7 @@ run: build
debug: build
cd ${cmd_dir} && \
${debugger} exec ./${binary}

test:
@echo " > Running unit tests"
go test -cover -race -coverprofile=coverage.txt -covermode=atomic -v ./...
50 changes: 40 additions & 10 deletions cmd/connector/config/config.toml
Original file line number Diff line number Diff line change
@@ -1,20 +1,50 @@
[web_socket]
[WebSocket]
# IP with port used to recieve data via ws. Should be compatible with the one from node. See [HostDriverConfig].URL
# from https://github.com/multiversx/mx-chain-go/blob/master/cmd/node/config/external.toml.
url = "localhost:22111"
URL = "localhost:22111"

# Possible values: json, gogo protobuf. Should be compatible with [HostDriverConfig].MarshallerType
marshaller_type = "gogo protobuf"
MarshallerType = "gogo protobuf"

# This flag describes the mode to start the WebSocket connector. Can be "client" or "server"
mode = "server"
Mode = "server"

# Retry duration (receive/send data/acknowledge) in seconds
retry_duration = 5
RetryDurationInSec = 5

# This flag specifies if we should send an acknowledge signal upon recieving data
with_acknowledge = true
WithAcknowledge = true

# The duration in seconds to wait for an acknowledgement message
acknowledge_timeout_in_sec = 5
AcknowledgeTimeoutInSec = 5

# Signals if in case of data payload processing error, we should send the ack signal or not. If you want to block
# incoming data in case of a local error, this should be set to true.
blocking_ack_on_error = true
BlockingAckOnError = true

# This flag specifies if we should drop messages if there is no connection to the host
drop_messages_if_no_connection = false
version = 1
DropMessagesIfNoConnection = false

# Version specifies payload version (default = 1)
Version = 1

[DataPool]
# Should be smaller then PruningWindow
MaxDelta = 10
PruningWindow = 1000

# Defines the number of active persisters to keep open
NumPersistersToKeep = 2

[OutportBlocksStorage]
[OutportBlocksStorage.Cache]
Name = "OutportBlocksStorage"
Capacity = 100
Type = "SizeLRU"
SizeInBytes = 209715200 # 200MB
[OutportBlocksStorage.DB]
FilePath = "OutportBlocks"
Type = "LvlDBSerial"
BatchDelaySeconds = 2
MaxBatchSize = 100
MaxOpenFiles = 10
6 changes: 6 additions & 0 deletions cmd/connector/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,10 @@ var (
Name: "disable-ansi-color",
Usage: "Boolean option for disabling ANSI colors in the logging system.",
}

dbMode = cli.StringFlag{
Name: "db-mode",
Usage: "Option for specifying db mode. Available options: `full-persister`, `import-db`, `optimized-persister`",
Value: "full-persister",
}
)
35 changes: 16 additions & 19 deletions cmd/connector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,18 @@ package main
import (
"fmt"
"os"
"os/signal"
"syscall"
"time"

"github.com/multiversx/mx-chain-core-go/core"
"github.com/multiversx/mx-chain-core-go/core/closing"
logger "github.com/multiversx/mx-chain-logger-go"
"github.com/multiversx/mx-chain-logger-go/file"
"github.com/multiversx/mx-chain-ws-connector-template-go/config"
"github.com/multiversx/mx-chain-ws-connector-template-go/factory"
"github.com/multiversx/mx-chain-ws-connector-template-go/connector"
"github.com/urfave/cli"
)

var log = logger.GetOrCreate("mx-chain-ws-connector-template-go")
var log = logger.GetOrCreate("main")

const (
configPath = "config/config.toml"
Expand All @@ -36,6 +34,7 @@ func main() {
logLevel,
logSaveFile,
disableAnsiColor,
dbMode,
}
app.Authors = []cli.Author{
{
Expand Down Expand Up @@ -73,32 +72,30 @@ func startConnector(ctx *cli.Context) error {
}
}

wsClient, err := factory.CreateWSConnector(cfg.WebSocketConfig)
dbMode := ctx.GlobalString(dbMode.Name)
log.Info("storer sync mode", "dbMode", dbMode)

connectorRunner, err := connector.NewConnectorRunner(cfg, dbMode)
if err != nil {
return fmt.Errorf("cannot create ws firehose connector, error: %w", err)
return fmt.Errorf("cannot create connector runner, error: %w", err)
}

interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, syscall.SIGINT, syscall.SIGTERM)

log.Info("starting ws client...")

<-interrupt
log.Info("closing app at user's signal")

err = wsClient.Close()
log.LogIfError(err)
err = connectorRunner.Run()
if err != nil {
return fmt.Errorf("connector runner, error: %w", err)
}

if withLogFile {
err = logFile.Close()
log.LogIfError(err)
}

return nil
}

func loadConfig(filepath string) (config.Config, error) {
cfg := config.Config{}
err := core.LoadTomlFile(&cfg, filepath)
func loadConfig(filepath string) (*config.Config, error) {
cfg := &config.Config{}
err := core.LoadTomlFile(cfg, filepath)

log.Info("loaded config", "path", configPath)

Expand Down
52 changes: 42 additions & 10 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,50 @@ package config

// Config holds general configuration
type Config struct {
WebSocketConfig WebSocketConfig `toml:"web_socket"`
WebSocket WebSocketConfig
DataPool DataPoolConfig
OutportBlocksStorage StorageConfig
}

// WebSocketConfig holds web sockets config
type WebSocketConfig struct {
Url string `toml:"url"`
MarshallerType string `toml:"marshaller_type"`
Mode string `toml:"mode"`
RetryDuration uint32 `toml:"retry_duration"`
WithAcknowledge bool `toml:"with_acknowledge"`
AcknowledgeTimeoutInSec int `toml:"acknowledge_timeout_in_sec"`
BlockingAckOnError bool `toml:"blocking_ack_on_error"`
DropMessagesIfNoConnection bool `toml:"drop_messages_if_no_connection"` // Set to `true` to drop messages if there is no active WebSocket connection to send to.
Version uint32 `toml:"version"` // Defines the payload version.
URL string
MarshallerType string
Mode string
RetryDurationInSec uint32
WithAcknowledge bool
AcknowledgeTimeoutInSec int
BlockingAckOnError bool
DropMessagesIfNoConnection bool
Version uint32
}

// DataPoolConfig will map data poil configuration
type DataPoolConfig struct {
MaxDelta uint64
PruningWindow uint64
NumPersistersToKeep int
}

// StorageConfig will map the storage unit configuration
type StorageConfig struct {
Cache CacheConfig
DB DBConfig
}

// CacheConfig will map the cache configuration
type CacheConfig struct {
Name string
Type string
Capacity uint32
SizeInBytes uint64
}

// DBConfig will map the database configuration
type DBConfig struct {
FilePath string
Type string
BatchDelaySeconds int
MaxBatchSize int
MaxOpenFiles int
}
102 changes: 102 additions & 0 deletions connector/connectorRunner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package connector

import (
"errors"
"fmt"
"os"
"os/signal"
"syscall"

"github.com/multiversx/mx-chain-core-go/marshal"
logger "github.com/multiversx/mx-chain-logger-go"
"github.com/multiversx/mx-chain-ws-connector-template-go/config"
"github.com/multiversx/mx-chain-ws-connector-template-go/factory"
"github.com/multiversx/mx-chain-ws-connector-template-go/process"
)

var log = logger.GetOrCreate("connectorRunner")

// ErrNilConfig signals that a nil config structure was provided
var ErrNilConfig = errors.New("nil configs provided")

type connectorRunner struct {
config *config.Config
dbMode string
}

// NewConnectorRunner will create a new connector runner instance
func NewConnectorRunner(cfg *config.Config, dbMode string) (*connectorRunner, error) {
if cfg == nil {
return nil, ErrNilConfig
}

return &connectorRunner{
config: cfg,
dbMode: dbMode,
}, nil
}

// Run will trigger connector service
func (cr *connectorRunner) Run() error {
protoMarshaller := &marshal.GogoProtoMarshalizer{}

blockContainer, err := factory.CreateBlockContainer()
if err != nil {
return err
}

blocksStorer, err := factory.CreateStorer(*cr.config, cr.dbMode)
if err != nil {
return err
}

outportBlocksPool, err := process.NewBlocksPool(blocksStorer, protoMarshaller, cr.config.DataPool.MaxDelta, cr.config.DataPool.PruningWindow)
if err != nil {
return err
}

dataAggregator, err := process.NewDataAggregator(outportBlocksPool)
if err != nil {
return err
}

publisher, err := process.NewFirehosePublisher(
os.Stdout,
blockContainer,
protoMarshaller,
)
if err != nil {
return err
}

dataProcessor, err := process.NewDataProcessor(publisher, protoMarshaller, outportBlocksPool, dataAggregator, blockContainer)
if err != nil {
return fmt.Errorf("cannot create ws firehose data processor, error: %w", err)
}

wsClient, err := factory.CreateWSConnector(cr.config.WebSocket, dataProcessor)
if err != nil {
return fmt.Errorf("cannot create ws firehose connector, error: %w", err)
}

interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, syscall.SIGINT, syscall.SIGTERM)

log.Info("starting ws client...")

<-interrupt

log.Info("application closing, calling Close on all subcomponents...")

err = outportBlocksPool.Close()
if err != nil {
log.Error(err.Error())
}

err = wsClient.Close()
if err != nil {
log.Error(err.Error())
}

return err
}
65 changes: 65 additions & 0 deletions factory/dataProcessorFactory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package factory

import (
"errors"

"github.com/multiversx/mx-chain-core-go/core"
"github.com/multiversx/mx-chain-core-go/data/block"
"github.com/multiversx/mx-chain-storage-go/storageUnit"
"github.com/multiversx/mx-chain-ws-connector-template-go/config"
"github.com/multiversx/mx-chain-ws-connector-template-go/process"
)

// ErrNotSupportedDBMode signals that an invalid db mode was provided
var ErrNotSupportedDBMode = errors.New("not supported db mode")

const (
FullPersisterDBMode = "full-persister"
ImportDBMode = "import-db"
OptimizedPersisterDBMode = "optimized-persister"
)

// CreateBlockContainer will create a new block container component
func CreateBlockContainer() (process.BlockContainerHandler, error) {
container := block.NewEmptyBlockCreatorsContainer()

err := container.Add(core.ShardHeaderV1, block.NewEmptyHeaderCreator())
if err != nil {
return nil, err
}
err = container.Add(core.ShardHeaderV2, block.NewEmptyHeaderV2Creator())
if err != nil {
return nil, err
}
err = container.Add(core.MetaHeader, block.NewEmptyMetaBlockCreator())
if err != nil {
return nil, err
}

return container, nil
}

// CreateStorer will create a new pruning storer instace
func CreateStorer(cfg config.Config, dbMode string) (process.PruningStorer, error) {
cacheConfig := storageUnit.CacheConfig{
Type: storageUnit.CacheType(cfg.OutportBlocksStorage.Cache.Type),
SizeInBytes: cfg.OutportBlocksStorage.Cache.SizeInBytes,
Capacity: cfg.OutportBlocksStorage.Cache.Capacity,
}

cacher, err := storageUnit.NewCache(cacheConfig)
if err != nil {
return nil, err
}

switch dbMode {
case FullPersisterDBMode:
return process.NewPruningStorer(cfg.OutportBlocksStorage.DB, cacher, cfg.DataPool.NumPersistersToKeep, true)
case OptimizedPersisterDBMode:
return process.NewPruningStorer(cfg.OutportBlocksStorage.DB, cacher, cfg.DataPool.NumPersistersToKeep, false)
case ImportDBMode:
return process.NewImportDBStorer(cacher)
default:
return nil, ErrNotSupportedDBMode
}
}
Loading
Loading