Skip to content
This repository has been archived by the owner on May 11, 2024. It is now read-only.

Commit

Permalink
feat(pkg): introduce EthClient with a timeout attached (#337)
Browse files Browse the repository at this point in the history
Co-authored-by: jeff <[email protected]>
  • Loading branch information
davidtaikocha and cyberhorsey committed Aug 1, 2023
1 parent 046f2c8 commit a8990ee
Show file tree
Hide file tree
Showing 21 changed files with 490 additions and 48 deletions.
2 changes: 1 addition & 1 deletion bindings/.githead
Original file line number Diff line number Diff line change
@@ -1 +1 @@
f8415a2f73e8a3cdc354a519b5c5af02e715fbf4
b9f3bdad77614657c7978f341c4d4b14ca295b61
6 changes: 6 additions & 0 deletions cmd/flags/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ var (
Category: commonCategory,
Value: 12,
}
RPCTimeout = &cli.Uint64Flag{
Name: "rpc.timeout",
Usage: "Timeout in seconds for RPC calls",
Category: commonCategory,
}
)

// All common flags.
Expand All @@ -111,6 +116,7 @@ var CommonFlags = []cli.Flag{
MetricsPort,
BackOffMaxRetrys,
BackOffRetryInterval,
RPCTimeout,
}

// MergeFlags merges the given flag slices.
Expand Down
6 changes: 3 additions & 3 deletions driver/chain_syncer/beaconsync/progress_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (

"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"
"github.com/taikoxyz/taiko-client/pkg/rpc"
)

