From 29ce28f43c13b9c94b742da4f561543e91ddcf7f Mon Sep 17 00:00:00 2001 From: David Date: Fri, 3 May 2024 13:40:54 +0800 Subject: [PATCH] feat(proposer): improve `ProposeOp` (#787) --- driver/driver_test.go | 1 - internal/testutils/helper.go | 12 +-- internal/testutils/interfaces.go | 6 +- pkg/rpc/methods.go | 30 +++++++ proposer/proposer.go | 137 +++++++++++++------------------ proposer/proposer_test.go | 133 +++++++++++++----------------- 6 files changed, 154 insertions(+), 165 deletions(-) diff --git a/driver/driver_test.go b/driver/driver_test.go index fe741b6dc..f74e09bf9 100644 --- a/driver/driver_test.go +++ b/driver/driver_test.go @@ -127,7 +127,6 @@ func (s *DriverTestSuite) TestCheckL1ReorgToHigherFork() { // Reorg back to l2Head1 s.RevertL1Snapshot(testnetL1SnapshotID) - s.IncreaseTime(uint64((3 * time.Second).Seconds())) s.InitProposer() // Because of evm_revert operation, the nonce of the proposer need to be adjusted. diff --git a/internal/testutils/helper.go b/internal/testutils/helper.go index fffcb9dbf..b022471dc 100644 --- a/internal/testutils/helper.go +++ b/internal/testutils/helper.go @@ -31,17 +31,13 @@ import ( func (s *ClientTestSuite) ProposeInvalidTxListBytes(proposer Proposer) { invalidTxListBytes := RandomBytes(256) - for _, err := range proposer.ProposeTxLists(context.Background(), [][]byte{invalidTxListBytes}) { - s.Nil(err) - } + s.Nil(proposer.ProposeTxList(context.Background(), invalidTxListBytes, 1)) } func (s *ClientTestSuite) proposeEmptyBlockOp(ctx context.Context, proposer Proposer) { emptyTxListBytes, err := rlp.EncodeToBytes(types.Transactions{}) s.Nil(err) - for _, err := range proposer.ProposeTxLists(ctx, [][]byte{emptyTxListBytes}) { - s.Nil(err) - } + s.Nil(proposer.ProposeTxList(ctx, emptyTxListBytes, 0)) } func (s *ClientTestSuite) ProposeAndInsertEmptyBlocks( @@ -67,9 +63,7 @@ func (s *ClientTestSuite) ProposeAndInsertEmptyBlocks( encoded, err := rlp.EncodeToBytes(emptyTxs) s.Nil(err) - for _, err := range proposer.ProposeTxLists(context.Background(), [][]byte{encoded}) { - s.Nil(err) - } + s.Nil(proposer.ProposeTxList(context.Background(), encoded, 0)) s.ProposeInvalidTxListBytes(proposer) diff --git a/internal/testutils/interfaces.go b/internal/testutils/interfaces.go index 88a5602ab..fb1ff35a6 100644 --- a/internal/testutils/interfaces.go +++ b/internal/testutils/interfaces.go @@ -13,5 +13,9 @@ type BlobSyncer interface { type Proposer interface { utils.SubcommandApplication ProposeOp(ctx context.Context) error - ProposeTxLists(ctx context.Context, txListsBytes [][]byte) []error + ProposeTxList( + ctx context.Context, + txListBytes []byte, + txNum uint, + ) error } diff --git a/pkg/rpc/methods.go b/pkg/rpc/methods.go index b5af64a34..a7112102f 100644 --- a/pkg/rpc/methods.go +++ b/pkg/rpc/methods.go @@ -724,3 +724,33 @@ func (c *Client) GetTaikoDataSlotBByNumber(ctx context.Context, number uint64) ( return nil, fmt.Errorf("failed to get state variables by block number %d", number) } + +// WaitL1NewPendingTransaction waits until the L1 account has a new pending transaction. +func (c *Client) WaitL1NewPendingTransaction( + ctx context.Context, + address common.Address, + oldPendingNonce uint64, +) error { + ctxWithTimeout, cancel := ctxWithTimeoutOrDefault(ctx, defaultTimeout) + defer cancel() + + ticker := time.NewTicker(rpcPollingInterval) + defer ticker.Stop() + + for ; true; <-ticker.C { + if ctxWithTimeout.Err() != nil { + return ctxWithTimeout.Err() + } + + nonce, err := c.L1.PendingNonceAt(ctxWithTimeout, address) + if err != nil { + return err + } + + if nonce != oldPendingNonce { + break + } + } + + return nil +} diff --git a/proposer/proposer.go b/proposer/proposer.go index cbbcbf6fd..3e2d231d2 100644 --- a/proposer/proposer.go +++ b/proposer/proposer.go @@ -5,11 +5,9 @@ import ( "context" "fmt" "math/rand" - "strings" "sync" "time" - "github.com/ethereum-optimism/optimism/op-challenger/sender" "github.com/ethereum-optimism/optimism/op-service/txmgr" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" @@ -18,6 +16,7 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/rlp" "github.com/urfave/cli/v2" + "golang.org/x/sync/errgroup" "github.com/taikoxyz/taiko-client/bindings" "github.com/taikoxyz/taiko-client/bindings/encoding" @@ -60,7 +59,7 @@ type Proposer struct { lastProposedAt time.Time - txSender *sender.TxSender + txmgr *txmgr.SimpleTxManager ctx context.Context wg sync.WaitGroup @@ -104,16 +103,14 @@ func (p *Proposer) InitFromConfig(ctx context.Context, cfg *Config) (err error) return err } - txmgr, err := txmgr.NewSimpleTxManager( + if p.txmgr, err = txmgr.NewSimpleTxManager( "proposer", log.Root(), &metrics.TxMgrMetrics, *cfg.TxmgrConfigs, - ) - if err != nil { + ); err != nil { return err } - p.txSender = sender.NewTxSender(p.ctx, log.Root(), txmgr, p.Config.MaxProposedTxListsPerEpoch) if p.proverSelector, err = selector.NewETHFeeEOASelector( &protocolConfigs, @@ -277,10 +274,7 @@ func (p *Proposer) fetchPoolContent(filterPoolContent bool) ([]types.Transaction // and then proposing them to TaikoL1 contract. func (p *Proposer) ProposeOp(ctx context.Context) error { // Check if it's time to propose unfiltered pool content. - var ( - filterPoolContent = time.Now().Before(p.lastProposedAt.Add(p.MinProposingInternal)) - txListsBytes = make([][]byte, 0) - ) + filterPoolContent := time.Now().Before(p.lastProposedAt.Add(p.MinProposingInternal)) // Wait until L2 execution engine is synced at first. if err := p.rpc.WaitTillL2ExecutionEngineSynced(ctx); err != nil { @@ -293,7 +287,6 @@ func (p *Proposer) ProposeOp(ctx context.Context) error { "lastProposedAt", p.lastProposedAt, ) - // Fetch the pool content with the given limits. txLists, err := p.fetchPoolContent(filterPoolContent) if err != nil { return err @@ -304,92 +297,78 @@ func (p *Proposer) ProposeOp(ctx context.Context) error { return nil } + g, gCtx := errgroup.WithContext(ctx) // Propose all L2 transactions lists. - for i, txs := range txLists { - if i >= int(p.MaxProposedTxListsPerEpoch) { - return nil - } - - txListBytes, err := rlp.EncodeToBytes(txs) + for _, txs := range txLists[:utils.Min(p.MaxProposedTxListsPerEpoch, uint64(len(txLists)))] { + nonce, err := p.rpc.L1.PendingNonceAt(ctx, p.proposerAddress) if err != nil { - return fmt.Errorf("failed to encode transactions: %w", err) + log.Error("Failed to get proposer nonce", "error", err) + break } - txListsBytes = append(txListsBytes, txListBytes) - } + log.Info("Proposer current pending nonce", "nonce", nonce) - for i, err := range p.ProposeTxLists(ctx, txListsBytes) { - if err != nil { - log.Error( - "Failed to send TaikoL1.proposeBlock transaction", - "index", i, - "error", err, - ) - continue - } - - metrics.ProposerProposedTxListsCounter.Add(1) - metrics.ProposerProposedTxsCounter.Add(float64(len(txLists[i]))) + g.Go(func() error { + txListBytes, err := rlp.EncodeToBytes(txs) + if err != nil { + return fmt.Errorf("failed to encode transactions: %w", err) + } + if err := p.ProposeTxList(gCtx, txListBytes, uint(txs.Len())); err != nil { + return err + } + p.lastProposedAt = time.Now() + return nil + }) - log.Info("📝 Propose transactions succeeded", "txs", len(txLists[i])) - p.lastProposedAt = time.Now() + if err := p.rpc.WaitL1NewPendingTransaction(ctx, p.proposerAddress, nonce); err != nil { + log.Error("Failed to wait for new pending transaction", "error", err) + } + } + if err := g.Wait(); err != nil { + return err } return nil } -// ProposeTxLists proposes the given transaction lists to TaikoL1 contract. -func (p *Proposer) ProposeTxLists(ctx context.Context, txListsBytes [][]byte) []error { - txCandidates := make([]txmgr.TxCandidate, 0) - - for i, txListBytes := range txListsBytes { - compressedTxListBytes, err := utils.Compress(txListBytes) - if err != nil { - log.Error("Failed to compress transactions list", "index", i, "error", err) - break - } +// ProposeTxList proposes the given transactions list to TaikoL1 smart contract. +func (p *Proposer) ProposeTxList( + ctx context.Context, + txListBytes []byte, + txNum uint, +) error { + compressedTxListBytes, err := utils.Compress(txListBytes) + if err != nil { + return err + } - candidate, err := p.txBuilder.Build( - ctx, - p.tierFees, - p.IncludeParentMetaHash, - compressedTxListBytes, - ) - if err != nil { - log.Error("Failed to build TaikoL1.proposeBlock transaction", "error", err) - break - } + txCandidate, err := p.txBuilder.Build( + ctx, + p.tierFees, + p.IncludeParentMetaHash, + compressedTxListBytes, + ) + if err != nil { + log.Warn("Failed to build TaikoL1.proposeBlock transaction", "error", encoding.TryParsingCustomError(err)) + return err + } - txCandidates = append(txCandidates, *candidate) + receipt, err := p.txmgr.Send(ctx, *txCandidate) + if err != nil { + log.Warn("Failed to send TaikoL1.proposeBlock transaction", "error", encoding.TryParsingCustomError(err)) + return err } - if len(txCandidates) == 0 { - return []error{} + if receipt.Status != types.ReceiptStatusSuccessful { + return fmt.Errorf("failed to propose block: %s", receipt.TxHash.Hex()) } - // Send the transactions to the TaikoL1 contract, and if any of them fails, try - // to parse the custom error. - errors := p.txSender.SendAndWaitDetailed("proposeBlock", txCandidates...) - for i, err := range errors { - if err == nil { - continue - } + log.Info("📝 Propose transactions succeeded", "txs", txNum) - // If a transaction is reverted on chain, the error string returned by txSender will like this: - // fmt.Errorf("%w purpose: %v hash: %v", ErrTransactionReverted, txPurpose, rcpt.Receipt.TxHash) - // Then we try parsing the custom error for more details in log. - if strings.Contains(err.Error(), "purpose: ") && strings.Contains(err.Error(), "hash: ") { - txHash := strings.Split(err.Error(), "hash: ")[1] - receipt, err := p.rpc.L1.TransactionReceipt(ctx, common.HexToHash(txHash)) - if err != nil { - log.Error("Failed to fetch receipt", "txHash", txHash, "error", err) - continue - } - errors[i] = encoding.TryParsingCustomErrorFromReceipt(ctx, p.rpc.L1, p.proposerAddress, receipt) - } - } + metrics.ProposerProposedTxListsCounter.Add(1) + metrics.ProposerProposedTxsCounter.Add(float64(txNum)) - return errors + return nil } // updateProposingTicker updates the internal proposing timer. diff --git a/proposer/proposer_test.go b/proposer/proposer_test.go index 1db83e881..98f12fc64 100644 --- a/proposer/proposer_test.go +++ b/proposer/proposer_test.go @@ -3,7 +3,6 @@ package proposer import ( "context" "os" - "strings" "testing" "time" @@ -139,6 +138,64 @@ func parseTxs(client *rpc.Client, event *bindings.TaikoL1ClientBlockProposed) (t var txs types.Transactions return txs, rlp.DecodeBytes(txListBytes, &txs) } +func (s *ProposerTestSuite) TestProposeTxLists() { + p := s.p + ctx := p.ctx + cfg := s.p.Config + + txBuilder := builder.NewBlobTransactionBuilder( + p.rpc, + p.L1ProposerPrivKey, + p.proverSelector, + p.Config.L1BlockBuilderTip, + cfg.TaikoL1Address, + cfg.L2SuggestedFeeRecipient, + cfg.AssignmentHookAddress, + cfg.ProposeBlockTxGasLimit, + cfg.ExtraData, + ) + + emptyTxListBytes, err := rlp.EncodeToBytes(types.Transactions{}) + s.Nil(err) + txListsBytes := [][]byte{emptyTxListBytes} + txCandidates := make([]txmgr.TxCandidate, len(txListsBytes)) + for i, txListBytes := range txListsBytes { + compressedTxListBytes, err := utils.Compress(txListBytes) + if err != nil { + log.Warn("Failed to compress transactions list", "index", i, "error", err) + break + } + + candidate, err := txBuilder.Build( + p.ctx, + p.tierFees, + p.IncludeParentMetaHash, + compressedTxListBytes, + ) + if err != nil { + log.Warn("Failed to build TaikoL1.proposeBlock transaction", "error", err) + break + } + + // trigger the error + candidate.Blobs = []*eth.Blob{} + candidate.GasLimit = 10000000 + + txCandidates[i] = *candidate + } + + var errors []error + for _, txCandidate := range txCandidates { + receipt, err := p.txmgr.Send(ctx, txCandidate) + s.Nil(err) + errors = append(errors, encoding.TryParsingCustomErrorFromReceipt(ctx, p.rpc.L1, p.proposerAddress, receipt)) + } + + // confirm errors handled + for _, err := range errors { + s.Equal("L1_BLOB_NOT_AVAILABLE", err.Error()) + } +} func (s *ProposerTestSuite) getLatestProposedTxs( n int, @@ -289,80 +346,6 @@ func (s *ProposerTestSuite) TestAssignProverSuccessFirstRound() { s.Equal(fee.Uint64(), s.p.OptimisticTierFee.Uint64()) } -func (s *ProposerTestSuite) TestProposeTxLists() { - p := s.p - ctx := p.ctx - cfg := s.p.Config - - txBuilder := builder.NewBlobTransactionBuilder( - p.rpc, - p.L1ProposerPrivKey, - p.proverSelector, - p.Config.L1BlockBuilderTip, - cfg.TaikoL1Address, - cfg.L2SuggestedFeeRecipient, - cfg.AssignmentHookAddress, - cfg.ProposeBlockTxGasLimit, - cfg.ExtraData, - ) - - emptyTxListBytes, err := rlp.EncodeToBytes(types.Transactions{}) - s.Nil(err) - txListsBytes := [][]byte{emptyTxListBytes} - txCandidates := make([]txmgr.TxCandidate, len(txListsBytes)) - for i, txListBytes := range txListsBytes { - compressedTxListBytes, err := utils.Compress(txListBytes) - if err != nil { - log.Warn("Failed to compress transactions list", "index", i, "error", err) - break - } - - candidate, err := txBuilder.Build( - p.ctx, - p.tierFees, - p.IncludeParentMetaHash, - compressedTxListBytes, - ) - if err != nil { - log.Warn("Failed to build TaikoL1.proposeBlock transaction", "error", err) - break - } - - // trigger the error - candidate.Blobs = []*eth.Blob{} - candidate.GasLimit = 10000000 - - txCandidates[i] = *candidate - } - - // Send the transactions to the TaikoL1 contract, and if any of them fails, try - // to parse the custom error. - errors := p.txSender.SendAndWaitDetailed("proposeBlock", txCandidates...) - for i, err := range errors { - if err == nil { - continue - } - - // If a transaction is reverted on chain, the error string returned by txSender will like this: - // fmt.Errorf("%w purpose: %v hash: %v", ErrTransactionReverted, txPurpose, rcpt.Receipt.TxHash) - // Then we try parsing the custom error for more details in log. - if strings.Contains(err.Error(), "purpose: ") && strings.Contains(err.Error(), "hash: ") { - txHash := strings.Split(err.Error(), "hash: ")[1] - receipt, err := p.rpc.L1.TransactionReceipt(ctx, common.HexToHash(txHash)) - if err != nil { - log.Error("Failed to fetch receipt", "txHash", txHash, "error", err) - continue - } - errors[i] = encoding.TryParsingCustomErrorFromReceipt(ctx, p.rpc.L1, p.proposerAddress, receipt) - } - } - - // confirm errors handled - for _, err := range errors { - s.Equal("L1_BLOB_NOT_AVAILABLE", err.Error()) - } -} - func (s *ProposerTestSuite) TestUpdateProposingTicker() { s.p.ProposeInterval = 1 * time.Hour s.NotPanics(s.p.updateProposingTicker)