diff --git a/chains/txmgr/txmgr.go b/chains/txmgr/txmgr.go index 7e9fcc7..299c1e7 100644 --- a/chains/txmgr/txmgr.go +++ b/chains/txmgr/txmgr.go @@ -65,6 +65,20 @@ type TxManager[ GetTransactionStatus(ctx context.Context, transactionID string) (state commontypes.TransactionStatus, err error) } +type TxmV2Wrapper[ + CHAIN_ID chains.ID, + HEAD chains.Head[BLOCK_HASH], + ADDR chains.Hashable, + TX_HASH chains.Hashable, + BLOCK_HASH chains.Hashable, + SEQ chains.Sequence, + FEE fees.Fee, +] interface { + services.Service + CreateTransaction(ctx context.Context, txRequest txmgrtypes.TxRequest[ADDR, TX_HASH]) (etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) + Reset(addr ADDR, abandon bool) error +} + type reset struct { // f is the function to execute between stopping/starting the // Broadcaster and Confirmer @@ -112,6 +126,7 @@ type Txm[ fwdMgr txmgrtypes.ForwarderManager[ADDR] txAttemptBuilder txmgrtypes.TxAttemptBuilder[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] newErrorClassifier NewErrorClassifier + txmv2wrapper TxmV2Wrapper[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] } func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) RegisterResumeCallback(fn ResumeCallback) { @@ -147,6 +162,7 @@ func NewTxm[ tracker *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE], finalizer txmgrtypes.Finalizer[BLOCK_HASH, HEAD], newErrorClassifierFunc NewErrorClassifier, + txmv2wrapper TxmV2Wrapper[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], ) *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] { b := Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]{ logger: logger.Sugared(lggr), @@ -169,6 +185,7 @@ func NewTxm[ tracker: tracker, newErrorClassifier: newErrorClassifierFunc, finalizer: finalizer, + txmv2wrapper: txmv2wrapper, } if txCfg.ResendAfterThreshold() <= 0 { @@ -207,6 +224,12 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Start(ctx return fmt.Errorf("Txm: Finalizer failed to start: %w", err) } + if b.txmv2wrapper != nil { + if err := ms.Start(ctx, b.txmv2wrapper); err != nil { + return fmt.Errorf("Txm: Txmv2 failed to start: %w", err) + } + } + b.logger.Info("Txm starting runLoop") b.wg.Add(1) go b.runLoop() @@ -237,6 +260,11 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Reset(addr f := func() { if abandon { err = b.abandon(addr) + if b.txmv2wrapper != nil { + if err2 := b.txmv2wrapper.Reset(addr, abandon); err2 != nil { + b.logger.Error("failed to abandon transactions for dual broadcast", "err", err2) + } + } } } @@ -460,6 +488,12 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) runLoop() if err != nil && (!errors.Is(err, services.ErrAlreadyStopped) || !errors.Is(err, services.ErrCannotStopUnstarted)) { b.logger.Errorw(fmt.Sprintf("Failed to Close Finalizer: %v", err), "err", err) } + if b.txmv2wrapper != nil { + err = b.txmv2wrapper.Close() + if err != nil && (!errors.Is(err, services.ErrAlreadyStopped) || !errors.Is(err, services.ErrCannotStopUnstarted)) { + b.logger.Errorw(fmt.Sprintf("Failed to Close Finalizer: %v", err), "err", err) + } + } return case <-keysChanged: // This check prevents the weird edge-case where you can select @@ -513,11 +547,14 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Trigger(ad func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CreateTransaction(ctx context.Context, txRequest txmgrtypes.TxRequest[ADDR, TX_HASH]) (tx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) { // Check for existing Tx with IdempotencyKey. If found, return the Tx and do nothing // Skipping CreateTransaction to avoid double send + if b.txmv2wrapper != nil && txRequest.Meta != nil && txRequest.Meta.DualBroadcast != nil && *txRequest.Meta.DualBroadcast { + return b.txmv2wrapper.CreateTransaction(ctx, txRequest) + } if txRequest.IdempotencyKey != nil { var existingTx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] existingTx, err = b.txStore.FindTxWithIdempotencyKey(ctx, *txRequest.IdempotencyKey, b.chainID) if err != nil { - return tx, fmt.Errorf("Failed to search for transaction with IdempotencyKey: %w", err) + return tx, fmt.Errorf("failed to search for transaction with IdempotencyKey: %w", err) } if existingTx != nil { b.logger.Infow("Found a Tx with IdempotencyKey. Returning existing Tx without creating a new one.", "IdempotencyKey", *txRequest.IdempotencyKey)