Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle terminally stuck transactions on send #14127

5 changes: 5 additions & 0 deletions .changeset/yellow-cougars-act.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

Added client error classification for terminally stuck transactions in the TXM #internal
14 changes: 7 additions & 7 deletions common/txmgr/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,16 +493,16 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand
errType, err = eb.validateOnChainSequence(ctx, lgr, errType, err, etx, retryCount)
}

if errType != client.Fatal {
etx.InitialBroadcastAt = &initialBroadcastAt
etx.BroadcastAt = &initialBroadcastAt
}

switch errType {
case client.Fatal:
if errType == client.Fatal || errType == client.TerminallyStuck {
eb.SvcErrBuffer.Append(err)
etx.Error = null.StringFrom(err.Error())
return eb.saveFatallyErroredTransaction(lgr, &etx), true
}

etx.InitialBroadcastAt = &initialBroadcastAt
etx.BroadcastAt = &initialBroadcastAt

switch errType {
case client.TransactionAlreadyKnown:
fallthrough
case client.Successful:
Expand Down
17 changes: 16 additions & 1 deletion common/txmgr/confirmer.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Pro
go func(tx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) {
defer wg.Done()
lggr := tx.GetLogger(ec.lggr)
// Create an purge attempt for tx
// Create a purge attempt for tx
purgeAttempt, err := ec.TxAttemptBuilder.NewPurgeTxAttempt(ctx, tx, lggr)
if err != nil {
errMu.Lock()
Expand Down Expand Up @@ -999,6 +999,21 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) han
ec.SvcErrBuffer.Append(sendError)
// This will loop continuously on every new head so it must be handled manually by the node operator!
return ec.txStore.DeleteInProgressAttempt(ctx, attempt)
case client.TerminallyStuck:
// A transaction could broadcast successfully but then be considered terminally stuck on another attempt
// Even though the transaction can succeed under different circumstances, we want to purge this transaction as soon as we get this error
lggr.Warnw("terminally stuck transaction detected", "err", sendError.Error())
ec.SvcErrBuffer.Append(sendError)
// Create a purge attempt for tx
purgeAttempt, err := ec.TxAttemptBuilder.NewPurgeTxAttempt(ctx, etx, lggr)
if err != nil {
return fmt.Errorf("NewPurgeTxAttempt failed: %w", err)
}
// Replace the in progress attempt with the purge attempt
if err := ec.txStore.SaveReplacementInProgressAttempt(ctx, attempt, &purgeAttempt); err != nil {
return fmt.Errorf("saveReplacementInProgressAttempt failed: %w", err)
}
return ec.handleInProgressAttempt(ctx, lggr, etx, purgeAttempt, blockHeight)
case client.TransactionAlreadyKnown:
// Sequence too low indicated that a transaction at this sequence was confirmed already.
// Mark confirmed_missing_receipt and wait for the next cycle to try to get a receipt
Expand Down
5 changes: 5 additions & 0 deletions core/chains/evm/client/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,11 @@ func ClassifySendError(err error, clientErrors config.ClientErrors, lggr logger.
)
return commonclient.ExceedsMaxFee
}
if sendError.IsTerminallyStuckConfigError(configErrors) {
lggr.Warnw("Transaction that would have been terminally stuck in the mempool detected on send. Marking as fatal error.", "err", sendError, "etx", tx)
// Attempt is thrown away in this case; we don't need it since it never got accepted by a node
return commonclient.TerminallyStuck
}
lggr.Criticalw("Unknown error encountered when sending transaction", "err", err, "etx", tx)
return commonclient.Unknown
}
22 changes: 20 additions & 2 deletions core/chains/evm/client/errors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ func Test_Eth_Errors(t *testing.T) {
})

