From 2a8f3680a62a6dece025fe8d2bcd98e3cbc511e5 Mon Sep 17 00:00:00 2001 From: pavel-raykov Date: Tue, 21 Jan 2025 19:45:50 +0100 Subject: [PATCH] Minor --- chains/txmgr/txmgr.go | 41 +++++++++++++++++++++++++++++++++++++++-- 1 file changed, 39 insertions(+), 2 deletions(-) diff --git a/chains/txmgr/txmgr.go b/chains/txmgr/txmgr.go index 7e9fcc7..d3b839a 100644 --- a/chains/txmgr/txmgr.go +++ b/chains/txmgr/txmgr.go @@ -10,6 +10,7 @@ import ( "github.com/google/uuid" "github.com/jpillora/backoff" + "github.com/smartcontractkit/chainlink-framework/chains" nullv4 "gopkg.in/guregu/null.v4" "github.com/smartcontractkit/chainlink-common/pkg/logger" @@ -17,7 +18,6 @@ import ( commontypes "github.com/smartcontractkit/chainlink-common/pkg/types" "github.com/smartcontractkit/chainlink-common/pkg/utils" - "github.com/smartcontractkit/chainlink-framework/chains" "github.com/smartcontractkit/chainlink-framework/chains/fees" "github.com/smartcontractkit/chainlink-framework/chains/headtracker" txmgrtypes "github.com/smartcontractkit/chainlink-framework/chains/txmgr/types" @@ -65,6 +65,20 @@ type TxManager[ GetTransactionStatus(ctx context.Context, transactionID string) (state commontypes.TransactionStatus, err error) } +type TxmV2Wrapper[ + CHAIN_ID chains.ID, + HEAD chains.Head[BLOCK_HASH], + ADDR chains.Hashable, + TX_HASH chains.Hashable, + BLOCK_HASH chains.Hashable, + SEQ chains.Sequence, + FEE fees.Fee, +] interface { + services.Service + CreateTransaction(ctx context.Context, txRequest txmgrtypes.TxRequest[ADDR, TX_HASH]) (etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) + Reset(addr ADDR, abandon bool) error +} + type reset struct { // f is the function to execute between stopping/starting the // Broadcaster and Confirmer @@ -112,6 +126,7 @@ type Txm[ fwdMgr txmgrtypes.ForwarderManager[ADDR] txAttemptBuilder txmgrtypes.TxAttemptBuilder[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] newErrorClassifier NewErrorClassifier + txmv2wrapper TxmV2Wrapper[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] } func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) RegisterResumeCallback(fn ResumeCallback) { @@ -147,6 +162,7 @@ func NewTxm[ tracker *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE], finalizer txmgrtypes.Finalizer[BLOCK_HASH, HEAD], newErrorClassifierFunc NewErrorClassifier, + txmv2wrapper TxmV2Wrapper[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], ) *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] { b := Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]{ logger: logger.Sugared(lggr), @@ -169,6 +185,7 @@ func NewTxm[ tracker: tracker, newErrorClassifier: newErrorClassifierFunc, finalizer: finalizer, + txmv2wrapper: txmv2wrapper, } if txCfg.ResendAfterThreshold() <= 0 { @@ -207,6 +224,12 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Start(ctx return fmt.Errorf("Txm: Finalizer failed to start: %w", err) } + if b.txmv2wrapper != nil { + if err := ms.Start(ctx, b.txmv2wrapper); err != nil { + return fmt.Errorf("Txm: Txmv2 failed to start: %w", err) + } + } + b.logger.Info("Txm starting runLoop") b.wg.Add(1) go b.runLoop() @@ -237,6 +260,11 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Reset(addr f := func() { if abandon { err = b.abandon(addr) + if b.txmv2wrapper != nil { + if err2 := b.txmv2wrapper.Reset(addr, abandon); err2 != nil { + b.logger.Error("failed to abandon transactions for dual broadcast", "err", err2) + } + } } } @@ -460,6 +488,12 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) runLoop() if err != nil && (!errors.Is(err, services.ErrAlreadyStopped) || !errors.Is(err, services.ErrCannotStopUnstarted)) { b.logger.Errorw(fmt.Sprintf("Failed to Close Finalizer: %v", err), "err", err) } + if b.txmv2wrapper != nil { + err = b.txmv2wrapper.Close() + if err != nil && (!errors.Is(err, services.ErrAlreadyStopped) || !errors.Is(err, services.ErrCannotStopUnstarted)) { + b.logger.Errorw(fmt.Sprintf("Failed to Close Finalizer: %v", err), "err", err) + } + } return case <-keysChanged: // This check prevents the weird edge-case where you can select @@ -513,11 +547,14 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Trigger(ad func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CreateTransaction(ctx context.Context, txRequest txmgrtypes.TxRequest[ADDR, TX_HASH]) (tx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) { // Check for existing Tx with IdempotencyKey. If found, return the Tx and do nothing // Skipping CreateTransaction to avoid double send + if b.txmv2wrapper != nil && txRequest.Meta != nil && txRequest.Meta.DualBroadcast != nil && *txRequest.Meta.DualBroadcast { + return b.txmv2wrapper.CreateTransaction(ctx, txRequest) + } if txRequest.IdempotencyKey != nil { var existingTx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] existingTx, err = b.txStore.FindTxWithIdempotencyKey(ctx, *txRequest.IdempotencyKey, b.chainID) if err != nil { - return tx, fmt.Errorf("Failed to search for transaction with IdempotencyKey: %w", err) + return tx, fmt.Errorf("failed to search for transaction with IdempotencyKey: %w", err) } if existingTx != nil { b.logger.Infow("Found a Tx with IdempotencyKey. Returning existing Tx without creating a new one.", "IdempotencyKey", *txRequest.IdempotencyKey)