From 4aa7e1c1a42a2450c6955efaa17056bcd667121c Mon Sep 17 00:00:00 2001 From: David Date: Wed, 17 Apr 2024 10:01:07 +0800 Subject: [PATCH] feat(proposer): introduce `TxSender` for proposer (#723) Co-authored-by: Roger <50648015+RogerLamTd@users.noreply.github.com> --- internal/docker/nodes/docker-compose.yml | 2 - internal/testutils/helper.go | 12 +++- internal/testutils/interfaces.go | 6 +- proposer/proposer.go | 82 +++++++++++++----------- 4 files changed, 53 insertions(+), 49 deletions(-) diff --git a/internal/docker/nodes/docker-compose.yml b/internal/docker/nodes/docker-compose.yml index 748c47c74..7bd9e337d 100644 --- a/internal/docker/nodes/docker-compose.yml +++ b/internal/docker/nodes/docker-compose.yml @@ -1,5 +1,3 @@ -version: "3.9" - services: l1_node: container_name: l1_node diff --git a/internal/testutils/helper.go b/internal/testutils/helper.go index 9137be6f0..217d9ed0f 100644 --- a/internal/testutils/helper.go +++ b/internal/testutils/helper.go @@ -30,13 +30,17 @@ import ( func (s *ClientTestSuite) ProposeInvalidTxListBytes(proposer Proposer) { invalidTxListBytes := RandomBytes(256) - s.Nil(proposer.ProposeTxList(context.Background(), invalidTxListBytes, 1)) + for _, err := range proposer.ProposeTxLists(context.Background(), [][]byte{invalidTxListBytes}) { + s.Nil(err) + } } func (s *ClientTestSuite) proposeEmptyBlockOp(ctx context.Context, proposer Proposer) { emptyTxListBytes, err := rlp.EncodeToBytes(types.Transactions{}) s.Nil(err) - s.Nil(proposer.ProposeTxList(ctx, emptyTxListBytes, 0)) + for _, err := range proposer.ProposeTxLists(ctx, [][]byte{emptyTxListBytes}) { + s.Nil(err) + } } func (s *ClientTestSuite) ProposeAndInsertEmptyBlocks( @@ -62,7 +66,9 @@ func (s *ClientTestSuite) ProposeAndInsertEmptyBlocks( encoded, err := rlp.EncodeToBytes(emptyTxs) s.Nil(err) - s.Nil(proposer.ProposeTxList(context.Background(), encoded, 0)) + for _, err := range proposer.ProposeTxLists(context.Background(), [][]byte{encoded}) { + s.Nil(err) + } s.ProposeInvalidTxListBytes(proposer) diff --git a/internal/testutils/interfaces.go b/internal/testutils/interfaces.go index 04d5b9ae8..3c2759998 100644 --- a/internal/testutils/interfaces.go +++ b/internal/testutils/interfaces.go @@ -13,9 +13,5 @@ type CalldataSyncer interface { type Proposer interface { utils.SubcommandApplication ProposeOp(ctx context.Context) error - ProposeTxList( - ctx context.Context, - txListBytes []byte, - txNum uint, - ) error + ProposeTxLists(ctx context.Context, txListsBytes [][]byte) []error } diff --git a/proposer/proposer.go b/proposer/proposer.go index 86ac048c0..7314ed5b7 100644 --- a/proposer/proposer.go +++ b/proposer/proposer.go @@ -8,6 +8,7 @@ import ( "sync" "time" + "github.com/ethereum-optimism/optimism/op-challenger/sender" "github.com/ethereum-optimism/optimism/op-service/txmgr" txmgrMetrics "github.com/ethereum-optimism/optimism/op-service/txmgr/metrics" "github.com/ethereum/go-ethereum/accounts/abi/bind" @@ -59,7 +60,7 @@ type Proposer struct { lastProposedAt time.Time - txmgr *txmgr.SimpleTxManager + txSender *sender.TxSender ctx context.Context wg sync.WaitGroup @@ -103,14 +104,16 @@ func (p *Proposer) InitFromConfig(ctx context.Context, cfg *Config) (err error) return err } - if p.txmgr, err = txmgr.NewSimpleTxManager( + txmgr, err := txmgr.NewSimpleTxManager( "proposer", log.Root(), new(txmgrMetrics.NoopTxMetrics), *cfg.TxmgrConfigs, - ); err != nil { + ) + if err != nil { return err } + p.txSender = sender.NewTxSender(p.ctx, log.Root(), txmgr, p.Config.MaxProposedTxListsPerEpoch) if p.proverSelector, err = selector.NewETHFeeEOASelector( &protocolConfigs, @@ -269,7 +272,10 @@ func (p *Proposer) fetchPoolContent(filterPoolContent bool) ([]types.Transaction // and then proposing them to TaikoL1 contract. func (p *Proposer) ProposeOp(ctx context.Context) error { // Check if it's time to propose unfiltered pool content. - filterPoolContent := time.Now().Before(p.lastProposedAt.Add(p.MinProposingInternal)) + var ( + filterPoolContent = time.Now().Before(p.lastProposedAt.Add(p.MinProposingInternal)) + txListsBytes = make([][]byte, p.MaxProposedTxListsPerEpoch) + ) // Wait until L2 execution engine is synced at first. if err := p.rpc.WaitTillL2ExecutionEngineSynced(ctx); err != nil { @@ -282,6 +288,7 @@ func (p *Proposer) ProposeOp(ctx context.Context) error { "lastProposedAt", p.lastProposedAt, ) + // Fetch the pool content with the given limits. txLists, err := p.fetchPoolContent(filterPoolContent) if err != nil { return err @@ -298,54 +305,51 @@ func (p *Proposer) ProposeOp(ctx context.Context) error { return fmt.Errorf("failed to encode transactions: %w", err) } - if err := p.ProposeTxList(ctx, txListBytes, uint(txs.Len())); err != nil { - return fmt.Errorf("failed to send TaikoL1.proposeBlock transactions: %w", err) + txListsBytes[i] = txListBytes + } + + for i, err := range p.ProposeTxLists(ctx, txListsBytes) { + if err != nil { + log.Error("Failed to send TaikoL1.proposeBlock transaction", "index", i, "error", err) + continue } + metrics.ProposerProposedTxListsCounter.Inc(1) + metrics.ProposerProposedTxsCounter.Inc(int64(len(txLists[i]))) + + log.Info("📝 Propose transactions succeeded", "txs", len(txLists[i])) p.lastProposedAt = time.Now() } return nil } -// ProposeTxList proposes the given transactions list to TaikoL1 smart contract. -func (p *Proposer) ProposeTxList( - ctx context.Context, - txListBytes []byte, - txNum uint, -) error { - compressedTxListBytes, err := utils.Compress(txListBytes) - if err != nil { - return err - } +// ProposeTxLists proposes the given transaction lists to TaikoL1 contract. +func (p *Proposer) ProposeTxLists(ctx context.Context, txListsBytes [][]byte) []error { + txCandidates := make([]txmgr.TxCandidate, len(txListsBytes)) - txCandidate, err := p.txBuilder.Build( - ctx, - p.tierFees, - p.IncludeParentMetaHash, - compressedTxListBytes, - ) - if err != nil { - log.Warn("Failed to build TaikoL1.proposeBlock transaction", "error", encoding.TryParsingCustomError(err)) - return err - } + for i, txListBytes := range txListsBytes { + compressedTxListBytes, err := utils.Compress(txListBytes) + if err != nil { + log.Warn("Failed to compress transactions list", "index", i, "error", err) + break + } - receipt, err := p.txmgr.Send(p.ctx, *txCandidate) - if err != nil { - log.Warn("Failed to send TaikoL1.proposeBlock transaction", "error", encoding.TryParsingCustomError(err)) - return err - } + candidate, err := p.txBuilder.Build( + ctx, + p.tierFees, + p.IncludeParentMetaHash, + compressedTxListBytes, + ) + if err != nil { + log.Warn("Failed to build TaikoL1.proposeBlock transaction", "error", err) + break + } - if receipt.Status != types.ReceiptStatusSuccessful { - return fmt.Errorf("failed to propose block: %s", receipt.TxHash.Hex()) + txCandidates[i] = *candidate } - log.Info("📝 Propose transactions succeeded", "txs", txNum) - - metrics.ProposerProposedTxListsCounter.Inc(1) - metrics.ProposerProposedTxsCounter.Inc(int64(txNum)) - - return nil + return p.txSender.SendAndWaitDetailed("proposeBlock", txCandidates...) } // updateProposingTicker updates the internal proposing timer.