Skip to content

Commit

Permalink
op-conductor: adds execution and node rpc proxy backends (ethereum-op…
Browse files Browse the repository at this point in the history
  • Loading branch information
zhwrd authored Jan 20, 2024
1 parent b22a348 commit e7058c9
Show file tree
Hide file tree
Showing 14 changed files with 233 additions and 44 deletions.
28 changes: 16 additions & 12 deletions op-batcher/batcher/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ type CLIConfig struct {
// the data availability type to use for posting batches, e.g. blobs vs calldata.
DataAvailabilityType flags.DataAvailabilityType

// ActiveSequencerCheckDuration is the duration between checks to determine the active sequencer endpoint.
ActiveSequencerCheckDuration time.Duration

TxMgrConfig txmgr.CLIConfig
LogConfig oplog.CLIConfig
MetricsConfig opmetrics.CLIConfig
Expand Down Expand Up @@ -120,17 +123,18 @@ func NewConfig(ctx *cli.Context) *CLIConfig {
PollInterval: ctx.Duration(flags.PollIntervalFlag.Name),

/* Optional Flags */
MaxPendingTransactions: ctx.Uint64(flags.MaxPendingTransactionsFlag.Name),
MaxChannelDuration: ctx.Uint64(flags.MaxChannelDurationFlag.Name),
MaxL1TxSize: ctx.Uint64(flags.MaxL1TxSizeBytesFlag.Name),
Stopped: ctx.Bool(flags.StoppedFlag.Name),
BatchType: ctx.Uint(flags.BatchTypeFlag.Name),
DataAvailabilityType: flags.DataAvailabilityType(ctx.String(flags.DataAvailabilityTypeFlag.Name)),
TxMgrConfig: txmgr.ReadCLIConfig(ctx),
LogConfig: oplog.ReadCLIConfig(ctx),
MetricsConfig: opmetrics.ReadCLIConfig(ctx),
PprofConfig: oppprof.ReadCLIConfig(ctx),
CompressorConfig: compressor.ReadCLIConfig(ctx),
RPC: oprpc.ReadCLIConfig(ctx),
MaxPendingTransactions: ctx.Uint64(flags.MaxPendingTransactionsFlag.Name),
MaxChannelDuration: ctx.Uint64(flags.MaxChannelDurationFlag.Name),
MaxL1TxSize: ctx.Uint64(flags.MaxL1TxSizeBytesFlag.Name),
Stopped: ctx.Bool(flags.StoppedFlag.Name),
BatchType: ctx.Uint(flags.BatchTypeFlag.Name),
DataAvailabilityType: flags.DataAvailabilityType(ctx.String(flags.DataAvailabilityTypeFlag.Name)),
ActiveSequencerCheckDuration: ctx.Duration(flags.ActiveSequencerCheckDurationFlag.Name),
TxMgrConfig: txmgr.ReadCLIConfig(ctx),
LogConfig: oplog.ReadCLIConfig(ctx),
MetricsConfig: opmetrics.ReadCLIConfig(ctx),
PprofConfig: oppprof.ReadCLIConfig(ctx),
CompressorConfig: compressor.ReadCLIConfig(ctx),
RPC: oprpc.ReadCLIConfig(ctx),
}
}
2 changes: 1 addition & 1 deletion op-batcher/batcher/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (bs *BatcherService) initRPCClients(ctx context.Context, cfg *CLIConfig) er
if strings.Contains(cfg.RollupRpc, ",") && strings.Contains(cfg.L2EthRpc, ",") {
rollupUrls := strings.Split(cfg.RollupRpc, ",")
ethUrls := strings.Split(cfg.L2EthRpc, ",")
endpointProvider, err = dial.NewActiveL2EndpointProvider(ctx, ethUrls, rollupUrls, dial.DefaultActiveSequencerFollowerCheckDuration, dial.DefaultDialTimeout, bs.Log)
endpointProvider, err = dial.NewActiveL2EndpointProvider(ctx, ethUrls, rollupUrls, cfg.ActiveSequencerCheckDuration, dial.DefaultDialTimeout, bs.Log)
} else {
endpointProvider, err = dial.NewStaticL2EndpointProvider(ctx, bs.Log, cfg.L2EthRpc, cfg.RollupRpc)
}
Expand Down
7 changes: 7 additions & 0 deletions op-batcher/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@ var (
}(),
EnvVars: prefixEnvVars("DATA_AVAILABILITY_TYPE"),
}
ActiveSequencerCheckDurationFlag = &cli.DurationFlag{
Name: "active-sequencer-check-duration",
Usage: "The duration between checks to determine the active sequencer endpoint. ",
Value: 2 * time.Minute,
EnvVars: prefixEnvVars("ACTIVE_SEQUENCER_CHECK_DURATION"),
}
// Legacy Flags
SequencerHDPathFlag = txmgr.SequencerHDPathFlag
)
Expand All @@ -113,6 +119,7 @@ var optionalFlags = []cli.Flag{
SequencerHDPathFlag,
BatchTypeFlag,
DataAvailabilityTypeFlag,
ActiveSequencerCheckDurationFlag,
}

