From 5055ae7d93b6e81007f223f4bc99fadc8bb01434 Mon Sep 17 00:00:00 2001 From: Sergi Rene Date: Wed, 25 Sep 2024 12:25:56 +0200 Subject: [PATCH] feat(manager): run dymint store block pruning in background (#1053) --- block/block.go | 20 +- block/manager.go | 34 ++- block/manager_test.go | 2 +- block/pruning.go | 26 ++- block/pruning_test.go | 1 + indexers/blockindexer/block.go | 4 + indexers/blockindexer/kv/kv.go | 136 ++++++++++- indexers/blockindexer/kv/kv_test.go | 83 +++++++ indexers/blockindexer/kv/util.go | 10 + indexers/blockindexer/null/null.go | 6 + indexers/txindex/indexer.go | 5 + indexers/txindex/indexer_service.go | 13 ++ indexers/txindex/indexer_service_test.go | 11 + indexers/txindex/kv/kv.go | 129 ++++++++++- indexers/txindex/kv/kv_test.go | 89 ++++++++ indexers/txindex/kv/utils.go | 12 + indexers/txindex/null/null.go | 6 + node/node.go | 1 + proto/types/dymint/dymint.proto | 4 + testutil/block.go | 21 +- types/pb/dymint/dymint.pb.go | 278 ++++++++++++++++++----- 21 files changed, 804 insertions(+), 87 deletions(-) diff --git a/block/block.go b/block/block.go index a876302e8..ecd544146 100644 --- a/block/block.go +++ b/block/block.go @@ -69,6 +69,18 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta return fmt.Errorf("commit block: %w", err) } + // Prune old heights, if requested by ABCI app. + // retainHeight is determined by currentHeight - min-retain-blocks (app.toml config). + // Unless max_age_num_blocks in consensus params is higher than min-retain-block, then max_age_num_blocks will be used instead of min-retain-blocks. + + if 0 < retainHeight { + select { + case m.pruningC <- retainHeight: + default: + m.logger.Error("pruning channel full. skipping pruning", "retainHeight", retainHeight) + } + } + // Update the state with the new app hash, and store height from the commit. // Every one of those, if happens before commit, prevents us from re-executing the block in case failed during commit. m.Executor.UpdateStateAfterCommit(m.State, responses, appHash, block.Header.Height) @@ -96,14 +108,6 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta types.RollappHeightGauge.Set(float64(block.Header.Height)) - // Prune old heights, if requested by ABCI app. - if 0 < retainHeight { - _, err := m.PruneBlocks(uint64(retainHeight)) - if err != nil { - m.logger.Error("prune blocks", "retain_height", retainHeight, "err", err) - } - } - m.blockCache.Delete(block.Header.Height) if switchRole { diff --git a/block/manager.go b/block/manager.go index e4ece70e0..dd1994bd2 100644 --- a/block/manager.go +++ b/block/manager.go @@ -11,6 +11,7 @@ import ( "golang.org/x/sync/errgroup" "github.com/dymensionxyz/dymint/da/registry" + "github.com/dymensionxyz/dymint/indexers/txindex" "github.com/dymensionxyz/dymint/store" uerrors "github.com/dymensionxyz/dymint/utils/errors" uevent "github.com/dymensionxyz/dymint/utils/event" @@ -77,6 +78,12 @@ type Manager struct { // TargetHeight holds the value of the current highest block seen from either p2p (probably higher) or the DA TargetHeight atomic.Uint64 + + // channel used to send the retain height to the pruning background loop + pruningC chan int64 + + // indexer + indexerService *txindex.IndexerService } // NewManager creates new block Manager. @@ -92,6 +99,7 @@ func NewManager( pubsub *pubsub.Server, p2pClient *p2p.Client, dalcKV *store.PrefixKV, + indexerService *txindex.IndexerService, logger log.Logger, ) (*Manager, error) { localAddress, err := types.GetAddress(localKey) @@ -104,18 +112,20 @@ func NewManager( } m := &Manager{ - Pubsub: pubsub, - P2PClient: p2pClient, - LocalKey: localKey, - Conf: conf.BlockManagerConfig, - Genesis: genesis, - Store: store, - Executor: exec, - SLClient: settlementClient, - logger: logger.With("module", "block_manager"), + Pubsub: pubsub, + P2PClient: p2pClient, + LocalKey: localKey, + Conf: conf.BlockManagerConfig, + Genesis: genesis, + Store: store, + Executor: exec, + SLClient: settlementClient, + indexerService: indexerService, + logger: logger.With("module", "block_manager"), blockCache: &Cache{ cache: make(map[uint64]types.CachedBlock), }, + pruningC: make(chan int64, 10), // use of buffered channel to avoid blocking applyBlock thread. In case channel is full, pruning will be skipped, but the retain height can be pruned in the next iteration. } err = m.LoadStateOnInit(store, genesis, logger) @@ -158,6 +168,11 @@ func (m *Manager) Start(ctx context.Context) error { isProposer := m.IsProposer() m.logger.Info("starting block manager", "proposer", isProposer) + eg, ctx := errgroup.WithContext(ctx) + uerrors.ErrGroupGoLog(eg, m.logger, func() error { + return m.PruningLoop(ctx) + }) + /* ----------------------------- full node mode ----------------------------- */ if !isProposer { // Full-nodes can sync from DA but it is not necessary to wait for it, since it can sync from P2P as well in parallel. @@ -203,7 +218,6 @@ func (m *Manager) Start(ctx context.Context) error { // channel to signal sequencer rotation started rotateSequencerC := make(chan string, 1) - eg, ctx := errgroup.WithContext(ctx) uerrors.ErrGroupGoLog(eg, m.logger, func() error { return m.SubmitLoop(ctx, bytesProducedC) }) diff --git a/block/manager_test.go b/block/manager_test.go index 25a9cbf69..287757189 100644 --- a/block/manager_test.go +++ b/block/manager_test.go @@ -109,7 +109,7 @@ func TestInitialState(t *testing.T) { for _, c := range cases { t.Run(c.name, func(t *testing.T) { agg, err := block.NewManager(key, conf, c.genesis, c.store, nil, proxyApp, settlementlc, - nil, pubsubServer, p2pClient, nil, logger) + nil, pubsubServer, p2pClient, nil, nil, logger) assert.NoError(err) assert.NotNil(agg) assert.Equal(c.expectedChainID, agg.State.ChainID) diff --git a/block/pruning.go b/block/pruning.go index a1cfaeb7d..d4ab18cc8 100644 --- a/block/pruning.go +++ b/block/pruning.go @@ -13,17 +13,24 @@ func (m *Manager) PruneBlocks(retainHeight uint64) (uint64, error) { retainHeight = nextSubmissionHeight } + // prune blocks from blocksync store err := m.P2PClient.RemoveBlocks(context.Background(), m.State.BaseHeight, retainHeight) if err != nil { m.logger.Error("pruning blocksync store", "retain_height", retainHeight, "err", err) } + + // prune blocks from indexer store + err = m.indexerService.Prune(m.State.BaseHeight, retainHeight) + if err != nil { + m.logger.Error("pruning indexer", "retain_height", retainHeight, "err", err) + } + + // prune blocks from dymint store pruned, err := m.Store.PruneStore(m.State.BaseHeight, retainHeight, m.logger) if err != nil { return 0, fmt.Errorf("prune block store: %w", err) } - // TODO: prune state/indexer and state/txindexer?? - m.State.BaseHeight = retainHeight _, err = m.Store.SaveState(m.State, nil) if err != nil { @@ -34,3 +41,18 @@ func (m *Manager) PruneBlocks(retainHeight uint64) (uint64, error) { return pruned, nil } + +func (m *Manager) PruningLoop(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + case retainHeight := <-m.pruningC: + _, err := m.PruneBlocks(uint64(retainHeight)) + if err != nil { + m.logger.Error("pruning blocks", "retainHeight", retainHeight, "err", err) + } + + } + } +} diff --git a/block/pruning_test.go b/block/pruning_test.go index 9590be35b..e008634a0 100644 --- a/block/pruning_test.go +++ b/block/pruning_test.go @@ -76,4 +76,5 @@ func TestPruningRetainHeight(t *testing.T) { require.Error(gerrc.ErrInvalidArgument) } } + } diff --git a/indexers/blockindexer/block.go b/indexers/blockindexer/block.go index a3a2abc5b..08d2f6d16 100644 --- a/indexers/blockindexer/block.go +++ b/indexers/blockindexer/block.go @@ -3,6 +3,7 @@ package indexer import ( "context" + "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/libs/pubsub/query" "github.com/tendermint/tendermint/types" ) @@ -19,4 +20,7 @@ type BlockIndexer interface { // Search performs a query for block heights that match a given BeginBlock // and Endblock event search criteria. Search(ctx context.Context, q *query.Query) ([]int64, error) + + // Delete indexed block entries up to (but not including) a height. It returns number of entries pruned. + Prune(from, to uint64, logger log.Logger) (uint64, error) } diff --git a/indexers/blockindexer/kv/kv.go b/indexers/blockindexer/kv/kv.go index cc10fbe2d..65164ec36 100644 --- a/indexers/blockindexer/kv/kv.go +++ b/indexers/blockindexer/kv/kv.go @@ -8,6 +8,8 @@ import ( "strconv" "strings" + "github.com/tendermint/tendermint/libs/log" + "github.com/dymensionxyz/gerr-cosmos/gerrc" "github.com/google/orderedcode" @@ -17,6 +19,8 @@ import ( indexer "github.com/dymensionxyz/dymint/indexers/blockindexer" "github.com/dymensionxyz/dymint/store" + "github.com/dymensionxyz/dymint/types/pb/dymint" + dmtypes "github.com/dymensionxyz/dymint/types/pb/dymint" tmtypes "github.com/tendermint/tendermint/types" ) @@ -61,7 +65,6 @@ func (idx *BlockerIndexer) Index(bh tmtypes.EventDataNewBlockHeader) error { defer batch.Discard() height := bh.Header.Height - // 1. index by height key, err := heightKey(height) if err != nil { @@ -72,15 +75,21 @@ func (idx *BlockerIndexer) Index(bh tmtypes.EventDataNewBlockHeader) error { } // 2. index BeginBlock events - if err := idx.indexEvents(batch, bh.ResultBeginBlock.Events, "begin_block", height); err != nil { + beginKeys, err := idx.indexEvents(batch, bh.ResultBeginBlock.Events, "begin_block", height) + if err != nil { return fmt.Errorf("index BeginBlock events: %w", err) } - // 3. index EndBlock events - if err := idx.indexEvents(batch, bh.ResultEndBlock.Events, "end_block", height); err != nil { + endKeys, err := idx.indexEvents(batch, bh.ResultEndBlock.Events, "end_block", height) + if err != nil { return fmt.Errorf("index EndBlock events: %w", err) } + // 4. index all eventkeys by height key for easy pruning + err = idx.addEventKeys(height, &beginKeys, &endKeys, batch) + if err != nil { + return err + } return batch.Commit() } @@ -481,9 +490,9 @@ func (idx *BlockerIndexer) match( return filteredHeights, nil } -func (idx *BlockerIndexer) indexEvents(batch store.KVBatch, events []abci.Event, typ string, height int64) error { +func (idx *BlockerIndexer) indexEvents(batch store.KVBatch, events []abci.Event, typ string, height int64) (dmtypes.EventKeys, error) { heightBz := int64ToBytes(height) - + keys := dmtypes.EventKeys{} for _, event := range events { // only index events with a non-empty type if len(event.Type) == 0 { @@ -498,21 +507,130 @@ func (idx *BlockerIndexer) indexEvents(batch store.KVBatch, events []abci.Event, // index iff the event specified index:true and it's not a reserved event compositeKey := fmt.Sprintf("%s.%s", event.Type, string(attr.Key)) if compositeKey == tmtypes.BlockHeightKey { - return fmt.Errorf("event type and attribute key \"%s\" is reserved; please use a different key", compositeKey) + return dmtypes.EventKeys{}, fmt.Errorf("event type and attribute key \"%s\" is reserved; please use a different key", compositeKey) } if attr.GetIndex() { key, err := eventKey(compositeKey, typ, string(attr.Value), height) if err != nil { - return fmt.Errorf("create block index key: %w", err) + return dmtypes.EventKeys{}, fmt.Errorf("create block index key: %w", err) } if err := batch.Set(key, heightBz); err != nil { - return err + return dmtypes.EventKeys{}, err } + keys.Keys = append(keys.Keys, key) + } + } + } + + return keys, nil +} + +func (idx *BlockerIndexer) Prune(from, to uint64, logger log.Logger) (uint64, error) { + if from <= 0 { + return 0, fmt.Errorf("from height must be greater than 0: %w", gerrc.ErrInvalidArgument) + } + + if to <= from { + return 0, fmt.Errorf("to height must be greater than from height: to: %d: from: %d: %w", to, from, gerrc.ErrInvalidArgument) + } + blocksPruned, err := idx.pruneBlocks(from, to, logger) + return blocksPruned, err +} + +func (idx *BlockerIndexer) pruneBlocks(from, to uint64, logger log.Logger) (uint64, error) { + pruned := uint64(0) + batch := idx.store.NewBatch() + defer batch.Discard() + + flush := func(batch store.KVBatch, height int64) error { + err := batch.Commit() + if err != nil { + return fmt.Errorf("flush batch to disk: height %d: %w", height, err) + } + return nil + } + + for h := int64(from); h < int64(to); h++ { + ok, err := idx.Has(h) + if err != nil { + logger.Debug("pruning block indexer checking height", "err", err) + continue + } + if !ok { + continue + } + key, err := heightKey(h) + if err != nil { + logger.Debug("pruning block indexer getting height key", "err", err) + continue + } + if err := batch.Delete(key); err != nil { + logger.Debug("pruning block indexer deleting height key", "err", err) + continue + } + if err := idx.pruneEvents(h, batch); err != nil { + logger.Debug("pruning block indexer events", "err", err) + continue + } + pruned++ + + // flush every 1000 blocks to avoid batches becoming too large + if pruned%1000 == 0 && pruned > 0 { + err := flush(batch, h) + if err != nil { + return 0, err } + batch.Discard() + batch = idx.store.NewBatch() } } + err := flush(batch, int64(to)) + if err != nil { + return 0, err + } + + return pruned, nil +} + +func (idx *BlockerIndexer) pruneEvents(height int64, batch store.KVBatch) error { + eventKey, err := eventHeightKey(height) + if err != nil { + return err + } + keysList, err := idx.store.Get(eventKey) + if err != nil { + return err + } + eventKeys := &dymint.EventKeys{} + err = eventKeys.Unmarshal(keysList) + if err != nil { + return err + } + for _, key := range eventKeys.Keys { + err := batch.Delete(key) + if err != nil { + return err + } + } + return nil +} + +func (idx *BlockerIndexer) addEventKeys(height int64, beginKeys *dymint.EventKeys, endKeys *dymint.EventKeys, batch store.KVBatch) error { + eventKeys := beginKeys + eventKeys.Keys = append(eventKeys.Keys, endKeys.Keys...) + eventKeyHeight, err := eventHeightKey(height) + if err != nil { + return err + } + eventKeysBytes, err := eventKeys.Marshal() + if err != nil { + return err + } + if err := batch.Set(eventKeyHeight, eventKeysBytes); err != nil { + return err + } return nil } diff --git a/indexers/blockindexer/kv/kv_test.go b/indexers/blockindexer/kv/kv_test.go index 55bb88539..b7a8a17f6 100644 --- a/indexers/blockindexer/kv/kv_test.go +++ b/indexers/blockindexer/kv/kv_test.go @@ -7,8 +7,10 @@ import ( "github.com/stretchr/testify/require" abci "github.com/tendermint/tendermint/abci/types" + "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/libs/pubsub/query" "github.com/tendermint/tendermint/types" + "golang.org/x/exp/rand" blockidxkv "github.com/dymensionxyz/dymint/indexers/blockindexer/kv" "github.com/dymensionxyz/dymint/store" @@ -93,6 +95,10 @@ func TestBlockIndexer(t *testing.T) { q *query.Query results []int64 }{ + "block.height < 5": { + q: query.MustParse("block.height < 2"), + results: []int64{1}, + }, "block.height = 100": { q: query.MustParse("block.height = 100"), results: []int64{}, @@ -140,3 +146,80 @@ func TestBlockIndexer(t *testing.T) { }) } } + +func TestBlockIndexerPruning(t *testing.T) { + + // init the block indexer + prefixStore := store.NewPrefixKV(store.NewDefaultInMemoryKVStore(), []byte("block_events")) + indexer := blockidxkv.New(prefixStore) + numBlocks := uint64(100) + + // index block data + for i := uint64(1); i <= numBlocks; i++ { + indexer.Index(types.EventDataNewBlockHeader{ + Header: types.Header{Height: int64(i)}, + ResultBeginBlock: getBeginBlock(), + ResultEndBlock: getEndBlock(), + }) + } + + // query all blocks and receive events for all block heights + queryString := fmt.Sprintf("block.height <= %d", numBlocks) + q := query.MustParse(queryString) + results, err := indexer.Search(context.Background(), q) + require.NoError(t, err) + require.Equal(t, numBlocks, uint64(len(results))) + + // prune indexer for all heights + pruned, err := indexer.Prune(1, numBlocks+1, log.NewNopLogger()) + require.NoError(t, err) + require.Equal(t, numBlocks, pruned) + + // check the query returns empty + results, err = indexer.Search(context.Background(), q) + require.NoError(t, err) + require.Equal(t, 0, len(results)) + +} + +func getBeginBlock() abci.ResponseBeginBlock { + if rand.Intn(2) == 1 { + return abci.ResponseBeginBlock{ + Events: []abci.Event{ + { + Type: "begin_event", + Attributes: []abci.EventAttribute{ + { + Key: []byte("proposer"), + Value: []byte("FCAA001"), + Index: true, + }, + }, + }, + }, + } + } else { + return abci.ResponseBeginBlock{} + } +} + +func getEndBlock() abci.ResponseEndBlock { + if rand.Intn(2) == 1 { + return abci.ResponseEndBlock{ + Events: []abci.Event{ + { + Type: "end_event", + Attributes: []abci.EventAttribute{ + { + Key: []byte("foo"), + Value: []byte("value"), + Index: true, + }, + }, + }, + }, + } + } else { + return abci.ResponseEndBlock{} + } +} diff --git a/indexers/blockindexer/kv/util.go b/indexers/blockindexer/kv/util.go index bde07a541..d1423d07a 100644 --- a/indexers/blockindexer/kv/util.go +++ b/indexers/blockindexer/kv/util.go @@ -10,6 +10,8 @@ import ( "github.com/tendermint/tendermint/types" ) +const BlockEventHeightKey = "blockevent.height" + func intInSlice(a int, list []int) bool { for _, b := range list { if b == a { @@ -39,6 +41,14 @@ func heightKey(height int64) ([]byte, error) { ) } +func eventHeightKey(height int64) ([]byte, error) { + return orderedcode.Append( + nil, + BlockEventHeightKey, + height, + ) +} + func eventKey(compositeKey, typ, eventValue string, height int64) ([]byte, error) { return orderedcode.Append( nil, diff --git a/indexers/blockindexer/null/null.go b/indexers/blockindexer/null/null.go index 62658e00e..e6ee3335f 100644 --- a/indexers/blockindexer/null/null.go +++ b/indexers/blockindexer/null/null.go @@ -4,6 +4,8 @@ import ( "context" "errors" + "github.com/tendermint/tendermint/libs/log" + indexer "github.com/dymensionxyz/dymint/indexers/blockindexer" "github.com/tendermint/tendermint/libs/pubsub/query" "github.com/tendermint/tendermint/types" @@ -25,3 +27,7 @@ func (idx *BlockerIndexer) Index(types.EventDataNewBlockHeader) error { func (idx *BlockerIndexer) Search(ctx context.Context, q *query.Query) ([]int64, error) { return []int64{}, nil } + +func (idx *BlockerIndexer) Prune(from, to uint64, logger log.Logger) (uint64, error) { + return 0, nil +} diff --git a/indexers/txindex/indexer.go b/indexers/txindex/indexer.go index 6d9f6807d..694b90d86 100644 --- a/indexers/txindex/indexer.go +++ b/indexers/txindex/indexer.go @@ -4,6 +4,8 @@ import ( "context" "errors" + "github.com/tendermint/tendermint/libs/log" + abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/libs/pubsub/query" ) @@ -22,6 +24,9 @@ type TxIndexer interface { // Search allows you to query for transactions. Search(ctx context.Context, q *query.Query) ([]*abci.TxResult, error) + + // Delete index entries for the heights between from (included) and to (not included). It returns heights pruned + Prune(from, to uint64, logger log.Logger) (uint64, error) } // Batch groups together multiple Index operations to be performed at the same time. diff --git a/indexers/txindex/indexer_service.go b/indexers/txindex/indexer_service.go index 0a619e758..88a79b8ae 100644 --- a/indexers/txindex/indexer_service.go +++ b/indexers/txindex/indexer_service.go @@ -97,3 +97,16 @@ func (is *IndexerService) OnStop() { _ = is.eventBus.UnsubscribeAll(context.Background(), subscriber) } } + +// Prune removes tx and blocks indexed up to (but not including) a height. +func (is *IndexerService) Prune(from, to uint64) error { + _, err := is.blockIdxr.Prune(from, to, is.Logger) + if err != nil { + return err + } + _, err = is.txIdxr.Prune(from, to, is.Logger) + if err != nil { + return err + } + return nil +} diff --git a/indexers/txindex/indexer_service_test.go b/indexers/txindex/indexer_service_test.go index 3f68eb23b..abd281605 100644 --- a/indexers/txindex/indexer_service_test.go +++ b/indexers/txindex/indexer_service_test.go @@ -78,4 +78,15 @@ func TestIndexerServiceIndexesBlocks(t *testing.T) { res, err = txIndexer.Get(types.Tx("bar").Hash()) require.NoError(t, err) require.Equal(t, txResult2, res) + + blocksPruned, err := blockIndexer.Prune(1, 2, log.NewNopLogger()) + require.NoError(t, err) + expectedBlocksPruned := uint64(1) + require.Equal(t, expectedBlocksPruned, blocksPruned) + + txPruned, err := txIndexer.Prune(1, 2, log.NewNopLogger()) + require.NoError(t, err) + expectedTxPruned := uint64(2) + require.Equal(t, expectedTxPruned, txPruned) + } diff --git a/indexers/txindex/kv/kv.go b/indexers/txindex/kv/kv.go index 6c3435e4e..d9b54508f 100644 --- a/indexers/txindex/kv/kv.go +++ b/indexers/txindex/kv/kv.go @@ -9,6 +9,7 @@ import ( "strings" "github.com/gogo/protobuf/proto" + "github.com/tendermint/tendermint/libs/log" abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/libs/pubsub/query" @@ -18,6 +19,8 @@ import ( "github.com/dymensionxyz/dymint/indexers/txindex" "github.com/dymensionxyz/dymint/store" "github.com/dymensionxyz/dymint/types" + "github.com/dymensionxyz/dymint/types/pb/dymint" + dmtypes "github.com/dymensionxyz/dymint/types/pb/dymint" ) const ( @@ -74,7 +77,12 @@ func (txi *TxIndex) AddBatch(b *txindex.Batch) error { hash := types.Tx(result.Tx).Hash() // index tx by events - err := txi.indexEvents(result, hash, storeBatch) + eventKeys, err := txi.indexEvents(result, hash, storeBatch) + if err != nil { + return err + } + + err = txi.addEventKeys(result.Height, &eventKeys, storeBatch) if err != nil { return err } @@ -110,11 +118,17 @@ func (txi *TxIndex) Index(result *abci.TxResult) error { hash := types.Tx(result.Tx).Hash() // index tx by events - err := txi.indexEvents(result, hash, b) + eventKeys, err := txi.indexEvents(result, hash, b) if err != nil { return err } + // add event keys height index + err = txi.addEventKeys(result.Height, &eventKeys, b) + if err != nil { + return nil + } + // index by height (always) err = b.Set(keyForHeight(result), hash) if err != nil { @@ -134,7 +148,8 @@ func (txi *TxIndex) Index(result *abci.TxResult) error { return b.Commit() } -func (txi *TxIndex) indexEvents(result *abci.TxResult, hash []byte, store store.KVBatch) error { +func (txi *TxIndex) indexEvents(result *abci.TxResult, hash []byte, store store.KVBatch) (dmtypes.EventKeys, error) { + eventKeys := dmtypes.EventKeys{} for _, event := range result.Result.Events { // only index events with a non-empty type if len(event.Type) == 0 { @@ -151,13 +166,14 @@ func (txi *TxIndex) indexEvents(result *abci.TxResult, hash []byte, store store. if attr.GetIndex() { err := store.Set(keyForEvent(compositeTag, attr.Value, result), hash) if err != nil { - return err + return dmtypes.EventKeys{}, err } + eventKeys.Keys = append(eventKeys.Keys, keyForEvent(compositeTag, attr.Value, result)) } } } - return nil + return eventKeys, nil } // Search performs a search using the given query. @@ -553,6 +569,102 @@ LOOP: return filteredHashes } +func (txi *TxIndex) Prune(from, to uint64, logger log.Logger) (uint64, error) { + pruned, err := txi.pruneTxs(from, to) + if err != nil { + return 0, err + } + return pruned, nil +} + +func (txi *TxIndex) pruneTxs(from, to uint64) (uint64, error) { + pruned := uint64(0) + batch := txi.store.NewBatch() + defer batch.Discard() + + flush := func(batch store.KVBatch, height int64) error { + err := batch.Commit() + if err != nil { + return fmt.Errorf("flush batch to disk: height %d: %w", height, err) + } + return nil + } + + for h := from; h < to; h++ { + + it := txi.store.PrefixIterator(prefixForHeight(int64(h))) + defer it.Discard() + + for ; it.Valid(); it.Next() { + if err := batch.Delete(it.Key()); err != nil { + continue + } + if err := batch.Delete(it.Value()); err != nil { + continue + } + if err := txi.pruneEvents(h, batch); err != nil { + continue + } + pruned++ + // flush every 1000 txs to avoid batches becoming too large + if pruned%1000 == 0 && pruned > 0 { + err := flush(batch, int64(h)) + if err != nil { + return 0, err + } + batch.Discard() + batch = txi.store.NewBatch() + } + } + } + + err := flush(batch, int64(to)) + if err != nil { + return 0, err + } + + return pruned, nil +} + +func (txi *TxIndex) pruneEvents(height uint64, batch store.KVBatch) error { + eventKey, err := eventHeightKey(int64(height)) + if err != nil { + return err + } + keysList, err := txi.store.Get(eventKey) + if err != nil { + return err + } + eventKeys := &dymint.EventKeys{} + err = eventKeys.Unmarshal(keysList) + if err != nil { + return err + } + for _, key := range eventKeys.Keys { + err := batch.Delete(key) + if err != nil { + return err + } + } + return nil +} + +func (txi *TxIndex) addEventKeys(height int64, eventKeys *dymint.EventKeys, batch store.KVBatch) error { + // index event keys by height + eventKeyHeight, err := eventHeightKey(height) + if err != nil { + return err + } + eventKeysBytes, err := eventKeys.Marshal() + if err != nil { + return err + } + if err := batch.Set(eventKeyHeight, eventKeysBytes); err != nil { + return err + } + return nil +} + // Keys func isTagKey(key []byte) bool { @@ -596,3 +708,10 @@ func startKey(fields ...interface{}) []byte { } return b.Bytes() } + +func prefixForHeight(height int64) []byte { + return []byte(fmt.Sprintf("%s/%d/", + tmtypes.TxHeightKey, + height, + )) +} diff --git a/indexers/txindex/kv/kv_test.go b/indexers/txindex/kv/kv_test.go index 023bb1f84..60caf7f54 100644 --- a/indexers/txindex/kv/kv_test.go +++ b/indexers/txindex/kv/kv_test.go @@ -9,8 +9,10 @@ import ( "github.com/gogo/protobuf/proto" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "golang.org/x/exp/rand" abci "github.com/tendermint/tendermint/abci/types" + "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/libs/pubsub/query" tmrand "github.com/tendermint/tendermint/libs/rand" "github.com/tendermint/tendermint/types" @@ -312,6 +314,51 @@ func TestTxSearchMultipleTxs(t *testing.T) { require.Len(t, results, 3) } +func TestTxIndexerPruning(t *testing.T) { + + // init the block indexer + indexer := NewTxIndex(store.NewDefaultInMemoryKVStore()) + numBlocks := uint64(100) + + txsWithEvents := 0 + + // index tx event data + for i := uint64(1); i <= numBlocks; i++ { + events := getNEvents(rand.Intn(10)) + txResult := getRandomTxResult(int64(i), events) + err := indexer.Index(txResult) + require.NoError(t, err) + if len(events) > 0 { + txsWithEvents++ + } + } + + q := query.MustParse("account.number = 1") + // query all blocks and receive events for all txs + results, err := indexer.Search(context.Background(), q) + require.NoError(t, err) + require.Equal(t, txsWithEvents, len(results)) + + // prune indexer for all heights + pruned, err := indexer.Prune(1, numBlocks+1, log.NewNopLogger()) + require.NoError(t, err) + require.Equal(t, uint64(numBlocks), pruned) + + // check the query returns empty + results, err = indexer.Search(context.Background(), q) + require.NoError(t, err) + require.Equal(t, 0, len(results)) + + conditions, err := q.Conditions() + require.NoError(t, err) + // check there are no unlinked events matching the query that are not found with Search + for _, c := range conditions { + results := indexer.match(context.Background(), c, startKeyForCondition(c, 0), nil, true) + require.Equal(t, 0, len(results)) + } + +} + func txResultWithEvents(events []abci.Event) *abci.TxResult { tx := types.Tx("HELLO WORLD") return &abci.TxResult{ @@ -326,6 +373,48 @@ func txResultWithEvents(events []abci.Event) *abci.TxResult { }, } } +func getRandomTxResult(height int64, events []abci.Event) *abci.TxResult { + tx := types.Tx(randomTxHash()) + return &abci.TxResult{ + Height: height, + Index: 0, + Tx: tx, + Result: abci.ResponseDeliverTx{ + Data: []byte{0}, + Code: abci.CodeTypeOK, + Log: "", + Events: events, + }, + } +} + +func getNEvents(n int) []abci.Event { + events := []abci.Event{} + for i := 0; i < n; i++ { + event := abci.Event{ + Type: "account", + Attributes: []abci.EventAttribute{ + { + Key: []byte("number"), + Value: []byte("1"), + Index: true, + }, + }, + } + events = append(events, event) + } + return events +} + +func randomTxHash() string { + symbols := "1234567890ABCDEFGHIJKLMNOPQRSTUVWXYZ" + + b := make([]byte, 64) + for i := range b { + b[i] = symbols[rand.Int63()%int64(len(symbols))] + } + return string(b) +} func benchmarkTxIndex(txsCount int64, b *testing.B) { dir, err := os.MkdirTemp("", "tx_index_db") diff --git a/indexers/txindex/kv/utils.go b/indexers/txindex/kv/utils.go index 48362bfbc..73cb223f2 100644 --- a/indexers/txindex/kv/utils.go +++ b/indexers/txindex/kv/utils.go @@ -1,5 +1,9 @@ package kv +import "github.com/google/orderedcode" + +const TxEventHeightKey = "txevent.height" + // IntInSlice returns true if a is found in the list. func intInSlice(a int, list []int) bool { for _, b := range list { @@ -9,3 +13,11 @@ func intInSlice(a int, list []int) bool { } return false } + +func eventHeightKey(height int64) ([]byte, error) { + return orderedcode.Append( + nil, + TxEventHeightKey, + height, + ) +} diff --git a/indexers/txindex/null/null.go b/indexers/txindex/null/null.go index 4cf66cbbf..426b08099 100644 --- a/indexers/txindex/null/null.go +++ b/indexers/txindex/null/null.go @@ -4,6 +4,8 @@ import ( "context" "errors" + "github.com/tendermint/tendermint/libs/log" + "github.com/dymensionxyz/dymint/indexers/txindex" abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/libs/pubsub/query" @@ -32,3 +34,7 @@ func (txi *TxIndex) Index(result *abci.TxResult) error { func (txi *TxIndex) Search(ctx context.Context, q *query.Query) ([]*abci.TxResult, error) { return []*abci.TxResult{}, nil } + +func (txi *TxIndex) Prune(from, to uint64, logger log.Logger) (uint64, error) { + return 0, nil +} diff --git a/node/node.go b/node/node.go index f620f8929..680ddb761 100644 --- a/node/node.go +++ b/node/node.go @@ -160,6 +160,7 @@ func NewNode( pubsubServer, nil, // p2p client is set later dalcKV, + indexerService, logger, ) if err != nil { diff --git a/proto/types/dymint/dymint.proto b/proto/types/dymint/dymint.proto index 80e733097..e741491ad 100755 --- a/proto/types/dymint/dymint.proto +++ b/proto/types/dymint/dymint.proto @@ -98,4 +98,8 @@ message Sequencer { message SequencerSet { repeated Sequencer sequencers = 1; Sequencer proposer = 2; +} + +message EventKeys { + repeated bytes keys = 1; } \ No newline at end of file diff --git a/testutil/block.go b/testutil/block.go index 88d964dc5..99dc407c3 100644 --- a/testutil/block.go +++ b/testutil/block.go @@ -7,6 +7,8 @@ import ( "time" "github.com/dymensionxyz/dymint/block" + "github.com/dymensionxyz/dymint/indexers/txindex" + "github.com/dymensionxyz/dymint/indexers/txindex/kv" "github.com/dymensionxyz/dymint/p2p" "github.com/dymensionxyz/dymint/settlement" "github.com/ipfs/go-datastore" @@ -20,6 +22,7 @@ import ( "github.com/dymensionxyz/dymint/config" "github.com/dymensionxyz/dymint/da" localda "github.com/dymensionxyz/dymint/da/local" + blockidxkv "github.com/dymensionxyz/dymint/indexers/blockindexer/kv" mempoolv1 "github.com/dymensionxyz/dymint/mempool/v1" nodemempool "github.com/dymensionxyz/dymint/node/mempool" slregistry "github.com/dymensionxyz/dymint/settlement/registry" @@ -104,8 +107,14 @@ func GetManagerWithProposerKey(conf config.BlockManagerConfig, proposerKey crypt BlockManagerConfig: conf, DAConfig: "", } + + indexer, err := createIndexerService() + if err != nil { + return nil, err + } + manager, err := block.NewManager(proposerKey, config, genesis, managerStore, mp, proxyApp, settlementlc, nil, - pubsubServer, p2pClient, nil, logger) + pubsubServer, p2pClient, nil, indexer, logger) if err != nil { return nil, err } @@ -161,3 +170,13 @@ func GetManagerConfig() config.BlockManagerConfig { BatchSkew: 10, } } + +func createIndexerService() (*txindex.IndexerService, error) { + kvStore := store.NewDefaultInMemoryKVStore() + txIndexer := kv.NewTxIndex(kvStore) + blockIndexer := blockidxkv.New(store.NewPrefixKV(kvStore, []byte("block_events"))) + + indexerService := txindex.NewIndexerService(txIndexer, blockIndexer, nil) + + return indexerService, nil +} diff --git a/types/pb/dymint/dymint.pb.go b/types/pb/dymint/dymint.pb.go index f3894f5d5..45b8ad00c 100644 --- a/types/pb/dymint/dymint.pb.go +++ b/types/pb/dymint/dymint.pb.go @@ -610,6 +610,50 @@ func (m *SequencerSet) GetProposer() *Sequencer { return nil } +type EventKeys struct { + Keys [][]byte `protobuf:"bytes,1,rep,name=keys,proto3" json:"keys,omitempty"` +} + +func (m *EventKeys) Reset() { *m = EventKeys{} } +func (m *EventKeys) String() string { return proto.CompactTextString(m) } +func (*EventKeys) ProtoMessage() {} +func (*EventKeys) Descriptor() ([]byte, []int) { + return fileDescriptor_fe69c538ded4b87f, []int{8} +} +func (m *EventKeys) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *EventKeys) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_EventKeys.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *EventKeys) XXX_Merge(src proto.Message) { + xxx_messageInfo_EventKeys.Merge(m, src) +} +func (m *EventKeys) XXX_Size() int { + return m.Size() +} +func (m *EventKeys) XXX_DiscardUnknown() { + xxx_messageInfo_EventKeys.DiscardUnknown(m) +} + +var xxx_messageInfo_EventKeys proto.InternalMessageInfo + +func (m *EventKeys) GetKeys() [][]byte { + if m != nil { + return m.Keys + } + return nil +} + func init() { proto.RegisterType((*Version)(nil), "dymint.Version") proto.RegisterType((*Header)(nil), "dymint.Header") @@ -619,62 +663,65 @@ func init() { proto.RegisterType((*Batch)(nil), "dymint.Batch") proto.RegisterType((*Sequencer)(nil), "dymint.Sequencer") proto.RegisterType((*SequencerSet)(nil), "dymint.SequencerSet") + proto.RegisterType((*EventKeys)(nil), "dymint.EventKeys") } func init() { proto.RegisterFile("types/dymint/dymint.proto", fileDescriptor_fe69c538ded4b87f) } var fileDescriptor_fe69c538ded4b87f = []byte{ - // 797 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0x95, 0x4d, 0x4f, 0xdb, 0x48, - 0x18, 0xc7, 0x31, 0x09, 0x79, 0x79, 0x1c, 0x02, 0x99, 0x5d, 0x21, 0x07, 0xb4, 0xd9, 0x60, 0x09, - 0x14, 0x56, 0xc2, 0x88, 0xac, 0x56, 0xda, 0xbd, 0xac, 0xb4, 0x6c, 0x2b, 0x85, 0xeb, 0x44, 0xe2, - 0xd0, 0x4b, 0x34, 0xb1, 0x47, 0xb1, 0xd5, 0xf8, 0xa5, 0x9e, 0x09, 0x82, 0x1e, 0xb9, 0xf5, 0xd6, - 0x73, 0x6f, 0xfd, 0x36, 0x3d, 0x72, 0xec, 0xb1, 0x82, 0x2f, 0x52, 0xcd, 0x33, 0x63, 0xc7, 0x50, - 0x7a, 0x01, 0xcf, 0xff, 0xff, 0xf3, 0xcc, 0xe3, 0xe7, 0x65, 0x02, 0x7d, 0x79, 0x9b, 0x71, 0x71, - 0x16, 0xdc, 0xc6, 0x51, 0x22, 0xcd, 0x3f, 0x2f, 0xcb, 0x53, 0x99, 0x92, 0x86, 0x5e, 0xed, 0x1f, - 0x6a, 0x44, 0xf2, 0x24, 0xe0, 0x39, 0x62, 0x6c, 0xee, 0x47, 0x67, 0xa8, 0x6a, 0x74, 0xdf, 0xfd, - 0x01, 0x31, 0x42, 0x85, 0x39, 0xfe, 0x09, 0x73, 0xcd, 0x96, 0x51, 0xc0, 0x64, 0x9a, 0x6b, 0xce, - 0x3d, 0x87, 0xe6, 0x15, 0xcf, 0x45, 0x94, 0x26, 0xe4, 0x57, 0xd8, 0x9a, 0x2f, 0x53, 0xff, 0xad, - 0x63, 0x0d, 0xad, 0x51, 0x9d, 0xea, 0x05, 0xd9, 0x85, 0x1a, 0xcb, 0x32, 0x67, 0x13, 0x35, 0xf5, - 0xe8, 0xde, 0xd5, 0xa1, 0x31, 0xe1, 0x2c, 0xe0, 0x39, 0x39, 0x81, 0xe6, 0xb5, 0x7e, 0x1b, 0x5f, - 0xb2, 0xc7, 0x3b, 0x9e, 0xf9, 0x28, 0xb3, 0x29, 0x2d, 0x7c, 0x72, 0x04, 0x9d, 0x84, 0xc5, 0x5c, - 0x64, 0xcc, 0xe7, 0xb3, 0x28, 0xc0, 0x0d, 0x3b, 0x17, 0x9b, 0x8e, 0x45, 0xed, 0x52, 0xbf, 0x0c, - 0xc8, 0x1e, 0x34, 0x42, 0x1e, 0x2d, 0x42, 0xe9, 0xd4, 0xf0, 0x44, 0xb3, 0x22, 0x04, 0xea, 0x32, - 0x8a, 0xb9, 0x53, 0x47, 0x15, 0x9f, 0xc9, 0x08, 0x76, 0x97, 0x4c, 0xc8, 0x59, 0x88, 0xc1, 0xcc, - 0x42, 0x26, 0x42, 0x67, 0x4b, 0x6d, 0x4b, 0xbb, 0x4a, 0xd7, 0x31, 0x4e, 0x98, 0x08, 0x4b, 0xd2, - 0x4f, 0xe3, 0x38, 0x92, 0x9a, 0x6c, 0xac, 0xc9, 0xff, 0x51, 0x46, 0xf2, 0x00, 0xda, 0x01, 0x93, - 0x4c, 0x23, 0x4d, 0x44, 0x5a, 0x4a, 0x40, 0xf3, 0x08, 0xba, 0x7e, 0x9a, 0x08, 0x9e, 0x88, 0x95, - 0xd0, 0x44, 0x0b, 0x89, 0xed, 0x52, 0x45, 0xac, 0x0f, 0x2d, 0x96, 0x65, 0x1a, 0x68, 0x23, 0xd0, - 0x64, 0x59, 0x86, 0xd6, 0x1f, 0xd0, 0xc3, 0x40, 0x72, 0x2e, 0x56, 0x4b, 0x69, 0x36, 0x01, 0x64, - 0x76, 0x94, 0x41, 0xb5, 0x8e, 0xec, 0x09, 0xec, 0x66, 0x79, 0x9a, 0xa5, 0x82, 0xe7, 0x33, 0x16, - 0x04, 0x39, 0x17, 0xc2, 0xb1, 0x35, 0x5a, 0xe8, 0xff, 0x69, 0x59, 0x05, 0x26, 0xf8, 0xbb, 0x15, - 0x4f, 0xfc, 0x22, 0x0f, 0x1d, 0x1d, 0x58, 0xa9, 0xe2, 0x8e, 0x1e, 0xfc, 0x92, 0xf0, 0x1b, 0x39, - 0x7b, 0xc6, 0x76, 0x91, 0xed, 0x29, 0x6b, 0xfa, 0x84, 0xef, 0x43, 0xcb, 0x0f, 0x59, 0x94, 0xa8, - 0x7a, 0x6d, 0x0f, 0xad, 0x51, 0x9b, 0x36, 0x71, 0x7d, 0x19, 0xb8, 0x9f, 0x2d, 0x68, 0xe8, 0xb4, - 0x55, 0x4a, 0x66, 0x3d, 0x29, 0xd9, 0xef, 0x60, 0x57, 0x2b, 0x83, 0x05, 0xa7, 0x10, 0xae, 0xab, - 0x32, 0x00, 0x10, 0xd1, 0x22, 0x61, 0x72, 0x95, 0x73, 0xe1, 0xd4, 0x86, 0x35, 0xe5, 0xaf, 0x15, - 0xf2, 0x2f, 0x74, 0x64, 0x3c, 0x2b, 0x05, 0xac, 0xbd, 0x3d, 0x3e, 0xf0, 0xd6, 0x4d, 0xed, 0xe9, - 0x96, 0xd7, 0x81, 0x4c, 0xa3, 0x05, 0xb5, 0x65, 0x3c, 0x2d, 0x78, 0xf7, 0x83, 0x05, 0xf5, 0x57, - 0x4c, 0x32, 0xd5, 0xc3, 0xf2, 0x46, 0x38, 0x16, 0x9e, 0xa0, 0x1e, 0xc9, 0xdf, 0xe0, 0x44, 0x89, - 0xe4, 0x79, 0xcc, 0x83, 0x88, 0x49, 0x3e, 0x13, 0x52, 0xfd, 0xcd, 0xd3, 0x54, 0x0a, 0x67, 0x13, - 0xb1, 0xbd, 0xaa, 0x3f, 0x55, 0x36, 0x55, 0x2e, 0xf9, 0x0b, 0x5a, 0xfc, 0x3a, 0x0a, 0x54, 0x92, - 0x30, 0x64, 0x7b, 0xdc, 0xaf, 0x06, 0xa4, 0x86, 0xd5, 0x7b, 0x6d, 0x00, 0x5a, 0xa2, 0xee, 0x9d, - 0x05, 0x5b, 0x17, 0x38, 0x50, 0xc7, 0x2a, 0x5d, 0x2a, 0x07, 0x66, 0x64, 0xba, 0xc5, 0xc8, 0xe8, - 0x7e, 0xa5, 0xc6, 0x25, 0x43, 0xa8, 0xab, 0xc6, 0xc3, 0xbc, 0xd9, 0xe3, 0x4e, 0x41, 0xa9, 0x0f, - 0xa2, 0xe8, 0x90, 0x33, 0xb0, 0x2b, 0x5d, 0x8d, 0x03, 0x53, 0xd9, 0x4e, 0x27, 0x85, 0xc2, 0xba, - 0xc1, 0xdd, 0x4f, 0x2a, 0x08, 0x26, 0xfd, 0x90, 0x1c, 0x42, 0x47, 0x48, 0x96, 0xab, 0xd9, 0xa9, - 0x54, 0xce, 0x46, 0x6d, 0xa2, 0xcb, 0xf7, 0x1b, 0x00, 0x4f, 0x82, 0x02, 0xd0, 0xf3, 0xdf, 0xe6, - 0x49, 0x60, 0xec, 0x23, 0x68, 0xe0, 0x05, 0x21, 0x4c, 0x16, 0xb6, 0x8b, 0x73, 0xf1, 0x2b, 0xa9, - 0x31, 0xc9, 0x08, 0x9a, 0x3a, 0x3c, 0xe1, 0xd4, 0x91, 0x7b, 0x1e, 0x5f, 0x61, 0xbb, 0x2b, 0x68, - 0x97, 0xdd, 0x47, 0x4e, 0x81, 0x08, 0x2e, 0xe5, 0x92, 0xc7, 0x3c, 0x91, 0x65, 0xf7, 0x5b, 0xd8, - 0x83, 0xbd, 0xb5, 0x53, 0xf4, 0xff, 0x3f, 0xd0, 0x2e, 0x2f, 0x36, 0x93, 0xb0, 0x17, 0xda, 0xe4, - 0xaa, 0x40, 0xe8, 0x9a, 0x76, 0x33, 0xe8, 0x94, 0xc7, 0x4e, 0xb9, 0x24, 0xe7, 0x00, 0xe5, 0x78, - 0xe8, 0x96, 0xb1, 0xc7, 0xbd, 0x22, 0xe6, 0x92, 0xa4, 0x15, 0x88, 0x9c, 0x42, 0xab, 0x18, 0x48, - 0x73, 0xf8, 0x0b, 0x2f, 0x94, 0xc8, 0xc5, 0xe4, 0xcb, 0xc3, 0xc0, 0xba, 0x7f, 0x18, 0x58, 0xdf, - 0x1e, 0x06, 0xd6, 0xc7, 0xc7, 0xc1, 0xc6, 0xfd, 0xe3, 0x60, 0xe3, 0xeb, 0xe3, 0x60, 0xe3, 0x8d, - 0xb7, 0x88, 0x64, 0xb8, 0x9a, 0x7b, 0x7e, 0x1a, 0xab, 0x1f, 0x07, 0x9e, 0xa8, 0x9b, 0xf3, 0xe6, - 0xf6, 0x7d, 0xf1, 0x83, 0xa1, 0xaf, 0xf0, 0x6c, 0x6e, 0xd6, 0xf3, 0x06, 0xde, 0xe1, 0x7f, 0x7e, - 0x0f, 0x00, 0x00, 0xff, 0xff, 0xc4, 0xde, 0x10, 0x30, 0x57, 0x06, 0x00, 0x00, + // 819 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0x95, 0xbf, 0x6f, 0xdb, 0x46, + 0x14, 0xc7, 0x4d, 0x4b, 0xd6, 0x8f, 0x47, 0x59, 0xb1, 0xae, 0x45, 0x40, 0x25, 0xa8, 0xa2, 0x10, + 0x70, 0xa0, 0x14, 0x08, 0x8d, 0xa8, 0x28, 0xd0, 0x2e, 0x05, 0xea, 0x36, 0x80, 0x82, 0x6e, 0x27, + 0x20, 0x43, 0x17, 0xe1, 0x44, 0x3e, 0x88, 0x44, 0xc4, 0x23, 0xcb, 0x3b, 0x09, 0x56, 0xc7, 0x6c, + 0xdd, 0x3a, 0x77, 0xeb, 0x7f, 0xd3, 0x31, 0x63, 0xc7, 0xc2, 0xfe, 0x47, 0x8a, 0x7b, 0x47, 0x52, + 0x8c, 0xeb, 0x2e, 0x36, 0xef, 0xfb, 0xfd, 0xf0, 0xee, 0xf1, 0xfd, 0x38, 0xc1, 0x58, 0x1f, 0x72, + 0x54, 0x57, 0xd1, 0x21, 0x4d, 0xa4, 0x2e, 0xff, 0x05, 0x79, 0x91, 0xe9, 0x8c, 0x75, 0xec, 0xea, + 0xc9, 0x73, 0x8b, 0x68, 0x94, 0x11, 0x16, 0x84, 0x89, 0x75, 0x98, 0x5c, 0x91, 0x6a, 0xd1, 0x27, + 0xfe, 0x7f, 0x90, 0x52, 0x68, 0x30, 0x2f, 0xfe, 0x87, 0xd9, 0x8b, 0x6d, 0x12, 0x09, 0x9d, 0x15, + 0x96, 0xf3, 0x5f, 0x43, 0xf7, 0x1d, 0x16, 0x2a, 0xc9, 0x24, 0xfb, 0x1c, 0xce, 0xd6, 0xdb, 0x2c, + 0x7c, 0xef, 0x39, 0x53, 0x67, 0xd6, 0xe6, 0x76, 0xc1, 0x2e, 0xa0, 0x25, 0xf2, 0xdc, 0x3b, 0x25, + 0xcd, 0x3c, 0xfa, 0x1f, 0xda, 0xd0, 0x59, 0xa0, 0x88, 0xb0, 0x60, 0x2f, 0xa1, 0xbb, 0xb7, 0x6f, + 0xd3, 0x4b, 0xee, 0xfc, 0x51, 0x50, 0x7e, 0x54, 0xb9, 0x29, 0xaf, 0x7c, 0x76, 0x09, 0x03, 0x29, + 0x52, 0x54, 0xb9, 0x08, 0x71, 0x95, 0x44, 0xb4, 0xe1, 0xe0, 0xfa, 0xd4, 0x73, 0xb8, 0x5b, 0xeb, + 0x6f, 0x23, 0xf6, 0x18, 0x3a, 0x31, 0x26, 0x9b, 0x58, 0x7b, 0x2d, 0x3a, 0xb1, 0x5c, 0x31, 0x06, + 0x6d, 0x9d, 0xa4, 0xe8, 0xb5, 0x49, 0xa5, 0x67, 0x36, 0x83, 0x8b, 0xad, 0x50, 0x7a, 0x15, 0x53, + 0x30, 0xab, 0x58, 0xa8, 0xd8, 0x3b, 0x33, 0xdb, 0xf2, 0xa1, 0xd1, 0x6d, 0x8c, 0x0b, 0xa1, 0xe2, + 0x9a, 0x0c, 0xb3, 0x34, 0x4d, 0xb4, 0x25, 0x3b, 0x47, 0xf2, 0x07, 0x92, 0x89, 0x7c, 0x0a, 0xfd, + 0x48, 0x68, 0x61, 0x91, 0x2e, 0x21, 0x3d, 0x23, 0x90, 0x79, 0x09, 0xc3, 0x30, 0x93, 0x0a, 0xa5, + 0xda, 0x29, 0x4b, 0xf4, 0x88, 0x38, 0xaf, 0x55, 0xc2, 0xc6, 0xd0, 0x13, 0x79, 0x6e, 0x81, 0x3e, + 0x01, 0x5d, 0x91, 0xe7, 0x64, 0x7d, 0x09, 0x23, 0x0a, 0xa4, 0x40, 0xb5, 0xdb, 0xea, 0x72, 0x13, + 0x20, 0xe6, 0x91, 0x31, 0xb8, 0xd5, 0x89, 0x7d, 0x09, 0x17, 0x79, 0x91, 0xe5, 0x99, 0xc2, 0x62, + 0x25, 0xa2, 0xa8, 0x40, 0xa5, 0x3c, 0xd7, 0xa2, 0x95, 0xfe, 0xbd, 0x95, 0x4d, 0x60, 0x0a, 0x7f, + 0xd9, 0xa1, 0x0c, 0xab, 0x3c, 0x0c, 0x6c, 0x60, 0xb5, 0x4a, 0x3b, 0x06, 0xf0, 0x99, 0xc4, 0x1b, + 0xbd, 0xba, 0xc7, 0x0e, 0x89, 0x1d, 0x19, 0x6b, 0xf9, 0x09, 0x3f, 0x86, 0x5e, 0x18, 0x8b, 0x44, + 0x9a, 0x7a, 0x9d, 0x4f, 0x9d, 0x59, 0x9f, 0x77, 0x69, 0xfd, 0x36, 0xf2, 0xff, 0x74, 0xa0, 0x63, + 0xd3, 0xd6, 0x28, 0x99, 0xf3, 0x49, 0xc9, 0x9e, 0x81, 0xdb, 0xac, 0x0c, 0x15, 0x9c, 0x43, 0x7c, + 0xac, 0xca, 0x04, 0x40, 0x25, 0x1b, 0x29, 0xf4, 0xae, 0x40, 0xe5, 0xb5, 0xa6, 0x2d, 0xe3, 0x1f, + 0x15, 0xf6, 0x1d, 0x0c, 0x74, 0xba, 0xaa, 0x05, 0xaa, 0xbd, 0x3b, 0x7f, 0x1a, 0x1c, 0x9b, 0x3a, + 0xb0, 0x2d, 0x6f, 0x03, 0x59, 0x26, 0x1b, 0xee, 0xea, 0x74, 0x59, 0xf1, 0xfe, 0x6f, 0x0e, 0xb4, + 0x7f, 0x14, 0x5a, 0x98, 0x1e, 0xd6, 0x37, 0xca, 0x73, 0xe8, 0x04, 0xf3, 0xc8, 0xbe, 0x01, 0x2f, + 0x91, 0x1a, 0x8b, 0x14, 0xa3, 0x44, 0x68, 0x5c, 0x29, 0x6d, 0xfe, 0x16, 0x59, 0xa6, 0x95, 0x77, + 0x4a, 0xd8, 0xe3, 0xa6, 0xbf, 0x34, 0x36, 0x37, 0x2e, 0xfb, 0x1a, 0x7a, 0xb8, 0x4f, 0x22, 0x93, + 0x24, 0x0a, 0xd9, 0x9d, 0x8f, 0x9b, 0x01, 0x99, 0x61, 0x0d, 0xde, 0x94, 0x00, 0xaf, 0x51, 0xff, + 0x83, 0x03, 0x67, 0xd7, 0x34, 0x50, 0x2f, 0x4c, 0xba, 0x4c, 0x0e, 0xca, 0x91, 0x19, 0x56, 0x23, + 0x63, 0xfb, 0x95, 0x97, 0x2e, 0x9b, 0x42, 0xdb, 0x34, 0x1e, 0xe5, 0xcd, 0x9d, 0x0f, 0x2a, 0xca, + 0x7c, 0x10, 0x27, 0x87, 0x5d, 0x81, 0xdb, 0xe8, 0x6a, 0x1a, 0x98, 0xc6, 0x76, 0x36, 0x29, 0x1c, + 0x8e, 0x0d, 0xee, 0xff, 0x61, 0x82, 0x10, 0x3a, 0x8c, 0xd9, 0x73, 0x18, 0x28, 0x2d, 0x0a, 0x33, + 0x3b, 0x8d, 0xca, 0xb9, 0xa4, 0x2d, 0x6c, 0xf9, 0xbe, 0x00, 0x40, 0x19, 0x55, 0x80, 0x9d, 0xff, + 0x3e, 0xca, 0xa8, 0xb4, 0x2f, 0xa1, 0x43, 0x17, 0x84, 0x2a, 0xb3, 0x70, 0x5e, 0x9d, 0x4b, 0x5f, + 0xc9, 0x4b, 0x93, 0xcd, 0xa0, 0x6b, 0xc3, 0x53, 0x5e, 0x9b, 0xb8, 0xfb, 0xf1, 0x55, 0xb6, 0xbf, + 0x83, 0x7e, 0xdd, 0x7d, 0xec, 0x15, 0x30, 0x85, 0x5a, 0x6f, 0x31, 0x45, 0xa9, 0xeb, 0xee, 0x77, + 0xa8, 0x07, 0x47, 0x47, 0xa7, 0xea, 0xff, 0x6f, 0xa1, 0x5f, 0x5f, 0x6c, 0x65, 0xc2, 0x1e, 0x68, + 0x93, 0x77, 0x15, 0xc2, 0x8f, 0xb4, 0x9f, 0xc3, 0xa0, 0x3e, 0x76, 0x89, 0x9a, 0xbd, 0x06, 0xa8, + 0xc7, 0xc3, 0xb6, 0x8c, 0x3b, 0x1f, 0x55, 0x31, 0xd7, 0x24, 0x6f, 0x40, 0xec, 0x15, 0xf4, 0xaa, + 0x81, 0x2c, 0x0f, 0x7f, 0xe0, 0x85, 0x1a, 0xf1, 0x9f, 0x41, 0xff, 0xcd, 0x1e, 0xa5, 0xfe, 0x09, + 0x0f, 0xca, 0xdc, 0x6b, 0xef, 0xf1, 0x50, 0xf5, 0x26, 0x3d, 0x5f, 0x2f, 0xfe, 0xba, 0x9d, 0x38, + 0x1f, 0x6f, 0x27, 0xce, 0x3f, 0xb7, 0x13, 0xe7, 0xf7, 0xbb, 0xc9, 0xc9, 0xc7, 0xbb, 0xc9, 0xc9, + 0xdf, 0x77, 0x93, 0x93, 0x9f, 0x83, 0x4d, 0xa2, 0xe3, 0xdd, 0x3a, 0x08, 0xb3, 0xd4, 0xfc, 0x7a, + 0xa0, 0x34, 0x57, 0xeb, 0xcd, 0xe1, 0xd7, 0xea, 0x17, 0xc5, 0xde, 0xf1, 0xf9, 0xba, 0x5c, 0xaf, + 0x3b, 0x74, 0xc9, 0x7f, 0xf5, 0x6f, 0x00, 0x00, 0x00, 0xff, 0xff, 0xf7, 0x34, 0x75, 0x40, 0x78, + 0x06, 0x00, 0x00, } func (m *Version) Marshal() (dAtA []byte, err error) { @@ -1154,6 +1201,38 @@ func (m *SequencerSet) MarshalToSizedBuffer(dAtA []byte) (int, error) { return len(dAtA) - i, nil } +func (m *EventKeys) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *EventKeys) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *EventKeys) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.Keys) > 0 { + for iNdEx := len(m.Keys) - 1; iNdEx >= 0; iNdEx-- { + i -= len(m.Keys[iNdEx]) + copy(dAtA[i:], m.Keys[iNdEx]) + i = encodeVarintDymint(dAtA, i, uint64(len(m.Keys[iNdEx]))) + i-- + dAtA[i] = 0xa + } + } + return len(dAtA) - i, nil +} + func encodeVarintDymint(dAtA []byte, offset int, v uint64) int { offset -= sovDymint(v) base := offset @@ -1380,6 +1459,21 @@ func (m *SequencerSet) Size() (n int) { return n } +func (m *EventKeys) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if len(m.Keys) > 0 { + for _, b := range m.Keys { + l = len(b) + n += 1 + l + sovDymint(uint64(l)) + } + } + return n +} + func sovDymint(x uint64) (n int) { return (math_bits.Len64(x|1) + 6) / 7 } @@ -2841,6 +2935,88 @@ func (m *SequencerSet) Unmarshal(dAtA []byte) error { } return nil } +func (m *EventKeys) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowDymint + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: EventKeys: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: EventKeys: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Keys", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowDymint + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthDymint + } + postIndex := iNdEx + byteLen + if postIndex < 0 { + return ErrInvalidLengthDymint + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Keys = append(m.Keys, make([]byte, postIndex-iNdEx)) + copy(m.Keys[len(m.Keys)-1], dAtA[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipDymint(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthDymint + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func skipDymint(dAtA []byte) (n int, err error) { l := len(dAtA) iNdEx := 0