Skip to content

Commit

Permalink
fix(manager/indexer): pruning fixes (#1147)
Browse files Browse the repository at this point in the history
  • Loading branch information
srene authored Nov 7, 2024
1 parent 3c0b798 commit e5e8857
Show file tree
Hide file tree
Showing 28 changed files with 435 additions and 329 deletions.
2 changes: 1 addition & 1 deletion block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta
select {
case m.pruningC <- retainHeight:
default:
m.logger.Error("pruning channel full. skipping pruning", "retainHeight", retainHeight)
m.logger.Debug("pruning channel full. skipping pruning", "retainHeight", retainHeight)
}
}

Expand Down
4 changes: 2 additions & 2 deletions block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ type Manager struct {
pruningC chan int64

// indexer
indexerService *txindex.IndexerService
IndexerService *txindex.IndexerService

// used to fetch blocks from DA. Sequencer will only fetch batches in case it requires to re-sync (in case of rollback). Full-node will fetch batches for syncing and validation.
Retriever da.BatchRetriever
Expand Down Expand Up @@ -160,7 +160,7 @@ func NewManager(
Executor: exec,
Sequencers: types.NewSequencerSet(),
SLClient: settlementClient,
indexerService: indexerService,
IndexerService: indexerService,
logger: logger.With("module", "block_manager"),
blockCache: &Cache{
cache: make(map[uint64]types.CachedBlock),
Expand Down
1 change: 1 addition & 0 deletions block/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ func TestInitialState(t *testing.T) {
GossipSubCacheSize: 50,
BootstrapRetryTime: 30 * time.Second,
BlockSyncRequestIntervalTime: 30 * time.Second,
BlockSyncEnabled: true,
}, privKey, "TestChain", emptyStore, pubsubServer, datastore.NewMapDatastore(), logger)
assert.NoError(err)
assert.NotNil(p2pClient)
Expand Down
51 changes: 19 additions & 32 deletions block/pruning.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,38 +2,30 @@ package block

import (
"context"
"fmt"
)

// PruneBlocks prune all block related data from dymint store up to (but not including) retainHeight. It returns the number of blocks pruned, used for testing.
func (m *Manager) PruneBlocks(retainHeight uint64) (uint64, error) {
// prune blocks from blocksync store
err := m.P2PClient.RemoveBlocks(context.Background(), m.State.BaseHeight, retainHeight)
if err != nil {
m.logger.Error("pruning blocksync store", "retain_height", retainHeight, "err", err)
}

// prune blocks from indexer store
err = m.indexerService.Prune(m.State.BaseHeight, retainHeight)
if err != nil {
m.logger.Error("pruning indexer", "retain_height", retainHeight, "err", err)
}

// prune blocks from dymint store
pruned, err := m.Store.PruneStore(m.State.BaseHeight, retainHeight, m.logger)
if err != nil {
return 0, fmt.Errorf("prune block store: %w", err)
// Prune function prune all block related data from dymint store and blocksync store up to (but not including) retainHeight.
func (m *Manager) Prune(retainHeight uint64) {
// logging pruning result
logResult := func(err error, source string, retainHeight uint64, pruned uint64) {
if err != nil {
m.logger.Error("pruning", "from", source, "retain height", retainHeight, "err", err)
} else {
m.logger.Debug("pruned", "from", source, "retain height", retainHeight, "pruned", pruned)
}
}

m.State.BaseHeight = retainHeight
_, err = m.Store.SaveState(m.State, nil)
if err != nil {
return 0, fmt.Errorf("save state: %w", err)
}
// prune blocks from blocksync store
pruned, err := m.P2PClient.RemoveBlocks(context.Background(), retainHeight)
logResult(err, "blocksync", retainHeight, pruned)

m.logger.Info("pruned blocks", "pruned", pruned, "retain_height", retainHeight)
// prune indexed block and txs and associated events
pruned, err = m.IndexerService.Prune(retainHeight, m.Store)
logResult(err, "indexer", retainHeight, pruned)

return pruned, nil
// prune blocks from dymint store
pruned, err = m.Store.PruneStore(retainHeight, m.logger)
logResult(err, "dymint store", retainHeight, pruned)
}

func (m *Manager) PruningLoop(ctx context.Context) error {
Expand All @@ -48,12 +40,7 @@ func (m *Manager) PruningLoop(ctx context.Context) error {
} else { // do not delete anything that is not validated yet
pruningHeight = min(m.SettlementValidator.NextValidationHeight(), uint64(retainHeight))
}

_, err := m.PruneBlocks(pruningHeight)
if err != nil {
m.logger.Error("pruning blocks", "retainHeight", retainHeight, "err", err)
}

m.Prune(pruningHeight)
}
}
}
17 changes: 14 additions & 3 deletions block/pruning_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,26 @@ func TestPruningRetainHeight(t *testing.T) {
require.NoError(err)
}
validRetainHeight := manager.NextHeightToSubmit() // the max possible valid retain height
for i := validRetainHeight; i < manager.State.Height(); i++ {
expectedPruned := validRetainHeight - manager.State.BaseHeight
pruned, err := manager.PruneBlocks(i)

validatePruning := func(i uint64, expectedPruned uint64, pruned uint64, err error) {
if i <= validRetainHeight {
require.NoError(err)
assert.Equal(t, expectedPruned, pruned)
} else {
require.Error(gerrc.ErrInvalidArgument)
}
}
for i := validRetainHeight; i < manager.State.Height(); i++ {

baseHeight := uint64(1)
expectedPruned := validRetainHeight - baseHeight

pruned, err := manager.Store.PruneStore(validRetainHeight, log.NewNopLogger())
validatePruning(i, expectedPruned, pruned, err)

pruned, err = manager.P2PClient.RemoveBlocks(ctx, validRetainHeight)
validatePruning(i, expectedPruned, pruned, err)

}

}
1 change: 0 additions & 1 deletion block/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ func NewStateFromGenesis(genDoc *tmtypes.GenesisDoc) (*types.State, error) {
ChainID: genDoc.ChainID,

InitialHeight: uint64(genDoc.InitialHeight),
BaseHeight: uint64(genDoc.InitialHeight),
ConsensusParams: *genDoc.ConsensusParams,
}
s.SetHeight(0)
Expand Down
50 changes: 28 additions & 22 deletions indexers/blockindexer/kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,19 +528,12 @@ func (idx *BlockerIndexer) indexEvents(batch store.KVBatch, events []abci.Event,
}

func (idx *BlockerIndexer) Prune(from, to uint64, logger log.Logger) (uint64, error) {
if from <= 0 {
return 0, fmt.Errorf("from height must be greater than 0: %w", gerrc.ErrInvalidArgument)
}

if to <= from {
return 0, fmt.Errorf("to height must be greater than from height: to: %d: from: %d: %w", to, from, gerrc.ErrInvalidArgument)
}
blocksPruned, err := idx.pruneBlocks(from, to, logger)
return blocksPruned, err
return idx.pruneBlocks(from, to, logger)
}

func (idx *BlockerIndexer) pruneBlocks(from, to uint64, logger log.Logger) (uint64, error) {
pruned := uint64(0)
toFlush := uint64(0)
batch := idx.store.NewBatch()
defer batch.Discard()

Expand All @@ -555,35 +548,43 @@ func (idx *BlockerIndexer) pruneBlocks(from, to uint64, logger log.Logger) (uint
for h := int64(from); h < int64(to); h++ {
ok, err := idx.Has(h)
if err != nil {
logger.Debug("pruning block indexer checking height", "err", err)
logger.Debug("pruning block indexer checking height", "height", h, "err", err)
continue
}
if !ok {
continue
}
key, err := heightKey(h)
if err != nil {
logger.Debug("pruning block indexer getting height key", "err", err)
logger.Debug("pruning block indexer getting height key", "height", h, "err", err)
continue
}
if err := batch.Delete(key); err != nil {
logger.Debug("pruning block indexer deleting height key", "err", err)
logger.Debug("pruning block indexer deleting height key", "height", h, "err", err)
continue
}
if err := idx.pruneEvents(h, batch); err != nil {
logger.Debug("pruning block indexer events", "err", err)

pruned++

prunedEvents, err := idx.pruneEvents(h, logger, batch)
if err != nil {
logger.Debug("pruning block indexer events", "height", h, "err", err)
continue
}
pruned++
pruned += prunedEvents

toFlush += pruned

// flush every 1000 blocks to avoid batches becoming too large
if pruned%1000 == 0 && pruned > 0 {
if toFlush > 1000 {
err := flush(batch, h)
if err != nil {
return 0, err
}
batch.Discard()
batch = idx.store.NewBatch()

toFlush = 0
}
}

Expand All @@ -595,27 +596,32 @@ func (idx *BlockerIndexer) pruneBlocks(from, to uint64, logger log.Logger) (uint
return pruned, nil
}

func (idx *BlockerIndexer) pruneEvents(height int64, batch store.KVBatch) error {
func (idx *BlockerIndexer) pruneEvents(height int64, logger log.Logger, batch store.KVBatch) (uint64, error) {
pruned := uint64(0)

eventKey, err := eventHeightKey(height)
if err != nil {
return err
return pruned, err
}
keysList, err := idx.store.Get(eventKey)
if err != nil {
return err
return pruned, err
}
eventKeys := &dymint.EventKeys{}
err = eventKeys.Unmarshal(keysList)
if err != nil {
return err
return pruned, err
}
for _, key := range eventKeys.Keys {
err := batch.Delete(key)
if err != nil {
return err
logger.Error("pruning block indexer iterate events", "height", height, "err", err)
continue
}
pruned++

}
return nil
return pruned, nil
}

func (idx *BlockerIndexer) addEventKeys(height int64, beginKeys *dymint.EventKeys, endKeys *dymint.EventKeys, batch store.KVBatch) error {
Expand Down
11 changes: 8 additions & 3 deletions indexers/blockindexer/kv/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,12 +154,17 @@ func TestBlockIndexerPruning(t *testing.T) {
indexer := blockidxkv.New(prefixStore)
numBlocks := uint64(100)

numEvents := uint64(0)
// index block data
for i := uint64(1); i <= numBlocks; i++ {
beginBlock := getBeginBlock()
endBlock := getEndBlock()
numEvents += uint64(len(beginBlock.Events))
numEvents += uint64(len(endBlock.Events))
indexer.Index(types.EventDataNewBlockHeader{
Header: types.Header{Height: int64(i)},
ResultBeginBlock: getBeginBlock(),
ResultEndBlock: getEndBlock(),
ResultBeginBlock: beginBlock,
ResultEndBlock: endBlock,
})
}

Expand All @@ -173,7 +178,7 @@ func TestBlockIndexerPruning(t *testing.T) {
// prune indexer for all heights
pruned, err := indexer.Prune(1, numBlocks+1, log.NewNopLogger())
require.NoError(t, err)
require.Equal(t, numBlocks, pruned)
require.Equal(t, numBlocks+numEvents, pruned)

// check the query returns empty
results, err = indexer.Search(context.Background(), q)
Expand Down
8 changes: 5 additions & 3 deletions indexers/txindex/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,15 @@ type TxIndexer interface {
// Batch groups together multiple Index operations to be performed at the same time.
// NOTE: Batch is NOT thread-safe and must not be modified after starting its execution.
type Batch struct {
Ops []*abci.TxResult
Height int64
Ops []*abci.TxResult
}

// NewBatch creates a new Batch.
func NewBatch(n int64) *Batch {
func NewBatch(n int64, height int64) *Batch {
return &Batch{
Ops: make([]*abci.TxResult, n),
Height: height,
Ops: make([]*abci.TxResult, n),
}
}

Expand Down
36 changes: 29 additions & 7 deletions indexers/txindex/indexer_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ package txindex

import (
"context"
"errors"

indexer "github.com/dymensionxyz/dymint/indexers/blockindexer"
"github.com/dymensionxyz/dymint/store"
"github.com/dymensionxyz/gerr-cosmos/gerrc"
"github.com/tendermint/tendermint/libs/service"
"github.com/tendermint/tendermint/types"
)
Expand Down Expand Up @@ -59,7 +62,7 @@ func (is *IndexerService) OnStart() error {
msg := <-blockHeadersSub.Out()
eventDataHeader, _ := msg.Data().(types.EventDataNewBlockHeader)
height := eventDataHeader.Header.Height
batch := NewBatch(eventDataHeader.NumTxs)
batch := NewBatch(eventDataHeader.NumTxs, height)

for i := int64(0); i < eventDataHeader.NumTxs; i++ {
msg2 := <-txsSub.Out()
Expand Down Expand Up @@ -99,14 +102,33 @@ func (is *IndexerService) OnStop() {
}

// Prune removes tx and blocks indexed up to (but not including) a height.
func (is *IndexerService) Prune(from, to uint64) error {
_, err := is.blockIdxr.Prune(from, to, is.Logger)
func (is *IndexerService) Prune(to uint64, s store.Store) (uint64, error) {
// load indexer base height
indexerBaseHeight, err := s.LoadIndexerBaseHeight()

if errors.Is(err, gerrc.ErrNotFound) {
is.Logger.Error("load indexer base height", "err", err)
} else if err != nil {
return 0, err
}

// prune indexed blocks
blockPruned, err := is.blockIdxr.Prune(indexerBaseHeight, to, is.Logger)
if err != nil {
return err
return blockPruned, err
}
_, err = is.txIdxr.Prune(from, to, is.Logger)

// prune indexes txs
txPruned, err := is.txIdxr.Prune(indexerBaseHeight, to, is.Logger)
if err != nil {
return err
return txPruned, err
}
return nil

// store indexer base height
err = s.SaveIndexerBaseHeight(to)
if err != nil {
is.Logger.Error("saving indexer base height", "err", err)
}

return blockPruned + txPruned, nil
}
Loading

0 comments on commit e5e8857

Please sign in to comment.