Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reflect https://github.com/smartcontractkit/chainlink/pull/15286 #12

Merged
merged 2 commits into from
Jan 21, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 38 additions & 1 deletion chains/txmgr/txmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,20 @@ type TxManager[
GetTransactionStatus(ctx context.Context, transactionID string) (state commontypes.TransactionStatus, err error)
}

type TxmV2Wrapper[
CHAIN_ID chains.ID,
HEAD chains.Head[BLOCK_HASH],
ADDR chains.Hashable,
TX_HASH chains.Hashable,
BLOCK_HASH chains.Hashable,
SEQ chains.Sequence,
FEE fees.Fee,
] interface {
services.Service
CreateTransaction(ctx context.Context, txRequest txmgrtypes.TxRequest[ADDR, TX_HASH]) (etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
Reset(addr ADDR, abandon bool) error
}

type reset struct {
// f is the function to execute between stopping/starting the
// Broadcaster and Confirmer
Expand Down Expand Up @@ -112,6 +126,7 @@ type Txm[
fwdMgr txmgrtypes.ForwarderManager[ADDR]
txAttemptBuilder txmgrtypes.TxAttemptBuilder[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
newErrorClassifier NewErrorClassifier
txmv2wrapper TxmV2Wrapper[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
}

func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) RegisterResumeCallback(fn ResumeCallback) {
Expand Down Expand Up @@ -147,6 +162,7 @@ func NewTxm[
tracker *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE],
finalizer txmgrtypes.Finalizer[BLOCK_HASH, HEAD],
newErrorClassifierFunc NewErrorClassifier,
txmv2wrapper TxmV2Wrapper[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
) *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] {
b := Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]{
logger: logger.Sugared(lggr),
Expand All @@ -169,6 +185,7 @@ func NewTxm[
tracker: tracker,
newErrorClassifier: newErrorClassifierFunc,
finalizer: finalizer,
txmv2wrapper: txmv2wrapper,
}

if txCfg.ResendAfterThreshold() <= 0 {
Expand Down Expand Up @@ -207,6 +224,12 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Start(ctx
return fmt.Errorf("Txm: Finalizer failed to start: %w", err)
}

if b.txmv2wrapper != nil {
if err := ms.Start(ctx, b.txmv2wrapper); err != nil {
return fmt.Errorf("Txm: Txmv2 failed to start: %w", err)
}
}

b.logger.Info("Txm starting runLoop")
b.wg.Add(1)
go b.runLoop()
Expand Down Expand Up @@ -237,6 +260,11 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Reset(addr
f := func() {
if abandon {
err = b.abandon(addr)
if b.txmv2wrapper != nil {
if err2 := b.txmv2wrapper.Reset(addr, abandon); err2 != nil {
b.logger.Error("failed to abandon transactions for dual broadcast", "err", err2)
}
}
}
}

Expand Down Expand Up @@ -460,6 +488,12 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) runLoop()
if err != nil && (!errors.Is(err, services.ErrAlreadyStopped) || !errors.Is(err, services.ErrCannotStopUnstarted)) {
b.logger.Errorw(fmt.Sprintf("Failed to Close Finalizer: %v", err), "err", err)
}
if b.txmv2wrapper != nil {
err = b.txmv2wrapper.Close()
if err != nil && (!errors.Is(err, services.ErrAlreadyStopped) || !errors.Is(err, services.ErrCannotStopUnstarted)) {
b.logger.Errorw(fmt.Sprintf("Failed to Close Finalizer: %v", err), "err", err)
}
}
return
case <-keysChanged:
// This check prevents the weird edge-case where you can select
Expand Down Expand Up @@ -513,11 +547,14 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Trigger(ad
func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CreateTransaction(ctx context.Context, txRequest txmgrtypes.TxRequest[ADDR, TX_HASH]) (tx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) {
// Check for existing Tx with IdempotencyKey. If found, return the Tx and do nothing
// Skipping CreateTransaction to avoid double send
if b.txmv2wrapper != nil && txRequest.Meta != nil && txRequest.Meta.DualBroadcast != nil && *txRequest.Meta.DualBroadcast {
return b.txmv2wrapper.CreateTransaction(ctx, txRequest)
}
if txRequest.IdempotencyKey != nil {
var existingTx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
existingTx, err = b.txStore.FindTxWithIdempotencyKey(ctx, *txRequest.IdempotencyKey, b.chainID)
if err != nil {
return tx, fmt.Errorf("Failed to search for transaction with IdempotencyKey: %w", err)
return tx, fmt.Errorf("failed to search for transaction with IdempotencyKey: %w", err)
}
if existingTx != nil {
b.logger.Infow("Found a Tx with IdempotencyKey. Returning existing Tx without creating a new one.", "IdempotencyKey", *txRequest.IdempotencyKey)
Expand Down
Loading