func init() {
Expand Down
14 changes: 9 additions & 5 deletions op-conductor/conductor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ type Config struct {
// RollupCfg is the rollup config.
RollupCfg rollup.Config

// RPCEnableProxy is true if the sequencer RPC proxy should be enabled.
RPCEnableProxy bool

LogConfig oplog.CLIConfig
MetricsConfig opmetrics.CLIConfig
PprofConfig oppprof.CLIConfig
Expand Down Expand Up @@ -116,11 +119,12 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*Config, error) {
SafeInterval: ctx.Uint64(flags.HealthCheckSafeInterval.Name),
MinPeerCount: ctx.Uint64(flags.HealthCheckMinPeerCount.Name),
},
RollupCfg: *rollupCfg,
LogConfig: oplog.ReadCLIConfig(ctx),
MetricsConfig: opmetrics.ReadCLIConfig(ctx),
PprofConfig: oppprof.ReadCLIConfig(ctx),
RPC: oprpc.ReadCLIConfig(ctx),
RollupCfg: *rollupCfg,
RPCEnableProxy: ctx.Bool(flags.RPCEnableProxy.Name),
LogConfig: oplog.ReadCLIConfig(ctx),
MetricsConfig: opmetrics.ReadCLIConfig(ctx),
PprofConfig: oppprof.ReadCLIConfig(ctx),
RPC: oprpc.ReadCLIConfig(ctx),
}, nil
}

Expand Down
24 changes: 24 additions & 0 deletions op-conductor/conductor/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
opp2p "github.com/ethereum-optimism/optimism/op-node/p2p"
"github.com/ethereum-optimism/optimism/op-service/cliapp"
opclient "github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum-optimism/optimism/op-service/dial"
"github.com/ethereum-optimism/optimism/op-service/eth"
oprpc "github.com/ethereum-optimism/optimism/op-service/rpc"
"github.com/ethereum-optimism/optimism/op-service/sources"
Expand Down Expand Up @@ -198,6 +199,29 @@ func (oc *OpConductor) initRPCServer(ctx context.Context) error {
Version: oc.version,
Service: api,
})

if oc.cfg.RPCEnableProxy {
execClient, err := dial.DialEthClientWithTimeout(ctx, 1*time.Minute, oc.log, oc.cfg.ExecutionRPC)
if err != nil {
return errors.Wrap(err, "failed to create execution rpc client")
}
executionProxy := conductorrpc.NewExecutionProxyBackend(oc.log, oc, execClient)
server.AddAPI(rpc.API{
Namespace: conductorrpc.ExecutionRPCNamespace,
Service: executionProxy,
})

nodeClient, err := dial.DialRollupClientWithTimeout(ctx, 1*time.Minute, oc.log, oc.cfg.NodeRPC)
if err != nil {
return errors.Wrap(err, "failed to create node rpc client")
}
nodeProxy := conductorrpc.NewNodeProxyBackend(oc.log, oc, nodeClient)
server.AddAPI(rpc.API{
Namespace: conductorrpc.NodeRPCNamespace,
Service: nodeProxy,
})
}

oc.rpcServer = server
return nil
}
Expand Down
1 change: 1 addition & 0 deletions op-conductor/conductor/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func mockConfig(t *testing.T) Config {
L1SystemConfigAddress: [20]byte{3, 4},
ProtocolVersionsAddress: [20]byte{4, 5},
},
RPCEnableProxy: false,
}
}

