Skip to content

Commit

Permalink
Merge branch 'shawn/optimize-pending-packets-storage' into shawn/thro…
Browse files Browse the repository at this point in the history
…ttle-with-retries-consumer-changes
  • Loading branch information
shaspitz committed Jun 23, 2023
2 parents cf78884 + 394f709 commit da2003b
Show file tree
Hide file tree
Showing 18 changed files with 460 additions and 150 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

Add an entry to the unreleased section whenever merging a PR to main that is not targeted at a specific release. These entries will eventually be included in a release.

* (feat!) optimize pending packets storage on consumer, with migration! [#1037](https://github.com/cosmos/interchain-security/pull/1037)

## v3.0.0

Date: June 21st, 2023
Expand Down
16 changes: 16 additions & 0 deletions docs/docs/adrs/adr-008-throttle-retries.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ title: Throttle with retries
## Changelog

* 6/9/23: Initial draft
* 6/22/23: added note on consumer pending packets storage optimization

## Status

Expand Down Expand Up @@ -46,6 +47,21 @@ With the behavior described, we maintain very similar behavior to the current th

In the normal case, when no or a few slash packets are being sent, the VSCMaturedPackets will not be delayed, and hence unbonding will not be delayed.

### Consumer pending packets storage optimization

In addition to the mentioned consumer changes above. An optimization will need to be made to the consumer's pending packets storage to properly implement the feature from this ADR.

The consumer ccv module previously queued "pending packets" to be sent on each endblocker in [SendPackets](https://github.com/cosmos/interchain-security/blob/3bc4e7135066d848aac60b0787364c07157fd36d/x/ccv/consumer/keeper/relay.go#L178). These packets are queued in state with a protobuf list of `ConsumerPacketData`. For a single append operation, the entire list is deserialized, then a packet is appended to that list, and the list is serialized again. See older version of [AppendPendingPacket](https://github.com/cosmos/interchain-security/blob/05c2dae7c6372b1252b9e97215d07c6aa7618f33/x/ccv/consumer/keeper/keeper.go#L606). That is, a single append operation has O(N) complexity, where N is the size of the list.

This poor append performance isn't a problem when the pending packets list is small. But with this ADR being implemented, the pending packets list could potentially grow to the order of thousands of entries, in the scenario that a slash packet is bouncing.

We can improve the append time for this queue by converting it from a protobuf-esq list, to a queue implemented with sdk-esq code. The idea is to persist an uint64 index that will be incremented each time you queue up a packet. You can think of this as storing the tail of the queue. Then, packet data will be keyed by that index, making the data naturally ordered byte-wite for sdk's iterator. The index will also be stored in the packet data value bytes, so that the index can later be used to delete certain packets from the queue.

Two things are achieved with this approach:

* More efficient packet append/enqueue times
* The ability to delete select packets from the queue (previously all packets were deleted at once)

### Provider changes

The main change needed for the provider is the removal of queuing logic for slash and vsc matured packets upon being received.
Expand Down
7 changes: 6 additions & 1 deletion proto/interchain_security/ccv/v1/ccv.proto
Original file line number Diff line number Diff line change
Expand Up @@ -56,17 +56,22 @@ message SlashPacketData {
// unbonding operations.
message MaturedUnbondingOps { repeated uint64 ids = 1; }

// ConsumerPacketData contains a consumer packet data and a type tag
// ConsumerPacketData contains a consumer packet data, type tag, and index for storage.
message ConsumerPacketData {
ConsumerPacketDataType type = 1;

oneof data {
SlashPacketData slashPacketData = 2;
VSCMaturedPacketData vscMaturedPacketData = 3;
}
uint64 idx = 4;
}


// [Depreciated] favor using []ConsumerPacketData directly, which can be stored more efficiently.
//
// ConsumerPacketDataList is a list of consumer packet data packets.
// It is only used for genesis to ensure backwards compatibility with older versions of ICS.
message ConsumerPacketDataList {
repeated ConsumerPacketData list = 1 [ (gogoproto.nullable) = false ];
}
Expand Down
5 changes: 2 additions & 3 deletions tests/integration/expired_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (s *CCVTestSuite) TestConsumerPacketSendExpiredClient() {
// check that the packets were added to the list of pending data packets
consumerPackets := consumerKeeper.GetPendingPackets(s.consumerCtx())
s.Require().NotEmpty(consumerPackets)
s.Require().Equal(2, len(consumerPackets.GetList()), "unexpected number of pending data packets")
s.Require().Len(consumerPackets, 2, "unexpected number of pending data packets")

// try to send slash packet for downtime infraction
addr := ed25519.GenPrivKey().PubKey().Address()
Expand All @@ -137,7 +137,7 @@ func (s *CCVTestSuite) TestConsumerPacketSendExpiredClient() {
// check that the packets were added to the list of pending data packets
consumerPackets = consumerKeeper.GetPendingPackets(s.consumerCtx())
s.Require().NotEmpty(consumerPackets)
s.Require().Equal(4, len(consumerPackets.GetList()), "unexpected number of pending data packets")
s.Require().Len(consumerPackets, 4, "unexpected number of pending data packets")

// upgrade expired client to the consumer
upgradeExpiredClient(s, Provider)
Expand All @@ -148,7 +148,6 @@ func (s *CCVTestSuite) TestConsumerPacketSendExpiredClient() {
// check that the list of pending data packets is emptied
consumerPackets = consumerKeeper.GetPendingPackets(s.consumerCtx())
s.Require().Empty(consumerPackets)
s.Require().Equal(0, len(consumerPackets.GetList()), "unexpected number of pending data packets")

// relay all packet from consumer to provider
relayAllCommittedPackets(s, s.consumerChain, s.path, ccv.ConsumerPortID, s.path.EndpointA.ChannelID, 4)
Expand Down
24 changes: 12 additions & 12 deletions tests/integration/slashing.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,15 +485,15 @@ func (suite *CCVTestSuite) TestValidatorDowntime() {

// check that slash packet is queued
pendingPackets := consumerKeeper.GetPendingPackets(ctx)
suite.Require().NotEmpty(pendingPackets.List, "pending packets empty")
suite.Require().Len(pendingPackets.List, 1, "pending packets len should be 1 is %d", len(pendingPackets.List))
suite.Require().NotEmpty(pendingPackets, "pending packets empty")
suite.Require().Len(pendingPackets, 1, "pending packets len should be 1 is %d", len(pendingPackets))

// clear queue, commit packets
suite.consumerApp.GetConsumerKeeper().SendPackets(ctx)

// check queue was cleared
pendingPackets = suite.consumerApp.GetConsumerKeeper().GetPendingPackets(ctx)
suite.Require().Empty(pendingPackets.List, "pending packets NOT empty")
suite.Require().Empty(pendingPackets, "pending packets NOT empty")

// verify that the slash packet was sent
gotCommit := consumerIBCKeeper.ChannelKeeper.GetPacketCommitment(ctx, ccv.ConsumerPortID, channelID, seq)
Expand Down Expand Up @@ -572,15 +572,15 @@ func (suite *CCVTestSuite) TestValidatorDoubleSigning() {

// check slash packet is queued
pendingPackets := suite.consumerApp.GetConsumerKeeper().GetPendingPackets(ctx)
suite.Require().NotEmpty(pendingPackets.List, "pending packets empty")
suite.Require().Len(pendingPackets.List, 1, "pending packets len should be 1 is %d", len(pendingPackets.List))
suite.Require().NotEmpty(pendingPackets, "pending packets empty")
suite.Require().Len(pendingPackets, 1, "pending packets len should be 1 is %d", len(pendingPackets))

// clear queue, commit packets
suite.consumerApp.GetConsumerKeeper().SendPackets(ctx)

// check queue was cleared
pendingPackets = suite.consumerApp.GetConsumerKeeper().GetPendingPackets(ctx)
suite.Require().Empty(pendingPackets.List, "pending packets NOT empty")
suite.Require().Empty(pendingPackets, "pending packets NOT empty")

// check slash packet is sent
gotCommit := suite.consumerApp.GetIBCKeeper().ChannelKeeper.GetPacketCommitment(ctx, ccv.ConsumerPortID, channelID, seq)
Expand Down Expand Up @@ -635,7 +635,7 @@ func (suite *CCVTestSuite) TestQueueAndSendSlashPacket() {
// the downtime slash request duplicates
dataPackets := consumerKeeper.GetPendingPackets(ctx)
suite.Require().NotEmpty(dataPackets)
suite.Require().Len(dataPackets.GetList(), 12)
suite.Require().Len(dataPackets, 12)

// save consumer next sequence
seq, _ := consumerIBCKeeper.ChannelKeeper.GetNextSequenceSend(ctx, ccv.ConsumerPortID, channelID)
Expand All @@ -662,7 +662,7 @@ func (suite *CCVTestSuite) TestQueueAndSendSlashPacket() {
// check that pending data packets got cleared
dataPackets = consumerKeeper.GetPendingPackets(ctx)
suite.Require().Empty(dataPackets)
suite.Require().Len(dataPackets.GetList(), 0)
suite.Require().Len(dataPackets, 0)
}

// TestCISBeforeCCVEstablished tests that the consumer chain doesn't panic or
Expand All @@ -673,14 +673,14 @@ func (suite *CCVTestSuite) TestCISBeforeCCVEstablished() {

// Check pending packets is empty
pendingPackets := consumerKeeper.GetPendingPackets(suite.consumerCtx())
suite.Require().Len(pendingPackets.List, 0)
suite.Require().Len(pendingPackets, 0)

consumerKeeper.SlashWithInfractionReason(suite.consumerCtx(), []byte{0x01, 0x02, 0x3},
66, 4324, sdk.MustNewDecFromStr("0.05"), stakingtypes.Infraction_INFRACTION_DOWNTIME)

// Check slash packet was queued
pendingPackets = consumerKeeper.GetPendingPackets(suite.consumerCtx())
suite.Require().Len(pendingPackets.List, 1)
suite.Require().Len(pendingPackets, 1)

// Pass 5 blocks, confirming the consumer doesn't panic
for i := 0; i < 5; i++ {
Expand All @@ -689,7 +689,7 @@ func (suite *CCVTestSuite) TestCISBeforeCCVEstablished() {

// Check packet is still queued
pendingPackets = consumerKeeper.GetPendingPackets(suite.consumerCtx())
suite.Require().Len(pendingPackets.List, 1)
suite.Require().Len(pendingPackets, 1)

// establish ccv channel
suite.SetupCCVChannel(suite.path)
Expand All @@ -698,5 +698,5 @@ func (suite *CCVTestSuite) TestCISBeforeCCVEstablished() {
// Pass one more block, and confirm the packet is sent now that ccv channel is established
suite.consumerChain.NextBlock()
pendingPackets = consumerKeeper.GetPendingPackets(suite.consumerCtx())
suite.Require().Len(pendingPackets.List, 0)
suite.Require().Len(pendingPackets, 0)
}
16 changes: 12 additions & 4 deletions x/ccv/consumer/keeper/genesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,12 @@ func (k Keeper) InitGenesis(ctx sdk.Context, state *consumertypes.GenesisState)
k.SetLastTransmissionBlockHeight(ctx, state.LastTransmissionBlockHeight)
}

// set pending consumer pending packets
// Set pending consumer packets, using the depreciated ConsumerPacketDataList type
// that exists for genesis.
// note that the list includes pending mature VSC packet only if the handshake is completed
k.AppendPendingPacket(ctx, state.PendingConsumerPackets.List...)
for _, packet := range state.PendingConsumerPackets.List {
k.AppendPendingPacket(ctx, packet.Type, packet.Data)
}

// set height to valset update id mapping
for _, h2v := range state.HeightToValsetUpdateId {
Expand Down Expand Up @@ -121,6 +124,11 @@ func (k Keeper) ExportGenesis(ctx sdk.Context) (genesis *consumertypes.GenesisSt
// export the current validator set
valset := k.MustGetCurrentValidatorsAsABCIUpdates(ctx)

// export pending packets using the depreciated ConsumerPacketDataList type
pendingPackets := k.GetPendingPackets(ctx)
pendingPacketsDepreciated := ccv.ConsumerPacketDataList{}
pendingPacketsDepreciated.List = append(pendingPacketsDepreciated.List, pendingPackets...)

// export all the states created after a provider channel got established
if channelID, ok := k.GetProviderChannel(ctx); ok {
clientID, found := k.GetProviderClientID(ctx)
Expand All @@ -135,7 +143,7 @@ func (k Keeper) ExportGenesis(ctx sdk.Context) (genesis *consumertypes.GenesisSt
k.GetAllPacketMaturityTimes(ctx),
valset,
k.GetAllHeightToValsetUpdateIDs(ctx),
k.GetPendingPackets(ctx),
pendingPacketsDepreciated,
k.GetAllOutstandingDowntimes(ctx),
k.GetLastTransmissionBlockHeight(ctx),
params,
Expand All @@ -155,7 +163,7 @@ func (k Keeper) ExportGenesis(ctx sdk.Context) (genesis *consumertypes.GenesisSt
nil,
valset,
k.GetAllHeightToValsetUpdateIDs(ctx),
k.GetPendingPackets(ctx),
pendingPacketsDepreciated,
nil,
consumertypes.LastTransmissionBlockHeight{},
params,
Expand Down
27 changes: 23 additions & 4 deletions x/ccv/consumer/keeper/genesis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,12 @@ func TestInitGenesis(t *testing.T) {
func(ctx sdk.Context, ck consumerkeeper.Keeper, gs *consumertypes.GenesisState) {
assertConsumerPortIsBound(t, ctx, &ck)

require.Equal(t, pendingDataPackets, ck.GetPendingPackets(ctx))
obtainedPendingPackets := ck.GetPendingPackets(ctx)
for idx, expectedPacketData := range pendingDataPackets.List {
require.Equal(t, expectedPacketData.Type, obtainedPendingPackets[idx].Type)
require.Equal(t, expectedPacketData.Data, obtainedPendingPackets[idx].Data)
}

assertHeightValsetUpdateIDs(t, ctx, &ck, defaultHeightValsetUpdateIDs)
assertProviderClientID(t, ctx, &ck, provClientID)
require.Equal(t, validator.Address.Bytes(), ck.GetAllCCValidator(ctx)[0].Address)
Expand Down Expand Up @@ -186,7 +191,12 @@ func TestInitGenesis(t *testing.T) {
require.Equal(t, provChannelID, gotChannelID)

require.True(t, ck.PacketMaturityTimeExists(ctx, matPackets[0].VscId, matPackets[0].MaturityTime))
require.Equal(t, pendingDataPackets, ck.GetPendingPackets(ctx))

obtainedPendingPackets := ck.GetPendingPackets(ctx)
for idx, expectedPacketData := range pendingDataPackets.List {
require.Equal(t, expectedPacketData.Type, obtainedPendingPackets[idx].Type)
require.Equal(t, expectedPacketData.Data, obtainedPendingPackets[idx].Data)
}

require.Equal(t, gs.OutstandingDowntimeSlashing, ck.GetAllOutstandingDowntimes(ctx))

Expand Down Expand Up @@ -252,12 +262,16 @@ func TestExportGenesis(t *testing.T) {
Data: &ccv.ConsumerPacketData_SlashPacketData{
SlashPacketData: ccv.NewSlashPacketData(abciValidator, vscID, stakingtypes.Infraction_INFRACTION_DOWNTIME),
},
Idx: 0,
},
{
Type: ccv.VscMaturedPacket,
Data: &ccv.ConsumerPacketData_VscMaturedPacketData{
VscMaturedPacketData: ccv.NewVSCMaturedPacketData(vscID),
},
// This idx is a part of the expected genesis state.
// If the keeper is correctly storing consumer packet data, indexes should be populated.
Idx: 1,
},
},
}
Expand Down Expand Up @@ -291,7 +305,10 @@ func TestExportGenesis(t *testing.T) {
ck.SetCCValidator(ctx, cVal)
ck.SetParams(ctx, params)

ck.AppendPendingPacket(ctx, consPackets.List...)
for _, packet := range consPackets.List {
ck.AppendPendingPacket(ctx, packet.Type, packet.Data)
}

ck.SetHeightValsetUpdateID(ctx, defaultHeightValsetUpdateIDs[0].Height, defaultHeightValsetUpdateIDs[0].ValsetUpdateId)
},
consumertypes.NewRestartGenesisState(
Expand Down Expand Up @@ -321,7 +338,9 @@ func TestExportGenesis(t *testing.T) {
ck.SetHeightValsetUpdateID(ctx, updatedHeightValsetUpdateIDs[0].Height, updatedHeightValsetUpdateIDs[0].ValsetUpdateId)
ck.SetHeightValsetUpdateID(ctx, updatedHeightValsetUpdateIDs[1].Height, updatedHeightValsetUpdateIDs[1].ValsetUpdateId)

ck.AppendPendingPacket(ctx, consPackets.List...)
for _, packet := range consPackets.List {
ck.AppendPendingPacket(ctx, packet.Type, packet.Data)
}

// populate the required states for an established CCV channel
ck.SetPacketMaturityTime(ctx, matPackets[0].VscId, matPackets[0].MaturityTime)
Expand Down
Loading

0 comments on commit da2003b

Please sign in to comment.