Skip to content

Commit

Permalink
Merge branch 'multinode/expand-health-report' of https://github.com/s…
Browse files Browse the repository at this point in the history
…martcontractkit/chainlink-framework into multinode/expand-health-report
  • Loading branch information
DylanTinianov committed Jan 21, 2025
2 parents fbb46a9 + e0331c7 commit 2ec1a41
Showing 1 changed file with 38 additions and 1 deletion.
39 changes: 38 additions & 1 deletion chains/txmgr/txmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,20 @@ type TxManager[
GetTransactionStatus(ctx context.Context, transactionID string) (state commontypes.TransactionStatus, err error)
}

type TxmV2Wrapper[
CHAIN_ID chains.ID,
HEAD chains.Head[BLOCK_HASH],
ADDR chains.Hashable,
TX_HASH chains.Hashable,
BLOCK_HASH chains.Hashable,
SEQ chains.Sequence,
FEE fees.Fee,
] interface {
services.Service
CreateTransaction(ctx context.Context, txRequest txmgrtypes.TxRequest[ADDR, TX_HASH]) (etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
Reset(addr ADDR, abandon bool) error
}

type reset struct {
// f is the function to execute between stopping/starting the
// Broadcaster and Confirmer
Expand Down Expand Up @@ -112,6 +126,7 @@ type Txm[
fwdMgr txmgrtypes.ForwarderManager[ADDR]
txAttemptBuilder txmgrtypes.TxAttemptBuilder[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
newErrorClassifier NewErrorClassifier
txmv2wrapper TxmV2Wrapper[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
}

func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) RegisterResumeCallback(fn ResumeCallback) {
Expand Down Expand Up @@ -147,6 +162,7 @@ func NewTxm[
tracker *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE],
finalizer txmgrtypes.Finalizer[BLOCK_HASH, HEAD],
newErrorClassifierFunc NewErrorClassifier,
txmv2wrapper TxmV2Wrapper[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
) *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] {
b := Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]{
logger: logger.Sugared(lggr),
Expand All @@ -169,6 +185,7 @@ func NewTxm[
tracker: tracker,
newErrorClassifier: newErrorClassifierFunc,
finalizer: finalizer,
txmv2wrapper: txmv2wrapper,
}

if txCfg.ResendAfterThreshold() <= 0 {
Expand Down Expand Up @@ -207,6 +224,12 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Start(ctx
return fmt.Errorf("Txm: Finalizer failed to start: %w", err)
}

if b.txmv2wrapper != nil {
if err := ms.Start(ctx, b.txmv2wrapper); err != nil {
return fmt.Errorf("Txm: Txmv2 failed to start: %w", err)
}
}

b.logger.Info("Txm starting runLoop")
b.wg.Add(1)
go b.runLoop()
Expand Down Expand Up @@ -237,6 +260,11 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Reset(addr
f := func() {
if abandon {
err = b.abandon(addr)
if b.txmv2wrapper != nil {
if err2 := b.txmv2wrapper.Reset(addr, abandon); err2 != nil {
b.logger.Error("failed to abandon transactions for dual broadcast", "err", err2)
}
}
}
}

Expand Down Expand Up @@ -460,6 +488,12 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) runLoop()
if err != nil && (!errors.Is(err, services.ErrAlreadyStopped) || !errors.Is(err, services.ErrCannotStopUnstarted)) {
b.logger.Errorw(fmt.Sprintf("Failed to Close Finalizer: %v", err), "err", err)
}
if b.txmv2wrapper != nil {
err = b.txmv2wrapper.Close()
if err != nil && (!errors.Is(err, services.ErrAlreadyStopped) || !errors.Is(err, services.ErrCannotStopUnstarted)) {
b.logger.Errorw(fmt.Sprintf("Failed to Close Finalizer: %v", err), "err", err)
}
}
return
case <-keysChanged:
// This check prevents the weird edge-case where you can select
Expand Down Expand Up @@ -513,11 +547,14 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Trigger(ad
func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CreateTransaction(ctx context.Context, txRequest txmgrtypes.TxRequest[ADDR, TX_HASH]) (tx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) {
// Check for existing Tx with IdempotencyKey. If found, return the Tx and do nothing
// Skipping CreateTransaction to avoid double send
if b.txmv2wrapper != nil && txRequest.Meta != nil && txRequest.Meta.DualBroadcast != nil && *txRequest.Meta.DualBroadcast {
return b.txmv2wrapper.CreateTransaction(ctx, txRequest)
}
if txRequest.IdempotencyKey != nil {
var existingTx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
existingTx, err = b.txStore.FindTxWithIdempotencyKey(ctx, *txRequest.IdempotencyKey, b.chainID)
if err != nil {
return tx, fmt.Errorf("Failed to search for transaction with IdempotencyKey: %w", err)
return tx, fmt.Errorf("failed to search for transaction with IdempotencyKey: %w", err)
}
if existingTx != nil {
b.logger.Infow("Found a Tx with IdempotencyKey. Returning existing Tx without creating a new one.", "IdempotencyKey", *txRequest.IdempotencyKey)
Expand Down

0 comments on commit 2ec1a41

Please sign in to comment.