Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: create delegation reverse index over multiple blocks at 1000 items per block #622

Merged
merged 2 commits into from
Aug 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion tests/integration/staking/keeper/determinstic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ func TestGRPCValidatorDelegations(t *testing.T) {
ValidatorAddr: validator.OperatorAddress,
}

testdata.DeterministicIterations(f.ctx, t, req, f.queryClient.ValidatorDelegations, 14475, false)
testdata.DeterministicIterations(f.ctx, t, req, f.queryClient.ValidatorDelegations, 17484, false)
}

func TestGRPCValidatorUnbondingDelegations(t *testing.T) {
Expand Down
15 changes: 15 additions & 0 deletions x/staking/keeper/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
abci "github.com/cometbft/cometbft/abci/types"

"github.com/cosmos/cosmos-sdk/telemetry"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/cosmos-sdk/x/staking/types"
)

Expand All @@ -19,5 +20,19 @@
// EndBlocker called at every block, update validator set
func (k *Keeper) EndBlocker(ctx context.Context) ([]abci.ValidatorUpdate, error) {
defer telemetry.ModuleMeasureSince(types.ModuleName, telemetry.Now(), telemetry.MetricKeyEndBlocker)

// TODO: Remove migration code and panic catch in the next upgrade
// Wrap the migration call in a function that can recover from panics
func() {
defer func() {
if r := recover(); r != nil {
k.Logger(sdk.UnwrapSDKContext(ctx)).Error("Panic in MigrateDelegationsByValidatorIndex", "recover", r)
}
}()

// Only migrate 10000 items per block to make the migration as fast as possible
k.MigrateDelegationsByValidatorIndex(sdk.UnwrapSDKContext(ctx), 10000)

Check warning

Code scanning / gosec

Errors unhandled. Warning

Errors unhandled.
}()

return k.BlockValidatorUpdates(ctx)
}
7 changes: 7 additions & 0 deletions x/staking/keeper/grpc_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package keeper

