Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(manager): run dymint store block pruning in background #1053

Merged
merged 9 commits into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 12 additions & 8 deletions block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,18 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta
return fmt.Errorf("commit block: %w", err)
}

// Prune old heights, if requested by ABCI app.
// retainHeight is determined by currentHeight - min-retain-blocks (app.toml config).
// Unless max_age_num_blocks in consensus params is higher than min-retain-block, then max_age_num_blocks will be used instead of min-retain-blocks.

if 0 < retainHeight {
select {
case m.pruningC <- retainHeight:
default:
m.logger.Error("pruning channel full. skipping pruning", "retainHeight", retainHeight)
}
}

// Update the state with the new app hash, and store height from the commit.
// Every one of those, if happens before commit, prevents us from re-executing the block in case failed during commit.
m.Executor.UpdateStateAfterCommit(m.State, responses, appHash, block.Header.Height)
Expand Down Expand Up @@ -96,14 +108,6 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta

types.RollappHeightGauge.Set(float64(block.Header.Height))

// Prune old heights, if requested by ABCI app.
if 0 < retainHeight {
_, err := m.PruneBlocks(uint64(retainHeight))
if err != nil {
m.logger.Error("prune blocks", "retain_height", retainHeight, "err", err)
}
}

m.blockCache.Delete(block.Header.Height)

