Skip to content
This repository has been archived by the owner on May 11, 2024. It is now read-only.

Commit

Permalink
feat(driver): improve driver implementation (#672)
Browse files Browse the repository at this point in the history
  • Loading branch information
mask-pp authored Apr 2, 2024
1 parent 2c2b996 commit 55717c8
Show file tree
Hide file tree
Showing 21 changed files with 104 additions and 121 deletions.
15 changes: 5 additions & 10 deletions cmd/utils/sub_command.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package utils

import (
"context"
"os"
"os/signal"
"syscall"
Expand All @@ -14,20 +13,17 @@ import (
)

type SubcommandApplication interface {
InitFromCli(context.Context, *cli.Context) error
InitFromCli(*cli.Context) error
Name() string
Start() error
Close(context.Context)
Close()
}

func SubcommandAction(app SubcommandApplication) cli.ActionFunc {
return func(c *cli.Context) error {
logger.InitLogger(c)

ctx, ctxClose := context.WithCancel(context.Background())
defer ctxClose()

if err := app.InitFromCli(ctx, c); err != nil {
if err := app.InitFromCli(c); err != nil {
return err
}

Expand All @@ -38,14 +34,13 @@ func SubcommandAction(app SubcommandApplication) cli.ActionFunc {
return err
}

if err := metrics.Serve(ctx, c); err != nil {
if err := metrics.Serve(c); err != nil {
log.Error("Starting metrics server error", "error", err)
return err
}

defer func() {
ctxClose()
app.Close(ctx)
app.Close()
log.Info("Application stopped", "name", app.Name())
}()

Expand Down
28 changes: 11 additions & 17 deletions driver/chain_syncer/beaconsync/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"math/big"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/beacon/engine"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
Expand Down Expand Up @@ -36,13 +35,13 @@ func NewSyncer(

// TriggerBeaconSync triggers the L2 execution engine to start performing a beacon sync, if the
// latest verified block has changed.
func (s *Syncer) TriggerBeaconSync() error {
blockID, latestVerifiedHeadPayload, err := s.getVerifiedBlockPayload(s.ctx)
func (s *Syncer) TriggerBeaconSync(blockID uint64) error {
latestVerifiedHeadPayload, err := s.getVerifiedBlockPayload(s.ctx, blockID)
if err != nil {
return err
}

if !s.progressTracker.HeadChanged(blockID) {
if !s.progressTracker.HeadChanged(new(big.Int).SetUint64(blockID)) {
log.Debug("Verified head has not changed", "blockID", blockID, "hash", latestVerifiedHeadPayload.BlockHash)
return nil
}
Expand Down Expand Up @@ -79,7 +78,7 @@ func (s *Syncer) TriggerBeaconSync() error {
}

// Update sync status.
s.progressTracker.UpdateMeta(blockID, latestVerifiedHeadPayload.BlockHash)
s.progressTracker.UpdateMeta(new(big.Int).SetUint64(blockID), latestVerifiedHeadPayload.BlockHash)

log.Info(
"⛓️ Beacon sync triggered",
Expand All @@ -92,29 +91,24 @@ func (s *Syncer) TriggerBeaconSync() error {

// getVerifiedBlockPayload fetches the latest verified block's header, and converts it to an Engine API executable data,
// which will be used to let the node start beacon syncing.
func (s *Syncer) getVerifiedBlockPayload(ctx context.Context) (*big.Int, *engine.ExecutableData, error) {
stateVars, err := s.rpc.GetProtocolStateVariables(&bind.CallOpts{Context: ctx})
func (s *Syncer) getVerifiedBlockPayload(ctx context.Context, blockID uint64) (*engine.ExecutableData, error) {
blockInfo, err := s.rpc.GetL2BlockInfo(ctx, new(big.Int).SetUint64(blockID))
if err != nil {
return nil, nil, err
return nil, err
}

blockInfo, err := s.rpc.GetL2BlockInfo(ctx, new(big.Int).SetUint64(stateVars.B.LastVerifiedBlockId))
header, err := s.rpc.L2CheckPoint.HeaderByNumber(s.ctx, new(big.Int).SetUint64(blockID))
if err != nil {
return nil, nil, err
}

header, err := s.rpc.L2CheckPoint.HeaderByNumber(s.ctx, new(big.Int).SetUint64(stateVars.B.LastVerifiedBlockId))
if err != nil {
return nil, nil, err
return nil, err
}

if header.Hash() != blockInfo.Ts.BlockHash {
return nil, nil, fmt.Errorf(
return nil, fmt.Errorf(
"latest verified block hash mismatch: %s != %s", header.Hash(), common.BytesToHash(blockInfo.Ts.BlockHash[:]),
)
}

log.Info("Latest verified block header retrieved", "hash", header.Hash())

return new(big.Int).SetUint64(stateVars.B.LastVerifiedBlockId), encoding.ToExecutableData(header), nil
return encoding.ToExecutableData(header), nil
}
7 changes: 4 additions & 3 deletions driver/chain_syncer/calldata/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,9 @@ func NewSyncer(

// ProcessL1Blocks fetches all `TaikoL1.BlockProposed` events between given
// L1 block heights, and then tries inserting them into L2 execution engine's blockchain.
func (s *Syncer) ProcessL1Blocks(ctx context.Context, l1End *types.Header) error {
func (s *Syncer) ProcessL1Blocks(ctx context.Context) error {
for {
if err := s.processL1Blocks(ctx, l1End); err != nil {
if err := s.processL1Blocks(ctx); err != nil {
return err
}

Expand All @@ -98,7 +98,8 @@ func (s *Syncer) ProcessL1Blocks(ctx context.Context, l1End *types.Header) error

// processL1Blocks is the inner method which responsible for processing
// all new L1 blocks.
func (s *Syncer) processL1Blocks(ctx context.Context, l1End *types.Header) error {
func (s *Syncer) processL1Blocks(ctx context.Context) error {
l1End := s.state.GetL1Head()
startL1Current := s.state.GetL1Current()
// If there is a L1 reorg, sometimes this will happen.
if startL1Current.Number.Uint64() >= l1End.Number.Uint64() && startL1Current.Hash() != l1End.Hash() {
Expand Down
8 changes: 2 additions & 6 deletions driver/chain_syncer/calldata/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,12 @@ func (s *CalldataSyncerTestSuite) TestCancelNewSyncer() {
}

func (s *CalldataSyncerTestSuite) TestProcessL1Blocks() {
head, err := s.s.rpc.L1.HeaderByNumber(context.Background(), nil)
s.Nil(err)
s.Nil(s.s.ProcessL1Blocks(context.Background(), head))
s.Nil(s.s.ProcessL1Blocks(context.Background()))
}

func (s *CalldataSyncerTestSuite) TestProcessL1BlocksReorg() {
head, err := s.s.rpc.L1.HeaderByNumber(context.Background(), nil)
s.ProposeAndInsertEmptyBlocks(s.p, s.s)
s.Nil(err)
s.Nil(s.s.ProcessL1Blocks(context.Background(), head))
s.Nil(s.s.ProcessL1Blocks(context.Background()))
}

func (s *CalldataSyncerTestSuite) TestOnBlockProposed() {
Expand Down
19 changes: 9 additions & 10 deletions driver/chain_syncer/chain_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"time"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"

"github.com/taikoxyz/taiko-client/driver/chain_syncer/beaconsync"
Expand Down Expand Up @@ -64,16 +63,16 @@ func New(
}

// Sync performs a sync operation to L2 execution engine's local chain.
func (s *L2ChainSyncer) Sync(l1End *types.Header) error {
needNewBeaconSyncTriggered, err := s.needNewBeaconSyncTriggered()
func (s *L2ChainSyncer) Sync() error {
blockID, needNewBeaconSyncTriggered, err := s.needNewBeaconSyncTriggered()
if err != nil {
return err
}
// If current L2 execution engine's chain is behind of the protocol's latest verified block head, and the
// `P2PSyncVerifiedBlocks` flag is set, try triggering a beacon sync in L2 execution engine to catch up the
// latest verified block head.
if needNewBeaconSyncTriggered {
if err := s.beaconSyncer.TriggerBeaconSync(); err != nil {
if err := s.beaconSyncer.TriggerBeaconSync(blockID); err != nil {
return fmt.Errorf("trigger beacon sync error: %w", err)
}

Expand Down Expand Up @@ -113,7 +112,7 @@ func (s *L2ChainSyncer) Sync(l1End *types.Header) error {
}

// Insert the proposed block one by one.
return s.calldataSyncer.ProcessL1Blocks(s.ctx, l1End)
return s.calldataSyncer.ProcessL1Blocks(s.ctx)
}

// AheadOfProtocolVerifiedHead checks whether the L2 chain is ahead of verified head in protocol.
Expand Down Expand Up @@ -150,23 +149,23 @@ func (s *L2ChainSyncer) AheadOfProtocolVerifiedHead(verifiedHeightToCompare uint
// 2. The protocol's latest verified block head is not zero.
// 3. The L2 execution engine's chain is behind of the protocol's latest verified block head.
// 4. The L2 execution engine's chain have met a sync timeout issue.
func (s *L2ChainSyncer) needNewBeaconSyncTriggered() (bool, error) {
func (s *L2ChainSyncer) needNewBeaconSyncTriggered() (uint64, bool, error) {
// If the flag is not set, we simply return false.
if !s.p2pSyncVerifiedBlocks {
return false, nil
return 0, false, nil
}

stateVars, err := s.rpc.GetProtocolStateVariables(&bind.CallOpts{Context: s.ctx})
if err != nil {
return false, err
return 0, false, err
}

// If the protocol's latest verified block head is zero, we simply return false.
if stateVars.B.LastVerifiedBlockId == 0 {
return false, nil
return 0, false, nil
}

return !s.AheadOfProtocolVerifiedHead(stateVars.B.LastVerifiedBlockId) &&
return stateVars.B.LastVerifiedBlockId, !s.AheadOfProtocolVerifiedHead(stateVars.B.LastVerifiedBlockId) &&
!s.progressTracker.OutOfSync(), nil
}

Expand Down
4 changes: 1 addition & 3 deletions driver/chain_syncer/chain_syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,7 @@ func (s *ChainSyncerTestSuite) TestGetInnerSyncers() {
}

func (s *ChainSyncerTestSuite) TestSync() {
head, err := s.RPCClient.L1.HeaderByNumber(context.Background(), nil)
s.Nil(err)
s.Nil(s.s.Sync(head))
s.Nil(s.s.Sync())
}

func (s *ChainSyncerTestSuite) TestAheadOfProtocolVerifiedHead2() {
Expand Down
3 changes: 1 addition & 2 deletions driver/config_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package driver

import (
"context"
"os"
"time"

Expand Down Expand Up @@ -38,7 +37,7 @@ func (s *DriverTestSuite) TestNewConfigFromCliContext() {
s.NotEmpty(c.JwtSecret)
s.True(c.P2PSyncVerifiedBlocks)
s.Equal(l2CheckPoint, c.L2CheckPoint)
s.NotNil(new(Driver).InitFromCli(context.Background(), ctx))
s.NotNil(new(Driver).InitFromCli(ctx))

return err
}
Expand Down
41 changes: 19 additions & 22 deletions driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,28 +33,26 @@ type Driver struct {
l2ChainSyncer *chainSyncer.L2ChainSyncer
state *state.State

l1HeadCh chan *types.Header
l1HeadSub event.Subscription
syncNotify chan struct{}
l1HeadCh chan *types.Header
l1HeadSub event.Subscription

ctx context.Context
wg sync.WaitGroup
}

// InitFromCli initializes the given driver instance based on the command line flags.
func (d *Driver) InitFromCli(ctx context.Context, c *cli.Context) error {
func (d *Driver) InitFromCli(c *cli.Context) error {
cfg, err := NewConfigFromCliContext(c)
if err != nil {
return err
}

return d.InitFromConfig(ctx, cfg)
return d.InitFromConfig(c.Context, cfg)
}

// InitFromConfig initializes the driver instance based on the given configurations.
func (d *Driver) InitFromConfig(ctx context.Context, cfg *Config) (err error) {
d.l1HeadCh = make(chan *types.Header, 1024)
d.syncNotify = make(chan struct{}, 1)
d.ctx = ctx
d.Config = cfg

Expand Down Expand Up @@ -101,7 +99,7 @@ func (d *Driver) Start() error {
}

// Close closes the driver instance.
func (d *Driver) Close(_ context.Context) {
func (d *Driver) Close() {
d.l1HeadSub.Unsubscribe()
d.state.Close()
d.wg.Wait()
Expand All @@ -112,11 +110,12 @@ func (d *Driver) eventLoop() {
d.wg.Add(1)
defer d.wg.Done()

syncNotify := make(chan struct{}, 1)
// reqSync requests performing a synchronising operation, won't block
// if we are already synchronising.
reqSync := func() {
select {
case d.syncNotify <- struct{}{}:
case syncNotify <- struct{}{}:
default:
}
}
Expand All @@ -138,7 +137,7 @@ func (d *Driver) eventLoop() {
select {
case <-d.ctx.Done():
return
case <-d.syncNotify:
case <-syncNotify:
doSyncWithBackoff()
case <-d.l1HeadCh:
reqSync()
Expand All @@ -156,7 +155,7 @@ func (d *Driver) doSync() error {
return nil
}

if err := d.l2ChainSyncer.Sync(d.state.GetL1Head()); err != nil {
if err := d.l2ChainSyncer.Sync(); err != nil {
log.Error("Process new L1 blocks error", "error", err)
return err
}
Expand Down Expand Up @@ -239,18 +238,16 @@ func (d *Driver) exchangeTransitionConfigLoop() {
case <-d.ctx.Done():
return
case <-ticker.C:
func() {
tc, err := d.rpc.L2Engine.ExchangeTransitionConfiguration(d.ctx, &engine.TransitionConfigurationV1{
TerminalTotalDifficulty: (*hexutil.Big)(common.Big0),
TerminalBlockHash: common.Hash{},
TerminalBlockNumber: 0,
})
if err != nil {
log.Error("Failed to exchange Transition Configuration", "error", err)
} else {
log.Debug("Exchanged transition config", "transitionConfig", tc)
}
}()
tc, err := d.rpc.L2Engine.ExchangeTransitionConfiguration(d.ctx, &engine.TransitionConfigurationV1{
TerminalTotalDifficulty: (*hexutil.Big)(common.Big0),
TerminalBlockHash: common.Hash{},
TerminalBlockNumber: 0,
})
if err != nil {
log.Error("Failed to exchange Transition Configuration", "error", err)
} else {
log.Debug("Exchanged transition config", "transitionConfig", tc)
}
}
}
}
Expand Down
Loading

0 comments on commit 55717c8

Please sign in to comment.