Skip to content

Commit

Permalink
fix(pruning): upstream pruning fixes (#1204)
Browse files Browse the repository at this point in the history
  • Loading branch information
srene authored Nov 8, 2024
1 parent 34e29f6 commit 484af6e
Show file tree
Hide file tree
Showing 39 changed files with 991 additions and 342 deletions.
2 changes: 1 addition & 1 deletion block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta
select {
case m.pruningC <- retainHeight:
default:
m.logger.Error("pruning channel full. skipping pruning", "retainHeight", retainHeight)
m.logger.Debug("pruning channel full. skipping pruning", "retainHeight", retainHeight)
}
}
return nil
Expand Down
4 changes: 2 additions & 2 deletions block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ type Manager struct {
pruningC chan int64

// indexer
indexerService *txindex.IndexerService
IndexerService *txindex.IndexerService
}

// NewManager creates new block Manager.
Expand Down Expand Up @@ -119,7 +119,7 @@ func NewManager(
Executor: exec,
DAClient: dalc,
SLClient: settlementClient,
indexerService: indexerService,
IndexerService: indexerService,
Retriever: dalc.(da.BatchRetriever),
logger: logger,
blockCache: &Cache{
Expand Down
1 change: 1 addition & 0 deletions block/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func TestInitialState(t *testing.T) {
GossipSubCacheSize: 50,
BootstrapRetryTime: 30 * time.Second,
BlockSyncRequestIntervalTime: 30 * time.Second,
BlockSyncEnabled: true,
}, privKey, "TestChain", emptyStore, pubsubServer, datastore.NewMapDatastore(), logger)
assert.NoError(err)
assert.NotNil(p2pClient)
Expand Down
58 changes: 23 additions & 35 deletions block/pruning.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,45 +2,30 @@ package block

import (
"context"
"fmt"

"github.com/dymensionxyz/gerr-cosmos/gerrc"
)

func (m *Manager) PruneBlocks(retainHeight uint64) error {
if m.IsSequencer() && m.NextHeightToSubmit() < retainHeight { // do not delete anything that we might submit in future
return fmt.Errorf("cannot prune blocks before they have been submitted: retain height %d: next height to submit: %d: %w",
retainHeight,
m.NextHeightToSubmit(),
gerrc.ErrInvalidArgument)
// PruneBlocks prune all block related data from dymint store up to (but not including) retainHeight.
func (m *Manager) PruneBlocks(retainHeight uint64) {
// logging pruning result
logResult := func(err error, source string, retainHeight uint64, pruned uint64) {
if err != nil {
m.logger.Error("pruning", "from", source, "retain height", retainHeight, "err", err)
} else {
m.logger.Debug("pruned", "from", source, "retain height", retainHeight, "pruned", pruned)
}
}

// prune blocks from blocksync store
err := m.p2pClient.RemoveBlocks(context.Background(), m.State.BaseHeight, retainHeight)
if err != nil {
m.logger.Error("pruning blocksync store", "retain_height", retainHeight, "err", err)
}
pruned, err := m.p2pClient.RemoveBlocks(context.Background(), retainHeight)
logResult(err, "blocksync", retainHeight, pruned)

// prune blocks from indexer store
err = m.indexerService.Prune(m.State.BaseHeight, retainHeight)
if err != nil {
m.logger.Error("pruning indexer", "retain_height", retainHeight, "err", err)
}
// prune indexed block and txs and associated events
pruned, err = m.IndexerService.Prune(retainHeight, m.Store)
logResult(err, "indexer", retainHeight, pruned)

// prune blocks from dymint store
pruned, err := m.Store.PruneBlocks(m.State.BaseHeight, retainHeight)
if err != nil {
return fmt.Errorf("prune block store: %w", err)
}

m.State.BaseHeight = retainHeight
_, err = m.Store.SaveState(m.State, nil)
if err != nil {
return fmt.Errorf("save state: %w", err)
}

m.logger.Info("pruned blocks", "pruned", pruned, "retain_height", retainHeight)
return nil
pruned, err = m.Store.PruneStore(retainHeight, m.logger)
logResult(err, "dymint store", retainHeight, pruned)
}

func (m *Manager) PruningLoop(ctx context.Context) error {
Expand All @@ -49,11 +34,14 @@ func (m *Manager) PruningLoop(ctx context.Context) error {
case <-ctx.Done():
return ctx.Err()
case retainHeight := <-m.pruningC:
err := m.PruneBlocks(uint64(retainHeight))
if err != nil {
m.logger.Error("pruning blocks", "retainHeight", retainHeight, "err", err)
}

var pruningHeight uint64
if m.IsSequencer() { // do not delete anything that we might submit in future
pruningHeight = min(m.NextHeightToSubmit(), uint64(retainHeight))
} else {
pruningHeight = uint64(retainHeight)
}
m.PruneBlocks(pruningHeight)
}
}
}
24 changes: 18 additions & 6 deletions block/pruning_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ import (
"testing"

"github.com/dymensionxyz/dymint/testutil"
"github.com/dymensionxyz/gerr-cosmos/gerrc"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/proxy"
)

Expand Down Expand Up @@ -45,13 +47,23 @@ func TestPruningRetainHeight(t *testing.T) {
_, _, err = manager.ProduceAndGossipBlock(ctx, true)
require.NoError(err)
}
validRetainHeight := manager.NextHeightToSubmit() // the max possible valid retain height

validatePruning := func(i uint64, expectedPruned uint64, pruned uint64, err error) {
if i <= validRetainHeight {
require.NoError(err)
assert.Equal(t, expectedPruned, pruned)
} else {
require.Error(gerrc.ErrInvalidArgument)
}
}
for i := validRetainHeight; i < manager.State.Height(); i++ {
baseHeight := uint64(1)
expectedPruned := validRetainHeight - baseHeight

pruned, err := manager.Store.PruneStore(validRetainHeight, log.NewNopLogger())
validatePruning(i, expectedPruned, pruned, err)

validRetainHeight := lastSubmitted + 1 // the max possible valid retain height
for i := validRetainHeight + 1; i < manager.State.Height(); i++ {
err = manager.PruneBlocks(i)
require.Error(err) // cannot prune blocks before they have been submitted
}

err = manager.PruneBlocks(validRetainHeight)
require.NoError(err)
}
12 changes: 4 additions & 8 deletions block/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,10 @@ func NewStateFromGenesis(genDoc *tmtypes.GenesisDoc) (*types.State, error) {
}

s := types.State{
Version: InitStateVersion,
ChainID: genDoc.ChainID,
InitialHeight: uint64(genDoc.InitialHeight),

BaseHeight: uint64(genDoc.InitialHeight),

LastHeightValidatorsChanged: genDoc.InitialHeight,

Version: InitStateVersion,
ChainID: genDoc.ChainID,
InitialHeight: uint64(genDoc.InitialHeight),
LastHeightValidatorsChanged: genDoc.InitialHeight,
ConsensusParams: *genDoc.ConsensusParams,
LastHeightConsensusParamsChanged: genDoc.InitialHeight,
}
Expand Down
69 changes: 38 additions & 31 deletions indexers/blockindexer/kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,19 +528,12 @@ func (idx *BlockerIndexer) indexEvents(batch store.KVBatch, events []abci.Event,
}

func (idx *BlockerIndexer) Prune(from, to uint64, logger log.Logger) (uint64, error) {
if from <= 0 {
return 0, fmt.Errorf("from height must be greater than 0: %w", gerrc.ErrInvalidArgument)
}

if to <= from {
return 0, fmt.Errorf("to height must be greater than from height: to: %d: from: %d: %w", to, from, gerrc.ErrInvalidArgument)
}
blocksPruned, err := idx.pruneBlocks(from, to, logger)
return blocksPruned, err
return idx.pruneBlocks(from, to, logger)
}

func (idx *BlockerIndexer) pruneBlocks(from, to uint64, logger log.Logger) (uint64, error) {
pruned := uint64(0)
toFlush := uint64(0)
batch := idx.store.NewBatch()
defer batch.Discard()

Expand All @@ -553,38 +546,47 @@ func (idx *BlockerIndexer) pruneBlocks(from, to uint64, logger log.Logger) (uint
}

for h := int64(from); h < int64(to); h++ {

// flush every 1000 blocks to avoid batches becoming too large
if toFlush > 1000 {
err := flush(batch, h)
if err != nil {
return 0, err
}
batch.Discard()
batch = idx.store.NewBatch()
toFlush = 0
}

ok, err := idx.Has(h)
if err != nil {
logger.Debug("pruning block indexer checking height", "err", err)
logger.Debug("pruning block indexer checking height", "height", h, "err", err)
continue
}
if !ok {
continue
}
key, err := heightKey(h)
if err != nil {
logger.Debug("pruning block indexer getting height key", "err", err)
logger.Debug("pruning block indexer getting height key", "height", h, "err", err)
continue
}
pruned++
toFlush++

if err := batch.Delete(key); err != nil {
logger.Debug("pruning block indexer deleting height key", "err", err)
logger.Debug("pruning block indexer deleting height key", "height", h, "err", err)
continue
}
if err := idx.pruneEvents(h, batch); err != nil {
logger.Debug("pruning block indexer events", "err", err)
continue
}
pruned++

// flush every 1000 blocks to avoid batches becoming too large
if pruned%1000 == 0 && pruned > 0 {
err := flush(batch, h)
if err != nil {
return 0, err
}
batch.Discard()
batch = idx.store.NewBatch()
prunedEvents, err := idx.pruneEvents(h, logger, batch)
pruned += prunedEvents
toFlush += prunedEvents

if err != nil {
logger.Debug("pruning block indexer events", "height", h, "err", err)
}

}

err := flush(batch, int64(to))
Expand All @@ -595,27 +597,32 @@ func (idx *BlockerIndexer) pruneBlocks(from, to uint64, logger log.Logger) (uint
return pruned, nil
}

func (idx *BlockerIndexer) pruneEvents(height int64, batch store.KVBatch) error {
func (idx *BlockerIndexer) pruneEvents(height int64, logger log.Logger, batch store.KVBatch) (uint64, error) {
pruned := uint64(0)

eventKey, err := eventHeightKey(height)
if err != nil {
return err
return pruned, err
}
keysList, err := idx.store.Get(eventKey)
if err != nil {
return err
return pruned, err
}
eventKeys := &dymint.EventKeys{}
err = eventKeys.Unmarshal(keysList)
if err != nil {
return err
return pruned, err
}
for _, key := range eventKeys.Keys {
err := batch.Delete(key)
if err != nil {
return err
logger.Error("pruning block indexer iterate events", "height", height, "err", err)
continue
}
pruned++

}
return nil
return pruned, nil
}

func (idx *BlockerIndexer) addEventKeys(height int64, beginKeys *dymint.EventKeys, endKeys *dymint.EventKeys, batch store.KVBatch) error {
Expand Down
11 changes: 8 additions & 3 deletions indexers/blockindexer/kv/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,12 +154,17 @@ func TestBlockIndexerPruning(t *testing.T) {
indexer := blockidxkv.New(prefixStore)
numBlocks := uint64(100)

numEvents := uint64(0)
// index block data
for i := uint64(1); i <= numBlocks; i++ {
beginBlock := getBeginBlock()
endBlock := getEndBlock()
numEvents += uint64(len(beginBlock.Events))
numEvents += uint64(len(endBlock.Events))
indexer.Index(types.EventDataNewBlockHeader{
Header: types.Header{Height: int64(i)},
ResultBeginBlock: getBeginBlock(),
ResultEndBlock: getEndBlock(),
ResultBeginBlock: beginBlock,
ResultEndBlock: endBlock,
})
}

Expand All @@ -173,7 +178,7 @@ func TestBlockIndexerPruning(t *testing.T) {
// prune indexer for all heights
pruned, err := indexer.Prune(1, numBlocks+1, log.NewNopLogger())
require.NoError(t, err)
require.Equal(t, numBlocks, pruned)
require.Equal(t, numBlocks+numEvents, pruned)

// check the query returns empty
results, err = indexer.Search(context.Background(), q)
Expand Down
8 changes: 5 additions & 3 deletions indexers/txindex/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,15 @@ type TxIndexer interface {
// Batch groups together multiple Index operations to be performed at the same time.
// NOTE: Batch is NOT thread-safe and must not be modified after starting its execution.
type Batch struct {
Ops []*abci.TxResult
Height int64
Ops []*abci.TxResult
}

// NewBatch creates a new Batch.
func NewBatch(n int64) *Batch {
func NewBatch(n int64, height int64) *Batch {
return &Batch{
Ops: make([]*abci.TxResult, n),
Height: height,
Ops: make([]*abci.TxResult, n),
}
}

Expand Down
Loading

0 comments on commit 484af6e

Please sign in to comment.