Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[NIT-2740] Horizontal Scaling of Validation Node #2354

Open
wants to merge 38 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
6f0bade
Use redis streams in bold
anodar May 30, 2024
2851ef3
Use different streams for bold
anodar May 31, 2024
d15ef6a
Initialize bold execution runner
anodar Jun 5, 2024
9100a8d
Merge branch 'master' into sepolia-tooling-merge-redis
anodar Jun 5, 2024
e2c20ce
Add stream connection and timeout logic
anodar Jun 5, 2024
40b6483
Merge branch 'bold-review' into sepolia-tooling-merge-redis
amsanghi Jul 1, 2024
5d30cc9
Merge branch 'bold-review' into sepolia-tooling-merge-redis
amsanghi Aug 27, 2024
301ea72
fix config
amsanghi Aug 27, 2024
0bbc41e
clean up
amsanghi Aug 27, 2024
3cb9241
Merge branch 'bold-review' into sepolia-tooling-merge-redis
amsanghi Aug 27, 2024
b9c8900
remove redudtant changes
amsanghi Aug 27, 2024
8fcd8b7
fix config setup
amsanghi Aug 28, 2024
f3fc6f3
Merge branch 'bold-review' into sepolia-tooling-merge-redis
amsanghi Aug 28, 2024
ca909c8
Merge branch 'bold-review' into sepolia-tooling-merge-redis
amsanghi Aug 28, 2024
f0bc4d4
Update validator/server_arb/redis/consumer.go
amsanghi Aug 28, 2024
b421fb0
Update validator/server_arb/redis/consumer.go
amsanghi Aug 28, 2024
aeb0e24
Merge branch 'bold-review' into sepolia-tooling-merge-redis
amsanghi Aug 28, 2024
596ff81
Merge branch 'bold-review' into sepolia-tooling-merge-redis
amsanghi Aug 29, 2024
4fbd706
Add tests and fix some bugs
amsanghi Aug 29, 2024
5170a79
Update validator/client/redis/boldproducer.go
rauljordan Aug 29, 2024
545f9d2
add metrics
amsanghi Aug 30, 2024
e9e0224
Merge branch 'bold-review' into sepolia-tooling-merge-redis
amsanghi Oct 9, 2024
9be8a33
Merge branch 'bold-review' into sepolia-tooling-merge-redis
amsanghi Oct 9, 2024
161fcca
Merge branch 'bold-review' into sepolia-tooling-merge-redis
amsanghi Oct 28, 2024
d895bc4
fix lint
amsanghi Oct 29, 2024
8edfbda
Merge branch 'master' into sepolia-tooling-merge-redis
amsanghi Dec 10, 2024
f765a89
fix lint
amsanghi Dec 10, 2024
a2cb261
fix lint
amsanghi Dec 10, 2024
bc70dfb
fix lint
amsanghi Dec 10, 2024
70f64a1
Merge branch 'master' into sepolia-tooling-merge-redis
eljobe Dec 17, 2024
b0adab0
Merge branch 'master' into sepolia-tooling-merge-redis
amsanghi Jan 3, 2025
3da52e3
Changes based on PR comments
amsanghi Jan 3, 2025
1b1fc9f
Changes based on PR comments
amsanghi Jan 3, 2025
fa80856
Changes based on PR comments
amsanghi Jan 3, 2025
580db8d
Changes based on PR comments
amsanghi Jan 3, 2025
87b68d5
remove
amsanghi Jan 3, 2025
a9ab22e
Changes based on PR comments
amsanghi Jan 3, 2025
7999540
Merge branch 'master' into sepolia-tooling-merge-redis
amsanghi Jan 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion staker/bold/bold_state_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,6 @@ func (s *BOLDStateProvider) CollectMachineHashes(
if err != nil {
return nil, err
}
// TODO: Enable Redis streams.
wasmModRoot := cfg.AssertionMetadata.WasmModuleRoot
execRun, err := s.statelessValidator.ExecutionSpawners()[0].CreateExecutionRun(wasmModRoot, input, true).Await(ctx)
if err != nil {
Expand Down
5 changes: 5 additions & 0 deletions staker/stateless_block_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,11 @@ func NewStatelessBlockValidator(
i := i
confFetcher := func() *rpcclient.ClientConfig { return &config().ValidationServerConfigs[i] }
executionSpawners = append(executionSpawners, validatorclient.NewExecutionClient(confFetcher, stack))
if i == 0 {
if config().RedisValidationClientConfig.Enabled() {
executionSpawners = append(executionSpawners, validatorclient.NewBoldExecutionClient(confFetcher, &config().RedisValidationClientConfig, stack))
}
}
}

if len(executionSpawners) == 0 {
Expand Down
26 changes: 25 additions & 1 deletion system_tests/validation_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ import (
"github.com/offchainlabs/nitro/execution"
"github.com/offchainlabs/nitro/staker"
"github.com/offchainlabs/nitro/util/containers"
"github.com/offchainlabs/nitro/util/redisutil"
"github.com/offchainlabs/nitro/util/rpcclient"
"github.com/offchainlabs/nitro/validator"
validatorclient "github.com/offchainlabs/nitro/validator/client"
clientredis "github.com/offchainlabs/nitro/validator/client/redis"
"github.com/offchainlabs/nitro/validator/server_api"
"github.com/offchainlabs/nitro/validator/server_arb"
"github.com/offchainlabs/nitro/validator/valnode"
Expand Down Expand Up @@ -206,11 +208,26 @@ func createMockValidationNode(t *testing.T, ctx context.Context, config *server_

// mostly tests translation to/from json and running over network
func TestValidationServerAPI(t *testing.T) {
testValidationServerAPI(t, false)
}

// mostly tests translation to/from json and running over network with bold validation redis consumer/producer
func TestValidationServerAPIWithBoldValidationConsumerProducer(t *testing.T) {
testValidationServerAPI(t, true)
}
func testValidationServerAPI(t *testing.T, withBoldValidationConsumerProducer bool) {
t.Parallel()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var redisBoldValidationClientConfig *clientredis.ValidationClientConfig
if withBoldValidationConsumerProducer {
redisBoldValidationClientConfig = &clientredis.TestValidationClientConfig
redisBoldValidationClientConfig.RedisURL = redisutil.CreateTestRedis(ctx, t)
redisBoldValidationClientConfig.CreateStreams = true
}

_, validationDefault := createMockValidationNode(t, ctx, nil)
client := validatorclient.NewExecutionClient(StaticFetcherFrom(t, &rpcclient.TestClientConfig), validationDefault)
client := validatorclient.NewBoldExecutionClient(StaticFetcherFrom(t, &rpcclient.TestClientConfig), redisBoldValidationClientConfig, validationDefault)
err := client.Start(ctx)
Require(t, err)

Expand Down Expand Up @@ -280,6 +297,13 @@ func TestValidationServerAPI(t *testing.T) {
if !bytes.Equal(proof, mockProof) {
t.Error("mock proof not expected")
}

hashes := execRun.GetMachineHashesWithStepSize(0, 1, 5)
hashesRes, err := hashes.Await(ctx)
Require(t, err)
if len(hashesRes) != 5 {
t.Error("unexpected number of hashes")
}
}

func TestValidationClientRoom(t *testing.T) {
Expand Down
163 changes: 157 additions & 6 deletions validator/client/validation_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@ import (
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/rpc"

"github.com/offchainlabs/nitro/pubsub"
"github.com/offchainlabs/nitro/util/containers"
"github.com/offchainlabs/nitro/util/redisutil"
"github.com/offchainlabs/nitro/util/rpcclient"
"github.com/offchainlabs/nitro/util/stopwaiter"
"github.com/offchainlabs/nitro/validator"
"github.com/offchainlabs/nitro/validator/client/redis"
"github.com/offchainlabs/nitro/validator/server_api"
"github.com/offchainlabs/nitro/validator/server_common"
)
Expand Down Expand Up @@ -144,14 +147,20 @@ func (c *ValidationClient) Room() int {
}

type ExecutionClient struct {
ValidationClient
*ValidationClient
}

func NewExecutionClient(config rpcclient.ClientConfigFetcher, stack *node.Node) *ExecutionClient {
return &ExecutionClient{
ValidationClient: *NewValidationClient(config, stack),
ValidationClient: NewValidationClient(config, stack),
}
}
func (c *ExecutionClient) Start(ctx context.Context) error {
if err := c.ValidationClient.Start(ctx); err != nil {
return err
}
return nil
}

func (c *ExecutionClient) CreateExecutionRun(
wasmModuleRoot common.Hash,
Expand All @@ -165,8 +174,10 @@ func (c *ExecutionClient) CreateExecutionRun(
return nil, err
}
run := &ExecutionClientRun{
client: c,
id: res,
wasmModuleRoot: wasmModuleRoot,
client: c,
id: res,
input: input,
}
run.Start(c.GetContext()) // note: not this temporary thread's context!
return run, nil
Expand All @@ -175,8 +186,11 @@ func (c *ExecutionClient) CreateExecutionRun(

type ExecutionClientRun struct {
stopwaiter.StopWaiter
client *ExecutionClient
id uint64
client *ExecutionClient
boldClient *BoldExecutionClient
id uint64
wasmModuleRoot common.Hash
input *validator.ValidationInput
}

func (c *ExecutionClient) LatestWasmModuleRoot() containers.PromiseInterface[common.Hash] {
Expand Down Expand Up @@ -223,6 +237,15 @@ func (r *ExecutionClientRun) GetStepAt(pos uint64) containers.PromiseInterface[*
}

func (r *ExecutionClientRun) GetMachineHashesWithStepSize(machineStartIndex, stepSize, maxIterations uint64) containers.PromiseInterface[[]common.Hash] {
if r.boldClient != nil {
return r.boldClient.GetLeavesWithStepSize(&server_api.GetLeavesWithStepSizeInput{
ModuleRoot: r.wasmModuleRoot,
MachineStartIndex: machineStartIndex,
StepSize: stepSize,
NumDesiredLeaves: maxIterations,
ValidationInput: r.input,
})
}
return stopwaiter.LaunchPromiseThread[[]common.Hash](r, func(ctx context.Context) ([]common.Hash, error) {
var resJson []common.Hash
err := r.client.client.CallContext(ctx, &resJson, server_api.Namespace+"_getMachineHashesWithStepSize", r.id, machineStartIndex, stepSize, maxIterations)
Expand Down Expand Up @@ -267,3 +290,131 @@ func (r *ExecutionClientRun) Close() {
}
})
}

// BoldValidationClient implements bold validation client through redis streams.
type BoldValidationClient struct {
stopwaiter.StopWaiter
*ValidationClient
// producers stores moduleRoot to producer mapping.
producers map[common.Hash]*pubsub.Producer[*server_api.GetLeavesWithStepSizeInput, []common.Hash]
config *redis.ValidationClientConfig
}

func NewBoldValidationClient(config rpcclient.ClientConfigFetcher, redisBoldValidationClientConfig *redis.ValidationClientConfig, stack *node.Node) (*BoldValidationClient, error) {
return &BoldValidationClient{
ValidationClient: NewValidationClient(config, stack),
producers: make(map[common.Hash]*pubsub.Producer[*server_api.GetLeavesWithStepSizeInput, []common.Hash]),
config: redisBoldValidationClientConfig,
}, nil
}

func (c *BoldValidationClient) Initialize(ctx context.Context, moduleRoots []common.Hash) error {
if c.config.RedisURL == "" {
return fmt.Errorf("redis url cannot be empty")
}
redisClient, err := redisutil.RedisClientFromURL(c.config.RedisURL)
if err != nil {
return err
}
for _, mr := range moduleRoots {
if c.config.CreateStreams {
if err := pubsub.CreateStream(ctx, server_api.RedisBoldStreamForRoot(c.config.StreamPrefix, mr), redisClient); err != nil {
return fmt.Errorf("creating redis stream: %w", err)
}
}
if _, exists := c.producers[mr]; exists {
log.Warn("Producer already exists for module root", "hash", mr)
continue
}
p, err := pubsub.NewProducer[*server_api.GetLeavesWithStepSizeInput, []common.Hash](
redisClient, server_api.RedisBoldStreamForRoot(c.config.StreamPrefix, mr), &c.config.ProducerConfig)
if err != nil {
log.Warn("failed init redis for %v: %w", mr, err)
continue
}
c.producers[mr] = p
}
return nil
}

func (c *BoldValidationClient) GetLeavesWithStepSize(req *server_api.GetLeavesWithStepSizeInput) containers.PromiseInterface[[]common.Hash] {
producer, found := c.producers[req.ModuleRoot]
if !found {
return containers.NewReadyPromise([]common.Hash{}, fmt.Errorf("no validation is configured for wasm root %v", req.ModuleRoot))
}
promise, err := producer.Produce(c.GetContext(), req)
if err != nil {
return containers.NewReadyPromise([]common.Hash{}, fmt.Errorf("error producing input: %w", err))
}
return promise
}

func (c *BoldValidationClient) Start(ctx_in context.Context) error {
if err := c.Initialize(ctx_in, c.wasmModuleRoots); err != nil {
return err
}
for _, p := range c.producers {
p.Start(ctx_in)
}
c.StopWaiter.Start(ctx_in, c)
return nil
}

func (c *BoldValidationClient) Stop() {
for _, p := range c.producers {
p.StopAndWait()
}
c.StopWaiter.StopAndWait()
}

type BoldExecutionClient struct {
*BoldValidationClient
}

func NewBoldExecutionClient(config rpcclient.ClientConfigFetcher, redisBoldValidationClientConfig *redis.ValidationClientConfig, stack *node.Node) *BoldExecutionClient {
var validationClient *BoldValidationClient
if redisBoldValidationClientConfig != nil && redisBoldValidationClientConfig.Enabled() {
var err error
validationClient, err = NewBoldValidationClient(config, redisBoldValidationClientConfig, stack)
if err != nil {
log.Error("Creating new redis bold validation client", "error", err)
}
}
return &BoldExecutionClient{
BoldValidationClient: validationClient,
}
}

func (c *BoldExecutionClient) CreateExecutionRun(
wasmModuleRoot common.Hash,
input *validator.ValidationInput,
useBoldMachine bool,
) containers.PromiseInterface[validator.ExecutionRun] {
return stopwaiter.LaunchPromiseThread(c, func(ctx context.Context) (validator.ExecutionRun, error) {
var res uint64
err := c.client.CallContext(ctx, &res, server_api.Namespace+"_createExecutionRun", wasmModuleRoot, server_api.ValidationInputToJson(input), useBoldMachine)
if err != nil {
return nil, err
}
run := &ExecutionClientRun{
wasmModuleRoot: wasmModuleRoot,
boldClient: c,
client: &ExecutionClient{ValidationClient: c.BoldValidationClient.ValidationClient},
id: res,
input: input,
}
run.Start(c.GetContext()) // note: not this temporary thread's context!
return run, nil
})
}

func (c *BoldExecutionClient) LatestWasmModuleRoot() containers.PromiseInterface[common.Hash] {
return stopwaiter.LaunchPromiseThread[common.Hash](c, func(ctx context.Context) (common.Hash, error) {
var res common.Hash
err := c.client.CallContext(ctx, &res, server_api.Namespace+"_latestWasmModuleRoot")
if err != nil {
return common.Hash{}, err
}
return res, nil
})
}
12 changes: 12 additions & 0 deletions validator/server_api/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ func RedisStreamForRoot(prefix string, moduleRoot common.Hash) string {
return fmt.Sprintf("%sstream:%s", prefix, moduleRoot.Hex())
}

func RedisBoldStreamForRoot(prefix string, moduleRoot common.Hash) string {
return fmt.Sprintf("%sstream-bold:%s", prefix, moduleRoot.Hex())
}

type Request struct {
Input *InputJSON
ModuleRoot common.Hash
Expand Down Expand Up @@ -169,3 +173,11 @@ func ValidationInputFromJson(entry *InputJSON) (*validator.ValidationInput, erro
}
return valInput, nil
}

type GetLeavesWithStepSizeInput struct {
ModuleRoot common.Hash
MachineStartIndex uint64
StepSize uint64
NumDesiredLeaves uint64
ValidationInput *validator.ValidationInput
}
Loading
Loading