Skip to content
This repository has been archived by the owner on Dec 4, 2024. It is now read-only.

Remove executed state sync events and proofs from the store #1977

Merged
merged 3 commits into from
Oct 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions consensus/polybft/consensus_runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ func (c *consensusRuntime) initStateSyncManager(logger hcf.Logger) error {
blockTrackerPollInterval: c.config.PolyBFTConfig.BlockTrackerPollInterval.Duration,
},
c,
c.config.blockchain,
)

c.stateSyncManager = stateSyncManager
Expand Down
22 changes: 22 additions & 0 deletions consensus/polybft/state_store_state_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,28 @@ func (s *StateSyncStore) insertStateSyncEvent(event *contractsapi.StateSyncedEve
})
}

// removeStateSyncEventsAndProofs removes state sync events and their proofs from the buckets in db
func (s *StateSyncStore) removeStateSyncEventsAndProofs(stateSyncEventIDs []uint64) error {
return s.db.Update(func(tx *bolt.Tx) error {
eventsBucket := tx.Bucket(stateSyncEventsBucket)
proofsBucket := tx.Bucket(stateSyncProofsBucket)

for _, stateSyncEventID := range stateSyncEventIDs {
stateSyncEventIDKey := common.EncodeUint64ToBytes(stateSyncEventID)

if err := eventsBucket.Delete(stateSyncEventIDKey); err != nil {
return fmt.Errorf("failed to remove state sync event (ID=%d): %w", stateSyncEventID, err)
}

if err := proofsBucket.Delete(stateSyncEventIDKey); err != nil {
return fmt.Errorf("failed to remove state sync event proof (ID=%d): %w", stateSyncEventID, err)
}
}

return nil
})
}

