Skip to content
This repository has been archived by the owner on Dec 4, 2024. It is now read-only.

Commit

Permalink
Add new event tracker flags to server command
Browse files Browse the repository at this point in the history
  • Loading branch information
goran-ethernal committed Oct 5, 2023
1 parent bdc00bb commit 8e5ec7f
Show file tree
Hide file tree
Showing 12 changed files with 170 additions and 158 deletions.
34 changes: 28 additions & 6 deletions command/server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,17 @@ type Config struct {
CorsAllowedOrigins []string `json:"cors_allowed_origins" yaml:"cors_allowed_origins"`

Relayer bool `json:"relayer" yaml:"relayer"`
NumBlockConfirmations uint64 `json:"num_block_confirmations" yaml:"num_block_confirmations"`
RelayerTrackerPollInterval time.Duration `json:"relayer_tracker_poll_interval" yaml:"relayer_tracker_poll_interval"`

ConcurrentRequestsDebug uint64 `json:"concurrent_requests_debug" yaml:"concurrent_requests_debug"`
WebSocketReadLimit uint64 `json:"web_socket_read_limit" yaml:"web_socket_read_limit"`

MetricsInterval time.Duration `json:"metrics_interval" yaml:"metrics_interval"`

// event tracker
NumBlockConfirmations uint64 `json:"num_block_confirmations" yaml:"num_block_confirmations"`
TrackerSyncBatchSize uint64 `json:"tracker_sync_batch_size" yaml:"tracker_sync_batch_size"`
TrackerBlocksToReconcile uint64 `json:"tracker_blocks_to_reconcile" yaml:"tracker_blocks_to_reconcile"`
}

// Telemetry holds the config details for metric services.
Expand Down Expand Up @@ -83,10 +87,6 @@ const (
// requests with fromBlock/toBlock values (e.g. eth_getLogs)
DefaultJSONRPCBlockRangeLimit uint64 = 1000

// DefaultNumBlockConfirmations minimal number of child blocks required for the parent block to be considered final
// on ethereum epoch lasts for 32 blocks. more details: https://www.alchemy.com/overviews/ethereum-commitment-levels
DefaultNumBlockConfirmations uint64 = 64

// DefaultConcurrentRequestsDebug specifies max number of allowed concurrent requests for debug endpoints
DefaultConcurrentRequestsDebug uint64 = 32

Expand All @@ -102,6 +102,25 @@ const (
// DefaultMetricsInterval specifies the time interval after which Prometheus metrics will be generated.
// A value of 0 means the metrics are disabled.
DefaultMetricsInterval time.Duration = time.Second * 8

// event tracker

// DefaultNumBlockConfirmations minimal number of child blocks required for the parent block to be considered final
// on ethereum epoch lasts for 32 blocks. more details: https://www.alchemy.com/overviews/ethereum-commitment-levels
DefaultNumBlockConfirmations uint64 = 64

// DefaultTrackerSyncBatchSize defines a default batch size of blocks that will be gotten from tracked chain,
// when tracker is out of sync and needs to sync a number of blocks.
DefaultTrackerSyncBatchSize uint64 = 10

// DefaultTrackerBlocksToReconcile defines how default number blocks that tracker
// will sync up from the latest block on tracked chain.
// If a node that has tracker, was offline for days, months, a year, it will miss a lot of blocks.
// In the meantime, we expect the rest of nodes to have collected the desired events and did their
// logic with them, continuing consensus and relayer stuff.
// In order to not waste too much unnecessary time in syncing all those blocks, with NumOfBlocksToReconcile,
// we tell the tracker to sync only latestBlock.Number - NumOfBlocksToReconcile number of blocks.
DefaultTrackerBlocksToReconcile uint64 = 10000
)

// DefaultConfig returns the default server configuration
Expand Down Expand Up @@ -138,11 +157,14 @@ func DefaultConfig() *Config {
JSONRPCBatchRequestLimit: DefaultJSONRPCBatchRequestLimit,
JSONRPCBlockRangeLimit: DefaultJSONRPCBlockRangeLimit,
Relayer: false,
NumBlockConfirmations: DefaultNumBlockConfirmations,
ConcurrentRequestsDebug: DefaultConcurrentRequestsDebug,
WebSocketReadLimit: DefaultWebSocketReadLimit,
RelayerTrackerPollInterval: DefaultRelayerTrackerPollInterval,
MetricsInterval: DefaultMetricsInterval,
// event tracker
NumBlockConfirmations: DefaultNumBlockConfirmations,
TrackerSyncBatchSize: DefaultTrackerSyncBatchSize,
TrackerBlocksToReconcile: DefaultTrackerBlocksToReconcile,
}
}

