Skip to content

Commit

Permalink
Merge pull request #620 from onflow/janez/start-from-checkpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
turbolent authored Apr 3, 2024
2 parents b37f493 + 2b55630 commit 3f120d3
Show file tree
Hide file tree
Showing 6 changed files with 550 additions and 61 deletions.
149 changes: 89 additions & 60 deletions README.md

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions cmd/emulator/start/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ type Config struct {
LegacyContractUpgradeEnabled bool `default:"false" flag:"legacy-upgrade" info:"enable Cadence legacy contract upgrade"`
StartBlockHeight uint64 `default:"0" flag:"start-block-height" info:"block height to start the emulator at. only valid when forking Mainnet or Testnet"`
RPCHost string `default:"" flag:"rpc-host" info:"rpc host to query when forking Mainnet or Testnet"`
CheckpointPath string `default:"" flag:"checkpoint-dir" info:"checkpoint directory to load the emulator state from"`
StateHash string `default:"" flag:"state-hash" info:"state hash of the checkpoint to load the emulator state from"`
ComputationReportingEnabled bool `default:"false" flag:"computation-reporting" info:"enable Cadence computation reporting"`
}

Expand Down Expand Up @@ -207,6 +209,8 @@ func Cmd(getServiceKey serviceKeyFunc) *cobra.Command {
LegacyContractUpgradeEnabled: conf.LegacyContractUpgradeEnabled,
StartBlockHeight: conf.StartBlockHeight,
RPCHost: conf.RPCHost,
CheckpointPath: conf.CheckpointPath,
StateHash: conf.StateHash,
ComputationReportingEnabled: conf.ComputationReportingEnabled,
}

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ require (
github.com/rs/zerolog v1.29.0
github.com/spf13/cobra v1.8.0
github.com/stretchr/testify v1.8.4
go.uber.org/atomic v1.11.0
go.uber.org/mock v0.4.0
golang.org/x/exp v0.0.0-20240119083558-1b970713d09a
google.golang.org/grpc v1.60.1
Expand Down Expand Up @@ -188,7 +189,6 @@ require (
go.opentelemetry.io/otel/sdk v1.21.0 // indirect
go.opentelemetry.io/otel/trace v1.22.0 // indirect
go.opentelemetry.io/proto/otlp v1.0.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.26.0 // indirect
golang.org/x/crypto v0.18.0 // indirect
Expand Down
25 changes: 25 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/onflow/flow-emulator/server/debugger"
"github.com/onflow/flow-emulator/server/utils"
"github.com/onflow/flow-emulator/storage"
"github.com/onflow/flow-emulator/storage/checkpoint"
"github.com/onflow/flow-emulator/storage/remote"
"github.com/onflow/flow-emulator/storage/sqlite"
"github.com/onflow/flow-emulator/storage/util"
Expand Down Expand Up @@ -142,6 +143,12 @@ type Config struct {
RPCHost string
// StartBlockHeight is the height at which to start the emulator.
StartBlockHeight uint64
// CheckpointPath is the path to the checkpoint folder to use when starting the network on top of existing state.
// StateHash should be provided as well.
CheckpointPath string
// StateHash is the state hash to use when starting the network on top of existing state.
// CheckpointPath should be provided as well.
StateHash string
// ComputationReportingEnabled enables/disables Cadence computation reporting.
ComputationReportingEnabled bool
}
Expand Down Expand Up @@ -332,6 +339,24 @@ func configureStorage(logger *zerolog.Logger, conf *Config) (storageProvider sto
}
}

if conf.CheckpointPath != "" || conf.StateHash != "" {
if conf.CheckpointPath == "" || conf.StateHash == "" {
return nil, fmt.Errorf("both checkpoint path and state hash should be provided")
}

if conf.Persist {
return nil, fmt.Errorf("you cannot use persist with checkpoint")
}

if storageProvider != nil {
return nil, fmt.Errorf("you cannot define more than one storage")
}
storageProvider, err = checkpoint.New(*logger, conf.CheckpointPath, conf.StateHash, conf.ChainID)
if err != nil {
return nil, err
}
}

if conf.Persist {
if storageProvider != nil {
return nil, fmt.Errorf("you cannot use persist with current configuration")
Expand Down
172 changes: 172 additions & 0 deletions storage/checkpoint/checkpoint.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
/*
* Flow Emulator
*
* Copyright 2019 Dapper Labs, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package checkpoint

import (
"context"
"encoding/hex"
"fmt"
"math"

"github.com/rs/zerolog"
"go.uber.org/atomic"

"github.com/onflow/flow-go/fvm/storage/snapshot"
"github.com/onflow/flow-go/ledger/common/convert"
"github.com/onflow/flow-go/ledger/common/pathfinder"
"github.com/onflow/flow-go/ledger/complete"
"github.com/onflow/flow-go/ledger/complete/wal"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/metrics"

"github.com/onflow/flow-emulator/storage"
"github.com/onflow/flow-emulator/storage/memstore"
)

// Store is jus a memstore, but the starting state is loaded from a checkpoint folder
// any new blocks exist in memory only and are not persisted to disk.
type Store struct {
// Store is a memstore
// Theoretically this could also be a persistent store
*memstore.Store
}

// New returns a new Store implementation.
func New(
log zerolog.Logger,
path string,
stateCommitment string,
chainID flow.ChainID,
) (*Store, error) {
var err error
stateCommitmentBytes, err := hex.DecodeString(stateCommitment)
if err != nil {
return nil, fmt.Errorf("invalid state commitment hex: %w", err)
}
state, err := flow.ToStateCommitment(stateCommitmentBytes)
if err != nil {
return nil, fmt.Errorf("invalid state commitment: %w", err)
}

store := memstore.New()
snap, err := loadSnapshotFromCheckpoint(log, path, state)
if err != nil {
return nil, fmt.Errorf("failed to load snapshot from checkpoint: %w", err)
}

// pretend this state was the genesis state
genesis := flow.Genesis(chainID)
err = store.CommitBlock(
context.Background(),
*genesis,
nil,
nil,
nil,
snap,
nil,
)
if err != nil {
return nil, fmt.Errorf("failed to commit genesis block: %s", err)
}

return &Store{
Store: store,
}, nil
}

func loadSnapshotFromCheckpoint(
log zerolog.Logger,
dir string,
targetHash flow.StateCommitment,
) (*snapshot.ExecutionSnapshot, error) {
log.Info().Msg("init WAL")

diskWal, err := wal.NewDiskWAL(
log,
nil,
metrics.NewNoopCollector(),
dir,
complete.DefaultCacheSize,
pathfinder.PathByteSize,
wal.SegmentSize,
)
if err != nil {
return nil, fmt.Errorf("cannot create disk WAL: %w", err)
}

log.Info().Msg("init ledger")

led, err := complete.NewLedger(
diskWal,
complete.DefaultCacheSize,
&metrics.NoopCollector{},
log,
complete.DefaultPathFinderVersion)
if err != nil {
return nil, fmt.Errorf("cannot create ledger from write-a-head logs and checkpoints: %w", err)
}

const (
checkpointDistance = math.MaxInt // A large number to prevent checkpoint creation.
checkpointsToKeep = 1
)

log.Info().Msg("init compactor")

compactor, err := complete.NewCompactor(led, diskWal, log, complete.DefaultCacheSize, checkpointDistance, checkpointsToKeep, atomic.NewBool(false), &metrics.NoopCollector{})
if err != nil {
return nil, fmt.Errorf("cannot create compactor: %w", err)
}

log.Info().Msgf("waiting for compactor to load checkpoint and WAL")

<-compactor.Ready()

defer func() {
<-led.Done()
<-compactor.Done()
}()

trie, err := led.FindTrieByStateCommit(targetHash)
if err != nil {
return nil, fmt.Errorf("cannot find trie by state commitment: %w", err)
}
payloads := trie.AllPayloads()

writeSet := make(map[flow.RegisterID]flow.RegisterValue, len(payloads))
for _, p := range payloads {
id, value, err := convert.PayloadToRegister(p)
if err != nil {
return nil, fmt.Errorf("cannot convert payload to register: %w", err)
}

writeSet[id] = value
}

log.Info().Msg("snapshot loaded")

// garbage collector should clean up the WAL and the checkpoint

// only the write set is needed for the emulator
return &snapshot.ExecutionSnapshot{
WriteSet: writeSet,
}, err
}

var _ storage.Store = &Store{}
Loading

0 comments on commit 3f120d3

Please sign in to comment.