Skip to content

Commit

Permalink
fix!: add ConsumeIdsFromTimeQueue to handle consumers at a given time (
Browse files Browse the repository at this point in the history
…#2236)

* replace GetConsumersReadyToLaunch with ConsumeConsumersReadyToLaunch

* add ConsumeIdsFromTimeQueue for both launch and remove

* Update x/ccv/provider/keeper/consumer_lifecycle.go

Co-authored-by: Simon Noetzlin <[email protected]>

---------

Co-authored-by: Simon Noetzlin <[email protected]>
  • Loading branch information
mpoke and sainoe committed Sep 9, 2024
1 parent 5b6252c commit 50bbbfe
Show file tree
Hide file tree
Showing 6 changed files with 243 additions and 187 deletions.
211 changes: 95 additions & 116 deletions x/ccv/provider/keeper/consumer_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,26 +61,19 @@ func (k Keeper) InitializeConsumer(ctx sdk.Context, consumerId string) (time.Tim
}

// BeginBlockLaunchConsumers launches initialized consumers that are ready to launch
func (k Keeper) BeginBlockLaunchConsumers(ctx sdk.Context) {
// TODO (PERMISSIONLESS): we can parameterize the limit
for _, consumerId := range k.GetConsumersReadyToLaunch(ctx, 200) {
initializationParameters, err := k.GetConsumerInitializationParameters(ctx, consumerId)
if err != nil {
ctx.Logger().Error("could not retrieve initialization record",
"consumerId", consumerId,
"error", err)
continue
}
// Remove consumer to prevent re-trying launching this chain.
// We would only try to re-launch this chain if a new `MsgUpdateConsumer` message is sent.
err = k.RemoveConsumerToBeLaunched(ctx, consumerId, initializationParameters.SpawnTime)
if err != nil {
ctx.Logger().Error("could not remove consumer from to-be-launched queue",
"consumerId", consumerId,
"error", err)
continue
}

func (k Keeper) BeginBlockLaunchConsumers(ctx sdk.Context) error {
consumerIds, err := k.ConsumeIdsFromTimeQueue(
ctx,
types.SpawnTimeToConsumerIdsKeyPrefix(),
k.GetConsumersToBeLaunched,
k.DeleteAllConsumersToBeLaunched,
k.AppendConsumerToBeLaunched,
200,
)
if err != nil {
return errorsmod.Wrapf(ccv.ErrInvalidConsumerState, "getting consumers ready to laumch: %s", err.Error())
}
for _, consumerId := range consumerIds {
cachedCtx, writeFn := ctx.CacheContext()
err = k.LaunchConsumer(cachedCtx, consumerId)
if err != nil {
Expand All @@ -94,49 +87,77 @@ func (k Keeper) BeginBlockLaunchConsumers(ctx sdk.Context) {
ctx.EventManager().EmitEvents(cachedCtx.EventManager().Events())
writeFn()
}
return nil
}

// GetConsumersReadyToLaunch returns the consumer ids of the pending initialized consumer chains
// that are ready to launch, i.e., consumer clients to be created.
func (k Keeper) GetConsumersReadyToLaunch(ctx sdk.Context, limit uint32) []string {
// ConsumeIdsFromTimeQueue returns from a time queue the consumer ids for which the associated time passed.
// The number of ids return is limited to 'limit'. The ids returned are removed from the time queue.
func (k Keeper) ConsumeIdsFromTimeQueue(
ctx sdk.Context,
timeQueueKeyPrefix byte,
getIds func(sdk.Context, time.Time) (types.ConsumerIds, error),
deleteAllIds func(sdk.Context, time.Time),
appendId func(sdk.Context, string, time.Time) error,
limit int,
) ([]string, error) {
store := ctx.KVStore(k.storeKey)

spawnTimeToConsumerIdsKeyPrefix := types.SpawnTimeToConsumerIdsKeyPrefix()
iterator := storetypes.KVStorePrefixIterator(store, []byte{spawnTimeToConsumerIdsKeyPrefix})
defer iterator.Close()

result := []string{}
nextTime := []string{}
timestampsToDelete := []time.Time{}

iterator := storetypes.KVStorePrefixIterator(store, []byte{timeQueueKeyPrefix})
defer iterator.Close()
for ; iterator.Valid(); iterator.Next() {
spawnTime, err := types.ParseTime(spawnTimeToConsumerIdsKeyPrefix, iterator.Key())
if len(result) >= limit {
break
}
ts, err := types.ParseTime(timeQueueKeyPrefix, iterator.Key())
if err != nil {
k.Logger(ctx).Error("failed to parse spawn time",
"error", err)
continue
return result, fmt.Errorf("parsing removal time: %w", err)
}
if spawnTime.After(ctx.BlockTime()) {
return result
if ts.After(ctx.BlockTime()) {
break
}

// if current block time is equal to or after spawnTime, and the chain is initialized, we can launch the chain
consumerIds, err := k.GetConsumersToBeLaunched(ctx, spawnTime)
consumerIds, err := getIds(ctx, ts)
if err != nil {
k.Logger(ctx).Error("failed to retrieve consumers to launch",
"spawn time", spawnTime,
"error", err)
continue
return result,
fmt.Errorf("getting consumers ids, ts(%s): %w", ts.String(), err)
}
if len(result)+len(consumerIds.Ids) >= int(limit) {
remainingConsumerIds := len(result) + len(consumerIds.Ids) - int(limit)
if len(consumerIds.Ids[:len(consumerIds.Ids)-remainingConsumerIds]) == 0 {
return result
}
return append(result, consumerIds.Ids[:len(consumerIds.Ids)-remainingConsumerIds]...)
} else {

timestampsToDelete = append(timestampsToDelete, ts)

availableSlots := limit - len(result)
if availableSlots >= len(consumerIds.Ids) {
// consumer all the ids
result = append(result, consumerIds.Ids...)
} else {
// consume only availableSlots
result = append(result, consumerIds.Ids[:availableSlots]...)
// and leave the others for next time
nextTime = consumerIds.Ids[availableSlots:]
break
}
}

return result
// remove consumers to prevent handling them twice
for i, ts := range timestampsToDelete {
deleteAllIds(ctx, ts)
if i == len(timestampsToDelete)-1 {
// for the last ts consumed, store back the ids for later
for _, consumerId := range nextTime {
err := appendId(ctx, consumerId, ts)
if err != nil {
return result,
fmt.Errorf("failed to append consumer id, consumerId(%s), ts(%s): %w",
consumerId, ts.String(), err)
}
}
}
}

return result, nil
}

// LaunchConsumer launches the chain with the provided consumer id by creating the consumer client and the respective
Expand Down Expand Up @@ -379,26 +400,19 @@ func (k Keeper) StopAndPrepareForConsumerRemoval(ctx sdk.Context, consumerId str
}

// BeginBlockRemoveConsumers iterates over the pending consumer proposals and stop/removes the chain if the removal time has passed
func (k Keeper) BeginBlockRemoveConsumers(ctx sdk.Context) {
// TODO (PERMISSIONLESS): parameterize the limit
for _, consumerId := range k.GetConsumersReadyToStop(ctx, 200) {
removalTime, err := k.GetConsumerRemovalTime(ctx, consumerId)
if err != nil {
k.Logger(ctx).Error("chain could not be stopped",
"consumerId", consumerId,
"error", err.Error())
continue
}

// Remove consumer to prevent re-trying removing this chain.
err = k.RemoveConsumerToBeRemoved(ctx, consumerId, removalTime)
if err != nil {
ctx.Logger().Error("could not remove consumer from to-be-removed queue",
"consumerId", consumerId,
"error", err)
continue
}

func (k Keeper) BeginBlockRemoveConsumers(ctx sdk.Context) error {
consumerIds, err := k.ConsumeIdsFromTimeQueue(
ctx,
types.RemovalTimeToConsumerIdsKeyPrefix(),
k.GetConsumersToBeRemoved,
k.DeleteAllConsumersToBeRemoved,
k.AppendConsumerToBeRemoved,
200,
)
if err != nil {
return errorsmod.Wrapf(ccv.ErrInvalidConsumerState, "getting consumers ready to stop: %s", err.Error())
}
for _, consumerId := range consumerIds {
// delete consumer chain in a cached context to abort deletion in case of errors
cachedCtx, writeFn := ctx.CacheContext()
err = k.DeleteConsumerChain(cachedCtx, consumerId)
Expand All @@ -409,58 +423,11 @@ func (k Keeper) BeginBlockRemoveConsumers(ctx sdk.Context) {
continue
}

// The cached context is created with a new EventManager so we merge the event into the original context
// the cached context is created with a new EventManager so we merge the event into the original context
ctx.EventManager().EmitEvents(cachedCtx.EventManager().Events())

writeFn()

k.Logger(ctx).Info("executed consumer deletion",
"consumer id", consumerId,
"removal time", removalTime,
)
}
}

// GetConsumersReadyToStop returns the consumer ids of the pending launched consumer chains
// that are ready to stop
func (k Keeper) GetConsumersReadyToStop(ctx sdk.Context, limit uint32) []string {
store := ctx.KVStore(k.storeKey)

removalTimeToConsumerIdsKeyPrefix := types.RemovalTimeToConsumerIdsKeyPrefix()
iterator := storetypes.KVStorePrefixIterator(store, []byte{removalTimeToConsumerIdsKeyPrefix})
defer iterator.Close()

result := []string{}
for ; iterator.Valid(); iterator.Next() {
removalTime, err := types.ParseTime(removalTimeToConsumerIdsKeyPrefix, iterator.Key())
if err != nil {
k.Logger(ctx).Error("failed to parse removal time",
"error", err)
continue
}
if removalTime.After(ctx.BlockTime()) {
return result
}

consumers, err := k.GetConsumersToBeRemoved(ctx, removalTime)
if err != nil {
k.Logger(ctx).Error("failed to retrieve consumers to remove",
"removal time", removalTime,
"error", err)
continue
}
if len(result)+len(consumers.Ids) >= int(limit) {
remainingConsumerIds := len(result) + len(consumers.Ids) - int(limit)
if len(consumers.Ids[:len(consumers.Ids)-remainingConsumerIds]) == 0 {
return result
}
return append(result, consumers.Ids[:len(consumers.Ids)-remainingConsumerIds]...)
} else {
result = append(result, consumers.Ids...)
}
}

return result
return nil
}

// DeleteConsumerChain cleans up the state of the given consumer chain
Expand Down Expand Up @@ -657,6 +624,12 @@ func (k Keeper) RemoveConsumerToBeLaunched(ctx sdk.Context, consumerId string, s
return k.removeConsumerIdFromTime(ctx, consumerId, types.SpawnTimeToConsumerIdsKey, spawnTime)
}

// DeleteAllConsumersToBeLaunched deletes all consumer to be launched at this specific spawn time
func (k Keeper) DeleteAllConsumersToBeLaunched(ctx sdk.Context, spawnTime time.Time) {
store := ctx.KVStore(k.storeKey)
store.Delete(types.SpawnTimeToConsumerIdsKey(spawnTime))
}

// GetConsumersToBeRemoved returns all the consumer ids of chains stored under this removal time
func (k Keeper) GetConsumersToBeRemoved(ctx sdk.Context, removalTime time.Time) (types.ConsumerIds, error) {
return k.getConsumerIdsBasedOnTime(ctx, types.RemovalTimeToConsumerIdsKey, removalTime)
Expand All @@ -671,3 +644,9 @@ func (k Keeper) AppendConsumerToBeRemoved(ctx sdk.Context, consumerId string, re
func (k Keeper) RemoveConsumerToBeRemoved(ctx sdk.Context, consumerId string, removalTime time.Time) error {
return k.removeConsumerIdFromTime(ctx, consumerId, types.RemovalTimeToConsumerIdsKey, removalTime)
}

// DeleteAllConsumersToBeRemoved deletes all consumer to be removed at this specific removal time
func (k Keeper) DeleteAllConsumersToBeRemoved(ctx sdk.Context, removalTime time.Time) {
store := ctx.KVStore(k.storeKey)
store.Delete(types.RemovalTimeToConsumerIdsKey(removalTime))
}
Loading

0 comments on commit 50bbbfe

Please sign in to comment.