Skip to content

Commit

Permalink
Remove executed state sync events and proofs from the store (#1977)
Browse files Browse the repository at this point in the history
Remove executed state sync events and proofs from the store
  • Loading branch information
dusan-maksimovic authored Oct 12, 2023
1 parent e0d2891 commit 83e2a53
Show file tree
Hide file tree
Showing 4 changed files with 176 additions and 9 deletions.
1 change: 1 addition & 0 deletions consensus/polybft/consensus_runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ func (c *consensusRuntime) initStateSyncManager(logger hcf.Logger) error {
blockTrackerPollInterval: c.config.PolyBFTConfig.BlockTrackerPollInterval.Duration,
},
c,
c.config.blockchain,
)

c.stateSyncManager = stateSyncManager
Expand Down
22 changes: 22 additions & 0 deletions consensus/polybft/state_store_state_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,28 @@ func (s *StateSyncStore) insertStateSyncEvent(event *contractsapi.StateSyncedEve
})
}

// removeStateSyncEventsAndProofs removes state sync events and their proofs from the buckets in db
func (s *StateSyncStore) removeStateSyncEventsAndProofs(stateSyncEventIDs []uint64) error {
return s.db.Update(func(tx *bolt.Tx) error {
eventsBucket := tx.Bucket(stateSyncEventsBucket)
proofsBucket := tx.Bucket(stateSyncProofsBucket)

for _, stateSyncEventID := range stateSyncEventIDs {
stateSyncEventIDKey := common.EncodeUint64ToBytes(stateSyncEventID)

if err := eventsBucket.Delete(stateSyncEventIDKey); err != nil {
return fmt.Errorf("failed to remove state sync event (ID=%d): %w", stateSyncEventID, err)
}

if err := proofsBucket.Delete(stateSyncEventIDKey); err != nil {
return fmt.Errorf("failed to remove state sync event proof (ID=%d): %w", stateSyncEventID, err)
}
}

return nil
})
}

