Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[NIT-2740] Horizontal Scaling of Validation Node #2354

Open
wants to merge 29 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
6f0bade
Use redis streams in bold
anodar May 30, 2024
2851ef3
Use different streams for bold
anodar May 31, 2024
d15ef6a
Initialize bold execution runner
anodar Jun 5, 2024
9100a8d
Merge branch 'master' into sepolia-tooling-merge-redis
anodar Jun 5, 2024
e2c20ce
Add stream connection and timeout logic
anodar Jun 5, 2024
40b6483
Merge branch 'bold-review' into sepolia-tooling-merge-redis
amsanghi Jul 1, 2024
5d30cc9
Merge branch 'bold-review' into sepolia-tooling-merge-redis
amsanghi Aug 27, 2024
301ea72
fix config
amsanghi Aug 27, 2024
0bbc41e
clean up
amsanghi Aug 27, 2024
3cb9241
Merge branch 'bold-review' into sepolia-tooling-merge-redis
amsanghi Aug 27, 2024
b9c8900
remove redudtant changes
amsanghi Aug 27, 2024
8fcd8b7
fix config setup
amsanghi Aug 28, 2024
f3fc6f3
Merge branch 'bold-review' into sepolia-tooling-merge-redis
amsanghi Aug 28, 2024
ca909c8
Merge branch 'bold-review' into sepolia-tooling-merge-redis
amsanghi Aug 28, 2024
f0bc4d4
Update validator/server_arb/redis/consumer.go
amsanghi Aug 28, 2024
b421fb0
Update validator/server_arb/redis/consumer.go
amsanghi Aug 28, 2024
aeb0e24
Merge branch 'bold-review' into sepolia-tooling-merge-redis
amsanghi Aug 28, 2024
596ff81
Merge branch 'bold-review' into sepolia-tooling-merge-redis
amsanghi Aug 29, 2024
4fbd706
Add tests and fix some bugs
amsanghi Aug 29, 2024
5170a79
Update validator/client/redis/boldproducer.go
rauljordan Aug 29, 2024
545f9d2
add metrics
amsanghi Aug 30, 2024
e9e0224
Merge branch 'bold-review' into sepolia-tooling-merge-redis
amsanghi Oct 9, 2024
9be8a33
Merge branch 'bold-review' into sepolia-tooling-merge-redis
amsanghi Oct 9, 2024
161fcca
Merge branch 'bold-review' into sepolia-tooling-merge-redis
amsanghi Oct 28, 2024
d895bc4
fix lint
amsanghi Oct 29, 2024
8edfbda
Merge branch 'master' into sepolia-tooling-merge-redis
amsanghi Dec 10, 2024
f765a89
fix lint
amsanghi Dec 10, 2024
a2cb261
fix lint
amsanghi Dec 10, 2024
bc70dfb
fix lint
amsanghi Dec 10, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 41 additions & 37 deletions staker/block_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,19 +100,20 @@ type BlockValidator struct {
}

