Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle terminally stuck transactions on send #14127

5 changes: 5 additions & 0 deletions .changeset/yellow-cougars-act.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

Added client error classification for terminally stuck transactions in the TXM #internal
14 changes: 7 additions & 7 deletions common/txmgr/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,16 +493,16 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand
errType, err = eb.validateOnChainSequence(ctx, lgr, errType, err, etx, retryCount)
}

if errType != client.Fatal {
etx.InitialBroadcastAt = &initialBroadcastAt
etx.BroadcastAt = &initialBroadcastAt
}

switch errType {
case client.Fatal:
if errType == client.Fatal || errType == client.TerminallyStuck {
eb.SvcErrBuffer.Append(err)
etx.Error = null.StringFrom(err.Error())
return eb.saveFatallyErroredTransaction(lgr, &etx), true
}

etx.InitialBroadcastAt = &initialBroadcastAt
etx.BroadcastAt = &initialBroadcastAt

switch errType {
case client.TransactionAlreadyKnown:
fallthrough
case client.Successful:
Expand Down
17 changes: 16 additions & 1 deletion common/txmgr/confirmer.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Pro
go func(tx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) {
defer wg.Done()
lggr := tx.GetLogger(ec.lggr)
// Create an purge attempt for tx
// Create a purge attempt for tx
purgeAttempt, err := ec.TxAttemptBuilder.NewPurgeTxAttempt(ctx, tx, lggr)
if err != nil {
errMu.Lock()
Expand Down Expand Up @@ -999,6 +999,21 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) han
ec.SvcErrBuffer.Append(sendError)
// This will loop continuously on every new head so it must be handled manually by the node operator!
return ec.txStore.DeleteInProgressAttempt(ctx, attempt)
case client.TerminallyStuck:
// A transaction could broadcast successfully but then be considered terminally stuck on another attempt
// Even though the transaction can succeed under different circumstances, we want to purge this transaction as soon as we get this error
lggr.Errorw("terminally stuck transaction detected", "err", sendError.Error())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

whats the criteria for a Error log vs a Critical log?

Copy link
Contributor

@huangzhen1997 huangzhen1997 Aug 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

from my understanding this (tx stuck due to overflow, not enough keccak counters to continue the execution) is expected behavior, as least for zkSync. When it happens we just need to cancel/purge the existing tx by reprocessing, and it's not critical/fatal issue related to chainlink node that we need to raise alert, critial/fatal one for example: Invariant violation: fatal error while re-attempting transaction should not happen

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's my understanding as well. Since the TXM resolves this on its own, we don't have to raise a signal for NOPs to take any actions.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree. I would say, change log level also to a warn.
The Tx is bad, and is nothing wrong with the TXM.

ec.SvcErrBuffer.Append(sendError)
// Create a purge attempt for tx
purgeAttempt, err := ec.TxAttemptBuilder.NewPurgeTxAttempt(ctx, etx, lggr)
if err != nil {
return fmt.Errorf("NewPurgeTxAttempt failed: %w", err)
}
// Replace the in progress attempt with the purge attempt
if err := ec.txStore.SaveReplacementInProgressAttempt(ctx, attempt, &purgeAttempt); err != nil {
return fmt.Errorf("saveReplacementInProgressAttempt failed: %w", err)
}
return ec.handleInProgressAttempt(ctx, lggr, etx, purgeAttempt, blockHeight)
case client.TransactionAlreadyKnown:
// Sequence too low indicated that a transaction at this sequence was confirmed already.
// Mark confirmed_missing_receipt and wait for the next cycle to try to get a receipt
Expand Down
5 changes: 5 additions & 0 deletions core/chains/evm/client/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,11 @@ func ClassifySendError(err error, clientErrors config.ClientErrors, lggr logger.
)
return commonclient.ExceedsMaxFee
}
if sendError.IsTerminallyStuckConfigError(configErrors) {
lggr.Criticalw("Transaction that would have been terminally stuck in the mempool detected on send. Marking as fatal error.", "err", sendError, "etx", tx)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't be critical log. Should be a warning.
Even the Errorw() log is for cases where we clearly see a failure, although ones that we can recover from.
From example, an important RPC failed, or database writing failed.

A stuck tx is now an expected behavior, so warn log is enough.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do! I was just trying to match the behavior for Fatal. Think we're planning to rework logs in the near future so that might change anyways

// Attempt is thrown away in this case; we don't need it since it never got accepted by a node
return commonclient.TerminallyStuck
}
lggr.Criticalw("Unknown error encountered when sending transaction", "err", err, "etx", tx)
return commonclient.Unknown
}
22 changes: 20 additions & 2 deletions core/chains/evm/client/errors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ func Test_Eth_Errors(t *testing.T) {
})

