diff --git a/Cargo.lock b/Cargo.lock
index c35db3adc..f515b5c70 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -810,6 +810,7 @@ dependencies = [
  "divan",
  "futures",
  "hex",
+ "humantime",
  "ibc-proto",
  "ibc-types",
  "indexmap 2.4.0",
@@ -840,6 +841,7 @@ dependencies = [
  "tower-actor",
  "tower-http",
  "tracing",
+ "tryhard",
 ]
 
 [[package]]
diff --git a/crates/astria-sequencer/Cargo.toml b/crates/astria-sequencer/Cargo.toml
index 334c3748e..b54531f54 100644
--- a/crates/astria-sequencer/Cargo.toml
+++ b/crates/astria-sequencer/Cargo.toml
@@ -52,6 +52,7 @@ bytes = { workspace = true }
 divan = { workspace = true, optional = true }
 futures = { workspace = true }
 hex = { workspace = true, features = ["serde"] }
+humantime = { workspace = true }
 ibc-types = { workspace = true, features = ["with_serde"] }
 indexmap = { workspace = true }
 itertools = { workspace = true }
@@ -71,6 +72,7 @@ thiserror = { workspace = true }
 tokio = { workspace = true, features = ["rt", "tracing"] }
 tonic = { workspace = true }
 tracing = { workspace = true }
+tryhard = { workspace = true }
 
 [dev-dependencies]
 astria-core = { path = "../astria-core", features = [
diff --git a/crates/astria-sequencer/src/sequencer.rs b/crates/astria-sequencer/src/sequencer.rs
index a9b9e0f76..42a29a637 100644
--- a/crates/astria-sequencer/src/sequencer.rs
+++ b/crates/astria-sequencer/src/sequencer.rs
@@ -1,3 +1,5 @@
+use std::time::Duration;
+
 use astria_core::generated::{
     connect::{
         marketmap::v2::query_server::QueryServer as MarketMapQueryServer,
@@ -12,6 +14,7 @@ use astria_core::generated::{
 use astria_eyre::{
     anyhow_to_eyre,
     eyre::{
+        self,
         eyre,
         OptionExt as _,
         Result,
@@ -37,6 +40,7 @@ use tokio::{
     task::JoinHandle,
 };
 use tonic::transport::{
+    Channel,
     Endpoint,
     Uri,
 };
@@ -61,6 +65,8 @@ use crate::{
     service,
 };
 
+const MAX_RETRIES_TO_CONNECT_TO_ORACLE_SIDECAR: u32 = 36;
+
 pub struct Sequencer;
 
 impl Sequencer {
@@ -117,32 +123,9 @@ impl Sequencer {
                 .context("failed to query state for base prefix")?;
         }
 
-        let oracle_client = if config.no_oracle {
-            None
-        } else {
-            let uri: Uri = config
-                .oracle_grpc_addr
-                .parse()
-                .context("failed parsing oracle grpc address as Uri")?;
-            let endpoint = Endpoint::from(uri.clone()).timeout(std::time::Duration::from_millis(
-                config.oracle_client_timeout_milliseconds,
-            ));
-            let mut oracle_client = OracleClient::new(
-                endpoint
-                    .connect()
-                    .await
-                    .wrap_err("failed to connect to oracle sidecar")?,
-            );
-
-            // ensure the oracle sidecar is reachable
-            // TODO: allow this to retry in case the oracle sidecar is not ready yet
-            if let Err(e) = oracle_client.prices(QueryPricesRequest::default()).await {
-                warn!(uri = %uri, error = %e, "oracle sidecar is unreachable");
-            } else {
-                debug!(uri = %uri, "oracle sidecar is reachable");
-            };
-            Some(oracle_client)
-        };
+        let oracle_client = new_oracle_client(&config)
+            .await
+            .wrap_err("failed to create connected oracle client")?;
 
         let mempool = Mempool::new(metrics, config.mempool_parked_max_tx_count);
         let app = App::new(
@@ -303,3 +286,111 @@ fn spawn_signal_handler() -> SignalReceiver {
         stop_rx,
     }
 }
+
+/// Returns a new Connect oracle client or `Ok(None)` if `config.no_oracle` is true.
+///
+/// If `config.no_oracle` is false, returns `Ok(Some(...))` as soon as a successful response is
+/// received from the oracle sidecar, or returns `Err` after a fixed number of failed re-attempts
+/// (roughly equivalent to 5 minutes total).
+#[instrument(skip_all, err)]
+async fn new_oracle_client(config: &Config) -> Result<Option<OracleClient<Channel>>> {
+    if config.no_oracle {
+        return Ok(None);
+    }
+    let uri: Uri = config
+        .oracle_grpc_addr
+        .parse()
+        .context("failed parsing oracle grpc address as Uri")?;
+    let endpoint = Endpoint::from(uri.clone()).timeout(Duration::from_millis(
+        config.oracle_client_timeout_milliseconds,
+    ));
+
+    let retry_config = tryhard::RetryFutureConfig::new(MAX_RETRIES_TO_CONNECT_TO_ORACLE_SIDECAR)
+        .exponential_backoff(Duration::from_millis(100))
+        .max_delay(Duration::from_secs(10))
+        .on_retry(
+            |attempt, next_delay: Option<Duration>, error: &eyre::Report| {
+                let wait_duration = next_delay
+                    .map(humantime::format_duration)
+                    .map(tracing::field::display);
+                warn!(
+                    error = error.as_ref() as &dyn std::error::Error,
+                    attempt,
+                    wait_duration,
+                    "failed to query oracle sidecar; retrying after backoff",
+                );
+                async {}
+            },
+        );
+
+    let client = tryhard::retry_fn(|| connect_to_oracle(&endpoint, &uri))
+        .with_config(retry_config)
+        .await
+        .wrap_err_with(|| {
+            format!(
+                "failed to query oracle sidecar after {MAX_RETRIES_TO_CONNECT_TO_ORACLE_SIDECAR} \
+                 retries; giving up"
+            )
+        })?;
+    Ok(Some(client))
+}
+
+/// Connects to the oracle sidecar and verifies it is responsive by querying it for prices.
+///
+/// # Errors
+/// Returns an error if the connection cannot be established or if the sidecar responds to
+/// the prices query with an error.
+#[instrument(skip_all, err(level = tracing::Level::WARN))]
+async fn connect_to_oracle(endpoint: &Endpoint, uri: &Uri) -> Result<OracleClient<Channel>> {
+    let mut oracle_client = OracleClient::new(
+        endpoint
+            .connect()
+            .await
+            .wrap_err("failed to connect to oracle sidecar")?,
+    );
+    // Propagate a failed query so the retry loop in `new_oracle_client` keeps waiting until the
+    // sidecar actually answers, instead of reporting it as reachable.
+    oracle_client
+        .prices(QueryPricesRequest::default())
+        .await
+        .wrap_err("oracle sidecar responded with error to query for prices")?;
+    debug!(uri = %uri, "oracle sidecar is reachable");
+    Ok(oracle_client)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[tokio::test(start_paused = true)]
+    async fn should_wait_while_unable_to_connect() {
+        // We only care about `no_oracle` and `oracle_grpc_addr` values - the others can be
+        // meaningless.
+        let config = Config {
+            listen_addr: String::new(),
+            db_filepath: "".into(),
+            log: String::new(),
+            grpc_addr: String::new(),
+            force_stdout: true,
+            no_otel: true,
+            no_metrics: true,
+            metrics_http_listener_addr: String::new(),
+            pretty_print: true,
+            no_oracle: false,
+            oracle_grpc_addr: "http://127.0.0.1:8081".to_string(),
+            oracle_client_timeout_milliseconds: 1,
+            mempool_parked_max_tx_count: 1,
+        };
+
+        let start = tokio::time::Instant::now();
+        let error = new_oracle_client(&config).await.unwrap_err();
+        assert!(start.elapsed() > Duration::from_secs(300));
+        assert_eq!(
+            error.to_string(),
+            format!(
+                "failed to query oracle sidecar after {MAX_RETRIES_TO_CONNECT_TO_ORACLE_SIDECAR} \
+                 retries; giving up"
+            )
+        );
+    }
+}