Skip to content
This repository has been archived by the owner on May 11, 2024. It is now read-only.

Commit

Permalink
feat(proposer): improve ProposeOp (#787)
Browse files Browse the repository at this point in the history
  • Loading branch information
davidtaikocha committed May 3, 2024
1 parent 3a01470 commit 29ce28f
Show file tree
Hide file tree
Showing 6 changed files with 154 additions and 165 deletions.
1 change: 0 additions & 1 deletion driver/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,6 @@ func (s *DriverTestSuite) TestCheckL1ReorgToHigherFork() {

// Reorg back to l2Head1
s.RevertL1Snapshot(testnetL1SnapshotID)
s.IncreaseTime(uint64((3 * time.Second).Seconds()))
s.InitProposer()

// Because of evm_revert operation, the nonce of the proposer need to be adjusted.
Expand Down
12 changes: 3 additions & 9 deletions internal/testutils/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,13 @@ import (
func (s *ClientTestSuite) ProposeInvalidTxListBytes(proposer Proposer) {
invalidTxListBytes := RandomBytes(256)

for _, err := range proposer.ProposeTxLists(context.Background(), [][]byte{invalidTxListBytes}) {
s.Nil(err)
}
s.Nil(proposer.ProposeTxList(context.Background(), invalidTxListBytes, 1))
}

func (s *ClientTestSuite) proposeEmptyBlockOp(ctx context.Context, proposer Proposer) {
emptyTxListBytes, err := rlp.EncodeToBytes(types.Transactions{})
s.Nil(err)
for _, err := range proposer.ProposeTxLists(ctx, [][]byte{emptyTxListBytes}) {
s.Nil(err)
}
s.Nil(proposer.ProposeTxList(ctx, emptyTxListBytes, 0))
}

func (s *ClientTestSuite) ProposeAndInsertEmptyBlocks(
Expand All @@ -67,9 +63,7 @@ func (s *ClientTestSuite) ProposeAndInsertEmptyBlocks(
encoded, err := rlp.EncodeToBytes(emptyTxs)
s.Nil(err)

for _, err := range proposer.ProposeTxLists(context.Background(), [][]byte{encoded}) {
s.Nil(err)
}
s.Nil(proposer.ProposeTxList(context.Background(), encoded, 0))

s.ProposeInvalidTxListBytes(proposer)

Expand Down
6 changes: 5 additions & 1 deletion internal/testutils/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,9 @@ type BlobSyncer interface {
type Proposer interface {
utils.SubcommandApplication
ProposeOp(ctx context.Context) error
ProposeTxLists(ctx context.Context, txListsBytes [][]byte) []error
ProposeTxList(
ctx context.Context,
txListBytes []byte,
txNum uint,
) error
}
30 changes: 30 additions & 0 deletions pkg/rpc/methods.go
Original file line number Diff line number Diff line change
Expand Up @@ -724,3 +724,33 @@ func (c *Client) GetTaikoDataSlotBByNumber(ctx context.Context, number uint64) (

return nil, fmt.Errorf("failed to get state variables by block number %d", number)
}

// WaitL1NewPendingTransaction waits until the L1 account has a new pending transaction.
func (c *Client) WaitL1NewPendingTransaction(
ctx context.Context,
address common.Address,
oldPendingNonce uint64,
) error {
ctxWithTimeout, cancel := ctxWithTimeoutOrDefault(ctx, defaultTimeout)
defer cancel()

ticker := time.NewTicker(rpcPollingInterval)
defer ticker.Stop()

for ; true; <-ticker.C {
if ctxWithTimeout.Err() != nil {
return ctxWithTimeout.Err()
}

nonce, err := c.L1.PendingNonceAt(ctxWithTimeout, address)
if err != nil {
return err
}

if nonce != oldPendingNonce {
break
}
}

return nil
}
137 changes: 58 additions & 79 deletions proposer/proposer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,9 @@ import (
"context"
"fmt"
"math/rand"
"strings"
"sync"
"time"

"github.com/ethereum-optimism/optimism/op-challenger/sender"
"github.com/ethereum-optimism/optimism/op-service/txmgr"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
Expand All @@ -18,6 +16,7 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"
"github.com/urfave/cli/v2"
"golang.org/x/sync/errgroup"

"github.com/taikoxyz/taiko-client/bindings"
"github.com/taikoxyz/taiko-client/bindings/encoding"
Expand Down Expand Up @@ -60,7 +59,7 @@ type Proposer struct {

lastProposedAt time.Time

txSender *sender.TxSender
txmgr *txmgr.SimpleTxManager

ctx context.Context
wg sync.WaitGroup
Expand Down Expand Up @@ -104,16 +103,14 @@ func (p *Proposer) InitFromConfig(ctx context.Context, cfg *Config) (err error)
return err
}

txmgr, err := txmgr.NewSimpleTxManager(
if p.txmgr, err = txmgr.NewSimpleTxManager(
"proposer",
log.Root(),
&metrics.TxMgrMetrics,
*cfg.TxmgrConfigs,
)
if err != nil {
); err != nil {
return err
}
p.txSender = sender.NewTxSender(p.ctx, log.Root(), txmgr, p.Config.MaxProposedTxListsPerEpoch)

if p.proverSelector, err = selector.NewETHFeeEOASelector(
&protocolConfigs,
Expand Down Expand Up @@ -277,10 +274,7 @@ func (p *Proposer) fetchPoolContent(filterPoolContent bool) ([]types.Transaction
// and then proposing them to TaikoL1 contract.
func (p *Proposer) ProposeOp(ctx context.Context) error {
// Check if it's time to propose unfiltered pool content.
var (
filterPoolContent = time.Now().Before(p.lastProposedAt.Add(p.MinProposingInternal))
txListsBytes = make([][]byte, 0)
)
filterPoolContent := time.Now().Before(p.lastProposedAt.Add(p.MinProposingInternal))

// Wait until L2 execution engine is synced at first.
if err := p.rpc.WaitTillL2ExecutionEngineSynced(ctx); err != nil {
Expand All @@ -293,7 +287,6 @@ func (p *Proposer) ProposeOp(ctx context.Context) error {
"lastProposedAt", p.lastProposedAt,
)

// Fetch the pool content with the given limits.
txLists, err := p.fetchPoolContent(filterPoolContent)
if err != nil {
return err
Expand All @@ -304,92 +297,78 @@ func (p *Proposer) ProposeOp(ctx context.Context) error {
return nil
}

g, gCtx := errgroup.WithContext(ctx)
// Propose all L2 transactions lists.
for i, txs := range txLists {
if i >= int(p.MaxProposedTxListsPerEpoch) {
return nil
}

txListBytes, err := rlp.EncodeToBytes(txs)
for _, txs := range txLists[:utils.Min(p.MaxProposedTxListsPerEpoch, uint64(len(txLists)))] {
nonce, err := p.rpc.L1.PendingNonceAt(ctx, p.proposerAddress)
if err != nil {
return fmt.Errorf("failed to encode transactions: %w", err)
log.Error("Failed to get proposer nonce", "error", err)
break
}

txListsBytes = append(txListsBytes, txListBytes)
}
log.Info("Proposer current pending nonce", "nonce", nonce)

for i, err := range p.ProposeTxLists(ctx, txListsBytes) {
if err != nil {
log.Error(
"Failed to send TaikoL1.proposeBlock transaction",
"index", i,
"error", err,
)
continue
}

metrics.ProposerProposedTxListsCounter.Add(1)
metrics.ProposerProposedTxsCounter.Add(float64(len(txLists[i])))
g.Go(func() error {
txListBytes, err := rlp.EncodeToBytes(txs)
if err != nil {
return fmt.Errorf("failed to encode transactions: %w", err)
}
if err := p.ProposeTxList(gCtx, txListBytes, uint(txs.Len())); err != nil {
return err
}
p.lastProposedAt = time.Now()
return nil
})

log.Info("📝 Propose transactions succeeded", "txs", len(txLists[i]))
p.lastProposedAt = time.Now()
if err := p.rpc.WaitL1NewPendingTransaction(ctx, p.proposerAddress, nonce); err != nil {
log.Error("Failed to wait for new pending transaction", "error", err)
}
}
if err := g.Wait(); err != nil {
return err
}

return nil
}

// ProposeTxLists proposes the given transaction lists to TaikoL1 contract.
func (p *Proposer) ProposeTxLists(ctx context.Context, txListsBytes [][]byte) []error {
txCandidates := make([]txmgr.TxCandidate, 0)

for i, txListBytes := range txListsBytes {
compressedTxListBytes, err := utils.Compress(txListBytes)
if err != nil {
log.Error("Failed to compress transactions list", "index", i, "error", err)
break
}
// ProposeTxList proposes the given transactions list to TaikoL1 smart contract.
func (p *Proposer) ProposeTxList(
ctx context.Context,
txListBytes []byte,
txNum uint,
) error {
compressedTxListBytes, err := utils.Compress(txListBytes)
if err != nil {
return err
}

candidate, err := p.txBuilder.Build(
ctx,
p.tierFees,
p.IncludeParentMetaHash,
compressedTxListBytes,
)
if err != nil {
log.Error("Failed to build TaikoL1.proposeBlock transaction", "error", err)
break
}
txCandidate, err := p.txBuilder.Build(
ctx,
p.tierFees,
p.IncludeParentMetaHash,
compressedTxListBytes,
)
if err != nil {
log.Warn("Failed to build TaikoL1.proposeBlock transaction", "error", encoding.TryParsingCustomError(err))
return err
}

txCandidates = append(txCandidates, *candidate)
receipt, err := p.txmgr.Send(ctx, *txCandidate)
if err != nil {
log.Warn("Failed to send TaikoL1.proposeBlock transaction", "error", encoding.TryParsingCustomError(err))
return err
}

if len(txCandidates) == 0 {
return []error{}
if receipt.Status != types.ReceiptStatusSuccessful {
return fmt.Errorf("failed to propose block: %s", receipt.TxHash.Hex())
}

// Send the transactions to the TaikoL1 contract, and if any of them fails, try
// to parse the custom error.
errors := p.txSender.SendAndWaitDetailed("proposeBlock", txCandidates...)
for i, err := range errors {
if err == nil {
continue
}
log.Info("📝 Propose transactions succeeded", "txs", txNum)

// If a transaction is reverted on chain, the error string returned by txSender will like this:
// fmt.Errorf("%w purpose: %v hash: %v", ErrTransactionReverted, txPurpose, rcpt.Receipt.TxHash)
// Then we try parsing the custom error for more details in log.
if strings.Contains(err.Error(), "purpose: ") && strings.Contains(err.Error(), "hash: ") {
txHash := strings.Split(err.Error(), "hash: ")[1]
receipt, err := p.rpc.L1.TransactionReceipt(ctx, common.HexToHash(txHash))
if err != nil {
log.Error("Failed to fetch receipt", "txHash", txHash, "error", err)
continue
}
errors[i] = encoding.TryParsingCustomErrorFromReceipt(ctx, p.rpc.L1, p.proposerAddress, receipt)
}
}
metrics.ProposerProposedTxListsCounter.Add(1)
metrics.ProposerProposedTxsCounter.Add(float64(txNum))

return errors
return nil
}

// updateProposingTicker updates the internal proposing timer.
Expand Down
Loading

0 comments on commit 29ce28f

Please sign in to comment.