From 292b9c176dd9090a8425a0788cbfea63fe48a219 Mon Sep 17 00:00:00 2001 From: Faye Amacker <33205765+fxamacker@users.noreply.github.com> Date: Fri, 14 Jun 2024 10:09:29 -0500 Subject: [PATCH 1/2] Optimize diff-states cmd to diff accts in parallel --- cmd/util/cmd/diff-states/cmd.go | 294 ++++++++++++++----- cmd/util/cmd/diff-states/diff_states_test.go | 21 +- 2 files changed, 232 insertions(+), 83 deletions(-) diff --git a/cmd/util/cmd/diff-states/cmd.go b/cmd/util/cmd/diff-states/cmd.go index b26a759b7a4..d58fbe2001e 100644 --- a/cmd/util/cmd/diff-states/cmd.go +++ b/cmd/util/cmd/diff-states/cmd.go @@ -2,6 +2,7 @@ package diff_states import ( "bytes" + "context" "encoding/hex" "encoding/json" "errors" @@ -17,6 +18,7 @@ import ( "github.com/onflow/flow-go/cmd/util/ledger/util/registers" "github.com/onflow/flow-go/ledger" "github.com/onflow/flow-go/model/flow" + moduleUtil "github.com/onflow/flow-go/module/util" ) var ( @@ -40,6 +42,13 @@ var Cmd = &cobra.Command{ const ReporterName = "state-diff" +type state uint8 + +const ( + oldState state = 1 + newState state = 2 +) + func init() { // Input 1 @@ -177,7 +186,10 @@ func run(*cobra.Command, []string) { } } - diff(registers1, registers2, chainID, rw) + err := diff(registers1, registers2, chainID, rw, flagNWorker) + if err != nil { + log.Warn().Err(err).Msgf("failed to diff registers") + } } func loadPayloads() (payloads1, payloads2 []*ledger.Payload) { @@ -266,108 +278,248 @@ func payloadsToRegisters(payloads1, payloads2 []*ledger.Payload) (registers1, re var accountsDiffer = errors.New("accounts differ") +func diffAccount( + owner string, + accountRegisters1 *registers.AccountRegisters, + accountRegisters2 *registers.AccountRegisters, + chainID flow.ChainID, + rw reporters.ReportWriter, +) (err error) { + + if accountRegisters1.Count() != accountRegisters2.Count() { + rw.Write(countDiff{ + Owner: owner, + State1: accountRegisters1.Count(), + State2: accountRegisters2.Count(), + }) + } + + err = 
accountRegisters1.ForEach(func(owner, key string, value1 []byte) error { + var value2 []byte + value2, err = accountRegisters2.Get(owner, key) + if err != nil { + return err + } + + if !bytes.Equal(value1, value2) { + + if flagRaw { + rw.Write(rawDiff{ + Owner: owner, + Key: key, + Value1: value1, + Value2: value2, + }) + } else { + // stop on first difference in accounts + return accountsDiffer + } + } + + return nil + }) + if err != nil { + if flagRaw || !errors.Is(err, accountsDiffer) { + return err + } + + address, err := common.BytesToAddress([]byte(owner)) + if err != nil { + return err + } + + migrations.NewCadenceValueDiffReporter( + address, + chainID, + rw, + true, + flagNWorker, + ).DiffStates( + accountRegisters1, + accountRegisters2, + migrations.AllStorageMapDomains, + ) + } + + return nil +} + func diff( registers1 *registers.ByAccount, registers2 *registers.ByAccount, chainID flow.ChainID, rw reporters.ReportWriter, -) { - log.Info().Msg("Diffing accounts") + nWorkers int, +) error { + log.Info().Msgf("Diffing %d accounts", registers1.AccountCount()) - err := registers1.ForEachAccount(func(accountRegisters1 *registers.AccountRegisters) (err error) { - owner := accountRegisters1.Owner() + if registers1.AccountCount() < nWorkers { + nWorkers = registers1.AccountCount() + } - if !registers2.HasAccountOwner(owner) { - rw.Write(accountMissing{ - Owner: owner, - State: 2, - }) + logAccount := moduleUtil.LogProgress( + log.Logger, + moduleUtil.DefaultLogProgressConfig( + "processing account group", + registers1.AccountCount(), + ), + ) - return nil - } + if nWorkers <= 1 { + foundAccountCountInRegisters2 := 0 - accountRegisters2 := registers2.AccountRegisters(owner) + _ = registers1.ForEachAccount(func(accountRegisters1 *registers.AccountRegisters) (err error) { + owner := accountRegisters1.Owner() - if accountRegisters1.Count() != accountRegisters2.Count() { - rw.Write(countDiff{ - Owner: owner, - State1: accountRegisters1.Count(), - State2: 
accountRegisters2.Count(), - }) - } + if !registers2.HasAccountOwner(owner) { + rw.Write(accountMissing{ + Owner: owner, + State: int(newState), + }) + + return nil + } - err = accountRegisters1.ForEach(func(owner, key string, value1 []byte) error { - var value2 []byte - value2, err = accountRegisters2.Get(owner, key) + foundAccountCountInRegisters2++ + + accountRegisters2 := registers2.AccountRegisters(owner) + + err = diffAccount( + owner, + accountRegisters1, + accountRegisters2, + chainID, + rw, + ) if err != nil { - return err + log.Warn().Err(err).Msgf("failed to diff account %x", []byte(owner)) } - if !bytes.Equal(value1, value2) { + logAccount(1) - if flagRaw { - rw.Write(rawDiff{ - Owner: owner, - Key: key, - Value1: value1, - Value2: value2, + return nil + }) + + if foundAccountCountInRegisters2 < registers2.AccountCount() { + _ = registers2.ForEachAccount(func(accountRegisters2 *registers.AccountRegisters) error { + owner := accountRegisters2.Owner() + if !registers1.HasAccountOwner(owner) { + rw.Write(accountMissing{ + Owner: owner, + State: int(oldState), }) - } else { - // stop on first difference in accounts - return accountsDiffer } - } + return nil + }) + } + + return nil + } + + type job struct { + owner string + accountRegisters1 *registers.AccountRegisters + accountRegisters2 *registers.AccountRegisters + } + + type result struct { + owner string + err error + } + + jobs := make(chan job, nWorkers) + results := make(chan result, nWorkers) + + g, ctx := errgroup.WithContext(context.Background()) + + // Launch goroutines to diff accounts + for i := 0; i < nWorkers; i++ { + g.Go(func() (err error) { + for job := range jobs { + err := diffAccount( + job.owner, + job.accountRegisters1, + job.accountRegisters2, + chainID, + rw, + ) + + select { + case results <- result{owner: job.owner, err: err}: + case <-ctx.Done(): + return ctx.Err() + } + } return nil }) - if err != nil { - if flagRaw || !errors.Is(err, accountsDiffer) { - return err - } + } - 
address, err := common.BytesToAddress([]byte(owner)) - if err != nil { - return err + // Launch goroutine to wait for workers and close result channel + go func() { + _ = g.Wait() + close(results) + }() + + // Launch goroutine to send account registers to jobs channel + go func() { + defer close(jobs) + + foundAccountCountInRegisters2 := 0 + + _ = registers1.ForEachAccount(func(accountRegisters1 *registers.AccountRegisters) (err error) { + owner := accountRegisters1.Owner() + if !registers2.HasAccountOwner(owner) { + rw.Write(accountMissing{ + Owner: owner, + State: int(newState), + }) + + return nil } - migrations.NewCadenceValueDiffReporter( - address, - chainID, - rw, - true, - flagNWorker, - ).DiffStates( - accountRegisters1, - accountRegisters2, - migrations.AllStorageMapDomains, - ) - } + foundAccountCountInRegisters2++ - return nil - }) - if err != nil { - log.Fatal().Err(err).Msg("failed to diff") - } + accountRegisters2 := registers2.AccountRegisters(owner) + + jobs <- job{ + owner: owner, + accountRegisters1: accountRegisters1, + accountRegisters2: accountRegisters2, + } - err = registers2.ForEachAccount(func(accountRegisters2 *registers.AccountRegisters) (err error) { - owner := accountRegisters2.Owner() + return nil + }) - if !registers1.HasAccountOwner(owner) { - rw.Write(accountMissing{ - Owner: owner, - State: 1, + if foundAccountCountInRegisters2 < registers2.AccountCount() { + _ = registers2.ForEachAccount(func(accountRegisters2 *registers.AccountRegisters) (err error) { + owner := accountRegisters2.Owner() + if !registers1.HasAccountOwner(owner) { + rw.Write(accountMissing{ + Owner: owner, + State: int(oldState), + }) + } + return nil }) - return nil } + }() - return nil - }) - if err != nil { - log.Fatal().Err(err).Msg("failed to diff") + // Gather results + for result := range results { + logAccount(1) + if result.err != nil { + log.Warn().Err(result.err).Msgf("failed to diff account %x", []byte(result.owner)) + } } - log.Info().Msg("Finished 
diffing accounts") + log.Info().Msgf("Finished diffing accounts, waiting for goroutines...") + + if err := g.Wait(); err != nil { + return err + } + return nil } type rawDiff struct { diff --git a/cmd/util/cmd/diff-states/diff_states_test.go b/cmd/util/cmd/diff-states/diff_states_test.go index 01c238ca3ab..fe6041dec5d 100644 --- a/cmd/util/cmd/diff-states/diff_states_test.go +++ b/cmd/util/cmd/diff-states/diff_states_test.go @@ -1,6 +1,7 @@ package diff_states import ( + "encoding/json" "io/fs" "path/filepath" "strings" @@ -82,18 +83,14 @@ func TestDiffStates(t *testing.T) { report, err := io.ReadFile(reportPath) require.NoError(t, err) - assert.JSONEq( - t, - ` - [ - {"kind":"raw-diff", "owner":"0100000000000000", "key":"62", "value1":"03", "value2":"05"}, - {"kind":"account-missing", "owner":"0200000000000000", "state":2}, - {"kind":"account-missing", "owner":"0300000000000000", "state":1}, - {"kind":"account-missing", "owner":"0400000000000000", "state":1} - ] - `, - string(report), - ) + var msgs []any + err = json.Unmarshal(report, &msgs) + require.NoError(t, err) + assert.Equal(t, 4, len(msgs)) + assert.Containsf(t, string(report), `{"kind":"raw-diff","owner":"0100000000000000","key":"62","value1":"03","value2":"05"}`, "diff report contains raw-diff") + assert.Containsf(t, string(report), `{"kind":"account-missing","owner":"0200000000000000","state":2}`, "diff report contains account-missing for 0200000000000000") + assert.Containsf(t, string(report), `{"kind":"account-missing","owner":"0300000000000000","state":1}`, "diff report contains account-missing for 0300000000000000") + assert.Containsf(t, string(report), `{"kind":"account-missing","owner":"0400000000000000","state":1}`, "diff report contains account-missing for 0400000000000000") }) } From 10708aa1d3437112d0f79b22f9d346da6d085da9 Mon Sep 17 00:00:00 2001 From: Faye Amacker <33205765+fxamacker@users.noreply.github.com> Date: Fri, 14 Jun 2024 10:28:32 -0500 Subject: [PATCH 2/2] Optimize diff-states 
cmd to diff domains in parallel --- .../ledger/migrations/cadence_value_diff.go | 145 ++++++++++++++---- 1 file changed, 114 insertions(+), 31 deletions(-) diff --git a/cmd/util/ledger/migrations/cadence_value_diff.go b/cmd/util/ledger/migrations/cadence_value_diff.go index a12dcd7126a..90d5172277d 100644 --- a/cmd/util/ledger/migrations/cadence_value_diff.go +++ b/cmd/util/ledger/migrations/cadence_value_diff.go @@ -3,8 +3,10 @@ package migrations import ( "fmt" + "github.com/onflow/cadence/runtime" "github.com/onflow/cadence/runtime/common" "github.com/onflow/cadence/runtime/interpreter" + "golang.org/x/sync/errgroup" "github.com/onflow/flow-go/cmd/util/ledger/reporters" "github.com/onflow/flow-go/cmd/util/ledger/util" @@ -72,6 +74,8 @@ type difference struct { NewValueStaticType string `json:",omitempty"` } +const minLargeAccountRegisterCount = 1_000 + type CadenceValueDiffReporter struct { address common.Address chainID flow.ChainID @@ -96,81 +100,130 @@ func NewCadenceValueDiffReporter( } } -func (dr *CadenceValueDiffReporter) newStorageRuntime( - registers registers.Registers, -) ( - *InterpreterMigrationRuntime, - error, -) { - // TODO: maybe make read-only again - runtimeInterface, err := NewInterpreterMigrationRuntime( - registers, - dr.chainID, - InterpreterMigrationRuntimeConfig{}, - ) - if err != nil { - return nil, err - } +func (dr *CadenceValueDiffReporter) DiffStates(oldRegs, newRegs registers.Registers, domains []string) { - return runtimeInterface, nil -} + oldStorage := newReadonlyStorage(oldRegs) -func (dr *CadenceValueDiffReporter) DiffStates(oldRegs, newRegs registers.Registers, domains []string) { + newStorage := newReadonlyStorage(newRegs) - // Create all the runtime components we need for comparing Cadence values. 
- oldRuntime, err := dr.newStorageRuntime(oldRegs) + var loadAtreeStorageGroup errgroup.Group + + loadAtreeStorageGroup.Go(func() (err error) { + return loadAtreeSlabsInStorage(oldStorage, oldRegs, dr.nWorkers) + }) + + err := loadAtreeSlabsInStorage(newStorage, newRegs, dr.nWorkers) if err != nil { dr.reportWriter.Write( diffError{ Address: dr.address.Hex(), Kind: diffErrorKindString[abortErrorKind], - Msg: fmt.Sprintf("failed to create runtime for old registers: %s", err), + Msg: fmt.Sprintf("failed to preload new atree registers: %s", err), }) return } - newRuntime, err := dr.newStorageRuntime(newRegs) - if err != nil { + // Wait for old registers to be loaded in storage. + if err := loadAtreeStorageGroup.Wait(); err != nil { dr.reportWriter.Write( diffError{ Address: dr.address.Hex(), Kind: diffErrorKindString[abortErrorKind], - Msg: fmt.Sprintf("failed to create runtime with new registers: %s", err), + Msg: fmt.Sprintf("failed to preload old atree registers: %s", err), }) return } - err = loadAtreeSlabsInStorage(oldRuntime.Storage, oldRegs, dr.nWorkers) + if oldRegs.Count() > minLargeAccountRegisterCount { + // Add concurrency to diff domains + var g errgroup.Group + + // NOTE: preload storage map in the same goroutine + for _, domain := range domains { + _ = oldStorage.GetStorageMap(dr.address, domain, false) + _ = newStorage.GetStorageMap(dr.address, domain, false) + } + + // Create goroutine to diff storage domain + g.Go(func() (err error) { + oldRuntime, err := newReadonlyStorageRuntimeWithStorage(oldStorage, oldRegs.Count()) + if err != nil { + return fmt.Errorf("failed to create runtime for old registers: %s", err) + } + + newRuntime, err := newReadonlyStorageRuntimeWithStorage(newStorage, newRegs.Count()) + if err != nil { + return fmt.Errorf("failed to create runtime for new registers: %s", err) + } + + dr.diffStorageDomain(oldRuntime, newRuntime, common.PathDomainStorage.Identifier()) + return nil + }) + + // Create goroutine to diff other domains + 
g.Go(func() (err error) { + oldRuntime, err := newReadonlyStorageRuntimeWithStorage(oldStorage, oldRegs.Count()) + if err != nil { + return fmt.Errorf("failed to create runtime for old registers: %s", err) + } + + newRuntime, err := newReadonlyStorageRuntimeWithStorage(newStorage, newRegs.Count()) + if err != nil { + return fmt.Errorf("failed to create runtime for new registers: %s", err) + } + + for _, domain := range domains { + if domain != common.PathDomainStorage.Identifier() { + dr.diffStorageDomain(oldRuntime, newRuntime, domain) + } + } + return nil + }) + + err = g.Wait() + if err != nil { + dr.reportWriter.Write( + diffError{ + Address: dr.address.Hex(), + Kind: diffErrorKindString[abortErrorKind], + Msg: err.Error(), + }) + } + + return + } + + // Skip goroutine overhead for smaller accounts + oldRuntime, err := newReadonlyStorageRuntimeWithStorage(oldStorage, oldRegs.Count()) if err != nil { dr.reportWriter.Write( diffError{ Address: dr.address.Hex(), Kind: diffErrorKindString[abortErrorKind], - Msg: fmt.Sprintf("failed to preload old atree registers: %s", err), + Msg: fmt.Sprintf("failed to create runtime for old registers: %s", err), }) return } - err = loadAtreeSlabsInStorage(newRuntime.Storage, newRegs, dr.nWorkers) + newRuntime, err := newReadonlyStorageRuntimeWithStorage(newStorage, newRegs.Count()) if err != nil { dr.reportWriter.Write( diffError{ Address: dr.address.Hex(), Kind: diffErrorKindString[abortErrorKind], - Msg: fmt.Sprintf("failed to preload new atree registers: %s", err), + Msg: fmt.Sprintf("failed to create runtime with new registers: %s", err), }) return } - // Iterate through all domains and compare cadence values.
for _, domain := range domains { dr.diffStorageDomain(oldRuntime, newRuntime, domain) } } func (dr *CadenceValueDiffReporter) diffStorageDomain( - oldRuntime *InterpreterMigrationRuntime, - newRuntime *InterpreterMigrationRuntime, + oldRuntime *readonlyStorageRuntime, + newRuntime *readonlyStorageRuntime, domain string, ) { defer func() { @@ -966,3 +1019,33 @@ func min(a, b int) int { } return b } + +func newReadonlyStorage(regs registers.Registers) *runtime.Storage { + ledger := &registers.ReadOnlyLedger{Registers: regs} + return runtime.NewStorage(ledger, nil) +} + +type readonlyStorageRuntime struct { + Interpreter *interpreter.Interpreter + Storage *runtime.Storage + PayloadCount int +} + +func newReadonlyStorageRuntimeWithStorage(storage *runtime.Storage, payloadCount int) (*readonlyStorageRuntime, error) { + inter, err := interpreter.NewInterpreter( + nil, + nil, + &interpreter.Config{ + Storage: storage, + }, + ) + if err != nil { + return nil, err + } + + return &readonlyStorageRuntime{ + Interpreter: inter, + Storage: storage, + PayloadCount: payloadCount, + }, nil +}