Skip to content

Commit

Permalink
feat: implement fast node (#75)
Browse files Browse the repository at this point in the history
Co-authored-by: Owen <[email protected]>
  • Loading branch information
yutianwu and owen-reorg authored Apr 25, 2024
1 parent af39caa commit 882e17a
Show file tree
Hide file tree
Showing 27 changed files with 521 additions and 103 deletions.
3 changes: 2 additions & 1 deletion build/ci.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import (
"time"

"github.com/cespare/cp"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto/signify"
"github.com/ethereum/go-ethereum/internal/build"
Expand Down Expand Up @@ -304,7 +305,7 @@ func doTest(cmdline []string) {
gotest := tc.Go("test")

// CI needs a bit more time for the statetests (default 10m).
gotest.Args = append(gotest.Args, "-timeout=20m")
gotest.Args = append(gotest.Args, "-timeout=50m")

// Enable CKZG backend in CI.
gotest.Args = append(gotest.Args, "-tags=ckzg")
Expand Down
1 change: 1 addition & 0 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ var (
utils.CacheSnapshotFlag,
utils.CacheNoPrefetchFlag,
utils.CachePreimagesFlag,
utils.AllowInsecureNoTriesFlag,
utils.CacheLogSizeFlag,
utils.FDLimitFlag,
utils.CryptoKZGFlag,
Expand Down
72 changes: 71 additions & 1 deletion cmd/geth/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,23 @@ import (
"os"
"time"

cli "github.com/urfave/cli/v2"

"github.com/ethereum/go-ethereum/cmd/utils"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/state/pruner"
"github.com/ethereum/go-ethereum/core/state/snapshot"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/eth/ethconfig"
"github.com/ethereum/go-ethereum/internal/flags"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie"
cli "github.com/urfave/cli/v2"
)

var (
Expand Down Expand Up @@ -147,6 +151,29 @@ as the backend data source, making this command a lot faster.
The argument is interpreted as block number or hash. If none is provided, the latest
block is used.
`,
},
{
Name: "insecure-prune-all",
Usage: "Prune all trie state data except genesis block, it will break storage for fullnode, only suitable for fast node " +
"who do not need trie storage at all",
ArgsUsage: "<pathOfGenesisFile>",
Action: pruneAllState,
Category: "MISCELLANEOUS COMMANDS",
Flags: []cli.Flag{
utils.DataDirFlag,
utils.AncientFlag,
},
Description: `
will prune all historical trie state data except genesis block.
All trie nodes will be deleted from the database.
It expects the genesis file as argument.
WARNING: It's necessary to delete the trie clean cache after the pruning.
If you specify another directory for the trie clean cache via "--cache.trie.journal"
during the use of Geth, please also specify it here for correct deletion. Otherwise
the trie clean cache with default directory will be deleted.
`,
},
},
Expand Down Expand Up @@ -635,3 +662,46 @@ func checkAccount(ctx *cli.Context) error {
log.Info("Checked the snapshot journalled storage", "time", common.PrettyDuration(time.Since(start)))
return nil
}

func pruneAllState(ctx *cli.Context) error {
stack, _ := makeConfigNode(ctx)
defer stack.Close()

genesisPath := ctx.Args().First()
if len(genesisPath) == 0 {
utils.Fatalf("Must supply path to genesis JSON file")
}
file, err := os.Open(genesisPath)
if err != nil {
utils.Fatalf("Failed to read genesis file: %v", err)
}
defer file.Close()

g := new(core.Genesis)
if err := json.NewDecoder(file).Decode(g); err != nil {
cfg := gethConfig{
Eth: ethconfig.Defaults,
Node: defaultNodeConfig(),
Metrics: metrics.DefaultConfig,
}

// Load config file.
if err := loadConfig(genesisPath, &cfg); err != nil {
utils.Fatalf("%v", err)
}
g = cfg.Eth.Genesis
}

chaindb := utils.MakeChainDatabase(ctx, stack, false)
defer chaindb.Close()
pruner, err := pruner.NewAllPruner(chaindb)
if err != nil {
log.Error("Failed to open snapshot tree", "err", err)
return err
}
if err = pruner.PruneAll(g); err != nil {
log.Error("Failed to prune state", "err", err)
return err
}
return nil
}
15 changes: 12 additions & 3 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ import (
"strings"
"time"

pcsclite "github.com/gballet/go-libpcsclite"
gopsutil "github.com/shirou/gopsutil/mem"
"github.com/urfave/cli/v2"

"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/accounts/keystore"
"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -74,9 +78,6 @@ import (
"github.com/ethereum/go-ethereum/trie"
"github.com/ethereum/go-ethereum/trie/triedb/hashdb"
"github.com/ethereum/go-ethereum/trie/triedb/pathdb"
pcsclite "github.com/gballet/go-libpcsclite"
gopsutil "github.com/shirou/gopsutil/mem"
"github.com/urfave/cli/v2"
)

// These are all the command line flags we support.
Expand Down Expand Up @@ -269,6 +270,11 @@ var (
Value: 2048,
Category: flags.EthCategory,
}
AllowInsecureNoTriesFlag = &cli.BoolFlag{
Name: "allow-insecure-no-tries",
Usage: `Disable the tries state root verification, the state consistency is no longer 100% guaranteed. Do not enable it unless you know exactly what the consequence it will cause.`,
Category: flags.EthCategory,
}
OverrideCancun = &cli.Uint64Flag{
Name: "override.cancun",
Usage: "Manually specify the Cancun fork timestamp, overriding the bundled setting",
Expand Down Expand Up @@ -1906,6 +1912,9 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
if ctx.IsSet(CacheLogSizeFlag.Name) {
cfg.FilterLogCacheSize = ctx.Int(CacheLogSizeFlag.Name)
}
if ctx.IsSet(AllowInsecureNoTriesFlag.Name) {
cfg.NoTries = ctx.Bool(AllowInsecureNoTriesFlag.Name)
}
if !ctx.Bool(SnapshotFlag.Name) {
// If snap-sync is requested, this flag is also required
if cfg.SyncMode == downloader.SnapSync {
Expand Down
59 changes: 36 additions & 23 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
"sync/atomic"
"time"

"golang.org/x/exp/slices"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/lru"
"github.com/ethereum/go-ethereum/common/mclock"
Expand All @@ -50,7 +52,6 @@ import (
"github.com/ethereum/go-ethereum/trie"
"github.com/ethereum/go-ethereum/trie/triedb/hashdb"
"github.com/ethereum/go-ethereum/trie/triedb/pathdb"
"golang.org/x/exp/slices"
)

var (
Expand Down Expand Up @@ -83,13 +84,13 @@ var (
blockExecutionTimer = metrics.NewRegisteredTimer("chain/execution", nil)
blockWriteTimer = metrics.NewRegisteredTimer("chain/write", nil)

blockWriteExternalTimer = metrics.NewRegisteredTimer("chain/block/write/external", nil)
stateCommitExternalTimer = metrics.NewRegisteredTimer("chain/state/commit/external", nil)
blockWriteExternalTimer = metrics.NewRegisteredTimer("chain/block/write/external", nil)
stateCommitExternalTimer = metrics.NewRegisteredTimer("chain/state/commit/external", nil)
triedbCommitExternalTimer = metrics.NewRegisteredTimer("chain/triedb/commit/external", nil)
innerExecutionTimer = metrics.NewRegisteredTimer("chain/inner/execution", nil)
innerExecutionTimer = metrics.NewRegisteredTimer("chain/inner/execution", nil)

blockGasUsedGauge = metrics.NewRegisteredGauge("chain/block/gas/used", nil)
mgaspsGauge = metrics.NewRegisteredGauge("chain/mgas/ps", nil)
mgaspsGauge = metrics.NewRegisteredGauge("chain/mgas/ps", nil)

blockReorgMeter = metrics.NewRegisteredMeter("chain/reorg/executes", nil)
blockReorgAddMeter = metrics.NewRegisteredMeter("chain/reorg/add", nil)
Expand Down Expand Up @@ -152,6 +153,7 @@ type CacheConfig struct {
TrieTimeLimit time.Duration // Time limit after which to flush the current in-memory trie to disk
SnapshotLimit int // Memory allowance (MB) to use for caching snapshot entries in memory
Preimages bool // Whether to store preimage of trie key to the disk
NoTries bool // Insecure settings. Do not have any tries in databases if enabled.
StateHistory uint64 // Number of blocks from head whose state histories are reserved.
StateScheme string // Scheme used to store ethereum states and merkle tree nodes on top
PathNodeBuffer pathdb.NodeBufferType // Type of trienodebuffer to cache trie nodes in disklayer
Expand All @@ -164,7 +166,10 @@ type CacheConfig struct {

// triedbConfig derives the configures for trie database.
func (c *CacheConfig) triedbConfig() *trie.Config {
config := &trie.Config{Preimages: c.Preimages}
config := &trie.Config{
Preimages: c.Preimages,
NoTries: c.NoTries,
}
if c.StateScheme == rawdb.HashScheme {
config.HashDB = &hashdb.Config{
CleanCacheSize: c.TrieCleanLimit * 1024 * 1024,
Expand Down Expand Up @@ -367,7 +372,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
// Make sure the state associated with the block is available, or log out
// if there is no available state, waiting for state sync.
head := bc.CurrentBlock()
if !bc.HasState(head.Root) {
if !bc.NoTries() && !bc.HasState(head.Root) {
if head.Number.Uint64() == 0 {
// The genesis state is missing, which is only possible in the path-based
// scheme. This situation occurs when the initial state sync is not finished
Expand Down Expand Up @@ -478,6 +483,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
Recovery: recover,
NoBuild: bc.cacheConfig.SnapshotNoBuild,
AsyncBuild: !bc.cacheConfig.SnapshotWait,
NoTries: bc.stateCache.NoTries(),
}
bc.snaps, _ = snapshot.New(snapconfig, bc.db, bc.triedb, head.Root)
}
Expand Down Expand Up @@ -862,7 +868,7 @@ func (bc *BlockChain) SnapSyncCommitHead(hash common.Hash) error {
return err
}
}
if !bc.HasState(root) {
if !bc.NoTries() && !bc.HasState(root) {
return fmt.Errorf("non existent state [%x..]", root[:4])
}
// If all checks out, manually set the head block.
Expand Down Expand Up @@ -1460,9 +1466,10 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
}
blockWriteExternalTimer.UpdateSince(start)
log.Debug("blockWriteExternalTimer", "duration", common.PrettyDuration(time.Since(start)), "hash", block.Hash())

// Commit all cached state changes into underlying memory database.
start = time.Now()
state.SetExpectedStateRoot(block.Root())
root, err := state.Commit(block.NumberU64(), bc.chainConfig.IsEIP158(block.Number()))
if err != nil {
return err
Expand All @@ -1477,10 +1484,10 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
}
// If we're running an archive node, always flush
start = time.Now()
defer func () {
defer func() {
triedbCommitExternalTimer.UpdateSince(start)
log.Debug("triedbCommitExternalTimer", "duration", common.PrettyDuration(time.Since(start)), "hash", block.Hash())
} ()
}()
if bc.cacheConfig.TrieDirtyDisabled {
return bc.triedb.Commit(root, false)
}
Expand Down Expand Up @@ -1785,7 +1792,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
}
}()

defer func () {
defer func() {
DebugInnerExecutionDuration = 0
}()
for ; block != nil && err == nil || errors.Is(err, ErrKnownBlock); block, err = it.next() {
Expand Down Expand Up @@ -1887,6 +1894,8 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
}
}

statedb.SetExpectedStateRoot(block.Root())

// Process block using the parent state as reference point
pstart = time.Now()
receipts, logs, usedGas, err = bc.processor.Process(block, statedb, bc.vmConfig)
Expand All @@ -1908,16 +1917,16 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
proctime := time.Since(start) // processing + validation

// Update the metrics touched during block processing and validation
accountReadTimer.Update(statedb.AccountReads) // Account reads are complete(in processing)
storageReadTimer.Update(statedb.StorageReads) // Storage reads are complete(in processing)
snapshotAccountReadTimer.Update(statedb.SnapshotAccountReads) // Account reads are complete(in processing)
snapshotStorageReadTimer.Update(statedb.SnapshotStorageReads) // Storage reads are complete(in processing)
accountUpdateTimer.Update(statedb.AccountUpdates) // Account updates are complete(in validation)
storageUpdateTimer.Update(statedb.StorageUpdates) // Storage updates are complete(in validation)
accountHashTimer.Update(statedb.AccountHashes) // Account hashes are complete(in validation)
storageHashTimer.Update(statedb.StorageHashes) // Storage hashes are complete(in validation)
blockExecutionTimer.Update(ptime) // The time spent on block execution
blockValidationTimer.Update(vtime) // The time spent on block validation
accountReadTimer.Update(statedb.AccountReads) // Account reads are complete(in processing)
storageReadTimer.Update(statedb.StorageReads) // Storage reads are complete(in processing)
snapshotAccountReadTimer.Update(statedb.SnapshotAccountReads) // Account reads are complete(in processing)
snapshotStorageReadTimer.Update(statedb.SnapshotStorageReads) // Storage reads are complete(in processing)
accountUpdateTimer.Update(statedb.AccountUpdates) // Account updates are complete(in validation)
storageUpdateTimer.Update(statedb.StorageUpdates) // Storage updates are complete(in validation)
accountHashTimer.Update(statedb.AccountHashes) // Account hashes are complete(in validation)
storageHashTimer.Update(statedb.StorageHashes) // Storage hashes are complete(in validation)
blockExecutionTimer.Update(ptime) // The time spent on block execution
blockValidationTimer.Update(vtime) // The time spent on block validation

innerExecutionTimer.Update(DebugInnerExecutionDuration)

Expand Down Expand Up @@ -1959,7 +1968,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
}
trieDiffNodes, trieBufNodes, trieImmutableBufNodes, _ := bc.triedb.Size()
stats.report(chain, it.index, snapDiffItems, snapBufItems, trieDiffNodes, trieBufNodes, trieImmutableBufNodes, setHead)
blockGasUsedGauge.Update(int64(block.GasUsed())/1000000)
blockGasUsedGauge.Update(int64(block.GasUsed()) / 1000000)

if !setHead {
// After merge we expect few side chains. Simply count
Expand Down Expand Up @@ -2675,3 +2684,7 @@ func (bc *BlockChain) SetTrieFlushInterval(interval time.Duration) {
func (bc *BlockChain) GetTrieFlushInterval() time.Duration {
return time.Duration(bc.flushInterval.Load())
}

func (bc *BlockChain) NoTries() bool {
return bc.stateCache.NoTries()
}
19 changes: 18 additions & 1 deletion core/blockchain_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package core

import (
"errors"
"math/big"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -278,6 +279,9 @@ func (bc *BlockChain) GetTd(hash common.Hash, number uint64) *big.Int {

// HasState checks if state trie is fully present in the database or not.
func (bc *BlockChain) HasState(hash common.Hash) bool {
if bc.NoTries() {
return bc.snaps != nil && bc.snaps.Snapshot(hash) != nil
}
_, err := bc.stateCache.OpenTrie(hash)
return err == nil
}
Expand Down Expand Up @@ -334,7 +338,20 @@ func (bc *BlockChain) State() (*state.StateDB, error) {

// StateAt returns a new mutable state based on a particular point in time.
func (bc *BlockChain) StateAt(root common.Hash) (*state.StateDB, error) {
return state.New(root, bc.stateCache, bc.snaps)
stateDb, err := state.New(root, bc.stateCache, bc.snaps)
if err != nil {
return nil, err
}

// If there's no trie and the specified snapshot is not available, getting
// any state will by default return nil.
// Instead of that, it will be more useful to return an error to indicate
// the state is not available.
if stateDb.NoTrie() && stateDb.GetSnap() == nil {
return nil, errors.New("state is not available")
}

return stateDb, err
}

// Config retrieves the chain's fork configuration.
Expand Down
7 changes: 6 additions & 1 deletion core/genesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,11 @@ func (ga *GenesisAlloc) hash() (common.Hash, error) {
// states will be persisted into the given database. Also, the genesis state
// specification will be flushed as well.
func (ga *GenesisAlloc) flush(db ethdb.Database, triedb *trie.Database, blockhash common.Hash) error {
triedbConfig := triedb.Config()
if triedbConfig != nil {
triedbConfig.NoTries = false
}

statedb, err := state.New(types.EmptyRootHash, state.NewDatabaseWithNodeDB(db, triedb), nil)
if err != nil {
return err
Expand Down Expand Up @@ -346,7 +351,7 @@ func SetupGenesisBlockWithOverride(db ethdb.Database, triedb *trie.Database, gen
// is initialized with an external ancient store. Commit genesis state
// in this case.
header := rawdb.ReadHeader(db, stored, 0)
if header.Root != types.EmptyRootHash && !triedb.Initialized(header.Root) {
if header.Root != types.EmptyRootHash && !triedb.Initialized(header.Root) && !triedb.Config().NoTries {
if genesis == nil {
genesis = DefaultGenesisBlock()
}
Expand Down
Loading

0 comments on commit 882e17a

Please sign in to comment.