t.Run("Metis gas price errors", func(t *testing.T) {
err := evmclient.NewSendErrorS("primary websocket (wss://ws-mainnet.metis.io) call failed: gas price too low: 18000000000 wei, use at least tx.gasPrice = 19500000000 wei")
err = evmclient.NewSendErrorS("primary websocket (wss://ws-mainnet.metis.io) call failed: gas price too low: 18000000000 wei, use at least tx.gasPrice = 19500000000 wei")
assert.True(t, err.L2FeeTooLow(clientErrors))
err = newSendErrorWrapped("primary websocket (wss://ws-mainnet.metis.io) call failed: gas price too low: 18000000000 wei, use at least tx.gasPrice = 19500000000 wei")
assert.True(t, err.L2FeeTooLow(clientErrors))
Expand All @@ -302,7 +302,7 @@ func Test_Eth_Errors(t *testing.T) {
})

t.Run("moonriver errors", func(t *testing.T) {
err := evmclient.NewSendErrorS("primary http (http://***REDACTED***:9933) call failed: submit transaction to pool failed: Pool(Stale)")
err = evmclient.NewSendErrorS("primary http (http://***REDACTED***:9933) call failed: submit transaction to pool failed: Pool(Stale)")
assert.True(t, err.IsNonceTooLowError(clientErrors))
assert.False(t, err.IsTransactionAlreadyInMempool(clientErrors))
assert.False(t, err.Fatal(clientErrors))
Expand All @@ -311,6 +311,24 @@ func Test_Eth_Errors(t *testing.T) {
assert.False(t, err.IsNonceTooLowError(clientErrors))
assert.False(t, err.Fatal(clientErrors))
})

t.Run("IsTerminallyStuck", func(t *testing.T) {
tests := []errorCase{
{"failed to add tx to the pool: not enough step counters to continue the execution", true, "zkEVM"},
{"failed to add tx to the pool: not enough step counters to continue the execution", true, "Xlayer"},
{"failed to add tx to the pool: not enough keccak counters to continue the execution", true, "zkEVM"},
{"failed to add tx to the pool: not enough keccak counters to continue the execution", true, "Xlayer"},
}

for _, test := range tests {
t.Run(test.network, func(t *testing.T) {
err = evmclient.NewSendErrorS(test.message)
assert.Equal(t, err.IsTerminallyStuckConfigError(clientErrors), test.expect)
err = newSendErrorWrapped(test.message)
assert.Equal(t, err.IsTerminallyStuckConfigError(clientErrors), test.expect)
})
}
})
}

func Test_Eth_Errors_Fatal(t *testing.T) {
Expand Down
19 changes: 19 additions & 0 deletions core/chains/evm/txmgr/broadcaster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,25 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Success(t *testing.T) {
assert.True(t, ethTx.Error.Valid)
assert.Equal(t, "transaction reverted during simulation: json-rpc error { Code = 42, Message = 'oh no, it reverted', Data = 'KqYi' }", ethTx.Error.String)
})

t.Run("terminally stuck transaction is marked as fatal", func(t *testing.T) {
terminallyStuckError := "failed to add tx to the pool: not enough step counters to continue the execution"
etx := mustCreateUnstartedTx(t, txStore, fromAddress, toAddress, []byte{42, 42, 0}, gasLimit, big.Int(assets.NewEthValue(243)), testutils.FixtureChainID)
ethClient.On("SendTransactionReturnCode", mock.Anything, mock.MatchedBy(func(tx *gethTypes.Transaction) bool {
return tx.Nonce() == uint64(346) && tx.Value().Cmp(big.NewInt(243)) == 0
}), fromAddress).Return(commonclient.Fatal, errors.New(terminallyStuckError)).Once()

// Start processing unstarted transactions
retryable, err := eb.ProcessUnstartedTxs(tests.Context(t), fromAddress)
assert.NoError(t, err)
assert.False(t, retryable)

dbTx, err := txStore.FindTxWithAttempts(ctx, etx.ID)
require.NoError(t, err)
assert.Equal(t, txmgrcommon.TxFatalError, dbTx.State)
assert.True(t, dbTx.Error.Valid)
assert.Equal(t, terminallyStuckError, dbTx.Error.String)
})
})
}

Expand Down
52 changes: 52 additions & 0 deletions core/chains/evm/txmgr/confirmer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2673,6 +2673,58 @@ func TestEthConfirmer_RebroadcastWhereNecessary_WhenOutOfEth(t *testing.T) {
})
}

