From 1791b162fe31e120e85d5c59930dda08c10aaa50 Mon Sep 17 00:00:00 2001 From: Brian Stafford Date: Tue, 17 Sep 2024 21:59:53 -0500 Subject: [PATCH] refactor tx sending Refactor sending evm txs so that we store the txs before sending and are accepting of send errors. This avoids a scenario where we send the tx, but still receive an error, perhaps because of a bad connection or other problem with the rpc provider. Implement a send queue so that the caller doesn't have to wait for the send. This should speed up ticks in core if the rpc provider is functional but slow to respond. --- client/asset/eth/deploy.go | 6 +- client/asset/eth/eth.go | 177 +++++++++++++++----- client/asset/eth/eth_test.go | 134 ++++++++++----- client/asset/eth/multirpc.go | 22 +-- client/asset/eth/multirpc_test_util.go | 8 +- client/asset/eth/node.go | 47 ++++-- client/asset/eth/nodeclient_harness_test.go | 34 ++-- client/asset/eth/txdb.go | 4 + 8 files changed, 289 insertions(+), 143 deletions(-) diff --git a/client/asset/eth/deploy.go b/client/asset/eth/deploy.go index 975f155ec2..c8c3a6508a 100644 --- a/client/asset/eth/deploy.go +++ b/client/asset/eth/deploy.go @@ -351,7 +351,11 @@ func (contractDeployer) nodeAndRate( return nil, nil, nil, fmt.Errorf("error creating wallet: %w", err) } - cl, err := newMultiRPCClient(walletDir, providers, log, chainCfg, 3, net) + creds, err := walletCredentials(chainCfg.ChainID, walletDir, net) + if err != nil { + return nil, nil, nil, fmt.Errorf("error generating wallet credentials: %w", err) + } + cl, err := newMultiRPCClient(creds, providers, log, chainCfg, 3, net) if err != nil { return nil, nil, nil, fmt.Errorf("error creating rpc client: %w", err) } diff --git a/client/asset/eth/eth.go b/client/asset/eth/eth.go index 07579b3b21..69ce0d23ce 100644 --- a/client/asset/eth/eth.go +++ b/client/asset/eth/eth.go @@ -121,6 +121,7 @@ const ( maxUnindexedTxs = 10 peerCountTicker = 5 * time.Second // no rpc calls here contractVersionNewest = ^uint32(0) + maxQueuedSends = 20 ) var ( @@ -372,7 +373,6 @@ type ethFetcher interface { locked() bool shutdown() sendSignedTransaction(ctx context.Context, tx *types.Transaction, filts ...acceptabilityFilter) error - sendTransaction(ctx context.Context, txOpts *bind.TransactOpts, to common.Address, data []byte, filts ...acceptabilityFilter) (*types.Transaction, error) signData(data []byte) (sig, pubKey []byte, err error) syncProgress(context.Context) (progress *ethereum.SyncProgress, tipTime uint64, err error) transactionConfirmations(context.Context, common.Hash) (uint32, error) @@ -403,6 +403,12 @@ type cachedBalance struct { bal *big.Int } +type queuedSend struct { + et *extendedWalletTx + tx *types.Transaction + filts []acceptabilityFilter +} + // Check that assetWallet satisfies the asset.Wallet interface. var _ asset.Wallet = (*ETHWallet)(nil) var _ asset.Wallet = (*TokenWallet)(nil) @@ -427,6 +433,7 @@ type baseWallet struct { ctx context.Context net dex.Network node ethFetcher + creds *accountCredentials addr common.Address log dex.Logger dir string @@ -475,6 +482,10 @@ type baseWallet struct { } txDB txDB + + sendQ chan *queuedSend + queuedSends atomic.Uint32 + dontQSends bool } // assetWallet is a wallet backend for Ethereum and Eth tokens. The backend is @@ -759,6 +770,12 @@ func NewEVMWallet(cfg *EVMWalletConfig) (w *ETHWallet, err error) { if gasFeeLimit == 0 { gasFeeLimit = defaultGasFeeLimit } + + creds, err := walletCredentials(cfg.ChainCfg.ChainID, cfg.AssetCfg.DataDir, cfg.Net) + if err != nil { + return nil, fmt.Errorf("failed to generate wallet credentials: %w", err) + } + eth := &baseWallet{ net: cfg.Net, baseChainID: cfg.BaseChainID, @@ -768,12 +785,15 @@ func NewEVMWallet(cfg *EVMWalletConfig) (w *ETHWallet, err error) { tokens: cfg.Tokens, log: cfg.Logger, dir: cfg.AssetCfg.DataDir, + creds: creds, + addr: creds.addr, walletType: cfg.AssetCfg.Type, finalizeConfs: cfg.FinalizeConfs, settings: cfg.AssetCfg.Settings, gasFeeLimitV: gasFeeLimit, wallets: make(map[uint32]*assetWallet), multiBalanceAddress: cfg.MultiBalAddress, + sendQ: make(chan *queuedSend, maxQueuedSends), } var maxSwapGas, maxRedeemGas uint64 @@ -852,7 +872,7 @@ func (w *ETHWallet) Connect(ctx context.Context) (_ *sync.WaitGroup, err error) if providerDef, found := w.settings[providersKey]; found && len(providerDef) > 0 { endpoints = strings.Split(providerDef, " ") } - rpcCl, err := newMultiRPCClient(w.dir, endpoints, w.log.SubLogger("RPC"), w.chainCfg, w.finalizeConfs, w.net) + rpcCl, err := newMultiRPCClient(w.creds, endpoints, w.log.SubLogger("RPC"), w.chainCfg, w.finalizeConfs, w.net) if err != nil { return nil, err } @@ -863,7 +883,6 @@ func (w *ETHWallet) Connect(ctx context.Context) (_ *sync.WaitGroup, err error) } w.node = cl - w.addr = cl.address() w.ctx = ctx // TokenWallet will re-use this ctx. err = w.node.connect(ctx) @@ -971,6 +990,12 @@ func (w *ETHWallet) Connect(ctx context.Context) (_ *sync.WaitGroup, err error) w.connected.Store(false) }() + wg.Add(1) + go func() { + defer wg.Done() + w.runSendQueue(ctx) + }() + return &wg, nil } @@ -1009,6 +1034,43 @@ func (w *baseWallet) tipHeight() uint64 { return w.currentTip.Number.Uint64() } +func (w *baseWallet) runSendQueue(ctx context.Context) { + queued := make([]*queuedSend, 0, maxQueuedSends) + for { + if ctx.Err() != nil { + return + } + w.queuedSends.Store(uint32(len(queued))) + if len(queued) > 0 { + qt := queued[0] + queued = queued[1:] + if err := w.node.sendSignedTransaction(ctx, qt.tx, qt.filts...); err != nil { + w.log.Errorf("error sending queued transaction %s: %v", qt.et.txHash, err) + } + qt.et.initialSendComplete.Store(true) + continue + } + select { + case qt := <-w.sendQ: + queued = append(queued, qt) + case <-ctx.Done(): + return + } + } +} + +func (w *baseWallet) queueForSending(tx *types.Transaction, et *extendedWalletTx, filts ...acceptabilityFilter) error { + if w.dontQSends { + return w.node.sendSignedTransaction(w.ctx, tx, filts...) + } + w.sendQ <- &queuedSend{ + tx: tx, + et: et, + filts: filts, + } + return nil +} + // Reconfigure attempts to reconfigure the wallet. func (w *ETHWallet) Reconfigure(ctx context.Context, cfg *asset.WalletConfig, currentAddress string) (restart bool, err error) { walletCfg, err := parseWalletConfig(cfg.Settings) @@ -1087,15 +1149,15 @@ func (eth *baseWallet) gasFeeLimit() uint64 { // type specifier, and its value. type transactionGenerator func(nonce *big.Int) (*types.Transaction, asset.TransactionType, uint64, *string, error) -// withNonce is called with a function intended to generate a new transaction -// using the next available nonce. If the function returns a non-nil tx, the -// nonce will be treated as used, and an extendedWalletTransaction will be -// generated, stored, and queued for monitoring. -func (w *assetWallet) withNonce(ctx context.Context, f transactionGenerator) (err error) { +// generateNoncedTx is called with a function intended to generate a new +// transaction using the next available nonce. If the function returns a non-nil +// tx, the nonce will be treated as used, and an extendedWalletTransaction will +// be generated, stored, and queued for monitoring. +func (w *assetWallet) generateNoncedTx(ctx context.Context, f transactionGenerator) (tx *types.Transaction, et *extendedWalletTx, err error) { w.nonceMtx.Lock() defer w.nonceMtx.Unlock() if err = nonceIsSane(w.pendingTxs, w.pendingNonceAt); err != nil { - return err + return nil, nil, err } nonce := func() *big.Int { n := new(big.Int).Set(w.confirmedNonceAt) @@ -1121,7 +1183,7 @@ func (w *assetWallet) withNonce(ctx context.Context, f transactionGenerator) (er w.log.Warnf("Too-low nonce detected. Attempting recovery") confirmedNonceAt, pendingNonceAt, err := w.node.nonce(ctx) if err != nil { - return fmt.Errorf("error during too-low nonce recovery: %v", err) + return nil, nil, fmt.Errorf("error during too-low nonce recovery: %v", err) } w.confirmedNonceAt = confirmedNonceAt w.pendingNonceAt = pendingNonceAt @@ -1130,16 +1192,16 @@ func (w *assetWallet) withNonce(ctx context.Context, f transactionGenerator) (er // Try again. tx, txType, amt, recipient, err = f(n) if err != nil { - return err + return nil, nil, err } w.log.Info("Nonce recovered and transaction broadcast") } else { - return fmt.Errorf("best RPC nonce %d not better than our best nonce %d", newNonce, n) + return nil, nil, fmt.Errorf("best RPC nonce %d not better than our best nonce %d", newNonce, n) } } if tx != nil { - et := w.extendedTx(tx, txType, amt, recipient) + et = w.extendedTx(tx, txType, amt, recipient) w.pendingTxs = append(w.pendingTxs, et) if n.Cmp(w.pendingNonceAt) >= 0 { w.pendingNonceAt.Add(n, big.NewInt(1)) @@ -1147,7 +1209,22 @@ func (w *assetWallet) withNonce(ctx context.Context, f transactionGenerator) (er w.emitTransactionNote(et.WalletTransaction, true) w.log.Tracef("Transaction %s generated for nonce %s", et.ID, n) } - return err + return tx, et, err +} + +// sendWithNewNonce generates a transaction with a new nonce and attempts to +// send it. If the tx is successfully generated by the transaction generator, +// it will be recorded for monitoring and the nonce will be marked as used +// regardless of whether the send operation was successful. +func (w *assetWallet) sendWithNewNonce(ctx context.Context, f transactionGenerator) error { + if queuedN := w.queuedSends.Load(); queuedN > maxQueuedSends { + return errors.New("too many transactions queued for sending") + } + tx, et, err := w.generateNoncedTx(ctx, f) + if err != nil { + return err + } + return w.queueForSending(tx, et) } // nonceIsSane performs sanity checks on pending txs. @@ -2535,7 +2612,7 @@ func (w *assetWallet) tokenAllowance(version uint32) (allowance *big.Int, err er // approveToken approves the token swap contract to spend tokens on behalf of // account handled by the wallet. func (w *assetWallet) approveToken(ctx context.Context, amount *big.Int, gasLimit uint64, maxFeeRate, tipRate *big.Int, contractVer uint32) (tx *types.Transaction, err error) { - return tx, w.withNonce(ctx, func(nonce *big.Int) (*types.Transaction, asset.TransactionType, uint64, *string, error) { + return tx, w.sendWithNewNonce(ctx, func(nonce *big.Int) (*types.Transaction, asset.TransactionType, uint64, *string, error) { txOpts, err := w.node.txOpts(w.ctx, 0, gasLimit, maxFeeRate, tipRate, nonce) if err != nil { return nil, 0, 0, nil, fmt.Errorf("addSignerToOpts error: %w", err) @@ -4192,7 +4269,7 @@ func (w *ETHWallet) sendToAddr(addr common.Address, amt uint64, maxFeeRate, tipR // defer w.borkNonce(tx) // } - return tx, w.withNonce(w.ctx, func(nonce *big.Int) (*types.Transaction, asset.TransactionType, uint64, *string, error) { + return tx, w.sendWithNewNonce(w.ctx, func(nonce *big.Int) (*types.Transaction, asset.TransactionType, uint64, *string, error) { // Uncomment here and above to test actionTypeMissingNonces. // if nonceFuturized.CompareAndSwap(false, true) { @@ -4211,7 +4288,7 @@ func (w *ETHWallet) sendToAddr(addr common.Address, amt uint64, maxFeeRate, tipR if err != nil { return nil, 0, 0, nil, err } - tx, err = w.node.sendTransaction(w.ctx, txOpts, addr, nil) + tx, err = w.creds.signedTx(txOpts, addr, nil) if err != nil { return nil, 0, 0, nil, err } @@ -4230,7 +4307,7 @@ func (w *TokenWallet) sendToAddr(addr common.Address, amt uint64, maxFeeRate, ti if g == nil { return nil, fmt.Errorf("no gas table") } - return tx, w.withNonce(w.ctx, func(nonce *big.Int) (*types.Transaction, asset.TransactionType, uint64, *string, error) { + return tx, w.sendWithNewNonce(w.ctx, func(nonce *big.Int) (*types.Transaction, asset.TransactionType, uint64, *string, error) { txOpts, err := w.node.txOpts(w.ctx, 0, g.Transfer, maxFeeRate, tipRate, nonce) if err != nil { return nil, 0, 0, nil, err @@ -4271,7 +4348,7 @@ func (w *assetWallet) initiate( val += c.Value } } - return tx, w.withNonce(ctx, func(nonce *big.Int) (*types.Transaction, asset.TransactionType, uint64, *string, error) { + return tx, w.sendWithNewNonce(ctx, func(nonce *big.Int) (*types.Transaction, asset.TransactionType, uint64, *string, error) { txOpts, err := w.node.txOpts(ctx, val, gasLimit, maxFeeRate, tipRate, nonce) if err != nil { return nil, 0, 0, nil, err @@ -4416,7 +4493,7 @@ func (w *assetWallet) redeem( // return types.NewTransaction(10, w.addr, big.NewInt(dexeth.GweiFactor), gasLimit, dexeth.GweiToWei(maxFeeRate), nil), nil // } - return tx, w.withNonce(ctx, func(nonce *big.Int) (*types.Transaction, asset.TransactionType, uint64, *string, error) { + return tx, w.sendWithNewNonce(ctx, func(nonce *big.Int) (*types.Transaction, asset.TransactionType, uint64, *string, error) { var amt uint64 for _, r := range redemptions { amt += r.Spends.Coin.Value() @@ -4446,7 +4523,7 @@ func (w *assetWallet) refund(secretHash [32]byte, amt uint64, maxFeeRate, tipRat if gas == nil { return nil, fmt.Errorf("no gas table for asset %d, version %d", w.assetID, contractVer) } - return tx, w.withNonce(w.ctx, func(nonce *big.Int) (*types.Transaction, asset.TransactionType, uint64, *string, error) { + return tx, w.sendWithNewNonce(w.ctx, func(nonce *big.Int) (*types.Transaction, asset.TransactionType, uint64, *string, error) { txOpts, err := w.node.txOpts(w.ctx, 0, gas.Refund, maxFeeRate, tipRate, nonce) if err != nil { return nil, 0, 0, nil, err @@ -4549,6 +4626,9 @@ func (w *baseWallet) missingNoncesActionID() string { // // w.nonceMtx must be held. func (w *baseWallet) updatePendingTx(tip uint64, pendingTx *extendedWalletTx) { + if !pendingTx.initialSendComplete.Load() { + return + } if pendingTx.Confirmed && pendingTx.savedToDB { return } @@ -4875,13 +4955,15 @@ func (w *assetWallet) userActionBumpFees(actionB []byte) error { return errors.New("pending tx has no recipient?") } - newTx, err := w.node.sendTransaction(w.ctx, txOpts, *addr, tx.Data()) + newTx, err := w.creds.signedTx(txOpts, *addr, tx.Data()) if err != nil { return fmt.Errorf("error sending bumped-fee transaction: %w", err) } newPendingTx := w.extendedTx(newTx, pendingTx.Type, pendingTx.Amount, pendingTx.Recipient) - + if err := w.queueForSending(newTx, newPendingTx); err != nil { + return fmt.Errorf("error queuing fee-bumped tx for sending: %w", err) + } pendingTx.NonceReplacement = newPendingTx.ID pendingTx.FeeReplacement = true @@ -4955,6 +5037,7 @@ func (w *assetWallet) userActionNonceReplacement(actionB []byte) error { } recipient := w.addr.Hex() newPendingTx := w.extendedTx(replacementTx, asset.Unknown, 0, &recipient) + newPendingTx.initialSendComplete.Store(true) pendingTx.NonceReplacement = newPendingTx.ID var oldTo, newTo common.Address if oldAddr := oldTx.To(); oldAddr != nil { @@ -5008,28 +5091,21 @@ func (w *assetWallet) userActionRecoverNonces(actionB []byte) error { if err != nil { return fmt.Errorf("error getting tx opts for nonce resolution: %v", err) } - var skip bool - tx, err := w.node.sendTransaction(w.ctx, txOpts, w.addr, nil, func(err error) (discard, propagate, fail bool) { - if errorFilter(err, "replacement transaction underpriced") { - skip = true - return true, false, false - } - return false, false, true - }) + tx, err := w.creds.signedTx(txOpts, w.addr, nil) if err != nil { - return fmt.Errorf("error sending tx %d for nonce resolution: %v", nonce, err) + return fmt.Errorf("error signing tx %d for nonce resolution: %v", nonce, err) } - if skip { - w.log.Warnf("skipping storing underpriced replacement tx for nonce %d", nonce) - } else { - recipient := w.addr.Hex() - pendingTx := w.extendAndStoreTx(tx, asset.SelfSend, 0, nil, &recipient) - w.emitTransactionNote(pendingTx.WalletTransaction, true) - w.pendingTxs = append(w.pendingTxs, pendingTx) - sort.Slice(w.pendingTxs, func(i, j int) bool { - return w.pendingTxs[i].Nonce.Cmp(w.pendingTxs[j].Nonce) < 0 - }) + if err := w.node.sendSignedTransaction(w.ctx, tx); err != nil { + return fmt.Errorf("error sending tx %d for nonce resolution: %v", nonce, err) } + recipient := w.addr.Hex() + pendingTx := w.extendAndStoreTx(tx, asset.SelfSend, 0, nil, &recipient) + pendingTx.initialSendComplete.Store(true) + w.emitTransactionNote(pendingTx.WalletTransaction, true) + w.pendingTxs = append(w.pendingTxs, pendingTx) + sort.Slice(w.pendingTxs, func(i, j int) bool { + return w.pendingTxs[i].Nonce.Cmp(w.pendingTxs[j].Nonce) < 0 + }) if i < len(missingNonces)-1 { select { case <-time.After(time.Second * 1): @@ -5336,7 +5412,12 @@ func quickNode(ctx context.Context, walletDir string, contractVer uint32, return nil, nil, fmt.Errorf("error creating initiator wallet: %v", err) } - cl, err := newMultiRPCClient(walletDir, providers, log, wParams.ChainCfg, 3, net) + creds, err := walletCredentials(wParams.ChainCfg.ChainID, walletDir, net) + if err != nil { + return nil, nil, fmt.Errorf("error getting wallet credentials: %w", err) + } + + cl, err := newMultiRPCClient(creds, providers, log, wParams.ChainCfg, 3, net) if err != nil { return nil, nil, fmt.Errorf("error opening initiator rpc client: %v", err) } @@ -5738,7 +5819,7 @@ func (getGas) returnFunds( if err != nil { return fmt.Errorf("error generating tx opts: %w", err) } - tx, err := cl.sendTransaction(ctx, txOpts, returnAddr, nil) + tx, err := cl.genSignAndSendTransaction(ctx, txOpts, returnAddr, nil) if err != nil { return fmt.Errorf("error sending funds: %w", err) } @@ -5833,7 +5914,7 @@ func (getGas) Estimate(ctx context.Context, net dex.Network, assetID, contractVe return fmt.Errorf("error creating tx opts for sending fees for approval client: %v", err) } - tx, err := cl.sendTransaction(ctx, txOpts, approvalClient.address(), nil) + tx, err := cl.genSignAndSendTransaction(ctx, txOpts, approvalClient.address(), nil) if err != nil { return fmt.Errorf("error sending fee reserves to approval client: %v", err) } @@ -6113,6 +6194,12 @@ func newTxOpts(ctx context.Context, from common.Address, val, maxGas uint64, max GasFeeCap: maxFeeRate, GasTipCap: gasTipCap, GasLimit: maxGas, + // NoSend is set to true so that our abigen methods won't actually send + // the transaction. They just generate and sign the tx. That way, we + // can store a record of the transaction locally before sending, so that + // if we somehow send the transaction but still get an error, we don't + // discard the tx and screw up our nonce ordering. + NoSend: true, } } diff --git a/client/asset/eth/eth_test.go b/client/asset/eth/eth_test.go index 36e7752fe6..211ccfe4d0 100644 --- a/client/asset/eth/eth_test.go +++ b/client/asset/eth/eth_test.go @@ -14,6 +14,7 @@ import ( "fmt" "math/big" "math/rand" + "os" "sort" "strings" "sync" @@ -29,6 +30,7 @@ import ( "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/accounts/keystore" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" @@ -74,6 +76,8 @@ var ( SwapConf: 1, } + tCreds *accountCredentials + tPriv *ecdsa.PrivateKey signer = types.LatestSigner(params.AllEthashProtocolChanges) // simBackend = backends.NewSimulatedBackend(core.GenesisAlloc{ @@ -81,6 +85,63 @@ var ( // }, 1e9) ) +func generateTestCredentials() (string, error) { + privB, _, err := privKeyFromSeed(encode.RandomBytes(32)) + if err != nil { + return "", fmt.Errorf("error generating private key bytes: %w", err) + } + + tPriv, err = crypto.ToECDSA(privB) + if err != nil { + return "", fmt.Errorf("error making private key: %w", err) + } + + credsDir, err := os.MkdirTemp("", "") + if err != nil { + return "", fmt.Errorf("error making temp dir: %w", err) + } + + pw := []byte("abc") + + ks := keystore.NewKeyStore(credsDir, keystore.LightScryptN, keystore.LightScryptP) + if err := importKeyToKeyStore(ks, tPriv, pw); err != nil { + os.RemoveAll(credsDir) + return "", fmt.Errorf("error making temp dir: %w", err) + } + + tCreds, err = credentialsFromKeyStore(ks, params.AllEthashProtocolChanges.ChainID) + if err != nil { + os.RemoveAll(credsDir) + return "", fmt.Errorf("error generating credentials from keystore: %w", err) + } + + if err := importKeyToKeyStore(ks, tPriv, []byte("abc")); err != nil { + os.RemoveAll(credsDir) + return "", fmt.Errorf("error making temp dir: %w", err) + } + + if err := ks.Unlock(*tCreds.acct, string(pw)); err != nil { + os.RemoveAll(credsDir) + return "", fmt.Errorf("error unlocking keystore: %w", err) + } + + return credsDir, nil +} + +func TestMain(m *testing.M) { + doIt := func() int { + credsDir, err := generateTestCredentials() + if err != nil { + tLogger.Critical("Error generating test credentials: %v", err) + return 1 + } + defer os.RemoveAll(credsDir) + return m.Run() + } + + os.Exit(doIt()) +} + type tGetTxRes struct { tx *types.Transaction height int64 @@ -120,7 +181,6 @@ type testNode struct { hdrByHash *types.Header lastSignedTx *types.Transaction sentTxs int - sendTxTx *types.Transaction sendTxErr error simBackend bind.ContractBackend maxFeeRate *big.Int @@ -244,13 +304,11 @@ func (n *testNode) signData(data []byte) (sig, pubKey []byte, err error) { return sig, crypto.FromECDSAPub(&n.privKey.PublicKey), nil } -func (n *testNode) sendTransaction(ctx context.Context, txOpts *bind.TransactOpts, to common.Address, data []byte, filts ...acceptabilityFilter) (*types.Transaction, error) { - n.sentTxs++ - return n.sendTxTx, n.sendTxErr -} + func (n *testNode) sendSignedTransaction(ctx context.Context, tx *types.Transaction, filts ...acceptabilityFilter) error { n.lastSignedTx = tx - return nil + n.sentTxs++ + return n.sendTxErr } func tTx(gasFeeCap, gasTipCap, value uint64, to *common.Address, data []byte, gasLimit uint64) *types.Transaction { @@ -694,6 +752,7 @@ func TestCheckPendingTxs(t *testing.T) { pendingTx.SubmissionTime = submissionStamp pendingTx.lastBroadcast = time.Unix(int64(submissionStamp), 0) pendingTx.lastFeeCheck = time.Unix(int64(submissionStamp), 0) + pendingTx.initialSendComplete.Store(true) return pendingTx } @@ -845,17 +904,16 @@ func TestTakeAction(t *testing.T) { pendingTx := eth.extendedTx(node.newTransaction(0, aGwei), asset.Send, 1, nil) eth.pendingTxs = []*extendedWalletTx{pendingTx} - - feeCap := new(big.Int).Mul(aGwei, big.NewInt(5)) - tipCap := new(big.Int).Mul(aGwei, big.NewInt(2)) - replacementTx, _ := types.SignTx(types.NewTx(&types.DynamicFeeTx{ - Nonce: 1, - GasTipCap: tipCap, - GasFeeCap: feeCap, - Gas: 50_000, - ChainID: node.chainConfig().ChainID, - }), signer, node.privKey) - node.sendTxTx = replacementTx + const tipHeight = 100 + eth.currentTip = &types.Header{Number: big.NewInt(tipHeight)} + tipRate := new(big.Int).Mul(aGwei, big.NewInt(2)) + c := ð.currentFees + c.Lock() + c.baseRate = new(big.Int).Mul(aGwei, big.NewInt(5)) + c.tipRate = tipRate + c.blockNum = tipHeight + maxFeeRate := new(big.Int).Add(c.tipRate, new(big.Int).Mul(c.baseRate, big.NewInt(2))) + c.Unlock() tooCheapAction := []byte(fmt.Sprintf(`{"txID":"%s","bump":true}`, pendingTx.ID)) if err := eth.TakeAction(actionTypeTooCheap, tooCheapAction); err != nil { @@ -867,11 +925,11 @@ func TestTakeAction(t *testing.T) { t.Fatal("tx wasn't replaced") } tx, _ := newPendingTx.tx() - if tx.GasFeeCap().Cmp(feeCap) != 0 { - t.Fatalf("wrong fee cap. wanted %s, got %s", feeCap, tx.GasFeeCap()) + if tx.GasFeeCap().Cmp(maxFeeRate) != 0 { + t.Fatalf("wrong fee cap. wanted %s, got %s", maxFeeRate, tx.GasFeeCap()) } - if tx.GasTipCap().Cmp(tipCap) != 0 { - t.Fatalf("wrong tip cap. wanted %s, got %s", tipCap, tx.GasTipCap()) + if tx.GasTipCap().Cmp(tipRate) != 0 { + t.Fatalf("wrong tip cap. wanted %s, got %s", tipRate, tx.GasTipCap()) } if !newPendingTx.savedToDB { t.Fatal("didn't save to DB") @@ -906,6 +964,9 @@ func TestTakeAction(t *testing.T) { t.Fatalf("Tx wasn't abandoned") } eth.pendingTxs = []*extendedWalletTx{pendingTx} + txOpts := newTxOpts(eth.ctx, eth.addr, 1, 1, big.NewInt(1), big.NewInt(1)) + txOpts.Nonce = pendingTx.Nonce + replacementTx, _ := eth.creds.signedTx(txOpts, eth.addr, nil) node.getTxRes = replacementTx lostNonceAction = []byte(fmt.Sprintf(`{"txID":"%s","abandon":false,"replacementID":"%s"}`, pendingTx.ID, replacementTx.Hash())) if err := eth.TakeAction(actionTypeLostNonce, lostNonceAction); err != nil { @@ -1083,12 +1144,6 @@ func TestSyncStatus(t *testing.T) { } func newTestNode(assetID uint32) *tMempoolNode { - privKey, _ := crypto.HexToECDSA("9447129055a25c8496fca9e5ee1b9463e47e6043ff0c288d07169e8284860e34") - addr := common.HexToAddress("2b84C791b79Ee37De042AD2ffF1A253c3ce9bc27") - acct := &accounts.Account{ - Address: addr, - } - tc := &tContractor{ gasEstimates: ethGases, swapMap: make(map[[32]byte]*dexeth.SwapState), @@ -1109,12 +1164,12 @@ func newTestNode(assetID uint32) *tMempoolNode { return &tMempoolNode{ testNode: &testNode{ - acct: acct, - addr: acct.Address, + acct: tCreds.acct, + addr: tCreds.acct.Address, maxFeeRate: dexeth.GweiToWei(100), baseFee: dexeth.GweiToWei(100), tip: dexeth.GweiToWei(2), - privKey: privKey, + privKey: tPriv, contractor: c, tContractor: tc, tokenContractor: ttc, @@ -1163,7 +1218,8 @@ func tassetWallet(assetID uint32) (asset.Wallet, *assetWallet, *tMempoolNode, co baseChainID: BipID, chainID: dexeth.ChainIDs[dex.Simnet], tokens: dexeth.Tokens, - addr: node.addr, + creds: tCreds, + addr: tCreds.addr, net: dex.Simnet, node: node, ctx: ctx, @@ -1175,6 +1231,7 @@ func tassetWallet(assetID uint32) (asset.Wallet, *assetWallet, *tMempoolNode, co txDB: &tTxDB{}, currentTip: &types.Header{Number: new(big.Int)}, finalizeConfs: txConfsNeededToConfirm, + dontQSends: true, }, versionedGases: versionedGases, maxSwapGas: versionedGases[0].Swap, @@ -4492,11 +4549,7 @@ func testSend(t *testing.T, assetID uint32) { w, eth, node, shutdown := tassetWallet(assetID) defer shutdown() - tx := tTx(0, 0, 0, &testAddressA, nil, 21000) - txHash := tx.Hash() - - node.sendTxTx = tx - node.tokenContractor.transferTx = tx + node.tokenContractor.transferTx = tTx(0, 0, 0, &testAddressA, nil, 21000) maxFeeRate, _, _ := eth.recommendedMaxFeeRate(eth.ctx) ethFees := dexeth.WeiToGwei(maxFeeRate) * defaultSendGasLimit @@ -4550,7 +4603,7 @@ func testSend(t *testing.T, assetID uint32) { node.tokenContractor.bal = dexeth.GweiToWei(val - test.sendAdj) node.bal = dexeth.GweiToWei(tokenFees - test.feeAdj) } - coin, err := w.Send(test.addr, val, 0) + _, err := w.Send(test.addr, val, 0) if test.wantErr { if err == nil { t.Fatalf("expected error for test %v", test.name) @@ -4560,9 +4613,6 @@ func testSend(t *testing.T, assetID uint32) { if err != nil { t.Fatalf("unexpected error for test %v: %v", test.name, err) } - if !bytes.Equal(txHash[:], coin.ID()) { - t.Fatal("coin is not the tx hash") - } } } @@ -4749,9 +4799,7 @@ func testEstimateVsActualSendFees(t *testing.T, assetID uint32) { w, _, node, shutdown := tassetWallet(assetID) defer shutdown() - tx := tTx(0, 0, 0, &testAddressA, nil, 21000) - node.sendTxTx = tx - node.tokenContractor.transferTx = tx + node.tokenContractor.transferTx = tTx(0, 0, 0, &testAddressA, nil, 21000) const testAddr = "dd93b447f7eBCA361805eBe056259853F3912E04" diff --git a/client/asset/eth/multirpc.go b/client/asset/eth/multirpc.go index 18a633f6a9..b5331951e1 100644 --- a/client/asset/eth/multirpc.go +++ b/client/asset/eth/multirpc.go @@ -408,19 +408,13 @@ type multiRPCClient struct { var _ ethFetcher = (*multiRPCClient)(nil) func newMultiRPCClient( - dir string, + creds *accountCredentials, endpoints []string, log dex.Logger, cfg *params.ChainConfig, finalizeConfs uint64, net dex.Network, ) (*multiRPCClient, error) { - walletDir := getWalletDir(dir, net) - creds, err := pathCredentials(filepath.Join(walletDir, "keystore")) - if err != nil { - return nil, fmt.Errorf("error parsing credentials from %q: %w", dir, err) - } - m := &multiRPCClient{ net: net, cfg: cfg, @@ -1317,18 +1311,8 @@ func (m *multiRPCClient) sendSignedTransaction(ctx context.Context, tx *types.Tr return nil } -func (m *multiRPCClient) sendTransaction(ctx context.Context, txOpts *bind.TransactOpts, to common.Address, data []byte, filts ...acceptabilityFilter) (*types.Transaction, error) { - tx, err := m.creds.ks.SignTx(*m.creds.acct, types.NewTx(&types.DynamicFeeTx{ - To: &to, - ChainID: m.chainID, - Nonce: txOpts.Nonce.Uint64(), - Gas: txOpts.GasLimit, - GasFeeCap: txOpts.GasFeeCap, - GasTipCap: txOpts.GasTipCap, - Value: txOpts.Value, - Data: data, - }), m.chainID) - +func (m *multiRPCClient) genSignAndSendTransaction(ctx context.Context, txOpts *bind.TransactOpts, to common.Address, data []byte, filts ...acceptabilityFilter) (*types.Transaction, error) { + tx, err := m.creds.signedTx(txOpts, to, data) if err != nil { return nil, fmt.Errorf("signing error: %v", err) } diff --git a/client/asset/eth/multirpc_test_util.go b/client/asset/eth/multirpc_test_util.go index 6bae75c605..4a1f0663c4 100644 --- a/client/asset/eth/multirpc_test_util.go +++ b/client/asset/eth/multirpc_test_util.go @@ -95,7 +95,11 @@ func (m *MRPCTest) rpcClient(dir string, seed []byte, endpoints []string, net de return nil, fmt.Errorf("error creating wallet: %v", err) } - return newMultiRPCClient(dir, endpoints, log, cfg, 3, net) + creds, err := walletCredentials(cfg.ChainID, dir, net) + if err != nil { + return nil, fmt.Errorf("error generating wallet credentials: %w", err) + } + return newMultiRPCClient(creds, endpoints, log, cfg, 3, net) } func (m *MRPCTest) TestHTTP(t *testing.T, port string) { @@ -157,7 +161,7 @@ func (m *MRPCTest) TestSimnetMultiRPCClient(t *testing.T, wsPort, httpPort strin if err != nil { t.Fatal(err) } - if _, err := cl.sendTransaction(ctx, txOpts, alphaAddr, nil); err != nil { + if _, err := cl.genSignAndSendTransaction(ctx, txOpts, alphaAddr, nil); err != nil { t.Fatalf("error sending tx %d-%d: %v", i, j, err) } } diff --git a/client/asset/eth/node.go b/client/asset/eth/node.go index 9a5a1a3afe..96f4ed1e1d 100644 --- a/client/asset/eth/node.go +++ b/client/asset/eth/node.go @@ -7,10 +7,15 @@ import ( "bytes" "crypto/ecdsa" "fmt" + "math/big" + "path/filepath" + "decred.org/dcrdex/dex" "github.com/ethereum/go-ethereum/accounts" + "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/accounts/keystore" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" ) @@ -34,19 +39,33 @@ func importKeyToKeyStore(ks *keystore.KeyStore, priv *ecdsa.PrivateKey, pw []byt // accountCredentials captures the account-specific geth interfaces. type accountCredentials struct { - ks *keystore.KeyStore - acct *accounts.Account - addr common.Address - wallet accounts.Wallet + ks *keystore.KeyStore + acct *accounts.Account + addr common.Address + wallet accounts.Wallet + chainID *big.Int } -func pathCredentials(dir string) (*accountCredentials, error) { +func (c *accountCredentials) signedTx(txOpts *bind.TransactOpts, to common.Address, data []byte) (*types.Transaction, error) { + return c.ks.SignTx(*c.acct, types.NewTx(&types.DynamicFeeTx{ + To: &to, + ChainID: c.chainID, + Nonce: txOpts.Nonce.Uint64(), + Gas: txOpts.GasLimit, + GasFeeCap: txOpts.GasFeeCap, + GasTipCap: txOpts.GasTipCap, + Value: txOpts.Value, + Data: data, + }), c.chainID) +} + +func pathCredentials(chainID *big.Int, dir string) (*accountCredentials, error) { // TODO: Use StandardScryptN and StandardScryptP? - return credentialsFromKeyStore(keystore.NewKeyStore(dir, keystore.LightScryptN, keystore.LightScryptP)) + return credentialsFromKeyStore(keystore.NewKeyStore(dir, keystore.LightScryptN, keystore.LightScryptP), chainID) } -func credentialsFromKeyStore(ks *keystore.KeyStore) (*accountCredentials, error) { +func credentialsFromKeyStore(ks *keystore.KeyStore, chainID *big.Int) (*accountCredentials, error) { accts := ks.Accounts() if len(accts) != 1 { return nil, fmt.Errorf("unexpected number of accounts, %d", len(accts)) @@ -57,10 +76,11 @@ func credentialsFromKeyStore(ks *keystore.KeyStore) (*accountCredentials, error) return nil, fmt.Errorf("unexpected number of wallets, %d", len(wallets)) } return &accountCredentials{ - ks: ks, - acct: &acct, - addr: acct.Address, - wallet: wallets[0], + ks: ks, + acct: &acct, + addr: acct.Address, + wallet: wallets[0], + chainID: chainID, }, nil } @@ -85,3 +105,8 @@ func signData(creds *accountCredentials, data []byte) (sig, pubKey []byte, err e return } + +func walletCredentials(chainID *big.Int, dir string, net dex.Network) (*accountCredentials, error) { + walletDir := getWalletDir(dir, net) + return pathCredentials(chainID, filepath.Join(walletDir, "keystore")) +} diff --git a/client/asset/eth/nodeclient_harness_test.go b/client/asset/eth/nodeclient_harness_test.go index de0a19721f..6a1b661044 100644 --- a/client/asset/eth/nodeclient_harness_test.go +++ b/client/asset/eth/nodeclient_harness_test.go @@ -80,11 +80,11 @@ var ( simnetWalletSeed = "0812f5244004217452059e2fd11603a511b5d0870ead753df76c966ce3c71531" simnetAddr common.Address simnetAcct *accounts.Account - ethClient ethFetcher + ethClient *multiRPCClient participantWalletSeed = "a897afbdcba037c8c735cc63080558a30d72851eb5a3d05684400ec4123a2d00" participantAddr common.Address participantAcct *accounts.Account - participantEthClient ethFetcher + participantEthClient *multiRPCClient ethSwapContractAddr common.Address simnetContractor contractor participantContractor contractor @@ -198,7 +198,12 @@ func prepareRPCClient(name, dataDir string, providers []string, net dex.Network) return nil, nil, err } - c, err := newMultiRPCClient(dataDir, providers, tLogger.SubLogger(name), cfg, 3, net) + creds, err := walletCredentials(cfg.ChainID, dataDir, net) + if err != nil { + return nil, nil, fmt.Errorf("error generating wallet credentials: %w", err) + } + + c, err := newMultiRPCClient(creds, providers, tLogger.SubLogger(name), cfg, 3, net) if err != nil { return nil, nil, fmt.Errorf("(%s) prepareRPCClient error: %v", name, err) } @@ -650,7 +655,6 @@ func TestBasicRetrieval(t *testing.T) { t.Fatal("not enough funds") } t.Run("testBestHeader", testBestHeader) - t.Run("testPendingTransactions", testPendingTransactions) t.Run("testHeaderByHash", testHeaderByHash) t.Run("testTransactionReceipt", testTransactionReceipt) } @@ -804,7 +808,7 @@ func testSendTransaction(t *testing.T) { t.Fatalf("txOpts error: %v", err) } - tx, err := ethClient.sendTransaction(ctx, txOpts, participantAddr, nil) + tx, err := ethClient.genSignAndSendTransaction(ctx, txOpts, participantAddr, nil) if err != nil { t.Fatal(err) } @@ -868,7 +872,6 @@ func testSendSignedTransaction(t *testing.T) { if !errors.Is(err, asset.CoinNotFoundError) { t.Fatalf("no CoinNotFoundError") } - c := ethClient.(*multiRPCClient) var nonce uint64 var chainID *big.Int var ks *keystore.KeyStore @@ -877,8 +880,8 @@ func testSendSignedTransaction(t *testing.T) { t.Fatalf("error getting nonce: %v", err) } nonce = n.Uint64() - ks = c.creds.ks - chainID = c.chainID + ks = ethClient.creds.ks + chainID = ethClient.chainID tx := types.NewTx(&types.DynamicFeeTx{ To: &simnetAddr, @@ -930,7 +933,7 @@ func testTransactionReceipt(t *testing.T) { if err != nil { t.Fatalf("txOpts error: %v", err) } - tx, err := ethClient.sendTransaction(ctx, txOpts, simnetAddr, nil) + tx, err := ethClient.genSignAndSendTransaction(ctx, txOpts, simnetAddr, nil) if err != nil { t.Fatal(err) } @@ -944,19 +947,6 @@ func testTransactionReceipt(t *testing.T) { spew.Dump(receipt) } -func testPendingTransactions(t *testing.T) { - mf, is := ethClient.(txPoolFetcher) - if !is { - return - } - txs, err := mf.pendingTransactions() - if err != nil { - t.Fatal(err) - } - // Should be empty. - spew.Dump(txs) -} - func testSwap(t *testing.T, assetID uint32) { var secretHash [32]byte copy(secretHash[:], encode.RandomBytes(32)) diff --git a/client/asset/eth/txdb.go b/client/asset/eth/txdb.go index a9bc6d1f73..4d8eba04a5 100644 --- a/client/asset/eth/txdb.go +++ b/client/asset/eth/txdb.go @@ -12,6 +12,7 @@ import ( "math" "math/big" "sync" + "sync/atomic" "time" "decred.org/dcrdex/client/asset" @@ -50,6 +51,8 @@ type extendedWalletTx struct { actionRequested bool actionIgnored time.Time indexed bool + + initialSendComplete atomic.Bool } func (t *extendedWalletTx) age() time.Duration { @@ -343,6 +346,7 @@ func unmarshalTx(wtB []byte) (wt *extendedWalletTx, err error) { wt.txHash = common.HexToHash(wt.ID) wt.lastBroadcast = time.Unix(int64(wt.SubmissionTime), 0) wt.savedToDB = true + wt.initialSendComplete.Store(true) return }