diff --git a/.changelog/unreleased/improvements/1658-drop-rouille.md b/.changelog/unreleased/improvements/1658-drop-rouille.md new file mode 100644 index 0000000000..45c7ba4274 --- /dev/null +++ b/.changelog/unreleased/improvements/1658-drop-rouille.md @@ -0,0 +1,2 @@ +- Switch away from `rouille` to `axum` in telemetry and REST servers + ([\#1658](https://github.com/informalsystems/hermes/issues/1658)) \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index b2f6f40829..3cc058070b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -56,12 +56,6 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" -[[package]] -name = "adler32" -version = "1.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aae1277d39aeec15cb388266ecc24b11c80469deae6067e17a1a7aa9e5c1f234" - [[package]] name = "aho-corasick" version = "0.7.20" @@ -71,21 +65,6 @@ dependencies = [ "memchr", ] -[[package]] -name = "alloc-no-stdlib" -version = "2.0.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc7bb162ec39d46ab1ca8c77bf72e890535becd1751bb45f64c597edb4c8c6b3" - -[[package]] -name = "alloc-stdlib" -version = "0.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94fb8275041c72129eb51b7d0322c29b8387a0386127718b096429201a5d6ece" -dependencies = [ - "alloc-no-stdlib", -] - [[package]] name = "ammonia" version = "3.3.0" @@ -120,12 +99,6 @@ version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bddcadddf5e9015d310179a59bb28c4d4b9920ad0f11e8e14dbadf654890c9a6" -[[package]] -name = "ascii" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d92bec98840b8f03a5ff5413de5293bfcd8bf96467cf5452609f939ec6f5de16" - [[package]] name = "async-stream" version = "0.3.4" @@ -214,7 +187,11 @@ dependencies = [ "pin-project-lite", "rustversion", "serde", + "serde_json", + "serde_path_to_error", + "serde_urlencoded", "sync_wrapper", + "tokio", "tower", "tower-layer", "tower-service", @@ -345,27 +322,6 @@ dependencies = [ "generic-array", ] -[[package]] -name = "brotli" -version = "3.3.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1a0b1dbcc8ae29329621f8d4f0d835787c1c38bb1401979b49d13b0b305ff68" -dependencies = [ - "alloc-no-stdlib", - "alloc-stdlib", - "brotli-decompressor", -] - -[[package]] -name = "brotli-decompressor" -version = "2.3.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4b6561fd3f895a11e8f72af2cb7d22e08366bebc2b6b57f7744c4bda27034744" -dependencies = [ - "alloc-no-stdlib", - "alloc-stdlib", -] - [[package]] name = "bs58" version = "0.4.0" @@ -384,16 +340,6 @@ dependencies = [ "serde", ] -[[package]] -name = "buf_redux" -version = "0.8.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b953a6887648bb07a535631f2bc00fbdb2a2216f135552cb3f534ed136b9c07f" -dependencies = [ - "memchr", - "safemem", -] - [[package]] name = "bumpalo" version = "3.12.0" @@ -500,12 +446,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "chunked_transfer" -version = "1.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cca491388666e04d7248af3f60f0c40cfb0991c72205595d7c396e3510207d1a" - [[package]] name = "clap" version = "3.2.23" @@ -679,15 +619,6 @@ dependencies = [ "libc", ] -[[package]] -name = "crc32fast" -version = "1.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b540bd8bc810d3885c6ea91e2018302f68baba2129ab3e88f32389ee9370880d" -dependencies = [ - "cfg-if 1.0.0", -] - [[package]] name = "crossbeam-channel" version = "0.4.4" @@ -861,16 +792,6 @@ dependencies = [ "parking_lot_core", ] -[[package]] -name = "deflate" -version = "1.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c86f7e25f518f4b81808a2cf1c50996a61f5c2eb394b2393bd87f2a4780a432f" -dependencies = [ - "adler32", - "gzip-header", -] - [[package]] name = "der" version = "0.6.1" @@ -1206,16 +1127,6 @@ dependencies = [ "static_assertions", ] -[[package]] -name = "flate2" -version = "1.0.25" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a8a2db397cb1c8772f31494cb8917e48cd1e64f0fa7efac59fbd741a0a8ce841" -dependencies = [ - "crc32fast", - "miniz_oxide", -] - [[package]] name = "flex-error" version = "0.4.4" @@ -1442,15 +1353,6 @@ dependencies = [ "syn 1.0.109", ] -[[package]] -name = "gzip-header" -version = "1.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "95cc527b92e6029a62960ad99aa8a6660faa4555fe5f731aab13aa6a921795a2" -dependencies = [ - "crc32fast", -] - [[package]] name = "h2" version = "0.3.16" @@ -1913,15 +1815,15 @@ dependencies = [ name = "ibc-relayer-rest" version = "0.23.0" dependencies = [ + "axum", "crossbeam-channel 0.5.7", "ibc-relayer", "ibc-relayer-types", - "rouille", + "reqwest", "serde", - "serde_json", + "tokio", "toml", "tracing", - "ureq", ] [[package]] @@ -1961,6 +1863,7 @@ dependencies = [ name = "ibc-telemetry" version = "0.23.0" dependencies = [ + "axum", "dashmap", "ibc-relayer-types", "moka", @@ -1968,10 +1871,10 @@ dependencies = [ "opentelemetry", "opentelemetry-prometheus", "prometheus", - "rouille", "serde", "serde_json", "tendermint", + "tokio", ] [[package]] @@ -2419,24 +2322,6 @@ dependencies = [ "uuid 1.3.0", ] -[[package]] -name = "multipart" -version = "0.18.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "00dec633863867f29cb39df64a397cdf4a6354708ddd7759f70c7fb51c5f9182" -dependencies = [ - "buf_redux", - "httparse", - "log", - "mime", - "mime_guess", - "quick-error", - "rand 0.8.5", - "safemem", - "tempfile", - "twoway", -] - [[package]] name = "new_debug_unreachable" version = "1.0.4" @@ -2546,15 +2431,6 @@ dependencies = [ "libc", ] -[[package]] -name = "num_threads" -version = "0.1.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2819ce041d2ee131036f4fc9d6ae7ae125a3a40e97ba64d04fe799ad9dabbb44" -dependencies = [ - "libc", -] - [[package]] name = "object" version = "0.30.3" @@ -3011,12 +2887,6 @@ dependencies = [ "tracing-subscriber", ] -[[package]] -name = "quick-error" -version = "1.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0" - [[package]] name = "quote" version = "1.0.26" @@ -3163,9 +3033,9 @@ checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" [[package]] name = "reqwest" -version = "0.11.15" +version = "0.11.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ba30cc2c0cd02af1222ed216ba659cdb2f879dfe3181852fe7c50b1d0005949" +checksum = "27b71749df584b7f4cac2c426c127a7c785a5106cc98f7a8feb044115f0fa254" dependencies = [ "base64 0.21.0", "bytes", @@ -3241,31 +3111,6 @@ dependencies = [ "digest 0.10.6", ] -[[package]] -name = "rouille" -version = "3.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4f86e4c51a773f953f02bbab5fd049f004bfd384341d62da2a079aff812ab176" -dependencies = [ - "base64 0.13.1", - "brotli", - "chrono", - "deflate", - "filetime", - "multipart", - "num_cpus", - "percent-encoding", - "rand 0.8.5", - "serde", - "serde_derive", - "serde_json", - "sha1", - "threadpool", - "time", - "tiny_http", - "url", -] - [[package]] name = "rustc-demangle" version = "0.1.22" @@ -3441,12 +3286,6 @@ dependencies = [ "safe-regex-compiler", ] -[[package]] -name = "safemem" -version = "0.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ef703b7cb59335eae2eb93ceb664c0eb7ea6bf567079d843e09420219668e072" - [[package]] name = "same-file" version = "1.0.6" @@ -3638,6 +3477,15 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_path_to_error" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7f05c1d5476066defcdfacce1f52fc3cae3af1d3089727100c02ae92e5abbe0" +dependencies = [ + "serde", +] + [[package]] name = "serde_repr" version = "0.1.12" @@ -4229,23 +4077,12 @@ dependencies = [ "once_cell", ] -[[package]] -name = "threadpool" -version = "1.8.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d050e60b33d41c19108b32cea32164033a9013fe3b46cbd4457559bfbf77afaa" -dependencies = [ - "num_cpus", -] - [[package]] name = "time" version = "0.3.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cd0cbfecb4d19b5ea75bb31ad904eb5b9fa13f21079c3b92017ebdf4999a5890" dependencies = [ - "libc", - "num_threads", "serde", "time-core", "time-macros", @@ -4294,18 +4131,6 @@ dependencies = [ "crunchy", ] -[[package]] -name = "tiny_http" -version = "0.12.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "389915df6413a2e74fb181895f933386023c71110878cd0825588928e64cdc82" -dependencies = [ - "ascii", - "chunked_transfer", - "httpdate", - "log", -] - [[package]] name = "tinyvec" version = "1.6.0" @@ -4651,15 +4476,6 @@ dependencies = [ "webpki 0.22.0", ] -[[package]] -name = "twoway" -version = "0.1.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "59b11b2b5241ba34be09c3cc85a36e56e48f9888862e19cedf23336d35316ed1" -dependencies = [ - "memchr", -] - [[package]] name = "typenum" version = "1.16.0" @@ -4738,22 +4554,6 @@ version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" -[[package]] -name = "ureq" -version = "2.6.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "338b31dd1314f68f3aabf3ed57ab922df95ffcd902476ca7ba3c4ce7b908c46d" -dependencies = [ - "base64 0.13.1", - "flate2", - "log", - "once_cell", - "rustls 0.20.8", - "url", - "webpki 0.22.0", - "webpki-roots 0.22.6", -] - [[package]] name = "url" version = "2.3.1" diff --git a/Cargo.toml b/Cargo.toml index 02e52f3290..b699e89ee8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,7 @@ members = [ "tools/query-events", ] -[patch.crates-io] +# [patch.crates-io] # ibc-proto = { git = "https://github.com/cosmos/ibc-proto-rs.git", branch = "main" } # tendermint = { git = "https://github.com/informalsystems/tendermint-rs", branch = "main" } # tendermint-rpc = { git = "https://github.com/informalsystems/tendermint-rs", branch = "main" } diff --git a/crates/relayer-cli/src/commands/start.rs b/crates/relayer-cli/src/commands/start.rs index e15b593265..9bd49818e6 100644 --- a/crates/relayer-cli/src/commands/start.rs +++ b/crates/relayer-cli/src/commands/start.rs @@ -29,7 +29,12 @@ impl Runnable for StartCmd { fn run(&self) { let config = (*app_config()).clone(); - let supervisor_handle = make_supervisor::(config, self.full_scan) + let options = SupervisorOptions { + force_full_scan: self.full_scan, + health_check: true, + }; + + let supervisor_handle = make_supervisor::(config, options) .unwrap_or_else(|e| { Output::error(format!("Hermes failed to start, last error: {e}")).exit() }); @@ -103,18 +108,40 @@ fn register_signals(tx_cmd: Sender) -> Result<(), io::Error> { #[cfg(feature = "rest-server")] fn spawn_rest_server(config: &Config) -> Option { + use ibc_relayer::util::spawn_blocking; + let _span = tracing::error_span!("rest").entered(); let rest = config.rest.clone(); - if rest.enabled { - let rest_config = ibc_relayer_rest::Config::new(rest.host, rest.port); - let (_, rest_receiver) = ibc_relayer_rest::server::spawn(rest_config); - Some(rest_receiver) - } else { + if !rest.enabled { info!("REST server disabled"); - None + return None; } + + let (tx, rx) = crossbeam_channel::unbounded(); + + spawn_blocking(async move { + let result = ibc_relayer_rest::spawn((rest.host.as_str(), rest.port), tx); + + match result { + Ok(handle) => { + info!( + "REST service running, exposing REST API at http://{}:{}", + rest.host, rest.port + ); + + if let Err(e) = handle.await { + error!("REST service crashed with errror: {e}"); + } + } + Err(e) => { + error!("REST service failed to start: {e}"); + } + } + }); + + Some(rx) } #[cfg(not(feature = "rest-server"))] @@ -134,60 +161,56 @@ fn spawn_rest_server(config: &Config) -> Option { } #[cfg(feature = "telemetry")] -fn spawn_telemetry_server(config: &Config) -> Result<(), Box> { +fn spawn_telemetry_server(config: &Config) { + use ibc_relayer::util::spawn_blocking; + let _span = tracing::error_span!("telemetry").entered(); let state = ibc_telemetry::global(); - let telemetry = config.telemetry.clone(); - if telemetry.enabled { - match ibc_telemetry::spawn((telemetry.host, telemetry.port), state.clone()) { - Ok((addr, _)) => { - info!( - "telemetry service running, exposing metrics at http://{}/metrics", - addr - ); - } - Err(e) => { - error!("telemetry service failed to start: {}", e); - return Err(e); - } - } + + if !telemetry.enabled { + info!("telemetry disabled"); + return; } - Ok(()) + spawn_blocking(async move { + let result = ibc_telemetry::spawn((telemetry.host, telemetry.port), state.clone()); + + match result { + Ok((addr, handle)) => { + info!("telemetry service running, exposing metrics at http://{addr}/metrics"); + + if let Err(e) = handle.await { + error!("telemetry service crashed with errror: {e}"); + } + } + Err(e) => error!("telemetry service failed to start: {e}"), + } + }); } #[cfg(not(feature = "telemetry"))] -fn spawn_telemetry_server(config: &Config) -> Result<(), Box> { +fn spawn_telemetry_server(config: &Config) { if config.telemetry.enabled { warn!( "telemetry enabled in the config but Hermes was built without telemetry support, \ build Hermes with --features=telemetry to enable telemetry support." ); } - - Ok(()) } fn make_supervisor( config: Config, - force_full_scan: bool, + options: SupervisorOptions, ) -> Result> { let registry = SharedRegistry::::new(config.clone()); - spawn_telemetry_server(&config)?; - let rest = spawn_rest_server(&config); + spawn_telemetry_server(&config); - Ok(spawn_supervisor( - config, - registry, - rest, - SupervisorOptions { - health_check: true, - force_full_scan, - }, - )?) + let rest_rx = spawn_rest_server(&config); + + Ok(spawn_supervisor(config, registry, rest_rx, options)?) } #[cfg(test)] diff --git a/crates/relayer-rest/Cargo.toml b/crates/relayer-rest/Cargo.toml index 999a082beb..1be9d38dd2 100644 --- a/crates/relayer-rest/Cargo.toml +++ b/crates/relayer-rest/Cargo.toml @@ -18,11 +18,11 @@ ibc-relayer-types = { version = "0.23.0", path = "../relayer-types" } ibc-relayer = { version = "0.23.0", path = "../relayer" } crossbeam-channel = "0.5" -rouille = "3.6" serde = "1.0" tracing = "0.1" +axum = "0.6" +tokio = "1.26" [dev-dependencies] -serde_json = "1.0.94" +reqwest = { version = "0.11.16", features = ["json"], default-features = false } toml = "0.5.10" -ureq = "2.6.2" diff --git a/crates/relayer-rest/src/config.rs b/crates/relayer-rest/src/config.rs deleted file mode 100644 index f42bc68168..0000000000 --- a/crates/relayer-rest/src/config.rs +++ /dev/null @@ -1,24 +0,0 @@ -use core::fmt::{Display, Error as FmtError, Formatter}; - -/// REST server configuration -#[derive(Clone, Debug)] -pub struct Config { - pub host: String, - pub port: u16, -} - -impl Config { - pub fn new(host: String, port: u16) -> Self { - Self { host, port } - } - - pub fn address(&self) -> (&str, u16) { - (&self.host, self.port) - } -} - -impl Display for Config { - fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> { - write!(f, "{}:{}", self.host, self.port) - } -} diff --git a/crates/relayer-rest/src/lib.rs b/crates/relayer-rest/src/lib.rs index 58787b8285..de12c6a707 100644 --- a/crates/relayer-rest/src/lib.rs +++ b/crates/relayer-rest/src/lib.rs @@ -1,9 +1,3 @@ -#[macro_use] -extern crate rouille; - -mod config; -pub use config::Config; - -pub mod server; - -pub(crate) mod handle; +mod handle; +mod server; +pub use server::spawn; diff --git a/crates/relayer-rest/src/server.rs b/crates/relayer-rest/src/server.rs index 89ac24199a..37fb2c6a9f 100644 --- a/crates/relayer-rest/src/server.rs +++ b/crates/relayer-rest/src/server.rs @@ -1,38 +1,29 @@ -use std::thread; +use std::{ + error::Error, + net::{SocketAddr, ToSocketAddrs}, +}; +use axum::{extract::Path, response::IntoResponse, routing::get, Extension, Json, Router, Server}; use crossbeam_channel as channel; use serde::{Deserialize, Serialize}; -use tracing::{info, trace}; - -use ibc_relayer::rest::request::Request; +use tokio::task::JoinHandle; -use crate::{ - handle::{all_chain_ids, assemble_version_info, chain_config, supervisor_state}, - Config, +use ibc_relayer::{ + rest::{request::Request, RestApiError}, + supervisor::dump_state::SupervisorState, }; -pub struct ServerHandle { - join_handle: thread::JoinHandle<()>, - tx_stop: std::sync::mpsc::Sender<()>, -} - -impl ServerHandle { - pub fn join(self) -> std::thread::Result<()> { - self.join_handle.join() - } - - pub fn stop(&self) { - self.tx_stop.send(()).unwrap(); - } -} - -pub fn spawn(config: Config) -> (ServerHandle, channel::Receiver) { - let (req_tx, req_rx) = channel::unbounded::(); +use crate::handle::{all_chain_ids, assemble_version_info, chain_config, supervisor_state}; - info!("starting REST API server listening at http://{}", config); - let handle = run(config, req_tx); +pub type BoxError = Box; - (handle, req_rx) +pub fn spawn( + addr: impl ToSocketAddrs, + sender: channel::Sender, +) -> Result, BoxError> { + let addr = addr.to_socket_addrs()?.next().unwrap(); + let handle = tokio::spawn(run(addr, sender)); + Ok(handle) } #[derive(Debug, Serialize, Deserialize)] @@ -52,45 +43,43 @@ impl From> for JsonResult { } } -#[allow(clippy::manual_strip)] -fn run(config: Config, sender: channel::Sender) -> ServerHandle { - let server = rouille::Server::new(config.address(), move |request| { - router!(request, - (GET) (/version) => { - trace!("[rest/server] GET /version"); - let result = assemble_version_info(&sender); - rouille::Response::json(&result) - }, +async fn get_version(Extension(sender): Extension) -> impl IntoResponse { + let version: Result<_, RestApiError> = Ok(assemble_version_info(&sender)); + Json(JsonResult::from(version)) +} - (GET) (/chains) => { - // TODO(Soares): Add a `into_detail` to consume the error and obtain - // the underlying detail, so that we avoid doing `e.0` - trace!("[rest] GET /chains"); - let result = all_chain_ids(&sender); - rouille::Response::json(&JsonResult::from(result)) - }, +async fn get_chains(Extension(sender): Extension) -> impl IntoResponse { + let chain_ids = all_chain_ids(&sender); + Json(JsonResult::from(chain_ids)) +} - (GET) (/chain/{id: String}) => { - trace!("[rest] GET /chain/{}", id); - let result = chain_config(&sender, &id); - rouille::Response::json(&JsonResult::from(result)) - }, +async fn get_chain( + Path(id): Path, + Extension(sender): Extension, +) -> impl IntoResponse { + let chain = chain_config(&sender, &id); + Json(JsonResult::from(chain)) +} - (GET) (/state) => { - trace!("[rest] GET /state"); - let result = supervisor_state(&sender); - rouille::Response::json(&JsonResult::from(result)) - }, +async fn get_state( + Extension(sender): Extension, +) -> Json> { + let state = supervisor_state(&sender); + Json(JsonResult::from(state)) +} - _ => rouille::Response::empty_404(), - ) - }) - .unwrap(); +type Sender = channel::Sender; - let (join_handle, tx_stop) = server.stoppable(); +async fn run(addr: SocketAddr, sender: Sender) { + let app = Router::new() + .route("/version", get(get_version)) + .route("/chains", get(get_chains)) + .route("/chain/:id", get(get_chain)) + .route("/state", get(get_state)) + .layer(Extension(sender)); - ServerHandle { - join_handle, - tx_stop, - } + Server::bind(&addr) + .serve(app.into_make_service()) + .await + .unwrap(); } diff --git a/crates/relayer-rest/tests/mock.rs b/crates/relayer-rest/tests/mock.rs index c18fceacd7..084cf76ca1 100644 --- a/crates/relayer-rest/tests/mock.rs +++ b/crates/relayer-rest/tests/mock.rs @@ -1,6 +1,6 @@ -use std::str::FromStr; +use std::{fmt::Debug, str::FromStr, time::Duration}; -use serde::{Deserialize, Serialize}; +use serde::{de::DeserializeOwned, Deserialize, Serialize}; use ibc_relayer::{ config::ChainConfig, @@ -9,14 +9,14 @@ use ibc_relayer::{ }; use ibc_relayer_types::core::ics24_host::identifier::ChainId; -use ibc_relayer_rest::{server::spawn, Config}; +use ibc_relayer_rest::spawn; enum TestResult { Success, WrongRequest(Request), } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] #[serde(tag = "status", content = "result")] #[serde(rename_all = "lowercase")] enum JsonResult { @@ -24,14 +24,14 @@ enum JsonResult { Error(E), } -fn run_test(port: u16, path: &str, expected: R, handler: F) +async fn run_test(port: u16, path: &str, expected: R, handler: F) where - R: Serialize, + R: Serialize + DeserializeOwned + Debug + PartialEq, F: FnOnce(Request) -> TestResult + Send + 'static, { - let config = Config::new("127.0.0.1".to_string(), port); + let (tx, rx) = crossbeam_channel::unbounded(); - let (handle, rx) = spawn(config); + let handle = spawn(("127.0.0.1", port), tx).unwrap(); std::thread::spawn(move || match rx.recv() { Ok(r) => match handler(r) { @@ -41,21 +41,22 @@ where Err(e) => panic!("got an error: {e}"), }); - let response = ureq::get(&format!("http://127.0.0.1:{port}{path}")) - .call() + tokio::time::sleep(Duration::from_millis(500)).await; + + let response = reqwest::get(&format!("http://127.0.0.1:{port}{path}")) + .await .unwrap() - .into_string() + .json::() + .await .unwrap(); - let expected_json = serde_json::to_string(&expected).unwrap(); - assert_eq!(response, expected_json); + assert_eq!(response, expected); - handle.stop(); - handle.join().unwrap(); + drop(handle); } -#[test] -fn version() { +#[tokio::test] +async fn version() { let version = VersionInfo { name: "mock".to_string(), version: "0.0.0".to_string(), @@ -66,7 +67,7 @@ fn version() { version: "0.23.0".to_string(), }; - let result = vec![version.clone(), rest_api_version]; + let result: JsonResult<_, ()> = JsonResult::Success(vec![version.clone(), rest_api_version]); run_test(19101, "/version", result, |req| match req { Request::Version { reply_to } => { @@ -75,10 +76,11 @@ fn version() { } req => TestResult::WrongRequest(req), }) + .await } -#[test] -fn get_chains() { +#[tokio::test] +async fn get_chains() { let chain_id = ChainId::from_str("mock-0").unwrap(); let result: JsonResult<_, ()> = JsonResult::Success(vec![chain_id.clone()]); @@ -88,7 +90,8 @@ fn get_chains() { TestResult::Success } req => TestResult::WrongRequest(req), - }); + }) + .await; } const MOCK_CHAIN_CONFIG: &str = r#" @@ -110,8 +113,8 @@ trusting_period = '14days' trust_threshold = { numerator = '1', denominator = '3' } "#; -#[test] -fn get_chain() { +#[tokio::test] +async fn get_chain() { let config: ChainConfig = toml::de::from_str(MOCK_CHAIN_CONFIG).unwrap(); let result: JsonResult<_, ()> = JsonResult::Success(config.clone()); @@ -121,11 +124,12 @@ fn get_chain() { TestResult::Success } req => TestResult::WrongRequest(req), - }); + }) + .await; } -#[test] -fn state() { +#[tokio::test] +async fn state() { let state = SupervisorState::new(vec!["mock-0".parse().unwrap()], std::iter::empty()); let result: JsonResult<_, ()> = JsonResult::Success(state.clone()); @@ -135,5 +139,6 @@ fn state() { TestResult::Success } req => TestResult::WrongRequest(req), - }); + }) + .await; } diff --git a/crates/relayer/src/chain.rs b/crates/relayer/src/chain.rs index 2f57a662b6..958a3922d1 100644 --- a/crates/relayer/src/chain.rs +++ b/crates/relayer/src/chain.rs @@ -13,7 +13,7 @@ use serde::{de::Error, Deserialize, Serialize}; // the `Deserialize` implementation below and the tests. // See the NOTE(new) comments below. -#[derive(Copy, Clone, Debug, Serialize)] +#[derive(Copy, Clone, Debug, PartialEq, Eq, Serialize)] /// Types of chains the relayer can relay to and from pub enum ChainType { /// Chains based on the Cosmos SDK diff --git a/crates/relayer/src/config.rs b/crates/relayer/src/config.rs index ed6146cda9..baf7e8f792 100644 --- a/crates/relayer/src/config.rs +++ b/crates/relayer/src/config.rs @@ -421,7 +421,7 @@ impl Display for AddressType { } } -#[derive(Clone, Debug, Deserialize, Serialize)] +#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] #[serde(deny_unknown_fields)] pub struct ChainConfig { pub id: ChainId, diff --git a/crates/relayer/src/config/filter.rs b/crates/relayer/src/config/filter.rs index a0a2ec0aa6..9733671b0c 100644 --- a/crates/relayer/src/config/filter.rs +++ b/crates/relayer/src/config/filter.rs @@ -13,7 +13,7 @@ use ibc_relayer_types::core::ics24_host::identifier::{ChannelId, PortId}; use ibc_relayer_types::events::IbcEventType; /// Represents all the filtering policies for packets. -#[derive(Clone, Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] pub struct PacketFilter { #[serde(flatten)] pub channel_policy: ChannelPolicy, @@ -51,7 +51,7 @@ impl PacketFilter { } /// Represents the ways in which packets can be filtered. -#[derive(Clone, Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] #[serde( rename_all = "lowercase", tag = "policy", @@ -69,7 +69,7 @@ pub enum ChannelPolicy { /// Represents the policy used to filter incentivized packets. /// Currently only filtering on `recv_fee` is authorized. -#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)] +#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)] pub struct FeePolicy { recv: Vec, } @@ -91,7 +91,7 @@ impl FeePolicy { /// Represents the minimum fee authorized when filtering. /// If no denom is specified, any denom is allowed. -#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] pub struct MinFee { amount: u64, denom: Option, @@ -130,7 +130,7 @@ impl ChannelPolicy { } /// The internal representation of channel filter policies. -#[derive(Clone, Debug, Default, Deserialize)] +#[derive(Clone, Debug, Default, PartialEq, Eq, Deserialize)] #[serde(deny_unknown_fields)] pub struct ChannelFilters(Vec<(PortFilterMatch, ChannelFilterMatch)>); @@ -289,7 +289,7 @@ impl Hash for Wildcard { } /// Represents a single channel to be filtered in a [`ChannelFilters`] list. -#[derive(Clone, Debug, Eq, Hash, PartialEq)] +#[derive(Clone, Debug, Hash, PartialEq, Eq)] pub enum FilterPattern { /// A channel specified exactly with its [`PortId`] & [`ChannelId`]. Exact(T), diff --git a/crates/relayer/src/config/gas_multiplier.rs b/crates/relayer/src/config/gas_multiplier.rs index 3ae22076bc..4359e14af9 100644 --- a/crates/relayer/src/config/gas_multiplier.rs +++ b/crates/relayer/src/config/gas_multiplier.rs @@ -12,7 +12,7 @@ flex_error::define_error! { } } -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone, Copy, PartialEq, PartialOrd)] pub struct GasMultiplier(f64); impl GasMultiplier { diff --git a/crates/relayer/src/config/types.rs b/crates/relayer/src/config/types.rs index b6c65f951b..def6765f0a 100644 --- a/crates/relayer/src/config/types.rs +++ b/crates/relayer/src/config/types.rs @@ -24,7 +24,7 @@ pub mod max_msg_num { } } - #[derive(Debug, Clone, Copy)] + #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] pub struct MaxMsgNum(usize); impl MaxMsgNum { @@ -108,7 +108,7 @@ pub mod max_tx_size { } } - #[derive(Debug, Clone, Copy)] + #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] pub struct MaxTxSize(usize); impl MaxTxSize { @@ -193,7 +193,7 @@ pub mod memo { /// each transaction it submits. /// The memo can be configured on a per-chain basis. /// - #[derive(Clone, Debug, Default)] + #[derive(Clone, Debug, Default, PartialEq, Eq)] pub struct Memo(String); impl Memo { diff --git a/crates/relayer/src/rest/request.rs b/crates/relayer/src/rest/request.rs index d8c3413572..a789931f9c 100644 --- a/crates/relayer/src/rest/request.rs +++ b/crates/relayer/src/rest/request.rs @@ -1,4 +1,4 @@ -use serde::Serialize; +use serde::{Deserialize, Serialize}; use ibc_relayer_types::core::ics24_host::identifier::ChainId; @@ -11,7 +11,7 @@ pub fn reply_channel() -> (ReplySender, ReplyReceiver) { crossbeam_channel::bounded(1) } -#[derive(Clone, Debug, PartialEq, Eq, Serialize)] +#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)] pub struct VersionInfo { pub name: String, pub version: String, diff --git a/crates/relayer/src/supervisor/dump_state.rs b/crates/relayer/src/supervisor/dump_state.rs index 82daea673b..96100d69bd 100644 --- a/crates/relayer/src/supervisor/dump_state.rs +++ b/crates/relayer/src/supervisor/dump_state.rs @@ -11,7 +11,7 @@ use crate::{ worker::{WorkerData, WorkerHandle, WorkerId}, }; -#[derive(Clone, Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] pub struct WorkerDesc { pub id: WorkerId, pub object: Object, @@ -24,7 +24,7 @@ impl WorkerDesc { } } -#[derive(Clone, Debug, Default, Serialize, Deserialize)] +#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)] pub struct SupervisorState { pub chains: Vec, pub workers: BTreeMap>, diff --git a/crates/relayer/src/util.rs b/crates/relayer/src/util.rs index d46f37ca95..e33fe52b57 100644 --- a/crates/relayer/src/util.rs +++ b/crates/relayer/src/util.rs @@ -1,5 +1,5 @@ mod block_on; -pub use block_on::block_on; +pub use block_on::{block_on, spawn_blocking}; pub mod collate; pub mod diff; diff --git a/crates/relayer/src/util/block_on.rs b/crates/relayer/src/util/block_on.rs index 93459ece34..3c4e1aa963 100644 --- a/crates/relayer/src/util/block_on.rs +++ b/crates/relayer/src/util/block_on.rs @@ -1,5 +1,7 @@ //! Utility function to execute a future synchronously +use std::thread::JoinHandle; + use futures::Future; /// Spawns a new tokio runtime and use it to block on the given future. @@ -10,3 +12,18 @@ pub fn block_on(future: F) -> F::Output { .unwrap() .block_on(future) } + +/// Spawns a new tokio runtime in a new thread and use it to block on the given future. +pub fn spawn_blocking(future: F) -> JoinHandle +where + F: Future + Send + 'static, + F::Output: Send, +{ + std::thread::spawn(move || { + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap() + .block_on(future) + }) +} diff --git a/crates/relayer/src/worker/handle.rs b/crates/relayer/src/worker/handle.rs index 7927b14c66..0f01787795 100644 --- a/crates/relayer/src/worker/handle.rs +++ b/crates/relayer/src/worker/handle.rs @@ -19,7 +19,7 @@ use crate::{event::monitor::EventBatch, object::Object}; use super::{WorkerCmd, WorkerId}; -#[derive(Clone, Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] #[serde(tag = "type")] pub enum WorkerData { Client { misbehaviour: bool, refresh: bool }, diff --git a/crates/telemetry/Cargo.toml b/crates/telemetry/Cargo.toml index 6b7fc9ffdc..79da2a4440 100644 --- a/crates/telemetry/Cargo.toml +++ b/crates/telemetry/Cargo.toml @@ -19,11 +19,12 @@ once_cell = "1.17.0" opentelemetry = { version = "0.18.0", features = ["metrics"] } opentelemetry-prometheus = "0.11.0" prometheus = "0.13.2" -rouille = "3.6.1" moka = "0.10.0" dashmap = "5.4.0" -serde_json = "1.0.94" -serde = "1.0.149" +serde_json = "1.0.94" +serde = "1.0.149" +axum = "0.6.12" +tokio = "1.26.0" [dependencies.tendermint] version = "0.30.0" diff --git a/crates/telemetry/src/lib.rs b/crates/telemetry/src/lib.rs index 54771565d0..40fa56bfce 100644 --- a/crates/telemetry/src/lib.rs +++ b/crates/telemetry/src/lib.rs @@ -1,17 +1,14 @@ -extern crate alloc; - pub mod encoder; mod path_identifier; pub mod server; pub mod state; -use alloc::sync::Arc; +use std::error::Error; +use std::net::{SocketAddr, ToSocketAddrs}; +use std::sync::Arc; + use once_cell::sync::Lazy; -use std::{ - error::Error, - net::{SocketAddr, ToSocketAddrs}, - thread::JoinHandle, -}; +use tokio::task::JoinHandle; pub use crate::state::TelemetryState; @@ -25,22 +22,17 @@ pub fn global() -> &'static Arc { &GLOBAL_STATE } +pub type BoxError = Box; + pub fn spawn( - address: A, + addr: A, state: Arc, -) -> Result<(SocketAddr, JoinHandle<()>), Box> +) -> Result<(SocketAddr, JoinHandle>), BoxError> where A: ToSocketAddrs + Send + 'static, { - let server = server::listen(address, state); - - match server { - Ok(server) => { - let address = server.server_addr(); - let handle = std::thread::spawn(move || server.run()); + let addr = addr.to_socket_addrs()?.next().unwrap(); + let handle = tokio::spawn(server::listen(addr, state)); - Ok((address, handle)) - } - Err(e) => Err(e), - } + Ok((addr, handle)) } diff --git a/crates/telemetry/src/server.rs b/crates/telemetry/src/server.rs index bb7a3d83c1..f6ec19bccb 100644 --- a/crates/telemetry/src/server.rs +++ b/crates/telemetry/src/server.rs @@ -1,78 +1,68 @@ -use alloc::sync::Arc; use std::error::Error; -use std::net::ToSocketAddrs; +use std::net::SocketAddr; +use std::sync::Arc; + +use axum::extract::Query; +use axum::response::IntoResponse; +use axum::routing::get; +use axum::{Extension, Router}; use prometheus::{Encoder, TextEncoder}; -use rouille::{Request, Response, Server}; use crate::encoder::JsonEncoder; use crate::state::TelemetryState; +#[derive(Copy, Clone, Debug, Default, serde::Deserialize)] enum Format { + #[serde(rename = "text")] + #[default] Text, + + #[serde(rename = "json")] Json, } -enum Route { - Metrics(Format), - Other, +#[derive(Copy, Clone, Debug, serde::Deserialize)] +struct Metrics { + format: Option, } -impl Route { - fn from_request(request: &Request) -> Route { - if request.url() == "/metrics" { - let format = request - .get_param("format") - .and_then(|f| match f.as_str() { - "json" => Some(Format::Json), - "text" => Some(Format::Text), - _ => None, - }) - .unwrap_or(Format::Text); - - Route::Metrics(format) - } else { - Route::Other - } - } -} +pub async fn listen( + addr: SocketAddr, + state: Arc, +) -> Result<(), Box> { + let app = Router::new() + .route("/metrics", get(get_metrics)) + .layer(Extension(state)); -pub fn listen( - address: impl ToSocketAddrs, - telemetry_state: Arc, -) -> Result Response>, Box> { - let server = Server::new(address, move |request| { - match Route::from_request(request) { - // The prometheus endpoint - Route::Metrics(format) => { - let mut buffer = vec![]; + axum::Server::bind(&addr) + .serve(app.into_make_service()) + .await?; - match format { - Format::Json => { - let encoder = JsonEncoder::new(); - encoder - .encode(&telemetry_state.gather(), &mut buffer) - .unwrap(); - - rouille::Response::from_data(encoder.format_type().to_string(), buffer) - } - - Format::Text => { - let encoder = TextEncoder::new(); - encoder - .encode(&telemetry_state.gather(), &mut buffer) - .unwrap(); + Ok(()) +} - rouille::Response::from_data(encoder.format_type().to_string(), buffer) - } - } - } +async fn get_metrics( + Extension(state): Extension>, + Query(query): Query, +) -> impl IntoResponse { + match query.format.unwrap_or_default() { + Format::Text => { + let encoder = TextEncoder::new(); + let mut buffer = Vec::new(); + encoder.encode(&state.gather(), &mut buffer).unwrap(); - // Any other route - // Return an empty response with a 404 status code. - Route::Other => rouille::Response::empty_404(), + ([("content-type", "text/plain; charset=utf-8")], buffer) } - })?; + Format::Json => { + let encoder = JsonEncoder::new(); + let mut buffer = Vec::new(); + encoder.encode(&state.gather(), &mut buffer).unwrap(); - Ok(server) + ( + [("content-type", "application/javascript; charset=utf-8")], + buffer, + ) + } + } }