Expand Down
7 changes: 7 additions & 0 deletions op-conductor/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@ var (
EnvVars: opservice.PrefixEnvVar(EnvVarPrefix, "PAUSED"),
Value: false,
}
RPCEnableProxy = &cli.BoolFlag{
Name: "rpc.enable-proxy",
Usage: "Enable the RPC proxy to underlying sequencer services",
EnvVars: opservice.PrefixEnvVar(EnvVarPrefix, "RPC_ENABLE_PROXY"),
Value: true,
}
)

var requiredFlags = []cli.Flag{
Expand All @@ -85,6 +91,7 @@ var requiredFlags = []cli.Flag{

var optionalFlags = []cli.Flag{
Paused,
RPCEnableProxy,
}

func init() {
Expand Down
21 changes: 21 additions & 0 deletions op-conductor/rpc/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,15 @@ package rpc

import (
"context"
"errors"
"math/big"

"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum/go-ethereum/core/types"
)

var (
ErrNotLeader = errors.New("refusing to proxy request to non-leader sequencer")
)

type ServerInfo struct {
Expand Down Expand Up @@ -42,3 +49,17 @@ type API interface {
// CommitUnsafePayload commits a unsafe payload (lastest head) to the consensus layer.
CommitUnsafePayload(ctx context.Context, payload *eth.ExecutionPayloadEnvelope) error
}

// ExecutionProxyAPI defines the methods proxied to the execution rpc backend
// This should include all methods that are called by op-batcher or op-proposer
type ExecutionProxyAPI interface {
BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error)
}

// NodeProxyAPI defines the methods proxied to the node rpc backend
// This should include all methods that are called by op-batcher or op-proposer
type NodeProxyAPI interface {
OutputAtBlock(ctx context.Context, blockNum uint64) (*eth.OutputResponse, error)
SequencerActive(ctx context.Context) (bool, error)
SyncStatus(ctx context.Context) (*eth.SyncStatus, error)
}
40 changes: 40 additions & 0 deletions op-conductor/rpc/execution_proxy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package rpc

import (
"context"
"math/big"

"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"
)

var ExecutionRPCNamespace = "eth"

// ExecutionProxyBackend implements an execution rpc proxy with a leadership check before each call.
type ExecutionProxyBackend struct {
log log.Logger
con conductor
client *ethclient.Client
}

var _ ExecutionProxyAPI = (*ExecutionProxyBackend)(nil)

func NewExecutionProxyBackend(log log.Logger, con conductor, client *ethclient.Client) *ExecutionProxyBackend {
return &ExecutionProxyBackend{
log: log,
con: con,
client: client,
}
}

func (api *ExecutionProxyBackend) BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) {
block, err := api.client.BlockByNumber(ctx, number)
if err != nil {
return nil, err
}
if !api.con.Leader(ctx) {
return nil, ErrNotLeader
}
return block, nil
}
61 changes: 61 additions & 0 deletions op-conductor/rpc/node_proxy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package rpc

import (
"context"

"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/sources"
"github.com/ethereum/go-ethereum/log"
)

var NodeRPCNamespace = "optimism"

// NodeProxyBackend implements a node rpc proxy with a leadership check before each call.
type NodeProxyBackend struct {
log log.Logger
con conductor
client *sources.RollupClient
}

var _ NodeProxyAPI = (*NodeProxyBackend)(nil)

func NewNodeProxyBackend(log log.Logger, con conductor, client *sources.RollupClient) *NodeProxyBackend {
return &NodeProxyBackend{
log: log,
con: con,
client: client,
}
}

func (api *NodeProxyBackend) SyncStatus(ctx context.Context) (*eth.SyncStatus, error) {
status, err := api.client.SyncStatus(ctx)
if err != nil {
return nil, err
}
if !api.con.Leader(ctx) {
return nil, ErrNotLeader
}
return status, err
}

func (api *NodeProxyBackend) OutputAtBlock(ctx context.Context, blockNum uint64) (*eth.OutputResponse, error) {
output, err := api.client.OutputAtBlock(ctx, blockNum)
if err != nil {
return nil, err
}
if !api.con.Leader(ctx) {
return nil, ErrNotLeader
}
return output, nil
}

func (api *NodeProxyBackend) SequencerActive(ctx context.Context) (bool, error) {
active, err := api.client.SequencerActive(ctx)
if err != nil {
return false, err
}
if !api.con.Leader(ctx) {
return false, ErrNotLeader
}
return active, err
}
Loading

0 comments on commit e7058c9

Please sign in to comment.