import (
"context"
"fmt"
"strings"

"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -108,6 +109,12 @@ func (k Querier) ValidatorDelegations(ctx context.Context, req *types.QueryValid
pageRes *query.PageResponse
)
pageRes, err = query.Paginate(delStore, req.Pagination, func(delAddr, value []byte) error {
// Check the store to see if there is a value stored under the key
key := store.Get(types.NextMigrateDelegationsByValidatorIndexKey)
if key != nil {
// Users will never see this error as if there is an error the function defaults to the legacy implementation below
return fmt.Errorf("store migration is not finished, try again later")
}
Comment on lines +112 to +117
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a TODO to remove after v26, so we don't keep code that people a year from now will not remember why it was added.

bz := store.Get(types.GetDelegationKey(delAddr, valAddr))

var delegation types.Delegation
Expand Down
3 changes: 3 additions & 0 deletions x/staking/keeper/keeper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/suite"

"cosmossdk.io/core/store"
"cosmossdk.io/math"
storetypes "cosmossdk.io/store/types"

Expand Down Expand Up @@ -40,6 +41,7 @@ type KeeperTestSuite struct {
accountKeeper *stakingtestutil.MockAccountKeeper
queryClient stakingtypes.QueryClient
msgServer stakingtypes.MsgServer
storeService store.KVStoreService
}

func (s *KeeperTestSuite) SetupTest() {
Expand Down Expand Up @@ -73,6 +75,7 @@ func (s *KeeperTestSuite) SetupTest() {
s.stakingKeeper = keeper
s.bankKeeper = bankKeeper
s.accountKeeper = accountKeeper
s.storeService = storeService

stakingtypes.RegisterInterfaces(encCfg.InterfaceRegistry)
queryHelper := baseapp.NewQueryServerTestHelper(ctx, encCfg.InterfaceRegistry)
Expand Down
86 changes: 86 additions & 0 deletions x/staking/keeper/validator_index.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package keeper

import (
"fmt"

"cosmossdk.io/store/prefix"

"github.com/cosmos/cosmos-sdk/runtime"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/cosmos-sdk/x/staking/types"
)

// MigrateDelegationsByValidatorIndex is a migration that runs over multiple blocks,
// this is necessary as to build the reverse index we need to iterate over a large set
// of delegations.
func (k Keeper) MigrateDelegationsByValidatorIndex(ctx sdk.Context, iterationLimit int) error {
store := runtime.KVStoreAdapter(k.storeService.OpenKVStore(ctx))
valStore := prefix.NewStore(store, types.DelegationKey)

// Check the store to see if there is a value stored under the key
key := store.Get(types.NextMigrateDelegationsByValidatorIndexKey)
if key == nil {
return nil
}

// Initialize the counter to 0
iterationCounter := 0

// Start the iterator from the key that is in the store
iterator := valStore.Iterator(key, nil)
defer iterator.Close()

for ; iterator.Valid(); iterator.Next() {
key := iterator.Key()

// Parse the index to use setting the reverse index
del, val, err := ParseDelegationKey(key)
if err != nil {
return err
}

// Set the reverse index in the store
store.Set(types.GetDelegationsByValKey(val, del), []byte{})

iterationCounter++
if iterationCounter >= iterationLimit {
ctx.Logger().Info(fmt.Sprintf("Migrated %d delegations, next key %x", iterationLimit, key))

// Set the key in the store after it has been processed
store.Set(types.NextMigrateDelegationsByValidatorIndexKey, key)
break
}
}

// If the iterator is invalid we have processed the full store
if !iterator.Valid() {
ctx.Logger().Info("Migration completed")
store.Delete(types.NextMigrateDelegationsByValidatorIndexKey)
}

return nil
}

// ParseDelegationKey parses given key and returns delagator, validator address bytes
func ParseDelegationKey(bz []byte) (sdk.AccAddress, sdk.ValAddress, error) {
delAddrLen := bz[0]
bz = bz[1:] // remove the length byte of delegator address.
if len(bz) == 0 {
return nil, nil, fmt.Errorf("no bytes left to parse delegator address: %X", bz)
}

del := bz[:int(delAddrLen)]
bz = bz[int(delAddrLen):] // remove the length byte of a delegator address
if len(bz) == 0 {
return nil, nil, fmt.Errorf("no bytes left to parse delegator address: %X", bz)
}
Comment on lines +66 to +76
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Im confused by the comments here. It looks like we remove the length bye of the delegator address, and then the comments are saying we do it again on the same bz that we already removed the length byte on.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah this is an incorrect comment what happens is:

  1. remove the length prefix
  2. remove the delegator address


bz = bz[1:] // remove the validator address bytes.
if len(bz) == 0 {
return nil, nil, fmt.Errorf("no bytes left to parse validator address: %X", bz)
}

val := bz

return del, val, nil
}
150 changes: 150 additions & 0 deletions x/staking/keeper/validator_index_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package keeper_test

import (
"cosmossdk.io/core/store"
sdkmath "cosmossdk.io/math"
storetypes "cosmossdk.io/store/types"

"github.com/cosmos/cosmos-sdk/codec"
"github.com/cosmos/cosmos-sdk/runtime"
"github.com/cosmos/cosmos-sdk/testutil/sims"
sdk "github.com/cosmos/cosmos-sdk/types"
moduletestutil "github.com/cosmos/cosmos-sdk/types/module/testutil"
"github.com/cosmos/cosmos-sdk/x/staking"
"github.com/cosmos/cosmos-sdk/x/staking/types"
)

// TestDelegationsByValidatorMigration tests the multi block migration of the reverse delegation index
func (s *KeeperTestSuite) TestDelegationsByValidatorMigration() {
require := s.Require()
ctx, keeper := s.ctx, s.stakingKeeper
store := s.storeService.OpenKVStore(ctx)
storeInit := runtime.KVStoreAdapter(store)
cdc := moduletestutil.MakeTestEncodingConfig(staking.AppModuleBasic{}).Codec

accAddrs := sims.CreateIncrementalAccounts(15)
valAddrs := sims.ConvertAddrsToValAddrs(accAddrs[0:1])
var addedDels []types.Delegation

// start at 1 as 0 addr is the validator addr
for i := 1; i < 11; i++ {
del1 := types.NewDelegation(accAddrs[i].String(), valAddrs[0].String(), sdkmath.LegacyNewDec(100))
store.Set(types.GetDelegationKey(accAddrs[i], valAddrs[0]), types.MustMarshalDelegation(cdc, del1))
addedDels = append(addedDels, del1)
}

// number of items we migrate per migration
migrationCadence := 6

// set the key in the store, this happens on the original migration
iterator := storetypes.KVStorePrefixIterator(storeInit, types.DelegationKey)
for ; iterator.Valid(); iterator.Next() {
key := iterator.Key()
store.Set(types.NextMigrateDelegationsByValidatorIndexKey, key[1:])
break
}

// before migration the state of delegations by val index should be empty
dels := getValDelegations(cdc, store, valAddrs[0])
require.Equal(len(dels), 0)

// run the first round of migrations first 6, 10 in store
err := keeper.MigrateDelegationsByValidatorIndex(ctx, migrationCadence)
require.NoError(err)

// after migration the state of delegations by val index should not be empty
dels = getValDelegations(cdc, store, valAddrs[0])
require.Equal(len(dels), migrationCadence)
require.NotEqual(len(dels), len(addedDels))

// check that the next value needed from the store is present
next, err := store.Get(types.NextMigrateDelegationsByValidatorIndexKey)
require.NoError(err)
require.NotNil(next)

// delegate to a validator while the migration is in progress
delagationWhileMigrationInProgress := types.NewDelegation(accAddrs[12].String(), valAddrs[0].String(), sdkmath.LegacyNewDec(100))
keeper.SetDelegation(ctx, delagationWhileMigrationInProgress)
addedDels = append(addedDels, delagationWhileMigrationInProgress)

// remove a delegation from a validator while the migration is in progress that has been processed
removeDelagationWhileMigrationInProgress := types.NewDelegation(accAddrs[3].String(), valAddrs[0].String(), sdkmath.LegacyNewDec(100))
keeper.RemoveDelegation(ctx, removeDelagationWhileMigrationInProgress)
// index in the array is 2
addedDels = deleteElement(addedDels, 2)

// remove the index on the off chance this happens during the migration
removeDelagationWhileMigrationInProgressNextIndex := types.NewDelegation(accAddrs[6].String(), valAddrs[0].String(), sdkmath.LegacyNewDec(100))
keeper.RemoveDelegation(ctx, removeDelagationWhileMigrationInProgressNextIndex)
// index in the array is 4, as we've removed one item
addedDels = deleteElement(addedDels, 4)

// remove a delegation from a validator while the migration is in progress that has not been processed
removeDelagationWhileMigrationInProgressNotProcessed := types.NewDelegation(accAddrs[10].String(), valAddrs[0].String(), sdkmath.LegacyNewDec(100))
keeper.RemoveDelegation(ctx, removeDelagationWhileMigrationInProgressNotProcessed)
// index in the array is 7, as we've removed 2 items
addedDels = deleteElement(addedDels, 7)

// while migrating get state of delegations by val index should be increased by 1
delagationWhileMigrationInProgressCount := getValDelegations(cdc, store, valAddrs[0])
require.Equal(len(delagationWhileMigrationInProgressCount), migrationCadence-1)

// run the second round of migrations
err = keeper.MigrateDelegationsByValidatorIndex(ctx, migrationCadence)
require.NoError(err)

// after migration the state of delegations by val index equal all delegations
dels = getValDelegations(cdc, store, valAddrs[0])
require.Equal(len(dels), len(addedDels))
require.Equal(dels, addedDels)

// check that the next value needed from the store is empty
next, err = store.Get(types.NextMigrateDelegationsByValidatorIndexKey)
require.NoError(err)
require.Nil(next)

// Iterate over the store by delegation key
delKeyCount := 0
iteratorDel := storetypes.KVStorePrefixIterator(storeInit, types.DelegationKey)
for ; iteratorDel.Valid(); iteratorDel.Next() {
delKeyCount++
}

// Iterate over the store by validator key
valKeyCount := 0
iteratorVal := storetypes.KVStorePrefixIterator(storeInit, types.DelegationByValIndexKey)
for ; iteratorVal.Valid(); iteratorVal.Next() {
valKeyCount++
}

// Make sure the store count is the same
require.Equal(valKeyCount, delKeyCount)
}

// deleteElement is a simple helper function to remove items from a slice
func deleteElement(slice []types.Delegation, index int) []types.Delegation {
return append(slice[:index], slice[index+1:]...)
}

// getValidatorDelegations is a helper function to get all delegations using the new v5 staking reverse index
func getValDelegations(cdc codec.Codec, keeperStore store.KVStore, valAddr sdk.ValAddress) []types.Delegation {
var delegations []types.Delegation

store := runtime.KVStoreAdapter(keeperStore)
iterator := storetypes.KVStorePrefixIterator(store, types.GetDelegationsByValPrefixKey(valAddr))

for ; iterator.Valid(); iterator.Next() {
var delegation types.Delegation
valAddr, delAddr, err := types.ParseDelegationsByValKey(iterator.Key())
if err != nil {
panic(err)
}

bz := store.Get(types.GetDelegationKey(delAddr, valAddr))
cdc.MustUnmarshal(bz, &delegation)

delegations = append(delegations, delegation)
}

return delegations
}
7 changes: 4 additions & 3 deletions x/staking/migrations/v5/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ const (
)

var (
DelegationKey = []byte{0x31} // key for a delegation
HistoricalInfoKey = []byte{0x50} // prefix for the historical info
DelegationByValIndexKey = []byte{0x71} // key for delegations by a validator
DelegationKey = []byte{0x31} // key for a delegation
HistoricalInfoKey = []byte{0x50} // prefix for the historical info
DelegationByValIndexKey = []byte{0x71} // key for delegations by a validator
NextMigrateDelegationsByValidatorIndexKey = []byte{0x81} // key used to migrate to the new validator index
czarcas7ic marked this conversation as resolved.
Show resolved Hide resolved
)

// ParseDelegationKey parses given key and returns delagator, validator address bytes
Expand Down
13 changes: 12 additions & 1 deletion x/staking/migrations/v5/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,28 @@ import (
sdk "github.com/cosmos/cosmos-sdk/types"
)

func migrateDelegationsByValidatorIndex(ctx sdk.Context, store storetypes.KVStore, cdc codec.BinaryCodec) error {
func migrateDelegationsByValidatorIndex(ctx sdk.Context, store storetypes.KVStore, _ codec.BinaryCodec) error {
iterator := storetypes.KVStorePrefixIterator(store, DelegationKey)
iterationLimit := 1000
iterationCounter := 0

for ; iterator.Valid(); iterator.Next() {
key := iterator.Key()

del, val, err := ParseDelegationKey(key)
if err != nil {
return err
}

store.Set(GetDelegationsByValKey(val, del), []byte{})

iterationCounter++
if iterationCounter >= iterationLimit {
ctx.Logger().Info(fmt.Sprintf("Migrated 1000 delegations, next key %x", key[1:]))
// we set the store to the key sans the DelgationKey as we create a prefix store to iterate per block
store.Set(NextMigrateDelegationsByValidatorIndexKey, key[1:])
break
}
}

return nil
Expand Down
2 changes: 2 additions & 0 deletions x/staking/types/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ var (
ParamsKey = []byte{0x51} // prefix for parameters for module x/staking

DelegationByValIndexKey = []byte{0x71} // key for delegations by a validator

NextMigrateDelegationsByValidatorIndexKey = []byte{0x81} // key used to migrate to the new validator index
czarcas7ic marked this conversation as resolved.
Show resolved Hide resolved
)

// UnbondingType defines the type of unbonding operation
Expand Down
Loading