Skip to content

Commit

Permalink
Merge pull request #6097 from onflow/fxamacker/optimize-diff-states-util
Browse files Browse the repository at this point in the history
Optimize diff-states cmd by comparing data in parallel
  • Loading branch information
fxamacker authored Jun 14, 2024
2 parents a8e97c1 + 10708aa commit 4def8fb
Show file tree
Hide file tree
Showing 3 changed files with 346 additions and 114 deletions.
294 changes: 223 additions & 71 deletions cmd/util/cmd/diff-states/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package diff_states

import (
"bytes"
"context"
"encoding/hex"
"encoding/json"
"errors"
Expand All @@ -17,6 +18,7 @@ import (
"github.com/onflow/flow-go/cmd/util/ledger/util/registers"
"github.com/onflow/flow-go/ledger"
"github.com/onflow/flow-go/model/flow"
moduleUtil "github.com/onflow/flow-go/module/util"
)

var (
Expand All @@ -40,6 +42,13 @@ var Cmd = &cobra.Command{

const ReporterName = "state-diff"

// state identifies which of the two compared states a report entry refers to.
type state uint8

// The numeric values are emitted in reports (as the "state" field of
// account-missing entries), so they start at 1 and must stay stable.
const (
	oldState state = iota + 1 // first (old) state, reported as 1
	newState                  // second (new) state, reported as 2
)

func init() {

// Input 1
Expand Down Expand Up @@ -177,7 +186,10 @@ func run(*cobra.Command, []string) {
}
}

diff(registers1, registers2, chainID, rw)
err := diff(registers1, registers2, chainID, rw, flagNWorker)
if err != nil {
log.Warn().Err(err).Msgf("failed to diff registers")
}
}

func loadPayloads() (payloads1, payloads2 []*ledger.Payload) {
Expand Down Expand Up @@ -266,108 +278,248 @@ func payloadsToRegisters(payloads1, payloads2 []*ledger.Payload) (registers1, re

// accountsDiffer is a sentinel error returned by the register-comparison
// callback when raw diffing is disabled, so that iteration stops at the first
// register whose value differs between the two states. Callers detect it with
// errors.Is and treat it as "run the detailed diff", not as a failure.
var accountsDiffer = errors.New("accounts differ")

// diffAccount compares the registers of one account in two states and writes
// the differences to rw.
//
// A countDiff entry is written when the two states hold a different number of
// registers for the account. Each register of accountRegisters1 is then
// compared with the value stored under the same owner/key in
// accountRegisters2:
//   - when flagRaw is set, every mismatch is written as a rawDiff entry;
//   - otherwise the comparison stops at the first mismatch and the account is
//     handed to the Cadence value diff reporter for a detailed diff.
//
// Only the registers of accountRegisters1 are enumerated here.
//
// It returns a non-nil error when a register lookup or the iteration itself
// fails, or when owner cannot be converted to an address. A detected
// difference is not an error.
func diffAccount(
	owner string,
	accountRegisters1 *registers.AccountRegisters,
	accountRegisters2 *registers.AccountRegisters,
	chainID flow.ChainID,
	rw reporters.ReportWriter,
) error {

	if accountRegisters1.Count() != accountRegisters2.Count() {
		rw.Write(countDiff{
			Owner:  owner,
			State1: accountRegisters1.Count(),
			State2: accountRegisters2.Count(),
		})
	}

	err := accountRegisters1.ForEach(func(owner, key string, value1 []byte) error {
		// Keep the lookup error local to the callback instead of writing to a
		// variable of the enclosing function.
		value2, err := accountRegisters2.Get(owner, key)
		if err != nil {
			return err
		}

		if bytes.Equal(value1, value2) {
			return nil
		}

		if !flagRaw {
			// stop on first difference in accounts
			return accountsDiffer
		}

		rw.Write(rawDiff{
			Owner:  owner,
			Key:    key,
			Value1: value1,
			Value2: value2,
		})

		return nil
	})
	if err == nil {
		return nil
	}

	// In raw mode the callback never returns accountsDiffer, so any error is a
	// real failure; in non-raw mode everything except the sentinel is.
	if flagRaw || !errors.Is(err, accountsDiffer) {
		return err
	}

	address, err := common.BytesToAddress([]byte(owner))
	if err != nil {
		return err
	}

	migrations.NewCadenceValueDiffReporter(
		address,
		chainID,
		rw,
		true,
		flagNWorker,
	).DiffStates(
		accountRegisters1,
		accountRegisters2,
		migrations.AllStorageMapDomains,
	)

	return nil
}

// diff compares the accounts in registers1 against those in registers2 and
// writes the findings (missing accounts, register count differences, register
// value differences) to rw.
//
// NOTE(review): this span is a rendered commit diff, so lines removed by the
// commit (the signature without nWorkers, the inline register comparison and
// the log.Fatal error handling) are interleaved with the lines that replace
// them. Read the added lines for the current behaviour: accounts present in
// both states are compared by diffAccount, sequentially when nWorkers <= 1 and
// otherwise by nWorkers goroutines fed through a jobs channel; per-account
// failures are logged as warnings rather than aborting the run.
func diff(
registers1 *registers.ByAccount,
registers2 *registers.ByAccount,
chainID flow.ChainID,
rw reporters.ReportWriter,
) {
log.Info().Msg("Diffing accounts")
nWorkers int,
) error {
log.Info().Msgf("Diffing %d accounts", registers1.AccountCount())

err := registers1.ForEachAccount(func(accountRegisters1 *registers.AccountRegisters) (err error) {
owner := accountRegisters1.Owner()
// Never use more workers than there are accounts in registers1.
if registers1.AccountCount() < nWorkers {
nWorkers = registers1.AccountCount()
}

if !registers2.HasAccountOwner(owner) {
rw.Write(accountMissing{
Owner: owner,
State: 2,
})
logAccount := moduleUtil.LogProgress(
log.Logger,
moduleUtil.DefaultLogProgressConfig(
"processing account group",
registers1.AccountCount(),
),
)

return nil
}
// Sequential path: with at most one worker, compare accounts on the calling
// goroutine and return before any channel or goroutine is created.
if nWorkers <= 1 {
foundAccountCountInRegisters2 := 0

accountRegisters2 := registers2.AccountRegisters(owner)
_ = registers1.ForEachAccount(func(accountRegisters1 *registers.AccountRegisters) (err error) {
owner := accountRegisters1.Owner()

if accountRegisters1.Count() != accountRegisters2.Count() {
rw.Write(countDiff{
Owner: owner,
State1: accountRegisters1.Count(),
State2: accountRegisters2.Count(),
})
}
if !registers2.HasAccountOwner(owner) {
rw.Write(accountMissing{
Owner: owner,
State: int(newState),
})

return nil
}

err = accountRegisters1.ForEach(func(owner, key string, value1 []byte) error {
var value2 []byte
value2, err = accountRegisters2.Get(owner, key)
foundAccountCountInRegisters2++

accountRegisters2 := registers2.AccountRegisters(owner)

err = diffAccount(
owner,
accountRegisters1,
accountRegisters2,
chainID,
rw,
)
if err != nil {
return err
log.Warn().Err(err).Msgf("failed to diff account %x", []byte(owner))
}

if !bytes.Equal(value1, value2) {
logAccount(1)

if flagRaw {
rw.Write(rawDiff{
Owner: owner,
Key: key,
Value1: value1,
Value2: value2,
return nil
})

// Scan registers2 for accounts absent from registers1 only when fewer
// accounts were matched above than registers2 contains.
if foundAccountCountInRegisters2 < registers2.AccountCount() {
_ = registers2.ForEachAccount(func(accountRegisters2 *registers.AccountRegisters) error {
owner := accountRegisters2.Owner()
if !registers1.HasAccountOwner(owner) {
rw.Write(accountMissing{
Owner: owner,
State: int(oldState),
})
} else {
// stop on first difference in accounts
return accountsDiffer
}
}
return nil
})
}

return nil
}

type job struct {
owner string
accountRegisters1 *registers.AccountRegisters
accountRegisters2 *registers.AccountRegisters
}

type result struct {
owner string
err error
}

jobs := make(chan job, nWorkers)

results := make(chan result, nWorkers)

g, ctx := errgroup.WithContext(context.Background())

// Launch goroutines to diff accounts
for i := 0; i < nWorkers; i++ {
g.Go(func() (err error) {
for job := range jobs {
err := diffAccount(
job.owner,
job.accountRegisters1,
job.accountRegisters2,
chainID,
rw,
)

select {
case results <- result{owner: job.owner, err: err}:
case <-ctx.Done():
return ctx.Err()
}
}
return nil
})
if err != nil {
if flagRaw || !errors.Is(err, accountsDiffer) {
return err
}
}

address, err := common.BytesToAddress([]byte(owner))
if err != nil {
return err
// Launch goroutine to wait for workers and close result channel
go func() {
_ = g.Wait()
close(results)
}()

// Launch goroutine to send account registers to jobs channel
go func() {
defer close(jobs)

foundAccountCountInRegisters2 := 0

_ = registers1.ForEachAccount(func(accountRegisters1 *registers.AccountRegisters) (err error) {
owner := accountRegisters1.Owner()
if !registers2.HasAccountOwner(owner) {
rw.Write(accountMissing{
Owner: owner,
State: int(newState),
})

return nil
}

migrations.NewCadenceValueDiffReporter(
address,
chainID,
rw,
true,
flagNWorker,
).DiffStates(
accountRegisters1,
accountRegisters2,
migrations.AllStorageMapDomains,
)
}
foundAccountCountInRegisters2++

return nil
})
if err != nil {
log.Fatal().Err(err).Msg("failed to diff")
}
accountRegisters2 := registers2.AccountRegisters(owner)

jobs <- job{
owner: owner,
accountRegisters1: accountRegisters1,
accountRegisters2: accountRegisters2,
}

err = registers2.ForEachAccount(func(accountRegisters2 *registers.AccountRegisters) (err error) {
owner := accountRegisters2.Owner()
return nil
})

if !registers1.HasAccountOwner(owner) {
rw.Write(accountMissing{
Owner: owner,
State: 1,
if foundAccountCountInRegisters2 < registers2.AccountCount() {
_ = registers2.ForEachAccount(func(accountRegisters2 *registers.AccountRegisters) (err error) {
owner := accountRegisters2.Owner()
if !registers1.HasAccountOwner(owner) {
rw.Write(accountMissing{
Owner: owner,
State: int(oldState),
})
}
return nil
})
return nil
}
}()

return nil
})
if err != nil {
log.Fatal().Err(err).Msg("failed to diff")
// Gather results
for result := range results {
logAccount(1)
if result.err != nil {
log.Warn().Err(result.err).Msgf("failed to diff account %x", []byte(result.owner))
}
}

log.Info().Msg("Finished diffing accounts")
log.Info().Msgf("Finished diffing accounts, waiting for goroutines...")

if err := g.Wait(); err != nil {
return err
}

return nil
}

type rawDiff struct {
Expand Down
21 changes: 9 additions & 12 deletions cmd/util/cmd/diff-states/diff_states_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package diff_states

import (
"encoding/json"
"io/fs"
"path/filepath"
"strings"
Expand Down Expand Up @@ -82,18 +83,14 @@ func TestDiffStates(t *testing.T) {
report, err := io.ReadFile(reportPath)
require.NoError(t, err)

assert.JSONEq(
t,
`
[
{"kind":"raw-diff", "owner":"0100000000000000", "key":"62", "value1":"03", "value2":"05"},
{"kind":"account-missing", "owner":"0200000000000000", "state":2},
{"kind":"account-missing", "owner":"0300000000000000", "state":1},
{"kind":"account-missing", "owner":"0400000000000000", "state":1}
]
`,
string(report),
)
var msgs []any
err = json.Unmarshal(report, &msgs)
require.NoError(t, err)

assert.Equal(t, 4, len(msgs))
assert.Containsf(t, string(report), `{"kind":"raw-diff","owner":"0100000000000000","key":"62","value1":"03","value2":"05"}`, "diff report contains raw-diff")
assert.Containsf(t, string(report), `{"kind":"account-missing","owner":"0200000000000000","state":2}`, "diff report contains account-missing for 0200000000000000")
assert.Containsf(t, string(report), `{"kind":"account-missing","owner":"0300000000000000","state":1}`, "diff report contains account-missing for 0300000000000000")
assert.Containsf(t, string(report), `{"kind":"account-missing","owner":"0400000000000000","state":1}`, "diff report contains account-missing for 0400000000000000")
})
}
Loading

0 comments on commit 4def8fb

Please sign in to comment.