diff --git a/src/bins/rmb-relay.rs b/src/bins/rmb-relay.rs index 01cadab..f95bfb0 100644 --- a/src/bins/rmb-relay.rs +++ b/src/bins/rmb-relay.rs @@ -11,6 +11,7 @@ use rmb::relay::{ limiter::{FixedWindowOptions, Limiters}, }; use rmb::twin::SubstrateTwinDB; +use tokio::sync::oneshot; /// A peer requires only which rely to connect to, and /// which identity (mnemonics) @@ -92,7 +93,7 @@ fn set_limits() -> Result<()> { Ok(()) } -async fn app(args: Args) -> Result<()> { +async fn app(args: Args, tx: oneshot::Sender<()>) -> Result<()> { if args.workers == 0 { anyhow::bail!("number of workers cannot be zero"); } @@ -172,10 +173,42 @@ async fn app(args: Args) -> Result<()> { let mut l = events::Listener::new(args.substrate, redis_cache).await?; tokio::spawn(async move { - l.listen() - .await - .context("failed to listen to chain events") - .unwrap(); + let max_retries = 9; // max wait is 2^9 = 512 seconds ( 5 minutes ) + let mut attempt = 0; + let mut backoff = Duration::from_secs(1); + let mut got_hit = false; + + loop { + match l + .listen(&mut got_hit) + .await + .context("failed to listen to chain events") + { + Ok(_) => break, + Err(e) => { + if got_hit { + log::warn!("Listener got a hit, but failed to listen to chain events before no attempts will be reset"); + got_hit = false; + attempt = 0; + backoff = Duration::from_secs(1); + } + attempt += 1; + if attempt > max_retries { + log::error!("Listener failed after {} attempts: {:?}", attempt - 1, e); + let _ = tx.send(()); + break; + } + log::warn!( + "Listener failed on attempt {}: {:?}. Retrying in {:?}...", + attempt, + e, + backoff + ); + tokio::time::sleep(backoff).await; + backoff *= 2; + } + } + } }); r.start(&args.listen).await.unwrap(); @@ -185,8 +218,21 @@ async fn app(args: Args) -> Result<()> { #[tokio::main] async fn main() { let args = Args::parse(); - if let Err(e) = app(args).await { - eprintln!("{:#}", e); - std::process::exit(1); + let (tx, rx) = oneshot::channel(); + let app_handle = tokio::spawn(async move { + if let Err(e) = app(args, tx).await { + eprintln!("{:#}", e); + std::process::exit(1); + } + }); + + tokio::select! { + _ = app_handle => { + log::info!("Application is closing successfully."); + } + _ = rx => { + log::error!("Listener shutdown signal received. Exiting application."); + std::process::exit(1); + } } } diff --git a/src/events/mod.rs b/src/events/mod.rs index b98540e..8a69542 100644 --- a/src/events/mod.rs +++ b/src/events/mod.rs @@ -60,7 +60,7 @@ where anyhow::bail!("failed to connect to substrate using the provided urls") } - pub async fn listen(&mut self) -> Result<()> { + pub async fn listen(&mut self, got_hit: &mut bool) -> Result<()> { loop { // always flush in case some blocks were finalized before reconnecting if let Err(err) = self.cache.flush().await { @@ -73,6 +73,8 @@ where if let Some(subxt::Error::Rpc(_)) = err.downcast_ref::() { self.api = Self::connect(&mut self.substrate_urls).await?; } + } else { + *got_hit = true } } }