Skip to content

Commit

Permalink
feat(manager): run dymint store block pruning in background (#1053)
Browse files Browse the repository at this point in the history
  • Loading branch information
srene committed Sep 25, 2024
1 parent 66f9b35 commit 5055ae7
Show file tree
Hide file tree
Showing 21 changed files with 804 additions and 87 deletions.
20 changes: 12 additions & 8 deletions block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,18 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta
return fmt.Errorf("commit block: %w", err)
}

// Prune old heights, if requested by ABCI app.
// retainHeight is determined by currentHeight - min-retain-blocks (app.toml config).
// Unless max_age_num_blocks in consensus params is higher than min-retain-block, then max_age_num_blocks will be used instead of min-retain-blocks.

if 0 < retainHeight {
select {
case m.pruningC <- retainHeight:
default:
m.logger.Error("pruning channel full. skipping pruning", "retainHeight", retainHeight)
}
}

// Update the state with the new app hash, and store height from the commit.
// Every one of those, if happens before commit, prevents us from re-executing the block in case failed during commit.
m.Executor.UpdateStateAfterCommit(m.State, responses, appHash, block.Header.Height)
Expand Down Expand Up @@ -96,14 +108,6 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta

types.RollappHeightGauge.Set(float64(block.Header.Height))

// Prune old heights, if requested by ABCI app.
if 0 < retainHeight {
_, err := m.PruneBlocks(uint64(retainHeight))
if err != nil {
m.logger.Error("prune blocks", "retain_height", retainHeight, "err", err)
}
}

m.blockCache.Delete(block.Header.Height)

