Skip to content

Commit

Permalink
Merge branch 'feat/hyperblock' into add-meta-blocks-optimistically
Browse files Browse the repository at this point in the history
  • Loading branch information
ssd04 committed May 10, 2024
2 parents 146ea69 + fa73294 commit c668fed
Show file tree
Hide file tree
Showing 52 changed files with 3,152 additions and 2,432 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ build:

run: build
cd ${cmd_dir} && \
./${binary} --log-level="*:TRACE"
./${binary} --log-level="*:DEBUG"

debug: build
cd ${cmd_dir} && \
Expand Down
13 changes: 9 additions & 4 deletions cmd/connector/config/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,15 @@
# Defines the number of active persisters to keep open
NumPersistersToKeep = 2

# Defines the first commitable meta block round
# If it is set to 0, it will try to get last rounds info from storage,
# otherwise, it will try to sync from observers
FirstCommitableBlock = 0
# Defines the first commitable blocks per shard.
# The nonces have to be setup based on metachain shard, they can be fetched by interogating
# this gateway endpoint `https://gateway.multiversx.com/block/4294967295/by-nonce/<metachain nonce here>`
FirstCommitableBlocks = [
{ ShardID = "metachain", Nonce = 0 },
{ ShardID = "0", Nonce = 0 },
{ ShardID = "1", Nonce = 0 },
{ ShardID = "2", Nonce = 0 }
]

[Publisher]
# RetryDurationInMiliseconds defines the retry duration for common publisher
Expand Down
2 changes: 1 addition & 1 deletion cmd/connector/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
)

const (
defaultConfigPath = "./config/config.toml"
defaultConfigPath = "config/config.toml"
)

var (
Expand Down
21 changes: 21 additions & 0 deletions common/common.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
package common

import (
"github.com/multiversx/mx-chain-core-go/core"
"github.com/multiversx/mx-chain-ws-connector-firehose-go/config"
)

// DBMode defines db mode type
type DBMode string

Expand All @@ -15,3 +20,19 @@ const (
// cache, and they will be dumped to persister when necessary
OptimizedPersisterDBMode DBMode = "optimized-persister"
)

// ConvertFirstCommitableBlocks will convert first commitable blocks map
func ConvertFirstCommitableBlocks(blocks []config.FirstCommitableBlock) (map[uint32]uint64, error) {
newBlocks := make(map[uint32]uint64, len(blocks))

for _, firstCommitableBlock := range blocks {
shardID, err := core.ConvertShardIDToUint32(firstCommitableBlock.ShardID)
if err != nil {
return nil, err
}

newBlocks[shardID] = firstCommitableBlock.Nonce
}

return newBlocks, nil
}
16 changes: 11 additions & 5 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,18 @@ type WebSocketConfig struct {
Version uint32
}

// DataPoolConfig will map data poil configuration
// DataPoolConfig will map data pool configuration
type DataPoolConfig struct {
MaxDelta uint64
PruningWindow uint64
NumPersistersToKeep int
FirstCommitableBlock uint64
MaxDelta uint64
PruningWindow uint64
NumPersistersToKeep int
FirstCommitableBlocks []FirstCommitableBlock
}

// FirstCommitableBlock will map first commitable block configuration
type FirstCommitableBlock struct {
ShardID string
Nonce uint64
}

// PublisherConfig will map publisher configuration
Expand Down
51 changes: 28 additions & 23 deletions connector/connectorRunner.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,58 +45,62 @@ func (cr *connectorRunner) Run() error {
gogoProtoMarshaller := &marshal.GogoProtoMarshalizer{}
protoMarshaller := &process.ProtoMarshaller{}

outportBlockConverter := process.NewOutportBlockConverter(gogoProtoMarshaller, protoMarshaller)
firstCommitableBlocks, err := common.ConvertFirstCommitableBlocks(cr.config.DataPool.FirstCommitableBlocks)
if err != nil {
return err
}

blockContainer, err := factory.CreateBlockContainer()
outportBlockConverter, err := process.NewOutportBlockConverter(gogoProtoMarshaller, protoMarshaller)
if err != nil {
return err
}

blocksStorer, err := factory.CreateStorer(*cr.config, cr.dbMode)
blockContainer, err := factory.CreateBlockContainer()
if err != nil {
return err
}

baseBlocksPool, err := process.NewBlocksPool(
blocksStorer,
protoMarshaller,
cr.config.DataPool.MaxDelta,
cr.config.DataPool.PruningWindow,
cr.config.DataPool.FirstCommitableBlock,
)
blocksStorer, err := factory.CreateStorer(*cr.config, cr.dbMode)
if err != nil {
return err
}

outportBlocksPool, err := process.NewHyperOutportBlocksPool(
baseBlocksPool,
protoMarshaller,
)
argsBlocksPool := process.DataPoolArgs{
Storer: blocksStorer,
Marshaller: protoMarshaller,
MaxDelta: cr.config.DataPool.MaxDelta,
CleanupInterval: cr.config.DataPool.PruningWindow,
FirstCommitableBlocks: firstCommitableBlocks,
}
dataPool, err := process.NewDataPool(argsBlocksPool)
if err != nil {
return err
}

dataAggregator, err := process.NewDataAggregator(outportBlocksPool)
blocksPool, err := process.NewBlocksPool(
dataPool,
protoMarshaller,
)
if err != nil {
return err
}

server, err := factory.CreateGRPCServer(cr.enableGrpcServer, cr.config.GRPC, outportBlocksPool, dataAggregator)
dataAggregator, err := process.NewDataAggregator(blocksPool)
if err != nil {
return err
}

hyperBlockPublisher, err := factory.CreatePublisher(cr.enableGrpcServer, blockContainer)
hyperBlockPublisher, err := factory.CreatePublisher(cr.config, cr.enableGrpcServer, blockContainer, blocksPool, dataAggregator)
if err != nil {
return fmt.Errorf("cannot create publisher: %w", err)
}

publisherHandler, err := process.NewPublisherHandler(
hyperBlockPublisher,
outportBlocksPool,
blocksPool,
dataAggregator,
cr.config.Publisher.RetryDurationInMiliseconds,
cr.config.DataPool.FirstCommitableBlock,
firstCommitableBlocks,
)
if err != nil {
return fmt.Errorf("cannot create common publisher: %w", err)
Expand All @@ -105,7 +109,7 @@ func (cr *connectorRunner) Run() error {
dataProcessor, err := process.NewDataProcessor(
publisherHandler,
gogoProtoMarshaller,
outportBlocksPool,
blocksPool,
outportBlockConverter,
)
if err != nil {
Expand All @@ -126,7 +130,7 @@ func (cr *connectorRunner) Run() error {

log.Info("application closing, calling Close on all subcomponents...")

err = outportBlocksPool.Close()
err = publisherHandler.Close()
if err != nil {
log.Error(err.Error())
}
Expand All @@ -136,8 +140,9 @@ func (cr *connectorRunner) Run() error {
log.Error(err.Error())
}

if server != nil {
server.Close()
err = blocksPool.Close()
if err != nil {
log.Error(err.Error())
}

return err
Expand Down
20 changes: 10 additions & 10 deletions data/blockCheckpoint.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion data/blockCheckpoint.proto
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@ package proto;
option go_package = "github.com/multiversx/mx-chain-ws-connector-template-go/data;data";

message BlockCheckpoint {
map <uint32, uint64> LastRounds = 1;
map <uint32, uint64> LastNonces = 1;
}
Loading

0 comments on commit c668fed

Please sign in to comment.