Skip to content

Commit

Permalink
Improve-logging (#15)
Browse files Browse the repository at this point in the history
* Improve logging

* Clippy & fmt

* Minor improvements

* Remove redundant comment
  • Loading branch information
Dzejkop authored Jan 10, 2024
1 parent 0829d6b commit f324c53
Show file tree
Hide file tree
Showing 8 changed files with 124 additions and 88 deletions.
3 changes: 2 additions & 1 deletion src/broadcast_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub fn calculate_gas_fees_from_estimates(
(max_fee_per_gas, max_priority_fee_per_gas)
}

pub async fn should_send_transaction(
pub async fn should_send_relayer_transactions(
app: &App,
relayer: &RelayerInfo,
) -> eyre::Result<bool> {
Expand All @@ -43,6 +43,7 @@ pub async fn should_send_transaction(

if chain_fees.gas_price > gas_limit.value.0 {
tracing::warn!(
relayer_id = relayer.id,
chain_id = relayer.chain_id,
gas_price = ?chain_fees.gas_price,
gas_limit = ?gas_limit.value.0,
Expand Down
48 changes: 27 additions & 21 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@ use crate::types::{RelayerInfo, RelayerUpdate, TransactionPriority};

pub mod data;

use self::data::{
AddressWrapper, BlockFees, H256Wrapper, NetworkStats, ReadTxData, RpcKind,
};
use self::data::{BlockFees, H256Wrapper, NetworkStats, ReadTxData, RpcKind};
pub use self::data::{TxForEscalation, TxStatus, UnsentTx};

// Statically link in migration files
Expand Down Expand Up @@ -141,6 +139,32 @@ impl Database {
.await?)
}

pub async fn get_relayers_by_chain_id(
&self,
chain_id: u64,
) -> eyre::Result<Vec<RelayerInfo>> {
Ok(sqlx::query_as(
r#"
SELECT
id,
name,
chain_id,
key_id,
address,
nonce,
current_nonce,
max_inflight_txs,
gas_price_limits,
enabled
FROM relayers
WHERE chain_id = $1
"#,
)
.bind(chain_id as i64)
.fetch_all(&self.pool)
.await?)
}

pub async fn get_relayer(&self, id: &str) -> eyre::Result<RelayerInfo> {
Ok(sqlx::query_as(
r#"
Expand Down Expand Up @@ -781,24 +805,6 @@ impl Database {
.await?)
}

pub async fn get_relayer_addresses(
&self,
chain_id: u64,
) -> eyre::Result<Vec<Address>> {
let items: Vec<(AddressWrapper,)> = sqlx::query_as(
r#"
SELECT address
FROM relayers
WHERE chain_id = $1
"#,
)
.bind(chain_id as i64)
.fetch_all(&self.pool)
.await?;

Ok(items.into_iter().map(|(wrapper,)| wrapper.0).collect())
}

pub async fn update_relayer_nonce(
&self,
chain_id: u64,
Expand Down
2 changes: 1 addition & 1 deletion src/server/routes/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ pub async fn send_tx(
)
.await?;

tracing::info!(id = tx_id, "Tx created");
tracing::info!(tx_id, "Transaction created");

Ok(Json(SendTxResponse { tx_id }))
}
Expand Down
6 changes: 3 additions & 3 deletions src/task_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,12 @@ where
let mut failures = vec![];

loop {
tracing::info!(label, "Running task");
tracing::info!(task_label = label, "Running task");

let result = task(app.clone()).await;

if let Err(err) = result {
tracing::error!(label, error = ?err, "Task failed");
tracing::error!(task_label = label, error = ?err, "Task failed");

failures.push(Instant::now());
let backoff = determine_backoff(&failures);
Expand All @@ -47,7 +47,7 @@ where

prune_failures(&mut failures);
} else {
tracing::info!(label, "Task finished");
tracing::info!(task_label = label, "Task finished");
break;
}
}
Expand Down
65 changes: 39 additions & 26 deletions src/tasks/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,17 @@ use itertools::Itertools;

use crate::app::App;
use crate::broadcast_utils::{
calculate_gas_fees_from_estimates, should_send_transaction,
calculate_gas_fees_from_estimates, should_send_relayer_transactions,
};
use crate::db::UnsentTx;

const NO_TXS_SLEEP_DURATION: Duration = Duration::from_secs(2);

pub async fn broadcast_txs(app: Arc<App>) -> eyre::Result<()> {
loop {
// Get all unsent txs and broadcast
let txs = app.db.get_unsent_txs().await?;
let num_txs = txs.len();

let txs_by_relayer = sort_txs_by_relayer(txs);

let mut futures = FuturesUnordered::new();
Expand All @@ -31,11 +34,13 @@ pub async fn broadcast_txs(app: Arc<App>) -> eyre::Result<()> {

while let Some(result) = futures.next().await {
if let Err(err) = result {
tracing::error!(error = ?err, "Failed broadcasting txs");
tracing::error!(error = ?err, "Failed broadcasting transactions");
}
}

tokio::time::sleep(Duration::from_secs(1)).await;
if num_txs == 0 {
tokio::time::sleep(NO_TXS_SLEEP_DURATION).await;
}
}
}

Expand All @@ -49,21 +54,22 @@ async fn broadcast_relayer_txs(
return Ok(());
}

tracing::info!(relayer_id, num_txs = txs.len(), "Broadcasting relayer txs");

let relayer = app.db.get_relayer(&relayer_id).await?;

if !should_send_transaction(app, &relayer).await? {
tracing::warn!(
relayer_id = relayer_id,
"Skipping transaction broadcasts"
);
if !should_send_relayer_transactions(app, &relayer).await? {
tracing::warn!(relayer_id = relayer_id, "Skipping relayer broadcasts");

return Ok(());
}

tracing::info!(
relayer_id,
num_txs = txs.len(),
"Broadcasting relayer transactions"
);

for tx in txs {
tracing::info!(id = tx.id, "Sending tx");
tracing::info!(tx_id = tx.id, nonce = tx.nonce, "Sending transaction");

let middleware = app
.signer_middleware(tx.chain_id, tx.key_id.clone())
Expand Down Expand Up @@ -103,16 +109,22 @@ async fn broadcast_relayer_txs(
.fill_transaction(&mut typed_transaction, None)
.await?;

tracing::debug!(?tx.id, "Simulating tx");
tracing::debug!(tx_id = tx.id, "Simulating transaction");

// Simulate the transaction
match middleware.call(&typed_transaction, None).await {
Ok(_) => {
tracing::info!(?tx.id, "Tx simulated successfully");
tracing::info!(
tx_id = tx.id,
"Transaction simulated successfully"
);
}
Err(err) => {
tracing::error!(?tx.id, error = ?err, "Failed to simulate tx");
continue;
tracing::error!(tx_id = tx.id, error = ?err, "Failed to simulate transaction");

// If we fail while broadcasting a tx with nonce `n`,
// it doesn't make sense to broadcast tx with nonce `n + 1`
return Ok(());
}
};

Expand All @@ -133,24 +145,25 @@ async fn broadcast_relayer_txs(
)
.await?;

tracing::debug!(?tx.id, "Sending tx");
tracing::debug!(tx_id = tx.id, "Sending transaction");

// TODO: Be smarter about error handling - a tx can fail to be sent
// e.g. because the relayer is out of funds
// but we don't want to retry it forever
let pending_tx = middleware.send_raw_transaction(raw_signed_tx).await;

match pending_tx {
Ok(pending_tx) => {
tracing::info!(?pending_tx, "Tx sent successfully");
}
let pending_tx = match pending_tx {
Ok(pending_tx) => pending_tx,
Err(err) => {
tracing::error!(?tx.id, error = ?err, "Failed to send tx");
tracing::error!(tx_id = tx.id, error = ?err, "Failed to send transaction");
continue;
}
};

tracing::info!(id = tx.id, hash = ?tx_hash, "Tx broadcast");
tracing::info!(
tx_id = tx.id,
tx_nonce = tx.nonce,
tx_hash = ?tx_hash,
?pending_tx,
"Transaction broadcast"
);
}

Ok(())
Expand Down
53 changes: 31 additions & 22 deletions src/tasks/escalate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use futures::stream::FuturesUnordered;
use futures::StreamExt;

use crate::app::App;
use crate::broadcast_utils::should_send_transaction;
use crate::broadcast_utils::should_send_relayer_transactions;
use crate::db::TxForEscalation;

pub async fn escalate_txs(app: Arc<App>) -> eyre::Result<()> {
Expand Down Expand Up @@ -46,14 +46,21 @@ async fn escalate_relayer_txs(
let relayer = app.db.get_relayer(&relayer_id).await?;

for tx in txs {
tracing::info!(id = tx.id, tx.escalation_count, "Escalating tx");

if !should_send_transaction(app, &relayer).await? {
tracing::warn!(id = tx.id, "Skipping transaction broadcast");
if !should_send_relayer_transactions(app, &relayer).await? {
tracing::warn!(
relayer_id = relayer.id,
"Skipping relayer escalations"
);

return Ok(());
}

tracing::info!(
tx_id = tx.id,
escalation_count = tx.escalation_count,
"Escalating transaction"
);

let escalation = tx.escalation_count + 1;

let middleware = app
Expand All @@ -71,23 +78,17 @@ async fn escalate_relayer_txs(
let increased_gas_price_percentage =
factor + U256::from(20 * (1 + escalation));

let max_fee_per_gas_increase = tx.initial_max_fee_per_gas.0
* increased_gas_price_percentage
/ factor;
let initial_max_fee_per_gas = tx.initial_max_fee_per_gas.0;

let max_fee_per_gas_increase =
initial_max_fee_per_gas * increased_gas_price_percentage / factor;

let max_fee_per_gas =
tx.initial_max_fee_per_gas.0 + max_fee_per_gas_increase;
initial_max_fee_per_gas + max_fee_per_gas_increase;

let max_priority_fee_per_gas =
max_fee_per_gas - fees.fee_estimates.base_fee_per_gas;

tracing::warn!(
"Initial tx fees are max = {}, priority = {}",
tx.initial_max_fee_per_gas.0,
tx.initial_max_priority_fee_per_gas.0
);
tracing::warn!("Escalating with max fee = {max_fee_per_gas} and max priority = {max_priority_fee_per_gas}");

let eip1559_tx = Eip1559TransactionRequest {
from: None,
to: Some(NameOrAddress::from(Address::from(tx.tx_to.0))),
Expand All @@ -106,18 +107,26 @@ async fn escalate_relayer_txs(
.await;

let pending_tx = match pending_tx {
Ok(pending_tx) => {
tracing::info!(?pending_tx, "Tx sent successfully");
pending_tx
}
Ok(pending_tx) => pending_tx,
Err(err) => {
tracing::error!(error = ?err, "Failed to send tx");
tracing::error!(tx_id = tx.id, error = ?err, "Failed to escalate transaction");
continue;
}
};

let tx_hash = pending_tx.tx_hash();

tracing::info!(
tx_id = tx.id,
?tx_hash,
?initial_max_fee_per_gas,
?max_fee_per_gas_increase,
?max_fee_per_gas,
?max_priority_fee_per_gas,
?pending_tx,
"Escalated transaction"
);

app.db
.escalate_tx(
&tx.id,
Expand All @@ -127,7 +136,7 @@ async fn escalate_relayer_txs(
)
.await?;

tracing::info!(id = ?tx.id, hash = ?tx_hash, "Tx escalated");
tracing::info!(tx_id = tx.id, "Escalated transaction saved");
}

Ok(())
Expand Down
4 changes: 2 additions & 2 deletions src/tasks/handle_reorgs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pub async fn handle_hard_reorgs(app: Arc<App>) -> eyre::Result<()> {
let reorged_txs = app.db.handle_hard_reorgs().await?;

for tx in reorged_txs {
tracing::info!(id = tx, "Tx hard reorged");
tracing::info!(tx_id = tx, "Transaction hard reorged");
}

tokio::time::sleep(app.config.service.hard_reorg_interval).await;
Expand All @@ -23,7 +23,7 @@ pub async fn handle_soft_reorgs(app: Arc<App>) -> eyre::Result<()> {
let txs = app.db.handle_soft_reorgs().await?;

for tx in txs {
tracing::info!(id = tx, "Tx soft reorged");
tracing::info!(tx_id = tx, "Transaction soft reorged");
}

tokio::time::sleep(app.config.service.soft_reorg_interval).await;
Expand Down
Loading

0 comments on commit f324c53

Please sign in to comment.