Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[NIT-2740] Horizontal Scaling of Validation Node #2354

Open
wants to merge 29 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
6f0bade
Use redis streams in bold
anodar May 30, 2024
2851ef3
Use different streams for bold
anodar May 31, 2024
d15ef6a
Initialize bold execution runner
anodar Jun 5, 2024
9100a8d
Merge branch 'master' into sepolia-tooling-merge-redis
anodar Jun 5, 2024
e2c20ce
Add stream connection and timeout logic
anodar Jun 5, 2024
40b6483
Merge branch 'bold-review' into sepolia-tooling-merge-redis
amsanghi Jul 1, 2024
5d30cc9
Merge branch 'bold-review' into sepolia-tooling-merge-redis
amsanghi Aug 27, 2024
301ea72
fix config
amsanghi Aug 27, 2024
0bbc41e
clean up
amsanghi Aug 27, 2024
3cb9241
Merge branch 'bold-review' into sepolia-tooling-merge-redis
amsanghi Aug 27, 2024
b9c8900
remove redudtant changes
amsanghi Aug 27, 2024
8fcd8b7
fix config setup
amsanghi Aug 28, 2024
f3fc6f3
Merge branch 'bold-review' into sepolia-tooling-merge-redis
amsanghi Aug 28, 2024
ca909c8
Merge branch 'bold-review' into sepolia-tooling-merge-redis
amsanghi Aug 28, 2024
f0bc4d4
Update validator/server_arb/redis/consumer.go
amsanghi Aug 28, 2024
b421fb0
Update validator/server_arb/redis/consumer.go
amsanghi Aug 28, 2024
aeb0e24
Merge branch 'bold-review' into sepolia-tooling-merge-redis
amsanghi Aug 28, 2024
596ff81
Merge branch 'bold-review' into sepolia-tooling-merge-redis
amsanghi Aug 29, 2024
4fbd706
Add tests and fix some bugs
amsanghi Aug 29, 2024
5170a79
Update validator/client/redis/boldproducer.go
rauljordan Aug 29, 2024
545f9d2
add metrics
amsanghi Aug 30, 2024
e9e0224
Merge branch 'bold-review' into sepolia-tooling-merge-redis
amsanghi Oct 9, 2024
9be8a33
Merge branch 'bold-review' into sepolia-tooling-merge-redis
amsanghi Oct 9, 2024
161fcca
Merge branch 'bold-review' into sepolia-tooling-merge-redis
amsanghi Oct 28, 2024
d895bc4
fix lint
amsanghi Oct 29, 2024
8edfbda
Merge branch 'master' into sepolia-tooling-merge-redis
amsanghi Dec 10, 2024
f765a89
fix lint
amsanghi Dec 10, 2024
a2cb261
fix lint
amsanghi Dec 10, 2024
bc70dfb
fix lint
amsanghi Dec 10, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 41 additions & 37 deletions staker/block_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,19 +100,20 @@ type BlockValidator struct {
}

