diff --git a/staker/block_validator.go b/staker/block_validator.go index 43e5c7d28f..fae899d4cc 100644 --- a/staker/block_validator.go +++ b/staker/block_validator.go @@ -107,21 +107,22 @@ type BlockValidator struct { } type BlockValidatorConfig struct { - Enable bool `koanf:"enable"` - RedisValidationClientConfig redis.ValidationClientConfig `koanf:"redis-validation-client-config"` - ValidationServer rpcclient.ClientConfig `koanf:"validation-server" reload:"hot"` - ValidationServerConfigs []rpcclient.ClientConfig `koanf:"validation-server-configs"` - ValidationPoll time.Duration `koanf:"validation-poll" reload:"hot"` - PrerecordedBlocks uint64 `koanf:"prerecorded-blocks" reload:"hot"` - RecordingIterLimit uint64 `koanf:"recording-iter-limit"` - ForwardBlocks uint64 `koanf:"forward-blocks" reload:"hot"` - BatchCacheLimit uint32 `koanf:"batch-cache-limit"` - CurrentModuleRoot string `koanf:"current-module-root"` // TODO(magic) requires reinitialization on hot reload - PendingUpgradeModuleRoot string `koanf:"pending-upgrade-module-root"` // TODO(magic) requires StatelessBlockValidator recreation on hot reload - FailureIsFatal bool `koanf:"failure-is-fatal" reload:"hot"` - Dangerous BlockValidatorDangerousConfig `koanf:"dangerous"` - MemoryFreeLimit string `koanf:"memory-free-limit" reload:"hot"` - ValidationServerConfigsList string `koanf:"validation-server-configs-list"` + Enable bool `koanf:"enable"` + RedisValidationClientConfig redis.ValidationClientConfig `koanf:"redis-validation-client-config"` + RedisBoldValidationClientConfig redis.ValidationClientConfig `koanf:"redis-bold-validation-client-config"` + ValidationServer rpcclient.ClientConfig `koanf:"validation-server" reload:"hot"` + ValidationServerConfigs []rpcclient.ClientConfig `koanf:"validation-server-configs"` + ValidationPoll time.Duration `koanf:"validation-poll" reload:"hot"` + PrerecordedBlocks uint64 `koanf:"prerecorded-blocks" reload:"hot"` + RecordingIterLimit uint64 `koanf:"recording-iter-limit"` + ForwardBlocks uint64 `koanf:"forward-blocks" reload:"hot"` + BatchCacheLimit uint32 `koanf:"batch-cache-limit"` + CurrentModuleRoot string `koanf:"current-module-root"` // TODO(magic) requires reinitialization on hot reload + PendingUpgradeModuleRoot string `koanf:"pending-upgrade-module-root"` // TODO(magic) requires StatelessBlockValidator recreation on hot reload + FailureIsFatal bool `koanf:"failure-is-fatal" reload:"hot"` + Dangerous BlockValidatorDangerousConfig `koanf:"dangerous"` + MemoryFreeLimit string `koanf:"memory-free-limit" reload:"hot"` + ValidationServerConfigsList string `koanf:"validation-server-configs-list"` // The directory to which the BlockValidator will write the // block_inputs_.json files when WriteToFile() is called. BlockInputsFilePath string `koanf:"block-inputs-file-path"` @@ -181,6 +182,7 @@ func BlockValidatorConfigAddOptions(prefix string, f *pflag.FlagSet) { f.Bool(prefix+".enable", DefaultBlockValidatorConfig.Enable, "enable block-by-block validation") rpcclient.RPCClientAddOptions(prefix+".validation-server", f, &DefaultBlockValidatorConfig.ValidationServer) redis.ValidationClientConfigAddOptions(prefix+".redis-validation-client-config", f) + redis.ValidationClientConfigAddOptions(prefix+".redis-bold-validation-client-config", f) f.String(prefix+".validation-server-configs-list", DefaultBlockValidatorConfig.ValidationServerConfigsList, "array of execution rpc configs given as a json string. time duration should be supplied in number indicating nanoseconds") f.Duration(prefix+".validation-poll", DefaultBlockValidatorConfig.ValidationPoll, "poll time to check validations") f.Uint64(prefix+".forward-blocks", DefaultBlockValidatorConfig.ForwardBlocks, "prepare entries for up to that many blocks ahead of validation (stores batch-copy per block)") @@ -200,39 +202,41 @@ func BlockValidatorDangerousConfigAddOptions(prefix string, f *pflag.FlagSet) { } var DefaultBlockValidatorConfig = BlockValidatorConfig{ - Enable: false, - ValidationServerConfigsList: "default", - ValidationServer: rpcclient.DefaultClientConfig, - RedisValidationClientConfig: redis.DefaultValidationClientConfig, - ValidationPoll: time.Second, - ForwardBlocks: 128, - PrerecordedBlocks: uint64(2 * runtime.NumCPU()), - BatchCacheLimit: 20, - CurrentModuleRoot: "current", - PendingUpgradeModuleRoot: "latest", - FailureIsFatal: true, - Dangerous: DefaultBlockValidatorDangerousConfig, - BlockInputsFilePath: "./target/validation_inputs", - MemoryFreeLimit: "default", - RecordingIterLimit: 20, + Enable: false, + ValidationServerConfigsList: "default", + ValidationServer: rpcclient.DefaultClientConfig, + RedisValidationClientConfig: redis.DefaultValidationClientConfig, + RedisBoldValidationClientConfig: redis.DefaultValidationClientConfig, + ValidationPoll: time.Second, + ForwardBlocks: 128, + PrerecordedBlocks: uint64(2 * runtime.NumCPU()), + BatchCacheLimit: 20, + CurrentModuleRoot: "current", + PendingUpgradeModuleRoot: "latest", + FailureIsFatal: true, + Dangerous: DefaultBlockValidatorDangerousConfig, + BlockInputsFilePath: "./target/validation_inputs", + MemoryFreeLimit: "default", + RecordingIterLimit: 20, } var TestBlockValidatorConfig = BlockValidatorConfig{ - Enable: false, - ValidationServer: rpcclient.TestClientConfig, - ValidationServerConfigs: []rpcclient.ClientConfig{rpcclient.TestClientConfig}, - RedisValidationClientConfig: redis.TestValidationClientConfig, - ValidationPoll: 100 * time.Millisecond, - ForwardBlocks: 128, - BatchCacheLimit: 20, - PrerecordedBlocks: uint64(2 * runtime.NumCPU()), - RecordingIterLimit: 20, - CurrentModuleRoot: "latest", - PendingUpgradeModuleRoot: "latest", - FailureIsFatal: true, - Dangerous: DefaultBlockValidatorDangerousConfig, - BlockInputsFilePath: "./target/validation_inputs", - MemoryFreeLimit: "default", + Enable: false, + ValidationServer: rpcclient.TestClientConfig, + ValidationServerConfigs: []rpcclient.ClientConfig{rpcclient.TestClientConfig}, + RedisValidationClientConfig: redis.TestValidationClientConfig, + RedisBoldValidationClientConfig: redis.TestValidationClientConfig, + ValidationPoll: 100 * time.Millisecond, + ForwardBlocks: 128, + BatchCacheLimit: 20, + PrerecordedBlocks: uint64(2 * runtime.NumCPU()), + RecordingIterLimit: 20, + CurrentModuleRoot: "latest", + PendingUpgradeModuleRoot: "latest", + FailureIsFatal: true, + Dangerous: DefaultBlockValidatorDangerousConfig, + BlockInputsFilePath: "./target/validation_inputs", + MemoryFreeLimit: "default", } var DefaultBlockValidatorDangerousConfig = BlockValidatorDangerousConfig{ diff --git a/staker/bold/bold_state_provider.go b/staker/bold/bold_state_provider.go index 48b7cbd91e..f70c0b6617 100644 --- a/staker/bold/bold_state_provider.go +++ b/staker/bold/bold_state_provider.go @@ -369,7 +369,6 @@ func (s *BOLDStateProvider) CollectMachineHashes( if err != nil { return nil, err } - // TODO: Enable Redis streams. wasmModRoot := cfg.AssertionMetadata.WasmModuleRoot execRun, err := s.statelessValidator.ExecutionSpawners()[0].CreateExecutionRun(wasmModRoot, input, true).Await(ctx) if err != nil { diff --git a/staker/stateless_block_validator.go b/staker/stateless_block_validator.go index 62e772d5f8..16b0077430 100644 --- a/staker/stateless_block_validator.go +++ b/staker/stateless_block_validator.go @@ -249,7 +249,10 @@ func NewStatelessBlockValidator( for i := range configs { i := i confFetcher := func() *rpcclient.ClientConfig { return &config().ValidationServerConfigs[i] } - executionSpawners = append(executionSpawners, validatorclient.NewExecutionClient(confFetcher, stack)) + executionSpawners = append(executionSpawners, validatorclient.NewExecutionClient(confFetcher, nil, stack)) + if i == 0 { + executionSpawners = append(executionSpawners, validatorclient.NewExecutionClient(confFetcher, &config().RedisBoldValidationClientConfig, stack)) + } } if len(executionSpawners) == 0 { diff --git a/system_tests/full_challenge_impl_test.go b/system_tests/full_challenge_impl_test.go index 4d902f87ba..549a608afd 100644 --- a/system_tests/full_challenge_impl_test.go +++ b/system_tests/full_challenge_impl_test.go @@ -261,7 +261,7 @@ func RunChallengeTest(t *testing.T, asserterIsCorrect bool, useStubs bool, chall var mockSpawn *mockSpawner builder.valnodeConfig.Wasm.RootPath = wasmRootDir if useStubs { - mockSpawn, valStack = createMockValidationNode(t, ctx, &builder.valnodeConfig.Arbitrator) + mockSpawn, valStack = createMockValidationNode(t, ctx, &builder.valnodeConfig.Arbitrator, "") } else { // For now validation only works with HashScheme set builder.execConfig.Caching.StateScheme = rawdb.HashScheme diff --git a/system_tests/validation_mock_test.go b/system_tests/validation_mock_test.go index 98dab7ad39..199ebb07a8 100644 --- a/system_tests/validation_mock_test.go +++ b/system_tests/validation_mock_test.go @@ -20,12 +20,16 @@ import ( "github.com/offchainlabs/nitro/execution" "github.com/offchainlabs/nitro/staker" "github.com/offchainlabs/nitro/util/containers" + "github.com/offchainlabs/nitro/util/redisutil" "github.com/offchainlabs/nitro/util/rpcclient" "github.com/offchainlabs/nitro/validator" validatorclient "github.com/offchainlabs/nitro/validator/client" + clientredis "github.com/offchainlabs/nitro/validator/client/redis" "github.com/offchainlabs/nitro/validator/server_api" "github.com/offchainlabs/nitro/validator/server_arb" + arbredis "github.com/offchainlabs/nitro/validator/server_arb/redis" "github.com/offchainlabs/nitro/validator/valnode" + "github.com/offchainlabs/nitro/validator/valnode/redis" ) type mockSpawner struct { @@ -161,7 +165,7 @@ func (r *mockExecRun) CheckAlive(ctx context.Context) error { func (r *mockExecRun) Close() {} -func createMockValidationNode(t *testing.T, ctx context.Context, config *server_arb.ArbitratorSpawnerConfig) (*mockSpawner, *node.Node) { +func createMockValidationNode(t *testing.T, ctx context.Context, config *server_arb.ArbitratorSpawnerConfig, redisURL string) (*mockSpawner, *node.Node) { stackConf := node.DefaultConfig stackConf.HTTPPort = 0 stackConf.DataDir = "" @@ -179,7 +183,18 @@ func createMockValidationNode(t *testing.T, ctx context.Context, config *server_ } configFetcher := func() *server_arb.ArbitratorSpawnerConfig { return config } spawner := &mockSpawner{} - serverAPI := valnode.NewExecutionServerAPI(spawner, spawner, configFetcher) + var redisExecSpawner *arbredis.ExecutionSpawner + if redisURL != "" { + redisValidationServerConfig := redis.TestValidationServerConfig + redisValidationServerConfig.RedisURL = redisURL + redisValidationServerConfig.ModuleRoots = make([]string, len(mockWasmModuleRoots)) + for i := range redisValidationServerConfig.ModuleRoots { + redisValidationServerConfig.ModuleRoots[i] = mockWasmModuleRoots[i].Hex() + } + redisExecSpawner, err = arbredis.NewExecutionSpawner(&redisValidationServerConfig, spawner) + Require(t, err) + } + serverAPI := valnode.NewExecutionServerAPI(spawner, spawner, redisExecSpawner, configFetcher) valAPIs := []rpc.API{{ Namespace: server_api.Namespace, @@ -206,11 +221,28 @@ func createMockValidationNode(t *testing.T, ctx context.Context, config *server_ // mostly tests translation to/from json and running over network func TestValidationServerAPI(t *testing.T) { + testValidationServerAPI(t, false) +} + +// mostly tests translation to/from json and running over network with bold validation redis consumer/producer +func TestValidationServerAPIWithBoldValidationConsumerProducer(t *testing.T) { + testValidationServerAPI(t, true) +} +func testValidationServerAPI(t *testing.T, withBoldValidationConsumerProducer bool) { t.Parallel() ctx, cancel := context.WithCancel(context.Background()) defer cancel() - _, validationDefault := createMockValidationNode(t, ctx, nil) - client := validatorclient.NewExecutionClient(StaticFetcherFrom(t, &rpcclient.TestClientConfig), validationDefault) + var redisURL string + var redisBoldValidationClientConfig *clientredis.ValidationClientConfig + if withBoldValidationConsumerProducer { + redisURL = redisutil.CreateTestRedis(ctx, t) + redisBoldValidationClientConfig = &clientredis.TestValidationClientConfig + redisBoldValidationClientConfig.RedisURL = redisURL + redisBoldValidationClientConfig.CreateStreams = true + } + + _, validationDefault := createMockValidationNode(t, ctx, nil, redisURL) + client := validatorclient.NewExecutionClient(StaticFetcherFrom(t, &rpcclient.TestClientConfig), redisBoldValidationClientConfig, validationDefault) err := client.Start(ctx) Require(t, err) @@ -280,14 +312,21 @@ func TestValidationServerAPI(t *testing.T) { if !bytes.Equal(proof, mockProof) { t.Error("mock proof not expected") } + + hashes := execRun.GetMachineHashesWithStepSize(0, 1, 5) + hashesRes, err := hashes.Await(ctx) + Require(t, err) + if len(hashesRes) != 5 { + t.Error("unexpected number of hashes") + } } func TestValidationClientRoom(t *testing.T) { t.Parallel() ctx, cancel := context.WithCancel(context.Background()) defer cancel() - mockSpawner, spawnerStack := createMockValidationNode(t, ctx, nil) - client := validatorclient.NewExecutionClient(StaticFetcherFrom(t, &rpcclient.TestClientConfig), spawnerStack) + mockSpawner, spawnerStack := createMockValidationNode(t, ctx, nil, "") + client := validatorclient.NewExecutionClient(StaticFetcherFrom(t, &rpcclient.TestClientConfig), nil, spawnerStack) err := client.Start(ctx) Require(t, err) @@ -368,16 +407,16 @@ func TestExecutionKeepAlive(t *testing.T) { t.Parallel() ctx, cancel := context.WithCancel(context.Background()) defer cancel() - _, validationDefault := createMockValidationNode(t, ctx, nil) + _, validationDefault := createMockValidationNode(t, ctx, nil, "") shortTimeoutConfig := server_arb.DefaultArbitratorSpawnerConfig shortTimeoutConfig.ExecutionRunTimeout = time.Second - _, validationShortTO := createMockValidationNode(t, ctx, &shortTimeoutConfig) + _, validationShortTO := createMockValidationNode(t, ctx, &shortTimeoutConfig, "") configFetcher := StaticFetcherFrom(t, &rpcclient.TestClientConfig) - clientDefault := validatorclient.NewExecutionClient(configFetcher, validationDefault) + clientDefault := validatorclient.NewExecutionClient(configFetcher, nil, validationDefault) err := clientDefault.Start(ctx) Require(t, err) - clientShortTO := validatorclient.NewExecutionClient(configFetcher, validationShortTO) + clientShortTO := validatorclient.NewExecutionClient(configFetcher, nil, validationShortTO) err = clientShortTO.Start(ctx) Require(t, err) diff --git a/validator/client/redis/boldproducer.go b/validator/client/redis/boldproducer.go new file mode 100644 index 0000000000..d834b0fefe --- /dev/null +++ b/validator/client/redis/boldproducer.go @@ -0,0 +1,86 @@ +package redis + +import ( + "context" + "fmt" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/log" + + "github.com/offchainlabs/nitro/pubsub" + "github.com/offchainlabs/nitro/util/containers" + "github.com/offchainlabs/nitro/util/redisutil" + "github.com/offchainlabs/nitro/util/stopwaiter" + "github.com/offchainlabs/nitro/validator/server_api" +) + +// BoldValidationClient implements bold validation client through redis streams. +type BoldValidationClient struct { + stopwaiter.StopWaiter + // producers stores moduleRoot to producer mapping. + producers map[common.Hash]*pubsub.Producer[*server_api.GetLeavesWithStepSizeInput, []common.Hash] + config *ValidationClientConfig +} + +func NewBoldValidationClient(cfg *ValidationClientConfig) (*BoldValidationClient, error) { + return &BoldValidationClient{ + producers: make(map[common.Hash]*pubsub.Producer[*server_api.GetLeavesWithStepSizeInput, []common.Hash]), + config: cfg, + }, nil +} + +func (c *BoldValidationClient) Initialize(ctx context.Context, moduleRoots []common.Hash) error { + if c.config.RedisURL == "" { + return fmt.Errorf("redis url cannot be empty") + } + redisClient, err := redisutil.RedisClientFromURL(c.config.RedisURL) + if err != nil { + return err + } + for _, mr := range moduleRoots { + if c.config.CreateStreams { + if err := pubsub.CreateStream(ctx, server_api.RedisBoldStreamForRoot(c.config.StreamPrefix, mr), redisClient); err != nil { + return fmt.Errorf("creating redis stream: %w", err) + } + } + if _, exists := c.producers[mr]; exists { + log.Warn("Producer already exists for module root", "hash", mr) + continue + } + p, err := pubsub.NewProducer[*server_api.GetLeavesWithStepSizeInput, []common.Hash]( + redisClient, server_api.RedisBoldStreamForRoot(c.config.StreamPrefix, mr), &c.config.ProducerConfig) + if err != nil { + log.Warn("failed init redis for %v: %w", mr, err) + continue + } + c.producers[mr] = p + } + return nil +} + +func (c *BoldValidationClient) GetLeavesWithStepSize(req *server_api.GetLeavesWithStepSizeInput) containers.PromiseInterface[[]common.Hash] { + producer, found := c.producers[req.ModuleRoot] + if !found { + return containers.NewReadyPromise([]common.Hash{}, fmt.Errorf("no validation is configured for wasm root %v", req.ModuleRoot)) + } + promise, err := producer.Produce(c.GetContext(), req) + if err != nil { + return containers.NewReadyPromise([]common.Hash{}, fmt.Errorf("error producing input: %w", err)) + } + return promise +} + +func (c *BoldValidationClient) Start(ctx_in context.Context) error { + for _, p := range c.producers { + p.Start(ctx_in) + } + c.StopWaiter.Start(ctx_in, c) + return nil +} + +func (c *BoldValidationClient) Stop() { + for _, p := range c.producers { + p.StopAndWait() + } + c.StopWaiter.StopAndWait() +} diff --git a/validator/client/validation_client.go b/validator/client/validation_client.go index c04817d654..ba8c399f03 100644 --- a/validator/client/validation_client.go +++ b/validator/client/validation_client.go @@ -22,6 +22,7 @@ import ( "github.com/offchainlabs/nitro/util/rpcclient" "github.com/offchainlabs/nitro/util/stopwaiter" "github.com/offchainlabs/nitro/validator" + "github.com/offchainlabs/nitro/validator/client/redis" "github.com/offchainlabs/nitro/validator/server_api" "github.com/offchainlabs/nitro/validator/server_common" ) @@ -145,13 +146,37 @@ func (c *ValidationClient) Room() int { type ExecutionClient struct { ValidationClient + boldValClient *redis.BoldValidationClient } -func NewExecutionClient(config rpcclient.ClientConfigFetcher, stack *node.Node) *ExecutionClient { +func NewExecutionClient(config rpcclient.ClientConfigFetcher, redisBoldValidationClientConfig *redis.ValidationClientConfig, stack *node.Node) *ExecutionClient { + var boldClient *redis.BoldValidationClient + if redisBoldValidationClientConfig != nil && redisBoldValidationClientConfig.Enabled() { + var err error + boldClient, err = redis.NewBoldValidationClient(redisBoldValidationClientConfig) + if err != nil { + log.Error("Creating new redis bold validation client", "error", err) + } + } return &ExecutionClient{ ValidationClient: *NewValidationClient(config, stack), + boldValClient: boldClient, } } +func (c *ExecutionClient) Start(ctx context.Context) error { + if err := c.ValidationClient.Start(ctx); err != nil { + return err + } + if c.boldValClient != nil { + if err := c.boldValClient.Initialize(ctx, c.wasmModuleRoots); err != nil { + return err + } + if err := c.boldValClient.Start(ctx); err != nil { + return err + } + } + return nil +} func (c *ExecutionClient) CreateExecutionRun( wasmModuleRoot common.Hash, @@ -165,8 +190,10 @@ func (c *ExecutionClient) CreateExecutionRun( return nil, err } run := &ExecutionClientRun{ - client: c, - id: res, + wasmModuleRoot: wasmModuleRoot, + client: c, + id: res, + input: input, } run.Start(c.GetContext()) // note: not this temporary thread's context! return run, nil @@ -175,8 +202,10 @@ func (c *ExecutionClient) CreateExecutionRun( type ExecutionClientRun struct { stopwaiter.StopWaiter - client *ExecutionClient - id uint64 + client *ExecutionClient + id uint64 + wasmModuleRoot common.Hash + input *validator.ValidationInput } func (c *ExecutionClient) LatestWasmModuleRoot() containers.PromiseInterface[common.Hash] { @@ -223,6 +252,15 @@ func (r *ExecutionClientRun) GetStepAt(pos uint64) containers.PromiseInterface[* } func (r *ExecutionClientRun) GetMachineHashesWithStepSize(machineStartIndex, stepSize, maxIterations uint64) containers.PromiseInterface[[]common.Hash] { + if r.client.boldValClient != nil { + return r.client.boldValClient.GetLeavesWithStepSize(&server_api.GetLeavesWithStepSizeInput{ + ModuleRoot: r.wasmModuleRoot, + MachineStartIndex: machineStartIndex, + StepSize: stepSize, + NumDesiredLeaves: maxIterations, + ValidationInput: r.input, + }) + } return stopwaiter.LaunchPromiseThread[[]common.Hash](r, func(ctx context.Context) ([]common.Hash, error) { var resJson []common.Hash err := r.client.client.CallContext(ctx, &resJson, server_api.Namespace+"_getMachineHashesWithStepSize", r.id, machineStartIndex, stepSize, maxIterations) diff --git a/validator/server_api/json.go b/validator/server_api/json.go index f56493cd92..84f3bf035e 100644 --- a/validator/server_api/json.go +++ b/validator/server_api/json.go @@ -50,6 +50,10 @@ func RedisStreamForRoot(prefix string, moduleRoot common.Hash) string { return fmt.Sprintf("%sstream:%s", prefix, moduleRoot.Hex()) } +func RedisBoldStreamForRoot(prefix string, moduleRoot common.Hash) string { + return fmt.Sprintf("%sstream-bold:%s", prefix, moduleRoot.Hex()) +} + type Request struct { Input *InputJSON ModuleRoot common.Hash @@ -169,3 +173,11 @@ func ValidationInputFromJson(entry *InputJSON) (*validator.ValidationInput, erro } return valInput, nil } + +type GetLeavesWithStepSizeInput struct { + ModuleRoot common.Hash + MachineStartIndex uint64 + StepSize uint64 + NumDesiredLeaves uint64 + ValidationInput *validator.ValidationInput +} diff --git a/validator/server_arb/redis/consumer.go b/validator/server_arb/redis/consumer.go new file mode 100644 index 0000000000..15d9ea2e46 --- /dev/null +++ b/validator/server_arb/redis/consumer.go @@ -0,0 +1,131 @@ +package redis + +import ( + "context" + "fmt" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/log" + + "github.com/offchainlabs/nitro/pubsub" + "github.com/offchainlabs/nitro/util/redisutil" + "github.com/offchainlabs/nitro/util/stopwaiter" + "github.com/offchainlabs/nitro/validator" + "github.com/offchainlabs/nitro/validator/server_api" + "github.com/offchainlabs/nitro/validator/valnode/redis" +) + +type ExecutionSpawner struct { + stopwaiter.StopWaiter + spawner validator.ExecutionSpawner + + // consumers stores moduleRoot to consumer mapping. + consumers map[common.Hash]*pubsub.Consumer[*server_api.GetLeavesWithStepSizeInput, []common.Hash] + config *redis.ValidationServerConfig +} + +func NewExecutionSpawner(cfg *redis.ValidationServerConfig, spawner validator.ExecutionSpawner) (*ExecutionSpawner, error) { + if cfg.RedisURL == "" { + return nil, fmt.Errorf("redis url cannot be empty") + } + redisClient, err := redisutil.RedisClientFromURL(cfg.RedisURL) + if err != nil { + return nil, err + } + consumers := make(map[common.Hash]*pubsub.Consumer[*server_api.GetLeavesWithStepSizeInput, []common.Hash]) + for _, hash := range cfg.ModuleRoots { + mr := common.HexToHash(hash) + c, err := pubsub.NewConsumer[*server_api.GetLeavesWithStepSizeInput, []common.Hash](redisClient, server_api.RedisBoldStreamForRoot(cfg.StreamPrefix, mr), &cfg.ConsumerConfig) + if err != nil { + return nil, fmt.Errorf("creating consumer for validation: %w", err) + } + consumers[mr] = c + } + return &ExecutionSpawner{ + consumers: consumers, + spawner: spawner, + config: cfg, + }, nil +} + +func (s *ExecutionSpawner) Start(ctx_in context.Context) { + s.StopWaiter.Start(ctx_in, s) + // Channel that all consumers use to indicate their readiness. + readyStreams := make(chan struct{}, len(s.consumers)) + for moduleRoot, c := range s.consumers { + c := c + moduleRoot := moduleRoot + c.Start(ctx_in) + // Channel for single consumer, once readiness is indicated in this, + // consumer will start consuming iteratively. + ready := make(chan struct{}, 1) + s.StopWaiter.LaunchThread(func(ctx context.Context) { + for { + if pubsub.StreamExists(ctx, c.StreamName(), c.RedisClient()) { + ready <- struct{}{} + readyStreams <- struct{}{} + return + } + select { + case <-ctx.Done(): + log.Info("Context done", "error", ctx.Err().Error()) + return + case <-time.After(time.Millisecond * 100): + } + } + }) + s.StopWaiter.LaunchThread(func(ctx context.Context) { + select { + case <-ctx.Done(): + log.Info("Context done", "error", ctx.Err().Error()) + return + case <-ready: // Wait until the stream exists and start consuming iteratively. + } + s.StopWaiter.CallIteratively(func(ctx context.Context) time.Duration { + req, err := c.Consume(ctx) + if err != nil { + log.Error("Consuming request", "error", err) + return 0 + } + if req == nil { + // There's nothing in the queue. + return time.Second + } + run, err := s.spawner.CreateExecutionRun(moduleRoot, + req.Value.ValidationInput, true).Await(ctx) + if err != nil { + log.Error("Creating BOLD execution", "error", err) + return 0 + } + hashes, err := run.GetMachineHashesWithStepSize( + req.Value.MachineStartIndex, + req.Value.StepSize, + req.Value.NumDesiredLeaves).Await(ctx) + if err != nil { + log.Error("Getting machine hashes", "error", err) + return 0 + } + if err := c.SetResult(ctx, req.ID, hashes); err != nil { + log.Error("Error setting result for request", "id", req.ID, "result", hashes, "error", err) + return 0 + } + return time.Second + }) + }) + } + s.StopWaiter.LaunchThread(func(ctx context.Context) { + for { + select { + case <-readyStreams: + log.Trace("At least one stream is ready") + return // Don't block Start if at least one of the stream is ready. + case <-time.After(s.config.StreamTimeout): + log.Error("Waiting for redis streams timed out") + case <-ctx.Done(): + log.Info("Context expired, failed to start") + return + } + } + }) +} diff --git a/validator/valnode/validation_api.go b/validator/valnode/validation_api.go index dab74f6e29..58393d0844 100644 --- a/validator/valnode/validation_api.go +++ b/validator/valnode/validation_api.go @@ -18,6 +18,7 @@ import ( "github.com/offchainlabs/nitro/validator" "github.com/offchainlabs/nitro/validator/server_api" "github.com/offchainlabs/nitro/validator/server_arb" + arbredis "github.com/offchainlabs/nitro/validator/server_arb/redis" ) type ValidationServerAPI struct { @@ -61,7 +62,8 @@ type execRunEntry struct { type ExecServerAPI struct { stopwaiter.StopWaiter ValidationServerAPI - execSpawner validator.ExecutionSpawner + execSpawner validator.ExecutionSpawner + redisExecSpawner *arbredis.ExecutionSpawner config server_arb.ArbitratorSpawnerConfigFecher @@ -70,10 +72,15 @@ type ExecServerAPI struct { runs map[uint64]*execRunEntry } -func NewExecutionServerAPI(valSpawner validator.ValidationSpawner, execution validator.ExecutionSpawner, config server_arb.ArbitratorSpawnerConfigFecher) *ExecServerAPI { +func NewExecutionServerAPI( + valSpawner validator.ValidationSpawner, + execution validator.ExecutionSpawner, + redisExecSpawner *arbredis.ExecutionSpawner, + config server_arb.ArbitratorSpawnerConfigFecher) *ExecServerAPI { return &ExecServerAPI{ ValidationServerAPI: *NewValidationServerAPI(valSpawner), execSpawner: execution, + redisExecSpawner: redisExecSpawner, nextId: rand.Uint64(), // good-enough to aver reusing ids after reboot runs: make(map[uint64]*execRunEntry), config: config, @@ -120,6 +127,9 @@ func (a *ExecServerAPI) removeOldRuns(ctx context.Context) time.Duration { func (a *ExecServerAPI) Start(ctx_in context.Context) { a.StopWaiter.Start(ctx_in, a) a.CallIteratively(a.removeOldRuns) + if a.redisExecSpawner != nil { + a.redisExecSpawner.Start(ctx_in) + } } var errRunNotFound error = errors.New("run not found") diff --git a/validator/valnode/valnode.go b/validator/valnode/valnode.go index e3bf662aaa..f60f963712 100644 --- a/validator/valnode/valnode.go +++ b/validator/valnode/valnode.go @@ -12,6 +12,7 @@ import ( "github.com/offchainlabs/nitro/validator" "github.com/offchainlabs/nitro/validator/server_api" "github.com/offchainlabs/nitro/validator/server_arb" + arbredis "github.com/offchainlabs/nitro/validator/server_arb/redis" "github.com/offchainlabs/nitro/validator/server_common" "github.com/offchainlabs/nitro/validator/server_jit" "github.com/offchainlabs/nitro/validator/valnode/redis" @@ -36,32 +37,35 @@ var DefaultWasmConfig = WasmConfig{ } type Config struct { - UseJit bool `koanf:"use-jit"` - ApiAuth bool `koanf:"api-auth"` - ApiPublic bool `koanf:"api-public"` - Arbitrator server_arb.ArbitratorSpawnerConfig `koanf:"arbitrator" reload:"hot"` - Jit server_jit.JitSpawnerConfig `koanf:"jit" reload:"hot"` - Wasm WasmConfig `koanf:"wasm"` + UseJit bool `koanf:"use-jit"` + ApiAuth bool `koanf:"api-auth"` + ApiPublic bool `koanf:"api-public"` + Arbitrator server_arb.ArbitratorSpawnerConfig `koanf:"arbitrator" reload:"hot"` + RedisExecRunner redis.ValidationServerConfig `koanf:"redis-exec-runner"` + Jit server_jit.JitSpawnerConfig `koanf:"jit" reload:"hot"` + Wasm WasmConfig `koanf:"wasm"` } type ValidationConfigFetcher func() *Config var DefaultValidationConfig = Config{ - UseJit: true, - Jit: server_jit.DefaultJitSpawnerConfig, - ApiAuth: true, - ApiPublic: false, - Arbitrator: server_arb.DefaultArbitratorSpawnerConfig, - Wasm: DefaultWasmConfig, + UseJit: true, + Jit: server_jit.DefaultJitSpawnerConfig, + ApiAuth: true, + ApiPublic: false, + Arbitrator: server_arb.DefaultArbitratorSpawnerConfig, + RedisExecRunner: redis.DefaultValidationServerConfig, + Wasm: DefaultWasmConfig, } var TestValidationConfig = Config{ - UseJit: true, - Jit: server_jit.DefaultJitSpawnerConfig, - ApiAuth: false, - ApiPublic: true, - Arbitrator: server_arb.DefaultArbitratorSpawnerConfig, - Wasm: DefaultWasmConfig, + UseJit: true, + Jit: server_jit.DefaultJitSpawnerConfig, + ApiAuth: false, + ApiPublic: true, + Arbitrator: server_arb.DefaultArbitratorSpawnerConfig, + RedisExecRunner: redis.DefaultValidationServerConfig, + Wasm: DefaultWasmConfig, } func ValidationConfigAddOptions(prefix string, f *pflag.FlagSet) { @@ -71,6 +75,7 @@ func ValidationConfigAddOptions(prefix string, f *pflag.FlagSet) { server_arb.ArbitratorSpawnerConfigAddOptions(prefix+".arbitrator", f) server_jit.JitSpawnerConfigAddOptions(prefix+".jit", f) WasmConfigAddOptions(prefix+".wasm", f) + redis.ValidationServerConfigAddOptions(prefix+".redis-exec-runner", f) } type ValidationNode struct { @@ -78,7 +83,8 @@ type ValidationNode struct { arbSpawner *server_arb.ArbitratorSpawner jitSpawner *server_jit.JitSpawner - redisConsumer *redis.ValidationServer + redisConsumer *redis.ValidationServer + redisExecSpawner *arbredis.ExecutionSpawner } func EnsureValidationExposedViaAuthRPC(stackConf *node.Config) { @@ -107,8 +113,18 @@ func CreateValidationNode(configFetcher ValidationConfigFetcher, stack *node.Nod if err != nil { return nil, err } - var serverAPI *ExecServerAPI - var jitSpawner *server_jit.JitSpawner + var ( + serverAPI *ExecServerAPI + jitSpawner *server_jit.JitSpawner + redisSpawner *arbredis.ExecutionSpawner + ) + if config.RedisExecRunner.Enabled() { + es, err := arbredis.NewExecutionSpawner(&config.RedisExecRunner, arbSpawner) + if err != nil { + log.Error("creating redis execution spawner", "error", err) + } + redisSpawner = es + } if config.UseJit { jitConfigFetcher := func() *server_jit.JitSpawnerConfig { return &configFetcher().Jit } var err error @@ -116,9 +132,9 @@ func CreateValidationNode(configFetcher ValidationConfigFetcher, stack *node.Nod if err != nil { return nil, err } - serverAPI = NewExecutionServerAPI(jitSpawner, arbSpawner, arbConfigFetcher) + serverAPI = NewExecutionServerAPI(jitSpawner, arbSpawner, redisSpawner, arbConfigFetcher) } else { - serverAPI = NewExecutionServerAPI(arbSpawner, arbSpawner, arbConfigFetcher) + serverAPI = NewExecutionServerAPI(arbSpawner, arbSpawner, redisSpawner, arbConfigFetcher) } var redisConsumer *redis.ValidationServer redisValidationConfig := arbConfigFetcher().RedisValidationServerConfig @@ -137,7 +153,13 @@ func CreateValidationNode(configFetcher ValidationConfigFetcher, stack *node.Nod }} stack.RegisterAPIs(valAPIs) - return &ValidationNode{configFetcher, arbSpawner, jitSpawner, redisConsumer}, nil + return &ValidationNode{ + config: configFetcher, + arbSpawner: arbSpawner, + jitSpawner: jitSpawner, + redisConsumer: redisConsumer, + redisExecSpawner: redisSpawner, + }, nil } func (v *ValidationNode) Start(ctx context.Context) error {