Skip to content

Commit

Permalink
return error from EndBlockVSU
Browse files Browse the repository at this point in the history
  • Loading branch information
mpoke committed Sep 3, 2024
1 parent 1588567 commit e125c89
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 39 deletions.
68 changes: 39 additions & 29 deletions x/ccv/provider/keeper/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,18 +60,25 @@ func (k Keeper) OnTimeoutPacket(ctx sdk.Context, packet channeltypes.Packet) err
// the Validator Set Update sub-protocol
func (k Keeper) EndBlockVSU(ctx sdk.Context) ([]abci.ValidatorUpdate, error) {
// logic to update the provider consensus validator set.
valUpdates := k.ProviderValidatorUpdates(ctx)
valUpdates, err := k.ProviderValidatorUpdates(ctx)
if err != nil {
return []abci.ValidatorUpdate{}, fmt.Errorf("updating the provider consensus validator set: %w", err)
}

if k.BlocksUntilNextEpoch(ctx) == 0 {
// only queue and send VSCPackets at the boundaries of an epoch

// collect validator updates
k.QueueVSCPackets(ctx)
if err := k.QueueVSCPackets(ctx); err != nil {
return []abci.ValidatorUpdate{}, fmt.Errorf("queueing consumer validator updates: %w", err)
}

// try sending VSC packets to all registered consumer chains;
// if the CCV channel is not established for a consumer chain,
// the updates will remain queued until the channel is established
k.SendVSCPackets(ctx)
if err := k.SendVSCPackets(ctx); err != nil {
return []abci.ValidatorUpdate{}, fmt.Errorf("sending consumer validator updates: %w", err)
}
}

return valUpdates, nil
Expand All @@ -82,17 +89,17 @@ func (k Keeper) EndBlockVSU(ctx sdk.Context) ([]abci.ValidatorUpdate, error) {
// It retrieves the bonded validators from the staking module and creates a `ConsumerValidator` object for each validator.
// The maximum number of validators is determined by the `maxValidators` parameter.
// The function returns the difference between the current validator set and the next validator set as a list of `abci.ValidatorUpdate` objects.
func (k Keeper) ProviderValidatorUpdates(ctx sdk.Context) []abci.ValidatorUpdate {
func (k Keeper) ProviderValidatorUpdates(ctx sdk.Context) ([]abci.ValidatorUpdate, error) {
// get the bonded validators from the staking module
bondedValidators, err := k.stakingKeeper.GetBondedValidatorsByPower(ctx)
if err != nil {
panic(fmt.Errorf("failed to get bonded validators: %w", err))
return []abci.ValidatorUpdate{}, fmt.Errorf("getting bonded validators: %w", err)
}

// get the last validator set sent to consensus
currentValidators, err := k.GetLastProviderConsensusValSet(ctx)
if err != nil {
panic(fmt.Errorf("failed to get last provider consensus validator set: %w", err))
return []abci.ValidatorUpdate{}, fmt.Errorf("getting last provider consensus validator set: %w", err)
}

nextValidators := []types.ConsensusValidator{}
Expand All @@ -104,8 +111,8 @@ func (k Keeper) ProviderValidatorUpdates(ctx sdk.Context) []abci.ValidatorUpdate
for _, val := range bondedValidators[:maxValidators] {
nextValidator, err := k.CreateProviderConsensusValidator(ctx, val)
if err != nil {
k.Logger(ctx).Error("error when creating provider consensus validator", "error", err, "validator", val)
continue
return []abci.ValidatorUpdate{},
fmt.Errorf("creating provider consensus validator(%s): %w", val.OperatorAddress, err)
}
nextValidators = append(nextValidators, nextValidator)
}
Expand All @@ -115,7 +122,7 @@ func (k Keeper) ProviderValidatorUpdates(ctx sdk.Context) []abci.ValidatorUpdate

valUpdates := DiffValidators(currentValidators, nextValidators)

return valUpdates
return valUpdates, nil
}

// BlocksUntilNextEpoch returns the number of blocks until the next epoch starts
Expand All @@ -135,17 +142,20 @@ func (k Keeper) BlocksUntilNextEpoch(ctx sdk.Context) int64 {
// VSC packets to the chains with established CCV channels.
// If the CCV channel is not established for a consumer chain,
// the updates will remain queued until the channel is established
func (k Keeper) SendVSCPackets(ctx sdk.Context) {
func (k Keeper) SendVSCPackets(ctx sdk.Context) error {
for _, chainID := range k.GetAllRegisteredConsumerChainIDs(ctx) {
// check if CCV channel is established and send
if channelID, found := k.GetChainToChannel(ctx, chainID); found {
k.SendVSCPacketsToChain(ctx, chainID, channelID)
if err := k.SendVSCPacketsToChain(ctx, chainID, channelID); err != nil {
return fmt.Errorf("sending VSCPacket to consumer, chainID(%s): %w", chainID, err)
}
}
}
return nil
}

// SendVSCPacketsToChain sends all queued VSC packets to the specified chain
func (k Keeper) SendVSCPacketsToChain(ctx sdk.Context, chainID, channelID string) {
func (k Keeper) SendVSCPacketsToChain(ctx sdk.Context, chainID, channelID string) error {
pendingPackets := k.GetPendingVSCPackets(ctx, chainID)
for _, data := range pendingPackets {
// send packet over IBC
Expand All @@ -164,54 +174,52 @@ func (k Keeper) SendVSCPacketsToChain(ctx sdk.Context, chainID, channelID string
// leave the packet data stored to be sent once the client is upgraded
// the client cannot expire during iteration (in the middle of a block)
k.Logger(ctx).Info("IBC client is expired, cannot send VSC, leaving packet data stored:", "chainID", chainID, "vscid", data.ValsetUpdateId)
return
return nil
}
// Not able to send packet over IBC!
k.Logger(ctx).Error("cannot send VSC, removing consumer:", "chainID", chainID, "vscid", data.ValsetUpdateId, "err", err.Error())
// If this happens, most likely the consumer is malicious; remove it
err := k.StopConsumerChain(ctx, chainID, true)
if err != nil {
panic(fmt.Errorf("consumer chain failed to stop: %w", err))
return fmt.Errorf("stopping consumer, chainID(%s): %w", chainID, err)
}
return
return nil
}
}
k.DeletePendingVSCPackets(ctx, chainID)

return nil
}

// QueueVSCPackets queues latest validator updates for every registered consumer chain
// failing to GetLastBondedValidators will cause a panic in EndBlock

// TODO: decide if this func shouldn't return an error to be propagated to BeginBlocker
func (k Keeper) QueueVSCPackets(ctx sdk.Context) {
func (k Keeper) QueueVSCPackets(ctx sdk.Context) error {
valUpdateID := k.GetValidatorSetUpdateId(ctx) // current valset update ID

// get the bonded validators from the staking module
bondedValidators, err := k.GetLastBondedValidators(ctx)
if err != nil {
panic(fmt.Errorf("failed to get last validators: %w", err))
return fmt.Errorf("getting bonded validators: %w", err)
}

// get the provider active validators
activeValidators, err := k.GetLastProviderConsensusActiveValidators(ctx)
if err != nil {
return fmt.Errorf("getting provider active validators: %w", err)
}

for _, chainID := range k.GetAllRegisteredConsumerChainIDs(ctx) {
currentValidators, err := k.GetConsumerValSet(ctx, chainID)
if err != nil {
panic(fmt.Errorf("failed to get consumer validators: %w", err))
return fmt.Errorf("getting consumer validators, chainID(%s): %w", chainID, err)
}
topN, _ := k.GetTopN(ctx, chainID)

if topN > 0 {
// in a Top-N chain, we automatically opt in all validators that belong to the top N
// of the active validators
activeValidators, err := k.GetLastProviderConsensusActiveValidators(ctx)
if err != nil {
// something must be broken in the bonded validators, so we have to panic since there is no realistic way to proceed
panic(fmt.Errorf("failed to get active validators: %w", err))
}

minPower, err := k.ComputeMinPowerInTopN(ctx, activeValidators, topN)
if err != nil {
// we panic, since the only way to proceed would be to opt in all validators, which is not the intended behavior
panic(fmt.Errorf("failed to compute min power to opt in for chain %v: %w", chainID, err))
return fmt.Errorf("computing min power to opt in, chainID(%s): %w", chainID, err)
}

// set the minimal power of validators in the top N in the store
Expand Down Expand Up @@ -239,6 +247,8 @@ func (k Keeper) QueueVSCPackets(ctx sdk.Context) {
}

k.IncrementValidatorSetUpdateId(ctx)

return nil
}

// BeginBlockCIS contains the BeginBlock logic needed for the Consumer Initiated Slashing sub-protocol.
Expand Down
29 changes: 19 additions & 10 deletions x/ccv/provider/keeper/relay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ func TestQueueVSCPackets(t *testing.T) {
// no-op if tc.packets is empty
pk.AppendPendingVSCPackets(ctx, chainID, tc.packets...)

pk.QueueVSCPackets(ctx)
err := pk.QueueVSCPackets(ctx)
require.NoError(t, err)
pending := pk.GetPendingVSCPackets(ctx, chainID)
require.Len(t, pending, tc.expectedQueueSize, "pending vsc queue mismatch (%v != %v) in case: '%s'", tc.expectedQueueSize, len(pending), tc.name)

Expand Down Expand Up @@ -120,7 +121,8 @@ func TestQueueVSCPacketsDoesNotResetConsumerValidatorsHeights(t *testing.T) {
// validator for the first time after the `QueueVSCPackets` call.
providerKeeper.SetOptedIn(ctx, "chainID", providertypes.NewProviderConsAddress(valBConsAddr))

providerKeeper.QueueVSCPackets(ctx)
err := providerKeeper.QueueVSCPackets(ctx)
require.NoError(t, err)

// the height of consumer validator A should not be modified because A was already a consumer validator
cv, _ := providerKeeper.GetConsumerValidator(ctx, "chainID", providertypes.NewProviderConsAddress(valAConsAddr))
Expand Down Expand Up @@ -501,8 +503,9 @@ func TestSendVSCPacketsToChainFailure(t *testing.T) {
require.NoError(t, err)
providerKeeper.SetConsumerClientId(ctx, "consumerChainID", "clientID")

// No panic should occur, StopConsumerChain should be called
providerKeeper.SendVSCPacketsToChain(ctx, "consumerChainID", "CCVChannelID")
// No error should occur, StopConsumerChain should be called
err = providerKeeper.SendVSCPacketsToChain(ctx, "consumerChainID", "CCVChannelID")
require.NoError(t, err)

// Pending VSC packets should be deleted in StopConsumerChain
require.Empty(t, providerKeeper.GetPendingVSCPackets(ctx, "consumerChainID"))
Expand Down Expand Up @@ -616,24 +619,28 @@ func TestEndBlockVSU(t *testing.T) {

// with block height of 1 we do not expect any queueing of VSC packets
ctx = ctx.WithBlockHeight(1)
providerKeeper.EndBlockVSU(ctx)
_, err := providerKeeper.EndBlockVSU(ctx)
require.NoError(t, err)
require.Equal(t, 0, len(providerKeeper.GetPendingVSCPackets(ctx, chainID)))

// with block height of 5 we do not expect any queueing of VSC packets
ctx = ctx.WithBlockHeight(5)
providerKeeper.EndBlockVSU(ctx)
_, err = providerKeeper.EndBlockVSU(ctx)
require.NoError(t, err)
require.Equal(t, 0, len(providerKeeper.GetPendingVSCPackets(ctx, chainID)))

// with block height of 10 we expect the queueing of one VSC packet
ctx = ctx.WithBlockHeight(10)
providerKeeper.EndBlockVSU(ctx)
_, err = providerKeeper.EndBlockVSU(ctx)
require.NoError(t, err)
require.Equal(t, 1, len(providerKeeper.GetPendingVSCPackets(ctx, chainID)))

// With block height of 15 we expect no additional queueing of a VSC packet.
// Note that the pending VSC packet is still there because `SendVSCPackets` does not send the packet. We
// need to mock channels, etc. for this to work, and it's out of scope for this test.
ctx = ctx.WithBlockHeight(15)
providerKeeper.EndBlockVSU(ctx)
_, err = providerKeeper.EndBlockVSU(ctx)
require.NoError(t, err)
require.Equal(t, 1, len(providerKeeper.GetPendingVSCPackets(ctx, chainID)))
}

Expand Down Expand Up @@ -699,7 +706,8 @@ func TestProviderValidatorUpdates(t *testing.T) {
}

// Execute the function
updates := providerKeeper.ProviderValidatorUpdates(ctx)
updates, err := providerKeeper.ProviderValidatorUpdates(ctx)
require.NoError(t, err)

// Assertions
require.ElementsMatch(t, expectedUpdates, updates, "The validator updates should match the expected updates")
Expand Down Expand Up @@ -756,7 +764,8 @@ func TestQueueVSCPacketsWithPowerCapping(t *testing.T) {
params.MaxProviderConsensusValidators = 180
providerKeeper.SetParams(ctx, params)

providerKeeper.QueueVSCPackets(ctx)
err := providerKeeper.QueueVSCPackets(ctx)
require.NoError(t, err)

actualQueuedVSCPackets := providerKeeper.GetPendingVSCPackets(ctx, "chainID")
expectedQueuedVSCPackets := []ccv.ValidatorSetChangePacketData{
Expand Down

0 comments on commit e125c89

Please sign in to comment.