type BlockValidatorConfig struct {
Enable bool `koanf:"enable"`
RedisValidationClientConfig redis.ValidationClientConfig `koanf:"redis-validation-client-config"`
ValidationServer rpcclient.ClientConfig `koanf:"validation-server" reload:"hot"`
ValidationServerConfigs []rpcclient.ClientConfig `koanf:"validation-server-configs"`
ValidationPoll time.Duration `koanf:"validation-poll" reload:"hot"`
PrerecordedBlocks uint64 `koanf:"prerecorded-blocks" reload:"hot"`
ForwardBlocks uint64 `koanf:"forward-blocks" reload:"hot"`
CurrentModuleRoot string `koanf:"current-module-root"` // TODO(magic) requires reinitialization on hot reload
PendingUpgradeModuleRoot string `koanf:"pending-upgrade-module-root"` // TODO(magic) requires StatelessBlockValidator recreation on hot reload
FailureIsFatal bool `koanf:"failure-is-fatal" reload:"hot"`
Dangerous BlockValidatorDangerousConfig `koanf:"dangerous"`
MemoryFreeLimit string `koanf:"memory-free-limit" reload:"hot"`
ValidationServerConfigsList string `koanf:"validation-server-configs-list"`
Enable bool `koanf:"enable"`
RedisValidationClientConfig redis.ValidationClientConfig `koanf:"redis-validation-client-config"`
RedisBoldValidationClientConfig redis.ValidationClientConfig `koanf:"redis-bold-validation-client-config"`
ValidationServer rpcclient.ClientConfig `koanf:"validation-server" reload:"hot"`
ValidationServerConfigs []rpcclient.ClientConfig `koanf:"validation-server-configs"`
ValidationPoll time.Duration `koanf:"validation-poll" reload:"hot"`
PrerecordedBlocks uint64 `koanf:"prerecorded-blocks" reload:"hot"`
ForwardBlocks uint64 `koanf:"forward-blocks" reload:"hot"`
CurrentModuleRoot string `koanf:"current-module-root"` // TODO(magic) requires reinitialization on hot reload
PendingUpgradeModuleRoot string `koanf:"pending-upgrade-module-root"` // TODO(magic) requires StatelessBlockValidator recreation on hot reload
FailureIsFatal bool `koanf:"failure-is-fatal" reload:"hot"`
Dangerous BlockValidatorDangerousConfig `koanf:"dangerous"`
MemoryFreeLimit string `koanf:"memory-free-limit" reload:"hot"`
ValidationServerConfigsList string `koanf:"validation-server-configs-list"`

memoryFreeLimit int
}
Expand Down Expand Up @@ -169,6 +170,7 @@ func BlockValidatorConfigAddOptions(prefix string, f *pflag.FlagSet) {
f.Bool(prefix+".enable", DefaultBlockValidatorConfig.Enable, "enable block-by-block validation")
rpcclient.RPCClientAddOptions(prefix+".validation-server", f, &DefaultBlockValidatorConfig.ValidationServer)
redis.ValidationClientConfigAddOptions(prefix+".redis-validation-client-config", f)
redis.ValidationClientConfigAddOptions(prefix+".redis-bold-validation-client-config", f)
f.String(prefix+".validation-server-configs-list", DefaultBlockValidatorConfig.ValidationServerConfigsList, "array of execution rpc configs given as a json string. time duration should be supplied in number indicating nanoseconds")
f.Duration(prefix+".validation-poll", DefaultBlockValidatorConfig.ValidationPoll, "poll time to check validations")
f.Uint64(prefix+".forward-blocks", DefaultBlockValidatorConfig.ForwardBlocks, "prepare entries for up to that many blocks ahead of validation (small footprint)")
Expand All @@ -185,33 +187,35 @@ func BlockValidatorDangerousConfigAddOptions(prefix string, f *pflag.FlagSet) {
}

var DefaultBlockValidatorConfig = BlockValidatorConfig{
Enable: false,
ValidationServerConfigsList: "default",
ValidationServer: rpcclient.DefaultClientConfig,
RedisValidationClientConfig: redis.DefaultValidationClientConfig,
ValidationPoll: time.Second,
ForwardBlocks: 1024,
PrerecordedBlocks: uint64(2 * runtime.NumCPU()),
CurrentModuleRoot: "current",
PendingUpgradeModuleRoot: "latest",
FailureIsFatal: true,
Dangerous: DefaultBlockValidatorDangerousConfig,
MemoryFreeLimit: "default",
Enable: false,
ValidationServerConfigsList: "default",
ValidationServer: rpcclient.DefaultClientConfig,
RedisValidationClientConfig: redis.DefaultValidationClientConfig,
RedisBoldValidationClientConfig: redis.DefaultValidationClientConfig,
ValidationPoll: time.Second,
ForwardBlocks: 1024,
PrerecordedBlocks: uint64(2 * runtime.NumCPU()),
CurrentModuleRoot: "current",
PendingUpgradeModuleRoot: "latest",
FailureIsFatal: true,
Dangerous: DefaultBlockValidatorDangerousConfig,
MemoryFreeLimit: "default",
}

var TestBlockValidatorConfig = BlockValidatorConfig{
Enable: false,
ValidationServer: rpcclient.TestClientConfig,
ValidationServerConfigs: []rpcclient.ClientConfig{rpcclient.TestClientConfig},
RedisValidationClientConfig: redis.TestValidationClientConfig,
ValidationPoll: 100 * time.Millisecond,
ForwardBlocks: 128,
PrerecordedBlocks: uint64(2 * runtime.NumCPU()),
CurrentModuleRoot: "latest",
PendingUpgradeModuleRoot: "latest",
FailureIsFatal: true,
Dangerous: DefaultBlockValidatorDangerousConfig,
MemoryFreeLimit: "default",
Enable: false,
ValidationServer: rpcclient.TestClientConfig,
ValidationServerConfigs: []rpcclient.ClientConfig{rpcclient.TestClientConfig},
RedisValidationClientConfig: redis.TestValidationClientConfig,
RedisBoldValidationClientConfig: redis.TestValidationClientConfig,
ValidationPoll: 100 * time.Millisecond,
ForwardBlocks: 128,
PrerecordedBlocks: uint64(2 * runtime.NumCPU()),
CurrentModuleRoot: "latest",
PendingUpgradeModuleRoot: "latest",
FailureIsFatal: true,
Dangerous: DefaultBlockValidatorDangerousConfig,
MemoryFreeLimit: "default",
}

var DefaultBlockValidatorDangerousConfig = BlockValidatorDangerousConfig{
Expand Down
1 change: 0 additions & 1 deletion staker/bold_state_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,6 @@ func (s *BOLDStateProvider) CollectMachineHashes(
if err != nil {
return nil, err
}
// TODO: Enable Redis streams.
execRun, err := s.statelessValidator.execSpawners[0].CreateExecutionRun(cfg.WasmModuleRoot, input).Await(ctx)
if err != nil {
return nil, err
Expand Down
5 changes: 4 additions & 1 deletion staker/stateless_block_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,10 @@ func NewStatelessBlockValidator(
for i := range configs {
i := i
confFetcher := func() *rpcclient.ClientConfig { return &config().ValidationServerConfigs[i] }
executionSpawners = append(executionSpawners, validatorclient.NewExecutionClient(confFetcher, stack))
executionSpawners = append(executionSpawners, validatorclient.NewExecutionClient(confFetcher, nil, stack))
if i == 0 {
executionSpawners = append(executionSpawners, validatorclient.NewExecutionClient(confFetcher, &config().RedisBoldValidationClientConfig, stack))
}
}

if len(executionSpawners) == 0 {
Expand Down
10 changes: 5 additions & 5 deletions system_tests/validation_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func createMockValidationNode(t *testing.T, ctx context.Context, config *server_
}
configFetcher := func() *server_arb.ArbitratorSpawnerConfig { return config }
spawner := &mockSpawner{}
serverAPI := valnode.NewExecutionServerAPI(spawner, spawner, configFetcher)
serverAPI := valnode.NewExecutionServerAPI(spawner, spawner, nil, configFetcher)

valAPIs := []rpc.API{{
Namespace: server_api.Namespace,
Expand Down Expand Up @@ -215,7 +215,7 @@ func TestValidationServerAPI(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
_, validationDefault := createMockValidationNode(t, ctx, nil)
client := validatorclient.NewExecutionClient(StaticFetcherFrom(t, &rpcclient.TestClientConfig), validationDefault)
client := validatorclient.NewExecutionClient(StaticFetcherFrom(t, &rpcclient.TestClientConfig), nil, validationDefault)
err := client.Start(ctx)
Require(t, err)

Expand Down Expand Up @@ -292,7 +292,7 @@ func TestValidationClientRoom(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
mockSpawner, spawnerStack := createMockValidationNode(t, ctx, nil)
client := validatorclient.NewExecutionClient(StaticFetcherFrom(t, &rpcclient.TestClientConfig), spawnerStack)
client := validatorclient.NewExecutionClient(StaticFetcherFrom(t, &rpcclient.TestClientConfig), nil, spawnerStack)
err := client.Start(ctx)
Require(t, err)

Expand Down Expand Up @@ -379,10 +379,10 @@ func TestExecutionKeepAlive(t *testing.T) {
_, validationShortTO := createMockValidationNode(t, ctx, &shortTimeoutConfig)
configFetcher := StaticFetcherFrom(t, &rpcclient.TestClientConfig)

clientDefault := validatorclient.NewExecutionClient(configFetcher, validationDefault)
clientDefault := validatorclient.NewExecutionClient(configFetcher, nil, validationDefault)
err := clientDefault.Start(ctx)
Require(t, err)
clientShortTO := validatorclient.NewExecutionClient(configFetcher, validationShortTO)
clientShortTO := validatorclient.NewExecutionClient(configFetcher, nil, validationShortTO)
err = clientShortTO.Start(ctx)
Require(t, err)

Expand Down
86 changes: 86 additions & 0 deletions validator/client/redis/boldproducer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package redis

import (
"context"
"fmt"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/offchainlabs/nitro/pubsub"
"github.com/offchainlabs/nitro/util/containers"
"github.com/offchainlabs/nitro/util/redisutil"
"github.com/offchainlabs/nitro/util/stopwaiter"
"github.com/offchainlabs/nitro/validator/server_api"
)

// BoldValidationClient implements bold validation client through redis streams.
type BoldValidationClient struct {
stopwaiter.StopWaiter
// producers stores moduleRoot to producer mapping.
producers map[common.Hash]*pubsub.Producer[*server_api.GetLeavesWithStepSizeInput, []common.Hash]
config *ValidationClientConfig
}

func NewBoldValidationClient(cfg *ValidationClientConfig) (*BoldValidationClient, error) {
return &BoldValidationClient{
producers: make(map[common.Hash]*pubsub.Producer[*server_api.GetLeavesWithStepSizeInput, []common.Hash]),
config: cfg,
}, nil
}

func (c *BoldValidationClient) Initialize(ctx context.Context, moduleRoots []common.Hash) error {
if c.config.RedisURL == "" {
return fmt.Errorf("redis url cannot be empty")
}
redisClient, err := redisutil.RedisClientFromURL(c.config.RedisURL)
if err != nil {
return err
}
for _, mr := range moduleRoots {
if c.config.CreateStreams {
if err := pubsub.CreateStream(ctx, server_api.RedisBoldStreamForRoot(c.config.StreamPrefix, mr), redisClient); err != nil {
return fmt.Errorf("creating redis stream: %w", err)
}
}
if _, exists := c.producers[mr]; exists {
log.Warn("Producer already existsw for module root", "hash", mr)
rauljordan marked this conversation as resolved.
Show resolved Hide resolved
continue
}
p, err := pubsub.NewProducer[*server_api.GetLeavesWithStepSizeInput, []common.Hash](
redisClient, server_api.RedisBoldStreamForRoot(c.config.StreamPrefix, mr), &c.config.ProducerConfig)
if err != nil {
log.Warn("failed init redis for %v: %w", mr, err)
continue
}
p.Start(c.GetContext())
c.producers[mr] = p
}
return nil
}

func (c *BoldValidationClient) GetLeavesWithStepSize(req *server_api.GetLeavesWithStepSizeInput) containers.PromiseInterface[[]common.Hash] {
producer, found := c.producers[req.ModuleRoot]
if !found {
return containers.NewReadyPromise([]common.Hash{}, fmt.Errorf("no validation is configured for wasm root %v", req.ModuleRoot))
}
promise, err := producer.Produce(c.GetContext(), req)
if err != nil {
return containers.NewReadyPromise([]common.Hash{}, fmt.Errorf("error producing input: %w", err))
}
return promise
}

func (c *BoldValidationClient) Start(ctx_in context.Context) error {
for _, p := range c.producers {
p.Start(ctx_in)
}
c.StopWaiter.Start(ctx_in, c)
return nil
}

func (c *BoldValidationClient) Stop() {
for _, p := range c.producers {
p.StopAndWait()
}
c.StopWaiter.StopAndWait()
}
28 changes: 25 additions & 3 deletions validator/client/validation_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/offchainlabs/nitro/util/rpcclient"
"github.com/offchainlabs/nitro/util/stopwaiter"

"github.com/offchainlabs/nitro/validator/client/redis"
"github.com/offchainlabs/nitro/validator/server_api"
"github.com/offchainlabs/nitro/validator/server_common"

Expand Down Expand Up @@ -146,11 +147,21 @@ func (c *ValidationClient) Room() int {

type ExecutionClient struct {
ValidationClient
boldValClient *redis.BoldValidationClient
}

func NewExecutionClient(config rpcclient.ClientConfigFetcher, stack *node.Node) *ExecutionClient {
func NewExecutionClient(config rpcclient.ClientConfigFetcher, redisBoldValidationClientConfig *redis.ValidationClientConfig, stack *node.Node) *ExecutionClient {
var boldClient *redis.BoldValidationClient
if redisBoldValidationClientConfig != nil && redisBoldValidationClientConfig.Enabled() {
var err error
boldClient, err = redis.NewBoldValidationClient(redisBoldValidationClientConfig)
if err != nil {
log.Error("Creating new redis bold validation client", "error", err)
}
}
return &ExecutionClient{
ValidationClient: *NewValidationClient(config, stack),
boldValClient: boldClient,
}
}

Expand All @@ -172,8 +183,10 @@ func (c *ExecutionClient) CreateExecutionRun(wasmModuleRoot common.Hash, input *

type ExecutionClientRun struct {
stopwaiter.StopWaiter
client *ExecutionClient
id uint64
client *ExecutionClient
id uint64
wasmModuleRoot common.Hash
input *validator.ValidationInput
}

func (c *ExecutionClient) LatestWasmModuleRoot() containers.PromiseInterface[common.Hash] {
Expand Down Expand Up @@ -233,6 +246,15 @@ func (r *ExecutionClientRun) GetStepAt(pos uint64) containers.PromiseInterface[*
}

func (r *ExecutionClientRun) GetMachineHashesWithStepSize(machineStartIndex, stepSize, maxIterations uint64) containers.PromiseInterface[[]common.Hash] {
if r.client.boldValClient != nil {
return r.client.boldValClient.GetLeavesWithStepSize(&server_api.GetLeavesWithStepSizeInput{
anodar marked this conversation as resolved.
Show resolved Hide resolved
ModuleRoot: r.wasmModuleRoot,
MachineStartIndex: machineStartIndex,
StepSize: stepSize,
NumDesiredLeaves: maxIterations,
ValidationInput: r.input,
})
}
return stopwaiter.LaunchPromiseThread[[]common.Hash](r, func(ctx context.Context) ([]common.Hash, error) {
var resJson []common.Hash
err := r.client.client.CallContext(ctx, &resJson, server_api.Namespace+"_getMachineHashesWithStepSize", r.id, machineStartIndex, stepSize, maxIterations)
Expand Down
12 changes: 12 additions & 0 deletions validator/server_api/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ func RedisStreamForRoot(prefix string, moduleRoot common.Hash) string {
return fmt.Sprintf("%sstream:%s", prefix, moduleRoot.Hex())
}

func RedisBoldStreamForRoot(prefix string, moduleRoot common.Hash) string {
return fmt.Sprintf("%sstream-bold:%s", prefix, moduleRoot.Hex())
}

type Request struct {
Input *InputJSON
ModuleRoot common.Hash
Expand Down Expand Up @@ -176,3 +180,11 @@ func ValidationInputFromJson(entry *InputJSON) (*validator.ValidationInput, erro
}
return valInput, nil
}

type GetLeavesWithStepSizeInput struct {
ModuleRoot common.Hash
MachineStartIndex uint64
StepSize uint64
NumDesiredLeaves uint64
ValidationInput *validator.ValidationInput
}
Loading
Loading