Skip to content

Commit

Permalink
feat(manager): run dymint store block pruning in background (#1053)
Browse files Browse the repository at this point in the history
(cherry picked from commit 5055ae7)
  • Loading branch information
srene committed Sep 25, 2024
1 parent 2582371 commit ca0ffba
Show file tree
Hide file tree
Showing 20 changed files with 806 additions and 76 deletions.
7 changes: 4 additions & 3 deletions block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,10 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta
}
// Prune old heights, if requested by ABCI app.
if 0 < retainHeight {
err = m.PruneBlocks(uint64(retainHeight))
if err != nil {
m.logger.Error("prune blocks", "retain_height", retainHeight, "err", err)
select {
case m.pruningC <- retainHeight:
default:
m.logger.Error("pruning channel full. skipping pruning", "retainHeight", retainHeight)
}
}
return nil
Expand Down
40 changes: 30 additions & 10 deletions block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ import (
"github.com/dymensionxyz/gerr-cosmos/gerrc"
"golang.org/x/sync/errgroup"

<<<<<<< HEAD
=======
"github.com/dymensionxyz/dymint/da/registry"
"github.com/dymensionxyz/dymint/indexers/txindex"
>>>>>>> 5055ae7 (feat(manager): run dymint store block pruning in background (#1053))
"github.com/dymensionxyz/dymint/store"
uerrors "github.com/dymensionxyz/dymint/utils/errors"
uevent "github.com/dymensionxyz/dymint/utils/event"
Expand Down Expand Up @@ -74,6 +79,12 @@ type Manager struct {

// TargetHeight holds the value of the current highest block seen from either p2p (probably higher) or the DA
TargetHeight atomic.Uint64

// channel used to send the retain height to the pruning background loop
pruningC chan int64

// indexer
indexerService *txindex.IndexerService
}

// NewManager creates new block Manager.
Expand All @@ -89,6 +100,8 @@ func NewManager(
eventBus *tmtypes.EventBus,
pubsub *pubsub.Server,
p2pClient *p2p.Client,
dalcKV *store.PrefixKV,
indexerService *txindex.IndexerService,
logger types.Logger,
) (*Manager, error) {
localAddress, err := types.GetAddress(localKey)
Expand All @@ -101,20 +114,22 @@ func NewManager(
}

m := &Manager{
Pubsub: pubsub,
p2pClient: p2pClient,
LocalKey: localKey,
Conf: conf,
Genesis: genesis,
Store: store,
Executor: exec,
Pubsub: pubsub,
P2PClient: p2pClient,
LocalKey: localKey,
Conf: conf.BlockManagerConfig,
Genesis: genesis,
Store: store,
Executor: exec,
DAClient: dalc,
SLClient: settlementClient,
SLClient: settlementClient,
indexerService: indexerService,
Retriever: dalc.(da.BatchRetriever),
logger: logger,
logger: logger.With("module", "block_manager"),
blockCache: &Cache{
cache: make(map[uint64]types.CachedBlock),
},
pruningC: make(chan int64, 10), // use of buffered channel to avoid blocking applyBlock thread. In case channel is full, pruning will be skipped, but the retain height can be pruned in the next iteration.
}

err = m.LoadStateOnInit(store, genesis, logger)
Expand Down Expand Up @@ -146,9 +161,14 @@ func (m *Manager) Start(ctx context.Context) error {
}
}

eg, ctx := errgroup.WithContext(ctx)
eg, ctx := errgroup.WithContext(ctx)
uerrors.ErrGroupGoLog(eg, m.logger, func() error {
return m.PruningLoop(ctx)
})

if isSequencer {

eg, ctx := errgroup.WithContext(ctx)

// Sequencer must wait till DA is synced to start submitting blobs
<-m.DAClient.Synced()
Expand Down
2 changes: 1 addition & 1 deletion block/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func TestInitialState(t *testing.T) {
t.Run(c.name, func(t *testing.T) {
dalc := testutil.GetMockDALC(logger)
agg, err := block.NewManager(key, conf, c.genesis, c.store, nil, proxyApp, dalc, settlementlc,
nil, pubsubServer, p2pClient, logger)
nil, pubsubServer, p2pClient, nil, nil, logger)
assert.NoError(err)
assert.NotNil(agg)
assert.Equal(c.expectedChainID, agg.State.ChainID)
Expand Down
28 changes: 25 additions & 3 deletions block/pruning.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,24 @@ func (m *Manager) PruneBlocks(retainHeight uint64) error {
gerrc.ErrInvalidArgument)
}

err := m.p2pClient.RemoveBlocks(context.TODO(), m.State.BaseHeight, retainHeight)
// prune blocks from blocksync store
err := m.p2pClient.RemoveBlocks(context.Background(), m.State.BaseHeight, retainHeight)
if err != nil {
m.logger.Error("pruning blocksync store", "retain_height", retainHeight, "err", err)
}

// prune blocks from indexer store
err = m.indexerService.Prune(m.State.BaseHeight, retainHeight)
if err != nil {
m.logger.Error("pruning indexer", "retain_height", retainHeight, "err", err)
}

// prune blocks from dymint store
pruned, err := m.Store.PruneBlocks(m.State.BaseHeight, retainHeight)
if err != nil {
return fmt.Errorf("prune block store: %w", err)
}

// TODO: prune state/indexer and state/txindexer??

m.State.BaseHeight = retainHeight
_, err = m.Store.SaveState(m.State, nil)
if err != nil {
Expand All @@ -35,3 +42,18 @@ func (m *Manager) PruneBlocks(retainHeight uint64) error {
m.logger.Info("pruned blocks", "pruned", pruned, "retain_height", retainHeight)
return nil
}

func (m *Manager) PruningLoop(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
case retainHeight := <-m.pruningC:
err := m.PruneBlocks(uint64(retainHeight))
if err != nil {
m.logger.Error("pruning blocks", "retainHeight", retainHeight, "err", err)
}

}
}
}
4 changes: 4 additions & 0 deletions indexers/blockindexer/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package indexer
import (
"context"

"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/pubsub/query"
"github.com/tendermint/tendermint/types"
)
Expand All @@ -19,4 +20,7 @@ type BlockIndexer interface {
// Search performs a query for block heights that match a given BeginBlock
// and Endblock event search criteria.
Search(ctx context.Context, q *query.Query) ([]int64, error)

// Delete indexed block entries up to (but not including) a height. It returns number of entries pruned.
Prune(from, to uint64, logger log.Logger) (uint64, error)
}
136 changes: 127 additions & 9 deletions indexers/blockindexer/kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"strconv"
"strings"

"github.com/tendermint/tendermint/libs/log"

"github.com/dymensionxyz/gerr-cosmos/gerrc"
"github.com/google/orderedcode"

Expand All @@ -17,6 +19,8 @@ import (
indexer "github.com/dymensionxyz/dymint/indexers/blockindexer"
"github.com/dymensionxyz/dymint/store"

"github.com/dymensionxyz/dymint/types/pb/dymint"
dmtypes "github.com/dymensionxyz/dymint/types/pb/dymint"
tmtypes "github.com/tendermint/tendermint/types"
)

Expand Down Expand Up @@ -61,7 +65,6 @@ func (idx *BlockerIndexer) Index(bh tmtypes.EventDataNewBlockHeader) error {
defer batch.Discard()

height := bh.Header.Height

// 1. index by height
key, err := heightKey(height)
if err != nil {
Expand All @@ -72,15 +75,21 @@ func (idx *BlockerIndexer) Index(bh tmtypes.EventDataNewBlockHeader) error {
}

// 2. index BeginBlock events
if err := idx.indexEvents(batch, bh.ResultBeginBlock.Events, "begin_block", height); err != nil {
beginKeys, err := idx.indexEvents(batch, bh.ResultBeginBlock.Events, "begin_block", height)
if err != nil {
return fmt.Errorf("index BeginBlock events: %w", err)
}

// 3. index EndBlock events
if err := idx.indexEvents(batch, bh.ResultEndBlock.Events, "end_block", height); err != nil {
endKeys, err := idx.indexEvents(batch, bh.ResultEndBlock.Events, "end_block", height)
if err != nil {
return fmt.Errorf("index EndBlock events: %w", err)
}

// 4. index all eventkeys by height key for easy pruning
err = idx.addEventKeys(height, &beginKeys, &endKeys, batch)
if err != nil {
return err
}
return batch.Commit()
}

Expand Down Expand Up @@ -481,9 +490,9 @@ func (idx *BlockerIndexer) match(
return filteredHeights, nil
}

func (idx *BlockerIndexer) indexEvents(batch store.KVBatch, events []abci.Event, typ string, height int64) error {
func (idx *BlockerIndexer) indexEvents(batch store.KVBatch, events []abci.Event, typ string, height int64) (dmtypes.EventKeys, error) {
heightBz := int64ToBytes(height)

keys := dmtypes.EventKeys{}
for _, event := range events {
// only index events with a non-empty type
if len(event.Type) == 0 {
Expand All @@ -498,21 +507,130 @@ func (idx *BlockerIndexer) indexEvents(batch store.KVBatch, events []abci.Event,
// index iff the event specified index:true and it's not a reserved event
compositeKey := fmt.Sprintf("%s.%s", event.Type, string(attr.Key))
if compositeKey == tmtypes.BlockHeightKey {
return fmt.Errorf("event type and attribute key \"%s\" is reserved; please use a different key", compositeKey)
return dmtypes.EventKeys{}, fmt.Errorf("event type and attribute key \"%s\" is reserved; please use a different key", compositeKey)
}

if attr.GetIndex() {
key, err := eventKey(compositeKey, typ, string(attr.Value), height)
if err != nil {
return fmt.Errorf("create block index key: %w", err)
return dmtypes.EventKeys{}, fmt.Errorf("create block index key: %w", err)
}

if err := batch.Set(key, heightBz); err != nil {
return err
return dmtypes.EventKeys{}, err
}
keys.Keys = append(keys.Keys, key)
}
}
}

return keys, nil
}

func (idx *BlockerIndexer) Prune(from, to uint64, logger log.Logger) (uint64, error) {
if from <= 0 {
return 0, fmt.Errorf("from height must be greater than 0: %w", gerrc.ErrInvalidArgument)
}

if to <= from {
return 0, fmt.Errorf("to height must be greater than from height: to: %d: from: %d: %w", to, from, gerrc.ErrInvalidArgument)
}
blocksPruned, err := idx.pruneBlocks(from, to, logger)
return blocksPruned, err
}

func (idx *BlockerIndexer) pruneBlocks(from, to uint64, logger log.Logger) (uint64, error) {
pruned := uint64(0)
batch := idx.store.NewBatch()
defer batch.Discard()

flush := func(batch store.KVBatch, height int64) error {
err := batch.Commit()
if err != nil {
return fmt.Errorf("flush batch to disk: height %d: %w", height, err)
}
return nil
}

for h := int64(from); h < int64(to); h++ {
ok, err := idx.Has(h)
if err != nil {
logger.Debug("pruning block indexer checking height", "err", err)
continue
}
if !ok {
continue
}
key, err := heightKey(h)
if err != nil {
logger.Debug("pruning block indexer getting height key", "err", err)
continue
}
if err := batch.Delete(key); err != nil {
logger.Debug("pruning block indexer deleting height key", "err", err)
continue
}
if err := idx.pruneEvents(h, batch); err != nil {
logger.Debug("pruning block indexer events", "err", err)
continue
}
pruned++

// flush every 1000 blocks to avoid batches becoming too large
if pruned%1000 == 0 && pruned > 0 {
err := flush(batch, h)
if err != nil {
return 0, err
}
batch.Discard()
batch = idx.store.NewBatch()
}
}

err := flush(batch, int64(to))
if err != nil {
return 0, err
}

return pruned, nil
}

func (idx *BlockerIndexer) pruneEvents(height int64, batch store.KVBatch) error {
eventKey, err := eventHeightKey(height)
if err != nil {
return err
}
keysList, err := idx.store.Get(eventKey)
if err != nil {
return err
}
eventKeys := &dymint.EventKeys{}
err = eventKeys.Unmarshal(keysList)
if err != nil {
return err
}
for _, key := range eventKeys.Keys {
err := batch.Delete(key)
if err != nil {
return err
}
}
return nil
}

func (idx *BlockerIndexer) addEventKeys(height int64, beginKeys *dymint.EventKeys, endKeys *dymint.EventKeys, batch store.KVBatch) error {
eventKeys := beginKeys
eventKeys.Keys = append(eventKeys.Keys, endKeys.Keys...)
eventKeyHeight, err := eventHeightKey(height)
if err != nil {
return err
}
eventKeysBytes, err := eventKeys.Marshal()
if err != nil {
return err
}
if err := batch.Set(eventKeyHeight, eventKeysBytes); err != nil {
return err
}
return nil
}
Loading

0 comments on commit ca0ffba

Please sign in to comment.