t.Run("Metis gas price errors", func(t *testing.T) {
err := evmclient.NewSendErrorS("primary websocket (wss://ws-mainnet.metis.io) call failed: gas price too low: 18000000000 wei, use at least tx.gasPrice = 19500000000 wei")
err = evmclient.NewSendErrorS("primary websocket (wss://ws-mainnet.metis.io) call failed: gas price too low: 18000000000 wei, use at least tx.gasPrice = 19500000000 wei")
assert.True(t, err.L2FeeTooLow(clientErrors))
err = newSendErrorWrapped("primary websocket (wss://ws-mainnet.metis.io) call failed: gas price too low: 18000000000 wei, use at least tx.gasPrice = 19500000000 wei")
assert.True(t, err.L2FeeTooLow(clientErrors))
Expand All @@ -302,7 +302,7 @@ func Test_Eth_Errors(t *testing.T) {
})

t.Run("moonriver errors", func(t *testing.T) {
err := evmclient.NewSendErrorS("primary http (http://***REDACTED***:9933) call failed: submit transaction to pool failed: Pool(Stale)")
err = evmclient.NewSendErrorS("primary http (http://***REDACTED***:9933) call failed: submit transaction to pool failed: Pool(Stale)")
assert.True(t, err.IsNonceTooLowError(clientErrors))
assert.False(t, err.IsTransactionAlreadyInMempool(clientErrors))
assert.False(t, err.Fatal(clientErrors))
Expand All @@ -311,6 +311,24 @@ func Test_Eth_Errors(t *testing.T) {
assert.False(t, err.IsNonceTooLowError(clientErrors))
assert.False(t, err.Fatal(clientErrors))
})

t.Run("IsTerminallyStuck", func(t *testing.T) {
tests := []errorCase{
{"failed to add tx to the pool: not enough step counters to continue the execution", true, "zkEVM"},
{"failed to add tx to the pool: not enough step counters to continue the execution", true, "Xlayer"},
{"failed to add tx to the pool: not enough keccak counters to continue the execution", true, "zkEVM"},
{"failed to add tx to the pool: not enough keccak counters to continue the execution", true, "Xlayer"},
}

for _, test := range tests {
t.Run(test.network, func(t *testing.T) {
err = evmclient.NewSendErrorS(test.message)
assert.Equal(t, err.IsTerminallyStuckConfigError(clientErrors), test.expect)
err = newSendErrorWrapped(test.message)
assert.Equal(t, err.IsTerminallyStuckConfigError(clientErrors), test.expect)
})
}
})
}

func Test_Eth_Errors_Fatal(t *testing.T) {
Expand Down
19 changes: 19 additions & 0 deletions core/chains/evm/txmgr/broadcaster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,25 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Success(t *testing.T) {
assert.True(t, ethTx.Error.Valid)
assert.Equal(t, "transaction reverted during simulation: json-rpc error { Code = 42, Message = 'oh no, it reverted', Data = 'KqYi' }", ethTx.Error.String)
})

t.Run("terminally stuck transaction is marked as fatal", func(t *testing.T) {
terminallyStuckError := "failed to add tx to the pool: not enough step counters to continue the execution"
etx := mustCreateUnstartedTx(t, txStore, fromAddress, toAddress, []byte{42, 42, 0}, gasLimit, big.Int(assets.NewEthValue(243)), testutils.FixtureChainID)
ethClient.On("SendTransactionReturnCode", mock.Anything, mock.MatchedBy(func(tx *gethTypes.Transaction) bool {
return tx.Nonce() == uint64(346) && tx.Value().Cmp(big.NewInt(243)) == 0
}), fromAddress).Return(commonclient.Fatal, errors.New(terminallyStuckError)).Once()

// Start processing unstarted transactions
retryable, err := eb.ProcessUnstartedTxs(tests.Context(t), fromAddress)
assert.NoError(t, err)
assert.False(t, retryable)

dbTx, err := txStore.FindTxWithAttempts(ctx, etx.ID)
require.NoError(t, err)
assert.Equal(t, txmgrcommon.TxFatalError, dbTx.State)
assert.True(t, dbTx.Error.Valid)
assert.Equal(t, terminallyStuckError, dbTx.Error.String)
})
})
}

Expand Down
52 changes: 52 additions & 0 deletions core/chains/evm/txmgr/confirmer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2673,6 +2673,58 @@ func TestEthConfirmer_RebroadcastWhereNecessary_WhenOutOfEth(t *testing.T) {
})
}

