Skip to content

Commit

Permalink
Merge pull request #2245 from OffchainLabs/pebble-extra-options
Browse files Browse the repository at this point in the history
add pebble config options [NIT-2396]
  • Loading branch information
PlasmaPower authored May 23, 2024
2 parents 35bd2aa + a4e6f3e commit d897c4f
Show file tree
Hide file tree
Showing 11 changed files with 211 additions and 30 deletions.
3 changes: 2 additions & 1 deletion arbnode/dataposter/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/offchainlabs/nitro/arbnode/dataposter/redis"
"github.com/offchainlabs/nitro/arbnode/dataposter/slice"
"github.com/offchainlabs/nitro/arbnode/dataposter/storage"
"github.com/offchainlabs/nitro/cmd/conf"
"github.com/offchainlabs/nitro/util/arbmath"
"github.com/offchainlabs/nitro/util/redisutil"
"github.com/offchainlabs/nitro/util/signature"
Expand All @@ -44,7 +45,7 @@ func newLevelDBStorage(t *testing.T, encF storage.EncoderDecoderF) *dbstorage.St

func newPebbleDBStorage(t *testing.T, encF storage.EncoderDecoderF) *dbstorage.Storage {
t.Helper()
db, err := rawdb.NewPebbleDBDatabase(path.Join(t.TempDir(), "pebble.db"), 0, 0, "default", false, true)
db, err := rawdb.NewPebbleDBDatabase(path.Join(t.TempDir(), "pebble.db"), 0, 0, "default", false, true, conf.PersistentConfigDefault.Pebble.ExtraOptions("pebble"))
if err != nil {
t.Fatalf("NewPebbleDBDatabase() unexpected error: %v", err)
}
Expand Down
188 changes: 182 additions & 6 deletions cmd/conf/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,25 @@ package conf

import (
"fmt"
"math"
"os"
"path"
"path/filepath"
"runtime"
"time"

"github.com/ethereum/go-ethereum/ethdb/pebble"
flag "github.com/spf13/pflag"
)

type PersistentConfig struct {
GlobalConfig string `koanf:"global-config"`
Chain string `koanf:"chain"`
LogDir string `koanf:"log-dir"`
Handles int `koanf:"handles"`
Ancient string `koanf:"ancient"`
DBEngine string `koanf:"db-engine"`
GlobalConfig string `koanf:"global-config"`
Chain string `koanf:"chain"`
LogDir string `koanf:"log-dir"`
Handles int `koanf:"handles"`
Ancient string `koanf:"ancient"`
DBEngine string `koanf:"db-engine"`
Pebble PebbleConfig `koanf:"pebble"`
}

var PersistentConfigDefault = PersistentConfig{
Expand All @@ -28,6 +33,7 @@ var PersistentConfigDefault = PersistentConfig{
Handles: 512,
Ancient: "",
DBEngine: "leveldb",
Pebble: PebbleConfigDefault,
}

func PersistentConfigAddOptions(prefix string, f *flag.FlagSet) {
Expand All @@ -37,6 +43,7 @@ func PersistentConfigAddOptions(prefix string, f *flag.FlagSet) {
f.Int(prefix+".handles", PersistentConfigDefault.Handles, "number of file descriptor handles to use for the database")
f.String(prefix+".ancient", PersistentConfigDefault.Ancient, "directory of ancient where the chain freezer can be opened")
f.String(prefix+".db-engine", PersistentConfigDefault.DBEngine, "backing database implementation to use ('leveldb' or 'pebble')")
PebbleConfigAddOptions(prefix+".pebble", f)
}

func (c *PersistentConfig) ResolveDirectoryNames() error {
Expand Down Expand Up @@ -94,5 +101,174 @@ func (c *PersistentConfig) Validate() error {
if c.DBEngine != "leveldb" && c.DBEngine != "pebble" {
return fmt.Errorf(`invalid .db-engine choice: %q, allowed "leveldb" or "pebble"`, c.DBEngine)
}
if c.DBEngine == "pebble" {
if err := c.Pebble.Validate(); err != nil {
return err
}
}
return nil
}

type PebbleConfig struct {
MaxConcurrentCompactions int `koanf:"max-concurrent-compactions"`
Experimental PebbleExperimentalConfig `koanf:"experimental"`
}

var PebbleConfigDefault = PebbleConfig{
MaxConcurrentCompactions: runtime.NumCPU(),
Experimental: PebbleExperimentalConfigDefault,
}

func PebbleConfigAddOptions(prefix string, f *flag.FlagSet) {
f.Int(prefix+".max-concurrent-compactions", PebbleConfigDefault.MaxConcurrentCompactions, "maximum number of concurrent compactions")
PebbleExperimentalConfigAddOptions(prefix+".experimental", f)
}

func (c *PebbleConfig) Validate() error {
if c.MaxConcurrentCompactions < 1 {
return fmt.Errorf("invalid .max-concurrent-compactions value: %d, has to be greater then 0", c.MaxConcurrentCompactions)
}
if err := c.Experimental.Validate(); err != nil {
return err
}
return nil
}

type PebbleExperimentalConfig struct {
BytesPerSync int `koanf:"bytes-per-sync"`
L0CompactionFileThreshold int `koanf:"l0-compaction-file-threshold"`
L0CompactionThreshold int `koanf:"l0-compaction-threshold"`
L0StopWritesThreshold int `koanf:"l0-stop-writes-threshold"`
LBaseMaxBytes int64 `koanf:"l-base-max-bytes"`
MemTableStopWritesThreshold int `koanf:"mem-table-stop-writes-threshold"`
DisableAutomaticCompactions bool `koanf:"disable-automatic-compactions"`
WALBytesPerSync int `koanf:"wal-bytes-per-sync"`
WALDir string `koanf:"wal-dir"`
WALMinSyncInterval int `koanf:"wal-min-sync-interval"`
TargetByteDeletionRate int `koanf:"target-byte-deletion-rate"`

// level specific
BlockSize int `koanf:"block-size"`
IndexBlockSize int `koanf:"index-block-size"`
TargetFileSize int64 `koanf:"target-file-size"`
TargetFileSizeEqualLevels bool `koanf:"target-file-size-equal-levels"`

// pebble experimental
L0CompactionConcurrency int `koanf:"l0-compaction-concurrency"`
CompactionDebtConcurrency uint64 `koanf:"compaction-debt-concurrency"`
ReadCompactionRate int64 `koanf:"read-compaction-rate"`
ReadSamplingMultiplier int64 `koanf:"read-sampling-multiplier"`
MaxWriterConcurrency int `koanf:"max-writer-concurrency"`
ForceWriterParallelism bool `koanf:"force-writer-parallelism"`
}

var PebbleExperimentalConfigDefault = PebbleExperimentalConfig{
BytesPerSync: 512 << 10, // 512 KB
L0CompactionFileThreshold: 500,
L0CompactionThreshold: 4,
L0StopWritesThreshold: 12,
LBaseMaxBytes: 64 << 20, // 64 MB
MemTableStopWritesThreshold: 2,
DisableAutomaticCompactions: false,
WALBytesPerSync: 0, // no background syncing
WALDir: "", // use same dir as for sstables
WALMinSyncInterval: 0, // no artificial delay
TargetByteDeletionRate: 0, // deletion pacing disabled

BlockSize: 4 << 10, // 4 KB
IndexBlockSize: 4 << 10, // 4 KB
TargetFileSize: 2 << 20, // 2 MB
TargetFileSizeEqualLevels: true,

L0CompactionConcurrency: 10,
CompactionDebtConcurrency: 1 << 30, // 1GB
ReadCompactionRate: 16000, // see ReadSamplingMultiplier comment
ReadSamplingMultiplier: -1, // geth default, disables read sampling and disables read triggered compaction
MaxWriterConcurrency: 0,
ForceWriterParallelism: false,
}

func PebbleExperimentalConfigAddOptions(prefix string, f *flag.FlagSet) {
f.Int(prefix+".bytes-per-sync", PebbleExperimentalConfigDefault.BytesPerSync, "number of bytes to write to a SSTable before calling Sync on it in the background")
f.Int(prefix+".l0-compaction-file-threshold", PebbleExperimentalConfigDefault.L0CompactionFileThreshold, "count of L0 files necessary to trigger an L0 compaction")
f.Int(prefix+".l0-compaction-threshold", PebbleExperimentalConfigDefault.L0CompactionThreshold, "amount of L0 read-amplification necessary to trigger an L0 compaction")
f.Int(prefix+".l0-stop-writes-threshold", PebbleExperimentalConfigDefault.L0StopWritesThreshold, "hard limit on L0 read-amplification, computed as the number of L0 sublevels. Writes are stopped when this threshold is reached")
f.Int64(prefix+".l-base-max-bytes", PebbleExperimentalConfigDefault.LBaseMaxBytes, "The maximum number of bytes for LBase. The base level is the level which L0 is compacted into. The base level is determined dynamically based on the existing data in the LSM. The maximum number of bytes for other levels is computed dynamically based on the base level's maximum size. When the maximum number of bytes for a level is exceeded, compaction is requested.")
f.Int(prefix+".mem-table-stop-writes-threshold", PebbleExperimentalConfigDefault.MemTableStopWritesThreshold, "hard limit on the number of queued of MemTables")
f.Bool(prefix+".disable-automatic-compactions", PebbleExperimentalConfigDefault.DisableAutomaticCompactions, "disables automatic compactions")
f.Int(prefix+".wal-bytes-per-sync", PebbleExperimentalConfigDefault.WALBytesPerSync, "number of bytes to write to a write-ahead log (WAL) before calling Sync on it in the background")
f.String(prefix+".wal-dir", PebbleExperimentalConfigDefault.WALDir, "absolute path of directory to store write-ahead logs (WALs) in. If empty, WALs will be stored in the same directory as sstables")
f.Int(prefix+".wal-min-sync-interval", PebbleExperimentalConfigDefault.WALMinSyncInterval, "minimum duration in microseconds between syncs of the WAL. If WAL syncs are requested faster than this interval, they will be artificially delayed.")
f.Int(prefix+".target-byte-deletion-rate", PebbleExperimentalConfigDefault.TargetByteDeletionRate, "rate (in bytes per second) at which sstable file deletions are limited to (under normal circumstances).")
f.Int(prefix+".block-size", PebbleExperimentalConfigDefault.BlockSize, "target uncompressed size in bytes of each table block")
f.Int(prefix+".index-block-size", PebbleExperimentalConfigDefault.IndexBlockSize, fmt.Sprintf("target uncompressed size in bytes of each index block. When the index block size is larger than this target, two-level indexes are automatically enabled. Setting this option to a large value (such as %d) disables the automatic creation of two-level indexes.", math.MaxInt32))
f.Int64(prefix+".target-file-size", PebbleExperimentalConfigDefault.TargetFileSize, "target file size for the level 0")
f.Bool(prefix+".target-file-size-equal-levels", PebbleExperimentalConfigDefault.TargetFileSizeEqualLevels, "if true same target-file-size will be uses for all levels, otherwise target size for layer n = 2 * target size for layer n - 1")

f.Int(prefix+".l0-compaction-concurrency", PebbleExperimentalConfigDefault.L0CompactionConcurrency, "threshold of L0 read-amplification at which compaction concurrency is enabled (if compaction-debt-concurrency was not already exceeded). Every multiple of this value enables another concurrent compaction up to max-concurrent-compactions.")
f.Uint64(prefix+".compaction-debt-concurrency", PebbleExperimentalConfigDefault.CompactionDebtConcurrency, "controls the threshold of compaction debt at which additional compaction concurrency slots are added. For every multiple of this value in compaction debt bytes, an additional concurrent compaction is added. This works \"on top\" of l0-compaction-concurrency, so the higher of the count of compaction concurrency slots as determined by the two options is chosen.")
f.Int64(prefix+".read-compaction-rate", PebbleExperimentalConfigDefault.ReadCompactionRate, "controls the frequency of read triggered compactions by adjusting `AllowedSeeks` in manifest.FileMetadata: AllowedSeeks = FileSize / ReadCompactionRate")
f.Int64(prefix+".read-sampling-multiplier", PebbleExperimentalConfigDefault.ReadSamplingMultiplier, "a multiplier for the readSamplingPeriod in iterator.maybeSampleRead() to control the frequency of read sampling to trigger a read triggered compaction. A value of -1 prevents sampling and disables read triggered compactions. Geth default is -1. The pebble default is 1 << 4. which gets multiplied with a constant of 1 << 16 to yield 1 << 20 (1MB).")
f.Int(prefix+".max-writer-concurrency", PebbleExperimentalConfigDefault.MaxWriterConcurrency, "maximum number of compression workers the compression queue is allowed to use. If max-writer-concurrency > 0, then the Writer will use parallelism, to compress and write blocks to disk. Otherwise, the writer will compress and write blocks to disk synchronously.")
f.Bool(prefix+".force-writer-parallelism", PebbleExperimentalConfigDefault.ForceWriterParallelism, "force parallelism in the sstable Writer for the metamorphic tests. Even with the MaxWriterConcurrency option set, pebble only enables parallelism in the sstable Writer if there is enough CPU available, and this option bypasses that.")
}

func (c *PebbleExperimentalConfig) Validate() error {
if !filepath.IsAbs(c.WALDir) {
return fmt.Errorf("invalid .wal-dir directory (%s) - has to be an absolute path", c.WALDir)
}
// TODO
return nil
}

func (c *PebbleConfig) ExtraOptions(namespace string) *pebble.ExtraOptions {
var maxConcurrentCompactions func() int
if c.MaxConcurrentCompactions > 0 {
maxConcurrentCompactions = func() int { return c.MaxConcurrentCompactions }
}
var walMinSyncInterval func() time.Duration
if c.Experimental.WALMinSyncInterval > 0 {
walMinSyncInterval = func() time.Duration {
return time.Microsecond * time.Duration(c.Experimental.WALMinSyncInterval)
}
}
var levels []pebble.ExtraLevelOptions
for i := 0; i < 7; i++ {
targetFileSize := c.Experimental.TargetFileSize
if !c.Experimental.TargetFileSizeEqualLevels {
targetFileSize = targetFileSize << i
}
levels = append(levels, pebble.ExtraLevelOptions{
BlockSize: c.Experimental.BlockSize,
IndexBlockSize: c.Experimental.IndexBlockSize,
TargetFileSize: targetFileSize,
})
}
walDir := c.Experimental.WALDir
if walDir != "" {
walDir = path.Join(walDir, namespace)
}
return &pebble.ExtraOptions{
BytesPerSync: c.Experimental.BytesPerSync,
L0CompactionFileThreshold: c.Experimental.L0CompactionFileThreshold,
L0CompactionThreshold: c.Experimental.L0CompactionThreshold,
L0StopWritesThreshold: c.Experimental.L0StopWritesThreshold,
LBaseMaxBytes: c.Experimental.LBaseMaxBytes,
MemTableStopWritesThreshold: c.Experimental.MemTableStopWritesThreshold,
MaxConcurrentCompactions: maxConcurrentCompactions,
DisableAutomaticCompactions: c.Experimental.DisableAutomaticCompactions,
WALBytesPerSync: c.Experimental.WALBytesPerSync,
WALDir: walDir,
WALMinSyncInterval: walMinSyncInterval,
TargetByteDeletionRate: c.Experimental.TargetByteDeletionRate,
Experimental: pebble.ExtraOptionsExperimental{
L0CompactionConcurrency: c.Experimental.L0CompactionConcurrency,
CompactionDebtConcurrency: c.Experimental.CompactionDebtConcurrency,
ReadCompactionRate: c.Experimental.ReadCompactionRate,
ReadSamplingMultiplier: c.Experimental.ReadSamplingMultiplier,
MaxWriterConcurrency: c.Experimental.MaxWriterConcurrency,
ForceWriterParallelism: c.Experimental.ForceWriterParallelism,
},
Levels: levels,
}
}
12 changes: 6 additions & 6 deletions cmd/nitro/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,15 +170,15 @@ func validateBlockChain(blockChain *core.BlockChain, chainConfig *params.ChainCo
return nil
}

func openInitializeChainDb(ctx context.Context, stack *node.Node, config *NodeConfig, chainId *big.Int, cacheConfig *core.CacheConfig, l1Client arbutil.L1Interface, rollupAddrs chaininfo.RollupAddresses) (ethdb.Database, *core.BlockChain, error) {
func openInitializeChainDb(ctx context.Context, stack *node.Node, config *NodeConfig, chainId *big.Int, cacheConfig *core.CacheConfig, persistentConfig *conf.PersistentConfig, l1Client arbutil.L1Interface, rollupAddrs chaininfo.RollupAddresses) (ethdb.Database, *core.BlockChain, error) {
if !config.Init.Force {
if readOnlyDb, err := stack.OpenDatabaseWithFreezer("l2chaindata", 0, 0, "", "l2chaindata/", true); err == nil {
if readOnlyDb, err := stack.OpenDatabaseWithFreezerWithExtraOptions("l2chaindata", 0, 0, "", "l2chaindata/", true, persistentConfig.Pebble.ExtraOptions("l2chaindata")); err == nil {
if chainConfig := gethexec.TryReadStoredChainConfig(readOnlyDb); chainConfig != nil {
readOnlyDb.Close()
if !arbmath.BigEquals(chainConfig.ChainID, chainId) {
return nil, nil, fmt.Errorf("database has chain ID %v but config has chain ID %v (are you sure this database is for the right chain?)", chainConfig.ChainID, chainId)
}
chainData, err := stack.OpenDatabaseWithFreezer("l2chaindata", config.Execution.Caching.DatabaseCache, config.Persistent.Handles, config.Persistent.Ancient, "l2chaindata/", false)
chainData, err := stack.OpenDatabaseWithFreezerWithExtraOptions("l2chaindata", config.Execution.Caching.DatabaseCache, config.Persistent.Handles, config.Persistent.Ancient, "l2chaindata/", false, persistentConfig.Pebble.ExtraOptions("l2chaindata"))
if err != nil {
return nil, nil, err
}
Expand All @@ -187,7 +187,7 @@ func openInitializeChainDb(ctx context.Context, stack *node.Node, config *NodeCo
return nil, nil, err
}
chainDb := rawdb.WrapDatabaseWithWasm(chainData, wasmDb, 1)
err = pruning.PruneChainDb(ctx, chainDb, stack, &config.Init, cacheConfig, l1Client, rollupAddrs, config.Node.ValidatorRequired())
err = pruning.PruneChainDb(ctx, chainDb, stack, &config.Init, cacheConfig, persistentConfig, l1Client, rollupAddrs, config.Node.ValidatorRequired())
if err != nil {
return chainDb, nil, fmt.Errorf("error pruning: %w", err)
}
Expand Down Expand Up @@ -235,7 +235,7 @@ func openInitializeChainDb(ctx context.Context, stack *node.Node, config *NodeCo

var initDataReader statetransfer.InitDataReader = nil

chainData, err := stack.OpenDatabaseWithFreezer("l2chaindata", config.Execution.Caching.DatabaseCache, config.Persistent.Handles, config.Persistent.Ancient, "l2chaindata/", false)
chainData, err := stack.OpenDatabaseWithFreezerWithExtraOptions("l2chaindata", config.Execution.Caching.DatabaseCache, config.Persistent.Handles, config.Persistent.Ancient, "l2chaindata/", false, persistentConfig.Pebble.ExtraOptions("l2chaindata"))
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -388,7 +388,7 @@ func openInitializeChainDb(ctx context.Context, stack *node.Node, config *NodeCo
return chainDb, l2BlockChain, err
}

err = pruning.PruneChainDb(ctx, chainDb, stack, &config.Init, cacheConfig, l1Client, rollupAddrs, config.Node.ValidatorRequired())
err = pruning.PruneChainDb(ctx, chainDb, stack, &config.Init, cacheConfig, persistentConfig, l1Client, rollupAddrs, config.Node.ValidatorRequired())
if err != nil {
return chainDb, nil, fmt.Errorf("error pruning: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/nitro/nitro.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ func mainImpl() int {
}
}

chainDb, l2BlockChain, err := openInitializeChainDb(ctx, stack, nodeConfig, new(big.Int).SetUint64(nodeConfig.Chain.ID), gethexec.DefaultCacheConfigFor(stack, &nodeConfig.Execution.Caching), l1Client, rollupAddrs)
chainDb, l2BlockChain, err := openInitializeChainDb(ctx, stack, nodeConfig, new(big.Int).SetUint64(nodeConfig.Chain.ID), gethexec.DefaultCacheConfigFor(stack, &nodeConfig.Execution.Caching), &nodeConfig.Persistent, l1Client, rollupAddrs)
if l2BlockChain != nil {
deferFuncs = append(deferFuncs, func() { l2BlockChain.Stop() })
}
Expand All @@ -502,7 +502,7 @@ func mainImpl() int {
return 1
}

arbDb, err := stack.OpenDatabase("arbitrumdata", 0, 0, "arbitrumdata/", false)
arbDb, err := stack.OpenDatabaseWithExtraOptions("arbitrumdata", 0, 0, "arbitrumdata/", false, nodeConfig.Persistent.Pebble.ExtraOptions("arbitrumdata"))
deferFuncs = append(deferFuncs, func() { closeDb(arbDb, "arbDb") })
if err != nil {
log.Error("failed to open database", "err", err)
Expand Down
Loading

0 comments on commit d897c4f

Please sign in to comment.