diff --git a/block/block.go b/block/block.go index 9af9e5c11..a876302e8 100644 --- a/block/block.go +++ b/block/block.go @@ -98,7 +98,7 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta // Prune old heights, if requested by ABCI app. if 0 < retainHeight { - err = m.PruneBlocks(uint64(retainHeight)) + _, err = m.PruneBlocks(uint64(retainHeight)) if err != nil { m.logger.Error("prune blocks", "retain_height", retainHeight, "err", err) } diff --git a/block/manager.go b/block/manager.go index cc17905df..e4ece70e0 100644 --- a/block/manager.go +++ b/block/manager.go @@ -177,7 +177,7 @@ func (m *Manager) Start(ctx context.Context) error { } /* ----------------------------- sequencer mode ----------------------------- */ - // Subscribe to batch events, to update last submitted height in case batch confirmation was lost. This could happen if the sequencer crash/restarted just after submitting a batch to the settelement and by the time we query the last batch, this batch wasn't accepted yet. + // Subscribe to batch events, to update last submitted height in case batch confirmation was lost. This could happen if the sequencer crash/restarted just after submitting a batch to the settlement and by the time we query the last batch, this batch wasn't accepted yet. 
go uevent.MustSubscribe(ctx, m.Pubsub, "updateSubmittedHeightLoop", settlement.EventQueryNewSettlementBatchAccepted, m.UpdateLastSubmittedHeight, m.logger) // Sequencer must wait till DA is synced to start submitting blobs diff --git a/block/pruning.go b/block/pruning.go index daa3b3dcf..a1cfaeb7d 100644 --- a/block/pruning.go +++ b/block/pruning.go @@ -3,25 +3,23 @@ package block import ( "context" "fmt" - - "github.com/dymensionxyz/gerr-cosmos/gerrc" ) -func (m *Manager) PruneBlocks(retainHeight uint64) error { - if m.IsProposer() && m.NextHeightToSubmit() < retainHeight { // do not delete anything that we might submit in future - return fmt.Errorf("cannot prune blocks before they have been submitted: retain height %d: next height to submit: %d: %w", - retainHeight, - m.NextHeightToSubmit(), - gerrc.ErrInvalidArgument) +// PruneBlocks prunes all block-related data from dymint store up to (but not including) retainHeight. It returns the number of blocks pruned, used for testing. +func (m *Manager) PruneBlocks(retainHeight uint64) (uint64, error) { + nextSubmissionHeight := m.NextHeightToSubmit() + if m.IsProposer() && nextSubmissionHeight < retainHeight { // do not delete anything that we might submit in future + m.logger.Debug("cannot prune blocks before they have been submitted. using last submitted height for pruning", "retain_height", retainHeight, "height_to_submit", nextSubmissionHeight) + retainHeight = nextSubmissionHeight } err := m.P2PClient.RemoveBlocks(context.Background(), m.State.BaseHeight, retainHeight) if err != nil { m.logger.Error("pruning blocksync store", "retain_height", retainHeight, "err", err) } - pruned, err := m.Store.PruneBlocks(m.State.BaseHeight, retainHeight) + pruned, err := m.Store.PruneStore(m.State.BaseHeight, retainHeight, m.logger) if err != nil { - return fmt.Errorf("prune block store: %w", err) + return 0, fmt.Errorf("prune block store: %w", err) } // TODO: prune state/indexer and state/txindexer?? 
@@ -29,9 +27,10 @@ func (m *Manager) PruneBlocks(retainHeight uint64) error { m.State.BaseHeight = retainHeight _, err = m.Store.SaveState(m.State, nil) if err != nil { - return fmt.Errorf("save state: %w", err) + return 0, fmt.Errorf("save state: %w", err) } m.logger.Info("pruned blocks", "pruned", pruned, "retain_height", retainHeight) - return nil + + return pruned, nil } diff --git a/block/pruning_test.go b/block/pruning_test.go index 3f333f0db..9590be35b 100644 --- a/block/pruning_test.go +++ b/block/pruning_test.go @@ -7,6 +7,7 @@ import ( "github.com/dymensionxyz/dymint/da" "github.com/dymensionxyz/dymint/testutil" "github.com/dymensionxyz/dymint/version" + "github.com/dymensionxyz/gerr-cosmos/gerrc" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -64,13 +65,15 @@ func TestPruningRetainHeight(t *testing.T) { _, _, err = manager.ProduceApplyGossipBlock(ctx, true) require.NoError(err) } - - validRetainHeight := lastSubmitted + 1 // the max possible valid retain height - for i := validRetainHeight + 1; i < manager.State.Height(); i++ { - err = manager.PruneBlocks(i) - require.Error(err) // cannot prune blocks before they have been submitted + validRetainHeight := manager.NextHeightToSubmit() // the max possible valid retain height + for i := validRetainHeight; i < manager.State.Height(); i++ { + expectedPruned := validRetainHeight - manager.State.BaseHeight + pruned, err := manager.PruneBlocks(i) + if i <= validRetainHeight { + require.NoError(err) + assert.Equal(t, expectedPruned, pruned) + } else { + require.ErrorIs(err, gerrc.ErrInvalidArgument) + } } - - err = manager.PruneBlocks(validRetainHeight) - require.NoError(err) } diff --git a/block/submit.go b/block/submit.go index e582b12d5..2547379a9 100644 --- a/block/submit.go +++ b/block/submit.go @@ -117,8 +117,8 @@ func SubmitLoopInner( logger.Error("Create and submit batch", "err", err, "pending", pending) panic(err) } - // this could happen if we 
timed-out waiting for acceptance in the previous iteration, but the batch was indeed submitted. - // we panic here cause restarting may reset the last batch submitted counter and the sequencer can potentially resume submitting batches. + // this could happen if we timed-out waiting for acceptance in the previous iteration, but the batch was indeed submitted. + // we panic here cause restarting may reset the last batch submitted counter and the sequencer can potentially resume submitting batches. if errors.Is(err, gerrc.ErrAlreadyExists) { logger.Debug("Batch already accepted", "err", err, "pending", pending) panic(err) @@ -273,7 +273,7 @@ func (m *Manager) GetUnsubmittedBlocks() uint64 { return m.State.Height() - m.LastSubmittedHeight.Load() } -// UpdateLastSubmittedHeight will update last height submitted height upon events. +// UpdateLastSubmittedHeight will update the last submitted height upon events. // This may be necessary in case we crashed/restarted before getting response for our submission to the settlement layer. 
func (m *Manager) UpdateLastSubmittedHeight(event pubsub.Message) { eventData, ok := event.Data().(*settlement.EventDataNewBatchAccepted) diff --git a/p2p/client.go b/p2p/client.go index 2e9c6eb57..640c50a29 100644 --- a/p2p/client.go +++ b/p2p/client.go @@ -3,6 +3,7 @@ package p2p import ( "context" "encoding/hex" + "errors" "fmt" "strconv" "strings" @@ -230,14 +231,17 @@ func (c *Client) RemoveBlocks(ctx context.Context, from, to uint64) error { } for h := from; h < to; h++ { cid, err := c.store.LoadBlockCid(h) + if errors.Is(err, gerrc.ErrNotFound) { + continue + } if err != nil { - return fmt.Errorf("load block id from store %d: %w", h, err) + c.logger.Error("load blocksync block id from store", "height", h, "error", err) + continue + } err = c.blocksync.DeleteBlock(ctx, cid) if err != nil { - return fmt.Errorf("remove block height %d: %w", h, err) + c.logger.Error("remove blocksync block", "height", h, "err", err) } } return nil diff --git a/store/pruning.go b/store/pruning.go index 45ab0e208..bb242cb5c 100644 --- a/store/pruning.go +++ b/store/pruning.go @@ -3,11 +3,12 @@ package store import ( "fmt" + "github.com/dymensionxyz/dymint/types" "github.com/dymensionxyz/gerr-cosmos/gerrc" ) -// PruneBlocks removes block up to (but not including) a height. It returns number of blocks pruned. -func (s *DefaultStore) PruneBlocks(from, to uint64) (uint64, error) { +// PruneStore removes blocks up to (but not including) a height. It returns number of blocks pruned. 
+func (s *DefaultStore) PruneStore(from, to uint64, logger types.Logger) (uint64, error) { if from <= 0 { return 0, fmt.Errorf("from height must be greater than 0: %w", gerrc.ErrInvalidArgument) } @@ -16,7 +17,84 @@ func (s *DefaultStore) PruneBlocks(from, to uint64) (uint64, error) { return 0, fmt.Errorf("to height must be greater than from height: to: %d: from: %d: %w", to, from, gerrc.ErrInvalidArgument) } + prunedBlocks, err := s.pruneBlocks(from, to, logger) + if err != nil { + logger.Error("pruning blocks", "from", from, "to", to, "blocks pruned", prunedBlocks, "err", err) + } + + prunedResponses, err := s.pruneResponses(from, to, logger) + if err != nil { + logger.Error("pruning responses", "from", from, "to", to, "responses pruned", prunedResponses, "err", err) + } + + prunedSequencers, err := s.pruneSequencers(from, to, logger) + if err != nil { + logger.Error("pruning sequencers", "from", from, "to", to, "sequencers pruned", prunedSequencers, "err", err) + } + + prunedCids, err := s.pruneCids(from, to, logger) + if err != nil { + logger.Error("pruning block sync identifiers", "from", from, "to", to, "cids pruned", prunedCids, "err", err) + } + + return prunedBlocks, nil +} + +// pruneBlocks prunes all store entries that are stored along blocks (blocks,commit and block hash) +func (s *DefaultStore) pruneBlocks(from, to uint64, logger types.Logger) (uint64, error) { + pruneBlocks := func(batch KVBatch, height uint64) error { + hash, err := s.loadHashFromIndex(height) + if err != nil { + return err + } + if err := batch.Delete(getBlockKey(hash)); err != nil { + return err + } + if err := batch.Delete(getCommitKey(hash)); err != nil { + return err + } + if err := batch.Delete(getIndexKey(height)); err != nil { + return err + } + return nil + } + + prunedBlocks, err := s.pruneHeights(from, to, pruneBlocks, logger) + return prunedBlocks, err +} + +// pruneResponses prunes block execution responses from store +func (s *DefaultStore) pruneResponses(from, to 
uint64, logger types.Logger) (uint64, error) { + pruneResponses := func(batch KVBatch, height uint64) error { + return batch.Delete(getResponsesKey(height)) + } + + prunedResponses, err := s.pruneHeights(from, to, pruneResponses, logger) + return prunedResponses, err +} + +// pruneSequencers prunes sequencers from store +func (s *DefaultStore) pruneSequencers(from, to uint64, logger types.Logger) (uint64, error) { + pruneSequencers := func(batch KVBatch, height uint64) error { + return batch.Delete(getSequencersKey(height)) + } + prunedSequencers, err := s.pruneHeights(from, to, pruneSequencers, logger) + return prunedSequencers, err +} + +// pruneCids prunes content identifiers from store +func (s *DefaultStore) pruneCids(from, to uint64, logger types.Logger) (uint64, error) { + pruneCids := func(batch KVBatch, height uint64) error { + return batch.Delete(getCidKey(height)) + } + prunedCids, err := s.pruneHeights(from, to, pruneCids, logger) + return prunedCids, err +} + +// pruneHeights is the common function for all pruning that iterates through all heights and prunes according to the pruning function set +func (s *DefaultStore) pruneHeights(from, to uint64, prune func(batch KVBatch, height uint64) error, logger types.Logger) (uint64, error) { pruned := uint64(0) + batch := s.db.NewBatch() defer batch.Discard() @@ -29,29 +107,11 @@ func (s *DefaultStore) PruneBlocks(from, to uint64) (uint64, error) { } for h := from; h < to; h++ { - hash, err := s.loadHashFromIndex(h) + err := prune(batch, h) if err != nil { + logger.Debug("unable to prune", "height", h, "err", err) continue } - if err := batch.Delete(getBlockKey(hash)); err != nil { - return 0, err - } - if err := batch.Delete(getCommitKey(hash)); err != nil { - return 0, err - } - if err := batch.Delete(getIndexKey(h)); err != nil { - return 0, err - } - if err := batch.Delete(getResponsesKey(h)); err != nil { - return 0, err - } - if err := batch.Delete(getSequencersKey(h)); err != nil { - return 0, err - } - 
if err := batch.Delete(getCidKey(h)); err != nil { - return 0, err - } - pruned++ // flush every 1000 blocks to avoid batches becoming too large @@ -63,11 +123,12 @@ func (s *DefaultStore) PruneBlocks(from, to uint64) (uint64, error) { batch.Discard() batch = s.db.NewBatch() } - } + } err := flush(batch, to) if err != nil { return 0, err } + return pruned, nil } diff --git a/store/pruning_test.go b/store/pruning_test.go index f814989af..f24423f77 100644 --- a/store/pruning_test.go +++ b/store/pruning_test.go @@ -9,6 +9,9 @@ import ( "github.com/ipfs/go-cid" mh "github.com/multiformats/go-multihash" "github.com/stretchr/testify/assert" + "github.com/tendermint/tendermint/libs/log" + "github.com/tendermint/tendermint/proto/tendermint/state" + "golang.org/x/exp/rand" ) func TestStorePruning(t *testing.T) { @@ -60,37 +63,74 @@ func TestStorePruning(t *testing.T) { assert := assert.New(t) bstore := store.New(store.NewDefaultInMemoryKVStore()) - savedHeights := make(map[uint64]bool) + savedBlockHeights := make(map[uint64]bool) + savedRespHeights := make(map[uint64]bool) + savedSeqHeights := make(map[uint64]bool) + savedCidHeights := make(map[uint64]bool) + for _, block := range c.blocks { - _, err := bstore.SaveBlock(block, &types.Commit{}, nil) - assert.NoError(err) - savedHeights[block.Header.Height] = true - blockBytes, err := block.MarshalBinary() + + _, err := bstore.SaveBlock(block, &types.Commit{Height: block.Header.Height}, nil) assert.NoError(err) - // Create a cid manually by specifying the 'prefix' parameters - pref := &cid.Prefix{ - Codec: cid.DagProtobuf, - MhLength: -1, - MhType: mh.SHA2_256, - Version: 1, + savedBlockHeights[block.Header.Height] = true + + // generate and store block responses randomly for block heights + if randBool() { + _, err = bstore.SaveBlockResponses(block.Header.Height, &state.ABCIResponses{}, nil) + savedRespHeights[block.Header.Height] = true + assert.NoError(err) + } + + // generate and store sequencers randomly for block 
heights + if randBool() { + _, err = bstore.SaveSequencers(block.Header.Height, &types.SequencerSet{}, nil) + savedSeqHeights[block.Header.Height] = true + assert.NoError(err) + } + + // generate and store cids randomly for block heights + if randBool() { + // generate cid from block + blockBytes, err := block.MarshalBinary() + assert.NoError(err) + // Create a cid manually by specifying the 'prefix' parameters + pref := &cid.Prefix{ + Codec: cid.DagProtobuf, + MhLength: -1, + MhType: mh.SHA2_256, + Version: 1, + } + cid, err := pref.Sum(blockBytes) + assert.NoError(err) + _, err = bstore.SaveBlockCid(block.Header.Height, cid, nil) + assert.NoError(err) + savedCidHeights[block.Header.Height] = true } - cid, err := pref.Sum(blockBytes) + + } + + // Validate everything is saved + for k := range savedBlockHeights { + _, err := bstore.LoadBlock(k) assert.NoError(err) - _, err = bstore.SaveBlockCid(block.Header.Height, cid, nil) + } + + for k := range savedRespHeights { + _, err := bstore.LoadBlockResponses(k) assert.NoError(err) + } - // TODO: add block responses and commits + for k := range savedSeqHeights { + _, err := bstore.LoadSequencers(k) + assert.NoError(err) } - // And then feed it some data - // expectedCid, err := pref.Sum(block) - // Validate all blocks are saved - for k := range savedHeights { - _, err := bstore.LoadBlock(k) + for k := range savedCidHeights { + _, err := bstore.LoadBlockCid(k) assert.NoError(err) } - _, err := bstore.PruneBlocks(c.from, c.to) + _, err := bstore.PruneStore(c.from, c.to, log.NewNopLogger()) if c.shouldError { assert.Error(err) return @@ -99,27 +139,54 @@ func TestStorePruning(t *testing.T) { assert.NoError(err) // Validate only blocks in the range are pruned - for k := range savedHeights { + for k := range savedBlockHeights { if k >= c.from && k < c.to { // k < c.to is the exclusion test _, err := bstore.LoadBlock(k) assert.Error(err, "Block at height %d should be pruned", k) - _, err = bstore.LoadBlockResponses(k) - 
assert.Error(err, "BlockResponse at height %d should be pruned", k) - _, err = bstore.LoadCommit(k) assert.Error(err, "Commit at height %d should be pruned", k) - _, err = bstore.LoadBlockCid(k) - assert.Error(err, "Cid at height %d should be pruned", k) - } else { _, err := bstore.LoadBlock(k) assert.NoError(err) - _, err = bstore.LoadBlockCid(k) + _, err = bstore.LoadCommit(k) + assert.NoError(err) + + } + } + + // Validate only block responses in the range are pruned + for k := range savedRespHeights { + if k >= c.from && k < c.to { // k < c.to is the exclusion test + _, err = bstore.LoadBlockResponses(k) + assert.Error(err, "Block response at height %d should be pruned", k) + } else { + _, err = bstore.LoadBlockResponses(k) + assert.NoError(err) + } + } + + // Validate only sequencers in the range are pruned + for k := range savedSeqHeights { + if k >= c.from && k < c.to { // k < c.to is the exclusion test + _, err = bstore.LoadSequencers(k) + assert.Error(err, "Sequencer at height %d should be pruned", k) + } else { + _, err = bstore.LoadSequencers(k) assert.NoError(err) + } + } + // Validate only block cids in the range are pruned + for k := range savedCidHeights { + if k >= c.from && k < c.to { // k < c.to is the exclusion test + _, err = bstore.LoadBlockCid(k) + assert.Error(err, "Block cid at height %d should be pruned", k) + } else { + _, err = bstore.LoadBlockCid(k) + assert.NoError(err) } } }) @@ -127,3 +194,7 @@ func TestStorePruning(t *testing.T) { } // TODO: prune twice + +func randBool() bool { + return rand.Intn(2) == 0 +} diff --git a/store/storeIface.go b/store/storeIface.go index 9013a5242..3061fc936 100644 --- a/store/storeIface.go +++ b/store/storeIface.go @@ -71,7 +71,7 @@ type Store interface { LoadSequencers(height uint64) (*types.SequencerSet, error) - PruneBlocks(from, to uint64) (uint64, error) + PruneStore(from, to uint64, logger types.Logger) (uint64, error) Close() error