diff --git a/db/migrations/003_relayers_tx_limits.sql b/db/migrations/003_relayers_tx_limits.sql new file mode 100644 index 0000000..2edf370 --- /dev/null +++ b/db/migrations/003_relayers_tx_limits.sql @@ -0,0 +1,6 @@ +ALTER TABLE relayers +ADD COLUMN max_queued_txs BIGINT NOT NULL DEFAULT 20, +ADD CONSTRAINT check_max_queued_txs CHECK (max_queued_txs > max_inflight_txs); + +UPDATE relayers +SET max_queued_txs = GREATEST(max_inflight_txs, 20); diff --git a/src/client.rs b/src/client.rs index 9f5c108..c92f639 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,4 +1,5 @@ use reqwest::Response; +use thiserror::Error; use crate::api_key::ApiKey; use crate::server::routes::network::NewNetworkInfo; @@ -8,12 +9,29 @@ use crate::server::routes::relayer::{ use crate::server::routes::transaction::{ GetTxResponse, SendTxRequest, SendTxResponse, }; +use crate::server::ApiError; +use crate::types::RelayerUpdate; pub struct TxSitterClient { client: reqwest::Client, url: String, } +#[derive(Debug, Error)] +pub enum ClientError { + #[error("Reqwest error: {0}")] + Reqwest(#[from] reqwest::Error), + + #[error("Serialization error: {0}")] + Serde(#[from] serde_json::Error), + + #[error("API error: {0}")] + TxSitter(#[from] ApiError), + + #[error("Invalid API key: {0}")] + InvalidApiKey(eyre::Error), +} + impl TxSitterClient { pub fn new(url: impl ToString) -> Self { Self { @@ -22,7 +40,7 @@ impl TxSitterClient { } } - async fn post(&self, url: &str) -> eyre::Result + async fn post(&self, url: &str) -> Result where R: serde::de::DeserializeOwned, { @@ -33,7 +51,11 @@ impl TxSitterClient { Ok(response.json().await?) } - async fn json_post(&self, url: &str, body: T) -> eyre::Result + async fn json_post( + &self, + url: &str, + body: T, + ) -> Result where T: serde::Serialize, R: serde::de::DeserializeOwned, @@ -45,7 +67,7 @@ impl TxSitterClient { Ok(response.json().await?) } - async fn json_get(&self, url: &str) -> eyre::Result + async fn json_get(&self, url: &str) -> Result where R: serde::de::DeserializeOwned, { @@ -56,19 +78,21 @@ impl TxSitterClient { Ok(response.json().await?) } - async fn validate_response(response: Response) -> eyre::Result { + async fn validate_response( + response: Response, + ) -> Result { if !response.status().is_success() { - let body = response.text().await?; - - return Err(eyre::eyre!("{body}")); + let body: ApiError = response.json().await?; + return Err(ClientError::TxSitter(body)); } Ok(response) } + pub async fn create_relayer( &self, req: &CreateRelayerRequest, - ) -> eyre::Result { + ) -> Result { self.json_post(&format!("{}/1/admin/relayer", self.url), req) .await } @@ -76,18 +100,34 @@ impl TxSitterClient { pub async fn create_relayer_api_key( &self, relayer_id: &str, - ) -> eyre::Result { + ) -> Result { self.post(&format!("{}/1/admin/relayer/{relayer_id}/key", self.url,)) .await } + pub async fn update_relayer( + &self, + relayer_id: &str, + relayer_update: RelayerUpdate, + ) -> Result<(), ClientError> { + self.json_post( + &format!("{}/1/admin/relayer/{relayer_id}", self.url), + relayer_update, + ) + .await + } + pub async fn send_tx( &self, api_key: &ApiKey, req: &SendTxRequest, - ) -> eyre::Result { + ) -> Result { self.json_post( - &format!("{}/1/api/{}/tx", self.url, api_key.reveal()?), + &format!( + "{}/1/api/{}/tx", + self.url, + api_key.reveal().map_err(ClientError::InvalidApiKey)? + ), req, ) .await @@ -97,11 +137,11 @@ impl TxSitterClient { &self, api_key: &ApiKey, tx_id: &str, - ) -> eyre::Result { + ) -> Result { self.json_get(&format!( "{}/1/api/{}/tx/{tx_id}", self.url, - api_key.reveal()?, + api_key.reveal().map_err(ClientError::InvalidApiKey)?, tx_id = tx_id )) .await @@ -111,7 +151,7 @@ impl TxSitterClient { &self, chain_id: u64, req: &NewNetworkInfo, - ) -> eyre::Result<()> { + ) -> Result<(), ClientError> { let response = self .client .post(&format!("{}/1/admin/network/{}", self.url, chain_id)) diff --git a/src/db.rs b/src/db.rs index 91b4507..ec2b221 100644 --- a/src/db.rs +++ b/src/db.rs @@ -74,7 +74,15 @@ impl Database { ) -> eyre::Result<()> { let mut tx = self.pool.begin().await?; - if let Some(name) = &update.relayer_name { + let RelayerUpdate { + relayer_name, + max_inflight_txs, + max_queued_txs, + gas_price_limits, + enabled, + } = update; + + if let Some(name) = relayer_name { sqlx::query( r#" UPDATE relayers @@ -88,7 +96,7 @@ impl Database { .await?; } - if let Some(max_inflight_txs) = update.max_inflight_txs { + if let Some(max_inflight_txs) = max_inflight_txs { sqlx::query( r#" UPDATE relayers @@ -97,12 +105,26 @@ impl Database { "#, ) .bind(id) - .bind(max_inflight_txs as i64) + .bind(*max_inflight_txs as i64) + .execute(tx.as_mut()) + .await?; + } + + if let Some(max_queued_txs) = max_queued_txs { + sqlx::query( + r#" + UPDATE relayers + SET max_queued_txs = $2 + WHERE id = $1 + "#, + ) + .bind(id) + .bind(*max_queued_txs as i64) .execute(tx.as_mut()) .await?; } - if let Some(gas_price_limits) = &update.gas_price_limits { + if let Some(gas_price_limits) = gas_price_limits { sqlx::query( r#" UPDATE relayers @@ -116,11 +138,26 @@ impl Database { .await?; } + if let Some(enabled) = enabled { + sqlx::query( + r#" + UPDATE relayers + SET enabled = $2 + WHERE id = $1 + "#, + ) + .bind(id) + .bind(*enabled) + .execute(tx.as_mut()) + .await?; + } + tx.commit().await?; Ok(()) } + #[instrument(skip(self), level = "debug")] pub async fn get_relayers(&self) -> eyre::Result> { Ok(sqlx::query_as( r#" @@ -142,6 +179,7 @@ impl Database { .await?) } + #[instrument(skip(self), level = "debug")] pub async fn get_relayers_by_chain_id( &self, chain_id: u64, @@ -157,6 +195,7 @@ impl Database { nonce, current_nonce, max_inflight_txs, + max_queued_txs, gas_price_limits, enabled FROM relayers @@ -168,6 +207,7 @@ impl Database { .await?) } + #[instrument(skip(self), level = "debug")] pub async fn get_relayer(&self, id: &str) -> eyre::Result { Ok(sqlx::query_as( r#" @@ -180,6 +220,7 @@ impl Database { nonce, current_nonce, max_inflight_txs, + max_queued_txs, gas_price_limits, enabled FROM relayers @@ -191,6 +232,28 @@ impl Database { .await?) } + #[instrument(skip(self), level = "debug")] + pub async fn get_relayer_pending_txs( + &self, + relayer_id: &str, + ) -> eyre::Result { + let (tx_count,): (i64,) = sqlx::query_as( + r#" + SELECT COUNT(1) + FROM transactions t + LEFT JOIN sent_transactions s ON (t.id = s.tx_id) + WHERE t.relayer_id = $1 + AND (s.tx_id IS NULL OR s.status = $2) + "#, + ) + .bind(relayer_id) + .bind(TxStatus::Pending) + .fetch_one(&self.pool) + .await?; + + Ok(tx_count as usize) + } + #[instrument(skip(self), level = "debug")] pub async fn create_transaction( &self, @@ -245,6 +308,7 @@ impl Database { Ok(()) } + #[instrument(skip(self), level = "debug")] pub async fn get_unsent_txs(&self) -> eyre::Result> { Ok(sqlx::query_as( r#" @@ -309,6 +373,7 @@ impl Database { Ok(()) } + #[instrument(skip(self), level = "debug")] pub async fn get_latest_block_number_without_fee_estimates( &self, chain_id: u64, @@ -334,6 +399,7 @@ impl Database { Ok(block_number.map(|(n,)| n as u64)) } + #[instrument(skip(self), level = "debug")] pub async fn get_latest_block_number( &self, chain_id: u64, @@ -354,6 +420,7 @@ impl Database { Ok(block_number.map(|(n,)| n as u64)) } + #[instrument(skip(self), level = "debug")] pub async fn get_latest_block_fees_by_chain_id( &self, chain_id: u64, @@ -387,6 +454,7 @@ impl Database { })) } + #[instrument(skip(self), level = "debug")] pub async fn has_blocks_for_chain( &self, chain_id: u64, @@ -687,6 +755,7 @@ impl Database { Ok(()) } + #[instrument(skip(self), level = "debug")] pub async fn get_txs_for_escalation( &self, escalation_interval: Duration, @@ -770,6 +839,7 @@ impl Database { Ok(()) } + #[instrument(skip(self), level = "debug")] pub async fn read_tx( &self, tx_id: &str, @@ -789,6 +859,7 @@ impl Database { .await?) } + #[instrument(skip(self), level = "debug")] pub async fn read_txs( &self, relayer_id: &str, @@ -925,6 +996,7 @@ impl Database { Ok(()) } + #[instrument(skip(self), level = "debug")] pub async fn get_network_rpc( &self, chain_id: u64, @@ -946,6 +1018,7 @@ impl Database { Ok(row.0) } + #[instrument(skip(self), level = "debug")] pub async fn get_network_chain_ids(&self) -> eyre::Result> { let items: Vec<(i64,)> = sqlx::query_as( r#" @@ -979,6 +1052,7 @@ impl Database { Ok(()) } + #[instrument(skip(self), level = "debug")] pub async fn is_api_key_valid( &self, relayer_id: &str, @@ -1002,6 +1076,7 @@ impl Database { Ok(is_valid) } + #[instrument(skip(self), level = "debug")] pub async fn get_stats(&self, chain_id: u64) -> eyre::Result { let (pending_txs,): (i64,) = sqlx::query_as( r#" @@ -1128,7 +1203,8 @@ mod tests { match Database::new(&DatabaseConfig::connection_string(&url)).await { Ok(db) => return Ok((db, db_container)), - Err(_) => { + Err(err) => { + eprintln!("Failed to connect to the database: {err:?}"); tokio::time::sleep(Duration::from_secs(1)).await; } } @@ -1263,6 +1339,7 @@ mod tests { &RelayerUpdate { relayer_name: None, max_inflight_txs: Some(10), + max_queued_txs: Some(20), gas_price_limits: Some(vec![RelayerGasPriceLimit { chain_id: 1, value: U256Wrapper(U256::from(10_123u64)), @@ -1282,6 +1359,7 @@ mod tests { assert_eq!(relayer.nonce, 0); assert_eq!(relayer.current_nonce, 0); assert_eq!(relayer.max_inflight_txs, 10); + assert_eq!(relayer.max_queued_txs, 20); assert_eq!( relayer.gas_price_limits.0, vec![RelayerGasPriceLimit { diff --git a/src/server.rs b/src/server.rs index c6b5502..3a9b9ac 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,11 +1,8 @@ use std::sync::Arc; -use axum::http::StatusCode; -use axum::response::IntoResponse; use axum::routing::{get, post, IntoMakeService}; use axum::Router; use hyper::server::conn::AddrIncoming; -use thiserror::Error; use tower_http::validate_request::ValidateRequestHeaderLayer; use self::routes::relayer::{ @@ -16,46 +13,12 @@ use self::routes::transaction::{get_tx, get_txs, send_tx}; use self::trace_layer::MatchedPathMakeSpan; use crate::app::App; +mod error; mod middleware; pub mod routes; mod trace_layer; -#[derive(Debug, Error)] -pub enum ApiError { - #[error("Invalid key encoding")] - KeyEncoding, - - #[error("Invalid key length")] - KeyLength, - - #[error("Unauthorized")] - Unauthorized, - - #[error("Invalid format")] - InvalidFormat, - - #[error("Missing tx")] - MissingTx, - - #[error("Internal error {0}")] - Eyre(#[from] eyre::Report), -} - -impl IntoResponse for ApiError { - fn into_response(self) -> axum::response::Response { - let status_code = match self { - Self::KeyLength | Self::KeyEncoding => StatusCode::BAD_REQUEST, - Self::Unauthorized => StatusCode::UNAUTHORIZED, - Self::Eyre(_) => StatusCode::INTERNAL_SERVER_ERROR, - Self::InvalidFormat => StatusCode::BAD_REQUEST, - Self::MissingTx => StatusCode::NOT_FOUND, - }; - - let message = self.to_string(); - - (status_code, message).into_response() - } -} +pub use self::error::ApiError; pub async fn serve(app: Arc) -> eyre::Result<()> { let server = spawn_server(app).await?; diff --git a/src/server/error.rs b/src/server/error.rs new file mode 100644 index 0000000..2b2456b --- /dev/null +++ b/src/server/error.rs @@ -0,0 +1,123 @@ +use axum::response::IntoResponse; +use hyper::StatusCode; +use serde::{Deserialize, Serialize}; +use thiserror::Error; + +#[derive(Debug, Error, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum ApiError { + #[error("Invalid key encoding")] + KeyEncoding, + + #[error("Invalid key length")] + KeyLength, + + #[error("Unauthorized")] + Unauthorized, + + #[error("Invalid format")] + InvalidFormat, + + #[error("Missing tx")] + MissingTx, + + #[error("Relayer is disabled")] + RelayerDisabled, + + #[error("Too many queued transactions, max: {max}, current: {current}")] + TooManyTransactions { max: usize, current: usize }, + + #[error("Internal error {0}")] + #[serde(with = "serde_eyre")] + Other(#[from] eyre::Report), +} + +impl IntoResponse for ApiError { + fn into_response(self) -> axum::response::Response { + let status_code = match self { + Self::KeyLength | Self::KeyEncoding => StatusCode::BAD_REQUEST, + Self::Unauthorized => StatusCode::UNAUTHORIZED, + Self::Other(_) => StatusCode::INTERNAL_SERVER_ERROR, + Self::InvalidFormat => StatusCode::BAD_REQUEST, + Self::MissingTx => StatusCode::NOT_FOUND, + Self::RelayerDisabled => StatusCode::FORBIDDEN, + Self::TooManyTransactions { .. } => StatusCode::TOO_MANY_REQUESTS, + }; + + let message = serde_json::to_string(&self) + .expect("Failed to serialize error message"); + + (status_code, message).into_response() + } +} + +// Mostly used for tests +impl PartialEq for ApiError { + fn eq(&self, other: &Self) -> bool { + match (self, other) { + ( + Self::TooManyTransactions { + max: l_max, + current: l_current, + }, + Self::TooManyTransactions { + max: r_max, + current: r_current, + }, + ) => l_max == r_max && l_current == r_current, + (Self::Other(l0), Self::Other(r0)) => { + l0.to_string() == r0.to_string() + } + _ => { + core::mem::discriminant(self) == core::mem::discriminant(other) + } + } + } +} + +mod serde_eyre { + use std::borrow::Cow; + + use serde::Deserialize; + + pub fn serialize( + error: &eyre::Report, + serializer: S, + ) -> Result + where + S: serde::Serializer, + { + let error = error.to_string(); + serializer.serialize_str(&error) + } + + pub fn deserialize<'de, D>( + deserializer: D, + ) -> Result + where + D: serde::Deserializer<'de>, + { + let error = Cow::<'static, str>::deserialize(deserializer)?; + Ok(eyre::eyre!(error)) + } +} + +#[cfg(test)] +mod tests { + use test_case::test_case; + + use super::*; + + #[test_case(ApiError::KeyLength, r#""keyLength""# ; "Key length")] + #[test_case(ApiError::Other(eyre::eyre!("Test error")), r#"{"other":"Test error"}"# ; "Other error")] + #[test_case(ApiError::TooManyTransactions { max: 10, current: 20 }, r#"{"tooManyTransactions":{"max":10,"current":20}}"# ; "Too many transactions")] + fn serialization(error: ApiError, expected: &str) { + let serialized = serde_json::to_string(&error).unwrap(); + + assert_eq!(serialized, expected); + + let deserialized = serde_json::from_str::(expected).unwrap(); + + assert_eq!(error, deserialized); + } +} diff --git a/src/server/routes/relayer.rs b/src/server/routes/relayer.rs index da87148..066f4ab 100644 --- a/src/server/routes/relayer.rs +++ b/src/server/routes/relayer.rs @@ -83,10 +83,10 @@ pub async fn update_relayer( State(app): State>, Path(relayer_id): Path, Json(req): Json, -) -> Result<(), ApiError> { +) -> Result, ApiError> { app.db.update_relayer(&relayer_id, &req).await?; - Ok(()) + Ok(Json(())) } #[tracing::instrument(skip(app))] @@ -115,10 +115,10 @@ pub async fn get_relayer( pub async fn purge_unsent_txs( State(app): State>, Path(relayer_id): Path, -) -> Result<(), ApiError> { +) -> Result, ApiError> { app.db.purge_unsent_txs(&relayer_id).await?; - Ok(()) + Ok(Json(())) } #[tracing::instrument(skip(app, api_token))] diff --git a/src/server/routes/transaction.rs b/src/server/routes/transaction.rs index a74cf30..45e16d5 100644 --- a/src/server/routes/transaction.rs +++ b/src/server/routes/transaction.rs @@ -90,6 +90,24 @@ pub async fn send_tx( uuid::Uuid::new_v4().to_string() }; + let relayer = app.db.get_relayer(api_token.relayer_id()).await?; + + if !relayer.enabled { + return Err(ApiError::RelayerDisabled); + } + + let relayer_queued_tx_count = app + .db + .get_relayer_pending_txs(api_token.relayer_id()) + .await?; + + if relayer_queued_tx_count > relayer.max_queued_txs as usize { + return Err(ApiError::TooManyTransactions { + max: relayer.max_queued_txs as usize, + current: relayer_queued_tx_count, + }); + } + app.db .create_transaction( &tx_id, diff --git a/src/types.rs b/src/types.rs index b349d08..657e245 100644 --- a/src/types.rs +++ b/src/types.rs @@ -42,6 +42,8 @@ pub struct RelayerInfo { pub current_nonce: u64, #[sqlx(try_from = "i64")] pub max_inflight_txs: u64, + #[sqlx(try_from = "i64")] + pub max_queued_txs: u64, pub gas_price_limits: Json>, pub enabled: bool, } @@ -54,6 +56,8 @@ pub struct RelayerUpdate { #[serde(default)] pub max_inflight_txs: Option, #[serde(default)] + pub max_queued_txs: Option, + #[serde(default)] pub gas_price_limits: Option>, #[serde(default)] pub enabled: Option, @@ -66,6 +70,36 @@ pub struct RelayerGasPriceLimit { pub chain_id: i64, } +impl RelayerUpdate { + pub fn with_relayer_name(mut self, relayer_name: String) -> Self { + self.relayer_name = Some(relayer_name); + self + } + + pub fn with_max_inflight_txs(mut self, max_inflight_txs: u64) -> Self { + self.max_inflight_txs = Some(max_inflight_txs); + self + } + + pub fn with_max_queued_txs(mut self, max_queued_txs: u64) -> Self { + self.max_queued_txs = Some(max_queued_txs); + self + } + + pub fn with_gas_price_limits( + mut self, + gas_price_limits: Vec, + ) -> Self { + self.gas_price_limits = Some(gas_price_limits); + self + } + + pub fn with_enabled(mut self, enabled: bool) -> Self { + self.enabled = Some(enabled); + self + } +} + #[cfg(test)] mod tests { use ethers::types::{Address, U256}; @@ -83,6 +117,7 @@ mod tests { nonce: 0, current_nonce: 0, max_inflight_txs: 0, + max_queued_txs: 0, gas_price_limits: Json(vec![RelayerGasPriceLimit { value: U256Wrapper(U256::zero()), chain_id: 1, @@ -102,6 +137,7 @@ mod tests { "nonce": 0, "currentNonce": 0, "maxInflightTxs": 0, + "maxQueuedTxs": 0, "gasPriceLimits": [ { "value": "0x0", diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 3d5f121..c514749 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -1,12 +1,13 @@ #![allow(dead_code)] // Needed because this module is imported as module by many test crates use std::sync::Arc; +use std::time::Duration; use ethers::core::k256::ecdsa::SigningKey; use ethers::middleware::SignerMiddleware; use ethers::providers::{Http, Middleware, Provider}; use ethers::signers::{LocalWallet, Signer}; -use ethers::types::{Address, H160}; +use ethers::types::{Address, H160, U256}; use postgres_docker_utils::DockerContainerGuard; use tracing::level_filters::LevelFilter; use tracing_subscriber::layer::SubscriberExt; @@ -104,3 +105,23 @@ pub async fn setup_provider( Ok(provider) } + +pub async fn await_balance( + provider: &Provider, + expected_balance: U256, + address: Address, +) -> eyre::Result<()> { + for _ in 0..50 { + let balance = provider.get_balance(address, None).await?; + + tracing::info!(?balance, ?expected_balance, "Checking balance"); + + if balance >= expected_balance { + return Ok(()); + } else { + tokio::time::sleep(Duration::from_secs(5)).await; + } + } + + eyre::bail!("Balance did not reach expected value"); +} diff --git a/tests/disabled_relayer.rs b/tests/disabled_relayer.rs new file mode 100644 index 0000000..20a1953 --- /dev/null +++ b/tests/disabled_relayer.rs @@ -0,0 +1,53 @@ +mod common; + +use tx_sitter::types::RelayerUpdate; + +use crate::common::prelude::*; + +#[tokio::test] +async fn disabled_relayer() -> eyre::Result<()> { + setup_tracing(); + + let (db_url, _db_container) = setup_db().await?; + let anvil = AnvilBuilder::default().spawn().await?; + + let (_service, client) = + ServiceBuilder::default().build(&anvil, &db_url).await?; + + tracing::info!("Creating relayer"); + let CreateRelayerResponse { relayer_id, .. } = client + .create_relayer(&CreateRelayerRequest { + name: "Test relayer".to_string(), + chain_id: DEFAULT_ANVIL_CHAIN_ID, + }) + .await?; + + tracing::info!("Creating API key"); + let CreateApiKeyResponse { api_key } = + client.create_relayer_api_key(&relayer_id).await?; + + tracing::info!("Disabling relayer"); + client + .update_relayer( + &relayer_id, + RelayerUpdate::default().with_enabled(false), + ) + .await?; + + let value: U256 = parse_units("1", "ether")?.into(); + let response = client + .send_tx( + &api_key, + &SendTxRequest { + to: ARBITRARY_ADDRESS, + value, + gas_limit: U256::from(21_000), + ..Default::default() + }, + ) + .await; + + assert!(response.is_err()); + + Ok(()) +} diff --git a/tests/send_many_txs.rs b/tests/send_many_txs.rs index 4325423..bb273f6 100644 --- a/tests/send_many_txs.rs +++ b/tests/send_many_txs.rs @@ -47,17 +47,7 @@ async fn send_many_txs() -> eyre::Result<()> { } let expected_balance = value * num_transfers; - for _ in 0..50 { - let balance = provider.get_balance(ARBITRARY_ADDRESS, None).await?; + await_balance(&provider, expected_balance, ARBITRARY_ADDRESS).await?; - tracing::info!(?balance, ?expected_balance, "Checking balance"); - - if balance == expected_balance { - return Ok(()); - } else { - tokio::time::sleep(Duration::from_secs(5)).await; - } - } - - panic!("Transactions were not sent") + Ok(()) } diff --git a/tests/send_too_many_txs.rs b/tests/send_too_many_txs.rs new file mode 100644 index 0000000..226cc43 --- /dev/null +++ b/tests/send_too_many_txs.rs @@ -0,0 +1,117 @@ +mod common; + +use tx_sitter::client::ClientError; +use tx_sitter::server::ApiError; +use tx_sitter::types::{RelayerUpdate, TransactionPriority}; + +use crate::common::prelude::*; + +const MAX_QUEUED_TXS: usize = 20; + +#[tokio::test] +async fn send_too_many_txs() -> eyre::Result<()> { + setup_tracing(); + + let (db_url, _db_container) = setup_db().await?; + let anvil = AnvilBuilder::default().spawn().await?; + + let (_service, client) = + ServiceBuilder::default().build(&anvil, &db_url).await?; + + let CreateApiKeyResponse { api_key } = + client.create_relayer_api_key(DEFAULT_RELAYER_ID).await?; + + let CreateRelayerResponse { + relayer_id: secondary_relayer_id, + address: secondary_relayer_address, + } = client + .create_relayer(&CreateRelayerRequest { + name: "Secondary Relayer".to_string(), + chain_id: DEFAULT_ANVIL_CHAIN_ID, + }) + .await?; + + let CreateApiKeyResponse { + api_key: secondary_api_key, + } = client.create_relayer_api_key(&secondary_relayer_id).await?; + + // Set max queued txs + client + .update_relayer( + &secondary_relayer_id, + RelayerUpdate::default().with_max_queued_txs(MAX_QUEUED_TXS as u64), + ) + .await?; + + let provider = setup_provider(anvil.endpoint()).await?; + + // Send a transaction + let value: U256 = parse_units("0.01", "ether")?.into(); + + for _ in 0..=MAX_QUEUED_TXS { + client + .send_tx( + &secondary_api_key, + &SendTxRequest { + to: ARBITRARY_ADDRESS, + value, + data: None, + gas_limit: U256::from(21_000), + priority: TransactionPriority::Regular, + tx_id: None, + }, + ) + .await?; + } + + // Sending one more tx should fail + let result = client + .send_tx( + &secondary_api_key, + &SendTxRequest { + to: ARBITRARY_ADDRESS, + value, + data: None, + gas_limit: U256::from(21_000), + priority: TransactionPriority::Regular, + tx_id: None, + }, + ) + .await; + + assert!( + matches!( + result, + Err(ClientError::TxSitter(ApiError::TooManyTransactions { .. })) + ), + "Result {:?} should be too many transactions", + result + ); + + // Accumulate total value + gas budget + let send_value = value * (MAX_QUEUED_TXS + 1); + let total_required_value = send_value + parse_units("1", "ether")?; + + client + .send_tx( + &api_key, + &SendTxRequest { + to: secondary_relayer_address, + value: total_required_value, + data: None, + gas_limit: U256::from(21_000), + priority: TransactionPriority::Regular, + tx_id: None, + }, + ) + .await?; + + tracing::info!("Waiting for secondary relayer balance"); + await_balance(&provider, total_required_value, secondary_relayer_address) + .await?; + + tracing::info!("Waiting for queued up txs to be processed"); + await_balance(&provider, send_value, ARBITRARY_ADDRESS).await?; + + Ok(()) +}