Expand Down
13 changes: 10 additions & 3 deletions command/server/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,19 @@ const (
corsOriginFlag = "access-control-allow-origins"
logFileLocationFlag = "log-to"

relayerFlag = "relayer"
numBlockConfirmationsFlag = "num-block-confirmations"
relayerFlag = "relayer"

concurrentRequestsDebugFlag = "concurrent-requests-debug"
webSocketReadLimitFlag = "websocket-read-limit"

relayerTrackerPollIntervalFlag = "relayer-poll-interval"

metricsIntervalFlag = "metrics-interval"

// event tracker
numBlockConfirmationsFlag = "num-block-confirmations"
trackerSyncBatchSizeFlag = "tracker-sync-batch-size"
trackerBlocksToReconcileFlag = "tracker-blocks-to-reconcile"
)

// Flags that are deprecated, but need to be preserved for
Expand Down Expand Up @@ -190,8 +194,11 @@ func (p *serverParams) generateConfig() *server.Config {
LogFilePath: p.logFileLocation,

Relayer: p.relayer,
NumBlockConfirmations: p.rawConfig.NumBlockConfirmations,
RelayerTrackerPollInterval: p.rawConfig.RelayerTrackerPollInterval,
MetricsInterval: p.rawConfig.MetricsInterval,
// event tracker
NumBlockConfirmations: p.rawConfig.NumBlockConfirmations,
TrackerSyncBatchSize: p.rawConfig.TrackerSyncBatchSize,
TrackerBlocksToReconcile: p.rawConfig.TrackerBlocksToReconcile,
}
}
39 changes: 32 additions & 7 deletions command/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,13 +221,6 @@ func setFlags(cmd *cobra.Command) {
"start the state sync relayer service (PolyBFT only)",
)

cmd.Flags().Uint64Var(
&params.rawConfig.NumBlockConfirmations,
numBlockConfirmationsFlag,
defaultConfig.NumBlockConfirmations,
"minimal number of child blocks required for the parent block to be considered final",
)