// list iterates through all events in events bucket in db, un-marshals them, and returns as array
func (s *StateSyncStore) list() ([]*contractsapi.StateSyncedEvent, error) {
events := []*contractsapi.StateSyncedEvent{}
Expand Down
53 changes: 45 additions & 8 deletions consensus/polybft/state_sync_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
bls "github.com/0xPolygon/polygon-edge/consensus/polybft/signer"
"github.com/0xPolygon/polygon-edge/consensus/polybft/validator"
"github.com/0xPolygon/polygon-edge/consensus/polybft/wallet"
"github.com/0xPolygon/polygon-edge/contracts"
"github.com/0xPolygon/polygon-edge/tracker"
"github.com/0xPolygon/polygon-edge/types"
"github.com/hashicorp/go-hclog"
Expand Down Expand Up @@ -91,6 +92,9 @@ type stateSyncManager struct {
nextCommittedIndex uint64

runtime Runtime

// eventsGetter gets StateSyncResult events (missed or current) from blocks
eventsGetter *eventsGetter[*contractsapi.StateSyncResultEvent]
}

// topic is an interface for p2p message gossiping
Expand All @@ -101,13 +105,27 @@ type topic interface {

// newStateSyncManager creates a new instance of state sync manager
func newStateSyncManager(logger hclog.Logger, state *State, config *stateSyncConfig,
runtime Runtime) *stateSyncManager {
runtime Runtime, blockchain blockchainBackend) *stateSyncManager {
eventsGetter := &eventsGetter[*contractsapi.StateSyncResultEvent]{
blockchain: blockchain,
isValidLogFn: func(l *types.Log) bool {
return l.Address == contracts.StateReceiverContract
},
parseEventFn: func(h *types.Header, l *ethgo.Log) (*contractsapi.StateSyncResultEvent, bool, error) {
var stateSyncResultEvent contractsapi.StateSyncResultEvent
matches, err := stateSyncResultEvent.ParseLog(l)

return &stateSyncResultEvent, matches, err
},
}

return &stateSyncManager{
logger: logger,
state: state,
config: config,
closeCh: make(chan struct{}),
runtime: runtime,
logger: logger,
state: state,
config: config,
closeCh: make(chan struct{}),
runtime: runtime,
eventsGetter: eventsGetter,
}
}

Expand Down Expand Up @@ -399,14 +417,33 @@ func (s *stateSyncManager) PostEpoch(req *PostEpochRequest) error {
}

// PostBlock notifies state sync manager that a block was finalized,
// so that it can build state sync proofs if a block has a commitment submission transaction
// so that it can build state sync proofs if a block has a commitment submission transaction.
// Additionally, it will remove any processed state sync events and their proofs from the store.
func (s *stateSyncManager) PostBlock(req *PostBlockRequest) error {
events, err := s.eventsGetter.getEventsFromReceipts(req.FullBlock.Block.Header, req.FullBlock.Receipts)
if err != nil {
s.logger.Info("failed to retrieve processed state sync result events from block", "error", err)
} else {
processedStateSyncEventIDs := make([]uint64, 0, len(events))
for _, event := range events {
if event.Status {
processedStateSyncEventIDs = append(processedStateSyncEventIDs, event.Counter.Uint64())
}
}

if len(processedStateSyncEventIDs) > 0 {
if err = s.state.StateSyncStore.removeStateSyncEventsAndProofs(processedStateSyncEventIDs); err != nil {
s.logger.Info("failed to remove processed state sync events data from store", "error", err)
}
}
}

commitment, err := getCommitmentMessageSignedTx(req.FullBlock.Block.Transactions)
if err != nil {
return err
}

// no commitment message -> this is not end of epoch block
// no commitment message -> this is not end of sprint block
if commitment == nil {
return nil
}
Expand Down
109 changes: 108 additions & 1 deletion consensus/polybft/state_sync_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"github.com/0xPolygon/polygon-edge/consensus/polybft/contractsapi"
bls "github.com/0xPolygon/polygon-edge/consensus/polybft/signer"
"github.com/0xPolygon/polygon-edge/consensus/polybft/validator"
"github.com/0xPolygon/polygon-edge/contracts"
"github.com/0xPolygon/polygon-edge/helper/common"
"github.com/0xPolygon/polygon-edge/merkle-tree"
"github.com/0xPolygon/polygon-edge/types"
)
Expand All @@ -33,6 +35,7 @@ func newTestStateSyncManager(t *testing.T, key *validator.TestValidator, runtime

topic := &mockTopic{}

blockchainBackend := new(blockchainMock)
s := newStateSyncManager(hclog.NewNullLogger(), state,
&stateSyncConfig{
stateSenderAddr: types.Address{},
Expand All @@ -41,7 +44,7 @@ func newTestStateSyncManager(t *testing.T, key *validator.TestValidator, runtime
topic: topic,
key: key.Key(),
maxCommitmentSize: maxCommitmentSize,
}, runtime)
}, runtime, blockchainBackend)

t.Cleanup(func() {
os.RemoveAll(tmpDir)
Expand Down Expand Up @@ -324,6 +327,90 @@ func TestStateSyncerManager_BuildProofs(t *testing.T) {
}
}

func TestStateSyncerManager_RemoveProcessedEventsAndProofs(t *testing.T) {
const stateSyncEventsCount = 5

vals := validator.NewTestValidators(t, 5)

s := newTestStateSyncManager(t, vals.GetValidator("0"), &mockRuntime{isActiveValidator: true})

for _, event := range generateStateSyncEvents(t, stateSyncEventsCount, 0) {
require.NoError(t, s.state.StateSyncStore.insertStateSyncEvent(event))
}

require.NoError(t, s.buildCommitment())
require.Len(t, s.pendingCommitments, 1)

mockMsg := &CommitmentMessageSigned{
Message: &contractsapi.StateSyncCommitment{
StartID: s.pendingCommitments[0].StartID,
EndID: s.pendingCommitments[0].EndID,
},
}

txData, err := mockMsg.EncodeAbi()
require.NoError(t, err)

tx := createStateTransactionWithData(1, types.Address{}, txData)

req := &PostBlockRequest{
FullBlock: &types.FullBlock{
Block: &types.Block{
Header: &types.Header{Number: 1},
Transactions: []*types.Transaction{tx},
},
},
}

// PostBlock() inserts commitment and proofs into the store
require.NoError(t, s.PostBlock(req))

// check the state after executing first PostBlock()
require.Equal(t, mockMsg.Message.EndID.Uint64()+1, s.nextCommittedIndex)

stateSyncEventsBefore, err := s.state.StateSyncStore.list()
require.NoError(t, err)
require.Equal(t, stateSyncEventsCount, len(stateSyncEventsBefore))

for i := 0; i < stateSyncEventsCount; i++ {
proof, err := s.state.StateSyncStore.getStateSyncProof(uint64(i))
require.NoError(t, err)
require.NotNil(t, proof)
}

// create second PostBlockRequest to remove processed events and proofs from the store
req = &PostBlockRequest{
FullBlock: &types.FullBlock{
Block: &types.Block{
Header: &types.Header{Number: 2},
},
},
}

// add receipts with executed StateSyncResult logs
receiptSuccess := types.ReceiptSuccess

req.FullBlock.Receipts = make([]*types.Receipt, stateSyncEventsCount)
for i := uint64(0); i < stateSyncEventsCount; i++ {
req.FullBlock.Receipts[i] = &types.Receipt{
Status: &receiptSuccess,
Logs: []*types.Log{createTestLogForStateSyncResultEvent(t, i)}}
}

require.NoError(t, s.PostBlock(req))

// all state sync events and their proofs should be removed from the store
stateSyncEventsAfter, err := s.state.StateSyncStore.list()
require.NoError(t, err)
require.Equal(t, 0, len(stateSyncEventsAfter))

for i := uint64(0); i < stateSyncEventsCount; i++ {
proof, err := s.state.StateSyncStore.getStateSyncProof(i)
require.NoError(t, err)
require.Nil(t, proof)
}
}

func TestStateSyncerManager_AddLog_BuildCommitments(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -571,6 +658,26 @@ func TestStateSyncManager_GetProofs_NoProof_BuildProofs(t *testing.T) {
require.NoError(t, commitment.VerifyStateSyncProof(proof.Data, stateSync))
}

func createTestLogForStateSyncResultEvent(t *testing.T, stateSyncEventID uint64) *types.Log {
t.Helper()

var stateSyncResultEvent contractsapi.StateSyncResultEvent

topics := make([]types.Hash, 3)
topics[0] = types.Hash(stateSyncResultEvent.Sig())
topics[1] = types.BytesToHash(common.EncodeUint64ToBytes(stateSyncEventID))
topics[2] = types.BytesToHash(common.EncodeUint64ToBytes(1)) // Status = true
someType := abi.MustNewType("tuple(string field1, string field2)")
encodedData, err := someType.Encode(map[string]string{"field1": "value1", "field2": "value2"})
require.NoError(t, err)

return &types.Log{
Address: contracts.StateReceiverContract,
Topics: topics,
Data: encodedData,
}
}

type mockTopic struct {
published proto.Message
}
Expand Down

0 comments on commit 83e2a53

Please sign in to comment.