func TestEthConfirmer_RebroadcastWhereNecessary_TerminallyStuckError(t *testing.T) {
t.Parallel()

db := pgtest.NewSqlxDB(t)
cfg := configtest.NewGeneralConfig(t, func(c *chainlink.Config, s *chainlink.Secrets) {
c.EVM[0].GasEstimator.PriceMax = assets.GWei(500)
})
txStore := cltest.NewTestTxStore(t, db)
ctx := tests.Context(t)

ethClient := testutils.NewEthClientMockWithDefaultChain(t)
ethKeyStore := cltest.NewKeyStore(t, db).Eth()
_, fromAddress := cltest.MustInsertRandomKey(t, ethKeyStore)

evmcfg := evmtest.NewChainScopedConfig(t, cfg)

// Use a mock keystore for this test
ec := newEthConfirmer(t, txStore, ethClient, cfg, evmcfg, ethKeyStore, nil)
currentHead := int64(30)
oldEnough := int64(19)
nonce := int64(0)
terminallyStuckError := "failed to add tx to the pool: not enough step counters to continue the execution"

t.Run("terminally stuck transaction replaced with purge attempt", func(t *testing.T) {
originalBroadcastAt := time.Unix(1616509100, 0)
etx := cltest.MustInsertUnconfirmedEthTxWithBroadcastLegacyAttempt(t, txStore, nonce, fromAddress, originalBroadcastAt)
nonce++
attempt1_1 := etx.TxAttempts[0]
var dbAttempt txmgr.DbEthTxAttempt
require.NoError(t, db.Get(&dbAttempt, `UPDATE evm.tx_attempts SET broadcast_before_block_num=$1 WHERE id=$2 RETURNING *`, oldEnough, attempt1_1.ID))

// Return terminally stuck error on first rebroadcast
ethClient.On("SendTransactionReturnCode", mock.Anything, mock.MatchedBy(func(tx *types.Transaction) bool {
return tx.Nonce() == uint64(*etx.Sequence)
}), fromAddress).Return(commonclient.TerminallyStuck, errors.New(terminallyStuckError)).Once()
// Return successful for purge attempt
ethClient.On("SendTransactionReturnCode", mock.Anything, mock.MatchedBy(func(tx *types.Transaction) bool {
return tx.Nonce() == uint64(*etx.Sequence)
}), fromAddress).Return(commonclient.Successful, nil).Once()

// Start processing transactions for rebroadcast
require.NoError(t, ec.RebroadcastWhereNecessary(tests.Context(t), currentHead))
var err error
etx, err = txStore.FindTxWithAttempts(ctx, etx.ID)
require.NoError(t, err)

require.Len(t, etx.TxAttempts, 2)
purgeAttempt := etx.TxAttempts[0]
require.True(t, purgeAttempt.IsPurgeAttempt)
})
}

func TestEthConfirmer_EnsureConfirmedTransactionsInLongestChain(t *testing.T) {
t.Parallel()

Expand Down
22 changes: 22 additions & 0 deletions core/chains/evm/txmgr/txmgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -786,6 +786,7 @@ func TestTxm_GetTransactionStatus(t *testing.T) {
t.Run("returns fatal for fatal error state with terminally stuck error", func(t *testing.T) {
idempotencyKey := uuid.New().String()
_, fromAddress := cltest.MustInsertRandomKey(t, ethKeyStore)
// Test the internal terminally stuck error returns Fatal
nonce := evmtypes.Nonce(0)
broadcast := time.Now()
tx := &txmgr.Tx{
Expand All @@ -804,6 +805,27 @@ func TestTxm_GetTransactionStatus(t *testing.T) {
state, err := txm.GetTransactionStatus(ctx, idempotencyKey)
require.Equal(t, commontypes.Fatal, state)
require.Error(t, err, evmclient.TerminallyStuckMsg)

// Test a terminally stuck client error returns Fatal
nonce = evmtypes.Nonce(1)
idempotencyKey = uuid.New().String()
terminallyStuckClientError := "failed to add tx to the pool: not enough step counters to continue the execution"
tx = &txmgr.Tx{
Sequence: &nonce,
IdempotencyKey: &idempotencyKey,
FromAddress: fromAddress,
EncodedPayload: []byte{1, 2, 3},
FeeLimit: feeLimit,
State: txmgrcommon.TxFatalError,
Error: null.NewString(terminallyStuckClientError, true),
BroadcastAt: &broadcast,
InitialBroadcastAt: &broadcast,
}
err = txStore.InsertTx(ctx, tx)
require.NoError(t, err)
state, err = txm.GetTransactionStatus(ctx, idempotencyKey)
require.Equal(t, commontypes.Fatal, state)
require.Error(t, err, evmclient.TerminallyStuckMsg)
})

t.Run("returns failed for fatal error state with other error", func(t *testing.T) {
Expand Down
Loading