From 5e99bdb764171f584df1fc6e10495c8ec0a3bb63 Mon Sep 17 00:00:00 2001 From: amit-momin <108959691+amit-momin@users.noreply.github.com> Date: Wed, 14 Aug 2024 19:40:40 -0500 Subject: [PATCH] Handle terminally stuck transactions on send (#14127) * Added error classification on send for terminally stuck transactions * Added changeset * Addressed feedback and fixed linting * Restructured broadcaster code for fatal and terminally stuck case * Reduced log levels for terminally stuck transactions --- .changeset/yellow-cougars-act.md | 5 +++ common/txmgr/broadcaster.go | 14 +++--- common/txmgr/confirmer.go | 17 +++++++- core/chains/evm/client/errors.go | 5 +++ core/chains/evm/client/errors_test.go | 22 +++++++++- core/chains/evm/txmgr/broadcaster_test.go | 19 +++++++++ core/chains/evm/txmgr/confirmer_test.go | 52 +++++++++++++++++++++++ core/chains/evm/txmgr/txmgr_test.go | 22 ++++++++++ 8 files changed, 146 insertions(+), 10 deletions(-) create mode 100644 .changeset/yellow-cougars-act.md diff --git a/.changeset/yellow-cougars-act.md b/.changeset/yellow-cougars-act.md new file mode 100644 index 00000000000..61ed62607a0 --- /dev/null +++ b/.changeset/yellow-cougars-act.md @@ -0,0 +1,5 @@ +--- +"chainlink": minor +--- + +Added client error classification for terminally stuck transactions in the TXM #internal diff --git a/common/txmgr/broadcaster.go b/common/txmgr/broadcaster.go index b2fb1dabff7..be3d3ca2f61 100644 --- a/common/txmgr/broadcaster.go +++ b/common/txmgr/broadcaster.go @@ -493,16 +493,16 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand errType, err = eb.validateOnChainSequence(ctx, lgr, errType, err, etx, retryCount) } - if errType != client.Fatal { - etx.InitialBroadcastAt = &initialBroadcastAt - etx.BroadcastAt = &initialBroadcastAt - } - - switch errType { - case client.Fatal: + if errType == client.Fatal || errType == client.TerminallyStuck { eb.SvcErrBuffer.Append(err) etx.Error = null.StringFrom(err.Error()) return 
eb.saveFatallyErroredTransaction(lgr, &etx), true + } + + etx.InitialBroadcastAt = &initialBroadcastAt + etx.BroadcastAt = &initialBroadcastAt + + switch errType { case client.TransactionAlreadyKnown: fallthrough case client.Successful: diff --git a/common/txmgr/confirmer.go b/common/txmgr/confirmer.go index 3b421191782..d67bd451228 100644 --- a/common/txmgr/confirmer.go +++ b/common/txmgr/confirmer.go @@ -491,7 +491,7 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Pro go func(tx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) { defer wg.Done() lggr := tx.GetLogger(ec.lggr) - // Create an purge attempt for tx + // Create a purge attempt for tx purgeAttempt, err := ec.TxAttemptBuilder.NewPurgeTxAttempt(ctx, tx, lggr) if err != nil { errMu.Lock() @@ -999,6 +999,21 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) han ec.SvcErrBuffer.Append(sendError) // This will loop continuously on every new head so it must be handled manually by the node operator! 
return ec.txStore.DeleteInProgressAttempt(ctx, attempt) + case client.TerminallyStuck: + // A transaction could broadcast successfully but then be considered terminally stuck on another attempt + // Even though the transaction can succeed under different circumstances, we want to purge this transaction as soon as we get this error + lggr.Warnw("terminally stuck transaction detected", "err", sendError.Error()) + ec.SvcErrBuffer.Append(sendError) + // Create a purge attempt for tx + purgeAttempt, err := ec.TxAttemptBuilder.NewPurgeTxAttempt(ctx, etx, lggr) + if err != nil { + return fmt.Errorf("NewPurgeTxAttempt failed: %w", err) + } + // Replace the in progress attempt with the purge attempt + if err := ec.txStore.SaveReplacementInProgressAttempt(ctx, attempt, &purgeAttempt); err != nil { + return fmt.Errorf("saveReplacementInProgressAttempt failed: %w", err) + } + return ec.handleInProgressAttempt(ctx, lggr, etx, purgeAttempt, blockHeight) case client.TransactionAlreadyKnown: // Sequence too low indicated that a transaction at this sequence was confirmed already. // Mark confirmed_missing_receipt and wait for the next cycle to try to get a receipt diff --git a/core/chains/evm/client/errors.go b/core/chains/evm/client/errors.go index 83c2d9566fe..e7fff8d0dbc 100644 --- a/core/chains/evm/client/errors.go +++ b/core/chains/evm/client/errors.go @@ -596,6 +596,11 @@ func ClassifySendError(err error, clientErrors config.ClientErrors, lggr logger. ) return commonclient.ExceedsMaxFee } + if sendError.IsTerminallyStuckConfigError(configErrors) { + lggr.Warnw("Transaction that would have been terminally stuck in the mempool detected on send. 
Marking as fatal error.", "err", sendError, "etx", tx) + // Attempt is thrown away in this case; we don't need it since it never got accepted by a node + return commonclient.TerminallyStuck + } lggr.Criticalw("Unknown error encountered when sending transaction", "err", err, "etx", tx) return commonclient.Unknown } diff --git a/core/chains/evm/client/errors_test.go b/core/chains/evm/client/errors_test.go index 00bc1a9a5b4..32a1ba2bf32 100644 --- a/core/chains/evm/client/errors_test.go +++ b/core/chains/evm/client/errors_test.go @@ -290,7 +290,7 @@ func Test_Eth_Errors(t *testing.T) { }) t.Run("Metis gas price errors", func(t *testing.T) { - err := evmclient.NewSendErrorS("primary websocket (wss://ws-mainnet.metis.io) call failed: gas price too low: 18000000000 wei, use at least tx.gasPrice = 19500000000 wei") + err = evmclient.NewSendErrorS("primary websocket (wss://ws-mainnet.metis.io) call failed: gas price too low: 18000000000 wei, use at least tx.gasPrice = 19500000000 wei") assert.True(t, err.L2FeeTooLow(clientErrors)) err = newSendErrorWrapped("primary websocket (wss://ws-mainnet.metis.io) call failed: gas price too low: 18000000000 wei, use at least tx.gasPrice = 19500000000 wei") assert.True(t, err.L2FeeTooLow(clientErrors)) @@ -302,7 +302,7 @@ func Test_Eth_Errors(t *testing.T) { }) t.Run("moonriver errors", func(t *testing.T) { - err := evmclient.NewSendErrorS("primary http (http://***REDACTED***:9933) call failed: submit transaction to pool failed: Pool(Stale)") + err = evmclient.NewSendErrorS("primary http (http://***REDACTED***:9933) call failed: submit transaction to pool failed: Pool(Stale)") assert.True(t, err.IsNonceTooLowError(clientErrors)) assert.False(t, err.IsTransactionAlreadyInMempool(clientErrors)) assert.False(t, err.Fatal(clientErrors)) @@ -311,6 +311,24 @@ func Test_Eth_Errors(t *testing.T) { assert.False(t, err.IsNonceTooLowError(clientErrors)) assert.False(t, err.Fatal(clientErrors)) }) + + t.Run("IsTerminallyStuck", func(t *testing.T) 
{ + tests := []errorCase{ + {"failed to add tx to the pool: not enough step counters to continue the execution", true, "zkEVM"}, + {"failed to add tx to the pool: not enough step counters to continue the execution", true, "Xlayer"}, + {"failed to add tx to the pool: not enough keccak counters to continue the execution", true, "zkEVM"}, + {"failed to add tx to the pool: not enough keccak counters to continue the execution", true, "Xlayer"}, + } + + for _, test := range tests { + t.Run(test.network, func(t *testing.T) { + err = evmclient.NewSendErrorS(test.message) + assert.Equal(t, err.IsTerminallyStuckConfigError(clientErrors), test.expect) + err = newSendErrorWrapped(test.message) + assert.Equal(t, err.IsTerminallyStuckConfigError(clientErrors), test.expect) + }) + } + }) } func Test_Eth_Errors_Fatal(t *testing.T) { diff --git a/core/chains/evm/txmgr/broadcaster_test.go b/core/chains/evm/txmgr/broadcaster_test.go index 537875a6473..c6c342973bb 100644 --- a/core/chains/evm/txmgr/broadcaster_test.go +++ b/core/chains/evm/txmgr/broadcaster_test.go @@ -521,6 +521,25 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Success(t *testing.T) { assert.True(t, ethTx.Error.Valid) assert.Equal(t, "transaction reverted during simulation: json-rpc error { Code = 42, Message = 'oh no, it reverted', Data = 'KqYi' }", ethTx.Error.String) }) + + t.Run("terminally stuck transaction is marked as fatal", func(t *testing.T) { + terminallyStuckError := "failed to add tx to the pool: not enough step counters to continue the execution" + etx := mustCreateUnstartedTx(t, txStore, fromAddress, toAddress, []byte{42, 42, 0}, gasLimit, big.Int(assets.NewEthValue(243)), testutils.FixtureChainID) + ethClient.On("SendTransactionReturnCode", mock.Anything, mock.MatchedBy(func(tx *gethTypes.Transaction) bool { + return tx.Nonce() == uint64(346) && tx.Value().Cmp(big.NewInt(243)) == 0 + }), fromAddress).Return(commonclient.Fatal, errors.New(terminallyStuckError)).Once() + + // Start processing 
unstarted transactions + retryable, err := eb.ProcessUnstartedTxs(tests.Context(t), fromAddress) + assert.NoError(t, err) + assert.False(t, retryable) + + dbTx, err := txStore.FindTxWithAttempts(ctx, etx.ID) + require.NoError(t, err) + assert.Equal(t, txmgrcommon.TxFatalError, dbTx.State) + assert.True(t, dbTx.Error.Valid) + assert.Equal(t, terminallyStuckError, dbTx.Error.String) + }) }) } diff --git a/core/chains/evm/txmgr/confirmer_test.go b/core/chains/evm/txmgr/confirmer_test.go index cce6dc8fc65..eaf79b6aba7 100644 --- a/core/chains/evm/txmgr/confirmer_test.go +++ b/core/chains/evm/txmgr/confirmer_test.go @@ -2673,6 +2673,58 @@ func TestEthConfirmer_RebroadcastWhereNecessary_WhenOutOfEth(t *testing.T) { }) } +func TestEthConfirmer_RebroadcastWhereNecessary_TerminallyStuckError(t *testing.T) { + t.Parallel() + + db := pgtest.NewSqlxDB(t) + cfg := configtest.NewGeneralConfig(t, func(c *chainlink.Config, s *chainlink.Secrets) { + c.EVM[0].GasEstimator.PriceMax = assets.GWei(500) + }) + txStore := cltest.NewTestTxStore(t, db) + ctx := tests.Context(t) + + ethClient := testutils.NewEthClientMockWithDefaultChain(t) + ethKeyStore := cltest.NewKeyStore(t, db).Eth() + _, fromAddress := cltest.MustInsertRandomKey(t, ethKeyStore) + + evmcfg := evmtest.NewChainScopedConfig(t, cfg) + + // Build the confirmer under test using the keystore created above + ec := newEthConfirmer(t, txStore, ethClient, cfg, evmcfg, ethKeyStore, nil) + currentHead := int64(30) + oldEnough := int64(19) + nonce := int64(0) + terminallyStuckError := "failed to add tx to the pool: not enough step counters to continue the execution" + + t.Run("terminally stuck transaction replaced with purge attempt", func(t *testing.T) { + originalBroadcastAt := time.Unix(1616509100, 0) + etx := cltest.MustInsertUnconfirmedEthTxWithBroadcastLegacyAttempt(t, txStore, nonce, fromAddress, originalBroadcastAt) + nonce++ + attempt1_1 := etx.TxAttempts[0] + var dbAttempt txmgr.DbEthTxAttempt + require.NoError(t, db.Get(&dbAttempt, `UPDATE 
evm.tx_attempts SET broadcast_before_block_num=$1 WHERE id=$2 RETURNING *`, oldEnough, attempt1_1.ID)) + + // Return terminally stuck error on first rebroadcast + ethClient.On("SendTransactionReturnCode", mock.Anything, mock.MatchedBy(func(tx *types.Transaction) bool { + return tx.Nonce() == uint64(*etx.Sequence) + }), fromAddress).Return(commonclient.TerminallyStuck, errors.New(terminallyStuckError)).Once() + // Return successful for purge attempt + ethClient.On("SendTransactionReturnCode", mock.Anything, mock.MatchedBy(func(tx *types.Transaction) bool { + return tx.Nonce() == uint64(*etx.Sequence) + }), fromAddress).Return(commonclient.Successful, nil).Once() + + // Start processing transactions for rebroadcast + require.NoError(t, ec.RebroadcastWhereNecessary(tests.Context(t), currentHead)) + var err error + etx, err = txStore.FindTxWithAttempts(ctx, etx.ID) + require.NoError(t, err) + + require.Len(t, etx.TxAttempts, 2) + purgeAttempt := etx.TxAttempts[0] + require.True(t, purgeAttempt.IsPurgeAttempt) + }) +} + func TestEthConfirmer_EnsureConfirmedTransactionsInLongestChain(t *testing.T) { t.Parallel() diff --git a/core/chains/evm/txmgr/txmgr_test.go b/core/chains/evm/txmgr/txmgr_test.go index 3d52e6eb4f0..86bf5fcc4bd 100644 --- a/core/chains/evm/txmgr/txmgr_test.go +++ b/core/chains/evm/txmgr/txmgr_test.go @@ -786,6 +786,7 @@ func TestTxm_GetTransactionStatus(t *testing.T) { t.Run("returns fatal for fatal error state with terminally stuck error", func(t *testing.T) { idempotencyKey := uuid.New().String() _, fromAddress := cltest.MustInsertRandomKey(t, ethKeyStore) + // Test the internal terminally stuck error returns Fatal nonce := evmtypes.Nonce(0) broadcast := time.Now() tx := &txmgr.Tx{ @@ -804,6 +805,27 @@ func TestTxm_GetTransactionStatus(t *testing.T) { state, err := txm.GetTransactionStatus(ctx, idempotencyKey) require.Equal(t, commontypes.Fatal, state) require.Error(t, err, evmclient.TerminallyStuckMsg) + + // Test a terminally stuck client error 
returns Fatal + nonce = evmtypes.Nonce(1) + idempotencyKey = uuid.New().String() + terminallyStuckClientError := "failed to add tx to the pool: not enough step counters to continue the execution" + tx = &txmgr.Tx{ + Sequence: &nonce, + IdempotencyKey: &idempotencyKey, + FromAddress: fromAddress, + EncodedPayload: []byte{1, 2, 3}, + FeeLimit: feeLimit, + State: txmgrcommon.TxFatalError, + Error: null.NewString(terminallyStuckClientError, true), + BroadcastAt: &broadcast, + InitialBroadcastAt: &broadcast, + } + err = txStore.InsertTx(ctx, tx) + require.NoError(t, err) + state, err = txm.GetTransactionStatus(ctx, idempotencyKey) + require.Equal(t, commontypes.Fatal, state) + require.Error(t, err, evmclient.TerminallyStuckMsg) }) t.Run("returns failed for fatal error state with other error", func(t *testing.T) {