diff --git a/consensus/polybft/consensus_runtime.go b/consensus/polybft/consensus_runtime.go index 9ba342b95b..ab4607dcc1 100644 --- a/consensus/polybft/consensus_runtime.go +++ b/consensus/polybft/consensus_runtime.go @@ -182,6 +182,7 @@ func (c *consensusRuntime) initStateSyncManager(logger hcf.Logger) error { blockTrackerPollInterval: c.config.PolyBFTConfig.BlockTrackerPollInterval.Duration, }, c, + c.config.blockchain, ) c.stateSyncManager = stateSyncManager diff --git a/consensus/polybft/state_store_state_sync.go b/consensus/polybft/state_store_state_sync.go index d15606500b..b70d28faa2 100644 --- a/consensus/polybft/state_store_state_sync.go +++ b/consensus/polybft/state_store_state_sync.go @@ -76,6 +76,28 @@ func (s *StateSyncStore) insertStateSyncEvent(event *contractsapi.StateSyncedEve }) } +// removeStateSyncEventsAndProofs removes state sync events and their proofs from the buckets in db +func (s *StateSyncStore) removeStateSyncEventsAndProofs(stateSyncEventIDs []uint64) error { + return s.db.Update(func(tx *bolt.Tx) error { + eventsBucket := tx.Bucket(stateSyncEventsBucket) + proofsBucket := tx.Bucket(stateSyncProofsBucket) + + for _, stateSyncEventID := range stateSyncEventIDs { + stateSyncEventIDKey := common.EncodeUint64ToBytes(stateSyncEventID) + + if err := eventsBucket.Delete(stateSyncEventIDKey); err != nil { + return fmt.Errorf("failed to remove state sync event (ID=%d): %w", stateSyncEventID, err) + } + + if err := proofsBucket.Delete(stateSyncEventIDKey); err != nil { + return fmt.Errorf("failed to remove state sync event proof (ID=%d): %w", stateSyncEventID, err) + } + } + + return nil + }) +} + // list iterates through all events in events bucket in db, un-marshals them, and returns as array func (s *StateSyncStore) list() ([]*contractsapi.StateSyncedEvent, error) { events := []*contractsapi.StateSyncedEvent{} diff --git a/consensus/polybft/state_sync_manager.go b/consensus/polybft/state_sync_manager.go index 0581950b7d..beccc1b4ba 100644 --- a/consensus/polybft/state_sync_manager.go +++ b/consensus/polybft/state_sync_manager.go @@ -16,6 +16,7 @@ import ( bls "github.com/0xPolygon/polygon-edge/consensus/polybft/signer" "github.com/0xPolygon/polygon-edge/consensus/polybft/validator" "github.com/0xPolygon/polygon-edge/consensus/polybft/wallet" + "github.com/0xPolygon/polygon-edge/contracts" "github.com/0xPolygon/polygon-edge/tracker" "github.com/0xPolygon/polygon-edge/types" "github.com/hashicorp/go-hclog" @@ -91,6 +92,9 @@ type stateSyncManager struct { nextCommittedIndex uint64 runtime Runtime + + // eventsGetter gets StateSyncResult events (missed or current) from blocks + eventsGetter *eventsGetter[*contractsapi.StateSyncResultEvent] } // topic is an interface for p2p message gossiping @@ -101,13 +105,27 @@ type topic interface { // newStateSyncManager creates a new instance of state sync manager func newStateSyncManager(logger hclog.Logger, state *State, config *stateSyncConfig, - runtime Runtime) *stateSyncManager { + runtime Runtime, blockchain blockchainBackend) *stateSyncManager { + eventsGetter := &eventsGetter[*contractsapi.StateSyncResultEvent]{ + blockchain: blockchain, + isValidLogFn: func(l *types.Log) bool { + return l.Address == contracts.StateReceiverContract + }, + parseEventFn: func(h *types.Header, l *ethgo.Log) (*contractsapi.StateSyncResultEvent, bool, error) { + var stateSyncResultEvent contractsapi.StateSyncResultEvent + matches, err := stateSyncResultEvent.ParseLog(l) + + return &stateSyncResultEvent, matches, err + }, + } + return &stateSyncManager{ - logger: logger, - state: state, - config: config, - closeCh: make(chan struct{}), - runtime: runtime, + logger: logger, + state: state, + config: config, + closeCh: make(chan struct{}), + runtime: runtime, + eventsGetter: eventsGetter, } } @@ -399,14 +417,33 @@ func (s *stateSyncManager) PostEpoch(req *PostEpochRequest) error { } // PostBlock notifies state sync manager that a block was finalized, -// so that it can build state sync proofs if a block has a commitment submission transaction +// so that it can build state sync proofs if a block has a commitment submission transaction. +// Additionally, it will remove any processed state sync events and their proofs from the store. func (s *stateSyncManager) PostBlock(req *PostBlockRequest) error { + events, err := s.eventsGetter.getEventsFromReceipts(req.FullBlock.Block.Header, req.FullBlock.Receipts) + if err != nil { + s.logger.Info("failed to retrieve processed state sync result events from block", "error", err) + } else { + processedStateSyncEventIDs := make([]uint64, 0, len(events)) + for _, event := range events { + if event.Status { + processedStateSyncEventIDs = append(processedStateSyncEventIDs, event.Counter.Uint64()) + } + } + + if len(processedStateSyncEventIDs) > 0 { + if err = s.state.StateSyncStore.removeStateSyncEventsAndProofs(processedStateSyncEventIDs); err != nil { + s.logger.Info("failed to remove processed state sync events data from store", "error", err) + } + } + } + commitment, err := getCommitmentMessageSignedTx(req.FullBlock.Block.Transactions) if err != nil { return err } - // no commitment message -> this is not end of epoch block + // no commitment message -> this is not end of sprint block if commitment == nil { return nil } diff --git a/consensus/polybft/state_sync_manager_test.go b/consensus/polybft/state_sync_manager_test.go index 7482d2ee08..13ab8f8e46 100644 --- a/consensus/polybft/state_sync_manager_test.go +++ b/consensus/polybft/state_sync_manager_test.go @@ -18,6 +18,8 @@ import ( "github.com/0xPolygon/polygon-edge/consensus/polybft/contractsapi" bls "github.com/0xPolygon/polygon-edge/consensus/polybft/signer" "github.com/0xPolygon/polygon-edge/consensus/polybft/validator" + "github.com/0xPolygon/polygon-edge/contracts" + "github.com/0xPolygon/polygon-edge/helper/common" "github.com/0xPolygon/polygon-edge/merkle-tree" "github.com/0xPolygon/polygon-edge/types" ) @@ -33,6 +35,7 @@ func newTestStateSyncManager(t *testing.T, key *validator.TestValidator, runtime topic := &mockTopic{} + blockchainBackend := new(blockchainMock) s := newStateSyncManager(hclog.NewNullLogger(), state, &stateSyncConfig{ stateSenderAddr: types.Address{}, @@ -41,7 +44,7 @@ func newTestStateSyncManager(t *testing.T, key *validator.TestValidator, runtime topic: topic, key: key.Key(), maxCommitmentSize: maxCommitmentSize, - }, runtime) + }, runtime, blockchainBackend) t.Cleanup(func() { os.RemoveAll(tmpDir) @@ -324,6 +327,90 @@ func TestStateSyncerManager_BuildProofs(t *testing.T) { } } +func TestStateSyncerManager_RemoveProcessedEventsAndProofs(t *testing.T) { + const stateSyncEventsCount = 5 + + vals := validator.NewTestValidators(t, 5) + + s := newTestStateSyncManager(t, vals.GetValidator("0"), &mockRuntime{isActiveValidator: true}) + + for _, event := range generateStateSyncEvents(t, stateSyncEventsCount, 0) { + require.NoError(t, s.state.StateSyncStore.insertStateSyncEvent(event)) + } + + require.NoError(t, s.buildCommitment()) + require.Len(t, s.pendingCommitments, 1) + + mockMsg := &CommitmentMessageSigned{ + Message: &contractsapi.StateSyncCommitment{ + StartID: s.pendingCommitments[0].StartID, + EndID: s.pendingCommitments[0].EndID, + }, + } + + txData, err := mockMsg.EncodeAbi() + require.NoError(t, err) + + tx := createStateTransactionWithData(1, types.Address{}, txData) + + req := &PostBlockRequest{ + FullBlock: &types.FullBlock{ + Block: &types.Block{ + Header: &types.Header{Number: 1}, + Transactions: []*types.Transaction{tx}, + }, + }, + } + + // PostBlock() inserts commitment and proofs into the store + require.NoError(t, s.PostBlock(req)) + + // check the state after executing first PostBlock() + require.Equal(t, mockMsg.Message.EndID.Uint64()+1, s.nextCommittedIndex) + + stateSyncEventsBefore, err := s.state.StateSyncStore.list() + require.NoError(t, err) + require.Equal(t, stateSyncEventsCount, len(stateSyncEventsBefore)) + + for i := 0; i < stateSyncEventsCount; i++ { + proof, err := s.state.StateSyncStore.getStateSyncProof(uint64(i)) + require.NoError(t, err) + require.NotNil(t, proof) + } + + // create second PostBlockRequest to remove processed events and proofs from the store + req = &PostBlockRequest{ + FullBlock: &types.FullBlock{ + Block: &types.Block{ + Header: &types.Header{Number: 2}, + }, + }, + } + + // add receipts with executed StateSyncResult logs + receiptSuccess := types.ReceiptSuccess + + req.FullBlock.Receipts = make([]*types.Receipt, stateSyncEventsCount) + for i := uint64(0); i < stateSyncEventsCount; i++ { + req.FullBlock.Receipts[i] = &types.Receipt{ + Status: &receiptSuccess, + Logs: []*types.Log{createTestLogForStateSyncResultEvent(t, i)}} + } + + require.NoError(t, s.PostBlock(req)) + + // all state sync events and their proofs should be removed from the store + stateSyncEventsAfter, err := s.state.StateSyncStore.list() + require.NoError(t, err) + require.Equal(t, 0, len(stateSyncEventsAfter)) + + for i := uint64(0); i < stateSyncEventsCount; i++ { + proof, err := s.state.StateSyncStore.getStateSyncProof(i) + require.NoError(t, err) + require.Nil(t, proof) + } +} + func TestStateSyncerManager_AddLog_BuildCommitments(t *testing.T) { t.Parallel() @@ -571,6 +658,26 @@ func TestStateSyncManager_GetProofs_NoProof_BuildProofs(t *testing.T) { require.NoError(t, commitment.VerifyStateSyncProof(proof.Data, stateSync)) } +func createTestLogForStateSyncResultEvent(t *testing.T, stateSyncEventID uint64) *types.Log { + t.Helper() + + var stateSyncResultEvent contractsapi.StateSyncResultEvent + + topics := make([]types.Hash, 3) + topics[0] = types.Hash(stateSyncResultEvent.Sig()) + topics[1] = types.BytesToHash(common.EncodeUint64ToBytes(stateSyncEventID)) + topics[2] = types.BytesToHash(common.EncodeUint64ToBytes(1)) // Status = true + someType := abi.MustNewType("tuple(string field1, string field2)") + encodedData, err := someType.Encode(map[string]string{"field1": "value1", "field2": "value2"}) + require.NoError(t, err) + + return &types.Log{ + Address: contracts.StateReceiverContract, + Topics: topics, + Data: encodedData, + } +} + type mockTopic struct { published proto.Message }