diff --git a/op-batcher/batcher/config.go b/op-batcher/batcher/config.go index c1cbf77d349e..1a15c5d27e52 100644 --- a/op-batcher/batcher/config.go +++ b/op-batcher/batcher/config.go @@ -61,6 +61,9 @@ type CLIConfig struct { // the data availability type to use for posting batches, e.g. blobs vs calldata. DataAvailabilityType flags.DataAvailabilityType + // ActiveSequencerCheckDuration is the duration between checks to determine the active sequencer endpoint. + ActiveSequencerCheckDuration time.Duration + TxMgrConfig txmgr.CLIConfig LogConfig oplog.CLIConfig MetricsConfig opmetrics.CLIConfig @@ -120,17 +123,18 @@ func NewConfig(ctx *cli.Context) *CLIConfig { PollInterval: ctx.Duration(flags.PollIntervalFlag.Name), /* Optional Flags */ - MaxPendingTransactions: ctx.Uint64(flags.MaxPendingTransactionsFlag.Name), - MaxChannelDuration: ctx.Uint64(flags.MaxChannelDurationFlag.Name), - MaxL1TxSize: ctx.Uint64(flags.MaxL1TxSizeBytesFlag.Name), - Stopped: ctx.Bool(flags.StoppedFlag.Name), - BatchType: ctx.Uint(flags.BatchTypeFlag.Name), - DataAvailabilityType: flags.DataAvailabilityType(ctx.String(flags.DataAvailabilityTypeFlag.Name)), - TxMgrConfig: txmgr.ReadCLIConfig(ctx), - LogConfig: oplog.ReadCLIConfig(ctx), - MetricsConfig: opmetrics.ReadCLIConfig(ctx), - PprofConfig: oppprof.ReadCLIConfig(ctx), - CompressorConfig: compressor.ReadCLIConfig(ctx), - RPC: oprpc.ReadCLIConfig(ctx), + MaxPendingTransactions: ctx.Uint64(flags.MaxPendingTransactionsFlag.Name), + MaxChannelDuration: ctx.Uint64(flags.MaxChannelDurationFlag.Name), + MaxL1TxSize: ctx.Uint64(flags.MaxL1TxSizeBytesFlag.Name), + Stopped: ctx.Bool(flags.StoppedFlag.Name), + BatchType: ctx.Uint(flags.BatchTypeFlag.Name), + DataAvailabilityType: flags.DataAvailabilityType(ctx.String(flags.DataAvailabilityTypeFlag.Name)), + ActiveSequencerCheckDuration: ctx.Duration(flags.ActiveSequencerCheckDurationFlag.Name), + TxMgrConfig: txmgr.ReadCLIConfig(ctx), + LogConfig: oplog.ReadCLIConfig(ctx), + MetricsConfig: opmetrics.ReadCLIConfig(ctx), + PprofConfig: oppprof.ReadCLIConfig(ctx), + CompressorConfig: compressor.ReadCLIConfig(ctx), + RPC: oprpc.ReadCLIConfig(ctx), } } diff --git a/op-batcher/batcher/service.go b/op-batcher/batcher/service.go index a28a09079eea..570a051c7112 100644 --- a/op-batcher/batcher/service.go +++ b/op-batcher/batcher/service.go @@ -131,7 +131,7 @@ func (bs *BatcherService) initRPCClients(ctx context.Context, cfg *CLIConfig) er if strings.Contains(cfg.RollupRpc, ",") && strings.Contains(cfg.L2EthRpc, ",") { rollupUrls := strings.Split(cfg.RollupRpc, ",") ethUrls := strings.Split(cfg.L2EthRpc, ",") - endpointProvider, err = dial.NewActiveL2EndpointProvider(ctx, ethUrls, rollupUrls, dial.DefaultActiveSequencerFollowerCheckDuration, dial.DefaultDialTimeout, bs.Log) + endpointProvider, err = dial.NewActiveL2EndpointProvider(ctx, ethUrls, rollupUrls, cfg.ActiveSequencerCheckDuration, dial.DefaultDialTimeout, bs.Log) } else { endpointProvider, err = dial.NewStaticL2EndpointProvider(ctx, bs.Log, cfg.L2EthRpc, cfg.RollupRpc) } diff --git a/op-batcher/flags/flags.go b/op-batcher/flags/flags.go index 21192a6e7eda..549fa1026a31 100644 --- a/op-batcher/flags/flags.go +++ b/op-batcher/flags/flags.go @@ -93,6 +93,12 @@ var ( }(), EnvVars: prefixEnvVars("DATA_AVAILABILITY_TYPE"), } + ActiveSequencerCheckDurationFlag = &cli.DurationFlag{ + Name: "active-sequencer-check-duration", + Usage: "The duration between checks to determine the active sequencer endpoint. ", + Value: 2 * time.Minute, + EnvVars: prefixEnvVars("ACTIVE_SEQUENCER_CHECK_DURATION"), + } // Legacy Flags SequencerHDPathFlag = txmgr.SequencerHDPathFlag ) @@ -113,6 +119,7 @@ var optionalFlags = []cli.Flag{ SequencerHDPathFlag, BatchTypeFlag, DataAvailabilityTypeFlag, + ActiveSequencerCheckDurationFlag, } func init() { diff --git a/op-conductor/conductor/config.go b/op-conductor/conductor/config.go index 4e2471165f74..8e69957c4f7f 100644 --- a/op-conductor/conductor/config.go +++ b/op-conductor/conductor/config.go @@ -48,6 +48,9 @@ type Config struct { // RollupCfg is the rollup config. RollupCfg rollup.Config + // RPCEnableProxy is true if the sequencer RPC proxy should be enabled. + RPCEnableProxy bool + LogConfig oplog.CLIConfig MetricsConfig opmetrics.CLIConfig PprofConfig oppprof.CLIConfig @@ -116,11 +119,12 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*Config, error) { SafeInterval: ctx.Uint64(flags.HealthCheckSafeInterval.Name), MinPeerCount: ctx.Uint64(flags.HealthCheckMinPeerCount.Name), }, - RollupCfg: *rollupCfg, - LogConfig: oplog.ReadCLIConfig(ctx), - MetricsConfig: opmetrics.ReadCLIConfig(ctx), - PprofConfig: oppprof.ReadCLIConfig(ctx), - RPC: oprpc.ReadCLIConfig(ctx), + RollupCfg: *rollupCfg, + RPCEnableProxy: ctx.Bool(flags.RPCEnableProxy.Name), + LogConfig: oplog.ReadCLIConfig(ctx), + MetricsConfig: opmetrics.ReadCLIConfig(ctx), + PprofConfig: oppprof.ReadCLIConfig(ctx), + RPC: oprpc.ReadCLIConfig(ctx), }, nil } diff --git a/op-conductor/conductor/service.go b/op-conductor/conductor/service.go index c27a49e94bce..a4f4f5e1c2cb 100644 --- a/op-conductor/conductor/service.go +++ b/op-conductor/conductor/service.go @@ -21,6 +21,7 @@ import ( opp2p "github.com/ethereum-optimism/optimism/op-node/p2p" "github.com/ethereum-optimism/optimism/op-service/cliapp" opclient "github.com/ethereum-optimism/optimism/op-service/client" + "github.com/ethereum-optimism/optimism/op-service/dial" "github.com/ethereum-optimism/optimism/op-service/eth" oprpc "github.com/ethereum-optimism/optimism/op-service/rpc" "github.com/ethereum-optimism/optimism/op-service/sources" @@ -198,6 +199,29 @@ func (oc *OpConductor) initRPCServer(ctx context.Context) error { Version: oc.version, Service: api, }) + + if oc.cfg.RPCEnableProxy { + execClient, err := dial.DialEthClientWithTimeout(ctx, 1*time.Minute, oc.log, oc.cfg.ExecutionRPC) + if err != nil { + return errors.Wrap(err, "failed to create execution rpc client") + } + executionProxy := conductorrpc.NewExecutionProxyBackend(oc.log, oc, execClient) + server.AddAPI(rpc.API{ + Namespace: conductorrpc.ExecutionRPCNamespace, + Service: executionProxy, + }) + + nodeClient, err := dial.DialRollupClientWithTimeout(ctx, 1*time.Minute, oc.log, oc.cfg.NodeRPC) + if err != nil { + return errors.Wrap(err, "failed to create node rpc client") + } + nodeProxy := conductorrpc.NewNodeProxyBackend(oc.log, oc, nodeClient) + server.AddAPI(rpc.API{ + Namespace: conductorrpc.NodeRPCNamespace, + Service: nodeProxy, + }) + } + oc.rpcServer = server return nil } diff --git a/op-conductor/conductor/service_test.go b/op-conductor/conductor/service_test.go index cd1b83ed0e47..3659ee949de6 100644 --- a/op-conductor/conductor/service_test.go +++ b/op-conductor/conductor/service_test.go @@ -70,6 +70,7 @@ func mockConfig(t *testing.T) Config { L1SystemConfigAddress: [20]byte{3, 4}, ProtocolVersionsAddress: [20]byte{4, 5}, }, + RPCEnableProxy: false, } } diff --git a/op-conductor/flags/flags.go b/op-conductor/flags/flags.go index 1de5b0e7833d..fff6e34cb31a 100644 --- a/op-conductor/flags/flags.go +++ b/op-conductor/flags/flags.go @@ -69,6 +69,12 @@ var ( EnvVars: opservice.PrefixEnvVar(EnvVarPrefix, "PAUSED"), Value: false, } + RPCEnableProxy = &cli.BoolFlag{ + Name: "rpc.enable-proxy", + Usage: "Enable the RPC proxy to underlying sequencer services", + EnvVars: opservice.PrefixEnvVar(EnvVarPrefix, "RPC_ENABLE_PROXY"), + Value: true, + } ) var requiredFlags = []cli.Flag{ @@ -85,6 +91,7 @@ var requiredFlags = []cli.Flag{ var optionalFlags = []cli.Flag{ Paused, + RPCEnableProxy, } func init() { diff --git a/op-conductor/rpc/api.go b/op-conductor/rpc/api.go index 8b5abd9b8934..6e04939fbfb8 100644 --- a/op-conductor/rpc/api.go +++ b/op-conductor/rpc/api.go @@ -2,8 +2,15 @@ package rpc import ( "context" + "errors" + "math/big" "github.com/ethereum-optimism/optimism/op-service/eth" + "github.com/ethereum/go-ethereum/core/types" +) + +var ( + ErrNotLeader = errors.New("refusing to proxy request to non-leader sequencer") ) type ServerInfo struct { @@ -42,3 +49,17 @@ type API interface { // CommitUnsafePayload commits a unsafe payload (lastest head) to the consensus layer. CommitUnsafePayload(ctx context.Context, payload *eth.ExecutionPayloadEnvelope) error } + +// ExecutionProxyAPI defines the methods proxied to the execution rpc backend +// This should include all methods that are called by op-batcher or op-proposer +type ExecutionProxyAPI interface { + BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) +} + +// NodeProxyAPI defines the methods proxied to the node rpc backend +// This should include all methods that are called by op-batcher or op-proposer +type NodeProxyAPI interface { + OutputAtBlock(ctx context.Context, blockNum uint64) (*eth.OutputResponse, error) + SequencerActive(ctx context.Context) (bool, error) + SyncStatus(ctx context.Context) (*eth.SyncStatus, error) +} diff --git a/op-conductor/rpc/execution_proxy.go b/op-conductor/rpc/execution_proxy.go new file mode 100644 index 000000000000..cfa7327e567e --- /dev/null +++ b/op-conductor/rpc/execution_proxy.go @@ -0,0 +1,40 @@ +package rpc + +import ( + "context" + "math/big" + + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethclient" + "github.com/ethereum/go-ethereum/log" +) + +var ExecutionRPCNamespace = "eth" + +// ExecutionProxyBackend implements an execution rpc proxy with a leadership check before each call. +type ExecutionProxyBackend struct { + log log.Logger + con conductor + client *ethclient.Client +} + +var _ ExecutionProxyAPI = (*ExecutionProxyBackend)(nil) + +func NewExecutionProxyBackend(log log.Logger, con conductor, client *ethclient.Client) *ExecutionProxyBackend { + return &ExecutionProxyBackend{ + log: log, + con: con, + client: client, + } +} + +func (api *ExecutionProxyBackend) BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) { + block, err := api.client.BlockByNumber(ctx, number) + if err != nil { + return nil, err + } + if !api.con.Leader(ctx) { + return nil, ErrNotLeader + } + return block, nil +} diff --git a/op-conductor/rpc/node_proxy.go b/op-conductor/rpc/node_proxy.go new file mode 100644 index 000000000000..64aa9907d286 --- /dev/null +++ b/op-conductor/rpc/node_proxy.go @@ -0,0 +1,61 @@ +package rpc + +import ( + "context" + + "github.com/ethereum-optimism/optimism/op-service/eth" + "github.com/ethereum-optimism/optimism/op-service/sources" + "github.com/ethereum/go-ethereum/log" +) + +var NodeRPCNamespace = "optimism" + +// NodeProxyBackend implements a node rpc proxy with a leadership check before each call. +type NodeProxyBackend struct { + log log.Logger + con conductor + client *sources.RollupClient +} + +var _ NodeProxyAPI = (*NodeProxyBackend)(nil) + +func NewNodeProxyBackend(log log.Logger, con conductor, client *sources.RollupClient) *NodeProxyBackend { + return &NodeProxyBackend{ + log: log, + con: con, + client: client, + } +} + +func (api *NodeProxyBackend) SyncStatus(ctx context.Context) (*eth.SyncStatus, error) { + status, err := api.client.SyncStatus(ctx) + if err != nil { + return nil, err + } + if !api.con.Leader(ctx) { + return nil, ErrNotLeader + } + return status, err +} + +func (api *NodeProxyBackend) OutputAtBlock(ctx context.Context, blockNum uint64) (*eth.OutputResponse, error) { + output, err := api.client.OutputAtBlock(ctx, blockNum) + if err != nil { + return nil, err + } + if !api.con.Leader(ctx) { + return nil, ErrNotLeader + } + return output, nil +} + +func (api *NodeProxyBackend) SequencerActive(ctx context.Context) (bool, error) { + active, err := api.client.SequencerActive(ctx) + if err != nil { + return false, err + } + if !api.con.Leader(ctx) { + return false, ErrNotLeader + } + return active, err +} diff --git a/op-e2e/sequencer_failover_setup.go b/op-e2e/sequencer_failover_setup.go index dcc71ad5d0e1..6a9a3d1ca338 100644 --- a/op-e2e/sequencer_failover_setup.go +++ b/op-e2e/sequencer_failover_setup.go @@ -44,12 +44,17 @@ type conductor struct { service *con.OpConductor client conrpc.API consensusPort int + rpcPort int } func (c *conductor) ConsensusEndpoint() string { return fmt.Sprintf("%s:%d", localhost, c.consensusPort) } +func (c *conductor) RPCEndpoint() string { + return fmt.Sprintf("%s:%d", localhost, c.rpcPort) +} + func setupSequencerFailoverTest(t *testing.T) (*System, map[string]*conductor) { InitParallel(t) ctx := context.Background() @@ -59,9 +64,6 @@ func setupSequencerFailoverTest(t *testing.T) (*System, map[string]*conductor) { sys, err := cfg.Start(t) require.NoError(t, err) - // 1 batcher that listens to all 3 sequencers, in started mode. - setupBatcher(t, sys) - // 3 conductors that connects to 1 sequencer each. conductors := make(map[string]*conductor) @@ -81,6 +83,9 @@ func setupSequencerFailoverTest(t *testing.T) (*System, map[string]*conductor) { conductors[cfg.name] = setupConductor(t, cfg.name, t.TempDir(), nodePRC, engineRPC, cfg.bootstrap, *sys.RollupConfig) } + // 1 batcher that listens to all 3 sequencers, in started mode. + setupBatcher(t, sys, conductors) + // form a cluster c1 := conductors[sequencer1Name] c2 := conductors[sequencer2Name] @@ -128,20 +133,21 @@ func setupSequencerFailoverTest(t *testing.T) (*System, map[string]*conductor) { func setupConductor( t *testing.T, - serverID, dir, nodePRC, engineRPC string, + serverID, dir, nodeRPC, engineRPC string, bootstrap bool, rollupCfg rollup.Config, ) *conductor { // it's unfortunate that it is not possible to pass 0 as consensus port and get back the actual assigned port from raft implementation. // So we find an available port and pass it in to avoid test flakiness (avoid port already in use error). consensusPort := findAvailablePort(t) + rpcPort := findAvailablePort(t) cfg := con.Config{ ConsensusAddr: localhost, ConsensusPort: consensusPort, RaftServerID: serverID, RaftStorageDir: dir, RaftBootstrap: bootstrap, - NodeRPC: nodePRC, + NodeRPC: nodeRPC, ExecutionRPC: engineRPC, Paused: true, HealthCheck: con.HealthCheckConfig{ @@ -149,14 +155,15 @@ func setupConductor( SafeInterval: 4, // per test setup (l1 block time = 2s, max channel duration = 1, 2s buffer) MinPeerCount: 2, // per test setup, each sequencer has 2 peers }, - RollupCfg: rollupCfg, + RollupCfg: rollupCfg, + RPCEnableProxy: true, LogConfig: oplog.CLIConfig{ Level: log.LvlInfo, Color: false, }, RPC: oprpc.CLIConfig{ ListenAddr: localhost, - ListenPort: 0, + ListenPort: rpcPort, }, } @@ -174,10 +181,11 @@ func setupConductor( service: service, client: client, consensusPort: consensusPort, + rpcPort: rpcPort, } } -func setupBatcher(t *testing.T, sys *System) { +func setupBatcher(t *testing.T, sys *System, conductors map[string]*conductor) { var batchType uint = derive.SingularBatchType if sys.Cfg.DeployConfig.L2GenesisDeltaTimeOffset != nil && *sys.Cfg.DeployConfig.L2GenesisDeltaTimeOffset == hexutil.Uint64(0) { batchType = derive.SpanBatchType @@ -189,14 +197,14 @@ func setupBatcher(t *testing.T, sys *System) { // enable active sequencer follow mode. l2EthRpc := strings.Join([]string{ - sys.EthInstances[sequencer1Name].WSEndpoint(), - sys.EthInstances[sequencer2Name].WSEndpoint(), - sys.EthInstances[sequencer3Name].WSEndpoint(), + conductors[sequencer1Name].RPCEndpoint(), + conductors[sequencer2Name].RPCEndpoint(), + conductors[sequencer3Name].RPCEndpoint(), }, ",") rollupRpc := strings.Join([]string{ - sys.RollupNodes[sequencer1Name].HTTPEndpoint(), - sys.RollupNodes[sequencer2Name].HTTPEndpoint(), - sys.RollupNodes[sequencer3Name].HTTPEndpoint(), + conductors[sequencer1Name].RPCEndpoint(), + conductors[sequencer2Name].RPCEndpoint(), + conductors[sequencer3Name].RPCEndpoint(), }, ",") batcherCLIConfig := &bss.CLIConfig{ L1EthRpc: sys.EthInstances["l1"].WSEndpoint(), @@ -217,9 +225,10 @@ func setupBatcher(t *testing.T, sys *System) { Level: log.LvlInfo, Format: oplog.FormatText, }, - Stopped: false, - BatchType: batchType, - DataAvailabilityType: batcherFlags.CalldataType, + Stopped: false, + BatchType: batchType, + DataAvailabilityType: batcherFlags.CalldataType, + ActiveSequencerCheckDuration: 0, } batcher, err := bss.BatcherServiceFromCLIConfig(context.Background(), "0.0.1", batcherCLIConfig, sys.Cfg.Loggers["batcher"]) diff --git a/op-proposer/flags/flags.go b/op-proposer/flags/flags.go index a62a8034359a..2ca34179d02c 100644 --- a/op-proposer/flags/flags.go +++ b/op-proposer/flags/flags.go @@ -69,6 +69,12 @@ var ( EnvVars: prefixEnvVars("DG_TYPE"), Hidden: true, } + ActiveSequencerCheckDurationFlag = &cli.DurationFlag{ + Name: "active-sequencer-check-duration", + Usage: "The duration between checks to determine the active sequencer endpoint. ", + Value: 2 * time.Minute, + EnvVars: prefixEnvVars("ACTIVE_SEQUENCER_CHECK_DURATION"), + } // Legacy Flags L2OutputHDPathFlag = txmgr.L2OutputHDPathFlag ) @@ -86,6 +92,7 @@ var optionalFlags = []cli.Flag{ DisputeGameFactoryAddressFlag, ProposalIntervalFlag, DisputeGameTypeFlag, + ActiveSequencerCheckDurationFlag, } func init() { diff --git a/op-proposer/proposer/config.go b/op-proposer/proposer/config.go index ac1c4dc89de3..8f3770e6e288 100644 --- a/op-proposer/proposer/config.go +++ b/op-proposer/proposer/config.go @@ -55,6 +55,9 @@ type CLIConfig struct { // DisputeGameType is the type of dispute game to create when submitting an output proposal. DisputeGameType uint8 + + // ActiveSequencerCheckDuration is the duration between checks to determine the active sequencer endpoint. + ActiveSequencerCheckDuration time.Duration } func (c *CLIConfig) Check() error { @@ -94,13 +97,14 @@ func NewConfig(ctx *cli.Context) *CLIConfig { PollInterval: ctx.Duration(flags.PollIntervalFlag.Name), TxMgrConfig: txmgr.ReadCLIConfig(ctx), // Optional Flags - AllowNonFinalized: ctx.Bool(flags.AllowNonFinalizedFlag.Name), - RPCConfig: oprpc.ReadCLIConfig(ctx), - LogConfig: oplog.ReadCLIConfig(ctx), - MetricsConfig: opmetrics.ReadCLIConfig(ctx), - PprofConfig: oppprof.ReadCLIConfig(ctx), - DGFAddress: ctx.String(flags.DisputeGameFactoryAddressFlag.Name), - ProposalInterval: ctx.Duration(flags.ProposalIntervalFlag.Name), - DisputeGameType: uint8(ctx.Uint(flags.DisputeGameTypeFlag.Name)), + AllowNonFinalized: ctx.Bool(flags.AllowNonFinalizedFlag.Name), + RPCConfig: oprpc.ReadCLIConfig(ctx), + LogConfig: oplog.ReadCLIConfig(ctx), + MetricsConfig: opmetrics.ReadCLIConfig(ctx), + PprofConfig: oppprof.ReadCLIConfig(ctx), + DGFAddress: ctx.String(flags.DisputeGameFactoryAddressFlag.Name), + ProposalInterval: ctx.Duration(flags.ProposalIntervalFlag.Name), + DisputeGameType: uint8(ctx.Uint(flags.DisputeGameTypeFlag.Name)), + ActiveSequencerCheckDuration: ctx.Duration(flags.ActiveSequencerCheckDurationFlag.Name), } } diff --git a/op-proposer/proposer/service.go b/op-proposer/proposer/service.go index 6796d7fb8cee..cadefdf32935 100644 --- a/op-proposer/proposer/service.go +++ b/op-proposer/proposer/service.go @@ -127,7 +127,7 @@ func (ps *ProposerService) initRPCClients(ctx context.Context, cfg *CLIConfig) e var rollupProvider dial.RollupProvider if strings.Contains(cfg.RollupRpc, ",") { rollupUrls := strings.Split(cfg.RollupRpc, ",") - rollupProvider, err = dial.NewActiveL2RollupProvider(ctx, rollupUrls, dial.DefaultActiveSequencerFollowerCheckDuration, dial.DefaultDialTimeout, ps.Log) + rollupProvider, err = dial.NewActiveL2RollupProvider(ctx, rollupUrls, cfg.ActiveSequencerCheckDuration, dial.DefaultDialTimeout, ps.Log) } else { rollupProvider, err = dial.NewStaticL2RollupProvider(ctx, ps.Log, cfg.RollupRpc) }