if switchRole {
Expand Down
34 changes: 24 additions & 10 deletions block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"golang.org/x/sync/errgroup"

"github.com/dymensionxyz/dymint/da/registry"
"github.com/dymensionxyz/dymint/indexers/txindex"
"github.com/dymensionxyz/dymint/store"
uerrors "github.com/dymensionxyz/dymint/utils/errors"
uevent "github.com/dymensionxyz/dymint/utils/event"
Expand Down Expand Up @@ -77,6 +78,12 @@ type Manager struct {

// TargetHeight holds the value of the current highest block seen from either p2p (probably higher) or the DA
TargetHeight atomic.Uint64

// channel used to send the retain height to the pruning background loop
pruningC chan int64

// indexer
indexerService *txindex.IndexerService
}

// NewManager creates new block Manager.
Expand All @@ -92,6 +99,7 @@ func NewManager(
pubsub *pubsub.Server,
p2pClient *p2p.Client,
dalcKV *store.PrefixKV,
indexerService *txindex.IndexerService,
logger log.Logger,
) (*Manager, error) {
localAddress, err := types.GetAddress(localKey)
Expand All @@ -104,18 +112,20 @@ func NewManager(
}

m := &Manager{
Pubsub: pubsub,
P2PClient: p2pClient,
LocalKey: localKey,
Conf: conf.BlockManagerConfig,
Genesis: genesis,
Store: store,
Executor: exec,
SLClient: settlementClient,
logger: logger.With("module", "block_manager"),
Pubsub: pubsub,
P2PClient: p2pClient,
LocalKey: localKey,
Conf: conf.BlockManagerConfig,
Genesis: genesis,
Store: store,
Executor: exec,
SLClient: settlementClient,
indexerService: indexerService,
logger: logger.With("module", "block_manager"),
blockCache: &Cache{
cache: make(map[uint64]types.CachedBlock),
},
pruningC: make(chan int64, 10), // use of buffered channel to avoid blocking applyBlock thread. In case channel is full, pruning will be skipped, but the retain height can be pruned in the next iteration.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the different between using size 10 and size 0 here? isn't 0 effectively the same, but simpler?

Copy link
Contributor Author

@srene srene Sep 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

imo is very similar but not exactly the same. in very active rollapps is probably the same. in not that active rollapps it gives some more chance to finish and prune later blocks that otherwise can be left unpruned till new blocks are created, but i agree there is not a big difference.

}

err = m.LoadStateOnInit(store, genesis, logger)
Expand Down Expand Up @@ -158,6 +168,11 @@ func (m *Manager) Start(ctx context.Context) error {
isProposer := m.IsProposer()
m.logger.Info("starting block manager", "proposer", isProposer)

eg, ctx := errgroup.WithContext(ctx)
uerrors.ErrGroupGoLog(eg, m.logger, func() error {
return m.PruningLoop(ctx)
})

/* ----------------------------- full node mode ----------------------------- */
if !isProposer {
// Full-nodes can sync from DA but it is not necessary to wait for it, since it can sync from P2P as well in parallel.
Expand Down Expand Up @@ -203,7 +218,6 @@ func (m *Manager) Start(ctx context.Context) error {
// channel to signal sequencer rotation started
rotateSequencerC := make(chan string, 1)

eg, ctx := errgroup.WithContext(ctx)
uerrors.ErrGroupGoLog(eg, m.logger, func() error {
return m.SubmitLoop(ctx, bytesProducedC)
})
Expand Down
2 changes: 1 addition & 1 deletion block/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func TestInitialState(t *testing.T) {
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
agg, err := block.NewManager(key, conf, c.genesis, c.store, nil, proxyApp, settlementlc,
nil, pubsubServer, p2pClient, nil, logger)
nil, pubsubServer, p2pClient, nil, nil, logger)
assert.NoError(err)
assert.NotNil(agg)
assert.Equal(c.expectedChainID, agg.State.ChainID)
Expand Down
26 changes: 24 additions & 2 deletions block/pruning.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,24 @@ func (m *Manager) PruneBlocks(retainHeight uint64) (uint64, error) {
retainHeight = nextSubmissionHeight
}

// prune blocks from blocksync store
err := m.P2PClient.RemoveBlocks(context.Background(), m.State.BaseHeight, retainHeight)
if err != nil {
m.logger.Error("pruning blocksync store", "retain_height", retainHeight, "err", err)
}

// prune blocks from indexer store
err = m.indexerService.Prune(m.State.BaseHeight, retainHeight)
if err != nil {
m.logger.Error("pruning indexer", "retain_height", retainHeight, "err", err)
}

// prune blocks from dymint store
pruned, err := m.Store.PruneStore(m.State.BaseHeight, retainHeight, m.logger)
if err != nil {
return 0, fmt.Errorf("prune block store: %w", err)
}

// TODO: prune state/indexer and state/txindexer??

m.State.BaseHeight = retainHeight
_, err = m.Store.SaveState(m.State, nil)
if err != nil {
Expand All @@ -34,3 +41,18 @@ func (m *Manager) PruneBlocks(retainHeight uint64) (uint64, error) {

return pruned, nil
}

func (m *Manager) PruningLoop(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
case retainHeight := <-m.pruningC:
_, err := m.PruneBlocks(uint64(retainHeight))
if err != nil {
m.logger.Error("pruning blocks", "retainHeight", retainHeight, "err", err)
}

}
}
}
1 change: 1 addition & 0 deletions block/pruning_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,5 @@ func TestPruningRetainHeight(t *testing.T) {
require.Error(gerrc.ErrInvalidArgument)
}
}

}
4 changes: 4 additions & 0 deletions indexers/blockindexer/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package indexer
import (
"context"

"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/pubsub/query"
"github.com/tendermint/tendermint/types"
)
Expand All @@ -19,4 +20,7 @@ type BlockIndexer interface {
// Search performs a query for block heights that match a given BeginBlock
// and Endblock event search criteria.
Search(ctx context.Context, q *query.Query) ([]int64, error)

// Delete indexed block entries up to (but not including) a height. It returns number of entries pruned.
Prune(from, to uint64, logger log.Logger) (uint64, error)
}
136 changes: 127 additions & 9 deletions indexers/blockindexer/kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"strconv"
"strings"

"github.com/tendermint/tendermint/libs/log"

"github.com/dymensionxyz/gerr-cosmos/gerrc"
"github.com/google/orderedcode"

Expand All @@ -17,6 +19,8 @@ import (
indexer "github.com/dymensionxyz/dymint/indexers/blockindexer"
"github.com/dymensionxyz/dymint/store"

"github.com/dymensionxyz/dymint/types/pb/dymint"
dmtypes "github.com/dymensionxyz/dymint/types/pb/dymint"
tmtypes "github.com/tendermint/tendermint/types"
)

Expand Down Expand Up @@ -61,7 +65,6 @@ func (idx *BlockerIndexer) Index(bh tmtypes.EventDataNewBlockHeader) error {
defer batch.Discard()

height := bh.Header.Height

// 1. index by height
key, err := heightKey(height)
if err != nil {
Expand All @@ -72,15 +75,21 @@ func (idx *BlockerIndexer) Index(bh tmtypes.EventDataNewBlockHeader) error {
}

// 2. index BeginBlock events
if err := idx.indexEvents(batch, bh.ResultBeginBlock.Events, "begin_block", height); err != nil {
beginKeys, err := idx.indexEvents(batch, bh.ResultBeginBlock.Events, "begin_block", height)
if err != nil {
return fmt.Errorf("index BeginBlock events: %w", err)
}

// 3. index EndBlock events
if err := idx.indexEvents(batch, bh.ResultEndBlock.Events, "end_block", height); err != nil {
endKeys, err := idx.indexEvents(batch, bh.ResultEndBlock.Events, "end_block", height)
if err != nil {
return fmt.Errorf("index EndBlock events: %w", err)
}

// 4. index all eventkeys by height key for easy pruning
err = idx.addEventKeys(height, &beginKeys, &endKeys, batch)
if err != nil {
return err
}
return batch.Commit()
}

Expand Down Expand Up @@ -481,9 +490,9 @@ func (idx *BlockerIndexer) match(
return filteredHeights, nil
}

func (idx *BlockerIndexer) indexEvents(batch store.KVBatch, events []abci.Event, typ string, height int64) error {
func (idx *BlockerIndexer) indexEvents(batch store.KVBatch, events []abci.Event, typ string, height int64) (dmtypes.EventKeys, error) {
heightBz := int64ToBytes(height)

keys := dmtypes.EventKeys{}
for _, event := range events {
// only index events with a non-empty type
if len(event.Type) == 0 {
Expand All @@ -498,21 +507,130 @@ func (idx *BlockerIndexer) indexEvents(batch store.KVBatch, events []abci.Event,
// index iff the event specified index:true and it's not a reserved event
compositeKey := fmt.Sprintf("%s.%s", event.Type, string(attr.Key))
if compositeKey == tmtypes.BlockHeightKey {
return fmt.Errorf("event type and attribute key \"%s\" is reserved; please use a different key", compositeKey)
return dmtypes.EventKeys{}, fmt.Errorf("event type and attribute key \"%s\" is reserved; please use a different key", compositeKey)
}

if attr.GetIndex() {
key, err := eventKey(compositeKey, typ, string(attr.Value), height)
if err != nil {
return fmt.Errorf("create block index key: %w", err)
return dmtypes.EventKeys{}, fmt.Errorf("create block index key: %w", err)
}

if err := batch.Set(key, heightBz); err != nil {
return err
return dmtypes.EventKeys{}, err
}
keys.Keys = append(keys.Keys, key)
}
}
}

return keys, nil
}

func (idx *BlockerIndexer) Prune(from, to uint64, logger log.Logger) (uint64, error) {
if from <= 0 {
return 0, fmt.Errorf("from height must be greater than 0: %w", gerrc.ErrInvalidArgument)
}

if to <= from {
return 0, fmt.Errorf("to height must be greater than from height: to: %d: from: %d: %w", to, from, gerrc.ErrInvalidArgument)
}
Comment on lines +531 to +537
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this really necessary?

Copy link
Contributor Author

@srene srene Sep 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think it does not harm, and it avoids executing unnecessary code, although it should not happen.

blocksPruned, err := idx.pruneBlocks(from, to, logger)
return blocksPruned, err
}

func (idx *BlockerIndexer) pruneBlocks(from, to uint64, logger log.Logger) (uint64, error) {
pruned := uint64(0)
batch := idx.store.NewBatch()
defer batch.Discard()

flush := func(batch store.KVBatch, height int64) error {
err := batch.Commit()
if err != nil {
return fmt.Errorf("flush batch to disk: height %d: %w", height, err)
}
return nil
}

for h := int64(from); h < int64(to); h++ {
ok, err := idx.Has(h)
if err != nil {
logger.Debug("pruning block indexer checking height", "err", err)
continue
}
if !ok {
continue
}
key, err := heightKey(h)
if err != nil {
logger.Debug("pruning block indexer getting height key", "err", err)
continue
}
if err := batch.Delete(key); err != nil {
logger.Debug("pruning block indexer deleting height key", "err", err)
continue
}
if err := idx.pruneEvents(h, batch); err != nil {
logger.Debug("pruning block indexer events", "err", err)
continue
}
pruned++

// flush every 1000 blocks to avoid batches becoming too large
if pruned%1000 == 0 && pruned > 0 {
err := flush(batch, h)
if err != nil {
return 0, err
}
batch.Discard()
batch = idx.store.NewBatch()
}
}

err := flush(batch, int64(to))
if err != nil {
return 0, err
}

return pruned, nil
}

func (idx *BlockerIndexer) pruneEvents(height int64, batch store.KVBatch) error {
eventKey, err := eventHeightKey(height)
if err != nil {
return err
}
keysList, err := idx.store.Get(eventKey)
if err != nil {
return err
}
eventKeys := &dymint.EventKeys{}
err = eventKeys.Unmarshal(keysList)
if err != nil {
return err
}
for _, key := range eventKeys.Keys {
err := batch.Delete(key)
if err != nil {
return err
}
}
return nil
}

func (idx *BlockerIndexer) addEventKeys(height int64, beginKeys *dymint.EventKeys, endKeys *dymint.EventKeys, batch store.KVBatch) error {
eventKeys := beginKeys
eventKeys.Keys = append(eventKeys.Keys, endKeys.Keys...)
eventKeyHeight, err := eventHeightKey(height)
if err != nil {
return err
}
eventKeysBytes, err := eventKeys.Marshal()
if err != nil {
return err
}
if err := batch.Set(eventKeyHeight, eventKeysBytes); err != nil {
return err
}
return nil
}
Loading
Loading