Skip to content
This repository has been archived by the owner on May 11, 2024. It is now read-only.

feat(rpc): keep retrying when connecting to endpoints #708

Merged
merged 5 commits into from
Apr 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion driver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (s *DriverTestSuite) TestNewConfigFromCliContext() {
s.NotEmpty(c.JwtSecret)
s.True(c.P2PSyncVerifiedBlocks)
s.Equal(l2CheckPoint, c.L2CheckPoint)
s.NotNil(new(Driver).InitFromCli(context.Background(), ctx))
s.Nil(new(Driver).InitFromCli(context.Background(), ctx))

return err
}
Expand Down
68 changes: 45 additions & 23 deletions pkg/rpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@ package rpc

import (
"context"
"os"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/taikoxyz/taiko-client/bindings"
)

Expand Down Expand Up @@ -48,19 +51,53 @@ type ClientConfig struct {

// NewClient initializes all RPC clients used by Taiko client software.
func NewClient(ctx context.Context, cfg *ClientConfig) (*Client, error) {
ctxWithTimeout, cancel := ctxWithTimeoutOrDefault(ctx, defaultTimeout)
defer cancel()
var (
l1Client *EthClient
l2Client *EthClient
l1BeaconClient *BeaconClient
l2CheckPoint *EthClient
err error
)

l1Client, err := NewEthClient(ctxWithTimeout, cfg.L1Endpoint, cfg.Timeout)
if err != nil {
return nil, err
}
// Keep retrying to connect to the RPC endpoints until success or context is cancelled.
if err := backoff.Retry(func() error {
ctxWithTimeout, cancel := ctxWithTimeoutOrDefault(ctx, defaultTimeout)
defer cancel()

l2Client, err := NewEthClient(ctxWithTimeout, cfg.L2Endpoint, cfg.Timeout)
if err != nil {
if l1Client, err = NewEthClient(ctxWithTimeout, cfg.L1Endpoint, cfg.Timeout); err != nil {
log.Error("Failed to connect to L1 endpoint, retrying", "endpoint", cfg.L1Endpoint, "err", err)
return err
}

if l2Client, err = NewEthClient(ctxWithTimeout, cfg.L2Endpoint, cfg.Timeout); err != nil {
log.Error("Failed to connect to L2 endpoint, retrying", "endpoint", cfg.L2Endpoint, "err", err)
return err
}

// NOTE: when running tests, we do not have a L1 beacon endpoint.
if cfg.L1BeaconEndpoint != "" && os.Getenv("RUN_TESTS") == "" {
if l1BeaconClient, err = NewBeaconClient(cfg.L1BeaconEndpoint, defaultTimeout); err != nil {
log.Error("Failed to connect to L1 beacon endpoint, retrying", "endpoint", cfg.L1BeaconEndpoint, "err", err)
return err
}
}

if cfg.L2CheckPoint != "" {
l2CheckPoint, err = NewEthClient(ctxWithTimeout, cfg.L2CheckPoint, cfg.Timeout)
if err != nil {
log.Error("Failed to connect to L2 checkpoint endpoint, retrying", "endpoint", cfg.L2CheckPoint, "err", err)
return err
}
}

return nil
}, backoff.WithContext(backoff.NewExponentialBackOff(), ctx)); err != nil {
return nil, err
}

ctxWithTimeout, cancel := ctxWithTimeoutOrDefault(ctx, defaultTimeout)
defer cancel()

taikoL1, err := bindings.NewTaikoL1Client(cfg.TaikoL1Address, l1Client)
if err != nil {
return nil, err
Expand Down Expand Up @@ -96,21 +133,6 @@ func NewClient(ctx context.Context, cfg *ClientConfig) (*Client, error) {
}
}

var l1BeaconClient *BeaconClient
if cfg.L1BeaconEndpoint != "" {
if l1BeaconClient, err = NewBeaconClient(cfg.L1BeaconEndpoint, defaultTimeout); err != nil {
return nil, err
}
}

var l2CheckPoint *EthClient
if cfg.L2CheckPoint != "" {
l2CheckPoint, err = NewEthClient(ctxWithTimeout, cfg.L2CheckPoint, cfg.Timeout)
if err != nil {
return nil, err
}
}

client := &Client{
L1: l1Client,
L1Beacon: l1BeaconClient,
Expand Down
4 changes: 2 additions & 2 deletions prover/prover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (s *ProverTestSuite) TestInitError() {

p := new(Prover)

s.ErrorContains(InitFromConfig(ctx, p, &Config{
s.NotNil(InitFromConfig(ctx, p, &Config{
L1WsEndpoint: os.Getenv("L1_NODE_WS_ENDPOINT"),
L1HttpEndpoint: os.Getenv("L1_NODE_HTTP_ENDPOINT"),
L2WsEndpoint: os.Getenv("L2_EXECUTION_ENGINE_WS_ENDPOINT"),
Expand Down Expand Up @@ -155,7 +155,7 @@ func (s *ProverTestSuite) TestInitError() {
TxSendTimeout: txmgr.DefaultBatcherFlagValues.TxSendTimeout,
TxNotInMempoolTimeout: txmgr.DefaultBatcherFlagValues.TxNotInMempoolTimeout,
},
}), "dial tcp:")
}))
}

func (s *ProverTestSuite) TestOnBlockProposed() {
Expand Down
Loading