// list iterates through all events in events bucket in db, un-marshals them, and returns as array
func (s *StateSyncStore) list() ([]*contractsapi.StateSyncedEvent, error) {
events := []*contractsapi.StateSyncedEvent{}
Expand Down
53 changes: 45 additions & 8 deletions consensus/polybft/state_sync_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
bls "github.com/0xPolygon/polygon-edge/consensus/polybft/signer"
"github.com/0xPolygon/polygon-edge/consensus/polybft/validator"
"github.com/0xPolygon/polygon-edge/consensus/polybft/wallet"
"github.com/0xPolygon/polygon-edge/contracts"
"github.com/0xPolygon/polygon-edge/tracker"
"github.com/0xPolygon/polygon-edge/types"
"github.com/hashicorp/go-hclog"
Expand Down Expand Up @@ -91,6 +92,9 @@ type stateSyncManager struct {
nextCommittedIndex uint64

runtime Runtime

// eventsGetter gets StateSyncResult events (missed or current) from blocks
eventsGetter *eventsGetter[*contractsapi.StateSyncResultEvent]
}

// topic is an interface for p2p message gossiping
Expand All @@ -101,13 +105,27 @@ type topic interface {

// newStateSyncManager creates a new instance of state sync manager
func newStateSyncManager(logger hclog.Logger, state *State, config *stateSyncConfig,
runtime Runtime) *stateSyncManager {
runtime Runtime, blockchain blockchainBackend) *stateSyncManager {
eventsGetter := &eventsGetter[*contractsapi.StateSyncResultEvent]{
blockchain: blockchain,
isValidLogFn: func(l *types.Log) bool {
return l.Address == contracts.StateReceiverContract
},
parseEventFn: func(h *types.Header, l *ethgo.Log) (*contractsapi.StateSyncResultEvent, bool, error) {
var stateSyncResultEvent contractsapi.StateSyncResultEvent
matches, err := stateSyncResultEvent.ParseLog(l)

return &stateSyncResultEvent, matches, err
},
}

return &stateSyncManager{
logger: logger,
state: state,
config: config,
closeCh: make(chan struct{}),
runtime: runtime,
logger: logger,
state: state,
config: config,
closeCh: make(chan struct{}),
runtime: runtime,
eventsGetter: eventsGetter,
}
}

Expand Down Expand Up @@ -399,14 +417,33 @@ func (s *stateSyncManager) PostEpoch(req *PostEpochRequest) error {
}

// PostBlock notifies state sync manager that a block was finalized,
// so that it can build state sync proofs if a block has a commitment submission transaction
// so that it can build state sync proofs if a block has a commitment submission transaction.
// Additionally, it will remove any processed state sync events and their proofs from the store.
func (s *stateSyncManager) PostBlock(req *PostBlockRequest) error {
events, err := s.eventsGetter.getEventsFromReceipts(req.FullBlock.Block.Header, req.FullBlock.Receipts)
dusan-maksimovic marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
s.logger.Info("failed to retrieve processed state sync result events from block", "error", err)
} else {
processedStateSyncEventIDs := make([]uint64, 0, len(events))
for _, event := range events {
if event.Status {
processedStateSyncEventIDs = append(processedStateSyncEventIDs, event.Counter.Uint64())
}
}

if len(processedStateSyncEventIDs) > 0 {
if err = s.state.StateSyncStore.removeStateSyncEventsAndProofs(processedStateSyncEventIDs); err != nil {
s.logger.Info("failed to remove processed state sync events data from store", "error", err)
}
}
}

commitment, err := getCommitmentMessageSignedTx(req.FullBlock.Block.Transactions)
if err != nil {
return err
}

// no commitment message -> this is not end of epoch block
// no commitment message -> this is not end of sprint block
if commitment == nil {
return nil
}
Expand Down
109 changes: 108 additions & 1 deletion consensus/polybft/state_sync_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"github.com/0xPolygon/polygon-edge/consensus/polybft/contractsapi"
bls "github.com/0xPolygon/polygon-edge/consensus/polybft/signer"
"github.com/0xPolygon/polygon-edge/consensus/polybft/validator"
"github.com/0xPolygon/polygon-edge/contracts"
"github.com/0xPolygon/polygon-edge/helper/common"
"github.com/0xPolygon/polygon-edge/merkle-tree"
"github.com/0xPolygon/polygon-edge/types"
)
Expand All @@ -33,6 +35,7 @@ func newTestStateSyncManager(t *testing.T, key *validator.TestValidator, runtime

topic := &mockTopic{}

blockchainBackend := new(blockchainMock)
s := newStateSyncManager(hclog.NewNullLogger(), state,
&stateSyncConfig{
stateSenderAddr: types.Address{},
Expand All @@ -41,7 +44,7 @@ func newTestStateSyncManager(t *testing.T, key *validator.TestValidator, runtime
topic: topic,
key: key.Key(),
maxCommitmentSize: maxCommitmentSize,
}, runtime)
}, runtime, blockchainBackend)

t.Cleanup(func() {
os.RemoveAll(tmpDir)
Expand Down Expand Up @@ -324,6 +327,90 @@ func TestStateSyncerManager_BuildProofs(t *testing.T) {
}
}

func TestStateSyncerManager_RemoveProcessedEventsAndProofs(t *testing.T) {
const stateSyncEventsCount = 5

vals := validator.NewTestValidators(t, 5)

s := newTestStateSyncManager(t, vals.GetValidator("0"), &mockRuntime{isActiveValidator: true})

for _, event := range generateStateSyncEvents(t, stateSyncEventsCount, 0) {
require.NoError(t, s.state.StateSyncStore.insertStateSyncEvent(event))
}

require.NoError(t, s.buildCommitment())
require.Len(t, s.pendingCommitments, 1)

mockMsg := &CommitmentMessageSigned{
Message: &contractsapi.StateSyncCommitment{
StartID: s.pendingCommitments[0].StartID,
EndID: s.pendingCommitments[0].EndID,
},
}

txData, err := mockMsg.EncodeAbi()
require.NoError(t, err)

tx := createStateTransactionWithData(1, types.Address{}, txData)

req := &PostBlockRequest{
FullBlock: &types.FullBlock{
Block: &types.Block{
Header: &types.Header{Number: 1},
Transactions: []*types.Transaction{tx},
},
},
}

// PostBlock() inserts commitment and proofs into the store
require.NoError(t, s.PostBlock(req))

// check the state after executing first PostBlock()
require.Equal(t, mockMsg.Message.EndID.Uint64()+1, s.nextCommittedIndex)

stateSyncEventsBefore, err := s.state.StateSyncStore.list()
require.NoError(t, err)
require.Equal(t, stateSyncEventsCount, len(stateSyncEventsBefore))

for i := 0; i < stateSyncEventsCount; i++ {
proof, err := s.state.StateSyncStore.getStateSyncProof(uint64(i))
require.NoError(t, err)
require.NotNil(t, proof)
}

// create second PostBlockRequest to remove processed events and proofs from the store
req = &PostBlockRequest{
FullBlock: &types.FullBlock{
Block: &types.Block{
Header: &types.Header{Number: 2},
},
},
}

// add receipts with executed StateSyncResult logs
receiptSuccess := types.ReceiptSuccess

req.FullBlock.Receipts = make([]*types.Receipt, stateSyncEventsCount)
for i := uint64(0); i < stateSyncEventsCount; i++ {
req.FullBlock.Receipts[i] = &types.Receipt{
Status: &receiptSuccess,
Logs: []*types.Log{createTestLogForStateSyncResultEvent(t, i)}}
}

require.NoError(t, s.PostBlock(req))

// all state sync events and their proofs should be removed from the store
stateSyncEventsAfter, err := s.state.StateSyncStore.list()
require.NoError(t, err)
require.Equal(t, 0, len(stateSyncEventsAfter))

for i := uint64(0); i < stateSyncEventsCount; i++ {
proof, err := s.state.StateSyncStore.getStateSyncProof(i)
require.NoError(t, err)
require.Nil(t, proof)
}
}

func TestStateSyncerManager_AddLog_BuildCommitments(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -571,6 +658,26 @@ func TestStateSyncManager_GetProofs_NoProof_BuildProofs(t *testing.T) {
require.NoError(t, commitment.VerifyStateSyncProof(proof.Data, stateSync))
}

func createTestLogForStateSyncResultEvent(t *testing.T, stateSyncEventID uint64) *types.Log {
t.Helper()

var stateSyncResultEvent contractsapi.StateSyncResultEvent

topics := make([]types.Hash, 3)
topics[0] = types.Hash(stateSyncResultEvent.Sig())
topics[1] = types.BytesToHash(common.EncodeUint64ToBytes(stateSyncEventID))
topics[2] = types.BytesToHash(common.EncodeUint64ToBytes(1)) // Status = true
someType := abi.MustNewType("tuple(string field1, string field2)")
encodedData, err := someType.Encode(map[string]string{"field1": "value1", "field2": "value2"})
require.NoError(t, err)

return &types.Log{
Address: contracts.StateReceiverContract,
Topics: topics,
Data: encodedData,
}
}

type mockTopic struct {
published proto.Message
}
Expand Down
Loading