Skip to content

Commit

Permalink
relayer/pkg/chainlink/txm: resync nonce on rejection and broadcast er…
Browse files Browse the repository at this point in the history
…rors (#391)
  • Loading branch information
cfal authored Apr 3, 2024
1 parent 2c626d9 commit 85200af
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 14 deletions.
97 changes: 84 additions & 13 deletions relayer/pkg/chainlink/txm/txm.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,24 +119,30 @@ func (txm *starktxm) broadcastLoop() {
const FEE_MARGIN uint32 = 115
const RPC_NONCE_ERR = "Invalid transaction nonce"

func (txm *starktxm) estimateFriFee(ctx context.Context, client *starknet.Client, accountAddress *felt.Felt, tx starknetrpc.InvokeTxnV3) (*starknetrpc.FeeEstimate, error) {
func (txm *starktxm) estimateFriFee(ctx context.Context, client *starknet.Client, accountAddress *felt.Felt, tx starknetrpc.InvokeTxnV3) (*starknetrpc.FeeEstimate, *felt.Felt, error) {
// skip prevalidation, which is known to overestimate amount of gas needed and error with L1GasBoundsExceedsBalance
simFlags := []starknetrpc.SimulationFlag{starknetrpc.SKIP_VALIDATE}

var largestEstimateNonce *felt.Felt

for i := 1; i <= 5; i++ {
txm.lggr.Infow("attempt to estimate fee", "attempt", i)

estimateNonce, err := client.AccountNonce(ctx, accountAddress)
if err != nil {
return nil, fmt.Errorf("failed to check account nonce: %+w", err)
return nil, nil, fmt.Errorf("failed to check account nonce: %+w", err)
}
tx.Nonce = estimateNonce

if largestEstimateNonce == nil || estimateNonce.Cmp(largestEstimateNonce) > 0 {
largestEstimateNonce = estimateNonce
}

feeEstimate, err := client.Provider.EstimateFee(ctx, []starknetrpc.BroadcastTxn{tx}, simFlags, starknetrpc.BlockID{Tag: "pending"})
if err != nil {
var dataErr ethrpc.DataError
if !errors.As(err, &dataErr) {
return nil, fmt.Errorf("failed to read EstimateFee error: %T %+v", err, err)
return nil, nil, fmt.Errorf("failed to read EstimateFee error: %T %+v", err, err)
}
data := dataErr.ErrorData()
dataStr := fmt.Sprintf("%+v", data)
Expand All @@ -147,26 +153,26 @@ func (txm *starktxm) estimateFriFee(ctx context.Context, client *starknet.Client
continue
}

return nil, fmt.Errorf("failed to estimate fee: %T %+v", err, err)
return nil, nil, fmt.Errorf("failed to estimate fee: %T %+v", err, err)
}

// track the FRI estimate, but keep looping so we print out all estimates
var friEstimate *starknetrpc.FeeEstimate
for j, f := range feeEstimate {
txm.lggr.Infow("Estimated fee", "attempt", i, "index", j, "GasConsumed", f.GasConsumed.String(), "GasPrice", f.GasPrice.String(), "OverallFee", f.OverallFee.String(), "FeeUnit", string(f.FeeUnit))
txm.lggr.Infow("Estimated fee", "attempt", i, "index", j, "EstimateNonce", estimateNonce, "GasConsumed", f.GasConsumed.String(), "GasPrice", f.GasPrice.String(), "OverallFee", f.OverallFee.String(), "FeeUnit", string(f.FeeUnit))
if f.FeeUnit == "FRI" {
friEstimate = &feeEstimate[j]
}
}
if friEstimate != nil {
return friEstimate, nil
return friEstimate, largestEstimateNonce, nil
}

txm.lggr.Errorw("No FRI estimate was returned", "attempt", i)
}

txm.lggr.Errorw("all attempts to estimate fee failed")
return nil, fmt.Errorf("all attempts to estimate fee failed")
return nil, nil, fmt.Errorf("all attempts to estimate fee failed")
}

func (txm *starktxm) broadcast(ctx context.Context, publicKey *felt.Felt, accountAddress *felt.Felt, call starknetrpc.FunctionCall) (txhash string, err error) {
Expand Down Expand Up @@ -225,11 +231,24 @@ func (txm *starktxm) broadcast(ctx context.Context, publicKey *felt.Felt, accoun
return txhash, err
}

friEstimate, err := txm.estimateFriFee(ctx, client, accountAddress, tx)
friEstimate, largestEstimateNonce, err := txm.estimateFriFee(ctx, client, accountAddress, tx)
if err != nil {
return txhash, fmt.Errorf("failed to get FRI estimate: %+w", err)
}

nonce := txStore.GetNextNonce()
if largestEstimateNonce.Cmp(nonce) > 0 {
// The nonce value returned from the node during estimation is greater than our expected next nonce
// - which means that we are behind, due to a resync. Fast forward our locally tracked nonce value.
// See resyncNonce for a more detailed explanation.
staleTxs := txStore.SetNextNonce(largestEstimateNonce)
txm.lggr.Infow("fast-forwarding nonce after resync", "previousNonce", nonce, "updatedNonce", largestEstimateNonce, "staleTxs", len(staleTxs))
if len(staleTxs) > 0 {
txm.lggr.Errorw("unexpected stale transactions after nonce fast-forward", "accountAddress", accountAddress)
}
nonce = largestEstimateNonce
}

// TODO: consider making this configurable
// pad estimate to 250% (add extra because estimate did not include validation)
gasConsumed := friEstimate.GasConsumed.BigInt(new(big.Int))
Expand All @@ -245,8 +264,6 @@ func (txm *starktxm) broadcast(ctx context.Context, publicKey *felt.Felt, accoun

txm.lggr.Infow("Set resource bounds", "L1MaxAmount", tx.ResourceBounds.L1Gas.MaxAmount, "L1MaxPricePerUnit", tx.ResourceBounds.L1Gas.MaxPricePerUnit)

nonce := txStore.GetNextNonce()

tx.Nonce = nonce
// Re-sign transaction now that we've determined MaxFee
// TODO: SignInvokeTransaction for V3 is missing so we do it by hand
Expand All @@ -269,11 +286,21 @@ func (txm *starktxm) broadcast(ctx context.Context, publicKey *felt.Felt, accoun
// TODO: handle initial broadcast errors - what kind of errors occur?
var dataErr ethrpc.DataError
var dataStr string
if errors.As(err, &dataErr) {
data := dataErr.ErrorData()
dataStr = fmt.Sprintf("%+v", data)
if !errors.As(err, &dataErr) {
return txhash, fmt.Errorf("failed to read EstimateFee error: %T %+v", err, err)
}
data := dataErr.ErrorData()
dataStr = fmt.Sprintf("%+v", data)
txm.lggr.Errorw("failed to invoke tx", "accountAddress", accountAddress, "error", err, "data", dataStr)

if strings.Contains(dataStr, RPC_NONCE_ERR) {
// if we see an invalid nonce error at the broadcast stage, that means that we are out of sync.
// see the comment at resyncNonce for more details.
if resyncErr := txm.resyncNonce(ctx, client, accountAddress); resyncErr != nil {
txm.lggr.Errorw("failed to resync nonce after unsuccessful invoke", "error", err, "resyncError", resyncErr)
return txhash, fmt.Errorf("failed to resync after bad invoke: %+w", err)
}
}
return txhash, fmt.Errorf("failed to invoke tx: %+w", err)
}
// handle nil pointer
Expand Down Expand Up @@ -349,6 +376,12 @@ func (txm *starktxm) confirmLoop() {

// currently, feeder client is only way to get rejected reason
if finalityStatus == starknetrpc.TxnStatus_Rejected {
// we assume that all rejected transactions results in a unused rejected nonce, so
// resync. see the comment at resyncNonce for more details.
if resyncErr := txm.resyncNonce(ctx, client, accountAddress); resyncErr != nil {
txm.lggr.Errorw("resync failed for rejected tx", "error", resyncErr)
}

go txm.logFeederError(ctx, hash, f)
}

Expand Down Expand Up @@ -383,6 +416,44 @@ func (txm *starktxm) logFeederError(ctx context.Context, hash string, f *felt.Fe
txm.lggr.Errorw("feeder rejected reason", "hash", hash, "errorMessage", rejectedTx.ErrorMessage)
}

func (txm *starktxm) resyncNonce(ctx context.Context, client *starknet.Client, accountAddress *felt.Felt) error {
/*
the follow errors indicate that there could be a problem with our locally tracked nonce value:
1. a EstimateFee was successful, but broadcasting using the locally tracked nonce results in a nonce error,
2. a transaction was rejected after a successful broadcast.
for these cases, we call starknet_getNonce from the RPC node and resync the locally tracked next nonce
with the RPC node's value.
however, while the value returned by starknet_getNonce is eventually consistent, it can be lower than the actual
next nonce value when pending transactions haven't yet been processed - resulting in more category 1
invalid nonce broadcast errors.
in order to recover from these cases, each time we do starknet_getNonce during estimation (see estimateFriFee),
we compare it with our locally tracked nonce - if it is greater, than that means our locally tracked value is
behind, and we fast forward. this ensures our locally tracked value will also eventually be correct.
*/

rpcNonce, err := client.AccountNonce(ctx, accountAddress)
if err != nil {
return fmt.Errorf("failed to check nonce during resync: %+w", err)
}

txStore := txm.accountStore.GetTxStore(accountAddress)
currentNonce := txStore.GetNextNonce()

if rpcNonce.Cmp(currentNonce) == 0 {
txm.lggr.Infow("resync nonce skipped, nonce value is the same", "accountAddress", accountAddress, "nonce", currentNonce)
return nil
}

staleTxs := txStore.SetNextNonce(rpcNonce)

txm.lggr.Infow("resynced nonce", "accountAddress", "accountAddress", "previousNonce", currentNonce, "updatedNonce", rpcNonce, "staleTxCount", len(staleTxs))

return nil
}

func (txm *starktxm) Close() error {
return txm.starter.StopOnce("starktxm", func() error {
close(txm.stop)
Expand Down
12 changes: 11 additions & 1 deletion relayer/pkg/chainlink/txm/txstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,28 @@ func NewTxStore(initialNonce *felt.Felt) *TxStore {
}
}

func (s *TxStore) SetNextNonce(newNextNonce *felt.Felt) {
func (s *TxStore) SetNextNonce(newNextNonce *felt.Felt) []*UnconfirmedTx {
s.lock.Lock()
defer s.lock.Unlock()

staleTxs := []*UnconfirmedTx{}
s.nextNonce = new(felt.Felt).Set(newNextNonce)

// Remove any stale transactions with nonces greater than the new next nonce.
for nonceStr, tx := range s.unconfirmedNonces {
if tx.Nonce.Cmp(s.nextNonce) >= 0 {
staleTxs = append(staleTxs, tx)
delete(s.unconfirmedNonces, nonceStr)
}
}

sort.Slice(staleTxs, func(i, j int) bool {
a := staleTxs[i]
b := staleTxs[j]
return a.Nonce.Cmp(b.Nonce) < 0
})

return staleTxs
}

func (s *TxStore) GetNextNonce() *felt.Felt {
Expand Down
41 changes: 41 additions & 0 deletions relayer/pkg/chainlink/txm/txstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func TestTxStore(t *testing.T) {
publicKey := new(felt.Felt).SetUint64(7)

s := NewTxStore(nonce)
assert.True(t, s.GetNextNonce().Cmp(nonce) == 0)
assert.Equal(t, 0, s.InflightCount())
require.NoError(t, s.AddUnconfirmed(nonce, "0x42", call, publicKey))
assert.Equal(t, 1, s.InflightCount())
Expand Down Expand Up @@ -142,6 +143,46 @@ func TestTxStore(t *testing.T) {
assert.True(t, !errors.Is(err0, err1) && ((err0 != nil && err1 == nil) || (err0 == nil && err1 != nil)))
assert.Equal(t, 0, s.InflightCount())
})

t.Run("resync", func(t *testing.T) {
t.Parallel()

call := starknetrpc.FunctionCall{
ContractAddress: new(felt.Felt).SetUint64(0),
EntryPointSelector: new(felt.Felt).SetUint64(0),
}

publicKey := new(felt.Felt).SetUint64(7)
txCount := 6

// init store
s := NewTxStore(new(felt.Felt).SetUint64(0))
for i := 0; i < txCount; i++ {
require.NoError(t, s.AddUnconfirmed(new(felt.Felt).SetUint64(uint64(i)), "0x"+fmt.Sprintf("%d", i), call, publicKey))
}
assert.Equal(t, s.InflightCount(), txCount)

staleTxs := s.SetNextNonce(new(felt.Felt).SetUint64(0))

assert.Equal(t, len(staleTxs), txCount)
for i := 0; i < txCount; i++ {
staleTx := staleTxs[i]
assert.Equal(t, staleTx.Nonce.Cmp(new(felt.Felt).SetUint64(uint64(i))), 0)
assert.Equal(t, staleTx.Call, call)
assert.Equal(t, staleTx.PublicKey.Cmp(publicKey), 0)
assert.Equal(t, staleTx.Hash, "0x"+fmt.Sprintf("%d", i))
}
assert.Equal(t, s.InflightCount(), 0)

for i := 0; i < txCount; i++ {
require.NoError(t, s.AddUnconfirmed(new(felt.Felt).SetUint64(uint64(i)), "0x"+fmt.Sprintf("%d", i), call, publicKey))
}

newNextNonce := uint64(txCount - 1)
staleTxs = s.SetNextNonce(new(felt.Felt).SetUint64(newNextNonce))
assert.Equal(t, len(staleTxs), 1)
assert.Equal(t, staleTxs[0].Nonce.Cmp(new(felt.Felt).SetUint64(newNextNonce)), 0)
})
}

func TestAccountStore(t *testing.T) {
Expand Down

0 comments on commit 85200af

Please sign in to comment.