Skip to content
This repository has been archived by the owner on May 11, 2024. It is now read-only.

feat(proposer): handle transaction replacement underpriced error #322

Merged
merged 6 commits into from
Jul 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions cmd/flags/proposer.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,13 @@ var (
}
ProposeBlockTxGasLimit = &cli.Uint64Flag{
Name: "proposeBlockTxGasLimit",
Usage: "Gas limit will be used for TaikoL1.proposeBlock transactions",
Category: proposerCategory,
}
ProposeBlockTxReplacementMultiplier = &cli.Uint64Flag{
Name: "proposeBlockTxReplacementMultiplier",
Value: 2,
Usage: "Gas tip multiplier when replacing a TaikoL1.proposeBlock transaction with same nonce",
Category: proposerCategory,
}
)
Expand All @@ -73,4 +80,5 @@ var ProposerFlags = MergeFlags(CommonFlags, []cli.Flag{
MinBlockGasLimit,
MaxProposedTxListsPerEpoch,
ProposeBlockTxGasLimit,
ProposeBlockTxReplacementMultiplier,
})
41 changes: 41 additions & 0 deletions pkg/rpc/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"math/big"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -119,6 +120,46 @@ func NeedNewProof(
return false, nil
}

type AccountPoolContent map[string]map[string]*types.Transaction

// ContentFrom fetches a given account's transactions list from a node's transactions pool.
func ContentFrom(
ctx context.Context,
rawRPC *rpc.Client,
address common.Address,
) (AccountPoolContent, error) {
var result AccountPoolContent
return result, rawRPC.CallContext(
ctx,
&result,
"txpool_contentFrom",
address,
)
}

// GetPendingTxByNonce tries to retrieve a pending transaction with a given nonce in a node's mempool.
func GetPendingTxByNonce(
ctx context.Context,
cli *Client,
address common.Address,
nonce uint64,
) (*types.Transaction, error) {
content, err := ContentFrom(ctx, cli.L1RawRPC, address)
if err != nil {
return nil, err
}

for _, txMap := range content {
for txNonce, tx := range txMap {
if txNonce == strconv.Itoa(int(nonce)) {
return tx, nil
}
}
}

return nil, nil
}

