Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(store): indexer pruning #1061

Merged
merged 13 commits into from
Sep 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 15 additions & 9 deletions block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"golang.org/x/sync/errgroup"

"github.com/dymensionxyz/dymint/da/registry"
"github.com/dymensionxyz/dymint/indexers/txindex"
"github.com/dymensionxyz/dymint/store"
uerrors "github.com/dymensionxyz/dymint/utils/errors"
uevent "github.com/dymensionxyz/dymint/utils/event"
Expand Down Expand Up @@ -80,6 +81,9 @@ type Manager struct {

// channel used to send the retain height to the pruning background loop
pruningC chan int64

// indexer
indexerService *txindex.IndexerService
}

// NewManager creates new block Manager.
Expand All @@ -95,6 +99,7 @@ func NewManager(
pubsub *pubsub.Server,
p2pClient *p2p.Client,
dalcKV *store.PrefixKV,
indexerService *txindex.IndexerService,
logger log.Logger,
) (*Manager, error) {
localAddress, err := types.GetAddress(localKey)
Expand All @@ -107,15 +112,16 @@ func NewManager(
}

m := &Manager{
Pubsub: pubsub,
P2PClient: p2pClient,
LocalKey: localKey,
Conf: conf.BlockManagerConfig,
Genesis: genesis,
Store: store,
Executor: exec,
SLClient: settlementClient,
logger: logger.With("module", "block_manager"),
Pubsub: pubsub,
P2PClient: p2pClient,
LocalKey: localKey,
Conf: conf.BlockManagerConfig,
Genesis: genesis,
Store: store,
Executor: exec,
SLClient: settlementClient,
indexerService: indexerService,
logger: logger.With("module", "block_manager"),
blockCache: &Cache{
cache: make(map[uint64]types.CachedBlock),
},
Expand Down
2 changes: 1 addition & 1 deletion block/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func TestInitialState(t *testing.T) {
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
agg, err := block.NewManager(key, conf, c.genesis, c.store, nil, proxyApp, settlementlc,
nil, pubsubServer, p2pClient, nil, logger)
nil, pubsubServer, p2pClient, nil, nil, logger)
assert.NoError(err)
assert.NotNil(agg)
assert.Equal(c.expectedChainID, agg.State.ChainID)
Expand Down
12 changes: 9 additions & 3 deletions block/pruning.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,24 @@ func (m *Manager) PruneBlocks(retainHeight uint64) error {
gerrc.ErrInvalidArgument)
}

//
// prune blocks from blocksync store
err := m.P2PClient.RemoveBlocks(context.Background(), m.State.BaseHeight, retainHeight)
if err != nil {
m.logger.Error("pruning blocksync store", "retain_height", retainHeight, "err", err)
}

// prune blocks from indexer store
err = m.indexerService.Prune(m.State.BaseHeight, retainHeight)
if err != nil {
m.logger.Error("pruning indexer", "retain_height", retainHeight, "err", err)
}

// prune blocks from dymint store
pruned, err := m.Store.PruneBlocks(m.State.BaseHeight, retainHeight)
if err != nil {
return fmt.Errorf("pruning dymint store: %w", err)
}

// TODO: prune state/indexer and state/txindexer??

m.State.BaseHeight = retainHeight
_, err = m.Store.SaveState(m.State, nil)
if err != nil {
Expand All @@ -48,6 +53,7 @@ func (m *Manager) PruningLoop(ctx context.Context) error {
if err != nil {
m.logger.Error("pruning blocks", "retainHeight", retainHeight, "err", err)
}

}
}
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.22.4
require (
cosmossdk.io/errors v1.0.1
github.com/avast/retry-go/v4 v4.5.0
github.com/celestiaorg/celestia-openrpc v0.4.1-0.20240826163012-baf4c0b6c915
github.com/celestiaorg/celestia-openrpc v0.4.1-0.20240828130104-089b92870cb6
github.com/celestiaorg/go-cnc v0.4.2
github.com/centrifuge/go-substrate-rpc-client/v4 v4.0.12
github.com/cosmos/cosmos-sdk v0.46.16
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,8 @@ github.com/btcsuite/snappy-go v1.0.0/go.mod h1:8woku9dyThutzjeg+3xrA5iCpBRH8XEEg
github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792/go.mod h1:ghJtEyQwv5/p4Mg4C0fgbePVuGr935/5ddU9Z3TmDRY=
github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46fmI40EZs=
github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7hK1yFx9hf58LP0zeX7UjIGs20ufpu3evjr+s=
github.com/celestiaorg/celestia-openrpc v0.4.1-0.20240826163012-baf4c0b6c915 h1:aCHDuyf21W+Gxa6WFvqjA8cH7Ux54reGxNKtM1P2BSY=
github.com/celestiaorg/celestia-openrpc v0.4.1-0.20240826163012-baf4c0b6c915/go.mod h1:UZfG4lOO4ycQ+JLwpyFkSY13NblnyZDyiq2x42FEj80=
github.com/celestiaorg/celestia-openrpc v0.4.1-0.20240828130104-089b92870cb6 h1:CL3ESQnOsFnErzKCMMhi9m/zmerrKTZM03ZT8zEdjuA=
github.com/celestiaorg/celestia-openrpc v0.4.1-0.20240828130104-089b92870cb6/go.mod h1:I0b//K7q/RP1g6gtnQ1xi//ocGwrql4gcQeJEOhDifw=
github.com/celestiaorg/go-cnc v0.4.2 h1:7ixf3tevMB7Lvz2mbyRG0ZOK+8qoPm7wNhdgpi8VreU=
github.com/celestiaorg/go-cnc v0.4.2/go.mod h1:zYzvHudSd1iNPuHBMyvZ1YvWou5aT9JXgtch9Tkaf70=
github.com/celestiaorg/go-fraud v0.2.0 h1:aaq2JiW0gTnhEdac3l51UCqSyJ4+VjFGTTpN83V4q7I=
Expand Down
3 changes: 3 additions & 0 deletions indexers/blockindexer/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,7 @@ type BlockIndexer interface {
// Search performs a query for block heights that match a given BeginBlock
// and Endblock event search criteria.
Search(ctx context.Context, q *query.Query) ([]int64, error)

// Delete indexed block entries up to (but not including) a height. It returns number of entries pruned.
Prune(from, to uint64) (uint64, error)
}
127 changes: 118 additions & 9 deletions indexers/blockindexer/kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
indexer "github.com/dymensionxyz/dymint/indexers/blockindexer"
"github.com/dymensionxyz/dymint/store"

"github.com/dymensionxyz/dymint/types/pb/dymint"
dmtypes "github.com/dymensionxyz/dymint/types/pb/dymint"
tmtypes "github.com/tendermint/tendermint/types"
)

Expand Down Expand Up @@ -61,7 +63,6 @@ func (idx *BlockerIndexer) Index(bh tmtypes.EventDataNewBlockHeader) error {
defer batch.Discard()

height := bh.Header.Height

// 1. index by height
key, err := heightKey(height)
if err != nil {
Expand All @@ -72,15 +73,21 @@ func (idx *BlockerIndexer) Index(bh tmtypes.EventDataNewBlockHeader) error {
}

// 2. index BeginBlock events
if err := idx.indexEvents(batch, bh.ResultBeginBlock.Events, "begin_block", height); err != nil {
beginKeys, err := idx.indexEvents(batch, bh.ResultBeginBlock.Events, "begin_block", height)
if err != nil {
return fmt.Errorf("index BeginBlock events: %w", err)
}

// 3. index EndBlock events
if err := idx.indexEvents(batch, bh.ResultEndBlock.Events, "end_block", height); err != nil {
endKeys, err := idx.indexEvents(batch, bh.ResultEndBlock.Events, "end_block", height)
if err != nil {
return fmt.Errorf("index EndBlock events: %w", err)
}

// 4. index all eventkeys by height key for easy pruning
err = idx.addEventKeys(height, &beginKeys, &endKeys, batch)
if err != nil {
return err
}
return batch.Commit()
}

Expand Down Expand Up @@ -481,9 +488,9 @@ func (idx *BlockerIndexer) match(
return filteredHeights, nil
}

func (idx *BlockerIndexer) indexEvents(batch store.KVBatch, events []abci.Event, typ string, height int64) error {
func (idx *BlockerIndexer) indexEvents(batch store.KVBatch, events []abci.Event, typ string, height int64) (dmtypes.EventKeys, error) {
heightBz := int64ToBytes(height)

keys := dmtypes.EventKeys{}
for _, event := range events {
// only index events with a non-empty type
if len(event.Type) == 0 {
Expand All @@ -498,21 +505,123 @@ func (idx *BlockerIndexer) indexEvents(batch store.KVBatch, events []abci.Event,
// index iff the event specified index:true and it's not a reserved event
compositeKey := fmt.Sprintf("%s.%s", event.Type, string(attr.Key))
if compositeKey == tmtypes.BlockHeightKey {
return fmt.Errorf("event type and attribute key \"%s\" is reserved; please use a different key", compositeKey)
return dmtypes.EventKeys{}, fmt.Errorf("event type and attribute key \"%s\" is reserved; please use a different key", compositeKey)
}

if attr.GetIndex() {
key, err := eventKey(compositeKey, typ, string(attr.Value), height)
if err != nil {
return fmt.Errorf("create block index key: %w", err)
return dmtypes.EventKeys{}, fmt.Errorf("create block index key: %w", err)
}

if err := batch.Set(key, heightBz); err != nil {
return err
return dmtypes.EventKeys{}, err
}
keys.Keys = append(keys.Keys, key)
}
}
}

return keys, nil
}

func (idx *BlockerIndexer) Prune(from, to uint64) (uint64, error) {
if from <= 0 {
return 0, fmt.Errorf("from height must be greater than 0: %w", gerrc.ErrInvalidArgument)
}

if to <= from {
return 0, fmt.Errorf("to height must be greater than from height: to: %d: from: %d: %w", to, from, gerrc.ErrInvalidArgument)
}
blocksPruned, err := idx.pruneBlocks(from, to)
return blocksPruned, err
}

func (idx *BlockerIndexer) pruneBlocks(from, to uint64) (uint64, error) {
pruned := uint64(0)
batch := idx.store.NewBatch()
defer batch.Discard()

flush := func(batch store.KVBatch, height int64) error {
err := batch.Commit()
if err != nil {
return fmt.Errorf("flush batch to disk: height %d: %w", height, err)
}
return nil
}

for h := int64(from); h < int64(to); h++ {
ok, err := idx.Has(h)
if err != nil || !ok {
continue
}
key, err := heightKey(h)
if err != nil {
continue
}
if err := batch.Delete(key); err != nil {
continue
}
if err := idx.pruneEvents(h, batch); err != nil {
continue
}
pruned++

// flush every 1000 blocks to avoid batches becoming too large
if pruned%1000 == 0 && pruned > 0 {
err := flush(batch, h)
if err != nil {
return 0, err
}
batch.Discard()
batch = idx.store.NewBatch()
}
}

err := flush(batch, int64(to))
if err != nil {
return 0, err
}

return pruned, nil
}

func (idx *BlockerIndexer) pruneEvents(height int64, batch store.KVBatch) error {
eventKey, err := eventHeightKey(height)
if err != nil {
return err
}
keysList, err := idx.store.Get(eventKey)
if err != nil {
return err
}
eventKeys := &dymint.EventKeys{}
err = eventKeys.Unmarshal(keysList)
if err != nil {
return err
}
for _, key := range eventKeys.Keys {
err := batch.Delete(key)
if err != nil {
return err
}
}
return nil
}

func (idx *BlockerIndexer) addEventKeys(height int64, beginKeys *dymint.EventKeys, endKeys *dymint.EventKeys, batch store.KVBatch) error {
eventKeys := beginKeys
eventKeys.Keys = append(eventKeys.Keys, endKeys.Keys...)
eventKeyHeight, err := eventHeightKey(height)
if err != nil {
return err
}
eventKeysBytes, err := eventKeys.Marshal()
if err != nil {
return err
}
if err := batch.Set(eventKeyHeight, eventKeysBytes); err != nil {
return err
}
return nil
}
Loading
Loading