cmd.Flags().Uint64Var(
&params.rawConfig.ConcurrentRequestsDebug,
concurrentRequestsDebugFlag,
Expand Down Expand Up @@ -256,6 +249,38 @@ func setFlags(cmd *cobra.Command) {
"the interval (in seconds) at which special metrics are generated. a value of zero means the metrics are disabled",
)

// event tracker config
cmd.Flags().Uint64Var(
&params.rawConfig.NumBlockConfirmations,
numBlockConfirmationsFlag,
defaultConfig.NumBlockConfirmations,
"minimal number of child blocks required for the parent block to be considered final",
)

cmd.Flags().Uint64Var(
&params.rawConfig.TrackerSyncBatchSize,
trackerSyncBatchSizeFlag,
defaultConfig.TrackerSyncBatchSize,
`defines a batch size of blocks that will be gotten from tracked chain,
when tracker is out of sync and needs to sync a number of blocks.
(e.g., SyncBatchSize = 10, trackers last processed block is 10, latest block on tracked chain is 100,
it will get blocks 11-20, get logs from confirmed blocks of given batch, remove processed confirm logs
from memory, and continue to the next batch)`,
)

cmd.Flags().Uint64Var(
&params.rawConfig.TrackerBlocksToReconcile,
trackerBlocksToReconcileFlag,
defaultConfig.TrackerBlocksToReconcile,
`defines how many blocks we will sync up from the latest block on tracked chain.
If a node that has tracker, was offline for days, months, a year, it will miss a lot of blocks.
In the meantime, we expect the rest of nodes to have collected the desired events and did their
logic with them, continuing consensus and relayer stuff.
In order to not waste too much unnecessary time in syncing all those blocks, with NumOfBlocksToReconcile,
we tell the tracker to sync only latestBlock.Number - NumOfBlocksToReconcile number of blocks.
If 0 is set to this flag, event tracker will sync all the blocks from tracked chain`,
)

setLegacyFlags(cmd)

setDevFlags(cmd)
Expand Down
7 changes: 5 additions & 2 deletions consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,11 @@ type Params struct {
SecretsManager secrets.SecretsManager
BlockTime uint64

NumBlockConfirmations uint64
MetricsInterval time.Duration
MetricsInterval time.Duration
// event tracker
NumBlockConfirmations uint64
TrackerSyncBatchSize uint64
TrackerBlocksToReconcile uint64
}

// Factory is the factory function to create a discovery consensus
Expand Down
38 changes: 22 additions & 16 deletions consensus/polybft/consensus_runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,15 +71,18 @@ type guardedDataDTO struct {

// runtimeConfig is a struct that holds configuration data for given consensus runtime
type runtimeConfig struct {
PolyBFTConfig *PolyBFTConfig
DataDir string
Key *wallet.Key
State *State
blockchain blockchainBackend
polybftBackend polybftBackend
txPool txPoolInterface
bridgeTopic topic
numBlockConfirmations uint64
PolyBFTConfig *PolyBFTConfig
DataDir string
Key *wallet.Key
State *State
blockchain blockchainBackend
polybftBackend polybftBackend
txPool txPoolInterface
bridgeTopic topic
// event tracker
numBlockConfirmations uint64
trackerSyncBatchSize uint64
trackerBlocksToReconcile uint64
}

// consensusRuntime is a struct that provides consensus runtime features like epoch, state and event management
Expand Down Expand Up @@ -171,15 +174,18 @@ func (c *consensusRuntime) initStateSyncManager(logger hcf.Logger) error {
logger.Named("state-sync-manager"),
c.config.State,
&stateSyncConfig{
key: c.config.Key,
stateSenderAddr: stateSenderAddr,
stateSenderStartBlock: c.config.PolyBFTConfig.Bridge.EventTrackerStartBlocks[stateSenderAddr],
jsonrpcAddr: c.config.PolyBFTConfig.Bridge.JSONRPCEndpoint,
dataDir: c.config.DataDir,
topic: c.config.bridgeTopic,
maxCommitmentSize: maxCommitmentSize,
key: c.config.Key,
stateSenderAddr: stateSenderAddr,
stateSenderStartBlock: c.config.PolyBFTConfig.Bridge.EventTrackerStartBlocks[stateSenderAddr],
jsonrpcAddr: c.config.PolyBFTConfig.Bridge.JSONRPCEndpoint,
dataDir: c.config.DataDir,
topic: c.config.bridgeTopic,
maxCommitmentSize: maxCommitmentSize,
// event tracker
numBlockConfirmations: c.config.numBlockConfirmations,
blockTrackerPollInterval: c.config.PolyBFTConfig.BlockTrackerPollInterval.Duration,
trackerSyncBatchSize: c.config.trackerSyncBatchSize,
trackerBlocksToReconcile: c.config.trackerBlocksToReconcile,
},
c,
)
Expand Down
31 changes: 18 additions & 13 deletions consensus/polybft/eventtracker/event_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ type EventTracker struct {
//
// Example Usage:
//
// config := &EventTracker{
// config := &EventTrackerConfig{
// RpcEndpoint: "http://some-json-rpc-url.com",
// StartBlockFromConfig: 100_000,
// NumBlockConfirmations: 10,
Expand All @@ -110,32 +110,36 @@ type EventTracker struct {
//
// Outputs:
// - A new instance of the EventTracker struct.
func NewEventTracker(config *EventTrackerConfig) (*EventTracker, error) {
func NewEventTracker(config *EventTrackerConfig, startBlockFromGenesis uint64) (*EventTracker, error) {
lastProcessedBlock, err := config.Store.GetLastProcessedBlock()
if err != nil {
return nil, err
}

definiteLastProcessedBlock := lastProcessedBlock
if lastProcessedBlock == 0 {
lastProcessedBlock = startBlockFromGenesis

if lastProcessedBlock == 0 && config.NumOfBlocksToReconcile > 0 {
latestBlock, err := config.BlockProvider.GetBlockByNumber(ethgo.Latest, false)
if err != nil {
return nil, err
}
if config.NumOfBlocksToReconcile > 0 {
latestBlock, err := config.BlockProvider.GetBlockByNumber(ethgo.Latest, false)
if err != nil {
return nil, err
}

if latestBlock.Number > config.NumOfBlocksToReconcile {
// if this is a fresh start, then we should start syncing from
// latestBlock.Number - NumOfBlocksToReconcile
definiteLastProcessedBlock = latestBlock.Number - config.NumOfBlocksToReconcile
if latestBlock.Number > config.NumOfBlocksToReconcile &&
startBlockFromGenesis < latestBlock.Number-config.NumOfBlocksToReconcile {
// if this is a fresh start, and we missed too much blocks,
// then we should start syncing from
// latestBlock.Number - NumOfBlocksToReconcile
lastProcessedBlock = latestBlock.Number - config.NumOfBlocksToReconcile
}
}
}

return &EventTracker{
config: config,
closeCh: make(chan struct{}),
blockTracker: blocktracker.NewJSONBlockTracker(config.BlockProvider),
blockContainer: NewTrackerBlockContainer(definiteLastProcessedBlock),
blockContainer: NewTrackerBlockContainer(lastProcessedBlock),
}, nil
}

Expand Down Expand Up @@ -175,6 +179,7 @@ func (e *EventTracker) Start() error {
"syncBatchSize", e.config.SyncBatchSize,
"numOfBlocksToReconcile", e.config.NumOfBlocksToReconcile,
"logFilter", e.config.LogFilter,
"lastBlockProcessed", e.blockContainer.LastProcessedBlock(),
)

ctx, cancelFn := context.WithCancel(context.Background())
Expand Down
14 changes: 7 additions & 7 deletions consensus/polybft/eventtracker/event_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func TestEventTracker_TrackBlock(t *testing.T) {
t.Run("Add block by block - no confirmed blocks", func(t *testing.T) {
t.Parallel()

tracker, err := NewEventTracker(createTestTrackerConfig(t, 10, 10, 0))
tracker, err := NewEventTracker(createTestTrackerConfig(t, 10, 10, 0), 0)

require.NoError(t, err)

Expand Down Expand Up @@ -131,7 +131,7 @@ func TestEventTracker_TrackBlock(t *testing.T) {
blockProviderMock := new(mockProvider)
blockProviderMock.On("GetLogs", mock.Anything).Return([]*ethgo.Log{}, nil).Once()

tracker, err := NewEventTracker(createTestTrackerConfig(t, numBlockConfirmations, 10, 0))
tracker, err := NewEventTracker(createTestTrackerConfig(t, numBlockConfirmations, 10, 0), 0)
require.NoError(t, err)

tracker.config.BlockProvider = blockProviderMock
Expand Down Expand Up @@ -197,7 +197,7 @@ func TestEventTracker_TrackBlock(t *testing.T) {
blockProviderMock := new(mockProvider)
blockProviderMock.On("GetLogs", mock.Anything).Return(logs, nil).Once()

tracker, err := NewEventTracker(createTestTrackerConfig(t, numBlockConfirmations, 10, 0))
tracker, err := NewEventTracker(createTestTrackerConfig(t, numBlockConfirmations, 10, 0), 0)
require.NoError(t, err)

tracker.config.BlockProvider = blockProviderMock
Expand Down Expand Up @@ -265,7 +265,7 @@ func TestEventTracker_TrackBlock(t *testing.T) {
blockProviderMock := new(mockProvider)
blockProviderMock.On("GetLogs", mock.Anything).Return(nil, errors.New("some error occurred")).Once()

tracker, err := NewEventTracker(createTestTrackerConfig(t, numBlockConfirmations, 10, 0))
tracker, err := NewEventTracker(createTestTrackerConfig(t, numBlockConfirmations, 10, 0), 0)
require.NoError(t, err)

tracker.config.BlockProvider = blockProviderMock
Expand Down Expand Up @@ -335,7 +335,7 @@ func TestEventTracker_TrackBlock(t *testing.T) {
// just mock the call, it will use the provider.blocks map to handle proper returns
blockProviderMock.On("GetBlockByNumber", mock.Anything, mock.Anything).Return(nil, nil).Times(int(numOfMissedBlocks))

tracker, err := NewEventTracker(createTestTrackerConfig(t, numBlockConfirmations, batchSize, 0))
tracker, err := NewEventTracker(createTestTrackerConfig(t, numBlockConfirmations, batchSize, 0), 0)
require.NoError(t, err)

tracker.config.BlockProvider = blockProviderMock
Expand Down Expand Up @@ -420,7 +420,7 @@ func TestEventTracker_TrackBlock(t *testing.T) {
// just mock the call, it will use the provider.blocks map to handle proper returns
blockProviderMock.On("GetBlockByNumber", mock.Anything, mock.Anything).Return(nil, nil).Times(int(numOfMissedBlocks + numOfCachedBlocks))

tracker, err := NewEventTracker(createTestTrackerConfig(t, numBlockConfirmations, batchSize, 0))
tracker, err := NewEventTracker(createTestTrackerConfig(t, numBlockConfirmations, batchSize, 0), 0)
require.NoError(t, err)

tracker.config.BlockProvider = blockProviderMock
Expand Down Expand Up @@ -515,7 +515,7 @@ func TestEventTracker_TrackBlock(t *testing.T) {
// just mock the call, it will use the provider.blocks map to handle proper returns
blockProviderMock.On("GetBlockByNumber", mock.Anything, mock.Anything).Return(nil, nil).Times(int(numOfCachedBlocks))

tracker, err := NewEventTracker(createTestTrackerConfig(t, numBlockConfirmations, batchSize, 0))
tracker, err := NewEventTracker(createTestTrackerConfig(t, numBlockConfirmations, batchSize, 0), 0)
require.NoError(t, err)

tracker.config.BlockProvider = blockProviderMock
Expand Down
21 changes: 12 additions & 9 deletions consensus/polybft/polybft.go
Original file line number Diff line number Diff line change
Expand Up @@ -537,15 +537,18 @@ func (p *Polybft) Start() error {
// initRuntime creates consensus runtime
func (p *Polybft) initRuntime() error {
runtimeConfig := &runtimeConfig{
PolyBFTConfig: p.consensusConfig,
Key: p.key,
DataDir: p.dataDir,
State: p.state,
blockchain: p.blockchain,
polybftBackend: p,
txPool: p.txPool,
bridgeTopic: p.bridgeTopic,
numBlockConfirmations: p.config.NumBlockConfirmations,
PolyBFTConfig: p.consensusConfig,
Key: p.key,
DataDir: p.dataDir,
State: p.state,
blockchain: p.blockchain,
polybftBackend: p,
txPool: p.txPool,
bridgeTopic: p.bridgeTopic,
// event tracker
numBlockConfirmations: p.config.NumBlockConfirmations,
trackerSyncBatchSize: p.config.TrackerSyncBatchSize,
trackerBlocksToReconcile: p.config.TrackerBlocksToReconcile,
}

runtime, err := newConsensusRuntime(p.logger, runtimeConfig)
Expand Down
Loading

0 comments on commit 8e5ec7f

Please sign in to comment.