Skip to content
This repository has been archived by the owner on May 11, 2024. It is now read-only.

Commit

Permalink
feat(driver): improve driver implementation (#639)
Browse files Browse the repository at this point in the history
  • Loading branch information
davidtaikocha committed Mar 15, 2024
1 parent 090a466 commit fbd4d06
Show file tree
Hide file tree
Showing 18 changed files with 123 additions and 177 deletions.
12 changes: 5 additions & 7 deletions driver/anchor_tx_constructor/anchor_tx_constructor.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
const AnchorGasLimit = 250_000

// AnchorTxConstructor is responsible for assembling the anchor transaction (TaikoL2.anchor) in
// each L2 block, which is always the first transaction.
// each L2 block, which must be the first transaction, and its sender must be the golden touch account.
type AnchorTxConstructor struct {
rpc *rpc.Client
goldenTouchAddress common.Address
Expand All @@ -39,11 +39,7 @@ func New(rpc *rpc.Client) (*AnchorTxConstructor, error) {
return nil, fmt.Errorf("invalid golden touch private key %s", encoding.GoldenTouchPrivKey)
}

return &AnchorTxConstructor{
rpc: rpc,
goldenTouchAddress: goldenTouchAddress,
signer: signer,
}, nil
return &AnchorTxConstructor{rpc, goldenTouchAddress, signer}, nil
}

// AssembleAnchorTx assembles a signed TaikoL2.anchor transaction.
Expand All @@ -69,9 +65,11 @@ func (c *AnchorTxConstructor) AssembleAnchorTx(

log.Info(
"Anchor arguments",
"l2Height", l2Height,
"l1Height", l1Height,
"l1Hash", l1Hash,
"stateRoot", l1Header.Root,
"l1Height", l1Height,
"baseFee", baseFee,
"gasUsed", parentGasUsed,
)

Expand Down
12 changes: 5 additions & 7 deletions driver/chain_syncer/beaconsync/progress_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ var (
)

// SyncProgressTracker is responsible for tracking the L2 execution engine's sync progress, after
// a beacon sync is triggered in it, and check whether the L2 execution is not able to sync through P2P (due to no
// a beacon sync is triggered, and check whether the L2 execution is not able to sync through P2P (due to no
// connected peer or some other reasons).
type SyncProgressTracker struct {
// RPC client
Expand Down Expand Up @@ -96,18 +96,16 @@ func (t *SyncProgressTracker) track(ctx context.Context) {

if new(big.Int).SetUint64(headHeight).Cmp(t.lastSyncedVerifiedBlockID) >= 0 {
t.lastProgressedTime = time.Now()
log.Info("L2 execution engine has finished the P2P sync work, all verified blocks synced, "+
"will switch to insert pending blocks one by one",
log.Info(
"L2 execution engine has finished the P2P sync work, all verified blocks synced, "+
"will switch to insert pending blocks one by one",
"lastSyncedVerifiedBlockID", t.lastSyncedVerifiedBlockID,
"lastSyncedVerifiedBlockHash", t.lastSyncedVerifiedBlockHash,
)
return
}

log.Debug(
"L2 execution engine has not started P2P syncing yet",
"timeout", t.timeout,
)
log.Info("L2 execution engine has not started P2P syncing yet", "timeout", t.timeout)
}

defer func() { t.lastSyncProgress = progress }()
Expand Down
15 changes: 5 additions & 10 deletions driver/chain_syncer/beaconsync/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ func NewSyncer(
return &Syncer{ctx, rpc, state, progressTracker}
}

// TriggerBeaconSync triggers the L2 execution engine to start performing a beacon sync.
// TriggerBeaconSync triggers the L2 execution engine to start performing a beacon sync, if the
// latest verified block has changed.
func (s *Syncer) TriggerBeaconSync() error {
blockID, latestVerifiedHeadPayload, err := s.getVerifiedBlockPayload(s.ctx)
if err != nil {
Expand All @@ -56,10 +57,7 @@ func (s *Syncer) TriggerBeaconSync() error {
}
}

status, err := s.rpc.L2Engine.NewPayload(
s.ctx,
latestVerifiedHeadPayload,
)
status, err := s.rpc.L2Engine.NewPayload(s.ctx, latestVerifiedHeadPayload)
if err != nil {
return err
}
Expand All @@ -81,10 +79,7 @@ func (s *Syncer) TriggerBeaconSync() error {
}

// Update sync status.
s.progressTracker.UpdateMeta(
blockID,
latestVerifiedHeadPayload.BlockHash,
)
s.progressTracker.UpdateMeta(blockID, latestVerifiedHeadPayload.BlockHash)

log.Info(
"⛓️ Beacon sync triggered",
Expand All @@ -103,7 +98,7 @@ func (s *Syncer) getVerifiedBlockPayload(ctx context.Context) (*big.Int, *engine
return nil, nil, err
}

blockInfo, err := s.rpc.TaikoL1.GetBlock(&bind.CallOpts{Context: ctx}, stateVars.B.LastVerifiedBlockId)
blockInfo, err := s.rpc.GetL2BlockInfo(ctx, new(big.Int).SetUint64(stateVars.B.LastVerifiedBlockId))
if err != nil {
return nil, nil, err
}
Expand Down
41 changes: 24 additions & 17 deletions driver/chain_syncer/calldata/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,15 @@ func (s *Syncer) onBlockProposed(
"removed", event.Raw.Removed,
)

// Fetch the L2 parent block.
// If the event's timestamp is in the future, we wait until the timestamp is reached, should
// only happen when testing.
if event.Meta.Timestamp > uint64(time.Now().Unix()) {
log.Warn("Future L2 block, waiting", "L2BlockTimestamp", event.Meta.Timestamp, "now", time.Now().Unix())
time.Sleep(time.Until(time.Unix(int64(event.Meta.Timestamp), 0)))
}

// Fetch the L2 parent block, if the node is just finished a P2P sync, we simply use the tracker's
// last synced verified block as the parent, otherwise, we fetch the parent block from L2 EE.
var (
parent *types.Header
err error
Expand All @@ -206,18 +214,23 @@ func (s *Syncer) onBlockProposed(
} else {
parent, err = s.rpc.L2ParentByBlockID(ctx, event.BlockId)
}

if err != nil {
return fmt.Errorf("failed to fetch L2 parent block: %w", err)
}

log.Debug("Parent block", "height", parent.Number, "hash", parent.Hash())
log.Debug(
"Parent block",
"height", parent.Number,
"hash", parent.Hash(),
"beaconSyncTriggered", s.progressTracker.Triggered(),
)

tx, err := s.rpc.L1.TransactionInBlock(ctx, event.Raw.BlockHash, event.Raw.TxIndex)
if err != nil {
return fmt.Errorf("failed to fetch original TaikoL1.proposeBlock transaction: %w", err)
}

// Decode transactions list.
var txListDecoder txlistfetcher.TxListFetcher
if event.Meta.BlobUsed {
txListDecoder = txlistfetcher.NewBlobTxListFetcher(s.rpc)
Expand All @@ -234,18 +247,6 @@ func (s *Syncer) onBlockProposed(
}
}

l1Origin := &rawdb.L1Origin{
BlockID: event.BlockId,
L2BlockHash: common.Hash{}, // Will be set by taiko-geth.
L1BlockHeight: new(big.Int).SetUint64(event.Raw.BlockNumber),
L1BlockHash: event.Raw.BlockHash,
}

if event.Meta.Timestamp > uint64(time.Now().Unix()) {
log.Warn("Future L2 block, waiting", "L2BlockTimestamp", event.Meta.Timestamp, "now", time.Now().Unix())
time.Sleep(time.Until(time.Unix(int64(event.Meta.Timestamp), 0)))
}

// If the transactions list is invalid, we simply insert an empty L2 block.
if !s.txListValidator.ValidateTxList(event.BlockId, txListBytes, event.Meta.BlobUsed) {
log.Info("Invalid transactions list, insert an empty L2 block instead", "blockID", event.BlockId)
Expand All @@ -258,7 +259,12 @@ func (s *Syncer) onBlockProposed(
parent,
s.state.GetHeadBlockID(),
txListBytes,
l1Origin,
&rawdb.L1Origin{
BlockID: event.BlockId,
L2BlockHash: common.Hash{}, // Will be set by taiko-geth.
L1BlockHeight: new(big.Int).SetUint64(event.Raw.BlockNumber),
L1BlockHash: event.Raw.BlockHash,
},
)
if err != nil {
return fmt.Errorf("failed to insert new head to L2 execution engine: %w", err)
Expand Down Expand Up @@ -350,6 +356,7 @@ func (s *Syncer) insertNewHead(
return nil, fmt.Errorf("failed to create TaikoL2.anchor transaction: %w", err)
}

// Insert the anchor transaction at the head of the transactions list
txList = append([]*types.Transaction{anchorTx}, txList...)
if txListBytes, err = rlp.EncodeToBytes(txList); err != nil {
log.Error("Encode txList error", "blockID", event.BlockId, "error", err)
Expand Down Expand Up @@ -487,7 +494,7 @@ func (s *Syncer) checkLastVerifiedBlockMismatch(ctx context.Context) (bool, erro
return false, nil
}

blockInfo, err := s.rpc.TaikoL1.GetBlock(&bind.CallOpts{Context: ctx}, stateVars.B.LastVerifiedBlockId)
blockInfo, err := s.rpc.GetL2BlockInfo(ctx, new(big.Int).SetUint64(stateVars.B.LastVerifiedBlockId))
if err != nil {
return false, err
}
Expand Down
28 changes: 21 additions & 7 deletions driver/chain_syncer/chain_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type L2ChainSyncer struct {
progressTracker *beaconsync.SyncProgressTracker

// If this flag is activated, will try P2P beacon sync if current node is behind of the protocol's
// latest verified block head
// the latest verified block head
p2pSyncVerifiedBlocks bool
}

Expand Down Expand Up @@ -98,7 +98,7 @@ func (s *L2ChainSyncer) Sync(l1End *types.Header) error {
"L2 head information",
"number", l2Head.Number,
"hash", l2Head.Hash(),
"LastSyncedVerifiedBlockID", s.progressTracker.LastSyncedVerifiedBlockID(),
"lastSyncedVerifiedBlockID", s.progressTracker.LastSyncedVerifiedBlockID(),
"lastSyncedVerifiedBlockHash", s.progressTracker.LastSyncedVerifiedBlockHash(),
)

Expand Down Expand Up @@ -126,10 +126,12 @@ func (s *L2ChainSyncer) AheadOfProtocolVerifiedHead(verifiedHeightToCompare uint
// If latest verified head height is equal to L2 execution engine's synced head height minus one,
// we also mark the triggered P2P sync progress as finished to prevent a potential `InsertBlockWithoutSetHead` in
// execution engine, which may cause errors since we do not pass all transactions in ExecutePayload when calling
// NewPayloadV1.
// `NewPayloadV1`.
verifiedHeightToCompare--
}

// If the L2 execution engine's chain is behind of the protocol's latest verified block head,
// we should keep the beacon sync.
if s.state.GetL2Head().Number.Uint64() < verifiedHeightToCompare {
return false
}
Expand All @@ -142,16 +144,28 @@ func (s *L2ChainSyncer) AheadOfProtocolVerifiedHead(verifiedHeightToCompare uint
}

// needNewBeaconSyncTriggered checks whether the current L2 execution engine needs to trigger
// another new beacon sync.
// another new beacon sync, the following conditions should be met:
// 1. The `P2PSyncVerifiedBlocks` flag is set.
// 2. The protocol's latest verified block head is not zero.
// 3. The L2 execution engine's chain is behind of the protocol's latest verified block head.
// 4. The L2 execution engine's chain have met a sync timeout issue.
func (s *L2ChainSyncer) needNewBeaconSyncTriggered() (bool, error) {
// If the flag is not set, we simply return false.
if !s.p2pSyncVerifiedBlocks {
return false, nil
}

stateVars, err := s.rpc.GetProtocolStateVariables(&bind.CallOpts{Context: s.ctx})
if err != nil {
return false, err
}

return s.p2pSyncVerifiedBlocks &&
stateVars.B.LastVerifiedBlockId > 0 &&
!s.AheadOfProtocolVerifiedHead(stateVars.B.LastVerifiedBlockId) &&
// If the protocol's latest verified block head is zero, we simply return false.
if stateVars.B.LastVerifiedBlockId == 0 {
return false, nil
}

return !s.AheadOfProtocolVerifiedHead(stateVars.B.LastVerifiedBlockId) &&
!s.progressTracker.OutOfSync(), nil
}

Expand Down
27 changes: 14 additions & 13 deletions driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ func (d *Driver) InitFromConfig(ctx context.Context, cfg *Config) (err error) {

// Start starts the driver instance.
func (d *Driver) Start() error {
d.wg.Add(3)
go d.eventLoop()
go d.reportProtocolStatus()
go d.exchangeTransitionConfigLoop()
Expand All @@ -109,6 +108,7 @@ func (d *Driver) Close(_ context.Context) {

// eventLoop starts the main loop of a L2 execution engine's driver.
func (d *Driver) eventLoop() {
d.wg.Add(1)
defer d.wg.Done()

// reqSync requests performing a synchronising operation, won't block
Expand Down Expand Up @@ -152,30 +152,33 @@ func (d *Driver) doSync() error {
return nil
}

l1Head := d.state.GetL1Head()

if err := d.l2ChainSyncer.Sync(l1Head); err != nil {
if err := d.l2ChainSyncer.Sync(d.state.GetL1Head()); err != nil {
log.Error("Process new L1 blocks error", "error", err)
return err
}

return nil
}

// ChainSyncer returns the driver's chain syncer.
// ChainSyncer returns the driver's chain syncer, this method
// should only be used for testing.
func (d *Driver) ChainSyncer() *chainSyncer.L2ChainSyncer {
return d.l2ChainSyncer
}

// reportProtocolStatus reports some protocol status intervally.
func (d *Driver) reportProtocolStatus() {
ticker := time.NewTicker(protocolStatusReportInterval)
var (
ticker = time.NewTicker(protocolStatusReportInterval)
maxNumBlocks uint64
)
d.wg.Add(1)

defer func() {
ticker.Stop()
d.wg.Done()
}()

var maxNumBlocks uint64
if err := backoff.Retry(
func() error {
if d.ctx.Err() != nil {
Expand Down Expand Up @@ -220,6 +223,8 @@ func (d *Driver) reportProtocolStatus() {
// L2 execution engine.
func (d *Driver) exchangeTransitionConfigLoop() {
ticker := time.NewTicker(exchangeTransitionConfigInterval)
d.wg.Add(1)

defer func() {
ticker.Stop()
d.wg.Done()
Expand All @@ -238,13 +243,9 @@ func (d *Driver) exchangeTransitionConfigLoop() {
})
if err != nil {
log.Error("Failed to exchange Transition Configuration", "error", err)
return
} else {
log.Debug("Exchanged transition config", "transitionConfig", tc)
}

log.Debug(
"Exchanged transition config",
"transitionConfig", tc,
)
}()
}
}
Expand Down
11 changes: 4 additions & 7 deletions driver/state/l1_current.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"math/big"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
Expand All @@ -28,16 +27,14 @@ func (s *State) SetL1Current(h *types.Header) {

// ResetL1Current resets the l1Current cursor to the L1 height which emitted a
// BlockProposed event with given blockID / blockHash.
func (s *State) ResetL1Current(
ctx context.Context,
blockID *big.Int,
) error {
func (s *State) ResetL1Current(ctx context.Context, blockID *big.Int) error {
if blockID == nil {
return fmt.Errorf("empty block ID")
}

log.Info("Reset L1 current cursor", "blockID", blockID)

// If blockID is zero, reset to genesis L1 height.
if blockID.Cmp(common.Big0) == 0 {
l1Current, err := s.rpc.L1.HeaderByNumber(ctx, s.GenesisL1Height)
if err != nil {
Expand All @@ -47,11 +44,11 @@ func (s *State) ResetL1Current(
return nil
}

blockInfo, err := s.rpc.TaikoL1.GetBlock(&bind.CallOpts{Context: ctx}, blockID.Uint64())
// Fetch the block info from TaikoL1 contract, and set the L1 height.
blockInfo, err := s.rpc.GetL2BlockInfo(ctx, blockID)
if err != nil {
return err
}

l1Current, err := s.rpc.L1.HeaderByNumber(ctx, new(big.Int).SetUint64(blockInfo.Blk.ProposedIn))
if err != nil {
return err
Expand Down
Loading

0 comments on commit fbd4d06

Please sign in to comment.