From ade84c12650568b8be21654a5716b9ab6d3d3565 Mon Sep 17 00:00:00 2001 From: David Date: Wed, 10 Apr 2024 18:17:22 +0800 Subject: [PATCH] feat(rpc): keep retrying when connecting to endpoints (#708) --- driver/config_test.go | 2 +- pkg/rpc/client.go | 68 ++++++++++++++++++++++++++++--------------- prover/prover_test.go | 4 +-- 3 files changed, 48 insertions(+), 26 deletions(-) diff --git a/driver/config_test.go b/driver/config_test.go index 5232aa03f..2d528cb72 100644 --- a/driver/config_test.go +++ b/driver/config_test.go @@ -38,7 +38,7 @@ func (s *DriverTestSuite) TestNewConfigFromCliContext() { s.NotEmpty(c.JwtSecret) s.True(c.P2PSyncVerifiedBlocks) s.Equal(l2CheckPoint, c.L2CheckPoint) - s.NotNil(new(Driver).InitFromCli(context.Background(), ctx)) + s.Nil(new(Driver).InitFromCli(context.Background(), ctx)) return err } diff --git a/pkg/rpc/client.go b/pkg/rpc/client.go index 3aa2b0a51..7380e03fd 100644 --- a/pkg/rpc/client.go +++ b/pkg/rpc/client.go @@ -2,9 +2,12 @@ package rpc import ( "context" + "os" "time" + "github.com/cenkalti/backoff/v4" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/log" "github.com/taikoxyz/taiko-client/bindings" ) @@ -48,19 +51,53 @@ type ClientConfig struct { // NewClient initializes all RPC clients used by Taiko client software. func NewClient(ctx context.Context, cfg *ClientConfig) (*Client, error) { - ctxWithTimeout, cancel := ctxWithTimeoutOrDefault(ctx, defaultTimeout) - defer cancel() + var ( + l1Client *EthClient + l2Client *EthClient + l1BeaconClient *BeaconClient + l2CheckPoint *EthClient + err error + ) - l1Client, err := NewEthClient(ctxWithTimeout, cfg.L1Endpoint, cfg.Timeout) - if err != nil { - return nil, err - } + // Keep retrying to connect to the RPC endpoints until success or context is cancelled. + if err := backoff.Retry(func() error { + ctxWithTimeout, cancel := ctxWithTimeoutOrDefault(ctx, defaultTimeout) + defer cancel() - l2Client, err := NewEthClient(ctxWithTimeout, cfg.L2Endpoint, cfg.Timeout) - if err != nil { + if l1Client, err = NewEthClient(ctxWithTimeout, cfg.L1Endpoint, cfg.Timeout); err != nil { + log.Error("Failed to connect to L1 endpoint, retrying", "endpoint", cfg.L1Endpoint, "err", err) + return err + } + + if l2Client, err = NewEthClient(ctxWithTimeout, cfg.L2Endpoint, cfg.Timeout); err != nil { + log.Error("Failed to connect to L2 endpoint, retrying", "endpoint", cfg.L2Endpoint, "err", err) + return err + } + + // NOTE: when running tests, we do not have a L1 beacon endpoint. + if cfg.L1BeaconEndpoint != "" && os.Getenv("RUN_TESTS") == "" { + if l1BeaconClient, err = NewBeaconClient(cfg.L1BeaconEndpoint, defaultTimeout); err != nil { + log.Error("Failed to connect to L1 beacon endpoint, retrying", "endpoint", cfg.L1BeaconEndpoint, "err", err) + return err + } + } + + if cfg.L2CheckPoint != "" { + l2CheckPoint, err = NewEthClient(ctxWithTimeout, cfg.L2CheckPoint, cfg.Timeout) + if err != nil { + log.Error("Failed to connect to L2 checkpoint endpoint, retrying", "endpoint", cfg.L2CheckPoint, "err", err) + return err + } + } + + return nil + }, backoff.WithContext(backoff.NewExponentialBackOff(), ctx)); err != nil { return nil, err } + ctxWithTimeout, cancel := ctxWithTimeoutOrDefault(ctx, defaultTimeout) + defer cancel() + taikoL1, err := bindings.NewTaikoL1Client(cfg.TaikoL1Address, l1Client) if err != nil { return nil, err @@ -96,21 +133,6 @@ func NewClient(ctx context.Context, cfg *ClientConfig) (*Client, error) { } } - var l1BeaconClient *BeaconClient - if cfg.L1BeaconEndpoint != "" { - if l1BeaconClient, err = NewBeaconClient(cfg.L1BeaconEndpoint, defaultTimeout); err != nil { - return nil, err - } - } - - var l2CheckPoint *EthClient - if cfg.L2CheckPoint != "" { - l2CheckPoint, err = NewEthClient(ctxWithTimeout, cfg.L2CheckPoint, cfg.Timeout) - if err != nil { - return nil, err - } - } - client := &Client{ L1: l1Client, L1Beacon: l1BeaconClient, diff --git a/prover/prover_test.go b/prover/prover_test.go index bcf38376f..8caf93479 100644 --- a/prover/prover_test.go +++ b/prover/prover_test.go @@ -125,7 +125,7 @@ func (s *ProverTestSuite) TestInitError() { p := new(Prover) - s.ErrorContains(InitFromConfig(ctx, p, &Config{ + s.NotNil(InitFromConfig(ctx, p, &Config{ L1WsEndpoint: os.Getenv("L1_NODE_WS_ENDPOINT"), L1HttpEndpoint: os.Getenv("L1_NODE_HTTP_ENDPOINT"), L2WsEndpoint: os.Getenv("L2_EXECUTION_ENGINE_WS_ENDPOINT"), @@ -155,7 +155,7 @@ func (s *ProverTestSuite) TestInitError() { TxSendTimeout: txmgr.DefaultBatcherFlagValues.TxSendTimeout, TxNotInMempoolTimeout: txmgr.DefaultBatcherFlagValues.TxNotInMempoolTimeout, }, - }), "dial tcp:") + })) } func (s *ProverTestSuite) TestOnBlockProposed() {