diff --git a/consensus/polybft/eventtracker/event_tracker.go b/consensus/polybft/eventtracker/event_tracker.go index effaa95a6f..f0e7d9f37b 100644 --- a/consensus/polybft/eventtracker/event_tracker.go +++ b/consensus/polybft/eventtracker/event_tracker.go @@ -62,9 +62,6 @@ type EventTrackerConfig struct { // LogFilter defines which events are tracked and from which contracts on the tracked chain LogFilter map[ethgo.Address][]ethgo.Hash - // Store is the store implementation for data that tracker saves (lastProcessedBlock and logs) - Store EventTrackerStore - // BlockProvider is the implementation of a provider that returns blocks and logs from tracked chain BlockProvider BlockProvider @@ -81,6 +78,9 @@ type EventTracker struct { blockTracker blocktracker.BlockTrackerInterface blockContainer *TrackerBlockContainer + + // store is the store implementation for data that tracker saves (lastProcessedBlock and logs) + store EventTrackerStore } // NewEventTracker is a constructor function that creates a new instance of the EventTracker struct. @@ -89,10 +89,9 @@ type EventTracker struct { // // config := &EventTrackerConfig{ // RpcEndpoint: "http://some-json-rpc-url.com", -// StartBlockFromConfig: 100_000, // NumBlockConfirmations: 10, // SyncBatchSize: 20, -// MaxBacklogSize: 10_000, +// NumOfBlocksToReconcile:10_000, // PollInterval: 2 * time.Second, // Logger: logger, // Store: store, @@ -103,15 +102,16 @@ type EventTracker struct { // IDs: []ethgo.Hash{idHashOfSomeEvent}, // }, // } -// t := NewEventTracker(config) +// t := NewEventTracker(config, store) // // Inputs: // - config (TrackerConfig): configuration of EventTracker. // // Outputs: // - A new instance of the EventTracker struct. -func NewEventTracker(config *EventTrackerConfig, startBlockFromGenesis uint64) (*EventTracker, error) { - lastProcessedBlock, err := config.Store.GetLastProcessedBlock() +func NewEventTracker(config *EventTrackerConfig, store EventTrackerStore, + startBlockFromGenesis uint64) (*EventTracker, error) { + lastProcessedBlock, err := store.GetLastProcessedBlock() if err != nil { return nil, err } @@ -137,6 +137,7 @@ func NewEventTracker(config *EventTrackerConfig, startBlockFromGenesis uint64) ( return &EventTracker{ config: config, + store: store, closeCh: make(chan struct{}), blockTracker: blocktracker.NewJSONBlockTracker(config.BlockProvider), blockContainer: NewTrackerBlockContainer(lastProcessedBlock), @@ -166,7 +167,7 @@ func (e *EventTracker) Close() { // Start is a method in the EventTracker struct that starts the tracking of blocks // and retrieval of logs from given blocks from the tracked chain. // If the tracker was turned off (node was down) for some time, it will sync up all the missed -// blocks and logs from the last start (in regards to MaxBacklogSize field in config). +// blocks and logs from the last start (in regards to NumOfBlocksToReconcile field in config). // // Returns: // - nil if start passes successfully. @@ -416,7 +417,7 @@ func (e *EventTracker) processLogs() error { } } - if err := e.config.Store.InsertLastProcessedBlock(toBlock); err != nil { + if err := e.store.InsertLastProcessedBlock(toBlock); err != nil { e.config.Logger.Error("Process logs failed on saving last processed block", "fromBlock", fromBlock, "toBlock", toBlock, @@ -425,7 +426,7 @@ func (e *EventTracker) processLogs() error { return err } - if err := e.config.Store.InsertLogs(filteredLogs); err != nil { + if err := e.store.InsertLogs(filteredLogs); err != nil { e.config.Logger.Error("Process logs failed on saving logs to store", "fromBlock", fromBlock, "toBlock", toBlock, diff --git a/consensus/polybft/eventtracker/event_tracker_fuzz_test.go b/consensus/polybft/eventtracker/event_tracker_fuzz_test.go index d2e560e5bc..2400518ca1 100644 --- a/consensus/polybft/eventtracker/event_tracker_fuzz_test.go +++ b/consensus/polybft/eventtracker/event_tracker_fuzz_test.go @@ -16,7 +16,7 @@ type getNewStateF struct { LastProcessed uint64 BatchSize uint64 NumBlockConfirmations uint64 - MaxBackLogSize uint64 + NumBlocksToReconcile uint64 } func FuzzGetNewState(f *testing.F) { @@ -27,7 +27,7 @@ func FuzzGetNewState(f *testing.F) { LastProcessed: 9, BatchSize: 5, NumBlockConfirmations: 3, - MaxBackLogSize: 1000, + NumBlocksToReconcile: 1000, }, { Address: types.Address(types.StringToAddress("1").Bytes()), @@ -35,7 +35,7 @@ func FuzzGetNewState(f *testing.F) { LastProcessed: 29, BatchSize: 5, NumBlockConfirmations: 3, - MaxBackLogSize: 1000, + NumBlocksToReconcile: 1000, }, { Address: types.Address(types.StringToAddress("2").Bytes()), @@ -43,7 +43,7 @@ func FuzzGetNewState(f *testing.F) { LastProcessed: 10, BatchSize: 10, NumBlockConfirmations: 3, - MaxBackLogSize: 15, + NumBlocksToReconcile: 15, }, } @@ -74,12 +74,13 @@ func FuzzGetNewState(f *testing.F) { } providerMock.On("GetLogs", mock.Anything).Return(logs, nil) - testConfig := createTestTrackerConfig(t, data.NumBlockConfirmations, data.BatchSize, data.MaxBackLogSize) + testConfig := createTestTrackerConfig(t, data.NumBlockConfirmations, data.BatchSize, data.NumBlocksToReconcile) testConfig.BlockProvider = providerMock eventTracker := &EventTracker{ config: testConfig, blockContainer: NewTrackerBlockContainer(data.LastProcessed), + store: newTestTrackerStore(t), } require.NoError(t, eventTracker.getNewState(ðgo.Block{Number: data.Number})) diff --git a/consensus/polybft/eventtracker/event_tracker_test.go b/consensus/polybft/eventtracker/event_tracker_test.go index 619a8d4a9c..907f7da651 100644 --- a/consensus/polybft/eventtracker/event_tracker_test.go +++ b/consensus/polybft/eventtracker/event_tracker_test.go @@ -89,7 +89,7 @@ func TestEventTracker_TrackBlock(t *testing.T) { t.Run("Add block by block - no confirmed blocks", func(t *testing.T) { t.Parallel() - tracker, err := NewEventTracker(createTestTrackerConfig(t, 10, 10, 0), 0) + tracker, err := NewEventTracker(createTestTrackerConfig(t, 10, 10, 0), newTestTrackerStore(t), 0) require.NoError(t, err) @@ -112,7 +112,7 @@ func TestEventTracker_TrackBlock(t *testing.T) { // check that the last processed block is 0, since we did not have any confirmed blocks require.Equal(t, uint64(0), tracker.blockContainer.LastProcessedBlockLocked()) - lastProcessedBlockInStore, err := tracker.config.Store.GetLastProcessedBlock() + lastProcessedBlockInStore, err := tracker.store.GetLastProcessedBlock() require.NoError(t, err) require.Equal(t, uint64(0), lastProcessedBlockInStore) @@ -131,7 +131,8 @@ func TestEventTracker_TrackBlock(t *testing.T) { blockProviderMock := new(mockProvider) blockProviderMock.On("GetLogs", mock.Anything).Return([]*ethgo.Log{}, nil).Once() - tracker, err := NewEventTracker(createTestTrackerConfig(t, numBlockConfirmations, 10, 0), 0) + tracker, err := NewEventTracker(createTestTrackerConfig(t, numBlockConfirmations, 10, 0), + newTestTrackerStore(t), 0) require.NoError(t, err) tracker.config.BlockProvider = blockProviderMock @@ -164,7 +165,7 @@ func TestEventTracker_TrackBlock(t *testing.T) { // check if the last confirmed block processed is as expected require.Equal(t, numOfConfirmedBlocks, tracker.blockContainer.LastProcessedBlock()) // check if the last confirmed block is saved in db as well - lastProcessedConfirmedBlock, err := tracker.config.Store.GetLastProcessedBlock() + lastProcessedConfirmedBlock, err := tracker.store.GetLastProcessedBlock() require.NoError(t, err) require.Equal(t, numOfConfirmedBlocks, lastProcessedConfirmedBlock) // check that in memory cache removed processed confirmed logs @@ -197,7 +198,8 @@ func TestEventTracker_TrackBlock(t *testing.T) { blockProviderMock := new(mockProvider) blockProviderMock.On("GetLogs", mock.Anything).Return(logs, nil).Once() - tracker, err := NewEventTracker(createTestTrackerConfig(t, numBlockConfirmations, 10, 0), 0) + tracker, err := NewEventTracker(createTestTrackerConfig(t, numBlockConfirmations, 10, 0), + newTestTrackerStore(t), 0) require.NoError(t, err) tracker.config.BlockProvider = blockProviderMock @@ -230,12 +232,12 @@ func TestEventTracker_TrackBlock(t *testing.T) { // check if the last confirmed block processed is as expected require.Equal(t, numOfConfirmedBlocks, tracker.blockContainer.LastProcessedBlock()) // check if the last confirmed block is saved in db as well - lastProcessedConfirmedBlock, err := tracker.config.Store.GetLastProcessedBlock() + lastProcessedConfirmedBlock, err := tracker.store.GetLastProcessedBlock() require.NoError(t, err) require.Equal(t, numOfConfirmedBlocks, lastProcessedConfirmedBlock) // check if we have logs in store for _, log := range logs { - logFromDB, err := tracker.config.Store.GetLog(log.BlockNumber, log.LogIndex) + logFromDB, err := tracker.store.GetLog(log.BlockNumber, log.LogIndex) require.NoError(t, err) require.Equal(t, log.Address, logFromDB.Address) require.Equal(t, log.BlockNumber, log.BlockNumber) @@ -265,7 +267,8 @@ func TestEventTracker_TrackBlock(t *testing.T) { blockProviderMock := new(mockProvider) blockProviderMock.On("GetLogs", mock.Anything).Return(nil, errors.New("some error occurred")).Once() - tracker, err := NewEventTracker(createTestTrackerConfig(t, numBlockConfirmations, 10, 0), 0) + tracker, err := NewEventTracker(createTestTrackerConfig(t, numBlockConfirmations, 10, 0), + newTestTrackerStore(t), 0) require.NoError(t, err) tracker.config.BlockProvider = blockProviderMock @@ -298,7 +301,7 @@ func TestEventTracker_TrackBlock(t *testing.T) { // check if the last confirmed block processed is as expected, in this case 0, because an error occurred require.Equal(t, uint64(0), tracker.blockContainer.LastProcessedBlock()) // check if the last confirmed block is saved in db as well - lastProcessedConfirmedBlock, err := tracker.config.Store.GetLastProcessedBlock() + lastProcessedConfirmedBlock, err := tracker.store.GetLastProcessedBlock() require.NoError(t, err) require.Equal(t, uint64(0), lastProcessedConfirmedBlock) // check that in memory cache nothing got removed, and that we have the latest block as well @@ -335,7 +338,8 @@ func TestEventTracker_TrackBlock(t *testing.T) { // just mock the call, it will use the provider.blocks map to handle proper returns blockProviderMock.On("GetBlockByNumber", mock.Anything, mock.Anything).Return(nil, nil).Times(int(numOfMissedBlocks)) - tracker, err := NewEventTracker(createTestTrackerConfig(t, numBlockConfirmations, batchSize, 0), 0) + tracker, err := NewEventTracker(createTestTrackerConfig(t, numBlockConfirmations, batchSize, 0), + newTestTrackerStore(t), 0) require.NoError(t, err) tracker.config.BlockProvider = blockProviderMock @@ -369,11 +373,11 @@ func TestEventTracker_TrackBlock(t *testing.T) { expectedLastProcessed := numOfMissedBlocks + 1 - numBlockConfirmations require.Equal(t, expectedLastProcessed, tracker.blockContainer.LastProcessedBlock()) // check if the last confirmed block is saved in db as well - lastProcessedConfirmedBlock, err := tracker.config.Store.GetLastProcessedBlock() + lastProcessedConfirmedBlock, err := tracker.store.GetLastProcessedBlock() require.NoError(t, err) require.Equal(t, expectedLastProcessed, lastProcessedConfirmedBlock) // check if we have logs in store - logsFromDB, err := tracker.config.Store.GetAllLogs() + logsFromDB, err := tracker.store.GetAllLogs() require.NoError(t, err) require.Len(t, logsFromDB, len(logs)) @@ -420,7 +424,8 @@ func TestEventTracker_TrackBlock(t *testing.T) { // just mock the call, it will use the provider.blocks map to handle proper returns blockProviderMock.On("GetBlockByNumber", mock.Anything, mock.Anything).Return(nil, nil).Times(int(numOfMissedBlocks + numOfCachedBlocks)) - tracker, err := NewEventTracker(createTestTrackerConfig(t, numBlockConfirmations, batchSize, 0), 0) + tracker, err := NewEventTracker(createTestTrackerConfig(t, numBlockConfirmations, batchSize, 0), + newTestTrackerStore(t), 0) require.NoError(t, err) tracker.config.BlockProvider = blockProviderMock @@ -467,11 +472,11 @@ func TestEventTracker_TrackBlock(t *testing.T) { expectedLastProcessed := numOfMissedBlocks + numOfCachedBlocks + 1 - numBlockConfirmations require.Equal(t, expectedLastProcessed, tracker.blockContainer.LastProcessedBlock()) // check if the last confirmed block is saved in db as well - lastProcessedConfirmedBlock, err := tracker.config.Store.GetLastProcessedBlock() + lastProcessedConfirmedBlock, err := tracker.store.GetLastProcessedBlock() require.NoError(t, err) require.Equal(t, expectedLastProcessed, lastProcessedConfirmedBlock) // check if we have logs in store - logsFromDB, err := tracker.config.Store.GetAllLogs() + logsFromDB, err := tracker.store.GetAllLogs() require.NoError(t, err) require.Len(t, logsFromDB, len(logs)) @@ -515,7 +520,8 @@ func TestEventTracker_TrackBlock(t *testing.T) { // just mock the call, it will use the provider.blocks map to handle proper returns blockProviderMock.On("GetBlockByNumber", mock.Anything, mock.Anything).Return(nil, nil).Times(int(numOfCachedBlocks)) - tracker, err := NewEventTracker(createTestTrackerConfig(t, numBlockConfirmations, batchSize, 0), 0) + tracker, err := NewEventTracker(createTestTrackerConfig(t, numBlockConfirmations, batchSize, 0), + newTestTrackerStore(t), 0) require.NoError(t, err) tracker.config.BlockProvider = blockProviderMock @@ -560,11 +566,11 @@ func TestEventTracker_TrackBlock(t *testing.T) { expectedLastProcessed := numOfCachedBlocks + 1 - numBlockConfirmations require.Equal(t, expectedLastProcessed, tracker.blockContainer.LastProcessedBlock()) // check if the last confirmed block is saved in db as well - lastProcessedConfirmedBlock, err := tracker.config.Store.GetLastProcessedBlock() + lastProcessedConfirmedBlock, err := tracker.store.GetLastProcessedBlock() require.NoError(t, err) require.Equal(t, expectedLastProcessed, lastProcessedConfirmedBlock) // check if we have logs in store - logsFromDB, err := tracker.config.Store.GetAllLogs() + logsFromDB, err := tracker.store.GetAllLogs() require.NoError(t, err) require.Len(t, logsFromDB, len(logs)) @@ -598,7 +604,6 @@ func createTestTrackerConfig(t *testing.T, numBlockConfirmations, batchSize, LogFilter: map[ethgo.Address][]ethgo.Hash{ ethgo.ZeroAddress: {stateSyncEvent.Sig()}, }, - Store: newTestTrackerStore(t), EventSubscriber: new(mockEventSubscriber), BlockProvider: new(mockProvider), } diff --git a/consensus/polybft/state_sync_manager.go b/consensus/polybft/state_sync_manager.go index 805b65e14d..ae92b4cd81 100644 --- a/consensus/polybft/state_sync_manager.go +++ b/consensus/polybft/state_sync_manager.go @@ -106,6 +106,7 @@ type stateSyncManager struct { nextCommittedIndex uint64 runtime Runtime + tracker *eventtracker.EventTracker } // topic is an interface for p2p message gossiping @@ -141,6 +142,10 @@ func (s *stateSyncManager) Init() error { func (s *stateSyncManager) Close() { close(s.closeCh) + + if s.tracker != nil { + s.tracker.Close() + } } // initTracker sets up and starts the event tracker implementation @@ -164,17 +169,18 @@ func (s *stateSyncManager) initTracker() error { NumOfBlocksToReconcile: s.config.trackerBlocksToReconcile, PollInterval: s.config.blockTrackerPollInterval, Logger: s.logger, - Store: store, EventSubscriber: s, BlockProvider: clt.Eth(), LogFilter: map[ethgo.Address][]ethgo.Hash{ ethgo.Address(s.config.stateSenderAddr): {stateSyncEvent.Sig()}, }, - }, s.config.stateSenderStartBlock) + }, store, s.config.stateSenderStartBlock) if err != nil { return err } + s.tracker = tracker + return tracker.Start() }