if switchRole {
Expand Down
34 changes: 24 additions & 10 deletions block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"golang.org/x/sync/errgroup"

"github.com/dymensionxyz/dymint/da/registry"
"github.com/dymensionxyz/dymint/indexers/txindex"
"github.com/dymensionxyz/dymint/store"
uerrors "github.com/dymensionxyz/dymint/utils/errors"
uevent "github.com/dymensionxyz/dymint/utils/event"
Expand Down Expand Up @@ -77,6 +78,12 @@ type Manager struct {

// TargetHeight holds the value of the current highest block seen from either p2p (probably higher) or the DA
TargetHeight atomic.Uint64

// channel used to send the retain height to the pruning background loop
pruningC chan int64

// indexer
indexerService *txindex.IndexerService
}

// NewManager creates new block Manager.
Expand All @@ -92,6 +99,7 @@ func NewManager(
pubsub *pubsub.Server,
p2pClient *p2p.Client,
dalcKV *store.PrefixKV,
indexerService *txindex.IndexerService,
logger log.Logger,
) (*Manager, error) {
localAddress, err := types.GetAddress(localKey)
Expand All @@ -104,18 +112,20 @@ func NewManager(
}

m := &Manager{
Pubsub: pubsub,
P2PClient: p2pClient,
LocalKey: localKey,
Conf: conf.BlockManagerConfig,
Genesis: genesis,
Store: store,
Executor: exec,
SLClient: settlementClient,
logger: logger.With("module", "block_manager"),
Pubsub: pubsub,
P2PClient: p2pClient,
LocalKey: localKey,
Conf: conf.BlockManagerConfig,
Genesis: genesis,
Store: store,
Executor: exec,
SLClient: settlementClient,
indexerService: indexerService,
logger: logger.With("module", "block_manager"),
blockCache: &Cache{
cache: make(map[uint64]types.CachedBlock),
},
pruningC: make(chan int64, 10), // use of buffered channel to avoid blocking applyBlock thread. In case channel is full, pruning will be skipped, but the retain height can be pruned in the next iteration.
}

err = m.LoadStateOnInit(store, genesis, logger)
Expand Down Expand Up @@ -158,6 +168,11 @@ func (m *Manager) Start(ctx context.Context) error {
isProposer := m.IsProposer()
m.logger.Info("starting block manager", "proposer", isProposer)

eg, ctx := errgroup.WithContext(ctx)
uerrors.ErrGroupGoLog(eg, m.logger, func() error {
return m.PruningLoop(ctx)
})

/* ----------------------------- full node mode ----------------------------- */
if !isProposer {
// Full-nodes can sync from DA but it is not necessary to wait for it, since it can sync from P2P as well in parallel.
Expand Down Expand Up @@ -203,7 +218,6 @@ func (m *Manager) Start(ctx context.Context) error {
// channel to signal sequencer rotation started
rotateSequencerC := make(chan string, 1)

eg, ctx := errgroup.WithContext(ctx)
uerrors.ErrGroupGoLog(eg, m.logger, func() error {
return m.SubmitLoop(ctx, bytesProducedC)
})
Expand Down
2 changes: 1 addition & 1 deletion block/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func TestInitialState(t *testing.T) {
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
agg, err := block.NewManager(key, conf, c.genesis, c.store, nil, proxyApp, settlementlc,
nil, pubsubServer, p2pClient, nil, logger)
nil, pubsubServer, p2pClient, nil, nil, logger)
assert.NoError(err)
assert.NotNil(agg)
assert.Equal(c.expectedChainID, agg.State.ChainID)
Expand Down
26 changes: 24 additions & 2 deletions block/pruning.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,24 @@ func (m *Manager) PruneBlocks(retainHeight uint64) (uint64, error) {
retainHeight = nextSubmissionHeight
}

// prune blocks from blocksync store
err := m.P2PClient.RemoveBlocks(context.Background(), m.State.BaseHeight, retainHeight)
if err != nil {
m.logger.Error("pruning blocksync store", "retain_height", retainHeight, "err", err)
}

// prune blocks from indexer store
err = m.indexerService.Prune(m.State.BaseHeight, retainHeight)
if err != nil {
m.logger.Error("pruning indexer", "retain_height", retainHeight, "err", err)
}

// prune blocks from dymint store
pruned, err := m.Store.PruneStore(m.State.BaseHeight, retainHeight, m.logger)
if err != nil {
return 0, fmt.Errorf("prune block store: %w", err)
}

// TODO: prune state/indexer and state/txindexer??

m.State.BaseHeight = retainHeight
_, err = m.Store.SaveState(m.State, nil)
if err != nil {
Expand All @@ -34,3 +41,18 @@ func (m *Manager) PruneBlocks(retainHeight uint64) (uint64, error) {

return pruned, nil
}

func (m *Manager) PruningLoop(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
case retainHeight := <-m.pruningC:
_, err := m.PruneBlocks(uint64(retainHeight))
if err != nil {
m.logger.Error("pruning blocks", "retainHeight", retainHeight, "err", err)
}

}
}
}
1 change: 1 addition & 0 deletions block/pruning_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,5 @@ func TestPruningRetainHeight(t *testing.T) {
require.Error(gerrc.ErrInvalidArgument)
}
}

}
4 changes: 4 additions & 0 deletions indexers/blockindexer/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package indexer
import (
"context"

"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/pubsub/query"
"github.com/tendermint/tendermint/types"
)
Expand All @@ -19,4 +20,7 @@ type BlockIndexer interface {
// Search performs a query for block heights that match a given BeginBlock
// and Endblock event search criteria.
Search(ctx context.Context, q *query.Query) ([]int64, error)

// Delete indexed block entries up to (but not including) a height. It returns number of entries pruned.
Prune(from, to uint64, logger log.Logger) (uint64, error)
}
136 changes: 127 additions & 9 deletions indexers/blockindexer/kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"strconv"
"strings"

"github.com/tendermint/tendermint/libs/log"

"github.com/dymensionxyz/gerr-cosmos/gerrc"
"github.com/google/orderedcode"

Expand All @@ -17,6 +19,8 @@ import (
indexer "github.com/dymensionxyz/dymint/indexers/blockindexer"
"github.com/dymensionxyz/dymint/store"

"github.com/dymensionxyz/dymint/types/pb/dymint"
dmtypes "github.com/dymensionxyz/dymint/types/pb/dymint"
tmtypes "github.com/tendermint/tendermint/types"
)

Expand Down Expand Up @@ -61,7 +65,6 @@ func (idx *BlockerIndexer) Index(bh tmtypes.EventDataNewBlockHeader) error {
defer batch.Discard()

height := bh.Header.Height

// 1. index by height
key, err := heightKey(height)
if err != nil {
Expand All @@ -72,15 +75,21 @@ func (idx *BlockerIndexer) Index(bh tmtypes.EventDataNewBlockHeader) error {
}

// 2. index BeginBlock events
if err := idx.indexEvents(batch, bh.ResultBeginBlock.Events, "begin_block", height); err != nil {
beginKeys, err := idx.indexEvents(batch, bh.ResultBeginBlock.Events, "begin_block", height)
if err != nil {
return fmt.Errorf("index BeginBlock events: %w", err)
}

// 3. index EndBlock events
if err := idx.indexEvents(batch, bh.ResultEndBlock.Events, "end_block", height); err != nil {
endKeys, err := idx.indexEvents(batch, bh.ResultEndBlock.Events, "end_block", height)
if err != nil {
return fmt.Errorf("index EndBlock events: %w", err)
}

// 4. index all eventkeys by height key for easy pruning
err = idx.addEventKeys(height, &beginKeys, &endKeys, batch)
if err != nil {
return err
}
return batch.Commit()
}

Expand Down Expand Up @@ -481,9 +490,9 @@ func (idx *BlockerIndexer) match(
return filteredHeights, nil
}

func (idx *BlockerIndexer) indexEvents(batch store.KVBatch, events []abci.Event, typ string, height int64) error {
func (idx *BlockerIndexer) indexEvents(batch store.KVBatch, events []abci.Event, typ string, height int64) (dmtypes.EventKeys, error) {
heightBz := int64ToBytes(height)

keys := dmtypes.EventKeys{}
for _, event := range events {
// only index events with a non-empty type
if len(event.Type) == 0 {
Expand All @@ -498,21 +507,130 @@ func (idx *BlockerIndexer) indexEvents(batch store.KVBatch, events []abci.Event,
// index iff the event specified index:true and it's not a reserved event
compositeKey := fmt.Sprintf("%s.%s", event.Type, string(attr.Key))
if compositeKey == tmtypes.BlockHeightKey {
return fmt.Errorf("event type and attribute key \"%s\" is reserved; please use a different key", compositeKey)
return dmtypes.EventKeys{}, fmt.Errorf("event type and attribute key \"%s\" is reserved; please use a different key", compositeKey)
}

if attr.GetIndex() {
key, err := eventKey(compositeKey, typ, string(attr.Value), height)
if err != nil {
return fmt.Errorf("create block index key: %w", err)
return dmtypes.EventKeys{}, fmt.Errorf("create block index key: %w", err)
}

if err := batch.Set(key, heightBz); err != nil {
return err
return dmtypes.EventKeys{}, err
}
keys.Keys = append(keys.Keys, key)
}
}
}

return keys, nil
}

func (idx *BlockerIndexer) Prune(from, to uint64, logger log.Logger) (uint64, error) {
if from <= 0 {
return 0, fmt.Errorf("from height must be greater than 0: %w", gerrc.ErrInvalidArgument)
}

if to <= from {
return 0, fmt.Errorf("to height must be greater than from height: to: %d: from: %d: %w", to, from, gerrc.ErrInvalidArgument)
}
blocksPruned, err := idx.pruneBlocks(from, to, logger)
return blocksPruned, err
}

func (idx *BlockerIndexer) pruneBlocks(from, to uint64, logger log.Logger) (uint64, error) {
pruned := uint64(0)
batch := idx.store.NewBatch()
defer batch.Discard()

flush := func(batch store.KVBatch, height int64) error {
err := batch.Commit()
if err != nil {
return fmt.Errorf("flush batch to disk: height %d: %w", height, err)
}
return nil
}

for h := int64(from); h < int64(to); h++ {
ok, err := idx.Has(h)
if err != nil {
logger.Debug("pruning block indexer checking height", "err", err)
continue
}
if !ok {
continue
}
key, err := heightKey(h)
if err != nil {
logger.Debug("pruning block indexer getting height key", "err", err)
continue
}
if err := batch.Delete(key); err != nil {
logger.Debug("pruning block indexer deleting height key", "err", err)
continue
}
if err := idx.pruneEvents(h, batch); err != nil {
logger.Debug("pruning block indexer events", "err", err)
continue
}
pruned++

// flush every 1000 blocks to avoid batches becoming too large
if pruned%1000 == 0 && pruned > 0 {
err := flush(batch, h)
if err != nil {
return 0, err
}
batch.Discard()
batch = idx.store.NewBatch()
}
}

err := flush(batch, int64(to))
if err != nil {
return 0, err
}

return pruned, nil
}

func (idx *BlockerIndexer) pruneEvents(height int64, batch store.KVBatch) error {
eventKey, err := eventHeightKey(height)
if err != nil {
return err
}
keysList, err := idx.store.Get(eventKey)
if err != nil {
return err
}
eventKeys := &dymint.EventKeys{}
err = eventKeys.Unmarshal(keysList)
if err != nil {
return err
}
for _, key := range eventKeys.Keys {
err := batch.Delete(key)
if err != nil {
return err
}
}
return nil
}

func (idx *BlockerIndexer) addEventKeys(height int64, beginKeys *dymint.EventKeys, endKeys *dymint.EventKeys, batch store.KVBatch) error {
eventKeys := beginKeys
eventKeys.Keys = append(eventKeys.Keys, endKeys.Keys...)
eventKeyHeight, err := eventHeightKey(height)
if err != nil {
return err
}
eventKeysBytes, err := eventKeys.Marshal()
if err != nil {
return err
}
if err := batch.Set(eventKeyHeight, eventKeysBytes); err != nil {
return err
}
return nil
}
Loading

0 comments on commit 5055ae7

Please sign in to comment.