func TestEthConfirmer_RebroadcastWhereNecessary_TerminallyStuckError(t *testing.T) {
t.Parallel()

db := pgtest.NewSqlxDB(t)
cfg := configtest.NewGeneralConfig(t, func(c *chainlink.Config, s *chainlink.Secrets) {
c.EVM[0].GasEstimator.PriceMax = assets.GWei(500)
})
txStore := cltest.NewTestTxStore(t, db)
ctx := tests.Context(t)

ethClient := testutils.NewEthClientMockWithDefaultChain(t)
ethKeyStore := cltest.NewKeyStore(t, db).Eth()
_, fromAddress := cltest.MustInsertRandomKey(t, ethKeyStore)

evmcfg := evmtest.NewChainScopedConfig(t, cfg)

// Use a mock keystore for this test
ec := newEthConfirmer(t, txStore, ethClient, cfg, evmcfg, ethKeyStore, nil)
currentHead := int64(30)
oldEnough := int64(19)
nonce := int64(0)
terminallyStuckError := "failed to add tx to the pool: not enough step counters to continue the execution"

t.Run("terminally stuck transaction replaced with purge attempt", func(t *testing.T) {
originalBroadcastAt := time.Unix(1616509100, 0)
etx := cltest.MustInsertUnconfirmedEthTxWithBroadcastLegacyAttempt(t, txStore, nonce, fromAddress, originalBroadcastAt)
nonce++
attempt1_1 := etx.TxAttempts[0]
var dbAttempt txmgr.DbEthTxAttempt
require.NoError(t, db.Get(&dbAttempt, `UPDATE evm.tx_attempts SET broadcast_before_block_num=$1 WHERE id=$2 RETURNING *`, oldEnough, attempt1_1.ID))

// Return terminally stuck error on first rebroadcast
ethClient.On("SendTransactionReturnCode", mock.Anything, mock.MatchedBy(func(tx *types.Transaction) bool {
return tx.Nonce() == uint64(*etx.Sequence)
}), fromAddress).Return(commonclient.TerminallyStuck, errors.New(terminallyStuckError)).Once()
// Return successful for purge attempt
ethClient.On("SendTransactionReturnCode", mock.Anything, mock.MatchedBy(func(tx *types.Transaction) bool {
return tx.Nonce() == uint64(*etx.Sequence)
}), fromAddress).Return(commonclient.Successful, nil).Once()

// Start processing transactions for rebroadcast
require.NoError(t, ec.RebroadcastWhereNecessary(tests.Context(t), currentHead))
var err error
etx, err = txStore.FindTxWithAttempts(ctx, etx.ID)
require.NoError(t, err)

require.Len(t, etx.TxAttempts, 2)
purgeAttempt := etx.TxAttempts[0]
require.True(t, purgeAttempt.IsPurgeAttempt)
})
}

func TestEthConfirmer_EnsureConfirmedTransactionsInLongestChain(t *testing.T) {
t.Parallel()

Expand Down
22 changes: 22 additions & 0 deletions core/chains/evm/txmgr/txmgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -786,6 +786,7 @@ func TestTxm_GetTransactionStatus(t *testing.T) {
t.Run("returns fatal for fatal error state with terminally stuck error", func(t *testing.T) {
idempotencyKey := uuid.New().String()
_, fromAddress := cltest.MustInsertRandomKey(t, ethKeyStore)
// Test the internal terminally stuck error returns Fatal
nonce := evmtypes.Nonce(0)
broadcast := time.Now()
tx := &txmgr.Tx{
Expand All @@ -804,6 +805,27 @@ func TestTxm_GetTransactionStatus(t *testing.T) {
state, err := txm.GetTransactionStatus(ctx, idempotencyKey)
require.Equal(t, commontypes.Fatal, state)
require.Error(t, err, evmclient.TerminallyStuckMsg)

// Test a terminally stuck client error returns Fatal
nonce = evmtypes.Nonce(1)
idempotencyKey = uuid.New().String()
terminallyStuckClientError := "failed to add tx to the pool: not enough step counters to continue the execution"
tx = &txmgr.Tx{
Sequence: &nonce,
IdempotencyKey: &idempotencyKey,
FromAddress: fromAddress,
EncodedPayload: []byte{1, 2, 3},
FeeLimit: feeLimit,
State: txmgrcommon.TxFatalError,
Error: null.NewString(terminallyStuckClientError, true),
BroadcastAt: &broadcast,
InitialBroadcastAt: &broadcast,
}
err = txStore.InsertTx(ctx, tx)
require.NoError(t, err)
state, err = txm.GetTransactionStatus(ctx, idempotencyKey)
require.Equal(t, commontypes.Fatal, state)
require.Error(t, err, evmclient.TerminallyStuckMsg)
})

t.Run("returns failed for fatal error state with other error", func(t *testing.T) {
Expand Down
Loading