From 415ae2b9d10cfd38c48dd5259bc44eb2b4ad8d7a Mon Sep 17 00:00:00 2001 From: amit-momin Date: Wed, 14 Aug 2024 15:03:08 -0500 Subject: [PATCH 1/5] Added error classification on send for terminally stuck transactions --- common/txmgr/broadcaster.go | 4 +- common/txmgr/confirmer.go | 17 +++++++- core/chains/evm/client/errors.go | 5 +++ core/chains/evm/client/errors_test.go | 18 ++++++++ core/chains/evm/txmgr/broadcaster_test.go | 19 +++++++++ core/chains/evm/txmgr/confirmer_test.go | 52 +++++++++++++++++++++++ core/chains/evm/txmgr/txmgr_test.go | 22 ++++++++++ 7 files changed, 134 insertions(+), 3 deletions(-) diff --git a/common/txmgr/broadcaster.go b/common/txmgr/broadcaster.go index b2fb1dabff7..2fcb7024199 100644 --- a/common/txmgr/broadcaster.go +++ b/common/txmgr/broadcaster.go @@ -493,13 +493,13 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand errType, err = eb.validateOnChainSequence(ctx, lgr, errType, err, etx, retryCount) } - if errType != client.Fatal { + if errType != client.Fatal && errType != client.TerminallyStuck { etx.InitialBroadcastAt = &initialBroadcastAt etx.BroadcastAt = &initialBroadcastAt } switch errType { - case client.Fatal: + case client.Fatal, client.TerminallyStuck: eb.SvcErrBuffer.Append(err) etx.Error = null.StringFrom(err.Error()) return eb.saveFatallyErroredTransaction(lgr, &etx), true diff --git a/common/txmgr/confirmer.go b/common/txmgr/confirmer.go index 3b421191782..cf356609e96 100644 --- a/common/txmgr/confirmer.go +++ b/common/txmgr/confirmer.go @@ -491,7 +491,7 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Pro go func(tx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) { defer wg.Done() lggr := tx.GetLogger(ec.lggr) - // Create an purge attempt for tx + // Create a purge attempt for tx purgeAttempt, err := ec.TxAttemptBuilder.NewPurgeTxAttempt(ctx, tx, lggr) if err != nil { errMu.Lock() @@ -999,6 +999,21 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) han ec.SvcErrBuffer.Append(sendError) // This will loop continuously on every new head so it must be handled manually by the node operator! return ec.txStore.DeleteInProgressAttempt(ctx, attempt) + case client.TerminallyStuck: + // A transaction could broadcast successfully but then be considered terminally stuck on another attempt + // Even though the transaction can succeeed under different circumstances, we want to purge this transaction as soon as we get this error + lggr.Errorw("terminally stuck transaction detected", "err", sendError.Error()) + ec.SvcErrBuffer.Append(sendError) + // Create a purge attempt for tx + purgeAttempt, err := ec.TxAttemptBuilder.NewPurgeTxAttempt(ctx, etx, lggr) + if err != nil { + return fmt.Errorf("NewPurgeTxAttempt failed: %w", err) + } + // Replace the in progress attempt with the purge attempt + if err := ec.txStore.SaveReplacementInProgressAttempt(ctx, attempt, &purgeAttempt); err != nil { + return fmt.Errorf("saveReplacementInProgressAttempt failed: %w", err) + } + return ec.handleInProgressAttempt(ctx, lggr, etx, purgeAttempt, blockHeight) case client.TransactionAlreadyKnown: // Sequence too low indicated that a transaction at this sequence was confirmed already. // Mark confirmed_missing_receipt and wait for the next cycle to try to get a receipt diff --git a/core/chains/evm/client/errors.go b/core/chains/evm/client/errors.go index 83c2d9566fe..a298cc82031 100644 --- a/core/chains/evm/client/errors.go +++ b/core/chains/evm/client/errors.go @@ -596,6 +596,11 @@ func ClassifySendError(err error, clientErrors config.ClientErrors, lggr logger. ) return commonclient.ExceedsMaxFee } + if sendError.IsTerminallyStuckConfigError(configErrors) { + lggr.Criticalw("Transaction that would have been terminally stuck in the mempool detected on send. Marking as fatal error.", "err", sendError, "etx", tx) + // Attempt is thrown away in this case; we don't need it since it never got accepted by a node + return commonclient.TerminallyStuck + } lggr.Criticalw("Unknown error encountered when sending transaction", "err", err, "etx", tx) return commonclient.Unknown } diff --git a/core/chains/evm/client/errors_test.go b/core/chains/evm/client/errors_test.go index 00bc1a9a5b4..e4f2db77e91 100644 --- a/core/chains/evm/client/errors_test.go +++ b/core/chains/evm/client/errors_test.go @@ -311,6 +311,24 @@ func Test_Eth_Errors(t *testing.T) { assert.False(t, err.IsNonceTooLowError(clientErrors)) assert.False(t, err.Fatal(clientErrors)) }) + + t.Run("IsTerminallyStuck", func(t *testing.T) { + tests := []errorCase{ + {"failed to add tx to the pool: not enough step counters to continue the execution", true, "zkEVM"}, + {"failed to add tx to the pool: not enough step counters to continue the execution", true, "Xlayer"}, + {"failed to add tx to the pool: not enough keccak counters to continue the execution", true, "zkEVM"}, + {"failed to add tx to the pool: not enough keccak counters to continue the execution", true, "Xlayer"}, + } + + for _, test := range tests { + t.Run(test.network, func(t *testing.T) { + err = evmclient.NewSendErrorS(test.message) + assert.Equal(t, err.IsTerminallyStuckConfigError(clientErrors), test.expect) + err = newSendErrorWrapped(test.message) + assert.Equal(t, err.IsTerminallyStuckConfigError(clientErrors), test.expect) + }) + } + }) } func Test_Eth_Errors_Fatal(t *testing.T) { diff --git a/core/chains/evm/txmgr/broadcaster_test.go b/core/chains/evm/txmgr/broadcaster_test.go index 537875a6473..032751f74e5 100644 --- a/core/chains/evm/txmgr/broadcaster_test.go +++ b/core/chains/evm/txmgr/broadcaster_test.go @@ -521,6 +521,25 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Success(t *testing.T) { assert.True(t, ethTx.Error.Valid) assert.Equal(t, "transaction reverted during simulation: json-rpc error { Code = 42, Message = 'oh no, it reverted', Data = 'KqYi' }", ethTx.Error.String) }) + + t.Run("terminally stuck transaction is marked as fatal", func(t *testing.T) { + terminallyStuckError := "failed to add tx to the pool: not enough step counters to continue the execution" + etx := mustCreateUnstartedTx(t, txStore, fromAddress, toAddress, []byte{42, 42, 0}, gasLimit, big.Int(assets.NewEthValue(243)), testutils.FixtureChainID) + ethClient.On("SendTransactionReturnCode", mock.Anything, mock.MatchedBy(func(tx *gethTypes.Transaction) bool { + return tx.Nonce() == uint64(346) && tx.Value().Cmp(big.NewInt(243)) == 0 + }), fromAddress).Return(commonclient.Fatal, errors.New(terminallyStuckError)).Once() + + // Do the thing + retryable, err := eb.ProcessUnstartedTxs(tests.Context(t), fromAddress) + assert.NoError(t, err) + assert.False(t, retryable) + + dbTx, err := txStore.FindTxWithAttempts(ctx, etx.ID) + require.NoError(t, err) + assert.Equal(t, txmgrcommon.TxFatalError, dbTx.State) + assert.True(t, dbTx.Error.Valid) + assert.Equal(t, terminallyStuckError, dbTx.Error.String) + }) }) } diff --git a/core/chains/evm/txmgr/confirmer_test.go b/core/chains/evm/txmgr/confirmer_test.go index cce6dc8fc65..72219fcc4f7 100644 --- a/core/chains/evm/txmgr/confirmer_test.go +++ b/core/chains/evm/txmgr/confirmer_test.go @@ -2673,6 +2673,58 @@ func TestEthConfirmer_RebroadcastWhereNecessary_WhenOutOfEth(t *testing.T) { }) } +func TestEthConfirmer_RebroadcastWhereNecessary_TerminallyStuckError(t *testing.T) { + t.Parallel() + + db := pgtest.NewSqlxDB(t) + cfg := configtest.NewGeneralConfig(t, func(c *chainlink.Config, s *chainlink.Secrets) { + c.EVM[0].GasEstimator.PriceMax = assets.GWei(500) + }) + txStore := cltest.NewTestTxStore(t, db) + ctx := tests.Context(t) + + ethClient := testutils.NewEthClientMockWithDefaultChain(t) + ethKeyStore := cltest.NewKeyStore(t, db).Eth() + _, fromAddress := cltest.MustInsertRandomKey(t, ethKeyStore) + + evmcfg := evmtest.NewChainScopedConfig(t, cfg) + + // Use a mock keystore for this test + ec := newEthConfirmer(t, txStore, ethClient, cfg, evmcfg, ethKeyStore, nil) + currentHead := int64(30) + oldEnough := int64(19) + nonce := int64(0) + terminallyStuckError := "failed to add tx to the pool: not enough step counters to continue the execution" + + t.Run("terminally stuck transaction replaced with purge attempt", func(t *testing.T) { + originalBroadcastAt := time.Unix(1616509100, 0) + etx := cltest.MustInsertUnconfirmedEthTxWithBroadcastLegacyAttempt(t, txStore, nonce, fromAddress, originalBroadcastAt) + nonce++ + attempt1_1 := etx.TxAttempts[0] + var dbAttempt txmgr.DbEthTxAttempt + require.NoError(t, db.Get(&dbAttempt, `UPDATE evm.tx_attempts SET broadcast_before_block_num=$1 WHERE id=$2 RETURNING *`, oldEnough, attempt1_1.ID)) + + // Return terminally stuck error on first rebroadcast + ethClient.On("SendTransactionReturnCode", mock.Anything, mock.MatchedBy(func(tx *types.Transaction) bool { + return tx.Nonce() == uint64(*etx.Sequence) + }), fromAddress).Return(commonclient.TerminallyStuck, errors.New(terminallyStuckError)).Once() + // Return successful for purge attempt + ethClient.On("SendTransactionReturnCode", mock.Anything, mock.MatchedBy(func(tx *types.Transaction) bool { + return tx.Nonce() == uint64(*etx.Sequence) + }), fromAddress).Return(commonclient.Successful, nil).Once() + + // Do the thing + require.NoError(t, ec.RebroadcastWhereNecessary(tests.Context(t), currentHead)) + var err error + etx, err = txStore.FindTxWithAttempts(ctx, etx.ID) + require.NoError(t, err) + + require.Len(t, etx.TxAttempts, 2) + purgeAttempt := etx.TxAttempts[0] + require.True(t, purgeAttempt.IsPurgeAttempt) + }) +} + func TestEthConfirmer_EnsureConfirmedTransactionsInLongestChain(t *testing.T) { t.Parallel() diff --git a/core/chains/evm/txmgr/txmgr_test.go b/core/chains/evm/txmgr/txmgr_test.go index 3d52e6eb4f0..86bf5fcc4bd 100644 --- a/core/chains/evm/txmgr/txmgr_test.go +++ b/core/chains/evm/txmgr/txmgr_test.go @@ -786,6 +786,7 @@ func TestTxm_GetTransactionStatus(t *testing.T) { t.Run("returns fatal for fatal error state with terminally stuck error", func(t *testing.T) { idempotencyKey := uuid.New().String() _, fromAddress := cltest.MustInsertRandomKey(t, ethKeyStore) + // Test the internal terminally stuck error returns Fatal nonce := evmtypes.Nonce(0) broadcast := time.Now() tx := &txmgr.Tx{ @@ -804,6 +805,27 @@ func TestTxm_GetTransactionStatus(t *testing.T) { state, err := txm.GetTransactionStatus(ctx, idempotencyKey) require.Equal(t, commontypes.Fatal, state) require.Error(t, err, evmclient.TerminallyStuckMsg) + + // Test a terminally stuck client error returns Fatal + nonce = evmtypes.Nonce(1) + idempotencyKey = uuid.New().String() + terminallyStuckClientError := "failed to add tx to the pool: not enough step counters to continue the execution" + tx = &txmgr.Tx{ + Sequence: &nonce, + IdempotencyKey: &idempotencyKey, + FromAddress: fromAddress, + EncodedPayload: []byte{1, 2, 3}, + FeeLimit: feeLimit, + State: txmgrcommon.TxFatalError, + Error: null.NewString(terminallyStuckClientError, true), + BroadcastAt: &broadcast, + InitialBroadcastAt: &broadcast, + } + err = txStore.InsertTx(ctx, tx) + require.NoError(t, err) + state, err = txm.GetTransactionStatus(ctx, idempotencyKey) + require.Equal(t, commontypes.Fatal, state) + require.Error(t, err, evmclient.TerminallyStuckMsg) }) t.Run("returns failed for fatal error state with other error", func(t *testing.T) { From ed80b43c592d2c6f2cac00ff2b1a8c11c4edf574 Mon Sep 17 00:00:00 2001 From: amit-momin Date: Wed, 14 Aug 2024 15:09:35 -0500 Subject: [PATCH 2/5] Added changeset --- .changeset/yellow-cougars-act.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/yellow-cougars-act.md diff --git a/.changeset/yellow-cougars-act.md b/.changeset/yellow-cougars-act.md new file mode 100644 index 00000000000..61ed62607a0 --- /dev/null +++ b/.changeset/yellow-cougars-act.md @@ -0,0 +1,5 @@ +--- +"chainlink": minor +--- + +Added client error classification for terminally stuck transactions in the TXM #internal From 321c21abc98531c88d38723a5bf22ba3b2e74fdd Mon Sep 17 00:00:00 2001 From: amit-momin Date: Wed, 14 Aug 2024 16:02:49 -0500 Subject: [PATCH 3/5] Addressed feedback and fixed linting --- common/txmgr/confirmer.go | 2 +- core/chains/evm/client/errors_test.go | 4 ++-- core/chains/evm/txmgr/broadcaster_test.go | 4 ++-- core/chains/evm/txmgr/confirmer_test.go | 2 +- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/common/txmgr/confirmer.go b/common/txmgr/confirmer.go index cf356609e96..e546ed18c33 100644 --- a/common/txmgr/confirmer.go +++ b/common/txmgr/confirmer.go @@ -1001,7 +1001,7 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) han return ec.txStore.DeleteInProgressAttempt(ctx, attempt) case client.TerminallyStuck: // A transaction could broadcast successfully but then be considered terminally stuck on another attempt - // Even though the transaction can succeeed under different circumstances, we want to purge this transaction as soon as we get this error + // Even though the transaction can succeed under different circumstances, we want to purge this transaction as soon as we get this error lggr.Errorw("terminally stuck transaction detected", "err", sendError.Error()) ec.SvcErrBuffer.Append(sendError) // Create a purge attempt for tx diff --git a/core/chains/evm/client/errors_test.go b/core/chains/evm/client/errors_test.go index e4f2db77e91..32a1ba2bf32 100644 --- a/core/chains/evm/client/errors_test.go +++ b/core/chains/evm/client/errors_test.go @@ -290,7 +290,7 @@ func Test_Eth_Errors(t *testing.T) { }) t.Run("Metis gas price errors", func(t *testing.T) { - err := evmclient.NewSendErrorS("primary websocket (wss://ws-mainnet.metis.io) call failed: gas price too low: 18000000000 wei, use at least tx.gasPrice = 19500000000 wei") + err = evmclient.NewSendErrorS("primary websocket (wss://ws-mainnet.metis.io) call failed: gas price too low: 18000000000 wei, use at least tx.gasPrice = 19500000000 wei") assert.True(t, err.L2FeeTooLow(clientErrors)) err = newSendErrorWrapped("primary websocket (wss://ws-mainnet.metis.io) call failed: gas price too low: 18000000000 wei, use at least tx.gasPrice = 19500000000 wei") assert.True(t, err.L2FeeTooLow(clientErrors)) @@ -302,7 +302,7 @@ func Test_Eth_Errors(t *testing.T) { }) t.Run("moonriver errors", func(t *testing.T) { - err := evmclient.NewSendErrorS("primary http (http://***REDACTED***:9933) call failed: submit transaction to pool failed: Pool(Stale)") + err = evmclient.NewSendErrorS("primary http (http://***REDACTED***:9933) call failed: submit transaction to pool failed: Pool(Stale)") assert.True(t, err.IsNonceTooLowError(clientErrors)) assert.False(t, err.IsTransactionAlreadyInMempool(clientErrors)) assert.False(t, err.Fatal(clientErrors)) diff --git a/core/chains/evm/txmgr/broadcaster_test.go b/core/chains/evm/txmgr/broadcaster_test.go index 032751f74e5..c6c342973bb 100644 --- a/core/chains/evm/txmgr/broadcaster_test.go +++ b/core/chains/evm/txmgr/broadcaster_test.go @@ -529,11 +529,11 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Success(t *testing.T) { return tx.Nonce() == uint64(346) && tx.Value().Cmp(big.NewInt(243)) == 0 }), fromAddress).Return(commonclient.Fatal, errors.New(terminallyStuckError)).Once() - // Do the thing + // Start processing unstarted transactions retryable, err := eb.ProcessUnstartedTxs(tests.Context(t), fromAddress) assert.NoError(t, err) assert.False(t, retryable) - + dbTx, err := txStore.FindTxWithAttempts(ctx, etx.ID) require.NoError(t, err) assert.Equal(t, txmgrcommon.TxFatalError, dbTx.State) diff --git a/core/chains/evm/txmgr/confirmer_test.go b/core/chains/evm/txmgr/confirmer_test.go index 72219fcc4f7..eaf79b6aba7 100644 --- a/core/chains/evm/txmgr/confirmer_test.go +++ b/core/chains/evm/txmgr/confirmer_test.go @@ -2713,7 +2713,7 @@ func TestEthConfirmer_RebroadcastWhereNecessary_TerminallyStuckError(t *testing. return tx.Nonce() == uint64(*etx.Sequence) }), fromAddress).Return(commonclient.Successful, nil).Once() - // Do the thing + // Start processing transactions for rebroadcast require.NoError(t, ec.RebroadcastWhereNecessary(tests.Context(t), currentHead)) var err error etx, err = txStore.FindTxWithAttempts(ctx, etx.ID) From 6003fed45c60553d6dd6d236c68a09c72fbcfbdb Mon Sep 17 00:00:00 2001 From: amit-momin Date: Wed, 14 Aug 2024 16:11:30 -0500 Subject: [PATCH 4/5] Restructured broadcaster code for fatal and terminally stuck case --- common/txmgr/broadcaster.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/common/txmgr/broadcaster.go b/common/txmgr/broadcaster.go index 2fcb7024199..be3d3ca2f61 100644 --- a/common/txmgr/broadcaster.go +++ b/common/txmgr/broadcaster.go @@ -493,16 +493,16 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand errType, err = eb.validateOnChainSequence(ctx, lgr, errType, err, etx, retryCount) } - if errType != client.Fatal && errType != client.TerminallyStuck { - etx.InitialBroadcastAt = &initialBroadcastAt - etx.BroadcastAt = &initialBroadcastAt - } - - switch errType { - case client.Fatal, client.TerminallyStuck: + if errType == client.Fatal || errType == client.TerminallyStuck { eb.SvcErrBuffer.Append(err) etx.Error = null.StringFrom(err.Error()) return eb.saveFatallyErroredTransaction(lgr, &etx), true + } + + etx.InitialBroadcastAt = &initialBroadcastAt + etx.BroadcastAt = &initialBroadcastAt + + switch errType { case client.TransactionAlreadyKnown: fallthrough case client.Successful: From d786f17e6127d08970f29a7b470e06e2a272008e Mon Sep 17 00:00:00 2001 From: amit-momin Date: Wed, 14 Aug 2024 17:03:42 -0500 Subject: [PATCH 5/5] Reduced log levels for terminally stuck transactions --- common/txmgr/confirmer.go | 2 +- core/chains/evm/client/errors.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/common/txmgr/confirmer.go b/common/txmgr/confirmer.go index e546ed18c33..d67bd451228 100644 --- a/common/txmgr/confirmer.go +++ b/common/txmgr/confirmer.go @@ -1002,7 +1002,7 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) han case client.TerminallyStuck: // A transaction could broadcast successfully but then be considered terminally stuck on another attempt // Even though the transaction can succeed under different circumstances, we want to purge this transaction as soon as we get this error - lggr.Errorw("terminally stuck transaction detected", "err", sendError.Error()) + lggr.Warnw("terminally stuck transaction detected", "err", sendError.Error()) ec.SvcErrBuffer.Append(sendError) // Create a purge attempt for tx purgeAttempt, err := ec.TxAttemptBuilder.NewPurgeTxAttempt(ctx, etx, lggr) diff --git a/core/chains/evm/client/errors.go b/core/chains/evm/client/errors.go index a298cc82031..e7fff8d0dbc 100644 --- a/core/chains/evm/client/errors.go +++ b/core/chains/evm/client/errors.go @@ -597,7 +597,7 @@ func ClassifySendError(err error, clientErrors config.ClientErrors, lggr logger. return commonclient.ExceedsMaxFee } if sendError.IsTerminallyStuckConfigError(configErrors) { - lggr.Criticalw("Transaction that would have been terminally stuck in the mempool detected on send. Marking as fatal error.", "err", sendError, "etx", tx) + lggr.Warnw("Transaction that would have been terminally stuck in the mempool detected on send. Marking as fatal error.", "err", sendError, "etx", tx) // Attempt is thrown away in this case; we don't need it since it never got accepted by a node return commonclient.TerminallyStuck }