Skip to content

Commit

Permalink
Merge pull request #5901 from onflow/bastian/improve-migration-scheduling
Browse files Browse the repository at this point in the history

Improve scheduling in account-based migration
  • Loading branch information
turbolent authored May 16, 2024
2 parents dece53b + db1b024 commit 64d7049
Show file tree
Hide file tree
Showing 5 changed files with 243 additions and 104 deletions.
171 changes: 84 additions & 87 deletions cmd/util/ledger/migrations/account_based_migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,8 @@ import (
"sync"
"time"

"github.com/rs/zerolog"

"github.com/onflow/cadence/runtime/common"
"github.com/rs/zerolog"

"github.com/onflow/flow-go/cmd/util/ledger/util"
"github.com/onflow/flow-go/cmd/util/ledger/util/registers"
Expand Down Expand Up @@ -155,7 +154,7 @@ func MigrateGroupConcurrently(

wg := sync.WaitGroup{}
wg.Add(nWorker)
resultCh := make(chan *migrationResult, accountCount)
resultCh := make(chan migrationDuration, accountCount)
for i := 0; i < nWorker; i++ {
go func() {
defer wg.Done()
Expand All @@ -175,12 +174,10 @@ func MigrateGroupConcurrently(

// This is not an account, but service level keys.
if util.IsServiceLevelAddress(address) {
resultCh <- &migrationResult{
migrationDuration: migrationDuration{
Address: address,
Duration: time.Since(start),
RegisterCount: accountRegisters.Count(),
},
resultCh <- migrationDuration{
Address: address,
Duration: time.Since(start),
RegisterCount: accountRegisters.Count(),
}
continue
}
Expand All @@ -206,12 +203,10 @@ func MigrateGroupConcurrently(
}
}

resultCh <- &migrationResult{
migrationDuration: migrationDuration{
Address: address,
Duration: time.Since(start),
RegisterCount: accountRegisters.Count(),
},
resultCh <- migrationDuration{
Address: address,
Duration: time.Since(start),
RegisterCount: accountRegisters.Count(),
}
}
}
Expand All @@ -220,28 +215,67 @@ func MigrateGroupConcurrently(

go func() {
defer close(jobs)
err := registersByAccount.ForEachAccount(
func(owner string, accountRegisters *registers.AccountRegisters) error {
address, err := common.BytesToAddress([]byte(owner))
if err != nil {
return err
}
job := jobMigrateAccountGroup{
Address: address,
AccountRegisters: accountRegisters,
}

select {
case <-ctx.Done():
return nil
case jobs <- job:
// TODO: maybe adjust, make configurable, or dependent on chain
const keepTopNAccountRegisters = 20
largestAccountRegisters := util.NewTopN[*registers.AccountRegisters](
keepTopNAccountRegisters,
func(a, b *registers.AccountRegisters) bool {
return a.Count() < b.Count()
},
)

allAccountRegisters := make([]*registers.AccountRegisters, accountCount)

smallerAccountRegisterIndex := keepTopNAccountRegisters
err := registersByAccount.ForEachAccount(
func(accountRegisters *registers.AccountRegisters) error {

// Try to add the account registers to the top N largest account registers.
// If adding produces an "overflow" element (either the element just added, or a previously kept one),
// store it in the list of all account registers, after the slots reserved for the top N.
// This way we can process the largest account registers first,
// and do not need to sort all account registers.

popped, didPop := largestAccountRegisters.Add(accountRegisters)
if didPop {
allAccountRegisters[smallerAccountRegisterIndex] = popped
smallerAccountRegisterIndex++
}

return nil
},
)
if err != nil {
cancel(fmt.Errorf("failed to schedule jobs for all accounts: %w", err))
cancel(fmt.Errorf("failed to get all account registers: %w", err))
}

// Add the largest account registers to the account registers.
// The elements in the top N largest account registers are returned in reverse order.
for index := largestAccountRegisters.Len() - 1; index >= 0; index-- {
accountRegisters := heap.Pop(largestAccountRegisters).(*registers.AccountRegisters)
allAccountRegisters[index] = accountRegisters
}

for _, accountRegisters := range allAccountRegisters {
owner := accountRegisters.Owner()

address, err := common.BytesToAddress([]byte(owner))
if err != nil {
cancel(fmt.Errorf("failed to convert owner to address: %w", err))
return
}

job := jobMigrateAccountGroup{
Address: address,
AccountRegisters: accountRegisters,
}

select {
case <-ctx.Done():
return
case jobs <- job:
}
}
}()

Expand All @@ -254,14 +288,20 @@ func MigrateGroupConcurrently(
),
)

durations := newMigrationDurations(logTopNDurations)
topDurations := util.NewTopN[migrationDuration](
logTopNDurations,
func(duration migrationDuration, duration2 migrationDuration) bool {
return duration.Duration < duration2.Duration
},
)

accountLoop:
for accountIndex := 0; accountIndex < accountCount; accountIndex++ {
select {
case <-ctx.Done():
break accountLoop
case result := <-resultCh:
durations.Add(result)
case duration := <-resultCh:
topDurations.Add(duration)
logAccount(1)
}
}
Expand All @@ -272,7 +312,7 @@ accountLoop:
wg.Wait()

log.Info().
Array("top_longest_migrations", durations.Array()).
Array("top_longest_migrations", loggableMigrationDurations(topDurations)).
Msgf("Top longest migrations")

err := ctx.Err()
Expand All @@ -293,67 +333,24 @@ type jobMigrateAccountGroup struct {
AccountRegisters *registers.AccountRegisters
}

type migrationResult struct {
migrationDuration
}

type migrationDuration struct {
Address common.Address
Duration time.Duration
RegisterCount int
}

// migrationDurations implements heap methods for the timer results
type migrationDurations struct {
v []migrationDuration

KeepTopN int
}

// newMigrationDurations creates a new migrationDurations which are used to track the
// accounts that took the longest time to migrate.
func newMigrationDurations(keepTopN int) *migrationDurations {
return &migrationDurations{
v: make([]migrationDuration, 0, keepTopN),
KeepTopN: keepTopN,
}
}

func (h *migrationDurations) Len() int { return len(h.v) }
func (h *migrationDurations) Less(i, j int) bool {
return h.v[i].Duration < h.v[j].Duration
}
func (h *migrationDurations) Swap(i, j int) {
h.v[i], h.v[j] = h.v[j], h.v[i]
}
func (h *migrationDurations) Push(x interface{}) {
h.v = append(h.v, x.(migrationDuration))
}
func (h *migrationDurations) Pop() interface{} {
old := h.v
n := len(old)
x := old[n-1]
h.v = old[0 : n-1]
return x
}

func (h *migrationDurations) Array() zerolog.LogArrayMarshaler {
func loggableMigrationDurations(durations *util.TopN[migrationDuration]) zerolog.LogArrayMarshaler {
array := zerolog.Arr()
for _, result := range h.v {
array = array.Str(fmt.Sprintf("%s [registers: %d]: %s",
result.Address.Hex(),
result.RegisterCount,
result.Duration.String(),

for index := durations.Len() - 1; index >= 0; index-- {
duration := heap.Pop(durations).(migrationDuration)
array = array.Str(fmt.Sprintf(
"%s [registers: %d]: %s",
duration.Address.Hex(),
duration.RegisterCount,
duration.Duration.String(),
))
}
return array
}

func (h *migrationDurations) Add(result *migrationResult) {
if h.Len() < h.KeepTopN || result.Duration > h.v[0].Duration {
if h.Len() == h.KeepTopN {
heap.Pop(h) // remove the element with the smallest duration
}
heap.Push(h, result.migrationDuration)
}
return array
}
32 changes: 18 additions & 14 deletions cmd/util/ledger/migrations/account_size_filter_migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,20 +57,24 @@ func NewAccountSizeFilterMigration(
}
}

return registersByAccount.ForEachAccount(func(owner string, accountRegisters *registers.AccountRegisters) error {
if _, ok := exceptions[owner]; ok {
return nil
}

info := payloadCountByAddress[owner]
if info.size <= maxAccountSize {
return nil
}

return accountRegisters.ForEach(func(owner, key string, _ []byte) error {
return accountRegisters.Set(owner, key, nil)
})
})
return registersByAccount.ForEachAccount(
func(accountRegisters *registers.AccountRegisters) error {
owner := accountRegisters.Owner()

if _, ok := exceptions[owner]; ok {
return nil
}

info := payloadCountByAddress[owner]
if info.size <= maxAccountSize {
return nil
}

return accountRegisters.ForEach(func(owner, key string, _ []byte) error {
return accountRegisters.Set(owner, key, nil)
})
},
)
}
}

Expand Down
6 changes: 3 additions & 3 deletions cmd/util/ledger/util/registers/registers.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,9 @@ func (b *ByAccount) DestructIntoPayloads(nWorker int) []*ledger.Payload {
return payloads
}

func (b *ByAccount) ForEachAccount(f func(owner string, accountRegisters *AccountRegisters) error) error {
for owner, accountRegisters := range b.registers {
err := f(owner, accountRegisters)
func (b *ByAccount) ForEachAccount(f func(accountRegisters *AccountRegisters) error) error {
for _, accountRegisters := range b.registers {
err := f(accountRegisters)
if err != nil {
return err
}
Expand Down
64 changes: 64 additions & 0 deletions cmd/util/ledger/util/topn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package util

import (
"container/heap"
)

// TopN keeps track of the top N elements of a sequence of values,
// where "top" means largest according to IsLess.
// Use Add to add elements to the list.
//
// TopN provides the methods of heap.Interface (Len, Less, Swap, Push, Pop).
// Because Less delegates to IsLess, container/heap treats it as a min-heap:
// the smallest retained element is the one removed by heap.Pop, and therefore
// the one evicted by Add once more than N elements are present.
type TopN[T any] struct {
	// Tree holds the retained elements. Add maintains it via container/heap.
	Tree []T
	// N is the maximum number of elements retained by Add.
	N int
	// IsLess reports whether its first argument orders before its second.
	IsLess func(T, T) bool
}

// NewTopN returns an empty TopN that retains at most n elements,
// ordered by isLess. Storage for n elements is allocated up front,
// so n must not be negative.
func NewTopN[T any](n int, isLess func(T, T) bool) *TopN[T] {
	return &TopN[T]{
		Tree:   make([]T, 0, n),
		N:      n,
		IsLess: isLess,
	}
}

// Len returns the number of elements currently retained.
func (h *TopN[T]) Len() int {
	return len(h.Tree)
}

// Less reports whether the element at index i orders before
// the element at index j, according to IsLess.
func (h *TopN[T]) Less(i, j int) bool {
	return h.IsLess(h.Tree[i], h.Tree[j])
}

// Swap exchanges the elements at indices i and j.
func (h *TopN[T]) Swap(i, j int) {
	h.Tree[i], h.Tree[j] = h.Tree[j], h.Tree[i]
}

// Push appends x, which must be of type T, to the end of Tree.
// It is intended to be called by container/heap; use Add (or heap.Push)
// to insert an element while keeping the heap order.
func (h *TopN[T]) Push(x any) {
	h.Tree = append(h.Tree, x.(T))
}

// Pop removes and returns the last element of Tree.
// It is intended to be called by container/heap; use heap.Pop
// to remove the smallest element.
func (h *TopN[T]) Pop() any {
	lastIndex := len(h.Tree) - 1
	last := h.Tree[lastIndex]

	// Overwrite the vacated slot with the zero value, so the backing array
	// no longer references the removed element.
	var empty T
	h.Tree[lastIndex] = empty

	h.Tree = h.Tree[:lastIndex]
	return last
}

// Add tries to add a value to the list.
// If the list is full, the smallest element (which may be value itself)
// is removed, and Add returns it and true.
// If the list is not full, it will return the zero value and false.
func (h *TopN[T]) Add(value T) (popped T, didPop bool) {
	heap.Push(h, value)
	if h.Len() <= h.N {
		// Still within capacity: popped is the zero value of T here.
		return popped, false
	}
	return heap.Pop(h).(T), true
}
Loading

0 comments on commit 64d7049

Please sign in to comment.