Skip to content
This repository has been archived by the owner on Dec 4, 2024. It is now read-only.

Commit

Permalink
Comments fix
Browse files Browse the repository at this point in the history
  • Loading branch information
goran-ethernal committed Oct 18, 2023
1 parent 81b6562 commit 24f86d2
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 37 deletions.
23 changes: 12 additions & 11 deletions consensus/polybft/eventtracker/event_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,6 @@ type EventTrackerConfig struct {
// LogFilter defines which events are tracked and from which contracts on the tracked chain
LogFilter map[ethgo.Address][]ethgo.Hash

// Store is the store implementation for data that tracker saves (lastProcessedBlock and logs)
Store EventTrackerStore

// BlockProvider is the implementation of a provider that returns blocks and logs from tracked chain
BlockProvider BlockProvider

Expand All @@ -81,6 +78,9 @@ type EventTracker struct {

blockTracker blocktracker.BlockTrackerInterface
blockContainer *TrackerBlockContainer

// store is the store implementation for data that tracker saves (lastProcessedBlock and logs)
store EventTrackerStore
}

// NewEventTracker is a constructor function that creates a new instance of the EventTracker struct.
Expand All @@ -89,10 +89,9 @@ type EventTracker struct {
//
// config := &EventTrackerConfig{
// RpcEndpoint: "http://some-json-rpc-url.com",
// StartBlockFromConfig: 100_000,
// NumBlockConfirmations: 10,
// SyncBatchSize: 20,
// MaxBacklogSize: 10_000,
// NumOfBlocksToReconcile:10_000,
// PollInterval: 2 * time.Second,
// Logger: logger,
// Store: store,
Expand All @@ -103,15 +102,16 @@ type EventTracker struct {
// IDs: []ethgo.Hash{idHashOfSomeEvent},
// },
// }
// t := NewEventTracker(config)
// t := NewEventTracker(config, store)
//
// Inputs:
// - config (TrackerConfig): configuration of EventTracker.
//
// Outputs:
// - A new instance of the EventTracker struct.
func NewEventTracker(config *EventTrackerConfig, startBlockFromGenesis uint64) (*EventTracker, error) {
lastProcessedBlock, err := config.Store.GetLastProcessedBlock()
func NewEventTracker(config *EventTrackerConfig, store EventTrackerStore,
startBlockFromGenesis uint64) (*EventTracker, error) {
lastProcessedBlock, err := store.GetLastProcessedBlock()
if err != nil {
return nil, err
}
Expand All @@ -137,6 +137,7 @@ func NewEventTracker(config *EventTrackerConfig, startBlockFromGenesis uint64) (

return &EventTracker{
config: config,
store: store,
closeCh: make(chan struct{}),
blockTracker: blocktracker.NewJSONBlockTracker(config.BlockProvider),
blockContainer: NewTrackerBlockContainer(lastProcessedBlock),
Expand Down Expand Up @@ -166,7 +167,7 @@ func (e *EventTracker) Close() {
// Start is a method in the EventTracker struct that starts the tracking of blocks
// and retrieval of logs from given blocks from the tracked chain.
// If the tracker was turned off (node was down) for some time, it will sync up all the missed
// blocks and logs from the last start (in regards to MaxBacklogSize field in config).
// blocks and logs from the last start (in regards to NumOfBlocksToReconcile field in config).
//
// Returns:
// - nil if start passes successfully.
Expand Down Expand Up @@ -416,7 +417,7 @@ func (e *EventTracker) processLogs() error {
}
}

if err := e.config.Store.InsertLastProcessedBlock(toBlock); err != nil {
if err := e.store.InsertLastProcessedBlock(toBlock); err != nil {
e.config.Logger.Error("Process logs failed on saving last processed block",
"fromBlock", fromBlock,
"toBlock", toBlock,
Expand All @@ -425,7 +426,7 @@ func (e *EventTracker) processLogs() error {
return err
}

if err := e.config.Store.InsertLogs(filteredLogs); err != nil {
if err := e.store.InsertLogs(filteredLogs); err != nil {
e.config.Logger.Error("Process logs failed on saving logs to store",
"fromBlock", fromBlock,
"toBlock", toBlock,
Expand Down
11 changes: 6 additions & 5 deletions consensus/polybft/eventtracker/event_tracker_fuzz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type getNewStateF struct {
LastProcessed uint64
BatchSize uint64
NumBlockConfirmations uint64
MaxBackLogSize uint64
NumBlocksToReconcile uint64
}

func FuzzGetNewState(f *testing.F) {
Expand All @@ -27,23 +27,23 @@ func FuzzGetNewState(f *testing.F) {
LastProcessed: 9,
BatchSize: 5,
NumBlockConfirmations: 3,
MaxBackLogSize: 1000,
NumBlocksToReconcile: 1000,
},
{
Address: types.Address(types.StringToAddress("1").Bytes()),
Number: 30,
LastProcessed: 29,
BatchSize: 5,
NumBlockConfirmations: 3,
MaxBackLogSize: 1000,
NumBlocksToReconcile: 1000,
},
{
Address: types.Address(types.StringToAddress("2").Bytes()),
Number: 100,
LastProcessed: 10,
BatchSize: 10,
NumBlockConfirmations: 3,
MaxBackLogSize: 15,
NumBlocksToReconcile: 15,
},
}

Expand Down Expand Up @@ -74,12 +74,13 @@ func FuzzGetNewState(f *testing.F) {
}
providerMock.On("GetLogs", mock.Anything).Return(logs, nil)

testConfig := createTestTrackerConfig(t, data.NumBlockConfirmations, data.BatchSize, data.MaxBackLogSize)
testConfig := createTestTrackerConfig(t, data.NumBlockConfirmations, data.BatchSize, data.NumBlocksToReconcile)
testConfig.BlockProvider = providerMock

eventTracker := &EventTracker{
config: testConfig,
blockContainer: NewTrackerBlockContainer(data.LastProcessed),
store: newTestTrackerStore(t),
}

require.NoError(t, eventTracker.getNewState(&ethgo.Block{Number: data.Number}))
Expand Down
43 changes: 24 additions & 19 deletions consensus/polybft/eventtracker/event_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func TestEventTracker_TrackBlock(t *testing.T) {
t.Run("Add block by block - no confirmed blocks", func(t *testing.T) {
t.Parallel()

tracker, err := NewEventTracker(createTestTrackerConfig(t, 10, 10, 0), 0)
tracker, err := NewEventTracker(createTestTrackerConfig(t, 10, 10, 0), newTestTrackerStore(t), 0)

require.NoError(t, err)

Expand All @@ -112,7 +112,7 @@ func TestEventTracker_TrackBlock(t *testing.T) {

// check that the last processed block is 0, since we did not have any confirmed blocks
require.Equal(t, uint64(0), tracker.blockContainer.LastProcessedBlockLocked())
lastProcessedBlockInStore, err := tracker.config.Store.GetLastProcessedBlock()
lastProcessedBlockInStore, err := tracker.store.GetLastProcessedBlock()
require.NoError(t, err)
require.Equal(t, uint64(0), lastProcessedBlockInStore)

Expand All @@ -131,7 +131,8 @@ func TestEventTracker_TrackBlock(t *testing.T) {
blockProviderMock := new(mockProvider)
blockProviderMock.On("GetLogs", mock.Anything).Return([]*ethgo.Log{}, nil).Once()

tracker, err := NewEventTracker(createTestTrackerConfig(t, numBlockConfirmations, 10, 0), 0)
tracker, err := NewEventTracker(createTestTrackerConfig(t, numBlockConfirmations, 10, 0),
newTestTrackerStore(t), 0)
require.NoError(t, err)

tracker.config.BlockProvider = blockProviderMock
Expand Down Expand Up @@ -164,7 +165,7 @@ func TestEventTracker_TrackBlock(t *testing.T) {
// check if the last confirmed block processed is as expected
require.Equal(t, numOfConfirmedBlocks, tracker.blockContainer.LastProcessedBlock())
// check if the last confirmed block is saved in db as well
lastProcessedConfirmedBlock, err := tracker.config.Store.GetLastProcessedBlock()
lastProcessedConfirmedBlock, err := tracker.store.GetLastProcessedBlock()
require.NoError(t, err)
require.Equal(t, numOfConfirmedBlocks, lastProcessedConfirmedBlock)
// check that in memory cache removed processed confirmed logs
Expand Down Expand Up @@ -197,7 +198,8 @@ func TestEventTracker_TrackBlock(t *testing.T) {
blockProviderMock := new(mockProvider)
blockProviderMock.On("GetLogs", mock.Anything).Return(logs, nil).Once()

tracker, err := NewEventTracker(createTestTrackerConfig(t, numBlockConfirmations, 10, 0), 0)
tracker, err := NewEventTracker(createTestTrackerConfig(t, numBlockConfirmations, 10, 0),
newTestTrackerStore(t), 0)
require.NoError(t, err)

tracker.config.BlockProvider = blockProviderMock
Expand Down Expand Up @@ -230,12 +232,12 @@ func TestEventTracker_TrackBlock(t *testing.T) {
// check if the last confirmed block processed is as expected
require.Equal(t, numOfConfirmedBlocks, tracker.blockContainer.LastProcessedBlock())
// check if the last confirmed block is saved in db as well
lastProcessedConfirmedBlock, err := tracker.config.Store.GetLastProcessedBlock()
lastProcessedConfirmedBlock, err := tracker.store.GetLastProcessedBlock()
require.NoError(t, err)
require.Equal(t, numOfConfirmedBlocks, lastProcessedConfirmedBlock)
// check if we have logs in store
for _, log := range logs {
logFromDB, err := tracker.config.Store.GetLog(log.BlockNumber, log.LogIndex)
logFromDB, err := tracker.store.GetLog(log.BlockNumber, log.LogIndex)
require.NoError(t, err)
require.Equal(t, log.Address, logFromDB.Address)
require.Equal(t, log.BlockNumber, log.BlockNumber)
Expand Down Expand Up @@ -265,7 +267,8 @@ func TestEventTracker_TrackBlock(t *testing.T) {
blockProviderMock := new(mockProvider)
blockProviderMock.On("GetLogs", mock.Anything).Return(nil, errors.New("some error occurred")).Once()

tracker, err := NewEventTracker(createTestTrackerConfig(t, numBlockConfirmations, 10, 0), 0)
tracker, err := NewEventTracker(createTestTrackerConfig(t, numBlockConfirmations, 10, 0),
newTestTrackerStore(t), 0)
require.NoError(t, err)

tracker.config.BlockProvider = blockProviderMock
Expand Down Expand Up @@ -298,7 +301,7 @@ func TestEventTracker_TrackBlock(t *testing.T) {
// check if the last confirmed block processed is as expected, in this case 0, because an error occurred
require.Equal(t, uint64(0), tracker.blockContainer.LastProcessedBlock())
// check if the last confirmed block is saved in db as well
lastProcessedConfirmedBlock, err := tracker.config.Store.GetLastProcessedBlock()
lastProcessedConfirmedBlock, err := tracker.store.GetLastProcessedBlock()
require.NoError(t, err)
require.Equal(t, uint64(0), lastProcessedConfirmedBlock)
// check that in memory cache nothing got removed, and that we have the latest block as well
Expand Down Expand Up @@ -335,7 +338,8 @@ func TestEventTracker_TrackBlock(t *testing.T) {
// just mock the call, it will use the provider.blocks map to handle proper returns
blockProviderMock.On("GetBlockByNumber", mock.Anything, mock.Anything).Return(nil, nil).Times(int(numOfMissedBlocks))

tracker, err := NewEventTracker(createTestTrackerConfig(t, numBlockConfirmations, batchSize, 0), 0)
tracker, err := NewEventTracker(createTestTrackerConfig(t, numBlockConfirmations, batchSize, 0),
newTestTrackerStore(t), 0)
require.NoError(t, err)

tracker.config.BlockProvider = blockProviderMock
Expand Down Expand Up @@ -369,11 +373,11 @@ func TestEventTracker_TrackBlock(t *testing.T) {
expectedLastProcessed := numOfMissedBlocks + 1 - numBlockConfirmations
require.Equal(t, expectedLastProcessed, tracker.blockContainer.LastProcessedBlock())
// check if the last confirmed block is saved in db as well
lastProcessedConfirmedBlock, err := tracker.config.Store.GetLastProcessedBlock()
lastProcessedConfirmedBlock, err := tracker.store.GetLastProcessedBlock()
require.NoError(t, err)
require.Equal(t, expectedLastProcessed, lastProcessedConfirmedBlock)
// check if we have logs in store
logsFromDB, err := tracker.config.Store.GetAllLogs()
logsFromDB, err := tracker.store.GetAllLogs()
require.NoError(t, err)
require.Len(t, logsFromDB, len(logs))

Expand Down Expand Up @@ -420,7 +424,8 @@ func TestEventTracker_TrackBlock(t *testing.T) {
// just mock the call, it will use the provider.blocks map to handle proper returns
blockProviderMock.On("GetBlockByNumber", mock.Anything, mock.Anything).Return(nil, nil).Times(int(numOfMissedBlocks + numOfCachedBlocks))

tracker, err := NewEventTracker(createTestTrackerConfig(t, numBlockConfirmations, batchSize, 0), 0)
tracker, err := NewEventTracker(createTestTrackerConfig(t, numBlockConfirmations, batchSize, 0),
newTestTrackerStore(t), 0)
require.NoError(t, err)

tracker.config.BlockProvider = blockProviderMock
Expand Down Expand Up @@ -467,11 +472,11 @@ func TestEventTracker_TrackBlock(t *testing.T) {
expectedLastProcessed := numOfMissedBlocks + numOfCachedBlocks + 1 - numBlockConfirmations
require.Equal(t, expectedLastProcessed, tracker.blockContainer.LastProcessedBlock())
// check if the last confirmed block is saved in db as well
lastProcessedConfirmedBlock, err := tracker.config.Store.GetLastProcessedBlock()
lastProcessedConfirmedBlock, err := tracker.store.GetLastProcessedBlock()
require.NoError(t, err)
require.Equal(t, expectedLastProcessed, lastProcessedConfirmedBlock)
// check if we have logs in store
logsFromDB, err := tracker.config.Store.GetAllLogs()
logsFromDB, err := tracker.store.GetAllLogs()
require.NoError(t, err)
require.Len(t, logsFromDB, len(logs))

Expand Down Expand Up @@ -515,7 +520,8 @@ func TestEventTracker_TrackBlock(t *testing.T) {
// just mock the call, it will use the provider.blocks map to handle proper returns
blockProviderMock.On("GetBlockByNumber", mock.Anything, mock.Anything).Return(nil, nil).Times(int(numOfCachedBlocks))

tracker, err := NewEventTracker(createTestTrackerConfig(t, numBlockConfirmations, batchSize, 0), 0)
tracker, err := NewEventTracker(createTestTrackerConfig(t, numBlockConfirmations, batchSize, 0),
newTestTrackerStore(t), 0)
require.NoError(t, err)

tracker.config.BlockProvider = blockProviderMock
Expand Down Expand Up @@ -560,11 +566,11 @@ func TestEventTracker_TrackBlock(t *testing.T) {
expectedLastProcessed := numOfCachedBlocks + 1 - numBlockConfirmations
require.Equal(t, expectedLastProcessed, tracker.blockContainer.LastProcessedBlock())
// check if the last confirmed block is saved in db as well
lastProcessedConfirmedBlock, err := tracker.config.Store.GetLastProcessedBlock()
lastProcessedConfirmedBlock, err := tracker.store.GetLastProcessedBlock()
require.NoError(t, err)
require.Equal(t, expectedLastProcessed, lastProcessedConfirmedBlock)
// check if we have logs in store
logsFromDB, err := tracker.config.Store.GetAllLogs()
logsFromDB, err := tracker.store.GetAllLogs()
require.NoError(t, err)
require.Len(t, logsFromDB, len(logs))

Expand Down Expand Up @@ -598,7 +604,6 @@ func createTestTrackerConfig(t *testing.T, numBlockConfirmations, batchSize,
LogFilter: map[ethgo.Address][]ethgo.Hash{
ethgo.ZeroAddress: {stateSyncEvent.Sig()},
},
Store: newTestTrackerStore(t),
EventSubscriber: new(mockEventSubscriber),
BlockProvider: new(mockProvider),
}
Expand Down
10 changes: 8 additions & 2 deletions consensus/polybft/state_sync_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ type stateSyncManager struct {
nextCommittedIndex uint64

runtime Runtime
tracker *eventtracker.EventTracker
}

// topic is an interface for p2p message gossiping
Expand Down Expand Up @@ -141,6 +142,10 @@ func (s *stateSyncManager) Init() error {

func (s *stateSyncManager) Close() {
close(s.closeCh)

if s.tracker != nil {
s.tracker.Close()
}
}

// initTracker sets up and starts the event tracker implementation
Expand All @@ -164,17 +169,18 @@ func (s *stateSyncManager) initTracker() error {
NumOfBlocksToReconcile: s.config.trackerBlocksToReconcile,
PollInterval: s.config.blockTrackerPollInterval,
Logger: s.logger,
Store: store,
EventSubscriber: s,
BlockProvider: clt.Eth(),
LogFilter: map[ethgo.Address][]ethgo.Hash{
ethgo.Address(s.config.stateSenderAddr): {stateSyncEvent.Sig()},
},
}, s.config.stateSenderStartBlock)
}, store, s.config.stateSenderStartBlock)
if err != nil {
return err
}

s.tracker = tracker

return tracker.Start()
}

Expand Down

0 comments on commit 24f86d2

Please sign in to comment.