// SetHead makes a `debug_setHead` RPC call to set the chain's head, should only be used
// for testing purpose.
func SetHead(ctx context.Context, rpc *rpc.Client, headNum *big.Int) error {
Expand Down
38 changes: 38 additions & 0 deletions pkg/rpc/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@ package rpc

import (
"context"
"os"
"strconv"
"testing"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/stretchr/testify/require"
)

Expand All @@ -31,3 +34,38 @@ func TestStringToBytes32(t *testing.T) {
require.Equal(t, [32]byte{}, StringToBytes32(""))
require.Equal(t, [32]byte{0x61, 0x62, 0x63}, StringToBytes32("abc"))
}

func TestL1ContentFrom(t *testing.T) {
client := newTestClient(t)
l2Head, err := client.L2.HeaderByNumber(context.Background(), nil)
require.Nil(t, err)

baseFee, err := client.TaikoL2.GetBasefee(nil, 0, 60000000, uint32(l2Head.GasUsed))
require.Nil(t, err)

testAddrPrivKey, err := crypto.ToECDSA(common.Hex2Bytes(os.Getenv("L1_PROPOSER_PRIVATE_KEY")))
require.Nil(t, err)

testAddr := crypto.PubkeyToAddress(testAddrPrivKey.PublicKey)

nonce, err := client.L2.PendingNonceAt(context.Background(), testAddr)
require.Nil(t, err)

tx := types.NewTransaction(
nonce,
testAddr,
common.Big1,
100000,
baseFee,
[]byte{},
)
signedTx, err := types.SignTx(tx, types.LatestSignerForChainID(client.L2ChainID), testAddrPrivKey)
require.Nil(t, err)
require.Nil(t, client.L2.SendTransaction(context.Background(), signedTx))

content, err := ContentFrom(context.Background(), client.L2RawRPC, testAddr)
require.Nil(t, err)

require.NotZero(t, len(content["pending"]))
require.Equal(t, signedTx.Nonce(), content["pending"][strconv.Itoa(int(signedTx.Nonce()))].Nonce())
}
66 changes: 38 additions & 28 deletions proposer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,21 @@ import (

// Config contains all configurations to initialize a Taiko proposer.
type Config struct {
L1Endpoint string
L2Endpoint string
TaikoL1Address common.Address
TaikoL2Address common.Address
L1ProposerPrivKey *ecdsa.PrivateKey
L2SuggestedFeeRecipient common.Address
ProposeInterval *time.Duration
CommitSlot uint64
LocalAddresses []common.Address
ProposeEmptyBlocksInterval *time.Duration
MinBlockGasLimit uint64
MaxProposedTxListsPerEpoch uint64
ProposeBlockTxGasLimit *uint64
BackOffRetryInterval time.Duration
L1Endpoint string
L2Endpoint string
TaikoL1Address common.Address
TaikoL2Address common.Address
L1ProposerPrivKey *ecdsa.PrivateKey
L2SuggestedFeeRecipient common.Address
ProposeInterval *time.Duration
CommitSlot uint64
LocalAddresses []common.Address
ProposeEmptyBlocksInterval *time.Duration
MinBlockGasLimit uint64
MaxProposedTxListsPerEpoch uint64
ProposeBlockTxGasLimit *uint64
BackOffRetryInterval time.Duration
ProposeBlockTxReplacementMultiplier uint64
}

// NewConfigFromCliContext initializes a Config instance from
Expand Down Expand Up @@ -80,20 +81,29 @@ func NewConfigFromCliContext(c *cli.Context) (*Config, error) {
proposeBlockTxGasLimit = &gasLimit
}

proposeBlockTxReplacementMultiplier := c.Uint64(flags.ProposeBlockTxReplacementMultiplier.Name)
if proposeBlockTxReplacementMultiplier == 0 {
return nil, fmt.Errorf(
"invalid --proposeBlockTxReplacementMultiplier value: %d",
proposeBlockTxReplacementMultiplier,
)
}

return &Config{
L1Endpoint: c.String(flags.L1WSEndpoint.Name),
L2Endpoint: c.String(flags.L2HTTPEndpoint.Name),
TaikoL1Address: common.HexToAddress(c.String(flags.TaikoL1Address.Name)),
TaikoL2Address: common.HexToAddress(c.String(flags.TaikoL2Address.Name)),
L1ProposerPrivKey: l1ProposerPrivKey,
L2SuggestedFeeRecipient: common.HexToAddress(l2SuggestedFeeRecipient),
ProposeInterval: proposingInterval,
CommitSlot: c.Uint64(flags.CommitSlot.Name),
LocalAddresses: localAddresses,
ProposeEmptyBlocksInterval: proposeEmptyBlocksInterval,
MinBlockGasLimit: c.Uint64(flags.MinBlockGasLimit.Name),
MaxProposedTxListsPerEpoch: c.Uint64(flags.MaxProposedTxListsPerEpoch.Name),
ProposeBlockTxGasLimit: proposeBlockTxGasLimit,
BackOffRetryInterval: time.Duration(c.Uint64(flags.BackOffRetryInterval.Name)) * time.Second,
L1Endpoint: c.String(flags.L1WSEndpoint.Name),
L2Endpoint: c.String(flags.L2HTTPEndpoint.Name),
TaikoL1Address: common.HexToAddress(c.String(flags.TaikoL1Address.Name)),
TaikoL2Address: common.HexToAddress(c.String(flags.TaikoL2Address.Name)),
L1ProposerPrivKey: l1ProposerPrivKey,
L2SuggestedFeeRecipient: common.HexToAddress(l2SuggestedFeeRecipient),
ProposeInterval: proposingInterval,
CommitSlot: c.Uint64(flags.CommitSlot.Name),
LocalAddresses: localAddresses,
ProposeEmptyBlocksInterval: proposeEmptyBlocksInterval,
MinBlockGasLimit: c.Uint64(flags.MinBlockGasLimit.Name),
MaxProposedTxListsPerEpoch: c.Uint64(flags.MaxProposedTxListsPerEpoch.Name),
ProposeBlockTxGasLimit: proposeBlockTxGasLimit,
BackOffRetryInterval: time.Duration(c.Uint64(flags.BackOffRetryInterval.Name)) * time.Second,
ProposeBlockTxReplacementMultiplier: proposeBlockTxReplacementMultiplier,
}, nil
}
3 changes: 3 additions & 0 deletions proposer/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func (s *ProposerTestSuite) TestNewConfigFromCliContext() {
&cli.StringFlag{Name: flags.ProposeInterval.Name},
&cli.Uint64Flag{Name: flags.CommitSlot.Name},
&cli.StringFlag{Name: flags.TxPoolLocals.Name},
&cli.Uint64Flag{Name: flags.ProposeBlockTxReplacementMultiplier.Name},
}
app.Action = func(ctx *cli.Context) error {
c, err := NewConfigFromCliContext(ctx)
Expand All @@ -50,6 +51,7 @@ func (s *ProposerTestSuite) TestNewConfigFromCliContext() {
s.Equal(uint64(commitSlot), c.CommitSlot)
s.Equal(1, len(c.LocalAddresses))
s.Equal(goldenTouchAddress, c.LocalAddresses[0])
s.Equal(uint64(5), c.ProposeBlockTxReplacementMultiplier)
s.Nil(new(Proposer).InitFromCli(context.Background(), ctx))

return err
Expand All @@ -66,5 +68,6 @@ func (s *ProposerTestSuite) TestNewConfigFromCliContext() {
"-" + flags.ProposeInterval.Name, proposeInterval,
"-" + flags.CommitSlot.Name, strconv.Itoa(commitSlot),
"-" + flags.TxPoolLocals.Name, goldenTouchAddress.Hex(),
"-" + flags.ProposeBlockTxReplacementMultiplier.Name, "5",
}))
}
96 changes: 85 additions & 11 deletions proposer/proposer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ import (
"math"
"math/big"
"math/rand"
"strings"
"sync"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
Expand All @@ -27,8 +29,9 @@ import (
)

var (
errNoNewTxs = errors.New("no new transactions")
waitReceiptTimeout = 1 * time.Minute
errNoNewTxs = errors.New("no new transactions")
waitReceiptTimeout = 1 * time.Minute
maxSendProposeBlockTxRetry = 10
)

// Proposer keep proposing new transactions from L2 execution engine's tx pool at a fixed interval.
Expand All @@ -50,6 +53,7 @@ type Proposer struct {
minBlockGasLimit *uint64
maxProposedTxListsPerEpoch uint64
proposeBlockTxGasLimit *uint64
txReplacementTipMultiplier uint64

// Protocol configurations
protocolConfigs *bindings.TaikoDataConfig
Expand Down Expand Up @@ -84,6 +88,7 @@ func InitFromConfig(ctx context.Context, p *Proposer, cfg *Config) (err error) {
p.locals = cfg.LocalAddresses
p.commitSlot = cfg.CommitSlot
p.maxProposedTxListsPerEpoch = cfg.MaxProposedTxListsPerEpoch
p.txReplacementTipMultiplier = cfg.ProposeBlockTxReplacementMultiplier
p.ctx = ctx

// RPC clients
Expand Down Expand Up @@ -270,44 +275,113 @@ func (p *Proposer) ProposeOp(ctx context.Context) error {
return nil
}

// ProposeTxList proposes the given transactions list to TaikoL1 smart contract.
func (p *Proposer) ProposeTxList(
// sendProposeBlockTx tries to send a TaikoL1.proposeBlock transaction.
func (p *Proposer) sendProposeBlockTx(
ctx context.Context,
meta *encoding.TaikoL1BlockMetadataInput,
txListBytes []byte,
txNum uint,
nonce *uint64,
) error {
isReplacement bool,
) (*types.Transaction, error) {
if p.minBlockGasLimit != nil && meta.GasLimit < uint32(*p.minBlockGasLimit) {
meta.GasLimit = uint32(*p.minBlockGasLimit)
}

// Propose the transactions list
inputs, err := encoding.EncodeProposeBlockInput(meta)
if err != nil {
return err
return nil, err
}

opts, err := getTxOpts(ctx, p.rpc.L1, p.l1ProposerPrivKey, p.rpc.L1ChainID)
if err != nil {
return err
return nil, err
}
if nonce != nil {
opts.Nonce = new(big.Int).SetUint64(*nonce)
}
if p.proposeBlockTxGasLimit != nil {
opts.GasLimit = *p.proposeBlockTxGasLimit
}
if isReplacement {
log.Info("Try replacing a transaction with same nonce", "sender", p.l1ProposerAddress, "nonce", nonce)
originalTx, err := rpc.GetPendingTxByNonce(ctx, p.rpc, p.l1ProposerAddress, *nonce)
if err != nil || originalTx == nil {
log.Warn(
"Original transaction not found",
"sender", p.l1ProposerAddress,
"nonce", nonce,
"error", err,
)

opts.GasTipCap = new(big.Int).Mul(opts.GasTipCap, new(big.Int).SetUint64(p.txReplacementTipMultiplier))
} else {
log.Info(
"Original transaction to replace",
"sender", p.l1ProposerAddress,
"nonce", nonce,
"tx", originalTx,
)

opts.GasTipCap = new(big.Int).Mul(
originalTx.GasTipCap(),
new(big.Int).SetUint64(p.txReplacementTipMultiplier),
)
}
}

proposeTx, err := p.rpc.TaikoL1.ProposeBlock(opts, inputs, txListBytes)
if err != nil {
return encoding.TryParsingCustomError(err)
return nil, encoding.TryParsingCustomError(err)
}

return proposeTx, nil
}

// ProposeTxList proposes the given transactions list to TaikoL1 smart contract.
func (p *Proposer) ProposeTxList(
ctx context.Context,
meta *encoding.TaikoL1BlockMetadataInput,
txListBytes []byte,
txNum uint,
nonce *uint64,
) error {
var (
isReplacement bool
tx *types.Transaction
err error
)
if err := backoff.Retry(
func() error {
if ctx.Err() != nil {
return nil
}
if tx, err = p.sendProposeBlockTx(ctx, meta, txListBytes, nonce, isReplacement); err != nil {
log.Warn("Failed to send propose block transaction, retrying", "error", err)
if strings.Contains(err.Error(), "replacement transaction underpriced") {
isReplacement = true
} else {
isReplacement = false
}
return err
}

return nil
},
backoff.WithMaxRetries(backoff.NewExponentialBackOff(), uint64(maxSendProposeBlockTxRetry)),
); err != nil {
return err
}
if ctx.Err() != nil {
return ctx.Err()
}
if err != nil {
return err
}

ctxWithTimeout, cancel := context.WithTimeout(ctx, waitReceiptTimeout)
defer cancel()

if _, err := rpc.WaitReceipt(ctxWithTimeout, p.rpc.L1, proposeTx); err != nil {
if _, err := rpc.WaitReceipt(ctxWithTimeout, p.rpc.L1, tx); err != nil {
return err
}

Expand Down
Loading