Skip to content

Commit

Permalink
make fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
samlaf committed Jun 28, 2024
1 parent b17c638 commit 2de06a1
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 15 deletions.
7 changes: 6 additions & 1 deletion chainio/gasoracle/gasoracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,12 @@ func (o *GasOracle) GetLatestGasCaps(ctx context.Context) (gasTipCap, gasFeeCap
return
}

func (o *GasOracle) UpdateGas(ctx context.Context, tx *types.Transaction, value, gasTipCap, gasFeeCap *big.Int, from common.Address) (*types.Transaction, error) {
func (o *GasOracle) UpdateGas(
ctx context.Context,
tx *types.Transaction,
value, gasTipCap, gasFeeCap *big.Int,
from common.Address,
) (*types.Transaction, error) {
gasLimit, err := o.client.EstimateGas(ctx, ethereum.CallMsg{
From: from,
To: tx.To(),
Expand Down
129 changes: 115 additions & 14 deletions chainio/txmgr/geometric.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,16 @@ type GeometricTxManager struct {

var _ AsyncTxManager = (*GeometricTxManager)(nil)

func NewTxnManager(ethClient eth.Client, gasOracle gasoracle.GasOracle, wallet wallet.Wallet, numConfirmations, queueSize int, txnBroadcastTimeout time.Duration, txnRefreshInterval time.Duration, logger logging.Logger, metrics *Metrics) *GeometricTxManager {
func NewTxnManager(
ethClient eth.Client,
gasOracle gasoracle.GasOracle,
wallet wallet.Wallet,
numConfirmations, queueSize int,
txnBroadcastTimeout time.Duration,
txnRefreshInterval time.Duration,
logger logging.Logger,
metrics *Metrics,
) *GeometricTxManager {
return &GeometricTxManager{
ethClient: ethClient,
gasOracle: gasOracle,
Expand Down Expand Up @@ -135,11 +144,22 @@ func (t *GeometricTxManager) Start(ctx context.Context) {

// ProcessTransaction sends the transaction and queues the transaction for monitoring.
// It returns an error if the transaction fails to be confirmed for reasons other than timeouts.
// TxnManager monitors the transaction and resends it with a higher gas price if it is not mined without a timeout until the transaction is confirmed or failed.
// TxnManager monitors the transaction and resends it with a higher gas price if it is not mined without a timeout until
// the transaction is confirmed or failed.
func (t *GeometricTxManager) ProcessTransaction(ctx context.Context, req *TxnRequest) error {
t.mu.Lock()
defer t.mu.Unlock()
t.logger.Debug("new transaction", "tag", req.Tag, "nonce", req.Tx.Nonce(), "gasFeeCap", req.Tx.GasFeeCap(), "gasTipCap", req.Tx.GasTipCap())
t.logger.Debug(
"new transaction",
"tag",
req.Tag,
"nonce",
req.Tx.Nonce(),
"gasFeeCap",
req.Tx.GasFeeCap(),
"gasTipCap",
req.Tx.GasTipCap(),
)

var txn *types.Transaction
var txID wallet.TxID
Expand All @@ -166,7 +186,19 @@ func (t *GeometricTxManager) ProcessTransaction(ctx context.Context, req *TxnReq
didTimeout = urlErr.Timeout()
}
if didTimeout || errors.Is(err, context.DeadlineExceeded) {
t.logger.Warn("failed to send txn due to timeout", "tag", req.Tag, "hash", txn.Hash().Hex(), "numRetries", retryFromFailure, "maxRetry", maxSendTransactionRetry, "err", err)
t.logger.Warn(
"failed to send txn due to timeout",
"tag",
req.Tag,
"hash",
txn.Hash().Hex(),
"numRetries",
retryFromFailure,
"maxRetry",
maxSendTransactionRetry,
"err",
err,
)
retryFromFailure++
continue
} else if err != nil {
Expand Down Expand Up @@ -220,7 +252,10 @@ func (t *GeometricTxManager) ensureAnyTransactionBroadcasted(ctx context.Context
}
}

func (t *GeometricTxManager) ensureAnyTransactionEvaled(ctx context.Context, txs []*transaction) (*types.Receipt, error) {
func (t *GeometricTxManager) ensureAnyTransactionEvaled(
ctx context.Context,
txs []*transaction,
) (*types.Receipt, error) {
queryTicker := time.NewTicker(queryTickerDuration)
defer queryTicker.Stop()
var receipt *types.Receipt
Expand All @@ -238,7 +273,15 @@ func (t *GeometricTxManager) ensureAnyTransactionEvaled(ctx context.Context, txs
chainTip, err := t.ethClient.BlockNumber(ctx)
if err == nil {
if receipt.BlockNumber.Uint64()+uint64(t.numConfirmations) > chainTip {
t.logger.Debug("transaction has been mined but don't have enough confirmations at current chain tip", "txnBlockNumber", receipt.BlockNumber.Uint64(), "numConfirmations", t.numConfirmations, "chainTip", chainTip)
t.logger.Debug(
"transaction has been mined but don't have enough confirmations at current chain tip",
"txnBlockNumber",
receipt.BlockNumber.Uint64(),
"numConfirmations",
t.numConfirmations,
"chainTip",
chainTip,
)
break
} else {
return receipt, nil
Expand Down Expand Up @@ -273,7 +316,8 @@ func (t *GeometricTxManager) ensureAnyTransactionEvaled(ctx context.Context, txs
}
}

// monitorTransaction waits until the transaction is confirmed (or failed) and resends it with a higher gas price if it is not mined without a timeout.
// monitorTransaction waits until the transaction is confirmed (or failed) and resends it with a higher gas price if it
// is not mined without a timeout.
// It returns the receipt once the transaction has been confirmed.
// It returns an error if the transaction fails to be sent for reasons other than timeouts.
func (t *GeometricTxManager) monitorTransaction(ctx context.Context, req *TxnRequest) (*types.Receipt, error) {
Expand All @@ -291,10 +335,19 @@ func (t *GeometricTxManager) monitorTransaction(ctx context.Context, req *TxnReq

// Ensure transactions are broadcasted to the network before querying the receipt.
// This is to avoid querying the receipt of a transaction that hasn't been broadcasted yet.
// For example, when Fireblocks wallet is used, there may be delays in broadcasting the transaction due to latency from cosigning and MPC operations.
// For example, when Fireblocks wallet is used, there may be delays in broadcasting the transaction due to
// latency from cosigning and MPC operations.
err = t.ensureAnyTransactionBroadcasted(ctxWithTimeout, req.txAttempts)
if err != nil && errors.Is(err, context.DeadlineExceeded) {
t.logger.Warn("transaction not broadcasted within timeout", "tag", req.Tag, "txHash", req.Tx.Hash().Hex(), "nonce", req.Tx.Nonce())
t.logger.Warn(
"transaction not broadcasted within timeout",
"tag",
req.Tag,
"txHash",
req.Tx.Hash().Hex(),
"nonce",
req.Tx.Nonce(),
)
fireblocksWallet, ok := t.wallet.(interface {
CancelTransactionBroadcast(ctx context.Context, txID wallet.TxID) (bool, error)
})
Expand Down Expand Up @@ -335,10 +388,26 @@ func (t *GeometricTxManager) monitorTransaction(ctx context.Context, req *TxnReq

if errors.Is(err, context.DeadlineExceeded) {
if receipt != nil {
t.logger.Warn("transaction has been mined, but hasn't accumulated the required number of confirmations", "tag", req.Tag, "txHash", req.Tx.Hash().Hex(), "nonce", req.Tx.Nonce())
t.logger.Warn(
"transaction has been mined, but hasn't accumulated the required number of confirmations",
"tag",
req.Tag,
"txHash",
req.Tx.Hash().Hex(),
"nonce",
req.Tx.Nonce(),
)
continue
}
t.logger.Warn("transaction not mined within timeout, resending with higher gas price", "tag", req.Tag, "txHash", req.Tx.Hash().Hex(), "nonce", req.Tx.Nonce())
t.logger.Warn(
"transaction not mined within timeout, resending with higher gas price",
"tag",
req.Tag,
"txHash",
req.Tx.Hash().Hex(),
"nonce",
req.Tx.Nonce(),
)
newTx, err := t.speedUpTxn(ctx, req.Tx, req.Tag)
if err != nil {
t.logger.Error("failed to speed up transaction", "err", err)
Expand All @@ -348,7 +417,19 @@ func (t *GeometricTxManager) monitorTransaction(ctx context.Context, req *TxnReq
txID, err := t.wallet.SendTransaction(ctx, newTx)
if err != nil {
if retryFromFailure >= maxSendTransactionRetry {
t.logger.Warn("failed to send txn - retries exhausted", "tag", req.Tag, "txn", req.Tx.Hash().Hex(), "attempt", retryFromFailure, "maxRetry", maxSendTransactionRetry, "err", err)
t.logger.Warn(
"failed to send txn - retries exhausted",
"tag",
req.Tag,
"txn",
req.Tx.Hash().Hex(),
"attempt",
retryFromFailure,
"maxRetry",
maxSendTransactionRetry,
"err",
err,
)
t.metrics.IncrementTxnCount("failure")
return nil, err
} else {
Expand All @@ -375,7 +456,11 @@ func (t *GeometricTxManager) monitorTransaction(ctx context.Context, req *TxnReq

// speedUpTxn increases the gas price of the existing transaction by specified percentage.
// It makes sure the new gas price is not lower than the current gas price.
func (t *GeometricTxManager) speedUpTxn(ctx context.Context, tx *types.Transaction, tag string) (*types.Transaction, error) {
func (t *GeometricTxManager) speedUpTxn(
ctx context.Context,
tx *types.Transaction,
tag string,
) (*types.Transaction, error) {
prevGasTipCap := tx.GasTipCap()
prevGasFeeCap := tx.GasFeeCap()
// get the gas tip cap and gas fee cap based on current network condition
Expand All @@ -398,7 +483,23 @@ func (t *GeometricTxManager) speedUpTxn(ctx context.Context, tx *types.Transacti
newGasFeeCap = increasedGasFeeCap
}

t.logger.Info("increasing gas price", "tag", tag, "txHash", tx.Hash().Hex(), "nonce", tx.Nonce(), "prevGasTipCap", prevGasTipCap, "prevGasFeeCap", prevGasFeeCap, "newGasTipCap", newGasTipCap, "newGasFeeCap", newGasFeeCap)
t.logger.Info(
"increasing gas price",
"tag",
tag,
"txHash",
tx.Hash().Hex(),
"nonce",
tx.Nonce(),
"prevGasTipCap",
prevGasTipCap,
"prevGasFeeCap",
prevGasFeeCap,
"newGasTipCap",
newGasTipCap,
"newGasFeeCap",
newGasFeeCap,
)
from, err := t.wallet.SenderAddress(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get sender address: %w", err)
Expand Down

0 comments on commit 2de06a1

Please sign in to comment.