type BlockValidatorConfig struct {
Enable bool `koanf:"enable"`
RedisValidationClientConfig redis.ValidationClientConfig `koanf:"redis-validation-client-config"`
ValidationServer rpcclient.ClientConfig `koanf:"validation-server" reload:"hot"`
ValidationServerConfigs []rpcclient.ClientConfig `koanf:"validation-server-configs"`
ValidationPoll time.Duration `koanf:"validation-poll" reload:"hot"`
PrerecordedBlocks uint64 `koanf:"prerecorded-blocks" reload:"hot"`
ForwardBlocks uint64 `koanf:"forward-blocks" reload:"hot"`
CurrentModuleRoot string `koanf:"current-module-root"` // TODO(magic) requires reinitialization on hot reload
PendingUpgradeModuleRoot string `koanf:"pending-upgrade-module-root"` // TODO(magic) requires StatelessBlockValidator recreation on hot reload
FailureIsFatal bool `koanf:"failure-is-fatal" reload:"hot"`
Dangerous BlockValidatorDangerousConfig `koanf:"dangerous"`
MemoryFreeLimit string `koanf:"memory-free-limit" reload:"hot"`
ValidationServerConfigsList string `koanf:"validation-server-configs-list"`
Enable bool `koanf:"enable"`
RedisValidationClientConfig redis.ValidationClientConfig `koanf:"redis-validation-client-config"`
RedisBoldValidationClientConfig redis.ValidationClientConfig `koanf:"redis-bold-validation-client-config"`
ValidationServer rpcclient.ClientConfig `koanf:"validation-server" reload:"hot"`
ValidationServerConfigs []rpcclient.ClientConfig `koanf:"validation-server-configs"`
ValidationPoll time.Duration `koanf:"validation-poll" reload:"hot"`
PrerecordedBlocks uint64 `koanf:"prerecorded-blocks" reload:"hot"`
ForwardBlocks uint64 `koanf:"forward-blocks" reload:"hot"`
CurrentModuleRoot string `koanf:"current-module-root"` // TODO(magic) requires reinitialization on hot reload
PendingUpgradeModuleRoot string `koanf:"pending-upgrade-module-root"` // TODO(magic) requires StatelessBlockValidator recreation on hot reload
FailureIsFatal bool `koanf:"failure-is-fatal" reload:"hot"`
Dangerous BlockValidatorDangerousConfig `koanf:"dangerous"`
MemoryFreeLimit string `koanf:"memory-free-limit" reload:"hot"`
ValidationServerConfigsList string `koanf:"validation-server-configs-list"`

memoryFreeLimit int
}
Expand Down Expand Up @@ -169,6 +170,7 @@ func BlockValidatorConfigAddOptions(prefix string, f *pflag.FlagSet) {
f.Bool(prefix+".enable", DefaultBlockValidatorConfig.Enable, "enable block-by-block validation")
rpcclient.RPCClientAddOptions(prefix+".validation-server", f, &DefaultBlockValidatorConfig.ValidationServer)
redis.ValidationClientConfigAddOptions(prefix+".redis-validation-client-config", f)
redis.ValidationClientConfigAddOptions(prefix+".redis-bold-validation-client-config", f)
f.String(prefix+".validation-server-configs-list", DefaultBlockValidatorConfig.ValidationServerConfigsList, "array of execution rpc configs given as a json string. time duration should be supplied in number indicating nanoseconds")
f.Duration(prefix+".validation-poll", DefaultBlockValidatorConfig.ValidationPoll, "poll time to check validations")
f.Uint64(prefix+".forward-blocks", DefaultBlockValidatorConfig.ForwardBlocks, "prepare entries for up to that many blocks ahead of validation (small footprint)")
Expand All @@ -185,33 +187,35 @@ func BlockValidatorDangerousConfigAddOptions(prefix string, f *pflag.FlagSet) {
}

var DefaultBlockValidatorConfig = BlockValidatorConfig{
Enable: false,
ValidationServerConfigsList: "default",
ValidationServer: rpcclient.DefaultClientConfig,
RedisValidationClientConfig: redis.DefaultValidationClientConfig,
ValidationPoll: time.Second,
ForwardBlocks: 1024,
PrerecordedBlocks: uint64(2 * runtime.NumCPU()),
CurrentModuleRoot: "current",
PendingUpgradeModuleRoot: "latest",
FailureIsFatal: true,
Dangerous: DefaultBlockValidatorDangerousConfig,
MemoryFreeLimit: "default",
Enable: false,
ValidationServerConfigsList: "default",
ValidationServer: rpcclient.DefaultClientConfig,
RedisValidationClientConfig: redis.DefaultValidationClientConfig,
RedisBoldValidationClientConfig: redis.DefaultValidationClientConfig,
ValidationPoll: time.Second,
ForwardBlocks: 1024,
PrerecordedBlocks: uint64(2 * runtime.NumCPU()),
CurrentModuleRoot: "current",
PendingUpgradeModuleRoot: "latest",
FailureIsFatal: true,
Dangerous: DefaultBlockValidatorDangerousConfig,
MemoryFreeLimit: "default",
}

var TestBlockValidatorConfig = BlockValidatorConfig{
Enable: false,
ValidationServer: rpcclient.TestClientConfig,
ValidationServerConfigs: []rpcclient.ClientConfig{rpcclient.TestClientConfig},
RedisValidationClientConfig: redis.TestValidationClientConfig,
ValidationPoll: 100 * time.Millisecond,
ForwardBlocks: 128,
PrerecordedBlocks: uint64(2 * runtime.NumCPU()),
CurrentModuleRoot: "latest",
PendingUpgradeModuleRoot: "latest",
FailureIsFatal: true,
Dangerous: DefaultBlockValidatorDangerousConfig,
MemoryFreeLimit: "default",
Enable: false,
ValidationServer: rpcclient.TestClientConfig,
ValidationServerConfigs: []rpcclient.ClientConfig{rpcclient.TestClientConfig},
RedisValidationClientConfig: redis.TestValidationClientConfig,
RedisBoldValidationClientConfig: redis.TestValidationClientConfig,
ValidationPoll: 100 * time.Millisecond,
ForwardBlocks: 128,
PrerecordedBlocks: uint64(2 * runtime.NumCPU()),
CurrentModuleRoot: "latest",
PendingUpgradeModuleRoot: "latest",
FailureIsFatal: true,
Dangerous: DefaultBlockValidatorDangerousConfig,
MemoryFreeLimit: "default",
}

var DefaultBlockValidatorDangerousConfig = BlockValidatorDangerousConfig{
Expand Down
1 change: 0 additions & 1 deletion staker/bold_state_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,6 @@ func (s *BOLDStateProvider) CollectMachineHashes(
if err != nil {
return nil, err
}
// TODO: Enable Redis streams.
execRun, err := s.statelessValidator.execSpawners[0].CreateExecutionRun(cfg.WasmModuleRoot, input).Await(ctx)
if err != nil {
return nil, err
Expand Down
5 changes: 4 additions & 1 deletion staker/stateless_block_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,10 @@ func NewStatelessBlockValidator(
for i := range configs {
i := i
confFetcher := func() *rpcclient.ClientConfig { return &config().ValidationServerConfigs[i] }
executionSpawners = append(executionSpawners, validatorclient.NewExecutionClient(confFetcher, stack))
executionSpawners = append(executionSpawners, validatorclient.NewExecutionClient(confFetcher, nil, stack))
if i == 0 {
executionSpawners = append(executionSpawners, validatorclient.NewExecutionClient(confFetcher, &config().RedisBoldValidationClientConfig, stack))
}
}

if len(executionSpawners) == 0 {
Expand Down
2 changes: 1 addition & 1 deletion system_tests/full_challenge_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ func RunChallengeTest(t *testing.T, asserterIsCorrect bool, useStubs bool, chall
var mockSpawn *mockSpawner
builder.valnodeConfig.Wasm.RootPath = wasmRootDir
if useStubs {
mockSpawn, valStack = createMockValidationNode(t, ctx, &builder.valnodeConfig.Arbitrator)
mockSpawn, valStack = createMockValidationNode(t, ctx, &builder.valnodeConfig.Arbitrator, "")
} else {
// For now validation only works with HashScheme set
builder.execConfig.Caching.StateScheme = rawdb.HashScheme
Expand Down
59 changes: 49 additions & 10 deletions system_tests/validation_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,15 @@ import (
"github.com/offchainlabs/nitro/execution"
"github.com/offchainlabs/nitro/staker"
"github.com/offchainlabs/nitro/util/containers"
"github.com/offchainlabs/nitro/util/redisutil"
"github.com/offchainlabs/nitro/util/rpcclient"
"github.com/offchainlabs/nitro/validator"
clientredis "github.com/offchainlabs/nitro/validator/client/redis"
"github.com/offchainlabs/nitro/validator/server_api"
"github.com/offchainlabs/nitro/validator/server_arb"
arbredis "github.com/offchainlabs/nitro/validator/server_arb/redis"
"github.com/offchainlabs/nitro/validator/valnode"
"github.com/offchainlabs/nitro/validator/valnode/redis"

validatorclient "github.com/offchainlabs/nitro/validator/client"
)
Expand Down Expand Up @@ -166,7 +170,7 @@ func (r *mockExecRun) CheckAlive(ctx context.Context) error {

func (r *mockExecRun) Close() {}

func createMockValidationNode(t *testing.T, ctx context.Context, config *server_arb.ArbitratorSpawnerConfig) (*mockSpawner, *node.Node) {
func createMockValidationNode(t *testing.T, ctx context.Context, config *server_arb.ArbitratorSpawnerConfig, redisURL string) (*mockSpawner, *node.Node) {
stackConf := node.DefaultConfig
stackConf.HTTPPort = 0
stackConf.DataDir = ""
Expand All @@ -184,7 +188,18 @@ func createMockValidationNode(t *testing.T, ctx context.Context, config *server_
}
configFetcher := func() *server_arb.ArbitratorSpawnerConfig { return config }
spawner := &mockSpawner{}
serverAPI := valnode.NewExecutionServerAPI(spawner, spawner, configFetcher)
var redisExecSpawner *arbredis.ExecutionSpawner
if redisURL != "" {
redisValidationServerConfig := redis.TestValidationServerConfig
redisValidationServerConfig.RedisURL = redisURL
redisValidationServerConfig.ModuleRoots = make([]string, len(mockWasmModuleRoots))
for i := range redisValidationServerConfig.ModuleRoots {
redisValidationServerConfig.ModuleRoots[i] = mockWasmModuleRoots[i].Hex()
}
redisExecSpawner, err = arbredis.NewExecutionSpawner(&redisValidationServerConfig, spawner)
Require(t, err)
}
serverAPI := valnode.NewExecutionServerAPI(spawner, spawner, redisExecSpawner, configFetcher)

valAPIs := []rpc.API{{
Namespace: server_api.Namespace,
Expand All @@ -211,11 +226,28 @@ func createMockValidationNode(t *testing.T, ctx context.Context, config *server_

// mostly tests translation to/from json and running over network
func TestValidationServerAPI(t *testing.T) {
testValidationServerAPI(t, false)
}

// mostly tests translation to/from json and running over network with bold validation redis consumer/producer
func TestValidationServerAPIWithBoldValidationConsumerProducer(t *testing.T) {
testValidationServerAPI(t, true)
}
func testValidationServerAPI(t *testing.T, withBoldValidationConsumerProducer bool) {
t.Parallel()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
_, validationDefault := createMockValidationNode(t, ctx, nil)
client := validatorclient.NewExecutionClient(StaticFetcherFrom(t, &rpcclient.TestClientConfig), validationDefault)
var redisURL string
var redisBoldValidationClientConfig *clientredis.ValidationClientConfig
if withBoldValidationConsumerProducer {
redisURL = redisutil.CreateTestRedis(ctx, t)
redisBoldValidationClientConfig = &clientredis.TestValidationClientConfig
redisBoldValidationClientConfig.RedisURL = redisURL
redisBoldValidationClientConfig.CreateStreams = true
}

_, validationDefault := createMockValidationNode(t, ctx, nil, redisURL)
client := validatorclient.NewExecutionClient(StaticFetcherFrom(t, &rpcclient.TestClientConfig), redisBoldValidationClientConfig, validationDefault)
err := client.Start(ctx)
Require(t, err)

Expand Down Expand Up @@ -285,14 +317,21 @@ func TestValidationServerAPI(t *testing.T) {
if !bytes.Equal(proof, mockProof) {
t.Error("mock proof not expected")
}

hashes := execRun.GetMachineHashesWithStepSize(0, 1, 5)
hashesRes, err := hashes.Await(ctx)
Require(t, err)
if len(hashesRes) != 5 {
t.Error("unexpected number of hashes")
}
}

func TestValidationClientRoom(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
mockSpawner, spawnerStack := createMockValidationNode(t, ctx, nil)
client := validatorclient.NewExecutionClient(StaticFetcherFrom(t, &rpcclient.TestClientConfig), spawnerStack)
mockSpawner, spawnerStack := createMockValidationNode(t, ctx, nil, "")
client := validatorclient.NewExecutionClient(StaticFetcherFrom(t, &rpcclient.TestClientConfig), nil, spawnerStack)
err := client.Start(ctx)
Require(t, err)

Expand Down Expand Up @@ -373,16 +412,16 @@ func TestExecutionKeepAlive(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
_, validationDefault := createMockValidationNode(t, ctx, nil)
_, validationDefault := createMockValidationNode(t, ctx, nil, "")
shortTimeoutConfig := server_arb.DefaultArbitratorSpawnerConfig
shortTimeoutConfig.ExecutionRunTimeout = time.Second
_, validationShortTO := createMockValidationNode(t, ctx, &shortTimeoutConfig)
_, validationShortTO := createMockValidationNode(t, ctx, &shortTimeoutConfig, "")
configFetcher := StaticFetcherFrom(t, &rpcclient.TestClientConfig)

clientDefault := validatorclient.NewExecutionClient(configFetcher, validationDefault)
clientDefault := validatorclient.NewExecutionClient(configFetcher, nil, validationDefault)
err := clientDefault.Start(ctx)
Require(t, err)
clientShortTO := validatorclient.NewExecutionClient(configFetcher, validationShortTO)
clientShortTO := validatorclient.NewExecutionClient(configFetcher, nil, validationShortTO)
err = clientShortTO.Start(ctx)
Require(t, err)

Expand Down
85 changes: 85 additions & 0 deletions validator/client/redis/boldproducer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package redis

import (
"context"
"fmt"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/offchainlabs/nitro/pubsub"
"github.com/offchainlabs/nitro/util/containers"
"github.com/offchainlabs/nitro/util/redisutil"
"github.com/offchainlabs/nitro/util/stopwaiter"
"github.com/offchainlabs/nitro/validator/server_api"
)

// BoldValidationClient implements bold validation client through redis streams.
type BoldValidationClient struct {
stopwaiter.StopWaiter
// producers stores moduleRoot to producer mapping.
producers map[common.Hash]*pubsub.Producer[*server_api.GetLeavesWithStepSizeInput, []common.Hash]
config *ValidationClientConfig
}

func NewBoldValidationClient(cfg *ValidationClientConfig) (*BoldValidationClient, error) {
return &BoldValidationClient{
producers: make(map[common.Hash]*pubsub.Producer[*server_api.GetLeavesWithStepSizeInput, []common.Hash]),
config: cfg,
}, nil
}

func (c *BoldValidationClient) Initialize(ctx context.Context, moduleRoots []common.Hash) error {
if c.config.RedisURL == "" {
return fmt.Errorf("redis url cannot be empty")
}
redisClient, err := redisutil.RedisClientFromURL(c.config.RedisURL)
if err != nil {
return err
}
for _, mr := range moduleRoots {
if c.config.CreateStreams {
if err := pubsub.CreateStream(ctx, server_api.RedisBoldStreamForRoot(c.config.StreamPrefix, mr), redisClient); err != nil {
return fmt.Errorf("creating redis stream: %w", err)
}
}
if _, exists := c.producers[mr]; exists {
log.Warn("Producer already exists for module root", "hash", mr)
continue
}
p, err := pubsub.NewProducer[*server_api.GetLeavesWithStepSizeInput, []common.Hash](
redisClient, server_api.RedisBoldStreamForRoot(c.config.StreamPrefix, mr), &c.config.ProducerConfig)
if err != nil {
log.Warn("failed init redis for %v: %w", mr, err)
continue
}
c.producers[mr] = p
}
return nil
}

func (c *BoldValidationClient) GetLeavesWithStepSize(req *server_api.GetLeavesWithStepSizeInput) containers.PromiseInterface[[]common.Hash] {
producer, found := c.producers[req.ModuleRoot]
if !found {
return containers.NewReadyPromise([]common.Hash{}, fmt.Errorf("no validation is configured for wasm root %v", req.ModuleRoot))
}
promise, err := producer.Produce(c.GetContext(), req)
if err != nil {
return containers.NewReadyPromise([]common.Hash{}, fmt.Errorf("error producing input: %w", err))
}
return promise
}

func (c *BoldValidationClient) Start(ctx_in context.Context) error {
for _, p := range c.producers {
p.Start(ctx_in)
}
c.StopWaiter.Start(ctx_in, c)
return nil
}

func (c *BoldValidationClient) Stop() {
for _, p := range c.producers {
p.StopAndWait()
}
c.StopWaiter.StopAndWait()
}
Loading
Loading