Skip to content

Commit

Permalink
feat(store): indexer pruning (#1061)
Browse files Browse the repository at this point in the history
  • Loading branch information
srene committed Sep 25, 2024
1 parent 37a73e2 commit 68dcabf
Show file tree
Hide file tree
Showing 19 changed files with 750 additions and 79 deletions.
24 changes: 15 additions & 9 deletions block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"golang.org/x/sync/errgroup"

"github.com/dymensionxyz/dymint/da/registry"
"github.com/dymensionxyz/dymint/indexers/txindex"
"github.com/dymensionxyz/dymint/store"
uerrors "github.com/dymensionxyz/dymint/utils/errors"
uevent "github.com/dymensionxyz/dymint/utils/event"
Expand Down Expand Up @@ -80,6 +81,9 @@ type Manager struct {

// channel used to send the retain height to the pruning background loop
pruningC chan int64

// indexer
indexerService *txindex.IndexerService
}

// NewManager creates new block Manager.
Expand All @@ -95,6 +99,7 @@ func NewManager(
pubsub *pubsub.Server,
p2pClient *p2p.Client,
dalcKV *store.PrefixKV,
indexerService *txindex.IndexerService,
logger log.Logger,
) (*Manager, error) {
localAddress, err := types.GetAddress(localKey)
Expand All @@ -107,15 +112,16 @@ func NewManager(
}

m := &Manager{
Pubsub: pubsub,
P2PClient: p2pClient,
LocalKey: localKey,
Conf: conf.BlockManagerConfig,
Genesis: genesis,
Store: store,
Executor: exec,
SLClient: settlementClient,
logger: logger.With("module", "block_manager"),
Pubsub: pubsub,
P2PClient: p2pClient,
LocalKey: localKey,
Conf: conf.BlockManagerConfig,
Genesis: genesis,
Store: store,
Executor: exec,
SLClient: settlementClient,
indexerService: indexerService,
logger: logger.With("module", "block_manager"),
blockCache: &Cache{
cache: make(map[uint64]types.CachedBlock),
},
Expand Down
2 changes: 1 addition & 1 deletion block/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func TestInitialState(t *testing.T) {
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
agg, err := block.NewManager(key, conf, c.genesis, c.store, nil, proxyApp, settlementlc,
nil, pubsubServer, p2pClient, nil, logger)
nil, pubsubServer, p2pClient, nil, nil, logger)
assert.NoError(err)
assert.NotNil(agg)
assert.Equal(c.expectedChainID, agg.State.ChainID)
Expand Down
13 changes: 10 additions & 3 deletions block/pruning.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,24 @@ func (m *Manager) PruneBlocks(retainHeight uint64) (uint64, error) {
retainHeight = nextSubmissionHeight
}

//
// prune blocks from blocksync store
err := m.P2PClient.RemoveBlocks(context.Background(), m.State.BaseHeight, retainHeight)
if err != nil {
m.logger.Error("pruning blocksync store", "retain_height", retainHeight, "err", err)
}

// prune blocks from indexer store
err = m.indexerService.Prune(m.State.BaseHeight, retainHeight)
if err != nil {
m.logger.Error("pruning indexer", "retain_height", retainHeight, "err", err)
}

// prune blocks from dymint store
pruned, err := m.Store.PruneStore(m.State.BaseHeight, retainHeight, m.logger)
if err != nil {
return 0, fmt.Errorf("prune block store: %w", err)
}

// TODO: prune state/indexer and state/txindexer??

m.State.BaseHeight = retainHeight
_, err = m.Store.SaveState(m.State, nil)
if err != nil {
Expand All @@ -46,6 +52,7 @@ func (m *Manager) PruningLoop(ctx context.Context) error {
if err != nil {
m.logger.Error("pruning blocks", "retainHeight", retainHeight, "err", err)
}

}
}
}
3 changes: 3 additions & 0 deletions indexers/blockindexer/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,7 @@ type BlockIndexer interface {
// Search performs a query for block heights that match a given BeginBlock
// and Endblock event search criteria.
Search(ctx context.Context, q *query.Query) ([]int64, error)

// Delete indexed block entries up to (but not including) a height. It returns number of entries pruned.
Prune(from, to uint64) (uint64, error)
}
127 changes: 118 additions & 9 deletions indexers/blockindexer/kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
indexer "github.com/dymensionxyz/dymint/indexers/blockindexer"
"github.com/dymensionxyz/dymint/store"

"github.com/dymensionxyz/dymint/types/pb/dymint"
dmtypes "github.com/dymensionxyz/dymint/types/pb/dymint"
tmtypes "github.com/tendermint/tendermint/types"
)

Expand Down Expand Up @@ -61,7 +63,6 @@ func (idx *BlockerIndexer) Index(bh tmtypes.EventDataNewBlockHeader) error {
defer batch.Discard()

height := bh.Header.Height

// 1. index by height
key, err := heightKey(height)
if err != nil {
Expand All @@ -72,15 +73,21 @@ func (idx *BlockerIndexer) Index(bh tmtypes.EventDataNewBlockHeader) error {
}

// 2. index BeginBlock events
if err := idx.indexEvents(batch, bh.ResultBeginBlock.Events, "begin_block", height); err != nil {
beginKeys, err := idx.indexEvents(batch, bh.ResultBeginBlock.Events, "begin_block", height)
if err != nil {
return fmt.Errorf("index BeginBlock events: %w", err)
}

// 3. index EndBlock events
if err := idx.indexEvents(batch, bh.ResultEndBlock.Events, "end_block", height); err != nil {
endKeys, err := idx.indexEvents(batch, bh.ResultEndBlock.Events, "end_block", height)
if err != nil {
return fmt.Errorf("index EndBlock events: %w", err)
}

// 4. index all eventkeys by height key for easy pruning
err = idx.addEventKeys(height, &beginKeys, &endKeys, batch)
if err != nil {
return err
}
return batch.Commit()
}

Expand Down Expand Up @@ -481,9 +488,9 @@ func (idx *BlockerIndexer) match(
return filteredHeights, nil
}

func (idx *BlockerIndexer) indexEvents(batch store.KVBatch, events []abci.Event, typ string, height int64) error {
func (idx *BlockerIndexer) indexEvents(batch store.KVBatch, events []abci.Event, typ string, height int64) (dmtypes.EventKeys, error) {
heightBz := int64ToBytes(height)

keys := dmtypes.EventKeys{}
for _, event := range events {
// only index events with a non-empty type
if len(event.Type) == 0 {
Expand All @@ -498,21 +505,123 @@ func (idx *BlockerIndexer) indexEvents(batch store.KVBatch, events []abci.Event,
// index iff the event specified index:true and it's not a reserved event
compositeKey := fmt.Sprintf("%s.%s", event.Type, string(attr.Key))
if compositeKey == tmtypes.BlockHeightKey {
return fmt.Errorf("event type and attribute key \"%s\" is reserved; please use a different key", compositeKey)
return dmtypes.EventKeys{}, fmt.Errorf("event type and attribute key \"%s\" is reserved; please use a different key", compositeKey)
}

if attr.GetIndex() {
key, err := eventKey(compositeKey, typ, string(attr.Value), height)
if err != nil {
return fmt.Errorf("create block index key: %w", err)
return dmtypes.EventKeys{}, fmt.Errorf("create block index key: %w", err)
}

if err := batch.Set(key, heightBz); err != nil {
return err
return dmtypes.EventKeys{}, err
}
keys.Keys = append(keys.Keys, key)
}
}
}

return keys, nil
}

func (idx *BlockerIndexer) Prune(from, to uint64) (uint64, error) {
if from <= 0 {
return 0, fmt.Errorf("from height must be greater than 0: %w", gerrc.ErrInvalidArgument)
}

if to <= from {
return 0, fmt.Errorf("to height must be greater than from height: to: %d: from: %d: %w", to, from, gerrc.ErrInvalidArgument)
}
blocksPruned, err := idx.pruneBlocks(from, to)
return blocksPruned, err
}

func (idx *BlockerIndexer) pruneBlocks(from, to uint64) (uint64, error) {
pruned := uint64(0)
batch := idx.store.NewBatch()
defer batch.Discard()

flush := func(batch store.KVBatch, height int64) error {
err := batch.Commit()
if err != nil {
return fmt.Errorf("flush batch to disk: height %d: %w", height, err)
}
return nil
}

for h := int64(from); h < int64(to); h++ {
ok, err := idx.Has(h)
if err != nil || !ok {
continue
}
key, err := heightKey(h)
if err != nil {
continue
}
if err := batch.Delete(key); err != nil {
continue
}
if err := idx.pruneEvents(h, batch); err != nil {
continue
}
pruned++

// flush every 1000 blocks to avoid batches becoming too large
if pruned%1000 == 0 && pruned > 0 {
err := flush(batch, h)
if err != nil {
return 0, err
}
batch.Discard()
batch = idx.store.NewBatch()
}
}

err := flush(batch, int64(to))
if err != nil {
return 0, err
}

return pruned, nil
}

func (idx *BlockerIndexer) pruneEvents(height int64, batch store.KVBatch) error {
eventKey, err := eventHeightKey(height)
if err != nil {
return err
}
keysList, err := idx.store.Get(eventKey)
if err != nil {
return err
}
eventKeys := &dymint.EventKeys{}
err = eventKeys.Unmarshal(keysList)
if err != nil {
return err
}
for _, key := range eventKeys.Keys {
err := batch.Delete(key)
if err != nil {
return err
}
}
return nil
}

func (idx *BlockerIndexer) addEventKeys(height int64, beginKeys *dymint.EventKeys, endKeys *dymint.EventKeys, batch store.KVBatch) error {
eventKeys := beginKeys
eventKeys.Keys = append(eventKeys.Keys, endKeys.Keys...)
eventKeyHeight, err := eventHeightKey(height)
if err != nil {
return err
}
eventKeysBytes, err := eventKeys.Marshal()
if err != nil {
return err
}
if err := batch.Set(eventKeyHeight, eventKeysBytes); err != nil {
return err
}
return nil
}
82 changes: 82 additions & 0 deletions indexers/blockindexer/kv/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/pubsub/query"
"github.com/tendermint/tendermint/types"
"golang.org/x/exp/rand"

blockidxkv "github.com/dymensionxyz/dymint/indexers/blockindexer/kv"
"github.com/dymensionxyz/dymint/store"
Expand Down Expand Up @@ -93,6 +94,10 @@ func TestBlockIndexer(t *testing.T) {
q *query.Query
results []int64
}{
"block.height < 5": {
q: query.MustParse("block.height < 2"),
results: []int64{1},
},
"block.height = 100": {
q: query.MustParse("block.height = 100"),
results: []int64{},
Expand Down Expand Up @@ -140,3 +145,80 @@ func TestBlockIndexer(t *testing.T) {
})
}
}

func TestBlockIndexerPruning(t *testing.T) {

// init the block indexer
prefixStore := store.NewPrefixKV(store.NewDefaultInMemoryKVStore(), []byte("block_events"))
indexer := blockidxkv.New(prefixStore)
numBlocks := uint64(100)

// index block data
for i := uint64(1); i <= numBlocks; i++ {
indexer.Index(types.EventDataNewBlockHeader{
Header: types.Header{Height: int64(i)},
ResultBeginBlock: getBeginBlock(),
ResultEndBlock: getEndBlock(),
})
}

// query all blocks and receive events for all block heights
queryString := fmt.Sprintf("block.height <= %d", numBlocks)
q := query.MustParse(queryString)
results, err := indexer.Search(context.Background(), q)
require.NoError(t, err)
require.Equal(t, numBlocks, uint64(len(results)))

// prune indexer for all heights
pruned, err := indexer.Prune(1, numBlocks+1)
require.NoError(t, err)
require.Equal(t, numBlocks, pruned)

// check the query returns empty
results, err = indexer.Search(context.Background(), q)
require.NoError(t, err)
require.Equal(t, 0, len(results))

}

func getBeginBlock() abci.ResponseBeginBlock {
if rand.Intn(2) == 1 {
return abci.ResponseBeginBlock{
Events: []abci.Event{
{
Type: "begin_event",
Attributes: []abci.EventAttribute{
{
Key: []byte("proposer"),
Value: []byte("FCAA001"),
Index: true,
},
},
},
},
}
} else {
return abci.ResponseBeginBlock{}
}
}

func getEndBlock() abci.ResponseEndBlock {
if rand.Intn(2) == 1 {
return abci.ResponseEndBlock{
Events: []abci.Event{
{
Type: "end_event",
Attributes: []abci.EventAttribute{
{
Key: []byte("foo"),
Value: []byte("value"),
Index: true,
},
},
},
},
}
} else {
return abci.ResponseEndBlock{}
}
}
Loading

0 comments on commit 68dcabf

Please sign in to comment.