var (
Expand All @@ -21,7 +21,7 @@ var (
// connected peer or some other reasons).
type SyncProgressTracker struct {
// RPC client
client *ethclient.Client
client *rpc.EthClient

// Meta data
triggered bool
Expand All @@ -41,7 +41,7 @@ type SyncProgressTracker struct {
}

// NewSyncProgressTracker creates a new SyncProgressTracker instance.
func NewSyncProgressTracker(c *ethclient.Client, timeout time.Duration) *SyncProgressTracker {
func NewSyncProgressTracker(c *rpc.EthClient, timeout time.Duration) *SyncProgressTracker {
return &SyncProgressTracker{client: c, timeout: timeout, ticker: time.NewTicker(syncProgressCheckInterval)}
}

Expand Down
9 changes: 9 additions & 0 deletions driver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type Config struct {
P2PSyncVerifiedBlocks bool
P2PSyncTimeout time.Duration
BackOffRetryInterval time.Duration
RPCTimeout *time.Duration
}

// NewConfigFromCliContext creates a new config instance from
Expand All @@ -42,6 +43,13 @@ func NewConfigFromCliContext(c *cli.Context) (*Config, error) {
return nil, errors.New("empty L2 check point URL")
}

var timeout *time.Duration

if c.IsSet(flags.RPCTimeout.Name) {
duration := time.Duration(c.Uint64(flags.RPCTimeout.Name)) * time.Second
timeout = &duration
}

return &Config{
L1Endpoint: c.String(flags.L1WSEndpoint.Name),
L2Endpoint: c.String(flags.L2WSEndpoint.Name),
Expand All @@ -53,5 +61,6 @@ func NewConfigFromCliContext(c *cli.Context) (*Config, error) {
P2PSyncVerifiedBlocks: p2pSyncVerifiedBlocks,
P2PSyncTimeout: time.Duration(int64(time.Second) * int64(c.Uint(flags.P2PSyncTimeout.Name))),
BackOffRetryInterval: time.Duration(c.Uint64(flags.BackOffRetryInterval.Name)) * time.Second,
RPCTimeout: timeout,
}, nil
}
4 changes: 4 additions & 0 deletions driver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ func (s *DriverTestSuite) TestNewConfigFromCliContext() {
l2EngineEndpoint := os.Getenv("L2_EXECUTION_ENGINE_AUTH_ENDPOINT")
taikoL1 := os.Getenv("TAIKO_L1_ADDRESS")
taikoL2 := os.Getenv("TAIKO_L2_ADDRESS")
rpcTimeout := 5 * time.Second

app := cli.NewApp()
app.Flags = []cli.Flag{
Expand All @@ -25,6 +26,7 @@ func (s *DriverTestSuite) TestNewConfigFromCliContext() {
&cli.StringFlag{Name: flags.TaikoL2Address.Name},
&cli.StringFlag{Name: flags.JWTSecret.Name},
&cli.UintFlag{Name: flags.P2PSyncTimeout.Name},
&cli.UintFlag{Name: flags.RPCTimeout.Name},
}
app.Action = func(ctx *cli.Context) error {
c, err := NewConfigFromCliContext(ctx)
Expand All @@ -35,6 +37,7 @@ func (s *DriverTestSuite) TestNewConfigFromCliContext() {
s.Equal(taikoL1, c.TaikoL1Address.String())
s.Equal(taikoL2, c.TaikoL2Address.String())
s.Equal(120*time.Second, c.P2PSyncTimeout)
s.Equal(rpcTimeout, *c.RPCTimeout)
s.NotEmpty(c.JwtSecret)
s.Nil(new(Driver).InitFromCli(context.Background(), ctx))

Expand All @@ -50,5 +53,6 @@ func (s *DriverTestSuite) TestNewConfigFromCliContext() {
"-" + flags.TaikoL2Address.Name, taikoL2,
"-" + flags.JWTSecret.Name, os.Getenv("JWT_SECRET"),
"-" + flags.P2PSyncTimeout.Name, "120",
"-" + flags.RPCTimeout.Name, "5",
}))
}
7 changes: 2 additions & 5 deletions driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (

const (
protocolStatusReportInterval = 30 * time.Second
exchangeTransitionConfigTimeout = 30 * time.Second
exchangeTransitionConfigInterval = 1 * time.Minute
)

Expand Down Expand Up @@ -67,6 +66,7 @@ func InitFromConfig(ctx context.Context, d *Driver, cfg *Config) (err error) {
L2EngineEndpoint: cfg.L2EngineEndpoint,
JwtSecret: cfg.JwtSecret,
RetryInterval: cfg.BackOffRetryInterval,
Timeout: cfg.RPCTimeout,
}); err != nil {
return err
}
Expand Down Expand Up @@ -242,10 +242,7 @@ func (d *Driver) exchangeTransitionConfigLoop() {
return
case <-ticker.C:
func() {
ctx, cancel := context.WithTimeout(d.ctx, exchangeTransitionConfigTimeout)
defer cancel()

tc, err := d.rpc.L2Engine.ExchangeTransitionConfiguration(ctx, &engine.TransitionConfigurationV1{
tc, err := d.rpc.L2Engine.ExchangeTransitionConfiguration(d.ctx, &engine.TransitionConfigurationV1{
TerminalTotalDifficulty: (*hexutil.Big)(common.Big0),
TerminalBlockHash: common.Hash{},
TerminalBlockNumber: 0,
Expand Down
6 changes: 3 additions & 3 deletions pkg/chain_iterator/block_batch_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import (
"github.com/cenkalti/backoff/v4"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"
"github.com/taikoxyz/taiko-client/pkg/rpc"
)

const (
Expand Down Expand Up @@ -43,7 +43,7 @@ type EndIterFunc func()
// with the awareness of reorganization.
type BlockBatchIterator struct {
ctx context.Context
client *ethclient.Client
client *rpc.EthClient
chainID *big.Int
blocksReadPerEpoch uint64
startHeight uint64
Expand All @@ -58,7 +58,7 @@ type BlockBatchIterator struct {

// BlockBatchIteratorConfig represents the configs of a block batch iterator.
type BlockBatchIteratorConfig struct {
Client *ethclient.Client
Client *rpc.EthClient
MaxBlocksReadPerEpoch *uint64
StartHeight *big.Int
EndHeight *big.Int
Expand Down
6 changes: 3 additions & 3 deletions pkg/chain_iterator/event_iterator/block_proposed_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/taikoxyz/taiko-client/bindings"
chainIterator "github.com/taikoxyz/taiko-client/pkg/chain_iterator"
"github.com/taikoxyz/taiko-client/pkg/rpc"
)

// EndBlockProposedEventIterFunc ends the current iteration.
Expand All @@ -35,7 +35,7 @@ type BlockProposedIterator struct {

// BlockProposedIteratorConfig represents the configs of a BlockProposed event iterator.
type BlockProposedIteratorConfig struct {
Client *ethclient.Client
Client *rpc.EthClient
TaikoL1 *bindings.TaikoL1Client
MaxBlocksReadPerEpoch *uint64
StartHeight *big.Int
Expand Down Expand Up @@ -95,7 +95,7 @@ func (i *BlockProposedIterator) end() {
// assembleBlockProposedIteratorCallback assembles the callback which will be used
// by a event iterator's inner block iterator.
func assembleBlockProposedIteratorCallback(
client *ethclient.Client,
client *rpc.EthClient,
taikoL1Client *bindings.TaikoL1Client,
filterQuery []*big.Int,
callback OnBlockProposedEvent,
Expand Down
6 changes: 3 additions & 3 deletions pkg/chain_iterator/event_iterator/block_proven_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/taikoxyz/taiko-client/bindings"
chainIterator "github.com/taikoxyz/taiko-client/pkg/chain_iterator"
"github.com/taikoxyz/taiko-client/pkg/rpc"
)

// EndBlockProvenEventIterFunc ends the current iteration.
Expand All @@ -31,7 +31,7 @@ type BlockProvenIterator struct {

// BlockProvenIteratorConfig represents the configs of a BlockProven event iterator.
type BlockProvenIteratorConfig struct {
Client *ethclient.Client
Client *rpc.EthClient
TaikoL1 *bindings.TaikoL1Client
MaxBlocksReadPerEpoch *uint64
StartHeight *big.Int
Expand Down Expand Up @@ -91,7 +91,7 @@ func (i *BlockProvenIterator) end() {
// assembleBlockProvenIteratorCallback assembles the callback which will be used
// by a event iterator's inner block iterator.
func assembleBlockProvenIteratorCallback(
client *ethclient.Client,
client *rpc.EthClient,
taikoL1Client *bindings.TaikoL1Client,
filterQuery []*big.Int,
callback OnBlockProvenEvent,
Expand Down
46 changes: 32 additions & 14 deletions pkg/rpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,27 @@ package rpc

import (
"context"
"errors"
"fmt"
"math/big"
"time"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/ethclient/gethclient"
"github.com/ethereum/go-ethereum/rpc"
"github.com/taikoxyz/taiko-client/bindings"
)

var (
errNotArchiveNode = errors.New("error with rpc: node must be archive node")
defaultTimeout = 1 * time.Minute
)

// Client contains all L1/L2 RPC clients that a driver needs.
type Client struct {
// Geth ethclient clients
L1 *ethclient.Client
L2 *ethclient.Client
L2CheckPoint *ethclient.Client
L1 *EthClient
L2 *EthClient
L2CheckPoint *EthClient
// Geth gethclient clients
L1GethClient *gethclient.Client
L2GethClient *gethclient.Client
Expand Down Expand Up @@ -55,15 +53,32 @@ type ClientConfig struct {
L2EngineEndpoint string
JwtSecret string
RetryInterval time.Duration
Timeout *time.Duration
}

// NewClient initializes all RPC clients used by Taiko client softwares.
func NewClient(ctx context.Context, cfg *ClientConfig) (*Client, error) {
l1RPC, err := DialClientWithBackoff(ctx, cfg.L1Endpoint, cfg.RetryInterval)
l1EthClient, err := DialClientWithBackoff(ctx, cfg.L1Endpoint, cfg.RetryInterval)
if err != nil {
return nil, err
}

l2EthClient, err := DialClientWithBackoff(ctx, cfg.L2Endpoint, cfg.RetryInterval)
if err != nil {
return nil, err
}

var l1RPC *EthClient
var l2RPC *EthClient

if cfg.Timeout != nil {
l1RPC = NewEthClientWithTimeout(l1EthClient, *cfg.Timeout)
l2RPC = NewEthClientWithTimeout(l2EthClient, *cfg.Timeout)
} else {
l1RPC = NewEthClientWithDefaultTimeout(l1EthClient)
l2RPC = NewEthClientWithDefaultTimeout(l2EthClient)
}

taikoL1, err := bindings.NewTaikoL1Client(cfg.TaikoL1Address, l1RPC)
if err != nil {
return nil, err
Expand All @@ -77,11 +92,6 @@ func NewClient(ctx context.Context, cfg *ClientConfig) (*Client, error) {
}
}

l2RPC, err := DialClientWithBackoff(ctx, cfg.L2Endpoint, cfg.RetryInterval)
if err != nil {
return nil, err
}

taikoL2, err := bindings.NewTaikoL2Client(cfg.TaikoL2Address, l2RPC)
if err != nil {
return nil, err
Expand Down Expand Up @@ -135,11 +145,19 @@ func NewClient(ctx context.Context, cfg *ClientConfig) (*Client, error) {
}
}

var l2CheckPoint *ethclient.Client
var l2CheckPoint *EthClient
if len(cfg.L2CheckPoint) != 0 {
if l2CheckPoint, err = DialClientWithBackoff(ctx, cfg.L2CheckPoint, cfg.RetryInterval); err != nil {
l2CheckPointEthClient, err := DialClientWithBackoff(ctx, cfg.L2CheckPoint, cfg.RetryInterval)

if err != nil {
return nil, err
}

if cfg.Timeout != nil {
l2CheckPoint = NewEthClientWithTimeout(l2CheckPointEthClient, *cfg.Timeout)
} else {
l2CheckPoint = NewEthClientWithDefaultTimeout(l2CheckPointEthClient)
}
}

client := &Client{
Expand Down
12 changes: 7 additions & 5 deletions pkg/rpc/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package rpc

import (
"context"
"time"

"github.com/ethereum/go-ethereum/beacon/engine"
"github.com/ethereum/go-ethereum/rpc"
Expand All @@ -21,7 +20,7 @@ func (c *EngineClient) ForkchoiceUpdate(
fc *engine.ForkchoiceStateV1,
attributes *engine.PayloadAttributes,
) (*engine.ForkChoiceResponse, error) {
timeoutCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
timeoutCtx, cancel := context.WithTimeout(ctx, defaultTimeout)
defer cancel()

var result *engine.ForkChoiceResponse
Expand All @@ -37,7 +36,7 @@ func (c *EngineClient) NewPayload(
ctx context.Context,
payload *engine.ExecutableData,
) (*engine.PayloadStatusV1, error) {
timeoutCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
timeoutCtx, cancel := context.WithTimeout(ctx, defaultTimeout)
defer cancel()

var result *engine.PayloadStatusV1
Expand All @@ -53,7 +52,7 @@ func (c *EngineClient) GetPayload(
ctx context.Context,
payloadID *engine.PayloadID,
) (*engine.ExecutableData, error) {
timeoutCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
timeoutCtx, cancel := context.WithTimeout(ctx, defaultTimeout)
defer cancel()

var result *engine.ExecutionPayloadEnvelope
Expand All @@ -69,8 +68,11 @@ func (c *EngineClient) ExchangeTransitionConfiguration(
ctx context.Context,
cfg *engine.TransitionConfigurationV1,
) (*engine.TransitionConfigurationV1, error) {
timeoutCtx, cancel := context.WithTimeout(ctx, defaultTimeout)
defer cancel()

var result *engine.TransitionConfigurationV1
if err := c.Client.CallContext(ctx, &result, "engine_exchangeTransitionConfigurationV1", cfg); err != nil {
if err := c.Client.CallContext(timeoutCtx, &result, "engine_exchangeTransitionConfigurationV1", cfg); err != nil {
return nil, err
}

Expand Down
Loading

0 comments on commit a8990ee

Please sign in to comment.